【发布时间】:2018-07-09 23:07:59
【问题描述】:
我想使用 Rx .net 扩展来处理来自 Azure 事件中心的事件。
如何根据从 EventProcessorHost 获得的消息创建可观察流?
我没有找到此方案的参考资料,我是否在这里遗漏了一些基本内容?我是在尝试做一些没有意义的事情吗?
【问题讨论】:
标签: azure reactive-programming reactive azure-eventhub
我想使用 Rx .net 扩展来处理来自 Azure 事件中心的事件。
如何根据从 EventProcessorHost 获得的消息创建可观察流?
我没有找到此方案的参考资料,我是否在这里遗漏了一些基本内容?我是在尝试做一些没有意义的事情吗?
【问题讨论】:
标签: azure reactive-programming reactive azure-eventhub
这是否有意义取决于您。为什么需要/想要使用反应式扩展?大多数场景都涉及使用Azure Stream Analytics 近乎实时地转换和查询数据。
但是使用 EventProcessor 来处理数据是可以做到的,一个非常粗略的草图让你开始:
public class EventProcessor : IEventProcessor
{
private readonly EventStreamProcessor eventStreamProcessor;
public EventProcessor(EventStreamProcessor eventStreamProcessor)
{
this.eventStreamProcessor = eventStreamProcessor;
}
public Task OpenAsync(PartitionContext context)
{
return Task.CompletedTask;
}
public Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> eventDatas)
{
foreach(var eventData in eventDatas)
eventStreamProcessor.Post(eventData);
return Task.CompletedTask;
}
public Task CloseAsync(PartitionContext context, CloseReason reason)
{
return Task.CompletedTask;
}
}
public sealed class EventStreamProcessor : IDisposable
{
private Subject<EventData> dataStream = new Subject<EventData>();
private readonly IDisposable subscription;
public EventStreamProcessor()
{
subscription = dataStream
.Synchronize()
.AsObservable()
.Subscribe((evenData) => {
// Do something
})
}
public void Dispose()
{
dataStream.OnCompleted();
subscription.Dispose();
}
public void Post(EventData eventData)
{
dataStream.OnNext(eventData);
}
}
需要考虑的几点:
await context.CheckpointAsync();,但您不知道 RX 管道已经处理了数据。IEventProcessorFactory 接口的实现将相同的 RX 处理器注入到每个 EventProcessor 中我认为这个Q & A 的答案对你来说也很重要。
【讨论】:
有一个事件中心实现作为 Akka.NET 反应流 Azure EventHub adapter 的源。
【讨论】: