1 <?php
2
3 /**
4 * 使用示例
5 #定义名称
6 define('ExchangeName', 'exchange_name_15');
7 define('QueueName', 'queue_name_15');
8 define('RoutingKey', 'routing_key_15');
9
10 if(count($argv) > 1){
11
12 $amqp = new Amqp();
13 $amqp->channel();
14 $amqp->exchange(ExchangeName, AMQP_EX_TYPE_DIRECT, AMQP_DURABLE)->declareExchange();
15 $amqp->queue(QueueName, AMQP_DURABLE)->declareQueue();
16 $amqp->bindQueue(RoutingKey);
17 $amqp->publish($argv[1], RoutingKey);
18 $amqp->disconnect();
19
20 }else{
21
22 $amqp = new Amqp();
23 $amqp->channel();
24 $amqp->exchange(ExchangeName, AMQP_EX_TYPE_DIRECT, AMQP_DURABLE)->declareExchange();
25 $amqp->queue(QueueName, AMQP_DURABLE)->declareQueue();
26 $amqp->bindQueue(RoutingKey);
27
28 //接收消息
29 $amqp->consume(function($envelope, $queue){
30 $msg = $envelope->getBody();
31 echo $msg . "\n";
32 }, AMQP_AUTOACK); //自动应答
33
34 }
35 */
36
37 class Amqp extends AMQPConnection
38 {
39
40 protected $channel;
41 protected $exchange;
42 protected $queue;
43
44 public function __construct($host='127.0.0.1', $port=5672, $user='guest', $passwd='guest', $vhost='/')
45 {
46 $credentials = is_array($host) ? $host : [
47 'host' => $host,
48 'port' => $port,
49 'login' => $user,
50 'password' => $passwd,
51 'vhost' => $vhost
52 ];
53 parent::__construct($credentials);
54 }
55
56
57 /**
58 * 创建频道
59 * @return AMQPChannel
60 * @throws AMQPConnectionException
61 */
62 public function channel()
63 {
64 if (!$this->channel) {
65 parent::connect();
66 $this->channel = new \AMQPChannel($this);
67 }
68 return $this->channel;
69 }
70
71 /**
72 * 创建交换器
73 * @param string $name
74 * @param string $type
75 * @param null $flags
76 * @return AMQPExchange
77 * @throws AMQPConnectionException
78 * @throws AMQPExchangeException
79 */
80 public function exchange($name='', $type='', $flags=null)
81 {
82 if (!$this->exchange) {
83 $this->exchange = new \AMQPExchange($this->channel());
84 }
85 $name && $this->exchange->setName($name);
86 $type && $this->exchange->setType($type);
87 is_integer($flags) && $this->exchange->setFlags($flags);
88
89 return $this->exchange;
90 }
91
92 /**
93 * 创建队列
94 * @param string $name
95 * @param null $flags
96 * @return AMQPQueue
97 * @throws AMQPConnectionException
98 * @throws AMQPQueueException
99 */
100 public function queue($name='', $flags=null)
101 {
102 if (!$this->queue) {
103 $this->queue = new \AMQPQueue($this->channel());
104 }
105
106 $name && $this->queue->setName($name);
107 is_integer($flags) && $this->queue->setFlags($flags);
108
109 return $this->queue;
110 }
111
112 /**
113 * 发布交换器
114 * @return bool
115 * @throws AMQPChannelException
116 * @throws AMQPConnectionException
117 * @throws AMQPExchangeException
118 */
119 public function declareExchange()
120 {
121 return $this->exchange()->declareExchange();
122 }
123
124 /**
125 * 发布队列
126 * @return int
127 * @throws AMQPChannelException
128 * @throws AMQPConnectionException
129 * @throws AMQPQueueException
130 */
131 public function declareQueue()
132 {
133 return $this->queue()->declareQueue();
134 }
135
136 /**
137 * 绑定队列
138 * @param string $routing_key
139 * @param array $arguments
140 * @return bool
141 * @throws AMQPChannelException
142 * @throws AMQPConnectionException
143 * @throws AMQPExchangeException
144 * @throws AMQPQueueException
145 */
146 public function bindQueue($routing_key='', $arguments=[])
147 {
148 return $this->queue()->bind($this->exchange()->getName(), $routing_key, $arguments);
149 }
150
151
152 /**
153 * 发布消息
154 * @param callable|null $callback
155 * @param int $flags
156 * @param null $consumerTag
157 * @return bool
158 * @throws AMQPChannelException
159 * @throws AMQPConnectionException
160 * @throws AMQPExchangeException
161 */
162 public function publish($message, $routing_key=null, $flags=AMQP_NOPARAM, array $attributes=[])
163 {
164 return $this->exchange()->publish($message, $routing_key, $flags, $attributes);
165 }
166
167 public function consume(callable $callback=null, $flags=AMQP_NOPARAM, $consumerTag = null)
168 {
169 $this->queue()->consume($callback, $flags, $consumerTag);
170 }
171
172 /**
173 * 删除交换器
174 * @param null $name
175 * @param int $flags
176 * @throws AMQPChannelException
177 * @throws AMQPConnectionException
178 * @throws AMQPExchangeException
179 */
180 public function deleteExchange($name = null, $flags=AMQP_NOPARAM)
181 {
182 $this->exchange()->delete($name, $flags);
183 }
184
185 /**
186 * 删除队列
187 * @param int $flags
188 * @throws AMQPChannelException
189 * @throws AMQPConnectionException
190 * @throws AMQPQueueException
191 */
192 public function deleteQueue($flags=AMQP_NOPARAM)
193 {
194 $this->queue()->delete($flags);
195 }
196 }