How to rewrite the code below using mergeMap, flatMap, or a better approach with RxJS operators?

uma

I have two observable pipes. I need to run one after the other and check whether two values are equal. I tried the code below. It should work like this: when the first observable emits a value, it should take the second observable's value and compare it with the first returned value. I need some expert help to refactor this code in a better way.

   this.selectedUnitDetailModel$.pipe(shareReplayUntil(this.destroySub)).subscribe(
          (res: UnitDetail) =>{
              if(res.unitTwo){
                this.appStore.select(selectUnit).
                pipe(shareReplayUntil(this.destroySub)).subscribe(
                  (unitId: string) => {
                    if(unitId ===  res.unitTwo){
                      this.sameUnit = true;
                    }else{
                      this.sameUnit = false;
                    }
                  });
              }
          }
       );
Michael D

You don't need higher order operators since the observables this.selectedUnitDetailModel$ and this.appStore.select(selectUnit) are independent of each other. Instead you could use functions like forkJoin, combineLatest or zip to get the notifications from them in parallel.

You can find the differences between these functions here.

Try the following

import { forkJoin } from 'rxjs';
import { take } from 'rxjs/operators';

forkJoin(
  this.selectedUnitDetailModel$.pipe(take(1)),      // <-- complete on first emission
  this.appStore.select(selectUnit).pipe(take(1))    // <-- complete on first emission
).subscribe(
  ([res, unitId]) => this.sameUnit = res.unitTwo === unitId,
  (error) => console.log(error)                     // <-- handle error
);

forkJoin emits only when all of the source observables complete, so I've piped take(1) into each observable. forkJoin will now emit on the first emission of each observable and then complete, so your shareReplayUntil(this.destroySub) is no longer needed.
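To make the completion behaviour concrete, here is a minimal standalone sketch. The two BehaviorSubject instances are hypothetical stand-ins for this.selectedUnitDetailModel$ and this.appStore.select(selectUnit), and the UnitDetail shape is assumed from the question; it is an illustration, not the asker's actual store.

```typescript
import { BehaviorSubject, forkJoin } from 'rxjs';
import { take } from 'rxjs/operators';

// Assumed shape, based on the `res.unitTwo` access in the question.
interface UnitDetail { unitTwo: string; }

// Hypothetical stand-ins for the two real streams.
const unitDetail$ = new BehaviorSubject<UnitDetail>({ unitTwo: 'u-1' });
const unitId$ = new BehaviorSubject<string>('u-1');

const results: boolean[] = [];
let completed = false;

forkJoin([
  unitDetail$.pipe(take(1)),   // completes after its first value
  unitId$.pipe(take(1)),       // completes after its first value
]).subscribe({
  next: ([res, unitId]) => results.push(res.unitTwo === unitId),
  complete: () => { completed = true; },
});

// forkJoin has already emitted once and completed,
// so this later value never reaches the subscriber.
unitId$.next('u-2');
```

After this runs, results holds a single comparison and the subscription is already closed, which is why no extra teardown operator is needed in this variant.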

However, if you need to keep the emission stream from the observables open, you could use combineLatest or zip instead. In this case, you could replace the take(1) with your shareReplayUntil(this.destroySub).

Update: continuous stream of this.selectedUnitDetailModel$ observable

Like I said before, you could use combineLatest instead of forkJoin to enable a continuous stream of data.

Try the following

import { Subject, combineLatest } from 'rxjs';
import { takeUntil } from 'rxjs/operators';

combineLatest(
  this.selectedUnitDetailModel$,
  this.appStore.select(selectUnit)
).pipe(
  takeUntil(this.destroySub)         // <-- replaced with `takeUntil` operator
).subscribe(
  ([res, unitId]) => this.sameUnit = res.unitTwo === unitId,
  (error) => console.log(error)                     // <-- handle error
);
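As a rough standalone illustration of the continuous variant, the sketch below again uses hypothetical BehaviorSubject stand-ins for the two streams and a hypothetical destroy$ subject in place of this.destroySub. It records every comparison so the re-evaluation on each new value is visible.

```typescript
import { BehaviorSubject, Subject, combineLatest } from 'rxjs';
import { takeUntil } from 'rxjs/operators';

// Assumed shape, based on the `res.unitTwo` access in the question.
interface UnitDetail { unitTwo: string; }

// Hypothetical stand-ins for the two real streams and the destroy notifier.
const unitDetail$ = new BehaviorSubject<UnitDetail>({ unitTwo: 'u-1' });
const unitId$ = new BehaviorSubject<string>('u-1');
const destroy$ = new Subject<void>();

const sameUnitHistory: boolean[] = [];

combineLatest([unitDetail$, unitId$])
  .pipe(takeUntil(destroy$))   // unsubscribe when destroy$ emits
  .subscribe(([res, unitId]) => sameUnitHistory.push(res.unitTwo === unitId));

unitId$.next('u-2');                   // the ids now differ
unitDetail$.next({ unitTwo: 'u-2' });  // the ids match again
destroy$.next();                       // tear the subscription down
unitId$.next('u-3');                   // ignored: already unsubscribed
```

Each emission from either source triggers a fresh comparison against the latest value of the other, until destroy$ fires.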
