消息队列的地位越来越重要,几乎是面试的必问问题了,不会使用几种消息队列都显得尴尬,正好本文使用C#来带你认识rabbitmq消息队列

  首先,我们要安装rabbitmq,当然,如果有现成的,也可以使用,不知道曾几何时,我喜欢将数据库等等软件安装在linux虚拟机,如果没现成的rabbitmq,按照下面的来吧,嘿嘿

  rabbitmq安装:https://www.cnblogs.com/shanfeng1000/p/11951703.html

  如果要实现rabbitmq集群,参考:https://www.cnblogs.com/shanfeng1000/p/12097054.html

  我这里使用的是rabbitmq集群,但是没有比较,只是已经安装好了,就直接使用算了

  虚拟机集群地址:192.168.209.133,192.168.209.134,192.168.209.135

  端口使用的默认端口,都是5672,也就是AMQP协议端口

  Rabbitmq的工作模式

  先说说几个概念

  生产者(producer):负责生产消息,可以有多个生产者,可以理解为生成消息的那部分逻辑

  消费者(consumer):从队列中获取消息,对消息处理的那部分逻辑

  队列(queue):用于存放消息,可以理解为先进先出的一个对象

  交换机(exchange):顾名思义,就是个中介的角色,将接收到的消息按不同的规则转发到其他交换机或者队列中

  路由(route):就是交换机分发消息的规则,交换机可以指定路由规则,生产者在发布消息时也可以指定消息路由,比如交换机中设置A路由表示将消息转发到队列1,B路由表示将消息转发到队列2,那么当交换机接收到消息时,如果消息的路由满足A路由,则将消息转发到队列1,如果满足B路由则将消息转发到队列2

  虚拟主机(virtual host):虚拟地址,用于进行逻辑隔离,一个虚拟主机里面可以有若干个 exchange 和 queue,但是里面不能有相同名称的 exchange 或 queue

  再看看rabbitmq的几种工作模式,具体可参考rabbitmq官网给出的Demo:https://www.rabbitmq.com/getstarted.html

    C# .net 环境下使用rabbitmq消息队列

  其中,第6中类似我们常用的请求-响应模式,但是使用的RPC请求响应,用的比较少,这里就不过多解释,感兴趣的可以参考官网文档:https://www.rabbitmq.com/tutorials/tutorial-six-dotnet.html

  总的来说,就是生产者将消息发布到rabbitmq上,然后消费者连接rabbitmq,获取到消息就消费,但是有几点说明一下

  1、rabbitmq中的消息是可被多次消费的,因为rabbitmq提供了ack机制,当消费者在消费消息时,如果将自动ack设置成false,那么需要手动提交ack才能告诉rabbitmq消息已被使用,否则当通道关闭时,消息会继续呆在队列中等待消费

  2、当存在多个消费者时,默认情况下,一个消费者获取一个消息,处理完成后再获取下一个,但是rabbitmq消费一次性获取多个,当然后当这些消息消费完成后,再获取下一批,这也就是rabbitmq的Qos机制

  

  C#使用rabbitmq

  如果感兴趣的人多,到时候再单独开一篇博文,现在就介绍其中的1-5种,也可以分类成两种:不使用交换机和使用交换机,所以下面就分这两种来说明

  首先,我们创建了两个Demo项目:RabbitMQ.PublishConsole和RabbitMQ.ConsumeConsole,分别使用使用nuget安装RabbitMQ.Client:

  C# .net 环境下使用rabbitmq消息队列

  其中RabbitMQ.PublishConsole是用来生产消息,RabbitMQ.ConsumeConsole用来消费消息  

  这里我们安装的是最新版本,旧版本和新版本在使用上可能会有一些区别


 

  不使用交换机情形

  不使用交换机有两种模式:简单模式和工作模式

  这里先贴上生产者生成消息的代码,简单模式和工作模式这部分测试代码是一样的:  

  C# .net 环境下使用rabbitmq消息队列
using RabbitMQ.Client;
using System;
using System.Collections.Generic;
using System.Text;

namespace RabbitMQ.PublishConsole
{
    class Program
    {
        static void Main(string[] args)
        {
            string[] hosts = new string[] { "192.168.209.133", "192.168.209.134", "192.168.209.135" };
            int port = 5672;
            string userName = "admin";
            string password = "123456";
            string virtualHost = "/";

            //创建一个连接工厂
            var factory = new ConnectionFactory();
            factory.UserName = userName;
            factory.Password = password;
            factory.Port = port;
            factory.VirtualHost = virtualHost;
            //创建一个连接,此时可以在rabbitmq后台Web管理页面中的Connections中看到一个连接生成
            //一个连接可以创建多个通道
            var connection = factory.CreateConnection(hosts);

            string queue = "queue1";//队列名称

            //创建一个通道
            //此时可以在rabbitmq后台Web管理页面中的Channels中看到一个新通道生成
            var channel = connection.CreateModel();
            //给通道绑定一个队列,队列如果不存在,则会创建新队列,如果队列已存在,那么参数一定要正确,特别是arguments参数,否则会报错
            var arguments = new Dictionary<string, object>() { { "x-queue-type", "classic" } };
            channel.QueueDeclare(queue: queue, durable: true, exclusive: false, autoDelete: false, arguments: arguments);

            //发布10条消息
            for (var i = 0; i < 10; i++)
            {
                var buffer = Encoding.UTF8.GetBytes(i.ToString());
                channel.BasicPublish("", queue, null, buffer);
            }
            channel.Close();

            Console.ReadKey();
        }
    }
}
RabbitMQ.PublishConsole

相关文章: