.NET Core 1.1 接收 RabbitMQ 消息

卡尔达里戈

这是一个简单的 .net core 1.1 控制台应用程序。使用 -r 参数调用它,它会读取 rabbitmq 队列中的所有消息,使用任何其他参数调用它,并且每个参数都作为消息入队。

这就是问题所在,我可以很好地将消息排入队列,但是所有读取消息的尝试都会导致没有消息被读取。显然我没有正确使用队列,希望得到一些指导。

谢谢!

    using System;
using System.Collections.Generic;
using System.Text;
using Newtonsoft.Json;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace RabbitMqDemo
{
    class Program
    {
        static void Main(string[] args)
        {
            var client = new MessagingClient();
            if (args.Length == 1 && args[0].ToLower() == "-r")
            {
                Console.WriteLine("Reading Messages from Queue.");
                var messages = client.ReceiveMessages();
                Console.WriteLine($"Read {messages.Length} message(s) from queue.");
                foreach(var msg in messages)
                    Console.WriteLine(msg);
            }
            else
            {
                foreach (var msg in args)
                {
                    client.SendMessage(msg);
                }
                Console.WriteLine($"Enqueued {args.Length} Message.");
            }
        }
    }

    internal class MessagingClient
    {
        private readonly ConnectionFactory connectionFactory;
        private string ExchangeName => "defaultExchange";
        private string RoutingKey => "";
        private string QueueName => "Demo";

        private string HostName => "localhost";


        public MessagingClient()
        {
            this.connectionFactory = new ConnectionFactory {HostName = this.HostName};
        }

        public void SendMessage(string message)
        {
            using (var connection = this.connectionFactory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    this.QueueDeclare(channel, this.QueueName);
                    var properties = this.SetMessageProperties(channel, message);

                    string messageJson = JsonConvert.SerializeObject(message);
                    var body = Encoding.UTF8.GetBytes(messageJson);

                    channel.BasicPublish(exchange: this.ExchangeName, routingKey: this.RoutingKey, basicProperties: properties, body: body);
                }
            }
        }

        public string[] ReceiveMessages()
        {
            var messages = new List<string>();
            using (var connection = this.connectionFactory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    this.QueueDeclare(channel, this.QueueName);
                    channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);


                    var consumer = new EventingBasicConsumer(channel);
                    consumer.Received += (model, ea) =>
                    {                        
                        string bodystring = Encoding.UTF8.GetString(ea.Body);
                        messages.Add(bodystring);

                        // ReSharper disable once AccessToDisposedClosure
                        channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                    };
                    channel.BasicConsume(queue: this.QueueName, autoAck: false, consumer: consumer);
                }
            }
            return messages.ToArray();
        }

        private void QueueDeclare(IModel channel, string queueName)
        {
            channel.ExchangeDeclare(ExchangeName, type: ExchangeType.Direct,
                durable: true,
                autoDelete: false,
                arguments: null);

            var queueDeclared = channel.QueueDeclare(queue: queueName,
                durable: true,
                exclusive: false,
                autoDelete: false,
                arguments: null);

            channel.QueueBind(queueName, ExchangeName, RoutingKey);            
        }

        private IBasicProperties SetMessageProperties(IModel channel, object message)
        {
            var properties = channel.CreateBasicProperties();
            properties.ContentType = "application/json";
            properties.Persistent = true;
            return properties;
        }

    }
}
卢克·巴肯
  • 首先,使用管理 UI 确保您的交换和队列设置正确,并且消息已发布到其中。
  • 其次,ReceiveMessages()因此您的读者可能会在事件有机会触发之前立即返回一个空数组。当消费者从 RabbitMQ 接收消息时,您无需等待代码。教程中注意如何Console.ReadLine()使用。在您的示例中,您可以使用同步对象 ( ManualResetEvent) 来防止ReceiveMessages()在读取特定消息计数之前返回。

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章

在PHPRatchet中接收RabbitMQ消息

如何从 AWS 中的 SQS 队列接收超过 1 条消息?

从JavaScript订阅RabbitMQ以接收推送消息

强制RabbitMQ停止接收任何消息

SignalR Core发送消息但不接收消息

RabbitMQ-如何以编程方式从消息队列接收消息?

服務器在收到 1 條消息後停止接收消息

如何在.NET Core中结合使用RabbitMQ和MassTransit来正确使用消息?

将 RabbitMQ 与 .NET Core WebAPI 一起使用时丢失消息

运动鞋未在heroku上接收消息-RabbitMQ Bigwig

TCP> COM1,用于接收消息并在POS显示杆上显示

AWS SQS 仅从两个不同的分派接收 1 条消息

RabbitMQ 竞争消费者一次按顺序处理 1 条消息

在.NET Core中使用SHA-1

在 .net core 1 中使用会话

在rabbitmq中实现消息处理程序来处理接收到的消息

1个队列中的同步和异步消息接收器(带有ActiveMQ的Spring)JMS API

RabbitMQ DeliveryTag始终为1

使用 RabbitMQ 在 Net Core 中的 WebApi

在RabbitMQ中对接收到的消息进行分组,最好使用Spring AMQP?

只有一个消费者从 RabbitMQ 上的队列接收消息

RabbitMQ。Java客户端。是否可以在不同的接收线程上确认消息?

RabbitMQ:客户端启动之前,客户端不接收发送的消息

无法使用 RabbitMQ 源 + 日志接收器流转换消息异常

与TopShelf一起用作Windows服务时,RabbitMQ不接收消息

Spring Integration和RabbitMQ:接收消息时没有可用的output-channel或ReplyChannel标头

与 RabbitMQ 一起使用时,Flask-SocketIO 不接收来自前端的消息

如何使 .Net Core Console 应用程序连接到 SignalR 服务(在无服务器模式下)然后从服务器接收消息

Dot Net Core 1 Tooling Preview离线安装程序