消息队列之 RabbitMQ

简介

RabbitMQ

  • RabbitMQ 是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)
  • RabbitMQ is the most widely deployed open source message broker.
  • RabbitMQ Get Started

基础特性

    1. 可靠性

消息持久化、消息发送和投递确认机制、集群高可用方案

    1. 灵活路由

消息通过exchange的方式路由到不同的queue中,提供了包括fanout,direct,topic等多种exchange实现,并且支持通过编写 exchange

    1. 支持集群

同网段下的rabbitmq节点可以通过集群的方法,组成一个逻辑上的单一broker

    1. Federation

通过Federation可以在跨网段节点间组建集群

    1. 高可用消息队列

通过设置镜像队列的方式, 消息可以在镜像队列间进行复制, 使节点宕机或硬件损坏的情况下保证队列服务的高可用

    1. 多客户端支持

JAVA, .NET, Ruby, Python, PHP, Node, Go……

    1. 可视化管理界面

RabbitMQ 提供了一个易用的用户界面,使得用户可以监控和管理消息 Broker 的许多方面。

基础概念

  • Broker

经纪人. 提供一种传输服务,维护一条从生产者到消费者的传输线路,保证消息数据能按照指定的方式传输。

  • Producer

消息生产者. 主要讲消息投递到对应的Exchange上面,一般是独立的程序

  • Consumer

消息消费者. 消息的接受者, 一般是独立的程序

  • Message

消息. 消息是不具名的, 它由消息头和消息体组成, 消息体是不透明的, 而消息头则由一系列的可选属性组成,这些属性包括 routing-key(路由键)priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等

  • Channel

信道,多路复用连接中的一条独立的双向数据流通道。
信道是建立在真实的TCP连接内地虚拟连接,AMQP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。
对于操作系统来说,建立和关闭TCP连接是有代价的,频繁的建立关闭TCP连接对于系统的性能有很大的影响,而且TCP的连接数也有限制,这也限制了系统处理高并发的能力。但是,在TCP连接中建立Channel是没有上述代价的。对于Producer或者Consumer来说,可以并发的使用多个Channel进行Publish或者Receive

  • Exchange

消息交换机. 指定消息按照什么规则路由到哪个队列Queue
Producer将并不能直接将消息投递到Queue中。需要将消息发送到Exchange,由Exchange将消息路由到一个或多个Queue中(或者没有绑定Queue的情况下将消息丢弃)。

  • Queue

消息队列. 消息的载体, 每条消息都会被投送到一个或多个队列中
RabbitMQ中的消息都只能存在在Queue中, Producer生产消息并通过Exchange投递到Queue, Comsumer可以通过ExchangeQueue中获取消息并消费
多个消费者可以订阅同一个Queue,这时Queue中的消息会被平均分摊给多个消费者进行处理,而不是每个消费者都收到所有的消息并处理。

  • Binding

绑定. 作用就是将ExchangQueue按照某种路由规则绑定起来. 这样RabbitMQ就知道如何正确地将消息路由到指定的Queue

  • Routing Key

路由键. 生产者在将消息发送给Exchange的时候,一般会指定一个routing key,来指定这个消息的路由规则,而这个routing key需要与Exchange Typebinding key联合使用才能最终生效。

Binding Key

