RxJS Observables: calling onCompleted only after a series of asynchronous actions has finished

I have been attempting to design an observable that generates values from various asynchronous actions, particularly HTTP requests from a Jenkins server, which will notify a subscriber once all the actions are finished. However, it seems like there might be a misunderstanding in my approach as it's not working as intended.

'use strict';

let Rx = require('rx');
let _ = require('lodash');
let values = [
    {'id': 1, 'status': true},
    {'id': 2, 'status': true},
    {'id': 3, 'status': true}
];

function createValuesObservable() {

    return Rx.Observable.create(function(observer) {
        _.map(values, function(value) {
            var millisecondsToWait = 1000;
            setTimeout(function() { 
                console.log("Sending value: ", value);
                observer.onNext(value)
            }, millisecondsToWait);
        });
        console.log("createValuesObservable Sending onCompleted");
        observer.onCompleted()
    });
}

let observer = Rx.Observer.create((data) => {
    console.log("Received Data: ", data);
}, (error) => {
    console.log("Error: ", error);
}, () => {
    console.log("DONE!");
});

createValuesObservable().subscribe(observer);

Upon running this code snippet, the output I receive is:

createValuesObservable Sending onCompleted
DONE!
Sending value:  { id: 1, status: true }
Sending value:  { id: 2, status: true }
Sending value:  { id: 3, status: true }

My expected outcome would be:

Sending value:  { id: 1, status: true }
Received Data:  { id: 1, status: true }
Sending value:  { id: 2, status: true }
Received Data:  { id: 2, status: true }
Sending value:  { id: 3, status: true }
Received Data:  { id: 3, status: true }
createValuesObservable Sending onCompleted
DONE!

The issue at hand is the asynchronous nature of JavaScript: the loop only schedules the setTimeout callbacks, so observer.onCompleted() runs before any of them fires. The observer disposes itself upon receiving the onCompleted event, so the onNext notifications sent later, when the async actions complete, are dropped.

To tackle this problem, I implemented a partial solution using a timeout within the observable function:

function createValuesObservable() {

    return Rx.Observable.create(function(observer) {
        let observableTimeout = 10000;
        setTimeout(function() {
            console.log("createValuesObservable Sending onCompleted");
            observer.onCompleted();
        }, observableTimeout);
        _.map(values, function(value) {
            let millisecondsToWait = 1000;
            setTimeout(function() {
                console.log("Sending value: ", value);
                observer.onNext(value)
            }, millisecondsToWait);
        });
    });
}

This modification delivers the notifications in the right order (data followed by completion), but it introduces a trade-off: if the timeout is too short, late data is lost, and if it is too long, the subscriber waits needlessly. Is this a fundamental issue in asynchronous programming that I simply have to accept?

Answer №1

Indeed, there is a more efficient approach available. Currently, your synchronization process heavily relies on time delays which can be improved by utilizing the Observable operators.

To begin with, it's recommended to steer away from direct usage of setTimeout and switch to employing timer instead:

Rx.Observable.timer(waitTime);

Following that step, you can transform the values array into an Observable where each value is emitted as an event by executing:

Rx.Observable.from(values);

Lastly, utilize flatMap to convert these values into Observables and merge them into the final sequence. This will result in an Observable that emits whenever any of the source timers emit, and concludes when all the source Observables reach completion.

Rx.Observable.from(values)
  .flatMap(
    // Transform the value into a stream
    value => Rx.Observable.timer(waitTime),
    // This function converts the value returned by the timer Observable
    // back to the original value intended for emission
    value => value
  )

Hence, the complete valuesObservable function would take this form:

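function valuesObservable(values, waitTime) {
  return Rx.Observable.from(values)
    .flatMap(
      // Start one timer per value
      value => Rx.Observable.timer(waitTime),
      // Map the timer's tick back to the original value
      value => value
    )
    .do(
      // Log the emitted value itself (the original logged an undefined name)
      x => console.log('Sending value: ', x),
      null,
      () => console.log('Sending values completed')
    );
}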
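
To make the completion rule concrete, here is a minimal hand-rolled sketch of what merging does: values are forwarded as they arrive, and onCompleted is sent only once every source has completed. It deliberately avoids the Rx API; `mergeSketch`, `delayedValue`, and the optional `setTimer` argument are hypothetical names introduced for illustration, and an "observable" here is assumed to be just a function that accepts an observer.

```javascript
'use strict';

// Hand-rolled stand-in for merging observables (NOT the Rx implementation).
// Each source is a function that takes an observer and starts producing.
function mergeSketch(sources) {
    return function subscribe(observer) {
        let active = sources.length; // sources that have not completed yet
        let stopped = false;         // true once onError/onCompleted was sent

        if (active === 0) {
            observer.onCompleted();
            return;
        }

        sources.forEach(function (source) {
            source({
                onNext: function (value) {
                    if (!stopped) { observer.onNext(value); }
                },
                onError: function (error) {
                    if (!stopped) {
                        stopped = true;
                        observer.onError(error);
                    }
                },
                onCompleted: function () {
                    // Complete the merged sequence only after the last source.
                    active -= 1;
                    if (active === 0 && !stopped) {
                        stopped = true;
                        observer.onCompleted();
                    }
                }
            });
        });
    };
}

// Hypothetical source: emits one value after `ms` milliseconds, then completes.
// `setTimer` is injectable for testing and defaults to setTimeout.
function delayedValue(value, ms, setTimer) {
    setTimer = setTimer || setTimeout;
    return function subscribe(observer) {
        setTimer(function () {
            observer.onNext(value);
            observer.onCompleted();
        }, ms);
    };
}
```

With the data from the question, something like `mergeSketch(values.map(v => delayedValue(v, 1000)))(observer)` would deliver all three values before the completion notification, without any fixed overall timeout.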

If not using demo streams but real http streams, you could further simplify the process by incorporating merge (or concat if order preservation is required):

Rx.Observable.from(streams)
   .flatMap(stream => stream);

// OR
Rx.Observable.from(streams).mergeAll();

// Or succinctly 
Rx.Observable.merge(streams);

Answer №2

To create an observable, it's recommended to start from the existing primitives and combine them with existing operators; this spares you from handling concerns such as unsubscription and error management yourself. Rx.Observable.create remains handy when nothing else fits your specific use case. Exploring generateWithAbsoluteTime could also be beneficial.

One common issue encountered is completing the observer before sending any data. To address this, consider implementing a more suitable completion signal such as:

  • Complete x seconds after the last emitted value if no new value is emitted
  • Complete when a value matches a specified 'end' value
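
As a rough illustration of these two completion signals, the following sketch wraps a plain observer object rather than using Rx operators. The names `completeOnEndValue` and `completeWhenIdle`, the choice to treat the end value as a sentinel that is not forwarded, and the injectable `timers` object are all assumptions made for this example. Within Rx itself, an operator such as takeWhile expresses a similar idea to the first wrapper.

```javascript
'use strict';

// Completes when `isEnd(value)` returns true. The end value is treated as a
// sentinel and is not forwarded (an assumption of this sketch).
function completeOnEndValue(observer, isEnd) {
    let done = false;
    return {
        onNext: function (value) {
            if (done) { return; }
            if (isEnd(value)) {
                done = true;
                observer.onCompleted();
            } else {
                observer.onNext(value);
            }
        },
        onError: function (error) {
            if (!done) { done = true; observer.onError(error); }
        },
        onCompleted: function () {
            if (!done) { done = true; observer.onCompleted(); }
        }
    };
}

// Completes once `idleMs` milliseconds pass without a new value.
// `timers` (assumed shape: { set, clear }) is injectable for testing.
// Note: the idle timer starts with the first value, so a source that never
// emits never completes through this wrapper.
function completeWhenIdle(observer, idleMs, timers) {
    timers = timers || {
        set: function (fn, ms) { return setTimeout(fn, ms); },
        clear: function (handle) { clearTimeout(handle); }
    };
    let done = false;
    let handle = null;

    function stopTimer() {
        if (handle !== null) { timers.clear(handle); handle = null; }
    }
    function finish() {
        if (!done) { done = true; observer.onCompleted(); }
    }

    return {
        onNext: function (value) {
            if (done) { return; }
            observer.onNext(value);
            stopTimer();                         // restart the idle window
            handle = timers.set(finish, idleMs);
        },
        onError: function (error) {
            stopTimer();
            if (!done) { done = true; observer.onError(error); }
        },
        onCompleted: function () {
            stopTimer();
            finish();
        }
    };
}
```

Either wrapper would be passed to the producing code in place of the original observer. The idle variant still depends on a chosen duration, so it only fits when the source gives no better signal that it has finished.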

Answer №3

Special thanks to @paulpdaniels for providing the final code that successfully achieved my desired outcome, including integrating calls to Jenkins:

'use strict';

let Rx = require('rx');
let jenkinsapi = require('jenkins'); // https://github.com/silas/node-jenkins/issues
let jenkinsOpts = {
    "baseUrl": "http://localhost:8080",
    "options": {"strictSSL": false},
    "job": "my-jenkins-job",
    "username": "jenkins",
    "apiToken": "f4abcdef012345678917a"
};
let jenkins = jenkinsapi(JSON.parse(JSON.stringify(jenkinsOpts)));

function jobInfoObservable(jenkins, jobName) {
    // generates an observable containing a single list of builds for a given job
    let selector = {tree: 'builds[number,url]'};

    return Rx.Observable.fromNodeCallback(function(callback) {
        jenkins.job.get(jobName, selector, callback);
    })();
}

function buildIDObservable(jenkins, jobName) {
    // produces an observable with a continuous stream of individual build IDs for a specified job
    return jobInfoObservable(jenkins, jobName).flatMap(function(jobInfo) {
        return Rx.Observable.from(jobInfo.builds)
    });
}

function buildInfoObservable(jenkins, jobName) {
    // yields an observable with a continuous stream of http responses for each build in the history of the job
    let buildIDStream = buildIDObservable(jenkins, jobName);
    let selector = {'tree': 'actions[parameters[name,value]],building,description,displayName,duration,estimatedDuration,executor,id,number,result,timestamp,url'};

    return buildIDStream.flatMap(function(buildID) {
        return Rx.Observable.fromNodeCallback(function(callback) {
            jenkins.build.get(jobName, buildID.number, selector, callback);
        })();
    });
}

let observer = Rx.Observer.create((data) => {
    console.log("Received Data: ", data);
    // process the information
}, (error) => {
    console.log("Error: ", error);
}, () => {
    console.log("DONE!");
    // perform additional tasks upon completion
});

buildInfoObservable(jenkins, jenkinsOpts.job).subscribe(observer);

Utilizing the built-in operators provided by Rx enabled me to avoid intricate timing logic entirely. This approach is not only cleaner but also eliminates the need for nesting multiple Rx.Observable.create statements.
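
The reason completion now arrives at the right moment is that each wrapped call emits its result and completes inside the callback itself, so there is nothing left to time. The following is a simplified stand-in for that idea, not Rx's implementation of fromNodeCallback: `fromNodeCallbackSketch` is a hypothetical name, an "observable" is assumed to be a plain function taking an observer, and details such as result caching, disposal, and exactly when the wrapped function is invoked are ignored.

```javascript
'use strict';

// Turns a Node-style function (last argument is callback(error, result))
// into a function that returns a single-value "observable".
function fromNodeCallbackSketch(fn) {
    return function () {
        const args = Array.prototype.slice.call(arguments);
        return function subscribe(observer) {
            fn.apply(null, args.concat(function (error, result) {
                if (error) {
                    observer.onError(error);
                    return;
                }
                // Emit the single result, then signal completion right away.
                observer.onNext(result);
                observer.onCompleted();
            }));
        };
    };
}
```

Merging many such single-value sequences, as flatMap does in the code above, then completes as soon as the last request has answered.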
