How do I replace a subscribe inside a subscribe in Angular RxJS?

Léandre

I have a question about RxJS. I am creating a web app to manage the members of an association. I want to create a button to "reset" the database of a site. The steps are as follows:

  • Send an e-mail to all members asking them to re-register
  • Delete the data
  • Refresh the page

Here's the code I've made, which works, but I know there are a few things wrong. I'm new to RxJS, so I don't quite understand all the principles...

newYear(){
    this.amicalisteService.getAmicalistesValides("").subscribe({
      // Send an e-mail to all valid amicalistes
      next: amicalistes => {
        for(const amicaliste of amicalistes) {
          const to = amicaliste.email;
          const cc = null;
          const subject = "Adhère à l'AEIR";
          const body = ""
          this.amicalisteService.sendMail(null, to, cc, subject, body).subscribe({
            next: response => {
              console.log('E-mail envoyé avec succès !', response);
            },
            error: error => {
              console.error('Erreur lors de l\'envoi de l\'e-mail :', error);
            }
          });
        }
      },
      complete: () => {
        // Delete amicalistes, photos and amicaliste cards
        this.amicalisteService.getAmicalistes().subscribe(amicalistes  => {
          for(const amicaliste of amicalistes){
            this.amicalisteService.deleteAmicalisteById(amicaliste.id).subscribe();
          }
        });
        this.imageService.getImages("amicaliste").subscribe(images => {
          for(const image of images){
            this.imageService.deleteImageByName("amicaliste", this.imageService.getImageName(image.toString())).subscribe();
          }
        });
        this.imageService.getImages("pdf").subscribe(pdfs => {
          for(const pdf of pdfs){
            this.imageService.deleteImageByName("pdf", this.imageService.getImageName(pdf.toString())).subscribe();
          }
        })
      }
      //Refresh...
    })
  }

I've heard it's not good practice to use subscribe() inside subscribe(), but I can't figure out how to do it differently. There are several things I'd like to keep in this code, however. In the complete, the 3 subscribe() run in parallel, if I'm not mistaken. I'd like to keep that. Otherwise, I understand that using a switchMap could help me, but I can't seem to implement it. Can anyone give me some advice?

Thank you very much!

Picci

Observables are streams of events.

Remote calls (e.g. calling a server to send mails or calling a db to clean up some data) are implemented as streams that either notify just one event (i.e. the response of the remote call) and then complete, or just error.
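
As a minimal sketch of such a one-event stream: fakeRemoteCall below is a hypothetical stand-in for a remote call, built with of so that it emits a single value and completes right away. A real HTTP call would behave the same way, only asynchronously.

```typescript
import { Observable, of } from "rxjs";

// Hypothetical stand-in for a remote call: emits one response, then completes.
function fakeRemoteCall(): Observable<string> {
  return of("response");
}

const log: string[] = [];
fakeRemoteCall().subscribe({
  next: value => log.push(`next: ${value}`),
  error: err => log.push(`error: ${err}`),
  complete: () => log.push("complete"),
});

console.log(log); // ["next: response", "complete"]
```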

With RxJS operators you can combine such streams. For instance, you can do the following:

  • you start with one stream, Stream_A, that emits event_A and then completes
  • you can have a second stream, Stream_B, that emits event_B and then completes
  • and then you combine Stream_A and Stream_B to create a third stream, Stream_A_B, that first triggers the execution of Stream_A and, as soon as Stream_A has notified event_A, triggers the execution of Stream_B and emits all the events notified by Stream_B, which in this case is just event_B
  • In order to create this combined stream in RxJS we use the operator concatMap (note: people often use switchMap to concatenate streams; often the result is the same, but the meaning and the potential behaviors are slightly different. With sequences of calls to remote services which have to occur one after the other, concatMap is usually the preferred approach)
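
The combination described above can be sketched as follows. streamA and streamB are hypothetical stand-ins for two remote calls, built with of so that they emit synchronously. Note that the combined stream only emits what Stream_B emits; event_A is consumed inside concatMap.

```typescript
import { Observable, of } from "rxjs";
import { concatMap } from "rxjs/operators";

// Hypothetical stand-ins for two remote calls, each emitting once and completing.
const streamA = (): Observable<string> => of("event_A");
const streamB = (previous: string): Observable<string> =>
  of(`event_B (after ${previous})`);

// Stream_A_B: run Stream_A, then use its value to start Stream_B.
const streamAB = streamA().pipe(concatMap(eventA => streamB(eventA)));

const received: string[] = [];
streamAB.subscribe({
  next: value => received.push(value),
  complete: () => received.push("complete"),
});

console.log(received); // ["event_B (after event_A)", "complete"]
```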

Another example of combining several streams into a new one is the following:

  • There are 3 streams, Stream_1, Stream_2 and Stream_3. Each of these streams emits one value and then completes.
  • We can combine these 3 streams into a new stream that waits for all 3 streams to emit and complete, then emits only one value, which is the array of all the values emitted by the streams, and then completes.
  • With RxJS such a combined stream is obtained with the function forkJoin
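
A small sketch of this second combination, again with hypothetical streams built with of. The combined stream emits once, with the values in the same order as the input streams.

```typescript
import { forkJoin, of } from "rxjs";

// Hypothetical streams: each emits a single value and completes.
const stream1 = of("one");
const stream2 = of(2);
const stream3 = of(true);

const emitted: unknown[] = [];
forkJoin([stream1, stream2, stream3]).subscribe({
  next: values => emitted.push(values),
  complete: () => emitted.push("complete"),
});

console.log(JSON.stringify(emitted)); // [["one",2,true],"complete"]
```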

Having said that, and hoping to have shed some light on RxJS and Observables, here is what I would do in your case:

// imports needed at the top of the file:
// import { forkJoin } from 'rxjs';
// import { concatMap, tap } from 'rxjs/operators';
newYear(){
    // assume getAmicalistesValides returns an Observable that emits the result
    // of a remote call
    this.amicalisteService.getAmicalistesValides("")
    // to combine Observables we need to "pipe" operators, i.e. to execute
    // operators one after the other
    .pipe(
      // the first thing to do is to send an email to each amicaliste;
      // assuming we want to send the emails all in parallel, we first
      // create one Observable for each mail to be sent and then use forkJoin
      // to execute them all in parallel.
      // All this has to happen after the Observable returned by getAmicalistesValides
      // has emitted its value, hence we use concatMap
      concatMap(amicalistes => {
        // here we create the array of Observables, one per amicaliste
        const sendMailObs = amicalistes.map(amicaliste => {
          const to = amicaliste.email;
          const cc = null;
          const subject = "Adhère à l'AEIR";
          const body = "";
          return this.amicalisteService.sendMail(null, to, cc, subject, body)
          // each of these Observables can print something or react to errors
          .pipe(tap({
            next: response => {
              console.log('E-mail envoyé avec succès !', response);
            },
            error: error => {
              console.error('Erreur lors de l\'envoi de l\'e-mail :', error);
            }
          }));
        });
        // now we trigger the concurrent execution of all sendMail observables
        // (note: forkJoin errors as soon as one of them errors, and it completes
        // without emitting anything if the array is empty)
        return forkJoin(sendMailObs);
      }),
      // after having sent the mails you want to do more stuff: delete data, images
      // and so on - assume each of these operations is an Observable
      // you will have to use concatMap and within it create the new Observables
      // and trigger them in parallel using forkJoin, as above
      concatMap(mailSentResults => {
         const deleteDataObs = ....
         const deleteImagesObs = ...
         ...
         return forkJoin([deleteDataObs, deleteImagesObs /* maybe other Observables */]);
      })
    )
    // up to here you have created a new stream, composing various other streams
    // and now is the time to subscribe to this new stream, which is the only stream
    // you want to explicitly subscribe to
    .subscribe({
      next: res => ..., // manage the value notified by upstream
      error: err => ..., // manage error
      complete: () => ... // do something when all is completed, if required
    })
  }
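
The deletion step left elided above could look roughly like the sketch below. Everything in it is an assumption made for illustration: the two service objects are fake stand-ins that only mimic the method names used in the question, the getImageName conversion is left out, and deleteAll is a hypothetical helper. The helper adds defaultIfEmpty because forkJoin of an empty array completes without emitting, which would otherwise stop the outer forkJoin from emitting.

```typescript
import { Observable, forkJoin, of } from "rxjs";
import { concatMap, defaultIfEmpty } from "rxjs/operators";

// Fake stand-ins for the services in the question (assumed shapes, not the real API).
const amicalisteService = {
  getAmicalistes: (): Observable<{ id: number }[]> => of([{ id: 1 }, { id: 2 }]),
  deleteAmicalisteById: (id: number): Observable<string> => of(`amicaliste ${id}`),
};
const imageService = {
  getImages: (folder: string): Observable<string[]> =>
    of(folder === "pdf" ? ["card.pdf"] : ["a.png", "b.png"]),
  deleteImageByName: (folder: string, name: string): Observable<string> =>
    of(`${folder}/${name}`),
};

// Hypothetical helper: run all deletions in parallel, emit [] if there are none.
function deleteAll<T>(deletions: Observable<T>[]): Observable<T[]> {
  return forkJoin(deletions).pipe(defaultIfEmpty([] as T[]));
}

// Fetch the list first, then delete every entry in parallel.
const deleteAmicalistes$ = amicalisteService.getAmicalistes().pipe(
  concatMap(list =>
    deleteAll(list.map(a => amicalisteService.deleteAmicalisteById(a.id)))),
);
const deleteFolder = (folder: string): Observable<string[]> =>
  imageService.getImages(folder).pipe(
    concatMap(names =>
      deleteAll(names.map(n => imageService.deleteImageByName(folder, n)))),
  );

// The three clean-ups run in parallel, as in the original complete callback.
const results: unknown[] = [];
forkJoin([deleteAmicalistes$, deleteFolder("amicaliste"), deleteFolder("pdf")])
  .subscribe({
    next: r => results.push(r),
    complete: () => results.push("complete"),
  });

console.log(JSON.stringify(results));
```

In the method above, the outer forkJoin would be what the second concatMap returns, and the page refresh could then go in the complete callback of the single subscribe.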

I hope I have understood your case and that all this makes some sense.
