【问题标题】:Write dataframe to kafka pyspark将数据帧写入kafka pyspark
【发布时间】:2018-06-24 23:37:07
【问题描述】:

我有一个 spark 数据框,我想将其写入 Kafka。我在sn-p下面试过了,

from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers = util.get_broker_metadata())
df = sqlContext.createDataFrame([("foo", 1), ("bar", 2), ("baz", 3)], ('k', 'v'))
for row in df.rdd.collect():
    producer.send('topic',str(row.asDict()))
    producer.flush()

这可行,但这个 sn-p 的问题是它不是可扩展的,因为每次收集运行时,数据都会在驱动程序节点上聚合,并且会减慢所有操作。

因为数据帧上的 foreach 操作可以在工作节点上并行运行。我尝试了以下方法。

from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers = util.get_broker_metadata())
df = sqlContext.createDataFrame([("foo", 1), ("bar", 2), ("baz", 3)], ('k', 'v'))
def custom_fun(row):
    producer.send('topic',str(row.asDict()))
    producer.flush()

df.foreach(custom_fun)

这不会并给出酸洗错误。 PicklingError: Cannot pickle objects of type <type 'itertools.count'> 无法理解此错误背后的原因。谁能帮我理解这个错误或提供任何其他并行解决方案?

【问题讨论】:

  • 什么是Spark版本和Python版本?当您使用干净的会话运行此代码时,您是否会遇到同样的错误?
  • 嗨,spark 版本是 2.1,python 是 2.7。不知道您所说的干净会话是什么意思,但每次我使用 spark-submit 在 yan 上启动作业时都会遇到相同的错误。
  • 我的意思是错误看起来与 Kafka 写入无关
  • @NachiketKate:你找到答案了吗?我面临同样的问题。无法编写融合的 kafka 主题。

标签: apache-spark pyspark apache-kafka spark-dataframe spark-streaming


【解决方案1】:

您收到的错误看起来与 Kafka 写入无关。看起来您在代码中的其他地方使用了itertools.count(AFAIK,它根本没有在 Spark 的源代码中使用,它当然有可能与KafkaProducer 一起提供)由于某种原因使用cloudpickle 模块序列化。更改 Kafka 编写代码可能根本没有影响。如果KafkaProducer 是错误的来源,您应该可以使用forachPartition 解决此问题:

from kafka import KafkaProducer


def send_to_kafka(rows):
    producer = KafkaProducer(bootstrap_servers = util.get_broker_metadata())
    for row in rows:
        producer.send('topic',str(row.asDict()))  
        producer.flush()

df.foreachPartition(send_to_kafka)

话虽如此:

或提供任何其他并行解决方案?

我建议使用 Kafka 源。包含Kafka SQL包,例如:

spark.jars.packages  org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0

还有:

from pyspark.sql.functions import to_json, col, struct

(df 
    .select(to_json(struct([col(c).alias(c) for c in df.columns])))
    .write
    .format("kafka") 
    .option("kafka.bootstrap.servers", botstrap_servers) 
    .option("topic", topic)
    .save())

【讨论】:

  • 感谢您的回答。我会试试这个,然后告诉你。
  • 使用 dataframe.write() 我得到 nosuchmethoderror。看起来与 spark、kafka、spark-sql-kafka 的版本不匹配。
  • spark-sql-kafka 组件必须匹配 Spark 和 Scala 版本
  • 只向kafka发送一列数据帧而不是整个记录怎么样?
猜你喜欢
  • 2020-10-06
  • 1970-01-01
  • 2020-11-26
  • 1970-01-01
  • 1970-01-01
  • 2023-04-11
  • 2020-07-10
  • 1970-01-01
  • 2023-04-03
相关资源
最近更新 更多