網站首頁 Thinkphp 正文
本文簡單介紹消息隊列的使用,以及在使用過程中的注意事項。
項目環境:thinkphp6.0.3 + think-queue3.0
一、think-queue 安裝與基本使用
1、使用 composer
安裝 thinkphp6
與 think-queue
# 安裝 thinkphp6
composer create-project topthink/think 6.0.*
# 安裝 think-queue
composer require topthink/think-queue
2、queue
的配置
think-queue
內置了 Redis,Database,Sync等驅動,
這里介紹使用Redis做消息隊列存儲,
結合supervisor進程管理使隊列常駐進程。
配置文件位于 config/queue.php
return [
// 驅動類型,可選擇 sync(默認):同步執行,database:數據庫驅動,redis:Redis驅動//或其他自定義的完整的類名
'default' => 'redis', // Redis驅動
'connections' => [
...
'redis' => [
'type' => 'redis', // Redis 驅動
'queue' => 'default', // 默認的隊列名稱
'host' => '127.0.0.1', // redis 主機ip
'port' => 6379, // redis 端口
'password' => '123456', // redis 密碼
'select' => 0, // 使用哪一個 db,默認為 db0
'timeout' => 0, // redis連接的超時時間
'persistent' => false, // 是否是長連接
],
...
]
];
3、創建任務類
單模塊項目推薦使用 app\job 作為任務類的命名空間 多模塊項目可用使用 app\module\job 作為任務類的命名空間 也可以放在任意可以自動加載到的地方
任務類不需繼承任何類,如果這個類只有一個任務,那么就只需要提供一個fire方法就可以了,如果有多個小任務,就寫多個方法,下面發布任務的時候會有區別
每個方法會傳入兩個參數 think\queue\Job $job(當前的任務對象) 和 $data(發布任務時自定義的數據)
namespace app\job;
use think\queue\Job;
class MyJob
{
public function fire(Job $job, $data){
print("開始執行隊列"." \n");
//....這里執行具體的任務
if ($job->attempts() > 3) {
//通過這個方法可以檢查這個任務已經重試了幾次了
}
//如果任務執行成功后 記得刪除任務,不然這個任務會重復執行,直到達到最大重試次數后失敗后,執行failed方法
$job->delete();
// 也可以重新發布這個任務
$job->release($delay); //$delay為延遲時間
}
public function failed($data){
// ...任務達到最大重試次數后,失敗了
print("隊列執行失敗"." \n");
}
}
4、發布任務(生產者)
think\facade\Queue::push($job, $data = '', $queue = null)
think\facade\Queue::later($delay, $job, $data = '', $queue = null)
# 兩個方法,前者是立即執行,后者是在$delay秒后執行
- $job 是任務名
單模塊的,且命名空間是app\job
的,比如上面的例子一,寫Job1類名即可
多模塊的,且命名空間是app\module\job
的,寫model/Job1
其他的需要些完整的類名,比如上面的例子二,需要寫完整的類名app\lib\job\Job2
如果一個任務類里有多個小任務的話,如上面的例子二,需要用@+方法名
,app\lib\job\Job2@task1
、app\lib\job\Job2@task2
- $data 是你要傳到任務里的參數
- $queue 隊列名,指定這個任務是在哪個隊列上執行,同下面監控隊列的時候指定的隊列名,可不填
namespace app\controller;
use think\facade\Queue;
class MyQueue extends Base
{
public function index($job_name = '', $job_class = '', $job_data = [], $type = 'later', $time = 5) //jio_data 需要發送的數據
{
// 當前任務將由哪個類來負責處理。
$job_class = "app\job\MyJob@fire"; // 任務類 - 執行時調用該類的deal方法
$job_queue_name = 'Check'; // 隊列名稱
empty($job_data) && $job_data = [ 'ts' => time(), 'bizId' => uniqid() , 'data' => $_GET ];
switch ($type) {
case 'later':
$is_push = Queue::later($time, $job_class, $job_data, $job_queue_name); // 延遲發送任務 5秒
break;
case 'push':
$is_push = Queue::push($job_class, $job_data, $job_queue_name); // 立即發送任務
break;
}
// database 驅動時,返回值為 1|false ; redis 驅動時,返回值為 隨機字符串|false
if ($is_push !== false) {
echo date('Y-m-d H:i:s') . " 新增任務" . "
";
} else {
echo '新增任務錯誤';
}
}
}打開瀏覽器訪問
http://域名/MyQueue/index
,就可以看到任務新增成功5、處理任務(消費者)
打開終端切換到當前項目根目錄下,執行下面的命令:
work和listen的區別詳細點擊(think-queue 解析上)查看,命令 描述 php think queue:work 監聽隊列 php think queue:listen 監聽隊列 php think queue:restart 重啟隊列 php think queue:subscribe 暫無,可能是保留的 官方有什么其他想法但是還沒實現
兩種,具體的可選參數可以輸入命令加 --help 查看。
(注意:使用不同版本的thinkphp時,可選用的參數不同)
(如:thinkphp6中就取消掉了--daemon
參數,隊列默認為循環模式)php think queue:work --queue Check
二、supervisor的安裝和配置
supervisor是用Python開發的一個client/server服務,是Linux/Unix系統下的一個進程管理工具。
可以很方便的監聽、啟動、停止、重啟一個或多個進程。1、安裝
1.配置好yum源后,可以直接安裝
yum install supervisor
2.Debian/Ubuntu可通過apt安裝
apt-get install supervisor
3.Python使用pip安裝
pip install supervisor
2、supervisor的相關配置
本項目演示使用yum安裝:
打開supervisor
的主配置文件:/etc/supervisord.conf
,找到最后一行[include]
;files = supervisord.d/*.ini
files = /etc/supervisord.d/*.conf最后一行,個人習慣命名conf。
進入子進程配置文件路徑:/etc/supervisord.d/
,新建Check.conf
,相關配置說明:# 項目名
[program:Check]
# 腳本目錄
directory=/www/.../Check
# 腳本執行命令
command=php think queue:work --queue Check
# supervisor啟動的時候是否隨著同時啟動,默認True
autostart=true
# 當程序exit的時候,這個program不會自動重啟,默認unexpected,設置子進程掛掉后自動重啟的情況,有三個選項,false,unexpected和true。如果為false的時候,無論什么情況下,都不會被重新啟動,如果為unexpected,只有當進程的退出碼不在下面的exitcodes里面定義的
autorestart=false
# 這個選項是子進程啟動多少秒之后,此時狀態如果是running,則我們認為啟動成功了。默認值為1
startsecs=1
# 腳本運行的用戶身份
user=www
# 日志輸出
stderr_logfile=/tmp/Check_stderr.log
stdout_logfile=/tmp/Check_stdout.log
# 把stderr重定向到stdout,默認 false
redirect_stderr = true
# stdout日志文件大小,默認 50MB
stdout_logfile_maxbytes = 20M
# stdout日志文件備份數
stdout_logfile_backups = 20#說明同上
[program:Check]
directory=/www/.../Check
command=php think queue:work --queue Check --tries 10
autostart=true
autorestart=false
stderr_logfile=/tmp/Check_stderr.log
stdout_logfile=/tmp/Check_stdout.log
#user = www對于單模塊而言,不同的業務邏輯為了區分可能會存在多個隊列名,這種情況將多個隊列名用逗號拼接起來,內容如下:
[program:queue]
user=www
command=php /www/wwwroot/tpqueue/think queue:work --queue Check,Check1,Check23、supervisor相關命令
啟動 supervisor
supervisorctl -c /etc/supervisord.conf
上面這個命令會進入 supervisorctl 的 shell 界面,然后可以執行不同的命令了:
supervisorctl restart
// 重啟指定應用
supervisorctl stop// 停止指定應用
supervisorctl start// 啟動指定應用
supervisorctl restart all // 重啟所有應用
supervisorctl stop all // 停止所有應用
supervisorctl start all // 啟動所有應用
supervisorctl status //查看所有進程的狀態
supervisorctl update //配置文件修改后使用該命令加載新的配置
supervisorctl reload //重新啟動配置中的所有程序若是centos7,加入開機啟動:
systemctl start supervisord //啟動supervisor并加載默認配置文件
systemctl enable supervisord //將supervisor加入開機啟動項執行命令來驗證是否為開機啟動
systemctl is-enabled supervisord
注意事項:
1、在消息對列表執行過程中,當消費者執行隊列出現問題時(達到失敗后最大嘗試次數),將會自動化清除當前隊列。
原文鏈接:https://fengkui.net/article/137
- 上一篇:tp5.1 獲取原始上傳文件名
- 下一篇:基于ThinkPHP刪除目錄及目錄文件函數
相關推薦
- 2023-10-16 使用elemnt移動端字體大小會自動變小
- 2022-04-05 expected a string (for built-in components) or a c
- 2021-12-14 常用時間處理方法
- 2022-10-22 關于分布式鎖的三種實現方式_Redis
- 2022-09-27 OpenCV中findContours函數參數詳解_C 語言
- 2022-04-17 Bootstrap typeahead自動補全插件的坑
- 2023-05-31 pandas.DataFrame的for循環迭代的實現_python
- 2022-09-30 LeetCode189輪轉數組python示例_python
- 最近更新
-
- window11 系統安裝 yarn
- 超詳細win安裝深度學習環境2025年最新版(
- Linux 中運行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎操作-- 運算符,流程控制 Flo
- 1. Int 和Integer 的區別,Jav
- spring @retryable不生效的一種
- Spring Security之認證信息的處理
- Spring Security之認證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權
- redisson分布式鎖中waittime的設
- maven:解決release錯誤:Artif
- restTemplate使用總結
- Spring Security之安全異常處理
- MybatisPlus優雅實現加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務發現-Nac
- Spring Security之基于HttpR
- Redis 底層數據結構-簡單動態字符串(SD
- arthas操作spring被代理目標對象命令
- Spring中的單例模式應用詳解
- 聊聊消息隊列,發送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠程分支