【问题标题】:Best way to receive high volume tcp socket financial data from a feeder in c#?从 C# 中的馈线接收大量 tcp 套接字财务数据的最佳方法?
【发布时间】:2013-11-28 23:21:36
【问题描述】:

我正在开发一个 C# Windows 服务,该服务将从使用 TCP 的馈线接收金融报价。我的项目必须接收和处理大量数据,因为我将跟踪 140 种不同的资产,这些资产用于每秒更新 SQL 数据库。

我在 BackgroundWork 线程中使用循环从套接字汇集数据:

  try
  {
    // Send START command with the assets.
    if (!AeSocket.AeSocket.Send(Encoding.ASCII.GetBytes(String.Format("{0}{1}START{1}BC|ATIVO{1}{2}{3}", GlobalData.GlobalData.Id, GlobalData.GlobalData.Tab, AeSocket.AeSocket.GetAssets(),
                                                                      GlobalData.GlobalData.Ret))).Contains("OK"))
    {
      throw new Exception("Advise was no accepted.");
    }

    // Pool the socket and send all received string to the queue for processing in the background.
    while (true)
    {
      // Make sure the connection to the socket is still active.
      if (!AeSocket.AeSocket.Client.Connected)
      {
        throw new Exception("The connection was closed.");
      }

      // If no data is available in the socket, loop and keep waiting.
      if (!AeSocket.AeSocket.Client.Poll(-1, SelectMode.SelectRead))
      {
        continue;
      }

      // There are data waiting to be read.
      var data = new Byte[AeSocket.AeSocket.ReadBufferSize];
      var bytes = AeSocket.AeSocket.Client.Receive(data, 0);
      AeSocket.AeSocket.Response = Encoding.Default.GetString(data, 0, bytes);

      // Push into the queue for further processing in a different thread.
      GlobalData.GlobalData.RxQueue.Add(AeSocket.AeSocket.Response);
    }
  }
  catch
  {
    backgroundWorkerMain.CancelAsync();
  }
  finally
  {
    AeSocket.AeSocket.Client.Close();
    AeSocket.AeSocket.Client.Dispose();
  }

接收到的数据正在一个单独的线程中处理,以避免阻塞套接字接收活动,因为接收到的数据量很大。我正在使用 BlockingCollection (RxQueue)。

正在观察这个集合,如下代码sn-p所示:

  // Subscribe to the queue for string processing in another thread.
  // This is a blocking observable queue, so it is run in this background worker thread.
  GlobalData.GlobalData.Disposable = GlobalData.GlobalData.RxQueue
    .GetConsumingEnumerable()
    .ToObservable()
    .Subscribe(c =>
    {
      try
      {
        ProcessCommand(c);
      }
      catch
      {
        // Any error will stop the processing.
        backgroundWorkerMain.CancelAsync();
      }
    });

然后将数据添加到 ConcurrentDictionary,由一秒计时器异步读取并保存到 SQL 数据库:

      // Add or update the quotation record in the dictionary.
      GlobalData.GlobalData.QuotDict.AddOrUpdate(dataArray[0], quot, (k, v) => quot);

一秒系统计时器在另一个 BackgroundWorker 线程中处理,它保存从 ConcurrentDictionary 读取的报价数据。

  // Update the gridViewAssetQuotations.
  var list = new BindingList<MyAssetRecord>();

  foreach (var kv in GlobalData.GlobalData.QuotDict)
  {
    // Add the data in a database table...
  }

这是解决这种情况的好方法吗?

使用 BlockingCollection 作为异步队列和 ConcurrentDictionary 来允许从另一个线程异步读取是一种好方法吗?

我过去从套接字中汇集数据的方式如何:

      // If no data is available in the socket, loop and keep waiting.
      if (!AeSocket.AeSocket.Client.Poll(-1, SelectMode.SelectRead))
      {
        continue;
      }

有没有更好的方法?

另外,我必须每 4 秒向 TCP 服务器发送一次 KeepAlive 命令。我可以完全异步地做到这一点,忽略上面的池循环,使用另一个系统计时器,还是必须与池操作同步?服务器只允许连接一个端口。

提前感谢您的任何建议。

爱德华多·金塔纳

【问题讨论】:

    标签: c# multithreading sockets concurrency tcp


    【解决方案1】:

    您的循环将导致高处理器使用率。如果没有数据要处理,则休眠(比如 1 毫秒),如果在下一个周期仍然没有数据,则增加休眠时间,如果有数据,则减少休眠时间。这样,阅读器循环将自动适应数据流量并节省处理器周期。休息一切看起来不错。

    【讨论】:

    • 感谢您的及时回复。我添加了一个 var: private static Int32 _delay = 0;
    • 请将 _delay 初始化为 1,因为这是您需要的最小值。另外,请确保在递减它时,它不会低于 1。
    • 这是正确的评论:感谢您的及时回复。如果没有收到数据,我在循环中添加了一个线程睡眠时间,从 1 毫秒开始,一直到 10 毫秒。您对 KeepAlive 发送问题有什么建议吗?
    • 我的延迟代码如下: if (!AeSocket.AeSocket.Client.Poll(-1, SelectMode.SelectRead)) { Thread.Sleep(++_delay); _delay = _delay > 10 ? 10:_延迟;继续; } _delay = 0;
    • Sleep() 循环也没有希望了。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2012-12-28
    • 2016-05-26
    • 1970-01-01
    • 1970-01-01
    • 2015-01-23
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多