Skip to content

Commit 3f72f6b

Browse files
authored
Merge pull request #22 from php-enqueue/amqp-ext-introduce-buffer
[amqp] Put in buffer not our message. Continue consumption.
2 parents 23f2d0e + b7ad710 commit 3f72f6b

9 files changed

+235
-10
lines changed

Diff for: pkg/amqp-ext/AmqpConsumer.php

+24-8
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,14 @@ class AmqpConsumer implements Consumer
1919
private $queue;
2020

2121
/**
22-
* @var \AMQPQueue
22+
* @var Buffer
2323
*/
24-
private $extQueue;
24+
private $buffer;
2525

2626
/**
27-
* @var string
27+
* @var \AMQPQueue
2828
*/
29-
private $consumerId;
29+
private $extQueue;
3030

3131
/**
3232
* @var bool
@@ -36,13 +36,14 @@ class AmqpConsumer implements Consumer
3636
/**
3737
* @param AmqpContext $context
3838
* @param AmqpQueue $queue
39+
* @param Buffer $buffer
3940
*/
40-
public function __construct(AmqpContext $context, AmqpQueue $queue)
41+
public function __construct(AmqpContext $context, AmqpQueue $queue, Buffer $buffer)
4142
{
4243
$this->queue = $queue;
4344
$this->context = $context;
45+
$this->buffer = $buffer;
4446

45-
$this->consumerId = uniqid('', true);
4647
$this->isInit = false;
4748
}
4849

@@ -63,6 +64,10 @@ public function getQueue()
6364
*/
6465
public function receive($timeout = 0)
6566
{
67+
if ($this->isInit && $message = $this->buffer->pop($this->getExtQueue()->getConsumerTag())) {
68+
return $message;
69+
}
70+
6671
/** @var \AMQPQueue $extQueue */
6772
$extConnection = $this->getExtQueue()->getChannel()->getConnection();
6873

@@ -71,17 +76,28 @@ public function receive($timeout = 0)
7176
$extConnection->setReadTimeout($timeout / 1000);
7277

7378
if (false == $this->isInit) {
74-
$this->getExtQueue()->consume(null, AMQP_NOPARAM, $this->consumerId);
79+
$this->getExtQueue()->consume(null, AMQP_NOPARAM);
7580

7681
$this->isInit = true;
7782
}
7883

84+
/** @var AmqpMessage|null $message */
7985
$message = null;
8086

8187
$this->getExtQueue()->consume(function (\AMQPEnvelope $extEnvelope, \AMQPQueue $q) use (&$message) {
8288
$message = $this->convertMessage($extEnvelope);
89+
$message->setConsumerTag($q->getConsumerTag());
90+
91+
if ($this->getExtQueue()->getConsumerTag() == $q->getConsumerTag()) {
92+
return false;
93+
}
94+
95+
// not our message, put it to buffer and continue.
96+
$this->buffer->push($q->getConsumerTag(), $message);
97+
98+
$message = null;
8399

84-
return false;
100+
return true;
85101
}, AMQP_JUST_CONSUME);
86102

87103
return $message;

Diff for: pkg/amqp-ext/AmqpContext.php

+9-2
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,11 @@ class AmqpContext implements Context
2020
*/
2121
private $extChannelFactory;
2222

23+
/**
24+
* @var Buffer
25+
*/
26+
private $buffer;
27+
2328
/**
2429
* Callable must return instance of \AMQPChannel once called.
2530
*
@@ -34,6 +39,8 @@ public function __construct($extChannel)
3439
} else {
3540
throw new \InvalidArgumentException('The extChannel argument must be either AMQPChannel or callable that return AMQPChannel.');
3641
}
42+
43+
$this->buffer = new Buffer();
3744
}
3845

3946
/**
@@ -170,10 +177,10 @@ public function createConsumer(Destination $destination)
170177
$queue = $this->createTemporaryQueue();
171178
$this->bind($destination, $queue);
172179

173-
return new AmqpConsumer($this, $queue);
180+
return new AmqpConsumer($this, $queue, $this->buffer);
174181
}
175182

176-
return new AmqpConsumer($this, $destination);
183+
return new AmqpConsumer($this, $destination, $this->buffer);
177184
}
178185

179186
public function close()

Diff for: pkg/amqp-ext/AmqpMessage.php

+21
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,11 @@ class AmqpMessage implements Message
2626
*/
2727
private $deliveryTag;
2828

29+
/**
30+
* @var string|null
31+
*/
32+
private $consumerTag;
33+
2934
/**
3035
* @var bool
3136
*/
@@ -227,6 +232,22 @@ public function setDeliveryTag($deliveryTag)
227232
$this->deliveryTag = $deliveryTag;
228233
}
229234

