Combine activities from a dynamic array of Observables

I'm currently utilizing the rxjs library.

In my application, I have a Browser object overseeing multiple instances of Page objects. Each page emits a stream of events through an Observable<Event>.

Pages can be opened and closed dynamically, leading me to the challenge of merging all event streams from active pages into a single observable named TheOneObservable. Additionally, I need to include custom events from the Browser itself in this merged stream.

When a Page is closed, its subscription should also be terminated to prevent any memory leaks.

The issue lies in the constantly changing number of Observable sources due to the opening and closing of pages. While considering using an Observable of Pages with mergeMap, there are issues like subscribers only receiving events from newly opened pages after subscribing.


Please note that a similar question has been addressed for .NET at this link: here. However, it relies on ObservableCollection which is not available in rxjs.


Below is a code snippet illustrating the problem:

class Page {
    private _events = new Subject<Event>();

    get events(): Observable<Event> {
        return this._events.asObservable();
    }
}

class Browser {
    pages = [] as Page[];
    private _ownEvents = new Subject<Event>();

    addPage(page : Page) {
        this.pages.push(page);
    }

    removePage(page : Page) {
        let ixPage = this.pages.indexOf(page);
        if (ixPage < 0) return;
        this.pages.splice(ixPage, 1);
    }

    get oneObservable() {
        //this won't work for aforementioned reasons
        return Observable.from(this.pages).mergeMap(x => x.events).merge(this._ownEvents);
    }
}

This code snippet is written in TypeScript, but should be comprehensible.

Answer №1

Utilize the switchMap() function on a Subject() that is connected to changes in an array, allowing you to replace oneObservable with a new one whenever the array undergoes a change.

pagesChanged = new Rx.Subject();

addPage(page : Page) {
  this.pages.push(page);
  this.pagesChanged.next();
}

removePage(page : Page) {
  let ixPage = this.pages.indexOf(page);
  if (ixPage < 0) return;
  this.pages.splice(ixPage, 1);
  this.pagesChanged.next();
}

get oneObservable() {
  return pagesChanged
    .switchMap(changeEvent =>
      Observable.from(this.pages).mergeMap(x => x.events).merge(this._ownEvents)
    )
}

Here is a testing scenario:

const page1 = { events: Rx.Observable.of('page1Event') }
const page2 = { events: Rx.Observable.of('page2Event') }

let pages = []; 
const pagesChanged = new Rx.Subject();
const addPage = (page) => { 
  pages.push(page); 
  pagesChanged.next(); 
}
const removePage = (page) => { 
  let ixPage = pages.indexOf(page);
  if (ixPage < 0) return;
  pages.splice(ixPage, 1);
  pagesChanged.next(); 
}

const _ownEvents = Rx.Observable.of('ownEvent')

const oneObservable = 
  pagesChanged
    .switchMap(pp => 
      Rx.Observable.from(pages)
        .mergeMap(x => x.events)
        .merge(_ownEvents)
    )

oneObservable.subscribe(x => console.log('subscribe', x))

console.log('adding 1')
addPage(page1)
console.log('adding 2')
addPage(page2)
console.log('removing 1')
removePage(page1)
.as-console-wrapper { max-height: 100% !important; top: 0; }
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.6/Rx.js"></script>

Answer №2

To effectively handle subscriptions to pages and input their events into the resulting subject, it is important that you take charge of managing the subscriptions:

const mainObservable$ = new Subject<Event>();

function openPage(page: Page): Subscription {
  return page.events$.subscribe(value => this.mainObservable$.next(value));
}

When closing a page by calling unsubscribe on the returned subscription, all necessary actions are automatically performed.

Keep in mind that mainObservable$ operates as a hot observable in this scenario.

If desired, you can enhance this process by developing your custom observable type that encompasses this API. This approach would facilitate unsubscribing all internal observables upon closure.


Alternatively, consider the following modified strategy:

const childObservables$ = new Subject<Observable<Event>>();
const mainObservable$ = childObservables$.mergeMap(obs$ => obs$);

// Incorporate a page's events using takeUntil for efficient unsubscription.
childObservables$.next(page.events$.takeUntil(page.closed$));

This method offers an advantage as it autonomously manages the unsubscription process for inner observables once the observable is unsubscribed.

Similar questions

If you have not found the answer to your question or you are interested in this topic, then look at other similar questions below or use the search

Rendering React Router server-side with client-side session information

Currently, I am working with mozilla client-sessions in conjunction with express/node. My goal is to pass my session.user to the react-router within a standard * request. Despite my efforts and attempts, I keep encountering an issue where it becomes unde ...

Tips for altering a key within a tree-view:

