【问题标题】:Reading continuous message from Socket从 Socket 读取连续消息
【发布时间】:2013-06-13 20:38:48
【问题描述】:

我的目标是从套接字读取消息,其中每条消息都用 ETX 字符分隔。这是一个高频市场数据馈送,所以我认为逐字节的方法没有意义,完整消息的大小也是未知的。

有没有办法通过使用NetworkStream 类来阅读此消息?为此,我还尝试使用 Socket 类,但不是从套接字中逐一读取消息,而是从套接字读取所有消息,这会随着系统速度变慢而成为问题。

【问题讨论】:

  • 我在新答案中添加了一个示例实现
  • 感谢马克的帮助。您的回答很有帮助。

标签: c# sockets


【解决方案1】:

我们来了;这是从SocketStream 等源读取标记分隔消息列表的基本 过程。棘手的一点是跟踪您在 incoming 缓冲区中使用的内容,以及来自早期缓冲区的未使用数据的任何积压。请注意,在 SocketStream 之间更改此代码实质上是将 Receive 更改为 Read - 除了方法相同。

以下内容基本上应该可以满足您的需求。您可以使用ReadNext() API,直到获得null(表示流结束),也可以使用ReadAll(),它为您提供IEnumerable<string> 序列。编码和缓冲区大小可供您通过构造函数进行调整,但默认为合理的值。

foreach (var s in reader.ReadAll())
    Console.WriteLine(s);

代码:

class EtxReader : IDisposable
{
    public IEnumerable<string> ReadAll()
    {
        string s;
        while ((s = ReadNext()) != null) yield return s;
    }
    public void Dispose()
    {
        if (socket != null) socket.Dispose();
        socket = null;
        if (backlog != null) backlog.Dispose();
        backlog = null;
        buffer = null;
        encoding = null;
    }
    public EtxReader(Socket socket, Encoding encoding = null, int bufferSize = 4096)
    {
        this.socket = socket;
        this.encoding = encoding ?? Encoding.UTF8;
        this.buffer = new byte[bufferSize];
    }
    private Encoding encoding;
    private Socket socket;
    int index, count;
    byte[] buffer;
    private bool ReadMore()
    {
        index = count = 0;
        int bytes = socket.Receive(buffer);
        if (bytes > 0)
        {
            count = bytes;
            return true;
        }
        return false;
    }
    public const byte ETX = 3;
    private MemoryStream backlog = new MemoryStream();
    public string ReadNext()
    {
        string s;
        if (count == 0)
        {
            if (!ReadMore()) return null;
        }
        // at this point, we expect there to be *some* data;
        // this may or may not include the ETX terminator
        var etxIndex = Array.IndexOf(buffer, ETX, index);
        if (etxIndex >= 0)
        {
            // found another message in the existing buffer
            int len = etxIndex - index;
            s = encoding.GetString(buffer, index, len);
            index = etxIndex + 1;
            count -= (len + 1);
            return s;
        }
        // no ETX in the buffer, so we'll need to fetch more data;
        // buffer the unconsumed data that we have
        backlog.SetLength(0);
        backlog.Write(buffer, index, count);

        bool haveEtx;
        do
        {
            if (!ReadMore())
            {
                // we had unused data; this must signal an error
                throw new EndOfStreamException();
            }
            etxIndex = Array.IndexOf(buffer, ETX, index);
            haveEtx = etxIndex >= 0;
            if (!haveEtx)
            {
                // keep buffering
                backlog.Write(buffer, index, count);
            }

        } while (!haveEtx);

        // now we have some data in the backlog, and the ETX in the buffer;
        // for convenience, copy the rest of the next message into
        // the backlog
        backlog.Write(buffer, 0, etxIndex);
        s = encoding.GetString(backlog.GetBuffer(), 0, (int)backlog.Length);
        index = etxIndex + 1;
        count -= (etxIndex + 1);
        return s;
    }
}

【讨论】:

  • Something's buggy) count 变量必须作为最后一个参数传递给每个 Array.IndexOf 方法,否则可能会意外检测到在之前获取的 buffer 数据中的 ETX 条目。 .. 崩溃
【解决方案2】:

那么,这大概是一个基于文本的 API。使用NetworkStreamSocket 之间没有实际区别; StreamSocket 都不会“阅读所有消息” - 只有 您的代码 会这样做。

在这两种情况下,您都需要一个几乎完全相同的循环来获取下一个数据块(这不是“消息”的同义词),并开始寻找您的标记值(您的意思是ETX?) - 根据需要进行处理或缓冲。除非您知道传入的提要采用单字节编码,否则您可能最好将其视为字节,直到您实际将其拆分为逻辑消息,然后然后运行文本解码器获取此消息的文本,然后再转到下一条。

【讨论】:

    【解决方案3】:

    您应该研究异步通信和TcpListener 类。我的方法是:

    1. 创建监听器
    2. 让它持续监听连接 (BeginAccept/EndAccecpt)。
    3. 对于每个连接,从 NetworkStream 异步读取,直到客户端断开连接 (BeginRead/EndRead)。您可以读取数据块,例如,您可以尝试一次读取 512 个字节 - 如果缓冲区中的字节数少于 512 个字节,那么您将获得少于 512 个字节的数据。
    4. 将任何内容附加到StringBuilder(每个连接一个,在将byte[] 转换为string 时注意正确的编码)
    5. 如果StringBuilder 包含分隔符,则将该消息拆分并写入队列(不要忘记在入队前锁定队列!)
    6. 让一个单独的线程持续监视该队列中的新消息并处理它们。如果您使用 ManualResetEvent 将新内容放入队列中,您也可以向线程发出信号。

    这只是一个粗略的大纲,但我相信你明白了。

    没有读取“消息”之类的东西——通过 TCP/IP 传入的所有内容都只是字节流——这就是为什么你会得到一个网络。消息是您为解释传入的数据而发明的概念。

    【讨论】:

    • 然而,它的核心并不依赖于任何特定的实现;无论是使用SocketStream 还是TcpListener,这同样适用。
    • 您还需要非常小心如何在字节和字符之间切换;如果你没有使用单字节编码,在你知道你有一个有效的字符串之前进行解码可能是灾难性的——你可以在多次调用“read”之间分割一个字符
    • 是的,这是真的。问题是是否可以使用NetworkStream 来做到这一点。由于我喜欢使用更高级别的 API,因此我建议使用 TcpListener 而不是 Socket 类。关于编码:他想使用ETX 作为分隔符——如何以多字节编码进行编码? (个人兴趣问题)
    • 嗯,在 UTF-8 中,它将被简单地编码为 0x03 - 但同一流中的一些 other 字符可能是两个、三个等字节 - 我的重点不是关于如何找到终点——而是关于在中间使用StringBuilder。我希望只看到积压的字节(不是字符),然后在消息边界已知时对它们运行 Encoding.GetString
    猜你喜欢
    • 2018-10-23
    • 1970-01-01
    • 2014-04-14
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多