【问题标题】:How do you shutdown a ZMQ QueueDevice from a worker thread in c#如何从 C# 中的工作线程关闭 ZMQ QueueDevice
【发布时间】:2023-03-10 14:49:01
【问题描述】:

第一次使用 ZMQ,我正在尝试设置一个进程来处理许多 getimage 请求。调试时会发生一些异常,我正在尝试修复并实现一种方法来停止 QueueDevice 终止所有线程并正常退出。

  1. receiver.Connect(BackendBindAddress);在 NetMQ.dll 中发生类型为“NetMQ.InvalidException”的未处理异常,错误代码为 NetMQ.zmq.ErrorCode.EINVAL。为什么这个异常不会停止线程的进一步执行?
  2. 我尝试将 QueueDevice 设为静态字段并在关闭消息函数中使用 queueDevice.stop(),但随后线程开始抛出 TerminatingExceptions 并且永不退出。那我可以关闭所有线程和主线程吗?

测试驱动代码

    [TestMethod]
    public void ProgramStartupShutdownTest()
    {
        var mockClientWrapper = new Mock<IClient>(MockBehavior.Strict);

        var target = new SocketListener(2, mockClientWrapper.Object);

        var task = Task.Factory.StartNew(() => target.StartListening("tcp://localhost:81"));
        using (var client = NetMQContext.Create())
        {
           var socket = client.CreateRequestSocket();
           socket.Connect("tcp://localhost:81");
           var m = new NetMQMessage(new ShutdownMessage().CreateMessageFrames());
           socket.SendMessage(m);
        }

        var timedout = task.Wait(200);
        Assert.IsTrue(timedout);
    }

我正在使用的代码

private const string BackendBindAddress = "inproc://workers";
public SocketListener(int numberOfWorkers, IClient client )
    {
        numberOfThreads = numberOfWorkers;
        _client = client;
    }

public void StartListening(string address)
    {
        StartZeroMQ(address, context =>
        {
            for (var i = 0; i <= numberOfThreads; i++)
            {
                var t = new Thread(WorkerRoutine);
                t.Start(
                        new WorkerParamters
                        {
                            Context = context,
                            Client = _client
                        }
                    );
            }
        });
    }


    private void StartZeroMQ(string address, Action<NetMQContext> setupWorkers)
    {
        using (var context = NetMQContext.Create())
        {
            var queueDevice = new QueueDevice(context, address, BackendBindAddress, DeviceMode.Blocking);
            setupWorkers(context);
            queueDevice.Start();
        }
    }

    struct WorkerParamters
    {
        public NetMQContext Context;
        public IClient Client;
    }

    private static void WorkerRoutine(object startparameter)
    {
            var wp = (WorkerParamters) startparameter;
            var client = wp.Client;
            using (var receiver = wp.Context.CreateResponseSocket())
            {
                receiver.Connect(BackendBindAddress);
                var running = true;
                while (running)
                {
                        var message = receiver.ReceiveMessage();
                        var letter = Message.ParseMessageFrame(message,
                                                               imageMessage => GetImage(imageMessage, client),
                                                               videoMessage => GetVideo(videoMessage, client),
                                                               shutdownMessage =>
                                                               {
                                                                   running = false;
                                                                   return true;
                                                               });

                        receiver.Send(letter.ToJson(), Encoding.Unicode);
                }
            }
    }

【问题讨论】:

标签: c# multithreading zeromq netmq


【解决方案1】:

为了解决这个问题,我在设备中添加了一个 Initialize 方法,在启动 worker 之前调用它,在调用 worker 启动设备之后调用它。

你可以从here获取代码(你需要从仓库编译项目),我稍后会添加一个pull request。

你也可以手写设备逻辑,应该不复杂。

【讨论】:

  • 谢谢,我正在考虑编写它,但在几个月后不得不维护它,因为我的 zmq 知识变得越来越生疏。也就是说,我不想在我已经学过的基础上学习如何使用轮询器。
猜你喜欢
  • 1970-01-01
  • 2023-03-24
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2021-12-07
  • 1970-01-01
  • 2017-10-18
  • 2012-09-05
相关资源
最近更新 更多