Skip to content

Commit bdfb50c

Browse files
committed
Problem: cannot monitor state of queues at runtime
Solution: add API and event
1 parent dcc80f7 commit bdfb50c

15 files changed

+300
-2
lines changed

doc/Makefile.am

+1-1
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ MAN3 = zmq_bind.3 zmq_unbind.3 zmq_connect.3 zmq_disconnect.3 zmq_close.3 \
1111
zmq_msg_get.3 zmq_msg_set.3 zmq_msg_more.3 zmq_msg_gets.3 \
1212
zmq_getsockopt.3 zmq_setsockopt.3 \
1313
zmq_socket.3 zmq_socket_monitor.3 zmq_poll.3 \
14-
zmq_socket_monitor_versioned.3 \
14+
zmq_socket_monitor_versioned.3 zmq_socket_monitor_pipes_stats.3 \
1515
zmq_errno.3 zmq_strerror.3 zmq_version.3 \
1616
zmq_sendmsg.3 zmq_recvmsg.3 \
1717
zmq_proxy.3 zmq_proxy_steerable.3 \
+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
zmq_socket_monitor_versioned.txt

doc/zmq_socket_monitor_versioned.txt

+14
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ zmq_socket_monitor_versioned - monitor socket events
1111
SYNOPSIS
1212
--------
1313
*int zmq_socket_monitor_versioned (void '*socket', char '*endpoint', uint64_t 'events', int 'event_version');*
14+
*void zmq_socket_monitor_pipes_stats (void '*socket');*
1415

1516

1617
DESCRIPTION
@@ -52,6 +53,10 @@ connection uses a bound or connected local endpoint.
5253
Note that the format of the second and further frames, and also the number of
5354
frames, may be different for events added in the future.
5455

56+
The _zmq_socket_monitor_pipes_stats()_ method triggers an event of type
57+
ZMQ_EVENT_PIPES_STATS for each connected peer of the monitored socket.
58+
NOTE: _zmq_socket_monitor_pipes_stats()_ is in DRAFT state.
59+
5560
----
5661
Monitoring events are only generated by some transports: At the moment these
5762
are SOCKS, TCP, IPC, and TIPC. Note that it is not an error to call
@@ -167,6 +172,15 @@ The ZMTP security mechanism handshake failed due to an authentication failure.
167172
The event value is the status code returned by the ZAP handler (i.e. 300,
168173
400 or 500).
169174

175+
ZMQ_EVENT_PIPE_STATS
176+
~~~~~~~~~~~~~~~~~~~~
177+
The event value is the number of messages in the queue associated with the
178+
returned endpoint. This event only triggers after calling the function
179+
_zmq_socket_monitor_pipes_stats()_.
180+
NOTE: this measurement is asynchronous, so by the time the message is received
181+
the internal state might have already changed.
182+
NOTE: in DRAFT state, not yet available in stable releases.
183+
170184

171185

172186
RETURN VALUE

include/zmq.h

+5-1
Original file line numberDiff line numberDiff line change
@@ -726,16 +726,20 @@ ZMQ_EXPORT int zmq_socket_get_peer_state (void *socket,
726726
const void *routing_id,
727727
size_t routing_id_size);
728728

729+
/* DRAFT Socket monitoring events */
730+
#define ZMQ_EVENT_PIPES_STATS 0x8000
731+
729732
#define ZMQ_CURRENT_EVENT_VERSION 1
730733
#define ZMQ_CURRENT_EVENT_VERSION_DRAFT 2
731734

732735
#define ZMQ_EVENT_ALL_V1 ZMQ_EVENT_ALL
733-
#define ZMQ_EVENT_ALL_V2 ZMQ_EVENT_ALL_V1
736+
#define ZMQ_EVENT_ALL_V2 ZMQ_EVENT_ALL_V1 | ZMQ_EVENT_PIPES_STATS
734737

735738
ZMQ_EXPORT int zmq_socket_monitor_versioned (void *s_,
736739
const char *addr_,
737740
uint64_t events_,
738741
int event_version_);
742+
ZMQ_EXPORT void zmq_socket_monitor_pipes_stats (void *s);
739743

740744
#endif // ZMQ_BUILD_DRAFT_API
741745

src/command.hpp

+20
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232

3333
#include <string>
3434
#include "stdint.hpp"
35+
#include "endpoint.hpp"
3536

