Skip to content

Commit e7ee2e5

Browse files
committed
Add Command processor and Command creator
1 parent 952f592 commit e7ee2e5

File tree

2 files changed

+110
-0
lines changed

2 files changed

+110
-0
lines changed

Diff for: pkg/enqueue/Symfony/Creator/JobCommandCreator.php

+66
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
<?php
2+
3+
namespace Enqueue\Symfony\Creator;
4+
5+
use Enqueue\Client\ProducerInterface;
6+
7+
class JobCommandCreator
8+
{
9+
/**
10+
* @var ProducerInterface
11+
*/
12+
protected $producer;
13+
14+
/**
15+
* @var string
16+
*/
17+
protected $env;
18+
19+
/**
20+
* CommandJobCreator constructor.
21+
*
22+
* @param ProducerInterface $producer
23+
* @param string $env
24+
*/
25+
public function __construct(ProducerInterface $producer, $env)
26+
{
27+
$this->producer = $producer;
28+
$this->env = $env;
29+
}
30+
31+
/**
32+
* @param $command
33+
* @param mixed $args
34+
*
35+
* @return \Enqueue\Rpc\Promise|null
36+
*/
37+
public function scheduleCommand($command, $args = [])
38+
{
39+
$argumentString = $this->createArgumentString($args);
40+
41+
return $this->producer->sendCommand('run_command', sprintf('%s %s', $command, $argumentString));
42+
}
43+
44+
/**
45+
* @param array $arguments
46+
*
47+
* @return string
48+
*/
49+
public function createArgumentString(array $arguments)
50+
{
51+
$optionList = [];
52+
53+
foreach ($arguments as $key => $value) {
54+
if (!is_int($key)) {
55+
$optionList[] = sprintf('--%s=%s', $key, $value);
56+
continue;
57+
}
58+
59+
$optionList[] = sprintf('%s', $value);
60+
}
61+
62+
$optionList[] = sprintf('--env=%s', $this->env);
63+
64+
return implode(' ', $optionList);
65+
}
66+
}
+44
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
<?php
2+
3+
namespace Enqueue\Symfony\Processor;
4+
5+
use Enqueue\Client\CommandSubscriberInterface;
6+
use Enqueue\Consumption\Result;
7+
use Interop\Queue\PsrContext;
8+
use Interop\Queue\PsrMessage;
9+
use Interop\Queue\PsrProcessor;
10+
use Symfony\Component\Process\Exception\ProcessFailedException;
11+
use Symfony\Component\Process\Process;
12+
13+
class RunCommandProcessor implements PsrProcessor, CommandSubscriberInterface
14+
{
15+
/**
16+
* @var string
17+
*/
18+
private $projectDir;
19+
20+
public function __construct($projectDir)
21+
{
22+
$this->projectDir = $projectDir;
23+
}
24+
25+
public function process(PsrMessage $message, PsrContext $context)
26+
{
27+
$commandline = $message->getBody();
28+
29+
$process = new Process('./bin/console '.$commandline, $this->projectDir);
30+
31+
try {
32+
$process->mustRun();
33+
34+
return Result::ACK;
35+
} catch (ProcessFailedException $e) {
36+
return Result::reject(sprintf('The process failed with exception: "%s" in %s at %s', $e->getMessage(), $e->getFile(), $e->getLine()));
37+
}
38+
}
39+
40+
public static function getSubscribedCommand()
41+
{
42+
return 'run_command';
43+
}
44+
}

0 commit comments

Comments
 (0)