【问题标题】:DataFrame to RDD[(String, String)] conversionDataFrame 到 RDD[(String, String)] 的转换
【发布时间】:2019-03-26 06:31:29
【问题描述】:

我想在 Databricks 中将 org.apache.spark.sql.DataFrame 转换为 org.apache.spark.rdd.RDD[(String, String)]有人可以帮忙吗?

背景(也欢迎更好的解决方案):我有一个 Kafka 流,它(经过一些步骤)变成了 2 列数据框。我想把它放到一个 Redis 缓存中,第一列作为键,第二列作为值。

更具体地说输入的类型是这样的:lastContacts: org.apache.spark.sql.DataFrame = [serialNumber: string, lastModified: bigint]。我尝试按如下方式放入 Redis:

sc.toRedisKV(lastContacts)(redisConfig)

错误信息如下所示:

notebook:20: error: type mismatch;
 found   : org.apache.spark.sql.DataFrame
    (which expands to)  org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]
 required: org.apache.spark.rdd.RDD[(String, String)]
sc.toRedisKV(lastContacts)(redisConfig)

我已经尝试过一些想法(例如函数 .rdd),但没有任何帮助。

【问题讨论】:

  • 你能把你的数据框直接转换成 RDD 吗? lastContacts.map(r => (r.getString(0), r.getString(1)).rdd
  • 谢谢;好一些。现在错误消息是这样的:org.apache.spark.sql.AnalysisException: 必须使用 writeStream.start(); 执行带有流源的查询;
  • 也许你可以使用 sc.start()、sc.awaitTermination() 和 lastContacts.foreachRDD ...spark.apache.org/docs/latest/streaming-programming-guide.html
  • 不幸的是,它不能按原样工作。错误信息:值 start 不是 org.apache.spark.SparkContext 的成员
  • 你可以通过sparkcontext创建一个流上下文stackoverflow.com/questions/40623109/…

标签: scala apache-spark redis apache-kafka databricks


【解决方案1】:

如果要将行映射到不同的 RDD 元素,可以使用 df.map(row => ...) 将数据帧转换为 RDD。

例如:

val df = Seq(("table1",432),
      ("table2",567),
      ("table3",987),
      ("table1",789)).
      toDF("tablename", "Code").toDF()

    df.show()

    +---------+----+
|tablename|Code|
+---------+----+
|   table1| 432|
|   table2| 567|
|   table3| 987|
|   table1| 789|
+---------+----+

    val rddDf = df.map(r => (r(0), r(1))).rdd // Type:RDD[(Any,Any)]

    OR

    val rdd = df.map(r => (r(0).toString, r(1).toString)).rdd  //Type: RDD[(String,String)]

请参考https://community.hortonworks.com/questions/106500/error-in-spark-streaming-kafka-integration-structu.html关于AnalysisException: 必须使用writeStream.start() 执行带有流源的查询

您需要使用 query 等待查询终止。awaitTermination() 在查询处于活动状态时防止进程退出。

【讨论】:

  • @Csaba Faragó:如果它不起作用,请告诉我。希望对您有所帮助!
  • 感谢您的回答!转换问题已解决,但流媒体问题尚未解决。 (确实,这是最终的解决方案,因为问题是关于转换的;如果我无法解决,流式传输问题将是一个新问题。)
猜你喜欢
  • 2017-06-13
  • 2016-05-29
  • 2021-09-28
  • 2015-12-11
  • 2018-10-21
  • 1970-01-01
  • 2018-07-06
  • 2021-10-16
  • 2017-02-17
相关资源
最近更新 更多