音乐播放器
Great Wei
 
文章 标签
16

Powered by Gridea | Theme: Fog

RabbitMQ 简介

Beanstalkd

Beantalkd 一个轻量级消息中间件,他的最大特点是将自己定位为基于管道 (tube) 和任务 (job) 的工作队列 (work-queue)
有以下特点:

任务优先级 (priority):

任务 (job) 可以有 0~2^32 个优先级, 0 代表最高优先级。 beanstalkd 采用最大最小堆 (Min-max heap) 处理任务优先级排序, 任何时刻调用 reserve 命令的消费者总是能拿到当前优先级最高的任务, 时间复杂度为 O(logn)。

最大堆与最小堆


堆树是一种二叉树,堆树中某个节点的值总是不大于或不小于其孩子节点的值。
当父节点的键值总是大于或等于任何一个子节点的键值时为最大堆。 当父节点的键值总是小于或等于任何一个子节点的键值时为最小堆。

延时任务 (delay):

有两种方式可以延时执行任务 (job): 生产者发布任务时指定延时,或者当任务处理完毕后, 消费者再次将任务放入队列延时执行 (RELEASE with )。

任务超时重发 (time-to-run):

Beanstalkd 把任务返回给消费者以后:消费者必须在预设的 TTR (time-to-run) 时间内发送 delete / release/ bury 改变任务状态;否则 Beanstalkd 会认为消息处理失败,然后把任务交给另外的消费者节点执行。如果消费者预计在 TTR (time-to-run) 时间内无法完成任务, 也可以发送 touch 命令, 它的作用是让 Beanstalkd 从系统时间重新计算 TTR (time-to-run)。

任务预留 (buried):

如果任务因为某些原因无法执行, 消费者可以把任务置为 buried 状态让 Beanstalkd 保留这些任务。管理员可以通过 peek buried 命令查询被保留的任务,并且进行人工干预。简单的, kick 能够一次性把 n 条被保留的任务踢回队列。

消息持久化

通过日志实现消息的持久化。

速度优势


Beanstalkd协议基于tcp上。客户端连接服务器并发送指令和数据,然后等待响应并关闭连接。对于每个连接,服务器按照接收命令的序列依次处理并响应。

场景优势

延时系统,比如延迟20分钟发送短信,******,在投放的时候就设定一定的延迟时间值,让任务在延迟时间到了之后进入ready队列,等待worker预订处理。

轮询系统,如下图,一个被投放的任务,在延迟时间过后需要再检查一遍状态,如果不符合,继续释放(release with delay)为延迟投放状态(DELAYED),直到时间过期之后,再次进入ready队列,被worker预订,进行一些逻辑判断,比如微信银行卡退款是否成功,如果成功,删除该任务,如果没成功,继续释放(release with delay)为延迟投放状态。

缺陷

Beanstalk单点部署,不支持集群,当服务发送故障造成服务不可用,任务积压问题,不能够灵活的设置消息持久化。


RabbitMQ

Rabbitmq特性

可靠性:持久化存储、ACK消息确认、发布confirm、事务支持。
灵活的路由:交换机功能。交换机类型:direct,topic,headers,fanout。
镜像,master-slave 多协议支持,集群节点
多语言客户端支持:java、c#、ruby、Python、php、c、scale、nodejs、go、erlang…
管理界面功能丰富、命令行rabbitmqctl、RPC远程调度

AMQP高级消息协议

一个AMQP服务器类似于邮件服务器,exchange类似于消息传输代理(email里的概念),message queue类似于邮箱。Binding定义了每一个传输代理中的消息路由表,发布者将消息发给特定的传输代理,然后传输代理将这些消息路由到邮箱中,消费者从这些邮箱中取出消息。

AMQP术语
Channel(信道): 在AMQP模型中,我们不需要通过建立太多的TCP连接来实现。假如针对每一个AMQP连接都建立一个TCP连接的话,会占用大量的系统资源。对此,AMQP提供了通道(channel)机制。即,共享一个TCP连接,可创建多个通道。
​ 在多线程/进程的应用程序中正确做法是,对于每一个线程/进程,应分别建立一个通道,而不是多个线程/进程之间去共享一个通道。
Exchange(交换器):用于接受、分配消息;可以有好几种模式、相当于邮箱
Queue(队列):用于存储生产者的消息;
RoutingKey(路由键):用于把生成者的数据分配到交换器上;
BindingKey(绑定键):用于把交换器的消息绑定到队列上;
Broker(消息代理):消息代理会接收来自生产者(publishers/producers)生产的消息,并将它们路由(route,可以理解成按指定规则转发)给相应的消费者(consumers)手中。
VHOST(虚拟主机):为了在一个单独的代理上实现多个隔离的环境(用户、用户组、交换机、队列 等),AMQP 提供了一个虚拟主机(virtual hosts - vhosts)的概念。这跟 Web servers 虚拟主机概念非常相似,这为 AMQP 实体提供了完全隔离的环境。当连接被建立的时候,AMQP 客户端来指定使用哪个虚拟主机。

