One advantage of RxJS is that streams can be nested as deeply as needed: you can build a stream that enriches a single object, then nest one such stream per element to enrich an entire array.
For example, here is a stream that enriches a single object and prints it to the console:
import { forkJoin } from 'rxjs';
import { map } from 'rxjs/operators';

const oneObject = getObject();
forkJoin({
  firstResult: this.myService.checkFirst(oneObject.id),
  secondResult: this.myService.checkSecond(oneObject.id)
}).pipe(
  // Copy both results onto the object once both checks have completed.
  map(({firstResult, secondResult}) => {
    oneObject.first = firstResult;
    oneObject.second = secondResult;
    return oneObject;
  })
).subscribe(
  console.log
);
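To try the single-object pattern outside of a component, here is a minimal runnable sketch. The `Item` interface, the `enrich` helper, and the mock `myService` are assumptions made for illustration (the real service is not shown here), and the sketch returns a copy of the object instead of mutating it, which is a variation on the snippet above rather than the same code.

```typescript
import { forkJoin, Observable, of } from 'rxjs';
import { map } from 'rxjs/operators';

// Hypothetical shape of the object being enriched.
interface Item {
  id: number;
  first?: string;
  second?: string;
}

// Hypothetical stand-in for this.myService: each check emits once and completes.
const myService = {
  checkFirst: (id: number): Observable<string> => of(`first-${id}`),
  checkSecond: (id: number): Observable<string> => of(`second-${id}`),
};

// Enrich one object. forkJoin waits for both checks to complete, then
// map builds an enriched copy instead of mutating the input.
function enrich(item: Item): Observable<Item> {
  return forkJoin({
    first: myService.checkFirst(item.id),
    second: myService.checkSecond(item.id),
  }).pipe(
    map(({ first, second }) => ({ ...item, first, second }))
  );
}

// of() emits synchronously, so the result is collected as soon as
// subscribe() returns.
const enrichedItems: Item[] = [];
enrich({ id: 7 }).subscribe(result => enrichedItems.push(result));
console.log(enrichedItems[0]);
```

Returning a copy keeps the original object untouched, which can make the stream easier to reason about when the same object is shared elsewhere.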
If oneObject is itself emitted by an observable, we can use mergeMap or switchMap to map it into the same stream created above:
import { forkJoin } from 'rxjs';
import { map, mergeMap } from 'rxjs/operators';

this.myService.getOneObject().pipe(
  // Map the emitted object to the inner stream that enriches it.
  mergeMap(oneObject =>
    forkJoin({
      firstResult: this.myService.checkFirst(oneObject.id),
      secondResult: this.myService.checkSecond(oneObject.id)
    }).pipe(
      map(({firstResult, secondResult}) => {
        oneObject.first = firstResult;
        oneObject.second = secondResult;
        return oneObject;
      })
    )
  )
).subscribe(
  console.log
);
To extend this to an entire array of objects, we can pass an array of observables to forkJoin, which subscribes to all of them concurrently and emits an array of their last values once every one has completed:
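import { forkJoin } from 'rxjs';
import { map, mergeMap } from 'rxjs/operators';

this.myService.getAll().pipe(
  // Turn the array of objects into an array of enriching observables.
  map(allRes =>
    allRes.result.map(m =>
      forkJoin({
        first: this.myService.checkFirst(m.id),
        second: this.myService.checkSecond(m.id)
      }).pipe(
        map(({first, second}) => {
          m.first = first;
          m.second = second;
          return m;
        })
      )
    )
  ),
  // Run them concurrently and emit the enriched array once all have completed.
  mergeMap(mArr => forkJoin(mArr)),
).subscribe(resultArr => {
  // Log the enriched results for the first object in the array.
  console.log(resultArr[0].first, resultArr[0].second)
});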
We can simplify the previous solution by combining the map and mergeMap into a single mergeMap:
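import { forkJoin } from 'rxjs';
import { map, mergeMap } from 'rxjs/operators';

this.myService.getAll().pipe(
  mergeMap(allRes =>
    // Outer forkJoin: one enriching stream per object in the array.
    forkJoin(allRes.result.map(m =>
      forkJoin({
        first: this.myService.checkFirst(m.id),
        second: this.myService.checkSecond(m.id)
      }).pipe(
        map(({first, second}) => {
          m.first = first;
          m.second = second;
          return m;
        })
      )
    ))
  )
).subscribe(console.log);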
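One edge case worth guarding against: when forkJoin is given an empty array, it completes immediately without emitting, so the subscriber would never receive a result for an empty list. The sketch below falls back to `of([])` in that case. The `Item` and `AllResponse` interfaces, the `enrichOne` and `enrichAll` helpers, and the mock check functions are assumptions for illustration, not part of the original service.

```typescript
import { forkJoin, Observable, of } from 'rxjs';
import { map, mergeMap } from 'rxjs/operators';

// Hypothetical shapes for the objects and the getAll() response.
interface Item { id: number; first?: string; second?: string; }
interface AllResponse { result: Item[]; }

// Hypothetical stand-ins for the service checks: emit once, then complete.
const checkFirst = (id: number): Observable<string> => of(`first-${id}`);
const checkSecond = (id: number): Observable<string> => of(`second-${id}`);

// Enrich a single object (returns a copy rather than mutating).
function enrichOne(m: Item): Observable<Item> {
  return forkJoin({ first: checkFirst(m.id), second: checkSecond(m.id) }).pipe(
    map(({ first, second }) => ({ ...m, first, second }))
  );
}

// Enrich every object in the response. forkJoin([]) would complete
// without emitting, so an empty list is handled explicitly.
function enrichAll(source: Observable<AllResponse>): Observable<Item[]> {
  return source.pipe(
    mergeMap(allRes =>
      allRes.result.length === 0
        ? of([] as Item[])
        : forkJoin(allRes.result.map(enrichOne))
    )
  );
}

const emptyResponse: AllResponse = { result: [] };
const fullResponse: AllResponse = { result: [{ id: 1 }, { id: 2 }] };

// Collect every emission; the mock sources are synchronous.
const emissions: Item[][] = [];
enrichAll(of(emptyResponse)).subscribe(items => emissions.push(items));
enrichAll(of(fullResponse)).subscribe(items => emissions.push(items));
console.log(JSON.stringify(emissions));
```

Without the guard, the first subscription would complete silently and `emissions` would contain only the second result.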
If checkFirst and checkSecond might not complete, forkJoin will never emit, because it waits for every source to complete. In that case, consider using zip in place of the inner forkJoin and completing the stream with take(1) or first():
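import { forkJoin, zip } from 'rxjs';
import { first, map, mergeMap } from 'rxjs/operators';

this.myService.getAll().pipe(
  mergeMap(allRes =>
    forkJoin(allRes.result.map(m =>
      zip(
        this.myService.checkFirst(m.id),
        this.myService.checkSecond(m.id)
      ).pipe(
        // Complete after the first pair so the outer forkJoin can emit.
        first(),
        map(([first, second]) => {
          m.first = first;
          m.second = second;
          return m;
        })
      )
    ))
  )
).subscribe(console.log);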
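The difference is easy to see with sources that emit but never complete. In the sketch below, `openEnded` is a hypothetical helper that simulates such a source by concatenating a single value with `NEVER`; it is an assumption for illustration, not something the original service is known to do.

```typescript
import { concat, forkJoin, NEVER, Observable, of, zip } from 'rxjs';
import { first } from 'rxjs/operators';

// Hypothetical source that emits one value and then stays open forever.
const openEnded = (value: string): Observable<string> =>
  concat(of(value), NEVER);

const forkJoinResults: string[][] = [];
const zipResults: string[][] = [];
const state = { zipCompleted: false };

// forkJoin waits for completion, which never happens here, so it never emits.
const forkSub = forkJoin([openEnded('a'), openEnded('b')])
  .subscribe(pair => forkJoinResults.push(pair));

// zip emits as soon as each source has produced a value; first() then
// completes the stream and unsubscribes from both sources.
zip(openEnded('a'), openEnded('b'))
  .pipe(first())
  .subscribe({
    next: pair => zipResults.push(pair),
    complete: () => { state.zipCompleted = true; },
  });

// Clean up the forkJoin subscription, which would otherwise stay open.
forkSub.unsubscribe();

console.log(forkJoinResults.length, zipResults, state.zipCompleted);
// → 0 [ [ 'a', 'b' ] ] true
```

Because the zip-based inner stream completes after its first pair, it can safely be passed to an outer forkJoin, as in the snippet above.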