Dataflow 管道也有类似的问题。当试图在 Dataflow 中运行它时,我得到了:
java.lang.NullPointerException
org.apache.beam.runners.dataflow.worker.WindmillTimeUtils.harnessToWindmillTimestamp(WindmillTimeUtils.java:58)
org.apache.beam.runners.dataflow.worker.StreamingModeExecutionContext.flushState(StreamingModeExecutionContext.java:400)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1230)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:143)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:967)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)
他们的问题是RabbitMqIO 使用来自 RabbitMq 的消息中的时间戳,例如用于水印。事实证明,在我的情况下,来自 RabbitMq 的消息没有设置时间戳(默认情况下在 RabbitMq 中未设置)并且它为空。我通过为 Apache Beam 中的类准备补丁来解决这个问题。我对RabbitMqMessage 构造函数进行了更改。现在看起来像这样:
public RabbitMqMessage(String routingKey, QueueingConsumer.Delivery delivery) {
this.routingKey = routingKey;
body = delivery.getBody();
contentType = delivery.getProperties().getContentType();
contentEncoding = delivery.getProperties().getContentEncoding();
headers = delivery.getProperties().getHeaders();
deliveryMode = delivery.getProperties().getDeliveryMode();
priority = delivery.getProperties().getPriority();
correlationId = delivery.getProperties().getCorrelationId();
replyTo = delivery.getProperties().getReplyTo();
expiration = delivery.getProperties().getExpiration();
messageId = delivery.getProperties().getMessageId();
/*
*** IMPORTANT ***
Sometimes timestamp in RabbitMq message properties is 'null'. `RabbitMqIO` uses that value as
watermark, when it is `null` it causes exceptions, 'null' has to be replaced with some value in this case current time
*/
// timestamp = delivery.getProperties().getTimestamp();
timestamp = delivery.getProperties().getTimestamp() == null ? new Date() : delivery.getProperties().getTimestamp();
type = delivery.getProperties().getType();
userId = delivery.getProperties().getUserId();
appId = delivery.getProperties().getAppId();
clusterId = delivery.getProperties().getClusterId();
}
我不得不更改RabbitMqIO 中的advance() 方法以不使用可能为空的timestamp 属性:
@Override
public boolean advance() throws IOException {
try {
QueueingConsumer.Delivery delivery = consumer.nextDelivery(1000);
if (delivery == null) {
return false;
}
if (source.spec.useCorrelationId()) {
String correlationId = delivery.getProperties().getCorrelationId();
if (correlationId == null) {
throw new IOException(
"RabbitMqIO.Read uses message correlation ID, but received "
+ "message has a null correlation ID");
}
currentRecordId = correlationId.getBytes(StandardCharsets.UTF_8);
}
long deliveryTag = delivery.getEnvelope().getDeliveryTag();
checkpointMark.sessionIds.add(deliveryTag);
current = new RabbitMqMessage(source.spec.routingKey(), delivery);
/*
*** IMPORTANT ***
Sometimes timestamp in RabbitMq messages is 'null' stream in Dataflow fails because
watermark is based on that value, 'null' has to be replaced with some value. `RabbitMqMessage` was changed
to use `new Date()` in this situation and now timestamp can be taken from it
*/
//currentTimestamp = new Instant(delivery.getProperties().getTimestamp());
currentTimestamp = new Instant(current.getTimestamp());
if (currentTimestamp.isBefore(checkpointMark.oldestTimestamp)) {
checkpointMark.oldestTimestamp = currentTimestamp;
}
} catch (Exception e) {
throw new IOException(e);
}
return true;
}
再次运行我的管道后,我在其他地方再次遇到了这个异常。这次是因为没有在RabbitMQCheckpointMark 中设置oldestTimestamp 属性的默认值。我做了下一个更改,现在RabbitMQCheckpointMark 看起来像这样:
private static class RabbitMQCheckpointMark
implements UnboundedSource.CheckpointMark, Serializable {
transient Channel channel;
/*
*** IMPORTANT *** it should be initialized with some value because without it runner (e.g Dataflow) fails with 'NullPointerException'
Example error:
java.lang.NullPointerException
org.apache.beam.runners.dataflow.worker.WindmillTimeUtils.harnessToWindmillTimestamp(WindmillTimeUtils.java:58)
org.apache.beam.runners.dataflow.worker.StreamingModeExecutionContext.flushState(StreamingModeExecutionContext.java:400)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1230)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:143)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:967)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)
*/
Instant oldestTimestamp = new Instant(Long.MIN_VALUE);
final List<Long> sessionIds = new ArrayList<>();
@Override
public void finalizeCheckpoint() throws IOException {
for (Long sessionId : sessionIds) {
channel.basicAck(sessionId, false);
}
channel.txCommit();
oldestTimestamp = Instant.now();
sessionIds.clear();
}
}
所有这些更改都修复了我的管道,现在它可以按预期工作。我希望你会发现它有用。