从源码看Laravel queue:listen queue:work queue:work --daemon的区别

功能概述

Laravel的队列监听有多个命令可以实现,其中queue:listenqueue:work --daemon仅仅从运行命令后的的表现看不出明显的区别,那么他们到底有什么区别呢?laravelqueue:listenqueue:work的描述分别是:

queue:listen:Listen to a given queue//监听一个队列
queue:work:Start processing jobs on the queue as a daemon//作为守护进程处理队列中的任务

但是好像并没有说清楚根本区别在哪里,所以我们还是简单的查看下源码来了解下根本区别.

源码解读

queue:listen:

文件位置:Illuminate\Queue\Console命名空间下的ListenCommand.php,入口方法为fire



public function fire()
{
    // We need to get the right queue for the connection which is set in the queue
    // configuration file for the application. We will pull it based on the set
    // connection being run for the queue operation currently being executed.
    $queue = $this->getQueue(
        $connection = $this->input->getArgument('connection')
    );

    $this->listener->listen(
        $connection, $queue, $this->gatherOptions()
    );
}

首先获取要监听的队列,这没什么好说的,接着调用了listener(这个依赖被注入)中的listen方法。

public function listen($connection, $queue, ListenerOptions $options)
{
    $process = $this->makeProcess($connection, $queue, $options);

    while (true) {
        $this->runProcess($process, $options->memory);
    }
}

可以看到在listen方法中先创建了一个任务进程,然后死循环调用runProcess方法。我们继续追踪makeProcess,runProcess方法:

public function makeProcess($connection, $queue, ListenerOptions $options)
{
    $command = $this->workerCommand;

    // If the environment is set, we will append it to the command string so the
    // workers will run under the specified environment. Otherwise, they will
    // just run under the production environment which is not always right.
    if (isset($options->environment)) {
        $command = $this->addEnvironment($command, $options);
    }

    // Next, we will just format out the worker commands with all of the various
    // options available for the command. This will produce the final command
    // line that we will pass into a Symfony process object for processing.
    $command = $this->formatCommand(
        $command, $connection, $queue, $options
    );

    return new Process(
        $command, $this->commandPath, null, null, $options->timeout
    );
}

可以看到makeProcess是创建了一个命令,命令的内容在构造方法里面:

public function __construct($commandPath)
{
    $this->commandPath = $commandPath;
    $this->workerCommand = $this->buildCommandTemplate();
}

/**
 * Build the environment specific worker command.
 *
 * @return string
 */
protected function buildCommandTemplate()
{
    $command = 'queue:work %s --once --queue=%s --delay=%s --memory=%s --sleep=%s --tries=%s';

    return "{$this->phpBinary()} {$this->artisanBinary()} {$command}";
}

构造方法中直接创建了queue:work命令,没有错,queue:listen命令创建了queue:work命令,具体的执行在runProcess方法中,runProcess中对于运行内存超高,任务处理超时的相关代码我们就先不研究了,我们直接看它是怎么执行的,最后追踪到:

public function start(callable $callback = null)
{
    if ($this->isRunning()) {
        throw new RuntimeException('Process is already running');
    }

    $this->resetProcessData();
    $this->starttime = $this->lastOutputTime = microtime(true);
    $this->callback = $this->buildCallback($callback);
    $this->hasCallback = null !== $callback;
    $descriptors = $this->getDescriptors();

    $commandline = $this->commandline;
    $envline = '';

    if (null !== $this->env && $this->inheritEnv) {
        if ('\\' === DIRECTORY_SEPARATOR && !empty($this->options['bypass_shell']) && !$this->enhanceWindowsCompatibility) {
            throw new LogicException('The "bypass_shell" option must be false to inherit environment variables while enhanced Windows compatibility is off');
        }
        $env = '\\' === DIRECTORY_SEPARATOR ? '(SET %s)&&' : 'export %s;';
        foreach ($this->env as $k => $v) {
            $envline .= sprintf($env, ProcessUtils::escapeArgument("$k=$v"));
        }
        $env = null;
    } else {
        $env = $this->env;
    }
    if ('\\' === DIRECTORY_SEPARATOR && $this->enhanceWindowsCompatibility) {
        $commandline = 'cmd /V:ON /E:ON /D /C "('.$envline.$commandline.')';
        foreach ($this->processPipes->getFiles() as $offset => $filename) {
            $commandline .= ' '.$offset.'>'.ProcessUtils::escapeArgument($filename);
        }
        $commandline .= '"';

        if (!isset($this->options['bypass_shell'])) {
            $this->options['bypass_shell'] = true;
        }
    } elseif (!$this->useFileHandles && $this->enhanceSigchildCompatibility && $this->isSigchildEnabled()) {
        // last exit code is output on the fourth pipe and caught to work around --enable-sigchild
        $descriptors[3] = array('pipe', 'w');

        // See https://unix.stackexchange.com/questions/71205/background-process-pipe-input
        $commandline = $envline.'{ ('.$this->commandline.') <&3 3<&- 3>/dev/null & } 3<&0;';
        $commandline .= 'pid=$!; echo $pid >&3; wait $pid; code=$?; echo $code >&3; exit $code';

        // Workaround for the bug, when PTS functionality is enabled.
        // @see : https://bugs.php.net/69442
        $ptsWorkaround = fopen(__FILE__, 'r');
    } elseif ('' !== $envline) {
        $commandline = $envline.$commandline;
    }

    $this->process = proc_open($commandline, $descriptors, $this->processPipes->pipes, $this->cwd, $env, $this->options);

    if (!is_resource($this->process)) {
        throw new RuntimeException('Unable to launch a new process.');
    }
    $this->status = self::STATUS_STARTED;

    if (isset($descriptors[3])) {
        $this->fallbackStatus['pid'] = (int) fgets($this->processPipes->pipes[3]);
    }

    if ($this->tty) {
        return;
    }

    $this->updateStatus(false);
    $this->checkTimeout();
}

