RabbitMQ 再深入 II

之前有多篇RabbitMQ的学习, 实践是检验真理的唯一标准, 所以本篇是实践篇
消息队列之 RabbitMQ
RabbitMQ 实战记录
RabbitMq 再深入

方法参数

1
2
// 1.1 建立连接
$conn = new AMQPStreamConnection($host, $port, $user, $password, $vhost);
  • $host: RabbitMQ服务器主机ID地址
  • $port: RabbitMQ服务器端口
  • $user: 连接RabbitMQ服务器的用户名
  • $password: 连接RabbitMQ服务器的用户密码
  • $vhost: 连接RabbitMQ服务器的vhost(服务器可以有多个vhost,虚拟主机,类似nginx的vhost)
1
2
// 1.2 建立信道
$channel = $conn->channel($channel_id);
  • $channel_id: 信道ID, 不传则获取$channel[“”]信道,再无则循环$this->channel数组,下标从1到最大信道数找第一个不是AMQPChannel对象的下标,实例化并返回AMQPChannel对象,无则抛出异常No free channel ids
1
2
3
4
5
6
7
8
9
10
11
12
// 1.3 声明交换机
$channel->exchange_declare(
$exchange_name,
$type,
$passive = false,
$durable = false,
$auto_delete = false,
$internal = false,
$nowait = false,
$arguments = array(),
$ticket = null
);
  • $exchange_name: 交换机名称
  • $type: 交换机类型
    • direct: (默认)直接交换器,工作方式类似于单播,exchange会将消息发送完全匹配route_keyqueue;
    • fanout: 广播式交换器,不管消息的route_key设置为什么,exchange都会将消息转发给所有绑定的queue;
    • topic: 主题交换机, 工作方式类似于组播, exchange会将消息转发和route_key匹配模式相同的所有队列;
    • headers: 根据消息体的header匹配
  • $passive: 是否检测同名队列
  • $durable: 交换机是否开启持久化
  • $auto_delete: 通道关闭后是否删除队列
1
2
3
4
5
6
7
8
9
10
11
// 1.4 声明队列
$channel->queue_declare(
$queue_name = '',
$passive = false,
$durable = false,
$exclusive = false,
$auto_delete = true,
$nowait = false,
$arguments = array(),
$ticket = null
);
  • $queue_name: 队列名称
  • $passive: 是否检测同名队列
  • $durable: 是否开启队列持久化
  • $exclusive: 队列是否可以被其他队列访问
  • $auto_delete: 通道关闭后是否删除队列
1
2
3
4
5
6
7
8
// 1.5 创建要发送的消息, 可以创建多个消息
$msg = new AMQPMessage($data, $properties);
// 单个发送
$channel->basic_publish($msg, $exchange = '', $routing_key = '', $mandatory = false, $immediate = false, $ticket = null);
// 多个发送
// 1.多次调用 $channel->batch_basic_publish($msg, $exchange = '', $routing_key = '', $mandatory = false, $immediate = false, $ticket = null)
// 内部实现:往$this->batch_messages[]塞
// 2.再调用一次$channel->publish_batch(), 完成发送
  • $data: 要发送的消息
  • $properties: 设置的属性, 比如设置该消息持久化 ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]
  • $msg: 消息内容
  • $exchange: 交换机
  • $routing_key: 路由键
  • $mandatory: 匹配不到队列时,是否立即丢弃消息
  • $immediate: 队列无消费者时,是否立即丢弃消息
  • $ticket:
1
2
3
4
5
6
7
8
9
// 1.6 路由绑定
$channel->queue_bind(
$queue,
$exchange,
$routing_key = '',
$nowait = false,
$arguments = array(),
$ticket = null
);
  • $queue: 队列名
  • $exchange: 交换机名
  • $routing_key: 路由键
  • $nowait: 声明队列无需等待
  • $arguments: 其他参数
  • $ticket:
1
2
3
4
5
6
7
8
9
10
11
12
// 1.7 消费消息
$channel->basic_consume(
$queue = ',
$consumer_tag = ',
$no_local = false,
$no_ack = false,
$exclusive = false,
$nowait = false,
$callback = null,
$ticket = null,
$arguments = array()
);
  • $queue: 被消费队列名称
  • $consumer_tag: 消费者客户端身份标识,用于区分多个客户端
  • $no_local: 这个功能属于AMQP的标准,但是RabbitMQ并没有做实现
  • $no_ack: 收到消息后,是否不需要回复确认即被认为被消费
  • $exclusive: 收到消息后,是否不需要回复确认即被认为被消费
  • $nowait: 不返回执行结果,但是如果排他开启的话,则必须需要等待结果的,如果两个一起开就会报错
  • $callback: 回调逻辑处理函数
1
2
3
4
5
6
7
// 1.8 手动ack示例
$callback = function ($msg) {
sleep($msg->body);
echo " [x] Received sleep ", $msg->body, "\n";
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
echo " [x] Ack "."\n";
};
1
2
3
// 1.9 限制分发示例
// 限制RabbitMQ只发不超过1条的消息给同一个消费者。当消息处理完毕后,有了反馈,才会进行第二次发送。
$channel->basic_qos(null, 1, null);

常用场景

无交换机, 直接队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public function send()
{
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->queue_declare('hello', false, false, false, false);
$msg = new AMQPMessage('Hello World!');
$channel->basic_publish($msg, '', 'hello');
echo " [x] Sent 'Hello World!'\n";
$channel->close();
$connection->close();
}

public function consume()
{
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->queue_declare('hello', false, false, false, false);
echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";
$callback = function ($msg) {
echo " [x] Received ", $msg->body, "\n";
};
$channel->basic_consume('hello', '', false, true, false, false, $callback);
while (count($channel->callbacks)) {
$channel->wait();
}
}

