消息队列的地位越来越重要,几乎是面试的必问问题了,不会使用几种消息队列都显得尴尬,正好本文使用C#来带你认识rabbitmq消息队列
首先,我们要安装rabbitmq,当然,如果有现成的,也可以使用,不知道曾几何时,我喜欢将数据库等等软件安装在linux虚拟机,如果没现成的rabbitmq,按照下面的来吧,嘿嘿
rabbitmq安装:https://www.cnblogs.com/shanfeng1000/p/11951703.html
如果要实现rabbitmq集群,参考:https://www.cnblogs.com/shanfeng1000/p/12097054.html
我这里使用的是rabbitmq集群,但是没有比较,只是已经安装好了,就直接使用算了
虚拟机集群地址:192.168.209.133,192.168.209.134,192.168.209.135
端口使用的默认端口,都是5672,也就是AMQP协议端口
Rabbitmq的工作模式
先说说几个概念
生产者(producer):负责生产消息,可以有多个生产者,可以理解为生成消息的那部分逻辑
消费者(consumer):从队列中获取消息,对消息处理的那部分逻辑
队列(queue):用于存放消息,可以理解为先进先出的一个对象
交换机(exchange):顾名思义,就是个中介的角色,将接收到的消息按不同的规则转发到其他交换机或者队列中
路由(route):就是交换机分发消息的规则,交换机可以指定路由规则,生产者在发布消息时也可以指定消息路由,比如交换机中设置A路由表示将消息转发到队列1,B路由表示将消息转发到队列2,那么当交换机接收到消息时,如果消息的路由满足A路由,则将消息转发到队列1,如果满足B路由则将消息转发到队列2
虚拟主机(virtual host):虚拟地址,用于进行逻辑隔离,一个虚拟主机里面可以有若干个 exchange 和 queue,但是里面不能有相同名称的 exchange 或 queue
再看看rabbitmq的几种工作模式,具体可参考rabbitmq官网给出的Demo:https://www.rabbitmq.com/getstarted.html
其中,第6中类似我们常用的请求-响应模式,但是使用的RPC请求响应,用的比较少,这里就不过多解释,感兴趣的可以参考官网文档:https://www.rabbitmq.com/tutorials/tutorial-six-dotnet.html。
总的来说,就是生产者将消息发布到rabbitmq上,然后消费者连接rabbitmq,获取到消息就消费,但是有几点说明一下
1、rabbitmq中的消息是可被多次消费的,因为rabbitmq提供了ack机制,当消费者在消费消息时,如果将自动ack设置成false,那么需要手动提交ack才能告诉rabbitmq消息已被使用,否则当通道关闭时,消息会继续呆在队列中等待消费
2、当存在多个消费者时,默认情况下,一个消费者获取一个消息,处理完成后再获取下一个,但是rabbitmq消费一次性获取多个,当然后当这些消息消费完成后,再获取下一批,这也就是rabbitmq的Qos机制
C#使用rabbitmq
如果感兴趣的人多,到时候再单独开一篇博文,现在就介绍其中的1-5种,也可以分类成两种:不使用交换机和使用交换机,所以下面就分这两种来说明
首先,我们创建了两个Demo项目:RabbitMQ.PublishConsole和RabbitMQ.ConsumeConsole,分别使用使用nuget安装RabbitMQ.Client:
其中RabbitMQ.PublishConsole是用来生产消息,RabbitMQ.ConsumeConsole用来消费消息
这里我们安装的是最新版本,旧版本和新版本在使用上可能会有一些区别
不使用交换机情形
不使用交换机有两种模式:简单模式和工作模式
这里先贴上生产者生成消息的代码,简单模式和工作模式这部分测试代码是一样的:
using RabbitMQ.Client; using System; using System.Collections.Generic; using System.Text; namespace RabbitMQ.PublishConsole { class Program { static void Main(string[] args) { string[] hosts = new string[] { "192.168.209.133", "192.168.209.134", "192.168.209.135" }; int port = 5672; string userName = "admin"; string password = "123456"; string virtualHost = "/"; //创建一个连接工厂 var factory = new ConnectionFactory(); factory.UserName = userName; factory.Password = password; factory.Port = port; factory.VirtualHost = virtualHost; //创建一个连接,此时可以在rabbitmq后台Web管理页面中的Connections中看到一个连接生成 //一个连接可以创建多个通道 var connection = factory.CreateConnection(hosts); string queue = "queue1";//队列名称 //创建一个通道 //此时可以在rabbitmq后台Web管理页面中的Channels中看到一个新通道生成 var channel = connection.CreateModel(); //给通道绑定一个队列,队列如果不存在,则会创建新队列,如果队列已存在,那么参数一定要正确,特别是arguments参数,否则会报错 var arguments = new Dictionary<string, object>() { { "x-queue-type", "classic" } }; channel.QueueDeclare(queue: queue, durable: true, exclusive: false, autoDelete: false, arguments: arguments); //发布10条消息 for (var i = 0; i < 10; i++) { var buffer = Encoding.UTF8.GetBytes(i.ToString()); channel.BasicPublish("", queue, null, buffer); } channel.Close(); Console.ReadKey(); } } }