最主要的是这句:

$this->process = proc_open($commandline, $descriptors, $this->processPipes->pipes, $this->cwd, $env, $this->options);

其实就是通过proc_open(proc_open — 执行一个命令,并且打开用来输入/输出的文件指针。)函数执行queue:work命令,一句话总结:queue:listen就是死循环proc_open queue:work

接下来我们看下queue:work的具体实现:

queue:work

文件位置:queue:work的文件位置也在Illuminate\Queue\Console命名空间下,文件为WorkCommand.php。入口方法为fire

public function fire()
{
    if ($this->downForMaintenance() && $this->option('once')) {
        return $this->worker->sleep($this->option('sleep'));
    }

    // We'll listen to the processed and failed events so we can write information
    // to the console as jobs are processed, which will let the developer watch
    // which jobs are coming through a queue and be informed on its progress.
    $this->listenForEvents();

    $connection = $this->argument('connection')
                    ?: $this->laravel['config']['queue.default'];

    // We need to get the right queue for the connection which is set in the queue
    // configuration file for the application. We will pull it based on the set
    // connection being run for the queue operation currently being executed.
    $queue = $this->getQueue($connection);

    $this->runWorker(
        $connection, $queue
    );
}

在入口方法中执行了runWorker方法:

protected function runWorker($connection, $queue)
{
    $this->worker->setCache($this->laravel['cache']->driver());

    return $this->worker->{$this->option('once') ? 'runNextJob' : 'daemon'}(
        $connection, $queue, $this->gatherWorkerOptions()
    );
}

runWorker方法中判断了命令是否含有once选项,如果有就执行worker(这个依赖被注入)runNextJob方法,没有就是执行daemon方法。

public function runNextJob($connectionName, $queue, WorkerOptions $options)
{
    $job = $this->getNextJob(
        $this->manager->connection($connectionName), $queue
    );

    // If we're able to pull a job off of the stack, we will process it and then return
    // from this method. If there is no job on the queue, we will "sleep" the worker
    // for the specified number of seconds, then keep processing jobs after sleep.
    if ($job) {
        return $this->runJob($job, $connectionName, $options);
    }

    $this->sleep($options->sleep);
}

可以看到runNextJob方法,是从队列中获取一个任务执行便结束了,这就是queue:work的效果执行队列中一个任务就停止,一旦加了--daemon选项便执行了daemon方法:

public function daemon($connectionName, $queue, WorkerOptions $options)
{
    $this->listenForSignals();

    $lastRestart = $this->getTimestampOfLastQueueRestart();

    while (true) {
        // Before reserving any jobs, we will make sure this queue is not paused and
        // if it is we will just pause this worker for a given amount of time and
        // make sure we do not need to kill this worker process off completely.
        if (! $this->daemonShouldRun($options)) {
            $this->pauseWorker($options, $lastRestart);

            continue;
        }

        // First, we will attempt to get the next job off of the queue. We will also
        // register the timeout handler and reset the alarm for this job so it is
        // not stuck in a frozen state forever. Then, we can fire off this job.
        $job = $this->getNextJob(
            $this->manager->connection($connectionName), $queue
        );

        $this->registerTimeoutHandler($job, $options);

        // If the daemon should run (not in maintenance mode, etc.), then we can run
        // fire off this job for processing. Otherwise, we will need to sleep the
        // worker so no more jobs are processed until they should be processed.
        if ($job) {
            $this->runJob($job, $connectionName, $options);
        } else {
            $this->sleep($options->sleep);
        }

        // Finally, we will check to see if we have exceeded our memory limits or if
        // the queue should restart based on other indications. If so, we'll stop
        // this worker and let whatever is "monitoring" it restart the process.
        $this->stopIfNecessary($options, $lastRestart);
    }
}

没错,我们又看到了友好的死循环调用getNextJob方法,所以--daemon以守护进程运行就是死循环执行下一个任务。

总结

虽然queue:work --daemonqueue:listen执行后看到的效果几乎是一样的(都是死循环执行queue:work嘛),但是queue:listen是在循环中不断开启新的进程执行queue:work,而queue:work --deamon而是在一个循环中直接执行getNextJob方法(该方法虽然是queue:work的最终方法,但queue:work --daemon不能等同于死循环queue:work,因为queue:work --daemon一直是一个进程),所以区别已经很明显了,在实际的开发中我们使用queue:listen调试代码,因为每次执行都是一个新的进程,我们在代码中的打印调试等信息立刻就可以在执行下一个任务的时候体现出来,实际消费队列时我们使用queue:work --daemon因为始终保持一个进程运行(代码的修改不会体现出来,除非重启进程,也就是关闭命令并重新执行命令),有些文章说这两个命令的区别是一个命令每次消费队列都加载整个框架,另一个只加载一次框架这个说法是对的,但并没有解释清楚为什么,我相信通过对源码的查看,这个区别应该更加清晰明了。

添加新评论