【发布时间】:2015-11-06 20:17:43
【问题描述】:
我有以下 API:
IObservable<IList<SqlDataRecord>> WriteToDBAndGetFailedSource(SqlConnection conn, IList<SqlDataRecord> batch)
它尝试将批处理写入数据库。如果失败,则返回整个批次,否则返回的 observable 为空。
我还有一个生产批次的来源:
IObservable<IList<SqlDataRecord>> GetDataSource(string filePath, int bufferThreshold)
现在,我可以这样组合它们:
var failedBatchesSource = GetDataSource(filePath, 1048576)
.Select(batch => WriteToDBAndGetFailedSource(conn, batch))
.Merge(100);
这将写入所有批次(最多同时写入 100 个)并返回一个可观察到的失败批次。
我真正想要的是在某个暂停后将失败的批次送回批次的来源,可能是在原始来源仍在生产批次的时候。当然,我可以这样写:
var failedBatchesSource = GetDataSource(filePath, 1048576)
.Select(batch => WriteToDBAndGetFailedSource(conn, batch))
.Merge(100)
.Select(batch => WriteToDBAndGetFailedSource(conn, batch))
.Merge(100);
但这当然是错误的,因为:
- 这打破了在再次处理失败的批次之前暂停的要求。
- 它可能会向数据库生成超过 100 个并发写入请求。
- 这就像展开一个迭代次数未知的 for 循环 - 没有效率。
一旦我收集了所有的失败,我也可以跳出可观察的 monad,并在循环中重新开始:
var src = GetDataSource(filePath, 1048576);
for (;;)
{
var failed = await src
.Select(batch => WriteToDBAndGetFailedSource(conn, batch))
.Merge(100)
.ToList();
if (failed.Count == 0)
{
break;
}
src = failed.ToObservable();
}
但我想知道我是否可以在可观察的单子中做得更好。
【问题讨论】:
标签: c# system.reactive