【发布时间】:2015-09-05 04:51:29
【问题描述】:
我有一项服务需要尽快读取来自 Amazon SQS 的消息。我们预计流量会很大,我希望能够以每秒 10K 条消息的速度阅读。不幸的是,我目前的速度约为 10 条消息/秒。显然,我有工作要做。
这是我正在使用的(转换为控制台应用程序以使测试更容易):
private static int _concurrentRequests;
private static int _maxConcurrentRequests;
public static void Main(string[] args) {
_concurrentRequests = 0;
_maxConcurrentRequests = 100;
var timer = new Timer();
timer.Elapsed += new ElapsedEventHandler(OnTimedEvent);
timer.Interval = 10;
timer.Enabled = true;
Console.ReadLine();
timer.Dispose();
}
public static void OnTimedEvent(object s, ElapsedEventArgs e) {
if (_concurrentRequests < _maxConcurrentRequests) {
_concurrentRequests++;
ProcessMessages();
}
}
public static async Task ProcessMessages() {
var manager = new MessageManager();
manager.ProcessMessages(); // this is an async method that reads in the messages from SQS
_concurrentRequests--;
}
我没有得到接近 100 个并发请求,而且它似乎没有每 10 毫秒触发一次 OnTimedEvent。
我不确定Timer 是否是正确的方法。我对这种编码没有太多经验。在这一点上,我愿意尝试任何事情。
更新
多亏了 calebboyd,我离实现目标又近了一点。这是一些非常糟糕的代码:
private static SemaphoreSlim _locker;
public static void Main(string[] args) {
_manager = new MessageManager();
RunBatchProcessingForeverAsync();
}
private static async Task RunBatchProcessingForeverAsync() {
_locker = new SemaphoreSlim(10, 10);
while (true) {
Thread thread = new Thread(new ParameterizedThreadStart(Process));
thread.Start();
}
}
private static async void Process(object args) {
_locker.WaitAsync();
try {
await _manager.ProcessMessages();
}
finally {
_locker.Release();
}
}
通过这个,我能够接近每秒读取相当数量的消息,但问题是我的ProcessMessages 呼叫永远不会完成(或者可能会在很长时间后完成)。我在想我可能需要限制我在任何时候运行的线程数。
关于如何改进此代码以便ProcessMessages 有机会完成的任何建议?
【问题讨论】:
-
定时器有一个限制,最多只能每 15.6 毫秒触发一次。这是 .NET 实现定时器的一个限制。
-
声称异步的代码没有
await——这很奇怪。考虑更新您的代码,使其看起来更真实。 -
这是一个非常重要的问题,可帮助您确定多线程的可能性 - 如果您在单个线程上连续运行请求,您每秒可以处理多少条消息?
-
您的主要目标是尽快汇集亚马逊 SQS 吗?
-
@Irving 所以你的数据库是瓶颈——你是否考虑过缓冲消息,并将它们批量插入数据库?
标签: c# multithreading asynchronous parallel-processing amazon-sqs