【问题标题】:Need a ZeroMQ implementation of an ICommunicationListener on Azure Service Fabric需要在 Azure Service Fabric 上实现 ICommunicationListener 的 ZeroMQ
【发布时间】:2018-10-04 01:42:05
【问题描述】:

我正在寻找 ICommunicationListener 的 ZeroMQ 实现,我可以使用它与服务结构一起在 Azure 上运行 ZeroMQ 端点。

我找了几个小时也没找到。有谁知道这个的解决方案?我目前使用的是“Service App Fabric / .net core 2.0 stateless service”模板,
这允许我覆盖
IEnumerable<ServiceInstanceListener> CreateServiceInstanceListeners(),
当我有 ZeroMQ 的 ICommunicationListener 实现时,
或覆盖Task RunAsync(CancellationToken cancellationToken),
当我想自己设置套接字时。

我的第一次尝试不起作用:

protected override async Task RunAsync(CancellationToken cancellationToken)
{
    using (var server = new ResponseSocket("tcp://xx.xx.xx.xx:xxxxx"))
    {
        while (!cancellationToken.IsCancellationRequested)
        {
            var message = server.ReceiveFrameBytes();
            ServiceEventSource.Current.ServiceMessage(this.Context, "Message {0}",
                System.Text.Encoding.UTF8.GetString(message));
        }
    }
}

上述结果是服务无法启动。除此以外找不到太多日志记录:

“CodePackage 激活过程中出现错误。服务主机以退出代码终止:255”

【问题讨论】:

  • 您发布的错误消息非常笼统,意味着您的服务无法启动。如果您调试您的服务并获得确切的错误消息,或者在您的逻辑中添加一个 try catch 以将错误记录到文件、数据库或事件日志中,这将有所帮助

标签: azure asp.net-core zeromq azure-service-fabric netmq


【解决方案1】:

如果不存在,您可以通过创建ICommunicationListener 的实现并从CreateServiceInstanceListeners 返回它来创建自己的。 使用OpenAsync 打开频道并开始收听。使用CloseAsync 停止收听。

查看this implementation for Service Bus,获取灵感。

【讨论】:

  • 谢谢,这确实是我选择的方式。我保持问题开放,以便发布结果。
【解决方案2】:

这是 ZeroMQ 的 ICommunicationListener 实现的粗略示例。此实现将充当 ZeroMQ ResponseSocket,但可以轻松更改为 RequestSocketSubscriberSocket 或您喜欢的任何类型的 NetMQ.Sockets.* 套接字实现。当然,它需要在实现中提供更多细节,例如在检索消息时不引发异常,但它应该清楚地了解它是如何完成的。它受到ICommunicationListener 接口的现有dotnetcore 实现的极大启发。

public class ZeroMqResponseSocketCommunicationListener : ICommunicationListener, IDisposable
{
    private readonly CancellationTokenSource _cancellationToken = new CancellationTokenSource();
    private readonly ResponseSocket _responseSocket = new ResponseSocket();
    private readonly ServiceContext _serviceContext;
    private readonly string _endpointName;

    public ZeroMqResponseSocketCommunicationListener(ServiceContext serviceContext, string endpointName)
    {
        if (string.IsNullOrEmpty(endpointName))
            throw new ArgumentException("endpointName cannot be null or empty string.");

        _serviceContext = serviceContext;
        _endpointName = endpointName;
    }

    public Task<string> OpenAsync(CancellationToken cancellationToken)
    {
        var address = GetListenerUrl();
        if (address == null)
            throw new InvalidOperationException("No Url returned from ZeroMqResponseSocketCommunicationListener.GetListenerUrl");


        _responseSocket.Bind(address);

        ThreadPool.QueueUserWorkItem(state => MessageHandler(_cancellationToken.Token));

        return Task.FromResult(address);
    }

    public Task CloseAsync(CancellationToken cancellationToken)
    {
        _responseSocket.Close();
        return Task.FromResult(true);
    }

    public void Abort()
    {
        _responseSocket.Close();
    }

    private void MessageHandler(CancellationToken cancellationToken)
    {
        while (!cancellationToken.IsCancellationRequested)
        {
            var message = _responseSocket.ReceiveFrameBytes();
            if (message != null)
                throw new Exception($"Message {Encoding.UTF8.GetString(message)}");
        }
    }

    private string GetListenerUrl()
    {
        var endpoints = _serviceContext.CodePackageActivationContext.GetEndpoints();

        if (!endpoints.Contains(_endpointName))
            throw new InvalidOperationException($"{_endpointName} not found in Service Manifest.");

        var serviceEndpoint = _serviceContext.CodePackageActivationContext.GetEndpoint(_endpointName);

        if (string.IsNullOrEmpty(serviceEndpoint.IpAddressOrFqdn))
            throw new InvalidOperationException("IpAddressOrFqdn not set on endpoint");

        if (serviceEndpoint.Port <= 0)
            throw new InvalidOperationException("Port not set on endpoint");

        var listenUrl = $"{serviceEndpoint.Protocol.ToString().ToLower()}://{serviceEndpoint.IpAddressOrFqdn}:{serviceEndpoint.Port}";

        return listenUrl;
    }

    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);
    }

    protected virtual void Dispose(bool disposing)
    {
        if (!disposing || _responseSocket == null) return;

        try
        {
            _responseSocket.Close();
            _responseSocket.Dispose();
        }
        catch (Exception ex)
        {
            ServiceEventSource.Current.Message(ex.Message);
        }
    }
}

并在您的应用结构服务中返回 ZeroMqResponseSocketCommunicationListener:

protected override IEnumerable<ServiceInstanceListener> CreateServiceInstanceListeners()
{
    yield return new ServiceInstanceListener(listener => new ZeroMqResponseSocketCommunicationListener(listener, "EndpointName"));
}

确保您在服务的 ServiceManifest.xml 中指定了一个端点:

<Resources>
  <Endpoints>
    <Endpoint Name="EndpointName" Port="80" Protocol="tcp" />
  </Endpoints>
</Resources>

【讨论】:

  • 您能否分享一下您在构建它时查看的参考实现?谢谢。
猜你喜欢
  • 2017-11-25
  • 2019-11-30
  • 2016-12-08
  • 1970-01-01
  • 1970-01-01
  • 2017-06-05
  • 2017-04-09
  • 2015-09-17
  • 2018-08-29
相关资源
最近更新 更多