译文GitHub
原文链接
现在,我们知道了Laravel如何将作业推到不同的队列中,让我们来深入了解workers如何运作你的作业。 首先,我将workers定义为一个在后台运行的简单PHP进程,目的是从存储空间中提取作业并针对多个配置选项运行它们。
php artisan queue:work
运行此命令将指示Laravel创建应用程序的一个实例并开始执行作业,这个实例将一直存活着,启动Laravel应用程序的操作只在运行命令时发生一次,同一个实例将被用于执行你的作业,这意味着:
- 避免在每个作业上启动整个应用程序来节省服务器资源。
- 在应用程序中所做的任何代码更改后必须手动重启worker。
你也可以这样运行:
php artisan queue:work --once
这将启动应用程序的一个实例,处理单个作业,然后干掉脚本。
php artisan queue:listen
queue:listen
命令相当于无限循环地运行 queue:work --once
命令,这将导致以下问题:
- 每个循环都会启动一个应用程序实例。
- 分配的worker将选择一个工作并执行。
- worker进程将被干掉。
使用 queue:listen
确保为每个作业创建一个新的应用程序实例,这意味着代码更改以后不必手动重启worker,同时也意味着将消耗更多的服务器资源。
queue:work 命令
我们来看看 Queue\Console\WorkCommand
类的 handle()
方法,这是当你运行 php artisan queue:work
时会执行的方法:
public function handle(){ if ($this->downForMaintenance() && $this->option('once')) { return $this->worker->sleep($this->option('sleep')); } $this->listenForEvents(); $connection = $this->argument('connection') ?: $this->laravel['config']['queue.default']; $queue = $this->getQueue($connection); $this->runWorker( $connection, $queue );}
首先,我们检查应用程序是否处于维护模式,并使用 --once
选项,在这种情况下,我们希望脚本正常运行,因此我们不执行任何作业,我们只需要在完全杀死脚本前让worker在一段时间内休眠。
Queue\Worker
的 sleep()
方法看起来像这样:
public function sleep($seconds){ sleep($seconds);}
为什么我们不能在 handle() 方法中返回null来终止脚本?
如前所述, queue:listen
命令在循环中运行 WorkCommand
:
while (true) { // This process simply calls 'php artisan queue:work --once' $this->runProcess($process, $options->memory);}
如果应用程序处于维护模式,并且 WorkCommand
立即终止,这将导致循环结束,下一个在很短的时间内启动,最好在这种情况下导致一些延迟,而不是通过创建我们不会真正使用的大量应用程序实例。
监听事件
在 handle()
方法里面我们调用 listenForEvents()
方法:
protected function listenForEvents(){ $this->laravel['events']->listen(JobProcessing::class, function ($event) { $this->writeOutput($event->job, 'starting'); }); $this->laravel['events']->listen(JobProcessed::class, function ($event) { $this->writeOutput($event->job, 'success'); }); $this->laravel['events']->listen(JobFailed::class, function ($event) { $this->writeOutput($event->job, 'failed'); $this->logFailedJob($event); });}
在这个方法中我们会监听几个事件,这样我们可以在每次作业处理中,处理完或处理失败时向用户打印一些信息。
记录失败作业
一旦作业失败 logFailedJob()
方法会被调用
$this->laravel['queue.failer']->log( $event->connectionName, $event->job->getQueue(), $event->job->getRawBody(), $event->exception);
queue.failer
容器别名在 Queue\QueueServiceProvider::registerFailedJobServices()
中注册:
protected function registerFailedJobServices(){ $this->app->singleton('queue.failer', function () { $config = $this->app['config']['queue.failed']; return isset($config['table']) ? $this->databaseFailedJobProvider($config) : new NullFailedJobProvider; });}/** * Create a new database failed job provider. * * @param array $config * @return \Illuminate\Queue\Failed\DatabaseFailedJobProvider */protected function databaseFailedJobProvider($config){ return new DatabaseFailedJobProvider( $this->app['db'], $config['database'], $config['table'] );}
如果配置了 queue.failed
,则将使用数据库队列失败,并将有关失败作业的信息简单地存储在数据库表中的:
$this->getTable()->insertGetId(compact( 'connection', 'queue', 'payload', 'exception', 'failed_at'));
运行worker
要运行worker,我们需要收集两条信息:
- worker的连接信息从作业中提取
- worker找到作业的队列
如果没有使用 queue.default
配置定义的默认连接。您可以为 queue:work
命令提供 --connection=default
选项。
队列也是一样,您可以提供一个 --queue=emails
选项,或选择连接配置中的 queue
选项。一旦这一切完成, WorkCommand::handle()
方法会运行 runWorker()
:
protected function runWorker($connection, $queue){ $this->worker->setCache($this->laravel['cache']->driver()); return $this->worker->{$this->option('once') ? 'runNextJob' : 'daemon'}( $connection, $queue, $this->gatherWorkerOptions() );}
在worker类属性在命令构造后设置:
public function __construct(Worker $worker){ parent::__construct(); $this->worker = $worker;}
容器解析 Queue\Worker
实例,在runWorker()
中我们设置了worker将使用的缓存驱动,我们也根据--once
命令来决定我们调用什么方法。
如果使用 --once
选项,我们只需调用 runNextJob
来运行下一个可用的作业,然后脚本就会终止。 否则,我们将调用 daemon
方法来始终保持进程处理作业。
在开始工作时,我们使用 gatherWorkerOptions()
方法收集用户给出的命令选项,我们稍后会提供这些选项,这个工具是 runNextJob
或 daemon
方法。
protected function gatherWorkerOptions(){ return new WorkerOptions( $this->option('delay'), $this->option('memory'), $this->option('timeout'), $this->option('sleep'), $this->option('tries'), $this->option('force') );}
守护进程
让我看看 Worker::daemon()
方法,这个方法的第一行调用了 Worker::daemon()
方法
protected function listenForSignals(){ if ($this->supportsAsyncSignals()) { pcntl_async_signals(true); pcntl_signal(SIGTERM, function () { $this->shouldQuit = true; }); pcntl_signal(SIGUSR2, function () { $this->paused = true; }); pcntl_signal(SIGCONT, function () { $this->paused = false; }); }}
这种方法使用PHP7.1的信号处理, supportsAsyncSignals()
方法检查我们是否在PHP7.1上,并加载 pcntl
扩展名。
之后pcntl_async_signals()
被调用来启用信号处理,然后我们为多个信号注册处理程序:
- 当脚本被指示关闭时,会引发
SIGTERM
。 -
SIGUSR2
是用户定义的信号,Laravel用来表示脚本应该暂停。 - 当暂停的脚本继续进行时,会引发
SIGCONT
。
这些信号从Process Monitor(如 )发送并与我们的脚本进行通信。
Worker::daemon()
方法中的第二行读取最后一个队列重新启动的时间戳,当我们调用queue:restart
时该值存储在缓存中,稍后我们将检查是否和上次重新启动的时间戳不符合,来指示worker在之后多次重启。
最后,该方法启动一个循环,在这个循环中,我们将完成其余获取作业的worker,运行它们,并对worker进程执行多个操作。
while (true) { if (! $this->daemonShouldRun($options, $connectionName, $queue)) { $this->pauseWorker($options, $lastRestart); continue; } $job = $this->getNextJob( $this->manager->connection($connectionName), $queue ); $this->registerTimeoutHandler($job, $options); if ($job) { $this->runJob($job, $connectionName, $options); } else { $this->sleep($options->sleep); } $this->stopIfNecessary($options, $lastRestart);}
确定worker是否应该处理作业
调用 daemonShouldRun()
检查以下情况:
- 应用程序不处于维护模式
- Worker没有暂停
- 没有事件监听器阻止循环继续
如果应用程序在维护模式下,worker使用--force
选项仍然可以处理作业:
php artisan queue:work --force
确定worker是否应该继续的条件之一是:
$this->events->until(new Events\Looping($connectionName, $queue)) === false)
这行触发 Queue\Event\Looping
事件,并检查是否有任何监听器在 handle()
方法中返回false,这种情况下你可以强制您的workers暂时停止处理作业。
如果worker应该暂停,则调用 pauseWorker()
方法:
protected function pauseWorker(WorkerOptions $options, $lastRestart){ $this->sleep($options->sleep > 0 ? $options->sleep : 1); $this->stopIfNecessary($options, $lastRestart);}
sleep
方法并传递给控制台命令的 --sleep
选项,这个方法调用
public function sleep($seconds){ sleep($seconds);}
脚本休眠了一段时间后,我们检查worker是否应该在这种情况下退出并杀死脚本,稍后我们看一下stopIfNecessary
方法,以防脚本不能被杀死,我们只需调用 continue;
开始一个新的循环:
if (! $this->daemonShouldRun($options, $connectionName, $queue)) { $this->pauseWorker($options, $lastRestart); continue;}
Retrieving 要运行的作业
$job = $this->getNextJob( $this->manager->connection($connectionName), $queue);
getNextJob()
方法接受一个队列连接的实例,我们从队列中获取作业
protected function getNextJob($connection, $queue){ try { foreach (explode(',', $queue) as $queue) { if (! is_null($job = $connection->pop($queue))) { return $job; } } } catch (Exception $e) { $this->exceptions->report($e); $this->stopWorkerIfLostConnection($e); }}
我们简单地循环给定的队列,使用选择的队列连接从存储空间(数据库,redis,sqs,...)获取作业并返回该作业。
要从存储中retrieve作业,我们查询满足以下条件的最旧作业:
- 推送到
queue
,我们试图从中找到作业 - 没有被其他worker reserved
- 可以在给定的时间内运行,有些作业在将来被推迟运行
- 我们也取到了很久以来被冻结的作业并重试
一旦我们找到符合这一标准的作业,我们将这个作业标记为reserved,以便其他workers获取到,我们还会增加作业监控次数。
监控作业超时
下一个作业被retrieved之后,我们调用 registerTimeoutHandler()
方法:
protected function registerTimeoutHandler($job, WorkerOptions $options){ if ($this->supportsAsyncSignals()) { pcntl_signal(SIGALRM, function () { $this->kill(1); });the $timeout = $this->timeoutForJob($job, $options); pcntl_alarm($timeout > 0 ? $timeout + $options->sleep : 0); }}
再次,如果 pcntl
扩展被加载,我们将注册一个信号处理程序干掉worker进程如果该作业超时的话,在配置了超时之后我们使用 pcntl_alarm()
来发送一个 SIGALRM
信号。
如果作业所花费的时间超过了超时值,处理程序将会终止该脚本,如果不是该作业将通过,并且下一个循环将设置一个新的报警覆盖第一个报警,因为进程中可能存在单个报警。
作业只在PHP7.1以上起效,在window上也无效 ¯_(ツ)_/¯
处理作业
runJob()
方法调用 process()
:
public function process($connectionName, $job, WorkerOptions $options){ try { $this->raiseBeforeJobEvent($connectionName, $job); $this->markJobAsFailedIfAlreadyExceedsMaxAttempts( $connectionName, $job, (int) $options->maxTries ); $job->fire(); $this->raiseAfterJobEvent($connectionName, $job); } catch (Exception $e) { $this->handleJobException($connectionName, $job, $options, $e); }}
raiseBeforeJobEvent()
触发 Queue\Events\JobProcessing
事件, raiseAfterJobEvent()
触发 Queue\Events\JobProcessed
事件。 markJobAsFailedIfAlreadyExceedsMaxAttempts()
检查进程是否达到最大尝试次数,并将该作业标记为失败:
protected function markJobAsFailedIfAlreadyExceedsMaxAttempts($connectionName, $job, $maxTries){ $maxTries = ! is_null($job->maxTries()) ? $job->maxTries() : $maxTries; if ($maxTries === 0 || $job->attempts() <= $maxTries) { return; } $this->failJob($connectionName, $job, $e = new MaxAttemptsExceededException( 'A queued job has been attempted too many times. The job may have previously timed out.' )); throw $e;}
否则我们在作业对象上调用 fire()
方法来运行作业。
从哪里获取作业对象
getNextJob()
方法返回一个 Contracts\Queue\Job
的实例,这取决于我们使用相应的Job实例的队列驱动程序,例如如果数据库队列驱动则选择 Queue\Jobs\DatabaseJob
。
循环结束
在循环结束时,我们调用 stopIfNecessary()
来检查在下一个循环开始之前是否应该停止进程:
protected function stopIfNecessary(WorkerOptions $options, $lastRestart){ if ($this->shouldQuit) { $this->kill(); } if ($this->memoryExceeded($options->memory)) { $this->stop(12); } elseif ($this->queueShouldRestart($lastRestart)) { $this->stop(); }}
shouldQuit
属性在两种情况下设置,首先listenForSignals()
内部的作为 SIGTERM
信号处理程序,其次在 stopWorkerIfLostConnection()
中
protected function stopWorkerIfLostConnection($e){ if ($this->causedByLostConnection($e)) { $this->shouldQuit = true; }}
在retrieving和处理作业时,会在几个try ... catch语句中调用此方法,以确保worker应该处于被干掉的状态,以便我们的Process Control可能会启动一个新的数据库连接。
causedByLostConnection()
方法可以在 Database\DetectsLostConnections
trait中找到。
memoryExceeded()
检查内存使用情况是否超过当前设置的内存限制,您可以使用 --memory
选项设置限制。 转载请注明: 转载自
如果觉得本篇文章对您十分有益,何不 一下
本文链接地址: