網(wǎng)站首頁(yè) PHP其他 正文
thinkphp6、thinkphp5.0 使用think-queue實(shí)現(xiàn)普通隊(duì)列和延遲隊(duì)列
作者:小吳-斌 更新時(shí)間: 2023-08-30 PHP其他-
何為異步消息隊(duì)列:
所謂消息隊(duì)列,就是一個(gè)以隊(duì)列數(shù)據(jù)結(jié)構(gòu)為基礎(chǔ)的一個(gè)實(shí)體,這個(gè)實(shí)體是真實(shí)存在的,比如程序中的數(shù)組,數(shù)據(jù)庫(kù)中的表,或者redis等等,都可以。 -
異步隊(duì)列的作用:
個(gè)人認(rèn)為消息隊(duì)列的主要特點(diǎn)是異步處理,主要目的是減少請(qǐng)求響應(yīng)時(shí)間和解耦。所以主要的使用場(chǎng)景就是將比較耗時(shí)而且不需要即時(shí)(同步)返回結(jié)果的操作作為消息放入消息隊(duì)列
composer 安裝 think-queue
https://github.com/coolseven/notes/blob/master/thinkphp-queue/README.md
# tp5.0
composer require topthink/think-queue=1.1.6
# tp5.1.x
composer require topthink/think-queue 2.0.4
# tp6
composer require topthink/think-queue
判斷是否安裝成功
php think queue:work -h
配置文件
Tp5.0
文件位置:根目錄/config/queue.php
return [
// 'connector' => 'Sync'
'connector' => 'Redis', // Redis 驅(qū)動(dòng)
'expire' => 60, // 任務(wù)的過(guò)期時(shí)間,默認(rèn)為60秒; 若要禁用,則設(shè)置為 null
'default' => 'default', // 默認(rèn)的隊(duì)列名稱
'host' => '127.0.0.1', // redis 主機(jī)ip
'port' => 6379, // redis 端口
'password' => '', // redis 密碼
'select' => 1, // 使用哪一個(gè) db,默認(rèn)為 db0
'timeout' => 0, // redis連接的超時(shí)時(shí)間
'persistent' => false, // 是否是長(zhǎng)連接
];
Tp5.1.x
文件位置:根目錄/config/queue.php
return [
// 'connector' => 'Sync'
'connector' => 'Redis', // Redis 驅(qū)動(dòng)
'expire' => 60, // 任務(wù)的過(guò)期時(shí)間,默認(rèn)為60秒; 若要禁用,則設(shè)置為 null
'default' => 'default', // 默認(rèn)的隊(duì)列名稱
'host' => '127.0.0.1', // redis 主機(jī)ip
'port' => 6379, // redis 端口
'password' => '', // redis 密碼
'select' => 1, // 使用哪一個(gè) db,默認(rèn)為 db0
'timeout' => 0, // redis連接的超時(shí)時(shí)間
'persistent' => false, // 是否是長(zhǎng)連接
];
Tp6
配置文件在統(tǒng)一目錄下/config/queue.php
<?php
return [
'default' => 'redis', // 使用redis
'connections' => [
'sync' => [
'type' => 'sync',
],
'database' => [
'type' => 'database',
'queue' => 'default',
'table' => 'jobs',
'connection' => null,
],
'redis' => [
'type' => 'redis',
'queue' => 'default',
'host' => '127.0.0.1',
'port' => 6379,
'password' => '',
'select' => 1,
'timeout' => 0,
'persistent' => false,
],
],
'failed' => [
'type' => 'none',
'table' => 'failed_jobs',
],
];
在項(xiàng)目下新建一個(gè)Job目錄,編寫對(duì)應(yīng)的消費(fèi)者類
Tp6
<?php
namespace app\job;
use think\facade\Log;
use think\queue\Job;
/**
* 消費(fèi)者類
* 用于處理隊(duì)列中的任務(wù)
*/
class CronJob
{
/**
* fire是消息隊(duì)列默認(rèn)調(diào)用的方法
* @param Job $job 當(dāng)前的任務(wù)對(duì)象
* @param array|mixed $data 發(fā)布任務(wù)時(shí)自定義的數(shù)據(jù)
*/
public function fire(Job $job, $data)
{
Log::channel('job')->info('一條測(cè)試日志');
if (empty($data)) {
Log::channel('job')->error(sprintf('[%s][%s] 隊(duì)列無(wú)消息', __CLASS__, __FUNCTION__));
return;
}
//有效消息到達(dá)消費(fèi)者時(shí)可能已經(jīng)不再需要執(zhí)行了
if (!$this->checkJob($data)) {
$job->delete();
Log::channel('job')->record("Job does not need to be executed");
return;
}
//執(zhí)行業(yè)務(wù)處理
if ($this->doJob($data)) {
$job->delete();//任務(wù)執(zhí)行成功后刪除
Log::channel('job')->record("job has been down and deleted");
} else {
//檢查任務(wù)重試次數(shù)
if ($job->attempts() > 3) {
Log::channel('job')->record("job has been retried more that 3 times");
// $job->release(10); // 10秒后在執(zhí)行
$job->delete(); // 刪除任務(wù)
}
}
}
/**
* 消息在到達(dá)消費(fèi)者時(shí)可能已經(jīng)不需要執(zhí)行了
* @param array|mixed $data 發(fā)布任務(wù)時(shí)自定義的數(shù)據(jù)
* @return boolean 任務(wù)執(zhí)行的結(jié)果
*/
private function checkJob($data)
{
Log::channel('job')->record('驗(yàn)證任務(wù)是否需要執(zhí)行');
return true;
}
/**
* 根據(jù)消息中的數(shù)據(jù)進(jìn)行實(shí)際的業(yè)務(wù)處理
*/
private function doJob($data)
{
// 實(shí)際業(yè)務(wù)流程處理
print_r($data['msg'] ?? '實(shí)際業(yè)務(wù)流程處理');
Log::channel('job')->record('實(shí)際業(yè)務(wù)流程處理');
return true;
}
function task1(){
print_r("task 1");
}
public function failed($data)
{
// ...任務(wù)達(dá)到最大重試次數(shù)后,失敗了
Log::channel('job')->error('任務(wù)達(dá)到最大重試次數(shù)后,失敗了');
}
}
上面使用了日志通道 配置文件路徑 /config/log.php
<?php
// +----------------------------------------------------------------------
// | 日志設(shè)置
// +----------------------------------------------------------------------
return [
// 默認(rèn)日志記錄通道
'default' => env('log.channel', 'file'),
// 日志記錄級(jí)別
'level' => [],
// 日志類型記錄的通道 ['error'=>'email',...]
'type_channel' => [],
// 關(guān)閉全局日志寫入
'close' => false,
// 全局日志處理 支持閉包
'processor' => null,
// 日志通道列表
'channels' => [
'file' => [
// 日志記錄方式
'type' => 'File',
// 日志保存目錄
'path' => '',
// 單文件日志寫入
'single' => false,
// 獨(dú)立日志級(jí)別
'apart_level' => [],
// 最大日志文件數(shù)量
'max_files' => 0,
// 使用JSON格式記錄
'json' => false,
// 日志處理
'processor' => null,
// 關(guān)閉通道日志寫入
'close' => false,
// 日志輸出格式化
'format' => '[%s][%s] %s',
// 是否實(shí)時(shí)寫入
'realtime_write' => false,
],
// 其它日志通道配置
'job' => [
'type' => 'File',
// 'path' => app()->getRootPath() . 'runtime/pay', // 重點(diǎn)這個(gè)路徑要寫
'path' => app()->getRuntimePath() . 'pay', // 重點(diǎn)這個(gè)路徑要寫
'time_format' => 'Y-m-d H:i:s',
'format' => '[%s][%s]:%s'
],
],
];
控制器編寫測(cè)試代碼
<?php
namespace app\api\controller\v1;
use app\job\CronJob;
use think\facade\Queue;
class User
{
public function index(): string
{
return 'v1/user/index2';
}
/**
* 投遞消息(生產(chǎn)者)
* @return string
*/
public function push(): string
{
// queue的 push方法 第一個(gè)參數(shù)可以接收字符或者對(duì)象字符串
$job = 'app\Job\CronJob'; // 當(dāng)前任務(wù)由哪個(gè)類負(fù)責(zé)處理
$queueName = 'cron_job_queue'; // 當(dāng)前隊(duì)列歸屬的隊(duì)列名稱
// // 當(dāng)前任務(wù)所需的業(yè)務(wù)數(shù)據(jù)
$data['msg'] = 'Test queue msg,time:' . date('Y-m-d H:i:s', time());
$data['user_id'] = 1;
// $res = Queue::push(CronJob::class, $data, $queueName); // 可以自動(dòng)獲取
$res = Queue::push($job, $data, $queueName); // 可以手動(dòng)指定 -
$data['msg'] = 'later Test queue msg,time:' . date('Y-m-d H:i:s', time());
$res = Queue::later(10, $job, $data, $queueName); // 10秒后執(zhí)行
$data['msg'] = 'task1---,time:' . date('Y-m-d H:i:s', time());
$res = Queue::later(30, "app\Job\CronJob@task1", $data, $queueName); // 10秒后執(zhí)行
if ($res == false) {
return '消息投遞失敗';
} else {
return '消息投遞成功';
}
}
}
測(cè)試
簡(jiǎn)述下work和listen在thinkphp框架中明顯區(qū)別
1、work模式只能跑一條隊(duì)列數(shù)據(jù),如果想持續(xù)運(yùn)行必須加 --daemon 參數(shù);
2、work模式速度和性能更快,每次執(zhí)行無(wú)需再加載框架所有文件,因此如果想修改隊(duì)列代碼必須重啟隊(duì)列,listen模式每次執(zhí)行都會(huì)重新加載thinkphp文件,相當(dāng)于用戶訪問一個(gè)接口一樣;
3、work模式因?yàn)闊o(wú)需重復(fù)加載框架文件,因此也會(huì)帶來(lái)MySQL鏈接超時(shí)的煩惱,目前thinkphp5.x全系版本無(wú)法解決此問題,所以如果隊(duì)列中有操作數(shù)據(jù)庫(kù)的不建議使用此模式;
php think queue:work --queue cron_job_queue
php think queue:listen --daemon --queue helloJobQueue
# linux上以守護(hù)進(jìn)程方式運(yùn)行
nohup php think queue:work --daemon --queue cron_job_queue &
原文鏈接:https://blog.csdn.net/qq_23564667/article/details/127919660
- 上一篇:沒有了
- 下一篇:沒有了
相關(guān)推薦
- 2022-11-17 React狀態(tài)管理Redux的使用介紹詳解_React
- 2022-09-15 golang?墻上時(shí)鐘與單調(diào)時(shí)鐘的實(shí)現(xiàn)_Golang
- 2022-08-11 python?tkinter中的錨點(diǎn)(anchor)問題及處理_python
- 2022-11-21 Go語(yǔ)言讀寫鎖RWMutex的源碼分析_Golang
- 2022-05-07 Python?識(shí)別錄音并轉(zhuǎn)為文字的實(shí)現(xiàn)_python
- 2022-08-16 C#?winform?請(qǐng)求http的實(shí)現(xiàn)(get,post)_C#教程
- 2023-12-12 設(shè)置線程名稱(兩種方法)
- 2022-06-18 C#實(shí)現(xiàn)無(wú)損壓縮圖片代碼示例_C#教程
- 欄目分類
-
- 最近更新
-
- window11 系統(tǒng)安裝 yarn
- 超詳細(xì)win安裝深度學(xué)習(xí)環(huán)境2025年最新版(
- Linux 中運(yùn)行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲(chǔ)小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎(chǔ)操作-- 運(yùn)算符,流程控制 Flo
- 1. Int 和Integer 的區(qū)別,Jav
- spring @retryable不生效的一種
- Spring Security之認(rèn)證信息的處理
- Spring Security之認(rèn)證過(guò)濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權(quán)
- redisson分布式鎖中waittime的設(shè)
- maven:解決release錯(cuò)誤:Artif
- restTemplate使用總結(jié)
- Spring Security之安全異常處理
- MybatisPlus優(yōu)雅實(shí)現(xiàn)加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務(wù)發(fā)現(xiàn)-Nac
- Spring Security之基于HttpR
- Redis 底層數(shù)據(jù)結(jié)構(gòu)-簡(jiǎn)單動(dòng)態(tài)字符串(SD
- arthas操作spring被代理目標(biāo)對(duì)象命令
- Spring中的單例模式應(yīng)用詳解
- 聊聊消息隊(duì)列,發(fā)送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠(yuǎn)程分支