【问题标题】:NiFI "unable to find flowfile content"NiFI“无法找到流文件内容”
【发布时间】:2018-12-06 16:07:39
【问题描述】:

我正在使用 nifi 1.6 并在尝试修改传入 flowFile 的克隆时收到以下错误:

[1]"无法找到 FlowFile 的内容:... MissingFlowFileException ... 由 ContentNotFoundException 引起:找不到 StandardClaim 的内容 ... 由 java.io.EOFException 引起的:null"

[2]“FlowFileHandlingException: StandardFlowFileRecord...在此会话中未知”

第一个错误发生在尝试访问流文件的内容时,第二个错误发生在从会话中删除流文件时(在第一个异常的 catch 块内)。已知同一个处理器在 nifi 0.7 下可以正常运行。

基本流程是:

  1. 克隆传入的流文件
  2. 写入克隆
  3. 再次写入克隆(一些额外的格式)
  4. 重复 1-3

错误发生在第二次迭代步骤 3。

有趣的一点是,如果在执行克隆后立即执行克隆的 session.read,一切正常。读取似乎重置了一些指针。

我已经为此处理器创建了单元测试,但它们在任何一种情况下都不会失败。

以下是演示该问题的简化代码,由实际使用的版本精简而来。(开发系统没有联网,所以我只能手动抄写代码。请原谅任何拼写错误——它应该与原文很接近。这也是没有提供完整堆栈跟踪的原因。)执行这项工作的处理器有一个属性,用来决定是否在克隆后立即读取,因此两种情况都可以很容易地运行。要搭建流程,只需要一个 GetFile 处理器为 SampleCloningProcessor 提供输入,再为其输出接一个终止节点即可。另外还附有一个示例输入文件。代码的核心在于 onTrigger 和 manipulate 方法。在这个简化版本中,manipulate 实际上除了将输入复制到输出之外什么都不做。

任何关于为什么会发生这种情况的见解和更正建议将不胜感激 - 谢谢。

SampleCloningProcessor.java

processor sample.package.cloning

import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.Reader;
import java.util.Arrays;
import java.util.Hashset;
import java.util.List;
import java.util.Scanner;
import java.util.Set;

import org.apache.commons.compress.utils.IOUtils;

import org.apache.nifi.annotation.documentaion.CapabilityDescription;
import org.apache.nifi.annotation.documentaion.Tags;
import org.apache.nifi.componets.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessorContext;
import org.apache.nifi.processor.ProcessorSession;
import org.apache.nifi.processor.ProcessorInitioalizationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCalback;
import org.apache.nifi.processor.io.OutputStreamCalback;
import org.apache.nifi.processor.io.StreamCalback;
import org.apache.nifi.processor.util.StandardValidators;

import com.google.gson.Gson;

@Tags({"example", "clone"})
@CapabilityDescription("Demonsrates cloning of flowfile failure.")
public class SampleCloningProcessor extend AbstractProcessor {

    /* Determines if an immediate read is performed after cloning of inoming flowfile. */
    public static final PropertyDescriptor IMMEDIATE_READ = new PropertyDescriptor.Builder()
        .name("immediateRead")
        .description("Determines if processor runs successfully. If a read is done immediatly "
            + "after the clone of the incoming flowFile, then the processor should run successfully.")
        .required(true)
        .allowableValues("true", "false")
        .defaultValue("true")
        .addValidator(StandardValidators.BOLLEAN_VALIDATOR)
        .build();

    public static final Relationship SUCCESS = new Relationship.Builder().name("success").
        description("No unexpected errors.").build();

    public static final Relationship FAILURE = new Relationship.Builder().name("failure").
        description("Errors were thrown.").build();

    private Set<Relationship> relationships;
    private List<PropertyDescriptors> properties;

     @Override
    public void init(final ProcessorInitializationContext contex) {
        relationships = new HashSet<>(Arrays.asList(SUCCESS, FAILURE));
        properties = new Arrays.asList(IMMEDIATE_READ);
    }

