【问题标题】:jdbc source and spark structured streamingjdbc 源和 spark 结构化流
【发布时间】:2018-03-22 04:45:12
【问题描述】:

我一直在使用 Spark 结构化流式传输,对此非常满意。我目前正在执行 ETL 类型的活动。我有一个基于 PostgreSQL 的表,其中包含元数据类型信息,我希望将其与流数据帧合并。

metadataDf = spark \
    .read \
    .jdbc(url=jdbcUrl, \
        table = query,
        properties = connectionProperties) 

streamDF = spark \
    .readStream \
    .option("maxFilesPerTrigger",10) \
    .option("latestFirst",True) \
    .schema(sensorSchema) \
    .json(sensorPath)

joined_metadata = streamDF \
    .join(metadataDf,["uid"],"left")

write_query = joined_metadata \
    .writeStream \
    .trigger(processingTime=arbitarytime) \
    .format("json") \
    .option("checkpointLocation",chkploc) \
    .option("path",write_path) \
    .start()

postgresql 上的元数据表可以每隔几天更新一次。我想知道,我是否需要通过某种 while 循环来适应 spark 上的表刷新。或者 spark 的惰性 eval 是否会处理这种特定情况。

谢谢

【问题讨论】:

  • 你最终采用的解决方案是什么?
  • 我按原样去了,答案说明了原因。

标签: apache-spark


【解决方案1】:

只要程序正在运行,Spark 就会处理它。如果您不指定触发间隔,Spark 将连续处理此流(每批在最后一批完成后开始)

要指定触发间隔,请参阅 df.trigger() herein the docs

:)

【讨论】:

  • 对不起,我有一个设置了 trigger() 的写查询。我已经更新了这个问题。那么,在这种情况下,只是为了确认写入查询是否会从 jdbc 连接中获得一批新的表?
  • 是的,它每次都会尝试从jdbc 连接读取/写入。如果您的 streamDF.count() > 0
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2018-11-14
  • 2017-03-31
  • 2017-05-04
  • 1970-01-01
  • 2019-09-21
  • 2019-11-12
  • 2018-08-18
相关资源
最近更新 更多