我正在从事Spring Webflux项目,并且在尝试在计划任务中发布和使用Flux时遇到了问题。
@Scheduled(fixedRate = 20*1000)
fun updateNews() {
try {
logger.info("Automatic Update at: ${LocalDateTime.now()}")
articleRepository.saveAll(
sourceRepository.findAll().publishOn(Schedulers.parallel())
.map { source -> source.generate() }
.flatMap { it?.read() ?: Flux.empty() })
.timeout(Duration.ofSeconds(20)
).subscribeOn(Schedulers.parallel())
} catch(e: Throwable) {
logger.log(Level.SEVERE, "Error in Scheduler", e)
}
}
我配置的调度程序:
ConcurrentTaskScheduler(Executors.newScheduledThreadPool(3))
除非我有意阻止,否则此任务将永远不会完成:
.then().block()
我最初并不理会直接引用发布/订阅调度程序的问题,并且我尝试了所有看似合理的方案,但没有任何效果。
我的日志事件发生了,但似乎当调度程序中用于此任务的线程死亡时,通量也被清除了;即使我指定了publishOn和subscriptionOn行为后,它们也应该位于自己的线程池中?
我想使此操作完全被动,请提供任何建议。
@Scheduled
没有与集成Flux
,因此Flux
如果您将其退回,它将不知道该怎么办。将其与以下事实结合起来:在Reactor(通常是Reactive Streams)中,直到您没有任何反应subscribe()
,您才可以开始发现问题所在。
block()
实际上是的一种形式subscribe()
,这就是一旦将其添加到代码中即可使用的原因。实际上,这可能是最好的选择,因为您需要将被动代码(从ReactiveRepository
)桥接到命令式阻塞世界(您的@Scheduled fun
)中。
本文收集自互联网,转载请注明来源。
如有侵权,请联系 [email protected] 删除。
我来说两句