235+
/**
236+
* @return string|null
237+
*/
238+
public function getConsumerTag()
239+
{
240+
return $this->consumerTag;
241+
}
242+
243+
/**
244+
* @param string|null $consumerTag
245+
*/
246+
public function setConsumerTag($consumerTag)
247+
{
248+
$this->consumerTag = $consumerTag;
249+
}
250+
230251
public function clearFlags()
231252
{
232253
$this->flags = AMQP_NOPARAM;

Diff for: pkg/amqp-ext/Buffer.php

+41
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
<?php
2+
3+
namespace Enqueue\AmqpExt;
4+
5+
class Buffer
6+
{
7+
/**
8+
* @var array ['aTag' => [AmqpMessage, AmqpMessage ...]]
9+
*/
10+
private $messages;
11+
12+
public function __construct()
13+
{
14+
$this->messages = [];
15+
}
16+
17+
/**
18+
* @param string $consumerTag
19+
* @param AmqpMessage $message
20+
*/
21+
public function push($consumerTag, AmqpMessage $message)
22+
{
23+
if (false == array_key_exists($consumerTag, $this->messages)) {
24+
$this->messages[$consumerTag] = [];
25+
}
26+
27+
$this->messages[$consumerTag][] = $message;
28+
}
29+
30+
/**
31+
* @param string $consumerTag
32+
*
33+
* @return AmqpMessage|null
34+
*/
35+
public function pop($consumerTag)
36+
{
37+
if (false == empty($this->messages[$consumerTag])) {
38+
return array_shift($this->messages[$consumerTag]);
39+
}
40+
}
41+
}

Diff for: pkg/amqp-ext/Tests/AmqpConsumerTest.php

+37
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
<?php
2+
3+
namespace Enqueue\AmqpExt\Tests;
4+
5+
use Enqueue\AmqpExt\AmqpConsumer;
6+
use Enqueue\AmqpExt\AmqpContext;
7+
use Enqueue\AmqpExt\AmqpQueue;
8+
use Enqueue\AmqpExt\Buffer;
9+
use Enqueue\Psr\Consumer;
10+
use Enqueue\Test\ClassExtensionTrait;
11+
12+
class AmqpConsumerTest extends \PHPUnit_Framework_TestCase
13+
{
14+
use ClassExtensionTrait;
15+
16+
public function testShouldImplementConsumerInterface()
17+
{
18+
$this->assertClassImplements(Consumer::class, AmqpConsumer::class);
19+
}
20+
21+
public function testCouldBeConstructedWithContextAndQueueAndBufferAsArguments()
22+
{
23+
new AmqpConsumer(
24+
$this->createContext(),
25+
new AmqpQueue('aName'),
26+
new Buffer()
27+
);
28+
}
29+
30+
/**
31+
* @return \PHPUnit_Framework_MockObject_MockObject|AmqpContext
32+
*/
33+
private function createContext()
34+
{
35+
return $this->createMock(AmqpContext::class);
36+
}
37+
}

Diff for: pkg/amqp-ext/Tests/AmqpContextTest.php

+14
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
use Enqueue\AmqpExt\AmqpProducer;
99
use Enqueue\AmqpExt\AmqpQueue;
1010
use Enqueue\AmqpExt\AmqpTopic;
11+
use Enqueue\AmqpExt\Buffer;
1112
use Enqueue\Psr\Context;
1213
use Enqueue\Psr\InvalidDestinationException;
1314
use Enqueue\Test\ClassExtensionTrait;
@@ -35,6 +36,15 @@ public function testCouldBeConstructedWithExtChannelCallbackFactoryAsFirstArgume
3536
});
3637
}
3738