模拟器

hello world

生产者生产消息,消费者消费消息,消息存储在队列中。

生产者:

函数:
// 声明队列
public function queue_declare(
        $queue = '',            //队列名称
        $passive = false,      //如果只是需要知道该队列是否存在,传递true
        $durable = false,       //队列可持久话,如果希望节点关闭也不会丢失队列,传递true
        $exclusive = false,      //排他性,针对首次建立连接的,一个连接下面多个通道也是可见的, 对于其他连接是不可见的,断开连接后自动删除
        $auto_delete = true,   //自动删除,默认是开启的,最后一个消息被消费后自动删除队列
        $nowait = false,
        $arguments = array(), //可用于镜像队列配置
        $ticket = null
    ) {}

// 发布消息
     public function basic_publish(
        $msg,                       // 消息体
        $exchange = '',           // 交换器,默认direct。
        $routing_key = '',         // 路由key  
        $mandatory = false,      //当mandatory标志位设置为true时,如果exchange根据自身类型和消息routeKey无法找到一个符合条件的queue,那么会调用basic.return方法将消息返回给生产者(Basic.Return + Content-Header + Content-Body)
        $immediate = false,         //当immediate标志位设置为true时,如果exchange在将消息路由到queue(s)时发现对于的queue上么有消费者,那么这条消息不会放入队列中。当与消息routeKey关联的所有queue(一个或者多个)都没有消费者时,该消息会通过basic.return方法返还给生产者。
        $ticket = null                     // 暂时没发现用途
    ) {}

消费者:

public function basic_consume(
        $queue = '',                        // 队列名称
        $consumer_tag = '',             // 消费者标签
        $no_local = false,                 // 
        $no_ack = false,                   // 消息ack
        $exclusive = false,
        $nowait = false,
        $callback = null,                   // 回调函数
        $ticket = null,
        $arguments = array()
    ) {}

工作队列

一个工作队列,该队列将用于在多个工作人员之间分配耗时的任务,默认情况下,RabbitMQ将按顺序将每个消费者都会收到相同数量的消息,采用循环发送的方式。

消息确认

执行任务可能需要几秒钟。您可能想知道,如果其中一个消费者开始一项漫长的任务并仅部分完成而死掉,会发生什么情况。使用我们当前的代码,RabbitMQ一旦向消费者发送了一条消息,便立即将其标记为删除。在这种情况下,如果您杀死一个工人,我们将丢失正在处理的消息。我们还将丢失所有发送给该特定工作人员但尚未处理的消息。但是我们不想丢失任何任务。如果一个工人死亡,我们希望将任务交付给另一个工人。
为了确保消息永不丢失,RabbitMQ支持 消息确认。消费者发送回一个确认(acknowledgement),以告知RabbitMQ已经接收,处理了特定的消息,并且RabbitMQ可以自由删除它。
如果使用者死了(其通道已关闭,连接已关闭或TCP连接丢失)而没有发送确认,RabbitMQ将了解消息未完全处理,并将重新排队。如果同时有其他消费者在线,它将很快将其重新分发给另一位消费者。这样,即使工人偶尔死亡,您也可以确保不会丢失任何消息。没有任何消息超时;消费者死亡时,RabbitMQ将重新传递消息。

如果当我们的消费者大量的消息内容忘记ack后,会照成rabbitMq将会消耗越来越多的内存,因为它无法释放任何未确认的消息。
rabbitmqctl list_queues name messages_ready messages_unacknowledged
以上命令可以查询队列中的为确认消息数量。

消息持久性

以上方法,已经能够解决消费者死亡,任务也不会丢失,但是,如果RabbitMQ服务器停止,我们的任务仍然会丢失,要确保消息不会丢失,需要做两件事,将队列与消息都是标记为持久性。

// 队列持久化
$ channel-> queue_declare('task_queue',false,true,false,false);

