I have been attempting to design an observable that generates values from various asynchronous actions, particularly HTTP requests from a Jenkins server, which will notify a subscriber once all the actions are finished. However, it seems like there might be a misunderstanding in my approach as it's not working as intended.
'use strict';
let Rx = require('rx');
let _ = require('lodash');
let values = [
{'id': 1, 'status': true},
{'id': 2, 'status': true},
{'id': 3, 'status': true}
];
function createValuesObservable() {
return Rx.Observable.create(function(observer) {
_.map(values, function(value) {
var millisecondsToWait = 1000;
setTimeout(function() {
console.log("Sending value: ", value);
observer.onNext(value)
}, millisecondsToWait);
});
console.log("createValuesObservable Sending onCompleted");
observer.onCompleted()
});
}
let observer = Rx.Observer.create((data) => {
console.log("Received Data: ", data);
}, (error) => {
console.log("Error: ", error);
}, () => {
console.log("DONE!");
});
createValuesObservable().subscribe(observer);
Upon running this code snippet, the output I receive is:
createValuesObservable Sending onCompleted
DONE!
Sending value: { id: 1, status: true }
Sending value: { id: 2, status: true }
Sending value: { id: 3, status: true }
My expected outcome would be:
Sending value: { id: 1, status: true }
Received Data: { id: 1, status: true }
Sending value: { id: 2, status: true }
Received Data: { id: 2, status: true }
Sending value: { id: 3, status: true }
Received Data: { id: 3, status: true }
createValuesObservable Sending onCompleted
DONE!
The issue at hand is the asynchronous nature of JavaScript, leading to the timeout function being fired before observer.onCompleted()
, causing unexpected behavior. The observer disposes itself upon receiving the onCompleted event, resulting in missed onNext notifications when async actions complete.
To tackle this problem, I implemented a partial solution using a timeout within the observable function:
function createValuesObservable() {
return Rx.Observable.create(function(observer) {
let observableTimeout = 10000;
setTimeout(function() {
console.log("createValuesObservable Sending onCompleted");
observer.onCompleted();
}, observableTimeout);
_.map(values, function(value) {
let millisecondsToWait = 1000;
setTimeout(function() {
console.log("Sending value: ", value);
observer.onNext(value)
}, millisecondsToWait);
});
});
}
This modification allows for proper ordering of information (data followed by completion), but introduces challenges such as potential data loss or extended wait times depending on the chosen timeout duration. Is this a fundamental issue in asynchronous programming that requires acceptance?