【问题标题】:Yield multiple IEnumerables产生多个 IEnumerables
【发布时间】:2011-08-05 14:10:24
【问题描述】:

我有一段代码可以对资产进行计算。有数百万个,所以我想计算流中的所有内容。我当前的“管道”如下所示:

我有一个作为 Datareader 执行的查询。

然后我的 Asset 类有一个接受 IDataReader 的构造函数;

Public Asset(IdataReader rdr){
  // logic that initiates fields
}

以及将 IDataReader 转换为 IEnumerable

的方法
public static IEnumerable<Asset> ToAssets(IDataReader rdr) {

    // make sure the reader is in the right formt
    CheckReaderFormat(rdr);

    // project reader into IEnumeable<Asset>
    while (rdr.Read()) yield return new Asset(rdr);

}

然后将其传递给执行实际计算的函数,然后将其投影到 IEnumerable

然后得到一个包装器,将 Answers 公开为 IDataReader,然后将其传递给 OracleBulkCopy 并将流写入 DB。

到目前为止,它就像一个魅力。由于设置,我可以将 DataReader 交换为从文件读取的 IEnumerable,或者将结果写入文件等。这一切都取决于我如何将类/函数串在一起。

现在:我可以计算几件事,例如,除了正常的答案之外,我还可以有一个 DebugAnswer 类,它还输出一些中间数字以进行调试。所以我想做的是将 IEnumerable 投影到几个输出流中,这样我就可以在这些输出流上放置“侦听器”。这样我就不必多次检查数据。我怎样才能做到这一点?有点像有几个事件,然后只有在附加了一个监听器时才触发某些代码。

有时我也会写入数据库,但也会写入 zip 文件,只是为了备份结果。那么我想在 IEnumerable 上有 2 个“侦听器”。一种是作为 IDataReader 进行投影,另一种是直接写入文件。

如何输出多个输出流以及如何将多个侦听器放在一个输出流上?是什么让我组成这样的数据流?

编辑

所以我想做一些伪代码:

foreach(Asset in Assets){
   if(DebugListener != null){
     // compute 
     DebugAnswer da = new DebugAnswer {result = 100};
     yield da to DebugListener;  // so instead of yield return yield to that stream

   }

   if(AnswerListener != null){
     // compute basic stuff 
     Answer a = new Answer { bla = 200 };
     yield a to AnswerListener;
   }
}

提前致谢,

格特-简

【问题讨论】:

  • 听起来你想看看Rx
  • 子类资产? Rx 看起来像你所追求的(以上+1)

标签: c# stream


【解决方案1】:

您所描述的内容听起来有点像 Reactive 框架通过 IObservable 接口提供的内容,但我不确定它是否允许多个订阅者订阅单个订阅流。

更新

如果您看一下documentation for IObservable,它有一个很好的例子,说明如何做您正在做的事情,单个对象有多个订阅者。

【讨论】:

  • 是的,您可以将多个观察者订阅到单个 observable。
  • 您好,感谢 RX。听说过但从未参与。我更新了问题,我的主要问题是屈服于多个流。你会碰巧有一个例子吗?还没有在 101 中看到它......
  • 查看了文档,似乎确实做到了。为我减少阅读和学习。
【解决方案2】:

使用Rx重写您的示例:

// The stream of assets
IObservable<Asset> assets = ...

// The stream of each asset projected to a DebugAnswer
IObservable<DebugAnswer> debugAnswers = from asset in assets
                                        select new DebugAnswer { result = 100 };

// Subscribe the DebugListener to receive the debugAnswers
debugAnswers.Subscribe(DebugListener);

// The stream of each asset projected to an Anwer
IObservable<Answer> answers = from asset in assets
                              select new Answer { bla = 200 };

// Subscribe the AnswerListener to receive the answers
answers.Subscribe(AnswerListener);

【讨论】:

  • 嗨,谢谢你,虽然还不太明白...我会非常努力地研究 Rx。
  • 认为我正在努力获得它。如果这真的像这样工作,那将是非常棒的。我看到 Linq 延迟执行如何让人们这样做......哇。现在得走了,稍后再回来查看。
【解决方案3】:

这正是 Reactive Extensions 的工作(从 4.0 开始成为 .NET 的一部分,在 3.5 中作为库提供)。

【讨论】:

    【解决方案4】:

    您不需要多个“侦听器”,您只需要没有破坏性甚至不一定可转换的管道组件。

    IEnumerable<T> PassThroughEnumerable<T>(IEnumerable<T> source, Action<T> action) {
        foreach (T t in source) {
           Action(t);
           yield return t;
        }    
    }
    

    或者,当您在管道中进行处理时,只需引发一些要使用的事件。如果你愿意,你可以异步它们:

    static IEnumerable<Asset> ToAssets(IDataReader rdr) {
       CheckReaderFormat(rdr);
       var h = this.DebugAsset;
       while (rdr.Read()) {
          var a = new Asset(rdr);
          if (h != null) h(a);
          yield return a;
       }
    }
    
    public event EventHandler<Asset> DebugAsset;
    

    【讨论】:

    • 您好,谢谢。想到这一点,但例如 OracleBulkCopy 不返回任何内容,该类超出了我的控制范围。因此,流将停在那里,限制了我编写这些流的选择。我现实生活中的问题有点复杂,很快就会变成拼图。如果可能的话,我更喜欢一些更通用的框架来做这件事,RX 似乎就是这样。
    【解决方案5】:

    如果我没听错,应该可以替换或decorate 包装器。 WrapperDecorator 可以将调用转发到普通的OracleBulkCopy(或您正在使用的任何东西)并添加一些自定义调试代码。

    这对你有帮助吗?

    马蒂亚斯

    【讨论】:

    • 嗨,穆德,谢谢。这听起来像是 Mark 的建议,但这可以解决这个问题,但是当我获得更多要编写的流时会变得复杂。 Rx 听起来更好,因为它似乎将控件从拉动“转换”为推入和返回。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2016-12-05
    • 2022-01-07
    • 1970-01-01
    相关资源
    最近更新 更多