【问题标题】:Implementing a flambo mapValues function in clojure在 clojure 中实现一个 flambo mapValues 函数
【发布时间】:2015-07-29 21:11:15
【问题描述】:

我有一个 clojure 函数,它使用 flambo v0.60 函数 api 对样本数据集进行一些分析。我注意到,当我使用 (get rdd 2) 而不是获取 rdd 集合中的第二个元素时,它获取的是 rdd 集合的第一个元素的第二个字符。我的假设是 clojure 将 rdd 集合的每一行视为一个完整的字符串,而不是一个向量,以便我能够获取集合中的第二个元素。我正在考虑使用 map-values 函数将映射值转换为可以获取第二个元素的向量,我尝试了这个:

(defn split-on-tab-transformation [xctx input]
 (assoc xctx :rdd (-> (:rdd xctx)
                   (spark/map (spark/fn [row] (s/split row #"\t")))
                   (spark/map-values vec)))) 

不幸的是,我收到了一个错误: java.lang.IllegalArgumentException: No matching method found: mapValues for class org.apache.spark.api.java.JavaRDD...

这是代码返回 rdd 中的第一个集合: (假设我删除了上述函数中的(spark/map-values vec)

(defn get-distinct-column-val
 "input = {:col val}"
  [ xctx input ]
  (let [rdds (-> (:rdd xctx)
           (f/map (f/fn [row] row))
           f/first)]
(clojure.pprint/pprint rdds)))

输出:

[2.00000 770127      200939.000000   \t6094\tBENTONVILLE, AR DPS\t22.500000\t5.000000\t2.500000\t5.000000\t0.000000\t0.000000\t0.000000\t0.000000\t0.000000\t1\tStore Tab\t0.000000\t4.50\t3.83\t5.00\t0.000000\t0.000000\t0.000000\t0.000000\t19.150000]

如果我尝试获取第二个元素770127

(defn get-distinct-column-val
 "input = {:col val}"
  [ xctx input ]
  (let [rdds (-> (:rdd xctx)
           (f/map (f/fn [row] row))
           f/first)]
   (clojure.pprint/pprint (get rdds 1)))

我明白了:

[\.]

Flambo documentation for map-values

我是 clojure 的新手,如果有任何帮助,我将不胜感激。谢谢

【问题讨论】:

  • @noisesmith 你能帮我解决这个挑战吗
  • @cbbetz 你能帮我解决这个 flambo 和 clojure 问题吗

标签: clojure apache-spark rdd flambo


【解决方案1】:

首先map-values(或Spark API 中的mapValues)是仅在PairRDD 上的有效转换(例如[:foo [1 2 3]]。具有这样值的RDD 可以解释为某种映射其中第一个元素是键,第二个是值。

如果您有这样的 RDD,mapValues 会在不更改键的情况下转换值。在这种情况下,您应该使用第二张地图,尽管它似乎已经过时,因为 clojure.string/split 已经返回了一个向量。

一个简单的使用map-values的例子:

(let [pairs [(ft/tuple :foo 1) (ft/tuple :bar 2)]
      rdd (f/parallelize-pairs sc pairs) ;; Note parallelize-pairs -> PairRDD
      result (-> rdd       
          (f/map-values inc) ;; Map values
          (f/collect))]
  (assert (= result [(ft/tuple :foo 2) (ft/tuple :bar 3)])))

从您的描述看来,您使用的是输入 RDD,而不是从 split-on-tab-transformation 返回的 RDD。如果我不得不猜测您正在尝试使用原始xctx,而不是从split-on-tab-transformation 返回的那个。由于 Clojure maps 是不可变的 assoc 不会更改传递的参数,而 get-distinct-column-val 接收 RDD[String] 而不是 RDD[Array[String]]

根据命名约定,我假设您希望获得数组中单个位置的不同值。为了清楚起见,我删除了代码中未使用的部分。首先让我们创建虚拟数据:

(spit "data.txt"
      (str "Mazda RX4\t21\t6\t160\n"
           "Mazda RX4 Wag\t21\t6\t160\n"
           "Datsun 710\t22.8\t4\t108\n"))

添加函数的重写版本

(defn split-on-tab-transformation [xctx]
   (assoc xctx :rdd (-> (:rdd xctx)
                        (f/map #(clojure.string/split % #"\t")))))

(defn get-distinct-column-val
  [xctx col]
    (-> (:rdd xctx)
      (f/map #(get % col))
        (f/distinct)))

结果

(assert
 (= #{"Mazda RX4 Wag" "Datsun 710" "Mazda RX4"}
    (-> {:sc sc :rdd (f/text-file sc "data.txt")}
      (split-on-tab-transformation)
      (get-distinct-column-val 0)
      (f/collect)
      (set))))

【讨论】:

  • 谢谢。无论如何我可以在不使用assoc的情况下使用RDD上的latest转换更新xctx映射中的:rdd,或者assoc是否会使用值更新:rdd键从转型。而且您实际上对问题的假设是正确的。非常感谢您的帮助.. 我只想清楚 assoc clojure 函数。
  • Clojure 数据结构是不可变的,因此实际上没有更新之类的东西。你总是得到一个新的数据结构。您可以将xctx 设为atom 并使用swap!,但我怀疑这是一个好主意。捕获像上面这样的输出是更清洁的解决方案,并确保参考透明度。顺便说一句,如果你喜欢这个答案,我不介意点赞:-)
  • 还有一个问题,来自示例,split-on-tab-transformation 是否会改变xctx:rdd 键的值,这样当get-distinct-column-val 可以使用xctx 中的:rdd 键时,它实际上使用来自split-on-tab-transformation 的新值而不是它以前的值?谢谢
  • 没错。 split-on-tab-transformation 返回 assoc 的结果,这是一个新映射,:rdd 等于 (-> (:rdd xctx) .... ),它被传递给 get-distinct-column-val
  • 使用可变状态会为您的代码增加一个全新级别的复杂性。所以在我看来,根本问题是它是否真的值得。一般来说,我会说不是。我不确定你为什么首先需要xctx。我没有使用flambo(今天第一次)和Clojure 的经验(除非你算上一些玩具项目),但是当谈到Spark 时,简单地传递RDD 感觉很自然。由于带有转换的 RDD 只是一个秘诀,因此它是一种轻量级方法,并且使对程序的推理变得更加容易。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多