【发布时间】:2015-12-15 03:29:45
【问题描述】:
我正在使用 Azure 服务总线队列(或可能需要的主题),并且想知道如何将 Web 作业与队列一起使用。
当消息进入队列时,它表示将在 Web 作业中运行(或从 Web 作业启动)的进程。这个过程可能很快,30 秒,也可能很慢,1 小时等。
我可以为此使用单个 Web 作业并以某种方式说它一次应该运行不超过 10 个这样的进程吗?
【问题讨论】:
标签: azure azureservicebus azure-webjobs
我正在使用 Azure 服务总线队列(或可能需要的主题),并且想知道如何将 Web 作业与队列一起使用。
当消息进入队列时,它表示将在 Web 作业中运行(或从 Web 作业启动)的进程。这个过程可能很快,30 秒,也可能很慢,1 小时等。
我可以为此使用单个 Web 作业并以某种方式说它一次应该运行不超过 10 个这样的进程吗?
【问题讨论】:
标签: azure azureservicebus azure-webjobs
是的,您可以使用 WebJob。我创建了一个带有存储队列的简单 WebJob 来指导如何完成它。下面的工作流程一次只运行十个进程,并将所有其他请求保存在 ConcurrentQueue 的内存中。您必须实现逻辑以将其出列并使用它
public class Functions
{
public delegate void CompletedProcessHandler(object sender, CompletedProcessHandlerArgs args);
static readonly Dictionary<int, CustomProcess> _dictionary =
new Dictionary<int, CustomProcess>();
static readonly ConcurrentQueue<ProcessEntity> _remaining =
new ConcurrentQueue<ProcessEntity>();
// This function will get triggered/executed when a new message is written
// on an Azure Queue called queue.
public static void ProcessQueueMessage([QueueTrigger("testqueue")] ProcessEntity msg,
TextWriter log)
{
if (_dictionary.Count <= 10)
{
var newProcess = new CustomProcess((_dictionary.Last().Key) + 1,
msg.Duration);
}
else
{
_remaining.Enqueue(msg);
}
}
public static void CompletedProcess(object sender, CompletedProcessHandlerArgs args)
{
_dictionary[Int32.Parse(args.ProcessID)].Dispose();
_dictionary.Remove(Int32.Parse(args.ProcessID));
}
}
public class CustomProcess : IDisposable
{
public event Functions.CompletedProcessHandler OnProcessCompleted;
private CancellationTokenSource _token;
private string _id;
private Timer _timer;
public CustomProcess(int i, int duration)
{
_timer = new Timer { Enabled = true, Interval = duration * 1000 };
_timer.Elapsed += Timer_Elapsed;
_id = i.ToString();
_token = new CancellationTokenSource();
Task.Factory.StartNew(() => WriteMessages());
_timer.Start();
OnProcessCompleted += Functions.CompletedProcess;
}
private void Timer_Elapsed(object sender, System.Timers.ElapsedEventArgs e)
{
_token.Cancel();
OnProcessCompleted?.Invoke(this, new CompletedProcessHandlerArgs(_id));
}
private void WriteMessages()
{
while (!_token.Token.IsCancellationRequested)
{
Console.WriteLine("Test Message from process " + _id);
}
}
public void Dispose()
{
_token.Dispose();
_timer.Dispose();
}
}
public class CompletedProcessHandlerArgs : EventArgs
{
public string ProcessID { get; set; }
public CompletedProcessHandlerArgs(string ID)
{
ProcessID = ID;
}
}
public class ProcessEntity
{
public int Duration { get; set; }
}
在 web 作业的 app.config 中你需要提供两个应用设置
<add name="AzureWebJobsDashboard"
connectionString="DefaultEndpointsProtocol=https;AccountName=[AccountName];AccountKey=[AccountKey]" />
<add name="AzureWebJobsStorage"
connectionString="DefaultEndpointsProtocol=https;AccountName=[AccountName];AccountKey=[AccountKey]" />
程序文件是 Visual Studio 模板中的默认文件
public class Program
{
// Please set the following connection strings in app.config for this WebJob to run:
// AzureWebJobsDashboard and AzureWebJobsStorage
static void Main()
{
var host = new JobHost();
// The following code ensures that the WebJob will be running continuously
host.RunAndBlock();
}
}
WebJob 将在消息到来的那一刻保持出队。由于您一次只希望运行 10 个消息,因此您必须将消息排入内存队列并等待运行过程完成,然后再启动一个新消息
正如@Rick 所说,您可以在 web 作业的 settings.job 文件中将 is_Singleton 属性设置为 true
【讨论】:
是的,您可以使用 Azure 服务总线队列或主题触发 Web 作业。 Visual Studio 中的 Service Bus 快速启动项目模板是一个很好的例子,可以帮助您开始。
您尤其想查看 Web Jobs SDK 提供的 ServiceBusTrigger 属性。
至于网络作业的可扩展性,这将根据您的网络应用实例进行扩展。因此,如果您说启用了始终开启的 5 个 Web 应用实例,那么您将拥有 5 个 Web 作业实例。作为对此的补充说明,如果您只需要一个包含 5 个 Web 应用程序实例的环境中的 Web 作业实例,那么您可以在 settings.job 文件中将 is_singleton 属性设置为 true。
【讨论】: