Lazy pub/sub in zeromq, only get last message -



Lazy pub/sub in zeromq, only get last message -

i'am trying implement lazy subscriber on zeromq illustration wuclient/wuserver. client way slower server, must lastly sent message server.

so far way i've found that, connecting/disconnecting client, there of course of study unwanted cost @ each connection, around 3ms:

server.cxx

int main () { // prepare our context , publisher zmq::context_t context (1); zmq::socket_t publisher (context, zmq_pub); publisher.bind("tcp://*:5556"); int counter = 0; while (1) { counter++; // send message subscribers zmq::message_t message(20); snprintf ((char *) message.data(), 20 , "%d", counter); publisher.send(message); std::cout << counter << std::endl; usleep(100000); } homecoming 0; }

client.cxx

int main (int argc, char *argv[]) { zmq::context_t context (1); zmq::socket_t subscriber (context, zmq_sub); while(1){ zmq::message_t update; int counter; subscriber.connect("tcp://localhost:5556"); // phone call take milliseconds subscriber.setsockopt(zmq_subscribe, "", 0); subscriber.recv(&update); subscriber.disconnect("tcp://localhost:5556"); std::istringstream iss(static_cast<char*>(update.data())); iss >> counter; std::cout << counter << std::endl; usleep(1000000); } homecoming 0; }

server output: 1 2 3 4 5 6 7 8 9 ...

client output: 4 14 24 ...

i've tried utilize high water mark without co/deco, not working. kind of code, frame begin dropped when buffer reach @ to the lowest degree hundreds of messages. :

int high_water_mark = 1; socket.setsockopt(zmq_rcvhwm, &high_water_mark, sizeof(high_water_mark) ); socket.setsockopt(zmq_sndhwm, &high_water_mark, sizeof(high_water_mark) );

also there this post in zeromq-dev closely related, solution provided ( utilize of thread select lastly message not acceptable, can't transfer tons of message on network wich not used after.

the solution utilize zmq_conflate ( non multipart messages ):

client.cxx

#include <zmq.hpp> #include <iostream> #include <sstream> #include <unistd.h> int main (int argc, char *argv[]) { zmq::context_t context (1); zmq::socket_t subscriber (context, zmq_sub); int conflate = 1; subscriber.setsockopt(zmq_conflate, &conflate, sizeof(conflate) ); subscriber.connect("tcp://localhost:5556"); subscriber.setsockopt(zmq_subscribe, "", 0); while(1){ zmq::message_t update; int counter; subscriber.recv(&update); std::istringstream iss(static_cast<char*>(update.data())); iss >> counter; std::cout << counter << std::endl; usleep(1000000); } homecoming 0; }

zeromq lazy-evaluation datareader publisher subscriber

Comments

Popular posts from this blog

xslt - DocBook 5 to PDF transform failing with error: "fo:flow" is missing child elements. Required content model: marker* -

mediawiki - How do I insert tables inside infoboxes on Wikia pages? -

SQL Server : need assitance parsing delimted data and returning a long concatenated string -