Skip to content

Commit acfca1b

Browse files
authored
Merge pull request #40 from php-enqueue/client-message-scope
[client] Add ability to define scope of send message.
2 parents 05ecef7 + 62eab8a commit acfca1b

File tree

4 files changed

+224
-4
lines changed

4 files changed

+224
-4
lines changed

Diff for: pkg/enqueue/Client/Message.php

+27
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,16 @@
44

55
class Message
66
{
7+
/**
8+
* @const string
9+
*/
10+
const SCOPE_MESSAGE_BUS = 'enqueue.scope.message_bus';
11+
12+
/**
13+
* @const string
14+
*/
15+
const SCOPE_APP = 'enqueue.scope.app';
16+
717
/**
818
* @var string|null
919
*/
@@ -57,6 +67,7 @@ public function __construct()
5767
{
5868
$this->headers = [];
5969
$this->properties = [];
70+
$this->scope = static::SCOPE_MESSAGE_BUS;
6071
}
6172

6273
/**
@@ -177,6 +188,22 @@ public function setDelay($delay)
177188
$this->delay = $delay;
178189
}
179190

191+
/**
192+
* @param string $scope
193+
*/
194+
public function setScope($scope)
195+
{
196+
$this->scope = $scope;
197+
}
198+
199+
/**
200+
* @return string
201+
*/
202+
public function getScope()
203+
{
204+
return $this->scope;
205+
}
206+
180207
/**
181208
* @return array
182209
*/

Diff for: pkg/enqueue/Client/MessageProducer.php

+21-1
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,27 @@ public function send($topic, $message)
4747
$message->setPriority(MessagePriority::NORMAL);
4848
}
4949

50-
$this->driver->sendToRouter($message);
50+
if (Message::SCOPE_MESSAGE_BUS == $message->getScope()) {
51+
if ($message->getProperty(Config::PARAMETER_PROCESSOR_QUEUE_NAME)) {
52+
throw new \LogicException(sprintf('The %s property must not be set for messages that are sent to message bus.', Config::PARAMETER_PROCESSOR_QUEUE_NAME));
53+
}
54+
if ($message->getProperty(Config::PARAMETER_PROCESSOR_NAME)) {
55+
throw new \LogicException(sprintf('The %s property must not be set for messages that are sent to message bus.', Config::PARAMETER_PROCESSOR_NAME));
56+
}
57+
58+
$this->driver->sendToRouter($message);
59+
} elseif (Message::SCOPE_APP == $message->getScope()) {
60+
if (false == $message->getProperty(Config::PARAMETER_PROCESSOR_NAME)) {
61+
$message->setProperty(Config::PARAMETER_PROCESSOR_NAME, $this->driver->getConfig()->getRouterProcessorName());
62+
}
63+
if (false == $message->getProperty(Config::PARAMETER_PROCESSOR_QUEUE_NAME)) {
64+
$message->setProperty(Config::PARAMETER_PROCESSOR_QUEUE_NAME, $this->driver->getConfig()->getRouterQueueName());
65+
}
66+
67+
$this->driver->sendToProcessor($message);
68+
} else {
69+
throw new \LogicException(sprintf('The message scope "%s" is not supported.', $message->getScope()));
70+
}
5171
}
5272