3637
namespace zmq
3738
{
@@ -73,6 +74,8 @@ __declspec(align (64))
7374
reap,
7475
reaped,
7576
inproc_connected,
77+
pipe_peer_stats,
78+
pipe_stats_publish,
7679
done
7780
} type;
7881

@@ -186,6 +189,23 @@ __declspec(align (64))
186189
{
187190
} reaped;
188191

192+
// Send application-side pipe count and ask to send monitor event
193+
struct
194+
{
195+
uint64_t queue_count;
196+
zmq::own_t *socket_base;
197+
endpoint_uri_pair_t *endpoint_pair;
198+
} pipe_peer_stats;
199+
200+
// Collate application thread and I/O thread pipe counts and endpoints
201+
// and send as event
202+
struct
203+
{
204+
uint64_t outbound_queue_count;
205+
uint64_t inbound_queue_count;
206+
endpoint_uri_pair_t *endpoint_pair;
207+
} pipe_stats_publish;
208+
189209
// Sent by reaper thread to the term thread when all the sockets
190210
// are successfully deallocated.
191211
struct

src/object.cpp

+53
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,19 @@ void zmq::object_t::process_command (command_t &cmd_)
107107
process_hiccup (cmd_.args.hiccup.pipe);
108108
break;
109109

110+
case command_t::pipe_peer_stats:
111+
process_pipe_peer_stats (cmd_.args.pipe_peer_stats.queue_count,
112+
cmd_.args.pipe_peer_stats.socket_base,
113+
cmd_.args.pipe_peer_stats.endpoint_pair);
114+
break;
115+
116+
case command_t::pipe_stats_publish:
117+
process_pipe_stats_publish (
118+
cmd_.args.pipe_stats_publish.outbound_queue_count,
119+
cmd_.args.pipe_stats_publish.inbound_queue_count,
120+
cmd_.args.pipe_stats_publish.endpoint_pair);
121+
break;
122+
110123
case command_t::pipe_term:
111124
process_pipe_term ();
112125
break;
@@ -285,6 +298,34 @@ void zmq::object_t::send_hiccup (pipe_t *destination_, void *pipe_)
285298
send_command (cmd);
286299
}
287300

301+
void zmq::object_t::send_pipe_peer_stats (pipe_t *destination_,
302+
uint64_t queue_count_,
303+
own_t *socket_base_,
304+
endpoint_uri_pair_t *endpoint_pair_)
305+
{
306+
command_t cmd;
307+
cmd.destination = destination_;
308+
cmd.type = command_t::pipe_peer_stats;
309+
cmd.args.pipe_peer_stats.queue_count = queue_count_;
310+
cmd.args.pipe_peer_stats.socket_base = socket_base_;
311+
cmd.args.pipe_peer_stats.endpoint_pair = endpoint_pair_;
312+
send_command (cmd);
313+
}
314+
315+
void zmq::object_t::send_pipe_stats_publish (own_t *destination_,
316+
uint64_t outbound_queue_count_,
317+
uint64_t inbound_queue_count_,
318+
endpoint_uri_pair_t *endpoint_pair_)
319+
{
320+
command_t cmd;
321+
cmd.destination = destination_;
322+
cmd.type = command_t::pipe_stats_publish;
323+
cmd.args.pipe_stats_publish.outbound_queue_count = outbound_queue_count_;
324+
cmd.args.pipe_stats_publish.inbound_queue_count = inbound_queue_count_;
325+
cmd.args.pipe_stats_publish.endpoint_pair = endpoint_pair_;
326+
send_command (cmd);
327+
}
328+
288329
void zmq::object_t::send_pipe_term (pipe_t *destination_)
289330
{
290331
command_t cmd;
@@ -422,6 +463,18 @@ void zmq::object_t::process_hiccup (void *)
422463
zmq_assert (false);
423464
}
424465

