【问题标题】:What's wrong in my Nifi python script, please suggest我的 Nifi python 脚本有什么问题,请提出建议
【发布时间】:2021-12-24 12:54:22
【问题描述】:

我正在尝试读取来自 Nifi 处理器的 json 数据,我使用下面的代码通过计算将一个键值更新到每条记录中,但我面临**list indices must be integers in <script> at line number 43** 问题。 .

当手动添加json file 时,同样的代码工作正常

我的代码是:

import json
import java.io
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback


class FlowFileParser(StreamCallback):
    def __init__(self):
        pass
    def process(self, inputStream, outputStream):
        finalResp = []
        text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        data = json.loads(text)
        newObj = data['priority']
        for k in data:
            resp = self.performCalculation(k)
            finalResp.append(resp)

        log.info(" newObj ",newObj)
        log.info(" newObj ",newObj)
        outputStream.write(bytearray(finalResp.encode('utf-8')))

    def performCalculation(self,k):
            a = int(k['a'])
            b = int(k['b'])
            log.info(a)
            log.info(b)
            total=sum((a,b))
            log.info(total)
            x = {"x":total}
            k.update(x)
            return k

flowFile = session.get()
if flowFile != None:
    #flowFile = session.putAttribute(flowFile, "priority", "5")
    priority = FlowFileParser()
    session.write(flowFile,priority)
    flowFile = session.putAttribute(flowFile, "filename", str(priority))
    session.transfer(flowFile, REL_SUCCESS)

我的 json 文件有

  [{
    "a":1,
    "b":2,
    "id":1
},{
    "a":1,
    "b":2,
    "id":2
}]

【问题讨论】:

  • 乍一看,没有什么奇怪的,但请仔细检查您的语法和操作是否与 NiFi 用来执行纯 Python 脚本的 Jython 2.7.x 相比
  • 我在你的 json 中没有看到任何对 data['priority'] 有效的东西
  • 对于我后来使用的 nifi 东西,我也删除了,然后也面临同样的问题。我更改了脚本并且它正在工作。我更新了我的答案。

标签: python session apache-nifi


【解决方案1】:

我更改了我的脚本,现在它运行良好。谢谢大家的建议

class TransformCallback(StreamCallback):
    def __init__(self):
        pass

    def process(self, inputStream, outputStream):
            try:
                # Read input FlowFile content
                input_text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
                input = json.loads(input_text)
    
                # Transform content
                resp = input
                resp['total'] = resp['a'] + resp['b']
    
                # Write output content
                output_text = json.dumps(resp)
                outputStream.write(StringUtil.toBytes(output_text))
            except:
                traceback.print_exc(file=sys.stdout)
                raise
flowFile = session.get()
if flowFile != None:
    flowFile = session.write(flowFile, TransformCallback())

    # Finish by transferring the FlowFile to an output relationship
    session.transfer(flowFile, REL_SUCCESS)

我使用此链接https://github.com/BatchIQ/nifi-scripting-samples进行了推荐

【讨论】:

    猜你喜欢
    • 2016-03-09
    • 2011-10-23
    • 1970-01-01
    • 2016-05-11
    • 2011-11-26
    • 2015-04-06
    • 2014-06-29
    • 2013-09-18
    • 1970-01-01
    相关资源
    最近更新 更多