Skip to content

Commit 54f60df

Browse files
committed
Add gearman transport.
0 parents  commit 54f60df

19 files changed

+1063
-0
lines changed

.travis.yml

+21
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
sudo: false
2+
3+
git:
4+
depth: 1
5+
6+
language: php
7+
8+
php:
9+
- '5.6'
10+
- '7.0'
11+
12+
cache:
13+
directories:
14+
- $HOME/.composer/cache
15+
16+
install:
17+
- composer self-update
18+
- composer install --prefer-source
19+
20+
script:
21+
- vendor/bin/phpunit --exclude-group=functional

GearmanConnectionFactory.php

+94
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
<?php
2+
3+
namespace Enqueue\Gearman;
4+
5+
use Enqueue\Psr\PsrConnectionFactory;
6+
7+
class GearmanConnectionFactory implements PsrConnectionFactory
8+
{
9+
/**
10+
* @var array
11+
*/
12+
private $config;
13+
14+
/**
15+
* The config could be an array, string DSN or null. In case of null it will attempt to connect to localhost with default settings.
16+
*
17+
* [
18+
* 'host' => 'localhost',
19+
* 'port' => 11300
20+
* ]
21+
*
22+
* or
23+
*
24+
* gearman://host:port
25+
*
26+
* @param array|string $config
27+
*/
28+
public function __construct($config = 'gearman://')
29+
{
30+
if (empty($config) || 'gearman://' === $config) {
31+
$config = [];
32+
} elseif (is_string($config)) {
33+
$config = $this->parseDsn($config);
34+
} elseif (is_array($config)) {
35+
} else {
36+
throw new \LogicException('The config must be either an array of options, a DSN string or null');
37+
}
38+
39+
$this->config = array_replace($this->defaultConfig(), $config);
40+
}
41+
42+
/**
43+
* {@inheritdoc}
44+
*
45+
* @return GearmanContext
46+
*/
47+
public function createContext()
48+
{
49+
return new GearmanContext($this->config);
50+
}
51+
52+
/**
53+
* @param string $dsn
54+
*
55+
* @return array
56+
*/
57+
private function parseDsn($dsn)
58+
{
59+
$dsnConfig = parse_url($dsn);
60+
if (false === $dsnConfig) {
61+
throw new \LogicException(sprintf('Failed to parse DSN "%s"', $dsn));
62+
}
63+
64+
$dsnConfig = array_replace([
65+
'scheme' => null,
66+
'host' => null,
67+
'port' => null,
68+
'user' => null,
69+
'pass' => null,
70+
'path' => null,
71+
'query' => null,
72+
], $dsnConfig);
73+
74+
if ('gearman' !== $dsnConfig['scheme']) {
75+
throw new \LogicException(sprintf('The given DSN scheme "%s" is not supported. Could be "gearman" only.', $dsnConfig['scheme']));
76+
}
77+
78+
return [
79+
'port' => $dsnConfig['port'],
80+
'host' => $dsnConfig['host'],
81+
];
82+
}
83+
84+
/**
85+
* @return array
86+
*/
87+
private function defaultConfig()
88+
{
89+
return [
90+
'host' => \GEARMAN_DEFAULT_TCP_HOST,
91+
'port' => \GEARMAN_DEFAULT_TCP_PORT,
92+
];
93+
}
94+
}

GearmanConsumer.php

+97
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
<?php
2+
3+
namespace Enqueue\Gearman;
4+
5+
use Enqueue\Psr\PsrConsumer;
6+
use Enqueue\Psr\PsrMessage;
7+
8+
class GearmanConsumer implements PsrConsumer
9+
{
10+
/**
11+
* @var \GearmanWorker
12+
*/
13+
private $worker;
14+
15+
/**
16+
* @var GearmanDestination
17+
*/
18+
private $destination;
19+
20+
/**
21+
* @param \GearmanWorker $worker
22+
* @param GearmanDestination $destination
23+
*/
24+
public function __construct(\GearmanWorker $worker, GearmanDestination $destination)
25+
{
26+
$this->worker = $worker;
27+
$this->destination = $destination;
28+
}
29+
30+
/**
31+
* {@inheritdoc}
32+
*
33+
* @return GearmanDestination
34+
*/
35+
public function getQueue()
36+
{
37+
return $this->destination;
38+
}
39+
40+
/**
41+
* {@inheritdoc}
42+
*
43+
* @return GearmanMessage
44+
*/
45+
public function receive($timeout = 0)
46+
{
47+
set_error_handler(function ($severity, $message, $file, $line) {
48+
throw new \ErrorException($message, 0, $severity, $file, $line);
49+
});
50+
51+
$this->worker->setTimeout($timeout);
52+
53+
try {
54+
$message = null;
55+
56+
$this->worker->addFunction($this->destination->getName(), function (\GearmanJob $job) use (&$message) {
57+
$message = GearmanMessage::jsonUnserialize($job->workload());
58+
});
59+
60+
while ($this->worker->work());
61+
} finally {
62+
restore_error_handler();
63+
}
64+
65+
return $message;
66+
}
67+
68+
/**
69+
* {@inheritdoc}
70+
*/
71+
public function receiveNoWait()
72+
{
73+
return $this->receive(100);
74+
}
75+
76+
/**
77+
* {@inheritdoc}
78+
*/
79+
public function acknowledge(PsrMessage $message)
80+
{
81+
}
82+
83+
/**
84+
* {@inheritdoc}
85+
*/
86+
public function reject(PsrMessage $message, $requeue = false)
87+
{
88+
}
89+
90+
/**
91+
* @return \GearmanWorker
92+
*/
93+
public function getWorker()
94+
{
95+
return $this->worker;
96+
}
97+
}

