diff --git a/bin/release b/bin/release index 511e3053b..87613ea64 100755 --- a/bin/release +++ b/bin/release @@ -13,7 +13,7 @@ fi CURRENT_BRANCH=`git rev-parse --abbrev-ref HEAD` -for REMOTE in origin psr-queue stomp amqp-ext sqs fs redis dbal null enqueue enqueue-bundle job-queue test +for REMOTE in origin psr-queue stomp amqp-ext sqs fs redis dbal null enqueue simple-client enqueue-bundle job-queue test do TMP_DIR="/tmp/enqueue-repo" REMOTE_URL=`git remote get-url $REMOTE` diff --git a/bin/subtree-split b/bin/subtree-split index 53e46cce1..6dbdc0193 100755 --- a/bin/subtree-split +++ b/bin/subtree-split @@ -45,6 +45,7 @@ function remote() remote psr-queue git@github.com:php-enqueue/psr-queue.git remote enqueue git@github.com:php-enqueue/enqueue.git +remote simple-client git@github.com:php-enqueue/simple-client.git remote stomp git@github.com:php-enqueue/stomp.git remote amqp-ext git@github.com:php-enqueue/amqp-ext.git remote fs git@github.com:php-enqueue/fs.git @@ -58,6 +59,7 @@ remote test git@github.com:php-enqueue/test.git split 'pkg/psr-queue' psr-queue split 'pkg/enqueue' enqueue +split 'pkg/simple-client' simple-client split 'pkg/stomp' stomp split 'pkg/amqp-ext' amqp-ext split 'pkg/fs' fs diff --git a/composer.json b/composer.json index 2afbec114..5730bb920 100644 --- a/composer.json +++ b/composer.json @@ -15,6 +15,7 @@ "enqueue/sqs": "*@dev", "enqueue/enqueue-bundle": "*@dev", "enqueue/job-queue": "*@dev", + "enqueue/simple-client": "*@dev", "enqueue/test": "*@dev", "phpunit/phpunit": "^5", @@ -77,6 +78,10 @@ { "type": "path", "url": "pkg/sqs" + }, + { + "type": "path", + "url": "pkg/simple-client" } ] } diff --git a/docs/client/quick_tour.md b/docs/client/quick_tour.md new file mode 100644 index 000000000..107f2bbaa --- /dev/null +++ b/docs/client/quick_tour.md @@ -0,0 +1,123 @@ +# Simple client. Quick tour. + +The simple client library takes Enqueue client classes and Symfony components and makes an easy to use client facade. +It reduces the boiler plate code you have to write to start using the Enqueue client features. + +* [Install](#install) +* [Configure](#configure) +* [Producer message](#produce-message) +* [Consume messages](#consume-messages) + +## Install + +```bash +$ composer require enqueue/simple-client enqueue/amqp-ext +``` + +## Configure + +```php + [ + 'default' => 'amqp', + 'amqp' => [ + 'host' => 'localhost', + 'port' => 5672, + 'vhost' => '/', + 'login' => 'guest', + 'password' => 'guest', + ], + ], + 'client' => [ + 'app_name' => 'plain_php', + ], +]); +``` + +## Produce message + +```php +send('a_bar_topic', 'aMessageData'); + +// or an array + +$client->send('a_bar_topic', ['foo', 'bar']); + +// or an json serializable object +$client->send('a_bar_topic', new class() implements \JsonSerializable { + public function jsonSerialize() { + return ['foo', 'bar']; + } +}); +``` + +## Consume messages + +```php +bind('a_bar_topic', 'a_processor_name', function(PsrMessage $psrMessage) { + // processing logic here +}); + +$client->consume(); +``` + +## Cli commands + +```php +#!/usr/bin/env php +add(new SetupBrokerCommand($client->getDriver())); +$application->add(new ProduceMessageCommand($client->getProducer())); +$application->add(new QueuesCommand($client->getQueueMetaRegistry())); +$application->add(new TopicsCommand($client->getTopicMetaRegistry())); +$application->add(new ConsumeMessagesCommand( + $client->getQueueConsumer(), + $client->getDelegateProcessor(), + $client->getQueueMetaRegistry(), + $client->getDriver() +)); + +$application->run(); +``` + +and run to see what is there: + +```bash +$ php bin/enqueue.php +``` + +or consume messages + +```bash +$ php bin/enqueue.php enqueue:consume -vvv --setup-broker +``` + +[back to index](../index.md) diff --git a/docs/client/rpc_call.md b/docs/client/rpc_call.md index 460df5943..cccd4f602 100644 --- a/docs/client/rpc_call.md +++ b/docs/client/rpc_call.md @@ -1,5 +1,7 @@ # Client. RPC call +The client's [quick tour](quick_tour.md) describes how to get the client object. +We use you followed instructions there and have instance of `Enqueue\SimpleClient\SimpleClient` in `$client` var. ## The client side @@ -8,13 +10,10 @@ It allows you to easily send a message and wait for a reply. ```php getProducer(), $context); $replyMessage = $rpcClient->call('greeting_topic', 'Hi Thomas!', 5); @@ -24,13 +23,10 @@ You can perform several requests asynchronously with `callAsync` and request rep ```php getProducer(), $context); $promises = []; @@ -53,7 +49,6 @@ Of course it is possible to implement rpc server side based on transport classes ```php context); +/** @var \Enqueue\SimpleClient\SimpleClient $client */ + $client->bind('greeting_topic', 'greeting_processor', function (PsrMessage $message, PsrContext $context) use (&$requestMessage) { echo $message->getBody(); diff --git a/docs/index.md b/docs/index.md index e5c3ee007..d8c9258cc 100644 --- a/docs/index.md +++ b/docs/index.md @@ -13,6 +13,7 @@ - [Extensions](consumption/extensions.md) - [Message processor](consumption/message_processor.md) * Client + - [Quick tour](client/quick_tour.md) - [Message examples](client/message_examples.md) - [Supported brokers](client/supported_brokers.md) - [Message bus](client/message_bus.md) diff --git a/docs/quick_tour.md b/docs/quick_tour.md index 75f8ba00e..d7749b73a 100644 --- a/docs/quick_tour.md +++ b/docs/quick_tour.md @@ -167,20 +167,30 @@ Here's an example of how you can send and consume messages. ```php bind('foo_topic', 'processor_name', function (PsrMessage $message) { - // process message - - return PsrProcessor::ACK; +$client = new SimpleClient([ + 'transport' => [ + 'default' => 'amqp', + 'amqp' => [ + 'host' => 'localhost', + 'port' => 5672, + 'vhost' => '/', + 'login' => 'guest', + 'password' => 'guest', + ], + ], + 'client' => true, +]); + +$client->setupBroker(); + +$client->bind('a_foo_topic', 'fooProcessor', function(PsrMessage $message) { + // your processing logic here }); -$client->send('foo_topic', 'Hello there!'); +$client->send('a_bar_topic', 'aMessageData'); // in another process you can consume messages. $client->consume(); diff --git a/phpunit.xml.dist b/phpunit.xml.dist index f2faadb8e..429ead9d2 100644 --- a/phpunit.xml.dist +++ b/phpunit.xml.dist @@ -56,6 +56,10 @@ pkg/job-queue/Tests + + + pkg/simple-client/Tests + diff --git a/pkg/enqueue/Client/SimpleClient.php b/pkg/enqueue/Client/SimpleClient.php deleted file mode 100644 index 27f295cdb..000000000 --- a/pkg/enqueue/Client/SimpleClient.php +++ /dev/null @@ -1,174 +0,0 @@ -context = $context; - $this->config = $config ?: Config::create(); - - $this->queueMetaRegistry = new QueueMetaRegistry($this->config, []); - $this->queueMetaRegistry->add($this->config->getDefaultProcessorQueueName()); - $this->queueMetaRegistry->add($this->config->getRouterQueueName()); - - $this->topicsMetaRegistry = new TopicMetaRegistry([]); - $this->processorsRegistry = new ArrayProcessorRegistry(); - - $this->driver = new AmqpDriver($context, $this->config, $this->queueMetaRegistry); - $this->routerProcessor = new RouterProcessor($this->driver, []); - - $this->processorsRegistry->add($this->config->getRouterProcessorName(), $this->routerProcessor); - $this->queueMetaRegistry->addProcessor($this->config->getRouterQueueName(), $this->routerProcessor); - } - - /** - * @param string $topic - * @param string $processorName - * @param callback $processor - */ - public function bind($topic, $processorName, callable $processor) - { - $queueName = $this->config->getDefaultProcessorQueueName(); - - $this->topicsMetaRegistry->addProcessor($topic, $processorName); - $this->queueMetaRegistry->addProcessor($queueName, $processorName); - $this->processorsRegistry->add($processorName, new CallbackProcessor($processor)); - - $this->routerProcessor->add($topic, $queueName, $processorName); - } - - public function send($topic, $message) - { - $this->getProducer()->send($topic, $message); - } - - public function consume(ExtensionInterface $runtimeExtension = null) - { - $this->driver->setupBroker(); - - $processor = $this->getProcessor(); - - $queueConsumer = $this->getQueueConsumer(); - - $defaultQueueName = $this->config->getDefaultProcessorQueueName(); - $defaultTransportQueueName = $this->config->createTransportQueueName($defaultQueueName); - - $queueConsumer->bind($defaultTransportQueueName, $processor); - if ($this->config->getRouterQueueName() != $defaultQueueName) { - $routerTransportQueueName = $this->config->createTransportQueueName($this->config->getRouterQueueName()); - - $queueConsumer->bind($routerTransportQueueName, $processor); - } - - $queueConsumer->consume($runtimeExtension); - } - - /** - * @return AmqpContext - */ - public function getContext() - { - return $this->context; - } - - /** - * @return QueueConsumer - */ - public function getQueueConsumer() - { - return new QueueConsumer($this->context, new ChainExtension([ - new SetRouterPropertiesExtension($this->driver), - ])); - } - - /** - * @return DriverInterface - */ - public function getDriver() - { - return $this->driver; - } - - /** - * @return TopicMetaRegistry - */ - public function getTopicMetaRegistry() - { - return $this->topicsMetaRegistry; - } - - /** - * @return QueueMetaRegistry - */ - public function getQueueMetaRegistry() - { - return $this->queueMetaRegistry; - } - - /** - * @return ProducerInterface - */ - public function getProducer() - { - $this->driver->setupBroker(); - - return new Producer($this->driver); - } - - /** - * @return DelegateProcessor - */ - public function getProcessor() - { - return new DelegateProcessor($this->processorsRegistry); - } -} diff --git a/pkg/enqueue/Tests/Functional/Client/RpcClientTest.php b/pkg/enqueue/Tests/Functional/Client/RpcClientTest.php index bf7d69e13..49f6fa20a 100644 --- a/pkg/enqueue/Tests/Functional/Client/RpcClientTest.php +++ b/pkg/enqueue/Tests/Functional/Client/RpcClientTest.php @@ -4,7 +4,7 @@ use Enqueue\AmqpExt\AmqpContext; use Enqueue\Client\RpcClient; -use Enqueue\Client\SimpleClient; +use Enqueue\SimpleClient\SimpleClient; use Enqueue\Consumption\ChainExtension; use Enqueue\Consumption\Extension\LimitConsumedMessagesExtension; use Enqueue\Consumption\Extension\LimitConsumptionTimeExtension; @@ -39,14 +39,27 @@ public function setUp() $this->context = $this->buildAmqpContext(); $this->replyContext = $this->buildAmqpContext(); - $this->removeQueue('default'); + $this->removeQueue('enqueue.app.default'); } public function testProduceAndConsumeOneMessage() { + $config = [ + 'transport' => [ + 'rabbitmq_amqp' => [ + 'host' => getenv('SYMFONY__RABBITMQ__HOST'), + 'port' => getenv('SYMFONY__RABBITMQ__AMQP__PORT'), + 'login' => getenv('SYMFONY__RABBITMQ__USER'), + 'password' => getenv('SYMFONY__RABBITMQ__PASSWORD'), + 'vhost' => getenv('SYMFONY__RABBITMQ__VHOST'), + ], + ], + ]; + $requestMessage = null; - $client = new SimpleClient($this->context); + $client = new SimpleClient($config); + $client->setupBroker(); $client->bind('foo_topic', 'foo_processor', function (PsrMessage $message, PsrContext $context) use (&$requestMessage) { $requestMessage = $message; diff --git a/pkg/enqueue/Tests/Functional/Client/SimpleClientTest.php b/pkg/enqueue/Tests/Functional/Client/SimpleClientTest.php deleted file mode 100644 index f96ba9571..000000000 --- a/pkg/enqueue/Tests/Functional/Client/SimpleClientTest.php +++ /dev/null @@ -1,83 +0,0 @@ -context = $this->buildAmqpContext(); - - $this->removeQueue('default'); - } - - public function testProduceAndConsumeOneMessage() - { - $actualMessage = null; - - $client = new SimpleClient($this->context); - $client->bind('foo_topic', 'foo_processor', function (PsrMessage $message) use (&$actualMessage) { - $actualMessage = $message; - - return Result::ACK; - }); - - $client->send('foo_topic', 'Hello there!'); - - $client->consume(new ChainExtension([ - new LimitConsumptionTimeExtension(new \DateTime('+5sec')), - new LimitConsumedMessagesExtension(2), - ])); - - $this->assertInstanceOf(PsrMessage::class, $actualMessage); - $this->assertSame('Hello there!', $actualMessage->getBody()); - } - - public function testProduceAndRouteToTwoConsumes() - { - $received = 0; - - $client = new SimpleClient($this->context); - $client->bind('foo_topic', 'foo_processor1', function () use (&$received) { - ++$received; - - return Result::ACK; - }); - $client->bind('foo_topic', 'foo_processor2', function () use (&$received) { - ++$received; - - return Result::ACK; - }); - - $client->send('foo_topic', 'Hello there!'); - - $client->consume(new ChainExtension([ - new LimitConsumptionTimeExtension(new \DateTime('+5sec')), - new LimitConsumedMessagesExtension(3), - ])); - - $this->assertSame(2, $received); - } -} diff --git a/pkg/enqueue/composer.json b/pkg/enqueue/composer.json index cfa0e9ca1..0f8e11f75 100644 --- a/pkg/enqueue/composer.json +++ b/pkg/enqueue/composer.json @@ -21,7 +21,8 @@ "symfony/dependency-injection": "^2.8|^3", "symfony/config": "^2.8|^3", "enqueue/null": "^0.3", - "enqueue/test": "^0.3" + "enqueue/test": "^0.3", + "enqueue/simple-client": "^0.3" }, "suggest": { "symfony/console": "^2.8|^3 If you want to use li commands", @@ -31,7 +32,8 @@ "enqueue/stomp": "STOMP transport", "enqueue/fs": "Filesystem transport", "enqueue/redis": "Redis transport", - "enqueue/dbal": "Doctrine DBAL transport" + "enqueue/dbal": "Doctrine DBAL transport", + "enqueue/sqs": "Amazon AWS SQS transport" }, "autoload": { "psr-4": { "Enqueue\\": "" }, diff --git a/pkg/simple-client/.gitignore b/pkg/simple-client/.gitignore new file mode 100644 index 000000000..a770439e5 --- /dev/null +++ b/pkg/simple-client/.gitignore @@ -0,0 +1,6 @@ +*~ +/composer.lock +/composer.phar +/phpunit.xml +/vendor/ +/.idea/ diff --git a/pkg/simple-client/.travis.yml b/pkg/simple-client/.travis.yml new file mode 100644 index 000000000..42374ddc7 --- /dev/null +++ b/pkg/simple-client/.travis.yml @@ -0,0 +1,21 @@ +sudo: false + +git: + depth: 1 + +language: php + +php: + - '5.6' + - '7.0' + +cache: + directories: + - $HOME/.composer/cache + +install: + - composer self-update + - composer install --prefer-source + +script: + - vendor/bin/phpunit --exclude-group=functional diff --git a/pkg/simple-client/LICENSE b/pkg/simple-client/LICENSE new file mode 100644 index 000000000..70fa75252 --- /dev/null +++ b/pkg/simple-client/LICENSE @@ -0,0 +1,19 @@ +Copyright (c) 2017 Kotliar Maksym + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is furnished +to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. \ No newline at end of file diff --git a/pkg/simple-client/README.md b/pkg/simple-client/README.md new file mode 100644 index 000000000..d842aa27d --- /dev/null +++ b/pkg/simple-client/README.md @@ -0,0 +1 @@ +# Message Queue Simple Client. diff --git a/pkg/simple-client/SimpleClient.php b/pkg/simple-client/SimpleClient.php new file mode 100644 index 000000000..5879b969d --- /dev/null +++ b/pkg/simple-client/SimpleClient.php @@ -0,0 +1,293 @@ + [ + * 'default' => 'amqp', + * 'amqp' => [], + * .... + * ], + * 'client' => [ + * 'prefix' => 'enqueue', + * 'app_name' => 'app', + * 'router_topic' => 'router', + * 'router_queue' => 'default', + * 'default_processor_queue' => 'default', + * 'redelivered_delay_time' => 0 + * ], + * 'extensions' => [ + * 'signal_extension' => true, + * ] + * ] + * + * + * @param string|array $config + */ + public function __construct($config) + { + $this->container = $this->buildContainer($config); + } + + /** + * @param array|string $config + * + * @return ContainerBuilder + */ + private function buildContainer($config) + { + $config = $this->buildConfig($config); + $extension = $this->buildContainerExtension($config); + + $container = new ContainerBuilder(); + $container->registerExtension($extension); + $container->loadFromExtension($extension->getAlias(), $config); + + $container->compile(); + + return $container; + } + + /** + * @param array $config + * + * @return SimpleClientContainerExtension + */ + private function buildContainerExtension($config) + { + $map = [ + 'default' => DefaultTransportFactory::class, + 'amqp' => AmqpTransportFactory::class, + 'rabbitmq_amqp' => RabbitMqAmqpTransportFactory::class, + 'dbal' => DbalTransportFactory::class, + 'fs' => FsTransportFactory::class, + 'redis' => RedisTransportFactory::class, + 'stomp' => StompTransportFactory::class, + 'rabbitmq_stomp' => RabbitMqStompTransportFactory::class, + 'sqs' => SqsTransportFactory::class, + ]; + + $extension = new SimpleClientContainerExtension(); + + foreach (array_keys($config['transport']) as $transport) { + if (false == isset($map[$transport])) { + throw new \LogicException(sprintf('Transport is not supported: "%s"', $transport)); + } + + $extension->addTransportFactory(new $map[$transport]); + } + + return $extension; + } + + /** + * @param array|string $config + * + * @return array + */ + private function buildConfig($config) + { + if (is_string($config)) { + $extConfig = [ + 'client' => [], + 'transport' => [ + 'default' => $config, + $config => [], + ], + ]; + } elseif (is_array($config)) { + $extConfig = array_merge_recursive([ + 'client' => [], + 'transport' => [], + ], $config); + } else { + throw new \LogicException('Expects config is string or array'); + } + + if (empty($extConfig['transport']['default'])) { + $defaultTransport = null; + foreach ($extConfig['transport'] as $transport => $config) { + if ('default' === $transport) { + continue; + } + + $defaultTransport = $transport; + break; + } + + if (false == $defaultTransport) { + throw new \LogicException('There is no transport configured'); + } + + $extConfig['transport']['default'] = $defaultTransport; + } + + return $extConfig; + } + + /** + * @param string $topic + * @param string $processorName + * @param callback $processor + */ + public function bind($topic, $processorName, callable $processor) + { + $queueName = $this->getConfig()->getDefaultProcessorQueueName(); + + $this->getTopicMetaRegistry()->addProcessor($topic, $processorName); + $this->getQueueMetaRegistry()->addProcessor($queueName, $processorName); + $this->getProcessorRegistry()->add($processorName, new CallbackProcessor($processor)); + $this->getRouterProcessor()->add($topic, $queueName, $processorName); + } + + /** + * @param string $topic + * @param string|array $message + * @param bool $setupBroker + */ + public function send($topic, $message, $setupBroker = false) + { + $this->getProducer($setupBroker)->send($topic, $message); + } + + /** + * @param ExtensionInterface|null $runtimeExtension + */ + public function consume(ExtensionInterface $runtimeExtension = null) + { + $this->setupBroker(); + $processor = $this->getDelegateProcessor(); + $queueConsumer = $this->getQueueConsumer(); + + $defaultQueueName = $this->getConfig()->getDefaultProcessorQueueName(); + $defaultTransportQueueName = $this->getConfig()->createTransportQueueName($defaultQueueName); + + $queueConsumer->bind($defaultTransportQueueName, $processor); + if ($this->getConfig()->getRouterQueueName() != $defaultQueueName) { + $routerTransportQueueName = $this->getConfig()->createTransportQueueName($this->getConfig()->getRouterQueueName()); + + $queueConsumer->bind($routerTransportQueueName, $processor); + } + + $queueConsumer->consume($runtimeExtension); + } + + /** + * @return PsrContext + */ + public function getContext() + { + return $this->container->get('enqueue.transport.context'); + } + + /** + * @return QueueConsumer + */ + public function getQueueConsumer() + { + return $this->container->get('enqueue.client.queue_consumer'); + } + + /** + * @return Config + */ + public function getConfig() + { + return $this->container->get('enqueue.client.config'); + } + + /** + * @return DriverInterface + */ + public function getDriver() + { + return $this->container->get('enqueue.client.driver'); + } + + /** + * @return TopicMetaRegistry + */ + public function getTopicMetaRegistry() + { + return $this->container->get('enqueue.client.meta.topic_meta_registry'); + } + + /** + * @return QueueMetaRegistry + */ + public function getQueueMetaRegistry() + { + return $this->container->get('enqueue.client.meta.queue_meta_registry'); + } + + /** + * @param bool $setupBroker + * + * @return ProducerInterface + */ + public function getProducer($setupBroker = false) + { + $setupBroker && $this->setupBroker(); + + return $this->container->get('enqueue.client.producer'); + } + + public function setupBroker() + { + $this->getDriver()->setupBroker(); + } + + /** + * @return ArrayProcessorRegistry + */ + public function getProcessorRegistry() + { + return $this->container->get('enqueue.client.processor_registry'); + } + + /** + * @return DelegateProcessor + */ + public function getDelegateProcessor() + { + return $this->container->get('enqueue.client.delegate_processor'); + } + + /** + * @return RouterProcessor + */ + public function getRouterProcessor() + { + return $this->container->get('enqueue.client.router_processor'); + } +} diff --git a/pkg/simple-client/SimpleClientContainerExtension.php b/pkg/simple-client/SimpleClientContainerExtension.php new file mode 100644 index 000000000..0784cfb17 --- /dev/null +++ b/pkg/simple-client/SimpleClientContainerExtension.php @@ -0,0 +1,172 @@ +factories = []; + } + + /** + * {@inheritdoc} + */ + public function getAlias() + { + return 'enqueue'; + } + + /** + * @return NodeInterface + */ + private function createConfiguration() + { + $tb = new TreeBuilder(); + $rootNode = $tb->root('enqueue'); + + $transportChildren = $rootNode->children() + ->arrayNode('transport')->isRequired()->children(); + + foreach ($this->factories as $factory) { + $factory->addConfiguration( + $transportChildren->arrayNode($factory->getName()) + ); + } + + $rootNode->children() + ->arrayNode('client')->children() + ->scalarNode('prefix')->defaultValue('enqueue')->end() + ->scalarNode('app_name')->defaultValue('app')->end() + ->scalarNode('router_topic')->defaultValue('router')->cannotBeEmpty()->end() + ->scalarNode('router_queue')->defaultValue(Config::DEFAULT_PROCESSOR_QUEUE_NAME)->cannotBeEmpty()->end() + ->scalarNode('default_processor_queue')->defaultValue(Config::DEFAULT_PROCESSOR_QUEUE_NAME)->cannotBeEmpty()->end() + ->integerNode('redelivered_delay_time')->min(0)->defaultValue(0)->end() + ->end()->end() + ->arrayNode('extensions')->addDefaultsIfNotSet()->children() + ->booleanNode('signal_extension')->defaultValue(function_exists('pcntl_signal_dispatch'))->end() + ->end()->end() + ; + + return $tb->buildTree(); + } + + /** + * @param TransportFactoryInterface $transportFactory + */ + public function addTransportFactory(TransportFactoryInterface $transportFactory) + { + $name = $transportFactory->getName(); + + if (empty($name)) { + throw new \LogicException('Transport factory name cannot be empty'); + } + if (array_key_exists($name, $this->factories)) { + throw new \LogicException(sprintf('Transport factory with such name already added. Name %s', $name)); + } + + $this->factories[$name] = $transportFactory; + } + + /** + * {@inheritdoc} + */ + public function load(array $configs, ContainerBuilder $container) + { + $configProcessor = new Processor(); + $config = $configProcessor->process($this->createConfiguration(), $configs); + + foreach ($config['transport'] as $name => $transportConfig) { + $this->factories[$name]->createConnectionFactory($container, $transportConfig); + $this->factories[$name]->createContext($container, $transportConfig); + $this->factories[$name]->createDriver($container, $transportConfig); + } + + $container->register('enqueue.client.config', Config::class) + ->setArguments([ + $config['client']['prefix'], + $config['client']['app_name'], + $config['client']['router_topic'], + $config['client']['router_queue'], + $config['client']['default_processor_queue'], + 'enqueue.client.router_processor', + $config['transport'][$config['transport']['default']['alias']], + ]); + + $container->register('enqueue.client.producer', Producer::class) + ->setArguments([ + new Reference('enqueue.client.driver') + ]); + + $container->register('enqueue.client.meta.topic_meta_registry', TopicMetaRegistry::class) + ->setArguments([[]]); + + $container->register('enqueue.client.meta.queue_meta_registry', QueueMetaRegistry::class) + ->setArguments([ + new Reference('enqueue.client.config'), + [], + ]); + + $container->register('enqueue.client.processor_registry', ArrayProcessorRegistry::class); + + $container->register('enqueue.client.delegate_processor', DelegateProcessor::class) + ->setArguments([new Reference('enqueue.client.processor_registry')]); + + $container->register('enqueue.client.queue_consumer', QueueConsumer::class) + ->setArguments([ + new Reference('enqueue.transport.context'), + new Reference('enqueue.consumption.extensions') + ]); + + // router + $container->register('enqueue.client.router_processor', RouterProcessor::class) + ->setArguments([new Reference('enqueue.client.driver'), []]); + $container->getDefinition('enqueue.client.processor_registry') + ->addMethodCall('add', ['enqueue.client.router_processor', new Reference('enqueue.client.router_processor')]); + $container->getDefinition('enqueue.client.meta.queue_meta_registry') + ->addMethodCall('addProcessor', [$config['client']['router_queue'], 'enqueue.client.router_processor']); + + // extensions + $extensions = []; + if ($config['client']['redelivered_delay_time']) { + $container->register('enqueue.client.delay_redelivered_message_extension', DelayRedeliveredMessageExtension::class) + ->setArguments([ + new Reference('enqueue.client.driver'), + $config['client']['redelivered_delay_time'] + ]); + + $extensions[] = new Reference('enqueue.client.delay_redelivered_message_extension'); + } + + $container->register('enqueue.client.extension.set_router_properties', SetRouterPropertiesExtension::class) + ->setArguments([new Reference('enqueue.client.driver')]); + + $extensions[] = new Reference('enqueue.client.extension.set_router_properties'); + + $container->register('enqueue.consumption.extensions', ConsumptionChainExtension::class) + ->setArguments([$extensions]); + } +} diff --git a/pkg/simple-client/Tests/Functional/SimpleClientTest.php b/pkg/simple-client/Tests/Functional/SimpleClientTest.php new file mode 100644 index 000000000..7cef59d54 --- /dev/null +++ b/pkg/simple-client/Tests/Functional/SimpleClientTest.php @@ -0,0 +1,113 @@ +removeQueue('enqueue.app.default'); + } + + public function transportConfigDataProvider() + { + $amqp = [ + 'transport' => [ + 'amqp' => [ + 'host' => getenv('SYMFONY__RABBITMQ__HOST'), + 'port' => getenv('SYMFONY__RABBITMQ__AMQP__PORT'), + 'login' => getenv('SYMFONY__RABBITMQ__USER'), + 'password' => getenv('SYMFONY__RABBITMQ__PASSWORD'), + 'vhost' => getenv('SYMFONY__RABBITMQ__VHOST'), + ], + ], + ]; + + $rabbitmqAmqp = [ + 'transport' => [ + 'rabbitmq_amqp' => [ + 'host' => getenv('SYMFONY__RABBITMQ__HOST'), + 'port' => getenv('SYMFONY__RABBITMQ__AMQP__PORT'), + 'login' => getenv('SYMFONY__RABBITMQ__USER'), + 'password' => getenv('SYMFONY__RABBITMQ__PASSWORD'), + 'vhost' => getenv('SYMFONY__RABBITMQ__VHOST'), + ], + ], + ]; + + return [[$amqp, $rabbitmqAmqp]]; + } + + /** + * @dataProvider transportConfigDataProvider + */ + public function testProduceAndConsumeOneMessage($config) + { + $actualMessage = null; + + $client = new SimpleClient($config); + $client->bind('foo_topic', 'foo_processor', function (PsrMessage $message) use (&$actualMessage) { + $actualMessage = $message; + + return Result::ACK; + }); + + $client->send('foo_topic', 'Hello there!', true); + + $client->consume(new ChainExtension([ + new LimitConsumptionTimeExtension(new \DateTime('+5sec')), + new LimitConsumedMessagesExtension(2), + ])); + + $this->assertInstanceOf(PsrMessage::class, $actualMessage); + $this->assertSame('Hello there!', $actualMessage->getBody()); + } + + /** + * @dataProvider transportConfigDataProvider + */ + public function testProduceAndRouteToTwoConsumes($config) + { + $received = 0; + + $client = new SimpleClient($config); + $client->bind('foo_topic', 'foo_processor1', function () use (&$received) { + ++$received; + + return Result::ACK; + }); + $client->bind('foo_topic', 'foo_processor2', function () use (&$received) { + ++$received; + + return Result::ACK; + }); + + $client->send('foo_topic', 'Hello there!', true); + + $client->consume(new ChainExtension([ + new LimitConsumptionTimeExtension(new \DateTime('+5sec')), + new LimitConsumedMessagesExtension(3), + ])); + + $this->assertSame(2, $received); + } +} diff --git a/pkg/simple-client/composer.json b/pkg/simple-client/composer.json new file mode 100644 index 000000000..7df693ba2 --- /dev/null +++ b/pkg/simple-client/composer.json @@ -0,0 +1,36 @@ +{ + "name": "enqueue/simple-client", + "type": "library", + "description": "Message Queue Simple Client", + "keywords": ["messaging", "queue", "amqp", "rabbitmq"], + "license": "MIT", + "repositories": [ + { + "type": "vcs", + "url": "git@github.com:php-enqueue/test.git" + } + ], + "require": { + "php": ">=5.6", + "enqueue/enqueue": "^0.3", + "symfony/dependency-injection": "^2.8|^3", + "symfony/config": "^2.8|^3", + "symfony/console": "^2.8|^3" + }, + "require-dev": { + "phpunit/phpunit": "~5.5", + "enqueue/test": "^0.3" + }, + "autoload": { + "psr-4": { "Enqueue\\SimpleClient\\": "" }, + "exclude-from-classmap": [ + "/Tests/" + ] + }, + "minimum-stability": "dev", + "extra": { + "branch-alias": { + "dev-master": "0.3.x-dev" + } + } +} diff --git a/pkg/simple-client/phpunit.xml.dist b/pkg/simple-client/phpunit.xml.dist new file mode 100644 index 000000000..e86476dec --- /dev/null +++ b/pkg/simple-client/phpunit.xml.dist @@ -0,0 +1,31 @@ + + + + + + + ./Tests + + + + + + . + + ./vendor + ./Resources + ./Tests + + + + diff --git a/pkg/stomp/Symfony/StompTransportFactory.php b/pkg/stomp/Symfony/StompTransportFactory.php index d3d30ecf6..c7ed2f4ec 100644 --- a/pkg/stomp/Symfony/StompTransportFactory.php +++ b/pkg/stomp/Symfony/StompTransportFactory.php @@ -84,6 +84,7 @@ public function createDriver(ContainerBuilder $container, array $config) $driver->setArguments([ new Reference(sprintf('enqueue.transport.%s.context', $this->getName())), new Reference('enqueue.client.config'), + new Reference('enqueue.client.meta.queue_meta_registry'), ]); $driverId = sprintf('enqueue.client.%s.driver', $this->getName());