【问题标题】:What is the canonical way to create objects from rows of a Spark dataframe?从 Spark 数据帧的行创建对象的规范方法是什么?
【发布时间】:2021-04-13 09:41:35
【问题描述】:

我正在使用 Apache Zeppelin (0.9.0) 和 Scala (2.11.12)。我想从数据框中提取一些数据并将其存储到 InfluxDB,稍后在 Grafana 中进行可视化,但无法弄清楚。我正在尝试使用foreach 循环的幼稚方法。这个想法是遍历所有行,提取我需要的列,创建一个 Point 对象(来自this InfluxDB client library),然后将其发送到 InfluxDB 或将其添加到列表中,然后在循环之后批量发送所有点.

数据框如下所示:

+---------+---------+-------------+-----+
|timestamp|sessionId|eventDuration|speed|
+---------+---------+-------------+-----+
|        1|     ses1|          0.0|   50|
|        2|     ses1|          1.0|   50|
|        3|     ses1|          2.0|   50|

我已经尝试做上面描述的事情:

import scala.collection.mutable.ListBuffer
import spark.implicits._
import org.apache.spark.sql._
import com.paulgoldbaum.influxdbclient._
import scala.concurrent.ExecutionContext.Implicits.global

val influxdb = InfluxDB.connect("172.17.0.4", 8086)
val database = influxdb.selectDatabase("test")

var influxData = new ListBuffer[Point]()
dfAnalyseReport.foreach(row => 
    {
        val point = Point("acceleration")
                    .addTag("speedBin", row.getLong(3).toString)
                    .addField("eventDuration", row.getDouble(2))
        influxData += point
    }
)
val influxDataList = influxData.toList
database.bulkWrite(influxDataList)

我在这里得到的唯一信息是一个神秘的java.lang.ClassCastException,没有其他信息,无论是在笔记本输出中还是在 Zeppelin Docker 容器的日志中。错误似乎在 foreach 中的某个地方,即使我注释掉最后两行也会出现。

我还尝试从 this answer 调整方法 1,对列使用案例类,但无济于事。我让它运行没有错误,但结果列表是空的。不幸的是,我删除了那个尝试。如有必要,我可以重建它,但我在这方面花了很多时间,我相当肯定我对如何做到这一点存在一些根本性的误解。

还有一个问题:我还尝试在构建时将每个点写入数据库(而不是批量)。唯一的区别是,我没有附加到ListBuffer,而是执行了database.write(point) 操作。当在循环外使用虚拟点完成时,它会毫无问题地通过 - 数据最终在 InfluxDB 中 - 但在循环内它会导致 org.apache.spark.SparkException: Task not serializable

有人能指出我正确的方式吗?我应该如何解决这个问题?

【问题讨论】:

  • 数据框的架构是什么?速度列是 int 还是 long 类型?如果你在一个 int 列上使用 getLong,你可能会得到一个 classcastexception。
  • 你是对的,它是一个整数。将其更改为 getInt 修复了 ClassCastException,并且它没有错误地通过。但是,结果列表和列表缓冲区仍然是空的。是什么赋予了?我错过了influxData 范围内的东西吗?
  • 在下面查看我的答案。我认为使用rdd.map 比使用df.foreach 更好。

标签: scala apache-spark apache-spark-sql influxdb apache-zeppelin


【解决方案1】:

我会用 RDD map 方法来做,并将结果收集到一个列表中:

val influxDataList = dfAnalyseReport.rdd.map(
    row => Point("acceleration")
           .addTag("speedBin", row.getInt(3).toString)
           .addField("eventDuration", row.getDouble(2))
).collect.toList

【讨论】:

  • 成功了,谢谢!你能告诉我(或建议进一步阅读)为什么我原来的方法不起作用吗?
  • 我不太确定,可能列表缓冲区没有很好地并行化。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2016-11-28
  • 2010-09-05
  • 2017-03-23
相关资源
最近更新 更多