【问题标题】:How to create a responding TCP listener with the System.IO.Pipelines package?如何使用 System.IO.Pipelines 包创建响应 TCP 侦听器?
【发布时间】:2021-01-05 03:32:27
【问题描述】:

我想使用 Kestrel 和 System.IO.Pipelines 包创建一个 TCP 侦听器。我收到的消息将始终是HL7 messages。示例消息可以是

MSH|^~&|MegaReg|XYZHospC|SuperOE|XYZImgCtr|20060529090131-0500||ADT^A01^ADT_A01|01052901|P|2.5 EVN||200605290901||||200605290900 PID|||56782445^^^UAReg^PI||KLEINSAMPLE^BARRY^Q^JR||19620910|M||2028-9^^HL70005^RA99113^^XYZ|260 古德温克雷斯特大道^^伯明翰^AL^35209^^M~尼克尔的泡菜^10000 W 第100大道^伯明翰^AL^35200^^O|||||||0105I30001^^^99DEF^AN PV1||I|W^389^1^UABH^^^^3||||12345^摩根^REX^J^^^MD^0010^UAMC^L||67890^GRAINGER^LUCY^X^^^ MD^0010^UAMC^L|MED|||||A0||13579^POTTER^SHERMAN^T^^^MD^0010^UAMC^L||||||||||||||| |||||||||||200605290900 OBX|1|NM|^车身高度||1.80|m^米^ISO+|||||F OBX|2|NM|^车身 重量||79|kg^公斤^ISO+|||||F AL1|1||^阿司匹林 DG1|1||786.50^CHEST 疼痛,未指定^I9|||A

唯一需要注意的是,每条传入的 HL7 消息都以垂直制表符开头,以便您知道消息的开始位置。每个 HL7 消息都包含多个段,所以我想我必须遍历每个段。处理完请求后,我想发回一条 HL7 消息作为响应。首先我想出了这个

internal class HL7Listener : ConnectionHandler
{
    public override async Task OnConnectedAsync(ConnectionContext connection)
    {
        IDuplexPipe pipe = connection.Transport;

        await FillPipe(pipe.Output);
        await ReadPipe(pipe.Input);
    }

    private async Task FillPipe(PipeWriter pipeWriter)
    {
        const int minimumBufferSize = 512;

        while (true)
        {
            Memory<byte> memory = pipeWriter.GetMemory(minimumBufferSize);
            
            try
            {
                int bytesRead = 32; // not sure what to do here
                
                if (bytesRead == 0)
                {
                    break;
                }
                
                pipeWriter.Advance(bytesRead);
            }
            catch (Exception ex)
            {
                // ... something failed ...

                break;
            }

            FlushResult result = await pipeWriter.FlushAsync();

            if (result.IsCompleted)
            {
                break;
            }
        }

        pipeWriter.Complete();
    }

    private async Task ReadPipe(PipeReader pipeReader)
    {
        while (true)
        {
            ReadResult result = await pipeReader.ReadAsync();

            ReadOnlySequence<byte> buffer = result.Buffer;
            SequencePosition? position;

            do
            {
                position = buffer.PositionOf((byte)'\v');

                if (position != null)
                {
                    ReadOnlySequence<byte> line = buffer.Slice(0, position.Value);

                    // ... Process the line ...

                    buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
                }
            }
            while (position != null);

            pipeReader.AdvanceTo(buffer.Start, buffer.End);

            if (result.IsCompleted)
            {
                break;
            }
        }

        pipeReader.Complete();
    }
}

不幸的是,我正在为一些事情苦苦挣扎:

  • int bytesRead = 32;部分,我如何知道读取了多少字节?或者如何使用 writer 实例进行阅读?
  • 目前调试器没有命中// ... Process the line ...的代码。基本上我必须提取整个 HL7 消息,以便我可以使用我的 HL7 解析器来转换消息字符串。
  • 我必须在哪里回复?拨打await ReadPipe(pipe.Input);之后?通过使用await connection.Transport.Output.WriteAsync(/* the HL7 message to send back */);?

【问题讨论】:

  • 嗨;我创建了一个包含代码示例(包括完整的 GitHub 存储库)的 3 位博客系列,深入介绍了该主题;在这里尝试和总结有点太深了,但我怀疑它会有所帮助 - 从这里开始:blog.marcgravell.com/2018/07/pipe-dreams-part-1.html
  • @MarcGravell 我试着关注你的系列(第 1 部分和第 2 部分)。不幸的是,对于我这个初学者来说,Github repo 实在是太大太复杂了。我能够关注你的系列,例如管道缓冲区是如何工作的,但我无法将知识转化为我自己的问题
  • @AndyVaal 我目前正在使用 NHAPI,但我认为这个更好并且积极维护:) 谢谢你

