【问题标题】:How do I get the data of one row of a Structured Streaming Dataframe in pyspark?如何在 pyspark 中获取结构化流数据帧的一行数据?
【发布时间】:2021-12-03 15:34:09
【问题描述】:

我有一个主题连接到 Spark 结构化流的 Kafka 代理。我的主题将数据发送到我的流式数据帧,我想获取有关该主题每一行的信息(因为我需要将每一行与另一个数据库进行比较)。

如果我可以将批次转换为 RDD,我可以轻松获得每一行。
我也看到了一些关于 DStreams 的东西,但我不知道在最新版本的 f spark 中它是否仍然有效。

DStream 是我的问题的答案,还是有其他解决方案可以逐行获取我的数据?

【问题讨论】:

  • 请提供足够的代码,以便其他人更好地理解或重现问题。
  • 您可以使用 forEachBatch 和 forEachPartition 从您的流数据帧中获取 RDD

标签: dataframe pyspark spark-structured-streaming spark-streaming-kafka discretization


【解决方案1】:

从 kafka 读取 spark 流中的数据,并在 spark 流的 foreach 编写器中编写自定义行比较 . 例如。

streamingDatasetOfString.writeStream.foreach(

新的 ForeachWriter[String] {

def open(partitionId: Long, version: Long): Boolean = {
  // Open connection
}

def process(record: String): Unit = {
  // Write string to connection
}

def close(errorOrNull: Throwable): Unit = {
  // Close the connection
}}).start()

` 自 spark 2.4 起,python、scala、java 支持此功能

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2019-03-30
    • 2018-03-21
    • 2021-10-23
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2022-08-10
    • 2020-02-06
    相关资源
    最近更新 更多