之前发过一篇帖子 应用.Net+Consul维护RabbitMq的高可用性,然后最近老大问我当初我这么搞是抽的什么想法- -然后顺便贴了两行C#代码:

         var factory = new ConnectionFactory()
                {
                    UserName = "username",
                    Password = "password",
                    AutomaticRecoveryEnabled = true,
                    TopologyRecoveryEnabled = true
                };
                Connection = factory.CreateConnection(new string[3] { "ip1", "ip2", "ip3" });

AutomaticRecoveryEnabled的作用就是断线重连,假如当前的连接断开了,连接不会释放

TopologyRecoveryEnabled 重连后恢复当前的工作进程,比如channel、queue、发布的消息进度等。

看上去其实却是比较方便的,为了狡辩我当场列出了我设计的方案的主要优点:

1.rabbitmq网关设计的时候一方面是为了让客户端能够保证建立与master节点的连接,直连master性能较高。

2.通过配置队列名+VirthHost可以获取队列信息,Consul可以起到配置中心的作用,可配置性高一些(其实此处是狡辩。。)

3.RabbitMQ与Master队列建立的连接在发生故障时会第一时间查询网关获取新的Mater队列信息进行重连(当然代码内部也有重试机制,不会随便就更换连接的),相比于AutomaticRecoveryEnabled效率更高一些。

4.客户端SDK内部实现定时更新队列连接,发现Master节点更换时重新建立连接

看上去我设计的方案还是有点意义的(其实是真懒得改代码)

不过此处有个问题就是既然创建连接时的参数可以提供多个IP的集合,假如RabbitMQ提供的客户端SDK内部实现的更好,那我狡辩什么不也完戏了吗。。。于是假装下载了下客户端sdk代码扫了扫,此处以C#代码为例,代码还是比较简单的。github地址

直接定位ConnectionFactory类找CreateConnection()方法

       /// <summary>
        /// Create a connection using a list of hostnames using the configured port.
        /// By default each hostname is tried in a random order until a successful connection is
        /// found or the list is exhausted using the DefaultEndpointResolver.
        /// The selection behaviour can be overriden by configuring the EndpointResolverFactory.
        /// </summary>
        /// <param name="hostnames">
        /// List of hostnames to use for the initial
        /// connection and recovery.
        /// </param>
        /// <returns>Open connection</returns>
        /// <exception cref="BrokerUnreachableException">
        /// When no hostname was reachable.
        /// </exception>
        public IConnection CreateConnection(IList<string> hostnames)
        {
            return CreateConnection(hostnames, null);
        }

        /// <summary>
        /// Create a connection using a list of hostnames using the configured port.
        /// By default each endpoint is tried in a random order until a successful connection is
        /// found or the list is exhausted.
        /// The selection behaviour can be overriden by configuring the EndpointResolverFactory.
        /// </summary>
        /// <param name="hostnames">
        /// List of hostnames to use for the initial
        /// connection and recovery.
        /// </param>
        /// <param name="clientProvidedName">
        /// Application-specific connection name, will be displayed in the management UI
        /// if RabbitMQ server supports it. This value doesn't have to be unique and cannot
        /// be used as a connection identifier, e.g. in HTTP API requests.
        /// This value is supposed to be human-readable.
        /// </param>
        /// <returns>Open connection</returns>
        /// <exception cref="BrokerUnreachableException">
        /// When no hostname was reachable.
        /// </exception>
        public IConnection CreateConnection(IList<string> hostnames, String clientProvidedName)
        {
            var endpoints = hostnames.Select(h => new AmqpTcpEndpoint(h, this.Port, this.Ssl));
            return CreateConnection(new DefaultEndpointResolver(endpoints), clientProvidedName);
        }

        /// <summary>
        /// Create a connection using a list of endpoints. By default each endpoint will be tried
        /// in a random order until a successful connection is found or the list is exhausted.
        /// The selection behaviour can be overriden by configuring the EndpointResolverFactory.
        /// </summary>
        /// <param name="endpoints">
        /// List of endpoints to use for the initial
        /// connection and recovery.
        /// </param>
        /// <returns>Open connection</returns>
        /// <exception cref="BrokerUnreachableException">
        /// When no hostname was reachable.
        /// </exception>
        public IConnection CreateConnection(IList<AmqpTcpEndpoint> endpoints)
        {
            return CreateConnection(new DefaultEndpointResolver(endpoints), null);
        }

        /// <summary>
        /// Create a connection using an IEndpointResolver.
        /// </summary>
        /// <param name="endpointResolver">
        /// The endpointResolver that returns the endpoints to use for the connection attempt.
        /// </param>
        /// <param name="clientProvidedName">
        /// Application-specific connection name, will be displayed in the management UI
        /// if RabbitMQ server supports it. This value doesn't have to be unique and cannot
        /// be used as a connection identifier, e.g. in HTTP API requests.
        /// This value is supposed to be human-readable.
        /// </param>
        /// <returns>Open connection</returns>
        /// <exception cref="BrokerUnreachableException">
        /// When no hostname was reachable.
        /// </exception>
        public IConnection CreateConnection(IEndpointResolver endpointResolver, String clientProvidedName)
        {
            IConnection conn;
            try
            {
                if (AutomaticRecoveryEnabled)
                {
                    var autorecoveringConnection = new AutorecoveringConnection(this, clientProvidedName);
                    autorecoveringConnection.Init(endpointResolver);
                    conn = autorecoveringConnection;
                }
                else
                {
                    IProtocol protocol = Protocols.DefaultProtocol;
                    conn = protocol.CreateConnection(this, false, endpointResolver.SelectOne(this.CreateFrameHandler), clientProvidedName);
                }
            }
            catch (Exception e)
            {
                throw new BrokerUnreachableException(e);
            }

            return conn;
        }
View Code

相关文章:

  • 2021-04-10
  • 2021-12-15
  • 2021-12-10
  • 2021-11-09
  • 2022-02-20
  • 2021-11-25
  • 2021-06-18
  • 2021-08-06
猜你喜欢
  • 2018-10-03
  • 2022-12-23
  • 2022-01-09
  • 2022-12-23
  • 2021-09-26
  • 2022-12-23
  • 2022-12-23
相关资源
相似解决方案