From c7bb45f016c44f154198c03ea77209feb1d98674 Mon Sep 17 00:00:00 2001 From: Alexander Kozienko Date: Tue, 11 Jul 2017 17:00:04 +0300 Subject: [PATCH 01/11] kafka transport --- composer.json | 6 + docker-compose.yml | 14 + docker/Dockerfile | 9 +- phpunit.xml.dist | 4 + pkg/rdkafka/.gitignore | 6 + pkg/rdkafka/.travis.yml | 21 ++ pkg/rdkafka/LICENSE | 20 ++ pkg/rdkafka/README.md | 26 ++ pkg/rdkafka/RdKafkaConnectionFactory.php | 80 +++++ pkg/rdkafka/RdKafkaContext.php | 150 ++++++++++ pkg/rdkafka/RdKafkaMessage.php | 275 ++++++++++++++++++ pkg/rdkafka/RdKafkaProducer.php | 44 +++ pkg/rdkafka/RdKafkaTopic.php | 85 ++++++ pkg/rdkafka/Tests/Spec/RdKafkaMessageTest.php | 16 + pkg/rdkafka/Tests/Spec/RdKafkaTopicTest.php | 16 + pkg/rdkafka/composer.json | 42 +++ pkg/rdkafka/phpunit.xml.dist | 30 ++ 17 files changed, 843 insertions(+), 1 deletion(-) create mode 100644 pkg/rdkafka/.gitignore create mode 100644 pkg/rdkafka/.travis.yml create mode 100644 pkg/rdkafka/LICENSE create mode 100644 pkg/rdkafka/README.md create mode 100644 pkg/rdkafka/RdKafkaConnectionFactory.php create mode 100644 pkg/rdkafka/RdKafkaContext.php create mode 100644 pkg/rdkafka/RdKafkaMessage.php create mode 100644 pkg/rdkafka/RdKafkaProducer.php create mode 100644 pkg/rdkafka/RdKafkaTopic.php create mode 100644 pkg/rdkafka/Tests/Spec/RdKafkaMessageTest.php create mode 100644 pkg/rdkafka/Tests/Spec/RdKafkaTopicTest.php create mode 100644 pkg/rdkafka/composer.json create mode 100644 pkg/rdkafka/phpunit.xml.dist diff --git a/composer.json b/composer.json index 520ce055f..c95c124c9 100644 --- a/composer.json +++ b/composer.json @@ -14,6 +14,8 @@ "enqueue/sqs": "*@dev", "enqueue/pheanstalk": "*@dev", "enqueue/gearman": "*@dev", + "enqueue/rdkafka": "*@dev", + "kwn/php-rdkafka-stubs": "^1.0.2", "enqueue/enqueue-bundle": "*@dev", "enqueue/job-queue": "*@dev", "enqueue/simple-client": "*@dev", @@ -93,6 +95,10 @@ "type": "path", "url": "pkg/gearman" }, + { + "type": "path", + "url": "pkg/rdkafka" + }, { "type": "path", "url": "pkg/simple-client" diff --git a/docker-compose.yml b/docker-compose.yml index dacec14aa..e641a5864 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -71,6 +71,20 @@ services: volumes: - ./:/mqdev + zookeeper: + image: 'wurstmeister/zookeeper' + ports: + - '2181:2181' + + kafka: + image: 'wurstmeister/kafka:0.10.2.1' + ports: + - '9092:9092' + environment: + KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181' + volumes: + - '/var/run/docker.sock:/var/run/docker.sock' + volumes: mysql-data: driver: local diff --git a/docker/Dockerfile b/docker/Dockerfile index f6affdda8..fae39efd4 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -3,12 +3,19 @@ FROM formapro/nginx-php-fpm:latest-all-exts ## libs RUN set -x && \ apt-get update && \ - apt-get install -y --no-install-recommends wget curl openssl ca-certificates nano netcat php-redis + apt-get install -y wget curl openssl ca-certificates nano netcat php-dev php-redis git python ## confis # RUN rm -f /etc/php/7.0/cli/conf.d/*xdebug.ini +## librdkafka +RUN git clone https://github.com/edenhill/librdkafka.git /root/librdkafka +RUN cd /root/librdkafka && git checkout v0.11.0-RC2 && ./configure && make && make install +RUN pecl install rdkafka +RUN echo "extension=rdkafka.so" > /etc/php/7.1/cli/conf.d/10-rdkafka.ini +RUN echo "extension=rdkafka.so" > /etc/php/7.1/fpm/conf.d/10-rdkafka.ini + COPY ./php/cli.ini /etc/php/7.1/cli/conf.d/1-dev_cli.ini COPY ./bin/dev_entrypoiny.sh /usr/local/bin/entrypoint.sh RUN chmod u+x /usr/local/bin/entrypoint.sh diff --git a/phpunit.xml.dist b/phpunit.xml.dist index d48427b11..8ea7f3d2b 100644 --- a/phpunit.xml.dist +++ b/phpunit.xml.dist @@ -53,6 +53,10 @@ pkg/gearman/Tests + + pkg/rdkafka/Tests + + pkg/enqueue-bundle/Tests diff --git a/pkg/rdkafka/.gitignore b/pkg/rdkafka/.gitignore new file mode 100644 index 000000000..a770439e5 --- /dev/null +++ b/pkg/rdkafka/.gitignore @@ -0,0 +1,6 @@ +*~ +/composer.lock +/composer.phar +/phpunit.xml +/vendor/ +/.idea/ diff --git a/pkg/rdkafka/.travis.yml b/pkg/rdkafka/.travis.yml new file mode 100644 index 000000000..aaa1849c3 --- /dev/null +++ b/pkg/rdkafka/.travis.yml @@ -0,0 +1,21 @@ +sudo: false + +git: + depth: 1 + +language: php + +php: + - '5.6' + - '7.0' + +cache: + directories: + - $HOME/.composer/cache + +install: + - composer self-update + - composer install --prefer-source --ignore-platform-reqs + +script: + - vendor/bin/phpunit --exclude-group=functional diff --git a/pkg/rdkafka/LICENSE b/pkg/rdkafka/LICENSE new file mode 100644 index 000000000..f1e6a22fe --- /dev/null +++ b/pkg/rdkafka/LICENSE @@ -0,0 +1,20 @@ +The MIT License (MIT) +Copyright (c) 2016 Kotliar Maksym + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is furnished +to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. diff --git a/pkg/rdkafka/README.md b/pkg/rdkafka/README.md new file mode 100644 index 000000000..bd05328f2 --- /dev/null +++ b/pkg/rdkafka/README.md @@ -0,0 +1,26 @@ +# RdKafka Transport + +[![Gitter](https://badges.gitter.im/php-enqueue/Lobby.svg)](https://gitter.im/php-enqueue/Lobby) +[![Build Status](https://travis-ci.org/php-enqueue/rdkafka.png?branch=master)](https://travis-ci.org/php-enqueue/rdkafka) +[![Total Downloads](https://poser.pugx.org/enqueue/rdkafka/d/total.png)](https://packagist.org/packages/enqueue/rdkafka) +[![Latest Stable Version](https://poser.pugx.org/enqueue/rdkafka/version.png)](https://packagist.org/packages/enqueue/rdkafka) + +This is an implementation of PSR specification. It allows you to send and consume message via Kafka protocol. + +## Resources + +* [Documentation](https://github.com/php-enqueue/enqueue-dev/blob/master/docs/index.md) +* [Questions](https://gitter.im/php-enqueue/Lobby) +* [Issue Tracker](https://github.com/php-enqueue/enqueue-dev/issues) + +## Developed by Forma-Pro + +Forma-Pro is a full stack development company which interests also spread to open source development. +Being a team of strong professionals we have an aim an ability to help community by developing cutting edge solutions in the areas of e-commerce, docker & microservice oriented architecture where we have accumulated a huge many-years experience. +Our main specialization is Symfony framework based solution, but we are always looking to the technologies that allow us to do our job the best way. We are committed to creating solutions that revolutionize the way how things are developed in aspects of architecture & scalability. + +If you have any questions and inquires about our open source development, this product particularly or any other matter feel free to contact at opensource@forma-pro.com + +## License + +It is released under the [MIT License](LICENSE). diff --git a/pkg/rdkafka/RdKafkaConnectionFactory.php b/pkg/rdkafka/RdKafkaConnectionFactory.php new file mode 100644 index 000000000..95d5ae7b8 --- /dev/null +++ b/pkg/rdkafka/RdKafkaConnectionFactory.php @@ -0,0 +1,80 @@ + [ // https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md + * 'metadata.broker.list' => 'localhost:9092', + * ], + * 'topic' => [], + * 'dr_msg_cb' => null, + * 'error_cb' => null, + * 'rebalance_cb' => null, + * 'partitioner' => null, // https://arnaud-lb.github.io/php-rdkafka/phpdoc/rdkafka-topicconf.setpartitioner.html + * 'log_level' => null, + * ] + * + * or + * + * rdkafka://host:port + * + * @param array|string $config + */ + public function __construct($config = 'rdkafka://') + { + if (empty($config) || 'rdkafka://' === $config) { + $config = []; + } elseif (is_string($config)) { + $config = $this->parseDsn($config); + } elseif (is_array($config)) { + } else { + throw new \LogicException('The config must be either an array of options, a DSN string or null'); + } + + $this->config = array_replace($this->defaultConfig(), $config); + } + + + /** + * {@inheritdoc} + * + * @return RdKafkaContext + */ + public function createContext() + { + return new RdKafkaContext($this->config); + } + + /** + * @param string $dsn + * + * @return array + */ + private function parseDsn($dsn) + { + + } + + /** + * @return array + */ + private function defaultConfig() + { + return [ + 'global' => [ + 'metadata.broker.list' => 'localhost:9092', + ], + ]; + } +} diff --git a/pkg/rdkafka/RdKafkaContext.php b/pkg/rdkafka/RdKafkaContext.php new file mode 100644 index 000000000..8d84532df --- /dev/null +++ b/pkg/rdkafka/RdKafkaContext.php @@ -0,0 +1,150 @@ +config = $config; + } + + /** + * {@inheritdoc} + */ + public function createMessage($body = '', array $properties = [], array $headers = []) + { + return new RdKafkaMessage($body, $properties, $headers); + } + + /** + * {@inheritdoc} + * + * @return RdKafkaTopic + */ + public function createTopic($topicName) + { + return new RdKafkaTopic($topicName); + } + + public function createQueue($queueName) + { + // TODO: Implement createQueue() method. + } + + /** + * {@inheritdoc} + */ + public function createTemporaryQueue() + { + throw new \LogicException('Not implemented'); + } + + /** + * {@inheritdoc} + * + * @return RdKafkaProducer + */ + public function createProducer() + { + return new RdKafkaProducer($this->getProducer()); + } + + public function createConsumer(PsrDestination $destination) + { + // TODO: Implement createConsumer() method. + } + + public function close() + { + + } + + /** + * @return Producer + */ + private function getProducer() + { + if (null === $this->producer) { + $this->producer = new Producer($this->getConf()); + + if (isset($this->config['log_level'])) { + $this->producer->setLogLevel($this->config['log_level']); + } + } + + return $this->producer; + } + + /** + * @return Conf + */ + private function getConf() + { + if (null === $this->conf) { + $topicConf = new TopicConf(); + + if (isset($this->config['topic']) && is_array($this->config['topic'])) { + foreach ($this->config['topic'] as $key => $value) { + $topicConf->set($key, $value); + } + } + + if (isset($this->config['partitioner'])) { + $topicConf->setPartitioner($this->config['partitioner']); + } + + $this->conf = new Conf(); + + if (isset($this->config['global']) && is_array($this->config['global'])) { + foreach ($this->config['global'] as $key => $value) { + $this->conf->set($key, $value); + } + } + + if (isset($this->config['dr_msg_cb'])) { + $this->conf->setDrMsgCb($this->config['dr_msg_cb']); + } + + if (isset($this->config['error_cb'])) { + $this->conf->setErrorCb($this->config['error_cb']); + } + + if (isset($this->config['rebalance_cb'])) { + $this->conf->setRebalanceCb($this->config['errorebalance_cbr_cb']); + } + + $this->conf->setDefaultTopicConf($topicConf); + } + + return $this->conf; + } +} diff --git a/pkg/rdkafka/RdKafkaMessage.php b/pkg/rdkafka/RdKafkaMessage.php new file mode 100644 index 000000000..0f1f500fd --- /dev/null +++ b/pkg/rdkafka/RdKafkaMessage.php @@ -0,0 +1,275 @@ +body = $body; + $this->properties = $properties; + $this->headers = $headers; + $this->redelivered = false; + } + + /** + * @param string $body + */ + public function setBody($body) + { + $this->body = $body; + } + + /** + * {@inheritdoc} + */ + public function getBody() + { + return $this->body; + } + + /** + * @param array $properties + */ + public function setProperties(array $properties) + { + $this->properties = $properties; + } + + /** + * {@inheritdoc} + */ + public function getProperties() + { + return $this->properties; + } + + /** + * {@inheritdoc} + */ + public function setProperty($name, $value) + { + $this->properties[$name] = $value; + } + + /** + * {@inheritdoc} + */ + public function getProperty($name, $default = null) + { + return array_key_exists($name, $this->properties) ? $this->properties[$name] : $default; + } + + /** + * @param array $headers + */ + public function setHeaders(array $headers) + { + $this->headers = $headers; + } + + /** + * {@inheritdoc} + */ + public function getHeaders() + { + return $this->headers; + } + + /** + * {@inheritdoc} + */ + public function setHeader($name, $value) + { + $this->headers[$name] = $value; + } + + /** + * {@inheritdoc} + */ + public function getHeader($name, $default = null) + { + return array_key_exists($name, $this->headers) ? $this->headers[$name] : $default; + } + + /** + * @return bool + */ + public function isRedelivered() + { + return $this->redelivered; + } + + /** + * @param bool $redelivered + */ + public function setRedelivered($redelivered) + { + $this->redelivered = $redelivered; + } + + /** + * {@inheritdoc} + */ + public function setCorrelationId($correlationId) + { + $this->setHeader('correlation_id', (string) $correlationId); + } + + /** + * {@inheritdoc} + */ + public function getCorrelationId() + { + return $this->getHeader('correlation_id'); + } + + /** + * {@inheritdoc} + */ + public function setMessageId($messageId) + { + $this->setHeader('message_id', (string) $messageId); + } + + /** + * {@inheritdoc} + */ + public function getMessageId() + { + return $this->getHeader('message_id'); + } + + /** + * {@inheritdoc} + */ + public function getTimestamp() + { + $value = $this->getHeader('timestamp'); + + return $value === null ? null : (int) $value; + } + + /** + * {@inheritdoc} + */ + public function setTimestamp($timestamp) + { + $this->setHeader('timestamp', $timestamp); + } + + /** + * @param string|null $replyTo + */ + public function setReplyTo($replyTo) + { + $this->setHeader('reply_to', $replyTo); + } + + /** + * @return string|null + */ + public function getReplyTo() + { + return $this->getHeader('reply_to'); + } + + /** + * @return int + */ + public function getPartition() + { + return $this->partition; + } + + /** + * @param int $partition + */ + public function setPartition($partition) + { + $this->partition = $partition; + } + + /** + * @return string|null + */ + public function getKey() + { + return $this->key; + } + + /** + * @param string|null $key + */ + public function setKey($key) + { + $this->key = $key; + } + + /** + * {@inheritdoc} + */ + public function jsonSerialize() + { + return [ + 'body' => $this->getBody(), + 'properties' => $this->getProperties(), + 'headers' => $this->getHeaders(), + ]; + } + + /** + * @param string $json + * + * @return self + */ + public static function jsonUnserialize($json) + { + $data = json_decode($json, true); + if (JSON_ERROR_NONE !== json_last_error()) { + throw new \InvalidArgumentException(sprintf( + 'The malformed json given. Error %s and message %s', + json_last_error(), + json_last_error_msg() + )); + } + + return new self($data['body'], $data['properties'], $data['headers']); + } +} diff --git a/pkg/rdkafka/RdKafkaProducer.php b/pkg/rdkafka/RdKafkaProducer.php new file mode 100644 index 000000000..52704f3f4 --- /dev/null +++ b/pkg/rdkafka/RdKafkaProducer.php @@ -0,0 +1,44 @@ +producer = $producer; + } + + /** + * {@inheritdoc} + * + * @param RdKafkaTopic $destination + * @param RdKafkaMessage $message + */ + public function send(PsrDestination $destination, PsrMessage $message) + { + InvalidDestinationException::assertDestinationInstanceOf($destination, RdKafkaTopic::class); + InvalidMessageException::assertMessageInstanceOf($message, RdKafkaMessage::class); + + $partition = $message->getPartition() ?: $destination->getPartition() ?: RD_KAFKA_PARTITION_UA; + $key = $message->getKey() ?: $destination->getKey() ?: null; + $payload = json_encode($message); + + $topic = $this->producer->newTopic($destination->getTopicName(), $destination->getConf()); + $topic->produce($partition, 0 /* must be 0 */, $payload, $key); + } +} diff --git a/pkg/rdkafka/RdKafkaTopic.php b/pkg/rdkafka/RdKafkaTopic.php new file mode 100644 index 000000000..cf2ae54a9 --- /dev/null +++ b/pkg/rdkafka/RdKafkaTopic.php @@ -0,0 +1,85 @@ +name = $name; + $this->conf = new TopicConf(); + } + + /** + * {@inheritdoc} + */ + public function getTopicName() + { + return $this->name; + } + + /** + * @return TopicConf + */ + public function getConf() + { + return $this->conf; + } + + /** + * @return int + */ + public function getPartition() + { + return $this->partition; + } + + /** + * @param int $partition + */ + public function setPartition($partition) + { + $this->partition = $partition; + } + + /** + * @return string|null + */ + public function getKey() + { + return $this->key; + } + + /** + * @param string|null $key + */ + public function setKey($key) + { + $this->key = $key; + } +} diff --git a/pkg/rdkafka/Tests/Spec/RdKafkaMessageTest.php b/pkg/rdkafka/Tests/Spec/RdKafkaMessageTest.php new file mode 100644 index 000000000..43867abcb --- /dev/null +++ b/pkg/rdkafka/Tests/Spec/RdKafkaMessageTest.php @@ -0,0 +1,16 @@ +=5.6", + "ext-rdkafka": "^3.0.3", + "queue-interop/queue-interop": "^0.5@dev", + "psr/log": "^1" + }, + "require-dev": { + "phpunit/phpunit": "~5.4.0", + "enqueue/test": "^0.6@dev", + "enqueue/enqueue": "^0.6@dev", + "enqueue/null": "^0.6@dev", + "queue-interop/queue-spec": "^0.5@dev", + "kwn/php-rdkafka-stubs": "^1.0.2" + }, + "autoload": { + "psr-4": { "Enqueue\\RdKafka\\": "" }, + "exclude-from-classmap": [ + "/Tests/" + ] + }, + "suggest": { + "enqueue/enqueue": "If you'd like to use advanced features like Client abstract layer or Symfony integration features" + }, + "minimum-stability": "dev", + "extra": { + "branch-alias": { + "dev-master": "0.6.x-dev" + } + } +} diff --git a/pkg/rdkafka/phpunit.xml.dist b/pkg/rdkafka/phpunit.xml.dist new file mode 100644 index 000000000..d899fd655 --- /dev/null +++ b/pkg/rdkafka/phpunit.xml.dist @@ -0,0 +1,30 @@ + + + + + + + ./Tests + + + + + + . + + ./vendor + ./Tests + + + + From 81bddfdd0be191d9220290e93109bc9d7c84791b Mon Sep 17 00:00:00 2001 From: Alexander Kozienko Date: Wed, 12 Jul 2017 16:42:22 +0300 Subject: [PATCH 02/11] kafka transport --- pkg/rdkafka/RdKafkaConnectionFactory.php | 38 ++++ pkg/rdkafka/RdKafkaConsumer.php | 163 ++++++++++++++++++ pkg/rdkafka/RdKafkaContext.php | 33 +++- pkg/rdkafka/RdKafkaMessage.php | 22 +++ .../Spec/RdKafkaConnectionFactoryTest.php | 13 ++ pkg/rdkafka/Tests/Spec/RdKafkaContextTest.php | 17 ++ pkg/rdkafka/Tests/Spec/RdKafkaMessageTest.php | 2 +- .../RdKafkaSendToAndReceiveFromTopicTest.php | 16 ++ ...fkaSendToAndReceiveNoWaitFromTopicTest.php | 16 ++ 9 files changed, 311 insertions(+), 9 deletions(-) create mode 100644 pkg/rdkafka/RdKafkaConsumer.php create mode 100644 pkg/rdkafka/Tests/Spec/RdKafkaConnectionFactoryTest.php create mode 100644 pkg/rdkafka/Tests/Spec/RdKafkaContextTest.php create mode 100644 pkg/rdkafka/Tests/Spec/RdKafkaSendToAndReceiveFromTopicTest.php create mode 100644 pkg/rdkafka/Tests/Spec/RdKafkaSendToAndReceiveNoWaitFromTopicTest.php diff --git a/pkg/rdkafka/RdKafkaConnectionFactory.php b/pkg/rdkafka/RdKafkaConnectionFactory.php index 95d5ae7b8..03127219e 100644 --- a/pkg/rdkafka/RdKafkaConnectionFactory.php +++ b/pkg/rdkafka/RdKafkaConnectionFactory.php @@ -23,6 +23,7 @@ class RdKafkaConnectionFactory implements PsrConnectionFactory * 'rebalance_cb' => null, * 'partitioner' => null, // https://arnaud-lb.github.io/php-rdkafka/phpdoc/rdkafka-topicconf.setpartitioner.html * 'log_level' => null, + * 'commit_async' => false, * ] * * or @@ -63,7 +64,43 @@ public function createContext() */ private function parseDsn($dsn) { + $dsnConfig = parse_url($dsn); + if (false === $dsnConfig) { + throw new \LogicException(sprintf('Failed to parse DSN "%s"', $dsn)); + } + + $dsnConfig = array_replace([ + 'scheme' => null, + 'host' => null, + 'port' => null, + 'user' => null, + 'pass' => null, + 'path' => null, + 'query' => null, + ], $dsnConfig); + + if ('rdkafka' !== $dsnConfig['scheme']) { + throw new \LogicException(sprintf('The given DSN scheme "%s" is not supported. Could be "rdkafka" only.', $dsnConfig['scheme'])); + } + +// $query = []; +// if ($dsnConfig['query']) { +// parse_str($dsnConfig['query'], $query); +// } + + $broker = $dsnConfig['host']; + if ($dsnConfig['port']) { + $broker .= ':'.$dsnConfig['port']; + } + + return [ + 'global' => [ + 'group.id' => uniqid('', true), + 'metadata.broker.list' => $broker, + 'auto.offset.reset' => 'largest', + ], + ]; } /** @@ -74,6 +111,7 @@ private function defaultConfig() return [ 'global' => [ 'metadata.broker.list' => 'localhost:9092', + 'group.id' => uniqid('', true), ], ]; } diff --git a/pkg/rdkafka/RdKafkaConsumer.php b/pkg/rdkafka/RdKafkaConsumer.php new file mode 100644 index 000000000..5dbb157a9 --- /dev/null +++ b/pkg/rdkafka/RdKafkaConsumer.php @@ -0,0 +1,163 @@ +consumer = $consumer; + $this->context = $context; + $this->topic = $topic; + $this->subscribed = false; + $this->commitAsync = false; + } + + /** + * @return bool + */ + public function isCommitAsync() + { + return $this->commitAsync; + } + + /** + * @param bool $async + */ + public function setCommitAsync($async) + { + $this->commitAsync = (bool) $async; + } + + /** + * {@inheritdoc} + */ + public function getQueue() + { + return $this->topic; + } + + /** + * {@inheritdoc} + */ + public function receive($timeout = 0) + { + $this->consumer->subscribe([$this->topic->getTopicName()]); + + if ($timeout > 0) { + $message = $this->doReceive($timeout); + } else { + while (true) { + if ($message = $this->doReceive(500)) { + break; + } + } + } + + $this->consumer->unsubscribe(); + + return $message; + } + + /** + * {@inheritdoc} + */ + public function receiveNoWait() + { + return $this->receive(1000); + } + + /** + * {@inheritdoc} + * + * @param RdKafkaMessage $message + */ + public function acknowledge(PsrMessage $message) + { + InvalidMessageException::assertMessageInstanceOf($message, RdKafkaMessage::class); + + if (false == $message->getKafkaMessage()) { + throw new \LogicException('The message could not be acknowledged because it does not have kafka message set.'); + } + + if ($this->isCommitAsync()) { + $this->consumer->commitAsync($message->getKafkaMessage()); + } else { + $this->consumer->commit($message->getKafkaMessage()); + } + } + + /** + * {@inheritdoc} + * + * @param RdKafkaMessage $message + */ + public function reject(PsrMessage $message, $requeue = false) + { + $this->acknowledge($message); + + if ($requeue) { + $this->context->createProducer()->send($this->topic, $message); + } + } + + /** + * @param int $timeout + * + * @return RdKafkaMessage|null + */ + private function doReceive($timeout) + { + $kafkaMessage = $this->consumer->consume($timeout); + + switch ($kafkaMessage->err) { + case RD_KAFKA_RESP_ERR__PARTITION_EOF: + case RD_KAFKA_RESP_ERR__TIMED_OUT: + break; + case RD_KAFKA_RESP_ERR_NO_ERROR: + $message = RdKafkaMessage::jsonUnserialize($kafkaMessage->payload); + $message->setKey($kafkaMessage->key); + $message->setPartition($kafkaMessage->partition); + $message->setKafkaMessage($kafkaMessage); + + return $message; + default: + throw new \LogicException($kafkaMessage->errstr(), $kafkaMessage->err); + break; + } + } +} diff --git a/pkg/rdkafka/RdKafkaContext.php b/pkg/rdkafka/RdKafkaContext.php index 8d84532df..652b5223a 100644 --- a/pkg/rdkafka/RdKafkaContext.php +++ b/pkg/rdkafka/RdKafkaContext.php @@ -1,14 +1,11 @@ getProducer()); } + /** + * {@inheritdoc} + * + * @param RdKafkaTopic $destination + */ public function createConsumer(PsrDestination $destination) { - // TODO: Implement createConsumer() method. + InvalidDestinationException::assertDestinationInstanceOf($destination, RdKafkaTopic::class); + + $consumer = new RdKafkaConsumer(new KafkaConsumer($this->getConf()), $this, $destination); + + if (isset($this->config['commit_async'])) { + $consumer->setCommitAsync($this->config['commit_async']); + } + + return $consumer; } + /** + * {@inheritdoc} + */ public function close() { - } /** diff --git a/pkg/rdkafka/RdKafkaMessage.php b/pkg/rdkafka/RdKafkaMessage.php index 0f1f500fd..847057bf8 100644 --- a/pkg/rdkafka/RdKafkaMessage.php +++ b/pkg/rdkafka/RdKafkaMessage.php @@ -2,6 +2,7 @@ namespace Enqueue\RdKafka; use Interop\Queue\PsrMessage; +use RdKafka\Message; class RdKafkaMessage implements PsrMessage, \JsonSerializable { @@ -35,6 +36,11 @@ class RdKafkaMessage implements PsrMessage, \JsonSerializable */ private $key; + /** + * @var Message + */ + private $kafkaMessage; + /** * @param string $body * @param array $properties @@ -242,6 +248,22 @@ public function setKey($key) $this->key = $key; } + /** + * @return Message + */ + public function getKafkaMessage() + { + return $this->kafkaMessage; + } + + /** + * @param Message $message + */ + public function setKafkaMessage(Message $message) + { + $this->kafkaMessage = $message; + } + /** * {@inheritdoc} */ diff --git a/pkg/rdkafka/Tests/Spec/RdKafkaConnectionFactoryTest.php b/pkg/rdkafka/Tests/Spec/RdKafkaConnectionFactoryTest.php new file mode 100644 index 000000000..2204e4983 --- /dev/null +++ b/pkg/rdkafka/Tests/Spec/RdKafkaConnectionFactoryTest.php @@ -0,0 +1,13 @@ + [ + 'group.id' => 'group', + ], + ]); + } +} diff --git a/pkg/rdkafka/Tests/Spec/RdKafkaMessageTest.php b/pkg/rdkafka/Tests/Spec/RdKafkaMessageTest.php index 43867abcb..25dd75f9e 100644 --- a/pkg/rdkafka/Tests/Spec/RdKafkaMessageTest.php +++ b/pkg/rdkafka/Tests/Spec/RdKafkaMessageTest.php @@ -11,6 +11,6 @@ class RdKafkaMessageTest extends PsrMessageSpec */ protected function createMessage() { - return new RdKafkaMessage(self::EXPECTED_TOPIC_NAME); + return new RdKafkaMessage(); } } diff --git a/pkg/rdkafka/Tests/Spec/RdKafkaSendToAndReceiveFromTopicTest.php b/pkg/rdkafka/Tests/Spec/RdKafkaSendToAndReceiveFromTopicTest.php new file mode 100644 index 000000000..59d006171 --- /dev/null +++ b/pkg/rdkafka/Tests/Spec/RdKafkaSendToAndReceiveFromTopicTest.php @@ -0,0 +1,16 @@ +createContext(); + } +} diff --git a/pkg/rdkafka/Tests/Spec/RdKafkaSendToAndReceiveNoWaitFromTopicTest.php b/pkg/rdkafka/Tests/Spec/RdKafkaSendToAndReceiveNoWaitFromTopicTest.php new file mode 100644 index 000000000..d9984dab2 --- /dev/null +++ b/pkg/rdkafka/Tests/Spec/RdKafkaSendToAndReceiveNoWaitFromTopicTest.php @@ -0,0 +1,16 @@ +createContext(); + } +} From 56cd5e41d5e2ce948fb517c6396f15d9d902ee74 Mon Sep 17 00:00:00 2001 From: Alexander Kozienko Date: Thu, 13 Jul 2017 13:00:56 +0300 Subject: [PATCH 03/11] kafka transport --- pkg/rdkafka/RdKafkaConnectionFactory.php | 22 +-- pkg/rdkafka/RdKafkaConsumer.php | 2 +- pkg/rdkafka/RdKafkaTopic.php | 11 +- .../Tests/RdKafkaConnectionFactoryTest.php | 107 +++++++++++ pkg/rdkafka/Tests/RdKafkaConsumerTest.php | 175 ++++++++++++++++++ pkg/rdkafka/Tests/RdKafkaContextTest.php | 27 +++ pkg/rdkafka/Tests/RdKafkaMessageTest.php | 33 ++++ pkg/rdkafka/Tests/RdKafkaProducerTest.php | 86 +++++++++ pkg/rdkafka/Tests/RdKafkaTopicTest.php | 32 ++++ pkg/rdkafka/Tests/Spec/RdKafkaQueueTest.php | 13 ++ 10 files changed, 492 insertions(+), 16 deletions(-) create mode 100644 pkg/rdkafka/Tests/RdKafkaConnectionFactoryTest.php create mode 100644 pkg/rdkafka/Tests/RdKafkaConsumerTest.php create mode 100644 pkg/rdkafka/Tests/RdKafkaContextTest.php create mode 100644 pkg/rdkafka/Tests/RdKafkaMessageTest.php create mode 100644 pkg/rdkafka/Tests/RdKafkaProducerTest.php create mode 100644 pkg/rdkafka/Tests/RdKafkaTopicTest.php create mode 100644 pkg/rdkafka/Tests/Spec/RdKafkaQueueTest.php diff --git a/pkg/rdkafka/RdKafkaConnectionFactory.php b/pkg/rdkafka/RdKafkaConnectionFactory.php index 03127219e..2cf3e1dae 100644 --- a/pkg/rdkafka/RdKafkaConnectionFactory.php +++ b/pkg/rdkafka/RdKafkaConnectionFactory.php @@ -46,7 +46,6 @@ public function __construct($config = 'rdkafka://') $this->config = array_replace($this->defaultConfig(), $config); } - /** * {@inheritdoc} * @@ -83,24 +82,19 @@ private function parseDsn($dsn) throw new \LogicException(sprintf('The given DSN scheme "%s" is not supported. Could be "rdkafka" only.', $dsnConfig['scheme'])); } -// $query = []; -// if ($dsnConfig['query']) { -// parse_str($dsnConfig['query'], $query); -// } - + $config = []; + if ($dsnConfig['query']) { + parse_str($dsnConfig['query'], $config); + } $broker = $dsnConfig['host']; if ($dsnConfig['port']) { $broker .= ':'.$dsnConfig['port']; } - return [ - 'global' => [ - 'group.id' => uniqid('', true), - 'metadata.broker.list' => $broker, - 'auto.offset.reset' => 'largest', - ], - ]; + $config['global']['metadata.broker.list'] = $broker; + + return array_replace_recursive($this->defaultConfig(), $config); } /** @@ -110,8 +104,8 @@ private function defaultConfig() { return [ 'global' => [ - 'metadata.broker.list' => 'localhost:9092', 'group.id' => uniqid('', true), + 'metadata.broker.list' => 'localhost:9092', ], ]; } diff --git a/pkg/rdkafka/RdKafkaConsumer.php b/pkg/rdkafka/RdKafkaConsumer.php index 5dbb157a9..24acc1c6d 100644 --- a/pkg/rdkafka/RdKafkaConsumer.php +++ b/pkg/rdkafka/RdKafkaConsumer.php @@ -98,7 +98,7 @@ public function receive($timeout = 0) */ public function receiveNoWait() { - return $this->receive(1000); + return $this->receive(10); } /** diff --git a/pkg/rdkafka/RdKafkaTopic.php b/pkg/rdkafka/RdKafkaTopic.php index cf2ae54a9..593dc948b 100644 --- a/pkg/rdkafka/RdKafkaTopic.php +++ b/pkg/rdkafka/RdKafkaTopic.php @@ -1,10 +1,11 @@ name; } + /** + * {@inheritdoc} + */ + public function getQueueName() + { + return $this->name; + } + /** * @return TopicConf */ diff --git a/pkg/rdkafka/Tests/RdKafkaConnectionFactoryTest.php b/pkg/rdkafka/Tests/RdKafkaConnectionFactoryTest.php new file mode 100644 index 000000000..53adfd453 --- /dev/null +++ b/pkg/rdkafka/Tests/RdKafkaConnectionFactoryTest.php @@ -0,0 +1,107 @@ +expectException(\LogicException::class); + $this->expectExceptionMessage('The config must be either an array of options, a DSN string or null'); + + new RdKafkaConnectionFactory(new \stdClass()); + } + + public function testThrowIfSchemeIsNotBeanstalkAmqp() + { + $this->expectException(\LogicException::class); + $this->expectExceptionMessage('The given DSN scheme "http" is not supported. Could be "rdkafka" only.'); + + new RdKafkaConnectionFactory('http://example.com'); + } + + public function testThrowIfDsnCouldNotBeParsed() + { + $this->expectException(\LogicException::class); + $this->expectExceptionMessage('Failed to parse DSN "rdkafka://:@/"'); + + new RdKafkaConnectionFactory('rdkafka://:@/'); + } + + public function testShouldBeExpectedDefaultConfig() + { + $factory = new RdKafkaConnectionFactory(null); + + $config = $this->getObjectAttribute($factory, 'config'); + + $this->assertNotEmpty($config['global']['group.id']); + + $config['global']['group.id'] = 'group-id'; + $this->assertSame([ + 'global' => [ + 'group.id' => 'group-id', + 'metadata.broker.list' => 'localhost:9092', + ] + ], $config); + } + + public function testShouldBeExpectedDefaultDsnConfig() + { + $factory = new RdKafkaConnectionFactory('rdkafka://'); + + $config = $this->getObjectAttribute($factory, 'config'); + + $this->assertNotEmpty($config['global']['group.id']); + + $config['global']['group.id'] = 'group-id'; + $this->assertSame([ + 'global' => [ + 'group.id' => 'group-id', + 'metadata.broker.list' => 'localhost:9092', + ] + ], $config); + } + + /** + * @dataProvider provideConfigs + * + * @param mixed $config + * @param mixed $expectedConfig + */ + public function testShouldParseConfigurationAsExpected($config, $expectedConfig) + { + $factory = new RdKafkaConnectionFactory($config); + + $this->assertAttributeEquals($expectedConfig, 'config', $factory); + } + + public static function provideConfigs() + { + yield [ + 'rdkafka://theHost:1234?global%5Bgroup.id%5D=group-id', + [ + 'global' => [ + 'metadata.broker.list' => 'theHost:1234', + 'group.id' => 'group-id', + ] + ], + ]; + + yield [ + [ + 'global' => [ + 'metadata.broker.list' => 'theHost:1234', + 'group.id' => 'group-id', + ] + ], + [ + 'global' => [ + 'metadata.broker.list' => 'theHost:1234', + 'group.id' => 'group-id', + ] + ], + ]; + } +} diff --git a/pkg/rdkafka/Tests/RdKafkaConsumerTest.php b/pkg/rdkafka/Tests/RdKafkaConsumerTest.php new file mode 100644 index 000000000..7cadef2dc --- /dev/null +++ b/pkg/rdkafka/Tests/RdKafkaConsumerTest.php @@ -0,0 +1,175 @@ +createKafkaConsumerMock(), $this->createContextMock(), new RdKafkaTopic('')); + } + + public function testShouldReturnQueueSetInConstructor() + { + $destination = new RdKafkaTopic(''); + + $consumer = new RdKafkaConsumer($this->createKafkaConsumerMock(), $this->createContextMock(), $destination); + + $this->assertSame($destination, $consumer->getQueue()); + } + + public function testShouldReceiveFromQueueAndReturnNullIfNoMessageInQueue() + { + $destination = new RdKafkaTopic('dest'); + + $kafkaMessage = new Message(); + $kafkaMessage->err = RD_KAFKA_RESP_ERR__TIMED_OUT; + + $kafkaConsumer = $this->createKafkaConsumerMock(); + $kafkaConsumer + ->expects($this->once()) + ->method('subscribe') + ->with(['dest']) + ; + $kafkaConsumer + ->expects($this->once()) + ->method('consume') + ->with(1000) + ->willReturn($kafkaMessage) + ; + $kafkaConsumer + ->expects($this->once()) + ->method('unsubscribe') + ; + + $consumer = new RdKafkaConsumer($kafkaConsumer, $this->createContextMock(), $destination); + + $this->assertNull($consumer->receive(1000)); + } + + public function testShouldReceiveFromQueueAndReturnMessageIfMessageInQueue() + { + $destination = new RdKafkaTopic('dest'); + + $message = new RdKafkaMessage('theBody', ['foo' => 'fooVal'], ['bar' => 'barVal']); + + $kafkaMessage = new Message(); + $kafkaMessage->err = RD_KAFKA_RESP_ERR_NO_ERROR; + $kafkaMessage->payload = json_encode($message); + + $kafkaConsumer = $this->createKafkaConsumerMock(); + $kafkaConsumer + ->expects($this->once()) + ->method('subscribe') + ->with(['dest']) + ; + $kafkaConsumer + ->expects($this->once()) + ->method('consume') + ->with(1000) + ->willReturn($kafkaMessage) + ; + $kafkaConsumer + ->expects($this->once()) + ->method('unsubscribe') + ; + + $consumer = new RdKafkaConsumer($kafkaConsumer, $this->createContextMock(), $destination); + + $actualMessage = $consumer->receive(1000); + + $this->assertSame('theBody', $actualMessage->getBody()); + $this->assertSame(['foo' => 'fooVal'], $actualMessage->getProperties()); + $this->assertSame(['bar' => 'barVal'], $actualMessage->getHeaders()); + $this->assertSame($kafkaMessage, $actualMessage->getKafkaMessage()); + } + + public function testShouldReceiveNoWaitFromQueueAndReturnNullIfNoMessageInQueue() + { + $destination = new RdKafkaTopic('dest'); + + $kafkaMessage = new Message(); + $kafkaMessage->err = RD_KAFKA_RESP_ERR__TIMED_OUT; + + $kafkaConsumer = $this->createKafkaConsumerMock(); + $kafkaConsumer + ->expects($this->once()) + ->method('subscribe') + ->with(['dest']) + ; + $kafkaConsumer + ->expects($this->once()) + ->method('consume') + ->with(10) + ->willReturn($kafkaMessage) + ; + $kafkaConsumer + ->expects($this->once()) + ->method('unsubscribe') + ; + + $consumer = new RdKafkaConsumer($kafkaConsumer, $this->createContextMock(), $destination); + + $this->assertNull($consumer->receiveNoWait()); + } + + public function testShouldReceiveNoWaitFromQueueAndReturnMessageIfMessageInQueue() + { + $destination = new RdKafkaTopic('dest'); + + $message = new RdKafkaMessage('theBody', ['foo' => 'fooVal'], ['bar' => 'barVal']); + + $kafkaMessage = new Message(); + $kafkaMessage->err = RD_KAFKA_RESP_ERR_NO_ERROR; + $kafkaMessage->payload = json_encode($message); + + $kafkaConsumer = $this->createKafkaConsumerMock(); + $kafkaConsumer + ->expects($this->once()) + ->method('subscribe') + ->with(['dest']) + ; + $kafkaConsumer + ->expects($this->once()) + ->method('consume') + ->with(10) + ->willReturn($kafkaMessage) + ; + $kafkaConsumer + ->expects($this->once()) + ->method('unsubscribe') + ; + + $consumer = new RdKafkaConsumer($kafkaConsumer, $this->createContextMock(), $destination); + + $actualMessage = $consumer->receiveNoWait(); + + $this->assertSame('theBody', $actualMessage->getBody()); + $this->assertSame(['foo' => 'fooVal'], $actualMessage->getProperties()); + $this->assertSame(['bar' => 'barVal'], $actualMessage->getHeaders()); + $this->assertSame($kafkaMessage, $actualMessage->getKafkaMessage()); + } + + /** + * @return \PHPUnit_Framework_MockObject_MockObject|KafkaConsumer + */ + private function createKafkaConsumerMock() + { + return $this->createMock(KafkaConsumer::class); + } + + /** + * @return \PHPUnit_Framework_MockObject_MockObject|RdKafkaContext + */ + private function createContextMock() + { + return $this->createMock(RdKafkaContext::class); + } +} diff --git a/pkg/rdkafka/Tests/RdKafkaContextTest.php b/pkg/rdkafka/Tests/RdKafkaContextTest.php new file mode 100644 index 000000000..925e8dbde --- /dev/null +++ b/pkg/rdkafka/Tests/RdKafkaContextTest.php @@ -0,0 +1,27 @@ +expectException(\LogicException::class); + $this->expectExceptionMessage('Not implemented'); + $context->createTemporaryQueue(); + } + + public function testThrowInvalidDestinationIfInvalidDestinationGivenOnCreateConsumer() + { + $context = new RdKafkaContext([]); + + $this->expectException(InvalidDestinationException::class); + $context->createConsumer(new NullQueue('aQueue')); + } +} diff --git a/pkg/rdkafka/Tests/RdKafkaMessageTest.php b/pkg/rdkafka/Tests/RdKafkaMessageTest.php new file mode 100644 index 000000000..bd135bb99 --- /dev/null +++ b/pkg/rdkafka/Tests/RdKafkaMessageTest.php @@ -0,0 +1,33 @@ +setPartition(5); + + $this->assertSame(5, $message->getPartition()); + } + + public function testCouldSetGetKey() + { + $message = new RdKafkaMessage(); + $message->setKey('key'); + + $this->assertSame('key', $message->getKey()); + } + + public function testCouldSetGetKafkaMessage() + { + $message = new RdKafkaMessage(); + $message->setKafkaMessage($kafkaMessage = $this->createMock(Message::class)); + + $this->assertSame($kafkaMessage, $message->getKafkaMessage()); + } +} diff --git a/pkg/rdkafka/Tests/RdKafkaProducerTest.php b/pkg/rdkafka/Tests/RdKafkaProducerTest.php new file mode 100644 index 000000000..41c660a79 --- /dev/null +++ b/pkg/rdkafka/Tests/RdKafkaProducerTest.php @@ -0,0 +1,86 @@ +createKafkaProducerMock()); + } + + public function testThrowIfDestinationInvalid() + { + $producer = new RdKafkaProducer($this->createKafkaProducerMock()); + + $this->expectException(InvalidDestinationException::class); + $this->expectExceptionMessage('The destination must be an instance of Enqueue\RdKafka\RdKafkaTopic but got Enqueue\Null\NullQueue.'); + $producer->send(new NullQueue('aQueue'), new RdKafkaMessage()); + } + + public function testThrowIfMessageInvalid() + { + $producer = new RdKafkaProducer($this->createKafkaProducerMock()); + + $this->expectException(InvalidMessageException::class); + $this->expectExceptionMessage('The message must be an instance of Enqueue\RdKafka\RdKafkaMessage but it is Enqueue\Null\NullMessage.'); + $producer->send(new RdKafkaTopic('aQueue'), new NullMessage()); + } + + public function testShouldJsonEncodeMessageAndPutToExpectedTube() + { + $message = new RdKafkaMessage('theBody', ['foo' => 'fooVal'], ['bar' => 'barVal']); + $message->setKey('key'); + + $kafkaTopic = $this->createKafkaTopicMock(); + $kafkaTopic + ->expects($this->once()) + ->method('produce') + ->with( + RD_KAFKA_PARTITION_UA, + 0, + '{"body":"theBody","properties":{"foo":"fooVal"},"headers":{"bar":"barVal"}}', + 'key' + ) + ; + + $kafkaProducer = $this->createKafkaProducerMock(); + $kafkaProducer + ->expects($this->once()) + ->method('newTopic') + ->with('theQueueName', $this->isInstanceOf(TopicConf::class)) + ->willReturn($kafkaTopic) + ; + + $producer = new RdKafkaProducer($kafkaProducer); + + $producer->send(new RdKafkaTopic('theQueueName'), $message); + } + + /** + * @return \PHPUnit_Framework_MockObject_MockObject|ProducerTopic + */ + private function createKafkaTopicMock() + { + return $this->createMock(ProducerTopic::class); + } + + /** + * @return \PHPUnit_Framework_MockObject_MockObject|Producer + */ + private function createKafkaProducerMock() + { + return $this->createMock(Producer::class); + } +} diff --git a/pkg/rdkafka/Tests/RdKafkaTopicTest.php b/pkg/rdkafka/Tests/RdKafkaTopicTest.php new file mode 100644 index 000000000..9fe4565ef --- /dev/null +++ b/pkg/rdkafka/Tests/RdKafkaTopicTest.php @@ -0,0 +1,32 @@ +setPartition(5); + + $this->assertSame(5, $topic->getPartition()); + } + + public function testCouldSetGetKey() + { + $topic = new RdKafkaTopic('topic'); + $topic->setKey('key'); + + $this->assertSame('key', $topic->getKey()); + } + + public function testShouldReturnConfInstance() + { + $topic = new RdKafkaTopic('topic'); + + $this->assertInstanceOf(TopicConf::class, $topic->getConf()); + } +} diff --git a/pkg/rdkafka/Tests/Spec/RdKafkaQueueTest.php b/pkg/rdkafka/Tests/Spec/RdKafkaQueueTest.php new file mode 100644 index 000000000..fde5ba90d --- /dev/null +++ b/pkg/rdkafka/Tests/Spec/RdKafkaQueueTest.php @@ -0,0 +1,13 @@ + Date: Fri, 14 Jul 2017 11:59:20 +0300 Subject: [PATCH 04/11] kafka transport --- docker-compose.yml | 2 + pkg/rdkafka/RdKafkaConsumer.php | 2 +- pkg/rdkafka/Tests/RdKafkaConsumerTest.php | 66 ++----------------- .../RdKafkaSendToAndReceiveFromTopicTest.php | 34 +++++++++- ...fkaSendToAndReceiveNoWaitFromTopicTest.php | 16 ----- 5 files changed, 41 insertions(+), 79 deletions(-) delete mode 100644 pkg/rdkafka/Tests/Spec/RdKafkaSendToAndReceiveNoWaitFromTopicTest.php diff --git a/docker-compose.yml b/docker-compose.yml index e641a5864..1861301f7 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -36,6 +36,8 @@ services: - BEANSTALKD_PORT=11300 - BEANSTALKD_DSN=beanstalk://beanstalkd:11300 - GEARMAN_DSN=gearman://gearmand:4730 + - RDKAFKA_HOST=kafka + - RDKAFKA_PORT=9092 rabbitmq: image: enqueue/rabbitmq:latest diff --git a/pkg/rdkafka/RdKafkaConsumer.php b/pkg/rdkafka/RdKafkaConsumer.php index 24acc1c6d..ad45a6936 100644 --- a/pkg/rdkafka/RdKafkaConsumer.php +++ b/pkg/rdkafka/RdKafkaConsumer.php @@ -98,7 +98,7 @@ public function receive($timeout = 0) */ public function receiveNoWait() { - return $this->receive(10); + throw new \LogicException('Not implemented'); } /** diff --git a/pkg/rdkafka/Tests/RdKafkaConsumerTest.php b/pkg/rdkafka/Tests/RdKafkaConsumerTest.php index 7cadef2dc..8bed69eb6 100644 --- a/pkg/rdkafka/Tests/RdKafkaConsumerTest.php +++ b/pkg/rdkafka/Tests/RdKafkaConsumerTest.php @@ -91,70 +91,14 @@ public function testShouldReceiveFromQueueAndReturnMessageIfMessageInQueue() $this->assertSame($kafkaMessage, $actualMessage->getKafkaMessage()); } - public function testShouldReceiveNoWaitFromQueueAndReturnNullIfNoMessageInQueue() + public function testShouldThrowExceptionNotImplementedOnReceiveNoWait() { - $destination = new RdKafkaTopic('dest'); - - $kafkaMessage = new Message(); - $kafkaMessage->err = RD_KAFKA_RESP_ERR__TIMED_OUT; - - $kafkaConsumer = $this->createKafkaConsumerMock(); - $kafkaConsumer - ->expects($this->once()) - ->method('subscribe') - ->with(['dest']) - ; - $kafkaConsumer - ->expects($this->once()) - ->method('consume') - ->with(10) - ->willReturn($kafkaMessage) - ; - $kafkaConsumer - ->expects($this->once()) - ->method('unsubscribe') - ; - - $consumer = new RdKafkaConsumer($kafkaConsumer, $this->createContextMock(), $destination); - - $this->assertNull($consumer->receiveNoWait()); - } + $consumer = new RdKafkaConsumer($this->createKafkaConsumerMock(), $this->createContextMock(), new RdKafkaTopic('')); - public function testShouldReceiveNoWaitFromQueueAndReturnMessageIfMessageInQueue() - { - $destination = new RdKafkaTopic('dest'); - - $message = new RdKafkaMessage('theBody', ['foo' => 'fooVal'], ['bar' => 'barVal']); + $this->expectException(\LogicException::class); + $this->expectExceptionMessage('Not implemented'); - $kafkaMessage = new Message(); - $kafkaMessage->err = RD_KAFKA_RESP_ERR_NO_ERROR; - $kafkaMessage->payload = json_encode($message); - - $kafkaConsumer = $this->createKafkaConsumerMock(); - $kafkaConsumer - ->expects($this->once()) - ->method('subscribe') - ->with(['dest']) - ; - $kafkaConsumer - ->expects($this->once()) - ->method('consume') - ->with(10) - ->willReturn($kafkaMessage) - ; - $kafkaConsumer - ->expects($this->once()) - ->method('unsubscribe') - ; - - $consumer = new RdKafkaConsumer($kafkaConsumer, $this->createContextMock(), $destination); - - $actualMessage = $consumer->receiveNoWait(); - - $this->assertSame('theBody', $actualMessage->getBody()); - $this->assertSame(['foo' => 'fooVal'], $actualMessage->getProperties()); - $this->assertSame(['bar' => 'barVal'], $actualMessage->getHeaders()); - $this->assertSame($kafkaMessage, $actualMessage->getKafkaMessage()); + $consumer->receiveNoWait(); } /** diff --git a/pkg/rdkafka/Tests/Spec/RdKafkaSendToAndReceiveFromTopicTest.php b/pkg/rdkafka/Tests/Spec/RdKafkaSendToAndReceiveFromTopicTest.php index 59d006171..e242e4d2d 100644 --- a/pkg/rdkafka/Tests/Spec/RdKafkaSendToAndReceiveFromTopicTest.php +++ b/pkg/rdkafka/Tests/Spec/RdKafkaSendToAndReceiveFromTopicTest.php @@ -2,6 +2,7 @@ namespace Enqueue\RdKafka\Tests\Spec; use Enqueue\RdKafka\RdKafkaConnectionFactory; +use Interop\Queue\PsrMessage; use Interop\Queue\Spec\SendToAndReceiveFromTopicSpec; /** @@ -11,6 +12,37 @@ class RdKafkaSendToAndReceiveFromTopicTest extends SendToAndReceiveFromTopicSpec { protected function createContext() { - return (new RdKafkaConnectionFactory(getenv('RDKAFKA_DSN')))->createContext(); + $config = [ + 'global' => [ + 'group.id' => uniqid('', true), + 'metadata.broker.list' => getenv('RDKAFKA_HOST').':'.getenv('RDKAFKA_PORT'), + 'enable.auto.commit' => 'false', + ], + 'topic' => [ + 'auto.offset.reset' => 'beginning', + ] + ]; + + return (new RdKafkaConnectionFactory($config))->createContext(); + } + + public function test() + { + $context = $this->createContext(); + + $topic = $this->createTopic($context, uniqid('', true)); + + $consumer = $context->createConsumer($topic); + + $expectedBody = __CLASS__.time(); + + $context->createProducer()->send($topic, $context->createMessage($expectedBody)); + + $message = $consumer->receive(10000); // 10 sec + + $this->assertInstanceOf(PsrMessage::class, $message); + $consumer->acknowledge($message); + + $this->assertSame($expectedBody, $message->getBody()); } } diff --git a/pkg/rdkafka/Tests/Spec/RdKafkaSendToAndReceiveNoWaitFromTopicTest.php b/pkg/rdkafka/Tests/Spec/RdKafkaSendToAndReceiveNoWaitFromTopicTest.php deleted file mode 100644 index d9984dab2..000000000 --- a/pkg/rdkafka/Tests/Spec/RdKafkaSendToAndReceiveNoWaitFromTopicTest.php +++ /dev/null @@ -1,16 +0,0 @@ -createContext(); - } -} From 755eb6f39ff82739e688fa72b77a53742942614c Mon Sep 17 00:00:00 2001 From: Alexander Kozienko Date: Fri, 14 Jul 2017 12:11:08 +0300 Subject: [PATCH 05/11] kafka transport --- pkg/rdkafka/RdKafkaConnectionFactory.php | 1 + pkg/rdkafka/RdKafkaConsumer.php | 1 + pkg/rdkafka/RdKafkaContext.php | 1 + pkg/rdkafka/RdKafkaMessage.php | 1 + pkg/rdkafka/RdKafkaProducer.php | 1 + pkg/rdkafka/RdKafkaTopic.php | 1 + .../Tests/RdKafkaConnectionFactoryTest.php | 11 ++++--- pkg/rdkafka/Tests/RdKafkaConsumerTest.php | 1 + pkg/rdkafka/Tests/RdKafkaContextTest.php | 1 + pkg/rdkafka/Tests/RdKafkaMessageTest.php | 1 + pkg/rdkafka/Tests/RdKafkaProducerTest.php | 1 + pkg/rdkafka/Tests/RdKafkaTopicTest.php | 1 + .../Spec/RdKafkaConnectionFactoryTest.php | 1 + pkg/rdkafka/Tests/Spec/RdKafkaContextTest.php | 1 + pkg/rdkafka/Tests/Spec/RdKafkaMessageTest.php | 1 + pkg/rdkafka/Tests/Spec/RdKafkaQueueTest.php | 1 + .../RdKafkaSendToAndReceiveFromTopicTest.php | 33 ++++++++++--------- pkg/rdkafka/Tests/Spec/RdKafkaTopicTest.php | 1 + 18 files changed, 39 insertions(+), 21 deletions(-) diff --git a/pkg/rdkafka/RdKafkaConnectionFactory.php b/pkg/rdkafka/RdKafkaConnectionFactory.php index 2cf3e1dae..4db818686 100644 --- a/pkg/rdkafka/RdKafkaConnectionFactory.php +++ b/pkg/rdkafka/RdKafkaConnectionFactory.php @@ -1,4 +1,5 @@ [ 'group.id' => 'group-id', 'metadata.broker.list' => 'localhost:9092', - ] + ], ], $config); } @@ -60,7 +61,7 @@ public function testShouldBeExpectedDefaultDsnConfig() 'global' => [ 'group.id' => 'group-id', 'metadata.broker.list' => 'localhost:9092', - ] + ], ], $config); } @@ -85,7 +86,7 @@ public static function provideConfigs() 'global' => [ 'metadata.broker.list' => 'theHost:1234', 'group.id' => 'group-id', - ] + ], ], ]; @@ -94,13 +95,13 @@ public static function provideConfigs() 'global' => [ 'metadata.broker.list' => 'theHost:1234', 'group.id' => 'group-id', - ] + ], ], [ 'global' => [ 'metadata.broker.list' => 'theHost:1234', 'group.id' => 'group-id', - ] + ], ], ]; } diff --git a/pkg/rdkafka/Tests/RdKafkaConsumerTest.php b/pkg/rdkafka/Tests/RdKafkaConsumerTest.php index 8bed69eb6..3c40873bf 100644 --- a/pkg/rdkafka/Tests/RdKafkaConsumerTest.php +++ b/pkg/rdkafka/Tests/RdKafkaConsumerTest.php @@ -1,4 +1,5 @@ [ - 'group.id' => uniqid('', true), - 'metadata.broker.list' => getenv('RDKAFKA_HOST').':'.getenv('RDKAFKA_PORT'), - 'enable.auto.commit' => 'false', - ], - 'topic' => [ - 'auto.offset.reset' => 'beginning', - ] - ]; - - return (new RdKafkaConnectionFactory($config))->createContext(); - } - public function test() { $context = $this->createContext(); @@ -45,4 +30,20 @@ public function test() $this->assertSame($expectedBody, $message->getBody()); } + + protected function createContext() + { + $config = [ + 'global' => [ + 'group.id' => uniqid('', true), + 'metadata.broker.list' => getenv('RDKAFKA_HOST').':'.getenv('RDKAFKA_PORT'), + 'enable.auto.commit' => 'false', + ], + 'topic' => [ + 'auto.offset.reset' => 'beginning', + ], + ]; + + return (new RdKafkaConnectionFactory($config))->createContext(); + } } diff --git a/pkg/rdkafka/Tests/Spec/RdKafkaTopicTest.php b/pkg/rdkafka/Tests/Spec/RdKafkaTopicTest.php index 1e019d5c0..86e290b4c 100644 --- a/pkg/rdkafka/Tests/Spec/RdKafkaTopicTest.php +++ b/pkg/rdkafka/Tests/Spec/RdKafkaTopicTest.php @@ -1,4 +1,5 @@ Date: Fri, 14 Jul 2017 13:26:10 +0300 Subject: [PATCH 06/11] kafka transport --- .travis.yml | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/.travis.yml b/.travis.yml index 8e7c6d1b9..01a4a1146 100644 --- a/.travis.yml +++ b/.travis.yml @@ -44,6 +44,10 @@ cache: directories: - $HOME/.composer/cache +before_install: + - git clone https://github.com/edenhill/librdkafka.git $HOME/librdkafka + - cd $HOME/librdkafka && git checkout v0.11.0-RC2 && ./configure && make && make install + install: - rm $HOME/.phpenv/versions/$(phpenv version-name)/etc/conf.d/xdebug.ini; - echo "memory_limit=2048M" >> ~/.phpenv/versions/$(phpenv version-name)/etc/conf.d/travis.ini @@ -55,7 +59,7 @@ install: script: # misssing pkg/amqp-ext pkg/job-queue pkg/redis - - if [ "$PHPSTAN" = true ]; then php -d memory_limit=512M bin/phpstan analyse -l 1 -c phpstan.neon pkg/amqp-ext pkg/async-event-dispatcher pkg/dbal pkg/enqueue pkg/enqueue-bundle pkg/fs pkg/gearman pkg/job-queue pkg/null pkg/pheanstalk pkg/redis pkg/simple-client pkg/sqs pkg/stomp pkg/test; fi + - if [ "$PHPSTAN" = true ]; then php -d memory_limit=512M bin/phpstan analyse -l 1 -c phpstan.neon pkg/amqp-ext pkg/async-event-dispatcher pkg/dbal pkg/enqueue pkg/enqueue-bundle pkg/fs pkg/gearman pkg/job-queue pkg/null pkg/pheanstalk pkg/redis pkg/simple-client pkg/sqs pkg/stomp pkg/test pkg/rdkafka; fi - if [ "$PHP_CS_FIXER" = true ]; then IFS=$'\n'; COMMIT_SCA_FILES=($(git diff --name-only --diff-filter=ACMRTUXB "${TRAVIS_COMMIT_RANGE}")); unset IFS; fi - if [ "$PHP_CS_FIXER" = true ]; then ./bin/php-cs-fixer fix --config=.php_cs.dist -v --dry-run --stop-on-violation --using-cache=no --path-mode=intersection -- "${COMMIT_SCA_FILES[@]}"; fi - if [ "$UNIT_TESTS" = true ]; then bin/phpunit --exclude-group=functional; fi From b3fa0bdb85dfd2f8a77592a0ef2837f2ccdec944 Mon Sep 17 00:00:00 2001 From: Alexander Kozienko Date: Fri, 14 Jul 2017 13:41:10 +0300 Subject: [PATCH 07/11] kafka transport --- .travis.yml | 19 ++++++------------- 1 file changed, 6 insertions(+), 13 deletions(-) diff --git a/.travis.yml b/.travis.yml index 01a4a1146..65c555166 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,3 +1,5 @@ +sudo: required + git: depth: 10 @@ -6,37 +8,27 @@ language: php matrix: include: - php: 5.6 - sudo: false env: SYMFONY_VERSION=2.8.* UNIT_TESTS=true - php: 7.1 - sudo: false env: SYMFONY_VERSION=3.0.* PHPSTAN=true - php: 7.1 - sudo: false env: SYMFONY_VERSION=3.0.* PHP_CS_FIXER=true - php: 7.0 - sudo: false env: SYMFONY_VERSION=2.8.* UNIT_TESTS=true - php: 5.6 - sudo: false env: SYMFONY_VERSION=3.0.* UNIT_TESTS=true SYMFONY_DEPRECATIONS_HELPER=weak - php: 7.0 - sudo: false env: SYMFONY_VERSION=3.0.* UNIT_TESTS=true SYMFONY_DEPRECATIONS_HELPER=weak - php: 7.1 - sudo: required services: docker env: SYMFONY_VERSION=2.8.* FUNCTIONAL_TESTS=true - php: 7.1 - sudo: required services: docker env: SYMFONY_VERSION=3.0.* FUNCTIONAL_TESTS=true - php: 7.1 - sudo: required services: docker env: SYMFONY_VERSION=3.2.* FUNCTIONAL_TESTS=true - php: 7.1 - sudo: required services: docker env: SYMFONY_VERSION=3.3.* FUNCTIONAL_TESTS=true @@ -44,11 +36,12 @@ cache: directories: - $HOME/.composer/cache -before_install: +install: - git clone https://github.com/edenhill/librdkafka.git $HOME/librdkafka - - cd $HOME/librdkafka && git checkout v0.11.0-RC2 && ./configure && make && make install + - cd $HOME/librdkafka && git checkout v0.11.0-RC2 && ./configure && make && sudo make install + - pecl install rdkafka + - cd $TRAVIS_BUILD_DIR -install: - rm $HOME/.phpenv/versions/$(phpenv version-name)/etc/conf.d/xdebug.ini; - echo "memory_limit=2048M" >> ~/.phpenv/versions/$(phpenv version-name)/etc/conf.d/travis.ini - composer require symfony/symfony:${SYMFONY_VERSION} --no-update From 21dfaf935acb8f3a04a78e862862f412c900948e Mon Sep 17 00:00:00 2001 From: Alexander Kozienko Date: Fri, 14 Jul 2017 15:01:01 +0300 Subject: [PATCH 08/11] kafka transport --- pkg/rdkafka/RdKafkaConsumer.php | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/rdkafka/RdKafkaConsumer.php b/pkg/rdkafka/RdKafkaConsumer.php index 77caf49d4..0cbc48f8a 100644 --- a/pkg/rdkafka/RdKafkaConsumer.php +++ b/pkg/rdkafka/RdKafkaConsumer.php @@ -79,6 +79,7 @@ public function receive($timeout = 0) { $this->consumer->subscribe([$this->topic->getTopicName()]); + $message = null; if ($timeout > 0) { $message = $this->doReceive($timeout); } else { From c6cbc237cd9450deb1d8c439479054ab1848ec66 Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Mon, 17 Jul 2017 12:50:42 +0300 Subject: [PATCH 09/11] fix tests. --- bin/test | 1 + docker-compose.yml | 2 ++ 2 files changed, 3 insertions(+) diff --git a/bin/test b/bin/test index bcee4d6e7..6641bc743 100755 --- a/bin/test +++ b/bin/test @@ -25,6 +25,7 @@ waitForService mysql 3306 50 waitForService redis 6379 50 waitForService beanstalkd 11300 waitForService gearmand 4730 +waitForService kafka 9092 php pkg/job-queue/Tests/Functional/app/console doctrine:database:create php pkg/job-queue/Tests/Functional/app/console doctrine:schema:update --force diff --git a/docker-compose.yml b/docker-compose.yml index 1861301f7..a1684e30a 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -10,6 +10,8 @@ services: - redis - beanstalkd - gearmand + - kafka + - zookeeper volumes: - './:/mqdev' environment: From ee89d2985fc0a463e771de7921f26aa545f500d8 Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Mon, 17 Jul 2017 13:44:00 +0300 Subject: [PATCH 10/11] [kafka] Do not use extension to run unit tests, use the stub lib. --- .travis.yml | 6 ------ composer.json | 11 ++++++++++- pkg/rdkafka/Tests/bootstrap.php | 16 ++++++++++++++++ pkg/rdkafka/composer.json | 11 +++++++++++ 4 files changed, 37 insertions(+), 7 deletions(-) create mode 100644 pkg/rdkafka/Tests/bootstrap.php diff --git a/.travis.yml b/.travis.yml index 65c555166..095fb584a 100644 --- a/.travis.yml +++ b/.travis.yml @@ -37,11 +37,6 @@ cache: - $HOME/.composer/cache install: - - git clone https://github.com/edenhill/librdkafka.git $HOME/librdkafka - - cd $HOME/librdkafka && git checkout v0.11.0-RC2 && ./configure && make && sudo make install - - pecl install rdkafka - - cd $TRAVIS_BUILD_DIR - - rm $HOME/.phpenv/versions/$(phpenv version-name)/etc/conf.d/xdebug.ini; - echo "memory_limit=2048M" >> ~/.phpenv/versions/$(phpenv version-name)/etc/conf.d/travis.ini - composer require symfony/symfony:${SYMFONY_VERSION} --no-update @@ -51,7 +46,6 @@ install: - if [ "$FUNCTIONAL_TESTS" = true ]; then bin/dev -b; fi script: - # misssing pkg/amqp-ext pkg/job-queue pkg/redis - if [ "$PHPSTAN" = true ]; then php -d memory_limit=512M bin/phpstan analyse -l 1 -c phpstan.neon pkg/amqp-ext pkg/async-event-dispatcher pkg/dbal pkg/enqueue pkg/enqueue-bundle pkg/fs pkg/gearman pkg/job-queue pkg/null pkg/pheanstalk pkg/redis pkg/simple-client pkg/sqs pkg/stomp pkg/test pkg/rdkafka; fi - if [ "$PHP_CS_FIXER" = true ]; then IFS=$'\n'; COMMIT_SCA_FILES=($(git diff --name-only --diff-filter=ACMRTUXB "${TRAVIS_COMMIT_RANGE}")); unset IFS; fi - if [ "$PHP_CS_FIXER" = true ]; then ./bin/php-cs-fixer fix --config=.php_cs.dist -v --dry-run --stop-on-violation --using-cache=no --path-mode=intersection -- "${COMMIT_SCA_FILES[@]}"; fi diff --git a/composer.json b/composer.json index c95c124c9..c6c665357 100644 --- a/composer.json +++ b/composer.json @@ -37,7 +37,16 @@ "phpstan/phpstan": "^0.7.0" }, "autoload": { - "files": ["pkg/enqueue/functions_include.php"] + "files": [ + "pkg/enqueue/functions_include.php", + "pkg/rdkafka/Tests/bootstrap.php" + ], + "psr-0": { + "RdKafka": "vendor/kwn/php-rdkafka-stubs/stubs" + }, + "psr-4": { + "RdKafka\\": "vendor/kwn/php-rdkafka-stubs/stubs/RdKafka" + } }, "config": { "bin-dir": "bin" diff --git a/pkg/rdkafka/Tests/bootstrap.php b/pkg/rdkafka/Tests/bootstrap.php new file mode 100644 index 000000000..8d913af5b --- /dev/null +++ b/pkg/rdkafka/Tests/bootstrap.php @@ -0,0 +1,16 @@ + Date: Mon, 17 Jul 2017 14:03:38 +0300 Subject: [PATCH 11/11] [kafka] add docs --- README.md | 12 +++++- docs/index.md | 2 + docs/transport/kafka.md | 91 +++++++++++++++++++++++++++++++++++++++++ 3 files changed, 104 insertions(+), 1 deletion(-) create mode 100644 docs/transport/kafka.md diff --git a/README.md b/README.md index 4363b6e3e..c56789970 100644 --- a/README.md +++ b/README.md @@ -9,7 +9,17 @@ Features: * [Feature rich](docs/quick_tour.md). * Implements [JMS](https://docs.oracle.com/javaee/7/api/javax/jms/package-summary.html) like transports based on a[queue-interop](https://github.com/queue-interop/queue-interop) interfaces. -* Supported transports [AMQP](docs/transport/amqp.md) (RabbitMQ, ActiveMQ), [Beanstalk](docs/transport/pheanstalk.md), [STOMP](docs/transport/stomp.md), [Amazon SQS](docs/transport/sqs.md), [Redis](docs/transport/redis.md), [Gearman](docs/transport/gearman.md), [Doctrine DBAL](docs/transport/dbal.md), [Filesystem](docs/transport/filesystem.md), [Null](docs/transport/null.md). +* Supported transports + * [AMQP](docs/transport/amqp.md) (RabbitMQ, ActiveMQ) + * [Beanstalk](docs/transport/pheanstalk.md) + * [STOMP](docs/transport/stomp.md) + * [Amazon SQS](docs/transport/sqs.md) + * [Kafka](docs/transport/kafka.md) + * [Redis](docs/transport/redis.md) + * [Gearman](docs/transport/gearman.md) + * [Doctrine DBAL](docs/transport/dbal.md) + * [Filesystem](docs/transport/filesystem.md) + * [Null](docs/transport/null.md). * [Symfony bundle](https://github.com/php-enqueue/enqueue-dev/blob/master/docs/bundle/quick_tour.md) * [Magento1 extension](https://github.com/php-enqueue/enqueue-dev/blob/master/docs/magento/quick_tour.md) * [Message bus](http://www.enterpriseintegrationpatterns.com/patterns/messaging/MessageBus.html) support. diff --git a/docs/index.md b/docs/index.md index 49ae4c2bf..9d6913abf 100644 --- a/docs/index.md +++ b/docs/index.md @@ -5,6 +5,8 @@ - [Amqp (RabbitMQ, ActiveMQ)](transport/amqp.md) - [Amazon SQS](transport/sqs.md) - [Beanstalk (Pheanstalk)](transport/pheanstalk.md) + - [Gearman](transport/gearman.md) + - [Kafka](transport/kafka.md) - [Stomp](transport/stomp.md) - [Redis](transport/redis.md) - [Doctrine DBAL](transport/dbal.md) diff --git a/docs/transport/kafka.md b/docs/transport/kafka.md new file mode 100644 index 000000000..116dc2b5b --- /dev/null +++ b/docs/transport/kafka.md @@ -0,0 +1,91 @@ +# Kafka transport + +The transport uses [Kafka](https://kafka.apache.org/) streaming platform as a MQ broker. + +* [Installation](#installation) +* [Create context](#create-context) +* [Send message to topic](#send-message-to-topic) +* [Send message to queue](#send-message-to-queue) +* [Consume message](#consume-message) + +## Installation + +```bash +$ composer require enqueue/rdkafka +``` + +## Create context + +```php + [ + 'group.id' => uniqid('', true), + 'metadata.broker.list' => 'example.com:1000', + 'enable.auto.commit' => 'false', + ], + 'topic' => [ + 'auto.offset.reset' => 'beginning', + ], +]); + +$psrContext = $connectionFactory->createContext(); +``` + +## Send message to topic + +```php +createMessage('Hello world!'); + +$fooTopic = $psrContext->createTopic('foo'); + +$psrContext->createProducer()->send($fooTopic, $message); +``` + +## Send message to queue + +```php +createMessage('Hello world!'); + +$fooQueue = $psrContext->createQueue('foo'); + +$psrContext->createProducer()->send($fooQueue, $message); +``` + +## Consume message: + +```php +createQueue('foo'); + +$consumer = $psrContext->createConsumer($fooQueue); + +$message = $consumer->receive(); + +// process a message + +$consumer->acknowledge($message); +// $consumer->reject($message); +``` + +[back to index](index.md) \ No newline at end of file