Using one observable to synchronize another observable

Alex

I want to synchronize an observable using another one used as clock, below is reported an example.

Main:     ---------abc----------------------------------
Clock:    -x-----x-----x-----x-----x-----x-----x-----x--
Expected: -------------a-----b-----c--------------------

I tried to achieve this synchronization using the Zip method, similar to an example described in RX documentation (http://reactivex.io/documentation/operators/zip.html):

mainValues.Zip(clockValues, (mainValue,clockValue) => mainValue)

the problem is that when I tested this implementation, it didn't work. Below there is the test I wrote to check the expected behaviour:

scheduler = new TestScheduler();


var mainValues = scheduler.CreateHotObservable(
    new Recorded<Notification<char>>(100, Notification.CreateOnNext('a')),
    new Recorded<Notification<char>>(101, Notification.CreateOnNext('b')),
    new Recorded<Notification<char>>(102, Notification.CreateOnNext('c')),
    new Recorded<Notification<char>>(103, Notification.CreateOnNext('d')),
    new Recorded<Notification<char>>(104, Notification.CreateOnNext('e')),
    new Recorded<Notification<char>>(105, Notification.CreateOnNext('f')),
    new Recorded<Notification<char>>(106, Notification.CreateOnNext('g')),
    new Recorded<Notification<char>>(107, Notification.CreateOnCompleted<char>()));

var clockValues = scheduler.CreateHotObservable(
    new Recorded<Notification<long>>(70, Notification.CreateOnNext(0L)),
    new Recorded<Notification<long>>(90, Notification.CreateOnNext(0L)),
    new Recorded<Notification<long>>(110, Notification.CreateOnNext(0L)),
    new Recorded<Notification<long>>(130, Notification.CreateOnNext(0L)),
    new Recorded<Notification<long>>(150, Notification.CreateOnNext(0L)),
    new Recorded<Notification<long>>(170, Notification.CreateOnNext(0L)),
    new Recorded<Notification<long>>(190, Notification.CreateOnNext(0L)),
    new Recorded<Notification<long>>(210, Notification.CreateOnNext(0L)),
    new Recorded<Notification<long>>(230, Notification.CreateOnNext(0L)),
    new Recorded<Notification<long>>(250, Notification.CreateOnNext(0L)),
    new Recorded<Notification<long>>(270, Notification.CreateOnNext(0L)),
    new Recorded<Notification<long>>(290, Notification.CreateOnNext(0L)),
    new Recorded<Notification<long>>(310, Notification.CreateOnCompleted<long>()));


var res = scheduler.Start(() => mainValues.Zip(clockValues, (mainValue, clockValue) => mainValue), 0, 70, long.MaxValue);

And here below the expected values and what I really got (described as comment):

res.Messages.AssertEqual(
    OnNext(110, 'a'), // Expected: 110, a - Actual: 100, a
    OnNext(130, 'b'), // Expected: 130, b - Actual: 110, b
    OnNext(150, 'c'), // Expected: 150, c - Actual: 130, c
    OnNext(170, 'd'), // Expected: 170, d - Actual: 150, d
    OnNext(190, 'e'), // Expected: 190, e - Actual: 170, e
    OnNext(210, 'f'), // Expected: 210, f - Actual: 190, f
    OnNext(230, 'g'));// Expected: 230, g - Actual: 210, g

What is the problem? Is it correct use Zip to synchronize two observable? Do I use the TestScheduler incorrectly?

Enigmativity

Give this a go:

var query =
    Observable
        .Create<char?>(o =>
        {
            IDisposable inner = null;
            IDisposable subscription = 
                mainValues
                    .Publish(mvs =>
                    {
                        var q = new System.Collections.Generic.Queue<char>();
                        inner = mvs.Subscribe(mv => q.Enqueue(mv));
                        return clockValues.Select(x => q.Count > 0 ? q.Dequeue() : (char?)null);
                    })
                    .Subscribe(o);
            return new CompositeDisposable(inner, subscription);
        });

query.Subscribe(x => Console.WriteLine(x));
scheduler.Start();

Let me know if that works the way you want it to. If it does I'll pop in some explanation.

Collected from the Internet

Please contact [email protected] to delete if infringement.

edited at
0

Comments

0 comments
Login to comment

Related

Transform data from one observable using another observable in angular rxjs

RxJava + Kotlin - Using one observable in the creation of another

Replacing values in Observable using another Observable

Modify one observable based on another

Convert Observable of one type to another

Is it correct to call one observable in another?

Rxjs One Observable Feeding into Another

What is the proper method for using the output of one observable as the input to another in angular

Javascript: Synchronize observable events

Using data from another observable

RxJava - Control one Observable by another one

Retrieve values from one observable when an event happens in another observable

Use data from one observable in another and then return result as other observable

How to use the state of one observable to skip values of another observable?

firing custom extender of one observable when another computed observable is changed

How to use one observable values in another?

Angular 2: merging one observable "inside" another

Run observable one after another in angular

How to produce one Observable from another

Execute Observable one after another with async pipe

How to subscribe to observable one after another

RxJs: Filter content in observable using emission of another observable

Synchronously process an Observable dependent on another Observable using NGRX & Angular

How to populate observable with another observable and return observable

How to filter one observable stream using another stream's element attribute as criteria using RxJava?

Invoking observable from another observable

Wait observable inside another observable

Multiple observable events for one observable

Practical Example using computed observable with another context