在计划任务中使用Flux

迪利乌斯

我正在从事Spring Webflux项目,并且在尝试在计划任务中发布和使用Flux时遇到了问题。

@Scheduled(fixedRate = 20*1000)
fun updateNews() {
    try {
        logger.info("Automatic Update at: ${LocalDateTime.now()}")
        articleRepository.saveAll(
                sourceRepository.findAll().publishOn(Schedulers.parallel())
                        .map { source -> source.generate() }
                        .flatMap { it?.read() ?: Flux.empty() })
                        .timeout(Duration.ofSeconds(20)
        ).subscribeOn(Schedulers.parallel())
    } catch(e: Throwable) {
        logger.log(Level.SEVERE, "Error in Scheduler", e)
    }
}

我配置的调度程序:

ConcurrentTaskScheduler(Executors.newScheduledThreadPool(3))

除非我有意阻止,否则此任务将永远不会完成:

.then().block()

我最初并不理会直接引用发布/订阅调度程序的问题,并且我尝试了所有看似合理的方案,但没有任何效果。

我的日志事件发生了,但似乎当调度程序中用于此任务的线程死亡时,通量也被清除了;即使我指定了publishOn和subscriptionOn行为后,它们也应该位于自己的线程池中?

我想使此操作完全被动,请提供任何建议。

西蒙·巴斯莱

@Scheduled没有与集成Flux,因此Flux如果您将其退回,它将不知道该怎么办将其与以下事实结合起来:在Reactor(通常是Reactive Streams)中,直到您没有任何反应subscribe(),您才可以开始发现问题所在。

block()实际上是的一种形式subscribe(),这就是一旦将其添加到代码中即可使用的原因。实际上,这可能是最好的选择,因为您需要将被动代码(从ReactiveRepository接到命令式阻塞世界(您的@Scheduled fun)中。

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章