【问题标题】:Filter a DataSet in terms of another DataSet in Scala flink在 Scala flink 中根据另一个 DataSet 过滤一个 DataSet
【发布时间】:2018-08-30 17:36:37
【问题描述】:

我正在尝试复制此 python 代码:

cond_entropy_x = np.array([entropy(x[y == v]) for v in uy])

其中xy 是向量,uyy 上的唯一值,例如0,1

在 flink 中,我有:

val uy = y.distinct.collect
val condHx = for (i ← uy)
    yield entropy(x.filterWithBcVariable(y)((_, yy) ⇒ yy == i))

但是,filterWithBcVariable 似乎没有采用 y 上的每个值,它只采用第一个。

我也试过了:

for (i ← values) yield y.join(x).where(a ⇒ a).equalTo(_ ⇒ i)

但是我的内存用完了。

如何根据y 上的值过滤x

x.zip(y) 之类的东西会这样做,但不受支持。

有什么想法吗?

【问题讨论】:

    标签: python scala numpy apache-flink entropy


    【解决方案1】:

    我提出了一种解决方案,可能不是最好的,但至少它有效。

    现在,我没有传递 xy 作为分隔的 DataSets,而是传递一个只有一列的 DataSet[LabeledVector]

    val xy = input.map(lv ⇒ LabeledVector(lv.label, DenseVector(lv.vector(0))))
    

    然后我将xy 传递给我的函数:

    def conditionalEntropy(xy: DataSet[LabeledVector]): Double = {
        // Get the label
        val y = xy map (_.label)
        // Get probs for the label
        val p = probs(y).toArray.asBreeze
        // Get unique values in label
        val values = y.distinct.collect
        // Compute Conditional Entropy
        val condH = for (i ← values)
          yield entropy(xy.filter(_.label == i))
        p.dot(seq2Breeze(condH))
      }
    

    【讨论】:

      猜你喜欢
      • 2012-03-24
      • 2011-05-08
      • 2010-10-14
      • 2011-09-22
      • 2011-04-06
      • 2011-06-21
      • 2013-12-21
      • 2012-02-08
      • 2021-02-21
      相关资源
      最近更新 更多