From c941a73dec4fa692bf9b101ebfa960217505d507 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Danae=CC=88=20Miller-Clendon?= Date: Fri, 26 Mar 2021 13:27:53 +1300 Subject: [PATCH] [BUGFIX #317] Make queueJob threadsafe - Separate functionality into protected methods for readability - Wrap queueJob read/write functionality in withTransaction() --- src/Services/QueuedJobService.php | 168 ++++++++++++++++++++---------- 1 file changed, 113 insertions(+), 55 deletions(-) diff --git a/src/Services/QueuedJobService.php b/src/Services/QueuedJobService.php index 271f197f..cb37db94 100644 --- a/src/Services/QueuedJobService.php +++ b/src/Services/QueuedJobService.php @@ -235,64 +235,30 @@ public function queueJob(QueuedJob $job, $startAfter = null, $userId = null, $qu { $signature = $job->getSignature(); - // see if we already have this job in a queue - $filter = [ - 'Signature' => $signature, - 'JobStatus' => [ - QueuedJob::STATUS_NEW, - QueuedJob::STATUS_INIT, - ], - ]; - - $existing = QueuedJobDescriptor::get() - ->filter($filter) - ->first(); - - if ($existing && $existing->ID) { - return $existing->ID; - } - - $jobDescriptor = new QueuedJobDescriptor(); - $jobDescriptor->JobTitle = $job->getTitle(); - $jobDescriptor->JobType = $queueName ? $queueName : $job->getJobType(); - $jobDescriptor->Signature = $signature; - $jobDescriptor->Implementation = get_class($job); - $jobDescriptor->StartAfter = $startAfter; - - // no user provided - fallback to job user default - if ($userId === null && $job instanceof UserContextInterface) { - $userId = $job->getRunAsMemberID(); - } + // Create the initial object + $jobDescriptor = $this->createJobDescriptor($job, $signature, $startAfter, $userId, $queueName); - // still no user - fallback to current user - if ($userId === null) { - if (Security::getCurrentUser() && Security::getCurrentUser()->exists()) { - // current user available - $runAsID = Security::getCurrentUser()->ID; - } else { - // current user unavailable - $runAsID = 0; - } - } else { - $runAsID = $userId; + try { + return $this->findOrMakeJobDescriptorFromSignature($signature, $job, $jobDescriptor, $startAfter); + } catch (\Throwable $e) { + // note that error here may not be an issue as failing to acquire a job lock is a valid state + // which happens when other process claimed the job lock first + $this->getLogger()->debug( + sprintf( + '[%s] - Queued Jobs - Failed to acquire job lock %s %d %s', + DBDatetime::now()->Rfc2822(), + $e->getMessage(), + $signature, + PHP_EOL + ), + [ + 'file' => __FILE__, + 'line' => __LINE__, + ] + ); } - $jobDescriptor->RunAsID = $runAsID; - - // use this to populate custom data columns before job is queued - // note: you can pass arbitrary data to your job and then move it to job descriptor - // this is useful if you need some data that needs to be exposed as a separate - // DB column as opposed to serialised data - $this->extend('updateJobDescriptorBeforeQueued', $jobDescriptor, $job); - - // copy data - $this->copyJobToDescriptor($job, $jobDescriptor); - - $jobDescriptor->write(); - - $this->startJob($jobDescriptor, $startAfter); - - return $jobDescriptor->ID; + return false; } /** @@ -334,6 +300,98 @@ public function isAtMaxJobs() return false; } + /** + * Using a job signature, returns the JobDescriptor ID and whether the + * job descriptor is new or existing + * + * @param string $signature + * @param QueuedJob $job + * @param QueuedJobDescriptor $jobDescriptor + * @param null|string $startAfter + * @return int|null + */ + protected function findOrMakeJobDescriptorFromSignature($signature, $job, $jobDescriptor, $startAfter) + { + // Start a transaction which will hold until we have a lock on this signature. + return DB::get_conn()->withTransaction(function () use ($signature, $job, $jobDescriptor, $startAfter) { + $query = 'SELECT "ID" FROM "QueuedJobDescriptor" WHERE "Signature" = ? FOR UPDATE'; + + // Retrieve first record + $result = DB::prepared_query($query, [$signature]); + + if ($result === null) { + throw new Exception('Failed to execute query to retrieve job signature'); + } + + $ID = $result->value(); + + // If the record does not exist + if (!$ID) { + // use this to populate custom data columns before job is queued + // note: you can pass arbitrary data to your job and then move it to job descriptor + // this is useful if you need some data that needs to be exposed as a separate + // DB column as opposed to serialised data + $this->extend('updateJobDescriptorBeforeQueued', $jobDescriptor, $job); + + // copy data + $this->copyJobToDescriptor($job, $jobDescriptor); + + // Write the record + $jobDescriptorID = $jobDescriptor->write(); + + $this->startJob($jobDescriptor, $startAfter); + } else { + $jobDescriptorID = $ID; + } + }); + } + + /** + * @param QueuedJob $job + * @param string $signature + * @param null $startAfter + * @param null $userId + * @param null $queueName + * @return QueuedJobDescriptor + */ + protected function createJobDescriptor( + QueuedJob $job, + $signature, + $startAfter = null, + $userId = null, + $queueName = null + ) + { + $jobDescriptor = QueuedJobDescriptor::create(); + $jobDescriptor->JobTitle = $job->getTitle(); + $jobDescriptor->JobType = $queueName ? $queueName : $job->getJobType(); + $jobDescriptor->Signature = $signature; + $jobDescriptor->Implementation = get_class($job); + $jobDescriptor->StartAfter = $startAfter; + + // no user provided - fallback to job user default + if ($userId === null && $job instanceof UserContextInterface) { + $userId = $job->getRunAsMemberID(); + } + + // still no user - fallback to current user + if ($userId === null) { + if (Security::getCurrentUser() && Security::getCurrentUser()->exists()) { + // current user available + $runAsID = Security::getCurrentUser()->ID; + } else { + // current user unavailable + $runAsID = 0; + } + } else { + $runAsID = $userId; + } + + $jobDescriptor->RunAsID = $runAsID; + + return $jobDescriptor; + } + /** * Copies data from a job into a descriptor for persisting *