    @Override
    public Set<Relationship> getRelationships() {
        return this.relationships;
    }

    @Override
    public List<PropertyDescriptor> getSuppprtedPropertyDescriptors() {
        return this.properties;
    }

   @Override
   public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
       FlowFile incomingFlowFile = session.get();

       if (incomingFlowFile == null) {
           return;
       }

       try {
           final InfileReader inFileReader = new InfileReader();
           session.read(incomingFlowFile, inFileReader);
           Product product = infileReader.getProduct();
           boolean transfer = false;

           getLogger().info("\tSession   :\n" + session);
           getLogger().info("\toriginal  :\n" + incomingFlowFile);

           for(int i = 0; i < 2; i++) {
               transfer = manipulate(context, session, inclmingFlowFile, product);
           }
       } catch (Exception e) {
           getLogger().error(e.getMessage(), e);
           session.rollback(true);
       }
   }

    private boolean manipuate(final ProcessContext context, final ProcessSession session
        final FlowFile incomingFlowFile, final Product product) {

        boolean transfer = false;
        FlowFile outgoingFlowFile = null;
        boolean immediateRead = context.getProperty(IMMEDIATE_READ).asBoolean();
        try {
            //Clone incoming flowFile
            outgoinFlowFile = session.clone(incomingFlowFile);
            getLogger().info("\tclone outgoing :\n" + outgoingFlowFile);
            if(immediateRead) {
                readFlowFile(session, outgoingFlowFile);
            }

            //First write into clone
            StageOneWrite stage1Write = new StaeOneWrite(product);
            outgoingFlowFile = session.write(outgoingFlowFile, stage1Write);
            getLogger().info("\twrite outgoing :\n" + outgoingFlowFile);

            // Format the cloned file with another write
            outgoingFlowFile = formatFlowFile(outgoingFlowFile, session)
            getLogger().info("\format outgoing :\n" + outgoingFlowFile);
            session.transfer(outgoingFlowFile, SUCCESS);
            transfer != true;
        } catch(Exception e)
           getLogger().error(e.getMessage(), e);
           if(outgoingFlowFile ! = null) {
               session.remove(outgoingFlowFile);
           }
       }
       return transfer;
   }

    private void readFlowFile(fainl ProcessSession session, fianl Flowfile flowFile) {
        session.read(flowFile, new InputStreamCallback() {
            @Override
            public void process(Final InputStream in) throws IOException {
                try (Scanner scanner = new Scanner(in)) {
                    scanner.useDelimiter("\\A").next();
                }
            }
        });
    }

    private FlowFile formatFlowFile(fainl ProcessSession session, FlowFile flowfile) {
        OutputFormatWrite formatWrite = new OutputFormatWriter();
        flowfile = session.write(flowFile, formatWriter);
        return flowFile;
    }

    private static class OutputFormatWriter implement StreamCallback {
        @Override
        public void process(final InputStream in, final OutputStream out) throws IOException {
            try {
                IOUtils.copy(in. out);
                out.flush();
            } finally {
                IOUtils.closeQuietly(in);
                IOUtils.closeQuietly(out);
            }
        }
    }

    private static class StageOneWriter implements OutputStreamCallback {

        private Product product = null;

        public StageOneWriter(Produt product) {
            this.product = product;
        }

        @Override
        public void process(final OutputStream out) throws IOException {
            final Gson gson = new Gson();
            final String json = gson.toJson(product);
            out.write(json.getBytes());
        }
    }

     private static class InfileReader implements InputStreamCallback {

        private Product product = null;

        public StageOneWriter(Produt product) {
            this.product = product;
        }

        @Override
        public void process(final InputStream out) throws IOException {
            product = null;
            final Gson gson = new Gson();
            Reader inReader = new InputStreamReader(in, "UTF-8");
            product = gson.fromJson(inreader, Product.calss);
        }

        public Product getProduct() {
            return product;
        }
    }

