ConcurrentLinkedQueue<String> q = new ConcurrentLinkedQueue<>();
q.add("1");
q.add("2");
q.add("3");
WebSocketClient webSocketClient = new ReactorNettyWebSocketClient();
webSocketClient.execute(new URI("wss://echo.websocket.org"), session -> session
.send(Flux.just("INIT").map(session::textMessage))
.thenMany(session
.send(Flux.<String>generate(sink ->
{
if (q.peek() != null)
sink.next(q.poll());
else
sink.complete();
}).map(session::textMessage))
)
.thenMany(session
.receive()
.map(WebSocketMessage::getPayloadAsText)
.map(s -> "Received: " + s)
.log()
)
.then())
.subscribe();
int i = 0;
while (true)
{
String msg = "MSG #" + i++;
q.add(msg);
Thread.sleep(1000);
}
输出:
INFO reactor.Flux.Map.1 - onNext(Received: INIT)
INFO reactor.Flux.Map.1 - onNext(Received: 1)
INFO reactor.Flux.Map.1 - onNext(Received: 2)
INFO reactor.Flux.Map.1 - onNext(Received: 3)
INFO reactor.Flux.Map.1 - onNext(Received: MSG #0)
然后停止。将while (true)
始终填充的队列。据我了解,我以前每次标记为时,使用的方式thenMany
都应该生成具有ConcurrentLinkedQueue
内容的新Flux complete()
。但这似乎不起作用。
编辑:基本上我想要的是从lambda范围之外将数据发送到websocket。这就是为什么我创建一个队列并使用的原因,.thenMany(session.send(Flux.<String>generate.....
我希望它会在其他线程向其添加数据的同时继续从该队列中读取数据。
问题是您将发送和接收与thenMany
on Mono
和方法结合在一起Flux
。thenMany
该方法使Flux
该通量忽略元素,仅对完成信号做出反应。
因此,除非您致电,否则什么都不会发生sink.complete()
。但是,在调用complete
方法之后,即使被请求,也不会发送其他事件。
发送和接收应独立进行。
此外,而不是ConcurrentLinkedQueue
一个FluxProcessor
,并FluxSink
可以使用。EmitterProcessor
可以向几个订户发射,同时尊重每个订户的背压。没有订阅者时,它仍然可以接受一些数据推送到可配置bufferSize
。
int bufferSize = 10;
FluxProcessor<String, String> processor =
EmitterProcessor.<String>create(bufferSize).serialize();
FluxSink<String> sink = processor.sink(FluxSink.OverflowStrategy.BUFFER);
sink.next("1");
sink.next("2");
sink.next("3");
WebSocketClient webSocketClient = new ReactorNettyWebSocketClient();
webSocketClient.execute(new URI("wss://echo.websocket.org"),
session -> {
Flux<WebSocketMessage> out = Flux.just("INIT")
.concatWith(processor)
.map(session::textMessage);
session.send(out)
.subscribe(); //instead of thenMany
return session.receive()
.map(WebSocketMessage::getPayloadAsText)
.map(s -> "Received: " + s)
.log()
.then();
})
.subscribe();
for (int i = 1; i <= 10; i++) {
sink.next("MSG #" + i);
TimeUnit.SECONDS.sleep(1);
}
sink.complete();
日志:
17:57:54.177 [reactor-http-nio-1] INFO reactor.Flux.Map.1 - onSubscribe(FluxMap.MapSubscriber)
17:57:54.178 [reactor-http-nio-1] INFO reactor.Flux.Map.1 - request(unbounded)
17:57:54.304 [reactor-http-nio-1] INFO reactor.Flux.Map.1 - onNext(Received: INIT)
17:57:54.305 [reactor-http-nio-1] INFO reactor.Flux.Map.1 - onNext(Received: 1)
17:57:54.305 [reactor-http-nio-1] INFO reactor.Flux.Map.1 - onNext(Received: 2)
17:57:54.306 [reactor-http-nio-1] INFO reactor.Flux.Map.1 - onNext(Received: 3)
17:57:54.307 [reactor-http-nio-1] INFO reactor.Flux.Map.1 - onNext(Received: MSG #1)
17:57:54.396 [reactor-http-nio-1] INFO reactor.Flux.Map.1 - onNext(Received: MSG #2)
17:57:55.454 [reactor-http-nio-1] INFO reactor.Flux.Map.1 - onNext(Received: MSG #3)
17:57:56.480 [reactor-http-nio-1] INFO reactor.Flux.Map.1 - onNext(Received: MSG #4)
17:57:57.505 [reactor-http-nio-1] INFO reactor.Flux.Map.1 - onNext(Received: MSG #5)
17:57:58.412 [reactor-http-nio-1] INFO reactor.Flux.Map.1 - onNext(Received: MSG #6)
17:57:59.448 [reactor-http-nio-1] INFO reactor.Flux.Map.1 - onNext(Received: MSG #7)
17:58:00.484 [reactor-http-nio-1] INFO reactor.Flux.Map.1 - onNext(Received: MSG #8)
17:58:01.496 [reactor-http-nio-1] INFO reactor.Flux.Map.1 - onNext(Received: MSG #9)
17:58:02.434 [reactor-http-nio-1] INFO reactor.Flux.Map.1 - onNext(Received: MSG #10)
本文收集自互联网,转载请注明来源。
如有侵权,请联系 [email protected] 删除。
我来说两句