Skip to content

Commit 1ba3d19

Browse files
authored
Merge pull request #221 from php-enqueue/amqp-use-same-qos-options-in-all-transports2
[BC break][amqp] Use same qos options across all all AMQP transports
2 parents 74aec48 + 5ad9c3e commit 1ba3d19

20 files changed

+255
-55
lines changed

Diff for: docs/bundle/cli_commands.md

+1
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ Options:
2929
--setup-broker Creates queues, topics, exchanges, binding etc on broker side.
3030
--idle-timeout=IDLE-TIMEOUT The time in milliseconds queue consumer idle if no message has been received.
3131
--receive-timeout=RECEIVE-TIMEOUT The time in milliseconds queue consumer waits for a message.
32+
--skip[=SKIP] Queues to skip consumption of messages from (multiple values allowed)
3233
-h, --help Display this help message
3334
-q, --quiet Do not output any message
3435
-V, --version Display this application version

Diff for: pkg/amqp-bunny/AmqpConnectionFactory.php

+22-2
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,23 @@ public function __construct($config = 'amqp:')
5858
throw new \LogicException('The config must be either an array of options, a DSN string or null');
5959
}
6060

61-
$this->config = array_replace($this->defaultConfig(), $config);
61+
$config = array_replace($this->defaultConfig(), $config);
62+
63+
$config = array_replace($this->defaultConfig(), $config);
64+
if (array_key_exists('qos_global', $config)) {
65+
$config['qos_global'] = (bool) $config['qos_global'];
66+
}
67+
if (array_key_exists('qos_prefetch_count', $config)) {
68+
$config['qos_prefetch_count'] = (int) $config['qos_prefetch_count'];
69+
}
70+
if (array_key_exists('qos_prefetch_size', $config)) {
71+
$config['qos_prefetch_size'] = (int) $config['qos_prefetch_size'];
72+
}
73+
if (array_key_exists('lazy', $config)) {
74+
$config['lazy'] = (bool) $config['lazy'];
75+
}
76+
77+
$this->config = $config;
6278