466+
void zmq::object_t::process_pipe_peer_stats (uint64_t, own_t *, endpoint_uri_pair_t *)
467+
{
468+
zmq_assert (false);
469+
}
470+
471+
void zmq::object_t::process_pipe_stats_publish (uint64_t,
472+
uint64_t,
473+
endpoint_uri_pair_t *)
474+
{
475+
zmq_assert (false);
476+
}
477+
425478
void zmq::object_t::process_pipe_term ()
426479
{
427480
zmq_assert (false);

src/object.hpp

+15
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232

3333
#include <string>
3434
#include "stdint.hpp"
35+
#include "endpoint.hpp"
3536

3637
namespace zmq
3738
{
@@ -96,6 +97,14 @@ class object_t
9697
void send_activate_read (zmq::pipe_t *destination_);
9798
void send_activate_write (zmq::pipe_t *destination_, uint64_t msgs_read_);
9899
void send_hiccup (zmq::pipe_t *destination_, void *pipe_);
100+
void send_pipe_peer_stats (zmq::pipe_t *destination_,
101+
uint64_t queue_count_,
102+
zmq::own_t *socket_base,
103+
endpoint_uri_pair_t *endpoint_pair_);
104+
void send_pipe_stats_publish (zmq::own_t *destination_,
105+
uint64_t outbound_queue_count_,
106+
uint64_t inbound_queue_count_,
107+
endpoint_uri_pair_t *endpoint_pair_);
99108
void send_pipe_term (zmq::pipe_t *destination_);
100109
void send_pipe_term_ack (zmq::pipe_t *destination_);
101110
void send_pipe_hwm (zmq::pipe_t *destination_, int inhwm_, int outhwm_);
@@ -117,6 +126,12 @@ class object_t
117126
virtual void process_activate_read ();
118127
virtual void process_activate_write (uint64_t msgs_read_);
119128
virtual void process_hiccup (void *pipe_);
129+
virtual void process_pipe_peer_stats (uint64_t queue_count_,
130+
zmq::own_t *socket_base_,
131+
endpoint_uri_pair_t *endpoint_pair_);
132+
virtual void process_pipe_stats_publish (uint64_t outbound_queue_count_,
133+
uint64_t inbound_queue_count_,
134+
endpoint_uri_pair_t *endpoint_pair_);
120135
virtual void process_pipe_term ();
121136
virtual void process_pipe_term_ack ();
122137
virtual void process_pipe_hwm (int inhwm_, int outhwm_);

src/pipe.cpp

+15
Original file line numberDiff line numberDiff line change
@@ -563,3 +563,18 @@ const zmq::endpoint_uri_pair_t &zmq::pipe_t::get_endpoint_pair () const
563563
{
564564
return _endpoint_pair;
565565
}
566+
567+
void zmq::pipe_t::send_stats_to_peer (own_t *socket_base_)
568+
{
569+
endpoint_uri_pair_t *ep = new (std::nothrow) endpoint_uri_pair_t (_endpoint_pair);
570+
send_pipe_peer_stats (_peer, _msgs_written - _peers_msgs_read, socket_base_,
571+
ep);
572+
}
573+
574+
void zmq::pipe_t::process_pipe_peer_stats (uint64_t queue_count_,
575+
own_t *socket_base_,
576+
endpoint_uri_pair_t *endpoint_pair_)
577+
{
578+
send_pipe_stats_publish (socket_base_, queue_count_,
579+
_msgs_written - _peers_msgs_read, endpoint_pair_);
580+
}

src/pipe.hpp

+5
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,8 @@ class pipe_t : public object_t,
145145
void set_endpoint_pair (endpoint_uri_pair_t endpoint_pair_);
146146
const endpoint_uri_pair_t &get_endpoint_pair () const;
147147

148+
void send_stats_to_peer (own_t *socket_base_);
149+
148150
private:
149151
// Type of the underlying lock-free pipe.
150152
typedef ypipe_base_t<msg_t> upipe_t;
@@ -153,6 +155,9 @@ class pipe_t : public object_t,
153155
void process_activate_read ();
154156
void process_activate_write (uint64_t msgs_read_);
155157
void process_hiccup (void *pipe_);
158+
void process_pipe_peer_stats (uint64_t queue_count_,
159+
own_t *socket_base_,
160+
endpoint_uri_pair_t *endpoint_pair_);
156161
void process_pipe_term ();
157162
void process_pipe_term_ack ();
158163
void process_pipe_hwm (int inhwm_, int outhwm_);

src/session_base.cpp

+5
Original file line numberDiff line numberDiff line change
@@ -409,6 +409,11 @@ void zmq::session_base_t::process_attach (i_engine *engine_)
409409
zmq_assert (!_pipe);
410410
_pipe = pipes[0];
411411

412+
// The endpoints strings are not set on bind, set them here so that
413+
// events can use them.
414+
pipes[0]->set_endpoint_pair (engine_->get_endpoint());
415+
pipes[1]->set_endpoint_pair (engine_->get_endpoint());
416+
412417
// Ask socket to plug into the remote end of the pipe.
413418
send_bind (_socket, pipes[1]);
414419
}

src/socket_base.cpp

+26
Original file line numberDiff line numberDiff line change
@@ -1421,6 +1421,32 @@ void zmq::socket_base_t::process_term_endpoint (std::string *endpoint_)
14211421
delete endpoint_;
14221422
}
14231423