在绑定(BindingExchangeQueue的同时,一般会指定一个binding key; 消费者将消息发送给Exchange时,一般会指定一个routing key; 当binding keyrouting key相匹配时,消息将会被路由到对应的Queue中.

MQ

  • 消息队列(Message Queue, 简称MQ), 从字面意思上看,本质是个队列, FIFO先入先出,只不过队列中存放的内容是Message而已.
    其主要用途: 不同进程Process/线程Thread之间通信
  • 为什么会产生消息队列?

    • 不同进程(process)之间传递消息时,两个进程之间耦合程度过高,改动一个进程,引发必须修改另一个进程, 为了隔离这两个进程,在两进程间抽离出汗一层(一个模块), 所有两进程之间传递的消息,都必须通过消息队列来传递,单独修改某一个进程,不会影响另一个;
    • 不同进程(process)之间传递消息时,为了实现标准化,将消息的格式规范化了,并且,某一个进程接受的消息太多,一下子无法处理完,并且也有先后顺序,必须对收到的消息进行排队,因此诞生了事实上的消息队列;

AMQP

AMQP,即Advanced Message Queuing Protocol, 高级消息队列协议
一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。Erlang中的实现有 RabbitMQ等。

搭建RabbitMQ环境

  • 安装Erlang

Erlang Download

  • 安装RabbitMQ

Downloading and Installing RabbitMQ

  • 安装完成,启用管理工具

    • 双击: RabbitMQ Command Prompt
    • 输入: rabbitmq-plugin enable rabbitmq_management
    • 这样就启动了管理工具, 试一下
      • 停止: net stop RabbitMQ
      • 启动: net start RabbitMQ

windows下安装步骤
打开 cmd,进入rabbitmqsbin目录
执行 rabbitmq-plugins.bat enable rabbitmq_management

  • 在浏览器中输入地址查看: http://127.0.0.1:15672
  • 使用默认账号登录: guest/guest

基础概念

Connection 与 Channel : 连接与信道

connection 是指的物理的连接, 一个clinet与一个server之间有一个连接;一个连接上可以建立多个channel,可以理解为逻辑上的连接. 一般应用的情况下,有一个channel就够用了,不需要创建更多的channel

  • 示例代码
1
2
3
4
5
6
7
8
9
10
// 创建连接和信道
$connection = new AMQPStreamConnection(
$host, $port, $user, $pass, $vhost
);

if (!$connection->isConnected()) {
dd('Connection failed');
}

$channel = new AMQPChannel($connection);

Exchange 与 RoutingKey : 交换机与路由键

为了将不同类型的message进行区分, 设置了Exchange交换机与Route路由两个概念. 比如A类型message发送到名为*C1的交换机,将类型为B的发送到C2的交换机.
当客户端连接
C1*处理队列消息时取到的就只是A类型message.进一步的,如果A类型message也非常多,需要进一步细化区分,比如某个客户端只处理A类型message中针对K用户的messageroutingKey就是来做这个用途的。

  • 示例代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
// 交换机名
$exchange_name = 'exchange_1';
// 路由键
$route_key = ['key_1', 'key_2'];
// 创建交换机
$exchange = new \AMQPExchange($channel);
$exchange->setName($exchange_name);
// 交换机类型: direct类型
$exchange->setType(AMQP_EX_TYPE_DIRECT);
// 持久化
$exchange->setFlags(AMQP_DURABLE);

echo "Exchange Status:" . $exchange->declareExchange() . "\n";

由以上代码可以看到,发送消息时,只要有“交换机”就够了。至于交换机后面有没有对应的处理队列,发送方是不用管的。routingkey可以是空的字符串。在示例中,我使用了两个key交替发送消息,是为了下面更便于理解routingkey的作用。

对于交换机,有两个重要的概念:

交换机(Exchange): 可以理解为具有路由表的路由程序。每个消息都有一个路由键(routing key), 就是一个简单的字符串.交换机中有一系列的绑定(binding), 即路由规则.交换机可以有多个,多个队列可以和同一个交换机绑定,同时多个交换机也可以和同一个队列绑定(多对多关系)

  • A. 类型
      1. Fanout Exchange(不处理路由键):

        一个发送到交换机上的消息都会被转发到与该交换机绑定的所有队列上.Fanout交换机发消息是最快的.

      1. Direct Exchange(处理路由键):

        如果一个队列绑定到该交换机上, 并且当前要求路由键为X. 只有路由键是X的消息才会被这个队列转发.

      1. Topic Exchange(将路由键和某模式进行匹配, 可以理解为模糊处理):

        路由键的词有 '.' 隔开, 符号'#'表示匹配0个或多个, 符号'*'表示匹配不多不少一个词

类型总结:

  • Fanout类型最简单,这种模型忽略routingkey;
  • Direct类型是使用最多的,使用确定的routingkey。这种模型下,接收消息时绑定key_1则只接收key_1的消息;
  • 最后一种是Topic,这种模式与Direct类似,但是支持通配符进行匹配,比如:key_*,就会接受key_1key_2Topic貌似美好,但是有可能导致不严谨,所以还是推荐使用Direct
  • B. 持久化

指定了持久化的交换机, 在重庆启动时才能重建, 否则需要客户端重新声明生成才行.
需要特别明确的概念: 交换机的持久化, 并不等于消息的持久化.只有在持久化队列中的消息,才能持久化;如果没有队列,消息是没有地方存储的;消息贝恩施在投递时也有一个持久化标志的,PHP中默认投递到持久化交换机就是持久的信息,不用特别指定

Queue : 队列

队里仅是这很对接受方(consumer)的, 由接收方根据需求创建的。只有队列创建了,交换机才会将新接受到的消息送到队列中,交换机是不会在队列创建之前的消息放进来的。换句话说,在建立队列之前,发出的所有消息都被丢弃了。

  • 接下来看一下创建队列及接收消息的示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// 交换机名
$exchange_name = 'exchange_1';
// 队列名
$queue_name = 'queue_1';
// 路由key
$route_key = '';
// 创建连接和信道
$connection = new AMQPStreamConnection($connection_args);
if (!$connection->isConnected()) {
dd('Connection failed');
}
// 创建信道
$channel = new AMQPChannel($connection);
// 创建交换机
$exchange = new \AMQPExchange($channel);
// 交换机名
$exchange->setName($exchange_name);
// 交换机类型: direct类型
$exchange->setType(AMQP_EX_TYPE_DIRECT);
// 持久化
$exchange->setFlags(AMQP_DURABLE);
echo "Exchange Status: " . $exchange->declareExchange() . "\n";

// 创建队列
$queue = new \AMQPQueue($channel);
$queue->setName($queue_name);
// 持久化
$queue->setFlags(AMQP_DURABLE);
// 绑定交换机与队列, 并指定路由键
// 阻塞模式接受消息
echo "Queue Bind: " . $queue->bind($exchange_name, $route_key) . "\n";
echo "Message: \n";
// 自动ACK应答 $connection->disconnect()
$queue->consume('processMessage', AMQP_AUTOACK);

消息的处理,是有两种方式:

  • 一次性: 用$queue->get([...]), 不管能不能拿到消息都会立即返回, 一般情况下使用轮询处理消息队列就用这种方法
  • 阻塞: 用过$queue->consume(callback, [...]), 程序会进入持续侦听状态, 每收到一个消息就会调用callback指定函数一次, 直到某个callback函数返回false才结束

RabbitMQ应用(五种队列)

环境

  • RabbitMQ V3.7.17
  • Erlang V22.0
  • PHP V7.2.1
  • Laravel 5.8

准备

  • 安装php-amqplib/php-amqplib
1
composer require php-amqplib/php-amqplib
  • 创建RabbitMQController
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
<?php

namespace App\Api\Controller;

use App\Api\Controller as BaseController;
use Collective\Annotations\Routing\Annotations\Annotations\{
Controller, Post
};
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

/**
* Class RabbitMQController
* @package App\Api\Controller
* @Controller(prefix="/api/mq")
*/
class RabbitMQController extends BaseController
{
private $host;
private $port;
private $user;
private $pass;
private $vhost;

private $connection;
private $channel;

public function __construct()
{
$this->host = env('MQ_HOST');
$this->port = env('MQ_PORT');
$this->user = env('MQ_USERNAME');
$this->pass = env('MQ_PASSWORD');
$this->vhost = env('MQ_VHOST');

$this->connection = new AMQPStreamConnection(
$this->host,
$this->port,
$this->user,
$this->pass,
$this->vhost
);
$this->channel = $this->connection->channel();
}
}

“Hello World”

消息生产者/消息消费者模式

Hello World

  • test_send
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
/**
* Send(发送消息)
* @Post("/test_send")
* @throws \Exception
*/
public function test_send()
{
/**
* 声明队列
* queue: // 队列名称
* passive: // 如果设置true存在则返回OK,否则就报错。设置false存在返回OK,不存在则自动创建
* durable: // 是否持久化,设置false是存放到内存中的,RabbitMQ重启后会丢失
* exclusive: // 是否排他,指定该选项为true则队列只对当前连接有效,连接断开后自动删除
* auto_delete: // 是否自动删除,当最后一个消费者断开连接之后队列是否自动被删除
*/
$this->channel->queue_declare('hello', false, false, false, false);

/**
* 创建AMQP消息
* deliver_mode: 消息是否持久化
* AMQPMessage::DELIVERY_MODE_PERSISTENT 持久化
* AMQPMessage::DELIVERY_MODE_PERSISTENT 不持久化
*/
$msg = new AMQPMessage('Hello RabbitMQ!');

/**
* 向队列发送消息
* msg: // AMQP消息内容
* exchange: // 交换机名称
* routing_key: // 路由键(队列名称)
*/
$this->channel->basic_publish($msg, '', 'hello');

echo " [x] Sent 'Hello RabbitMQ!'\n";

// 关闭信道,关闭连接
$this->channel->close();
$this->connection->close();
}

运行结果:
[x] Sent 'Hello RabbitMQ!'

  • test_receive
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
/**
* Receive(接收消息)
* @Post("/test_receive")
* @throws \ErrorException
*/
public function test_receive()
{
// 声明队列
$this->channel->queue_declare('hello', false, false, false, false);

echo " [*] Waiting for message. To exit press CTRL+C\n";

$callback = function ($msg) {
echo ' [x] Received ', $msg->body, "\n";
};

/**
* 消费队列
* queue: // 被消费的队列名称
* consumer_tag: // 消费者客户端身份标识, 用于区分多个客户端
* no_local: // 这个功能属于AMQP的标准, 但是RabbitMQ并没有实现
* no_ack: // 收到消息会后, 是否不需要回复确认即被认为被消费
* exclusive: // 是否排他, 即张恒队列只能由一个消费者消费. 适用于任务不允许进行并发处理的情况下
* nowait: // 不返回执行结果, 但是如果排他开启的话,则必须等待结果,如果两个一起开则会报错
* callback: // 回调逻辑处理函数
*/
$this->channel->basic_consume('hello', '', false, true, false, false, $callback);

while ($this->channel->is_consuming()) {
$this->channel->wait();
}

// 关闭信道,关闭连接
$this->channel->close();
$this->connection->close();
}

运行结果:
[*] Waiting for message. To exit press CTRL+C
[x] Received Hello RabbitMQ!

Work queues

工作队里: 工厂任务安排者(生产者)/工人(消费者)

Work queues

  • test_task
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/**
* @Post("/test_task")
* @throws \Exception
*/
public function test_task()
{
$this->channel->queue_declare('task_queue', false, true, false, false);

if (empty($data)) {
$data = "Hello RabbitMQ!";
}

$msg = new AMQPMessage(
$data,
['deliver_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]
);

$this->channel->basic_publish($msg, '', 'task_queue');

echo ' [x] Sent ', $data, "\n";

$this->channel->close();
$this->connection->close();
}

运行结果:
[x] Sent 'Hello RabbitMQ!'

  • test_work
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
/**
* @Post("/test_work")
* @throws \ErrorException
*/
public function test_work()
{
$this->channel->queue_declare('task_queue', false, true, false, false);

echo " [*] Waiting for message. To exit press CTRL+C\n";

$callback = function ($msg) {
echo ' [x] Received ', $msg->body, "\n";
sleep(substr_count($msg->body, '.'));
echo " [x] Done\n";
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};

$this->channel->basic_qos(null, 1, null);
$this->channel->basic_consume('task_queue', '', false, false, false, false, $callback);

while($this->channel->is_consuming()) {
$this->channel->wait();
}

$this->channel->close();
$this->connection->close();
}

运行结果:
[*] Waiting for message. To exit press CTRL+C
[x] Received Hello RabbitMQ!
[x] Done

Publish/Subcribe

发布/订阅: 消息发布者/消息订阅者

Publish/Subcribe

  • test_receive_logs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
/**
* @Post("/test_receive_logs")
* @throws \Exception
*/
public function test_receive_logs()
{
/**
* 声明交换机(Exchange)
* exchange: 交换机名称
* type: 交换机类型,分别为direct/fanout/topic/headers
* passive: 如果设置true存在则返回OK,否则就报错。设置false存在返回OK,不存在则自动创建
* durable: 是否持久化,设置false是存放到内存中的,RabbitMQ重启后会丢失
* auto_delete: 是否自动删除,当最后一个消费者断开连接之后队列是否自动被删除
*/
$this->channel->exchange_declare('logs', 'fanout', false, false, false);

list($queue_name, ,) = $this->channel->queue_declare("", false, false, true, false);

$this->channel->queue_bind($queue_name, 'logs');

echo " [*] Waiting for logs. To exit press CTRL+C\n";

$callback = function ($msg) {
echo ' [x] ', $msg->body, "\n";
};

$this->channel->basic_consume($queue_name, '', false, true, false, false, $callback);

while ($this->channel->is_consuming()) {
$this->channel->wait();
}

$this->channel->close();
$this->connection->close();
}

运行结果:
[*] Waiting for logs. To exit press CTRL+C

  • test_emit_log
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/**
* @Post("/test_emit_log")
* @throws \Exception
*/
public function test_emit_log()
{
$this->channel->exchange_declare('logs', 'fanout', false, false, false);

// $data = implode(' ', array_slice($argv, 1));
if (empty($data)) {
$data = "info: Hello RabbitMQ!";
}

$msg = new AMQPMessage($data);

$this->channel->basic_publish($msg, 'logs');

echo ' [x] Sent ', $data, "\n";

$this->channel->close();
$this->connection->close();
}

运行结果:
[x] Sent info: Hello RabbitMQ!

Routing

Routing

  • test_receive_logs_direct
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/**
* @Post("/test_receive_logs_direct")
*/
public function test_receive_logs_direct()
{
$this->channel->exchange_declare('direct_logs', 'direct', false, false, false);

list($queue_name, ,) = $this->channel->queue_declare("", false, false, true, false);

$serverites = ['info', 'warning', 'error'];

foreach ($serverites as $serverity) {
$this->channel->queue_bind($queue_name, 'direct_logs', $serverity);
}

echo " [*] Waiting for logs. To exit press CTRL+C\n";

$callback = function ($msg) {
echo ' [x] ', $msg->delivery_info['routing_key'], ':', $msg->body, "\n";
};

$this->channel->basic_consume($queue_name, '', false, true, false, false, $callback);

while ($this->channel->is_consuming()) {
$this->channel->wait();
}

$this->channel->close();
$this->connection->close();
}

运行结果:
[*] Waiting for logs. To exit press CTRL+C

  • test_emit_log_direct
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/**
* @Post("/test_emit_log_direct")
*/
public function test_emit_log_direct()
{
$this->channel->exchange_declare('direct_logs', 'direct', false, false, false);

$serverity = isset($argv[1]) && !empty($argv[1]) ? $argv[1] : 'info';

if (empty($data)) {
$data = "Hello RabbitMQ!";
}

$msg = new AMQPMessage($data);

$this->channel->basic_publish($msg, 'direct_logs', $serverity);

echo ' [x] Sent ', $serverity, ':', $data, "\n";

$this->channel->close();
$this->connection->close();
}

运行结果:
[x] Sent info:Hello RabbitMQ!

Topics

Topcis

  • test_receive_logs_topic
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/**
* @Post("/test_receive_logs_topic")
*/
public function test_receive_logs_topic()
{
$this->channel->exchange_declare('topic_logs', 'topic', false, false, false);

list($queue_name, ,) = $this->channel->queue_declare("", false, false, true, false);

$binding_keys = ['bingding_key'];

foreach ($binding_keys as $binding_key) {
$this->channel->queue_bind($queue_name, 'topic_logs', $binding_key);
}

echo " [*] Waiting for logs. To exit press CTRL+C\n";

$callback = function ($msg) {
echo ' [x] ', $msg->deliver_info['routing_key'], ':', $msg->body, "\n";
};

$this->channel->basic_consume($queue_name, '', false, true, false, false, $callback);

while ($this->channel->is_consuming()) {
$this->channel->wait();
}

$this->channel->close();
$this->connection->close();
}

运行结果:
[*] Waiting for logs. To exit press CTRL+C

  • test_emit_log_topic
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/**
* @Post("/test_emit_log_topic")
*/
public function test_emit_log_topic()
{
$this->channel->exchange_declare('topic_logs', 'topic', false, false, false);

$routing_key = isset($argv[1]) && !empty($argv[1]) ? $argv[1] : 'anonymous.info';

if (empty($data)) {
$data = "Hello RabbitMQ!";
}

$msg = new AMQPMessage($data);

$this->channel->basic_publish($msg, 'topic_logs', $routing_key);

echo ' [x] Sent ', $routing_key, ':', $data, "\n";

$this->channel->close();
$this->channel->close();
}

运行结果:
[x] Sent anonymous.info:Hello RabbitMQ!

参数说明

详情查看: Channel

  • 声明队列
1
2
// 声明队列
$this->channel->queue_declare('queue_name', false, false, false, false);
  • queue_declare(): 不带参数方法,默认创建一个由RabbitMq命名的(amq.gen-_rV4_Be1b5pUjMbWXqNVsw)名称, 这种队列也称之为匿名队列, 排他的, 自动删除的, 非持久化的队列
  • queue: 队列名称;
  • passive: 仅检测队列是否存在
  • durable: 是否持久化; true:表示持久化,会存盘,服务器重启仍然存在;false:非持久化;
  • exclusive: 是否排他的; true:排他,如果一个队列声明为排他队列,该队列公对首次声明它的连接可见,并在连接断开时自动删除;
  • auto_delete: 是否自动删除; true:自动删除;自动删除后的前提: 至少有一个消息者连接到这个队列, 之后所有与这个队列连接的消息都断开时,才会自动删除;
  • no_wait: true 声明队列无需等待
  • arguments: 其他参数
  • ticket: int 票
  • arguments 参数说明
  • x-message-ttl: 队列中的所有消息的过期时间
  • x-expires: 超过设定时间没有消费者来访问队列,就删除队列的时间(毫秒)
  • x-max-length: 队列的最新的消息数量,如果超过设定数量,前面的消息将从队列中移除掉
  • x-max-length-bytes: 队列的内容的最大空间,超过该阈值就删除之前的消息
  • x-dead-letter-exchange: 如果队列中的消息被拒绝或过期,则可以根据设置的交换机对消息进行转发,重新发布
  • x-dead-letter-routing-key: 将删除的消息推送到指定的交换机对应的路由键(路由密匙)
  • x-max-priority: 队列支持的最大优先级数;如果未设置,队列将不支持消息优先级,优先级大的优先被消费
  • x-queue-mode: 将队列设置为延迟模式,队列中的消息保存在磁盘中,但不会主动持久化,RabbitMQ重启后消息还是会丢失
  • x-queue-master-locator: 将队列设置为主位置模式,确定在节点集群上声明时队列主机所在的规则
  • 声明交换机
1
2
// 声明交换机
$this->channel->exchange_declare('exchange_name', 'type', false, false, false);
  • exchange: 交换机名称
  • type: 交换机类型
    • direct: 处理路由键
    • fanout: 不处理路由键
    • topic: 将路由键和某模式进行匹配
    • headers: 此类型的exchange和以上三个都不一样,其路由的规则是根据header来判断
  • passive: 执行声明或只是检查它是否存在
  • durable: 是否持久化; true:表示持久化,会存盘,服务器重启仍然存在;false:非持久化;
  • auto_delete: 是否自动删除; true:自动删除;自动删除后的前提: 至少有一个消息者连接到这个队列, 之后所有与这个队列连接的消息都断开时,才会自动删除;
  • internal: 是否内置; true:表示内置的交换器,客户端程序无法直接发送消息到这个交换机中, 只能通过交换机路由到交换机的方式
  • no-wait: 声明队列无需等待
  • arguments: 其他参数
  • ticket: int 票
  • basic_publish: (basicPublish)
1
$this->channel->basic_publish($msg, $exchange_name, $routing_key);
  • basic_publish: 发布单条消息
  • msg: 需要发布的消息
  • exchange: 交换机名称
  • routing_key: 路由键
  • basic_qos: (basicQos)
1
$this->channel->basic_qos(null, 1, null);
  • basic_qos: Qos可以分配给当前的信道(channel)或者链接内的所有信道
  • prefetch_size:
  • prefetch_count: 设置消费者(Consumer)客户端同时处理队列数量
  • a_global:
  • basic_consume: (basicConsume)
1
$this->channel->basic_consume('task_queue', '', false, false, false, false, $callback);
  • basic_consume: 启动队列消费者
  • queue: 被消费队列名称
  • consumer_tag: 消费者客户端身份标识,用于区分多个客户端
  • no_local: 这个功能属于AMQP的标准,但是RabbitMQ并没有做实现
  • no_ack: 收到消息后,是否不需要回复确认即被认为被消费
  • exclusive: 是否排他,即这个队列只能由一个消费者消费。适用于任务不允许进行并发处理的情况下
  • nowait: 不返回执行结果,但是如果排他开启的话,则必须需要等待结果的,如果两个一起开就会报错
  • callback: 回调逻辑处理函数
  • queue_bind : (queueBind)
1
$this->channel->queue_bind($queue_name, $exchange_name);
  • queue_bind: 将队列绑定到交换机
  • queue: 队列名称
  • exchange: 交换机名称

参考

Powered by Hexo and Hexo-theme-hiker

Copyright © 2017 - 2023 Keep It Simple And Stupid All Rights Reserved.

访客数 : | 访问量 :