// 消息持久化
$ msg = new AMQPMessage(
    $ data,array('delivery_mode' => AMQPMessage :: DELIVERY_MODE_PERSISTENT)
);

将消息标记持久化并不能完全保证不会丢失消息,应为RabbitMQ接收消息但尚未保存至磁盘中时还是有很短的时间,另外,RabbitMQ不会对每条消息都执行fsync(2),可能只是保存到缓存中,而没有真正写入磁盘,持久性保证并不强,但是对于简单的任务队列而已,已经足够了,如果需要更强有力的保证,则可以使用发布者确认

$channel->confirm_select(); //开启确认模式
$channel->basic_publish($msg, $exchangeName, $routeingKey);
 
//注册ack回调
$channel->set_nack_handler(function (AMQPMessage $msg) {
    var_dump('nack');
});
$channel->set_ack_handler(function (AMQPMessage $msg) {
    var_dump("ack");
});
//等待接收ack
$channel->wait_for_pending_acks();

公平派遣

您可能已经注意到,调度仍然无法完全按照我们的要求进行。例如,在有两名工人的情况下,当所有奇怪的消息都很重,甚至消息都很轻时,一位工人将一直忙碌而另一位工人将几乎不做任何工作。好吧,RabbitMQ对此一无所知,并且仍将平均分配消息。发生这种情况是因为RabbitMQ在消息进入队列时

为了克服这一点,我们可以将basic_qos方法与 prefetch_count = 1设置一起使用。这告诉RabbitMQ一次不要给工人一个以上的消息。换句话说,在处理并确认上一条消息之前,不要将新消息发送给工作人员。而是将其分派给尚不繁忙的下一个工作人员。

$ channel-> basic_qos(null,1,null);

发布订阅

一次发布,订阅的队列都能接收到消息进行处理,将繁琐的业务逻辑进行解耦。

每个订阅者就是一个队列,虽然这场景也可以通过一个队列进行所有业务操作,但是一旦一个复杂的业务发生故障,容易服务不可用任务积压,如果能够将业务拆分的更加小,每个队列处理自己的业务逻辑,能够很好解决这一问题,采用发布订阅模式,发布者不需要关系哪些队列关注这个任务,只要订阅了,自动回接收到任务。
有几种交换类型:direct,topic,headers,fanout。
生产者:

 $channel = $connection->channel();
    $channel->exchange_declare($exchangeName, 'fanout', false, false, false);
    $n = 1;
    while ($n < 10) {
        $msg = new \PhpAmqpLib\Message\AMQPMessage(
            "fanout message !!! $n"
        );
        $channel->basic_publish($msg, $exchangeName);
        $n ++;
    }
    $channel->close();
    $connection->close();

消费者:

$channel = $connection->channel();
    $channel->queue_declare($queueName, false, false, false, false);
    $channel->queue_bind($queueName, $exchangeName);
    $callback = function ($msg) {
        echo ' [x] ', $msg->body, "\n";
    };
    $channel->basic_consume($queueName, '', false, true, false, false, $callback);
    while ($channel->is_consuming()) {
        $channel->wait();
    }
    $channel->close();
    $connection->close();

路由


在发布订阅的基础上,有些队列并不需要接收所有消息,而是只对自己感兴趣的内容进行订阅,这就是路由的功能,生成消息时可以添加routing-key进行绑定,发送到指定队列。
生产者:

 $channel = $connection->channel();
    $channel->exchange_declare($exchangeName, 'direct', false, false, false);
    $n = 1;
    while ($n < 10) {
        $msg = new \PhpAmqpLib\Message\AMQPMessage(
            "direct routing message !!! $n"
        );
        $channel->basic_publish($msg, $exchangeName, $routingKey);
        $n ++;
    }
    $channel->close();
    $connection->close();

消费者:

$channel = $connection->channel();
    $channel->queue_declare($queueName, false, false, false, false);
    $channel->queue_bind($queueName, $exchangeName, $routingKey);
    $callback = function ($msg) {
        echo ' [x] ', $msg->delivery_info['routing_key'], $msg->body, "\n";
    };
    $channel->basic_consume($queueName, '', false, true, false, false, $callback);
    while ($channel->is_consuming()) {
        $channel->wait();
    }
    $channel->close();
    $connection->close();

主题