工作队列按消费能力分发

1
2
3
// 生产者和消费者均增加
$channel->basic_qos(null, 1, null);
// 即可。

fanout广播示例 注册行为

例如注册后需要发送欢迎短信和邮件,将注册行为广播至短信和邮件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 生产者
// 定义交换机
$channel->exchange_declare('register', 'fanout', false, false, false);
$msg = new AMQPMessage('register event');
$channel->basic_publish($msg, 'register');

// 注册短信消费者
$channel->exchange_declare('register', 'fanout', false, false, false);
$channel->queue_declare('register.sms', false, false, false, false);
$channel->queue_bind('register.sms', 'register');

// 注册邮件消费者
$channel->exchange_declare('register', 'fanout', false, false, false);
$channel->queue_declare('register.mail', false, false, false, false);
$channel->queue_bind('register.mail', 'register');

topic模糊匹配示例 日志分级

例如我想一个消费者接受所有日志,一个消费者只接收error级别日志

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// 定义交换机
$channel->exchange_declare('log', 'topic', false, false, false);
$num = rand(0, 10);
if ($num%3 == 0) {
$level = 'error';
} elseif ($num%3 == 1) {
$level = 'warning';
} else {
$level = 'common';
}
$msg = new AMQPMessage('log event ' . $level);
$channel->basic_publish($msg, 'log', 'log.'.$level);

// 全量日志消费者
$channel->exchange_declare('log', 'topic', false, false, false);
$channel->queue_declare('log.all', false, false, false, false);
$channel->queue_bind('log.all', 'log', 'log.*');

// error日志消费者
$channel->exchange_declare('log', 'topic', false, false, false);
$channel->queue_declare('log.error', false, false, false, false);
$channel->queue_bind('log.error', 'log', 'log.error');

headers匹配示例 日志分级

例如我想一个消费者接受所有日志,一个消费者只接收error级别日志

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
// 生产者
// 定义交换机
$channel->exchange_declare('log2', 'headers', false, false, false);
$num = rand(0, 10);
if ($num%3 == 0) {
$level = 'error';
} elseif ($num%3 == 1) {
$level = 'warning';
} else {
$level = 'common';
}
$msg = new AMQPMessage('log2 event '.$level);
$bindArguments = [
'level' => $level,
'type' => 'log'
];
$headers = new AMQPTable($bindArguments);
$msg->set('application_headers', $bindArguments);

$channel->basic_publish($msg, 'log2');

// 全量日志消费者
$channel->exchange_declare('log2', 'headers', false, false, false);
$channel->queue_declare('log2.all', false, false, false, false);
$bindArguments = [
'type' => 'log',
//'x-match' => 'any'
];
$headers = new AMQPTable($bindArguments);
$channel->queue_bind('log2.all', 'log2', '', false, $headers);

// error日志消费者
$channel->exchange_declare('log2', 'headers', false, false, false);
$channel->queue_declare('log2.error', false, false, false, false);
$bindArguments = [
'type' => 'log',
'level' => 'error',
'x-match' => 'any'
];
$headers = new AMQPTable($bindArguments);
$channel->queue_bind('log2.error', 'log2', '', false, $headers);

死信队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// 定义一个没有消费者,5s后消息过期的队列
$arguments = new AMQPTable([
'x-dead-letter-exchange' => 'dead',
'x-message-tl' => 5000, // 消息存活时间(毫秒)
'x-dead-letter-routing-key' => 'dead'
]);

// 定义队列, 不要交换机
$channel->queue_declare('no_consume', false, false, false, false, false);
$now = time();
$msg = new AMQPMessage($now);
$channel->basic_publish($msg, '', 'no_consume');
echo " [x] Sent no_consume :".date('Y-m-d H:i:s', $now)."\n";
$channel->close();
$connection->close();

// 消费者
$channel->exchange_declare('dead', 'topic', false, false, false);
$channel->queue_declare('dead.all', false, false, false, false);
$channel->queue_bind('dead.all', 'dead', 'dead');
$channel->basic_qos(null, 1, null);
echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";
$callback = function ($msg) {
var_dump('msg:'.date('Y-m-d H:i:s', $msg->body));
var_dump('now:'.date('Y-m-d H:i:s'));
echo " [x] Received log error ", $msg->body, "\n";
};
$channel->basic_consume('dead.all', '', false, true, false, false, $callback);
while(count($channel->callbacks)) {
$channel->wait();
}

rabbitmq和redis用作消息队列的区别

  • 可靠性

redis: 没有相应的机制保证消息的可靠消费,如果发布者发布一条消息,而没有对应的订阅者的话,这条消息将丢失,不会存在内存中
rabbitmq: 没有相应的机制保证消息的可靠消费,如果发布者发布一条消息,而没有对应的订阅者的话,这条消息将丢失,不会存在内存中

  • 实时性

redis: 实时性高,redis作为高效的缓存服务器,所有数据都存在在服务器中,所以它具有更高的实时性

  • 持久性

redis: redis的持久化是针对于整个redis缓存的内容,它有RDB和AOF两种持久化方式(redis持久化方式,后续更新),可以将整个redis实例持久化到磁盘,以此来做数据备份,防止异常情况下导致数据丢失。
rabbitmq: 队列,消息都可以选择性持久化,持久化粒度更小,更灵活;

  • 总结:

redis: 轻量级, 低延迟, 高并发, 低可靠性;
rabbitmq: 重量级, 高可靠, 异步, 不保证实时性;

Powered by Hexo and Hexo-theme-hiker

Copyright © 2017 - 2023 Keep It Simple And Stupid All Rights Reserved.

访客数 : | 访问量 :