我有以下代码:
var channel = Channel.CreateUnbounded<string>();
var consumers = Enumerable
.Range(1, 5)
.Select(consumerNumber =>
Task.Run(async () =>
{
var rnd = new Random();
while (await channel.Reader.WaitToReadAsync())
{
if (channel.Reader.TryRead(out var item))
{
Console.WriteLine($"Consuming {item} on consumer {consumerNumber}");
}
}
}));
var producers = Enumerable
.Range(1, 5)
.Select(producerNumber =>
Task.Run(async () =>
{
var rnd = new Random();
for (var i = 0; i < 10; i++)
{
var t = $"Message {i}";
Console.WriteLine($"Producing {t} on producer {producerNumber}");
await channel.Writer.WriteAsync(t);
await Task.Delay(TimeSpan.FromSeconds(rnd.Next(3)));
}
}));
await Task.WhenAll(producers)
.ContinueWith(_ => channel.Writer.Complete());
await Task.WhenAll(consumers);
哪个工作正常,但是我希望它在生产的同时消费。然而
await Task.WhenAll(producers)
.ContinueWith(_ => channel.Writer.Complete());
阻止消费者运行直到它完成,我想不出让它们都运行的方法?
在consumers
和producers
变量类型IEnumerable<Task>
。这是一个延迟可枚举,需要具体化才能创建任务。您可以通过ToArray
在 LINQ 查询上链接运算符来实现可枚举。通过这样做,两个变量的类型将变为Task[]
,这意味着您的任务已实例化并启动并运行。
附带说明一下,该ContinueWith
方法需要明确地将 theTaskScheduler.Default
作为参数传递,否则您将受到任何TaskScheduler.Current
可能的支配TaskScheduler
(例如,它可能是 UI )。这是正确的用法ContinueWith
:
await Task.WhenAll(producers)
.ContinueWith(_ => channel.Writer.Complete(), TaskScheduler.Default);
另一个问题是任何抛出的异常producers
都会被吞掉,因为任务没有被等待。只等待继续,这不太可能失败。为了解决这个问题,你可以放弃原语 ContinueWith
,而是使用 async-await 组合(一个等待然后完成通道的异步本地函数producers
)。在这种情况下,甚至不需要。你可以简单地这样做:
try { await Task.WhenAll(producers); }
finally { channel.Writer.Complete(); }
通道会Complete
在Task.WhenAll(producers)
任务的任何结果之后,因此consumers
不会卡住。
第三个问题是某些 的失败producers
将导致当前方法的立即终止,然后等待consumers
. 然后,这些任务将成为即发即弃的任务。我把它留给你去寻找如何确保在所有情况下,在成功或出现错误退出方法之前可以等待所有任务。
本文收集自互联网,转载请注明来源。
如有侵权,请联系 [email protected] 删除。
我来说两句