diff --git a/.travis.yml b/.travis.yml
index 8e7c6d1b9..095fb584a 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -1,3 +1,5 @@
+sudo: required
+
git:
depth: 10
@@ -6,37 +8,27 @@ language: php
matrix:
include:
- php: 5.6
- sudo: false
env: SYMFONY_VERSION=2.8.* UNIT_TESTS=true
- php: 7.1
- sudo: false
env: SYMFONY_VERSION=3.0.* PHPSTAN=true
- php: 7.1
- sudo: false
env: SYMFONY_VERSION=3.0.* PHP_CS_FIXER=true
- php: 7.0
- sudo: false
env: SYMFONY_VERSION=2.8.* UNIT_TESTS=true
- php: 5.6
- sudo: false
env: SYMFONY_VERSION=3.0.* UNIT_TESTS=true SYMFONY_DEPRECATIONS_HELPER=weak
- php: 7.0
- sudo: false
env: SYMFONY_VERSION=3.0.* UNIT_TESTS=true SYMFONY_DEPRECATIONS_HELPER=weak
- php: 7.1
- sudo: required
services: docker
env: SYMFONY_VERSION=2.8.* FUNCTIONAL_TESTS=true
- php: 7.1
- sudo: required
services: docker
env: SYMFONY_VERSION=3.0.* FUNCTIONAL_TESTS=true
- php: 7.1
- sudo: required
services: docker
env: SYMFONY_VERSION=3.2.* FUNCTIONAL_TESTS=true
- php: 7.1
- sudo: required
services: docker
env: SYMFONY_VERSION=3.3.* FUNCTIONAL_TESTS=true
@@ -54,8 +46,7 @@ install:
- if [ "$FUNCTIONAL_TESTS" = true ]; then bin/dev -b; fi
script:
- # misssing pkg/amqp-ext pkg/job-queue pkg/redis
- - if [ "$PHPSTAN" = true ]; then php -d memory_limit=512M bin/phpstan analyse -l 1 -c phpstan.neon pkg/amqp-ext pkg/async-event-dispatcher pkg/dbal pkg/enqueue pkg/enqueue-bundle pkg/fs pkg/gearman pkg/job-queue pkg/null pkg/pheanstalk pkg/redis pkg/simple-client pkg/sqs pkg/stomp pkg/test; fi
+ - if [ "$PHPSTAN" = true ]; then php -d memory_limit=512M bin/phpstan analyse -l 1 -c phpstan.neon pkg/amqp-ext pkg/async-event-dispatcher pkg/dbal pkg/enqueue pkg/enqueue-bundle pkg/fs pkg/gearman pkg/job-queue pkg/null pkg/pheanstalk pkg/redis pkg/simple-client pkg/sqs pkg/stomp pkg/test pkg/rdkafka; fi
- if [ "$PHP_CS_FIXER" = true ]; then IFS=$'\n'; COMMIT_SCA_FILES=($(git diff --name-only --diff-filter=ACMRTUXB "${TRAVIS_COMMIT_RANGE}")); unset IFS; fi
- if [ "$PHP_CS_FIXER" = true ]; then ./bin/php-cs-fixer fix --config=.php_cs.dist -v --dry-run --stop-on-violation --using-cache=no --path-mode=intersection -- "${COMMIT_SCA_FILES[@]}"; fi
- if [ "$UNIT_TESTS" = true ]; then bin/phpunit --exclude-group=functional; fi
diff --git a/README.md b/README.md
index 4363b6e3e..c56789970 100644
--- a/README.md
+++ b/README.md
@@ -9,7 +9,17 @@ Features:
* [Feature rich](docs/quick_tour.md).
* Implements [JMS](https://docs.oracle.com/javaee/7/api/javax/jms/package-summary.html) like transports based on a[queue-interop](https://github.com/queue-interop/queue-interop) interfaces.
-* Supported transports [AMQP](docs/transport/amqp.md) (RabbitMQ, ActiveMQ), [Beanstalk](docs/transport/pheanstalk.md), [STOMP](docs/transport/stomp.md), [Amazon SQS](docs/transport/sqs.md), [Redis](docs/transport/redis.md), [Gearman](docs/transport/gearman.md), [Doctrine DBAL](docs/transport/dbal.md), [Filesystem](docs/transport/filesystem.md), [Null](docs/transport/null.md).
+* Supported transports
+ * [AMQP](docs/transport/amqp.md) (RabbitMQ, ActiveMQ)
+ * [Beanstalk](docs/transport/pheanstalk.md)
+ * [STOMP](docs/transport/stomp.md)
+ * [Amazon SQS](docs/transport/sqs.md)
+ * [Kafka](docs/transport/kafka.md)
+ * [Redis](docs/transport/redis.md)
+ * [Gearman](docs/transport/gearman.md)
+ * [Doctrine DBAL](docs/transport/dbal.md)
+ * [Filesystem](docs/transport/filesystem.md)
+ * [Null](docs/transport/null.md).
* [Symfony bundle](https://github.com/php-enqueue/enqueue-dev/blob/master/docs/bundle/quick_tour.md)
* [Magento1 extension](https://github.com/php-enqueue/enqueue-dev/blob/master/docs/magento/quick_tour.md)
* [Message bus](http://www.enterpriseintegrationpatterns.com/patterns/messaging/MessageBus.html) support.
diff --git a/bin/test b/bin/test
index bcee4d6e7..6641bc743 100755
--- a/bin/test
+++ b/bin/test
@@ -25,6 +25,7 @@ waitForService mysql 3306 50
waitForService redis 6379 50
waitForService beanstalkd 11300
waitForService gearmand 4730
+waitForService kafka 9092
php pkg/job-queue/Tests/Functional/app/console doctrine:database:create
php pkg/job-queue/Tests/Functional/app/console doctrine:schema:update --force
diff --git a/composer.json b/composer.json
index 520ce055f..c6c665357 100644
--- a/composer.json
+++ b/composer.json
@@ -14,6 +14,8 @@
"enqueue/sqs": "*@dev",
"enqueue/pheanstalk": "*@dev",
"enqueue/gearman": "*@dev",
+ "enqueue/rdkafka": "*@dev",
+ "kwn/php-rdkafka-stubs": "^1.0.2",
"enqueue/enqueue-bundle": "*@dev",
"enqueue/job-queue": "*@dev",
"enqueue/simple-client": "*@dev",
@@ -35,7 +37,16 @@
"phpstan/phpstan": "^0.7.0"
},
"autoload": {
- "files": ["pkg/enqueue/functions_include.php"]
+ "files": [
+ "pkg/enqueue/functions_include.php",
+ "pkg/rdkafka/Tests/bootstrap.php"
+ ],
+ "psr-0": {
+ "RdKafka": "vendor/kwn/php-rdkafka-stubs/stubs"
+ },
+ "psr-4": {
+ "RdKafka\\": "vendor/kwn/php-rdkafka-stubs/stubs/RdKafka"
+ }
},
"config": {
"bin-dir": "bin"
@@ -93,6 +104,10 @@
"type": "path",
"url": "pkg/gearman"
},
+ {
+ "type": "path",
+ "url": "pkg/rdkafka"
+ },
{
"type": "path",
"url": "pkg/simple-client"
diff --git a/docker-compose.yml b/docker-compose.yml
index dacec14aa..a1684e30a 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -10,6 +10,8 @@ services:
- redis
- beanstalkd
- gearmand
+ - kafka
+ - zookeeper
volumes:
- './:/mqdev'
environment:
@@ -36,6 +38,8 @@ services:
- BEANSTALKD_PORT=11300
- BEANSTALKD_DSN=beanstalk://beanstalkd:11300
- GEARMAN_DSN=gearman://gearmand:4730
+ - RDKAFKA_HOST=kafka
+ - RDKAFKA_PORT=9092
rabbitmq:
image: enqueue/rabbitmq:latest
@@ -71,6 +75,20 @@ services:
volumes:
- ./:/mqdev
+ zookeeper:
+ image: 'wurstmeister/zookeeper'
+ ports:
+ - '2181:2181'
+
+ kafka:
+ image: 'wurstmeister/kafka:0.10.2.1'
+ ports:
+ - '9092:9092'
+ environment:
+ KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
+ volumes:
+ - '/var/run/docker.sock:/var/run/docker.sock'
+
volumes:
mysql-data:
driver: local
diff --git a/docker/Dockerfile b/docker/Dockerfile
index f6affdda8..fae39efd4 100644
--- a/docker/Dockerfile
+++ b/docker/Dockerfile
@@ -3,12 +3,19 @@ FROM formapro/nginx-php-fpm:latest-all-exts
## libs
RUN set -x && \
apt-get update && \
- apt-get install -y --no-install-recommends wget curl openssl ca-certificates nano netcat php-redis
+ apt-get install -y wget curl openssl ca-certificates nano netcat php-dev php-redis git python
## confis
# RUN rm -f /etc/php/7.0/cli/conf.d/*xdebug.ini
+## librdkafka
+RUN git clone https://github.com/edenhill/librdkafka.git /root/librdkafka
+RUN cd /root/librdkafka && git checkout v0.11.0-RC2 && ./configure && make && make install
+RUN pecl install rdkafka
+RUN echo "extension=rdkafka.so" > /etc/php/7.1/cli/conf.d/10-rdkafka.ini
+RUN echo "extension=rdkafka.so" > /etc/php/7.1/fpm/conf.d/10-rdkafka.ini
+
COPY ./php/cli.ini /etc/php/7.1/cli/conf.d/1-dev_cli.ini
COPY ./bin/dev_entrypoiny.sh /usr/local/bin/entrypoint.sh
RUN chmod u+x /usr/local/bin/entrypoint.sh
diff --git a/docs/index.md b/docs/index.md
index 49ae4c2bf..9d6913abf 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -5,6 +5,8 @@
- [Amqp (RabbitMQ, ActiveMQ)](transport/amqp.md)
- [Amazon SQS](transport/sqs.md)
- [Beanstalk (Pheanstalk)](transport/pheanstalk.md)
+ - [Gearman](transport/gearman.md)
+ - [Kafka](transport/kafka.md)
- [Stomp](transport/stomp.md)
- [Redis](transport/redis.md)
- [Doctrine DBAL](transport/dbal.md)
diff --git a/docs/transport/kafka.md b/docs/transport/kafka.md
new file mode 100644
index 000000000..116dc2b5b
--- /dev/null
+++ b/docs/transport/kafka.md
@@ -0,0 +1,91 @@
+# Kafka transport
+
+The transport uses [Kafka](https://kafka.apache.org/) streaming platform as a MQ broker.
+
+* [Installation](#installation)
+* [Create context](#create-context)
+* [Send message to topic](#send-message-to-topic)
+* [Send message to queue](#send-message-to-queue)
+* [Consume message](#consume-message)
+
+## Installation
+
+```bash
+$ composer require enqueue/rdkafka
+```
+
+## Create context
+
+```php
+ [
+ 'group.id' => uniqid('', true),
+ 'metadata.broker.list' => 'example.com:1000',
+ 'enable.auto.commit' => 'false',
+ ],
+ 'topic' => [
+ 'auto.offset.reset' => 'beginning',
+ ],
+]);
+
+$psrContext = $connectionFactory->createContext();
+```
+
+## Send message to topic
+
+```php
+createMessage('Hello world!');
+
+$fooTopic = $psrContext->createTopic('foo');
+
+$psrContext->createProducer()->send($fooTopic, $message);
+```
+
+## Send message to queue
+
+```php
+createMessage('Hello world!');
+
+$fooQueue = $psrContext->createQueue('foo');
+
+$psrContext->createProducer()->send($fooQueue, $message);
+```
+
+## Consume message:
+
+```php
+createQueue('foo');
+
+$consumer = $psrContext->createConsumer($fooQueue);
+
+$message = $consumer->receive();
+
+// process a message
+
+$consumer->acknowledge($message);
+// $consumer->reject($message);
+```
+
+[back to index](index.md)
\ No newline at end of file
diff --git a/phpunit.xml.dist b/phpunit.xml.dist
index d48427b11..8ea7f3d2b 100644
--- a/phpunit.xml.dist
+++ b/phpunit.xml.dist
@@ -53,6 +53,10 @@
pkg/gearman/Tests
+
+ pkg/rdkafka/Tests
+
+
pkg/enqueue-bundle/Tests
diff --git a/pkg/rdkafka/.gitignore b/pkg/rdkafka/.gitignore
new file mode 100644
index 000000000..a770439e5
--- /dev/null
+++ b/pkg/rdkafka/.gitignore
@@ -0,0 +1,6 @@
+*~
+/composer.lock
+/composer.phar
+/phpunit.xml
+/vendor/
+/.idea/
diff --git a/pkg/rdkafka/.travis.yml b/pkg/rdkafka/.travis.yml
new file mode 100644
index 000000000..aaa1849c3
--- /dev/null
+++ b/pkg/rdkafka/.travis.yml
@@ -0,0 +1,21 @@
+sudo: false
+
+git:
+ depth: 1
+
+language: php
+
+php:
+ - '5.6'
+ - '7.0'
+
+cache:
+ directories:
+ - $HOME/.composer/cache
+
+install:
+ - composer self-update
+ - composer install --prefer-source --ignore-platform-reqs
+
+script:
+ - vendor/bin/phpunit --exclude-group=functional
diff --git a/pkg/rdkafka/LICENSE b/pkg/rdkafka/LICENSE
new file mode 100644
index 000000000..f1e6a22fe
--- /dev/null
+++ b/pkg/rdkafka/LICENSE
@@ -0,0 +1,20 @@
+The MIT License (MIT)
+Copyright (c) 2016 Kotliar Maksym
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is furnished
+to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+THE SOFTWARE.
diff --git a/pkg/rdkafka/README.md b/pkg/rdkafka/README.md
new file mode 100644
index 000000000..bd05328f2
--- /dev/null
+++ b/pkg/rdkafka/README.md
@@ -0,0 +1,26 @@
+# RdKafka Transport
+
+[](https://gitter.im/php-enqueue/Lobby)
+[](https://travis-ci.org/php-enqueue/rdkafka)
+[](https://packagist.org/packages/enqueue/rdkafka)
+[](https://packagist.org/packages/enqueue/rdkafka)
+
+This is an implementation of PSR specification. It allows you to send and consume message via Kafka protocol.
+
+## Resources
+
+* [Documentation](https://github.com/php-enqueue/enqueue-dev/blob/master/docs/index.md)
+* [Questions](https://gitter.im/php-enqueue/Lobby)
+* [Issue Tracker](https://github.com/php-enqueue/enqueue-dev/issues)
+
+## Developed by Forma-Pro
+
+Forma-Pro is a full stack development company which interests also spread to open source development.
+Being a team of strong professionals we have an aim an ability to help community by developing cutting edge solutions in the areas of e-commerce, docker & microservice oriented architecture where we have accumulated a huge many-years experience.
+Our main specialization is Symfony framework based solution, but we are always looking to the technologies that allow us to do our job the best way. We are committed to creating solutions that revolutionize the way how things are developed in aspects of architecture & scalability.
+
+If you have any questions and inquires about our open source development, this product particularly or any other matter feel free to contact at opensource@forma-pro.com
+
+## License
+
+It is released under the [MIT License](LICENSE).
diff --git a/pkg/rdkafka/RdKafkaConnectionFactory.php b/pkg/rdkafka/RdKafkaConnectionFactory.php
new file mode 100644
index 000000000..4db818686
--- /dev/null
+++ b/pkg/rdkafka/RdKafkaConnectionFactory.php
@@ -0,0 +1,113 @@
+ [ // https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
+ * 'metadata.broker.list' => 'localhost:9092',
+ * ],
+ * 'topic' => [],
+ * 'dr_msg_cb' => null,
+ * 'error_cb' => null,
+ * 'rebalance_cb' => null,
+ * 'partitioner' => null, // https://arnaud-lb.github.io/php-rdkafka/phpdoc/rdkafka-topicconf.setpartitioner.html
+ * 'log_level' => null,
+ * 'commit_async' => false,
+ * ]
+ *
+ * or
+ *
+ * rdkafka://host:port
+ *
+ * @param array|string $config
+ */
+ public function __construct($config = 'rdkafka://')
+ {
+ if (empty($config) || 'rdkafka://' === $config) {
+ $config = [];
+ } elseif (is_string($config)) {
+ $config = $this->parseDsn($config);
+ } elseif (is_array($config)) {
+ } else {
+ throw new \LogicException('The config must be either an array of options, a DSN string or null');
+ }
+
+ $this->config = array_replace($this->defaultConfig(), $config);
+ }
+
+ /**
+ * {@inheritdoc}
+ *
+ * @return RdKafkaContext
+ */
+ public function createContext()
+ {
+ return new RdKafkaContext($this->config);
+ }
+
+ /**
+ * @param string $dsn
+ *
+ * @return array
+ */
+ private function parseDsn($dsn)
+ {
+ $dsnConfig = parse_url($dsn);
+ if (false === $dsnConfig) {
+ throw new \LogicException(sprintf('Failed to parse DSN "%s"', $dsn));
+ }
+
+ $dsnConfig = array_replace([
+ 'scheme' => null,
+ 'host' => null,
+ 'port' => null,
+ 'user' => null,
+ 'pass' => null,
+ 'path' => null,
+ 'query' => null,
+ ], $dsnConfig);
+
+ if ('rdkafka' !== $dsnConfig['scheme']) {
+ throw new \LogicException(sprintf('The given DSN scheme "%s" is not supported. Could be "rdkafka" only.', $dsnConfig['scheme']));
+ }
+
+ $config = [];
+ if ($dsnConfig['query']) {
+ parse_str($dsnConfig['query'], $config);
+ }
+
+ $broker = $dsnConfig['host'];
+ if ($dsnConfig['port']) {
+ $broker .= ':'.$dsnConfig['port'];
+ }
+
+ $config['global']['metadata.broker.list'] = $broker;
+
+ return array_replace_recursive($this->defaultConfig(), $config);
+ }
+
+ /**
+ * @return array
+ */
+ private function defaultConfig()
+ {
+ return [
+ 'global' => [
+ 'group.id' => uniqid('', true),
+ 'metadata.broker.list' => 'localhost:9092',
+ ],
+ ];
+ }
+}
diff --git a/pkg/rdkafka/RdKafkaConsumer.php b/pkg/rdkafka/RdKafkaConsumer.php
new file mode 100644
index 000000000..0cbc48f8a
--- /dev/null
+++ b/pkg/rdkafka/RdKafkaConsumer.php
@@ -0,0 +1,165 @@
+consumer = $consumer;
+ $this->context = $context;
+ $this->topic = $topic;
+ $this->subscribed = false;
+ $this->commitAsync = false;
+ }
+
+ /**
+ * @return bool
+ */
+ public function isCommitAsync()
+ {
+ return $this->commitAsync;
+ }
+
+ /**
+ * @param bool $async
+ */
+ public function setCommitAsync($async)
+ {
+ $this->commitAsync = (bool) $async;
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function getQueue()
+ {
+ return $this->topic;
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function receive($timeout = 0)
+ {
+ $this->consumer->subscribe([$this->topic->getTopicName()]);
+
+ $message = null;
+ if ($timeout > 0) {
+ $message = $this->doReceive($timeout);
+ } else {
+ while (true) {
+ if ($message = $this->doReceive(500)) {
+ break;
+ }
+ }
+ }
+
+ $this->consumer->unsubscribe();
+
+ return $message;
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function receiveNoWait()
+ {
+ throw new \LogicException('Not implemented');
+ }
+
+ /**
+ * {@inheritdoc}
+ *
+ * @param RdKafkaMessage $message
+ */
+ public function acknowledge(PsrMessage $message)
+ {
+ InvalidMessageException::assertMessageInstanceOf($message, RdKafkaMessage::class);
+
+ if (false == $message->getKafkaMessage()) {
+ throw new \LogicException('The message could not be acknowledged because it does not have kafka message set.');
+ }
+
+ if ($this->isCommitAsync()) {
+ $this->consumer->commitAsync($message->getKafkaMessage());
+ } else {
+ $this->consumer->commit($message->getKafkaMessage());
+ }
+ }
+
+ /**
+ * {@inheritdoc}
+ *
+ * @param RdKafkaMessage $message
+ */
+ public function reject(PsrMessage $message, $requeue = false)
+ {
+ $this->acknowledge($message);
+
+ if ($requeue) {
+ $this->context->createProducer()->send($this->topic, $message);
+ }
+ }
+
+ /**
+ * @param int $timeout
+ *
+ * @return RdKafkaMessage|null
+ */
+ private function doReceive($timeout)
+ {
+ $kafkaMessage = $this->consumer->consume($timeout);
+
+ switch ($kafkaMessage->err) {
+ case RD_KAFKA_RESP_ERR__PARTITION_EOF:
+ case RD_KAFKA_RESP_ERR__TIMED_OUT:
+ break;
+ case RD_KAFKA_RESP_ERR_NO_ERROR:
+ $message = RdKafkaMessage::jsonUnserialize($kafkaMessage->payload);
+ $message->setKey($kafkaMessage->key);
+ $message->setPartition($kafkaMessage->partition);
+ $message->setKafkaMessage($kafkaMessage);
+
+ return $message;
+ default:
+ throw new \LogicException($kafkaMessage->errstr(), $kafkaMessage->err);
+ break;
+ }
+ }
+}
diff --git a/pkg/rdkafka/RdKafkaContext.php b/pkg/rdkafka/RdKafkaContext.php
new file mode 100644
index 000000000..3fba44e30
--- /dev/null
+++ b/pkg/rdkafka/RdKafkaContext.php
@@ -0,0 +1,168 @@
+config = $config;
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function createMessage($body = '', array $properties = [], array $headers = [])
+ {
+ return new RdKafkaMessage($body, $properties, $headers);
+ }
+
+ /**
+ * {@inheritdoc}
+ *
+ * @return RdKafkaTopic
+ */
+ public function createTopic($topicName)
+ {
+ return new RdKafkaTopic($topicName);
+ }
+
+ /**
+ * {@inheritdoc}
+ *
+ * @return RdKafkaTopic
+ */
+ public function createQueue($queueName)
+ {
+ return new RdKafkaTopic($queueName);
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function createTemporaryQueue()
+ {
+ throw new \LogicException('Not implemented');
+ }
+
+ /**
+ * {@inheritdoc}
+ *
+ * @return RdKafkaProducer
+ */
+ public function createProducer()
+ {
+ return new RdKafkaProducer($this->getProducer());
+ }
+
+ /**
+ * {@inheritdoc}
+ *
+ * @param RdKafkaTopic $destination
+ */
+ public function createConsumer(PsrDestination $destination)
+ {
+ InvalidDestinationException::assertDestinationInstanceOf($destination, RdKafkaTopic::class);
+
+ $consumer = new RdKafkaConsumer(new KafkaConsumer($this->getConf()), $this, $destination);
+
+ if (isset($this->config['commit_async'])) {
+ $consumer->setCommitAsync($this->config['commit_async']);
+ }
+
+ return $consumer;
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function close()
+ {
+ }
+
+ /**
+ * @return Producer
+ */
+ private function getProducer()
+ {
+ if (null === $this->producer) {
+ $this->producer = new Producer($this->getConf());
+
+ if (isset($this->config['log_level'])) {
+ $this->producer->setLogLevel($this->config['log_level']);
+ }
+ }
+
+ return $this->producer;
+ }
+
+ /**
+ * @return Conf
+ */
+ private function getConf()
+ {
+ if (null === $this->conf) {
+ $topicConf = new TopicConf();
+
+ if (isset($this->config['topic']) && is_array($this->config['topic'])) {
+ foreach ($this->config['topic'] as $key => $value) {
+ $topicConf->set($key, $value);
+ }
+ }
+
+ if (isset($this->config['partitioner'])) {
+ $topicConf->setPartitioner($this->config['partitioner']);
+ }
+
+ $this->conf = new Conf();
+
+ if (isset($this->config['global']) && is_array($this->config['global'])) {
+ foreach ($this->config['global'] as $key => $value) {
+ $this->conf->set($key, $value);
+ }
+ }
+
+ if (isset($this->config['dr_msg_cb'])) {
+ $this->conf->setDrMsgCb($this->config['dr_msg_cb']);
+ }
+
+ if (isset($this->config['error_cb'])) {
+ $this->conf->setErrorCb($this->config['error_cb']);
+ }
+
+ if (isset($this->config['rebalance_cb'])) {
+ $this->conf->setRebalanceCb($this->config['errorebalance_cbr_cb']);
+ }
+
+ $this->conf->setDefaultTopicConf($topicConf);
+ }
+
+ return $this->conf;
+ }
+}
diff --git a/pkg/rdkafka/RdKafkaMessage.php b/pkg/rdkafka/RdKafkaMessage.php
new file mode 100644
index 000000000..c5621c13d
--- /dev/null
+++ b/pkg/rdkafka/RdKafkaMessage.php
@@ -0,0 +1,298 @@
+body = $body;
+ $this->properties = $properties;
+ $this->headers = $headers;
+ $this->redelivered = false;
+ }
+
+ /**
+ * @param string $body
+ */
+ public function setBody($body)
+ {
+ $this->body = $body;
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function getBody()
+ {
+ return $this->body;
+ }
+
+ /**
+ * @param array $properties
+ */
+ public function setProperties(array $properties)
+ {
+ $this->properties = $properties;
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function getProperties()
+ {
+ return $this->properties;
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function setProperty($name, $value)
+ {
+ $this->properties[$name] = $value;
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function getProperty($name, $default = null)
+ {
+ return array_key_exists($name, $this->properties) ? $this->properties[$name] : $default;
+ }
+
+ /**
+ * @param array $headers
+ */
+ public function setHeaders(array $headers)
+ {
+ $this->headers = $headers;
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function getHeaders()
+ {
+ return $this->headers;
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function setHeader($name, $value)
+ {
+ $this->headers[$name] = $value;
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function getHeader($name, $default = null)
+ {
+ return array_key_exists($name, $this->headers) ? $this->headers[$name] : $default;
+ }
+
+ /**
+ * @return bool
+ */
+ public function isRedelivered()
+ {
+ return $this->redelivered;
+ }
+
+ /**
+ * @param bool $redelivered
+ */
+ public function setRedelivered($redelivered)
+ {
+ $this->redelivered = $redelivered;
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function setCorrelationId($correlationId)
+ {
+ $this->setHeader('correlation_id', (string) $correlationId);
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function getCorrelationId()
+ {
+ return $this->getHeader('correlation_id');
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function setMessageId($messageId)
+ {
+ $this->setHeader('message_id', (string) $messageId);
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function getMessageId()
+ {
+ return $this->getHeader('message_id');
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function getTimestamp()
+ {
+ $value = $this->getHeader('timestamp');
+
+ return $value === null ? null : (int) $value;
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function setTimestamp($timestamp)
+ {
+ $this->setHeader('timestamp', $timestamp);
+ }
+
+ /**
+ * @param string|null $replyTo
+ */
+ public function setReplyTo($replyTo)
+ {
+ $this->setHeader('reply_to', $replyTo);
+ }
+
+ /**
+ * @return string|null
+ */
+ public function getReplyTo()
+ {
+ return $this->getHeader('reply_to');
+ }
+
+ /**
+ * @return int
+ */
+ public function getPartition()
+ {
+ return $this->partition;
+ }
+
+ /**
+ * @param int $partition
+ */
+ public function setPartition($partition)
+ {
+ $this->partition = $partition;
+ }
+
+ /**
+ * @return string|null
+ */
+ public function getKey()
+ {
+ return $this->key;
+ }
+
+ /**
+ * @param string|null $key
+ */
+ public function setKey($key)
+ {
+ $this->key = $key;
+ }
+
+ /**
+ * @return Message
+ */
+ public function getKafkaMessage()
+ {
+ return $this->kafkaMessage;
+ }
+
+ /**
+ * @param Message $message
+ */
+ public function setKafkaMessage(Message $message)
+ {
+ $this->kafkaMessage = $message;
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function jsonSerialize()
+ {
+ return [
+ 'body' => $this->getBody(),
+ 'properties' => $this->getProperties(),
+ 'headers' => $this->getHeaders(),
+ ];
+ }
+
+ /**
+ * @param string $json
+ *
+ * @return self
+ */
+ public static function jsonUnserialize($json)
+ {
+ $data = json_decode($json, true);
+ if (JSON_ERROR_NONE !== json_last_error()) {
+ throw new \InvalidArgumentException(sprintf(
+ 'The malformed json given. Error %s and message %s',
+ json_last_error(),
+ json_last_error_msg()
+ ));
+ }
+
+ return new self($data['body'], $data['properties'], $data['headers']);
+ }
+}
diff --git a/pkg/rdkafka/RdKafkaProducer.php b/pkg/rdkafka/RdKafkaProducer.php
new file mode 100644
index 000000000..b99c3300b
--- /dev/null
+++ b/pkg/rdkafka/RdKafkaProducer.php
@@ -0,0 +1,45 @@
+producer = $producer;
+ }
+
+ /**
+ * {@inheritdoc}
+ *
+ * @param RdKafkaTopic $destination
+ * @param RdKafkaMessage $message
+ */
+ public function send(PsrDestination $destination, PsrMessage $message)
+ {
+ InvalidDestinationException::assertDestinationInstanceOf($destination, RdKafkaTopic::class);
+ InvalidMessageException::assertMessageInstanceOf($message, RdKafkaMessage::class);
+
+ $partition = $message->getPartition() ?: $destination->getPartition() ?: RD_KAFKA_PARTITION_UA;
+ $key = $message->getKey() ?: $destination->getKey() ?: null;
+ $payload = json_encode($message);
+
+ $topic = $this->producer->newTopic($destination->getTopicName(), $destination->getConf());
+ $topic->produce($partition, 0 /* must be 0 */, $payload, $key);
+ }
+}
diff --git a/pkg/rdkafka/RdKafkaTopic.php b/pkg/rdkafka/RdKafkaTopic.php
new file mode 100644
index 000000000..9aea638ee
--- /dev/null
+++ b/pkg/rdkafka/RdKafkaTopic.php
@@ -0,0 +1,95 @@
+name = $name;
+ $this->conf = new TopicConf();
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function getTopicName()
+ {
+ return $this->name;
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function getQueueName()
+ {
+ return $this->name;
+ }
+
+ /**
+ * @return TopicConf
+ */
+ public function getConf()
+ {
+ return $this->conf;
+ }
+
+ /**
+ * @return int
+ */
+ public function getPartition()
+ {
+ return $this->partition;
+ }
+
+ /**
+ * @param int $partition
+ */
+ public function setPartition($partition)
+ {
+ $this->partition = $partition;
+ }
+
+ /**
+ * @return string|null
+ */
+ public function getKey()
+ {
+ return $this->key;
+ }
+
+ /**
+ * @param string|null $key
+ */
+ public function setKey($key)
+ {
+ $this->key = $key;
+ }
+}
diff --git a/pkg/rdkafka/Tests/RdKafkaConnectionFactoryTest.php b/pkg/rdkafka/Tests/RdKafkaConnectionFactoryTest.php
new file mode 100644
index 000000000..1b765679c
--- /dev/null
+++ b/pkg/rdkafka/Tests/RdKafkaConnectionFactoryTest.php
@@ -0,0 +1,108 @@
+expectException(\LogicException::class);
+ $this->expectExceptionMessage('The config must be either an array of options, a DSN string or null');
+
+ new RdKafkaConnectionFactory(new \stdClass());
+ }
+
+ public function testThrowIfSchemeIsNotBeanstalkAmqp()
+ {
+ $this->expectException(\LogicException::class);
+ $this->expectExceptionMessage('The given DSN scheme "http" is not supported. Could be "rdkafka" only.');
+
+ new RdKafkaConnectionFactory('http://example.com');
+ }
+
+ public function testThrowIfDsnCouldNotBeParsed()
+ {
+ $this->expectException(\LogicException::class);
+ $this->expectExceptionMessage('Failed to parse DSN "rdkafka://:@/"');
+
+ new RdKafkaConnectionFactory('rdkafka://:@/');
+ }
+
+ public function testShouldBeExpectedDefaultConfig()
+ {
+ $factory = new RdKafkaConnectionFactory(null);
+
+ $config = $this->getObjectAttribute($factory, 'config');
+
+ $this->assertNotEmpty($config['global']['group.id']);
+
+ $config['global']['group.id'] = 'group-id';
+ $this->assertSame([
+ 'global' => [
+ 'group.id' => 'group-id',
+ 'metadata.broker.list' => 'localhost:9092',
+ ],
+ ], $config);
+ }
+
+ public function testShouldBeExpectedDefaultDsnConfig()
+ {
+ $factory = new RdKafkaConnectionFactory('rdkafka://');
+
+ $config = $this->getObjectAttribute($factory, 'config');
+
+ $this->assertNotEmpty($config['global']['group.id']);
+
+ $config['global']['group.id'] = 'group-id';
+ $this->assertSame([
+ 'global' => [
+ 'group.id' => 'group-id',
+ 'metadata.broker.list' => 'localhost:9092',
+ ],
+ ], $config);
+ }
+
+ /**
+ * @dataProvider provideConfigs
+ *
+ * @param mixed $config
+ * @param mixed $expectedConfig
+ */
+ public function testShouldParseConfigurationAsExpected($config, $expectedConfig)
+ {
+ $factory = new RdKafkaConnectionFactory($config);
+
+ $this->assertAttributeEquals($expectedConfig, 'config', $factory);
+ }
+
+ public static function provideConfigs()
+ {
+ yield [
+ 'rdkafka://theHost:1234?global%5Bgroup.id%5D=group-id',
+ [
+ 'global' => [
+ 'metadata.broker.list' => 'theHost:1234',
+ 'group.id' => 'group-id',
+ ],
+ ],
+ ];
+
+ yield [
+ [
+ 'global' => [
+ 'metadata.broker.list' => 'theHost:1234',
+ 'group.id' => 'group-id',
+ ],
+ ],
+ [
+ 'global' => [
+ 'metadata.broker.list' => 'theHost:1234',
+ 'group.id' => 'group-id',
+ ],
+ ],
+ ];
+ }
+}
diff --git a/pkg/rdkafka/Tests/RdKafkaConsumerTest.php b/pkg/rdkafka/Tests/RdKafkaConsumerTest.php
new file mode 100644
index 000000000..3c40873bf
--- /dev/null
+++ b/pkg/rdkafka/Tests/RdKafkaConsumerTest.php
@@ -0,0 +1,120 @@
+createKafkaConsumerMock(), $this->createContextMock(), new RdKafkaTopic(''));
+ }
+
+ public function testShouldReturnQueueSetInConstructor()
+ {
+ $destination = new RdKafkaTopic('');
+
+ $consumer = new RdKafkaConsumer($this->createKafkaConsumerMock(), $this->createContextMock(), $destination);
+
+ $this->assertSame($destination, $consumer->getQueue());
+ }
+
+ public function testShouldReceiveFromQueueAndReturnNullIfNoMessageInQueue()
+ {
+ $destination = new RdKafkaTopic('dest');
+
+ $kafkaMessage = new Message();
+ $kafkaMessage->err = RD_KAFKA_RESP_ERR__TIMED_OUT;
+
+ $kafkaConsumer = $this->createKafkaConsumerMock();
+ $kafkaConsumer
+ ->expects($this->once())
+ ->method('subscribe')
+ ->with(['dest'])
+ ;
+ $kafkaConsumer
+ ->expects($this->once())
+ ->method('consume')
+ ->with(1000)
+ ->willReturn($kafkaMessage)
+ ;
+ $kafkaConsumer
+ ->expects($this->once())
+ ->method('unsubscribe')
+ ;
+
+ $consumer = new RdKafkaConsumer($kafkaConsumer, $this->createContextMock(), $destination);
+
+ $this->assertNull($consumer->receive(1000));
+ }
+
+ public function testShouldReceiveFromQueueAndReturnMessageIfMessageInQueue()
+ {
+ $destination = new RdKafkaTopic('dest');
+
+ $message = new RdKafkaMessage('theBody', ['foo' => 'fooVal'], ['bar' => 'barVal']);
+
+ $kafkaMessage = new Message();
+ $kafkaMessage->err = RD_KAFKA_RESP_ERR_NO_ERROR;
+ $kafkaMessage->payload = json_encode($message);
+
+ $kafkaConsumer = $this->createKafkaConsumerMock();
+ $kafkaConsumer
+ ->expects($this->once())
+ ->method('subscribe')
+ ->with(['dest'])
+ ;
+ $kafkaConsumer
+ ->expects($this->once())
+ ->method('consume')
+ ->with(1000)
+ ->willReturn($kafkaMessage)
+ ;
+ $kafkaConsumer
+ ->expects($this->once())
+ ->method('unsubscribe')
+ ;
+
+ $consumer = new RdKafkaConsumer($kafkaConsumer, $this->createContextMock(), $destination);
+
+ $actualMessage = $consumer->receive(1000);
+
+ $this->assertSame('theBody', $actualMessage->getBody());
+ $this->assertSame(['foo' => 'fooVal'], $actualMessage->getProperties());
+ $this->assertSame(['bar' => 'barVal'], $actualMessage->getHeaders());
+ $this->assertSame($kafkaMessage, $actualMessage->getKafkaMessage());
+ }
+
+ public function testShouldThrowExceptionNotImplementedOnReceiveNoWait()
+ {
+ $consumer = new RdKafkaConsumer($this->createKafkaConsumerMock(), $this->createContextMock(), new RdKafkaTopic(''));
+
+ $this->expectException(\LogicException::class);
+ $this->expectExceptionMessage('Not implemented');
+
+ $consumer->receiveNoWait();
+ }
+
+ /**
+ * @return \PHPUnit_Framework_MockObject_MockObject|KafkaConsumer
+ */
+ private function createKafkaConsumerMock()
+ {
+ return $this->createMock(KafkaConsumer::class);
+ }
+
+ /**
+ * @return \PHPUnit_Framework_MockObject_MockObject|RdKafkaContext
+ */
+ private function createContextMock()
+ {
+ return $this->createMock(RdKafkaContext::class);
+ }
+}
diff --git a/pkg/rdkafka/Tests/RdKafkaContextTest.php b/pkg/rdkafka/Tests/RdKafkaContextTest.php
new file mode 100644
index 000000000..b07b82987
--- /dev/null
+++ b/pkg/rdkafka/Tests/RdKafkaContextTest.php
@@ -0,0 +1,28 @@
+expectException(\LogicException::class);
+ $this->expectExceptionMessage('Not implemented');
+ $context->createTemporaryQueue();
+ }
+
+ public function testThrowInvalidDestinationIfInvalidDestinationGivenOnCreateConsumer()
+ {
+ $context = new RdKafkaContext([]);
+
+ $this->expectException(InvalidDestinationException::class);
+ $context->createConsumer(new NullQueue('aQueue'));
+ }
+}
diff --git a/pkg/rdkafka/Tests/RdKafkaMessageTest.php b/pkg/rdkafka/Tests/RdKafkaMessageTest.php
new file mode 100644
index 000000000..9bcc34642
--- /dev/null
+++ b/pkg/rdkafka/Tests/RdKafkaMessageTest.php
@@ -0,0 +1,34 @@
+setPartition(5);
+
+ $this->assertSame(5, $message->getPartition());
+ }
+
+ public function testCouldSetGetKey()
+ {
+ $message = new RdKafkaMessage();
+ $message->setKey('key');
+
+ $this->assertSame('key', $message->getKey());
+ }
+
+ public function testCouldSetGetKafkaMessage()
+ {
+ $message = new RdKafkaMessage();
+ $message->setKafkaMessage($kafkaMessage = $this->createMock(Message::class));
+
+ $this->assertSame($kafkaMessage, $message->getKafkaMessage());
+ }
+}
diff --git a/pkg/rdkafka/Tests/RdKafkaProducerTest.php b/pkg/rdkafka/Tests/RdKafkaProducerTest.php
new file mode 100644
index 000000000..c31c588ae
--- /dev/null
+++ b/pkg/rdkafka/Tests/RdKafkaProducerTest.php
@@ -0,0 +1,87 @@
+createKafkaProducerMock());
+ }
+
+ public function testThrowIfDestinationInvalid()
+ {
+ $producer = new RdKafkaProducer($this->createKafkaProducerMock());
+
+ $this->expectException(InvalidDestinationException::class);
+ $this->expectExceptionMessage('The destination must be an instance of Enqueue\RdKafka\RdKafkaTopic but got Enqueue\Null\NullQueue.');
+ $producer->send(new NullQueue('aQueue'), new RdKafkaMessage());
+ }
+
+ public function testThrowIfMessageInvalid()
+ {
+ $producer = new RdKafkaProducer($this->createKafkaProducerMock());
+
+ $this->expectException(InvalidMessageException::class);
+ $this->expectExceptionMessage('The message must be an instance of Enqueue\RdKafka\RdKafkaMessage but it is Enqueue\Null\NullMessage.');
+ $producer->send(new RdKafkaTopic('aQueue'), new NullMessage());
+ }
+
+ public function testShouldJsonEncodeMessageAndPutToExpectedTube()
+ {
+ $message = new RdKafkaMessage('theBody', ['foo' => 'fooVal'], ['bar' => 'barVal']);
+ $message->setKey('key');
+
+ $kafkaTopic = $this->createKafkaTopicMock();
+ $kafkaTopic
+ ->expects($this->once())
+ ->method('produce')
+ ->with(
+ RD_KAFKA_PARTITION_UA,
+ 0,
+ '{"body":"theBody","properties":{"foo":"fooVal"},"headers":{"bar":"barVal"}}',
+ 'key'
+ )
+ ;
+
+ $kafkaProducer = $this->createKafkaProducerMock();
+ $kafkaProducer
+ ->expects($this->once())
+ ->method('newTopic')
+ ->with('theQueueName', $this->isInstanceOf(TopicConf::class))
+ ->willReturn($kafkaTopic)
+ ;
+
+ $producer = new RdKafkaProducer($kafkaProducer);
+
+ $producer->send(new RdKafkaTopic('theQueueName'), $message);
+ }
+
+ /**
+ * @return \PHPUnit_Framework_MockObject_MockObject|ProducerTopic
+ */
+ private function createKafkaTopicMock()
+ {
+ return $this->createMock(ProducerTopic::class);
+ }
+
+ /**
+ * @return \PHPUnit_Framework_MockObject_MockObject|Producer
+ */
+ private function createKafkaProducerMock()
+ {
+ return $this->createMock(Producer::class);
+ }
+}
diff --git a/pkg/rdkafka/Tests/RdKafkaTopicTest.php b/pkg/rdkafka/Tests/RdKafkaTopicTest.php
new file mode 100644
index 000000000..4c9ebe47f
--- /dev/null
+++ b/pkg/rdkafka/Tests/RdKafkaTopicTest.php
@@ -0,0 +1,33 @@
+setPartition(5);
+
+ $this->assertSame(5, $topic->getPartition());
+ }
+
+ public function testCouldSetGetKey()
+ {
+ $topic = new RdKafkaTopic('topic');
+ $topic->setKey('key');
+
+ $this->assertSame('key', $topic->getKey());
+ }
+
+ public function testShouldReturnConfInstance()
+ {
+ $topic = new RdKafkaTopic('topic');
+
+ $this->assertInstanceOf(TopicConf::class, $topic->getConf());
+ }
+}
diff --git a/pkg/rdkafka/Tests/Spec/RdKafkaConnectionFactoryTest.php b/pkg/rdkafka/Tests/Spec/RdKafkaConnectionFactoryTest.php
new file mode 100644
index 000000000..b0ee68ff8
--- /dev/null
+++ b/pkg/rdkafka/Tests/Spec/RdKafkaConnectionFactoryTest.php
@@ -0,0 +1,14 @@
+ [
+ 'group.id' => 'group',
+ ],
+ ]);
+ }
+}
diff --git a/pkg/rdkafka/Tests/Spec/RdKafkaMessageTest.php b/pkg/rdkafka/Tests/Spec/RdKafkaMessageTest.php
new file mode 100644
index 000000000..2d311c2e0
--- /dev/null
+++ b/pkg/rdkafka/Tests/Spec/RdKafkaMessageTest.php
@@ -0,0 +1,17 @@
+createContext();
+
+ $topic = $this->createTopic($context, uniqid('', true));
+
+ $consumer = $context->createConsumer($topic);
+
+ $expectedBody = __CLASS__.time();
+
+ $context->createProducer()->send($topic, $context->createMessage($expectedBody));
+
+ $message = $consumer->receive(10000); // 10 sec
+
+ $this->assertInstanceOf(PsrMessage::class, $message);
+ $consumer->acknowledge($message);
+
+ $this->assertSame($expectedBody, $message->getBody());
+ }
+
+ protected function createContext()
+ {
+ $config = [
+ 'global' => [
+ 'group.id' => uniqid('', true),
+ 'metadata.broker.list' => getenv('RDKAFKA_HOST').':'.getenv('RDKAFKA_PORT'),
+ 'enable.auto.commit' => 'false',
+ ],
+ 'topic' => [
+ 'auto.offset.reset' => 'beginning',
+ ],
+ ];
+
+ return (new RdKafkaConnectionFactory($config))->createContext();
+ }
+}
diff --git a/pkg/rdkafka/Tests/Spec/RdKafkaTopicTest.php b/pkg/rdkafka/Tests/Spec/RdKafkaTopicTest.php
new file mode 100644
index 000000000..86e290b4c
--- /dev/null
+++ b/pkg/rdkafka/Tests/Spec/RdKafkaTopicTest.php
@@ -0,0 +1,17 @@
+=5.6",
+ "ext-rdkafka": "^3.0.3",
+ "queue-interop/queue-interop": "^0.5@dev",
+ "psr/log": "^1"
+ },
+ "require-dev": {
+ "phpunit/phpunit": "~5.4.0",
+ "enqueue/test": "^0.6@dev",
+ "enqueue/enqueue": "^0.6@dev",
+ "enqueue/null": "^0.6@dev",
+ "queue-interop/queue-spec": "^0.5@dev",
+ "kwn/php-rdkafka-stubs": "^1.0.2"
+ },
+ "autoload": {
+ "psr-4": { "Enqueue\\RdKafka\\": "" },
+ "exclude-from-classmap": [
+ "/Tests/"
+ ]
+ },
+ "autoload-dev": {
+ "files": [
+ "Tests/bootstrap.php"
+ ],
+ "psr-0": {
+ "RdKafka": "vendor/kwn/php-rdkafka-stubs/stubs"
+ },
+ "psr-4": {
+ "RdKafka\\": "vendor/kwn/php-rdkafka-stubs/stubs/RdKafka"
+ }
+ },
+ "suggest": {
+ "enqueue/enqueue": "If you'd like to use advanced features like Client abstract layer or Symfony integration features"
+ },
+ "minimum-stability": "dev",
+ "extra": {
+ "branch-alias": {
+ "dev-master": "0.6.x-dev"
+ }
+ }
+}
diff --git a/pkg/rdkafka/phpunit.xml.dist b/pkg/rdkafka/phpunit.xml.dist
new file mode 100644
index 000000000..d899fd655
--- /dev/null
+++ b/pkg/rdkafka/phpunit.xml.dist
@@ -0,0 +1,30 @@
+
+
+
+
+
+
+ ./Tests
+
+
+
+
+
+ .
+
+ ./vendor
+ ./Tests
+
+
+
+