【问题标题】:how to calculate moving average in RxJava如何在 RxJava 中计算移动平均线
【发布时间】:2013-12-28 06:29:41
【问题描述】:

在金融领域,我们通常需要从时间序列数据流中计算移动窗口聚合值,以移动平均线为例,假设我们有以下数据流(T是时间戳,V是实际值):

[T0,V0],[T1,V1],[T2,V2],[T3,V3],[T4,V4],[T5,V5],[T6,V6],[T7,V7],[T8,V8],[T9,V9],[T10,1V0],......

从我们得到的流中计算移动平均值 3:

avg([T0,V0],[T1,V1],[T2,V2]),
avg([T1,V1],[T2,V2],[T3,V3]),
avg([T2,V2],[T3,V3],[T4,V4]),
avg([T3,V3],[T4,V4],[T5,V5]),
avg([T4,V4],[T5,V5],[T6,V6]),...

要计算移动平均线,我们似乎可以这样做:

  1. 从原始流构建一个 Observable
  2. 通过将值聚合到组中,从原始流构建 Observable
  3. 在步骤 2 中使用聚合运算符计算 Observable 的最终结果。

步骤 1 和 3 实现起来很简单,但是,对于步骤 2,当前的 RxJava 似乎没有内置运算符来生成移动窗口组,window/groupBy 运算符似乎不适合这种情况,我没有找到一种简单的方法来从现有的运营商中组合解决方案,有人可以建议如何在 RxJava 中以“优雅”的方式做到这一点吗?

【问题讨论】:

    标签: reactive-programming rx-java


    【解决方案1】:

    RxJava 版本:0.15.1

    import java.util.List;                                                          
    import rx.Observable;                                                           
    import rx.util.functions.Action1;                                               
                                                                                    
    class Bar {                                                                     
                                                                                    
        public static void main(String args[]) {                                    
                                                                                    
            Integer arr[] = {1, 2, 3, 4, 5, 6}; // N = 6                            
            Observable<Integer> oi = Observable.from(arr);                          
                                                                                    
            // 1.- bundle 3, skip 1                                                 
            oi.buffer(3, 1)                                                         
            /**                                                                     
             * 2.- take only the first X bundles                                    
             * When bundle 3, X = N - 2 => 4                                        
             * When bundle 4, X = N - 3 => 3                                        
             * When bundle a, X = N - (a-1)                                         
             */                                                                     
              .take(4)                                                              
            // 3.- calculate average                                                
              .subscribe(new Action1<List<Integer>>() {                             
                @Override                                                           
                public void call(List<Integer> lst) {                               
                    int sum = 0;                                                    
                    for(int i = 0; i < lst.size(); i++) {                           
                        sum += lst.get(i);                                          
                    }                                                               
                                                                                    
                    System.out.println("MA(3) " + lst +                             
                                       " => " + sum / lst.size());                  
                }                                                                   
            });                                                                     
                                                                                    
        }                                                                           
                                                                                    
    }  
    

    样本输出:

    MA(3) [1, 2, 3] => 2

    MA(3) [2, 3, 4] => 3

    MA(3) [3, 4, 5] => 4

    MA(3) [4, 5, 6] => 5

    【讨论】:

      【解决方案2】:

      我会这样做:

      public static Observable<Double> movingAverage(Observable<Double> o, int N) {
          return o.window(N, 1).flatMap(
              new Func1<Observable<Double>, Observable<Double>>() {
                  public Observable<Double> call(Observable<Double> window) {
                      return Observable.averageDoubles(window);
                  }
              }
          );
      }
      
      • 我使用window(它发出Observables,只消耗一定数量的内存)而不是buffer(它发出Lists,它为每个项目消耗内存)。
      • 这是一个示例,说明如何使用组合运算符而不是编写自己的循环,这是在使用 Observables 时应始终考虑的事项。

      更新:如果你想在流的末尾过滤掉元素少于n的窗口,你可以这样做:

      def movingAverage(o: Observable[Double], n: Int): Observable[Double] = {
        class State(val sum: Double, val n: Int)
        o.window(n, 1).flatMap(win => 
          win.foldLeft(new State(0.0, 0))((s, e) => new State(s.sum + e, s.n + 1))
             .filter(s => s.n == n)
             .map(s => s.sum/s.n))
      }
      

      (我选择 Scala 是因为它写起来更短,但在 Java 中,你也可以这样做,只需注意 Scala 的 foldLeft 在 Java 中称为 reduce)。

      【讨论】:

      • 感谢您的回复,在这种情况下,我在缓冲解决方案中提出了一个后续问题,因为它将值聚合到一个列表中,然后我们可以根据大小过滤掉列表的(假设我们只想要 3 的块大小,并且源只包含 5 个值,那么我们只关心前 3 个窗口并丢弃没有 3 个值的尾部),在这种情况下如何过滤掉?或者这可能是一个更通用的问题,即如何通过窗口大小过滤掉 windows(Observale>)
      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2014-11-18
      • 2020-02-04
      • 2012-01-21
      相关资源
      最近更新 更多