【问题标题】:Flink HBase input for machine learning algorithms用于机器学习算法的 Flink HBase 输入
【发布时间】:2015-09-30 20:10:14
【问题描述】:

我想使用 Flink-HBase 插件读取数据,然后作为 Flink 机器学习算法的输入,分别是 SVM 和 MLR。现在我先将提取的数据写入一个临时文件,然后通过 libSVM 方法读入,但我想应该有更复杂的方法。

您有代码 sn-p 或想法吗?

【问题讨论】:

  • Flink 是一个比较新的项目。我想,你可能会在 flink 邮件列表上得到更好的帮助。

标签: machine-learning hbase apache-flink


【解决方案1】:

无需将数据写入磁盘,然后使用MLUtils.readLibSVM 读取。原因如下。

MLUtils.readLibSVM 需要一个文本文件,其中每一行都是带有相关标签的稀疏特征向量。它使用以下格式来表示标签-特征向量对:

<line> .=. <label> <feature>:<value> <feature>:<value> ... <feature>:<value> # <info>

其中&lt;feature&gt;是后续value在特征向量中的索引。 MLUtils.readLibSVM 可以读取这种格式的文件并转换LabeledVector 实例中的每一行。因此,您在读取 ​​libSVM 文件后获得了DataSet[LabeledVector]。这正是 SVMMultipleLinearRegression 预测器所需的输入格式。

但是,根据您从 HBase 获得的数据格式,您首先必须将数据转换为 libSVM 格式。否则,MLUtils.readLibSVM 将无法读取写入的文件。如果你转换数据,那么你也可以直接将你的数据转换为DataSet[LabeledVector],并将其用作 Flink 的 ML 算法的输入。这样可以避免不必要的磁盘循环。

如果您从 HBase 获得 DataSet[String],其中每个字符串具有 libSVM 格式(请参阅上面的规范),那么您可以使用以下映射函数在 HBase DataSet 上应用 map 操作。

val hbaseInput: DataSet[String] = ...
val labelCOODS = hbaseInput.flatMap {
  line =>
    // remove all comments which start with a '#'
    val commentFreeLine = line.takeWhile(_ != '#').trim

    if(commentFreeLine.nonEmpty) {
      val splits = commentFreeLine.split(' ')
      val label = splits.head.toDouble
      val sparseFeatures = splits.tail
      val coos = sparseFeatures.map {
        str =>
          val pair = str.split(':')
          require(
            pair.length == 2, 
            "Each feature entry has to have the form <feature>:<value>")

          // libSVM index is 1-based, but we expect it to be 0-based
          val index = pair(0).toInt - 1
          val value = pair(1).toDouble

          (index, value)
      }

      Some((label, coos))
    } else {
      None
    }

// Calculate maximum dimension of vectors
val dimensionDS = labelCOODS.map {
  labelCOO =>
    labelCOO._2.map( _._1 + 1 ).max
}.reduce(scala.math.max(_, _))

val labeledVectors: DataSet[LabeledVector] = 
  labelCOODS.map{ new RichMapFunction[(Double, Array[(Int, Double)]), LabeledVector] {
  var dimension = 0

  override def open(configuration: Configuration): Unit = {
    dimension = getRuntimeContext.getBroadcastVariable(DIMENSION).get(0)
  }

  override def map(value: (Double, Array[(Int, Double)])): LabeledVector = {
    new LabeledVector(value._1, SparseVector.fromCOO(dimension, value._2))
  }
}}.withBroadcastSet(dimensionDS, DIMENSION)

这会将您的 libSVM 格式数据转换为 LabeledVectors 的数据集。

【讨论】:

  • 谢谢!你的回答很有帮助!不幸的是,必须在 Java 类中获取来自 HBase 的数据集,现在我收到错误消息,即我的 DataSet 与 Scala 类中的方法不兼容:Error:(102, 29) java: incompatible types: 'org.apache.flink.api.java.DataSet&lt;java.lang.String&gt; cannot be converted to org.apache.flink.api.scala.DataSet&lt;java.lang.String&gt;'
  • 您还应该能够使用 Scala API 从 HBase 读取数据。然后你得到一个org.apache.flink.api.scala.Dataset[String]
猜你喜欢
  • 2018-08-29
  • 2011-08-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2019-05-27
  • 2021-08-01
  • 2012-03-14
相关资源
最近更新 更多