Spring Integration中如何使用聚合器对事件进行分组/批处理

托马斯:

我有一个频道,其有效负载是不同的POJO,它们实现了称为的接口Event

public interface Event {
    String getEventType();
}

使用网关将多种事件类型的事件一一添加到通道。我想根据事件类型对事件进行分组,然后调用服务激活器。该服务具有以下签名。

void processEventsInBatch(String eventType, List<Event> events);

重要的是,要获取列表中属于同一事件类型的多个事件,以进行批量处理并减少对外部服务的多次调用。

如何通过Spring集成实现这一目标?

托马斯:

spring-integration中的聚合器使用correlation-id标头(默认情况下)来标识同一组中的不同消息。因此,第一步是获取eventType作为correlation-id标头。稍后,我们可以在服务激活器中将该标题作为eventType参数获取,因为将为聚合器创建的组提供correlation-id标题。这可以通过以下xml配置来完成

<int:header-enricher>
    <int:correlation-id expression="payload.getEventType()"/>
<int:header-enricher>

现在可以如下所示使用聚合器。

<int:aggregator release-strategy-expression="size() >= 25"
                group-timeout="5000"
                expire-group-upon-completion="true"
                send-partial-results-on-expiry="true" />

上述聚合器在一个组中至少有25个事件或等待5秒钟时将发送一个组。我们可以调整前两个参数,以控制列表的大小和要引入的延迟。expire-group-upon-completion需要属性以确保聚合器继续创建具有相同相关ID的新组。并且send-partial-results-on-expiry必须确保如果我们在5秒钟内收到少于25个事件,则聚合器将向其发送具有其所拥有信息的组。

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章

如何使用MyBatis / Spring进行批处理操作?

如何使用Spring Boot等待完整的Kafka消息批处理?

在Spring批处理异步处理器中重试

使用spring boot进行spring批处理:从config或命令行中读取参数并在job中使用它们

如何使用Spring Kafka批处理侦听器进行有状态重试

如何使用Spring批处理FlatFileItemReader仅读取CSV文件中的子集列?

Spring集成:如何在聚合器之后处理服务中的异常?

FlatFileParseException Spring批处理

如何使用Spring Integration DSL从队列Channel聚合消息?

如何使用Spring Batch进行聚合?

如何使用Spring在Axon中配置事件处理器?

Spring Integration Flow中如何处理FtpOutboundAdapter连接异常

如何使用spring-integration-smb进行事务处理

Spring批处理动态阅读器

Spring Integration中如何处理嵌套事务异常

使用MongoDB进行Spring批处理集成测试

如果使用Spring集成的远程分区中不存在资源路径,Spring批处理中的MessageChannelPartitionHandler会如何表现?

在Spring Integration中批处理出站适配器上的发送操作

在Spring批处理中处理多行

Spring Integration轮询聚合器以编程方式

Spring批处理:在使用异步项目处理器时在写入错误侦听器中获取实际项目

使用 SftpOutboundGateway 列出 spring 批处理中的远程文件

使用 ESP 调度器的 Spring 批处理调度

如何在 Spring Boot 中搜索乘法(批处理)参数?

如何在 spring 批处理中制作非阻塞项目处理器(不仅与任务执行器异步)?

Spring批处理

如何评估在应用程序中是使用 spring 批处理还是调度程序?

如何在 itermWriterListener Spring 批处理中访问 stepExecution 内容

在 Spring 批处理器中执行查询