【问题标题】:Service Fabric Actor subscription to Azure Service Bus Topic(Service Fabric Actor 订阅 Azure 服务总线主题)
【发布时间】:2018-08-27 03:01:47
【问题描述】:

我正在考虑构建一个系统,该系统要求 Actor 使用特定于 Actor 实例的过滤器创建对 Azure 服务总线主题的订阅。我的问题是,如果某个 Actor(已订阅主题)在 Service Fabric 中被停用,它是否会被 Azure 服务总线发送的新消息(重新)激活?

谢谢

【问题讨论】:

    标签: azure-service-fabric actor azureservicebus azure-servicebus-topics


    【解决方案1】:

    您的 Actor 不会因收到消息而被激活。它仅通过远程调用和提醒激活。所以这种方法行不通。

    您可以做的是在Service 中接收消息,并将它们转发到Actor 实例。如果需要,调用 Actor 会即时创建实例。

    【讨论】:

      【解决方案2】:

      根据 Actor 的生命周期(Actor's lifecycle),Actor 必须先被激活。来自主题的 Azure 服务总线消息不会激活 Actor。相反,您需要一个监督进程(supervisor)来执行此操作。消息可以包含一个属性来表示所需的 Actor ID。这样还可以只使用单个主题并横向扩展监督进程,从而简化您的 Azure 服务总线拓扑。

      【讨论】:

        【解决方案3】:

        这可以通过提醒轻松实现。 由于需要先调用actor,所以可以这样做。

        create 方法将设置连接字符串、主题名称、订阅名称并在需要时创建它们。提醒将检查订阅客户端是否为空,如果为空则创建它。提醒在失败后仍会继续触发,这样您就能够处理失败,并在崩溃后重新启动订阅。

        https://github.com/Huachao/azure-content/blob/master/articles/service-fabric/service-fabric-reliable-actors-timers-reminders.md

        /// <summary>
        /// Stores the bus options in actor state and makes sure the topic, the subscription
        /// and the named SQL-filter rule exist on the Service Bus namespace.
        /// </summary>
        /// <param name="options">Connection string plus topic, subscription and rule settings.</param>
        /// <param name="cancellationToken">Token passed to the state manager and to every management call.</param>
        /// <returns>
        /// <c>true</c> when every entity exists after the call; <c>false</c> when no connection
        /// string was supplied or when a management operation failed (the failure is logged).
        /// </returns>
        public async Task<bool> CreateAsync(BusOptions options, CancellationToken cancellationToken)
            {
                if (options?.ConnectionString == null)
                {
                    return false;
                }

                // Options are persisted before provisioning so later calls can read them from state.
                await StateManager.AddOrUpdateStateAsync("Options", options, (k, v) => v != options ? options : v, cancellationToken);

                var client = new ManagementClient(options.ConnectionString);
                try
                {
                    if (!await client.TopicExistsAsync(options.TopicName, cancellationToken))
                    {
                        await client.CreateTopicAsync(options.TopicName, cancellationToken);
                    }

                    if (!await client.SubscriptionExistsAsync(options.TopicName, options.SubscriptionName, cancellationToken))
                    {
                        // NOTE(review): a subscription created without an explicit rule may receive a
                        // match-all default rule, which would make the SQL filter below ineffective — confirm.
                        await client.CreateSubscriptionAsync(options.TopicName, options.SubscriptionName, cancellationToken);
                    }

                    var rules = await client.GetRulesAsync(options.TopicName, options.SubscriptionName, cancellationToken: cancellationToken);
                    if (!rules.Any(x => x.Name == options.RuleName))
                    {
                        var filter = new SqlFilter(options.RuleFilterSqlValue);
                        await client.CreateRuleAsync(
                            options.TopicName,
                            options.SubscriptionName,
                            new RuleDescription(options.RuleName, filter),
                            cancellationToken);
                    }

                    return true;
                }
                catch (Exception ex)
                {
                    // Best effort: log and report failure instead of throwing to the caller.
                    ActorEventSource.Current.ActorMessage(this, ex.Message);
                    return false;
                }
                finally
                {
                    // The management client is created per call, so it must be released per call.
                    await client.CloseAsync();
                }
            }
            /// <summary>
            /// Removes the named rule and then the subscription described by <paramref name="options"/>.
            /// Failures are logged and not rethrown.
            /// </summary>
            /// <param name="options">Connection string plus topic, subscription and rule names.</param>
            /// <param name="cancellationToken">Token passed to both management calls.</param>
            public async Task DeleteAsync(BusOptions options, CancellationToken cancellationToken)
            {
                // Same guard as CreateAsync: without a connection string there is nothing to talk to.
                if (options?.ConnectionString == null)
                {
                    ActorEventSource.Current.ActorMessage(this, "DeleteAsync skipped. No connection string set.");
                    return;
                }

                var client = new ManagementClient(options.ConnectionString);
                try
                {
                    await client.DeleteRuleAsync(options.TopicName, options.SubscriptionName, options.RuleName, cancellationToken);
                    await client.DeleteSubscriptionAsync(options.TopicName, options.SubscriptionName, cancellationToken);
                }
                catch (Exception ex)
                {
                    ActorEventSource.Current.ActorMessage(this, ex.Message);
                }
                finally
                {
                    // The management client is created per call, so it must be released per call.
                    await client.CloseAsync();
                }
            }
            // Service Bus subscription client. ReceiveReminderAsync assigns it while it is still null;
            // the message-handler methods use it to register the handler and to complete messages.
            private ISubscriptionClient subscriptionClient;       
            /// <summary>
            /// Publishes <paramref name="message"/> to the topic named in the stored "Options" state.
            /// </summary>
            /// <param name="message">Body, label and optional user properties to send.</param>
            /// <param name="cancellationToken">Token passed to the state manager calls.</param>
            /// <returns>
            /// <c>true</c> once the message was sent; <c>false</c> when no options have been stored yet
            /// (CreateAsync has not been executed). Send failures propagate as exceptions.
            /// </returns>
            public async Task<bool> SendAsync(SendMessage message, CancellationToken cancellationToken)
            {
                var options = await StateManager.TryGetStateAsync<BusOptions>("Options", cancellationToken);
                if (!options.HasValue)
                {
                    ActorEventSource.Current.ActorMessage(this, "First execute CreateAsync. No options set.");
                    return false;
                }

                var client = new TopicClient(options.Value.ConnectionString, options.Value.TopicName);
                try
                {
                    var msg = new Message(message.Body);
                    if (message.UserProperties != null)
                    {
                        foreach (var item in message.UserProperties)
                        {
                            msg.UserProperties.Add(item);
                        }
                    }
                    msg.Label = message.Label;

                    await client.SendAsync(msg);
                }
                finally
                {
                    // A client is created for every call; close it so its connection is released
                    // whether or not the send succeeded.
                    await client.CloseAsync();
                }

                // NOTE(review): the update function keeps the larger of 1 and the stored value, so this
                // state never counts messages — confirm whether an increment was intended.
                await StateManager.AddOrUpdateStateAsync("Messages_Send", 1, (key, value) => 1 > value ? 1 : value, cancellationToken);

                return true;
            }
            /// <summary>
            /// Registers <c>ProcessMessagesAsync</c> on the subscription client, with
            /// <c>ExceptionReceivedHandler</c> as the error callback, one concurrent call
            /// and auto-complete turned off.
            /// </summary>
            void RegisterOnMessageHandlerAndReceiveMessages()
            {
                var handlerOptions = new MessageHandlerOptions(ExceptionReceivedHandler);
                handlerOptions.MaxConcurrentCalls = 1;
                // Auto-complete is off; ProcessMessagesAsync completes each message explicitly.
                handlerOptions.AutoComplete = false;

                subscriptionClient.RegisterMessageHandler(ProcessMessagesAsync, handlerOptions);
            }
            /// <summary>
            /// Message callback: logs the message label, then completes the message on the
            /// subscription client using its lock token.
            /// </summary>
            /// <param name="message">The received Service Bus message.</param>
            /// <param name="cancellationToken">Supplied by the message pump; not used here.</param>
            async Task ProcessMessagesAsync(Message message, CancellationToken cancellationToken)
            {
                ActorEventSource.Current.ActorMessage(this, message.Label);

                var lockToken = message.SystemProperties.LockToken;
                await subscriptionClient.CompleteAsync(lockToken);
            }
            /// <summary>
            /// Error callback for the message pump: logs the endpoint, entity path, failing action
            /// and exception message, then returns a completed task.
            /// </summary>
            /// <param name="exceptionReceivedEventArgs">The exception and the context it was raised in.</param>
            /// <returns>A completed task.</returns>
            Task ExceptionReceivedHandler(ExceptionReceivedEventArgs exceptionReceivedEventArgs)
            {
                var context = exceptionReceivedEventArgs.ExceptionReceivedContext;

                // The "Executing Action" placeholder takes context.Action; passing the context object
                // itself would log its type name instead of the action.
                ActorEventSource.Current.ActorMessage(this,
                    string.Format("Exception context for troubleshooting: - Endpoint: {0}- Entity Path: {1}- Executing Action: {2} - MEssage: {3}",
                    context.Endpoint, context.EntityPath, context.Action, exceptionReceivedEventArgs.Exception.Message));

                return Task.CompletedTask;
            }
            /// <summary>
            /// Activation hook: logs the activation and registers the "Recieve_Message" reminder
            /// with a one-second due time and a one-second period.
            /// </summary>
            protected override async Task OnActivateAsync()
            {
                ActorEventSource.Current.ActorMessage(this, $"Actor '{Id.GetStringId()}' activated.");

                var dueTime = TimeSpan.FromSeconds(1); // delay before the reminder first fires
                var period = TimeSpan.FromSeconds(1);  // interval between firings after that

                await this.RegisterReminderAsync("Recieve_Message", null, dueTime, period);
            }
            /// <summary>
            /// Reminder callback. For the "Recieve_Message" reminder, creates the subscription client
            /// from the stored "Options" state if it does not exist yet and registers the message handler.
            /// Other reminder names are ignored.
            /// </summary>
            /// <param name="reminderName">Name of the reminder that fired.</param>
            /// <param name="state">Reminder state; not used.</param>
            /// <param name="dueTime">Reminder due time; not used.</param>
            /// <param name="period">Reminder period; not used.</param>
            public async Task ReceiveReminderAsync(string reminderName, byte[] state, TimeSpan dueTime, TimeSpan period)
            {
                if (!reminderName.Equals("Recieve_Message"))
                {
                    return;
                }

                // A client already exists, so this tick has nothing to do.
                if (subscriptionClient != null)
                {
                    return;
                }

                var storedOptions = await StateManager.TryGetStateAsync<BusOptions>("Options");
                if (!storedOptions.HasValue)
                {
                    ActorEventSource.Current.ActorMessage(this, "First execute CreateAsync. No options set.");
                    return;
                }

                var busOptions = storedOptions.Value;

                // The builder is constructed as in the original but its result is not used afterwards.
                var connectionBuilder = new ServiceBusConnectionStringBuilder(busOptions.ConnectionString);

                subscriptionClient = new SubscriptionClient(busOptions.ConnectionString, busOptions.TopicName, busOptions.SubscriptionName);

                RegisterOnMessageHandlerAndReceiveMessages();
            }        
        

        【讨论】:

          猜你喜欢
          • 2016-02-10
          • 2016-12-29
          • 1970-01-01
          • 2017-10-23
          • 2018-07-18
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 2015-11-03
          相关资源
          最近更新 更多