本文共 5531 字,大约阅读时间需要 18 分钟。
Zeromq模式:
zeromq主页:
Zeromq Guild:
Zeromq 中文简介:
Zero wiki:
zeromq系列:
ØMQ(Zeromq) 是一个更为高效的传输层
优势是:
1 程序接口库是一个并发框架
2 在集群和超级计算机上表现得比TCP更快
3 通过inproc, IPC, TCP, 和 multicast进行传播消息
4 通过发散,订阅,流水线,请求的方式连接
5 对于不定规模的多核消息传输应用使用异步IO
6 有非常大并且活跃的开源社区
7 支持30+的语言
8 支持多种系统
Zeromq定义为“史上最快的消息队列”
从网络通信的角度看,它处于会话层之上,应用层之下。
ØMQ (ZeroMQ, 0MQ, zmq) looks like an embeddable networking library but acts like a concurrency framework. It gives you sockets that carry whole messages across various transports like in-process, inter-process, TCP, and multicast. You can connect sockets N-to-N with patterns like fanout, pub-sub, task distribution, and request-reply. It's fast enough to be the fabric for clustered products. Its asynchronous I/O model gives you scalable multicore applications, built as asynchronous message-processing tasks. It has a score of language APIs and runs on most operating systems. ØMQ is from iMatix and is LGPL open source.
Zeromq中传递的数据格式是由用户自己负责,就是说如果server发送的string是有带"\0"的,那么client就必须要知道有这个
Pub_Sub模式。
the subscriber will always miss the first messages that the publisher sends. This is because as the subscriber connects to the publisher (something that takes a small but non-zero time), the publisher may already be sending messages out.
在这种模式下很可能发布者刚启动时发布的数据出现丢失,原因是用zmq发送速度太快,在订阅者尚未与发布者建立联系时,已经开始了数据发布(内部局域网没这么夸张的)。官网给了两个解决方案;1,发布者sleep一会再发送数据(这个被标注成愚蠢的);2,使用proxy。
1 获取例子
git clone --depth=1 git://github.com/imatix/zguide.git
2 服务器端:
(当服务器收到消息的时候,服务器回复“World”)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 | <?php /* * Hello World server * Binds REP socket to *:5555 * Expects "Hello" from client, replies with "World" * @author Ian Barber <ian(dot)barber(at)gmail(dot)com> */ $context = new ZMQContext(1); // Socket to talk to clients $responder = new ZMQSocket( $context , ZMQ::SOCKET_REP); $responder ->bind( "*:5555" ); while (true) { // Wait for next request from client $request = $responder ->recv(); printf ( "Received request: [%s]\n" , $request ); // Do some 'work' sleep (1); // Send reply back to client $responder ->send( "World" ); } |
3 客户端:
(客户端发送消息)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 | <?php /* * Hello World client * Connects REQ socket to * Sends "Hello" to server, expects "World" back * @author Ian Barber <ian(dot)barber(at)gmail(dot)com> */ $context = new ZMQContext(); // Socket to talk to server echo "Connecting to hello world server…\n" ; $requester = new ZMQSocket( $context , ZMQ::SOCKET_REQ); $requester ->connect( "" ); for ( $request_nbr = 0; $request_nbr != 10; $request_nbr ++) { printf ( "Sending request %d…\n" , $request_nbr ); $requester ->send( "Hello" ); $reply = $requester ->recv(); printf ( "Received reply %d: [%s]\n" , $request_nbr , $reply ); } |
1 | |
天气气候订阅系统:(pub-sub)
1 server端:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 | <?php /* * Weather update server * Binds PUB socket to *:5556 * Publishes random weather updates * @author Ian Barber <ian(dot)barber(at)gmail(dot)com> */ // Prepare our context and publisher $context = new ZMQContext(); $publisher = $context ->getSocket(ZMQ::SOCKET_PUB); $publisher ->bind( "*:5556" ); $publisher ->bind( "" ); while (true) { // Get values that will fool the boss $zipcode = mt_rand(0, 100000); $temperature = mt_rand(-80, 135); $relhumidity = mt_rand(10, 60); // Send message to all subscribers $update = sprintf ( "%05d %d %d" , $zipcode , $temperature , $relhumidity ); $publisher ->send( $update ); } |
2 client端:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 | <?php /* * Weather update client * Connects SUB socket to * Collects weather updates and finds avg temp in zipcode * @author Ian Barber <ian(dot)barber(at)gmail(dot)com> */ $context = new ZMQContext(); // Socket to talk to server echo "Collecting updates from weather server…" , PHP_EOL; $subscriber = new ZMQSocket( $context , ZMQ::SOCKET_SUB); $subscriber ->connect( "" ); // Subscribe to zipcode, default is NYC, 10001 $filter = $_SERVER [ 'argc' ] > 1 ? $_SERVER [ 'argv' ][1] : "10001" ; $subscriber ->setSockOpt(ZMQ::SOCKOPT_SUBSCRIBE, $filter ); // Process 100 updates $total_temp = 0; for ( $update_nbr = 0; $update_nbr < 100; $update_nbr ++) { $string = $subscriber ->recv(); sscanf ( $string , "%d %d %d" , $zipcode , $temperature , $relhumidity ); $total_temp += $temperature ; } printf ( "Average temperature for zipcode '%s' was %dF\n" , $filter , (int) ( $total_temp / $update_nbr )); |
1 | ------------------------ |
1 | pub-sub的proxy模式: |
1 | 图示是: |
Proxy节点的代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 | <?php /* * Weather proxy device * @author Ian Barber <ian(dot)barber(at)gmail(dot)com> */ $context = new ZMQContext(); // This is where the weather server sits $frontend = new ZMQSocket( $context , ZMQ::SOCKET_SUB); $frontend ->connect( "" ); // This is our public endpoint for subscribers $backend = new ZMQSocket( $context , ZMQ::SOCKET_PUB); $backend ->bind( "" ); // Subscribe on everything $frontend ->setSockOpt(ZMQ::SOCKOPT_SUBSCRIBE, "" ); // Shunt messages out to our own subscribers while (true) { while (true) { // Process all parts of the message $message = $frontend ->recv(); $more = $frontend ->getSockOpt(ZMQ::SOCKOPT_RCVMORE); $backend ->send( $message , $more ? ZMQ::SOCKOPT_SNDMORE : 0); if (! $more ) { break ; // Last message part } } } |
----------------------
作者:yjf512(轩脉刃)
出处:http://www.cnblogs.com/yjf512/
本文版权归yjf512和cnBlog共有,欢迎转载,但未经作者同意必须保留此段声明