SampleCloningProcessorTest.java

  package sample.processors.cloning;

  import org.apache.nifi.util.TestRunner;
  import org.apache.nifi.util.TestRunners;
  import org.junit.Before;
  import org.junit.Test;

  public class SampleCloningProcessorTest {

      final satatic String flowFileContent = "{"
          + "\"cost\": \"cost 1\","
          + "\"description\": \"description","
          + "\"markup\": 1.2"
          + "\"name\":\"name 1\","
          + "\"supplier\":\"supplier 1\","
          + "}";

      private TestRunner testRunner;

      @Before
      public void init() {
          testRunner = TestRunner.newTestRunner(SampleCloningProcessor.class);
          testRunner.enqueue(flowFileContent);
      }

      @Test
      public void testProcessorImmediateRead() {
          testRunner.setProperty(SampleCloningProcessor.IMMEDIATE_READ, "true");
          testRunner.run();
          testRinner.assertTransferCount("success", 2);
      }


      @Test
      public void testProcessorImmediateRead_false() {
          testRunner.setProperty(SampleCloningProcessor.IMMEDIATE_READ, "false");
          testRunner.run();
          testRinner.assertTransferCount("success", 2);
      }
  }

Product.java

package sample.processors.cloning;

 /**
  * Plain data holder for one product record; its field names match the keys of the
  * sample JSON input (name, description, supplier, cost, markup).
  */
 public class Product {

  private String name;
  private String description;
  private String supplier;
  private String cost;
  private float markup;

  /** @return the product name, or {@code null} if it was never set */
  public String getName() {
      return name;
  }

  /** @param name the product name to store */
  public void setName(final String name) {
      this.name = name;
  }

  /** @return the product description, or {@code null} if it was never set */
  public String getDescription() {
      return description;
  }

  /** @param description the product description to store */
  public void setDescription(final String description) {
      this.description = description;
  }

  /**
   * Misspelt setter kept so that existing callers keep compiling.
   *
   * @param description the product description to store
   * @deprecated use {@link #setDescription(String)} instead
   */
  @Deprecated
  public void setDescriptione(final String description) {
      setDescription(description);
  }

  /** @return the supplier, or {@code null} if it was never set */
  public String getSupplier() {
      return supplier;
  }

  /** @param supplier the supplier to store */
  public void setSupplier(final String supplier) {
      this.supplier = supplier;
  }

  /** @return the cost as text, or {@code null} if it was never set */
  public String getCost() {
      return cost;
  }

  /** @param cost the cost, as text, to store */
  public void setCost(final String cost) {
      this.cost = cost;
  }

  /** @return the markup; {@code 0.0f} if it was never set */
  public float getMarkup() {
      return markup;
  }

  /**
   * Stores the markup. The parameter was previously named {@code name}, which turned the
   * assignment into a self-assignment of the field and silently dropped the argument.
   *
   * @param markup the markup to store
   */
  public void setMarkup(final float markup) {
      this.markup = markup;
  }
 }

product.json一个示例输入文件。

  {
      "cost" : "cost 1",
      "description" : "description 1",
      "markup" : 1.2,
      "name" : "name 1",
      "supplier" : "supplier 1"
  }

【问题讨论】:

  • 为什么不关闭流?
  • 对于 OutputFormatWriter,IOUtils.closeQuietly 会关闭这些流。StageOneWriter 和 InfileReader 分别实现了 OutputStreamCallback 和 InputStreamCallback,它们的流应该会被自动关闭。
  • 你不觉得在一种情况下关闭流、在另一种情况下不关闭很奇怪吗?我建议你先对所有流都执行 flush 和 close。
  • 同意 - 在这种情况下,我应该全部刷新并关闭。但是,添加刷新和关闭并没有什么不同。谢谢。

标签: apache-nifi


【解决方案1】:

已作为 NiFi 的 bug 上报,并在 https://issues.apache.org/jira/browse/NIFI-5879 中得到处理。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多