【问题标题】:How to Update Counter in ElasticSearch from Spark Structured Streaming?如何从 Spark Structured Streaming 更新 ElasticSearch 中的计数器?
【发布时间】:2018-10-27 23:58:55
【问题描述】:

我正在开发一个 Spark Structured Streaming 项目,目标是将用户活动日志更新到 ElasticSearch。

问题

  1. 最近8小时内首次出现user_id时,在ElasticSearch中新建一个条目,将文档中的counter设置为1;
  2. 如果同一用户在过去 8 小时内有更多活动(日志),请更新 counter 字段,将活动数量添加到其值中,最后更新 update_time 字段。

设置"es.mapping.id" -> "user_id""es.write.operation" -> "upsert"是我能做到的,但更新时我无法更新计数器和时间。也许es.update.script.inline 会有所帮助?

【问题讨论】:

    标签: elasticsearch spark-structured-streaming


    【解决方案1】:

    在阅读ES Scripted Updates document 之后,这里有一个使用无痛内联脚本更新counter 的简单解决方案。

    所以,关键是使用无痛脚本ctx._source.counter += params.counter,其中counter 代表我的DataFrame 列'counter,应该提前聚合。

    毕竟我的结局是这样的:

    val esOptions = Map(
       "es.write.operation"      -> "upsert"
      ,"es.mapping.id"           -> "user_id"
      ,"es.update.script.lang"   -> "painless"
      ,"es.update.script.inline" -> "ctx._source.counter += params.counter"
      ,"es.update.script.params" -> "counter:counter"
    
    df.writeStream.options(esOptions)
      .format("org.elasticsearch.spark.sql")
      .start("user_activity/log")
    

    同样,这只解决了计数器更新问题。稍后我会附加更新update_time字段的方法。

    【讨论】:

      猜你喜欢
      • 2018-06-04
      • 2023-04-07
      • 2017-04-25
      • 2020-09-12
      • 2019-04-26
      • 1970-01-01
      • 2020-03-19
      • 2023-03-31
      • 1970-01-01
      相关资源
      最近更新 更多