/**************************************************************
技术博客: http://www.cnblogs.com/itdef/
技术交流群群号码: 324164944
欢迎 C/C++、Windows 驱动爱好者、服务器程序员沟通交流
**************************************************************/
ROUTER 与 REQ通讯
#include "stdafx.h"
#include "zhelpers.hpp"
#include <thread>

// Worker side of the ROUTER-to-REQ example.
//
// Opens its own context and REQ socket, then keeps announcing itself to the
// broker until the broker's reply is "Fired!". The number of non-"Fired!"
// replies is printed on exit.
//
// arg: the worker number, carried in the pointer value itself (see main()).
void worker_thread(void *arg)
{
    zmq::context_t ctx(1);
    zmq::socket_t req(ctx, ZMQ_REQ);

    // The worker number doubles as the socket identity.
    s_set_id(req, reinterpret_cast<intptr_t>(arg));
    req.connect("tcp://localhost:5671"); // "ipc" doesn't yet work on windows.

    int completed = 0;
    bool fired = false;
    while (!fired) {
        // Report in: tells the broker this worker is ready for work.
        s_send(req, "Hi Boss");

        // The broker's answer is either more work or the dismissal notice.
        const std::string reply = s_recv(req);
        fired = (reply == "Fired!");

        if (!fired) {
            ++completed;
            // Simulate a random amount of work.
            s_sleep(within(500) + 1);
        }
    }

    std::cout << "Processed: " << completed << " tasks" << std::endl;
}

// Broker side of the ROUTER-to-REQ example.
//
// Binds a ROUTER socket, starts NBR_WORKERS worker threads, hands out
// "Work harder" replies until the deadline passes, then answers every further
// request with "Fired!" until each worker has been dismissed once.
int main()
{
    zmq::context_t ctx(1);
    zmq::socket_t router(ctx, ZMQ_ROUTER);
    router.bind("tcp://*:5671"); // "ipc" doesn't yet work on windows.

    const int NBR_WORKERS = 10;
    std::thread pool[NBR_WORKERS];
    for (int i = 0; i < NBR_WORKERS; ++i) {
        // Smuggle the worker number through the void* argument.
        pool[i] = std::thread(worker_thread,
                              reinterpret_cast<void *>(static_cast<intptr_t>(i)));
    }

    // Run for five seconds, then start telling workers to stop.
    const int64_t deadline = s_clock() + 5000;
    int fired = 0;
    while (fired < NBR_WORKERS) {
        // A request arrives as three frames: identity, delimiter, payload.
        // The next message comes from the least recently used worker.
        const std::string identity = s_recv(router);
        s_recv(router); // Envelope delimiter
        s_recv(router); // Response from worker

        // Address the reply to the same worker.
        s_sendmore(router, identity);
        s_sendmore(router, "");

        // Keep workers busy until the deadline, then dismiss them one by one.
        if (s_clock() >= deadline) {
            s_send(router, "Fired!");
            ++fired;
        } else {
            s_send(router, "Work harder");
        }
    }

    for (std::thread &t : pool) {
        t.join();
    }
    return 0;
}
ROUTER 与 DEALER通讯
// rtdealer_cpp.cpp : 定义控制台应用程序的入口点。 // #include "stdafx.h" // // Custom routing Router to Dealer // // Olivier Chamoux <olivier.chamoux@fr.thalesgroup.com> #include "zhelpers.hpp" #include <thread> static void * worker_task(void *args) { zmq::context_t context(1); zmq::socket_t worker(context, ZMQ_DEALER); #if (defined (WIN32)) s_set_id(worker, (intptr_t)args); #else s_set_id(worker); // Set a printable identity #endif worker.connect("tcp://localhost:5671"); int total = 0; while (1) { // Tell the broker we're ready for work s_sendmore(worker, ""); s_send(worker, "Hi Boss"); // Get workload from broker, until finished s_recv(worker); // Envelope delimiter std::string workload = s_recv(worker); // .skip if ("Fired!" == workload) { std::cout << "Completed: " << total << " tasks" << std::endl; break; } total++; // Do some random work s_sleep(within(500) + 1); } return NULL; } // .split main task // While this example runs in a single process, that is just to make // it easier to start and stop the example. Each thread has its own // context and conceptually acts as a separate process. 
int main() { zmq::context_t context(1); zmq::socket_t broker(context, ZMQ_ROUTER); broker.bind("tcp://*:5671"); srandom((unsigned)time(NULL)); const int NBR_WORKERS = 10; std::thread workers[NBR_WORKERS]; int worker_nbr = 0; for (; worker_nbr < NBR_WORKERS; ++worker_nbr) { workers[worker_nbr] = std::thread( worker_task, (void *)(intptr_t)worker_nbr); } // Run for five seconds and then tell workers to end int64_t end_time = s_clock() + 5000; int workers_fired = 0; while (1) { // Next message gives us least recently used worker std::string identity = s_recv(broker); { s_recv(broker); // Envelope delimiter s_recv(broker); // Response from worker } s_sendmore(broker, identity); s_sendmore(broker, ""); // Encourage workers until it's time to fire them if (s_clock() < end_time) s_send(broker, "Work harder"); else { s_send(broker, "Fired!"); if (++workers_fired == NBR_WORKERS) break; } } for (int worker_nbr = 0; worker_nbr < NBR_WORKERS; ++worker_nbr) { workers[worker_nbr].join(); } return 0; }