【问题标题】:Using IAsyncEnumerable with Dapper将 IAsyncEnumerable 与 Dapper 一起使用
【发布时间】:2020-05-14 07:59:45
【问题描述】:

我们最近将使用 Dapper 的 ASP.NET Core API 迁移到 .NET Core 3.1。迁移后,我们觉得有机会将 C# 8 的最新 IAsyncEnumerable 功能用于我们的一个端点。

这是修改前的伪代码:

public async Task<IEnumerable<Item>> GetItems(int id)
{
    var reader = await _connection.QueryMultipleAsync(getItemsSql,
       param: new
       {
           Id = id
       });

    var idFromDb = (await reader.ReadAsync<int?>().ConfigureAwait(false)).SingleOrDefault();
    if (idFromDb == null)
    {
       return null;
    }

    var items = await reader.ReadAsync<Item>(buffered: false).ConfigureAwait(false);

    return Stream(reader, items);
} 

private IEnumerable<Item> Stream(SqlMapper.GridReader reader, IEnumerable<Item> items)
{
    using (reader)
    {
        foreach (var item in items)
        {
            yield return item;
        }
    }     
}

IAsyncEnumerable 代码更改后:

// Import Nuget pacakage: System.Linq.Async

public async Task<IAsyncEnumerable<Item>> GetItems(int id)
{
    var reader = await _connection.QueryMultipleAsync(getItemsSql,
       param: new
       {
           Id = id
       });

    var idFromDb = (await reader.ReadAsync<int?>().ConfigureAwait(false)).SingleOrDefault();
    if (idFromDb == null)
    {
        return null;
    }

    var items = await reader.ReadAsync<Item>(buffered: false).ConfigureAwait(false);

    return Stream(reader, items);
} 

private IAsyncEnumerable<Item> Stream(SqlMapper.GridReader reader, IEnumerable<Item> items)
{
    using (reader)
    {
       await foreach (var item in items.ToAsyncEnumerable())
       {
           yield return item;
       }
    }
 }

上述方法是使用ToAsyncEnumerable 的灵感来自this post,但我不能100% 确定我是否在正确的地方/上下文中使用它。

问题:

  • dapper 库只返回IEnumerable,但我们可以使用ToAsyncEnumerable 将其转换为IAsyncEnumerable for async stream 像上面一样吗?

注意:这个问题与What happens with returning IEnumerable if used with async/await (streaming data from SQL Server with Dapper)? 类似,但我认为这不能回答我的问题。

【问题讨论】:

  • 如果 dapper 不暴露 IAsyncEnumerable&lt;T&gt; API,你希望通过包装 IEnumerable&lt;T&gt; API 获得什么?
  • 像这样包裹IAsyncEnumerable,你将获得nothingIAsyncEnumerable 允许您在值到达时返回它们。但是,您的代码所做的是检索所有内容,然后使用虚假的异步操作将其返回。客户认为他们很快就会得到结果,但实际上他们必须像以前一样等待
  • 嗨@abatishchev,谢谢你这么说。这种情况发生了很多次,以至于我不再问为什么我被否决了。我会花 15 分钟到半小时来提出问题,有时我会在几分钟内被否决。猜猜,否决票的定义不是很清楚。对我来说,如果一个问题格式正确且精确,如果不赞成,则可能不值得反对。
  • @AnkitVijay:干杯!由于某种原因,评论被删除(被版主?)。搞砸这个。
  • 嗨@svw,如果这个问题是你搜索结果的顶部,它只是表明网络上没有太多关于这个主题的信息。您如何期望有人在这种情况下进行更多研究?我认为假设在问题发布到 SO 之前没有进行任何研究是错误的。我相信您会同意,提出一个问题以获得社区的良好响应是一项相当大的努力。无论如何,downvotes 并没有真正困扰我了。 :)

标签: c# async-await dapper iasyncenumerable


【解决方案1】:

更新:当我第一次写这个答案时,我不知道异步迭代器。感谢 Theodor Zoulias 指出这一点。鉴于此,一种更简单的方法是可能的:

using var reader = await connection.ExecuteReaderAsync(query, parameters);
var rowParser = reader.GetRowParser<T>();

while (await reader.ReadAsync()) {
    yield return rowParser(reader);
}

原答案:

这是我编写的 IAsyncEnumerable 包装器,它可以帮助那些想要使用 async/await 流式传输无缓冲数据并且还想要 Dapper 类型映射的强大功能的人:

public class ReaderParser<T> : IAsyncEnumerable<T> {
    public ReaderParser(SqlDataReader reader) {
        Reader = reader;
    }
    private SqlDataReader Reader { get; }
    public IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken = default) {
        return new ReaderParserEnumerator<T>(Reader);
    }
}
public class ReaderParserEnumerator<T> : IAsyncEnumerator<T> {
    public ReaderParserEnumerator(SqlDataReader reader) {
        Reader = reader;
        RowParser = reader.GetRowParser<T>();
    }
    public T Current => Reader.FieldCount == 0 ? default(T) : RowParser(Reader);
    private SqlDataReader Reader { get; }
    private Func<IDataReader, T> RowParser { get; }
    public async ValueTask DisposeAsync() {
        await Reader.DisposeAsync();
    }
    public async ValueTask<bool> MoveNextAsync() {
        return await Reader.ReadAsync();
    }
}

用法:

var reader = await command.ExecuteReaderAsync();
return new ReaderParser<T>(reader);

然后,包System.Linq.Async 基本上添加了所有你知道和喜欢的漂亮的IEnumerable 扩展,例如在我的使用中:

var streamData = await repo.GetDataStream();
var buffer = await streamData.Take(BATCH_SIZE).ToListAsync();

【讨论】:

  • AFAICS cancellationToken 方法的 GetAsyncEnumerator 参数被忽略。另外,当您可以编写 async iterator 时,为什么要显式实现接口?
  • 这并不是一个强大的解决方案,真的。诚然,我不知道异步迭代器。我会尝试进行更改并适当地编辑我的答案。
  • 对于那些在未来找到这个答案的人 - 它有效,但请记住,即使枚举器被释放,读者也会从数据库中获取(并在客户端忽略)整个内容预先。不幸的是,我还没有找到任何使用 Dapper 的方法来规避这个问题
猜你喜欢
  • 1970-01-01
  • 2012-03-26
  • 1970-01-01
  • 2019-09-04
  • 2013-04-03
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多