11-07 1,865 views
基于 Redis 有序集合(sorted set)写了一个简单的延时队列类实现:以任务的执行时间戳作为 score,到期后取出并执行。
使用方法(以下为示意写法:execute() 是 protected 方法,实际应通过 ThinkPHP 命令行运行该命令,例如 php think RedisDelayQueue)
$class = new RedisDelayQueue();
$class->execute();
<?php
namespace app\common\service;
/**
 * Minimal delayed-job queue stored in a Redis sorted set.
 *
 * Every job is a JSON string member whose score is the Unix timestamp at
 * which it becomes due. Workers poll for the earliest due member, race to
 * remove it, and the worker whose removal succeeds executes the job.
 *
 * Note: a job is removed BEFORE it is executed, so a job that throws is not
 * put back on the queue.
 */
class RedisDelay
{
    /** @var string Prefix prepended to the configured queue name. */
    protected $prefix = 'delay_queue:';

    /** @var object|null Redis client exposing zAdd / zRem / zRangeByScore. */
    protected $redis = null;

    /** @var string Full sorted-set key: prefix + configured queue name. */
    protected $key = '';

    /**
     * @param object|null $redis Optional, already-connected Redis client
     *                           (predis or the redis extension). When it is
     *                           omitted the client stays null: create the
     *                           connection here, as the original article
     *                           leaves that step to the reader.
     */
    public function __construct($redis = null)
    {
        $this->key = $this->prefix . $this->getDefaultKey();
        $this->redis = $redis;
    }

    /**
     * Read the queue name from the application configuration.
     *
     * @return mixed Value of the `custom.server_redis_delay_key` setting.
     */
    public function getDefaultKey()
    {
        return config('custom.server_redis_delay_key');
    }

    /**
     * Remove one member from the queue.
     *
     * @param string $value Exact member string, as returned by getTask().
     * @return mixed Result of the client's zRem call; run() treats a truthy
     *               value as "this worker claimed the task".
     */
    public function delTask($value)
    {
        return $this->redis->zRem($this->key, $value);
    }

    /**
     * Fetch at most one task whose score lies between 0 and the current time.
     *
     * @return mixed Result of the client's zRangeByScore call; run() expects
     *               a list holding zero or one member.
     */
    public function getTask()
    {
        return $this->redis->zRangeByScore($this->key, 0, time(), ['limit' => [0, 1]]);
    }

    /**
     * Schedule a job.
     *
     * Example payload:
     *   'class'  => '\\app\\common\\jobs\\video\\VideoConvert',
     *   'params' => ['asset_id' => $asset_id]
     *
     * NOTE(review): sorted-set members are unique in Redis, so adding a
     * byte-identical payload again presumably only moves its score instead
     * of queueing a second job - confirm this is acceptable for callers.
     *
     * @param array $data Job description; stored JSON-encoded as the member.
     * @param int   $time Unix timestamp at which the job becomes due (score).
     * @return mixed Result of the client's zAdd call.
     */
    public function addTask(array $data, int $time)
    {
        return $this->redis->zAdd(
            $this->key,
            $time,
            json_encode($data)
        );
    }

    /**
     * Claim and execute at most one due task.
     *
     * @return bool True when this worker claimed a task (even if the payload
     *              turned out to be unusable and was dropped); false when
     *              nothing was due or another worker claimed it first.
     * @throws \UnexpectedValueException When the payload names no loadable class.
     */
    public function run()
    {
        // Only one task is taken per call.
        $due = $this->getTask();
        if (empty($due)) {
            return false;
        }
        $payload = $due[0];

        // Several workers may see the same member; the removal result
        // decides which of them actually owns the task.
        if (!$this->delTask($payload)) {
            return false;
        }

        // Decode once. Anything that is not a non-empty array (invalid JSON,
        // scalars, null) is dropped, as invalid JSON always was.
        $info = json_decode($payload, true);
        if (!is_array($info) || empty($info)) {
            return true;
        }

        $class = $info['class'] ?? null;
        if (!is_string($class) || !class_exists($class)) {
            throw new \UnexpectedValueException(
                'Delay queue task has no loadable job class: ' . $payload
            );
        }

        $job = new $class($info['params'] ?? []);
        $job->run();

        return true;
    }
}
死循环:常驻的命令行进程,循环调用 RedisDelay::run() 消费到期任务,每轮之间休眠 0.1 秒。
<?php
/**
* Redis延时队列
*/
namespace app\common\command;
use app\common\lib\log\Log;
use app\common\service\PredisService;
use app\common\service\RedisDelay;
use think\console\Command;
use think\console\Input;
use think\console\Output;
use think\Db;
use think\Exception;
/**
 * Console command that runs the Redis delay queue as a long-lived worker.
 *
 * The worker polls RedisDelay::run() in an endless loop. The first failure
 * is logged and rethrown, which ends the loop.
 * NOTE(review): presumably an external supervisor restarts the process
 * after such a failure - confirm.
 */
class RedisDelayQueue extends Command
{
    /**
     * Register the command name and description with the console.
     */
    protected function configure()
    {
        $this->setName('RedisDelayQueue')->setDescription('RedisDelayQueue running');
    }

    /**
     * Poll the delay queue forever, pausing 0.1 s after each poll.
     *
     * @param Input  $input  Console input (unused).
     * @param Output $output Console output (unused).
     * @throws \Exception On the first failure, wrapping the original error
     *                    as its "previous" exception.
     */
    protected function execute(Input $input, Output $output)
    {
        $queue = new RedisDelay();

        while (true) {
            try {
                $queue->run();
                usleep(100000);
            } catch (\Throwable $exception) {
                // \Throwable rather than the imported think\Exception: jobs
                // and the Redis client can raise any exception or error, and
                // all of them should reach the log before the worker stops.
                $context = [
                    'params' => [
                        'msg' => $exception->getMessage(),
                        'file' => $exception->getFile(),
                        'code' => $exception->getCode(),
                        'line' => $exception->getLine(),
                    ],
                ];

                Log::trace(
                    'handle_delay_redis_queue_jobs.log',
                    ['msg' => '执行延时队列任务失败!'] + $context
                );

                // Chain the original so its stack trace is not lost.
                throw new \Exception('执行公共任务失败!' . json_encode($context), 0, $exception);
            }
        }
    }
}