【问题标题】:Handle duplicate records in Elasticsearch处理 Elasticsearch 中的重复记录
【发布时间】:2015-12-24 21:08:47
【问题描述】:

我正在使用 Hadoop+ELK 堆栈来构建分析堆栈。我正在尝试每天刷新索引。

我正在使用来自第三方的 CSV 格式的数据。我无法控制输入数据,即我无法要求更改 CSV 文件的架构。

问题是 CSV 记录中没有唯一 ID,甚至组合列以生成唯一 ID 也不起作用。因此,在刷新 Elasticsearch 时会将重复数据添加到索引中。

所以,如果第 1 天的数据是这样的

Product1,Language1,Date1,$1
Product2,Language2,Date1,$12

Day2 数据变成

Product1,Language1,Date1,$1
Product2,Language2,Date1,$12
Product1,Language1,Date1,$1
Product2,Language2,Date1,$12
Product3,Language1,Date2,$5(new record added on day2)

在 ELK 中是否有任何好的方法来处理这个问题。我正在使用 Logstash 来使用 csv 文件。

【问题讨论】:

    标签: elasticsearch logstash elastic-stack


    【解决方案1】:

    我认为这与文档“_id”有关。

    如果您对每个文档都有一个唯一的“_id”,则不会有问题,因为您只需将文档“更新”为相同的值。如果需要,您甚至可以将映射设置为不允许更新。

    您的问题是您没有将文档的“_id”链接到文档的内容(在某些情况下这很好)。

    我想一个简单的解决方案是创建自己的“my_id”字段并将“_id”的路径设置为它,例如here

    然后问题就变成了如何创建“my_id”字段。我会在文档上使用哈希。

    python sn-p 的例子是(我相信你可以找到合适的 ruby​​ 插件):

    import hashlib
    hash_object = hashlib.sha1(b"Product2,Language2,Date1,$12")
    hex_dig = hash_object.hexdigest()
    print(hex_dig)
    

    【讨论】:

    • 感谢@eran 的回复。但正如我提到的“Product2,Language2,Date1,$12”的组合也不是唯一的。所以,对我来说,不可能从这些中创建一个 id 字段字段是唯一的。
    • 嗯...您可以访问行号吗?否则,这不是 ElasticSearch 问题。任何数据库都会遇到同样的问题,
    • 我知道这不是 Elasticsearch 的问题。我只能看到一条出路。每次都重新创建索引有点贵。正在寻找某人可能早先做过的事情。
    • 在记录之前添加一个行号,然后使用那个数字stage doc id怎么样?
    • 我也考虑过这一点,但插入可能发生在 csv 文件中的任何位置,因此会干扰整个位置。假设我将它编号为 123,现在在 2 和 3 之间插入一条新记录。所以,整个排序将是错误的。
    【解决方案2】:

    我相信解决方案的第一部分将是确定一组值,如果它们一起使用,对于文档来说将是唯一的。如果不是,则无法将重复文件与真实文件分开。 为了便于讨论,假设四个值 (Product1,Language1,Date1,$1) 定义了一个文档。如果有另一个文档具有相同的设置值,则它是先前文档的副本,而不是新文档。

    假设你有 (Product1,Language1,Date1,$1),你可以先执行一个查询来搜索这个文档是否已经存在于 ElasticSearch 中。比如:

    {
    "filter": {
        "bool": {
            "must": [
                {
                    "term": {
                        "pdtField": "Product1"
                    }
                },
                {
                    "term": {
                        "langField": "Language1"
                    }
                },
                {
                    "term": {
                        "dateField": "Date1"
                    }
                },
                {
                    "term": {
                        "costField": "$1"
                    }
                }
            ]
        }
    }
    }
    

    根据您实际使用的内容,注意此处使用的字段名称。 如果此过滤器结果有doc_count != 0,那么您无需为此创建新文档。否则,使用现有值创建一个新文档。

    或者,您可以使用从 (Product1,Language1,Date1,$1) 创建的散列创建文档 ID,然后将此散列用作文档的 _id。首先检查是否存在任何具有此_id 的文档。如果不存在,则根据哈希生成的 _id 值创建一个新文档。

    如果您无法控制创建单个文档的方式,那么也许您可以尝试使用上面建议的策略预处理您的 CSV 输入,只在 CSV 中保留所需的条目并摆脱休息,然后像往常一样携带生成的 CSV。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2020-10-02
      • 2017-10-22
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多