路由功能虽然解决了,订阅者只接收自己订阅的内容,但是如果用户可能想要接收到,某一类相关的消息,就需要同topic功能, 主题交流不具有任意routing-key,routing-key必须是单词列表以"."分隔,单词可以使用"*"通配符表示,"#"绑定表示接收所有消息,和直接订阅效果一样。
生产者:

 $channel = $connection->channel();
    $channel->exchange_declare($exchangeName, 'topic', false, false, false);
    $n = 1;
    while ($n < 10) {
        $msg = new \PhpAmqpLib\Message\AMQPMessage(
            "topic routing message !!! $n"
        );
        $channel->basic_publish($msg, $exchangeName, $routingKey);
        $n ++;
    }
    $channel->close();
    $connection->close();

消费者:

 $channel = $connection->channel();
    $channel->queue_declare($queueName, false, false, false, false);
    $channel->queue_bind($queueName, $exchangeName, $routingKey);
    $callback = function ($msg) {
        echo ' [x] ', $msg->delivery_info['routing_key'], $msg->body, "\n";
    };
    $channel->basic_consume($queueName, '', false, true, false, false, $callback);
    while ($channel->is_consuming()) {
        $channel->wait();
    }
    $channel->close();
    $connection->close();

RPC

有关RPC的说明
尽管RPC是计算中非常普遍的模式,但它经常受到批评。当程序员不知道函数调用是本地的还是缓慢的RPC时,就会出现问题。这样的混乱会导致系统变幻莫测,并给调试增加了不必要的复杂性。滥用RPC可能会导致无法维护的意大利面条代码,而不是简化软件。
牢记这一点,请考虑以下建议:
确保明显的是哪个函数调用是本地的,哪个是远程的。
记录您的系统。明确组件之间的依赖关系。
处理错误案例。RPC服务器长时间关闭后,客户端应如何反应?
如有疑问,请避免使用RPC。如果可以的话,应该使用异步管道-代替类似RPC的阻塞,将结果异步推送到下一个计算阶段。

客户端启动时,它将创建一个匿名排他回调队列。
对于RPC请求,客户端发送一条消息,该消息具有两个属性: reply_to(设置为回调队列)和correlation_id(设置为每个请求的唯一值)。
该请求被发送到rpc_queue队列。
RPC工作程序(又名:服务器)正在等待该队列上的请求。出现请求时,它将使用reply_to字段中的队列来完成工作,并将消息和结果发送回客户端。
客户端等待回调队列上的数据。出现消息时,它将检查correlation_id属性。如果它与请求中的值匹配,则将响应返回给应用程序。
客户端将请求发送到rpc_queue队列,需要传递请求唯一id,请求的回调队列。
rpc_service服务端,消费rpc_queue队列,接收到消息后进行逻辑处理,再将数据放入回调队列中。
rpc_client客户端,需要生产消息到rpc_queue,同时消费回调队列的结果correlation_id相同时返回结果。

集群

RabbitMQ始终记录的四种类型的内部元数据:
队列元数据 - 队列名称和他们的属性
交换器元数据 - 交换器名称、类型和属性
绑定元数据 - 一张简单的表格展示了如何将消息路由到队列
vhost元数据 - 为vhost内的队列、交换器、绑定提供命名空间和安全属性。

单机多节点部署

RABBITMQ_NODE_PORT:指定rmq端口
RABBITMQ_NODENAME:指定节点名称与域名
rabbitmq-server -detached 后台运行
RABBITMQ_SERVER_START_ARGS:启动服务参数
-rabbitmq_management listener [{port,15673}] 管理后台监听端口
-rabbitmq_stomp tcp_listeners [61614] rabbitmq_stomp tcp服务使用端口
-rabbitmq_mqtt tcp_listeners [1884]rabbitmq_mqtt tcp服务使用端口

第一个节点:
命令:RABBITMQ_NODE_PORT=5672 RABBITMQ_NODENAME=rabbit@localhost rabbitmq-server -detached
http://localhost:15674/#/ 地址能够进入对应节点管理后台。

第二个节点:
注意:在单机部署多节点的时候,命令会有所不同,需要添加一些参数修改对应服务使用的端口,否则会出现端口冲突,节点无法启动。
RABBITMQ_NODE_PORT=5673 RABBITMQ_SERVER_START_ARGS="-rabbitmq_management listener [{port,15673}] -rabbitmq_stomp tcp_listeners [61614] -rabbitmq_mqtt tcp_listeners [1884]" RABBITMQ_NODENAME=rabbit2@localhost rabbitmq-server -detached

