日本免费高清视频-国产福利视频导航-黄色在线播放国产-天天操天天操天天操天天操|www.shdianci.com

學(xué)無(wú)先后,達(dá)者為師

網(wǎng)站首頁(yè) PHP其他 正文

thinkphp6、thinkphp5.0 使用think-queue實(shí)現(xiàn)普通隊(duì)列和延遲隊(duì)列

作者:小吳-斌 更新時(shí)間: 2023-08-30 PHP其他
  • 何為異步消息隊(duì)列:
    所謂消息隊(duì)列,就是一個(gè)以隊(duì)列數(shù)據(jù)結(jié)構(gòu)為基礎(chǔ)的一個(gè)實(shí)體,這個(gè)實(shí)體是真實(shí)存在的,比如程序中的數(shù)組,數(shù)據(jù)庫(kù)中的表,或者redis等等,都可以。

  • 異步隊(duì)列的作用:
    個(gè)人認(rèn)為消息隊(duì)列的主要特點(diǎn)是異步處理,主要目的是減少請(qǐng)求響應(yīng)時(shí)間和解耦。所以主要的使用場(chǎng)景就是將比較耗時(shí)而且不需要即時(shí)(同步)返回結(jié)果的操作作為消息放入消息隊(duì)列

composer 安裝 think-queue

https://github.com/coolseven/notes/blob/master/thinkphp-queue/README.md

# tp5.0
composer require topthink/think-queue=1.1.6

# tp5.1.x
composer require topthink/think-queue 2.0.4

# tp6
composer require topthink/think-queue

判斷是否安裝成功

php think queue:work -h

在這里插入圖片描述

配置文件

Tp5.0
文件位置:根目錄/config/queue.php

return [
//    'connector' => 'Sync'
    'connector'  => 'Redis',        // Redis 驅(qū)動(dòng)
    'expire'     => 60,        // 任務(wù)的過(guò)期時(shí)間,默認(rèn)為60秒; 若要禁用,則設(shè)置為 null
    'default'    => 'default',        // 默認(rèn)的隊(duì)列名稱
    'host'       => '127.0.0.1',    // redis 主機(jī)ip
    'port'       => 6379,        // redis 端口
    'password'   => '',        // redis 密碼
    'select'     => 1,        // 使用哪一個(gè) db,默認(rèn)為 db0
    'timeout'    => 0,        // redis連接的超時(shí)時(shí)間
    'persistent' => false,        // 是否是長(zhǎng)連接
];

Tp5.1.x
文件位置:根目錄/config/queue.php

return [
//    'connector' => 'Sync'
    'connector'  => 'Redis',        // Redis 驅(qū)動(dòng)
    'expire'     => 60,        // 任務(wù)的過(guò)期時(shí)間,默認(rèn)為60秒; 若要禁用,則設(shè)置為 null
    'default'    => 'default',        // 默認(rèn)的隊(duì)列名稱
    'host'       => '127.0.0.1',    // redis 主機(jī)ip
    'port'       => 6379,        // redis 端口
    'password'   => '',        // redis 密碼
    'select'     => 1,        // 使用哪一個(gè) db,默認(rèn)為 db0
    'timeout'    => 0,        // redis連接的超時(shí)時(shí)間
    'persistent' => false,        // 是否是長(zhǎng)連接
];

Tp6
配置文件在統(tǒng)一目錄下/config/queue.php

<?php

return [
    'default'     => 'redis', // 使用redis
    'connections' => [
        'sync'     => [
            'type' => 'sync',
        ],
        'database' => [
            'type'       => 'database',
            'queue'      => 'default',
            'table'      => 'jobs',
            'connection' => null,
        ],
        'redis'    => [
            'type'       => 'redis',
            'queue'      => 'default',
            'host'       => '127.0.0.1',
            'port'       => 6379,
            'password'   => '',
            'select'     => 1,
            'timeout'    => 0,
            'persistent' => false,
        ],
    ],
    'failed'      => [
        'type'  => 'none',
        'table' => 'failed_jobs',
    ],
];

