I made changes to the code like this:
// Shared tallies, bumped by the callbacks that checkTotal() builds.
var totalParents = 0;
var totalChildren = 0;
/**
 * Builds a callback that logs every value released by a labelled stream.
 *
 * @param {string} person - Label printed at the start of each log line.
 * @returns {function(*): void} Callback logging `<person> releases : <item>`.
 */
function emits ( person ) {
  function logRelease ( item ) {
    var line = person + " releases : " + item;
    console.log(line);
  }
  return logRelease;
}
/**
 * Builds a callback that bumps one of the shared counters and logs both.
 *
 * @param {string} person - "parent" increments totalParents; any other value
 *   increments totalChildren.
 * @returns {function(): void} Callback that updates the matching counter and
 *   prints the current totals.
 */
function checkTotal ( person ) {
  // `person` never changes after creation, so the comparison can be done once.
  var countsAsParent = person === "parent";
  return function ( ) {
    if (!countsAsParent) {
      totalChildren++;
    }
    else {
      totalParents++;
    }
    var report = "Checking : Total parents = " + totalParents + ", Total children = " + totalChildren;
    console.log(report);
  };
}
/**
 * Builds a callback that logs a verification line for a labelled checkpoint.
 *
 * @param {string} person - Label of the stream being verified.
 * @param {string} location - Label of the point in the pipeline.
 * @returns {function(*): void} Callback logging
 *   `Verification : <person> : <location> :<data>`.
 */
function verify ( person, location ) {
  function logVerification ( data ) {
    var line = "Verification : " + person + " : " + location + " :" + data;
    console.log(line);
  }
  return logVerification;
}
/**
 * Builds a completion callback for a labelled stream.
 *
 * @param {string} person - Label of the stream that completed.
 * @returns {function(): void} Callback logging `<person> finished!`.
 */
function finish ( person ) {
  function logFinished () {
    var line = person + " finished!";
    console.log(line);
  }
  return logFinished;
}
/**
 * Builds a callback that logs each value received from a labelled group.
 *
 * @param {string} person - Label of the group the value came from.
 * @returns {function(*): void} Callback logging `combined <person> <data>`.
 */
function combine ( person ) {
  function logCombined ( data ) {
    var line = 'combined ' + person + ' ' + data;
    console.log(line);
  }
  return logCombined;
}
/**
 * Returns the argument with 1 added using the `+` operator.
 *
 * @param {number} number - Value to increment.
 * @returns {number} `number + 1`.
 */
function additionBy1 ( number ) {
  var incremented = number + 1;
  return incremented;
}
/**
 * Error callback for the subscriptions in this script.
 *
 * Previously the handler dropped whatever it was given, so a failing stream
 * printed only the word "error". It now logs the error value as well, so the
 * cause is visible.
 *
 * @param {*} [err] - Error passed by the failing stream, if any.
 * @returns {void}
 */
function error ( err ) {
  // With no error supplied, keep the original single-word output.
  if (err === undefined) {
    console.log('error');
    return;
  }
  console.log('error', err);
}
// Source stream: the values 1..6, each logged as "parent releases : n".
// publish() makes it connectable, so values only start flowing when
// mom.connect() is called at the bottom of this script.
var mom = Rx.Observable.from([1, 2, 3, 4, 5, 6])
.do(emits("parent"))
.publish();
// Derived stream: identity map over mom, each value logged as
// "child releases : n". Its own publish() is left disabled, so it is driven
// by mom's connection rather than being connected separately.
var child = mom
.map(function ( x ) {return x;})
.do(emits("child"))
// .publish();
// Parent values grouped by x % 2, with each grouped element relabelled
// "P" + x. The do() sits on the stream of groups, so checkTotal runs once per
// group that groupBy emits (not once per value).
var groupedMom = mom
.groupBy(function ( x ) { return x % 2;}, function ( x ) {return "P" + x;})
.do(checkTotal("parent"))
.share();
// Child values grouped by x % 3, relabelled "C" + x; checkTotal again runs
// once per emitted group.
var groupedChild = child
.groupBy(function ( x ) { return x % 3;}, function (x) {return "C" + x;})
.do(checkTotal("child"))
.share();
// zip pairs the n-th child group with the n-th parent group and delivers the
// pair as an array, so the inner groups are only subscribed once both sides
// have produced their n-th group.
// NOTE(review): in the recorded output this is too late for the value that
// created each child group (C1 and C2 are never logged), and the third child
// group has no parent partner, so it appears never to be subscribed at all.
Rx.Observable.zip([groupedChild, groupedMom])
// .do(function ( x ) { console.log("zip args : " + x);})
.subscribe(function ( groups ) {
// groups[0] is the child group, groups[1] the parent group (zip argument order).
groups[0]
.do(function ( x ) { console.log("Child group emitted value : " + x);})
.subscribe(combine('child'), error, finish('Child Group'));
groups[1]
.do(function ( x ) { console.log("Parent group emitted value : " + x);})
.subscribe(combine('parent'), error, finish('Parent Group'));
}, error, finish('zip'));
//child.connect();
// Connecting mom starts the whole pipeline; child has no connect() of its own
// because its publish() above is disabled.
mom.connect();
This is the result :
"parent releases : 1"
"child releases : 1"
"Checking : Total parents = 0, Total children = 1"
"Checking : Total parents = 1, Total children = 1"
"Parent group emitted value : P1"
"combined parent P1"
"parent releases : 2"
"child releases : 2"
"Checking : Total parents = 1, Total children = 2"
"Checking : Total parents = 2, Total children = 2"
"Parent group emitted value : P2"
"combined parent P2"
"parent releases : 3"
"child releases : 3"
"Checking : Total parents = 2, Total children = 3"
"Parent group emitted value : P3"
"combined parent P3"
"parent releases : 4"
"child releases : 4"
"Child group emitted value : C4"
"combined child C4"
"Parent group emitted value : P4"
"combined parent P4"
"parent releases : 5"
"child releases : 5"
"Child group emitted value : C5"
"combined child C5"
"Parent group emitted value : P5"
"combined parent P5"
"parent releases : 6"
"child releases : 6"
"Parent group emitted value : P6"
"combined parent P6"
"Child Group finished!"
"Child Group finished!"
"Parent Group finished!"
"Parent Group finished!"
"zip finished!"
There are two key points to keep in mind:
How zip and groupBy behave relative to subscription time
- `groupBy` creates group observables as expected for both parent and child. In the logs, you will see that `child` creates three groups, while `parent` creates two.
- `zip` waits until it has one value from each of the sources passed to it before it emits. In this scenario, that means you only subscribe to the child and parent group observables once both grouped streams have each issued a group. You will observe "Parent group emitted value : P1" only after the counts match in "Checking : Total parents = 1, Total children = 1".
You then subscribe to both grouped-by observables and log whatever comes out of them. The issue arises because the parent grouped-by observable has a value to pass on, BUT the child 'group-by' observable was created before and already passed its value, so you miss seeing that value when you subscribe late. Instead, you will see the subsequent values.
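Hence, the values in [1-3] create three new child group observables, but you won't see those values, because you subscribe to the groups too late. You will, however, see later values from the child groups you did subscribe to. This can be confirmed in the log with entries like "combined child C4" and "combined child C5". (The value 6 belongs to the third child group, which zip never pairs with a parent group because the parent only has two, so that group is never subscribed and C6 does not appear.)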
All values in the parent grouped-by observables will be visible as you subscribe to them immediately following their creation.
connect and publish
It is not entirely clear how connect and publish were meant to be used here, but since your child has the parent as its source, there is no need to publish the child and connect it separately. Connecting the parent automatically causes the child to emit its values, which is why I commented out the child's publish() and connect() calls in your code.
This addresses your current inquiry but may not align with your original goal of achieving a cartesian product. If necessary, consider rephrasing your intention as a question to explore alternative solutions provided by others.