【发布时间】:2014-07-09 12:52:24
【问题描述】:
我的目标是通过 ftp 下载文件并以某种方式异步处理它们。 我将文件列表转换为 IObservable 并使用 SelectMany 组合器对其进行处理。里面有一些操作来下载被阻塞的文件:尝试下载重试次数的文件并返回 Tuple,或者在失败的情况下返回 Tuple 并将其包装到 Observable 中。 “延迟后可观察到的可重试”示例我采用了there 并对其进行了轻微修改。 问题是我的代码在下载几个文件后随机停止。有时它会在“订阅”方法中到达“OnNext”回调。我从未检测到代码到达“OnComplete”回调。也没有抛出异常。
files.ToObservable().SelectMany(f =>
{
var source = Observable.Defer(() => Observable.Start(() =>
{
ftpConnection.DownloadFile(avroPath, f.Name);
return Tuple.Create(true, f.Name);
}));
int attempt = 0;
return Observable.Defer(() => ((++attempt == 1)
? source
: source.DelaySubscription(TimeSpan.FromSeconds(1))))
.Retry(4)
.Catch(Observable.Return(Tuple.Create(false, f.Name)));
}).Subscribe(
res =>
{
Console.Write("Damn, its only rarely gets there, however some files were downloaded succesfully");
if (res.Item1) Process(res.Item2);
else LogOrQueueOrWhatever(res.Item2);
},
(Exception ex) =>
{
Console.Write("Never was thrown");
},
() =>
{
Console.Write("Never entered this section");
ProcessLogs();
ScheduleNExtDownloadRoutine();
});
如果有人能用更惯用的方式来处理 Observables 上的组合器,我将不胜感激。
【问题讨论】:
-
在您给它足够的时间异步下载所有文件之前,您的应用程序是否已经退出(
Subscribe不会阻塞当前线程)? -
正是如此。发消息后才意识到,所以我是新手。
-
仅供参考,请参阅 Retry 运算符的此问题:stackoverflow.com/questions/24655590/…
标签: c# .net system.reactive