一、Beanstalkd是什么?

Beanstalkd是一个高性能,轻量级的分布式内存队列

 

二、Beanstalkd特性

1、支持优先级(支持任务插队)
2、延迟(实现定时任务)
3、持久化(定时把内存中的数据刷到binlog日志)
4、预留(把任务设置成预留,消费者无法取出任务,等某个合适时机再拿出来处理)
5、任务超时重发(消费者必须在指定时间内处理任务,如果没有则认为任务失败,重新进入队列)

 

三、Beanstalkd核心元素

生产者 -> 管道(tube) -> 任务(job) -> 消费者

Beanstalkd可以创建多个管道,管道里面存了很多任务,消费者从管道中取出任务进行处理。

 

四、任务job状态

delayed 延迟状态
ready 准备好状态
reserved 消费者把任务读出来,处理时
buried 预留状态
delete 删除状态

Beanstalkd消息队列的安装与使用

五、安装Beanstalkd

1

http://kr.github.io/beanstalkd/download.html

下载beanstalkd-1.10.tar.gz

1

2

3

> tar -xf beanstalkd-1.10.tar.gz

> cd beanstalkd-1.10

> make

查看beanstalkd参数信息

1

> ./beanstalkd -h

启动beanstalkd

1

> ./beanstalkd -l 127.0.0.1 -p 11300 -b /data/beanstalkd/binlog &

-b表示开启binlog,断电后重启自动恢复任务  

 

六、下载Pheanstalk类

首先安装composer

1

2

3

> curl -sS https://getcomposer.org/installer | php

> mv composer.phar /usr/local/bin/composer

> composer require pda/pheanstalk

 编写一个简单脚本查看信息

1

2

3

4

5

6

7

8

<?php

require './vendor/autoload.php';

 

use Pheanstalk\Pheanstalk;

 

$p = new Pheanstalk('127.0.0.1', 11300);

//查看beanstalkd当前的状态信息

var_dump($p->stats());

  

七、Pheanstalk使用方法

维护方法

1

2

3

4

5

6

7

stats() 查看状态方法

listTubes() 目前存在的管道

listTubesWatched() 目前监听的管道

statsTube() 管道的状态

useTube() 指定使用的管道

statsJob() 查看任务的详细信息

peek() 通过任务ID获取任务

生产者方法

1

2

putInTube() 往管道中写入数据

put() 配合useTube()使用

消费者方法

1

2

3

4

5

6

7

8

9

10

11

12

13

14

watch() 监听管道,可以同时监听多个管道

ignore() 不监听管道

reserve() 以阻塞方式监听管道,获取任务

reserveFromTube()

release() 把任务重新放回管道

bury() 把任务预留

peekBuried() 把预留任务读取出来

kickJob() 把buried状态的任务设置成ready

kick() 批量把buried状态的任务设置成ready

peekReady() 把准备好的任务读取出来

peekDelayed() 把延迟的任务读取出来

pauseTube() 给管道设置延迟

resumeTube() 取消管道延迟

touch() 让任务重新计算ttr时间,给任务续命

生产者producer.php代码如下:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

<?php

require './vendor/autoload.php';

 

use Pheanstalk\Pheanstalk;

 

//创建一个Pheanstalk对象

$p = new Pheanstalk('192.168.1.222', 11300);

 

$data = array(

    'id' => 1,

    'name' => 'test',

);

 

//向userReg管道中添加任务,返回任务ID

//put()方法有四个参数

//第一个任务的数据

//第二个任务的优先级,值越小,越先处理

//第三个任务的延迟

//第四个任务的ttr超时时间

$id = $p->useTube('userReg')->put(json_encode($data));

//获取任务

$job = $p->peek($id);

//查看任务状态

print_r($p->statsJob($job));

消费者consumer.php代码如下:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

<?php

require './vendor/autoload.php';

 

use Pheanstalk\Pheanstalk;

 

//创建一个Pheanstalk对象

$p = new Pheanstalk('192.168.1.222', 11300);

 

//监听userReg管道,忽略default管道

$job = $p->watch('userReg')->ignore('default')->reserve();

 

$data = json_decode($job->getData());

//打印任务中的数据

print_r($data);

 

//最后删除任务,表示任务处理完成

$p->delete($job);

  

相关文章: