PHP使用RabbitMQ + 延迟队列的实现(死信队列)

August 28, 2019 · PHP · Linux · 69次阅读

最近小弟学了一下RabbitMQ,在PHP上简单的使用

可怜的小弟我之前都是用redis队列的,自从被面试官吊打说如何保证redis队列的消息的可靠性,木得办法啊redis本身就不是专业做消息队列中间件的,只能被吊打了,所以,就下点功夫去学习了下 RabbitMQ,为此我还学了beanstalkd,这个下次跟你们讲讲。这次先来说 RabbitMQ 吧,这次学习会用上 PHP + Composer + RabbitMQ + Docker

消息队列中间件,干嘛用的应该不用介绍了吧,就是为了异步去执行我业务的,让程序解耦。

首先呢 学习RabbitMQ,我们需要安装对吧,这里呢,我为了快点就用docker去安装了,废话不多说我们动手吧。

什么是Docker

Docker 是一个开源的应用容器引擎,你可以将其理解为一个轻量级的虚拟机,开发者可以打包他们的应用以及依赖包到一个可移植的容器中,然后发布到任何流行的 Linux 机器上。只要有了Docker,用户便不再需要为这些应用配置其所需的特有的环境了,也不需要为这些应用统一环境了。说白了就是服务器装个虚拟机啦。

为什么要用Docker呢?有啥好处呢?
可以认为Docker是一个通用安装程序。简单来说,利用Docker容器,可以将任何一个或者多个程序封装起来,并提供标准的管理接口。因为使用了容器,所以可以很方便的把生产环境和开发环境分开,互不影响,这样,开发人员负责维护内容,并使用Docker进行封装,系统管理人员利用Docker的标准接口进行部署和管理。

Docker 的主要用途,目前有三大类。
(1)提供一次性的环境。比如,本地测试他人的软件、持续集成的时候提供单元测试和构建的环境。
(2)提供弹性的云服务。因为 Docker 容器可以随开随关,很适合动态扩容和缩容。
(3)组建微服务架构。通过多个容器,一台机器可以跑多个服务,因此在本机就可以模拟出微服务架构。

Docker 安装

  1. 更新update到最新的版本 命令: yum update
  2. 安装docker 命令: yum -y install docker
  3. 启动docker 命令: systemctl start docker
  4. 加入开机自启 命令: systemctl enable docker

就这样一顿操作猛如虎之后,我们就可以看看docker安装成功没有 输入
命令: docker -v
如果出现版本信息就是安装成功了,就OK完美了,接下来我们来装 RabbitMQ

启动RabbitMQ 就会自动搜索
docker run -d --name rabbit -p 15672:15672 -p 5672:5672 rabbitmq:management

参数说明 :

  1. -d 开启守护进程,程序后台运行
  2. -p 5672:5672: 将主机的5672端口,映射到容器的5672端口,用于外网访问容器
  3. -p 15672:15672 就是映射一个界面化访问的端口,我们可以通过匿名的 IP:15672 去访问RabbitMQ的界面后台
  4. --name rabbit : 容器名字

这是我访问的 RabbitMQ的 后台 默认账号密码都是: guest

1.png

到此我吗就按照完毕了,现在我们就来使用了

这是呢先介绍一下 AMQP协议 PS:这都是些概念性的东西,当然可以跳过,直接看下面的动手PHP上手使用

图片1.png

  • RabbitMQ Server:也叫broker server,它是一种传输服务。 他的角色就是维护一条从Producer到Consumer的路线,保证数据能够按照指定的方式进行传输。
  • Producer: 消息生产者,如图A、B、C,数据的发送方。消息生产者连接RabbitMQ服务器然后将消息投递到Exchange。
  • Consumer:消息消费者,如图1、2、3,数据的接收方。消息消费者订阅队列,RabbitMQ将Queue中的消息发送到消息消费者。
  • Exchange:生产者将消息发送到Exchange(交换器),由Exchange将消息路由到一个或多个Queue中(或者丢弃)。Exchange并不存储消息。RabbitMQ中的Exchange有fanout、direct、topic、headers四种类型,每种类型对应不同的路由规则,后面详细介绍这四种类型。
  • Queue:(队列)是RabbitMQ的内部对象,用于存储消息。消息消费者就是通过订阅队列来获取消息的,RabbitMQ中的消息都只能存储在Queue中,生产者生产消息并最终投递到Queue中,消费者可以从Queue中获取消息并消费。多个消费者可以订阅同一个Queue,这时Queue中的消息会被平均分摊给多个消费者进行处理,而不是每个消费者都收到所有的消息并处理。
  • RoutingKey:生产者在将消息发送给Exchange的时候,一般会指定一个routing key,来指定这个消息的路由规则,而这个routing key需要与Exchange Type及binding key联合使用才能最终生效。在Exchange Type与binding key固定的情况下(在正常使用时一般这些内容都是固定配置好的),我们的生产者就可以在发送消息给Exchange时,通过指定routing key来决定消息流向哪里。RabbitMQ为routing key设定的长度限制为255 bytes。
  • Connection:(连接)。Producer和Consumer都是通过TCP连接到RabbitMQ Server的。以后我们可以看到,程序的起始处就是建立这个TCP连接。
  • Channels:(信道)。它建立在上述的TCP连接中。数据流动都是在Channel中进行的。也就是说,一般情况是程序起始建立TCP连接,第二步就是建立这个Channel。