GearmanContext.php

+128
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
<?php
2+
3+
namespace Enqueue\Gearman;
4+
5+
use Enqueue\Psr\InvalidDestinationException;
6+
use Enqueue\Psr\PsrContext;
7+
use Enqueue\Psr\PsrDestination;
8+
9+
class GearmanContext implements PsrContext
10+
{
11+
/**
12+
* @var \GearmanClient
13+
*/
14+
private $client;
15+
16+
/**
17+
* @var GearmanConsumer[]
18+
*/
19+
private $consumers;
20+
21+
/**
22+
* @var array
23+
*/
24+
private $config;
25+
26+
/**
27+
* @param array $config
28+
*/
29+
public function __construct(array $config)
30+
{
31+
$this->config = $config;
32+
}
33+
34+
/**
35+
* {@inheritdoc}
36+
*
37+
* @return GearmanMessage
38+
*/
39+
public function createMessage($body = '', array $properties = [], array $headers = [])
40+
{
41+
return new GearmanMessage($body, $properties, $headers);
42+
}
43+
44+
/**
45+
* {@inheritdoc}
46+
*
47+
* @return GearmanDestination
48+
*/
49+
public function createTopic($topicName)
50+
{
51+
return new GearmanDestination($topicName);
52+
}
53+
54+
/**
55+
* {@inheritdoc}
56+
*/
57+
public function createQueue($queueName)
58+
{
59+
return new GearmanDestination($queueName);
60+
}
61+
62+
/**
63+
* {@inheritdoc}
64+
*/
65+
public function createTemporaryQueue()
66+
{
67+
throw new \LogicException('Not implemented');
68+
}
69+
70+
/**
71+
* {@inheritdoc}
72+
*
73+
* @return GearmanProducer
74+
*/
75+
public function createProducer()
76+
{
77+
return new GearmanProducer($this->getClient());
78+
}
79+
80+
/**
81+
* {@inheritdoc}
82+
*
83+
* @param GearmanDestination $destination
84+
*
85+
* @return GearmanConsumer
86+
*/
87+
public function createConsumer(PsrDestination $destination)
88+
{
89+
InvalidDestinationException::assertDestinationInstanceOf($destination, GearmanDestination::class);
90+
91+
$this->consumers[] = $consumer = new GearmanConsumer($this->createWorker(), $destination);
92+
93+
return $consumer;
94+
}
95+
96+
public function close()
97+
{
98+
$this->getClient()->clearCallbacks();
99+
100+
foreach ($this->consumers as $consumer) {
101+
$consumer->getWorker()->unregisterAll();
102+
}
103+
}
104+
105+
/**
106+
* @return \GearmanClient
107+
*/
108+
public function getClient()
109+
{
110+
if (false == $this->client) {
111+
$this->client = new \GearmanClient();
112+
$this->client->addServer($this->config['host'], $this->config['port']);
113+
}
114+
115+
return $this->client;
116+
}
117+
118+
/**
119+
* @return \GearmanWorker
120+
*/
121+
public function createWorker()
122+
{
123+
$worker = new \GearmanWorker();
124+
$worker->addServer($this->config['host'], $this->config['port']);
125+
126+
return $worker;
127+
}
128+
}

GearmanDestination.php

+46
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
<?php
2+
3+
namespace Enqueue\Gearman;
4+
5+
use Enqueue\Psr\PsrQueue;
6+
use Enqueue\Psr\PsrTopic;
7+
8+
class GearmanDestination implements PsrQueue, PsrTopic
9+
{
10+
/**
11+
* @var string
12+
*/
13+
private $destinationName;
14+
15+
/**
16+
* @param string $destinationName
17+
*/
18+
public function __construct($destinationName)
19+
{
20+
$this->destinationName = $destinationName;
21+
}
22+
23+
/**
24+
* @return string
25+
*/
26+
public function getName()
27+
{
28+
return $this->destinationName;
29+
}
30+
31+
/**
32+
* {@inheritdoc}
33+
*/
34+
public function getQueueName()
35+
{
36+
return $this->getName();
37+
}
38+
39+
/**
40+
* {@inheritdoc}
41+
*/
42+
public function getTopicName()
43+
{
44+
return $this->getName();
45+
}
46+
}

0 commit comments

Comments
 (0)