【问题标题】:Awaiting server responses in an async way to different calls at the same time同时以异步方式等待服务器对不同调用的响应
【发布时间】:2014-05-16 22:53:39
【问题描述】:

我正在通过 TCP 为模型铁路控制器编写客户端库。服务器嵌入在控制单元中。

当客户端向服务器发送命令时,例如set(5, addr[3]),服务器会以回复标头<REPLY set(5, addr[3])> 进行响应,该命令的结果以及是否有错误。

因为所有这些东西都是异步的,所以我必须将回复与命令相匹配。 (即使客户端只发送一个命令然后等待响应也有服务器端事件)

为了给这个库提供一个良好且易于理解的界面,我使用了async await 模式。这意味着客户端代码会进行类似await client.Set(5, "addr", "3") 的调用,在服务器发回响应并由客户端代码评估响应后代码继续。

我目前正在使用 IDictionary<string, EventWaitHandle> 实现此功能,其中字符串是命令,而 EventWaitHandle 在方法 SendeBefehlAwaitResonse(string befehl)await Task.Run(() => signal = e.WaitOne(timeout)); 中等待

有没有更常见的方法来做到这一点?对于 NetworkClient,我还首先使用了 EventWaitHandle 来等待新消息发送(并使用我的 MessageDelay)属性。我发现使用无限循环调用await Task.Delay(100); 的性能要好得多。

问题:

  • 有没有更好的方法来等待服务器响应?也许使用 Reactive Extensions 或其他一些库?

如果我必须重写库的某些部分,这对我来说没什么大不了的。我写这个库主要是为了学习。尽管代码(主要是 TCP 客户端)在某种程度上是被黑的代码,但我尽我所能为项目提供更好且易于理解的结构。

提前感谢您的帮助!

您可以在这里找到代码:https://github.com/schjan/RailNet | Message Dispatcher Class

【问题讨论】:

  • 如果连续快速发送相同的命令会发生什么?还有可能得不到回应吗?
  • 因为只有像“set junction 1 left”这样的“绝对”命令,而没有像“switch junction 1”这样的命令,所以我只发送一次这个命令或重复使用服务器答案。 SendeBefehlAwaitResponse 是你的朋友。其他选择是按顺序匹配响应。该协议不是我写的,它由ESU 提供。
  • 我建议您使用 Rx 或连续的async 内部读取(表示响应/服务器更新的传入)。然后为“发送此命令并等待其匹配响应”添加一个单独的更高级别的async API。

标签: c# asynchronous client system.reactive waithandle


【解决方案1】:

Rx 可能会让您的生活更轻松。我还想它会减少代码中的一些潜在竞争条件,并且最终会减少很多管道式代码(即维护缓存的代码)。