下面来 撸PHP的代码:
首先我们用 composer 安装所需的包
composer require php-amqplib/php-amqplib

安装好以后动手莽就对了,队列嘛,都是那样一个生产者一个消费者,不懂的去看看我之前写redis队列的实现

先来一手 生产者的代码 :

<?php
require_once __DIR__.'/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

//这是个生产者的队列
$rb = new AMQPStreamConnection('127.0.0.1','5672','guest','guest');

//创建通道
$tongdao = $rb->channel();

$queueName = 'aa'; //队列名称aa
$exchangeName = 'jhj'; //交换机名称
$route_key = 'ly';//路由名称
//创建交换机
/**
 * 创建交换机(Exchange)
 * name: task      交换机名称
 * type: direct     交换机类型,分别为direct/fanout/topic,参考另外文章的Exchange Type说明。
 * passive: false   如果设置true存在则返回OK,否则就报错。设置false存在返回OK,不存在则自动创建
 * durable: false  是否持久化,设置false是存放到内存中的,RabbitMQ重启后会丢失
 * auto_delete: false  是否自动删除,当最后一个消费者断开连接之后队列是否自动被删除
 */
$tongdao->exchange_declare($exchangeName
                        ,\PhpAmqpLib\Exchange\AMQPExchangeType::DIRECT, false, true, false);

/**
* 创建队列(Queue)
* name: hello     队列名称
* passive: false  如果设置true存在则返回OK,否则就报错。设置false存在返回OK,不存在则自动创建
* durable: true 是否持久化,设置false是存放到内存中的,RabbitMQ重启后会丢失
* exclusive: false  是否排他,指定该选项为true则队列只对当前连接有效,连接断开后自动删除
* auto_delete: false  是否自动删除,当最后一个消费者断开连接之后队列是否自动被删除
*/
$tongdao->queue_declare($queueName, false, true, false, false);

/**
* 绑定消息交换机和队列
* queueName    队列名称
* exchangeName 交换机名称
* routeKey     路由key
*/
$tongdao->queue_bind($queueName,$exchangeName,$route_key);

//消息内容    
$data = [
    'time'  => date('Y-m-d H:i:s'),
    'id'    => session_create_id(md5(uniqid()))
];

/**
 * 创建AMQP消息类型 
 * delivery_mode 消息是否持久化
 *       AMQPMessage::DELIVERY_MODE_NON_PERSISTENT  不持久化
 *       AMQPMessage::DELIVERY_MODE_PERSISTENT      持久化
 */
$msg = new AMQPMessage(json_encode($data),

['delivery_mode' => PhpAmqpLibMessageAMQPMessage:: DELIVERY_MODE_NON_PERSISTENT]);


/**
 * 发布消息到交换机当中,并且绑定好路由关系
 * $msg             amqp消息类型
 * $exchangeName  交换机名称
 * $routeKey        路由key
 */
$tongdao->basic_publish($msg,$exchangeName,$route_key);

这生产者的代码,也就是说如何使用php给RabbitMQ 插入队列啦
下面这段是消费者的代码,就是从RabbitMQ,上拿到相应的数据做业务处理:

<?php
 require_once __DIR__.'/vendor/autoload.php';
 use PhpAmqpLib\Connection\AMQPStreamConnection;

 //这是个 消费者的队列也就是说

 // 连接 Rabbitmq
 $rabb = new AMQPStreamConnection('127.0.0.1','5672','guest','guest');

 //创建通道
 $channel = $rabb->channel();

// 队列名称
 $QueueName = 'aa';
 /**
 * queue:                被消费的队列名称
 * consumer_tag:   消费者客户端身份标识,用于区分多个客户端
 * no_local: false    这个功能属于AMQP的标准,但是RabbitMQ并没有做实现
 * no_ack: true       收到消息后,是否不需要回复确认即被认为被消费
 * exclusive: false   是否排他,即这个队列只能由一个消费者消费。适用于任务不允许进行并发处理的情况下
 * nowait: false    不返回执行结果,但是如果排他开启的话,则必须需要等待结果的,如果两个一起开就会报错
 * callback: $callback     回调逻辑处理函数
 */
$channel->basic_consume($QueueName, 'test', false, false, false, false, function ($message){
    // $message 队列里的消息对象
    // $message->body 消息数据 
    var_dump($message->body);
    echo "处理业务\n";
    //ack 应答 这是手动的ACK 应答 确保消息不会丢失 确保 消息的可靠性
    $message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']);
});

//监听方法 监听有没队列数据
while ($channel->is_consuming()) {
    //阻塞消费 开始处理队列的数据
    $channel->wait();
}

$rabb->close(); //关闭连接
$channel->close(); //关闭通道

