Skip to content

Commit a92e634

Browse files
committed
Problem: cannot monitor state of queues at runtime
Solution: add API and ZMQ_EVENT_PIPES_STATS event which generates 2 values, one for the egress and one for the ingress pipes respectively. Refactor the events code to be able to send multiple values.
1 parent cb73745 commit a92e634

13 files changed

+345
-9
lines changed

doc/zmq_socket_monitor_versioned.txt

+28-4
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ zmq_socket_monitor_versioned - monitor socket events
1111
SYNOPSIS
1212
--------
1313
*int zmq_socket_monitor_versioned (void '*socket', char '*endpoint', uint64_t 'events', int 'event_version');*
14+
*void zmq_socket_monitor_pipes_stats (void '*socket');*
1415

1516

1617
DESCRIPTION
@@ -43,16 +44,21 @@ event number.
4344

4445
Unless it is specified differently, the second frame contains an event
4546
value (64 bits) that provides additional data according to the event number.
46-
Some events might define additional value frames following the second one.
47-
The third and fourth frames contain strings that specifies the affected
48-
connection or endpoint. The third frame contains a string denoting the local
49-
endpoint, while the fourth frame contains a string denoting the remote endpoint.
47+
The third frame contains the number of value frames that will follow it as a 64
48+
bits integer. Each event type might have a different number of values.
49+
The second-to-last and last frames contain strings that specifies the affected
50+
connection or endpoint. The former frame contains a string denoting the local
51+
endpoint, while the latter frame contains a string denoting the remote endpoint.
5052
Either of these may be empty, depending on the event type and whether the
5153
connection uses a bound or connected local endpoint.
5254

5355
Note that the format of the second and further frames, and also the number of
5456
frames, may be different for events added in the future.
5557

58+
The _zmq_socket_monitor_pipes_stats()_ method triggers an event of type
59+
ZMQ_EVENT_PIPES_STATS for each connected peer of the monitored socket.
60+
NOTE: _zmq_socket_monitor_pipes_stats()_ is in DRAFT state.
61+
5662
----
5763
Monitoring events are only generated by some transports: At the moment these
5864
are SOCKS, TCP, IPC, and TIPC. Note that it is not an error to call
@@ -168,6 +174,24 @@ The ZMTP security mechanism handshake failed due to an authentication failure.
168174
The event value is the status code returned by the ZAP handler (i.e. 300,
169175
400 or 500).
170176

177+
----
178+
179+
Supported events (v2)
180+
----------------
181+
182+
ZMQ_EVENT_PIPE_STATS
183+
~~~~~~~~~~~~~~~~~~~~
184+
This event provides two values, the number of messages in each of the two
185+
queues associated with the returned endpoint (respectively egress and ingress).
186+
This event only triggers after calling the function
187+
_zmq_socket_monitor_pipes_stats()_.
188+
NOTE: this measurement is asynchronous, so by the time the message is received
189+
the internal state might have already changed.
190+
NOTE: when the monitored socket and the monitor are not used in a poll, the
191+
event might not be delivered until an API has been called on the monitored
192+
socket, like zmq_getsockopt for example (the option is irrelevant).
193+
NOTE: in DRAFT state, not yet available in stable releases.
194+
171195

172196

173197
RETURN VALUE

include/zmq.h

+5-1
Original file line numberDiff line numberDiff line change
@@ -726,16 +726,20 @@ ZMQ_EXPORT int zmq_socket_get_peer_state (void *socket,
726726
const void *routing_id,
727727
size_t routing_id_size);
728728

729+
/* DRAFT Socket monitoring events */
730+
#define ZMQ_EVENT_PIPES_STATS 0x10000
731+
729732
#define ZMQ_CURRENT_EVENT_VERSION 1
730733
#define ZMQ_CURRENT_EVENT_VERSION_DRAFT 2
731734

732735
#define ZMQ_EVENT_ALL_V1 ZMQ_EVENT_ALL
733-
#define ZMQ_EVENT_ALL_V2 ZMQ_EVENT_ALL_V1
736+
#define ZMQ_EVENT_ALL_V2 ZMQ_EVENT_ALL_V1 | ZMQ_EVENT_PIPES_STATS
734737

735738
ZMQ_EXPORT int zmq_socket_monitor_versioned (void *s_,
736739
const char *addr_,
737740
uint64_t events_,
738741
int event_version_);
742+
ZMQ_EXPORT void zmq_socket_monitor_pipes_stats (void *s);
739743

740744
#endif // ZMQ_BUILD_DRAFT_API
741745

