【问题标题】:How to Update Counter in ElasticSearch from Spark Structured Streaming?如何从 Spark Structured Streaming 更新 ElasticSearch 中的计数器?
【发布时间】:2018-10-27 23:58:55
【问题描述】:
我正在开发一个 Spark Structured Streaming 项目,目标是将用户活动日志更新到 ElasticSearch。
问题:
- 最近8小时内首次出现
user_id时,在ElasticSearch中新建一个条目,将文档中的counter设置为1;
- 如果同一用户在过去 8 小时内有更多活动(日志),请更新
counter 字段,将活动数量添加到其值中,最后更新 update_time 字段。
设置"es.mapping.id" -> "user_id"和"es.write.operation" -> "upsert"是我能做到的,但更新时我无法更新计数器和时间。也许es.update.script.inline 会有所帮助?
【问题讨论】:
标签:
elasticsearch
spark-structured-streaming
【解决方案1】:
在阅读ES Scripted Updates document 之后,这里有一个使用无痛内联脚本更新counter 的简单解决方案。
所以,关键是使用无痛脚本ctx._source.counter += params.counter,其中counter 代表我的DataFrame 列'counter,应该提前聚合。
毕竟我的结局是这样的:
val esOptions = Map(
"es.write.operation" -> "upsert"
,"es.mapping.id" -> "user_id"
,"es.update.script.lang" -> "painless"
,"es.update.script.inline" -> "ctx._source.counter += params.counter"
,"es.update.script.params" -> "counter:counter"
df.writeStream.options(esOptions)
.format("org.elasticsearch.spark.sql")
.start("user_activity/log")
同样,这只解决了计数器更新问题。稍后我会附加更新update_time字段的方法。