我正在编写一些响应Azure Service Bus队列的功能。当前,这会在指定的队列上进行轮询,并且OnMessage会触发对调用它的原始类中的方法的回调:
partial class Class1
{
private void BeginProcessing()
{
serviceBusHelper.Listen(QueueType.Inbound, HandleTransaction);
}
private bool HandleTransaction(BrokeredMessage message)
{
...
}
}
然后是服务总线帮助程序类:
public class ServiceBusHelper : IServiceBusHelper
{
ManualResetEvent CompletedResetEvent = new ManualResetEvent(false);
public void Listen(QueueType queue, Action<BrokeredMessage> callback)
{
switch (queue)
{
case QueueType.Inbound:
inboundClient.OnMessage(message =>
{
try
{
callback(message);
}
catch (Exception ex)
{
...
}
CompletedResetEvent.WaitOne();
});
break;
...
}
}
它正在正确连接到azure服务总线队列并检索消息,但是回调似乎从未真正触发过。我试图实现的服务将持续响应OnMessage事件,尽管实际上是从ServiceBusHelper类触发了OnMessage,但仍会触发新的worker(从class1内部)。
因此,我设法通过修改异步OnMessage对应项来使其正常工作。我相信根本原因与未将OnMessageOptions指定给OnMessage有关,或者与CompletedResetEvent没有按我的预期进行交互有关。
public void Listen(QueueType queue, Action<BrokeredMessage> callback)
{
OnMessageOptions options = new OnMessageOptions
{
MaxConcurrentCalls = maxConcurrent,
AutoComplete = false
};
switch (queue)
{
case QueueType.Inbound:
inboundClient.OnMessageAsync(async message =>
{
bool shouldAbandon = false;
try
{
callback(message);
// complete if successful processing
await message.CompleteAsync();
}
catch (Exception ex)
{
shouldAbandon = true;
Debug.WriteLine(ex);
}
if (shouldAbandon)
{
await m.AbandonAsync();
}
}, options);
break;
...
}
}
本文收集自互联网,转载请注明来源。
如有侵权,请联系 [email protected] 删除。
我来说两句