1424+
void zmq::socket_base_t::process_pipe_stats_publish (
1425+
uint64_t outbound_queue_count_,
1426+
uint64_t inbound_queue_count_,
1427+
endpoint_uri_pair_t *endpoint_pair_)
1428+
{
1429+
event (*endpoint_pair_, outbound_queue_count_, ZMQ_EVENT_PIPES_STATS);
1430+
event (*endpoint_pair_, inbound_queue_count_, ZMQ_EVENT_PIPES_STATS);
1431+
delete endpoint_pair_;
1432+
}
1433+
1434+
/*
1435+
* There are 2 pipes per connection, and the inbound one _must_ be queried from
1436+
* the I/O thread. So ask the outbound pipe, in the application thread, to send
1437+
* a message (pipe_peer_stats) to its peer. The message will carry the outbound
1438+
* pipe stats and endpoint, and the reference to the socket object.
1439+
* The inbound pipe on the I/O thread will then add its own stats and endpoint,
1440+
* and write back a message to the socket object (pipe_stats_publish) which
1441+
* will raise an event with the data.
1442+
*/
1443+
void zmq::socket_base_t::query_pipes_stats ()
1444+
{
1445+
for (pipes_t::size_type i = 0; i != _pipes.size (); ++i) {
1446+
_pipes[i]->send_stats_to_peer (this);
1447+
}
1448+
}
1449+
14241450
void zmq::socket_base_t::update_pipe_options (int option_)
14251451
{
14261452
if (option_ == ZMQ_SNDHWM || option_ == ZMQ_RCVHWM) {

src/socket_base.hpp

+5
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,8 @@ class socket_base_t : public own_t,
157157
virtual int get_peer_state (const void *routing_id_,
158158
size_t routing_id_size_) const;
159159

160+
void query_pipes_stats ();
161+
160162
protected:
161163
socket_base_t (zmq::ctx_t *parent_,
162164
uint32_t tid_,
@@ -276,6 +278,9 @@ class socket_base_t : public own_t,
276278
// Handlers for incoming commands.
277279
void process_stop ();
278280
void process_bind (zmq::pipe_t *pipe_);
281+
void process_pipe_stats_publish (uint64_t outbound_queue_count_,
282+
uint64_t inbound_queue_count_,
283+
endpoint_uri_pair_t *endpoint_pair_);
279284
void process_term (int linger_);
280285
void process_term_endpoint (std::string *endpoint_);
281286

src/zmq.cpp

+7
Original file line numberDiff line numberDiff line change
@@ -1452,3 +1452,10 @@ int zmq_has (const char *capability_)
14521452
// Whatever the application asked for, we don't have
14531453
return false;
14541454
}
1455+
1456+
void zmq_socket_monitor_pipes_stats (void *s_)
1457+
{
1458+
zmq::socket_base_t *s = as_socket_base_t (s_);
1459+
if (s)
1460+
s->query_pipes_stats ();
1461+
}

src/zmq_draft.h

+10
Original file line numberDiff line numberDiff line change
@@ -123,10 +123,20 @@ int zmq_socket_get_peer_state (void *socket_,
123123
const void *routing_id_,
124124
size_t routing_id_size_);
125125

126+
/* DRAFT Socket monitoring events */
127+
#define ZMQ_EVENT_PIPES_STATS 0x8000
128+
129+
#define ZMQ_CURRENT_EVENT_VERSION 1
130+
#define ZMQ_CURRENT_EVENT_VERSION_DRAFT 2
131+
132+
#define ZMQ_EVENT_ALL_V1 ZMQ_EVENT_ALL
133+
#define ZMQ_EVENT_ALL_V2 ZMQ_EVENT_ALL_V1 | ZMQ_EVENT_PIPES_STATS
134+
126135
int zmq_socket_monitor_versioned (void *s_,
127136
const char *addr_,
128137
uint64_t events_,
129138
int event_version_);
139+
void zmq_socket_monitor_pipes_stats (void *s_);
130140

131141
#endif // ZMQ_BUILD_DRAFT_API
132142

0 commit comments

Comments
 (0)