【问题标题】:Nifi custom Kafka processor code works for a limited timeNifi 自定义 Kafka 处理器代码在有限的时间内工作
【发布时间】:2021-03-03 09:54:24
【问题描述】:

这是我的自定义 kafka 处理器的代码,它只是从 kafka 主题中消费并产生一些数据

ConsumerRecords<byte[],byte[]> records = consumer.poll(1000);
records.forEach(record -> {
    FlowFile flowFile = session.create();
    if (flowFile == null) {
       return;
    }
    try {
       byte[] outputBytes = (record == null) ? EMPTY_JSON_OBJECT : 
       genericData.toString(record.value()).getBytes(StandardCharsets.UTF_8);
       flowFile = session.write(flowFile, rawOut -> {
           rawOut.write(outputBytes);
           consumer.commitSync();
           });
    } catch (ProcessException pe) {
       getLogger().error("Failed to deserialize {}", new Object[]{flowFile, pe});
       session.transfer(flowFile, REL_FAILURE);
       return;
    }
    flowFile = session.putAttribute(flowFile, "topic", record.topic());
    flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), "application/json");
    getLogger().info("flowFile id " + flowFile.getId());
    session.transfer(flowFile, REL_SUCCESS);
});

这段代码需要一批大约 500 条 kakfa 消息,并生成一些 flowFile 用于输出。我需要的显然是将它放在一个while循环中,一遍又一遍地做同样的事情。但是,当我这样做时,处理器中没有任何东西。虽然仍然,信息日志显示 flowFile ids 增加了,并且似乎产生了实际的 flowFile。我测试的一件事是这只发生在无限的while循环中。当我使用有限的 for 循环时,处理器工作正常。我想知道可能有一些我不知道的关于 nifi flow internal 的问题。

【问题讨论】:

    标签: java apache-kafka apache-nifi


    【解决方案1】:

    问题是我没有手动提交会话。所以它只有在方法返回时才被提交,这在无限循环的情况下从未发生过。人为的解决方案最终变成了这样。

    while(true)
        ConsumerRecords<byte[],byte[]> records = consumer.poll(Duration.ofMillis(1000));
        records.forEach(record -> {
            FlowFile flowFile = session.create();
            if (flowFile == null) {
               return;
            }
            try {
               byte[] outputBytes = (record == null) ? EMPTY_JSON_OBJECT : 
               genericData.toString(record.value()).getBytes(StandardCharsets.UTF_8);
               flowFile = session.write(flowFile, rawOut -> {
                   rawOut.write(outputBytes);
                   consumer.commitSync();
                   });
            } catch (ProcessException pe) {
               getLogger().error("Failed to deserialize {}", new Object[]{flowFile, pe});
               session.transfer(flowFile, REL_FAILURE);
               return;
            }
            flowFile = session.putAttribute(flowFile, "topic", record.topic());
            flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), "application/json");
            getLogger().info("flowFile id " + flowFile.getId());
            session.transfer(flowFile, REL_SUCCESS);
            session.commit();
        });
    }
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多