我们要消费的是 RabbitMQ 管理界面 Queues 页面下、Virtual host 名称为 cloud_platform、队列名称(name)为 biz_cloud_info 的队列。
这时需要在程序里设置连接消息队列的基本信息:host、port、user、password 和 vhost,例如:
// Broker connection settings (example values). The consumer script merges this
// section with the queue section and passes the result to mq::connect().
'mq' => [
'host' => '0.0.0.0', // broker host
'port' => '5672', // broker port, given as a string
'user' => 'crmtest',
'password' => '123321', // NOTE(review): plain-text sample credential — presumably replaced per environment
'vhost' => 'crmtest' // virtual host to connect to
],
还需要设置要消费的是哪个队列:
// Delivery-info sync queue: the exchange / queue / routing key the consumer
// script reads from. These entries are merged with the 'mq' section before
// being passed to mq::connect().
'read_biz_cloud' => [
    'exchange_name' => 'ex_cloud_platform', // exchange name (the original note called this the "channel")
    'queue_name' => 'biz_cloud_info', // queue name
    'route_key' => 'rk_biz_cloud_info',
    'consumer_tag' => ''
],
当然,这些信息都要放到 config.php 里面。
<?php
/**
 * Queue consumer bootstrap.
 *
 * Loads the autoloader and the project's mq / redis helpers, reads config.php,
 * and opens the broker connection using the 'mq' (connection) and
 * 'read_biz_cloud' (exchange / queue / routing key) sections merged together.
 */
require_once 'vendor/autoload.php';
require_once 'common/mq.php';
require_once 'common/redis.php';

// Plain `require` on purpose: `require_once` evaluates to `true` rather than the
// file's return value when the file has already been loaded (for example by one
// of the includes above), which would leave $config as a boolean.
$config = require 'config.php';

// Fail with a clear message instead of an obscure array_merge() error below.
if (!is_array($config) || !isset($config['mq'], $config['read_biz_cloud'])) {
    throw new RuntimeException("config.php must return an array containing 'mq' and 'read_biz_cloud'");
}

$handle = new mq();
// Later keys win in array_merge(), so queue settings may override connection settings.
$handle->connect(array_merge($config['mq'], $config['read_biz_cloud']));
/**
 * Per-message callback: decode the JSON body, route it by `q_type` to the
 * matching handler class under lib/, record the outcome, then ack the message.
 *
 * The message is acknowledged in every case (success, handler failure, or an
 * unroutable message), exactly as before, so nothing is redelivered by this
 * callback itself.
 *
 * @param object $msg Delivered message; this callback reads `$msg->body` and
 *                    `$msg->delivery_info['channel']` / `['delivery_tag']`.
 */
