【问题标题】:Parallel HttpWebRequests with Reactive Extensions具有响应式扩展的并行 HttpWebRequest
【发布时间】:2011-09-08 15:26:22
【问题描述】:

我有一个具有三个属性的“图像”类:Url、Id、Content。 我有一个包含 10 个这样的图像的列表。 这是一个 Silverlight 应用程序。

我想创建一个方法:

IObservable<Image> DownloadImages(List<Image> imagesToDownload)
{
     //start downloading all images in imagesToDownload
     //OnImageDownloaded: 
                          image.Content = webResponse.Content
                          yield image

}

此方法开始并行下载所有 10 个图像。 然后,当每次下载完成时,它会将 Image.Content 设置为该下载的 WebResponse.Content。

结果应该是每个下载图像的 IObservable 流。

我是 RX 的初学者,我认为我想要的可以通过 ForkJoin 实现,但这是在我不想使用的反应式扩展 dll 的实验版本中。

另外,我真的不喜欢下载指望通过回调来检测所有图像都已下载,然后调用 onCompleted()。

在我看来,这并不符合 Rx 精神。

我还发布了到目前为止我编写的解决方案,尽管我不喜欢我的解决方案,因为它长/丑并且使用计数器。

     return Observable.Create((IObserver<Attachment> observer) =>
         {
             int downloadCount = attachmentsToBeDownloaded.Count;
                foreach (var attachment in attachmentsToBeDownloaded)
                        {
                            Action<Attachment> action = attachmentDDD =>
                            this.BeginDownloadAttachment2(attachment).Subscribe(imageDownloadWebResponse =>
                                {
                                    try
                                    {
                                        using (Stream stream = imageDownloadWebResponse.GetResponseStream())
                                        {
                                            attachment.FileContent = stream.ReadToEnd();
                                        } 
                                        observer.OnNext(attachmentDDD);

                                        lock (downloadCountLocker)
                                        {
                                            downloadCount--;
                                            if (downloadCount == 0)
                                            {
                                                observer.OnCompleted();
                                            }
                                        }
                                    } catch (Exception ex)
                                    {
                                        observer.OnError(ex);
                                    }
                                });
                            action.Invoke(attachment);
                        }

                        return () => { }; //do nothing when subscriber disposes subscription
                    });
            }

好的,根据 Jim 的回答,我确实设法让它最终发挥作用。

    var obs = from image in attachmentsToBeDownloaded.ToObservable()
               from webResponse in this.BeginDownloadAttachment2(image).ObserveOn(Scheduler.ThreadPool)
               from responseStream in Observable.Using(webResponse.GetResponseStream, Observable.Return)
               let newImage = setAttachmentValue(image, responseStream.ReadToEnd())
               select newImage;

setAttachmentValue 只需要 `image.Content = bytes;返回图像;

BeginDownloadAttachment2 代码:

        private IObservable<WebResponse> BeginDownloadAttachment2(Attachment attachment)
    {
        Uri requestUri = new Uri(this.DownloadLinkBaseUrl + attachment.Id.ToString();
        WebRequest imageDownloadWebRequest = HttpWebRequest.Create(requestUri);
        IObservable<WebResponse> imageDownloadObservable = Observable.FromAsyncPattern<WebResponse>(imageDownloadWebRequest.BeginGetResponse, imageDownloadWebRequest.EndGetResponse)();

        return imageDownloadObservable;
    }

【问题讨论】:

  • 很高兴为您提供帮助。我不得不说,这个解决方案看起来比你开始的更容易维护。
  • 善用.Let。非常好的解决方案。

标签: c# system.reactive


【解决方案1】:

我们稍微简化一下如何。获取您的图像列表并将其转换为可观察的。接下来,考虑使用 Observable.FromAsyncPattern 来管理服务请求。最后使用 SelectMany 来协调请求和响应。我正在对您如何在此处获取文件流做出一些假设。本质上,如果您可以将 BeginInvoke/EndInvoke 委托传递给 FromAsyncPattern 来满足您的服务请求,那么您就很好了。

var svcObs = Observable.FromAsyncPattern<Stream>(this.BeginDownloadAttachment2, This.EndDownloadAttchment2);

var obs = from image in imagesToDownload.ToObservable()
          from responseStream in svcObs(image)
          .ObserveOnDispatcher()
          .Do(response => image.FileContent = response.ReadToEnd())
          select image;
return obs;

【讨论】:

  • 谢谢吉姆。它几乎很好。我遇到的唯一问题是 BeginDownloadAttachment2 返回一个 IObservable。我尝试将代码转换为: ...Do(response => Observable.Using(response.GetResponseStream, stream => image.FileContent = stream.ReadToEnd())) 但我收到一些错误:无法从使用中推断类型参数.
  • 其实我想我明白了: ...Do(response => Observable.Using(response.GetResponseStream, stream => Observable.Return(stream.ReadToEnd())).Do(字节 => image.Value.FileContent = 字节))。将测试并查看它是否有效。
  • 我指的是我上次评论中的最后一个解决方案。不起作用,因为那些“Do”操作员在我的订阅者读取图像后设置字节。我无法使用您的解决方案,因为 this.BeginDownloadAttachment2 返回一个 IObservable。但无论如何,我最终还是稍微修改了您的解决方案。您可以在问题的底部看到它。非常感谢您的帮助。
猜你喜欢
  • 2012-09-21
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2011-03-15
相关资源
最近更新 更多