-
Notifications
You must be signed in to change notification settings - Fork 74
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[BUGFIX #317] Make queueJob threadsafe #342
base: 4
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -235,64 +235,30 @@ public function queueJob(QueuedJob $job, $startAfter = null, $userId = null, $qu | |||||
{ | ||||||
$signature = $job->getSignature(); | ||||||
|
||||||
// see if we already have this job in a queue | ||||||
$filter = [ | ||||||
'Signature' => $signature, | ||||||
'JobStatus' => [ | ||||||
QueuedJob::STATUS_NEW, | ||||||
QueuedJob::STATUS_INIT, | ||||||
], | ||||||
]; | ||||||
|
||||||
$existing = QueuedJobDescriptor::get() | ||||||
->filter($filter) | ||||||
->first(); | ||||||
|
||||||
if ($existing && $existing->ID) { | ||||||
return $existing->ID; | ||||||
} | ||||||
|
||||||
$jobDescriptor = new QueuedJobDescriptor(); | ||||||
$jobDescriptor->JobTitle = $job->getTitle(); | ||||||
$jobDescriptor->JobType = $queueName ? $queueName : $job->getJobType(); | ||||||
$jobDescriptor->Signature = $signature; | ||||||
$jobDescriptor->Implementation = get_class($job); | ||||||
$jobDescriptor->StartAfter = $startAfter; | ||||||
|
||||||
// no user provided - fallback to job user default | ||||||
if ($userId === null && $job instanceof UserContextInterface) { | ||||||
$userId = $job->getRunAsMemberID(); | ||||||
} | ||||||
// Create the initial object | ||||||
$jobDescriptor = $this->createJobDescriptor($job, $signature, $startAfter, $userId, $queueName); | ||||||
|
||||||
// still no user - fallback to current user | ||||||
if ($userId === null) { | ||||||
if (Security::getCurrentUser() && Security::getCurrentUser()->exists()) { | ||||||
// current user available | ||||||
$runAsID = Security::getCurrentUser()->ID; | ||||||
} else { | ||||||
// current user unavailable | ||||||
$runAsID = 0; | ||||||
} | ||||||
} else { | ||||||
$runAsID = $userId; | ||||||
try { | ||||||
return $this->findOrMakeJobDescriptorFromSignature($signature, $job, $jobDescriptor, $startAfter); | ||||||
} catch (\Throwable $e) { | ||||||
// note that error here may not be an issue as failing to acquire a job lock is a valid state | ||||||
// which happens when other process claimed the job lock first | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think the comments should be updated to describe the job signature, not the job lock as that is a different feature. Please also update the log message so this feature is not confused with the job locking. |
||||||
$this->getLogger()->debug( | ||||||
sprintf( | ||||||
'[%s] - Queued Jobs - Failed to acquire job lock %s %d %s', | ||||||
DBDatetime::now()->Rfc2822(), | ||||||
$e->getMessage(), | ||||||
$signature, | ||||||
PHP_EOL | ||||||
), | ||||||
[ | ||||||
'file' => __FILE__, | ||||||
'line' => __LINE__, | ||||||
] | ||||||
); | ||||||
} | ||||||
|
||||||
$jobDescriptor->RunAsID = $runAsID; | ||||||
|
||||||
// use this to populate custom data columns before job is queued | ||||||
// note: you can pass arbitrary data to your job and then move it to job descriptor | ||||||
// this is useful if you need some data that needs to be exposed as a separate | ||||||
// DB column as opposed to serialised data | ||||||
$this->extend('updateJobDescriptorBeforeQueued', $jobDescriptor, $job); | ||||||
|
||||||
// copy data | ||||||
$this->copyJobToDescriptor($job, $jobDescriptor); | ||||||
|
||||||
$jobDescriptor->write(); | ||||||
|
||||||
$this->startJob($jobDescriptor, $startAfter); | ||||||
|
||||||
return $jobDescriptor->ID; | ||||||
return false; | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
PHPDOC function signature is to return an int |
||||||
} | ||||||
|
||||||
/** | ||||||
|
@@ -334,6 +300,98 @@ public function isAtMaxJobs() | |||||
return false; | ||||||
} | ||||||
|
||||||
/** | ||||||
* Using a job signature, returns the JobDescriptor ID and whether the | ||||||
* job descriptor is new or existing | ||||||
* | ||||||
* @param string $signature | ||||||
* @param QueuedJob $job | ||||||
* @param QueuedJobDescriptor $jobDescriptor | ||||||
* @param null|string $startAfter | ||||||
* @return int|null | ||||||
*/ | ||||||
protected function findOrMakeJobDescriptorFromSignature($signature, $job, $jobDescriptor, $startAfter) | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This should be made private to reduce the API surface. There's an extension hook available if it needs to be extended. |
||||||
{ | ||||||
// Start a transaction which will hold until we have a lock on this signature. | ||||||
return DB::get_conn()->withTransaction(function () use ($signature, $job, $jobDescriptor, $startAfter) { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we need the return? |
||||||
$query = 'SELECT "ID" FROM "QueuedJobDescriptor" WHERE "Signature" = ? FOR UPDATE'; | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Bit of an edge case, though should read the tableName of QueuedJobDescriptor first incase it's been changed (table names are configuraable
|
||||||
|
||||||
// Retrieve first record | ||||||
$result = DB::prepared_query($query, [$signature]); | ||||||
|
||||||
if ($result === null) { | ||||||
throw new Exception('Failed to execute query to retrieve job signature'); | ||||||
} | ||||||
|
||||||
$ID = $result->value(); | ||||||
|
||||||
// If the record does not exist | ||||||
if (!$ID) { | ||||||
// use this to populate custom data columns before job is queued | ||||||
// note: you can pass arbitrary data to your job and then move it to job descriptor | ||||||
// this is useful if you need some data that needs to be exposed as a separate | ||||||
// DB column as opposed to serialised data | ||||||
$this->extend('updateJobDescriptorBeforeQueued', $jobDescriptor, $job); | ||||||
|
||||||
// copy data | ||||||
$this->copyJobToDescriptor($job, $jobDescriptor); | ||||||
|
||||||
// Write the record | ||||||
$jobDescriptorID = $jobDescriptor->write(); | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||||||
|
||||||
$this->startJob($jobDescriptor, $startAfter); | ||||||
} else { | ||||||
$jobDescriptorID = $ID; | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||||||
} | ||||||
}); | ||||||
} | ||||||
|
||||||
/** | ||||||
* @param QueuedJob $job | ||||||
* @param string $signature | ||||||
* @param null $startAfter | ||||||
* @param null $userId | ||||||
* @param null $queueName | ||||||
* @return QueuedJobDescriptor | ||||||
*/ | ||||||
protected function createJobDescriptor( | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should change this from protected to private |
||||||
QueuedJob $job, | ||||||
$signature, | ||||||
$startAfter = null, | ||||||
$userId = null, | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
Convention is to capitilise the d in ID Also update the references in this function to |
||||||
$queueName = null | ||||||
) | ||||||
{ | ||||||
$jobDescriptor = QueuedJobDescriptor::create(); | ||||||
$jobDescriptor->JobTitle = $job->getTitle(); | ||||||
$jobDescriptor->JobType = $queueName ? $queueName : $job->getJobType(); | ||||||
$jobDescriptor->Signature = $signature; | ||||||
$jobDescriptor->Implementation = get_class($job); | ||||||
$jobDescriptor->StartAfter = $startAfter; | ||||||
|
||||||
// no user provided - fallback to job user default | ||||||
if ($userId === null && $job instanceof UserContextInterface) { | ||||||
$userId = $job->getRunAsMemberID(); | ||||||
} | ||||||
|
||||||
// still no user - fallback to current user | ||||||
if ($userId === null) { | ||||||
if (Security::getCurrentUser() && Security::getCurrentUser()->exists()) { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should put this to a variable so that it doesn't need to be called 3 separate times times |
||||||
// current user available | ||||||
$runAsID = Security::getCurrentUser()->ID; | ||||||
} else { | ||||||
// current user unavailable | ||||||
$runAsID = 0; | ||||||
} | ||||||
} else { | ||||||
$runAsID = $userId; | ||||||
} | ||||||
|
||||||
$jobDescriptor->RunAsID = $runAsID; | ||||||
|
||||||
return $jobDescriptor; | ||||||
} | ||||||
|
||||||
/** | ||||||
* Copies data from a job into a descriptor for persisting | ||||||
* | ||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is kind of weird creating the job and then passing it to
findOrMakeJobDescriptorFromSignature
Seems like we should call
createJobDescriptor()
insidefindOrMakeJobDescriptorFromSignature
on if we were unable to find a jobDescriptor from the signature