【发布时间】:2018-06-24 23:37:07
【问题描述】:
我有一个 spark 数据框,我想将其写入 Kafka。我在sn-p下面试过了,
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers = util.get_broker_metadata())
df = sqlContext.createDataFrame([("foo", 1), ("bar", 2), ("baz", 3)], ('k', 'v'))
for row in df.rdd.collect():
producer.send('topic',str(row.asDict()))
producer.flush()
这可行,但这个 sn-p 的问题是它不是可扩展的,因为每次收集运行时,数据都会在驱动程序节点上聚合,并且会减慢所有操作。
因为数据帧上的 foreach 操作可以在工作节点上并行运行。我尝试了以下方法。
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers = util.get_broker_metadata())
df = sqlContext.createDataFrame([("foo", 1), ("bar", 2), ("baz", 3)], ('k', 'v'))
def custom_fun(row):
producer.send('topic',str(row.asDict()))
producer.flush()
df.foreach(custom_fun)
这不会并给出酸洗错误。 PicklingError: Cannot pickle objects of type <type 'itertools.count'> 无法理解此错误背后的原因。谁能帮我理解这个错误或提供任何其他并行解决方案?
【问题讨论】:
-
什么是Spark版本和Python版本?当您使用干净的会话运行此代码时,您是否会遇到同样的错误?
-
嗨,spark 版本是 2.1,python 是 2.7。不知道您所说的干净会话是什么意思,但每次我使用 spark-submit 在 yan 上启动作业时都会遇到相同的错误。
-
我的意思是错误看起来与 Kafka 写入无关
-
@NachiketKate:你找到答案了吗?我面临同样的问题。无法编写融合的 kafka 主题。
标签: apache-spark pyspark apache-kafka spark-dataframe spark-streaming