通道多个生产者和消费者

霍拉汉

我有以下代码:

var channel = Channel.CreateUnbounded<string>();

var consumers = Enumerable
    .Range(1, 5)   
    .Select(consumerNumber =>
        Task.Run(async () =>
        {
            var rnd = new Random();
            while (await channel.Reader.WaitToReadAsync())
            {
                if (channel.Reader.TryRead(out var item))
                {
                    Console.WriteLine($"Consuming {item} on consumer {consumerNumber}");
                }
            }
        }));

var producers = Enumerable
    .Range(1, 5)    
    .Select(producerNumber =>
        Task.Run(async () =>
        {
            var rnd = new Random();
            for (var i = 0; i < 10; i++)
            {
                var t = $"Message {i}";
                Console.WriteLine($"Producing {t} on producer {producerNumber}");

                await channel.Writer.WriteAsync(t);
                await Task.Delay(TimeSpan.FromSeconds(rnd.Next(3)));
            }
        }));

await Task.WhenAll(producers)
    .ContinueWith(_ => channel.Writer.Complete());

await Task.WhenAll(consumers);

哪个工作正常,但是我希望它在生产的同时消费。然而

await Task.WhenAll(producers)
    .ContinueWith(_ => channel.Writer.Complete());

阻止消费者运行直到它完成,我想不出让它们都运行的方法?

西奥多·祖利亚斯

consumersproducers变量类型IEnumerable<Task>这是一个延迟可枚举,需要具体化才能创建任务。您可以通过ToArray在 LINQ 查询上链接运算符来实现可枚举通过这样做,两个变量的类型将变为Task[],这意味着您的任务已实例化并启动并运行。

附带说明一下,该ContinueWith方法需要明确地将 theTaskScheduler.Default作为参数传递,否则您将受到任何TaskScheduler.Current可能的支配TaskScheduler(例如,它可能是 UI )。这是正确的用法ContinueWith

await Task.WhenAll(producers)
    .ContinueWith(_ => channel.Writer.Complete(), TaskScheduler.Default);

另一个问题是任何抛出的异常producers都会被吞掉,因为任务没有被等待。只等待继续,这不太可能失败。为了解决这个问题,你可以放弃原语 ContinueWith,而是使用 async-await 组合(一个等待然后完成通道的异步本地函数producers)。在这种情况下,甚至不需要。你可以简单地这样做:

try { await Task.WhenAll(producers); }
finally { channel.Writer.Complete(); }

通道会CompleteTask.WhenAll(producers)任务的任何结果之后,因此consumers不会卡住。

第三个问题是某些 的失败producers将导致当前方法的立即终止,然后等待consumers. 然后,这些任务将成为即发即弃的任务。我把它留给你去寻找如何确保在所有情况下,在成功或出现错误退出方法之前可以等待所有任务。

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章

具有一个生产者和多个消费者的生产者-消费者

与多个生产者/多个消费者并发

Java中的多个消费者生产者

匹配的Kafka消费者和生产者分区

通用生产者和消费者

共享物品生产者和消费者问题

Spring JMS生产者和消费者交互

Go中的同时生产者和消费者

Kafka 2.3.0 生产者和消费者

生产者,消费者POSIX

C:生产者/消费者

“读者-作家”只是具有多个消费者的“生产者-消费者”吗?

单个生产者和多个单线程消费者

监视具有多个生产者和消费者的锻炼(Java)

如何在activeMQ中使用多个生产者和一个消费者?

是std :: queue线程对生产者和多个消费者而言是安全的

使用通道作为生产者和消费者之间通信的媒介时出现死锁

多个生产者-消费者问题坚持最后一次消费

与一个生产者和多个消费者一起使用Queue是否安全?

生产者/消费者:一个生产者,多个消费者,每个都处理相同的数据

消费者在运行的生产者和消费者python脚本上未显示任何消息

在单个消费者和任意数量的生产者的情况下,仅使用notify()的生产者-消费者

在生产者速度缓慢,消费者快速消费的情况下,如何处理通道关闭同步?

发布/订阅vs生产者/消费者?

C中的生产者-消费者错误

生产者消费者问题中的竞争状况

消费者/生产者锁定 GUI 线程

Apache Kafka 消费者-生产者混淆

测量消费者/生产者工作的时间