也不是很复杂的代码,用起来还是挺简单的,使用我们是实现了可是 RabbitMQ 不支持延迟队列怎么办,下面就得说到我们的死信队列了

Rabbitmq延迟队列实现原理:
由于rabbitMQ本身并没有提供延迟队列的功能,所以需要借助一些特性来实现延迟队列,利用rabbitMQ

1、rabbitMQ可以针对 Queue和Message 设置 x-message-ttl 来控制消息的生存时间,如果超时,消息变为 dead letter(死信)

2、rabbitMQ 的queue 可以配置 x-dead-letter-exchange 和 x-dead-letter-routing两个参数,来控制队列出现 dead letter 的时候,重新发送消息的目的地

比如:设置一条消息存活时间为10秒钟,那么10秒钟之内,如果没有消费者消费,这条消息就会变为dead letter(死信),就会把该消息转发到指定交换机跟队列当中,然后被消费

在队列上指定一个Exchange,则在该队列上发生如下情况,
1.消息被拒绝(basic.reject or basic.nack),且requeue=false
2.消息过期而被删除(TTL)
3.消息数量超过队列最大限制而被删除
4.消息总大小超过队列最大限制而被删除

就会把该消息转发到指定的一个exchange同时也可以指定一个可选的x-dead-letter-routing-key,表示默认的routing-key,如果没有指定,则使用消息的routeing-key(也跟指定的exchange有关,如果是Fanout类型的exchange,则会转发到所有绑定到该exchange的所有队列)

这时候我们只要修改下我们的生产者代码就好,君请看 :

<?php
require_once __DIR__.'/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;

$connection = new AMQPStreamConnection('127.0.0.1', 5672, 'guest', 'guest');
$channel = $connection->channel();

//判断运行脚本是否带参数
if (isset($argv[1])){
    $expiration =  intval($argv[1]);
}else{
    $expiration = 10000;
}

$cache_exchange_name = 'cache_exchange'.$expiration;//死信交换机名
$cache_queue_name = 'cache_queue'.$expiration;// 死信队列名称

//创建一个交换机
$channel->exchange_declare('delay_exchange', 'direct',false,false,false);
//创建死信队列交换机
$channel->exchange_declare($cache_exchange_name, 'direct',false,false,false);
//设置死信队列信息
$tale = new AMQPTable();
$tale->set('x-dead-letter-exchange', 'delay_exchange'); //死亡后发送到的交换机
$tale->set('x-dead-letter-routing-key','delay_exchange'); //死亡后发送的到的队列
$tale->set('x-message-ttl',$expiration); //死信队列生存时间
//创建死信队列
$channel->queue_declare($cache_queue_name,false,true,false,false,false,$tale);
//死信队列绑定交换机
$channel->queue_bind($cache_queue_name, $cache_exchange_name,'');
//创建死信队列 死亡之后要运行的队列
$channel->queue_declare('delay_queue',false,true,false,false,false);
//绑定队列交换机
$channel->queue_bind('delay_queue', 'delay_exchange','delay_exchange');

//创建 AMQP消息类型
$msg = new AMQPMessage('Hello World'.$argv[1],array(
    'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
));
//消息发布到交换机中
$channel->basic_publish($msg,$cache_exchange_name,'');
echo date('Y-m-d H:i:s')." [x] Sent 'Hello World!' ".PHP_EOL;

$channel->close();
$connection->close();

这是新的生产者 通过php send.php 1000(时间毫秒)

新的消费者

<?php
 require_once __DIR__.'/vendor/autoload.php';
 use PhpAmqpLib\Connection\AMQPStreamConnection;

 $connection = new AMQPStreamConnection('127.0.0.1', 5672, 'guest', 'guest');
 $channel = $connection->channel();
 
 //创建交换机
 $channel->exchange_declare('delay_exchange', 'direct',false,false,false);
 $channel->exchange_declare('cache_exchange', 'direct',false,false,false);
 
 //创建队列并绑定交换机
 $channel->queue_declare('delay_queue',false,true,false,false,false);
 $channel->queue_bind('delay_queue', 'delay_exchange','delay_exchange');
 
 echo ' [*] Waiting for message. To exit press CTRL+C '.PHP_EOL;
 
 //设置消息队列回调
 $callback = function ($msg){
     echo date('Y-m-d H:i:s')." [x] Received",$msg->body,PHP_EOL;
 
      $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
 
 };
 
 //只有consumer已经处理并确认了上一条message时queue才分派新的message给它
 $channel->basic_qos(null, 1, null);
 //设置消费队列的信息
 $channel->basic_consume('delay_queue','',false,false,false,false,$callback);
 
 //监听队列
 while (count($channel->callbacks)) {
     $channel->wait();//消费队列
 }
 $channel->close();
 $connection->close();

以上就是代码呢 ,分别开两个窗口调试,效果如下

4.png

喝杯水

标签:none

最后编辑于:2019/08/28 19:05

添加新评论

  1. 李大爷 李大爷
    2019-08-30 16:14

    看得两眼一抹黑

    回复
    1. 2019-09-01 15:48

      有问题可以提出来哦,大佬

      回复

控制面板