首頁 > 軟體

laravel原始碼分析佇列Queue方法範例

2022-04-05 13:00:06

前言

佇列 (Queue) 是 laravel 中比較常用的一個功能,佇列的目的是將耗時的任務延時處理,比如傳送郵件,從而大幅度縮短 Web 請求和響應的時間。本文我們就來分析下佇列建立和執行的原始碼。

佇列任務的建立

先通過命令建立一個 Job 類,成功之後會建立如下檔案 laravel-src/laravel/app/Jobs/DemoJob.php。

> php artisan make:job DemoJob
 
> Job created successfully.

下面我們來分析一下 Job 類的具體生成過程。

執行 php artisan make:job DemoJob 後,會觸發呼叫如下方法。

laravel-src/laravel/vendor/laravel/framework/src/Illuminate/Foundation/Providers/ArtisanServiceProvider.php

/**
 * Register the command.
 * [A] make:job 時觸發的方法
 * @return void
 */
protected function registerJobMakeCommand()
{
    $this->app->singleton('command.job.make', function ($app) {
        return new JobMakeCommand($app['files']);
    });
}

接著我們來看下 JobMakeCommand 這個類,這個類裡面沒有過多的處理邏輯,處理方法在其父類別中。

 class JobMakeCommand extends GeneratorCommand

我們直接看父類別中的處理方法,GeneratorCommand->handle(),以下是該方法中的主要方法。

public function handle()
{
    // 獲取類名
    $name = $this->qualifyClass($this->getNameInput());
    // 獲取檔案路徑
    $path = $this->getPath($name);
    // 建立目錄和檔案
    $this->makeDirectory($path);
    // buildClass() 通過模板獲取新類檔案的內容
    $this->files->put($path, $this->buildClass($name));
    // $this->type 在子類中定義好了,例如 JobMakeCommand 中 type = 'Job'
    $this->info($this->type.' created successfully.');
}

方法就是通過目錄和檔案,建立對應的類檔案,至於新檔案的內容,都是基於已經設定好的模板來建立的,具體的內容在 buildClass($name) 方法中。

 protected function buildClass($name)
{
    // 得到類檔案模板,getStub() 在子類中有實現,具體看 JobMakeCommand 
    $stub = $this->files->get($this->getStub());
    // 用實際的name來替換模板中的內容,都是關鍵詞替換
    return $this->replaceNamespace($stub, $name)->replaceClass($stub, $name);
}

獲取模板檔案

protected function getStub()
{
    return $this->option('sync')
                    ? __DIR__.'/stubs/job.stub'
                    : __DIR__.'/stubs/job-queued.stub';
}

job.stub

 <?php
/**
* job 類的生成模板
*/
namespace DummyNamespace;
 
use IlluminateBusQueueable;
use IlluminateFoundationBusDispatchable;
 
class DummyClass
{
    use Dispatchable, Queueable;
 
    /**
     * Create a new job instance.
     *
     * @return void
     */
    public function __construct()
    {
        //
    }
 
    /**
     * Execute the job.
     *
     * @return void
     */
    public function handle()
    {
        //
    }
}

job-queued.stub

<?php
/**
* job 類的生成模板
*/
namespace DummyNamespace;
 
use IlluminateBusQueueable;
use IlluminateQueueSerializesModels;
use IlluminateQueueInteractsWithQueue;
use IlluminateContractsQueueShouldQueue;
use IlluminateFoundationBusDispatchable;
 
class DummyClass implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
 
    /**
     * Create a new job instance.
     *
     * @return void
     */
    public function __construct()
    {
        //
    }
 
    /**
     * Execute the job.
     *
     * @return void
     */
    public function handle()
    {
        //
    }
}

下面看一下前面我們建立的一個Job類,DemoJob.php,就是來源於模板 job-queued.stub。

<?php
/**
* job 類的生成模板
*/
namespace AppJobs;
use IlluminateBusQueueable;
use IlluminateQueueSerializesModels;
use IlluminateQueueInteractsWithQueue;
use IlluminateContractsQueueShouldQueue;
use IlluminateFoundationBusDispatchable;
class DemoJob implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
 
    /**
     * Create a new job instance.
     *
     * @return void
     */
    public function __construct()
    {
        //
    }
 
    /**
     * Execute the job.
     *
     * @return void
     */
    public function handle()
    {
        //
    }
}

至此,我們已經大致明白了佇列任務類是如何建立的了。下面我們來分析下其是如何生效執行的。

佇列任務的分發

任務類建立後,我們就可以在需要的地方進行任務的分發,常見的方法如下:

DemoJob::dispatch(); // 任務分發
DemoJob::dispatchNow(); // 同步排程,佇列任務不會排隊,並立即在當前程序中進行

下面先以 dispatch() 為例分析下分發過程。

