diff --git a/pkg/pheanstalk/PheanstalkConsumer.php b/pkg/pheanstalk/PheanstalkConsumer.php index bfd9fb3b0..d8460f58a 100644 --- a/pkg/pheanstalk/PheanstalkConsumer.php +++ b/pkg/pheanstalk/PheanstalkConsumer.php @@ -88,6 +88,15 @@ public function acknowledge(Message $message): void */ public function reject(Message $message, bool $requeue = false): void { + InvalidMessageException::assertMessageInstanceOf($message, PheanstalkMessage::class); + + if (false == $message->getJob()) { + throw new \LogicException(sprintf( + 'The message could not be %s because it does not have job set.', + $requeue ? 'requeued' : 'rejected' + )); + } + if ($requeue) { $this->pheanstalk->release($message->getJob(), $message->getPriority(), $message->getDelay()); diff --git a/pkg/pheanstalk/Tests/PheanstalkConsumerTest.php b/pkg/pheanstalk/Tests/PheanstalkConsumerTest.php index 60015dc99..7600e9921 100644 --- a/pkg/pheanstalk/Tests/PheanstalkConsumerTest.php +++ b/pkg/pheanstalk/Tests/PheanstalkConsumerTest.php @@ -96,7 +96,7 @@ public function testShouldReceiveNoWaitFromQueueAndReturnNullIfNoMessageInQueue( public function testShouldReceiveNoWaitFromQueueAndReturnMessageIfMessageInQueue() { $destination = new PheanstalkDestination('theQueueName'); - $message = new PheanstalkMessage('theBody', ['foo' => 'fooVal'], ['bar' => 'barVal']); + $message = new PheanstalkMessage('theBody', ['foo' => 'fooVal'], ['bar' => 'barVal']); $job = new Job('theJobId', json_encode($message)); @@ -118,6 +118,109 @@ public function testShouldReceiveNoWaitFromQueueAndReturnMessageIfMessageInQueue $this->assertSame($job, $actualMessage->getJob()); } + public function testShouldAcknowledgeMessage() + { + $destination = new PheanstalkDestination('theQueueName'); + $message = new PheanstalkMessage(); + + $job = new Job('theJobId', json_encode($message)); + $message->setJob($job); + + $pheanstalk = $this->createPheanstalkMock(); + $pheanstalk + ->expects($this->once()) + ->method('delete') + ->with($job) + ; + + $consumer = new PheanstalkConsumer($destination, $pheanstalk); + + $consumer->acknowledge($message); + } + + public function testAcknowledgeShouldThrowExceptionIfMessageHasNoJob() + { + $destination = new PheanstalkDestination('theQueueName'); + $pheanstalk = $this->createPheanstalkMock(); + + $consumer = new PheanstalkConsumer($destination, $pheanstalk); + + $this->expectException(\LogicException::class); + $this->expectExceptionMessage('The message could not be acknowledged because it does not have job set.'); + + $consumer->acknowledge(new PheanstalkMessage()); + } + + public function testShouldRejectMessage() + { + $destination = new PheanstalkDestination('theQueueName'); + $message = new PheanstalkMessage(); + + $job = new Job('theJobId', json_encode($message)); + $message->setJob($job); + + $pheanstalk = $this->createPheanstalkMock(); + $pheanstalk + ->expects($this->once()) + ->method('delete') + ->with($job) + ; + + $consumer = new PheanstalkConsumer($destination, $pheanstalk); + + $consumer->reject($message); + } + + public function testRejectShouldThrowExceptionIfMessageHasNoJob() + { + $destination = new PheanstalkDestination('theQueueName'); + $pheanstalk = $this->createPheanstalkMock(); + + $consumer = new PheanstalkConsumer($destination, $pheanstalk); + + $this->expectException(\LogicException::class); + $this->expectExceptionMessage('The message could not be rejected because it does not have job set.'); + + $consumer->reject(new PheanstalkMessage()); + } + + public function testShouldRequeueMessage() + { + $destination = new PheanstalkDestination('theQueueName'); + $message = new PheanstalkMessage(); + + $job = new Job('theJobId', json_encode($message)); + $message->setJob($job); + + $pheanstalk = $this->createPheanstalkMock(); + $pheanstalk + ->expects($this->once()) + ->method('release') + ->with($job, Pheanstalk::DEFAULT_PRIORITY, Pheanstalk::DEFAULT_DELAY) + ; + $pheanstalk + ->expects($this->never()) + ->method('delete') + ; + + $consumer = new PheanstalkConsumer($destination, $pheanstalk); + + $consumer->reject($message, true); + } + + public function testRequeueShouldThrowExceptionIfMessageHasNoJob() + { + $destination = new PheanstalkDestination('theQueueName'); + $pheanstalk = $this->createPheanstalkMock(); + + $consumer = new PheanstalkConsumer($destination, $pheanstalk); + + $this->expectException(\LogicException::class); + $this->expectExceptionMessage('The message could not be requeued because it does not have job set.'); + + $consumer->reject(new PheanstalkMessage(), true); + } + /** * @return \PHPUnit_Framework_MockObject_MockObject|Pheanstalk */