【发布时间】:2021-06-15 21:54:47
【问题描述】:
我正在尝试将来自 Kafka 的事件插入到 Delta Lake 表中。我这样做with this。 新事件即将到来,增量表中的值会根据合并条件进行更新。现在,当我停止执行然后重新运行 upsert 脚本时,Delta Lake 似乎没有按照与脚本已经运行时它们进入时相同的顺序对我的流 df 中的每一行执行 upsert。合并函数无法识别更新事件的键与增量表中应该已经存在的键之间的匹配,它只是插入每一行,即使该键应该已经被先前的事件插入。
任何人都可以向我解释是否可以从一开始就重播事件作为增量表中的 upserts?如果有,你会怎么做?
我想要什么:
- 带有键 a 的事件进入,a 的键和值作为新行插入
- 带有键 b 的事件进入,b 的键和值作为新行插入
- 带有键 a 的事件进来,a 的值得到更新
当我重新开始读取流和 Delta Lake 合并功能时会发生什么:
- 带有键 a 的事件进入,a 的键和值作为新行插入
- 带有键 b 的事件进入,b 的键和值作为新行插入
- 带有键 a 的事件进入,a 的键和值作为新行插入
我希望发生的事情: 使用 forEachBatch 的 writeStream 从 Kafka 的第一个最早的微批次开始,然后 upsertToDelta 开始从头开始顺序插入微批次中的行
我的代码:
from delta.tables import *
from pyspark.sql.functions import *
from pyspark.sql.functions import udf
import json
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "[HOST]") \
.option("subscribe", "[topic]") \
.option("includeHeaders", "true") \
.option("startingOffsets", "earliest") \
[...].load()
dfgrouped = df.selectExpr("CAST(value AS STRING)")
records = (dfgrouped.withColumn("my_key", my_udf(dfgrouped["value"])))
# my_udf is a custom function to get a key based in the row value
deltaTable = DeltaTable.forName(spark, "mydeltable")
@udf
def get_updated_value(my_key, update_value, events_value):
[...]
return blob
@udf
def get_new_value(my_key, update_value):
[...]
return blob
def upsertToDelta(updatesDF, id):
deltaTable.alias("events") \
.merge(
source = updatesDF.alias("updates"),
condition = expr("events.my_key = updates.my_key") # It does not detect events.my_key for updates
) \
.whenMatchedUpdate(set =
{
"value": get_updated_value(col("updates.my_key"), col("updates.value"), col("events.value"))
}
) \
.whenNotMatchedInsert(values =
{
"my_key": col("updates.my_key"),
"value": get_new_value(col("updates.my_key"), col("updates.value"))
}
) \
.execute()
records.writeStream \
.format("kafka") \
.foreachBatch(upsertToDelta) \
.outputMode("update") \
.option("checkpointLocation", "/delta/events/_checkpoints/[CHECKPOINT]") \
.option("kafka.bootstrap.servers", "[HOST]") \
.option("topic", "[SINK_TOPIC]") \
[...].start()
【问题讨论】:
标签: apache-kafka spark-streaming databricks event-sourcing delta-lake