【问题标题】:How can i send my structured streaming dataframe to kafka?如何将我的结构化流数据帧发送到 kafka?
【发布时间】:2021-12-17 02:22:25
【问题描述】:

大家好!

我正在尝试将我的结构化流数据帧发送到我的 kafka 主题之一,detection

这是结构化流数据帧的架构:

 root
 |-- timestamp: timestamp (nullable = true)
 |-- Sigma: string (nullable = true)
 |-- time: string (nullable = true)
 |-- duration: string (nullable = true)
 |-- SourceComputer: string (nullable = true)
 |-- SourcePort: string (nullable = true)
 |-- DestinationComputer: string (nullable = true)
 |-- DestinationPort: string (nullable = false)
 |-- protocol: string (nullable = true)
 |-- packetCount: string (nullable = true)
 |-- byteCount: string (nullable = true)

然后我尝试使用这种方法发送数据帧:

dfwriter=df \
  .selectExpr("CAST(value AS STRING)") \
  .writeStream \
  .format("kafka") \
  .option("checkpointLocation", "/Documents/checkpoint/logs") \
  .option("kafka.bootstrap.servers", "localhost:9092") \
  .option("failOnDataLoss", "false") \
  .option("topic", detection) \
  .start() 

然后我得到了错误:

pyspark.sql.utils.AnalysisException:无法解析“value”给定的输入列:[DestinationComputer、DestinationPort、Sigma、SourceComputer、SourcePort、byteCount、持续时间、packetCount、processName、协议、时间、时间戳] ;第 1 行第 5 行;

如果我发送一个带有 value 列的数据框,它可以工作,我会收到关于我的 kafka 主题消费者的数据。

任何想法发送我的所有列的数据框?

谢谢!

【问题讨论】:

    标签: apache-spark pyspark apache-kafka spark-structured-streaming spark-kafka-integration


    【解决方案1】:

    如错误所示,您的数据框没有 value 列。

    您需要在value StructType 列下“嵌入”所有列,然后使用to_json 之类的函数,而不是CAST( .. AS STRING)

    在 Pyspark 中,这类似于选择查询中的 struct(to_json(struct($"*")).as("value")

    类似问题 - Convert all the columns of a spark dataframe into a json format and then include the json formatted data as a column in another/parent dataframe

    【讨论】:

    • 谢谢,如果 F 是我的数据框,这就是我在 pyspark 中的做法:F=F.select(to_json(struct(F.schema.names)).alias('值'))
    • 顺便说一句,如果您希望时间戳是 Kafka 记录时间戳而不是消息值的一部分,您也可以单独选择它
    猜你喜欢
    • 2018-03-21
    • 1970-01-01
    • 2019-09-11
    • 2018-11-08
    • 1970-01-01
    • 1970-01-01
    • 2021-10-23
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多