How can I make only one instance of an application receive each AMQP topic message (consumer group behaviour)

codependent

I am using Apache Camel's AMQP component to listen to messages from an ActiveMQ Artemis topic.

This application is run on Kubernetes with two replicas.

I've configured a durable subscription with a unique clientId per pod and a common subscription name:

<route autoStartup="true" id="myRoute">
    <from id="_amqp_topic" uri="amqp:topic:xxx?connectionFactory=#amqpCF&amp;disableReplyTo=true&amp;transacted=false&amp;subscriptionDurable=true&amp;clientId={{container-id}}&amp;durableSubscriptionName=eventSubscription"/>
    <log loggingLevel="INFO" message="Received event: ${body}"/>
    ...
</route>

The problem is that both pods receive every message, when only one of them should. I'm trying to achieve something similar to Kafka's consumer groups, where only one member of the group receives each message.

Justin Bertram

If you want only one subscriber to receive the message then the subscribers must share the same subscription. Therefore, you need to:

  • set subscriptionShared=true in your _amqp_topic uri
  • use the same client ID (i.e. don't use {{container-id}})

For example:

<route autoStartup="true" id="myRoute">
    <from id="_amqp_topic" uri="amqp:topic:xxx?connectionFactory=#amqpCF&amp;disableReplyTo=true&amp;transacted=false&amp;subscriptionDurable=true&amp;clientId=myClientID&amp;durableSubscriptionName=eventSubscription&amp;subscriptionShared=true"/>
    <log loggingLevel="INFO" message="Received event: ${body}"/>
    ...
</route>
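
If the client ID needs to stay configurable rather than hard-coded, one possible variation is to keep a property placeholder but make sure it resolves to the same value in every replica. The sketch below assumes a hypothetical property named `shared-client-id` (not part of the original question) that is defined identically for all pods, for example through shared configuration:

```xml
<!-- Sketch only: "shared-client-id" is a hypothetical property that must resolve
     to the same value in every pod, unlike the per-pod {{container-id}}. -->
<route autoStartup="true" id="myRoute">
    <from id="_amqp_topic" uri="amqp:topic:xxx?connectionFactory=#amqpCF&amp;disableReplyTo=true&amp;transacted=false&amp;subscriptionDurable=true&amp;clientId={{shared-client-id}}&amp;durableSubscriptionName=eventSubscription&amp;subscriptionShared=true"/>
    <log loggingLevel="INFO" message="Received event: ${body}"/>
</route>
```

The important part is that the client ID and the subscription name match across replicas, so that all pods attach to the same subscription instead of each creating its own.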

Another option would simply be to use a queue instead of a topic, e.g.:

<route autoStartup="true" id="myRoute">
    <from id="_amqp_queue" uri="amqp:queue:xxx?connectionFactory=#amqpCF&amp;disableReplyTo=true&amp;transacted=false"/>
    <log loggingLevel="INFO" message="Received event: ${body}"/>
    ...
</route>
