【发布时间】:2018-01-29 23:25:17
【问题描述】:
我正在尝试使用 kafka 发布的 Spark 使用数据,但我无法这样做。我正在使用 Spark 2.2。
- 我想使用 Spark 使用 Kafka 发送的数据,对其进行处理并存储在本地文件或 HDFS 中。
- 我想在运行 spark 作业后在控制台中打印出 kafka 发出的数据(由 spark 消耗)。
对于 Kafka,我正在关注本教程:https://kafka.apache.org/quickstart
[cloudera@quickstart kafka]$ ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
>message 1
>message 2
>message 3
>message 4
运行 Spark python 脚本文件.py:
./spark/bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0 file.py
Pyspark 代码:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("stream").getOrCreate()
df = spark\
.readStream\
.format("kafka")\
.option("kafka.bootstrap.servers","localhost:9092")\
.option("subscribe","test")\
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "topic")
#Trying to save result in a file
df.writeStream\
.format("text")\
.option("checkpointLocation", "file:///home/cloudera/file.txt")\
.option("path","file:///home/cloudera/file.txt")\
.start()
# Does not write to a file
#Trying to print result in console
df.writeStream()\
.outputMode("append")\
.format("console")\
.start()
# Does not print to console and gives error: TypeError: 'DataStreamWriter' object is not callable
有什么帮助吗?
【问题讨论】:
-
只是为了确保,您开始 spark THEN 生成数据对吗?
-
@Falan 是的,我首先开始了 kafka。我想知道如何将数据从火花流存储到 HDFS。
标签: python apache-spark pyspark apache-kafka