简介
RabbitMQ
RabbitMQ
是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)- RabbitMQ is the most widely deployed open source message broker.
- RabbitMQ Get Started
基础特性
- 可靠性
消息持久化、消息发送和投递确认机制、集群高可用方案
- 灵活路由
消息通过exchange
的方式路由到不同的queue
中,提供了包括fanout
,direct
,topic
等多种exchange
实现,并且支持通过编写 exchange
- 支持集群
同网段下的rabbitmq
节点可以通过集群的方法,组成一个逻辑上的单一broker
- Federation
通过Federation
可以在跨网段节点间组建集群
- 高可用消息队列
通过设置镜像队列的方式, 消息可以在镜像队列间进行复制, 使节点宕机或硬件损坏的情况下保证队列服务的高可用
- 多客户端支持
JAVA, .NET, Ruby, Python, PHP, Node, Go……
- 可视化管理界面
RabbitMQ 提供了一个易用的用户界面,使得用户可以监控和管理消息 Broker 的许多方面。
基础概念
Broker
经纪人. 提供一种传输服务,维护一条从生产者到消费者的传输线路,保证消息数据能按照指定的方式传输。
Producer
消息生产者. 主要讲消息投递到对应的Exchange上面,一般是独立的程序
Consumer
消息消费者. 消息的接受者, 一般是独立的程序
Message
消息. 消息是不具名的, 它由消息头和消息体组成, 消息体是不透明的, 而消息头则由一系列的可选属性组成,这些属性包括
routing-key(路由键)
、priority
(相对于其他消息的优先权)、delivery-mode
(指出该消息可能需要持久性存储)等
Channel
信道,多路复用连接中的一条独立的双向数据流通道。
信道是建立在真实的TCP
连接内地虚拟连接,AMQP
命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。
对于操作系统来说,建立和关闭TCP
连接是有代价的,频繁的建立关闭TCP
连接对于系统的性能有很大的影响,而且TCP
的连接数也有限制,这也限制了系统处理高并发的能力。但是,在TCP
连接中建立Channel
是没有上述代价的。对于Producer
或者Consumer
来说,可以并发的使用多个Channel
进行Publish
或者Receive
Exchange
消息交换机. 指定消息按照什么规则路由到哪个队列
Queue
Producer
将并不能直接将消息投递到Queue
中。需要将消息发送到Exchange
,由Exchange
将消息路由到一个或多个Queue
中(或者没有绑定Queue
的情况下将消息丢弃)。
Queue
消息队列. 消息的载体, 每条消息都会被投送到一个或多个队列中
RabbitMQ
中的消息都只能存在在Queue
中,Producer
生产消息并通过Exchange
投递到Queue
,Comsumer
可以通过Exchange
从Queue
中获取消息并消费
多个消费者可以订阅同一个Queue
,这时Queue
中的消息会被平均分摊给多个消费者进行处理,而不是每个消费者都收到所有的消息并处理。
Binding
绑定. 作用就是将
Exchang
和Queue
按照某种路由规则绑定起来. 这样RabbitMQ
就知道如何正确地将消息路由到指定的Queue
Routing Key
路由键. 生产者在将消息发送给
Exchange
的时候,一般会指定一个routing key
,来指定这个消息的路由规则,而这个routing key
需要与Exchange Type
及binding key
联合使用才能最终生效。
Binding Key
在绑定(
Binding
)Exchange
与Queue
的同时,一般会指定一个binding key
; 消费者将消息发送给Exchange
时,一般会指定一个routing key
; 当binding key
与routing key
相匹配时,消息将会被路由到对应的Queue
中.
MQ
- 消息队列(Message Queue, 简称MQ), 从字面意思上看,本质是个队列, FIFO先入先出,只不过队列中存放的内容是Message而已.
其主要用途: 不同进程Process/线程Thread之间通信
为什么会产生消息队列?
- 不同进程(process)之间传递消息时,两个进程之间耦合程度过高,改动一个进程,引发必须修改另一个进程, 为了隔离这两个进程,在两进程间抽离出汗一层(一个模块), 所有两进程之间传递的消息,都必须通过消息队列来传递,单独修改某一个进程,不会影响另一个;
- 不同进程(process)之间传递消息时,为了实现标准化,将消息的格式规范化了,并且,某一个进程接受的消息太多,一下子无法处理完,并且也有先后顺序,必须对收到的消息进行排队,因此诞生了事实上的消息队列;
AMQP
AMQP
,即Advanced Message Queuing Protocol
, 高级消息队列协议
一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。Erlang中的实现有 RabbitMQ等。
搭建RabbitMQ环境
- 安装Erlang
- 安装RabbitMQ
安装完成,启用管理工具
- 双击: RabbitMQ Command Prompt
- 输入:
rabbitmq-plugin enable rabbitmq_management
- 这样就启动了管理工具, 试一下
- 停止:
net stop RabbitMQ
- 启动:
net start RabbitMQ
- 停止:
windows下安装步骤
打开cmd
,进入rabbitmq
的sbin
目录
执行rabbitmq-plugins.bat enable rabbitmq_management
- 在浏览器中输入地址查看:
http://127.0.0.1:15672
- 使用默认账号登录:
guest
/guest
基础概念
Connection 与 Channel : 连接与信道
connection
是指的物理的连接, 一个clinet
与一个server
之间有一个连接;一个连接上可以建立多个channel
,可以理解为逻辑上的连接. 一般应用的情况下,有一个channel
就够用了,不需要创建更多的channel
- 示例代码
1 | // 创建连接和信道 |
Exchange 与 RoutingKey : 交换机与路由键
为了将不同类型的
message
进行区分, 设置了Exchange
交换机与Route
路由两个概念. 比如A类型的message
发送到名为*C1
的交换机,将类型为B的发送到C2
的交换机.
当客户端连接C1
*处理队列消息时取到的就只是A类型的message
.进一步的,如果A类型message
也非常多,需要进一步细化区分,比如某个客户端只处理A类型message
中针对K用户的message
,routingKey
就是来做这个用途的。
- 示例代码:
1 | // 交换机名 |
由以上代码可以看到,发送消息时,只要有“交换机”就够了。至于交换机后面有没有对应的处理队列,发送方是不用管的。
routingkey
可以是空的字符串。在示例中,我使用了两个key交替发送消息,是为了下面更便于理解routingkey
的作用。
对于交换机,有两个重要的概念:
交换机(Exchange)
: 可以理解为具有路由表的路由程序。每个消息都有一个路由键(routing key), 就是一个简单的字符串.交换机中有一系列的绑定(binding), 即路由规则.交换机可以有多个,多个队列可以和同一个交换机绑定,同时多个交换机也可以和同一个队列绑定(多对多关系)
A. 类型
Fanout Exchange
(不处理路由键):一个发送到交换机上的消息都会被转发到与该交换机绑定的所有队列上.Fanout交换机发消息是最快的.
Direct Exchange
(处理路由键):如果一个队列绑定到该交换机上, 并且当前要求路由键为X. 只有路由键是X的消息才会被这个队列转发.
Topic Exchange
(将路由键和某模式进行匹配, 可以理解为模糊处理):路由键的词有
'.'
隔开, 符号'#'
表示匹配0个或多个, 符号'*'
表示匹配不多不少一个词
类型总结:
- Fanout类型最简单,这种模型忽略
routingkey
;- Direct类型是使用最多的,使用确定的
routingkey
。这种模型下,接收消息时绑定key_1
则只接收key_1
的消息;- 最后一种是Topic,这种模式与Direct类似,但是支持通配符进行匹配,比如:
key_*
,就会接受key_1
和key_2
。Topic貌似美好,但是有可能导致不严谨,所以还是推荐使用Direct。
B. 持久化
指定了持久化的交换机, 在重庆启动时才能重建, 否则需要客户端重新声明生成才行.
需要特别明确的概念: 交换机的持久化, 并不等于消息的持久化.只有在持久化队列中的消息,才能持久化;如果没有队列,消息是没有地方存储的;消息贝恩施在投递时也有一个持久化标志的,PHP中默认投递到持久化交换机就是持久的信息,不用特别指定
Queue : 队列
队里仅是这很对接受方(consumer)的, 由接收方根据需求创建的。只有队列创建了,交换机才会将新接受到的消息送到队列中,交换机是不会在队列创建之前的消息放进来的。换句话说,在建立队列之前,发出的所有消息都被丢弃了。
- 接下来看一下创建队列及接收消息的示例:
1 | // 交换机名 |
消息的处理,是有两种方式:
- 一次性: 用
$queue->get([...])
, 不管能不能拿到消息都会立即返回, 一般情况下使用轮询处理消息队列就用这种方法 - 阻塞: 用过
$queue->consume(callback, [...])
, 程序会进入持续侦听状态, 每收到一个消息就会调用callback指定函数一次, 直到某个callback函数返回false
才结束
RabbitMQ应用(五种队列)
环境
RabbitMQ V3.7.17
Erlang V22.0
PHP V7.2.1
Laravel 5.8
准备
- 安装
php-amqplib
/php-amqplib
1 | composer require php-amqplib/php-amqplib |
- 创建
RabbitMQController
1 |
|
“Hello World”
消息生产者/消息消费者模式
test_send
1 | /** |
运行结果:
[x] Sent 'Hello RabbitMQ!'
test_receive
1 | /** |
运行结果:
[*] Waiting for message. To exit press CTRL+C
[x] Received Hello RabbitMQ!
Work queues
工作队里: 工厂任务安排者(生产者)/工人(消费者)
test_task
1 | /** |
运行结果:
[x] Sent 'Hello RabbitMQ!'
test_work
1 | /** |
运行结果:
[*] Waiting for message. To exit press CTRL+C
[x] Received Hello RabbitMQ!
[x] Done
Publish/Subcribe
发布/订阅: 消息发布者/消息订阅者
test_receive_logs
1 | /** |
运行结果:
[*] Waiting for logs. To exit press CTRL+C
test_emit_log
1 | /** |
运行结果:
[x] Sent info: Hello RabbitMQ!
Routing
test_receive_logs_direct
1 | /** |
运行结果:
[*] Waiting for logs. To exit press CTRL+C
test_emit_log_direct
1 | /** |
运行结果:
[x] Sent info:Hello RabbitMQ!
Topics
test_receive_logs_topic
1 | /** |
运行结果:
[*] Waiting for logs. To exit press CTRL+C
test_emit_log_topic
1 | /** |
运行结果:
[x] Sent anonymous.info:Hello RabbitMQ!
参数说明
详情查看: Channel
- 声明队列
1 | // 声明队列 |
queue_declare()
: 不带参数方法,默认创建一个由RabbitMq命名的(amq.gen-_rV4_Be1b5pUjMbWXqNVsw)名称, 这种队列也称之为匿名队列, 排他的, 自动删除的, 非持久化的队列queue
: 队列名称;passive
: 仅检测队列是否存在durable
: 是否持久化;true
:表示持久化,会存盘,服务器重启仍然存在;false
:非持久化;exclusive
: 是否排他的;true
:排他,如果一个队列声明为排他队列,该队列公对首次声明它的连接可见,并在连接断开时自动删除;auto_delete
: 是否自动删除;true
:自动删除;自动删除后的前提: 至少有一个消息者连接到这个队列, 之后所有与这个队列连接的消息都断开时,才会自动删除;no_wait
:true
声明队列无需等待arguments
: 其他参数ticket
: int 票
arguments 参数说明
x-message-ttl
: 队列中的所有消息的过期时间x-expires
: 超过设定时间没有消费者来访问队列,就删除队列的时间(毫秒)x-max-length
: 队列的最新的消息数量,如果超过设定数量,前面的消息将从队列中移除掉x-max-length-bytes
: 队列的内容的最大空间,超过该阈值就删除之前的消息x-dead-letter-exchange
: 如果队列中的消息被拒绝或过期,则可以根据设置的交换机对消息进行转发,重新发布x-dead-letter-routing-key
: 将删除的消息推送到指定的交换机对应的路由键(路由密匙)x-max-priority
: 队列支持的最大优先级数;如果未设置,队列将不支持消息优先级,优先级大的优先被消费x-queue-mode
: 将队列设置为延迟模式,队列中的消息保存在磁盘中,但不会主动持久化,RabbitMQ重启后消息还是会丢失x-queue-master-locator
: 将队列设置为主位置模式,确定在节点集群上声明时队列主机所在的规则
- 声明交换机
1 | // 声明交换机 |
exchange
: 交换机名称type
: 交换机类型
direct
: 处理路由键
fanout
: 不处理路由键
topic
: 将路由键和某模式进行匹配
headers
: 此类型的exchange和以上三个都不一样,其路由的规则是根据header来判断passive
: 执行声明或只是检查它是否存在durable
: 是否持久化;true
:表示持久化,会存盘,服务器重启仍然存在;false
:非持久化;auto_delete
: 是否自动删除;true
:自动删除;自动删除后的前提: 至少有一个消息者连接到这个队列, 之后所有与这个队列连接的消息都断开时,才会自动删除;internal
: 是否内置;true
:表示内置的交换器,客户端程序无法直接发送消息到这个交换机中, 只能通过交换机路由到交换机的方式no-wait
: 声明队列无需等待arguments
: 其他参数ticket
: int 票
basic_publish
: (basicPublish)
1 | $this->channel->basic_publish($msg, $exchange_name, $routing_key); |
basic_publish
: 发布单条消息msg
: 需要发布的消息exchange
: 交换机名称routing_key
: 路由键
basic_qos
: (basicQos)
1 | $this->channel->basic_qos(null, 1, null); |
basic_qos
: Qos可以分配给当前的信道(channel)或者链接内的所有信道prefetch_size
:prefetch_count
: 设置消费者(Consumer)客户端同时处理队列数量a_global
:
basic_consume
: (basicConsume)
1 | $this->channel->basic_consume('task_queue', '', false, false, false, false, $callback); |
basic_consume
: 启动队列消费者queue
: 被消费队列名称consumer_tag
: 消费者客户端身份标识,用于区分多个客户端no_local
: 这个功能属于AMQP的标准,但是RabbitMQ并没有做实现no_ack
: 收到消息后,是否不需要回复确认即被认为被消费exclusive
: 是否排他,即这个队列只能由一个消费者消费。适用于任务不允许进行并发处理的情况下nowait
: 不返回执行结果,但是如果排他开启的话,则必须需要等待结果的,如果两个一起开就会报错callback
: 回调逻辑处理函数
queue_bind
: (queueBind)
1 | $this->channel->queue_bind($queue_name, $exchange_name); |
queue_bind
: 将队列绑定到交换机queue
: 队列名称exchange
: 交换机名称