$process = function ($msg) {
    // q_type => how to handle it.
    //   file        handler source, loaded on first use
    //   class       handler class to instantiate
    //   label       name used in the Redis log lines
    //   echo_prefix start of the console line (spacing differs per type; kept as-is)
    //   second_arg  second argument of handle(): 'q_id', 'body' (raw message) or null (none)
    //   log_type    type code passed to sync_queue_log(), or null when that call is skipped
    $routes = [
        // trademark ("+8" trademark in the original note)
        'biz_userbrandinfo' => [
            'file' => 'lib/biz_userbrandinfo.php',
            'class' => 'biz_userbrandinfo',
            'label' => 'biz_userbrandinfo',
            'echo_prefix' => 'biz_userbrandinfo [',
            'second_arg' => 'q_id',
            'log_type' => '1',
        ],
        // trademark workflow
        'order_sync_biz' => [
            'file' => 'lib/order_sync_biz.php',
            'class' => 'order_sync_biz',
            'label' => 'order_sync_biz',
            'echo_prefix' => 'order_sync_biz [',
            'second_arg' => 'q_id',
            'log_type' => '4',
        ],
        // copyright
        'biz_usercopyrightinfo' => [
            'file' => 'lib/biz_usercopyrightinfo.php',
            'class' => 'biz_usercopyrightinfo',
            'label' => 'biz_usercopyrightinfo',
            'echo_prefix' => 'biz_usercopyrightinfo[',
            'second_arg' => null,
            'log_type' => '2',
        ],
        // patent
        'biz_userpatentinfo' => [
            'file' => 'lib/biz_userpatentinfo.php',
            'class' => 'biz_userpatentinfo',
            'label' => 'biz_userpatentinfo',
            'echo_prefix' => 'biz_userpatentinfo[',
            'second_arg' => null,
            'log_type' => '3',
        ],
        // status update
        'CloudProcessStatusInfo' => [
            'file' => 'lib/cloud_biz_info.php',
            'class' => 'cloud_biz_info',
            'label' => 'CloudProcessStatusInfo',
            'echo_prefix' => 'CloudProcessStatusInfo [',
            'second_arg' => 'q_id',
            'log_type' => '5',
        ],
        // processing / file confirmation
        'biz_cloudfileconfirm' => [
            'file' => 'lib/biz_confirm.php',
            'class' => 'biz_confirm',
            'label' => 'biz_confirm',
            'echo_prefix' => 'biz_confirm [',
            'second_arg' => 'body',
            'log_type' => null,
        ],
    ];

    $msgArr = json_decode($msg->body, true);

    // Only a string q_type can select a route. The previous loose `==` let
    // values such as 0 or true match every branch on PHP < 8.
    $qType = null;
    if (is_array($msgArr) && isset($msgArr['q_type']) && is_string($msgArr['q_type'])) {
        $qType = $msgArr['q_type'];
    }

    if ($qType !== null && isset($routes[$qType])) {
        $route = $routes[$qType];
        // Missing keys fall back to null, which is what the handlers received
        // before (minus the undefined-index notice).
        $qData = isset($msgArr['q_data']) ? $msgArr['q_data'] : null;
        $qId = isset($msgArr['q_id']) ? $msgArr['q_id'] : null;

        require_once $route['file'];
        $className = $route['class'];
        $m = new $className();

        // Keep each handler's original call shape (one or two arguments).
        switch ($route['second_arg']) {
            case 'q_id':
                $ok = (bool) $m->handle($qData, $qId);
                break;
            case 'body':
                $ok = (bool) $m->handle($qData, $msg->body);
                break;
            default:
                $ok = (bool) $m->handle($qData);
        }

        $outcome = $ok ? '同步成功' : '同步失败';

        echo $route['echo_prefix'] . $qId . '][' . date('Y-m-d H:i:s') . ']' . $outcome . "\n";
        redisLog('rabbitmq_queue' . $outcome . '[' . $route['label'] . ']:' . $msg->body);

        if ($route['log_type'] !== null) {
            // The literal string 'null' (not PHP null) is the fallback for a missing PID.
            $pid = isset($msgArr['q_data']['PID']) ? $msgArr['q_data']['PID'] : 'null';
            redisLog('rabbitmq_queue' . $outcome . '[q_id]:' . $qId);
            $m->sync_queue_log($pid, $qId, $route['log_type'], $ok ? '1' : '0', date('Y-m-d H:i:s'));
        }

        if (!$ok) {
            $m->handle_error($msgArr);
        }
    } else {
        // Undecodable body or a q_type this consumer does not handle: it is
        // still acked below (as before), but now leaves a trace on the console.
        echo 'rabbitmq_queue skipped unrecognized message [' . date('Y-m-d H:i:s') . ']' . "\n";
    }

    $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);

    // Dry run: comment out the basic_ack() line above and enable the die below
    // to handle a single message without removing it from the queue.
    // die;
};
// Hand the per-message callback to the mq wrapper. mq::consume() is not shown
// here (presumably defined in common/mq.php) and is presumably a blocking
// consume loop — confirm there.
$handle->consume($process);
如果只是想测试、又不想真正把消息消费掉,就把 $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); 这一行注释掉,同时把它后面的 die; 取消注释(处理完一条消息就退出)。
需要注意的主要问题是:有些消息一进来就处于 “Unacked” 状态,我只会一种处理方式:
查看 Consumers 下有几个消费者,逐个点进去;
点击 Details 下的 Connection IP 进去,下拉到底,点击 Force Close 按钮(连接被关闭后,未确认的消息会重新回到 Ready 状态)。
自己留个记录,自己用;别人看不懂,概不负责。