如果我从我认为是代码的关键元素(来自https://github.com/schjan/RailNet/blob/master/src/RailNet.Clients.Ecos/Basic/NachrichtenDispo.cs)开始,然后将它们提取到我看到的方法中。

private bool HasBeginAndEnd(string[] message)
{
    bool isValid = true;

    if (!message[0].StartsWith("<") || !message[0].EndsWith(">"))
        isValid = false;

    if (!message.Last().StartsWith("<END"))
        isValid = false;

    return isValid;
}
private bool IsReplyMessage(string[] message)
{
    return message.Length>0 && message[0].StartsWith("<REPLY ");
}
private BasicAntwort ParseResponse(string[] message)
{
    string header = message[0].Substring(7, message[0].Length - 8);
    return new BasicAntwort(message, header);
}

使用这些漂亮的小描述方法,我可以使用 Rx 创建一个可观察的响应序列。

var incomingMessages = Observable.FromEventPattern<MessageReceivedEventArgs>(
    h => _networkClient.MessageReceivedEvent += h,
    h => _networkClient.MessageReceivedEvent -= h)
.Select(x => x.EventArgs.Content)
.Where(HasBeginAndEnd)
.Where(IsReplyMessage)
.Select(ParseResponse);

酷,现在我们有一个传入的流/序列。

接下来,我们希望能够发出命令并为其返回适当的响应。 Rx 也可以做到这一点。

incomingMessages.Where(reply=>reply.Header == befehl)

为了继续,我们还想添加一个超时,这样如果我们在给定的时间(8000 毫秒?)内没有从 out 命令得到响应,我们应该抛出。我们可以将其转换为任务,如果我们只想要单个值,那么我们也可以使用 Rx 来完成。

incomingMessages.Where(reply=>reply.Header == befehl)
                .Timeout(TimeSpan.FromSeconds(2))
                .Take(1)
                .ToTask();

现在差不多完成了。我们只想要一种发送命令并返回带有响应(或超时)的任务的方法。没问题。只需先订阅我们的传入消息序列,以避免出现竞争条件,然后发出命令。

public Task<BasicAntwort> SendCommand(NetworkClient networkClient, string befehl)
{
    //Subscribe first to avoid race condition.
    var result = incomingMessages
                        .Where(reply=>reply.Header == befehl)
                        .Timeout(TimeSpan.FromSeconds(2))
                        .Take(1)
                        .ToTask();

    //Send command
    networkClient.SendMessage(befehl);

    return result;
}   

这是作为 LinqPad 脚本的整个代码

    void Main()
    {
        var _networkClient = new NetworkClient();
        var sendCommandTask = SendCommand(_networkClient, "MyCommand");
        BasicAntwort reply = sendCommandTask.Result;
        reply.Dump();
    }


    private static bool HasBeginAndEnd(string[] message)
    {
        bool isValid = true;

        if (!message[0].StartsWith("<") || !message[0].EndsWith(">"))
            isValid = false;

        if (!message.Last().StartsWith("<END"))
            isValid = false;

        return isValid;
    }
    private static bool IsReplyMessage(string[] message)
    {
        return message.Length>0 && message[0].StartsWith("<REPLY ");
    }
    private static BasicAntwort ParseResponse(string[] message)
    {
        string header = message[0].Substring(7, message[0].Length - 8);
        return new BasicAntwort(message, header);
    }

    public IObservable<BasicAntwort> Responses(NetworkClient networkClient)
    {
        return Observable.FromEventPattern<MessageReceivedEventArgs>(
                h => networkClient.MessageReceivedEvent += h,
                h => networkClient.MessageReceivedEvent -= h)
            .Select(x => x.EventArgs.Content)
            .Where(HasBeginAndEnd)
            .Where(IsReplyMessage)
            .Select(ParseResponse);
    }   

    public Task<BasicAntwort> SendCommand(NetworkClient networkClient, string befehl)
    {
        //Subscribe first to avoid race condition.
        var result = Responses(networkClient)
                            .Where(reply=>reply.Header == befehl)
                            .Timeout(TimeSpan.FromSeconds(2))
                            .Take(1)
                            .ToTask();

        //Send command
        networkClient.SendMessage(befehl);

        return result;
    }   


public class NetworkClient
{
public event EventHandler<MessageReceivedEventArgs> MessageReceivedEvent;
public bool Connected { get; set; }
public void SendMessage(string befehl)
{
    var handle = MessageReceivedEvent;
    if(handle!=null){

        var message = new string[3]{"<REPLY " + befehl +">", "Some content", "<END>"};

        handle(this, new UserQuery.MessageReceivedEventArgs(){Content=message});
    }
}
}
public class MessageReceivedEventArgs : EventArgs
{
public string[] Content { get; set; }
}

public class BasicAntwort
{
    public BasicAntwort(string[] message, string header)
    {
        Header = header;
        Message = message;
    }

    public string Header { get; set; }
    public string[] Message { get; set; }
}

【讨论】:

  • 非常感谢!我会按照你写的方式实现它,并在接下来的几天里删除一些其他的东西。因为我从未做过 Rx,所以这可能需要一些时间。我会随时通知你。再次:非常感谢!
猜你喜欢
  • 2021-11-24
  • 2016-09-04
  • 1970-01-01
  • 1970-01-01
  • 2013-08-20
  • 2015-11-09
  • 1970-01-01
  • 1970-01-01
  • 2019-07-26
相关资源
最近更新 更多