第三个节点:
RABBITMQ_NODE_PORT=5674 RABBITMQ_SERVER_START_ARGS="-rabbitmq_management listener [{port,15674}] -rabbitmq_stomp tcp_listeners [61615] -rabbitmq_mqtt tcp_listeners [1885]" RABBITMQ_NODENAME=rabbit2@localhost rabbitmq-server -detached

添加节点步骤:
rabbitmqctl -n rabbit1@localhost stop_app //暂停节点服务
rabbitmqctl -n rabbit1@localhost reset //节点重置,将节点所属的cluster都删除。
rabbitmqctl -n rabbit1@localhost join_cluster rabbit@localhost //将rabbit1节点加入集群 默认是disk 磁盘节点模式 --ram 可以指定为内存节点模式
rabbitmqctl -n rabbit1@localhost start_app //启动节点
rabbitmqctl cluster_status -n rabbit@localhost //查看集群情况
rabbit1只需要加入这个集群中任意一台节点就可以加入集群。

删除节点步骤:
方法一:
rabbitmqctl -n rabbit1@localhost stop_app //暂停节点服务
rabbitmqctl -n rabbit1@localhost reset //节点重置,将节点所属的cluster都删除。

方法二:
rabbitmqctl -n rabbit1@localhost stop_app //暂停节点服务
rabbitmqctl forget_cluster_node rabbit1@localhost //集群中移除节点

内存节点与磁盘节点的区别

网上说的云里雾里的,这边总结一下,只要队列和消息指定持久化,都会落地到磁盘中,内存节点和磁盘节点的区别就是将元数据放在了内存还是硬盘,仅此而已,当在集群中声明队列、交换器和绑定 ,这些操作会同步元数据到所有节点,所以一个集群至少要有一个磁盘节点来同步元数据。
元数据必须至少保存在一个硬盘上,内存节点重启会去磁盘节点下载当前集群元数据拷贝,磁盘节点全挂了,那么集群就无法创建新的东西了,但是还能继续使用已有的东西。

默认情况下,队列只会保存在一个节点上,其他只是保存元数据,当然消息也会投递到这个队列所在的机器上
所以我们才有了创建镜像队列的需求,镜像队列则需要队列适配了策略,当一个节点挂掉后,其他节点都会有这个节点的镜像队列,选择其中一个节点作为新的队列master。

rabbitmqctl set_policy ha-all "^" '{"ha-mode":"all", "ha-sync-mode":"automatic"}'
将所有队列都是设置为镜像队列并且自动同步,是否需要自动同步可以根据自己需求进行设置, 也可以设置需要镜像同步的节点数,个人感觉一个master节点一个mirror节点就足够了。

负载均衡

HAProxy 提供高可用性、负载均衡以及基于 TCP 和 HTTP 应用的代理,支持虚拟主机,它是免费、快速并且可靠的一种解决方案。
配置项:

global #全局属性
    daemon  #以daemon方式在后台运行
    maxconn 256  #最大同时256连接
   

defaults #默认参数
    mode http  #http模式
    timeout connect 5000ms  #连接server端超时5s
    timeout client 50000ms  #客户端响应超时50s
    timeout server 50000ms  #server端响应超时50s
#绑定配置
listen rabbitmq_cluster 
    bind 0.0.0.0:5670
    #配置TCP模式
    mode tcp
    #加权轮询
    balance roundrobin
    #RabbitMQ集群节点配置
    server node1 0.0.0.0:5672 check inter 2000 rise 2 fall 3
    server node2 0.0.0.0:5673 check inter 2000 rise 2 fall 3
    server node3 0.0.0.0:5674 check inter 2000 rise 2 fall 3

#haproxy监控页面地址
listen monitor
    bind  0.0.0.0:8100
    mode http
    option httplog
    stats enable
    stats uri /stats
    stats refresh 5s

haproxy -f /usr/local/haproxy/conf/haproxy.cnf

镜像队列

交换器和绑定始终可视为在所有节点上,队列可以选择性的跨多个节点进行镜像。每个镜像队列由一个master和一个或者多个mirrors组成,主节点位于一个通常称为master的节点,每个队列都有自己的主节点。

延迟队列


通过死信队列与消息的过期时间实现延时队列。
生产者:

$channel = $connection->channel();

$channel->exchange_declare('delay_exchange', 'direct',false,false,false);
$channel->exchange_declare('cache_exchange', 'direct',false,false,false);

$tale = new AMQPTable();
$tale->set('x-dead-letter-exchange', 'delay_exchange');
$tale->set('x-dead-letter-routing-key','delay_exchange');
$tale->set('x-message-ttl',10000);

