1.centos安装rabbitmq
官网下载或者yum list |grep rabbitmq搜索安装,官网是最新的版本
wget http://www.rabbitmq.com/releases/rabbitmq-server/v3.6.14/rabbitmq-server-3.6.14-1.el7.noarch.rpm
[root@greg02 src]#yum list |grep rabbitmq librabbitmq.x86_64 0.5.2-1.el7 epel librabbitmq-devel.x86_64 0.5.2-1.el7 epel librabbitmq-tools.x86_64 0.5.2-1.el7 epel opensips-event_rabbitmq.x86_64 1.10.5-3.el7 epel rabbitmq-java-client.noarch 3.6.0-1.el7 epel rabbitmq-java-client-doc.noarch 3.6.0-1.el7 epel rabbitmq-java-client-javadoc.noarch 3.6.0-1.el7 epel rabbitmq-server.noarch 3.3.5-34.el7 epel [root@greg02 src]#rz [root@greg02 src]#yum install -y rabbitmq-server-3.6.14-1.el7.noarch.rpm
2.Windows安装rabbitmq
pip install pika
or
easy_install pika
3.centos启动rabbitmq
[root@greg02 src]#service rabbitmq-server start Redirecting to /bin/systemctl start rabbitmq-server.service [root@greg02 src]#netstat -lntp Active Internet connections (only servers) Proto Recv-Q Send-Q Local Address Foreign Address State PID/Program name tcp 0 0 0.0.0.0:80 0.0.0.0:* LISTEN 1188/nginx: master tcp 0 0 0.0.0.0:4369 0.0.0.0:* LISTEN 2801/epmd tcp 0 0 0.0.0.0:22 0.0.0.0:* LISTEN 1140/sshd tcp 0 0 127.0.0.1:25 0.0.0.0:* LISTEN 2205/master tcp 0 0 0.0.0.0:25672 0.0.0.0:* LISTEN 2660/beam.smp tcp6 0 0 :::3306 :::* LISTEN 2163/mysqld tcp6 0 0 :::4369 :::* LISTEN 2801/epmd tcp6 0 0 :::22 :::* LISTEN 1140/sshd tcp6 0 0 ::1:25 :::* LISTEN 2205/master tcp6 0 0 :::5672 :::* LISTEN 2660/beam.smp
查看log
[root@greg02 src]#service rabbitmq-server status Redirecting to /bin/systemctl status rabbitmq-server.service Nov 16 19:16:51 greg02 rabbitmq-server[2660]: ########## Logs: /var/log/rabbitmq/rabbit@greg02.log Nov 16 19:16:51 greg02 rabbitmq-server[2660]: ###### ## /var/log/rabbitmq/rabbit@greg02-sasl.log
切换到log目录cd /var/log/rabbitmq/
[root@greg02 rabbitmq]#ls rabbit@greg02.log rabbit@greg02-sasl.log [root@greg02 rabbitmq]#cat rabbit\@greg02.log =INFO REPORT==== 16-Nov-2017::19:16:51 === Starting RabbitMQ 3.6.14 on Erlang R16B03-1 Copyright (C) 2007-2017 Pivotal Software, Inc. Licensed under the MPL. See http://www.rabbitmq.com/ =INFO REPORT==== 16-Nov-2017::19:16:51 === node : rabbit@greg02 home dir : /var/lib/rabbitmq config file(s) : /etc/rabbitmq/rabbitmq.config (not found) cookie hash : 6QLUzrnBCl9yXMJXg59m+Q== log : /var/log/rabbitmq/rabbit@greg02.log sasl log : /var/log/rabbitmq/rabbit@greg02-sasl.log database dir : /var/lib/rabbitmq/mnesia/rabbit@greg02
显示的是没有找到配置文件,我们可以自己创建这个文件
cd /etc/rabbitmq/ vi rabbitmq.config
编辑内容如下:
[{rabbit, [{loopback_users, []}]}].
rabbitmq默认创建的用户guest,密码也是guest,这个用户默认只能是本机访问,localhost或者127.0.0.1,从外部访问需要添加上面的配置。
保存配置后重启服务:
service rabbitmq-server stop service rabbitmq-server start
4.创建用户角色
[root@greg02 rabbitmq]#rabbitmqctl add_user greg greg123 Creating user "greg" [root@greg02 rabbitmq]#rabbitmqctl set_user_tags greg administrator Setting tags for user "greg" to [administrator] [root@greg02 rabbitmq]#rabbitmqctl set_permissions -p / greg ".*" ".*" ".*" Setting permissions for user "greg" in vhost "/"
队列通信:
send端
#!/usr/bin/env python import pika connection = pika.BlockingConnection(pika.ConnectionParameters( 'localhost')) channel = connection.channel() #声明queue channel.queue_declare(queue='hello') #n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange. channel.basic_publish(exchange='', routing_key='hello', body='Hello World!') print(" [x] Sent 'Hello World!'") connection.close()
receive端
#_*_coding:utf-8_*_ import pika connection = pika.BlockingConnection(pika.ConnectionParameters( 'localhost')) channel = connection.channel() #You may ask why we declare the queue again ‒ we have already declared it in our previous code. # We could avoid that if we were sure that the queue already exists. For example if send.py program #was run before. But we're not yet sure which program to run first. In such cases it's a good # practice to repeat declaring the queue in both programs. channel.queue_declare(queue='hello') def callback(ch, method, properties, body): print(" [x] Received %r" % body) channel.basic_consume(callback, queue='hello', no_ack=True) print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming()
work queues
在这种模式下,RabbitMQ会默认把p发的消息依次分发给各个消费者(c),跟负载均衡差不多
send端
import pika import time connection = pika.BlockingConnection(pika.ConnectionParameters( 'localhost')) channel = connection.channel() # 声明queue channel.queue_declare(queue='task_queue') # n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange. import sys message = ' '.join(sys.argv[1:]) or "Hello World! %s" % time.time() channel.basic_publish(exchange='', routing_key='task_queue', body=message, properties=pika.BasicProperties( delivery_mode=2, # make message persistent ) ) print(" [x] Sent %r" % message) connection.close()
receive端
import pika, time connection = pika.BlockingConnection(pika.ConnectionParameters( 'localhost')) channel = connection.channel() def callback(ch, method, properties, body): print(" [x] Received %r" % body) time.sleep(20) print(" [x] Done") print("method.delivery_tag",method.delivery_tag) ch.basic_ack(delivery_tag=method.delivery_tag) channel.basic_consume(callback, queue='task_queue', no_ack=True ) print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming()
此时,先启动消息生产者,然后再分别启动3个消费者,通过生产者多发送几条消息,你会发现,这几条消息会被依次分配到各个消费者身上
#no_ack 默认false,即使在处理消息的时候使用CTRL + C来杀死一个工作者,也不会丢失任何东西。 工人死后不久,所有未确认的消息将被重新发送
公平分发
如果Rabbit只管按顺序把消息发到各个消费者身上,不考虑消费者负载的话,很可能出现,一个机器配置不高的消费者那里堆积了很多消息处理不完,同时配置高的消费者却一直很轻松。为解决此问题,可以在各个消费者端,配置perfetch=1,意思就是告诉RabbitMQ在我这个消费者当前消息还没处理完的时候就不要再给我发新消息了。
channel.basic_qos(prefetch_count=1)
sender
#!/usr/bin/env python import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.queue_declare(queue='task_queue', durable=True) message = ' '.join(sys.argv[1:]) or "Hello World!" channel.basic_publish(exchange='', routing_key='task_queue', body=message, properties=pika.BasicProperties( delivery_mode = 2, # make message persistent )) print(" [x] Sent %r" % message) connection.close()
receive
#!/usr/bin/env python import pika import time connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.queue_declare(queue='task_queue', durable=True) print(' [*] Waiting for messages. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] Received %r" % body) time.sleep(body.count(b'.')) print(" [x] Done") ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume(callback, queue='task_queue') channel.start_consuming()
订阅发布
之前的例子都基本都是1对1的消息发送和接收,即消息只能发送到指定的queue里,但有些时候你想让你的消息被所有的Queue收到,类似广播的效果,这时候就要用到exchange了
交换是一件非常简单的事情。 一方面它接收来自生产者的消息,另一方则推动他们排队。 交易所必须知道如何处理收到的消息。 是否应该附加到特定的队列? 它应该附加到许多队列? 还是应该丢弃。 这些规则是由交换类型定义的。
fanout: 所有bind到此exchange的queue都可以接收消息
direct: 通过routingKey和exchange决定的那个唯一的queue可以接收消息
topic:所有符合routingKey(此时可以是一个表达式)的routingKey所bind的queue可以接收消息
表达式符号说明:#代表一个或多个字符,*代表任何字符
例:#.a会匹配a.a,aa.a,aaa.a等
*.a会匹配a.a,b.a,c.a等
注:使用RoutingKey为#,Exchange Type为topic的时候相当于使用fanout
headers: 通过headers 来决定把消息发给哪些queue
sender
import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='logs',exchange_type='fanout') message = ' '.join(sys.argv[1:]) or "info: Hello World!" channel.basic_publish(exchange='logs', routing_key='', body=message) print(" [x] Sent %r" % message) connection.close()
receive
#_*_coding:utf-8_*_ import pika connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='logs', exchange_type='fanout') result = channel.queue_declare(exclusive=True) #不指定queue名字,rabbit会随机分配一个名字,exclusive=True会在使用此queue的消费者断开 后,自动将queue删除 queue_name = result.method.queue channel.queue_bind(exchange='logs', queue=queue_name) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r" % body) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
有选择的接收消息(exchange type=direct)
RabbitMQ还支持根据关键字发送,即:队列绑定关键字,发送者将数据根据关键字发送到消息exchange,exchange根据 关键字 判定应该将数据发送至指定队列。
sender
import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='direct_logs', type='direct') severity = sys.argv[1] if len(sys.argv) > 1 else 'info' message = ' '.join(sys.argv[2:]) or 'Hello World!' channel.basic_publish(exchange='direct_logs', routing_key=severity, body=message) print(" [x] Sent %r:%r" % (severity, message)) connection.close()