【问题标题】:Issue using Kafka with Spark using pyspark使用 pyspark 将 Kafka 与 Spark 一起使用的问题
【发布时间】:2018-01-29 23:25:17
【问题描述】:

我正在尝试使用 kafka 发布的 Spark 使用数据,但我无法这样做。我正在使用 Spark 2.2。

  1. 我想使用 Spark 使用 Kafka 发送的数据,对其进行处理并存储在本地文件或 HDFS 中。
  2. 我想在运行 spark 作业后在控制台中打印出 kafka 发出的数据(由 spark 消耗)。

对于 Kafka,我正在关注本教程:https://kafka.apache.org/quickstart

    [cloudera@quickstart kafka]$ ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
    >message 1
    >message 2 
    >message 3
    >message 4

运行 Spark python 脚本文件.py:

./spark/bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0 file.py

Pyspark 代码:

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("stream").getOrCreate()

df = spark\
.readStream\
.format("kafka")\
.option("kafka.bootstrap.servers","localhost:9092")\
.option("subscribe","test")\
.load()

df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "topic")


#Trying to save result in a file
df.writeStream\
.format("text")\
.option("checkpointLocation", "file:///home/cloudera/file.txt")\
.option("path","file:///home/cloudera/file.txt")\
.start()
# Does not write to a file

#Trying to print result in console
df.writeStream()\
.outputMode("append")\
.format("console")\
.start()
# Does not print to console and gives error: TypeError: 'DataStreamWriter' object is not callable

有什么帮助吗?

【问题讨论】:

  • 只是为了确保,您开始 spark THEN 生成数据对吗?
  • @Falan 是的,我首先开始了 kafka。我想知道如何将数据从火花流存储到 HDFS。

标签: python apache-spark pyspark apache-kafka


【解决方案1】:

问题很可能是这一行:

df.writeStream()\

像这样从行中删除()

df.writeStream\

【讨论】:

  • 你好@antoine,欢迎来到 StackOverflow!请花一分钟时间接受the tour。当他们解释 OP 做错了什么以及为什么做错时,答案会更有帮助。请编辑您的答案以描述为什么您的代码将代替 OP 工作。
猜你喜欢
  • 2020-05-23
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2019-10-26
  • 2015-07-15
  • 1970-01-01
  • 2017-07-19
相关资源
最近更新 更多