diff --git a/src/Queue/VaporJob.php b/src/Queue/VaporJob.php index d25d131..94a08f1 100644 --- a/src/Queue/VaporJob.php +++ b/src/Queue/VaporJob.php @@ -6,5 +6,39 @@ class VaporJob extends SqsJob { - // + /** + * Release the job back into the queue. + * + * @param int $delay + * @return void + */ + public function release($delay = 0) + { + $this->released = true; + + $payload = $this->payload(); + + $payload['attempts']++; + + $this->sqs->deleteMessage([ + 'QueueUrl' => $this->queue, + 'ReceiptHandle' => $this->job['ReceiptHandle'], + ]); + + $this->sqs->sendMessage([ + 'QueueUrl' => $this->queue, + 'MessageBody' => json_encode($payload), + 'DelaySeconds' => $this->secondsUntil($delay), + ]); + } + + /** + * Get the number of times the job has been attempted. + * + * @return int + */ + public function attempts() + { + return ($this->payload()['attempts'] ?? null) + 1; + } } diff --git a/src/Queue/VaporQueue.php b/src/Queue/VaporQueue.php index 6cac3bb..8a1e8d0 100644 --- a/src/Queue/VaporQueue.php +++ b/src/Queue/VaporQueue.php @@ -6,6 +6,27 @@ class VaporQueue extends SqsQueue { + /** + * Pop the next job off of the queue. + * + * @param string $queue + * @return \Illuminate\Contracts\Queue\Job|null + */ + public function pop($queue = null) + { + $response = $this->sqs->receiveMessage([ + 'QueueUrl' => $queue = $this->getQueue($queue), + 'AttributeNames' => ['ApproximateReceiveCount'], + ]); + + if (! is_null($response['Messages']) && count($response['Messages']) > 0) { + return new VaporJob( + $this->container, $this->sqs, $response['Messages'][0], + $this->connectionName, $queue + ); + } + } + /** * Create a payload string from the given job and data. *