How to use rxjs buffer with takeWhile

Node Mario

I am working on WebRTC. The application sends ICE candidates to a backend Firestore server. The problem is that the signaling server is called multiple times, because onicecandidate is triggered multiple times. I want to collect all the ICE candidates and make a single call to the signaling server. The idea is to buffer all the events until ICE gathering is finished. The attempt below does not work:

this.pc = new RTCPeerConnection(iceServers);
const source: Observable<any> = fromEvent(this.pc, 'icecandidate');
const takeWhile$ = source.pipe(
  takeWhile(val => val.currentTarget.iceGatheringState === 'gathering')
);
const buff = source.pipe(buffer(takeWhile$));
buff.subscribe(() => {
  // this.pc.onicecandidate = onicecandidateCallback;
});

bharat1226

Method 1:

You are almost there.

takeWhile$ re-emits every event for as long as the condition holds, and buffer emits whatever it has collected each time its notifier emits. So buff emits a small buffer for every icecandidate event instead of one buffer at the end.
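
To see this behaviour in isolation, here is a minimal sketch that swaps the RTCPeerConnection for a Subject and uses a hypothetical FakeIceEvent shape (both are assumptions made for illustration, not part of the original code). It only demonstrates that buffer emits several times when the notifier is the plain takeWhile stream; the exact contents of each buffer may differ between RxJS versions.

```typescript
import { Subject } from "rxjs";
import { buffer, takeWhile } from "rxjs/operators";

// Hypothetical stand-in for an icecandidate event (not the real browser type).
interface FakeIceEvent {
  candidate: string | null;
  gatheringState: "gathering" | "complete";
}

// Feeds the events through the question's pipeline and returns every buffer emitted.
function collectBuffers(events: FakeIceEvent[]): FakeIceEvent[][] {
  const source = new Subject<FakeIceEvent>();
  const notifier$ = source.pipe(
    takeWhile(e => e.gatheringState === "gathering")
  );
  const emitted: FakeIceEvent[][] = [];
  source.pipe(buffer(notifier$)).subscribe(batch => emitted.push(batch));
  events.forEach(e => source.next(e));
  return emitted;
}

const buffersSeen = collectBuffers([
  { candidate: "a", gatheringState: "gathering" },
  { candidate: "b", gatheringState: "gathering" },
  { candidate: "c", gatheringState: "gathering" },
  { candidate: null, gatheringState: "complete" },
]);

// Several buffers arrive, one per notifier emission, rather than a single batch.
console.log(buffersSeen.length);
```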

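So the notifier should emit only once, when gathering has finished.

The takeLast() operator does exactly that. takeLast(1) holds values back until its source completes and then emits only the last one. takeWhile completes on the first event for which iceGatheringState is no longer "gathering", so that single emission closes the buffer and buff emits one array with the icecandidate events collected so far: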

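// At the top of the file
import { fromEvent, Observable } from "rxjs";
import { buffer, takeLast, takeWhile } from "rxjs/operators";

this.pc = new RTCPeerConnection(iceServers);

const source: Observable<any> = fromEvent(this.pc, "icecandidate");

// Emits a single value once gathering is no longer in progress
const takeWhile$ = source.pipe(
  takeWhile(val => val.currentTarget.iceGatheringState === "gathering"),
  takeLast(1)
);

const buff = source.pipe(buffer(takeWhile$));

buff.subscribe((bufferValues) => {
  // bufferValues is an array of the buffered icecandidate events
  // this.pc.onicecandidate = onicecandidateCallback;
});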

In the code above, the subscription receives the buffered icecandidate events as bufferValues.
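
The same pipeline can be tried without a browser. The sketch below is only an illustration under assumptions: a Subject replaces fromEvent, and FakeIceEvent and collectOnce are hypothetical names. The event that signals the end of gathering carries a null candidate and may or may not end up in the buffer, so the sketch filters null candidates out.

```typescript
import { Subject } from "rxjs";
import { buffer, takeLast, takeWhile } from "rxjs/operators";

// Hypothetical stand-in for an icecandidate event (not the real browser type).
interface FakeIceEvent {
  candidate: string | null;
  gatheringState: "gathering" | "complete";
}

// Runs the events through buffer + takeWhile + takeLast(1) and returns each batch.
function collectOnce(events: FakeIceEvent[]): string[][] {
  const source = new Subject<FakeIceEvent>();
  const gatheringDone$ = source.pipe(
    takeWhile(e => e.gatheringState === "gathering"),
    takeLast(1) // emits once, when takeWhile completes
  );
  const batches: string[][] = [];
  source.pipe(buffer(gatheringDone$)).subscribe(batch => {
    // Drop the end-of-gathering marker if it was buffered.
    const candidates = batch
      .map(e => e.candidate)
      .filter((c): c is string => c !== null);
    batches.push(candidates);
  });
  events.forEach(e => source.next(e));
  return batches;
}

const batchesSeen = collectOnce([
  { candidate: "a", gatheringState: "gathering" },
  { candidate: "b", gatheringState: "gathering" },
  { candidate: null, gatheringState: "complete" },
]);

console.log(JSON.stringify(batchesSeen));
```

Because the batch arrives exactly once, the subscriber is a natural place for the single call to the signaling server.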

Method 2:

You can also use the reduce operator to get the same result. reduce accumulates the events into an array and emits it once, when takeWhile completes:

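// At the top of the file
import { fromEvent, Observable } from "rxjs";
import { reduce, takeWhile } from "rxjs/operators";

this.pc = new RTCPeerConnection(iceServers);

const source: Observable<any> = fromEvent(this.pc, "icecandidate");

const takeWhile$ = source.pipe(
  takeWhile(val => val.currentTarget.iceGatheringState === "gathering"),
  // Collect every event into one array, emitted when takeWhile completes
  reduce((acc, val) => [...acc, val], [] as any[])
);

takeWhile$.subscribe((bufferValues) => {
  // bufferValues is an array of the collected icecandidate events
  // this.pc.onicecandidate = onicecandidateCallback;
});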
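
As a rough sketch of how Method 2 could feed the single signaling call, the example below again uses a Subject in place of fromEvent. sendToSignalingServer, FakeIceEvent and iceEvents$ are hypothetical names introduced for illustration; the real call to Firestore is not shown in the question, so the sketch just records the payload.

```typescript
import { Subject } from "rxjs";
import { reduce, takeWhile } from "rxjs/operators";

// Hypothetical stand-in for an icecandidate event (not the real browser type).
interface FakeIceEvent {
  candidate: string | null;
  gatheringState: "gathering" | "complete";
}

// Hypothetical signaling call: records each payload instead of doing network I/O.
const sentPayloads: string[][] = [];
function sendToSignalingServer(candidates: string[]): void {
  sentPayloads.push(candidates);
}

const iceEvents$ = new Subject<FakeIceEvent>();

iceEvents$
  .pipe(
    takeWhile(e => e.gatheringState === "gathering"),
    // Accumulate candidates; the array is emitted once takeWhile completes.
    reduce(
      (acc: string[], e) => (e.candidate === null ? acc : [...acc, e.candidate]),
      [] as string[]
    )
  )
  .subscribe(sendToSignalingServer);

// Simulated gathering: two candidates, then the end-of-gathering event.
iceEvents$.next({ candidate: "a", gatheringState: "gathering" });
iceEvents$.next({ candidate: "b", gatheringState: "gathering" });
iceEvents$.next({ candidate: null, gatheringState: "complete" });

console.log(JSON.stringify(sentPayloads));
```

Compared with Method 1, this variant needs no separate notifier stream, since takeWhile both limits and terminates the sequence that reduce collects.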
