[Posted]: 2018-10-06 00:33:27
[Question]:
I am using Spark Structured Streaming with Kafka, but when I try to write the stream to the console I get the following error:
Caused by: java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
Here is my code:
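```python
from pyspark.sql import functions as f


def group_obs(obs_df):
    # Keep only the "after" struct of each record, in a column named "obs"
    obs = obs_df.select(f.col("obs.payload.after").alias("obs"))
    # Union the stream with a filtered copy of itself (both read the same Kafka source).
    # Note: union() matches columns by position, so both sides must have the same columns.
    filtered_obs_with_value = obs.union(
        obs.filter("obs.value_datetime is not null")
           .withColumn("value", f.col("obs.value_datetime"))
           .withColumn("value_type", f.lit("datetime")))
    grouped_by_obsgroup = (
        filtered_obs_with_value
        .groupBy("obs.obs_group_id", "obs.encounter_id")
        .agg(f.struct(f.col("obs.obs_group_id"),
                      f.collect_list("tempObs").alias("obs")).alias("obs")))
    # Print each updated aggregate to stdout; blocks until the query stops
    query = (grouped_by_obsgroup.writeStream
             .outputMode("update")
             .format("console")
             .start())
    query.awaitTermination()


# kafka_stream (the Kafka source DataFrame) and mySchema are defined elsewhere
raw_obs = kafka_stream.select(
    f.from_json(f.col("value").cast("string"), mySchema).alias("obs"))
transformed_obs = group_obs(raw_obs)
```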
[Discussion]:
Tags: apache-spark pyspark apache-kafka pyspark-sql spark-structured-streaming