I am working with a potentially infinite tree-view array: type Tree = { id: number; name: string; email: string; children: Tree[]; }; const tree: Tree[] = [ { id: 1, name: 'Truck', email: '@mail', children ...

JQuery Menu with Broken UL Formatting on Windows versions of IE and Firefox

Hey there! I've been struggling with a menu design that works perfectly on Mac browsers but is completely broken on Windows. I've spent hours trying to fix it and would really appreciate if a kind programmer could take a look and help me out. The ...

What is the process for implementing a version folder for my @types/definition?

Is there a way to access the typings for react-router in my project? My package.json file currently has this dependency: { "@types/react-router": "^4.0.3" } However, it seems to be using the standard index.d.ts file from DefinitelyTyped library which i ...

Facing a node.js installation issue on Windows 10 while using Visual Studio Code (VS

Issue encountered while trying to execute "DownloadString" with one argument: Unable to establish a secure connection due to SSL/TLS channel creation failure. At line:1 char:1 + iex ((New-Object System.Net.WebClient).DownloadString('https ...

Using Formik inside Material-UI's table components

My goal is to design a material UI table where each cell acts as a Formik input. However, I've encountered errors with Material UI when attempting to use a Formik Object within TableBody or TableItem tags. Here's an example image of what I' ...

Issue with caching: Browser cache not being utilized even after implementing Cache-Control Header

First Requesthttps://i.sstatic.net/xtJCW.png Second Inquiryhttps://i.sstatic.net/4R9ln.png I have implemented a node module(express-cache-ctrl) to activate caching on a proxy. app.use(cache.public(3600)); Despite having Cache-control headers with max-age ...

What is the method for assigning a generic type to an object received from an observer?

I am working with a simple observable that fetches data from the URL https://api.github.com/users. My goal is to map this data in a way that only the login property of each user in the array returned by the endpoint is returned. However, when I attempt to ...

NodeJs - Issue: Headers cannot be changed once they are sent & TypeError: req.next is undefined

My goal is to retrieve data from a MySQL database by calling methods to insert and read information. The reason I am doing this is because node.js operates asynchronously. Here is my code: exports.list = function(req, res){ var moduleRows; req.getCo ...

What is the reason behind FieldSelect returning a string instead of an object like FieldCheckbox?

FieldSelect component from Sharetribe documents is giving me a string, while FieldCheckbox is returning a JSON object. In a specific scenario, I want FieldSelect to store a JSON object. How can I achieve this? Below is the code snippet for reference: I& ...

Using THREE.js: Object3D Dimension Shrinkage

Is there a way to disable sizeAttenuation for an Object3D in THREE.js? I'm working on rendering a trajectory at a massive scale using arrow helpers to show the motion direction. I want the arrow heads to maintain their orientation without changing si ...

When using Rspec and Capybara, utilizing jQuery to set focus on an element may not apply the `:focus` CSS as expected

I have implemented jump links for blind and keyboard users on my website, but I've hidden them off-screen visually. When these links gain focus, they are moved into the viewport. Trying to test this behavior using RSpec and Capybara has been unsucces ...

The compatibility of Datatables responsive feature with ajax calls appears to be limited

I recently started using the datatables plugin and have encountered an issue with responsive tables. While I successfully implemented a responsive table and an AJAX call on a previous page, I am facing difficulties with it on a new page for unknown reasons ...

Application crash imminent, alert: Uncaught TypeError detected - Unable to access property 'some' of undefined within React

My application has 3 sections. The first section consists of a form where users fill in details about watches, and the data is submitted using the submitHandler function. In the second part, users can enter watches from the first section. When a user click ...

Tips on customizing the appearance of React rendering components

<div> <h3>{this.props.product.name}</h3> <h3>{this.props.product.code}</h3> {this.renderColors()} <article> <div da ...

Issue locating the bottom of the scroll bar

My attempt to detect when the scroll reaches the bottom of a div involves using this code: $('.scrollpane').scroll(function(){ if ($(this).scrollTop() + $(this).height() === $("#results").height()) { alert('scroll at bottom&a ...

Unraveling dependencies in deno for debugging purposes

When working with Node + NPM, dependencies are installed in node_modules, making it easy to debug by adding debugger statements or console logs directly in the node_modules/some-pkg/some-file.js. In Deno, things are a bit more complex as dependencies are ...

Is it possible to compare escaped data with the unescaped value of a select box in JavaScript?

On my webpage, I have a functionality that involves fetching select box options through an AJAX request. I then create the select box based on this data, and later use the selected option to retrieve additional information from the response received via AJ ...

Utilizing Props in React to Slice and Dice Data Within a Separate Component

Currently, I am in the process of creating an about text for a profile that will include an option to expand or collapse based on its length. To achieve this, I am utilizing a function from the main home component: <AboutText text={aboutData}/> Abo ...

Which Angular2 npm packages should I be installing?

When I'm trying to create an empty app without using angular-cli, it's really difficult for me to figure out which packages or libraries to include. Searching for angular2 on npmjs yields unwanted results, forcing me to click through multiple li ...