日本免费高清视频-国产福利视频导航-黄色在线播放国产-天天操天天操天天操天天操|www.shdianci.com

學無先后,達者為師

網(wǎng)站首頁 PHP其他 正文

thinkphp6、thinkphp5.0 使用think-queue實現(xiàn)普通隊列和延遲隊列

作者:小吳-斌 更新時間: 2023-08-30 PHP其他
  • 何為異步消息隊列:
    所謂消息隊列,就是一個以隊列數(shù)據(jù)結構為基礎的一個實體,這個實體是真實存在的,比如程序中的數(shù)組,數(shù)據(jù)庫中的表,或者redis等等,都可以。

  • 異步隊列的作用:
    個人認為消息隊列的主要特點是異步處理,主要目的是減少請求響應時間和解耦。所以主要的使用場景就是將比較耗時而且不需要即時(同步)返回結果的操作作為消息放入消息隊列

composer 安裝 think-queue

https://github.com/coolseven/notes/blob/master/thinkphp-queue/README.md

# tp5.0
composer require topthink/think-queue=1.1.6

# tp5.1.x
composer require topthink/think-queue 2.0.4

# tp6
composer require topthink/think-queue

判斷是否安裝成功

php think queue:work -h

在這里插入圖片描述

配置文件

Tp5.0
文件位置:根目錄/config/queue.php

return [
//    'connector' => 'Sync'
    'connector'  => 'Redis',        // Redis 驅動
    'expire'     => 60,        // 任務的過期時間,默認為60秒; 若要禁用,則設置為 null
    'default'    => 'default',        // 默認的隊列名稱
    'host'       => '127.0.0.1',    // redis 主機ip
    'port'       => 6379,        // redis 端口
    'password'   => '',        // redis 密碼
    'select'     => 1,        // 使用哪一個 db,默認為 db0
    'timeout'    => 0,        // redis連接的超時時間
    'persistent' => false,        // 是否是長連接
];

Tp5.1.x
文件位置:根目錄/config/queue.php

return [
//    'connector' => 'Sync'
    'connector'  => 'Redis',        // Redis 驅動
    'expire'     => 60,        // 任務的過期時間,默認為60秒; 若要禁用,則設置為 null
    'default'    => 'default',        // 默認的隊列名稱
    'host'       => '127.0.0.1',    // redis 主機ip
    'port'       => 6379,        // redis 端口
    'password'   => '',        // redis 密碼
    'select'     => 1,        // 使用哪一個 db,默認為 db0
    'timeout'    => 0,        // redis連接的超時時間
    'persistent' => false,        // 是否是長連接
];

Tp6
配置文件在統(tǒng)一目錄下/config/queue.php

<?php

return [
    'default'     => 'redis', // 使用redis
    'connections' => [
        'sync'     => [
            'type' => 'sync',
        ],
        'database' => [
            'type'       => 'database',
            'queue'      => 'default',
            'table'      => 'jobs',
            'connection' => null,
        ],
        'redis'    => [
            'type'       => 'redis',
            'queue'      => 'default',
            'host'       => '127.0.0.1',
            'port'       => 6379,
            'password'   => '',
            'select'     => 1,
            'timeout'    => 0,
            'persistent' => false,
        ],
    ],
    'failed'      => [
        'type'  => 'none',
        'table' => 'failed_jobs',
    ],
];

在項目下新建一個Job目錄,編寫對應的消費者類

在這里插入圖片描述

Tp6

<?php


namespace app\job;

use think\facade\Log;
use think\queue\Job;

/**
 * 消費者類
 * 用于處理隊列中的任務
 */
class CronJob
{


    /**
     * fire是消息隊列默認調用的方法
     * @param Job $job 當前的任務對象
     * @param array|mixed $data 發(fā)布任務時自定義的數(shù)據(jù)
     */
    public function fire(Job $job, $data)
    {
        Log::channel('job')->info('一條測試日志');
        if (empty($data)) {
            Log::channel('job')->error(sprintf('[%s][%s] 隊列無消息', __CLASS__, __FUNCTION__));
            return;
        }

        //有效消息到達消費者時可能已經(jīng)不再需要執(zhí)行了
        if (!$this->checkJob($data)) {
            $job->delete();
            Log::channel('job')->record("Job does not need to be executed");
            return;
        }
        //執(zhí)行業(yè)務處理
        if ($this->doJob($data)) {
            $job->delete();//任務執(zhí)行成功后刪除
            Log::channel('job')->record("job has been down and deleted");
        } else {
            //檢查任務重試次數(shù)
            if ($job->attempts() > 3) {
                Log::channel('job')->record("job has been retried more that 3 times");
//                $job->release(10); // 10秒后在執(zhí)行
                $job->delete(); // 刪除任務
            }
        }
    }