$channel->queue_declare('cache_queue',false,true,false,false,false,$tale);
$channel->queue_bind('cache_queue', 'cache_exchange','cache_exchange');

$channel->queue_declare('delay_queue',false,true,false,false,false);
$channel->queue_bind('delay_queue', 'delay_exchange','delay_exchange');
$msg = new AMQPMessage('Hello World'.$argv[1],array(
    'expiration' => intval($argv[1]),
    'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT
));
$channel->basic_publish($msg,'cache_exchange','cache_exchange');
echo date('Y-m-d H:i:s')." [x] Sent 'Hello World!' ".PHP_EOL;
$channel->close();
$connection->close();

消费者:

$channel = $connection->channel();
$channel->exchange_declare('delay_exchange', 'direct',false,false,false);
$channel->exchange_declare('cache_exchange', 'direct',false,false,false);
$channel->queue_declare('delay_queue',false,true,false,false,false);
$channel->queue_bind('delay_queue', 'delay_exchange','delay_exchange');

echo ' [*] Waiting for message. To exit press CTRL+C '.PHP_EOL;

$callback = function ($msg){
    echo date('Y-m-d H:i:s')." [x] Received",$msg->body,PHP_EOL;

     $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);

};

//只有consumer已经处理并确认了上一条message时queue才分派新的message给它
$channel->basic_qos(null, 1, null);
$channel->basic_consume('delay_queue','',false,false,false,false,$callback);


while (count($channel->callbacks)) {
    $channel->wait();
}
$channel->close();
$connection->close();

死信队列

一个消息在满足如下条件下,会进死信交换机,记住这里是交换机而不是队列,一个交换机可以对应很多队列。
2.1 消息被Consumer拒收了,并且reject方法的参数里requeue是false。也就是说不会被再次放在队列里,被其他消费者使用。
2.2 上面的消息的TTL到了,消息过期了。
2.3 队列的长度限制满了。排在前面的消息会被丢弃或者扔到死信路由上。
死信交换机就是普通的交换机,只是因为我们把过期的消息扔进去,所以叫死信交换机,并不是说死信交换机是某种特定的交换机

优先级队列

发布消息时可以设置优先级。

$msg = new AMQPMessage("Hello World!", array(
 'delivery_mode' => 2,         // 设置消息持久化
 'priority' => 1,                   // 设置优先级
));

使用插件的延时队列

生产者:

$conn = new AMQPConnection($connectConfig);
    $conn->connect();
    $channel = new AMQPChannel($conn);
    $exchange = new AMQPExchange($channel);
    $exchange->setName($params['exchangeName']);
    $exchange->setType('x-delayed-message');    //x-delayed-message类型
    /*RabbitMQ常用的Exchange Type有三种:fanout、direct、topic。

      fanout:把所有发送到该Exchange的消息投递到所有与它绑定的队列中。

      direct:把消息投递到那些binding key与routing key完全匹配的队列中。

      topic:将消息路由到binding key与routing key模式匹配的队列中。*/
    $exchange->setArgument('x-delayed-type','direct');
    $exchange->declareExchange();

    $channel->startTransaction();
    //RabbitMQ不容许声明2个相同名称、配置不同的Queue,否则报错
    $queue = new AMQPQueue($channel);
    $queue->setName($params['queueName']);
    $queue->setFlags(AMQP_DURABLE);
    $queue->declareQueue();

    //绑定队列和交换机
    $queue->bind($params['exchangeName'], $params['routeKey']);
    $channel->commitTransaction();

    for($i=1;$i <= 5;$i++){
        //生成消息
        echo '发送时间:'.date("Y-m-d H:i:s", time()).PHP_EOL;
        echo 'i='.$i.',延迟'.$i.'秒'.PHP_EOL;
        $message = json_encode(['order_id'=>time(),'i'=>$i]);
        $exchange->publish($message, $params['routeKey'], AMQP_NOPARAM, ['headers'=>['x-delay'=> 2000*$i]]);
        sleep(2);
    }
    $conn->disconnect();

消费者:

 $channel = $connection->channel();
    $queueName = 'delayed_queue_test';
    $callback = function ($msg) {
        echo ' [x] Received ', $msg->body, "\n";
    };
    $channel->basic_consume($queueName, '', false, true, false, false, $callback);
    while ($channel->is_consuming()) {
        $channel->wait();
    }

思维脑图

脑图


请到客户端“主题--自定义配置--配置”中填入leancloud_appID和key