如何在.NET Core中结合使用RabbitMQ和MassTransit来正确使用消息?

Bulchsu

我正在微服务架构中实现应用程序。我想将某种事件存储附加到我的解决方案中。我是使用RabbitMQ进行消息传递的新手,可能我配置不正确。让我们简化我的解决方案,假设我有一个简单的通用服务,该服务必须向总线广播一些消息。要注册代理依赖项,我使用以下代码:

internal static IServiceCollection RegisterRabbitMQDependencies(this IServiceCollection services,   
    IConfiguration configuration)                                                                   
{                                                                                                   
    var rabbitMQSettings = configuration                                                             
        .GetSection(RabbitMQSettingsSectionKey)                                                      
        .Get<RabbitMQSettings>();                                                                   
                                                                                                    
    services                                                                                        
        .AddSingleton(serviceProvider => MassTransit.Bus.Factory.CreateUsingRabbitMq(configurator =>
        {                                                                                           
            configurator                                                                            
                .Host(rabbitMQSettings.HostName,                                                    
                    rabbitMQSettings.VirtualHostName,                                               
                    hostConfigurator =>                                                             
                    {                                                                               
                        hostConfigurator.Username(rabbitMQSettings.UserName);                       
                        hostConfigurator.Password(rabbitMQSettings.Password);                       
                    });                                                                             
                                                                                                    
            configurator.ExchangeType = ExchangeType.Fanout;                                        
        }))                                                                                         
        .AddSingleton<IPublishEndpoint>(provider => provider.GetRequiredService<IBusControl>())     
        .AddSingleton<ISendEndpointProvider>(provider => provider.GetRequiredService<IBusControl>())
        .AddSingleton<IBus>(provider => provider.GetRequiredService<IBusControl>())                 
        .Configure<RabbitMQSettings>(configuration.GetSection(RabbitMQSettingsSectionKey));         
                                                                                                    
    return services;                                                                                
}                                                                                                   

由于我们的简化,它没有用户-它只需要向总线发布一些消息即可。来自服务的消息已正确发布,因为我可以在RabbitMq管理系统中注意到它们。问题是我的任何使用者都不响应相应的消息(可能是由于某些配置错误)。回到实现-我有一个事件存储(不同的容器)。我的事件存储区必须处理通过总线传播的所有集成事件。我已经在事件存储中配置了通信,如下所示:

public static IServiceCollection RegisterMessagingDependencies(this IServiceCollection services,               
    IConfiguration configuration) =>                                                                           
        services                                                                                               
            .AddMassTransit(configurator =>                                                                    
                {                                                                                              
                    configurator.AddConsumer<EventHandler>();                                                  
                })                                                                                             
            .AddSingleton(provider => Bus.Factory.CreateUsingRabbitMq(cfg =>                                   
            {                                                                                                  
                cfg.Host("localhost", "/", h => {                                                              
                    h.Username("guest");                                                                       
                    h.Password("guest");                                                                       
                });                                                                                            
            }))                                                                                                
            .RegisterEndpointsDependencies();                                                                   

连接参数正确,我已经检查了几次。正如您所注意到的-我只有一个名为“ EventHandler”的使用者,该使用者必须处理所有IntegrationEvents。在这里,我附上EventHandler类的定义,它是父类:

public abstract class IntegrationEventHandler<TIntegrationEvent> : IIntegrationEventHandler<TIntegrationEvent>,
    IConsumer<TIntegrationEvent> where TIntegrationEvent : class, IIntegrationEvent
{
    public async Task Consume(ConsumeContext<TIntegrationEvent> context) =>
        await HandleAsync(context.Message);
    
    public abstract Task HandleAsync(TIntegrationEvent @event);
}

public sealed class EventHandler : IntegrationEventHandler<IIntegrationEvent>
{
    private readonly IEventRepository _eventRepository;

    public EventHandler(IEventRepository eventRepository)
    {
        _eventRepository = eventRepository;
    }
    
    public override async Task HandleAsync(IIntegrationEvent @event)
    {
        var readyToBeSavedEvent = new Event(@event);
        await _eventRepository.CreateAsync(readyToBeSavedEvent);
        await _eventRepository.SaveChangesAsync();
    }
}

EventHandler基类实现了IConsumer接口,因此我认为应该以某种方式将发生在总线上的所有集成事件连接起来。但是问题是尽管发布了一些事件,但从未调用过HandleAsync方法。另外,我附加了负责发布事件的代码:

public async Task PublishAsync(params IIntegrationEvent[] events)
{
    var globalPublicationTasks = events
        .Select(@event =>
        {
            @event.PublishedAtUtc = DateTime.UtcNow;
            @event.JsonContent = JsonConvert.SerializeObject(@event);

            return _publishEndpoint.Publish(@event);
        });

    await Task.WhenAll(globalPublicationTasks);
}

我想念什么吗?我究竟做错了什么?我希望在总线上发布某些集成事件时会调用HandleAsync方法。谢谢你的帮助 :)

阿列克谢·齐马列夫(Alexey Zimarev)

您使用不AddMassTransit正确。您也不要在用户端启动总线。

所有文档都已正确记录,您只需要复制粘贴示例即可。

   public void ConfigureServices(IServiceCollection services)
    {
        services.AddMassTransit(x =>
        {
            x.AddConsumer<EventConsumer>();

            x.UsingRabbitMq((context, cfg) =>
            {
                cfg.ReceiveEndpoint("event-listener", e =>
                {
                    e.ConfigureConsumer<EventConsumer>(context);
                });
            });
        });

        services.AddMassTransitHostedService();
    }

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章

如何在.Net中与不同类型的使用者一起使用RabbitMq消息?

如何在.NET Core中正确使用代码协定

如何在.NET CORE中使用SqlException?

如何在.Net Core中使用ServiceExtensions?

如何在.NET Core中使用IUserIdProvider?

如何在.Net Core 2.2中正确使用ConfigurationManager.GetSection()

如何在.net Framework 4.7.1中使用ECDSA和ECDiffieHellman实例签署消息?

如何使用MassTransit从RabbitMQ DeadLetter队列中检索消息?

如何在.NET Core 2.0中使用PrincipalContext

如何在.net Core中使用.net 4.7.2

MassTransit和.NET Core DI-如何使用无参数构造函数解决依赖关系?

如何在VB.NET中使用RabbitMQ?

如何在.net core中显示图像

如何在.NET Core中写入文件?

如何在 Net Core 中覆盖 SaveChangesAsync?

如何在gRPC和ASP Net Core 3.0中使用SSL证书?

如何在.NET Core,C#6和Visual Studio 2015中使用本地IIS?

如何在 ASP.NET Core 6 中配置和使用 Serilog?

如何在net.core 3.1上使用DI正确配置Identity Core?

我如何在尝试使用Razor页面删除ASP.NET Core中的记录时显示确认消息

.NET Core 2.2-如何在用户在Webchat中输入类型之前使用反向通道显示bot欢迎消息

如何在ASP.NET 5中将Entity Framework 6与MySQL结合使用?

如何在Mac的.Net中使用appSettings

如何在 Coro 中使用“Net::http”?

如何在asp.net中使用<%%>?

如何在 .net 中使用 PDFCreator COM?

如何在Accord.net中正确使用SVD

如何在ASP.Net Core中使用文件嵌套来嵌套接口

如何在.Net Core 2.0中使用ZXing.Net和ImageSharp创建条形码图像