6379
$supportedMethods = ['basic_get', 'basic_consume'];
6480
if (false == in_array($this->config['receive_method'], $supportedMethods, true)) {
@@ -77,7 +93,10 @@ public function createContext()
7793
{
7894
if ($this->config['lazy']) {
7995
$context = new AmqpContext(function () {
80-
return $this->establishConnection()->channel();
96+
$channel = $this->establishConnection()->channel();
97+
$channel->qos($this->config['qos_prefetch_size'], $this->config['qos_prefetch_count'], $this->config['qos_global']);
98+
99+
return $channel;
81100
}, $this->config);
82101
$context->setDelayStrategy($this->delayStrategy);
83102

@@ -86,6 +105,7 @@ public function createContext()
86105

87106
$context = new AmqpContext($this->establishConnection()->channel(), $this->config);
88107
$context->setDelayStrategy($this->delayStrategy);
108+
$context->setQos($this->config['qos_prefetch_size'], $this->config['qos_prefetch_count'], $this->config['qos_global']);
89109

90110
return $context;
91111
}

Diff for: pkg/amqp-bunny/Tests/Spec/AmqpPreFetchCountTest.php

+22
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
<?php
2+
3+
namespace Enqueue\AmqpBunny\Tests\Spec;
4+
5+
use Enqueue\AmqpBunny\AmqpConnectionFactory;
6+
use Interop\Queue\Spec\Amqp\PreFetchCountSpec;
7+
8+
/**
9+
* @group functional
10+
*/
11+
class AmqpPreFetchCountTest extends PreFetchCountSpec
12+
{
13+
/**
14+
* {@inheritdoc}
15+
*/
16+
protected function createContext()
17+
{
18+
$factory = new AmqpConnectionFactory(getenv('AMQP_DSN'));
19+
20+
return $factory->createContext();
21+
}
22+
}

Diff for: pkg/amqp-bunny/Tests/Spec/AmqpSendAndReceiveDelayedMessageWithDelayPluginStrategyTest.php

+10-1
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,9 @@
22

33
namespace Enqueue\AmqpBunny\Tests\Spec;
44

5-
use Enqueue\AmqpLib\AmqpConnectionFactory;
5+
use Enqueue\AmqpBunny\AmqpConnectionFactory;
66
use Enqueue\AmqpTools\RabbitMqDelayPluginDelayStrategy;
7+
use Interop\Amqp\AmqpContext;
78
use Interop\Queue\PsrContext;
89
use Interop\Queue\Spec\SendAndReceiveDelayedMessageFromQueueSpec;
910

@@ -12,6 +13,11 @@
1213
*/
1314
class AmqpSendAndReceiveDelayedMessageWithDelayPluginStrategyTest extends SendAndReceiveDelayedMessageFromQueueSpec
1415
{
16+
public function test()
17+
{
18+
$this->markTestIncomplete();
19+
}
20+
1521
/**
1622
* {@inheritdoc}
1723
*/
@@ -25,12 +31,15 @@ protected function createContext()
2531

2632
/**
2733
* {@inheritdoc}
34+
*
35+
* @param AmqpContext $context
2836
*/
2937
protected function createQueue(PsrContext $context, $queueName)
3038
{
3139
$queue = parent::createQueue($context, $queueName);
3240

3341
$context->declareQueue($queue);
42+
$context->purgeQueue($queue);
3443

3544
return $queue;
3645
}

Diff for: pkg/amqp-bunny/Tests/Spec/AmqpSendAndReceiveDelayedMessageWithDlxStrategyTest.php

+1-1
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
namespace Enqueue\AmqpBunny\Tests\Spec;
44

5-
use Enqueue\AmqpLib\AmqpConnectionFactory;
5+
use Enqueue\AmqpBunny\AmqpConnectionFactory;
66
use Enqueue\AmqpTools\RabbitMqDlxDelayStrategy;
77
use Interop\Queue\PsrContext;
88
use Interop\Queue\Spec\SendAndReceiveDelayedMessageFromQueueSpec;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
<?php
2+
3+
namespace Enqueue\AmqpBunny\Tests\Spec;
4+
5+
use Enqueue\AmqpBunny\AmqpConnectionFactory;
6+
use Interop\Queue\Spec\Amqp\SendAndReceiveTimestampAsIntegerSpec;
7+
8+
/**
9+
* @group functional
10+
*/
11+
class AmqpSendAndReceiveTimestampAsIntengerTest extends SendAndReceiveTimestampAsIntegerSpec
12+
{
13+
/**
14+
* {@inheritdoc}
15+
*/
16+
protected function createContext()
17+
{
18+
$factory = new AmqpConnectionFactory(getenv('AMQP_DSN'));
19+
20+
return $factory->createContext();
21+
}
22+
}

Diff for: pkg/amqp-bunny/composer.json

+1-1
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
"enqueue/test": "^0.8@dev",
1717
"enqueue/enqueue": "^0.8@dev",
1818
"enqueue/null": "^0.8@dev",
19-
"queue-interop/queue-spec": "^0.5.1@dev",
19+
"queue-interop/queue-spec": "^0.5.2@dev",
2020
"symfony/dependency-injection": "^2.8|^3",
2121
"symfony/config": "^2.8|^3"
2222
},

Diff for: pkg/amqp-ext/AmqpConnectionFactory.php

+29-16
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,9 @@ class AmqpConnectionFactory implements InteropAmqpConnectionFactory, DelayStrate
3434
* 'connect_timeout' => 'Connection timeout. Note: 0 or greater seconds. May be fractional.',
3535
* 'persisted' => 'bool, Whether it use single persisted connection or open a new one for every context',
3636
* 'lazy' => 'the connection will be performed as later as possible, if the option set to true',
37-
* 'pre_fetch_count' => 'Controls how many messages could be prefetched',
38-
* 'pre_fetch_size' => 'Controls how many messages could be prefetched',
37+
* 'qos_prefetch_size' => 'The server will send a message in advance if it is equal to or smaller in size than the available prefetch size. May be set to zero, meaning "no specific limit"',
38+
* 'qos_prefetch_count' => 'Specifies a prefetch window in terms of whole messages.',
39+
* 'qos_global' => 'If "false" the QoS settings apply to the current channel only. If this field is "true", they are applied to the entire connection.',
3940
* 'receive_method' => 'Could be either basic_get or basic_consume',
4041
* ]
4142
*
@@ -72,7 +73,7 @@ public function __construct($config = 'amqp:')
7273
}
7374

7475
if ('basic_consume' == $this->config['receive_method']) {
75-
if (false == (version_compare(phpversion('amqp'), '1.9.1', '>=') || phpversion('amqp') == '1.9.1-dev')) {
76+
if (false == (version_compare(phpversion('amqp'), '1.9.1', '>=') || '1.9.1-dev' == phpversion('amqp'))) {
7677
// @see https://github.com/php-enqueue/enqueue-dev/issues/110 and https://github.com/pdezwart/php-amqp/issues/281
7778
throw new \LogicException('The "basic_consume" method does not work on amqp extension prior 1.9.1 version.');
7879
}
@@ -88,7 +89,10 @@ public function createContext()
8889
{
8990
if ($this->config['lazy']) {
9091
$context = new AmqpContext(function () {
91-
return $this->createExtContext($this->establishConnection());
92+
$extContext = $this->createExtContext($this->establishConnection());
93+
$extContext->qos($this->config['qos_prefetch_size'], $this->config['qos_prefetch_count']);
94+
95+
return $extContext;
9296
}, $this->config['receive_method']);
9397
$context->setDelayStrategy($this->delayStrategy);
9498

@@ -97,6 +101,7 @@ public function createContext()
97101

98102
$context = new AmqpContext($this->createExtContext($this->establishConnection()), $this->config['receive_method']);
99103
$context->setDelayStrategy($this->delayStrategy);
104+
$context->setQos($this->config['qos_prefetch_size'], $this->config['qos_prefetch_count'], $this->config['qos_global']);
100105

101106
return $context;
102107
}
@@ -108,16 +113,7 @@ public function createContext()
108113
*/
109114
private function createExtContext(\AMQPConnection $extConnection)
110115
{
111-
$channel = new \AMQPChannel($extConnection);
112-
if (false == empty($this->config['pre_fetch_count'])) {
113-
$channel->setPrefetchCount((int) $this->config['pre_fetch_count']);
114-
}
115-
116-
if (false == empty($this->config['pre_fetch_size'])) {
117-
$channel->setPrefetchSize((int) $this->config['pre_fetch_size']);
118-
}
119-
120-
return $channel;
116+
return new \AMQPChannel($extConnection);
121117
}
122118

123119
/**
@@ -183,6 +179,22 @@ private function parseDsn($dsn)
183179
return urldecode($value);
184180
}, $config);
185181

182+
if (array_key_exists('qos_global', $config)) {
183+
$config['qos_global'] = (bool) $config['qos_global'];
184+
}
185+
if (array_key_exists('qos_prefetch_count', $config)) {
186+
$config['qos_prefetch_count'] = (int) $config['qos_prefetch_count'];
187+
}
188+
if (array_key_exists('qos_prefetch_size', $config)) {
189+
$config['qos_prefetch_size'] = (int) $config['qos_prefetch_size'];
190+
}
191+
if (array_key_exists('lazy', $config)) {
192+
$config['lazy'] = (bool) $config['lazy'];
193+
}
194+
if (array_key_exists('persisted', $config)) {
195+
$config['persisted'] = (bool) $config['persisted'];
196+
}
197+
186198
return $config;
187199
}
188200

@@ -202,8 +214,9 @@ private function defaultConfig()
202214
'connect_timeout' => null,
203215
'persisted' => false,
204216
'lazy' => true,
205-
'pre_fetch_count' => null,
206-
'pre_fetch_size' => null,
217+
'qos_prefetch_size' => 0,
218+
'qos_prefetch_count' => 1,
219+
'qos_global' => false,
207220
'receive_method' => 'basic_get',
208221
];
209222
}

Diff for: pkg/amqp-ext/Tests/AmqpConnectionFactoryConfigTest.php

+35-24
Original file line numberDiff line numberDiff line change
@@ -73,8 +73,9 @@ public static function provideConfigs()
7373
'connect_timeout' => null,
7474
'persisted' => false,
7575
'lazy' => true,
76-
'pre_fetch_count' => null,
77-
'pre_fetch_size' => null,
76+
'qos_prefetch_size' => 0,
77+
'qos_prefetch_count' => 1,
78+
'qos_global' => false,
7879
'receive_method' => 'basic_get',
7980
],
8081
];
@@ -94,8 +95,9 @@ public static function provideConfigs()
9495
'connect_timeout' => null,
9596
'persisted' => false,
9697
'lazy' => true,
97-
'pre_fetch_count' => null,
98-
'pre_fetch_size' => null,
98+
'qos_prefetch_size' => 0,
99+
'qos_prefetch_count' => 1,
100+
'qos_global' => false,
99101
'receive_method' => 'basic_get',
100102
],
101103
];
@@ -113,8 +115,9 @@ public static function provideConfigs()
113115
'connect_timeout' => null,
114116
'persisted' => false,
115117
'lazy' => true,
116-
'pre_fetch_count' => null,
117-
'pre_fetch_size' => null,
118+
'qos_prefetch_size' => 0,
119+
'qos_prefetch_count' => 1,
120+
'qos_global' => false,
118121
'receive_method' => 'basic_get',
119122
],
120123
];
@@ -132,8 +135,9 @@ public static function provideConfigs()
132135
'connect_timeout' => null,
133136
'persisted' => false,
134137
'lazy' => true,
135-
'pre_fetch_count' => null,
136-
'pre_fetch_size' => null,
138+
'qos_prefetch_size' => 0,
139+
'qos_prefetch_count' => 1,
140+
'qos_global' => false,
137141
'receive_method' => 'basic_get',
138142
],
139143
];
@@ -151,8 +155,9 @@ public static function provideConfigs()
151155
'connect_timeout' => null,
152156
'persisted' => false,
153157
'lazy' => true,
154-
'pre_fetch_count' => null,
155-
'pre_fetch_size' => null,
158+
'qos_prefetch_size' => 0,
159+
'qos_prefetch_count' => 1,
160+
'qos_global' => false,
156161
'receive_method' => 'basic_get',
157162
],
158163
];
@@ -170,8 +175,9 @@ public static function provideConfigs()
170175
'connect_timeout' => null,
171176
'persisted' => false,
172177
'lazy' => true,
173-
'pre_fetch_count' => null,
174-
'pre_fetch_size' => null,
178+
'qos_prefetch_size' => 0,
179+
'qos_prefetch_count' => 1,
180+
'qos_global' => false,
175181
'receive_method' => 'basic_get',
176182
],
177183
];
@@ -189,8 +195,9 @@ public static function provideConfigs()
189195
'connect_timeout' => '2',
190196
'persisted' => false,
191197
'lazy' => '',
192-
'pre_fetch_count' => null,
193-
'pre_fetch_size' => null,
198+
'qos_prefetch_size' => 0,
199+
'qos_prefetch_count' => 1,
200+
'qos_global' => false,
194201
'receive_method' => 'basic_get',
195202
],
196203
];
@@ -208,8 +215,9 @@ public static function provideConfigs()
208215
'connect_timeout' => null,
209216
'persisted' => false,
210217
'lazy' => true,
211-
'pre_fetch_count' => null,
212-
'pre_fetch_size' => null,
218+
'qos_prefetch_size' => 0,
219+
'qos_prefetch_count' => 1,
220+
'qos_global' => false,
213221
'receive_method' => 'basic_get',
214222
],
215223
];
@@ -227,14 +235,15 @@ public static function provideConfigs()
227235
'connect_timeout' => null,
228236
'persisted' => false,
229237
'lazy' => false,
230-
'pre_fetch_count' => null,
231-
'pre_fetch_size' => null,
238+
'qos_prefetch_size' => 0,
239+
'qos_prefetch_count' => 1,
240+
'qos_global' => false,
232241
'receive_method' => 'basic_get',
233242
],
234243
];
235244

236245
yield [
237-
['pre_fetch_count' => 123, 'pre_fetch_size' => 321],
246+
['qos_prefetch_count' => 123, 'qos_prefetch_size' => 321],
238247
[
239248
'host' => 'localhost',
240249
'port' => 5672,
@@ -246,14 +255,15 @@ public static function provideConfigs()
246255
'connect_timeout' => null,
247256
'persisted' => false,
248257
'lazy' => true,
249-
'pre_fetch_count' => 123,
250-
'pre_fetch_size' => 321,
258+
'qos_prefetch_count' => 123,
259+
'qos_prefetch_size' => 321,
260+
'qos_global' => false,
251261
'receive_method' => 'basic_get',
252262
],
253263
];
254264

255265
yield [
256-
'amqp://user:pass@host:10000/vhost?pre_fetch_count=123&pre_fetch_size=321',
266+
'amqp://user:pass@host:10000/vhost?qos_prefetch_count=123&qos_prefetch_size=321&qos_global=1',
257267
[
258268
'host' => 'host',
259269
'port' => '10000',
@@ -265,8 +275,9 @@ public static function provideConfigs()
265275
'connect_timeout' => null,
266276
'persisted' => false,
267277
'lazy' => true,
268-
'pre_fetch_count' => 123,
269-
'pre_fetch_size' => 321,
278+
'qos_prefetch_size' => 321,
279+
'qos_prefetch_count' => 123,
280+
'qos_global' => true,
270281
'receive_method' => 'basic_get',
271282
],
272283
];

0 commit comments

Comments
 (0)