Imagine having a subscription set up like this:
mySubject$.subscribe(async input => await complexFunction(input))
Assume that mySubject$ is a Subject under your control. You need a way to do the following:
await mySubject$.next(input)
This would let you wait for complexFunction to complete.
You attempted to devise a custom Subject implementation for this, but it proved to be challenging. Could there be a simpler solution that you are overlooking?
What other possibilities exist in this scenario?
Update:
Following the idea that RxJS APIs should accept Promises, you tried the code below. It did not work, and you are stuck again.
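import { Observable, Subject, from } from 'rxjs';
import { delay, tap } from 'rxjs/operators';

function enhanceObservable<T>(observable: Observable<T>) {
  // Assigned synchronously by the Promise executor below.
  let resolve: () => void;
  return {
    waiter: new Promise<void>(res => resolve = res),
    // tap() resolves the waiter when the source emits a value.
    observable: observable.pipe(tap(() => resolve()))
  };
}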
fdescribe('test', () => {
  it('should function correctly', async () => {
    const subject = new Subject<string>();
    const wrapper = enhanceObservable(subject);
    let testValue;
    wrapper.observable.subscribe(input =>
      from(input).pipe(delay(1000), tap(value => testValue = value)).toPromise());
    subject.next('message');
    await wrapper.waiter;
    expect(testValue).toEqual('message');
  });
});
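For comparison, here is a minimal sketch of the behavior being asked for, written without RxJS so that it is self-contained. AwaitableEmitter, AsyncHandler, and demo are hypothetical names invented for this illustration and are not part of any library. The sketch assumes it is acceptable for next() to return a Promise that settles once every handler has finished, which a plain Subject does not do because subscribe() ignores whatever its callback returns.

```typescript
// Hypothetical helper, not part of RxJS: a tiny emitter whose next() can be awaited.
type AsyncHandler<T> = (value: T) => void | Promise<void>;

class AwaitableEmitter<T> {
  private handlers: AsyncHandler<T>[] = [];

  // Registers a handler and returns a function that removes it again.
  subscribe(handler: AsyncHandler<T>): () => void {
    this.handlers.push(handler);
    return () => {
      this.handlers = this.handlers.filter(h => h !== handler);
    };
  }

  // Calls every handler; resolves once all returned promises have fulfilled,
  // and rejects if any handler throws or rejects.
  async next(value: T): Promise<void> {
    await Promise.all(this.handlers.map(h => h(value)));
  }
}

// Usage: the handler's async work is finished by the time next() resolves.
async function demo(): Promise<string> {
  const emitter = new AwaitableEmitter<string>();
  let testValue = '';
  emitter.subscribe(async input => {
    // Stand-in for complexFunction: some asynchronous work.
    await new Promise<void>(res => setTimeout(res, 10));
    testValue = input;
  });
  await emitter.next('message');
  return testValue;
}
```

Whether this shape fits depends on how much of the Observable operator chain you still need; it is only one possible direction, not a drop-in replacement for a Subject.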