【问题标题】:Can't get NetMQ pub-sub pattern to work with ReceiveReady无法让 NetMQ 发布-订阅模式与 ReceiveReady 一起使用
【发布时间】:2016-10-23 13:22:59
【问题描述】:

我正在尝试使用 NetMQ (3.3.3.4) 并创建 pub-sub 模式。

我希望主机/服务器监听一个端口 (9000) 上的所有传入数据,并将数据转发到另一个端口 (9001) 上的所有客户端/订阅者。

然后,客户端将在 9000 上发送数据,并在 9001 上接收(由任何人)发送的所有消息。

按照文档,我创建了类似下面的代码,但我无法让它工作。我相信主要是因为ReceiveReady 永远不会被调用!

我认为它应该如何工作:

  • client.Publish 应该导致 host.SubscriberSocket_ReceiveReady 中的第一行解除阻塞并将数据传递到另一个套接字
  • 当数据被传递时,它应该出现在客户端无限运行的Task

结果:

  • 永远不会到达// This line is never reached 上的断点
  • 任何地方都没有例外。
  • 在主机上切换端口,使发布 = 9000 和订阅 = 9001 无效
  • Windows 防火墙已关闭,因此不应有任何阻止
  • 如果我将地址放入 PublisherSocket 构造函数,或者我在 Host 中使用 _publisherSocket.Bind(address) 或在 Client 中使用 _publisherSocket.Connect(address),这没有区别

我做错了什么?

主持人

public class MyNetMQHost {

    private NetMQSocket _publishSocket;
    private NetMQSocket _subscribeSocket;
    private NetMQPoller _poller;

    public MyNetMQHost(string publishAddress = "@tcp://localhost:9001", string subscribeAddress = "@tcp://localhost:9000") {
        Task.Factory.StartNew(() => {
            using (_publishSocket = new PublisherSocket(publishAddress))
            using (_subscribeSocket = new SubscriberSocket(subscribeAddress))
            using (_poller = new NetMQPoller { _publishSocket, _subscribeSocket }) {
                _subscriberSocket.ReceiveReady += SubscriberSocket_ReceiveReady;
                _poller.Run();
            }
        });
    }

    private void SubscriberSocket_ReceiveReady(object sender, NetMQSocketEventArgs e) {
        var data = e.Socket.ReceiveMultipartBytes(); // This line is never reached
        _publishSocket.SendMultipartBytes(data);
    }
}

客户

public class MyNetMQClient {

    private readonly NetMQSocket _publishSocket;
    private readonly NetMQSocket _subscribeSocket;

    public MyNetMQClient(string publishAddress = ">tcp://localhost:9000", string subscribeAddress = ">tcp://localhost:9001") {
        _publishSocket = new PublisherSocket(publishAddress);
        _subscribeSocket = new SubscriberSocket(subscribeAddress);

        Task.Factory.StartNew(() => {
            while (true) {
                byte[] frameBytes = _subscribeSocket.ReceiveFrameBytes();
                int one = 1; // This line is never reached
            }
        });
    }

    public void Publish(byte[] data) {
        _publishSocket.SendFrame(data);
    }
}

测试员

public class Tester {
    public void MyTester() {
        MyNetMQHost host = new MyNetMQHost();
        MyNetMQClient client = new MyNetMQClient();

        client.Publish(Encoding.Unicode.GetBytes("Hello world!");
    }
}

【问题讨论】:

    标签: c# zeromq publish-subscribe netmq


    【解决方案1】:

    您的经纪人和客户都从不致电 suscribe。 在代理上调用 suscriber.Subscribe("") 订阅所有。在您的客户上订阅您想要的任何内容。

    在您的代理中,您实际上应该使用 XSubscriber 和 XPublisher 来移动 susvriptions。这样你就不需要全部订阅。您可以为此使用代理类。

    【讨论】:

    • 在阅读了有关 XSub/XPub 的文档以及一些试验和错误之后,我想我已经成功了!谢谢:)
    猜你喜欢
    • 2017-04-24
    • 2014-10-06
    • 2016-01-25
    • 1970-01-01
    • 2021-09-09
    • 1970-01-01
    • 1970-01-01
    • 2016-12-14
    相关资源
    最近更新 更多