在項(xiàng)目下新建一個(gè)Job目錄,編寫對(duì)應(yīng)的消費(fèi)者類

在這里插入圖片描述

Tp6

<?php


namespace app\job;

use think\facade\Log;
use think\queue\Job;

/**
 * 消費(fèi)者類
 * 用于處理隊(duì)列中的任務(wù)
 */
class CronJob
{


    /**
     * fire是消息隊(duì)列默認(rèn)調(diào)用的方法
     * @param Job $job 當(dāng)前的任務(wù)對(duì)象
     * @param array|mixed $data 發(fā)布任務(wù)時(shí)自定義的數(shù)據(jù)
     */
    public function fire(Job $job, $data)
    {
        Log::channel('job')->info('一條測(cè)試日志');
        if (empty($data)) {
            Log::channel('job')->error(sprintf('[%s][%s] 隊(duì)列無(wú)消息', __CLASS__, __FUNCTION__));
            return;
        }

        //有效消息到達(dá)消費(fèi)者時(shí)可能已經(jīng)不再需要執(zhí)行了
        if (!$this->checkJob($data)) {
            $job->delete();
            Log::channel('job')->record("Job does not need to be executed");
            return;
        }
        //執(zhí)行業(yè)務(wù)處理
        if ($this->doJob($data)) {
            $job->delete();//任務(wù)執(zhí)行成功后刪除
            Log::channel('job')->record("job has been down and deleted");
        } else {
            //檢查任務(wù)重試次數(shù)
            if ($job->attempts() > 3) {
                Log::channel('job')->record("job has been retried more that 3 times");
//                $job->release(10); // 10秒后在執(zhí)行
                $job->delete(); // 刪除任務(wù)
            }
        }
    }

    /**
     * 消息在到達(dá)消費(fèi)者時(shí)可能已經(jīng)不需要執(zhí)行了
     * @param array|mixed $data 發(fā)布任務(wù)時(shí)自定義的數(shù)據(jù)
     * @return boolean 任務(wù)執(zhí)行的結(jié)果
     */
    private function checkJob($data)
    {
        Log::channel('job')->record('驗(yàn)證任務(wù)是否需要執(zhí)行');
        return true;
    }

    /**
     * 根據(jù)消息中的數(shù)據(jù)進(jìn)行實(shí)際的業(yè)務(wù)處理
     */
    private function doJob($data)
    {
        // 實(shí)際業(yè)務(wù)流程處理
        print_r($data['msg'] ?? '實(shí)際業(yè)務(wù)流程處理');
        Log::channel('job')->record('實(shí)際業(yè)務(wù)流程處理');
        return true;
    }

    function task1(){
        print_r("task 1");
    }


    public function failed($data)
    {
        // ...任務(wù)達(dá)到最大重試次數(shù)后,失敗了
        Log::channel('job')->error('任務(wù)達(dá)到最大重試次數(shù)后,失敗了');
    }

}

上面使用了日志通道 配置文件路徑 /config/log.php

<?php