src/command.hpp

+20
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232

3333
#include <string>
3434
#include "stdint.hpp"
35+
#include "endpoint.hpp"
3536

3637
namespace zmq
3738
{
@@ -73,6 +74,8 @@ __declspec(align (64))
7374
reap,
7475
reaped,
7576
inproc_connected,
77+
pipe_peer_stats,
78+
pipe_stats_publish,
7679
done
7780
} type;
7881

@@ -186,6 +189,23 @@ __declspec(align (64))
186189
{
187190
} reaped;
188191

192+
// Send application-side pipe count and ask to send monitor event
193+
struct
194+
{
195+
uint64_t queue_count;
196+
zmq::own_t *socket_base;
197+
endpoint_uri_pair_t *endpoint_pair;
198+
} pipe_peer_stats;
199+
200+
// Collate application thread and I/O thread pipe counts and endpoints
201+
// and send as event
202+
struct
203+
{
204+
uint64_t outbound_queue_count;
205+
uint64_t inbound_queue_count;
206+
endpoint_uri_pair_t *endpoint_pair;
207+
} pipe_stats_publish;
208+
189209
// Sent by reaper thread to the term thread when all the sockets
190210
// are successfully deallocated.
191211
struct

src/object.cpp

+56
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,19 @@ void zmq::object_t::process_command (command_t &cmd_)
107107
process_hiccup (cmd_.args.hiccup.pipe);
108108
break;
109109

110+
case command_t::pipe_peer_stats:
111+
process_pipe_peer_stats (cmd_.args.pipe_peer_stats.queue_count,
112+
cmd_.args.pipe_peer_stats.socket_base,
113+
cmd_.args.pipe_peer_stats.endpoint_pair);
114+
break;
115+
116+
case command_t::pipe_stats_publish:
117+
process_pipe_stats_publish (
118+
cmd_.args.pipe_stats_publish.outbound_queue_count,
119+
cmd_.args.pipe_stats_publish.inbound_queue_count,
120+
cmd_.args.pipe_stats_publish.endpoint_pair);
121+
break;
122+
110123
case command_t::pipe_term:
111124
process_pipe_term ();
112125
break;
@@ -285,6 +298,35 @@ void zmq::object_t::send_hiccup (pipe_t *destination_, void *pipe_)
285298
send_command (cmd);
286299
}
287300

301+
void zmq::object_t::send_pipe_peer_stats (pipe_t *destination_,
302+
uint64_t queue_count_,
303+
own_t *socket_base_,
304+
endpoint_uri_pair_t *endpoint_pair_)
305+
{
306+
command_t cmd;
307+
cmd.destination = destination_;
308+
cmd.type = command_t::pipe_peer_stats;
309+
cmd.args.pipe_peer_stats.queue_count = queue_count_;
310+
cmd.args.pipe_peer_stats.socket_base = socket_base_;
311+
cmd.args.pipe_peer_stats.endpoint_pair = endpoint_pair_;
312+
send_command (cmd);
313+
}
314+
315+
void zmq::object_t::send_pipe_stats_publish (
316+
own_t *destination_,
317+
uint64_t outbound_queue_count_,
318+
uint64_t inbound_queue_count_,
319+
endpoint_uri_pair_t *endpoint_pair_)
320+
{
321+
command_t cmd;
322+
cmd.destination = destination_;
323+
cmd.type = command_t::pipe_stats_publish;
324+
cmd.args.pipe_stats_publish.outbound_queue_count = outbound_queue_count_;
325+
cmd.args.pipe_stats_publish.inbound_queue_count = inbound_queue_count_;
326+
cmd.args.pipe_stats_publish.endpoint_pair = endpoint_pair_;
327+
send_command (cmd);
328+
}
329+
288330
void zmq::object_t::send_pipe_term (pipe_t *destination_)
289331
{
290332
command_t cmd;
@@ -422,6 +464,20 @@ void zmq::object_t::process_hiccup (void *)
422464
zmq_assert (false);
423465
}
424466

467+
void zmq::object_t::process_pipe_peer_stats (uint64_t,
468+
own_t *,
469+
endpoint_uri_pair_t *)
470+
{
471+
zmq_assert (false);
472+
}
473+
474+
void zmq::object_t::process_pipe_stats_publish (uint64_t,
475+
uint64_t,
476+
endpoint_uri_pair_t *)
477+
{
478+
zmq_assert (false);
479+
}
480+
425481
void zmq::object_t::process_pipe_term ()
426482
{
427483
zmq_assert (false);

src/object.hpp

+16
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232

3333
#include <string>
3434
#include "stdint.hpp"
35+
#include "endpoint.hpp"
3536

3637
namespace zmq
3738
{
@@ -96,6 +97,14 @@ class object_t
9697
void send_activate_read (zmq::pipe_t *destination_);
9798
void send_activate_write (zmq::pipe_t *destination_, uint64_t msgs_read_);
9899
void send_hiccup (zmq::pipe_t *destination_, void *pipe_);
100+
void send_pipe_peer_stats (zmq::pipe_t *destination_,
101+
uint64_t queue_count_,
102+
zmq::own_t *socket_base,
103+
endpoint_uri_pair_t *endpoint_pair_);
104+
void send_pipe_stats_publish (zmq::own_t *destination_,
105+
uint64_t outbound_queue_count_,
106+
uint64_t inbound_queue_count_,
107+
endpoint_uri_pair_t *endpoint_pair_);
99108
void send_pipe_term (zmq::pipe_t *destination_);
100109
void send_pipe_term_ack (zmq::pipe_t *destination_);
101110
void send_pipe_hwm (zmq::pipe_t *destination_, int inhwm_, int outhwm_);
@@ -117,6 +126,13 @@ class object_t
117126
virtual void process_activate_read ();
118127
virtual void process_activate_write (uint64_t msgs_read_);
119128
virtual void process_hiccup (void *pipe_);
129+
virtual void process_pipe_peer_stats (uint64_t queue_count_,
130+
zmq::own_t *socket_base_,
131+
endpoint_uri_pair_t *endpoint_pair_);
132+
virtual void
133+
process_pipe_stats_publish (uint64_t outbound_queue_count_,
134+
uint64_t inbound_queue_count_,
135+
endpoint_uri_pair_t *endpoint_pair_);
120136
virtual void process_pipe_term ();
121137
virtual void process_pipe_term_ack ();
122138
virtual void process_pipe_hwm (int inhwm_, int outhwm_);

src/pipe.cpp

+16
Original file line numberDiff line numberDiff line change
@@ -563,3 +563,19 @@ const zmq::endpoint_uri_pair_t &zmq::pipe_t::get_endpoint_pair () const
563563
{
564564
return _endpoint_pair;
565565
}
566+
567+
void zmq::pipe_t::send_stats_to_peer (own_t *socket_base_)
568+
{
569+
endpoint_uri_pair_t *ep =
570+
new (std::nothrow) endpoint_uri_pair_t (_endpoint_pair);
571+
send_pipe_peer_stats (_peer, _msgs_written - _peers_msgs_read, socket_base_,
572+
ep);
573+
}
574+
575+
void zmq::pipe_t::process_pipe_peer_stats (uint64_t queue_count_,
576+
own_t *socket_base_,
577+
endpoint_uri_pair_t *endpoint_pair_)
578+
{
579+
send_pipe_stats_publish (socket_base_, queue_count_,
580+
_msgs_written - _peers_msgs_read, endpoint_pair_);
581+
}

src/pipe.hpp

+5
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,8 @@ class pipe_t : public object_t,
145145
void set_endpoint_pair (endpoint_uri_pair_t endpoint_pair_);
146146
const endpoint_uri_pair_t &get_endpoint_pair () const;
147147

148+
void send_stats_to_peer (own_t *socket_base_);
149+
148150
private:
149151
// Type of the underlying lock-free pipe.
150152
typedef ypipe_base_t<msg_t> upipe_t;
@@ -153,6 +155,9 @@ class pipe_t : public object_t,
153155
void process_activate_read ();
154156
void process_activate_write (uint64_t msgs_read_);
155157
void process_hiccup (void *pipe_);
158+
void process_pipe_peer_stats (uint64_t queue_count_,
159+
own_t *socket_base_,
160+
endpoint_uri_pair_t *endpoint_pair_);
156161
void process_pipe_term ();
157162
void process_pipe_term_ack ();
158163
void process_pipe_hwm (int inhwm_, int outhwm_);

src/session_base.cpp

+5
Original file line numberDiff line numberDiff line change
@@ -409,6 +409,11 @@ void zmq::session_base_t::process_attach (i_engine *engine_)
409409
zmq_assert (!_pipe);
410410
_pipe = pipes[0];
411411

412+
// The endpoints strings are not set on bind, set them here so that
413+
// events can use them.
414+
pipes[0]->set_endpoint_pair (engine_->get_endpoint ());
415+
pipes[1]->set_endpoint_pair (engine_->get_endpoint ());
416+
412417
// Ask socket to plug into the remote end of the pipe.
413418
send_bind (_socket, pipes[1]);
414419
}

src/socket_base.cpp

+26
Original file line numberDiff line numberDiff line change
@@ -1421,6 +1421,32 @@ void zmq::socket_base_t::process_term_endpoint (std::string *endpoint_)
14211421
delete endpoint_;
14221422
}
14231423

1424+
void zmq::socket_base_t::process_pipe_stats_publish (
1425+
uint64_t outbound_queue_count_,
1426+
uint64_t inbound_queue_count_,
1427+
endpoint_uri_pair_t *endpoint_pair_)
1428+
{
1429+
uint64_t values[2] = {outbound_queue_count_, inbound_queue_count_};
1430+
event (*endpoint_pair_, values, 2, ZMQ_EVENT_PIPES_STATS);
1431+
delete endpoint_pair_;
1432+
}
1433+
1434+
/*
1435+
* There are 2 pipes per connection, and the inbound one _must_ be queried from
1436+
* the I/O thread. So ask the outbound pipe, in the application thread, to send
1437+
* a message (pipe_peer_stats) to its peer. The message will carry the outbound
1438+
* pipe stats and endpoint, and the reference to the socket object.
1439+
* The inbound pipe on the I/O thread will then add its own stats and endpoint,
1440+
* and write back a message to the socket object (pipe_stats_publish) which
1441+
* will raise an event with the data.
1442+
*/
1443+
void zmq::socket_base_t::query_pipes_stats ()
1444+
{
1445+
for (pipes_t::size_type i = 0; i != _pipes.size (); ++i) {
1446+
_pipes[i]->send_stats_to_peer (this);
1447+
}
1448+
}
1449+
14241450
void zmq::socket_base_t::update_pipe_options (int option_)
14251451
{
14261452
if (option_ == ZMQ_SNDHWM || option_ == ZMQ_RCVHWM) {

src/socket_base.hpp

+5
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,8 @@ class socket_base_t : public own_t,
157157
virtual int get_peer_state (const void *routing_id_,
158158
size_t routing_id_size_) const;
159159

160+
void query_pipes_stats ();
161+
160162
protected:
161163
socket_base_t (zmq::ctx_t *parent_,
162164
uint32_t tid_,
@@ -278,6 +280,9 @@ class socket_base_t : public own_t,
278280
// Handlers for incoming commands.
279281
void process_stop ();
280282
void process_bind (zmq::pipe_t *pipe_);
283+
void process_pipe_stats_publish (uint64_t outbound_queue_count_,
284+
uint64_t inbound_queue_count_,
285+
endpoint_uri_pair_t *endpoint_pair_);
281286
void process_term (int linger_);
282287
void process_term_endpoint (std::string *endpoint_);
283288

src/zmq.cpp

+7
Original file line numberDiff line numberDiff line change
@@ -1452,3 +1452,10 @@ int zmq_has (const char *capability_)
14521452
// Whatever the application asked for, we don't have
14531453
return false;
14541454
}
1455+
1456+
void zmq_socket_monitor_pipes_stats (void *s_)
1457+
{
1458+
zmq::socket_base_t *s = as_socket_base_t (s_);
1459+
if (s)
1460+
s->query_pipes_stats ();
1461+
}

src/zmq_draft.h

+10
Original file line numberDiff line numberDiff line change
@@ -123,10 +123,20 @@ int zmq_socket_get_peer_state (void *socket_,
123123
const void *routing_id_,
124124
size_t routing_id_size_);
125125

126+
/* DRAFT Socket monitoring events */
127+
#define ZMQ_EVENT_PIPES_STATS 0x10000
128+
129+
#define ZMQ_CURRENT_EVENT_VERSION 1
130+
#define ZMQ_CURRENT_EVENT_VERSION_DRAFT 2
131+
132+
#define ZMQ_EVENT_ALL_V1 ZMQ_EVENT_ALL
133+
#define ZMQ_EVENT_ALL_V2 ZMQ_EVENT_ALL_V1 | ZMQ_EVENT_PIPES_STATS
134+
126135
int zmq_socket_monitor_versioned (void *s_,
127136
const char *addr_,
128137
uint64_t events_,
129138
int event_version_);
139+
void zmq_socket_monitor_pipes_stats (void *s_);
130140

131141
#endif // ZMQ_BUILD_DRAFT_API
132142

0 commit comments

Comments
 (0)