Why am I still getting data from my Spring Boot WebFlux service even after I unsubscribe?

Mohammed Housseyn Taleb

I'm learning about the reactive web. For a tutorial, I wanted to stream Twitter hashtag search results from my Spring WebFlux REST service to an Angular 6 client.

When I hit localhost:8081/search/test in Chrome, I get the tweets in JSON format in a reactive way (tweet by tweet, with the browser showing each one as it arrives).

So, to go a bit further, I made a small Angular search input that logs the tweets to the console.

The issue is that when I search for the java tag I get console logs, and if I then search for the spring tag, the spring tweets are logged in the console but the java ones keep coming.

I did some research and found that I should unsubscribe my consumer from the Flux.

I tried to implement this, but without success.

Here is what I tried:

Spring WebFlux Controller

private TwitHashLocalService localService;
private TwitHashRemoteService remoteService;

@GetMapping(value = "search/{tag}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<Tweet> getByTag(@PathVariable String tag) throws TwitterException {
    return localService.findByTag(tag)
            .mergeWith(remoteService.findByTag(tag).doOnNext(tweet -> localService.save(tweet)));
}

My services

Local MongoDB

private MongoService mongoService;

public Flux<Tweet> findByTag(String tag) {
    return mongoService.findByTag(tag);
}

Remote Twitter stream Flux

public Flux<Tweet> findByTag(String hashtag) throws TwitterException {

    return Flux.create(sink -> {
        TwitterStream twitterStream = new TwitterStreamFactory(configuration).getInstance();
        twitterStream.onStatus(status -> sink.next(Tweet.fromStatus(status,hashtag)));
        twitterStream.onException(sink::error);
        twitterStream.filter(hashtag);
        sink.onCancel(twitterStream::shutdown);
    }); 

}

ANGULAR

My reactive twitter search service

import { Injectable } from '@angular/core';
import { HttpClient } from '@angular/common/http';
import { ITweet } from './itweet';
import { Observable, of } from 'rxjs';


@Injectable({
    providedIn: 'root'
})
export class ReactiveTwitterService {

    myTweets: ITweet[] = new Array();

    tweetTag: string;

    baseUrl = 'http://localhost:8081/search';

    constructor(private http_client: HttpClient) { }

    getTweetStream(tag): Observable<Array<ITweet>> {

        this.myTweets = [];
        const url = this.baseUrl + '/' + tag;

        return Observable.create(observer => {
            const eventSource = new EventSource(url);
            eventSource.onmessage = (event) => {
                console.log('received event');
                const json = JSON.parse(event.data);
                console.log(json);
                console.log(json.tweetData.name, json.tweetData.text, json.tag);
                this.myTweets.push(new ITweet(json.tweetData.name, json.tweetData.text, json.tag));
                observer.next(this.myTweets);
            };
            eventSource.onerror = (error) => {
                // readyState === 0 (CONNECTING) inside onerror means the connection was dropped and the
                // browser is about to reconnect; here that happens when the server closes the stream,
                // so we can safely treat it as a normal situation. Another way of detecting the end of the stream
                // is to insert a special element in the stream of events, which the client can identify as the last one.
                if (eventSource.readyState === 0) {
                    console.log('The stream has been closed by the server.');
                    eventSource.close();
                    observer.complete();
                } else {
                    observer.error('EventSource error: ' + error);
                }
            };
        });   
    }   
}

Search bar component

import { Component, OnInit, HostListener } from '@angular/core';
import { ReactiveTwitterService } from '../reactive-twitter.service';
import { Observable, Subscription } from 'rxjs';
import { ITweet } from '../itweet';

@Component({
    selector: 'app-serach-bar',
    templateUrl: './serach-bar.component.html',
    styleUrls: ['./serach-bar.component.css']
})
export class SerachBarComponent implements OnInit {
    innerWidth: number;


    subscription: Subscription = new Subscription();
    placeholder = 'search';

    styleClass = {
        wide_screen: 'w3-input w3-light-grey',
        break_point: 'w3-input w3-white'
    };

    tweets: Observable<ITweet[]>;

    constructor(private twiterService: ReactiveTwitterService) { }

    doSearch(tag) {
        console.log('test' + tag);
        this.subscription.unsubscribe();
        this.tweets = this.twiterService.getTweetStream(tag);
        this.subscription.add(this.tweets.subscribe());
    }


    ngOnInit() {
    }

    @HostListener('window:resize', ['$event'])
    onResize(event) {
        this.innerWidth = window.innerWidth;
    }

    getStyle() {
        return (innerWidth > 769) ? this.styleClass.wide_screen : this.styleClass.break_point;
    }
}

As you can see, in the search I'm trying to unsubscribe before searching again, but this is not working.

What should I do?

Oleh Dokuka

1) Make sure the cancellation is propagated in RxJS

From the code I see, there is no cancellation handler on the Angular side.

Try the following to react to the unsubscribe on the JS side:

Observable.create(observer => {
    const eventSource = new EventSource(url);

    // your code here

    return () => eventSource.close(); // teardown function, runs on unsubscribe
});
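
To make the teardown contract concrete, here is a minimal, dependency-free sketch. It uses neither RxJS nor the browser: `StreamSource`, `FakeSource`, and `connect` are hypothetical stand-ins (for `EventSource` and for the function passed to `Observable.create`) so that the behavior can be run anywhere. The only point it illustrates is that the producer hands back a function, and calling that function is what closes the connection.

```typescript
// Hypothetical stand-in for the parts of the browser EventSource used here.
interface StreamSource {
    onmessage: ((event: { data: string }) => void) | null;
    close(): void;
}

// Fake source for this sketch; a real client would use `new EventSource(url)`.
class FakeSource implements StreamSource {
    onmessage: ((event: { data: string }) => void) | null = null;
    closed = false;

    constructor(readonly url: string) {}

    // Simulates the server pushing one event; nothing is delivered once closed.
    push(payload: object): void {
        if (!this.closed && this.onmessage) {
            this.onmessage({ data: JSON.stringify(payload) });
        }
    }

    close(): void {
        this.closed = true;
    }
}

// Same shape as the function given to Observable.create: wire up the source,
// then return the teardown that must run on unsubscribe.
function connect(source: StreamSource, onTweet: (tweet: any) => void): () => void {
    source.onmessage = (event) => onTweet(JSON.parse(event.data));
    return () => source.close();
}

const received: string[] = [];
const javaSource = new FakeSource('http://localhost:8081/search/java');
const teardown = connect(javaSource, (tweet) => received.push(tweet.tag));

javaSource.push({ tag: 'java' }); // delivered to the callback
teardown();                       // what unsubscribe() would trigger
javaSource.push({ tag: 'java' }); // dropped: the source is closed
```

Without the returned function, nothing ever calls `close()`, which matches the symptom in the question: the old stream keeps delivering events after the unsubscribe.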

2) Add logging with .log()

I would recommend adding logging to the Reactor pipeline so that it is clear which signals are propagated through it. Use the .log() operator for that purpose.

3) Make sure the EventSource is really closed on the browser side

Use your browser's debug console to observe all open connections to the server. Make sure that the connection is closed after you change the tag or clear the search request.

4) Remember about eventual consistency

All events propagated through the reactive pipeline are asynchronous and non-blocking, so there can be some delay between the actual action and the final cancellation on the server side.
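
One caveat applies once a teardown function is in place. The `doSearch` from the question calls `unsubscribe()` on a long-lived `Subscription` and then `add()`s the new subscription to it. In RxJS, a `Subscription` that has been unsubscribed is closed, and anything added to a closed `Subscription` is unsubscribed immediately, so the new stream would be closed as soon as it is opened. A simpler pattern is to keep only the current subscription and replace it on each search. The sketch below shows that pattern without RxJS; `SearchSession` and `open` are hypothetical names, and `open` stands for whatever starts a stream and returns its teardown.

```typescript
type Teardown = () => void;

// Keeps at most one live stream: starting a new search tears down the old one first.
class SearchSession {
    private current: Teardown | null = null;

    // `open` is a hypothetical hook that starts the stream for a tag and
    // returns the function that closes it.
    constructor(private readonly open: (tag: string) => Teardown) {}

    search(tag: string): void {
        this.stop();
        this.current = this.open(tag);
    }

    stop(): void {
        if (this.current) {
            this.current();
            this.current = null;
        }
    }
}

// Usage with a fake opener that records which streams are currently live.
const live: string[] = [];
const session = new SearchSession((tag) => {
    live.push(tag);
    return () => {
        const index = live.indexOf(tag);
        if (index >= 0) {
            live.splice(index, 1);
        }
    };
});

session.search('java');
session.search('spring'); // closes the 'java' stream before opening 'spring'
```

In the Angular component this would roughly correspond to calling `this.subscription.unsubscribe()` and then assigning `this.subscription = this.tweets.subscribe()` instead of using `add()`. The RxJS `switchMap` operator offers the same switch-and-cancel behavior if the searches are modeled as a stream of tags.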
