RabbitMQ 实战记录

啊,好久没有写博客了。我的博客就是用来总结自己的学习之路的。
加油,重新启程!Keep calm and think more.

准备

composer require php-amqplib/php-amqplib

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
<?php

namespace app\api\controller;

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
use think\Request;

class Test
{
    /**
     * HTTP entry point: publishes the current request parameters to RabbitMQ
     * as a single JSON-encoded message.
     *
     * The `sign` parameter is removed, and `send_time` / `create_time` are set
     * to the current Unix timestamp before the payload is encoded. On success
     * the method prints "Done" and terminates the script.
     *
     * @throws \Exception when the payload cannot be JSON-encoded or delivery fails
     */
    public function deliver()
    {
        $params = Request::instance()->param();
        unset($params['sign']);

        // Read the clock once so both timestamps are guaranteed to be equal.
        $now = time();
        $params['send_time'] = $now;
        $params['create_time'] = $now;

        // json_encode() returns false on failure; never publish that as a body.
        $payload = json_encode($params);
        if ($payload === false) {
            throw new \RuntimeException('Failed to encode message payload: ' . json_last_error_msg());
        }

        $this->deliverMessage($payload, 'test_exchange', 'api_test_queue', 'api_test');

        echo 'Done';
        exit();
    }

    /**
     * Declares the exchange/queue topology and publishes one message to it.
     *
     * The channel and the connection are always closed, including when a
     * declare, bind or publish call throws.
     *
     * @param string $message      message body to publish
     * @param string $exchangeName exchange to declare and publish to
     * @param string $queueName    queue to declare and bind to the exchange
     * @param string $routingKey   key used for both the binding and the publish
     * @throws \Exception when the connection is not established or a broker call fails
     */
    private function deliverMessage($message, $exchangeName, $queueName, $routingKey)
    {
        // Open the connection using the settings from RABBITMQ_CONFIG.
        $connection = new AMQPStreamConnection(
            RABBITMQ_CONFIG['host'],
            RABBITMQ_CONFIG['port'],
            RABBITMQ_CONFIG['TEST']['user'],
            RABBITMQ_CONFIG['TEST']['password'],
            RABBITMQ_CONFIG['TEST']['vhost']
        );

        // Fail loudly instead of dumping a string and carrying on with a dead connection.
        if (!$connection->isConnected()) {
            throw new \RuntimeException('Connection failed');
        }

        try {
            $channel = $connection->channel();

            try {
                // Declare the exchange.
                // Exchange types: direct = exact routing-key match, fanout = broadcast,
                // topic = pattern-based routing.
                $channel->exchange_declare($exchangeName, 'direct', false, false, false);

                // Queue argument requesting lazy mode.
                $queueArguments = new AMQPTable();
                $queueArguments->set('x-queue-mode', 'lazy');

                // Declare the queue with the lazy-mode argument.
                $channel->queue_declare($queueName, false, true, false, false, false, $queueArguments);

                // Bind the queue to the exchange under the routing key.
                $channel->queue_bind($queueName, $exchangeName, $routingKey);

                // NOTE(review): the message is explicitly non-persistent; confirm this is
                // intended given how the queue is declared.
                $msg = new AMQPMessage($message, [
                    'delivery_mode' => AMQPMessage::DELIVERY_MODE_NON_PERSISTENT,
                ]);

                // Publish the message to the exchange.
                $channel->basic_publish($msg, $exchangeName, $routingKey);
            } finally {
                $channel->close();
            }
        } finally {
            $connection->close();
        }
    }
}

访问以下地址即可触发一次消息投递:http://tp5.test/api/test/deliver

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
<?php

namespace app\api\command;

require_once APP_PATH . 'api.php';

use think\console\{
Command, Input, Output
};
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Wire\AMQPTable;

class Test extends Command
{
    /**
     * Registers the console command name and description.
     */
    protected function configure()
    {
        $this->setName('cmd_test')
            ->setDescription('RabbitMQ Consumer Test');
    }

    /**
     * Runs the consumer: declares the exchange/queue topology, then waits for
     * messages on the queue and acknowledges each one.
     *
     * The channel and the connection are always closed, including when a
     * declare call or the wait loop throws.
     *
     * @param Input $input
     * @param Output $output used to report acknowledgement failures
     * @return int|void|null
     * @throws \ErrorException
     * @throws \Exception
     */
    protected function execute(Input $input, Output $output)
    {
        // Exchange
        $exchangeName = 'test_exchange';
        // Queue
        $queueName = 'api_test_queue';
        // Routing key
        $routingKey = 'api_test';

        // Open the connection using the settings from RABBITMQ_CONFIG.
        $connection = new AMQPStreamConnection(
            RABBITMQ_CONFIG['host'],
            RABBITMQ_CONFIG['port'],
            RABBITMQ_CONFIG['TEST']['user'],
            RABBITMQ_CONFIG['TEST']['password'],
            RABBITMQ_CONFIG['TEST']['vhost']
        );

        try {
            $channel = $connection->channel();

            try {
                // Declare the exchange.
                $channel->exchange_declare($exchangeName, 'direct', false, false, false);

                // Queue argument requesting lazy mode.
                $queueArguments = new AMQPTable();
                $queueArguments->set('x-queue-mode', 'lazy');

                // Declare the queue with the lazy-mode argument.
                $channel->queue_declare($queueName, false, true, false, false, false, $queueArguments);

                // Bind the queue to the exchange under the routing key.
                $channel->queue_bind($queueName, $exchangeName, $routingKey);

                $callback = function ($msg) use ($output) {
                    // TODO: process the message payload here.
                    try {
                        $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
                    } catch (\Exception $e) {
                        // Best effort: report the failure through the console output and keep consuming.
                        $output->writeln($e->getMessage());
                    }
                };

                // Prefetch count of 1.
                $channel->basic_qos(null, 1, null);

                // Start consuming the queue with the callback above.
                $channel->basic_consume($queueName, '', false, false, false, false, $callback);

                // Keep waiting while at least one consumer callback is registered.
                while (count($channel->callbacks)) {
                    $channel->wait();
                }
            } finally {
                $channel->close();
            }
        } finally {
            $connection->close();
        }
    }
}

在项目根目录执行以下命令启动消费者:php think cmd_test

  • api.php
1
2
3
4
5
6
7
8
9
10
// RabbitMQ connection settings, defined as the constant RABBITMQ_CONFIG.
// 'host' and 'port' are top-level entries; the 'TEST' entry groups the
// user, password and vhost values.
define('RABBITMQ_CONFIG', [
'host' => '127.0.0.1',
'port' => 5672,
'TEST' => [ // original note reads "gameplay virtual machine" — NOTE(review): presumably the vhost for that feature; confirm
'user' => 'test' ,
'password' => 'test',
'vhost' => 'test'
],
]);

Powered by Hexo and Hexo-theme-hiker

Copyright © 2017 - 2023 Keep It Simple And Stupid All Rights Reserved.

访客数 : | 访问量 :