【问题标题】:NetMQ.FiniteStateMachineException: Rep.XRecv - cannot receive another requestNetMQ.FiniteStateMachineException: Rep.XRecv - 无法接收另一个请求
【发布时间】:2019-05-29 08:01:38
【问题描述】:

我不断收到NetMQ.FiniteStateMachineException

当然,我的代码有效...异常不会立即发生...但在几个小时的过程中,它可能会发生。

有谁知道这里发生了什么导致这个异常?

我不太确定为什么,尽管我确实阅读了这里的解释:c# - ZeroMQ FiniteStateMachineException in REQ/REP pattern - Stack Overflow

我得到了一堆这些:

异常

NetMQ.FiniteStateMachineException: Rep.XRecv - cannot receive another request
   at NetMQ.Core.Patterns.Rep.XRecv(Msg& msg)
   at NetMQ.Core.SocketBase.TryRecv(Msg& msg, TimeSpan timeout)
   at NetMQ.NetMQSocket.TryReceive(Msg& msg, TimeSpan timeout)
   at NetMQ.ReceivingSocketExtensions.ReceiveFrameString(IReceivingSocket socket, Encoding encoding, Boolean& more)
   at NinjaTrader.NinjaScript.AddOns.anAddOn.ZeroMQ_Server()
NetMQ.FiniteStateMachineException: Rep.XRecv - cannot receive another request
   at NetMQ.Core.Patterns.Rep.XRecv(Msg& msg)
   at NetMQ.Core.SocketBase.TryRecv(Msg& msg, TimeSpan timeout)
   at NetMQ.NetMQSocket.TryReceive(Msg& msg, TimeSpan timeout)
   at NetMQ.ReceivingSocketExtensions.ReceiveFrameString(IReceivingSocket socket, Encoding encoding, Boolean& more)
   at NinjaTrader.NinjaScript.AddOns.anAddOn.ZeroMQ_Server()

我的代码


        // thread start code

                if (thread == null) {
                    print2("Addon {0}: is starting, listening on port: {1}...", GetType().Name, ZeroPort);
                    thread = new Thread(ZeroMQ_Server);
                    thread.Start();
                }

        // zeroMQ code

        #region TaskCallBack - NetMQ
        // This thread procedure performs the task.
        private void ZeroMQ_Server()
        { 
            bool quit = false;
            string bindAddress = "tcp://*:"+ZeroPort;
            try {
                while (!quit) {
                    try {
                        using (var repSocket = new ResponseSocket()) 
                        {
                            curRepSocket = repSocket;
                            print2("*** BINDING on {0} ***", bindAddress);
                            repSocket.Bind(bindAddress); 

                            while (!quit) {
                                try {
                                    Running = true;

                                    var msgStr = repSocket.ReceiveFrameString();
                                    print2("[►] {2} [REP:{0}] {1}", bindAddress, msgStr, DateTime.Now.ToString("HH:mm:ss.fff"));
                                    if (processMsg(msgStr)) {
                                        StringBuilder csv = new StringBuilder();
                                        // string building stuff here

                                        string cs = csv.ToString();
                                        print2("[◄] csv: {0}", cs);
                                        repSocket.SendFrame(cs);
                                    } else {
                                        repSocket.SendFrame("Unrecognized Command: " + msgStr);
                                        break;
                                    }

                                } catch (Exception e) {
                                    quit = isThreadAborted(e);
                                }
                            }
                        }

                    } catch (Exception e) {
                        if (e is AddressAlreadyInUseException) {
                            //print2("");
                        } else quit = isThreadAborted(e);
                    } finally {
                        curRepSocket = null;
                        Running = false;
                    }
                }
            } finally {
                //NetMQConfig.Cleanup(); 
            }
        }

        private bool isThreadAborted(Exception e) {
            if (e is ThreadAbortException) {
                print2("\n*** thread aborting... ***");
                return true;
            } else {
                print2(e);
                return false;
            }
        }

【问题讨论】:

    标签: c# zeromq netmq


    【解决方案1】:

    响应套接字是一个状态机,你必须回复每个请求。 从代码来看,如果 processMsg 抛出你没有发回任何东西,因此你不能再次接收并得到异常。

    如果客户端不在,发送失败也可以。

    尝试使用路由器,如下所示:

    while (true)
    {
        bool more;
        var msg = routerSocket.ReceiveFrameBytes(out more);
    
        // Forwarding the routing id.
        routerSocket.SendMoreFrame(msg);
    
        // Bottom, next frame is the message
        if (msg.Length == 0)
            break;
    }
    
    // Write your handling here
    
    

    【讨论】:

    • 如果processMsg() 返回truefalse 那么repSocket.SendFrame() 仍然被调用......所以这就是我感到困惑的地方。我知道你看不到processMsg() 里面的内容,但它有一个catch (Exception e) 会返回false,所以无论哪种方式,它都会返回truefalse,而不允许Exception 逃脱-对于我可以调查的内容,您还有其他想法吗?
    • 还有其他例外吗?
    • 我还没有看到 - 但是我确实使用过 AddressAlreadyInUseException s 但我想我已经解决了这个问题.. 作为一种临时解决方法,我可以把整个事情包在一个 catch 块中吗防止我的程序停止并创建一个新的代表套接字?如果遇到这个FiniteStateMachineException
    • 您将不得不重新创建响应套接字。
    • 其实,如果消息发送失败(因为客户端不在了)也会失败。
    猜你喜欢
    • 1970-01-01
    • 2015-12-29
    • 2021-09-01
    • 2018-05-08
    • 2022-11-20
    • 1970-01-01
    • 1970-01-01
    • 2017-12-24
    • 2016-06-12
    相关资源
    最近更新 更多