【问题标题】:How do I write to Kafka using pyspark?如何使用 pyspark 写入 Kafka?
【发布时间】:2018-10-17 10:29:49
【问题描述】:

我正在尝试使用 PySpark 写入 Kafka。
我被困在零阶段:

[Stage 0:>                                                          (0 + 8) / 9]

然后我得到一个超时错误:

org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.

代码是:

import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages
 org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0 pyspark-shell'

from pyspark.sql.functions import *
from pyspark.sql import SparkSession
from pyspark.sql.types import *

def main():
    spark = SparkSession.builder.master("local").appName("Spark CSV Reader")
     .getOrCreate();

    dirpath =  os.path.abspath(sys.argv[1])
    os.chdir(dirpath)

    mySchema = StructType([
     StructField("id", IntegerType()),StructField("name", StringType()),\
     StructField("year", IntegerType()),StructField("rating", DoubleType()),\
     StructField("duration", IntegerType())   ])
    streamingDataFrame = spark.readStream.schema(mySchema)
     .csv('file://' + dirpath + "/" )

    streamingDataFrame.selectExpr("CAST(id AS STRING) AS key",
     "to_json(struct(*)) AS value").\
      writeStream.format("kafka").option("topic", "topicName")\
      .option("kafka.bootstrap.servers", "localhost:9092")\
      .option("checkpointLocation", "./chkpt").start()

我正在运行 HDP 2.6。

【问题讨论】:

  • 您正在运行分布式的Spark,但只写信给localhost Kafka Broker...?顺便说一句,HDF 包括 Kafka,而 HDP 没有
  • 谢谢。我把它改成了同样的问题。我在测试 Kafka 时收到 connection refused,所以我希望是这样。
  • 将什么更改为 YARN? kafka.bootstrap.servers 需要指向 Kafka 集群...我非常怀疑您的每个 Spark 执行器也在 Kafka 集群中
  • “问题所在”是localhost:9092 需要是 Kafka 代理的外部地址(理想情况下不止一个)
  • 是的,谢谢,更改了kafka.bootstrap.servers,它起作用了。提出答案,我会接受。

标签: apache-spark pyspark apache-kafka hortonworks-data-platform


【解决方案1】:

正如我在 cmets 中提到的,Spark 在多台机器上运行,所有这些机器都不太可能成为 Kafka 代理。

使用 Kafka 集群的外部地址

.option("kafka.bootstrap.servers", "<kafka-broker-1>:9092,<kafka-broker-2>:9092")\  

【讨论】:

  • 所以对于 localhost 选项将是: .option("kafka.bootstrap.servers", "127.0.0.1:9092") 我有多少经纪人并不重要?虽然就我而言,我只有一个经纪人
  • 如果你在本地运行过 Kafka 和 Zookeeper,Spark master 设置为本地,那么是的,localhost 或 127.0.0.1...。如果你在一台机器上有多个 broker,你仍然可以列出localhost 用于其他代理,但您需要不同的端口
  • 如果我在创建 spark session 时只使用声明 master 或者我需要在写 Kafka 时明确提及是否可以?
  • Spark master 与 Kafka 完全无关。例如,spark-shell --masterspark-submit --master
  • @cricket_007 你能看看这个问题吗:stackoverflow.com/questions/50762812/…
猜你喜欢
  • 1970-01-01
  • 2020-10-06
  • 2018-06-24
  • 1970-01-01
  • 1970-01-01
  • 2020-11-02
  • 1970-01-01
  • 2020-10-03
  • 2019-05-14
相关资源
最近更新 更多