【问题标题】:Need to extract attributes directly from Avro using NiFi需要使用 NiFi 直接从 Avro 中提取属性
【发布时间】:2017-08-03 02:28:13
【问题描述】:

我发现在 NiFi 中无法直接从 Avro 提取属性,因此我使用 ConvertAvroToJson -> EvaluateJsonPath -> ConvertJsonToAvro 作为解决方法。

但我想编写一个脚本来从 Avro 流文件中提取属性,以便在 ExecuteScript 处理器中使用,以确定它是否是更好的方法。

有没有人有脚本可以做到这一点?否则,我可能最终会使用原始方法。

谢谢,

凯文

【问题讨论】:

  • 您能解释一下提取值后您想做什么吗?

标签: apache-nifi


【解决方案1】:

这是一个 Groovy 脚本(在其模块目录属性中需要 Avro JAR),我让用户使用 JSONPath 表达式指定动态属性,以针对 Avro 文件进行评估。具有讽刺意味的是,它执行GenericData.toString() 无论如何都会将记录转换为 JSON,但也许这里有一些代码可以重用:

import org.apache.avro.*
import org.apache.avro.generic.*
import org.apache.avro.file.*
import groovy.json.*
import org.apache.commons.io.IOUtils
import java.nio.charset.*

flowFile = session.get()
if(!flowFile) return

final GenericData genericData = GenericData.get();
slurper = new JsonSlurper().setType(JsonParserType.INDEX_OVERLAY)

pathAttrs = this.binding?.variables?.findAll {attr -> attr.key.startsWith('avro.path')}
newAttrs = [:]
try {
   session.read(flowFile, { inputStream ->
   def reader = new DataFileStream<>(inputStream, new GenericDatumReader<GenericRecord>())
   GenericRecord currRecord = null;
   if(reader.hasNext()) {
       currRecord = reader.next();
       log.info(genericData.toString(currRecord))
       record = slurper.parseText(genericData.toString(currRecord))
       pathAttrs?.each {k,v ->
         object = record
         v.value.tokenize('.').each {
            object = object[it]
          }
          newAttrs[k - "avro.path."] = String.valueOf(object)
        }
        reader.close()
   }
} as InputStreamCallback)
newAttrs.each{k,v ->
  flowFile = session.putAttribute(flowFile, k,v)
}
session.transfer(flowFile, REL_SUCCESS)
} catch(e) {
  log.error("Error during Avro Path: {}", [e.message] as Object[], e)
  session.transfer(flowFile, REL_FAILURE)
}

如果您要提取 Avro 元数据与字段(不完全确定您所说的“属性”是什么意思),还请查看 MergeContent 的 AvroMerge,因为其中有一些代码可以提取 Avro 元数据:

【讨论】:

  • 对不起,“AVRO JAr”是什么意思?
  • UPD: `@Grab('commons-io:commons-io:2.4') @Grab('org.apache.avro:avro:1.8.1') 导入 ....跨度>
【解决方案2】:

如果您要从每个流文件的单个 Avro 记录中提取简单模式,ExtractText 可能就足够了。如果您想利用 Apache NiFi 1.3.0 中可用的新记录处理,您应该从 AvroReader 开始,blogsa series 详细描述了此 process。您还可以使用 ExtractAvroMetadata 提取 Avro 元数据。

【讨论】:

    猜你喜欢
    • 2017-07-18
    • 1970-01-01
    • 2022-01-18
    • 1970-01-01
    • 2023-03-27
    • 2017-02-06
    • 2019-08-11
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多