【问题标题】:How do I upsert stateful events to a Delta Lake table with an existing streaming DF?如何将有状态事件更新到具有现有流 DF 的 Delta Lake 表?
【发布时间】:2021-06-15 21:54:47
【问题描述】:

我正在尝试将来自 Kafka 的事件插入到 Delta Lake 表中。我这样做with this。 新事件即将到来,增量表中的值会根据合并条件进行更新。现在,当我停止执行然后重新运行 upsert 脚本时,Delta Lake 似乎没有按照与脚本已经运行时它们进入时相同的顺序对我的流 df 中的每一行执行 upsert。合并函数无法识别更新事件的键与增量表中应该已经存在的键之间的匹配,它只是插入每一行,即使该键应该已经被先前的事件插入。

任何人都可以向我解释是否可以从一开始就重播事件作为增量表中的 upserts?如果有,你会怎么做?

我想要什么:

  1. 带有键 a 的事件进入,a 的键和值作为新行插入
  2. 带有键 b 的事件进入,b 的键和值作为新行插入
  3. 带有键 a 的事件进来,a 的值得到更新

当我重新开始读取流和 Delta Lake 合并功能时会发生什么:

  1. 带有键 a 的事件进入,a 的键和值作为新行插入
  2. 带有键 b 的事件进入,b 的键和值作为新行插入
  3. 带有键 a 的事件进入,a 的键和值作为新行插入

我希望发生的事情: 使用 forEachBatch 的 writeStream 从 Kafka 的第一个最早的微批次开始,然后 upsertToDelta 开始从头开始顺序插入微批次中的行

我的代码:

from delta.tables import *
from pyspark.sql.functions import *
from pyspark.sql.functions import udf
import json

df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "[HOST]") \
    .option("subscribe", "[topic]") \
    .option("includeHeaders", "true") \
    .option("startingOffsets", "earliest") \
    [...].load()

dfgrouped = df.selectExpr("CAST(value AS STRING)")
records = (dfgrouped.withColumn("my_key", my_udf(dfgrouped["value"])))
# my_udf is a custom function to get a key based in the row value

deltaTable = DeltaTable.forName(spark, "mydeltable")

@udf
def get_updated_value(my_key, update_value, events_value):
    [...]
    return blob

@udf
def get_new_value(my_key, update_value):
    [...]
    return blob
    
def upsertToDelta(updatesDF, id):
    deltaTable.alias("events") \
    .merge(
        source = updatesDF.alias("updates"),
        condition = expr("events.my_key = updates.my_key") # It does not detect events.my_key for updates
    ) \
    .whenMatchedUpdate(set =
        {
        "value": get_updated_value(col("updates.my_key"), col("updates.value"), col("events.value"))
        }
    ) \
    .whenNotMatchedInsert(values =
        {
        "my_key": col("updates.my_key"),
        "value": get_new_value(col("updates.my_key"), col("updates.value"))
        }
    ) \
    .execute()
    
records.writeStream \
    .format("kafka") \
    .foreachBatch(upsertToDelta) \
    .outputMode("update") \
    .option("checkpointLocation", "/delta/events/_checkpoints/[CHECKPOINT]") \
    .option("kafka.bootstrap.servers", "[HOST]") \
    .option("topic", "[SINK_TOPIC]") \
    [...].start()    

【问题讨论】:

    标签: apache-kafka spark-streaming databricks event-sourcing delta-lake


    【解决方案1】:

    Delta Lake 在充当流接收器时仅支持仅附加或完整模式(即,将所有记录附加到表中或替换整个表) https://docs.delta.io/latest/delta-streaming.html#delta-table-as-a-sink

    【讨论】:

    • 仅当您直接使用 Delta 时(在此模式下您不能使用合并)。作者使用支持更新模式的foreachbatch。
    • 我同意@AlexOtt
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2023-03-03
    • 2012-07-31
    • 2021-06-30
    • 2022-08-05
    • 2020-12-11
    • 2022-08-10
    • 1970-01-01
    相关资源
    最近更新 更多