RxJS Observables trigger the onCompleted function after completing a series of asynchronous actions

I am trying to build an observable that produces values from several asynchronous actions (HTTP requests to a Jenkins server) and notifies the subscriber once all of the actions have finished. I seem to be misunderstanding something, because it does not behave the way I expect.

'use strict';

let Rx = require('rx');
let _ = require('lodash');
let values = [
    {'id': 1, 'status': true},
    {'id': 2, 'status': true},
    {'id': 3, 'status': true}
];

function createValuesObservable() {

    return Rx.Observable.create(function(observer) {
        _.map(values, function(value) {
            var millisecondsToWait = 1000;
            setTimeout(function() { 
                console.log("Sending value: ", value);
                observer.onNext(value)
            }, millisecondsToWait);
        });
        console.log("createValuesObservable Sending onCompleted");
        observer.onCompleted()
    });
}

let observer = Rx.Observer.create((data) => {
    console.log("Received Data: ", data);
}, (error) => {
    console.log("Error: ", error);
}, () => {
    console.log("DONE!");
});

createValuesObservable().subscribe(observer);

Upon running this code snippet, the output I receive is:

createValuesObservable Sending onCompleted
DONE!
Sending value:  { id: 1, status: true }
Sending value:  { id: 2, status: true }
Sending value:  { id: 3, status: true }

My expected outcome would be:

Sending value:  { id: 1, status: true }
Received Data:  { id: 1, status: true }
Sending value:  { id: 2, status: true }
Received Data:  { id: 2, status: true }
Sending value:  { id: 3, status: true }
Received Data:  { id: 3, status: true }
createValuesObservable Sending onCompleted
DONE!

The issue is the asynchronous nature of JavaScript: the setTimeout callbacks are only scheduled inside the loop, so observer.onCompleted() runs synchronously before any of them fire. The observer disposes itself upon receiving the onCompleted event, so the onNext notifications that arrive later, when the async actions complete, are dropped.
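The ordering can be reproduced without Rx at all. The following is a minimal sketch using plain timers; the `log` array is a hypothetical stand-in for the console output and is not part of the original code.

```javascript
// Plain-timer model of the first createValuesObservable (no Rx involved).
// `log` is a hypothetical stand-in for the console output.
const log = [];
const values = [1, 2, 3];

values.forEach(function (value) {
    // This only schedules the callback; nothing is emitted yet.
    setTimeout(function () {
        log.push('onNext ' + value);
    }, 0);
});

// Runs synchronously, before any timer callback gets a turn.
log.push('onCompleted');

// At this point only the completion has been recorded.
console.log(log);
```

Even with a delay of 0 ms, the timer callbacks run only after the current synchronous code has finished, so the completion is always recorded first.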

To tackle this problem, I implemented a partial solution using a timeout within the observable function:

function createValuesObservable() {

    return Rx.Observable.create(function(observer) {
        let observableTimeout = 10000;
        setTimeout(function() {
            console.log("createValuesObservable Sending onCompleted");
            observer.onCompleted();
        }, observableTimeout);
        _.map(values, function(value) {
            let millisecondsToWait = 1000;
            setTimeout(function() {
                console.log("Sending value: ", value);
                observer.onNext(value)
            }, millisecondsToWait);
        });
    });
}

This modification delivers the information in the right order (data first, then completion), but the result depends on the chosen timeout: if it is too short, data is lost; if it is too long, the subscriber waits longer than necessary. Is this a fundamental problem of asynchronous programming that I just have to accept?

Answer №1

Yes, there is a better way. Right now your synchronization relies entirely on time delays, when you can let the Observable operators do that work for you.

First, avoid calling setTimeout directly and use timer instead:

Rx.Observable.timer(waitTime);

Following that step, you can transform the values array into an Observable where each value is emitted as an event by executing:

Rx.Observable.from(values);

Lastly, utilize flatMap to convert these values into Observables and merge them into the final sequence. This will result in an Observable that emits whenever any of the source timers emit, and concludes when all the source Observables reach completion.

Rx.Observable.from(values)
  .flatMap(
    // Transform the value into a stream
    value => Rx.Observable.timer(waitTime),
    // This function converts the value returned by the timer Observable
    // back to the original value intended for emission
    value => value
  )
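To make the completion rule concrete, here is a hand-rolled model of merge-style completion in plain JavaScript. It is a sketch for illustration only, not how Rx implements flatMap; `mergeTasks` and the task functions are hypothetical names, and each task is assumed to take a Node-style callback(err, value).

```javascript
// Hand-rolled model of merge-style completion (not Rx's implementation).
// Each task is a hypothetical function taking a Node-style callback(err, value).
function mergeTasks(tasks, observer) {
    let pending = tasks.length;
    let stopped = false;

    if (pending === 0) {
        observer.onCompleted();
        return;
    }

    tasks.forEach(function (task) {
        task(function (err, value) {
            if (stopped) { return; }
            if (err) {
                stopped = true;
                observer.onError(err);
                return;
            }
            observer.onNext(value);
            pending -= 1;
            // Completion is driven by the work itself, not by a clock.
            if (pending === 0) {
                stopped = true;
                observer.onCompleted();
            }
        });
    });
}

// Usage with three timer-backed tasks.
const events = [];
const tasks = [1, 2, 3].map(function (id) {
    return function (callback) {
        setTimeout(function () { callback(null, id); }, 10);
    };
});
mergeTasks(tasks, {
    onNext: function (v) { events.push('next ' + v); },
    onError: function (e) { events.push('error ' + e.message); },
    onCompleted: function () { events.push('done'); console.log(events); }
});
```

The completion fires as soon as the last task reports back, so there is no timeout to tune.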

Hence, the complete valuesObservable function would take this form:

function valuesObservable(values, waitTime) {
  return Rx.Observable.from(values)
    .flatMap(
      // Start one timer per value
      value => Rx.Observable.timer(waitTime),
      // Result selector: emit the original value when its timer fires
      value => value
    )
    .do(
      x => console.log('Sending value: ', x),
      null,
      () => console.log('Sending values completed')
    );
}

If you are working with real HTTP streams rather than demo timers, you can simplify this further with merge (or concat if the order must be preserved):

Rx.Observable.from(streams)
   .flatMap(stream => stream);

// OR
Rx.Observable.from(streams).mergeAll();

// Or succinctly
Rx.Observable.merge(streams);

Answer №2

When creating an observable, it is best to start from the existing primitives and combine them with existing operators, which spares you from handling unsubscription and error management yourself. Rx.Observable.create is still handy when nothing else fits your use case. It may also be worth looking at generateWithAbsoluteTime.

The problem in your code is that the observer is completed before any data has been sent. To address this, consider a more suitable completion signal, such as:

  • Complete x seconds after the last emitted value if no new value is emitted
  • Complete when a value matches a specified 'end' value
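The second option can be sketched in plain JavaScript as a wrapper around an observer. This is an illustration only: `completeOnEndValue` and the `isEnd` predicate are hypothetical names, not part of Rx. In RxJS itself, an operator such as takeWhile expresses a similar idea.

```javascript
// Wraps an observer so that a matching value ends the sequence.
// `isEnd` is a hypothetical predicate supplied by the caller.
function completeOnEndValue(observer, isEnd) {
    let done = false;
    return {
        onNext: function (value) {
            if (done) { return; }
            if (isEnd(value)) {
                // The end marker itself is not forwarded.
                done = true;
                observer.onCompleted();
                return;
            }
            observer.onNext(value);
        },
        onError: function (error) {
            if (done) { return; }
            done = true;
            observer.onError(error);
        },
        onCompleted: function () {
            if (done) { return; }
            done = true;
            observer.onCompleted();
        }
    };
}

// Usage: values after the end marker are ignored.
const received = [];
const wrapped = completeOnEndValue({
    onNext: function (v) { received.push(v); },
    onError: function () { received.push('error'); },
    onCompleted: function () { received.push('DONE!'); }
}, function (v) { return v === 'end'; });

['a', 'b', 'end', 'late'].forEach(function (v) { wrapped.onNext(v); });
console.log(received);
```

This only works if the source can guarantee that the end value arrives after all real data, which has to be arranged by whatever produces the values.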

Answer №3

Thanks to @paulpdaniels, this is the final code that does what I wanted, including the calls to Jenkins:

'use strict';

let Rx = require('rx');
let jenkinsapi = require('jenkins'); // https://github.com/silas/node-jenkins/issues
let jenkinsOpts = {
    "baseUrl": "http://localhost:8080",
    "options": {"strictSSL": false},
    "job": "my-jenkins-job",
    "username": "jenkins",
    "apiToken": "f4abcdef012345678917a"
};
let jenkins = jenkinsapi(JSON.parse(JSON.stringify(jenkinsOpts)));

function jobInfoObservable(jenkins, jobName) {
    // generates an observable containing a single list of builds for a given job
    let selector = {tree: 'builds[number,url]'};

    return Rx.Observable.fromNodeCallback(function(callback) {
        jenkins.job.get(jobName, selector, callback);
    })();
}

function buildIDObservable(jenkins, jobName) {
    // produces an observable with a continuous stream of individual build IDs for a specified job
    return jobInfoObservable(jenkins, jobName).flatMap(function(jobInfo) {
        return Rx.Observable.from(jobInfo.builds)
    });
}

function buildInfoObservable(jenkins, jobName) {
    // yields an observable with a continuous stream of http responses for each build in the history of the job
    let buildIDStream = buildIDObservable(jenkins, jobName);
    let selector = {'tree': 'actions[parameters[name,value]],building,description,displayName,duration,estimatedDuration,executor,id,number,result,timestamp,url'};

    return buildIDStream.flatMap(function(buildID) {
        return Rx.Observable.fromNodeCallback(function(callback) {
            jenkins.build.get(jobName, buildID.number, selector, callback);
        })();
    });
}

let observer = Rx.Observer.create((data) => {
    console.log("Received Data: ", data);
    // process the information
}, (error) => {
    console.log("Error: ", error);
}, () => {
    console.log("DONE!");
    // perform additional tasks upon completion
});

buildInfoObservable(jenkins, jenkinsOpts.job).subscribe(observer);

Utilizing the built-in operators provided by Rx enabled me to avoid intricate timing logic entirely. This approach is not only cleaner but also eliminates the need for nesting multiple Rx.Observable.create statements.
