I'm using the rxjs library.
In my application, a Browser object manages multiple Page objects. Each page emits a stream of events through an Observable<Event>.
Pages can be opened and closed dynamically, so I need to merge the event streams of all currently open pages into a single observable, TheOneObservable. This merged stream must also include custom events from the Browser itself.
When a Page is closed, the subscription to its events should also be terminated to prevent memory leaks.
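To make that requirement concrete, here is a rough sketch of the behaviour I'm after for a single page. It assumes RxJS 6+ pipeable operators (unlike the chained style in my snippet further down), and both `pageEvents` and `pageClosed` are hypothetical names standing in for a page's event stream and a "page was closed" signal. It is only an illustration of the desired teardown, not a solution to the merging problem.

```typescript
import { Subject } from "rxjs";
import { takeUntil } from "rxjs/operators";

// Hypothetical stand-ins: one page's events and its "closed" notification.
const pageEvents = new Subject<string>();
const pageClosed = new Subject<void>();

const seen: string[] = [];
let completed = false;

// takeUntil completes the stream (and unsubscribes from pageEvents)
// as soon as pageClosed emits.
pageEvents.pipe(takeUntil(pageClosed)).subscribe({
  next: e => seen.push(e),
  complete: () => { completed = true; },
});

pageEvents.next("click");   // delivered
pageClosed.next();          // page closed: subscription is torn down
pageEvents.next("ignored"); // no longer delivered
```

After the close signal, nothing further from that page should reach the merged stream.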
The difficulty is that the number of source Observables keeps changing as pages are opened and closed. I considered using an Observable of Pages with mergeMap, but that has problems of its own: for example, subscribers would only receive events from pages opened after they subscribed.
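A small sketch of that late-subscriber problem, assuming RxJS 6+ pipeable operators. `DemoPage` and `pageOpened` are hypothetical names used only for this illustration, with string events in place of Event to keep it short.

```typescript
import { Subject } from "rxjs";
import { mergeMap } from "rxjs/operators";

// Minimal stand-in for Page: just a subject of string events.
class DemoPage {
  readonly events = new Subject<string>();
}

// Hypothetical stream that emits each page as it is opened.
const pageOpened = new Subject<DemoPage>();
const merged = pageOpened.pipe(mergeMap(page => page.events));

const first = new DemoPage();
pageOpened.next(first); // opened before anyone subscribes: mergeMap never sees it

const received: string[] = [];
merged.subscribe(e => received.push(e));

const second = new DemoPage();
pageOpened.next(second); // opened after subscribing

first.events.next("from first");   // lost
second.events.next("from second"); // delivered
```

Only the event from the second page arrives, because the first page was announced before the subscription existed.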
Note that a similar question has already been answered for .NET. However, that answer relies on ObservableCollection, which is not available in rxjs.
Below is a code snippet illustrating the problem:
// RxJS 5 style import, matching the chained operators used below
import { Observable, Subject } from 'rxjs/Rx';

class Page {
    private _events = new Subject<Event>();

    get events(): Observable<Event> {
        return this._events.asObservable();
    }
}

class Browser {
    pages = [] as Page[];
    private _ownEvents = new Subject<Event>();

    addPage(page: Page) {
        this.pages.push(page);
    }

    removePage(page: Page) {
        const ixPage = this.pages.indexOf(page);
        if (ixPage < 0) return;
        this.pages.splice(ixPage, 1);
    }

    get oneObservable() {
        // This won't work: from() only iterates the pages present at
        // subscription time, so pages added later are missed and
        // removed pages stay subscribed.
        return Observable.from(this.pages).mergeMap(x => x.events).merge(this._ownEvents);
    }
}
The snippet is written in TypeScript, but it should be easy to follow.