【问题标题】:How to write a Dataset to Kafka topic?如何将数据集写入 Kafka 主题?
【发布时间】:2018-09-16 13:25:54
【问题描述】:

我正在使用 Spark 2.1.0 和 Kafka 0.9.0。

我正在尝试将批处理 Spark 作业的输出推送到 kafka。该作业应该每小时运行一次,但不是流式传输。

在网上寻找答案时,我只能找到 kafka 与 Spark 流的集成,而没有找到与批处理作业的集成。

有谁知道这样的事情是否可行?

谢谢

更新:

正如 user8371915 所说,我尝试按照Writing the output of Batch Queries to Kafka 中所做的操作。

我用的是火花壳:

spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0

这是我尝试过的简单代码:

val df = Seq(("Rey", "23"), ("John", "44")).toDF("key", "value")
val newdf = df.select(to_json(struct(df.columns.map(column):_*)).alias("value"))
newdf.write.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("topic", "alerts").save()

但我得到了错误:

java.lang.RuntimeException: org.apache.spark.sql.kafka010.KafkaSourceProvider does not allow create table as select.
at scala.sys.package$.error(package.scala:27)
at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:497)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:215)
... 50 elided

知道这与什么有关吗?

谢谢

【问题讨论】:

    标签: scala apache-spark apache-kafka apache-spark-sql


    【解决方案1】:

    tl;dr 您使用过时的 Spark 版本。写入在 2.2 及更高版本中启用。

    开箱即用,您可以使用 Kafka SQL 连接器(与结构化流式处理相同)。包括

    • spark-sql-kafka 在您的依赖项中。
    • 将数据转换为DataFrame,其中至少包含value 类型为StringTypeBinaryType 的列。
    • 向 Kafka 写入数据:

      df   
        .write
        .format("kafka")
        .option("kafka.bootstrap.servers", server)
        .save()
      

    关注Structured Streaming docs了解详情(以Writing the output of Batch Queries to Kafka开头)。

    【讨论】:

      【解决方案2】:

      如果你有一个数据框,并且你想将它写入一个 kafka 主题,你需要先将列转换为包含 json 格式数据的“值”列。在scala中是

      import org.apache.spark.sql.functions._
      
      val kafkaServer: String = "localhost:9092"
      val topicSampleName: String = "kafkatopic"
      
      df.select(to_json(struct("*")).as("value"))
        .selectExpr("CAST(value AS STRING)")
        .write
        .format("kafka")
        .option("kafka.bootstrap.servers", kafkaServer)
        .option("topic", topicSampleName)
        .save()
      

      【讨论】:

      • 这是我正在寻找的答案。如何将包含多列的数据框转换为包含一列名为“值”的数据框。谢谢
      【解决方案3】:

      对于这个错误 java.lang.RuntimeException:org.apache.spark.sql.kafka010.KafkaSourceProvider 不允许创建表作为选择。 在 scala.sys.package$.error(package.scala:27​​)

      我认为您需要将消息解析为键值对。您的数据框应该有值列。

      假设您有一个带有 student_id 和分数的数据框。

      df.show()
      >> student_id | scores
          1         |  99.00
          2         |  98.00
      

      那么你应该将你的数据框修改为

      value
      {"student_id":1,"score":99.00}
      {"student_id":2,"score":98.00}
      

      要转换,您可以使用类似的代码

      df.select(to_json(struct($"student_id",$"score")).alias("value"))
      

      【讨论】:

        猜你喜欢
        • 2017-08-17
        • 2020-01-16
        • 1970-01-01
        • 1970-01-01
        • 2022-01-03
        • 2019-07-30
        • 2019-04-05
        • 2018-03-02
        • 2015-10-13
        相关资源
        最近更新 更多