5
5
use Interop \Queue \InvalidMessageException ;
6
6
use Interop \Queue \PsrConsumer ;
7
7
use Interop \Queue \PsrMessage ;
8
+ use Interop \Queue \PsrQueue ;
8
9
9
10
class FsConsumer implements PsrConsumer
10
11
{
@@ -29,16 +30,13 @@ class FsConsumer implements PsrConsumer
29
30
private $ preFetchedMessages ;
30
31
31
32
/**
32
- * @var int microseconds
33
+ * In milliseconds.
34
+ *
35
+ * @var int
33
36
*/
34
- private $ pollingInterval = 100000 ;
37
+ private $ pollingInterval = 100 ;
35
38
36
- /**
37
- * @param FsContext $context
38
- * @param FsDestination $destination
39
- * @param int $preFetchCount
40
- */
41
- public function __construct (FsContext $ context , FsDestination $ destination , $ preFetchCount )
39
+ public function __construct (FsContext $ context , FsDestination $ destination , int $ preFetchCount )
42
40
{
43
41
$ this ->context = $ context ;
44
42
$ this ->destination = $ destination ;
@@ -49,40 +47,32 @@ public function __construct(FsContext $context, FsDestination $destination, $pre
49
47
50
48
/**
51
49
* Set polling interval in milliseconds.
52
- *
53
- * @param int $msec
54
50
*/
55
- public function setPollingInterval ($ msec )
51
+ public function setPollingInterval (int $ msec ): void
56
52
{
57
- $ this ->pollingInterval = $ msec * 1000 ;
53
+ $ this ->pollingInterval = $ msec ;
58
54
}
59
55
60
56
/**
61
57
* Get polling interval in milliseconds.
62
- *
63
- * @return int
64
58
*/
65
- public function getPollingInterval ()
59
+ public function getPollingInterval (): int
66
60
{
67
- return ( int ) $ this ->pollingInterval / 1000 ;
61
+ return $ this ->pollingInterval ;
68
62
}
69
63
70
64
/**
71
- * {@inheritdoc}
72
- *
73
65
* @return FsDestination
74
66
*/
75
- public function getQueue ()
67
+ public function getQueue (): PsrQueue
76
68
{
77
69
return $ this ->destination ;
78
70
}
79
71
80
72
/**
81
- * {@inheritdoc}
82
- *
83
- * @return FsMessage|null
73
+ * @return FsMessage
84
74
*/
85
- public function receive ($ timeout = 0 )
75
+ public function receive (int $ timeout = 0 ): ? PsrMessage
86
76
{
87
77
$ timeout /= 1000 ;
88
78
$ startAt = microtime (true );
@@ -95,21 +85,21 @@ public function receive($timeout = 0)
95
85
}
96
86
97
87
if ($ timeout && (microtime (true ) - $ startAt ) >= $ timeout ) {
98
- return ;
88
+ return null ;
99
89
}
100
90
101
91
usleep ($ this ->pollingInterval );
102
92
103
93
if ($ timeout && (microtime (true ) - $ startAt ) >= $ timeout ) {
104
- return ;
94
+ return null ;
105
95
}
106
96
}
107
97
}
108
98
109
99
/**
110
- * {@inheritdoc}
100
+ * @return FsMessage
111
101
*/
112
- public function receiveNoWait ()
102
+ public function receiveNoWait (): ? PsrMessage
113
103
{
114
104
if ($ this ->preFetchedMessages ) {
115
105
return array_shift ($ this ->preFetchedMessages );
@@ -140,15 +130,15 @@ public function receiveNoWait()
140
130
$ expireAt = $ fetchedMessage ->getHeader ('x-expire-at ' );
141
131
if ($ expireAt && $ expireAt - microtime (true ) < 0 ) {
142
132
// message has expired, just drop it.
143
- return ;
133
+ return null ;
144
134
}
145
135
146
136
$ this ->preFetchedMessages [] = $ fetchedMessage ;
147
137
} catch (\Exception $ e ) {
148
138
throw new \LogicException (sprintf ("Cannot decode json message '%s' " , $ rawMessage ), null , $ e );
149
139
}
150
140
} else {
151
- return ;
141
+ return null ;
152
142
}
153
143
154
144
--$ count ;
@@ -158,20 +148,16 @@ public function receiveNoWait()
158
148
if ($ this ->preFetchedMessages ) {
159
149
return array_shift ($ this ->preFetchedMessages );
160
150
}
151
+
152
+ return null ;
161
153
}
162
154
163
- /**
164
- * {@inheritdoc}
165
- */
166
- public function acknowledge (PsrMessage $ message )
155
+ public function acknowledge (PsrMessage $ message ): void
167
156
{
168
157
// do nothing. fs transport always works in auto ack mode
169
158
}
170
159
171
- /**
172
- * {@inheritdoc}
173
- */
174
- public function reject (PsrMessage $ message , $ requeue = false )
160
+ public function reject (PsrMessage $ message , bool $ requeue = false ): void
175
161
{
176
162
InvalidMessageException::assertMessageInstanceOf ($ message , FsMessage::class);
177
163
@@ -182,29 +168,20 @@ public function reject(PsrMessage $message, $requeue = false)
182
168
}
183
169
}
184
170
185
- /**
186
- * @return int
187
- */
188
- public function getPreFetchCount ()
171
+ public function getPreFetchCount (): int
189
172
{
190
173
return $ this ->preFetchCount ;
191
174
}
192
175
193
- /**
194
- * @param int $preFetchCount
195
- */
196
- public function setPreFetchCount ($ preFetchCount )
176
+ public function setPreFetchCount (int $ preFetchCount ): void
197
177
{
198
178
$ this ->preFetchCount = $ preFetchCount ;
199
179
}
200
180
201
181
/**
202
182
* @param resource $file
203
- * @param int $frameNumber
204
- *
205
- * @return string
206
183
*/
207
- private function readFrame ($ file , $ frameNumber )
184
+ private function readFrame ($ file , int $ frameNumber ): string
208
185
{
209
186
$ frameSize = 64 ;
210
187
$ offset = $ frameNumber * $ frameSize ;
0 commit comments