39+
public function testShouldCreateNewBufferOnConstruct()
40+
{
41+
$context = new AmqpContext(function () {
42+
return $this->createExtChannelMock();
43+
});
44+
45+
$this->assertAttributeInstanceOf(Buffer::class, 'buffer', $context);
46+
}
47+
3848
public function testThrowIfNeitherCallbackNorExtChannelAsFirstArgument()
3949
{
4050
$this->expectException(\InvalidArgumentException::class);
@@ -143,13 +153,17 @@ public function testShouldReturnAmqpConsumerForGivenQueue()
143153
{
144154
$context = new AmqpContext($this->createExtChannelMock());
145155

156+
$buffer = $this->readAttribute($context, 'buffer');
157+
146158
$queue = new AmqpQueue('aName');
147159

148160
$consumer = $context->createConsumer($queue);
149161

150162
$this->assertInstanceOf(AmqpConsumer::class, $consumer);
151163
$this->assertAttributeSame($context, 'context', $consumer);
152164
$this->assertAttributeSame($queue, 'queue', $consumer);
165+
$this->assertAttributeSame($queue, 'queue', $consumer);
166+
$this->assertAttributeSame($buffer, 'buffer', $consumer);
153167
}
154168

155169
public function testShouldThrowIfNotAmqpQueueGivenOnCreateConsumerCall()

Diff for: pkg/amqp-ext/Tests/AmqpMessageTest.php

+9
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,15 @@ public function testShouldReturnPreviouslySetDeliveryTag()
170170
$this->assertSame('theDeliveryTag', $message->getDeliveryTag());
171171
}
172172

173+
public function testShouldReturnPreviouslySetConsumerTag()
174+
{
175+
$message = new AmqpMessage();
176+
177+
$message->setConsumerTag('theConsumerTag');
178+
179+
$this->assertSame('theConsumerTag', $message->getConsumerTag());
180+
}
181+
173182
public function testShouldAllowAddFlags()
174183
{
175184
$message = new AmqpMessage();

Diff for: pkg/amqp-ext/Tests/AmqpProducerTest.php

+17
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
<?php
2+
3+
namespace Enqueue\AmqpExt\Tests;
4+
5+
use Enqueue\AmqpExt\AmqpProducer;
6+
use Enqueue\Psr\Producer;
7+
use Enqueue\Test\ClassExtensionTrait;
8+
9+
class AmqpProducerTest extends \PHPUnit_Framework_TestCase
10+
{
11+
use ClassExtensionTrait;
12+
13+
public function testShouldImplementProducerInterface()
14+
{
15+
$this->assertClassImplements(Producer::class, AmqpProducer::class);
16+
}
17+
}

Diff for: pkg/amqp-ext/Tests/BufferTest.php

+63
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
<?php
2+
3+
namespace Enqueue\AmqpExt\Tests;
4+
5+
use Enqueue\AmqpExt\AmqpMessage;
6+
use Enqueue\AmqpExt\Buffer;
7+
8+
class BufferTest extends \PHPUnit_Framework_TestCase
9+
{
10+
public function testCouldBeConstructedWithoutAnyArguments()
11+
{
12+
new Buffer();
13+
}
14+
15+
public function testShouldSetEmptyArrayToMessagesPropertyOnConstruct()
16+
{
17+
$buffer = new Buffer();
18+
19+
$this->assertAttributeSame([], 'messages', $buffer);
20+
}
21+
22+
public function testShouldReturnNullIfNoMessagesInBuffer()
23+
{
24+
$buffer = new Buffer();
25+
26+
$this->assertNull($buffer->pop('aConsumerTag'));
27+
$this->assertNull($buffer->pop('anotherConsumerTag'));
28+
}
29+
30+
public function testShouldPushMessageToBuffer()
31+
{
32+
$fooMessage = new AmqpMessage();
33+
$barMessage = new AmqpMessage();
34+
$bazMessage = new AmqpMessage();
35+
36+
$buffer = new Buffer();
37+
38+
$buffer->push('aConsumerTag', $fooMessage);
39+
$buffer->push('aConsumerTag', $barMessage);
40+
41+
$buffer->push('anotherConsumerTag', $bazMessage);
42+
43+
$this->assertAttributeSame([
44+
'aConsumerTag' => [$fooMessage, $barMessage],
45+
'anotherConsumerTag' => [$bazMessage],
46+
], 'messages', $buffer);
47+
}
48+
49+
public function testShouldPopMessageFromBuffer()
50+
{
51+
$fooMessage = new AmqpMessage();
52+
$barMessage = new AmqpMessage();
53+
54+
$buffer = new Buffer();
55+
56+
$buffer->push('aConsumerTag', $fooMessage);
57+
$buffer->push('aConsumerTag', $barMessage);
58+
59+
$this->assertSame($fooMessage, $buffer->pop('aConsumerTag'));
60+
$this->assertSame($barMessage, $buffer->pop('aConsumerTag'));
61+
$this->assertNull($buffer->pop('aConsumerTag'));
62+
}
63+
}

0 commit comments

Comments
 (0)