在RabbitMQ中对接收到的消息进行分组,最好使用Spring AMQP?

约翰

我正在从服务(S)接收消息,该服务将每个单独的属性更改作为单独的消息发布到实体。一个人为的例子是这样的实体:

Person {
    id: 123
    name: "Something",
    address: {...}
}

如果名称和地址在同一笔交易中更新,则(S)将发布两条消息,PersonNameCorrectedPersonMoved问题出在接收方,我正在存储此Person实体的投影,并且每个属性更改都会导致对数据库的写入。因此,在此示例中,将有两次写入数据库的操作,但是如果我可以在短时间内批处理消息并将它们按id分组,则只需对数据库进行一次写入操作。

通常在RabbitMQ中如何处理此问题?Spring AMQP是否提供更简单的抽象?

请注意,我已经简要介绍了预取功能,但是不确定是否要这样做。如果我正确理解,则预取也是基于每个连接的。我试图在每个队列实现此目标,因为如果要采用批处理(并因此增加延迟),则我不想将此延迟添加到我的服务消耗的所有队列中(但仅添加到那些需要“按ID分组”功能)。

加里·罗素

预取对于这种情况没有帮助。

考虑使用Spring Integration,它具有位于Spring AMQP之上的适配器。它还提供了一个聚集器,可用于将消息分组在一起,然后再将其发送到管道的下一个阶段。

编辑

这是一个演示的快速启动应用程序...

@SpringBootApplication
public class So42969130Application implements CommandLineRunner {

    public static void main(String[] args) {
        SpringApplication.run(So42969130Application.class, args)
            .close();
    }

    @Autowired
    private RabbitTemplate template;

    @Autowired
    private Handler handler;

    @Override
    public void run(String... args) throws Exception {
        this.template.convertAndSend("so9130", new PersonNameChanged(123));
        this.template.convertAndSend("so9130", new PersonMoved(123));
        this.handler.latch.await(10, TimeUnit.SECONDS);
    }

    @Bean
    public IntegrationFlow flow(ConnectionFactory connectionFactory) {
        return IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, "so9130")
                        .messageConverter(converter()))
                .aggregate(a -> a
                        .correlationExpression("payload.id")
                        .releaseExpression("false") // open-ended release, timeout only
                        .sendPartialResultOnExpiry(true)
                        .groupTimeout(2000))
                .handle(handler())
                .get();
    }

    @Bean
    public Jackson2JsonMessageConverter converter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public Handler handler() {
        return new Handler();
    }

    @Bean
    public Queue queue() {
        return new Queue("so9130", false, false, true);
    }

    public static class Handler {

        private final CountDownLatch latch = new CountDownLatch(1);

        @ServiceActivator
        public void handle(Collection<?> aggregatedData) {
            System.out.println(aggregatedData);
            this.latch.countDown();
        }

    }

    public static class PersonNameChanged {

        private int id;

        PersonNameChanged() {
        }

        PersonNameChanged(int id) {
            this.id = id;
        }

        public int getId() {
            return this.id;
        }

        public void setId(int id) {
            this.id = id;
        }

        @Override
        public String toString() {
            return "PersonNameChanged [id=" + this.id + "]";
        }

    }

    public static class PersonMoved {

        private int id;

        PersonMoved() {
        }

        PersonMoved(int id) {
            this.id = id;
        }

        public int getId() {
            return this.id;
        }

        public void setId(int id) {
            this.id = id;
        }

        @Override
        public String toString() {
            return "PersonMoved [id=" + this.id + "]";
        }

    }

}

Pom:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.example</groupId>
    <artifactId>so42969130</artifactId>
    <version>2.0.0-BUILD-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>so42969130</name>
    <description>Demo project for Spring Boot</description>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.5.2.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-integration</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-amqp</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-java-dsl</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>


</project>

结果:

2017-03-23 09:56:57.501  INFO 75217 --- [ask-scheduler-2] .s.i.a.AbstractCorrelatingMessageHandler : 
    Expiring MessageGroup with correlationKey[123]
[PersonNameChanged [id=123], PersonMoved [id=123]]

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章