【问题标题】:Python ExecuteScript in NiFi: Transform flowfile attributes & contentNiFi中的Python ExecuteScript:转换流文件属性和内容
【发布时间】:2019-11-13 08:12:54
【问题描述】:

我正在尝试在 NiFi 中创建一个 Python 脚本:

  1. 从传入的流文件中读取一些属性
  2. 读取流文件的json内容并提取特定字段
  3. 将属性写入传出流文件
  4. 用脚本中创建的新内容覆盖传入的流文件(例如返回新 json 的 API 调用)并将其发送到 SUCCESS 关系或删除旧的流文件并使用所需内容创建新的流文件

到目前为止我做了什么:

import json
import java.io
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback,InputStreamCallback, OutputStreamCallback

class OutputWrite(OutputStreamCallback, obj):

def __init__(self):
    self.obj = obj

def process(self, outputStream):

    outputStream.write(bytearray(json.dumps(self.obj).encode('utf')))

###end class###

flowfile = session.get()

if flowfile != None:

**#1) Get flowfile attributes**

    headers = {
        'Accept-Encoding': 'gzip, deflate, br',
        'Accept': 'application/json, text/plain, */*',
        'Cache-Control': 'no-cache',
        'Ocp-Apim-Trace': 'true',
        'Authorization': flowfile.getAttribute('Authorization')
    }

    collection = flowfile.getAttribute('collection')
    dataset = flowfile.getAttribute('dataset')

    **#2)Get flowfile content**

    stream_content = session.read(flowfile)
    text_content = IOUtils.toString(stream_content, StandardCharsets.UTF_8)
    json_content = json.loads(text_content)

    records = json_content['result']['count']
    pages = records/10000

    **#3) Write flowfile attributes**

    flowfile = session.putAttribute(flowfile, 'collection', collection)
    flowfile = session.putAttribute(flowfile, 'dataset', dataset)

    **#API operations: output_json with desired data**

    output_json = {some data}

    **#4) Write final JSON data to output flowfile**

    flowfile = session.write(flowfile, OutputWrite(output_json))

    session.transfer(flowfile, REL_SUCCESS)
    session.commit()

我的问题是我找不到将所需 output_json 对象的引用作为 OutputStreamCallback 类中的参数传递的方法。关于如何解决这个问题或更好的方法的任何想法?

在这种情况下,在类的进程函数中执行所有 API 操作是否更容易,但是我如何访问进程函数中的传入流文件属性(需要会话或流文件对象)?

非常感谢任何帮助!

【问题讨论】:

标签: python apache-nifi


【解决方案1】:

你可以试试这样的-

import json
import sys
import traceback
from java.nio.charset import StandardCharsets
from org.apache.commons.io import IOUtils
from org.apache.nifi.processor.io import StreamCallback
from org.python.core.util import StringUtil

class TransformCallback(StreamCallback):
    def __init__(self):
        pass

    def process(self, inputStream, outputStream):
        try:
            # Read input FlowFile content
            input_text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
            input_obj = json.loads(input_text)
            # Transform content
            output_obj = input_obj   #your input content

            #perform Data tranformation on output_obj

            # Write output content
            output_text = json.dumps(outputJson)
            outputStream.write(StringUtil.toBytes(output_text))
        except:
            traceback.print_exc(file=sys.stdout)
            raise


flowFile = session.get()
if flowFile != None:
    flowFile = session.write(flowFile, TransformCallback())

    # Finish by transferring the FlowFile to an output relationship
    session.transfer(flowFile, REL_SUCCESS)

【讨论】:

  • outputJson 看起来像是错字。
  • outputJson 是您将分配最终 Json 值的变量。我还没有写完整的代码。
【解决方案2】:

我在下面包含了示例 Python 代码,该代码允许自定义 PyStreamCallback 类,该类实现逻辑以从主题上的 Matt Burgess' blog article 转换流文件内容中的 JSON,但我鼓励您考虑使用本机处理器来处理 @ 987654323@ 和 EvaluateJSONPath 执行相关活动,并且仅在需要执行 NiFi 开箱即用的任务时才使用自定义代码。

import json
import java.io
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback

class PyStreamCallback(StreamCallback):
  def __init__(self):
        pass
  def process(self, inputStream, outputStream):
    text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
    obj = json.loads(text)
    newObj = {
          "Range": 5,
          "Rating": obj['rating']['primary']['value'],
          "SecondaryRatings": {}
        }
    for key, value in obj['rating'].iteritems():
      if key != "primary":
        newObj['SecondaryRatings'][key] = {"Id": key, "Range": 5, "Value": value['value']}

    outputStream.write(bytearray(json.dumps(newObj, indent=4).encode('utf-8'))) 

flowFile = session.get()
if (flowFile != None):
  flowFile = session.write(flowFile,PyStreamCallback())
  flowFile = session.putAttribute(flowFile, "filename", flowFile.getAttribute('filename').split('.')[0]+'_translated.json')
  session.transfer(flowFile, REL_SUCCESS)

更新:

要在回调中访问流文件的属性,只需将其作为参数传递给构造函数,将其存储为字段,然后在process 方法中引用它。这是一个非常简单的示例,它将属性my_attr 的值连接到传入的流文件内容并将其写回:

import json
import java.io
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback

class PyStreamCallback(StreamCallback):
    def __init__(self, flowfile):
        self.ff = flowfile
        pass
    def process(self, inputStream, outputStream):
        text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        text += self.ff.getAttribute('my_attr')
        outputStream.write(bytearray(text.encode('utf-8')))

flowFile = session.get()
if (flowFile != None):
    flowFile = session.write(flowFile,PyStreamCallback(flowFile))
    session.transfer(flowFile, REL_SUCCESS)

传入流文件:

--------------------------------------------------
Standard FlowFile Attributes
Key: 'entryDate'
    Value: 'Tue Mar 13 13:10:48 PDT 2018'
Key: 'lineageStartDate'
    Value: 'Tue Mar 13 13:10:48 PDT 2018'
Key: 'fileSize'
    Value: '30'
FlowFile Attribute Map Content
Key: 'filename'
    Value: '1690494181462176'
Key: 'my_attr'
    Value: 'This is an attribute value.'
Key: 'path'
    Value: './'
Key: 'uuid'
    Value: 'dc93b715-50a0-43ce-a4db-716bd9ec3205'
--------------------------------------------------
This is some flowfile content.

输出流文件:

--------------------------------------------------
Standard FlowFile Attributes
Key: 'entryDate'
    Value: 'Tue Mar 13 13:10:48 PDT 2018'
Key: 'lineageStartDate'
    Value: 'Tue Mar 13 13:10:48 PDT 2018'
Key: 'fileSize'
    Value: '57'
FlowFile Attribute Map Content
Key: 'filename'
    Value: '1690494181462176'
Key: 'my_attr'
    Value: 'This is an attribute value.'
Key: 'path'
    Value: './'
Key: 'uuid'
    Value: 'dc93b715-50a0-43ce-a4db-716bd9ec3205'
--------------------------------------------------
This is some flowfile content.This is an attribute value.

【讨论】:

  • 谢谢,但在您的脚本中,我仍然看不到对流程函数中的流文件属性执行操作。我的传入流文件具有我想在传出流文件内容中处理的属性
  • 我用process 方法中访问流文件属性的代码更新了答案。
  • 是的,它比我想象的要简单得多
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2020-06-09
  • 2021-02-03
  • 1970-01-01
相关资源
最近更新 更多