From e929725ec7c41422490f321cbc734e7e0a13dba2 Mon Sep 17 00:00:00 2001 From: Alexander Kozienko Date: Thu, 3 Aug 2017 10:07:26 +0300 Subject: [PATCH 1/2] amqp tutorial --- pkg/amqp-lib/tutorial/emit_log.php | 33 +++++++++++ pkg/amqp-lib/tutorial/emit_log_direct.php | 37 ++++++++++++ pkg/amqp-lib/tutorial/emit_log_topic.php | 37 ++++++++++++ pkg/amqp-lib/tutorial/new_task.php | 35 ++++++++++++ pkg/amqp-lib/tutorial/receive.php | 33 +++++++++++ pkg/amqp-lib/tutorial/receive_logs.php | 41 ++++++++++++++ pkg/amqp-lib/tutorial/receive_logs_direct.php | 49 ++++++++++++++++ pkg/amqp-lib/tutorial/receive_logs_topic.php | 49 ++++++++++++++++ pkg/amqp-lib/tutorial/rpc_client.php | 56 +++++++++++++++++++ pkg/amqp-lib/tutorial/rpc_server.php | 52 +++++++++++++++++ pkg/amqp-lib/tutorial/send.php | 26 +++++++++ pkg/amqp-lib/tutorial/worker.php | 38 +++++++++++++ 12 files changed, 486 insertions(+) create mode 100644 pkg/amqp-lib/tutorial/emit_log.php create mode 100644 pkg/amqp-lib/tutorial/emit_log_direct.php create mode 100644 pkg/amqp-lib/tutorial/emit_log_topic.php create mode 100644 pkg/amqp-lib/tutorial/new_task.php create mode 100644 pkg/amqp-lib/tutorial/receive.php create mode 100644 pkg/amqp-lib/tutorial/receive_logs.php create mode 100644 pkg/amqp-lib/tutorial/receive_logs_direct.php create mode 100644 pkg/amqp-lib/tutorial/receive_logs_topic.php create mode 100644 pkg/amqp-lib/tutorial/rpc_client.php create mode 100644 pkg/amqp-lib/tutorial/rpc_server.php create mode 100644 pkg/amqp-lib/tutorial/send.php create mode 100644 pkg/amqp-lib/tutorial/worker.php diff --git a/pkg/amqp-lib/tutorial/emit_log.php b/pkg/amqp-lib/tutorial/emit_log.php new file mode 100644 index 000000000..bc9fd4c2b --- /dev/null +++ b/pkg/amqp-lib/tutorial/emit_log.php @@ -0,0 +1,33 @@ + 'localhost', + 'port' => 5672, + 'user' => 'guest', + 'pass' => 'guest', +]; + +$connection = new AmqpConnectionFactory($config); +$context = $connection->createContext(); + +$topic = $context->createTopic('logs'); +$topic->setType(AmqpTopic::TYPE_FANOUT); + +$context->declareTopic($topic); + +$data = implode(' ', array_slice($argv, 1)); +if (empty($data)) { + $data = 'info: Hello World!'; +} +$message = $context->createMessage($data); + +$context->createProducer()->send($topic, $message); + +echo ' [x] Sent ', $data, "\n"; + +$context->close(); diff --git a/pkg/amqp-lib/tutorial/emit_log_direct.php b/pkg/amqp-lib/tutorial/emit_log_direct.php new file mode 100644 index 000000000..87e890854 --- /dev/null +++ b/pkg/amqp-lib/tutorial/emit_log_direct.php @@ -0,0 +1,37 @@ + 'localhost', + 'port' => 5672, + 'user' => 'guest', + 'pass' => 'guest', +]; + +$connection = new AmqpConnectionFactory($config); +$context = $connection->createContext(); + +$topic = $context->createTopic('direct_logs'); +$topic->setType(AmqpTopic::TYPE_DIRECT); + +$context->declareTopic($topic); + +$severity = isset($argv[1]) && !empty($argv[1]) ? $argv[1] : 'info'; + +$data = implode(' ', array_slice($argv, 2)); +if (empty($data)) { + $data = 'Hello World!'; +} + +$message = $context->createMessage($data); +$message->setRoutingKey($severity); + +$context->createProducer()->send($topic, $message); + +echo ' [x] Sent ',$severity,':',$data," \n"; + +$context->close(); diff --git a/pkg/amqp-lib/tutorial/emit_log_topic.php b/pkg/amqp-lib/tutorial/emit_log_topic.php new file mode 100644 index 000000000..ab181865c --- /dev/null +++ b/pkg/amqp-lib/tutorial/emit_log_topic.php @@ -0,0 +1,37 @@ + 'localhost', + 'port' => 5672, + 'user' => 'guest', + 'pass' => 'guest', +]; + +$connection = new AmqpConnectionFactory($config); +$context = $connection->createContext(); + +$topic = $context->createTopic('topic_logs'); +$topic->setType(AmqpTopic::TYPE_TOPIC); + +$context->declareTopic($topic); + +$routing_key = isset($argv[1]) && !empty($argv[1]) ? $argv[1] : 'anonymous.info'; + +$data = implode(' ', array_slice($argv, 2)); +if (empty($data)) { + $data = 'Hello World!'; +} + +$message = $context->createMessage($data); +$message->setRoutingKey($routing_key); + +$context->createProducer()->send($topic, $message); + +echo ' [x] Sent ',$routing_key,':',$data," \n"; + +$context->close(); diff --git a/pkg/amqp-lib/tutorial/new_task.php b/pkg/amqp-lib/tutorial/new_task.php new file mode 100644 index 000000000..5c3c836f8 --- /dev/null +++ b/pkg/amqp-lib/tutorial/new_task.php @@ -0,0 +1,35 @@ + 'localhost', + 'port' => 5672, + 'user' => 'guest', + 'pass' => 'guest', +]; + +$connection = new AmqpConnectionFactory($config); +$context = $connection->createContext(); + +$queue = $context->createQueue('task_queue'); +$queue->addFlag(AmqpQueue::FLAG_DURABLE); + +$context->declareQueue($queue); + +$data = implode(' ', array_slice($argv, 1)); +if (empty($data)) { + $data = 'Hello World!'; +} +$message = $context->createMessage($data); +$message->setDeliveryMode(AmqpMessage::DELIVERY_MODE_PERSISTENT); + +$context->createProducer()->send($queue, $message); + +echo ' [x] Sent ', $data, "\n"; + +$context->close(); diff --git a/pkg/amqp-lib/tutorial/receive.php b/pkg/amqp-lib/tutorial/receive.php new file mode 100644 index 000000000..337421020 --- /dev/null +++ b/pkg/amqp-lib/tutorial/receive.php @@ -0,0 +1,33 @@ + 'localhost', + 'port' => 5672, + 'user' => 'guest', + 'pass' => 'guest', + 'receive_method' => 'basic_consume', +]; + +$connection = new AmqpConnectionFactory($config); +$context = $connection->createContext(); + +$queue = $context->createQueue('hello'); +$context->declareQueue($queue); + +$consumer = $context->createConsumer($queue); +$consumer->addFlag(AmqpConsumer::FLAG_NOACK); + +echo ' [*] Waiting for messages. To exit press CTRL+C', "\n"; + +while (true) { + if ($message = $consumer->receive()) { + echo ' [x] Received ', $message->getBody(), "\n"; + } +} + +$context->close(); diff --git a/pkg/amqp-lib/tutorial/receive_logs.php b/pkg/amqp-lib/tutorial/receive_logs.php new file mode 100644 index 000000000..bf68bf1fb --- /dev/null +++ b/pkg/amqp-lib/tutorial/receive_logs.php @@ -0,0 +1,41 @@ + 'localhost', + 'port' => 5672, + 'user' => 'guest', + 'pass' => 'guest', + 'receive_method' => 'basic_consume', +]; + +$connection = new AmqpConnectionFactory($config); +$context = $connection->createContext(); + +$topic = $context->createTopic('logs'); +$topic->setType(AmqpTopic::TYPE_FANOUT); + +$context->declareTopic($topic); + +$queue = $context->createTemporaryQueue(); + +$context->bind(new AmqpBind($topic, $queue)); + +$consumer = $context->createConsumer($queue); +$consumer->addFlag(AmqpConsumer::FLAG_NOACK); + +echo ' [*] Waiting for logs. To exit press CTRL+C', "\n"; + +while (true) { + if ($message = $consumer->receive()) { + echo ' [x] ', $message->getBody(), "\n"; + } +} + +$context->close(); diff --git a/pkg/amqp-lib/tutorial/receive_logs_direct.php b/pkg/amqp-lib/tutorial/receive_logs_direct.php new file mode 100644 index 000000000..699d5108f --- /dev/null +++ b/pkg/amqp-lib/tutorial/receive_logs_direct.php @@ -0,0 +1,49 @@ + 'localhost', + 'port' => 5672, + 'user' => 'guest', + 'pass' => 'guest', + 'receive_method' => 'basic_consume', +]; + +$connection = new AmqpConnectionFactory($config); +$context = $connection->createContext(); + +$topic = $context->createTopic('direct_logs'); +$topic->setType(AmqpTopic::TYPE_DIRECT); + +$context->declareTopic($topic); + +$queue = $context->createTemporaryQueue(); + +$severities = array_slice($argv, 1); +if (empty($severities)) { + file_put_contents('php://stderr', "Usage: $argv[0] [info] [warning] [error]\n"); + exit(1); +} + +foreach ($severities as $severity) { + $context->bind(new AmqpBind($topic, $queue, $severity)); +} + +$consumer = $context->createConsumer($queue); +$consumer->addFlag(AmqpConsumer::FLAG_NOACK); + +echo ' [*] Waiting for logs. To exit press CTRL+C', "\n"; + +while (true) { + if ($message = $consumer->receive()) { + echo ' [x] '.$message->getRoutingKey().':'.$message->getBody()."\n"; + } +} + +$context->close(); diff --git a/pkg/amqp-lib/tutorial/receive_logs_topic.php b/pkg/amqp-lib/tutorial/receive_logs_topic.php new file mode 100644 index 000000000..a149be84c --- /dev/null +++ b/pkg/amqp-lib/tutorial/receive_logs_topic.php @@ -0,0 +1,49 @@ + 'localhost', + 'port' => 5672, + 'user' => 'guest', + 'pass' => 'guest', + 'receive_method' => 'basic_consume', +]; + +$connection = new AmqpConnectionFactory($config); +$context = $connection->createContext(); + +$topic = $context->createTopic('topic_logs'); +$topic->setType(AmqpTopic::TYPE_TOPIC); + +$context->declareTopic($topic); + +$queue = $context->createTemporaryQueue(); + +$binding_keys = array_slice($argv, 1); +if (empty($binding_keys)) { + file_put_contents('php://stderr', "Usage: $argv[0] [binding_key]\n"); + exit(1); +} + +foreach ($binding_keys as $binding_key) { + $context->bind(new AmqpBind($topic, $queue, $binding_key)); +} + +$consumer = $context->createConsumer($queue); +$consumer->addFlag(AmqpConsumer::FLAG_NOACK); + +echo ' [*] Waiting for logs. To exit press CTRL+C', "\n"; + +while (true) { + if ($message = $consumer->receive()) { + echo ' [x] '.$message->getRoutingKey().':'.$message->getBody()."\n"; + } +} + +$context->close(); diff --git a/pkg/amqp-lib/tutorial/rpc_client.php b/pkg/amqp-lib/tutorial/rpc_client.php new file mode 100644 index 000000000..9c34510dd --- /dev/null +++ b/pkg/amqp-lib/tutorial/rpc_client.php @@ -0,0 +1,56 @@ + 'localhost', + 'port' => 5672, + 'user' => 'guest', + 'pass' => 'guest', + 'receive_method' => 'basic_consume', +]; + +class FibonacciRpcClient +{ + /** @var \Interop\Amqp\AmqpContext */ + private $context; + + /** @var \Interop\Amqp\AmqpQueue */ + private $callback_queue; + + public function __construct(array $config) + { + $this->context = (new AmqpConnectionFactory($config))->createContext(); + $this->callback_queue = $this->context->createTemporaryQueue(); + } + + public function call($n) + { + $corr_id = uniqid(); + + $message = $this->context->createMessage((string) $n); + $message->setCorrelationId($corr_id); + $message->setReplyTo($this->callback_queue->getQueueName()); + + $this->context->createProducer()->send( + $this->context->createQueue('rpc_queue'), + $message + ); + + $consumer = $this->context->createConsumer($this->callback_queue); + + while (true) { + if ($message = $consumer->receive()) { + if ($message->getCorrelationId() == $corr_id) { + return (int) ($message->getBody()); + } + } + } + } +} + +$fibonacci_rpc = new FibonacciRpcClient($config); +$response = $fibonacci_rpc->call(30); +echo ' [.] Got ', $response, "\n"; diff --git a/pkg/amqp-lib/tutorial/rpc_server.php b/pkg/amqp-lib/tutorial/rpc_server.php new file mode 100644 index 000000000..3ad25fbe2 --- /dev/null +++ b/pkg/amqp-lib/tutorial/rpc_server.php @@ -0,0 +1,52 @@ + 'localhost', + 'port' => 5672, + 'user' => 'guest', + 'pass' => 'guest', + 'receive_method' => 'basic_consume', +]; + +function fib($n) +{ + if ($n == 0) { + return 0; + } + if ($n == 1) { + return 1; + } + return fib($n - 1) + fib($n - 2); +} + +$connection = new AmqpConnectionFactory($config); +$context = $connection->createContext(); +$context->setQos(0, 1, false); + +$rpc_queue = $context->createQueue('rpc_queue'); +$context->declareQueue($rpc_queue); + +$consumer = $context->createConsumer($rpc_queue); + +echo " [x] Awaiting RPC requests\n"; + +while (true) { + if ($req = $consumer->receive()) { + $n = (int) ($req->getBody()); + echo ' [.] fib(', $n, ")\n"; + + $msg = $context->createMessage((string) fib($n)); + $msg->setCorrelationId($req->getCorrelationId()); + + $reply_queue = $context->createQueue($req->getReplyTo()); + $context->createProducer()->send($reply_queue, $msg); + + $consumer->acknowledge($req); + } +} + +$context->close(); diff --git a/pkg/amqp-lib/tutorial/send.php b/pkg/amqp-lib/tutorial/send.php new file mode 100644 index 000000000..5f1d89b62 --- /dev/null +++ b/pkg/amqp-lib/tutorial/send.php @@ -0,0 +1,26 @@ + 'localhost', + 'port' => 5672, + 'user' => 'guest', + 'pass' => 'guest', +]; + +$connection = new AmqpConnectionFactory($config); +$context = $connection->createContext(); + +$queue = $context->createQueue('hello'); +$context->declareQueue($queue); + +$message = $context->createMessage('Hello World!'); + +$context->createProducer()->send($queue, $message); + +echo " [x] Sent 'Hello World!'\n"; + +$context->close(); diff --git a/pkg/amqp-lib/tutorial/worker.php b/pkg/amqp-lib/tutorial/worker.php new file mode 100644 index 000000000..3f908b6a6 --- /dev/null +++ b/pkg/amqp-lib/tutorial/worker.php @@ -0,0 +1,38 @@ + 'localhost', + 'port' => 5672, + 'user' => 'guest', + 'pass' => 'guest', + 'receive_method' => 'basic_consume', +]; + +$connection = new AmqpConnectionFactory($config); +$context = $connection->createContext(); +$context->setQos(0, 1, false); + +$queue = $context->createQueue('task_queue'); +$queue->addFlag(AmqpQueue::FLAG_DURABLE); + +$context->declareQueue($queue); + +$consumer = $context->createConsumer($queue); + +echo ' [*] Waiting for messages. To exit press CTRL+C', "\n"; + +while (true) { + if ($message = $consumer->receive()) { + echo ' [x] Received ', $message->getBody(), "\n"; + sleep(substr_count($message->getBody(), '.')); + echo ' [x] Done', "\n"; + $consumer->acknowledge($message); + } +} + +$context->close(); From 1186af16251ebd613e88de264c3ef6cc1b6334c5 Mon Sep 17 00:00:00 2001 From: Alexander Kozienko Date: Thu, 3 Aug 2017 11:19:52 +0300 Subject: [PATCH 2/2] cs fix --- pkg/amqp-lib/tutorial/rpc_server.php | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pkg/amqp-lib/tutorial/rpc_server.php b/pkg/amqp-lib/tutorial/rpc_server.php index 3ad25fbe2..9e848dce0 100644 --- a/pkg/amqp-lib/tutorial/rpc_server.php +++ b/pkg/amqp-lib/tutorial/rpc_server.php @@ -17,9 +17,11 @@ function fib($n) if ($n == 0) { return 0; } + if ($n == 1) { return 1; } + return fib($n - 1) + fib($n - 2); }