// +----------------------------------------------------------------------
// | 日志設(shè)置
// +----------------------------------------------------------------------
return [
    // 默認(rèn)日志記錄通道
    'default'      => env('log.channel', 'file'),
    // 日志記錄級(jí)別
    'level'        => [],
    // 日志類型記錄的通道 ['error'=>'email',...]
    'type_channel'    =>    [],
    // 關(guān)閉全局日志寫入
    'close'        => false,
    // 全局日志處理 支持閉包
    'processor'    => null,

    // 日志通道列表
    'channels'     => [
        'file' => [
            // 日志記錄方式
            'type'           => 'File',
            // 日志保存目錄
            'path'           => '',
            // 單文件日志寫入
            'single'         => false,
            // 獨(dú)立日志級(jí)別
            'apart_level'    => [],
            // 最大日志文件數(shù)量
            'max_files'      => 0,
            // 使用JSON格式記錄
            'json'           => false,
            // 日志處理
            'processor'      => null,
            // 關(guān)閉通道日志寫入
            'close'          => false,
            // 日志輸出格式化
            'format'         => '[%s][%s] %s',
            // 是否實(shí)時(shí)寫入
            'realtime_write' => false,
        ],
        // 其它日志通道配置
        'job'    =>    [
            'type' => 'File',
//            'path' => app()->getRootPath() . 'runtime/pay', // 重點(diǎn)這個(gè)路徑要寫
            'path' => app()->getRuntimePath() . 'pay', // 重點(diǎn)這個(gè)路徑要寫
            'time_format' => 'Y-m-d H:i:s',
            'format' => '[%s][%s]:%s'
        ],
    ],

];

控制器編寫測(cè)試代碼

<?php

namespace app\api\controller\v1;


use app\job\CronJob;
use think\facade\Queue;

class User
{
    public function index(): string
    {
        return 'v1/user/index2';
    }

    /**
     * 投遞消息(生產(chǎn)者)
     * @return string
     */
    public function push(): string
    {

        //  queue的 push方法 第一個(gè)參數(shù)可以接收字符或者對(duì)象字符串
        $job = 'app\Job\CronJob'; // 當(dāng)前任務(wù)由哪個(gè)類負(fù)責(zé)處理
        $queueName = 'cron_job_queue';  // 當(dāng)前隊(duì)列歸屬的隊(duì)列名稱
        //  // 當(dāng)前任務(wù)所需的業(yè)務(wù)數(shù)據(jù)
        $data['msg'] = 'Test queue msg,time:' . date('Y-m-d H:i:s', time());
        $data['user_id'] = 1;
//        $res = Queue::push(CronJob::class, $data, $queueName);  // 可以自動(dòng)獲取
        $res = Queue::push($job, $data, $queueName);   // 可以手動(dòng)指定 -
        $data['msg'] = 'later Test queue msg,time:' . date('Y-m-d H:i:s', time());
        $res = Queue::later(10, $job, $data, $queueName);   // 10秒后執(zhí)行
        $data['msg'] = 'task1---,time:' . date('Y-m-d H:i:s', time());
        $res = Queue::later(30, "app\Job\CronJob@task1", $data, $queueName);   // 10秒后執(zhí)行
        if ($res == false) {
            return '消息投遞失敗';
        } else {
            return '消息投遞成功';
        }
    }
}


測(cè)試

簡(jiǎn)述下work和listen在thinkphp框架中明顯區(qū)別

1、work模式只能跑一條隊(duì)列數(shù)據(jù),如果想持續(xù)運(yùn)行必須加 --daemon 參數(shù);
2、work模式速度和性能更快,每次執(zhí)行無(wú)需再加載框架所有文件,因此如果想修改隊(duì)列代碼必須重啟隊(duì)列,listen模式每次執(zhí)行都會(huì)重新加載thinkphp文件,相當(dāng)于用戶訪問一個(gè)接口一樣;
3、work模式因?yàn)闊o(wú)需重復(fù)加載框架文件,因此也會(huì)帶來(lái)MySQL鏈接超時(shí)的煩惱,目前thinkphp5.x全系版本無(wú)法解決此問題,所以如果隊(duì)列中有操作數(shù)據(jù)庫(kù)的不建議使用此模式;

php think queue:work --queue cron_job_queue

php think queue:listen --daemon --queue helloJobQueue

# linux上以守護(hù)進(jìn)程方式運(yùn)行
nohup php think queue:work  --daemon --queue cron_job_queue &

在這里插入圖片描述

在這里插入圖片描述

原文鏈接:https://blog.csdn.net/qq_23564667/article/details/127919660

  • 上一篇:沒有了
  • 下一篇:沒有了
欄目分類
最近更新