标签: c# .net-core kestrel system.io.pipelines


【解决方案1】:

您看过 David Fowler 的 TcpEcho 示例吗?我会说这是相当规范的,因为他是发布开发博客 System.IO.Pipelines 公告的人。

他的示例处理原始套接字。我已经将它改编为 ConnectionHandler API 和 HL7 消息(但是,我对 HL7 知之甚少):

internal class HL7Listener : ConnectionHandler
{
    public override async Task OnConnectedAsync(ConnectionContext connection)
    {
        while (true)
        {
            var result = await connection.Transport.Input.ReadAsync();
            var buffer = result.Buffer;

            while (TryReadMessage(ref buffer, out ReadOnlySequence<byte> hl7Message))
            {
                // Process the line.
                var response = ProcessMessage(hl7Message);
                await connection.Transport.Output.WriteAsync(response);
            }

            if (result.IsCompleted)
            {
                break;
            }

            connection.Transport.Input.AdvanceTo(buffer.Start, buffer.End);
        }
    }

    public static bool TryReadMessage(ref ReadOnlySequence<byte> buffer, out ReadOnlySequence<byte> hl7Message)
    {
        var endOfMessage = buffer.PositionOf((byte)0x1C);

        if (endOfMessage == null || !TryMatchNextByte(ref buffer, endOfMessage.Value, 0x0D, out var lastBytePosition))
        {
            hl7Message = default;
            return false;
        }

        var messageBounds = buffer.GetPosition(1, lastBytePosition.Value); // Slice() is exclusive on the upper bound
        hl7Message = buffer.Slice(0, messageBounds);
        buffer = buffer.Slice(messageBounds); // remove message from buffer
        return true;
    }

    /// <summary>
    /// Does the next byte after currentPosition match the provided value?
    /// </summary>
    private static bool TryMatchNextByte(ref ReadOnlySequence<byte> buffer, SequencePosition currentPosition, byte value, out SequencePosition? nextPosition)
    {
        nextPosition = buffer.Slice(currentPosition).PositionOf(value);
        if(nextPosition == null || !nextPosition.Value.Equals(buffer.GetPosition(1, currentPosition)))
        {
            nextPosition = null;
            return false;
        }
        return true;
    }

    private ReadOnlyMemory<byte> ProcessMessage(ReadOnlySequence<byte> hl7Message)
    {
        var incomingMessage = Encoding.UTF8.GetString(hl7Message.ToArray());
        // do something with the message and generate your response. I'm using UTF8 here
        // but not sure if that's valid for HL7.
        return Encoding.UTF8.GetBytes("Response message: OK!");
    }
}

更新:添加了有关 HL7 消息结构的最新信息。

【讨论】:

  • 嘿,威尔,谢谢您的回复!我找到了一个 HL7 包,它能够将整个 TCP 消息字符串解析为多个 HL7 消息。我试图修改您的代码,并想知道这是否应该可以以及在哪里对缓冲区进行切片..? pastebin.com/McLiQjQV
  • 嗨@OlafSvenson 我仍然建议使用 HL7 包来解析单个消息,并让管道关心从传入字节中提取消息。这是因为在读取传入字节时,不能保证您有完整的 HL7 消息或段——也许您只得到一个或两个字节,并且需要等待下一个 ReadAsync 返回剩余字节. HL7包可以处理多条消息,但是它可以处理部分消息吗?我的猜测是否定的,但管道代码可以(它在buffer = buffer.Slice 行中处理它)。
  • 我测试了您的代码并对此表示赞赏:) 1 个问题:当TryReadMessage 被调用时,切片似乎是错误的。分配hl7Message 时,您只会切断字符,因此消息将仅包含垂直制表符。我认为最好的方法是检查最后一个字符,我为stackoverflow.com/questions/64043832/…创建了一个单独的问题@
  • 我的意思是:目前我正在尝试使用您的代码提取 1 条消息。如上所述,它不能正常工作。在检查最后出现的 HL7 后缀 '\r' 时,我可以一次从缓冲区中提取所有有效消息
  • 啊!我不明白 HL7 消息结构,我以为\v 是分隔符。我更新了代码以使用0x1c, 0x0d。它添加了一个TryMatchNextByte 函数,可以查看0x1c 之后的字节,看看它是否是0x0d
猜你喜欢
  • 1970-01-01
  • 2012-06-24
  • 1970-01-01
  • 2019-06-29
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2021-07-20
  • 1970-01-01
相关资源
最近更新 更多