【发布时间】:2018-06-20 17:04:25
【问题描述】:
在EventProcessorHost example 之后,我们在 onEvents() 中实现了我们的自定义逻辑。有些数据没有被处理,我怀疑这是因为 Java 客户端抛出的警告。
在日志中,我们看到 StorageException(用于更新租约或检查点的 Blob 存储超时)、LeaseLostException(可能是由于之前的异常)和 EventHubException(当事件中心移动或短时间脱机时)。
基本上我的问题是:这些异常如何影响事件的处理以及我们如何确保没有事件被跳过(例如,通过重试的异常处理和作为最后手段完全关闭)?
我通读了docs 并搜索了其他无法找到满意答案的问题(this 和this 提供了一些见解)。
我们的代码:
public class EventProcessor implements IEventProcessor {
...
@Override
public void onEvents(PartitionContext context, Iterable<EventData> events) throws Exception {
for (EventData event : events) {
try {
String message = new String(event.getBytes(), StandardCharsets.UTF_8);
mystuff.process(message);
this.checkpointBatchingCount++;
if ((checkpointBatchingCount % 50) == 0) {
context.checkpoint(data).get();
}
} catch (Exception e) {
LOG.warn("Processing event failed: {}", e.getMessage())
}
}
}
...
}
【问题讨论】:
-
我一直在寻找您的确切问题,想知道您是否最终找到了解决方案。我查看了作为 eventthubs 包 (com.microsoft.azure.eventhubs.*) 一部分的 PartitionReceiveHandler,它包含一个 onReceive,我不确定它是否能解决手头的问题。似乎 IEventProcessor 不能用作侦听器;在调用 registerEventProcessor 时,仅在所需的 onEvents() 方法中获取到该时间点的事件;然后它挂起,没有错误或退出,并且要从所需的 EventHub 获取更多消息,我们必须重新
-
老实说,我对 Azure Event Hub Java 客户端非常失望,我们遇到了一个又一个问题(在他们的 Github 存储库中查看我的问题)。我很高兴我当时工作的公司完全脱离了 Azure Event Hub,并在稍后阶段迁移到另一个流媒体平台。
标签: java azure azure-eventhub