反应流问题

外国:
ConcurrentLinkedQueue<String> q = new ConcurrentLinkedQueue<>();
q.add("1");
q.add("2");
q.add("3");

WebSocketClient webSocketClient = new ReactorNettyWebSocketClient();

webSocketClient.execute(new URI("wss://echo.websocket.org"), session -> session
        .send(Flux.just("INIT").map(session::textMessage))
        .thenMany(session
                .send(Flux.<String>generate(sink ->
                {
                    if (q.peek() != null)
                        sink.next(q.poll());
                    else
                        sink.complete();
                }).map(session::textMessage))
        )
        .thenMany(session
                .receive()
                .map(WebSocketMessage::getPayloadAsText)
                .map(s -> "Received: " + s)
                .log()
        )
        .then())
        .subscribe();

int i = 0;
while (true)
{
    String msg = "MSG #" + i++;
    q.add(msg);
    Thread.sleep(1000);
}

输出:

INFO reactor.Flux.Map.1 - onNext(Received: INIT)
INFO reactor.Flux.Map.1 - onNext(Received: 1)
INFO reactor.Flux.Map.1 - onNext(Received: 2)
INFO reactor.Flux.Map.1 - onNext(Received: 3)
INFO reactor.Flux.Map.1 - onNext(Received: MSG #0)

然后停止。while (true)始终填充的队列。据我了解,我以前每次标记为时,使用的方式thenMany都应该生成具有ConcurrentLinkedQueue内容的新Flux complete()但这似乎不起作用。

编辑:基本上我想要的是从lambda范围之外将数据发送到websocket。这就是为什么我创建一个队列并使用的原因,.thenMany(session.send(Flux.<String>generate.....我希望它会在其他线程向其添加数据的同时继续从该队列中读取数据。

叶夫根尼·赫斯特(Evgeniy Khyst):

问题是您将发送和接收与thenManyon Mono方法结合在一起FluxthenMany该方法使Flux该通量忽略元素,仅对完成信号做出反应。

因此,除非您致电,否则什么都不会发生sink.complete()但是,在调用complete方法之后,即使被请求,也不会发送其他事件。

发送和接收应独立进行。

此外,而不是ConcurrentLinkedQueue一个FluxProcessor,并FluxSink可以使用。EmitterProcessor可以向几个订户发射,同时尊重每个订户的背压。没有订阅者时,它仍然可以接受一些数据推送到可配置bufferSize

int bufferSize = 10;
FluxProcessor<String, String> processor =
    EmitterProcessor.<String>create(bufferSize).serialize();
FluxSink<String> sink = processor.sink(FluxSink.OverflowStrategy.BUFFER);

sink.next("1");
sink.next("2");
sink.next("3");

WebSocketClient webSocketClient = new ReactorNettyWebSocketClient();
webSocketClient.execute(new URI("wss://echo.websocket.org"),
    session -> {
      Flux<WebSocketMessage> out = Flux.just("INIT")
          .concatWith(processor)
          .map(session::textMessage);

      session.send(out)
          .subscribe(); //instead of thenMany

      return session.receive()
          .map(WebSocketMessage::getPayloadAsText)
          .map(s -> "Received: " + s)
          .log()
          .then();
    })
    .subscribe();

for (int i = 1; i <= 10; i++) {
  sink.next("MSG #" + i);
  TimeUnit.SECONDS.sleep(1);
}
sink.complete();

日志:

17:57:54.177 [reactor-http-nio-1] INFO reactor.Flux.Map.1 - onSubscribe(FluxMap.MapSubscriber)
17:57:54.178 [reactor-http-nio-1] INFO reactor.Flux.Map.1 - request(unbounded)
17:57:54.304 [reactor-http-nio-1] INFO reactor.Flux.Map.1 - onNext(Received: INIT)
17:57:54.305 [reactor-http-nio-1] INFO reactor.Flux.Map.1 - onNext(Received: 1)
17:57:54.305 [reactor-http-nio-1] INFO reactor.Flux.Map.1 - onNext(Received: 2)
17:57:54.306 [reactor-http-nio-1] INFO reactor.Flux.Map.1 - onNext(Received: 3)
17:57:54.307 [reactor-http-nio-1] INFO reactor.Flux.Map.1 - onNext(Received: MSG #1)
17:57:54.396 [reactor-http-nio-1] INFO reactor.Flux.Map.1 - onNext(Received: MSG #2)
17:57:55.454 [reactor-http-nio-1] INFO reactor.Flux.Map.1 - onNext(Received: MSG #3)
17:57:56.480 [reactor-http-nio-1] INFO reactor.Flux.Map.1 - onNext(Received: MSG #4)
17:57:57.505 [reactor-http-nio-1] INFO reactor.Flux.Map.1 - onNext(Received: MSG #5)
17:57:58.412 [reactor-http-nio-1] INFO reactor.Flux.Map.1 - onNext(Received: MSG #6)
17:57:59.448 [reactor-http-nio-1] INFO reactor.Flux.Map.1 - onNext(Received: MSG #7)
17:58:00.484 [reactor-http-nio-1] INFO reactor.Flux.Map.1 - onNext(Received: MSG #8)
17:58:01.496 [reactor-http-nio-1] INFO reactor.Flux.Map.1 - onNext(Received: MSG #9)
17:58:02.434 [reactor-http-nio-1] INFO reactor.Flux.Map.1 - onNext(Received: MSG #10)

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章