1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76
| <?php
namespace app\api\command;
require_once APP_PATH . 'api.php';
use think\console\{ Command, Input, Output }; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Wire\AMQPTable;
class Test extends Command { protected function configure() { $this->setName('cmd_test') ->setDescription('RabbitMQ Consumer Test'); }
protected function execute(Input $input, Output $output) { $exchangeName = "test_exchange"; $queueName = "api_test_queue"; $routingKey = "api_test"; $connection = new AMQPStreamConnection( RABBITMQ_CONFIG['host'], RABBITMQ_CONFIG['port'], RABBITMQ_CONFIG['TEST']['user'], RABBITMQ_CONFIG['TEST']['password'], RABBITMQ_CONFIG['TEST']['vhost'] ); $channel = $connection->channel(); $channel->exchange_declare($exchangeName, 'direct', false, false, false); $table = new AMQPTable(); $table->set('x-queue-mode', "lazy"); $channel->queue_declare($queueName, false, true, false, false, false, $table); $channel->queue_bind($queueName, $exchangeName, $routingKey);
$callback = function ($msg) { try { $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); } catch (\Exception $e) { var_dump($e->getMessage()); } };
$channel->basic_qos(null, 1, null); $channel->basic_consume($queueName, '', false, false, false, false, $callback); while (count($channel->callbacks)) { $channel->wait(); } $channel->close(); $connection->close(); } }
|