异步流如何与反应式扩展进行比较?

ca9163d9

如何比较以下两个?Rx更强大吗?

反应性额外资讯:

var observable = Observable.Create<char>(async (observer, cancel) =>
{
    while (true)
    {
        string line = await sr.ReadLineAsync();
        if (line == null)
            break;
        observer.OnNext(line);
    }
});

observable.Subscribe(
    c => Console.WriteLine(c.ToString()),
    () => end.Dispose());

异步流:

public async void Run(string path)
{
    await foreach (var line in TestAsync())
    {
        Console.WriteLine(line);
    }
}

private async IAsyncEnumerable<string> TestAsync()
{
    while (true)
    {
        string line = await sr.ReadLineAsync();
        if (line == null)
            break;
        yield return line;
    }
}
帕纳吉奥提斯·卡纳沃斯(Panagiotis Kanavos)

这两个功能一起工作。PS:算了async streams算了await foreach

异步流

异步流是(相对)低级的功能,它允许异步迭代就其本身而言,它不提供任何其他功能,例如过滤,聚合等。它是基于拉的,而Rx是基于推的。

您可以通过..... ReacticeX.NET Github存储库中System.Linq.Async库在异步流上使用LINQ运算符它速度很快,但不提供Rx的事件处理功能。

例如,没有时间概念,更不用说使用自定义调度程序的方式了。没有订阅,没有错误事件。GroupBy将消耗整个源并作为单独的IAsyncEnumerable实例发出组项目,而Rx的GroupBy将为每个组发出单独的Observable。

在问题的示例中,IAsyncEnumerable很自然,因为其中不涉及事件逻辑,而只是在异步迭代器上进行迭代。

如果该示例尝试轮询(例如)远程服务并检测到故障峰值(即,每个间隔的故障多于阈值),则IAsyncEnumerable将是不合适的,因为它将阻止等待所有响应。实际上,我们根本无法一次汇总事件。

穿线

真的没有一个-IAsyncEnumerable或await foreach调用未指定事件的产生或使用方式。如果要使用单独的任务来处理项目,则必须自己创建它,例如:

public async Task Run(string path)
{
    await foreach (var line in LoadStockTrades())
    {
        var result = await Task.Run(()=>AnalyzeTrade(line));
        Console.WriteLine($"{result} : {line});
    }
}

反应性扩展

Reactive Extensions是处理事件流的高级库。它基于推送,了解时间,但比异步流或通道等低级构造慢。

在问题的示例中,Rx可能会过大。尽管轮询和检测峰值很容易,但具有多个窗口选项。

System.Linq.Async可以使用ToObservable从IAsyncEnumerable创建一个Observable ,这意味着IAsyncEnumerable可以用作Rx的源。

穿线

默认情况下,Rx是单线程的,这对于它的主要场景-事件流处理非常有意义。

另一方面,Rx允许发布者,订阅者和操作员在相同或单独的线程上运行。在这种语言具备async/await或数据流(例如,Java,JavaScript的),Rx的使用运行在不同的线程的发布者和订阅模拟并行处理管线。

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章