【发布时间】:2018-10-17 10:29:49
【问题描述】:
我正在尝试使用 PySpark 写入 Kafka。
我被困在零阶段:
[Stage 0:> (0 + 8) / 9]
然后我得到一个超时错误:
org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
代码是:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages
org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0 pyspark-shell'
from pyspark.sql.functions import *
from pyspark.sql import SparkSession
from pyspark.sql.types import *
def main():
spark = SparkSession.builder.master("local").appName("Spark CSV Reader")
.getOrCreate();
dirpath = os.path.abspath(sys.argv[1])
os.chdir(dirpath)
mySchema = StructType([
StructField("id", IntegerType()),StructField("name", StringType()),\
StructField("year", IntegerType()),StructField("rating", DoubleType()),\
StructField("duration", IntegerType()) ])
streamingDataFrame = spark.readStream.schema(mySchema)
.csv('file://' + dirpath + "/" )
streamingDataFrame.selectExpr("CAST(id AS STRING) AS key",
"to_json(struct(*)) AS value").\
writeStream.format("kafka").option("topic", "topicName")\
.option("kafka.bootstrap.servers", "localhost:9092")\
.option("checkpointLocation", "./chkpt").start()
我正在运行 HDP 2.6。
【问题讨论】:
-
您正在运行分布式的Spark,但只写信给
localhostKafka Broker...?顺便说一句,HDF 包括 Kafka,而 HDP 没有 -
谢谢。我把它改成了同样的问题。我在测试 Kafka 时收到
connection refused,所以我希望是这样。 -
将什么更改为 YARN?
kafka.bootstrap.servers需要指向 Kafka 集群...我非常怀疑您的每个 Spark 执行器也在 Kafka 集群中 -
“问题所在”是
localhost:9092需要是 Kafka 代理的外部地址(理想情况下不止一个) -
是的,谢谢,更改了
kafka.bootstrap.servers,它起作用了。提出答案,我会接受。
标签: apache-spark pyspark apache-kafka hortonworks-data-platform