【发布时间】:2018-02-28 23:07:04
【问题描述】:
我正在 PySpark 中编写 Spark 结构化流应用程序以从 Kafka 读取数据。
但是,目前 Spark 的版本是 2.1.0,它不允许我将 group id 设置为参数,并且会为每个查询生成一个唯一的 id。但是 Kafka 连接是基于组的授权,需要预先设置的组 ID。
因此,是否有任何解决方法来建立连接无需将 Spark 更新到 2.2,因为我的团队不想要它。
我的代码:
if __name__ == "__main__":
spark = SparkSession.builder.appName("DNS").getOrCreate()
sc = spark.sparkContext
sc.setLogLevel("WARN")
# Subscribe to 1 topic
lines = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "host:9092").option("subscribe", "record").option('kafka.security.protocol',"SASL_PLAINTEXT").load()
print(lines.isStreaming) #print TRUE
lines.selectExpr("CAST(value AS STRING)")
# Split the lines into words
words = lines.select(
explode(
split(lines.value, " ")
).alias("word")
)
# Generate running word count
wordCounts = words.groupBy("word").count()
# Start running the query that prints the running counts to the console
query = wordCounts \
.writeStream \
.outputMode("complete") \
.format("console") \
.start()
query.awaitTermination()
【问题讨论】:
-
我认为你也不能在 Spark 2.2 中设置
group.id- spark.apache.org/docs/latest/… -
据此Databricks doc 从Spark 2.2开始,你可以选择设置组id。但是,使用它时要格外小心,因为这可能会导致意外行为。
-
奇怪!因为根据 Spark 2.2 文档,我们不能。可能是两个文档不匹配。
-
是的,但无论如何,我不打算更新 Spark
-
我不确定每个查询的唯一 ID。
标签: apache-spark pyspark apache-kafka spark-structured-streaming