July
5th,
2018
第一章 ZeroMQ基础
背景
- 如何在任何地点连接任何两个应用程序;
- 将这个解决方案用最为简单的方式包装起来,供程序员使用。
ZMQ简介
ZMQ看起来像是一套嵌入式的网络链接库,但工作起来更像是一个并发式的框架。它提供的套接字可以在多种协议中传输消息,如线程间、进程间、TCP、广播等。你可以使用套接字构建多对多的连接模式,如扇出、发布-订阅、任务分发、请求-应答等。它的异步I/O机制让你能够构建多核应用程序,完成异步消息处理任务。
提问-回答(Request-Reply)
创建一个客户端和一个服务端,客户端发送Hello给服务端,服务端返回World。
// server
int main () {
// Prepare our context and socket
zmq::context_t context (1);
zmq::socket_t socket (context, ZMQ_REP);
socket.bind ("tcp://*:5555");
while (true) {
zmq::message_t request;
// Wait for next request from client
socket.recv (&request);
std::cout << "Received Hello" << std::endl;
// Do some 'work'
sleep(1);
// Send reply back to client
zmq::message_t reply (5);
memcpy (reply.data (), "World", 5);
socket.send (reply);
}
return 0;
}
// client
int main ()
{
// Prepare our context and socket
zmq::context_t context (1);
zmq::socket_t socket (context, ZMQ_REQ);
std::cout << "Connecting to hello world server..." << std::endl;
socket.connect ("tcp://localhost:5555");
// Do 10 requests, waiting each time for a response
for (int request_nbr = 0; request_nbr != 10; request_nbr++) {
zmq::message_t request (5);
memcpy (request.data (), "Hello", 5);
std::cout << "Sending Hello " << request_nbr << "..." << std::endl;
socket.send (request);
// Get the reply.
zmq::message_t reply;
socket.recv (&reply);
std::cout << "Received World " << request_nbr << std::endl;
}
return 0;
}
支持服务端-客户端一对多的模式,甚至启动顺序没有先后。
关于字符串
ZMQ的字符串是有长度的,且传送时不加结束符,这是因为C语言字符串以空字符串结尾,而其他语言不一定。C语言有专门的库,zhelpers.h
消息流动(Publish-Subscribe)
单向数据分发:服务端将更新事件发送给一组客户端。
- PUB-SUB套接字是异步的,服务端只能发,客户端只能收。
- 慢连接:即使先打开SUB,后打开PUB,依然存在信息丢失。因为TCP连接耗时比传输耗时多,连接成功时可能PUB已经发出信息。后续会有SUB-PUB同步机制。
- 如果没有SUB,则PUB信息直接丢失
- 当SUB处理消息过慢时,消息在PUB处堆积。需要后期处理保护PUB。
- 目前,PUB发送所有消息,SUB负责过滤消息。
分布式处理
任务:ventilator分发大量任务,一组worker会处理数据并输出结果,sinker接受结果并汇总。
- 需要有同步机制,待worker准备好后启动任务。否则当第一个worker连接成功时,它会一下收到很多任务。
- ventilator向worker均匀地分发任务,称为负载均衡load balancing;sinker均匀地从worker处收集消息,称为公平队列fair-queuing。