如何使用 alpakka、spring boot 和 Akka-stream 初始化连续运行的流?

坦克

全部,

我正在开发一个应用程序,它使用 alpakka spring boot 集成从 kafka 读取数据。我已经准备好了大部分代码,我唯一卡住的地方是如何初始化一个连续运行的流,因为这将是一个后端应用程序并且不会有任何 api 可以从中调用?

列维拉姆齐

据我所知,Alpakka 的 Spring 集成基本上是围绕通过 Spring HTTP 控制器公开 Akka Streams 设计的。所以我不确定将 Spring 引入其中的目的是什么,因为 Akka 应用程序倾向于喜欢的工作方式与 Spring 应用程序倾向于喜欢的工作方式之间存在相当大的阻抗不匹配。

假设您正在谈论使用 Alpakka Kafka,最惯用的做法是在您的方法中启动由Alpakka KafkaSource提供的流main,它会一直运行直到被杀死或失败。您可能希望在消费者和业务逻辑周围使用RestartSource以确保在发生故障时流重新启动(请注意,通常应该期望没有再次处理偏移提交的消息,如典型的 Kafka案件只能保证至少一次处理)。

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章

如何将Alpakka Kafka与Akka Stream WebSocket结合

如何使用键入的akka解决akka.stream.Graph接收器错误akka流

使用 schema.sql 和 DDL 自动生成进行 Spring Boot 初始化?

使用Spring Boot时未初始化java.lang.IllegalState异常LifecylceProcessor和ApplicationEventMulticaster

使用Spring Boot @Configuraion和@Value初始化javafx的TextField的默认值

如何使用spring-boot初始化ActiveMQ的SystemUsage?

使用Stream的Java初始化对象和set属性

Spring Boot 2.5.3 中使用 Spring Batch 和 Flyway 进行数据库初始化的问题

如何延迟Spring Cloud Stream StreamListener的初始化?

如何使用 akka 流或 alpakka 从 S3 读取镶木地板文件

使用spring-boot:1.5.1和spring-cloud-stream时无法启动bean'inputBindingLifecycle'

JSON 反序列化器与 Spring Boot Starter Web 和 Spring Cloud Stream 冲突

使用Akka HTTP Server和客户端的Akka流

使用Akka HTTP客户端和Akka流的死信

Alpakka S3连接器流将无法处理负载,并引发akka.stream.BufferOverflowException

Spring Boot重启时如何防止db初始化

使用RabbitMQ的Spring Boot无法初始化

Alpakka Akka流无法从kafka读取

使用Akka Stream从数据库流记录

使用Akka-IO TCP初始化Akka Actor

无法在Akka Stream中使用GraphStage类运行SourceShape

如何发布或订阅实例化的Akka Stream流程图?

Akka和Spring集成

如何使用Spring Boot应用程序初始化log4j?

如何使用 JDBC 为 Spring Boot 会话自动初始化 H2 db 架构

Spring Boot和Azure:自动配置之前初始化bean

如果AppContext初始化失败,则正确终止Spring Boot和Tomcat

初始化独立数据库、Spring Boot 和 MyBatis

如何限制Akka Stream每秒只能执行和发送一条消息一次?