    /**
     * 消息在到達消費者時可能已經(jīng)不需要執(zhí)行了
     * @param array|mixed $data 發(fā)布任務時自定義的數(shù)據(jù)
     * @return boolean 任務執(zhí)行的結果
     */
    private function checkJob($data)
    {
        Log::channel('job')->record('驗證任務是否需要執(zhí)行');
        return true;
    }

    /**
     * 根據(jù)消息中的數(shù)據(jù)進行實際的業(yè)務處理
     */
    private function doJob($data)
    {
        // 實際業(yè)務流程處理
        print_r($data['msg'] ?? '實際業(yè)務流程處理');
        Log::channel('job')->record('實際業(yè)務流程處理');
        return true;
    }

    function task1(){
        print_r("task 1");
    }


    public function failed($data)
    {
        // ...任務達到最大重試次數(shù)后,失敗了
        Log::channel('job')->error('任務達到最大重試次數(shù)后,失敗了');
    }

}

上面使用了日志通道 配置文件路徑 /config/log.php

<?php

// +----------------------------------------------------------------------
// | 日志設置
// +----------------------------------------------------------------------
return [
    // 默認日志記錄通道
    'default'      => env('log.channel', 'file'),
    // 日志記錄級別
    'level'        => [],
    // 日志類型記錄的通道 ['error'=>'email',...]
    'type_channel'    =>    [],
    // 關閉全局日志寫入
    'close'        => false,
    // 全局日志處理 支持閉包
    'processor'    => null,

    // 日志通道列表
    'channels'     => [
        'file' => [
            // 日志記錄方式
            'type'           => 'File',
            // 日志保存目錄
            'path'           => '',
            // 單文件日志寫入
            'single'         => false,
            // 獨立日志級別
            'apart_level'    => [],
            // 最大日志文件數(shù)量
            'max_files'      => 0,
            // 使用JSON格式記錄
            'json'           => false,
            // 日志處理
            'processor'      => null,
            // 關閉通道日志寫入
            'close'          => false,
            // 日志輸出格式化
            'format'         => '[%s][%s] %s',
            // 是否實時寫入
            'realtime_write' => false,
        ],
        // 其它日志通道配置
        'job'    =>    [
            'type' => 'File',
//            'path' => app()->getRootPath() . 'runtime/pay', // 重點這個路徑要寫
            'path' => app()->getRuntimePath() . 'pay', // 重點這個路徑要寫
            'time_format' => 'Y-m-d H:i:s',
            'format' => '[%s][%s]:%s'
        ],
    ],

];

控制器編寫測試代碼

<?php

namespace app\api\controller\v1;


use app\job\CronJob;
use think\facade\Queue;

class User
{
    public function index(): string
    {
        return 'v1/user/index2';
    }

    /**
     * 投遞消息(生產者)
     * @return string
     */
    public function push(): string
    {

        //  queue的 push方法 第一個參數(shù)可以接收字符或者對象字符串
        $job = 'app\Job\CronJob'; // 當前任務由哪個類負責處理
        $queueName = 'cron_job_queue';  // 當前隊列歸屬的隊列名稱
        //  // 當前任務所需的業(yè)務數(shù)據(jù)
        $data['msg'] = 'Test queue msg,time:' . date('Y-m-d H:i:s', time());
        $data['user_id'] = 1;
//        $res = Queue::push(CronJob::class, $data, $queueName);  // 可以自動獲取
        $res = Queue::push($job, $data, $queueName);   // 可以手動指定 -
        $data['msg'] = 'later Test queue msg,time:' . date('Y-m-d H:i:s', time());
        $res = Queue::later(10, $job, $data, $queueName);   // 10秒后執(zhí)行
        $data['msg'] = 'task1---,time:' . date('Y-m-d H:i:s', time());
        $res = Queue::later(30, "app\Job\CronJob@task1", $data, $queueName);   // 10秒后執(zhí)行
        if ($res == false) {
            return '消息投遞失敗';
        } else {
            return '消息投遞成功';
        }
    }
}


測試

簡述下work和listen在thinkphp框架中明顯區(qū)別

1、work模式只能跑一條隊列數(shù)據(jù),如果想持續(xù)運行必須加 --daemon 參數(shù);
2、work模式速度和性能更快,每次執(zhí)行無需再加載框架所有文件,因此如果想修改隊列代碼必須重啟隊列,listen模式每次執(zhí)行都會重新加載thinkphp文件,相當于用戶訪問一個接口一樣;
3、work模式因為無需重復加載框架文件,因此也會帶來MySQL鏈接超時的煩惱,目前thinkphp5.x全系版本無法解決此問題,所以如果隊列中有操作數(shù)據(jù)庫的不建議使用此模式;

php think queue:work --queue cron_job_queue

php think queue:listen --daemon --queue helloJobQueue

# linux上以守護進程方式運行
nohup php think queue:work  --daemon --queue cron_job_queue &

在這里插入圖片描述

在這里插入圖片描述

原文鏈接:https://blog.csdn.net/qq_23564667/article/details/127919660

  • 上一篇:沒有了
  • 下一篇:沒有了
欄目分類
最近更新