yii2+rabbitmq实现队列(windows)
1.安装Erlang 官方网站,https://erlang.org/download/otp_versions_tree.html,选择和rabbitmq适配的
2.安装RabbitMQ 官方网站,windows:https://www.rabbitmq.com/docs/install-windows
3.安装下载的Erlang和RabbitMQ
碰到的问题:
1. 开启rabbitmq服务时dos窗口一闪而逝,看了服务进程状态是开启状态立马变成了未开启状态,参考了:
https://blog.csdn.net/qq_40808232/article/details/102861014
重新配置了环境变量,
dos进入rabbitmq安装目录的sbin下,
安装插件,命令:rabbitmq-plugins.bat enable rabbitmq_management,
将 C:\Users\Administrator\.erlang.cookie 同步至C:\Windows\System32\config\systemprofile\.erlang.cookie
同时删除:C:\Users\Administrator\AppData\Roaming\RabbitMQ目录
输入命令:rabbitmq-plugins.bat enable rabbitmq_management ,检查插件安装成功
这时候应该能进rabbitmq管理后台了(http://localhost:15672)
2. 当前php环境没有php_amqp扩展(有的话应该能跳过)
下载地址:https://pecl.php.net/package/amqp/1.11.0/windows,我下的是1.11.0,选择和本地php版本适配的下载,存在非线程安全 (NTS) ;线程安全 (TS),这个不是很懂,就都下载下来试了一下(试了一下下面的复制转移)
参考了
https://segmentfault.com/q/1010000015206078
复制php_amqp.dll 到php/ext/
复制rabbitmq.4.dll 到php/
复制rabbitmq.4.dll 到windows/system32/ (如32位) ,64位则windows/SysWOW64/
php.ini 加入
extension=php_amqp.dll
重启服务
3. 怎么通过yii2项目使用队列功能
composer require enqueue/amqp-lib(消息队列库)
往yii2项目的web.php的components中(用于代码调用),与web.php同级的console.php中,用于监听(应该是的),都添加同一个队列配置
'test_queue' => [
'class' => yii\queue\amqp_interop\Queue::class,
'host' => env('rabbitmq_host'),
'port' => env('rabbitmq_port'),
'user' => env('rabbitmq_username'),
'password' => env('rabbitmq_password'),
'queueName' => 'test_queue',
'exchangeName' => 'test_exchange',
'connectionTimeout' => 5,
'readTimeout' => 5,
'serializer' => yii\queue\serializers\JsonSerializer::class,
'driver' => yii\queue\amqp_interop\Queue::ENQUEUE_AMQP_LIB,
'as log' => \app\behavior\TaskBehavior::class,
'heartbeat' => 30,
],
5672:client端通信端口(应该是能更改的,没有研究)15672:管理界面ui端口
上面的rabbitmq_port使用的是client端口5672,其他的rabbitmq参数都换成自己的
继续TaskBehavior文件创建一个:
<?php
namespace app\behavior;
use app\utils\LoggerUtil;
use yii\helpers\ArrayHelper;
use yii\queue\ExecEvent;
use yii\queue\JobEvent;
use yii\queue\JobInterface;
use yii\queue\LogBehavior;
use yii\queue\PushEvent;
/**
* Class TaskBehavior
* @package app\behavior
*/
class TaskBehavior extends LogBehavior
{
/**
* @param PushEvent $event
*/
public function afterPush(PushEvent $event)
{
$this->log($event, 'push');
}
public function beforeExec(ExecEvent $event)
{
$this->log($event, 'started');
}
/**
* @param ExecEvent $event
*/
public function afterExec(ExecEvent $event)
{
$this->log($event, 'execute');
}
public function afterError(ExecEvent $event)
{
$this->log($event, 'execute_error');
}
public function getJobParams(JobEvent $event, string $status)
{
$job = $event->job;
$params = ArrayHelper::toArray($job);
$name = $event->job instanceof JobInterface ? get_class($event->job) : 'unknown job';
$id = $event->id;
return [
'job' => $name,
'id' => $id,
'params' => $params,
'status' => $status,
];
}
protected function log(JobEvent $event, string $status)
{
$data = $this->getJobParams($event, $status);
$filename= 'debug.log';
if ($status == 'execute_error') {
$data['error'] = $event->error;
$filename = 'logstash.log';
}
LoggerUtil::logstash('queue', 'debug', $data, $filename);
}
}
创建完job文件就能直接调用了
\Yii::$app->test_queue->push(new TestJob([
'time' => time()
]));
testJob文件:
<?php
namespace app\jobs;
use app\utils\LoggerUtil;
use yii\base\BaseObject;
use yii\queue\JobInterface;
class TestJob extends BaseObject implements JobInterface
{
public $time;
public function execute($queue)
{
LoggerUtil::log('test', '测试job', [
'time' => $this->time
]);
}
}
最后重要的一步就是监听,php yii test-queue/listen,由于只是本地的,所以没有做supervisor保护进程