[Question title]: Read RabbitMQ from Beam/Dataflow
[Posted]: 2019-01-04 16:29:53
[Question]:

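I am trying to read from a RabbitMQ queue in a streaming fashion from Beam/Dataflow (so that the pipeline runs indefinitely).

The minimal example code I am trying to run is: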

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.rabbitmq.RabbitMqIO;
import org.apache.beam.sdk.io.rabbitmq.RabbitMqMessage;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;

public class RabbitMqTest {

    public static void main(String[] args) {
        Pipeline pipeline = Pipeline.create();
        final String serverUri = "amqp://guest:guest@localhost:5672";

        pipeline
                .apply("Read RabbitMQ message", RabbitMqIO.read().withUri(serverUri).withQueue("my_queue"))
                .apply(ParDo.of(new DoFn<RabbitMqMessage, String>() {

                    @ProcessElement
                    public void processElement(ProcessContext c) {
                        String message = new String(c.element().getBody());
                        System.out.println(message);
                        c.output(message);
                    }
                }));
        pipeline.run().waitUntilFinish();
    }
}

However, it crashes with:

Exception in thread "main" java.lang.NullPointerException
at org.apache.beam.runners.direct.UnboundedReadEvaluatorFactory$UnboundedReadEvaluator.processElement(UnboundedReadEvaluatorFactory.java:169)
at org.apache.beam.runners.direct.DirectTransformExecutor.processElements(DirectTransformExecutor.java:160)
at org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:124)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

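This happens when I do not pass withMaxReadTime() to RabbitMqIO. If I do pass withMaxReadTime(), it blocks for X seconds, then processes all the messages that arrived during that time, and then exits.

How can I set up a streaming pipeline that reads from RabbitMQ indefinitely?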

[Comments]:

    Tags: rabbitmq google-cloud-dataflow apache-beam


    [Solution 1]:

    I had a similar problem with a Dataflow pipeline. When I tried to run it in Dataflow, I got:

    java.lang.NullPointerException
                org.apache.beam.runners.dataflow.worker.WindmillTimeUtils.harnessToWindmillTimestamp(WindmillTimeUtils.java:58)
                org.apache.beam.runners.dataflow.worker.StreamingModeExecutionContext.flushState(StreamingModeExecutionContext.java:400)
                org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1230)
                org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:143)
                org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:967)
                java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
                java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
                java.lang.Thread.run(Thread.java:745)
    

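    The problem was that RabbitMqIO uses the timestamp from the RabbitMQ message, e.g. for the watermark. It turned out that in my case the messages from RabbitMQ had no timestamp set (it is not set by default in RabbitMQ), so it was null. I worked around this by patching the classes in Apache Beam. First I changed the RabbitMqMessage constructor, which now looks like this: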

    public RabbitMqMessage(String routingKey, QueueingConsumer.Delivery delivery) {
        this.routingKey = routingKey;
        body = delivery.getBody();
        contentType = delivery.getProperties().getContentType();
        contentEncoding = delivery.getProperties().getContentEncoding();
        headers = delivery.getProperties().getHeaders();
        deliveryMode = delivery.getProperties().getDeliveryMode();
        priority = delivery.getProperties().getPriority();
        correlationId = delivery.getProperties().getCorrelationId();
        replyTo = delivery.getProperties().getReplyTo();
        expiration = delivery.getProperties().getExpiration();
        messageId = delivery.getProperties().getMessageId();
        /*
          *** IMPORTANT ***
          The timestamp in the RabbitMQ message properties is sometimes null. `RabbitMqIO` uses that value
          for the watermark, and a null value causes exceptions, so null is replaced here with the current time.
         */
        // timestamp = delivery.getProperties().getTimestamp();
        timestamp = delivery.getProperties().getTimestamp() == null ? new Date() : delivery.getProperties().getTimestamp();
        type = delivery.getProperties().getType();
        userId = delivery.getProperties().getUserId();
        appId = delivery.getProperties().getAppId();
        clusterId = delivery.getProperties().getClusterId();
    }
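
The null fallback used in the constructor above can be tried in isolation. The following is a minimal sketch using only the JDK; the class name `TimestampFallback` and the helper `timestampOrNow` are hypothetical names chosen for illustration and are not part of Apache Beam or the RabbitMQ client.

```java
import java.util.Date;

public class TimestampFallback {

    // Returns the broker-supplied timestamp, or the current time when it is missing.
    // Hypothetical helper: it mirrors the ternary used in the patched constructor.
    static Date timestampOrNow(Date brokerTimestamp) {
        return brokerTimestamp != null ? brokerTimestamp : new Date();
    }

    public static void main(String[] args) {
        // A message whose publisher set the timestamp property keeps that value.
        Date fromBroker = new Date(1546619393000L);
        System.out.println(timestampOrNow(fromBroker).getTime());

        // A message without a timestamp gets the processing time instead of null.
        System.out.println(timestampOrNow(null) != null);
    }
}
```

Note that with this fallback, messages published without a timestamp are stamped with the time they are read rather than the time they were published, which may matter for event-time windowing.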
    

    I also had to change the advance() method in RabbitMqIO so that it does not use the possibly null timestamp property:

        @Override
        public boolean advance() throws IOException {
            try {
                QueueingConsumer.Delivery delivery = consumer.nextDelivery(1000);
                if (delivery == null) {
                    return false;
                }
                if (source.spec.useCorrelationId()) {
                    String correlationId = delivery.getProperties().getCorrelationId();
                    if (correlationId == null) {
                        throw new IOException(
                                "RabbitMqIO.Read uses message correlation ID, but received "
                                        + "message has a null correlation ID");
                    }
                    currentRecordId = correlationId.getBytes(StandardCharsets.UTF_8);
                }
                long deliveryTag = delivery.getEnvelope().getDeliveryTag();
                checkpointMark.sessionIds.add(deliveryTag);
    
                current = new RabbitMqMessage(source.spec.routingKey(), delivery);
                /*
                  *** IMPORTANT ***
                  The timestamp in RabbitMQ messages is sometimes null, and the stream in Dataflow then fails
                  because the watermark is based on that value. `RabbitMqMessage` was changed to fall back to
                  `new Date()` in that case, so the timestamp can now be taken from it.
                 */
                //currentTimestamp = new Instant(delivery.getProperties().getTimestamp());
                currentTimestamp = new Instant(current.getTimestamp());
                if (currentTimestamp.isBefore(checkpointMark.oldestTimestamp)) {
                    checkpointMark.oldestTimestamp = currentTimestamp;
                }
            } catch (Exception e) {
                throw new IOException(e);
            }
            return true;
        }
    

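    After running my pipeline again, I hit the same exception in another place. This time it was because the oldestTimestamp property in RabbitMQCheckpointMark had no default value. I made one more change, and RabbitMQCheckpointMark now looks like this: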

    private static class RabbitMQCheckpointMark
            implements UnboundedSource.CheckpointMark, Serializable {
        transient Channel channel;
        /*
          *** IMPORTANT *** This field must be initialized with some value; otherwise the runner (e.g. Dataflow) fails with a NullPointerException.
          Example error:
            java.lang.NullPointerException
                org.apache.beam.runners.dataflow.worker.WindmillTimeUtils.harnessToWindmillTimestamp(WindmillTimeUtils.java:58)
                org.apache.beam.runners.dataflow.worker.StreamingModeExecutionContext.flushState(StreamingModeExecutionContext.java:400)
                org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1230)
                org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:143)
                org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:967)
                java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
                java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
                java.lang.Thread.run(Thread.java:745)
         */
        Instant oldestTimestamp = new Instant(Long.MIN_VALUE);
        final List<Long> sessionIds = new ArrayList<>();
    
        @Override
        public void finalizeCheckpoint() throws IOException {
            for (Long sessionId : sessionIds) {
                channel.basicAck(sessionId, false);
            }
            channel.txCommit();
            oldestTimestamp = Instant.now();
            sessionIds.clear();
        }
    }
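
Why a default value matters can be shown with a small stand-alone sketch. It assumes a simplified, hypothetical `Mark` class (not Beam's RabbitMQCheckpointMark), uses `java.time.Instant` instead of the Joda-Time `Instant` that Beam uses, and initializes the field to the current time rather than to `Long.MIN_VALUE` as in the patch above. The method `watermarkMillis()` is also hypothetical and only stands in for the runner reading the watermark.

```java
import java.time.Instant;

public class CheckpointDefaultSketch {

    // Hypothetical stand-in for a checkpoint mark; it is not Beam's RabbitMQCheckpointMark.
    static class Mark {
        // Initialized eagerly so a watermark read before the first message never sees null.
        Instant oldestTimestamp = Instant.now();

        // Lowers the oldest timestamp when an earlier message arrives.
        void observe(Instant messageTimestamp) {
            if (messageTimestamp.isBefore(oldestTimestamp)) {
                oldestTimestamp = messageTimestamp;
            }
        }

        // Stands in for the runner reading the watermark; this would throw a
        // NullPointerException if oldestTimestamp had been left uninitialized.
        long watermarkMillis() {
            return oldestTimestamp.toEpochMilli();
        }
    }

    public static void main(String[] args) {
        Mark mark = new Mark();

        // Safe even though no message has been observed yet.
        System.out.println(mark.watermarkMillis() > 0);

        // An older message moves the oldest timestamp back.
        mark.observe(Instant.ofEpochMilli(1000L));
        System.out.println(mark.watermarkMillis());
    }
}
```

The choice of initial value is a design decision: a minimum sentinel like the one in the patch is never lowered by the `isBefore` check, whereas starting from the current time lets earlier message timestamps pull the value back.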
    

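    All of these changes fixed my pipeline, and it now works as expected. I hope you find this useful.

    [Comments]:

      [Solution 2]:

      This was a bug in the IO, and it has been fixed recently.

      [Comments]: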
