【问题标题】:How to generate N FlowFiles and set the content of each FlowFile according to the data in Elastic?如何根据Elastic中的数据生成N个FlowFile并设置每个FlowFile的内容?
【发布时间】:2017-09-07 15:53:06
【问题描述】:

在 Elasticsearch 我有这个索引和映射:

PUT /myindex
{
  "mappings": {
    "myentries": {
      "_all": {
        "enabled": false
      }, 
      "properties": {
          "yid": {"type": "keyword"},
          "days": { 
              "properties": {
                        "Type1":  { "type": "date" },
                        "Type2":  { "type": "date" }
              }
            },
            "directions": { 
              "properties": {
                      "name": {"type": "keyword"},
                      "recorder":  { "type": "keyword" },
                      "direction":  { "type": "integer" }
              }
            }
        }
    }
  }
}

我想为映射directionsrecorderdirection 的值的每个组合生成N 个FlowFiles,1 个。我怎样才能在 Nifi 中做到这一点?我在考虑使用GenerateFlowFile,但是我该如何应用这个与 Elasticsearch 相关的逻辑呢?

一种可能的解决方法是使用GenerateFlowFile 生成N 个流文件,其中Batch 字段可以被硬编码并设置为10(Elastic 中的条目数)。但是我不知道下一步应该是什么?

【问题讨论】:

    标签: elasticsearch apache-nifi elasticsearch-5


    【解决方案1】:

    GenerateFlowFile 在这里可能不是正确的工具,因为它不接受传入连接,因此您将无法使用计数对其进行参数化。您可以使用SplitJson,它会将流文件拆分为多个流文件,给出一个返回 JSON 内容数组的 JSONPath 表达式。

    更新

    这是一个great tool,您可以使用它来动态评估 JSONPath 并查看它匹配的内容。在您的示例中,假设您收到如下数据:

    {
      "yid": "nifi",
      "days" : [{"Type1": "09/07/2017"},{"Type2":"10/07/2017"}],
      "directions": [
        {
            "name": "San Francisco",
          "recorder"  : "Samsung",
          "direction": "0"
        },
        {
            "name": "Santa Monica",
          "recorder"  : "iPhone",
          "direction": "270"
        },
        {
            "name": "San Diego",
          "recorder"  : "Razr",
          "direction": "180"
        },
        {
            "name": "Santa Clara",
          "recorder"  : "Android",
          "direction": "0"
        }
      ]
    }
    

    JSONPath 表达式 $.directions[*].direction 将返回:

    [
      "0",
      "270",
      "180",
      "0"
    ]
    

    这将允许SplitJson 使用派生内容和fragment 属性创建四个流文件,以将它们关联回原始流文件。

    如果您确实需要对结果方向和记录器值执行置换逻辑,您可能希望使用 ExecuteScript 和一个简单的 Groovy/Ruby/Python 脚本来内联执行该操作并拆分结果值。

    【讨论】:

    • 谢谢。你能举一个 JSONPath 表达式的例子吗?
    • 谢谢。不,我不需要执行任何排列逻辑。事实上,directionrecorder 的所有必需组合都已存储在 Ellasticsearch 中。所以,对于你的例子,我需要得到[ ["Samsung", "0"], ["iPhone","270"], ["Razr","180"], ["Android","0"] ],而不仅仅是direction。这些将是 4 个不同的流文件。是否可以调整$.directions[*].direction 以获得directionrecorder
    • 是的,只需删除该表达式的最后一个元素 -- $.directions[*]$rootdirectionsdirections 元素,[*] 表示遍历每个数组元素。跨度>
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2023-04-09
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多