Splitting a WebClient Post of a Streaming Flux into JSON Arrays

lafual :

I am using a third-party REST controller which accepts an array of JSON objects and returns a single object response. When I POST from a WebClient using a limited Flux the code works (I assume, because the Flux completes).

However, when the Flux is potentially unlimited, how do I;

  1. POST in chunks of arrays?
  2. Capture the response, per POSTed array?
  3. Stop the transmission of the Flux?

Here is my bean;

public class Car implements Serializable {

    Long id;

    public Car() {}
    public Car(Long id) { this.id = id; }
    public Long getId() {return id; }
    public void setId(Long id) { this.id = id; }
}

This is how I assume that the third-party client looks like;

@RestController
public class ThirdPartyServer {

    @PostMapping("/cars")
    public CarResponse doCars(@RequestBody List<Car> cars) {
        System.err.println("Got " + cars);
        return new CarResponse("OK");
    }
}

And here is my code. When I POST flux2 , on completion a JSON array is sent. However, when I POST flux1, nothing is sent after the first take(5). How do POST the next chunks of 5?

@Component
public class MyCarClient {

    public void sendCars() {

//      Flux<Car> flux1 = Flux.interval(Duration.ofMillis(250)).map(i -> new Car(i));
        Flux<Car> flux2 = Flux.range(1, 10).map(i -> new Car((long) i));

        WebClient client = WebClient.create("http://localhost:8080");
        client
            .post()
            .uri("/cars")
            .contentType(MediaType.APPLICATION_JSON)
            .body(flux2, Car.class) 
//          .body(flux1.take(5).collectList(), new ParameterizedTypeReference<List<Car>>() {})
            .exchange()
            .subscribe(r -> System.err.println(r.statusCode()));
    }
}
Phil Clay :
  1. How do I POST in chunks of arrays?

Use one of the variants of Flux.window to split the main flux into windowed fluxes, and then send the requests using the windowed fluxes via .flatMap

        Flux<Car> flux1 = Flux.interval(Duration.ofMillis(250)).map(i -> new Car(i));

        WebClient client = WebClient.create("http://localhost:8080");
        Disposable disposable = flux1
                // 1
                .window(5)
                .flatMap(windowedFlux -> client
                        .post()
                        .uri("/cars")
                        .contentType(MediaType.APPLICATION_JSON)
                        .body(windowedFlux, Car.class)
                        .exchange()
                        // 2
                        .doOnNext(response -> System.out.println(response.statusCode()))
                        .flatMap(response -> response.bodyToMono(...)))
                .subscribe();

        Thread.sleep(10000);

        // 3
        disposable.dispose();

  1. How do I capture the response, per POSTed array?

You can analyze the response via operators after .exchange().

In the example I provided, the response can be seen in the doOnNext operator, but you can use any operator that operates on onNext signals, such as map or handle.

Be sure to read the response body fully to ensure the connection is returned back to the pool (see note). Here, I have used .bodyToMono, but any .body or .toEntity method will work.

  1. Stop the transmission of the Flux?

When using the subscribe method as you have done, you can stop the flow using the returned disposable.dispose().

Alternatively, you can return the Flux from the sendCars() method and delegate the subscription and disposing to the caller.

Note that in the example I provided, I just used Thread.sleep() to simulate waiting. In a real application, you should use something more advanced, and avoid Thread.sleep()

Collected from the Internet

Please contact [email protected] to delete if infringement.

edited at
0

Comments

0 comments
Login to comment

Related

TOP Ranking

  1. 1

    Failed to listen on localhost:8000 (reason: Cannot assign requested address)

  2. 2

    Loopback Error: connect ECONNREFUSED 127.0.0.1:3306 (MAMP)

  3. 3

    How to import an asset in swift using Bundle.main.path() in a react-native native module

  4. 4

    pump.io port in URL

  5. 5

    Compiler error CS0246 (type or namespace not found) on using Ninject in ASP.NET vNext

  6. 6

    BigQuery - concatenate ignoring NULL

  7. 7

    ngClass error (Can't bind ngClass since it isn't a known property of div) in Angular 11.0.3

  8. 8

    ggplotly no applicable method for 'plotly_build' applied to an object of class "NULL" if statements

  9. 9

    Spring Boot JPA PostgreSQL Web App - Internal Authentication Error

  10. 10

    How to remove the extra space from right in a webview?

  11. 11

    java.lang.NullPointerException: Cannot read the array length because "<local3>" is null

  12. 12

    Jquery different data trapped from direct mousedown event and simulation via $(this).trigger('mousedown');

  13. 13

    flutter: dropdown item programmatically unselect problem

  14. 14

    How to use merge windows unallocated space into Ubuntu using GParted?

  15. 15

    Change dd-mm-yyyy date format of dataframe date column to yyyy-mm-dd

  16. 16

    Nuget add packages gives access denied errors

  17. 17

    Svchost high CPU from Microsoft.BingWeather app errors

  18. 18

    Can't pre-populate phone number and message body in SMS link on iPhones when SMS app is not running in the background

  19. 19

    12.04.3--- Dconf Editor won't show com>canonical>unity option

  20. 20

    Any way to remove trailing whitespace *FOR EDITED* lines in Eclipse [for Java]?

  21. 21

    maven-jaxb2-plugin cannot generate classes due to two declarations cause a collision in ObjectFactory class

HotTag

Archive