trait Dispatchable
{
    public static function dispatch()
    {
        return new PendingDispatch(new static(...func_get_args()));
    }
}
 class PendingDispatch
{
    protected $job;
 
    public function __construct($job)
    {   echo '[Max] ' . 'PendingDispatch ' . '__construct' . PHP_EOL;
        $this->job = $job;
    }
    public function __destruct()
    {   echo '[Max] ' . 'PendingDispatch ' . '__destruct' . PHP_EOL;
        app(Dispatcher::class)->dispatch($this->job);
    }
}

重點是 app(Dispatcher::class)->dispatch($this->job) 這部分。

我們先來分析下前部分 app(Dispatcher::class),它是在 laravel 框架中自帶的 BusServiceProvider 中向 $app 中注入的。

class BusServiceProvider extends ServiceProvider implements DeferrableProvider
{
    public function register()
    {
        $this->app->singleton(Dispatcher::class, function ($app) {
            return new Dispatcher($app, function ($connection = null) use ($app) {
                return $app[QueueFactoryContract::class]->connection($connection);
            });
        });
    }
}

看一下 Dispatcher 的構造方法,至此,我們已經知道前半部分 app(Dispatcher::class) 是如何來的了。

class Dispatcher implements QueueingDispatcher
{
    protected $container;
    protected $pipeline;
    protected $queueResolver;
 
    public function __construct(Container $container, Closure $queueResolver = null)
    {
        $this->container = $container;
        /**
         * Illuminate/Bus/BusServiceProvider.php->register()中
         * $queueResolver 傳入的是一個閉包
         * function ($connection = null) use ($app) {
         *   return $app[QueueFactoryContract::class]->connection($connection);
         * }
         */
        $this->queueResolver = $queueResolver;
        $this->pipeline = new Pipeline($container);
    }
    public function dispatch($command)
    {
        if ($this->queueResolver && $this->commandShouldBeQueued($command)) {
        		// 將 $command 存入佇列
            return $this->dispatchToQueue($command);
        }
        return $this->dispatchNow($command);
    }
}

BusServiceProvider 中註冊了 Dispatcher::class ,然後 app(Dispatcher::class)->dispatch($this->job) 呼叫的即是 Dispatcher->dispatch()。

 public function dispatchToQueue($command)
{
    // 獲取任務所屬的 connection
    $connection = $command->connection ?? null;
    /*
     * 獲取佇列範例,根據config/queue.php中的設定
     * 此處我們設定 QUEUE_CONNECTION=redis 為例,則獲取的是RedisQueue
     * 至於如何通過 QUEUE_CONNECTION 的設定獲取 queue ,此處先跳過,本文後面會具體分析。
     */
    $queue = call_user_func($this->queueResolver, $connection);
    if (! $queue instanceof Queue) {
        throw new RuntimeException('Queue resolver did not return a Queue implementation.');
    }
    // 我們建立的DemoJob無queue方法,則不會呼叫
    if (method_exists($command, 'queue')) {
        return $command->queue($queue, $command);
    }
    // 將 job 放入佇列
    return $this->pushCommandToQueue($queue, $command);
}
protected function pushCommandToQueue($queue, $command)
{
    // 在指定了 queue 或者 delay 時會呼叫不同的方法,基本大同小異
    if (isset($command->queue, $command->delay)) {
        return $queue->laterOn($command->queue, $command->delay, $command);
    }
    if (isset($command->queue)) {
        return $queue->pushOn($command->queue, $command);
    }
    if (isset($command->delay)) {
        return $queue->later($command->delay, $command);
    }
    // 此處我們先看最簡單的無引數時的情況,呼叫push()
    return $queue->push($command);
}

筆者的設定是 QUEUE_CONNECTION=redis ,估以此來分析,其他型別的原理基本類似。

設定的是 redis 時, $queue 是 RedisQueue 範例,下面我們看下 RedisQueue->push() 的內容。

Illuminate/Queue/RedisQueue.php

public function push($job, $data = '', $queue = null)
{
    /**
     * 獲取佇列名稱
     * var_dump($this->getQueue($queue));
     * 建立統一的 payload,轉成 json
     * var_dump($this->createPayload($job, $this->getQueue($queue), $data));
     */
    // 將任務和資料存入佇列
    return $this->pushRaw($this->createPayload($job, $this->getQueue($queue), $data), $queue);
}
public function pushRaw($payload, $queue = null, array $options = [])
{
    // 寫入redis中
    $this->getConnection()->eval(
        LuaScripts::push(), 2, $this->getQueue($queue),
        $this->getQueue($queue).':notify', $payload
    );
    // 返回id
    return json_decode($payload, true)['id'] ?? null;
}

至此,我們已經分析完了任務是如何被加入到佇列中的。

以上就是laravel原始碼分析佇列Queue方法範例的詳細內容,更多關於laravel原始碼分析佇列Queue方法的資料請關注it145.com其它相關文章!


IT145.com E-mail:sddin#qq.com