diff --git a/bin/release b/bin/release
index 511e3053b..87613ea64 100755
--- a/bin/release
+++ b/bin/release
@@ -13,7 +13,7 @@ fi
CURRENT_BRANCH=`git rev-parse --abbrev-ref HEAD`
-for REMOTE in origin psr-queue stomp amqp-ext sqs fs redis dbal null enqueue enqueue-bundle job-queue test
+for REMOTE in origin psr-queue stomp amqp-ext sqs fs redis dbal null enqueue simple-client enqueue-bundle job-queue test
do
TMP_DIR="/tmp/enqueue-repo"
REMOTE_URL=`git remote get-url $REMOTE`
diff --git a/bin/subtree-split b/bin/subtree-split
index 53e46cce1..6dbdc0193 100755
--- a/bin/subtree-split
+++ b/bin/subtree-split
@@ -45,6 +45,7 @@ function remote()
remote psr-queue git@github.com:php-enqueue/psr-queue.git
remote enqueue git@github.com:php-enqueue/enqueue.git
+remote simple-client git@github.com:php-enqueue/simple-client.git
remote stomp git@github.com:php-enqueue/stomp.git
remote amqp-ext git@github.com:php-enqueue/amqp-ext.git
remote fs git@github.com:php-enqueue/fs.git
@@ -58,6 +59,7 @@ remote test git@github.com:php-enqueue/test.git
split 'pkg/psr-queue' psr-queue
split 'pkg/enqueue' enqueue
+split 'pkg/simple-client' simple-client
split 'pkg/stomp' stomp
split 'pkg/amqp-ext' amqp-ext
split 'pkg/fs' fs
diff --git a/composer.json b/composer.json
index 2afbec114..5730bb920 100644
--- a/composer.json
+++ b/composer.json
@@ -15,6 +15,7 @@
"enqueue/sqs": "*@dev",
"enqueue/enqueue-bundle": "*@dev",
"enqueue/job-queue": "*@dev",
+ "enqueue/simple-client": "*@dev",
"enqueue/test": "*@dev",
"phpunit/phpunit": "^5",
@@ -77,6 +78,10 @@
{
"type": "path",
"url": "pkg/sqs"
+ },
+ {
+ "type": "path",
+ "url": "pkg/simple-client"
}
]
}
diff --git a/docs/client/quick_tour.md b/docs/client/quick_tour.md
new file mode 100644
index 000000000..107f2bbaa
--- /dev/null
+++ b/docs/client/quick_tour.md
@@ -0,0 +1,123 @@
+# Simple client. Quick tour.
+
+The simple client library takes Enqueue client classes and Symfony components and makes an easy to use client facade.
+It reduces the boiler plate code you have to write to start using the Enqueue client features.
+
+* [Install](#install)
+* [Configure](#configure)
+* [Producer message](#produce-message)
+* [Consume messages](#consume-messages)
+
+## Install
+
+```bash
+$ composer require enqueue/simple-client enqueue/amqp-ext
+```
+
+## Configure
+
+```php
+ [
+ 'default' => 'amqp',
+ 'amqp' => [
+ 'host' => 'localhost',
+ 'port' => 5672,
+ 'vhost' => '/',
+ 'login' => 'guest',
+ 'password' => 'guest',
+ ],
+ ],
+ 'client' => [
+ 'app_name' => 'plain_php',
+ ],
+]);
+```
+
+## Produce message
+
+```php
+send('a_bar_topic', 'aMessageData');
+
+// or an array
+
+$client->send('a_bar_topic', ['foo', 'bar']);
+
+// or an json serializable object
+$client->send('a_bar_topic', new class() implements \JsonSerializable {
+ public function jsonSerialize() {
+ return ['foo', 'bar'];
+ }
+});
+```
+
+## Consume messages
+
+```php
+bind('a_bar_topic', 'a_processor_name', function(PsrMessage $psrMessage) {
+ // processing logic here
+});
+
+$client->consume();
+```
+
+## Cli commands
+
+```php
+#!/usr/bin/env php
+add(new SetupBrokerCommand($client->getDriver()));
+$application->add(new ProduceMessageCommand($client->getProducer()));
+$application->add(new QueuesCommand($client->getQueueMetaRegistry()));
+$application->add(new TopicsCommand($client->getTopicMetaRegistry()));
+$application->add(new ConsumeMessagesCommand(
+ $client->getQueueConsumer(),
+ $client->getDelegateProcessor(),
+ $client->getQueueMetaRegistry(),
+ $client->getDriver()
+));
+
+$application->run();
+```
+
+and run to see what is there:
+
+```bash
+$ php bin/enqueue.php
+```
+
+or consume messages
+
+```bash
+$ php bin/enqueue.php enqueue:consume -vvv --setup-broker
+```
+
+[back to index](../index.md)
diff --git a/docs/client/rpc_call.md b/docs/client/rpc_call.md
index 460df5943..cccd4f602 100644
--- a/docs/client/rpc_call.md
+++ b/docs/client/rpc_call.md
@@ -1,5 +1,7 @@
# Client. RPC call
+The client's [quick tour](quick_tour.md) describes how to get the client object.
+We use you followed instructions there and have instance of `Enqueue\SimpleClient\SimpleClient` in `$client` var.
## The client side
@@ -8,13 +10,10 @@ It allows you to easily send a message and wait for a reply.
```php
getProducer(), $context);
$replyMessage = $rpcClient->call('greeting_topic', 'Hi Thomas!', 5);
@@ -24,13 +23,10 @@ You can perform several requests asynchronously with `callAsync` and request rep
```php
getProducer(), $context);
$promises = [];
@@ -53,7 +49,6 @@ Of course it is possible to implement rpc server side based on transport classes
```php
context);
+/** @var \Enqueue\SimpleClient\SimpleClient $client */
+
$client->bind('greeting_topic', 'greeting_processor', function (PsrMessage $message, PsrContext $context) use (&$requestMessage) {
echo $message->getBody();
diff --git a/docs/index.md b/docs/index.md
index e5c3ee007..d8c9258cc 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -13,6 +13,7 @@
- [Extensions](consumption/extensions.md)
- [Message processor](consumption/message_processor.md)
* Client
+ - [Quick tour](client/quick_tour.md)
- [Message examples](client/message_examples.md)
- [Supported brokers](client/supported_brokers.md)
- [Message bus](client/message_bus.md)
diff --git a/docs/quick_tour.md b/docs/quick_tour.md
index 75f8ba00e..d7749b73a 100644
--- a/docs/quick_tour.md
+++ b/docs/quick_tour.md
@@ -167,20 +167,30 @@ Here's an example of how you can send and consume messages.
```php
bind('foo_topic', 'processor_name', function (PsrMessage $message) {
- // process message
-
- return PsrProcessor::ACK;
+$client = new SimpleClient([
+ 'transport' => [
+ 'default' => 'amqp',
+ 'amqp' => [
+ 'host' => 'localhost',
+ 'port' => 5672,
+ 'vhost' => '/',
+ 'login' => 'guest',
+ 'password' => 'guest',
+ ],
+ ],
+ 'client' => true,
+]);
+
+$client->setupBroker();
+
+$client->bind('a_foo_topic', 'fooProcessor', function(PsrMessage $message) {
+ // your processing logic here
});
-$client->send('foo_topic', 'Hello there!');
+$client->send('a_bar_topic', 'aMessageData');
// in another process you can consume messages.
$client->consume();
diff --git a/phpunit.xml.dist b/phpunit.xml.dist
index f2faadb8e..429ead9d2 100644
--- a/phpunit.xml.dist
+++ b/phpunit.xml.dist
@@ -56,6 +56,10 @@
pkg/job-queue/Tests
+
+
+ pkg/simple-client/Tests
+
diff --git a/pkg/enqueue/Client/SimpleClient.php b/pkg/enqueue/Client/SimpleClient.php
deleted file mode 100644
index 27f295cdb..000000000
--- a/pkg/enqueue/Client/SimpleClient.php
+++ /dev/null
@@ -1,174 +0,0 @@
-context = $context;
- $this->config = $config ?: Config::create();
-
- $this->queueMetaRegistry = new QueueMetaRegistry($this->config, []);
- $this->queueMetaRegistry->add($this->config->getDefaultProcessorQueueName());
- $this->queueMetaRegistry->add($this->config->getRouterQueueName());
-
- $this->topicsMetaRegistry = new TopicMetaRegistry([]);
- $this->processorsRegistry = new ArrayProcessorRegistry();
-
- $this->driver = new AmqpDriver($context, $this->config, $this->queueMetaRegistry);
- $this->routerProcessor = new RouterProcessor($this->driver, []);
-
- $this->processorsRegistry->add($this->config->getRouterProcessorName(), $this->routerProcessor);
- $this->queueMetaRegistry->addProcessor($this->config->getRouterQueueName(), $this->routerProcessor);
- }
-
- /**
- * @param string $topic
- * @param string $processorName
- * @param callback $processor
- */
- public function bind($topic, $processorName, callable $processor)
- {
- $queueName = $this->config->getDefaultProcessorQueueName();
-
- $this->topicsMetaRegistry->addProcessor($topic, $processorName);
- $this->queueMetaRegistry->addProcessor($queueName, $processorName);
- $this->processorsRegistry->add($processorName, new CallbackProcessor($processor));
-
- $this->routerProcessor->add($topic, $queueName, $processorName);
- }
-
- public function send($topic, $message)
- {
- $this->getProducer()->send($topic, $message);
- }
-
- public function consume(ExtensionInterface $runtimeExtension = null)
- {
- $this->driver->setupBroker();
-
- $processor = $this->getProcessor();
-
- $queueConsumer = $this->getQueueConsumer();
-
- $defaultQueueName = $this->config->getDefaultProcessorQueueName();
- $defaultTransportQueueName = $this->config->createTransportQueueName($defaultQueueName);
-
- $queueConsumer->bind($defaultTransportQueueName, $processor);
- if ($this->config->getRouterQueueName() != $defaultQueueName) {
- $routerTransportQueueName = $this->config->createTransportQueueName($this->config->getRouterQueueName());
-
- $queueConsumer->bind($routerTransportQueueName, $processor);
- }
-
- $queueConsumer->consume($runtimeExtension);
- }
-
- /**
- * @return AmqpContext
- */
- public function getContext()
- {
- return $this->context;
- }
-
- /**
- * @return QueueConsumer
- */
- public function getQueueConsumer()
- {
- return new QueueConsumer($this->context, new ChainExtension([
- new SetRouterPropertiesExtension($this->driver),
- ]));
- }
-
- /**
- * @return DriverInterface
- */
- public function getDriver()
- {
- return $this->driver;
- }
-
- /**
- * @return TopicMetaRegistry
- */
- public function getTopicMetaRegistry()
- {
- return $this->topicsMetaRegistry;
- }
-
- /**
- * @return QueueMetaRegistry
- */
- public function getQueueMetaRegistry()
- {
- return $this->queueMetaRegistry;
- }
-
- /**
- * @return ProducerInterface
- */
- public function getProducer()
- {
- $this->driver->setupBroker();
-
- return new Producer($this->driver);
- }
-
- /**
- * @return DelegateProcessor
- */
- public function getProcessor()
- {
- return new DelegateProcessor($this->processorsRegistry);
- }
-}
diff --git a/pkg/enqueue/Tests/Functional/Client/RpcClientTest.php b/pkg/enqueue/Tests/Functional/Client/RpcClientTest.php
index bf7d69e13..49f6fa20a 100644
--- a/pkg/enqueue/Tests/Functional/Client/RpcClientTest.php
+++ b/pkg/enqueue/Tests/Functional/Client/RpcClientTest.php
@@ -4,7 +4,7 @@
use Enqueue\AmqpExt\AmqpContext;
use Enqueue\Client\RpcClient;
-use Enqueue\Client\SimpleClient;
+use Enqueue\SimpleClient\SimpleClient;
use Enqueue\Consumption\ChainExtension;
use Enqueue\Consumption\Extension\LimitConsumedMessagesExtension;
use Enqueue\Consumption\Extension\LimitConsumptionTimeExtension;
@@ -39,14 +39,27 @@ public function setUp()
$this->context = $this->buildAmqpContext();
$this->replyContext = $this->buildAmqpContext();
- $this->removeQueue('default');
+ $this->removeQueue('enqueue.app.default');
}
public function testProduceAndConsumeOneMessage()
{
+ $config = [
+ 'transport' => [
+ 'rabbitmq_amqp' => [
+ 'host' => getenv('SYMFONY__RABBITMQ__HOST'),
+ 'port' => getenv('SYMFONY__RABBITMQ__AMQP__PORT'),
+ 'login' => getenv('SYMFONY__RABBITMQ__USER'),
+ 'password' => getenv('SYMFONY__RABBITMQ__PASSWORD'),
+ 'vhost' => getenv('SYMFONY__RABBITMQ__VHOST'),
+ ],
+ ],
+ ];
+
$requestMessage = null;
- $client = new SimpleClient($this->context);
+ $client = new SimpleClient($config);
+ $client->setupBroker();
$client->bind('foo_topic', 'foo_processor', function (PsrMessage $message, PsrContext $context) use (&$requestMessage) {
$requestMessage = $message;
diff --git a/pkg/enqueue/Tests/Functional/Client/SimpleClientTest.php b/pkg/enqueue/Tests/Functional/Client/SimpleClientTest.php
deleted file mode 100644
index f96ba9571..000000000
--- a/pkg/enqueue/Tests/Functional/Client/SimpleClientTest.php
+++ /dev/null
@@ -1,83 +0,0 @@
-context = $this->buildAmqpContext();
-
- $this->removeQueue('default');
- }
-
- public function testProduceAndConsumeOneMessage()
- {
- $actualMessage = null;
-
- $client = new SimpleClient($this->context);
- $client->bind('foo_topic', 'foo_processor', function (PsrMessage $message) use (&$actualMessage) {
- $actualMessage = $message;
-
- return Result::ACK;
- });
-
- $client->send('foo_topic', 'Hello there!');
-
- $client->consume(new ChainExtension([
- new LimitConsumptionTimeExtension(new \DateTime('+5sec')),
- new LimitConsumedMessagesExtension(2),
- ]));
-
- $this->assertInstanceOf(PsrMessage::class, $actualMessage);
- $this->assertSame('Hello there!', $actualMessage->getBody());
- }
-
- public function testProduceAndRouteToTwoConsumes()
- {
- $received = 0;
-
- $client = new SimpleClient($this->context);
- $client->bind('foo_topic', 'foo_processor1', function () use (&$received) {
- ++$received;
-
- return Result::ACK;
- });
- $client->bind('foo_topic', 'foo_processor2', function () use (&$received) {
- ++$received;
-
- return Result::ACK;
- });
-
- $client->send('foo_topic', 'Hello there!');
-
- $client->consume(new ChainExtension([
- new LimitConsumptionTimeExtension(new \DateTime('+5sec')),
- new LimitConsumedMessagesExtension(3),
- ]));
-
- $this->assertSame(2, $received);
- }
-}
diff --git a/pkg/enqueue/composer.json b/pkg/enqueue/composer.json
index cfa0e9ca1..0f8e11f75 100644
--- a/pkg/enqueue/composer.json
+++ b/pkg/enqueue/composer.json
@@ -21,7 +21,8 @@
"symfony/dependency-injection": "^2.8|^3",
"symfony/config": "^2.8|^3",
"enqueue/null": "^0.3",
- "enqueue/test": "^0.3"
+ "enqueue/test": "^0.3",
+ "enqueue/simple-client": "^0.3"
},
"suggest": {
"symfony/console": "^2.8|^3 If you want to use li commands",
@@ -31,7 +32,8 @@
"enqueue/stomp": "STOMP transport",
"enqueue/fs": "Filesystem transport",
"enqueue/redis": "Redis transport",
- "enqueue/dbal": "Doctrine DBAL transport"
+ "enqueue/dbal": "Doctrine DBAL transport",
+ "enqueue/sqs": "Amazon AWS SQS transport"
},
"autoload": {
"psr-4": { "Enqueue\\": "" },
diff --git a/pkg/simple-client/.gitignore b/pkg/simple-client/.gitignore
new file mode 100644
index 000000000..a770439e5
--- /dev/null
+++ b/pkg/simple-client/.gitignore
@@ -0,0 +1,6 @@
+*~
+/composer.lock
+/composer.phar
+/phpunit.xml
+/vendor/
+/.idea/
diff --git a/pkg/simple-client/.travis.yml b/pkg/simple-client/.travis.yml
new file mode 100644
index 000000000..42374ddc7
--- /dev/null
+++ b/pkg/simple-client/.travis.yml
@@ -0,0 +1,21 @@
+sudo: false
+
+git:
+ depth: 1
+
+language: php
+
+php:
+ - '5.6'
+ - '7.0'
+
+cache:
+ directories:
+ - $HOME/.composer/cache
+
+install:
+ - composer self-update
+ - composer install --prefer-source
+
+script:
+ - vendor/bin/phpunit --exclude-group=functional
diff --git a/pkg/simple-client/LICENSE b/pkg/simple-client/LICENSE
new file mode 100644
index 000000000..70fa75252
--- /dev/null
+++ b/pkg/simple-client/LICENSE
@@ -0,0 +1,19 @@
+Copyright (c) 2017 Kotliar Maksym
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is furnished
+to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+THE SOFTWARE.
\ No newline at end of file
diff --git a/pkg/simple-client/README.md b/pkg/simple-client/README.md
new file mode 100644
index 000000000..d842aa27d
--- /dev/null
+++ b/pkg/simple-client/README.md
@@ -0,0 +1 @@
+# Message Queue Simple Client.
diff --git a/pkg/simple-client/SimpleClient.php b/pkg/simple-client/SimpleClient.php
new file mode 100644
index 000000000..5879b969d
--- /dev/null
+++ b/pkg/simple-client/SimpleClient.php
@@ -0,0 +1,293 @@
+ [
+ * 'default' => 'amqp',
+ * 'amqp' => [],
+ * ....
+ * ],
+ * 'client' => [
+ * 'prefix' => 'enqueue',
+ * 'app_name' => 'app',
+ * 'router_topic' => 'router',
+ * 'router_queue' => 'default',
+ * 'default_processor_queue' => 'default',
+ * 'redelivered_delay_time' => 0
+ * ],
+ * 'extensions' => [
+ * 'signal_extension' => true,
+ * ]
+ * ]
+ *
+ *
+ * @param string|array $config
+ */
+ public function __construct($config)
+ {
+ $this->container = $this->buildContainer($config);
+ }
+
+ /**
+ * @param array|string $config
+ *
+ * @return ContainerBuilder
+ */
+ private function buildContainer($config)
+ {
+ $config = $this->buildConfig($config);
+ $extension = $this->buildContainerExtension($config);
+
+ $container = new ContainerBuilder();
+ $container->registerExtension($extension);
+ $container->loadFromExtension($extension->getAlias(), $config);
+
+ $container->compile();
+
+ return $container;
+ }
+
+ /**
+ * @param array $config
+ *
+ * @return SimpleClientContainerExtension
+ */
+ private function buildContainerExtension($config)
+ {
+ $map = [
+ 'default' => DefaultTransportFactory::class,
+ 'amqp' => AmqpTransportFactory::class,
+ 'rabbitmq_amqp' => RabbitMqAmqpTransportFactory::class,
+ 'dbal' => DbalTransportFactory::class,
+ 'fs' => FsTransportFactory::class,
+ 'redis' => RedisTransportFactory::class,
+ 'stomp' => StompTransportFactory::class,
+ 'rabbitmq_stomp' => RabbitMqStompTransportFactory::class,
+ 'sqs' => SqsTransportFactory::class,
+ ];
+
+ $extension = new SimpleClientContainerExtension();
+
+ foreach (array_keys($config['transport']) as $transport) {
+ if (false == isset($map[$transport])) {
+ throw new \LogicException(sprintf('Transport is not supported: "%s"', $transport));
+ }
+
+ $extension->addTransportFactory(new $map[$transport]);
+ }
+
+ return $extension;
+ }
+
+ /**
+ * @param array|string $config
+ *
+ * @return array
+ */
+ private function buildConfig($config)
+ {
+ if (is_string($config)) {
+ $extConfig = [
+ 'client' => [],
+ 'transport' => [
+ 'default' => $config,
+ $config => [],
+ ],
+ ];
+ } elseif (is_array($config)) {
+ $extConfig = array_merge_recursive([
+ 'client' => [],
+ 'transport' => [],
+ ], $config);
+ } else {
+ throw new \LogicException('Expects config is string or array');
+ }
+
+ if (empty($extConfig['transport']['default'])) {
+ $defaultTransport = null;
+ foreach ($extConfig['transport'] as $transport => $config) {
+ if ('default' === $transport) {
+ continue;
+ }
+
+ $defaultTransport = $transport;
+ break;
+ }
+
+ if (false == $defaultTransport) {
+ throw new \LogicException('There is no transport configured');
+ }
+
+ $extConfig['transport']['default'] = $defaultTransport;
+ }
+
+ return $extConfig;
+ }
+
+ /**
+ * @param string $topic
+ * @param string $processorName
+ * @param callback $processor
+ */
+ public function bind($topic, $processorName, callable $processor)
+ {
+ $queueName = $this->getConfig()->getDefaultProcessorQueueName();
+
+ $this->getTopicMetaRegistry()->addProcessor($topic, $processorName);
+ $this->getQueueMetaRegistry()->addProcessor($queueName, $processorName);
+ $this->getProcessorRegistry()->add($processorName, new CallbackProcessor($processor));
+ $this->getRouterProcessor()->add($topic, $queueName, $processorName);
+ }
+
+ /**
+ * @param string $topic
+ * @param string|array $message
+ * @param bool $setupBroker
+ */
+ public function send($topic, $message, $setupBroker = false)
+ {
+ $this->getProducer($setupBroker)->send($topic, $message);
+ }
+
+ /**
+ * @param ExtensionInterface|null $runtimeExtension
+ */
+ public function consume(ExtensionInterface $runtimeExtension = null)
+ {
+ $this->setupBroker();
+ $processor = $this->getDelegateProcessor();
+ $queueConsumer = $this->getQueueConsumer();
+
+ $defaultQueueName = $this->getConfig()->getDefaultProcessorQueueName();
+ $defaultTransportQueueName = $this->getConfig()->createTransportQueueName($defaultQueueName);
+
+ $queueConsumer->bind($defaultTransportQueueName, $processor);
+ if ($this->getConfig()->getRouterQueueName() != $defaultQueueName) {
+ $routerTransportQueueName = $this->getConfig()->createTransportQueueName($this->getConfig()->getRouterQueueName());
+
+ $queueConsumer->bind($routerTransportQueueName, $processor);
+ }
+
+ $queueConsumer->consume($runtimeExtension);
+ }
+
+ /**
+ * @return PsrContext
+ */
+ public function getContext()
+ {
+ return $this->container->get('enqueue.transport.context');
+ }
+
+ /**
+ * @return QueueConsumer
+ */
+ public function getQueueConsumer()
+ {
+ return $this->container->get('enqueue.client.queue_consumer');
+ }
+
+ /**
+ * @return Config
+ */
+ public function getConfig()
+ {
+ return $this->container->get('enqueue.client.config');
+ }
+
+ /**
+ * @return DriverInterface
+ */
+ public function getDriver()
+ {
+ return $this->container->get('enqueue.client.driver');
+ }
+
+ /**
+ * @return TopicMetaRegistry
+ */
+ public function getTopicMetaRegistry()
+ {
+ return $this->container->get('enqueue.client.meta.topic_meta_registry');
+ }
+
+ /**
+ * @return QueueMetaRegistry
+ */
+ public function getQueueMetaRegistry()
+ {
+ return $this->container->get('enqueue.client.meta.queue_meta_registry');
+ }
+
+ /**
+ * @param bool $setupBroker
+ *
+ * @return ProducerInterface
+ */
+ public function getProducer($setupBroker = false)
+ {
+ $setupBroker && $this->setupBroker();
+
+ return $this->container->get('enqueue.client.producer');
+ }
+
+ public function setupBroker()
+ {
+ $this->getDriver()->setupBroker();
+ }
+
+ /**
+ * @return ArrayProcessorRegistry
+ */
+ public function getProcessorRegistry()
+ {
+ return $this->container->get('enqueue.client.processor_registry');
+ }
+
+ /**
+ * @return DelegateProcessor
+ */
+ public function getDelegateProcessor()
+ {
+ return $this->container->get('enqueue.client.delegate_processor');
+ }
+
+ /**
+ * @return RouterProcessor
+ */
+ public function getRouterProcessor()
+ {
+ return $this->container->get('enqueue.client.router_processor');
+ }
+}
diff --git a/pkg/simple-client/SimpleClientContainerExtension.php b/pkg/simple-client/SimpleClientContainerExtension.php
new file mode 100644
index 000000000..0784cfb17
--- /dev/null
+++ b/pkg/simple-client/SimpleClientContainerExtension.php
@@ -0,0 +1,172 @@
+factories = [];
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function getAlias()
+ {
+ return 'enqueue';
+ }
+
+ /**
+ * @return NodeInterface
+ */
+ private function createConfiguration()
+ {
+ $tb = new TreeBuilder();
+ $rootNode = $tb->root('enqueue');
+
+ $transportChildren = $rootNode->children()
+ ->arrayNode('transport')->isRequired()->children();
+
+ foreach ($this->factories as $factory) {
+ $factory->addConfiguration(
+ $transportChildren->arrayNode($factory->getName())
+ );
+ }
+
+ $rootNode->children()
+ ->arrayNode('client')->children()
+ ->scalarNode('prefix')->defaultValue('enqueue')->end()
+ ->scalarNode('app_name')->defaultValue('app')->end()
+ ->scalarNode('router_topic')->defaultValue('router')->cannotBeEmpty()->end()
+ ->scalarNode('router_queue')->defaultValue(Config::DEFAULT_PROCESSOR_QUEUE_NAME)->cannotBeEmpty()->end()
+ ->scalarNode('default_processor_queue')->defaultValue(Config::DEFAULT_PROCESSOR_QUEUE_NAME)->cannotBeEmpty()->end()
+ ->integerNode('redelivered_delay_time')->min(0)->defaultValue(0)->end()
+ ->end()->end()
+ ->arrayNode('extensions')->addDefaultsIfNotSet()->children()
+ ->booleanNode('signal_extension')->defaultValue(function_exists('pcntl_signal_dispatch'))->end()
+ ->end()->end()
+ ;
+
+ return $tb->buildTree();
+ }
+
+ /**
+ * @param TransportFactoryInterface $transportFactory
+ */
+ public function addTransportFactory(TransportFactoryInterface $transportFactory)
+ {
+ $name = $transportFactory->getName();
+
+ if (empty($name)) {
+ throw new \LogicException('Transport factory name cannot be empty');
+ }
+ if (array_key_exists($name, $this->factories)) {
+ throw new \LogicException(sprintf('Transport factory with such name already added. Name %s', $name));
+ }
+
+ $this->factories[$name] = $transportFactory;
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function load(array $configs, ContainerBuilder $container)
+ {
+ $configProcessor = new Processor();
+ $config = $configProcessor->process($this->createConfiguration(), $configs);
+
+ foreach ($config['transport'] as $name => $transportConfig) {
+ $this->factories[$name]->createConnectionFactory($container, $transportConfig);
+ $this->factories[$name]->createContext($container, $transportConfig);
+ $this->factories[$name]->createDriver($container, $transportConfig);
+ }
+
+ $container->register('enqueue.client.config', Config::class)
+ ->setArguments([
+ $config['client']['prefix'],
+ $config['client']['app_name'],
+ $config['client']['router_topic'],
+ $config['client']['router_queue'],
+ $config['client']['default_processor_queue'],
+ 'enqueue.client.router_processor',
+ $config['transport'][$config['transport']['default']['alias']],
+ ]);
+
+ $container->register('enqueue.client.producer', Producer::class)
+ ->setArguments([
+ new Reference('enqueue.client.driver')
+ ]);
+
+ $container->register('enqueue.client.meta.topic_meta_registry', TopicMetaRegistry::class)
+ ->setArguments([[]]);
+
+ $container->register('enqueue.client.meta.queue_meta_registry', QueueMetaRegistry::class)
+ ->setArguments([
+ new Reference('enqueue.client.config'),
+ [],
+ ]);
+
+ $container->register('enqueue.client.processor_registry', ArrayProcessorRegistry::class);
+
+ $container->register('enqueue.client.delegate_processor', DelegateProcessor::class)
+ ->setArguments([new Reference('enqueue.client.processor_registry')]);
+
+ $container->register('enqueue.client.queue_consumer', QueueConsumer::class)
+ ->setArguments([
+ new Reference('enqueue.transport.context'),
+ new Reference('enqueue.consumption.extensions')
+ ]);
+
+ // router
+ $container->register('enqueue.client.router_processor', RouterProcessor::class)
+ ->setArguments([new Reference('enqueue.client.driver'), []]);
+ $container->getDefinition('enqueue.client.processor_registry')
+ ->addMethodCall('add', ['enqueue.client.router_processor', new Reference('enqueue.client.router_processor')]);
+ $container->getDefinition('enqueue.client.meta.queue_meta_registry')
+ ->addMethodCall('addProcessor', [$config['client']['router_queue'], 'enqueue.client.router_processor']);
+
+ // extensions
+ $extensions = [];
+ if ($config['client']['redelivered_delay_time']) {
+ $container->register('enqueue.client.delay_redelivered_message_extension', DelayRedeliveredMessageExtension::class)
+ ->setArguments([
+ new Reference('enqueue.client.driver'),
+ $config['client']['redelivered_delay_time']
+ ]);
+
+ $extensions[] = new Reference('enqueue.client.delay_redelivered_message_extension');
+ }
+
+ $container->register('enqueue.client.extension.set_router_properties', SetRouterPropertiesExtension::class)
+ ->setArguments([new Reference('enqueue.client.driver')]);
+
+ $extensions[] = new Reference('enqueue.client.extension.set_router_properties');
+
+ $container->register('enqueue.consumption.extensions', ConsumptionChainExtension::class)
+ ->setArguments([$extensions]);
+ }
+}
diff --git a/pkg/simple-client/Tests/Functional/SimpleClientTest.php b/pkg/simple-client/Tests/Functional/SimpleClientTest.php
new file mode 100644
index 000000000..7cef59d54
--- /dev/null
+++ b/pkg/simple-client/Tests/Functional/SimpleClientTest.php
@@ -0,0 +1,113 @@
+removeQueue('enqueue.app.default');
+ }
+
+ public function transportConfigDataProvider()
+ {
+ $amqp = [
+ 'transport' => [
+ 'amqp' => [
+ 'host' => getenv('SYMFONY__RABBITMQ__HOST'),
+ 'port' => getenv('SYMFONY__RABBITMQ__AMQP__PORT'),
+ 'login' => getenv('SYMFONY__RABBITMQ__USER'),
+ 'password' => getenv('SYMFONY__RABBITMQ__PASSWORD'),
+ 'vhost' => getenv('SYMFONY__RABBITMQ__VHOST'),
+ ],
+ ],
+ ];
+
+ $rabbitmqAmqp = [
+ 'transport' => [
+ 'rabbitmq_amqp' => [
+ 'host' => getenv('SYMFONY__RABBITMQ__HOST'),
+ 'port' => getenv('SYMFONY__RABBITMQ__AMQP__PORT'),
+ 'login' => getenv('SYMFONY__RABBITMQ__USER'),
+ 'password' => getenv('SYMFONY__RABBITMQ__PASSWORD'),
+ 'vhost' => getenv('SYMFONY__RABBITMQ__VHOST'),
+ ],
+ ],
+ ];
+
+ return [[$amqp, $rabbitmqAmqp]];
+ }
+
+ /**
+ * @dataProvider transportConfigDataProvider
+ */
+ public function testProduceAndConsumeOneMessage($config)
+ {
+ $actualMessage = null;
+
+ $client = new SimpleClient($config);
+ $client->bind('foo_topic', 'foo_processor', function (PsrMessage $message) use (&$actualMessage) {
+ $actualMessage = $message;
+
+ return Result::ACK;
+ });
+
+ $client->send('foo_topic', 'Hello there!', true);
+
+ $client->consume(new ChainExtension([
+ new LimitConsumptionTimeExtension(new \DateTime('+5sec')),
+ new LimitConsumedMessagesExtension(2),
+ ]));
+
+ $this->assertInstanceOf(PsrMessage::class, $actualMessage);
+ $this->assertSame('Hello there!', $actualMessage->getBody());
+ }
+
+ /**
+ * @dataProvider transportConfigDataProvider
+ */
+ public function testProduceAndRouteToTwoConsumes($config)
+ {
+ $received = 0;
+
+ $client = new SimpleClient($config);
+ $client->bind('foo_topic', 'foo_processor1', function () use (&$received) {
+ ++$received;
+
+ return Result::ACK;
+ });
+ $client->bind('foo_topic', 'foo_processor2', function () use (&$received) {
+ ++$received;
+
+ return Result::ACK;
+ });
+
+ $client->send('foo_topic', 'Hello there!', true);
+
+ $client->consume(new ChainExtension([
+ new LimitConsumptionTimeExtension(new \DateTime('+5sec')),
+ new LimitConsumedMessagesExtension(3),
+ ]));
+
+ $this->assertSame(2, $received);
+ }
+}
diff --git a/pkg/simple-client/composer.json b/pkg/simple-client/composer.json
new file mode 100644
index 000000000..7df693ba2
--- /dev/null
+++ b/pkg/simple-client/composer.json
@@ -0,0 +1,36 @@
+{
+ "name": "enqueue/simple-client",
+ "type": "library",
+ "description": "Message Queue Simple Client",
+ "keywords": ["messaging", "queue", "amqp", "rabbitmq"],
+ "license": "MIT",
+ "repositories": [
+ {
+ "type": "vcs",
+ "url": "git@github.com:php-enqueue/test.git"
+ }
+ ],
+ "require": {
+ "php": ">=5.6",
+ "enqueue/enqueue": "^0.3",
+ "symfony/dependency-injection": "^2.8|^3",
+ "symfony/config": "^2.8|^3",
+ "symfony/console": "^2.8|^3"
+ },
+ "require-dev": {
+ "phpunit/phpunit": "~5.5",
+ "enqueue/test": "^0.3"
+ },
+ "autoload": {
+ "psr-4": { "Enqueue\\SimpleClient\\": "" },
+ "exclude-from-classmap": [
+ "/Tests/"
+ ]
+ },
+ "minimum-stability": "dev",
+ "extra": {
+ "branch-alias": {
+ "dev-master": "0.3.x-dev"
+ }
+ }
+}
diff --git a/pkg/simple-client/phpunit.xml.dist b/pkg/simple-client/phpunit.xml.dist
new file mode 100644
index 000000000..e86476dec
--- /dev/null
+++ b/pkg/simple-client/phpunit.xml.dist
@@ -0,0 +1,31 @@
+
+
+
+
+
+
+ ./Tests
+
+
+
+
+
+ .
+
+ ./vendor
+ ./Resources
+ ./Tests
+
+
+
+
diff --git a/pkg/stomp/Symfony/StompTransportFactory.php b/pkg/stomp/Symfony/StompTransportFactory.php
index d3d30ecf6..c7ed2f4ec 100644
--- a/pkg/stomp/Symfony/StompTransportFactory.php
+++ b/pkg/stomp/Symfony/StompTransportFactory.php
@@ -84,6 +84,7 @@ public function createDriver(ContainerBuilder $container, array $config)
$driver->setArguments([
new Reference(sprintf('enqueue.transport.%s.context', $this->getName())),
new Reference('enqueue.client.config'),
+ new Reference('enqueue.client.meta.queue_meta_registry'),
]);
$driverId = sprintf('enqueue.client.%s.driver', $this->getName());