使用IAsyncEnumerable读取文本文件

音位

在测试C#8.0功能时遇到了IAsyncEnumerable。我从Anthony Chu(https://anthonychu.ca/post/async-streams-dotnet-core-3-iasyncenumerable/)中找到了出色的例子它是异步流,并替代Task<IEnumerable<T>>

// Data Access Layer.
public async IAsyncEnumerable<Product> GetAllProducts()
{
    Container container = cosmosClient.GetContainer(DatabaseId, ContainerId);
    var iterator = container.GetItemQueryIterator<Product>("SELECT * FROM c");
    while (iterator.HasMoreResults)
    {
        foreach (var product in await iterator.ReadNextAsync())
        {
            yield return product;
        }
    }
}

// Usage
await foreach (var product in productsRepository.GetAllProducts())
{
    Console.WriteLine(product);
}

我想知道这是否可以应用于读取文本文件,如下面的用法那样逐行读取文件。

foreach (var line in File.ReadLines("Filename"))
{
    // ...process line.
}

我真的很想知道如何将异步应用于IAsyncEnumerable<string>()上述foreach循环,以便在阅读时流式传输。

如何实现迭代器,以便可以使用yield return逐行读取?

一般

完全一样,但是没有异步工作负载,所以我们假装

public async IAsyncEnumerable<string> SomeSortOfAwesomeness()
{
   foreach (var line in File.ReadLines("Filename.txt"))
   {
       // simulates an async workload, 
       // otherwise why would be using IAsyncEnumerable?
       // -- added due to popular demand 
       await Task.Delay(100);
       yield return line;
   }
}

要么

这只是包装的APM工作负载,请参阅Stephen Clearys的注释以进行澄清

public static async IAsyncEnumerable<string> SomeSortOfAwesomeness()
{
   using StreamReader reader = File.OpenText("Filename.txt");
   while(!reader.EndOfStream)
      yield return await reader.ReadLineAsync();
}

用法

await foreach(var line in SomeSortOfAwesomeness())
{
   Console.WriteLine(line);
}

更新斯蒂芬·克利

File.OpenText可悲的是只允许同步I / O ; 在这种情况下异步API的实现效果很差。要打开一个真正的异步文件,您需要使用一个FileStream传递构造函数isAsync:true或FileOptions.Asynchronous

ReadLineAsync如您所见,基本上是这段代码的结果,它只是Stream APMBeginEnd包装的方法

private Task<Int32> BeginEndReadAsync(Byte[] buffer, Int32 offset, Int32 count)
{            
     return TaskFactory<Int32>.FromAsyncTrim(
                    this, new ReadWriteParameters { Buffer = buffer, Offset = offset, Count = count },
                    (stream, args, callback, state) => stream.BeginRead(args.Buffer, args.Offset, args.Count, callback, state), // cached by compiler
                    (stream, asyncResult) => stream.EndRead(asyncResult)); // cached by compiler
}

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章