RabbitMQ 死信队列 延迟队列 惰性队列

本篇是RabbitMQ 再深入III

死信队列

死信队列: DLX, dead-letter-exchange

死信队列的产生

  1. 消息被拒绝(basic.reject / basic.nack),并且requeue = false
  2. 消息TTL过期
  3. 队列达到最大长度

代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
<?php

namespace RabbitMQ;

require_once '../vendor/autoload.php';

use Exception;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;

/**
* Class RabbitMQ
* @package RabbitMQ
*/
class RabbitMQ
{
/**
* 生产延迟消息队列
* @param array $message_data
* @param int $ttl 单位: 秒
* @param int $max_delay_second 单位: 秒
* @param bool $need_time_suffix
* @throws Exception
*/
public function delayProduce(array $message_data, int $ttl, int $max_delay_second = 0, bool $need_time_suffix = false)
{
$connection = new AMQPStreamConnection('rabbitmq', 5672, 'banana', '123456', 'banana');

$channel = $connection->channel();

$queue = 'dead_letter_queue';
$exchange_name = 'dead_letter_exchange';

$delay_table = new AMQPTable();
// 设置死信交换机
$delay_table->set('x-dead-letter-exchange', "delay_$exchange_name");
// 设置死信路由键
$delay_table->set('x-dead-letter-routing-key', "delay_$queue");
// 死信队列消息存活时间, 单位: 毫秒
$delay_table->set('x-message-ttl', $max_delay_second * 1000);

$suffix = $need_time_suffix ? "_{$ttl}" : '';

// 普通队列, 消息过期写入死信队列
$channel->queue_declare("ttl_{$queue}{$suffix}", false, true, false, false, false, $delay_table);
// 这里是ttl队列, 所以交换机这里要durable是false
$channel->exchange_declare("ttl_{$exchange_name}{$suffix}", 'direct', false, false, false, false);
$channel->queue_bind("ttl_{$queue}{$suffix}", "ttl_{$exchange_name}{$suffix}", "ttl_{$queue}{$suffix}");

// 死信队列, 正常消耗
$channel->queue_declare("delay_$queue", false, true, false, false, false);
$channel->exchange_declare("delay_{$exchange_name}", 'direct', false, true, false);
$channel->queue_bind("delay_{$queue}", "delay_{$exchange_name}", "delay_{$queue}");

// 消息过期时间
// 1. 若消息在队列头: 消息过期时间与队列消息过期时间取最小值
// 例如: x-message-ttl=30000, expiration=5000, 则5秒过期
// 2. 若消息在队列中: 先消耗之前的消息,才能消耗,即使已过期也继续等待
// 例如: x-message-ttl=30000 expiration: 消息1=25000 消息2=5000, 则两个消息均25秒过期
$message = new AMQPMessage(
serialize($message_data),
[
'content_type' => 'text/plain',
'expiration' => $ttl * 1000, // 单位: 毫秒
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT
]
);

$channel->basic_publish($message, "ttl_{$exchange_name}{$suffix}", "ttl_{$queue}{$suffix}");

$channel->close();
$connection->close();
}
}

$message_data = ['Test Dead Letter'];
$ttl = 5;
$max_delay_second = 30;
$need_time_suffix = false;

try {
(new RabbitMQ())->delayProduce($message_data, $ttl, $max_delay_second, $need_time_suffix);
} catch (Exception $e) {
echo "异常消息: " . $e->getMessage();
}

延迟队列

使用DLX(死信交换机)+TTL(消息超时时间)实现

假如一条消息需要延迟 30 分钟执行,我们就设置这条消息的有效期为 30 分钟,同时为这条消息配置死信交换机和死信 routing_key,并且不为这个消息队列设置消费者,那么 30 分钟后,这条消息由于没有被消费者消费而进入死信队列,此时我们有一个消费者就在“蹲点”这个死信队列,消息一进入死信队列,就立马被消费了。

惰性队列

惰性队列会尽可能的将消息存入磁盘中,在消费者消费到相应的消息时才会被加载到内存中。

1
2
3
4
$table = new AMQPTable();
$table->set('x-queue-mode', "lazy");
$channel->queue_declare($queueName, false, true, false, false, false, $table);
$channel->queue_bind($queueName, $exchangeName, $routingKey);
  • 优点

      1. 它可以存储更多消息支持更长队列因为消息在硬盘中。
      1. 惰性队列可以避免消息堆积导致的内存崩溃。
  • 缺点

      1. 需要i/o 增加磁盘i/o。

参考

Powered by Hexo and Hexo-theme-hiker

Copyright © 2017 - 2022 Keep It Simple And Stupid All Rights Reserved.

访客数 : | 访问量 :