【问题标题】:Processing data from Event Hubs with Reactive X streams使用 Reactive X 流处理来自事件中心的数据
【发布时间】:2018-07-09 23:07:59
【问题描述】:

我想使用 Rx .net 扩展来处理来自 Azure 事件中心的事件。

如何根据从 EventProcessorHost 获得的消息创建可观察流?

我没有找到此方案的参考资料,我是否在这里遗漏了一些基本内容?我是在尝试做一些没有意义的事情吗?

【问题讨论】:

    标签: azure reactive-programming reactive azure-eventhub


    【解决方案1】:

    这是否有意义取决于您。为什么需要/想要使用反应式扩展?大多数场景都涉及使用Azure Stream Analytics 近乎实时地转换和查询数据。

    但是使用 EventProcessor 来处理数据是可以做到的,一个非常粗略的草图让你开始:

    public class EventProcessor : IEventProcessor
    {
        private readonly EventStreamProcessor eventStreamProcessor;
    
        public EventProcessor(EventStreamProcessor eventStreamProcessor)
        {
            this.eventStreamProcessor = eventStreamProcessor;
        }
    
        public Task OpenAsync(PartitionContext context)
        {
            return Task.CompletedTask;
        }
    
        public Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> eventDatas)
        {
            foreach(var eventData in eventDatas)
                eventStreamProcessor.Post(eventData);
    
            return Task.CompletedTask;
        }
    
        public Task CloseAsync(PartitionContext context, CloseReason reason)
        {
            return Task.CompletedTask;
        }
    }
    
    public sealed class EventStreamProcessor : IDisposable
    {
        private Subject<EventData> dataStream = new Subject<EventData>();
        private readonly IDisposable subscription;
    
        public EventStreamProcessor()
        {
            subscription = dataStream
                .Synchronize()
                .AsObservable()
                .Subscribe((evenData) => {
                    // Do something
                })
        }
    
        public void Dispose()
        {
            dataStream.OnCompleted();
            subscription.Dispose();
        }
    
        public void Post(EventData eventData)
        {
            dataStream.OnNext(eventData);
        }
    }
    

    需要考虑的几点:

    • 可靠的检查点将变得很困难。在将数据推送到 RX 流后,您随时调用 await context.CheckpointAsync();,但您不知道 RX 管道已经处理了数据。
    • 多个 EventProcessor 可以在任何给定时刻处于活动状态,因此请确保将数据推送到单个 RX 流。使用单例或使用 IEventProcessorFactory 接口的实现将相同的 RX 处理器注入到每个 EventProcessor 中

    我认为这个Q & A 的答案对你来说也很重要。

    【讨论】:

      【解决方案2】:

      有一个事件中心实现作为 Akka.NET 反应流 Azure EventHub adapter 的源。

      【讨论】:

        猜你喜欢
        • 2019-06-06
        • 2016-12-14
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2023-02-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多