4
4
5
5
namespace Enqueue \Gps ;
6
6
7
+ use Enqueue \Dsn \Dsn ;
7
8
use Google \Cloud \PubSub \PubSubClient ;
8
9
use Interop \Queue \PsrConnectionFactory ;
9
10
use Interop \Queue \PsrContext ;
@@ -15,6 +16,11 @@ class GpsConnectionFactory implements PsrConnectionFactory
15
16
*/
16
17
private $ config ;
17
18
19
+ /**
20
+ * @var PubSubClient
21
+ */
22
+ private $ client ;
23
+
18
24
/**
19
25
* @see https://cloud.google.com/docs/authentication/production#providing_credentials_to_your_application
20
26
* @see \Google\Cloud\PubSub\PubSubClient::__construct()
@@ -24,6 +30,7 @@ class GpsConnectionFactory implements PsrConnectionFactory
24
30
* 'keyFilePath' => The full path to your service account credentials.json file retrieved from the Google Developers Console.
25
31
* 'retries' => Number of retries for a failed request. **Defaults to** `3`.
26
32
* 'scopes' => Scopes to be used for the request.
33
+ * 'emulatorHost' => The endpoint used to emulate communication with GooglePubSub.
27
34
* 'lazy' => 'the connection will be performed as later as possible, if the option set to true'
28
35
* ]
29
36
*
@@ -32,17 +39,31 @@ class GpsConnectionFactory implements PsrConnectionFactory
32
39
* gps:
33
40
* gps:?projectId=projectName
34
41
*
35
- * @param array|string|null $config
42
+ * or instance of Google\Cloud\PubSub\PubSubClient
43
+ *
44
+ * @param array|string|PubSubClient|null $config
36
45
*/
37
46
public function __construct ($ config = 'gps: ' )
38
47
{
39
- if (empty ($ config ) || 'gps: ' === $ config ) {
48
+ if ($ config instanceof PubSubClient) {
49
+ $ this ->client = $ config ;
50
+ $ this ->config = ['lazy ' => false ] + $ this ->defaultConfig ();
51
+
52
+ return ;
53
+ }
54
+
55
+ if (empty ($ config )) {
40
56
$ config = [];
41
57
} elseif (is_string ($ config )) {
42
58
$ config = $ this ->parseDsn ($ config );
43
59
} elseif (is_array ($ config )) {
60
+ if (array_key_exists ('dsn ' , $ config )) {
61
+ $ config = array_replace_recursive ($ config , $ this ->parseDsn ($ config ['dsn ' ]));
62
+
63
+ unset($ config ['dsn ' ]);
64
+ }
44
65
} else {
45
- throw new \LogicException ('The config must be either an array of options, a DSN string or null ' );
66
+ throw new \LogicException (sprintf ( 'The config must be either an array of options, a DSN string, null or instance of %s ' , PubSubClient::class) );
46
67
}
47
68
48
69
$ this ->config = array_replace ($ this ->defaultConfig (), $ config );
@@ -64,22 +85,36 @@ public function createContext(): PsrContext
64
85
65
86
private function parseDsn (string $ dsn ): array
66
87
{
67
- if (false === strpos ($ dsn , 'gps: ' )) {
68
- throw new \LogicException (sprintf ('The given DSN "%s" is not supported. Must start with "gps:". ' , $ dsn ));
69
- }
70
-
71
- $ config = [];
88
+ $ dsn = new Dsn ($ dsn );
72
89
73
- if ($ query = parse_url ($ dsn , PHP_URL_QUERY )) {
74
- parse_str ($ query , $ config );
90
+ if ('gps ' !== $ dsn ->getSchemeProtocol ()) {
91
+ throw new \LogicException (sprintf (
92
+ 'The given scheme protocol "%s" is not supported. It must be "gps" ' ,
93
+ $ dsn ->getSchemeProtocol ()
94
+ ));
75
95
}
76
96
77
- return $ config ;
97
+ $ emulatorHost = $ dsn ->getQueryParameter ('emulatorHost ' );
98
+ $ hasEmulator = $ emulatorHost ? true : null ;
99
+
100
+ return array_filter (array_replace ($ dsn ->getQuery (), [
101
+ 'projectId ' => $ dsn ->getQueryParameter ('projectId ' ),
102
+ 'keyFilePath ' => $ dsn ->getQueryParameter ('keyFilePath ' ),
103
+ 'retries ' => $ dsn ->getInt ('retries ' ),
104
+ 'scopes ' => $ dsn ->getQueryParameter ('scopes ' ),
105
+ 'emulatorHost ' => $ emulatorHost ,
106
+ 'hasEmulator ' => $ hasEmulator ,
107
+ 'lazy ' => $ dsn ->getBool ('lazy ' ),
108
+ ]), function ($ value ) { return null !== $ value ; });
78
109
}
79
110
80
111
private function establishConnection (): PubSubClient
81
112
{
82
- return new PubSubClient ($ this ->config );
113
+ if (false == $ this ->client ) {
114
+ $ this ->client = new PubSubClient ($ this ->config );
115
+ }
116
+
117
+ return $ this ->client ;
83
118
}
84
119
85
120
private function defaultConfig (): array
0 commit comments