【发布时间】:2018-03-22 04:45:12
【问题描述】:
我一直在使用 Spark 结构化流式传输,对此非常满意。我目前正在执行 ETL 类型的活动。我有一个基于 PostgreSQL 的表,其中包含元数据类型信息,我希望将其与流数据帧合并。
metadataDf = spark \
.read \
.jdbc(url=jdbcUrl, \
table = query,
properties = connectionProperties)
streamDF = spark \
.readStream \
.option("maxFilesPerTrigger",10) \
.option("latestFirst",True) \
.schema(sensorSchema) \
.json(sensorPath)
joined_metadata = streamDF \
.join(metadataDf,["uid"],"left")
write_query = joined_metadata \
.writeStream \
.trigger(processingTime=arbitarytime) \
.format("json") \
.option("checkpointLocation",chkploc) \
.option("path",write_path) \
.start()
postgresql 上的元数据表可以每隔几天更新一次。我想知道,我是否需要通过某种 while 循环来适应 spark 上的表刷新。或者 spark 的惰性 eval 是否会处理这种特定情况。
谢谢
【问题讨论】:
-
你最终采用的解决方案是什么?
-
我按原样去了,答案说明了原因。
标签: apache-spark