網(wǎng)站首頁 PHP其他 正文
-
何為異步消息隊列:
所謂消息隊列,就是一個以隊列數(shù)據(jù)結構為基礎的一個實體,這個實體是真實存在的,比如程序中的數(shù)組,數(shù)據(jù)庫中的表,或者redis等等,都可以。 -
異步隊列的作用:
個人認為消息隊列的主要特點是異步處理,主要目的是減少請求響應時間和解耦。所以主要的使用場景就是將比較耗時而且不需要即時(同步)返回結果的操作作為消息放入消息隊列
composer 安裝 think-queue
https://github.com/coolseven/notes/blob/master/thinkphp-queue/README.md
# tp5.0
composer require topthink/think-queue=1.1.6
# tp5.1.x
composer require topthink/think-queue 2.0.4
# tp6
composer require topthink/think-queue
判斷是否安裝成功
php think queue:work -h
配置文件
Tp5.0
文件位置:根目錄/config/queue.php
return [
// 'connector' => 'Sync'
'connector' => 'Redis', // Redis 驅動
'expire' => 60, // 任務的過期時間,默認為60秒; 若要禁用,則設置為 null
'default' => 'default', // 默認的隊列名稱
'host' => '127.0.0.1', // redis 主機ip
'port' => 6379, // redis 端口
'password' => '', // redis 密碼
'select' => 1, // 使用哪一個 db,默認為 db0
'timeout' => 0, // redis連接的超時時間
'persistent' => false, // 是否是長連接
];
Tp5.1.x
文件位置:根目錄/config/queue.php
return [
// 'connector' => 'Sync'
'connector' => 'Redis', // Redis 驅動
'expire' => 60, // 任務的過期時間,默認為60秒; 若要禁用,則設置為 null
'default' => 'default', // 默認的隊列名稱
'host' => '127.0.0.1', // redis 主機ip
'port' => 6379, // redis 端口
'password' => '', // redis 密碼
'select' => 1, // 使用哪一個 db,默認為 db0
'timeout' => 0, // redis連接的超時時間
'persistent' => false, // 是否是長連接
];
Tp6
配置文件在統(tǒng)一目錄下/config/queue.php
<?php
return [
'default' => 'redis', // 使用redis
'connections' => [
'sync' => [
'type' => 'sync',
],
'database' => [
'type' => 'database',
'queue' => 'default',
'table' => 'jobs',
'connection' => null,
],
'redis' => [
'type' => 'redis',
'queue' => 'default',
'host' => '127.0.0.1',
'port' => 6379,
'password' => '',
'select' => 1,
'timeout' => 0,
'persistent' => false,
],
],
'failed' => [
'type' => 'none',
'table' => 'failed_jobs',
],
];
在項目下新建一個Job目錄,編寫對應的消費者類
Tp6
<?php
namespace app\job;
use think\facade\Log;
use think\queue\Job;
/**
* 消費者類
* 用于處理隊列中的任務
*/
class CronJob
{
/**
* fire是消息隊列默認調用的方法
* @param Job $job 當前的任務對象
* @param array|mixed $data 發(fā)布任務時自定義的數(shù)據(jù)
*/
public function fire(Job $job, $data)
{
Log::channel('job')->info('一條測試日志');
if (empty($data)) {
Log::channel('job')->error(sprintf('[%s][%s] 隊列無消息', __CLASS__, __FUNCTION__));
return;
}
//有效消息到達消費者時可能已經(jīng)不再需要執(zhí)行了
if (!$this->checkJob($data)) {
$job->delete();
Log::channel('job')->record("Job does not need to be executed");
return;
}
//執(zhí)行業(yè)務處理
if ($this->doJob($data)) {
$job->delete();//任務執(zhí)行成功后刪除
Log::channel('job')->record("job has been down and deleted");
} else {
//檢查任務重試次數(shù)
if ($job->attempts() > 3) {
Log::channel('job')->record("job has been retried more that 3 times");
// $job->release(10); // 10秒后在執(zhí)行
$job->delete(); // 刪除任務
}
}
}
/**
* 消息在到達消費者時可能已經(jīng)不需要執(zhí)行了
* @param array|mixed $data 發(fā)布任務時自定義的數(shù)據(jù)
* @return boolean 任務執(zhí)行的結果
*/
private function checkJob($data)
{
Log::channel('job')->record('驗證任務是否需要執(zhí)行');
return true;
}
/**
* 根據(jù)消息中的數(shù)據(jù)進行實際的業(yè)務處理
*/
private function doJob($data)
{
// 實際業(yè)務流程處理
print_r($data['msg'] ?? '實際業(yè)務流程處理');
Log::channel('job')->record('實際業(yè)務流程處理');
return true;
}
function task1(){
print_r("task 1");
}
public function failed($data)
{
// ...任務達到最大重試次數(shù)后,失敗了
Log::channel('job')->error('任務達到最大重試次數(shù)后,失敗了');
}
}
上面使用了日志通道 配置文件路徑 /config/log.php
<?php
// +----------------------------------------------------------------------
// | 日志設置
// +----------------------------------------------------------------------
return [
// 默認日志記錄通道
'default' => env('log.channel', 'file'),
// 日志記錄級別
'level' => [],
// 日志類型記錄的通道 ['error'=>'email',...]
'type_channel' => [],
// 關閉全局日志寫入
'close' => false,
// 全局日志處理 支持閉包
'processor' => null,
// 日志通道列表
'channels' => [
'file' => [
// 日志記錄方式
'type' => 'File',
// 日志保存目錄
'path' => '',
// 單文件日志寫入
'single' => false,
// 獨立日志級別
'apart_level' => [],
// 最大日志文件數(shù)量
'max_files' => 0,
// 使用JSON格式記錄
'json' => false,
// 日志處理
'processor' => null,
// 關閉通道日志寫入
'close' => false,
// 日志輸出格式化
'format' => '[%s][%s] %s',
// 是否實時寫入
'realtime_write' => false,
],
// 其它日志通道配置
'job' => [
'type' => 'File',
// 'path' => app()->getRootPath() . 'runtime/pay', // 重點這個路徑要寫
'path' => app()->getRuntimePath() . 'pay', // 重點這個路徑要寫
'time_format' => 'Y-m-d H:i:s',
'format' => '[%s][%s]:%s'
],
],
];
控制器編寫測試代碼
<?php
namespace app\api\controller\v1;
use app\job\CronJob;
use think\facade\Queue;
class User
{
public function index(): string
{
return 'v1/user/index2';
}
/**
* 投遞消息(生產者)
* @return string
*/
public function push(): string
{
// queue的 push方法 第一個參數(shù)可以接收字符或者對象字符串
$job = 'app\Job\CronJob'; // 當前任務由哪個類負責處理
$queueName = 'cron_job_queue'; // 當前隊列歸屬的隊列名稱
// // 當前任務所需的業(yè)務數(shù)據(jù)
$data['msg'] = 'Test queue msg,time:' . date('Y-m-d H:i:s', time());
$data['user_id'] = 1;
// $res = Queue::push(CronJob::class, $data, $queueName); // 可以自動獲取
$res = Queue::push($job, $data, $queueName); // 可以手動指定 -
$data['msg'] = 'later Test queue msg,time:' . date('Y-m-d H:i:s', time());
$res = Queue::later(10, $job, $data, $queueName); // 10秒后執(zhí)行
$data['msg'] = 'task1---,time:' . date('Y-m-d H:i:s', time());
$res = Queue::later(30, "app\Job\CronJob@task1", $data, $queueName); // 10秒后執(zhí)行
if ($res == false) {
return '消息投遞失敗';
} else {
return '消息投遞成功';
}
}
}
測試
簡述下work和listen在thinkphp框架中明顯區(qū)別
1、work模式只能跑一條隊列數(shù)據(jù),如果想持續(xù)運行必須加 --daemon 參數(shù);
2、work模式速度和性能更快,每次執(zhí)行無需再加載框架所有文件,因此如果想修改隊列代碼必須重啟隊列,listen模式每次執(zhí)行都會重新加載thinkphp文件,相當于用戶訪問一個接口一樣;
3、work模式因為無需重復加載框架文件,因此也會帶來MySQL鏈接超時的煩惱,目前thinkphp5.x全系版本無法解決此問題,所以如果隊列中有操作數(shù)據(jù)庫的不建議使用此模式;
php think queue:work --queue cron_job_queue
php think queue:listen --daemon --queue helloJobQueue
# linux上以守護進程方式運行
nohup php think queue:work --daemon --queue cron_job_queue &
原文鏈接:https://blog.csdn.net/qq_23564667/article/details/127919660
- 上一篇:沒有了
- 下一篇:沒有了
相關推薦
- 2024-03-15 Spring Framework對DAO(Data Access Object)的支持
- 2022-11-14 Python語言中Tuple的由來分析_python
- 2022-08-05 C++詳細講解模擬實現(xiàn)位圖和布隆過濾器的方法_C 語言
- 2022-11-25 Vmware臨時文件存放路徑_VMware
- 2022-09-25 C#基礎--特殊的集合
- 2022-08-15 Android開發(fā)gradle拉取依賴的加速配置_Android
- 2022-12-25 go?slice不同初始化方式性能及數(shù)組比較詳解_Golang
- 2023-03-03 Android獲取RecyclerView滑動距離方法詳細講解_Android
- 欄目分類
-
- 最近更新
-
- window11 系統(tǒng)安裝 yarn
- 超詳細win安裝深度學習環(huán)境2025年最新版(
- Linux 中運行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎操作-- 運算符,流程控制 Flo
- 1. Int 和Integer 的區(qū)別,Jav
- spring @retryable不生效的一種
- Spring Security之認證信息的處理
- Spring Security之認證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權
- redisson分布式鎖中waittime的設
- maven:解決release錯誤:Artif
- restTemplate使用總結
- Spring Security之安全異常處理
- MybatisPlus優(yōu)雅實現(xiàn)加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務發(fā)現(xiàn)-Nac
- Spring Security之基于HttpR
- Redis 底層數(shù)據(jù)結構-簡單動態(tài)字符串(SD
- arthas操作spring被代理目標對象命令
- Spring中的單例模式應用詳解
- 聊聊消息隊列,發(fā)送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠程分支