1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86
| <?php
namespace RabbitMQ;
require_once '../vendor/autoload.php';
use Exception; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; use PhpAmqpLib\Wire\AMQPTable;
/**
 * Producer for delayed messages built on the RabbitMQ "TTL + dead-letter" pattern.
 *
 * A message is published to a holding ("ttl_*") queue that has no consumer of
 * its own. When the message expires, the broker dead-letters it to the
 * "delay_*" exchange/queue declared below.
 */
class RabbitMQ
{
    /**
     * Publish a message that is dead-lettered to "delay_dead_letter_queue" after $ttl seconds.
     *
     * Every call opens a new connection, (re)declares the queues, exchanges and
     * bindings it needs, publishes one persistent message and closes the connection.
     *
     * @param array $message_data     Payload; sent as the output of serialize().
     * @param int   $ttl              Per-message delay in seconds (sent as the AMQP
     *                                "expiration" property, in milliseconds).
     * @param int   $max_delay_second Queue-wide cap on the delay in seconds, applied as
     *                                "x-message-ttl". 0 (the default) means no cap.
     * @param bool  $need_time_suffix When true, "_{$ttl}" is appended to the holding
     *                                queue/exchange names, giving each delay value its
     *                                own holding queue.
     *
     * @return void
     *
     * @throws \InvalidArgumentException When $ttl or $max_delay_second is negative.
     * @throws \Exception                Connection/channel errors raised by php-amqplib
     *                                   are propagated unchanged.
     */
    public function delayProduce(array $message_data, int $ttl, int $max_delay_second = 0, bool $need_time_suffix = false)
    {
        // Fail fast: the broker rejects negative TTL values anyway, but only after
        // a connection has been opened and the queues declared.
        if ($ttl < 0) {
            throw new \InvalidArgumentException("ttl must be >= 0, got {$ttl}");
        }
        if ($max_delay_second < 0) {
            throw new \InvalidArgumentException("max_delay_second must be >= 0, got {$max_delay_second}");
        }

        $queue = 'dead_letter_queue';
        $exchange_name = 'dead_letter_exchange';

        $suffix = $need_time_suffix ? "_{$ttl}" : '';
        $ttl_queue = "ttl_{$queue}{$suffix}";
        $ttl_exchange = "ttl_{$exchange_name}{$suffix}";
        $delay_queue = "delay_{$queue}";
        $delay_exchange = "delay_{$exchange_name}";

        // Arguments of the holding queue: where expired messages are re-routed to.
        $delay_table = new AMQPTable();
        $delay_table->set('x-dead-letter-exchange', $delay_exchange);
        $delay_table->set('x-dead-letter-routing-key', $delay_queue);
        // An "x-message-ttl" of 0 makes the broker expire messages the moment they
        // reach the queue, which would cancel the per-message delay entirely. Only
        // set the cap when the caller asked for one.
        // NOTE(review): a holding queue that already exists with "x-message-ttl: 0"
        // must be deleted once, otherwise the re-declare fails with PRECONDITION_FAILED.
        if ($max_delay_second > 0) {
            $delay_table->set('x-message-ttl', $max_delay_second * 1000);
        }

        $connection = new AMQPStreamConnection('rabbitmq', 5672, 'banana', '123456', 'banana');

        try {
            $channel = $connection->channel();

            // Holding side: durable queue, direct exchange, bound by the queue name.
            $channel->queue_declare($ttl_queue, false, true, false, false, false, $delay_table);
            $channel->exchange_declare($ttl_exchange, 'direct', false, false, false, false);
            $channel->queue_bind($ttl_queue, $ttl_exchange, $ttl_queue);

            // Delivery side: receives the dead-lettered (i.e. delayed) messages.
            $channel->queue_declare($delay_queue, false, true, false, false, false);
            $channel->exchange_declare($delay_exchange, 'direct', false, true, false);
            $channel->queue_bind($delay_queue, $delay_exchange, $delay_queue);

            $message = new AMQPMessage(
                serialize($message_data),
                [
                    'content_type' => 'text/plain',
                    // AMQP defines "expiration" as a short string holding milliseconds.
                    'expiration' => (string) ($ttl * 1000),
                    'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
                ]
            );

            $channel->basic_publish($message, $ttl_exchange, $ttl_queue);

            $channel->close();
        } catch (\Throwable $e) {
            // Release the socket on failure, without letting a cleanup error mask
            // the original one.
            try {
                $connection->close();
            } catch (\Throwable $ignored) {
                // Intentionally ignored: the original failure is rethrown below.
            }

            throw $e;
        }

        $connection->close();
    }
}
// Demo invocation: publish one test message with a 5 second delay, a 30 second
// queue-wide cap, and no per-delay suffix on the holding queue name.
[$message_data, $ttl, $max_delay_second, $need_time_suffix] = [['Test Dead Letter'], 5, 30, false];

try {
    (new RabbitMQ())->delayProduce(
        $message_data,
        $ttl,
        $max_delay_second,
        $need_time_suffix
    );
} catch (Exception $e) {
    // Print the failure reason instead of letting the script abort.
    echo "异常消息: ", $e->getMessage();
}
|