As a newcomer to Rx, I am struggling to locate documentation on how to compose promises where the data from the first promise is passed into the second one and so forth. Here are three basic promises with asynchronous operations involving previous data:
const p1 = () => Promise.resolve(1);
const p2 = x => {const val = x + 1; return Promise.resolve(val);};
const p3 = x => {
const isEven = x => x % 2 === 0;
return Promise.resolve(isEven(x));
};
The traditional method to achieve this composition is by chaining promises like this:
pl().then(p2).then(p3).then(console.log);
A preferred implementation for me is using Ramda's composeP and pipeP:
R.pipeP(p1, p2, p3, console.log)()
While it seems likely that Rx could handle such situations smoothly, the closest example I've found so far involves RxJS and async operations:
var Rx = require('rx'),
fs = require('fs'),
path = require('path');
var file = path.join(__dirname, 'file.txt'),
dest = path.join(__dirname, 'file1.txt'),
exists = Rx.Observable.fromCallback(fs.exists),
rename = Rx.Observable.fromNodeCallback(fs.rename),
stat = Rx.Observable.fromNodeCallback(fs.stat);
exists(file)
.concatMap(function (flag) {
return flag ?
rename(file, dest) :
Rx.Observable.throw(new Error('File does not exist.'));
})
.concatMap(function () {
return stat(dest);
})
.forEach(
function (fsStat) {
console.log(JSON.stringify(fsStat));
},
function (err) {
console.log(err);
}
);
Although concatMap appears promising, the complexity of the above code is notable. I encountered issues in my own example as well because Rx.Observable.fromPromise(p1) expects a promise instead of a function, and Rx.Observable.defer(p1) doesn't seem to handle parameters properly.
Thank you for any insights!
For a related question without data passing, check out: Chaining promises with RxJS