Masstransit RPC(RabbitMq)吞吐量限制

尤里·乌里耶涅茨(Yuri Ulyanets)

我们将Masstransit与RabbitMq结合使用,以使RPC从系统的一个组件传递到其他组件。

最近,我们面临客户端吞吐量的限制,每秒大约完成80次响应。

尝试调查问题出在哪里时,我发现RPC服务器快速处理了请求,然后将响应放入了回调队列,然后,队列处理速度为80 M \ s

此限制仅在客户端。在同一台计算机上启动同一客户端应用程序的另一个进程会使服务器端的请求吞吐量增加一倍,但随后我看到两个填充有消息的回调队列正在以相同的80 M \ s消耗。

我们正在使用IBus的单个实例

builder.Register(c =>
{
    var busSettings = c.Resolve<RabbitSettings>();
    var busControl = MassTransitBus.Factory.CreateUsingRabbitMq(cfg =>
        {
            var host = cfg.Host(new Uri(busSettings.Host), h =>
            {
                h.Username(busSettings.Username);
                h.Password(busSettings.Password);
            });

            cfg.UseSerilog();

            cfg.Send<IProcessorContext>(x =>
            {
               x.UseCorrelationId(context => context.Scope.CommandContext.CommandId);
            });

    }
);

return busControl;
})
.As<IBusControl>()
.As<IBus>()
.SingleInstance();

发送逻辑如下所示:

var busResponse = await _bus.Request<TRequest, TResult>(
                destinationAddress: _settings.Host.GetServiceUrl<TCommand>(queueType),
                message: commandContext,
                cancellationToken: default(CancellationToken),
                timeout: TimeSpan.FromSeconds(_settings.Timeout),
                callback: p => { p.WithPriority(priority); });

有没有人遇到过这样的问题?我猜想响应分配逻辑中有一些程序限制。它可能是最大线程池大小,或者是缓冲区的大小,也是响应队列的预取计数。我尝试使用.Net线程池大小,但没有任何帮助。

我是Masstransit的新手,将非常感谢您解决我的问题。希望可以通过配置方式解决

克里斯·帕特森

您可以尝试一些方法来优化性能。我还建议您检查一下MassTransit-Benchmark并在您的环境中运行它-这将使您对代理的可能吞吐量有所了解。它允许您调整诸如预取计数,并发等设置,以了解它们如何影响您的结果。

另外,我建议使用其中一个请求客户端来减少每个请求/响应的设置。例如,创建一次请求客户端,然后为每个请求使用相同的客户端。

var serviceUrl = yourMethodToGetIt<TRequest>(...); var client = Bus.CreateRequestClient<TRequest>(serviceUrl);

然后,IRequestClient<TRequest>在需要执行请求时使用该实例。

Response<Value> response = await client.GetResponse<TResponse>(new Request());

由于您仅使用RPC,因此强烈建议将接收端点队列设置为非持久,以避免将RPC请求写入磁盘。并将总线预取计数调整为更高的值(比您可能拥有的并发请求的最大数量高2倍),以确保始终将响应直接传递给等待的响应使用者(这是RabbitMQ传递消息的内部原因)。

var busControl = Bus.Factory.CreateUsingRabbitMq(cfg => { cfg.PrefetchCount = 1000; }

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章

事件网格吞吐量限制

Masstransit和RabbitMQ权限

MassTransit创建的RabbitMQ拓扑

MassTransit RabbitMq发送消息

如何为 isilon 存储设置用户吞吐量限制

WorkerService 使用 MassTransit 配置 RabbitMq

如何在Masstransit / RabbitMQ中组织队列?

MassTransit / RabbitMq错误队列-如何删除消息?

Masstransit RabbitMq PublisherConfirmation默认值

MassTransit,RabbitMQ-FIFO出队支持

防止消息进入 _error 队列(masstransit、rabbitmq)

未处理RabbitMQ类型异常的masstransit 3

设置消息优先级 RabbitMQ/Masstransit?

Google云存储上是否存在隐藏的吞吐量限制(可能是每个存储桶?)?

AWS Parameter是否为每个键或整个服务存储吞吐量限制?

Akka流通过流量限制并行性/处理流的吞吐量

适用于 MongoDB 的 Azure Cosmos DB API API:共享吞吐量限制

MassTransit尝试在其关闭时无限连接到RabbitMQ

在內存中運行 MassTransit/RabbitMQ 實例

最大的系统间兼容性 - 纯 RabbitMQ 或 NServiceBus,MassTransit?

未知交换类型 'x-delay-message' RabbitMq 与 MassTransit

使用MassTransit重新传递RabbitMq消息时,保留标题

MassTransit.RabbitMQ-连接失败:代理无法访问

MassTransit 和 RabbitMQ,使用 DI 时使队列在重启时存活

如何通过MassTransit和RabbitMQ发送各种命令类型?

使用RabbitMQ进行MassTransit:重复数据删除

在MassTransit.RabbitMq中向标头添加值

MassTransit RabbitMQ AspNetCore无法启动总线并注册接收端点

如何使用MassTransit从RabbitMQ DeadLetter队列中检索消息?