1212#include < cppipc/common/object_factory_proxy.hpp>
1313#include < minipsutil/minipsutil.h>
1414#include < export.hpp>
15- #ifdef FAKE_ZOOKEEPER
16- #include < fault/fake_key_value.hpp>
17- #else
18- #include < zookeeper_util/key_value.hpp>
19- #endif
2015
2116namespace cppipc {
2217
@@ -40,19 +35,8 @@ comm_client::comm_client(std::vector<std::string> zkhosts,
4035 const std::string secret_key,
4136 const std::string server_public_key,
4237 bool ops_interruptible):
43- zmq_ctx (zmq_ctx_new()),
44- keyval (zkhosts.empty() ?
45- NULL : // make a keyval only if zkhosts is not empty
46- new graphlab::zookeeper_util::key_value(zkhosts, " cppipc" , name)),
47- object_socket (zmq_ctx, keyval,
48- zkhosts.empty() ? // use the name as the address if zookeeper
49- name : // is not used.
50- "call",
51- std::vector<std::string>(),
52- public_key, secret_key, server_public_key
53- ),
54- subscribesock(zmq_ctx, keyval,
55- boost::bind (&comm_client::subscribe_callback, this , _1)),
38+ object_socket (name, 2 ),
39+ subscribesock (boost::bind(&comm_client::subscribe_callback, this , _1)),
5640 num_tolerable_ping_failures (num_tolerable_ping_failures),
5741 alternate_control_address (alternate_control_address),
5842 alternate_publish_address (alternate_publish_address),
@@ -61,13 +45,8 @@ comm_client::comm_client(std::vector<std::string> zkhosts,
6145 }
6246
6347comm_client::comm_client (std::string name, void * zmq_ctx) :
64- zmq_ctx(zmq_ctx),
65- owns_zmq_ctx(false ),
66- keyval(NULL ),
67- object_socket(zmq_ctx, keyval, name, std::vector<std::string>(),
68- "", "", ""),
69- subscribesock(zmq_ctx, keyval,
70- boost::bind (&comm_client::subscribe_callback, this , _1)),
48+ object_socket (name, 2 ),
49+ subscribesock (boost::bind(&comm_client::subscribe_callback, this , _1)),
7150 endpoint_name (name) {
7251 ASSERT_MSG (boost::starts_with (name, " inproc://" ), " This constructor only supports inproc address" );
7352 bool ops_interruptible = true ;
@@ -80,13 +59,11 @@ void comm_client::init(bool ops_interruptible) {
8059
8160 // connect the subscribesock either to the key "status" (if zookeeper is used),
8261 // or to the alternate address if zookeeper is not used.
83- object_socket.add_to_pollset (&pollset);
84- subscribesock.add_to_pollset (&pollset);
85- pollset.start_poll_thread ();
8662
8763 if (ops_interruptible) {
8864 cancel_handling_enabled = true ;
8965 }
66+ object_socket.set_receive_poller ([=](){this ->poll_server_pid_is_running ();return this ->server_alive ;});
9067}
9168
9269void comm_client::set_server_alive_watch_pid (int32_t pid) {
@@ -135,24 +112,10 @@ reply_status comm_client::start() {
135112 msg.body = oarc.buf ;
136113 msg.bodylen = oarc.off ;
137114
138- auto future = this ->internal_call_future (msg, true );
139- // now, we wait on the future for 5 seconds
140- // do it in 1 second increments
141- // this speeds up client termination somewhat since it doesn't have
142- // to wait for the full 5 seconds to cancel.
143- for (size_t i = 0 ;i < 5 ; ++i) {
144- auto future_timeout =
145- boost::chrono::system_clock::now () + boost::chrono::milliseconds (1000 );
146- future.wait_until (future_timeout);
147- if (future.has_value ()) break ;
148- if (this ->ping_thread_done ) return ;
149- }
115+ nanosockets::zmq_msg_vector reply;
116+ int status = this ->internal_call_impl (msg, reply, true , 5 /* 5 seconds timeout */ );
150117 lock.lock ();
151- if (future.has_value ()) {
152- // we ignore the message as long as we get a reply
153- future.get ()->msgvec .clear ();
154- delete future.get ();
155- // everything is good!
118+ if (status == 0 ) {
156119 server_alive = true ;
157120 ping_failure_count = 0 ;
158121 } else {
@@ -166,31 +129,24 @@ reply_status comm_client::start() {
166129 start_status_callback_thread ();
167130 std::string cntladdress;
168131 // Bring the control_socket up
169- if (!keyval) {
170- if (alternate_control_address.length () > 0 ) {
171- cntladdress = alternate_control_address;
172- } else {
173- try {
174- cntladdress = object_factory->get_control_address ();
175- } catch (ipcexception& except) {
176- // FAIL!! We cannot start
177- return except.get_reply_status ();
178- }
132+ if (alternate_control_address.length () > 0 ) {
133+ cntladdress = alternate_control_address;
134+ } else {
135+ try {
136+ cntladdress = object_factory->get_control_address ();
137+ } catch (ipcexception& except) {
138+ // FAIL!! We cannot start
139+ return except.get_reply_status ();
179140 }
180141 }
181142
182143 cntladdress = convert_generic_address_to_specific (cntladdress);
183144
184- control_socket = new libfault::async_request_socket (zmq_ctx, keyval,
185- (keyval == NULL ) ? cntladdress : " control" ,
186- std::vector<std::string>());
187-
188- control_socket->add_to_pollset (&pollset);
145+ control_socket = new nanosockets::async_request_socket (cntladdress, 1 );
146+ control_socket->set_receive_poller ([=](){this ->poll_server_pid_is_running ();return this ->server_alive ;});
189147
190148 // connect the subscriber to the status address
191- if (keyval) {
192- subscribesock.connect (" status" );
193- } else if (alternate_publish_address.length () > 0 ) {
149+ if (alternate_publish_address.length () > 0 ) {
194150 subscribesock.connect (alternate_publish_address);
195151 } else {
196152 std::string pubaddress;
@@ -260,9 +216,6 @@ void comm_client::stop() {
260216 // clear all status callbacks
261217 clear_status_watch ();
262218
263- // stop all pollset callbacks
264- pollset.stop_poll_thread ();
265-
266219 // close all sockets
267220 object_socket.close ();
268221 if (control_socket != NULL ) {
@@ -271,14 +224,6 @@ void comm_client::stop() {
271224 subscribesock.close ();
272225 delete control_socket;
273226
274- // destroy zookeeper
275- if (keyval) delete keyval;
276- keyval = NULL ;
277-
278- // close zeromq context
279- if (owns_zmq_ctx) {
280- zmq_ctx_destroy (zmq_ctx);
281- }
282227 socket_closed = true ;
283228 started = false ;
284229}
@@ -300,28 +245,7 @@ void comm_client::stop_ping_thread() {
300245 }
301246}
302247
303- void comm_client::apply_auth (call_message& call) {
304- for (auto & auth : auth_stack) {
305- auth->apply_auth (call);
306- }
307- }
308-
309-
310- bool comm_client::validate_auth (reply_message& reply) {
311- for (auto & auth : boost::adaptors::reverse (auth_stack)) {
312- if (auth->validate_auth (reply) == false ) return false ;
313- }
314- return true ;
315- }
316-
317- void comm_client::subscribe_callback (libfault::zmq_msg_vector& recv) {
318- // check that it is the right format. It should just be one message
319- if (recv.size () != 1 ) return ;
320- // decode the message, convert zmq_msg_t to string
321- recv.reset_read_index ();
322- zmq_msg_t * zmsg = recv.read_next ();
323- std::string msg ((char *)zmq_msg_data (zmsg), zmq_msg_size (zmsg));
324-
248+ void comm_client::subscribe_callback (const std::string& msg) {
325249 boost::lock_guard<boost::mutex> guard (status_buffer_mutex);
326250 status_buffer.push_back (msg);
327251 status_buffer_cond.notify_one ();
@@ -413,58 +337,42 @@ void comm_client::clear_status_watch() {
413337 prefix_to_status_callback.clear ();
414338}
415339
416- boost::shared_future<libfault::message_reply*>
417- comm_client::internal_call_future (call_message& call, bool control) {
340+ int comm_client::internal_call_impl (call_message& call,
341+ nanosockets::zmq_msg_vector& ret,
342+ bool control,
343+ size_t timeout) {
418344 // If the socket is already dead, return with an unreachable
419- if (socket_closed) {
420- libfault::message_reply* reply = new libfault::message_reply;
421- reply->status = EHOSTUNREACH;
422- return boost::make_future (reply);
423- }
424- apply_auth (call);
425- libfault::zmq_msg_vector callmsg;
345+ if (socket_closed) return EHOSTUNREACH;
346+
347+ nanosockets::zmq_msg_vector callmsg;
426348 call.emit (callmsg);
427349 // Control messages use a separate socket
428350 if (control && control_socket != NULL ) {
429- return control_socket->request_master (callmsg);
351+ return control_socket->request_master (callmsg, ret, timeout );
430352 }
431- return object_socket.request_master (callmsg);
353+ return object_socket.request_master (callmsg, ret, timeout );
432354}
433355
434356int comm_client::internal_call (call_message& call, reply_message& reply, bool control) {
435357 if (!started) {
436358 return ENOTCONN;
437359 }
438360
439- auto future = internal_call_future (call, control);
440- while (server_alive && !future.has_value ()) {
441- poll_server_pid_is_running ();
442- auto future_timeout =
443- boost::chrono::system_clock::now () + boost::chrono::milliseconds (5000 );
444- future.wait_until (future_timeout);
445- }
361+ nanosockets::zmq_msg_vector ret;
362+ int status = internal_call_impl (call, ret, control);
446363
447364 // if server is dead, we quit
448365 if (server_alive == false ) {
449366 call.clear ();
450367 return EHOSTUNREACH;
451368 }
452369
453- int status = future.get ()->status ;
454370 if (status != 0 ) {
455- delete future.get ();
456371 return status;
457372 }
458373 // otherwise construct the reply
459- reply.construct (future.get ()->msgvec );
460- future.get ()->msgvec .clear ();
461- delete future.get ();
462-
463- if (!validate_auth (reply)) {
464- // construct an auth failure reply
465- reply.clear ();
466- reply.status = reply_status::AUTH_FAILURE;
467- }
374+ reply.construct (ret);
375+
468376 return status;
469377}
470378
0 commit comments