【问题标题】:Azure Webjobs and QueuesAzure Web 作业和队列
【发布时间】:2015-12-15 03:29:45
【问题描述】:

我正在使用 Azure 服务总线队列(或可能需要的主题),并且想知道如何将 Web 作业与队列一起使用。

当消息进入队列时,它表示将在 Web 作业中运行(或从 Web 作业启动)的进程。这个过程可能很快,30 秒,也可能很慢,1 小时等。

我可以为此使用单个 Web 作业并以某种方式说它一次应该运行不超过 10 个这样的进程吗?

【问题讨论】:

    标签: azure azureservicebus azure-webjobs


    【解决方案1】:

    是的,您可以使用 WebJob。我创建了一个带有存储队列的简单 WebJob 来指导如何完成它。下面的工作流程一次只运行十个进程,并将所有其他请求保存在 ConcurrentQueue 的内存中。您必须实现逻辑以将其出列并使用它

    public class Functions
    {
        public delegate void CompletedProcessHandler(object sender, CompletedProcessHandlerArgs args);
    
        static readonly Dictionary<int, CustomProcess> _dictionary =
            new Dictionary<int, CustomProcess>();
        static readonly ConcurrentQueue<ProcessEntity> _remaining =
            new ConcurrentQueue<ProcessEntity>();
    
        // This function will get triggered/executed when a new message is written 
        // on an Azure Queue called queue.
        public static void ProcessQueueMessage([QueueTrigger("testqueue")] ProcessEntity msg,
            TextWriter log)
        {
            if (_dictionary.Count <= 10)
            {
                var newProcess = new CustomProcess((_dictionary.Last().Key) + 1,
                    msg.Duration);
            }
            else
            {
                _remaining.Enqueue(msg);
            }
    
        }
    
        public static void CompletedProcess(object sender, CompletedProcessHandlerArgs args)
        {
            _dictionary[Int32.Parse(args.ProcessID)].Dispose();
            _dictionary.Remove(Int32.Parse(args.ProcessID));
        }
    }
    
    public class CustomProcess : IDisposable
    {
        public event Functions.CompletedProcessHandler OnProcessCompleted;
        private CancellationTokenSource _token;
        private string _id;
        private Timer _timer;
        public CustomProcess(int i, int duration)
        {
            _timer = new Timer { Enabled = true, Interval = duration * 1000 };
            _timer.Elapsed += Timer_Elapsed;
            _id = i.ToString();
            _token = new CancellationTokenSource();
    
            Task.Factory.StartNew(() => WriteMessages());
            _timer.Start();
    
            OnProcessCompleted += Functions.CompletedProcess;
    
        }
    
        private void Timer_Elapsed(object sender, System.Timers.ElapsedEventArgs e)
        {
            _token.Cancel();
            OnProcessCompleted?.Invoke(this, new CompletedProcessHandlerArgs(_id));
        }
    
        private void WriteMessages()
        {
            while (!_token.Token.IsCancellationRequested)
            {
                Console.WriteLine("Test Message from process " + _id);
            }
        }
    
        public void Dispose()
        {
            _token.Dispose();
            _timer.Dispose();
        }
    }
    
    
    public class CompletedProcessHandlerArgs : EventArgs
    {
        public string ProcessID { get; set; }
    
        public CompletedProcessHandlerArgs(string ID)
        {
            ProcessID = ID;
        }
    }
    
    
    public class ProcessEntity
    {
        public int Duration { get; set; }
    }
    

    在 web 作业的 app.config 中你需要提供两个应用设置

    <add name="AzureWebJobsDashboard" 
         connectionString="DefaultEndpointsProtocol=https;AccountName=[AccountName];AccountKey=[AccountKey]" />
    <add name="AzureWebJobsStorage" 
         connectionString="DefaultEndpointsProtocol=https;AccountName=[AccountName];AccountKey=[AccountKey]" />
    

    程序文件是 Visual Studio 模板中的默认文件

    public class Program
    {
        // Please set the following connection strings in app.config for this WebJob to run:
        // AzureWebJobsDashboard and AzureWebJobsStorage
        static void Main()
        {
            var host = new JobHost();
            // The following code ensures that the WebJob will be running continuously
            host.RunAndBlock();
        }
    }
    

    WebJob 将在消息到来的那一刻保持出队。由于您一次只希望运行 10 个消息,因此您必须将消息排入内存队列并等待运行过程完成,然后再启动一个新消息

    正如@Rick 所说,您可以在 web 作业的 settings.job 文件中将 is_Singleton 属性设置为 true

    【讨论】:

    • 所以基本上你是说你将启动一个后台进程来处理最多 10 个项目。任务将运行 x 秒,然后在取消之前将内容写入控制台?谢谢,看起来不错。
    • 是的,就是这个想法。您还需要注意从 ConcurrentQueue 中删除项目。如果 webjob 被标记为单例,这种设计也很好
    • 如果整个事情发生故障或崩溃会怎样?那这不是把 ConcurrentQueue 吹走,数据就丢失了吗?
    • 网络作业在崩溃或停止时会引发一个事件,默认情况下您有 5 秒的时间来处理保存状态。详情请见here。由于您已经有一个 Azure 存储,您可能希望通过将内容序列化为 json 来将并发队列的数据保存在一个表中
    【解决方案2】:

    是的,您可以使用 Azure 服务总线队列或主题触发 Web 作业。 Visual Studio 中的 Service Bus 快速启动项目模板是一个很好的例子,可以帮助您开始。

    您尤其想查看 Web Jobs SDK 提供的 ServiceBusTrigger 属性。

    至于网络作业的可扩展性,这将根据您的网络应用实例进行扩展。因此,如果您说启用了始终开启的 5 个 Web 应用实例,那么您将拥有 5 个 Web 作业实例。作为对此的补充说明,如果您只需要一个包含 5 个 Web 应用程序实例的环境中的 Web 作业实例,那么您可以在 settings.job 文件中将 is_singleton 属性设置为 true。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2015-07-07
      • 1970-01-01
      • 2015-08-06
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2018-07-07
      相关资源
      最近更新 更多