Skip to content

Commit 027a805

Browse files
authored
Merge pull request #291 from php-enqueue/dbal-requeue-fix
[dbal] Fix message re-queuing. Reuse producer for it.
2 parents 93ed25a + 97159fe commit 027a805

File tree

5 files changed

+56
-69
lines changed

5 files changed

+56
-69
lines changed

Diff for: composer.json

+1-1
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
"enqueue/async-event-dispatcher": "*@dev",
3030
"queue-interop/queue-interop": "^0.6@dev",
3131
"queue-interop/amqp-interop": "^0.7@dev",
32-
"queue-interop/queue-spec": "^0.5.1@dev",
32+
"queue-interop/queue-spec": "^0.5.4@dev",
3333

3434
"phpunit/phpunit": "^5",
3535
"doctrine/doctrine-bundle": "~1.2",

Diff for: pkg/dbal/DbalConsumer.php

+3-26
Original file line numberDiff line numberDiff line change
@@ -130,33 +130,10 @@ public function reject(PsrMessage $message, $requeue = false)
130130
{
131131
InvalidMessageException::assertMessageInstanceOf($message, DbalMessage::class);
132132

133-
if (false == $requeue) {
134-
return;
135-
}
133+
if ($requeue) {
134+
$this->context->createProducer()->send($this->queue, $message);
136135

137-
$dbalMessage = [
138-
'body' => $message->getBody(),
139-
'headers' => JSON::encode($message->getHeaders()),
140-
'properties' => JSON::encode($message->getProperties()),
141-
'priority' => $message->getPriority(),
142-
'queue' => $this->queue->getQueueName(),
143-
'redelivered' => true,
144-
];
145-
146-
$affectedRows = $this->dbal->insert($this->context->getTableName(), $dbalMessage, [
147-
'body' => Type::TEXT,
148-
'headers' => Type::TEXT,
149-
'properties' => Type::TEXT,
150-
'priority' => Type::SMALLINT,
151-
'queue' => Type::STRING,
152-
'redelivered' => Type::BOOLEAN,
153-
]);
154-
155-
if (1 !== $affectedRows) {
156-
throw new \LogicException(sprintf(
157-
'Expected record was inserted but it is not. message: "%s"',
158-
JSON::encode($dbalMessage)
159-
));
136+
return;
160137
}
161138
}
162139

Diff for: pkg/dbal/Tests/DbalConsumerTest.php

+30-41
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
use Enqueue\Dbal\DbalContext;
1111
use Enqueue\Dbal\DbalDestination;
1212
use Enqueue\Dbal\DbalMessage;
13+
use Enqueue\Dbal\DbalProducer;
1314
use Enqueue\Test\ClassExtensionTrait;
1415
use Interop\Queue\InvalidMessageException;
1516
use Interop\Queue\PsrConsumer;
@@ -67,70 +68,58 @@ public function testRejectShouldThrowIfInstanceOfMessageIsInvalid()
6768
$consumer->reject(new InvalidMessage());
6869
}
6970

70-
public function testRejectShouldInsertNewMessageOnRequeue()
71+
public function testShouldDoNothingOnReject()
7172
{
72-
$expectedMessage = [
73-
'body' => 'theBody',
74-
'headers' => '[]',
75-
'properties' => '[]',
76-
'priority' => 0,
77-
'queue' => 'queue',
78-
'redelivered' => true,
79-
];
73+
$queue = new DbalDestination('queue');
8074

81-
$dbal = $this->createConnectionMock();
82-
$dbal
83-
->expects($this->once())
84-
->method('insert')
85-
->with('tableName', $this->equalTo($expectedMessage))
86-
->will($this->returnValue(1))
87-
;
75+
$message = new DbalMessage();
76+
$message->setBody('theBody');
8877

8978
$context = $this->createContextMock();
9079
$context
91-
->expects($this->once())
92-
->method('getDbalConnection')
93-
->will($this->returnValue($dbal))
94-
;
95-
$context
96-
->expects($this->once())
97-
->method('getTableName')
98-
->will($this->returnValue('tableName'))
80+
->expects($this->never())
81+
->method('createProducer')
9982
;
10083

101-
$message = new DbalMessage();
102-
$message->setBody('theBody');
84+
$consumer = new DbalConsumer($context, $queue);
10385

104-
$consumer = new DbalConsumer($context, new DbalDestination('queue'));
105-
$consumer->reject($message, true);
86+
$consumer->reject($message);
10687
}
10788

108-
public function testRejectShouldThrowIfMessageWasNotInserted()
89+
public function testRejectShouldReSendMessageToSameQueueOnRequeue()
10990
{
110-
$dbal = $this->createConnectionMock();
111-
$dbal
91+
$queue = new DbalDestination('queue');
92+
93+
$message = new DbalMessage();
94+
$message->setBody('theBody');
95+
96+
$producerMock = $this->createProducerMock();
97+
$producerMock
11298
->expects($this->once())
113-
->method('insert')
114-
->willReturn(0)
99+
->method('send')
100+
->with($this->identicalTo($queue), $this->identicalTo($message))
115101
;
116102

117103
$context = $this->createContextMock();
118104
$context
119105
->expects($this->once())
120-
->method('getDbalConnection')
121-
->will($this->returnValue($dbal))
106+
->method('createProducer')
107+
->will($this->returnValue($producerMock))
122108
;
123109

124-
$message = new DbalMessage();
125-
$message->setBody('theBody');
126-
127-
$this->expectException(\LogicException::class);
128-
$this->expectExceptionMessage('Expected record was inserted but it is not. message:');
110+
$consumer = new DbalConsumer($context, $queue);
129111

130-
$consumer = new DbalConsumer($context, new DbalDestination('queue'));
131112
$consumer->reject($message, true);
132113
}
133114

115+
/**
116+
* @return DbalProducer|\PHPUnit_Framework_MockObject_MockObject
117+
*/
118+
private function createProducerMock()
119+
{
120+
return $this->createMock(DbalProducer::class);
121+
}
122+
134123
/**
135124
* @return \PHPUnit_Framework_MockObject_MockObject|Connection
136125
*/

Diff for: pkg/dbal/Tests/Spec/DbalRequeueMessageTest.php

+21
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
<?php
2+
3+
namespace Enqueue\Dbal\Tests\Spec;
4+
5+
use Interop\Queue\Spec\RequeueMessageSpec;
6+
7+
/**
8+
* @group functional
9+
*/
10+
class DbalRequeueMessageTest extends RequeueMessageSpec
11+
{
12+
use CreateDbalContextTrait;
13+
14+
/**
15+
* {@inheritdoc}
16+
*/
17+
protected function createContext()
18+
{
19+
return $this->createDbalContext();
20+
}
21+
}

Diff for: pkg/dbal/composer.json

+1-1
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
"enqueue/test": "^0.8@dev",
1616
"enqueue/enqueue": "^0.8@dev",
1717
"enqueue/null": "^0.8@dev",
18-
"queue-interop/queue-spec": "^0.5.3@dev",
18+
"queue-interop/queue-spec": "^0.5.4@dev",
1919
"symfony/dependency-injection": "^2.8|^3|^4",
2020
"symfony/config": "^2.8|^3|^4"
2121
},

0 commit comments

Comments
 (0)