RxJS in NestJS - Observable subscription error

Joey587

I am a newbie trying out RxJS and NestJS. The use case I am trying to accomplish is for educational purposes: I want to read a JSON file using the "fs" module and emit an Observable error if the file is empty or cannot be read. I create an Observable from the asynchronous file read, push the result into a subject, and then subscribe to that subject in the controller. Here is my code in the service:

@Injectable()
export class NewProviderService {
    private serviceSubject: BehaviorSubject<HttpResponseModel[]>;
    // This is the variable that should be exposed; the subject itself stays private.
    // That way the service is the sole owner allowed to modify the stream,
    // not the controller or components.
    serviceSubject$: Observable<HttpResponseModel[]>;

    private serviceErrorSubject: BehaviorSubject<any>;
    serviceErrorSubject$: Observable<any>;
    filePath: string;
    httpResponseObjectArray: HttpResponseModel[];
    constructor() {
        this.serviceSubject = new BehaviorSubject<HttpResponseModel[]>([]);
        this.serviceSubject$ = this.serviceSubject.asObservable();

        this.serviceErrorSubject = new BehaviorSubject<any>(null);
        this.serviceErrorSubject$ = this.serviceErrorSubject.asObservable();

        this.filePath = path.resolve(__dirname, './../../shared/assets/httpTest.json');
    }
    readFileFromJson() {
        return new Promise((resolve, reject) => {
            fs.exists(this.filePath.toString(), exists => {
                if (exists) {
                    fs.readFile(this.filePath.toString(), 'utf-8', (err, data) => {
                        if (err) {
                            logger.info('error in reading file', err);
                            return reject('Error in reading the file' + err.message);
                        }

                        logger.info('file read without parsing fg', data.length);
                        if ((data.length !== 0) && !isNullOrUndefined(data) && data !== null) {
                            // this.httpResponseObjectArray = JSON.parse(data).HttpTestResponse;
                            // logger.info('array obj is:', this.httpResponseObjectArray);
                            logger.info('file read after parsing new', JSON.parse(data));
                            return resolve(JSON.parse(data).HttpTestResponse);
                        } else {
                            return reject(new FileExceptionHandler('no data in file'));
                        }
                    });
                } else {
                    return reject(new FileExceptionHandler('file cannot be read at the moment'));
                }
            });
        });
    }


    getData() {
        from(this.readFileFromJson()).pipe(
            map(data => {
                logger.info('data in obs', data);
                this.httpResponseObjectArray = data as HttpResponseModel[];
                return this.httpResponseObjectArray;
            }),
            catchError(error => {
                return Observable.throw(error);
            }),
        ).subscribe(actualData => {
            this.serviceSubject.next(actualData);
        }, err => {
            logger.info('err in sub', typeof err, err);
            this.serviceErrorSubject.next(err);
        });
    }
}

And here is the controller method:

@Get('/getJsonData')
public async getJsonData(@Req() req, @Res() res) {

  await this.newService.getData();
  this.newService.serviceSubject$.subscribe(data => {
    logger.info('data subscribed', data, _.isEmpty(data));
    if (!isNullOrUndefined(data) && !_.isEmpty(data)) {
      logger.info('coming in');
      res.status(HttpStatus.OK).send(data);
      res.end();
    }
  });
}

The problem I face is that only the first request works: I get the file details and the subscription callback is called once. On subsequent requests I get this error:

Error [ERR_HTTP_HEADERS_SENT]: Cannot set headers after they are sent to the client
    at ServerResponse.setHeader (_http_outgoing.js:470:11)
    at ServerResponse.header (C:\personal\Node\test-nest.js\prj-sample\node_modules\express\lib\response.js:767:10)
    at Ser

and the endpoint /getJsonData results in an error. Could someone help me out? I believe the subscription is not being ended properly after the first call, but I am not sure how to end it or how to resolve this.

Kim Kern

The problem is that you're subscribing to your serviceSubject in your controller and never unsubscribing. Every time a new value is emitted, each of those subscriptions will try to send its response. This works the first time, but on the next emission the response has already been sent, so you are told it can't be sent again; the request has already been handled.
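
To make that concrete, here is a minimal sketch outside of NestJS. FakeResponse and handleRequest are hypothetical stand-ins I made up for the Express response and the controller method from the question; only BehaviorSubject comes from RxJS.

```typescript
import { BehaviorSubject } from 'rxjs';

// Hypothetical stand-in for an Express response: sending twice is an error.
class FakeResponse {
  sent = false;
  send(_body: unknown): void {
    if (this.sent) {
      throw new Error('Cannot set headers after they are sent to the client');
    }
    this.sent = true;
  }
}

const subject = new BehaviorSubject<string[]>([]);
const errors: string[] = [];

// Mimics the controller in the question: subscribes and never unsubscribes.
function handleRequest(res: FakeResponse): void {
  subject.subscribe(data => {
    if (data.length > 0) {
      try {
        res.send(data);
      } catch (e) {
        errors.push((e as Error).message);
      }
    }
  });
}

const res1 = new FakeResponse();
handleRequest(res1);  // receives the initial [] and ignores it
subject.next(['a']);  // res1 is sent

const res2 = new FakeResponse();
handleRequest(res2);  // immediately receives the cached ['a'], res2 is sent
subject.next(['a']);  // both lingering subscriptions fire again and fail

console.log(errors.length); // prints 2
```

Both subscriptions are still alive when the last value arrives, so each one tries to write to a response that is already finished.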

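You can use the pipeable first() operator to complete the Observable after the first value. If you return the Observable from the handler instead of injecting @Res(), NestJS subscribes to it and sends the response for you: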

@Get('/getJsonData')
public async getJsonData() {
  await this.newService.getData();
  return this.newService.serviceSubject$.pipe(first());
}
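
As a rough illustration of what first() does to a BehaviorSubject, here is a small sketch outside of NestJS. Note that without arguments first() takes whatever value the subject currently holds, which may be the initial empty array if the file has not been read yet. The predicate below is my own addition, mirroring the isEmpty check in the question, and assumes an empty array should be skipped.

```typescript
import { BehaviorSubject } from 'rxjs';
import { first } from 'rxjs/operators';

const subject = new BehaviorSubject<string[]>([]);
const received: string[][] = [];
let completed = false;

// first() with a predicate skips the initial empty array, takes the first
// non-empty value, then completes and unsubscribes from the subject.
subject.pipe(first(data => data.length > 0)).subscribe({
  next: data => received.push(data),
  complete: () => { completed = true; },
});

subject.next(['a']); // delivered, then the stream completes
subject.next(['b']); // ignored: the subscription is already closed

console.log(received.length, completed); // prints 1 true
```

Because the subscription closes itself after one value, nothing is left over to fire on the next request.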

You want your Observable to be shared (hot), so that every subscriber always gets the same, latest value. That's exactly what a BehaviorSubject does. So instead of converting your Subject with asObservable() when you expose it publicly, you can simply type it as an Observable: internally it is still a Subject, but the next() method for emitting new values is not exposed publicly:

private serviceSubject: BehaviorSubject<HttpResponseModel[]>;
get serviceSubject$(): Observable<HttpResponseModel[]> {
  return this.serviceSubject;
}
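
A minimal sketch of that pattern, using a hypothetical, simplified DemoService (no @Injectable(), numbers instead of HttpResponseModel) so it runs on its own:

```typescript
import { BehaviorSubject, Observable } from 'rxjs';

// Hypothetical, simplified service for illustration only.
class DemoService {
  private readonly serviceSubject = new BehaviorSubject<number[]>([]);

  // Typed as Observable, so next() is not part of the public type.
  get serviceSubject$(): Observable<number[]> {
    return this.serviceSubject;
  }

  // Only the service itself pushes new values into the stream.
  publish(values: number[]): void {
    this.serviceSubject.next(values);
  }
}

const service = new DemoService();
service.publish([1, 2, 3]);

// A late subscriber still receives the latest value right away.
let latest: number[] = [];
service.serviceSubject$.subscribe(values => { latest = values; });

console.log(latest); // prints [ 1, 2, 3 ]
```

Keep in mind that this protection exists only at compile time: a caller could still cast the getter's result back to a Subject, which is the case asObservable() guards against at runtime.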
