【发布时间】:2013-11-28 23:21:36
【问题描述】:
我正在开发一个 C# Windows 服务,该服务将从使用 TCP 的馈线接收金融报价。我的项目必须接收和处理大量数据,因为我将跟踪 140 种不同的资产,这些资产用于每秒更新 SQL 数据库。
我在 BackgroundWork 线程中使用循环从套接字汇集数据:
try
{
// Send START command with the assets.
if (!AeSocket.AeSocket.Send(Encoding.ASCII.GetBytes(String.Format("{0}{1}START{1}BC|ATIVO{1}{2}{3}", GlobalData.GlobalData.Id, GlobalData.GlobalData.Tab, AeSocket.AeSocket.GetAssets(),
GlobalData.GlobalData.Ret))).Contains("OK"))
{
throw new Exception("Advise was no accepted.");
}
// Pool the socket and send all received string to the queue for processing in the background.
while (true)
{
// Make sure the connection to the socket is still active.
if (!AeSocket.AeSocket.Client.Connected)
{
throw new Exception("The connection was closed.");
}
// If no data is available in the socket, loop and keep waiting.
if (!AeSocket.AeSocket.Client.Poll(-1, SelectMode.SelectRead))
{
continue;
}
// There are data waiting to be read.
var data = new Byte[AeSocket.AeSocket.ReadBufferSize];
var bytes = AeSocket.AeSocket.Client.Receive(data, 0);
AeSocket.AeSocket.Response = Encoding.Default.GetString(data, 0, bytes);
// Push into the queue for further processing in a different thread.
GlobalData.GlobalData.RxQueue.Add(AeSocket.AeSocket.Response);
}
}
catch
{
backgroundWorkerMain.CancelAsync();
}
finally
{
AeSocket.AeSocket.Client.Close();
AeSocket.AeSocket.Client.Dispose();
}
接收到的数据正在一个单独的线程中处理,以避免阻塞套接字接收活动,因为接收到的数据量很大。我正在使用 BlockingCollection (RxQueue)。
正在观察这个集合,如下代码sn-p所示:
// Subscribe to the queue for string processing in another thread.
// This is a blocking observable queue, so it is run in this background worker thread.
GlobalData.GlobalData.Disposable = GlobalData.GlobalData.RxQueue
.GetConsumingEnumerable()
.ToObservable()
.Subscribe(c =>
{
try
{
ProcessCommand(c);
}
catch
{
// Any error will stop the processing.
backgroundWorkerMain.CancelAsync();
}
});
然后将数据添加到 ConcurrentDictionary,由一秒计时器异步读取并保存到 SQL 数据库:
// Add or update the quotation record in the dictionary.
GlobalData.GlobalData.QuotDict.AddOrUpdate(dataArray[0], quot, (k, v) => quot);
一秒系统计时器在另一个 BackgroundWorker 线程中处理,它保存从 ConcurrentDictionary 读取的报价数据。
// Update the gridViewAssetQuotations.
var list = new BindingList<MyAssetRecord>();
foreach (var kv in GlobalData.GlobalData.QuotDict)
{
// Add the data in a database table...
}
这是解决这种情况的好方法吗?
使用 BlockingCollection 作为异步队列和 ConcurrentDictionary 来允许从另一个线程异步读取是一种好方法吗?
我过去从套接字中汇集数据的方式如何:
// If no data is available in the socket, loop and keep waiting.
if (!AeSocket.AeSocket.Client.Poll(-1, SelectMode.SelectRead))
{
continue;
}
有没有更好的方法?
另外,我必须每 4 秒向 TCP 服务器发送一次 KeepAlive 命令。我可以完全异步地做到这一点,忽略上面的池循环,使用另一个系统计时器,还是必须与池操作同步?服务器只允许连接一个端口。
提前感谢您的任何建议。
爱德华多·金塔纳
【问题讨论】:
标签: c# multithreading sockets concurrency tcp