这个就比较简单了,只是还要强调一下
如果前面执行了消费者,那么后面修改消费者代码后,重启rabbitmq服务才能立即生效
代码
代码基本都来自官网
https://www.rabbitmq.com/tutorials/tutorial-two-php.html
生产者
<?php
namespace app\rabbit\controller;
use PhpAmqpLib\Message\AMQPMessage;
class NewTask
{
public function task($message='Hello World!') {
$connection = RabbitMQConnection::getConnection();
$channel = $connection->channel();
//参数3表示,是否是持久性的队列。
$channel->queue_declare('task_queue', false, true, false, false);
//用循环,一次多放一些数据
for ($i = 0; $i < 20; $i++) {
$newmsg = '';
$newmsg = $message.'--'.$i;
//第二个参数为消息持久
$msg = new AMQPMessage($newmsg, array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT));
$channel->basic_publish($msg, '', 'task_queue');
echo ' [x] Sent '.$newmsg.PHP_EOL;
}
$channel->close();
$connection->close();
}
}
|
看效果,这里的task_queue队列有20个消息,并且重启rabbitmq服务后,该队列也不会消失了
消费者1
代码注释里我加了一些自己反复测试,得到的结论
<?php
namespace app\rabbit\controller;
use think\Request;
use think\Db;
class ReceiveWork{
public function dowork() {
// echo 1212;die();
$connection = RabbitMQConnection::getConnection();
$channel = $connection->channel();
//第三个参数,表示该队列数据是不是要持久化,持久化之后,会自动保存在erlang数据库中
$channel->queue_declare('task_queue', false, true, false, false);
echo " [*] Waiting for messages. To exit press CTRL+C\n";
$callback = function ($msg) {
$data = [
'content' => '[111111] Received '.$msg->getBody(),
// 'content' => $msg->delivery_info['delivery_tag'],
'add_time' => time(),
];
Db::name('logs')->insert($data);
sleep(1);
//不加这句,就只保存了1次数据,应该是没有收到确认
//$msg->delivery_info['channel'] 拿到了消息对应的队列频道对象
//basic_ack是AMQPChannel类的方法,用于消息确认确认
//$msg->delivery_info['delivery_tag'] 打印出来的值是 1应该表示的是这条消息是不是确认收到
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};
//分配的任务不能超过1个,也就是说当前任务没有完成时,不分配新的任务
//没有这一句,用两个消费者去一个队列里取值时,只有第一个消费者在一直工作,
$channel->basic_qos(null, 1, null);
//第四个参数,是否不用确认,false为要确认,设置为false后,进程关闭,任务会继续执行
$channel->basic_consume('task_queue', '', false, false, false, false, $callback);
while (count($channel->callbacks)) {
$channel->wait();
}
$channel->close();
$connection->close();
}
}
|
消费者2
与消费者1代码基本一样,只是在记录的时候为了做区分,改成222222。然后休眠时间改成2
'content' => '[222222] Received '.$msg->getBody(),
sleep(2);
测试
在url访问生产者接口,队列里有了数据后,再访问消费者1,消费者2的接口。到数据库看效果
消费者1每次存数据后sleep(1)秒,消费者2每次存数据后sleep(2)秒。就有了如下效果。消费者1处理的速度比消费者2处理的速度快,存入数据库的数据就多。