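【Question Title】: Troubleshooting a stateful transducer
【Posted】: 2018-05-28 23:26:19
【Question】:

I am trying to create a stateful transducer, join-averages (gist here).

Running the let block shows that I am joining the values correctly. But the resulting output still does not contain the joined values.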

...
c' {:tick-list {:uuid 1, :last-trade-price 11.1}, :ema-list {:uuid 1, :last-trade-price-exponential 10}, :sma-list {:uuid 1, :last-trade-price-average 10.1}}
record {:uuid 1, :last-trade-price 11.1}
c' {:tick-list {:uuid 2, :last-trade-price 11.2}, :ema-list {:uuid 2, :last-trade-price-exponential 11}, :sma-list {:uuid 2, :last-trade-price-average 10.2}}
record {:uuid 2, :last-trade-price 11.2}
...
c' {:tick-list {:uuid 3, :last-trade-price 11.3}, :ema-list {:uuid 3, :last-trade-price-exponential 12}, :sma-list {:uuid 3, :last-trade-price-average 10.3}}
record {:uuid 1, :last-trade-price-exponential 10}
record {:uuid 2, :last-trade-price-exponential 11}
record {:uuid 3, :last-trade-price 11.3}
record {:uuid 3, :last-trade-price-exponential 12}

Any ideas on why the join-averages stateful transducer is not returning the correct result?

Code

(:require [clojure.core.async :refer [chan sliding-buffer <! go-loop pipeline onto-chan] :as async]
          [clojure.pprint] ; needed for the clojure.pprint/pprint call in join-averages
          [clojure.set :refer [subset?]]
          [clojure.tools.logging :as log])

(defn has-all-lists? [averages-map]
  (subset? #{:tick-list :sma-list :ema-list} (->> averages-map keys (into #{}))))

(defn join-averages []
  (let [state (atom {})]
    (fn [rf]
      (fn
        ([] (rf))
        ([accumulator] (rf accumulator))
        ([accumulator input]
         (let [uuid (:uuid input)
               entry (cond
                       (:last-trade-price-exponential input) {:ema-list input}
                       (:last-trade-price-average input) {:sma-list input}
                       (:last-trade-price input) {:tick-list input})]

           (if-let [current (get @state uuid)]

             (let [_ (swap! state update-in [uuid] merge entry)
                   c' (get @state uuid)]

               (log/info "c'" c')
               (log/info "accumulator" accumulator)
               (log/info "state" (with-out-str (clojure.pprint/pprint @state)))

               (if (has-all-lists? c')
                 c'
                 (rf accumulator input)))

             (do (swap! state merge {uuid entry})
                 (rf accumulator input)))))))))

(comment

  (let [ema-list [{:uuid "1" :last-trade-price-exponential 10}
                  {:uuid "2" :last-trade-price-exponential 11}
                  {:uuid "3" :last-trade-price-exponential 12}]
        sma-list [{:uuid "1" :last-trade-price-average 10.1}
                  {:uuid "2" :last-trade-price-average 10.2}
                  {:uuid "3" :last-trade-price-average 10.3}]
        tick-list [{:uuid "1" :last-trade-price 11.1}
                   {:uuid "2" :last-trade-price 11.2}
                   {:uuid "3" :last-trade-price 11.3}]

        ec (chan (sliding-buffer 100))
        sc (chan (sliding-buffer 100))
        tc (chan (sliding-buffer 100))

        _ (onto-chan ec ema-list)
        _ (onto-chan sc sma-list)
        _ (onto-chan tc tick-list)

        merged-ch (async/merge [tc sc ec])
        output-ch (chan (sliding-buffer 100) (join-averages))]

    (async/pipeline 1 output-ch (join-averages) merged-ch)
    (go-loop [r (<! output-ch)]
      (when-not (nil? r)
        (log/info "record" r)
        (recur (<! output-ch))))))

【Question Comments】:

    Tags: clojure core.async transducer


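    【Solution 1】:

    You didn't tell us what your result should look like, so I had to guess.

    Also, to see how a stateful transducer is implemented, I usually just look at distinct. You should initialize your state after the transducer has been initialized, that is, inside (fn [rf] ...) rather than around it. This should work: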

    ;; the set/ alias used below needs this require
    (require '[clojure.set :as set])

    (defn has-all-lists? [averages-map]
      (set/subset? #{:tick-list :sma-list :ema-list} (->> averages-map keys (into #{}))))
    
    (defn join-averages []
      (fn [rf]
        (let [state (atom {})]
          (fn
            ([] (rf))
            ([accumulator] (rf accumulator))
            ([accumulator input]
             (let [uuid (:uuid input)
                   entry (condp #(%1 %2) input
                           :last-trade-price-exponential {:ema-list input}
                           :last-trade-price-average {:sma-list input}
                           :last-trade-price {:tick-list input})]
               (let [nv (swap! state update-in [uuid] merge entry)
                     c' (get nv uuid)]
                 (if (has-all-lists? c')
                   (rf accumulator c')
                   accumulator))))))))
    
    (let [ema-list [{:uuid "1" :last-trade-price-exponential 10}
                    {:uuid "2" :last-trade-price-exponential 11}
                    {:uuid "3" :last-trade-price-exponential 12}]
          sma-list [{:uuid "1" :last-trade-price-average 10.1}
                    {:uuid "2" :last-trade-price-average 10.2}
                    {:uuid "3" :last-trade-price-average 10.3}]
          tick-list [{:uuid "1" :last-trade-price 11.1}
                     {:uuid "2" :last-trade-price 11.2}
                     {:uuid "3" :last-trade-price 11.3}]]
      (into []
            (join-averages)
            (concat ema-list sma-list tick-list)))
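
To illustrate why the answer moves the atom inside (fn [rf] ...), here is a minimal sketch using only clojure.core. The names counting-shared and counting-fresh are hypothetical helpers made up for this illustration; they are not part of the question's code. When the state is created around (fn [rf] ...), every application of the same transducer value shares one atom; when it is created inside, each application gets fresh state.

```clojure
;; Hypothetical helpers for illustration only.
;; Each one emits [running-count input] pairs.

(defn counting-shared
  "State is created when the transducer is constructed, so every
  application of the returned xf shares the same atom."
  []
  (let [n (atom 0)]
    (fn [rf]
      (fn
        ([] (rf))
        ([acc] (rf acc))
        ([acc x] (rf acc [(swap! n inc) x]))))))

(defn counting-fresh
  "State is created each time the xf is applied to a reducing function,
  which is the pattern clojure.core/distinct follows."
  []
  (fn [rf]
    (let [n (atom 0)]
      (fn
        ([] (rf))
        ([acc] (rf acc))
        ([acc x] (rf acc [(swap! n inc) x]))))))

(def shared (counting-shared))
(def fresh (counting-fresh))

;; Apply each transducer value twice.
(def shared-runs [(into [] shared [:a :b]) (into [] shared [:a :b])])
(def fresh-runs  [(into [] fresh [:a :b]) (into [] fresh [:a :b])])

;; shared-runs => [[[1 :a] [2 :b]] [[3 :a] [4 :b]]]  ; count leaks across runs
;; fresh-runs  => [[[1 :a] [2 :b]] [[1 :a] [2 :b]]]  ; count restarts each run
```

Note that the answer's join-averages also changes the step function to call (rf accumulator c') for a completed join, instead of returning c' in place of the accumulator as the question's version does.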
    

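    【Discussion】:

    • Thank you very much. The result of this code is exactly what I want. But if, instead of concat-ing the lists, I use the (async/merge [tc sc ec]) channel, I get no results from output-ch. That is what confuses me. So I would like to get your result, but using the async/merge'd channel. What am I missing?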
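
A possible explanation, offered as a sketch and not as a confirmed diagnosis: the async/pipeline docstring says the transducer is applied independently to each element, not across elements, so a transducer whose state lives inside (fn [rf] ...) would start with an empty atom for every item and never see all three lists. The question's code also attaches (join-averages) to output-ch in addition to the pipeline, so the joined maps would pass through the transducer a second time. One way to keep a single stateful instance, assuming a plain channel plus async/pipe is acceptable in place of pipeline, is to put the transducer on the output channel only. The function joined-averages below is a hypothetical helper introduced for this illustration, and async/to-chan is used here to feed the inputs (newer core.async releases prefer to-chan!).

```clojure
(require '[clojure.core.async :as async :refer [chan <!!]]
         '[clojure.set :as set])

(defn has-all-lists? [averages-map]
  (set/subset? #{:tick-list :sma-list :ema-list} (set (keys averages-map))))

;; Same transducer as in the answer: state is created inside (fn [rf] ...).
(defn join-averages []
  (fn [rf]
    (let [state (atom {})]
      (fn
        ([] (rf))
        ([accumulator] (rf accumulator))
        ([accumulator input]
         (let [uuid  (:uuid input)
               entry (condp #(%1 %2) input
                       :last-trade-price-exponential {:ema-list input}
                       :last-trade-price-average     {:sma-list input}
                       :last-trade-price             {:tick-list input})
               c'    (get (swap! state update-in [uuid] merge entry) uuid)]
           (if (has-all-lists? c')
             (rf accumulator c')
             accumulator)))))))

;; Hypothetical helper: feeds each collection into its own channel, merges
;; them, and collects whatever the single join-averages instance emits.
(defn joined-averages [& colls]
  (let [merged (async/merge (mapv async/to-chan colls))
        out    (chan 100 (join-averages))] ; the transducer lives on this channel only
    (async/pipe merged out)                ; closes out once merged is drained
    (<!! (async/into [] out))))

(def result
  (joined-averages
    [{:uuid "1" :last-trade-price-exponential 10}
     {:uuid "2" :last-trade-price-exponential 11}
     {:uuid "3" :last-trade-price-exponential 12}]
    [{:uuid "1" :last-trade-price-average 10.1}
     {:uuid "2" :last-trade-price-average 10.2}
     {:uuid "3" :last-trade-price-average 10.3}]
    [{:uuid "1" :last-trade-price 11.1}
     {:uuid "2" :last-trade-price 11.2}
     {:uuid "3" :last-trade-price 11.3}]))
;; result holds three joined maps; their order depends on how merge interleaves
```

Because async/merge interleaves its inputs nondeterministically, the order of the joined maps is not guaranteed, only that each uuid is emitted once all three of its records have arrived.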