【发布时间】:2019-03-26 06:31:29
【问题描述】:
我想在 Databricks 中将 org.apache.spark.sql.DataFrame 转换为 org.apache.spark.rdd.RDD[(String, String)]。有人可以帮忙吗?
背景(也欢迎更好的解决方案):我有一个 Kafka 流,它(经过一些步骤)变成了 2 列数据框。我想把它放到一个 Redis 缓存中,第一列作为键,第二列作为值。
更具体地说输入的类型是这样的:lastContacts: org.apache.spark.sql.DataFrame = [serialNumber: string, lastModified: bigint]。我尝试按如下方式放入 Redis:
sc.toRedisKV(lastContacts)(redisConfig)
错误信息如下所示:
notebook:20: error: type mismatch;
found : org.apache.spark.sql.DataFrame
(which expands to) org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]
required: org.apache.spark.rdd.RDD[(String, String)]
sc.toRedisKV(lastContacts)(redisConfig)
我已经尝试过一些想法(例如函数 .rdd),但没有任何帮助。
【问题讨论】:
-
你能把你的数据框直接转换成 RDD 吗? lastContacts.map(r => (r.getString(0), r.getString(1)).rdd
-
谢谢;好一些。现在错误消息是这样的:org.apache.spark.sql.AnalysisException: 必须使用 writeStream.start(); 执行带有流源的查询;
-
也许你可以使用 sc.start()、sc.awaitTermination() 和 lastContacts.foreachRDD ...spark.apache.org/docs/latest/streaming-programming-guide.html
-
不幸的是,它不能按原样工作。错误信息:值 start 不是 org.apache.spark.SparkContext 的成员
-
你可以通过sparkcontext创建一个流上下文stackoverflow.com/questions/40623109/…
标签: scala apache-spark redis apache-kafka databricks