5373
/**

Diff for: pkg/enqueue/Tests/Client/MessageProducerTest.php

+160-3
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
namespace Enqueue\Tests\Client;
44

5+
use Enqueue\Client\Config;
56
use Enqueue\Client\DriverInterface;
67
use Enqueue\Client\Message;
78
use Enqueue\Client\MessagePriority;
@@ -294,6 +295,10 @@ public function testShouldThrowExceptionIfBodyIsObjectOnSend()
294295
->expects($this->never())
295296
->method('sendToRouter')
296297
;
298+
$driver
299+
->expects($this->never())
300+
->method('sendToProcessor')
301+
;
297302

298303
$producer = new MessageProducer($driver);
299304

@@ -312,6 +317,10 @@ public function testShouldThrowExceptionIfBodyIsArrayWithObjectsInsideOnSend()
312317
->expects($this->never())
313318
->method('sendToRouter')
314319
;
320+
$driver
321+
->expects($this->never())
322+
->method('sendToProcessor')
323+
;
315324

316325
$producer = new MessageProducer($driver);
317326

@@ -330,6 +339,10 @@ public function testShouldThrowExceptionIfBodyIsArrayWithObjectsInSubArraysInsid
330339
->expects($this->never())
331340
->method('sendToRouter')
332341
;
342+
$driver
343+
->expects($this->never())
344+
->method('sendToProcessor')
345+
;
333346

334347
$producer = new MessageProducer($driver);
335348

@@ -339,7 +352,7 @@ public function testShouldThrowExceptionIfBodyIsArrayWithObjectsInSubArraysInsid
339352
$producer->send($queue, ['foo' => ['bar' => new \stdClass()]]);
340353
}
341354

342-
public function testShouldSendJsonSerializableObjectAsJsonString()
355+
public function testShouldSendJsonSerializableObjectAsJsonStringToMessageBus()
343356
{
344357
$object = new JsonSerializableObject();
345358

@@ -357,7 +370,7 @@ public function testShouldSendJsonSerializableObjectAsJsonString()
357370
$producer->send('topic', $object);
358371
}
359372

360-
public function testShouldSendMessageJsonSerializableBodyAsJsonString()
373+
public function testShouldSendMessageJsonSerializableBodyAsJsonStringToMessageBus()
361374
{
362375
$object = new JsonSerializableObject();
363376

@@ -378,6 +391,56 @@ public function testShouldSendMessageJsonSerializableBodyAsJsonString()
378391
$producer->send('topic', $message);
379392
}
380393

394+
public function testThrowIfTryToSendMessageToMessageBusWithProcessorNamePropertySet()
395+
{
396+
$object = new JsonSerializableObject();
397+
398+
$message = new Message();
399+
$message->setBody($object);
400+
$message->setProperty(Config::PARAMETER_PROCESSOR_NAME, 'aProcessor');
401+
402+
$driver = $this->createDriverStub();
403+
$driver
404+
->expects($this->never())
405+
->method('sendToRouter')
406+
;
407+
$driver
408+
->expects($this->never())
409+
->method('sendToProcessor')
410+
;
411+
412+
$producer = new MessageProducer($driver);
413+
414+
$this->expectException(\LogicException::class);
415+
$this->expectExceptionMessage('The enqueue.processor_name property must not be set for messages that are sent to message bus.');
416+
$producer->send('topic', $message);
417+
}
418+
419+
public function testThrowIfTryToSendMessageToMessageBusWithProcessorQueueNamePropertySet()
420+
{
421+
$object = new JsonSerializableObject();
422+
423+
$message = new Message();
424+
$message->setBody($object);
425+
$message->setProperty(Config::PARAMETER_PROCESSOR_QUEUE_NAME, 'aProcessorQueue');
426+
427+
$driver = $this->createDriverStub();
428+
$driver
429+
->expects($this->never())
430+
->method('sendToRouter')
431+
;
432+
$driver
433+
->expects($this->never())
434+
->method('sendToProcessor')
435+
;
436+
437+
$producer = new MessageProducer($driver);
438+
439+
$this->expectException(\LogicException::class);
440+
$this->expectExceptionMessage('The enqueue.processor_queue_name property must not be set for messages that are sent to message bus.');
441+
$producer->send('topic', $message);
442+
}
443+
381444
public function testThrowIfNotApplicationJsonContentTypeSetWithJsonSerializableBody()
382445
{
383446
$object = new JsonSerializableObject();
@@ -391,6 +454,10 @@ public function testThrowIfNotApplicationJsonContentTypeSetWithJsonSerializableB
391454
->expects($this->never())
392455
->method('sendToRouter')
393456
;
457+
$driver
458+
->expects($this->never())
459+
->method('sendToProcessor')
460+
;
394461

395462
$this->expectException(\LogicException::class);
396463
$this->expectExceptionMessage('Content type "application/json" only allowed when body is array');
@@ -399,12 +466,102 @@ public function testThrowIfNotApplicationJsonContentTypeSetWithJsonSerializableB
399466
$producer->send('topic', $message);
400467
}
401468

469+
public function testShouldSendMessageToApplicationRouter()
470+
{
471+
$message = new Message();
472+
$message->setBody('aBody');
473+
$message->setScope(Message::SCOPE_APP);
474+
475+
$driver = $this->createDriverStub();
476+
$driver
477+
->expects($this->never())
478+
->method('sendToRouter')
479+
;
480+
$driver
481+
->expects($this->once())
482+
->method('sendToProcessor')
483+
->willReturnCallback(function (Message $message) {
484+
self::assertSame('aBody', $message->getBody());
485+
self::assertSame('a_router_processor_name', $message->getProperty(Config::PARAMETER_PROCESSOR_NAME));
486+
self::assertSame('a_router_queue', $message->getProperty(Config::PARAMETER_PROCESSOR_QUEUE_NAME));
487+
})
488+
;
489+
490+
$producer = new MessageProducer($driver);
491+
$producer->send('topic', $message);
492+
}
493+
494+
public function testShouldSendToCustomMessageToApplicationRouter()
495+
{
496+
$message = new Message();
497+
$message->setBody('aBody');
498+
$message->setScope(Message::SCOPE_APP);
499+
$message->setProperty(Config::PARAMETER_PROCESSOR_NAME, 'aCustomProcessor');
500+
$message->setProperty(Config::PARAMETER_PROCESSOR_QUEUE_NAME, 'aCustomProcessorQueue');
501+
502+
$driver = $this->createDriverStub();
503+
$driver
504+
->expects($this->never())
505+
->method('sendToRouter')
506+
;
507+
$driver
508+
->expects($this->once())
509+
->method('sendToProcessor')
510+
->willReturnCallback(function (Message $message) {
511+
self::assertSame('aBody', $message->getBody());
512+
self::assertSame('aCustomProcessor', $message->getProperty(Config::PARAMETER_PROCESSOR_NAME));
513+
self::assertSame('aCustomProcessorQueue', $message->getProperty(Config::PARAMETER_PROCESSOR_QUEUE_NAME));
514+
})
515+
;
516+
517+
$producer = new MessageProducer($driver);
518+
$producer->send('topic', $message);
519+
}
520+
521+
public function testThrowIfUnSupportedScopeGivenOnSend()
522+
{
523+
$message = new Message();
524+
$message->setScope('iDontKnowScope');
525+
526+
$driver = $this->createDriverStub();
527+
$driver
528+
->expects($this->never())
529+
->method('sendToRouter')
530+
;
531+
$driver
532+
->expects($this->never())
533+
->method('sendToProcessor')
534+
;
535+
536+
$producer = new MessageProducer($driver);
537+
538+
$this->expectException(\LogicException::class);
539+
$this->expectExceptionMessage('The message scope "iDontKnowScope" is not supported.');
540+
$producer->send('topic', $message);
541+
}
542+
402543
/**
403544
* @return \PHPUnit_Framework_MockObject_MockObject|DriverInterface
404545
*/
405546
protected function createDriverStub()
406547
{
407-
return $this->createMock(DriverInterface::class);
548+
$config = new Config(
549+
'a_prefix',
550+
'an_app',
551+
'a_router_topic',
552+
'a_router_queue',
553+
'a_default_processor_queue',
554+
'a_router_processor_name'
555+
);
556+
557+
$driverMock = $this->createMock(DriverInterface::class);
558+
$driverMock
559+
->expects($this->any())
560+
->method('getConfig')
561+
->willReturn($config)
562+
;
563+
564+
return $driverMock;
408565
}
409566
}
410567

Diff for: pkg/enqueue/Tests/Client/MessageTest.php

+16
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,15 @@ public function testShouldAllowGetPreviouslySetDelay()
3838
self::assertSame('theDelay', $message->getDelay());
3939
}
4040

41+
public function testShouldAllowGetPreviouslySetScope()
42+
{
43+
$message = new Message();
44+
45+
$message->setScope('theScope');
46+
47+
self::assertSame('theScope', $message->getScope());
48+
}
49+
4150
public function testShouldAllowGetPreviouslySetExpire()
4251
{
4352
$message = new Message();
@@ -81,6 +90,13 @@ public function testShouldSetEmptyArrayAsDefaultHeadersInConstructor()
8190
self::assertSame([], $message->getHeaders());
8291
}
8392

93+
public function testShouldSetMessageBusScopeInConstructor()
94+
{
95+
$message = new Message();
96+
97+
self::assertSame(Message::SCOPE_MESSAGE_BUS, $message->getScope());
98+
}
99+
84100
public function testShouldAllowGetPreviouslySetHeaders()
85101
{
86102
$message = new Message();

0 commit comments

Comments
 (0)