python ZeroMQ实现1:N,异步收发消息(也可向指定客户端发送消息)


ZeroMQ的python版本和C/C++版本的接口差不多,要实现一个server对N个client,异步方式,而且可以对指定的client发送消息,可以这样: server采用ROUTER方式,client采用DEALER方式,而且要自己制定client的zmq.IDENTITY(如果不指定,zmq就会自动生成一个,不好控制对制定的client发送消息。)

#!/usr/bin/python
#-*-coding:utf-8-*-
import time
import zmq
#import zhelpers

context = zmq.Context()
socket = context.socket(zmq.ROUTER)

#server不需要指定
#socket.setsockopt_string(zmq.IDENTITY, u"desktop")

socket.bind("tcp://*:5555")

while True:
    #zhelpers.dump(socket) #这里可以打印出帧的具体内容
    
    [address,contents]=socket.recv_multipart()
    print("[%s]%s
"%(address,contents))

    reply = "[get server reply:" + contents + "]"
    socket.send_multipart([address, reply]) #这里的address就可以指定客户端发消息
#!/usr/bin/python
#-*-coding:utf-8-*-
import zmq
import sys
import os

import threading
import ctypes
import inspect

class ZmqClientThread(threading.Thread):

    def __init__(self, func, serverIp, port, identity):
        threading.Thread.__init__(self)
        self.context = zmq.Context()
        self.socket = self.context.socket(zmq.DEALER)
        self.serverIp = serverIp
        self.identity = identity
        self.port = port
        self.func = func
        
        self.socket.setsockopt_string(zmq.IDENTITY, identity) #默认使用utf-8编码
        #先设置IDENTITY,再connect,顺序不能颠倒
        self.socket.connect( "tcp://{0}:{1}".format(serverIp, port) )
        #("tcp://localhost:5555")或者("tcp://127.0.0.1:5555")都可以

    #向server发送字符串
    def sendMsg(self, data):
        if not self.socket.closed:
            self.socket.send(data)
        else:
            print "sock is closed,cant send message..."
            
    def run(self):
        self.func(self.socket)

#收从server发来的字符串
def loop(socket):
    while True:
        if not socket.closed:
            message = socket.recv()
            print message
        else:
            print "sock is closed,cant receive any message..."
            break

def main():
    serverIp = "127.0.0.1"
    port = 5555
    identity = u"client1"
    zmqThread = ZmqClientThread(loop, serverIp, port, identity)
    zmqThread.start()
  
    while(True):
        data = raw_input("input your data:")
        if data == q:
            print "data == q"
            os._exit(1)
        else:
            zmqThread.sendMsg(data)  #这种方式发字符串

if __name__==__main__:
    main()

知识点: Context使用完,在C/C++中需要手动关闭,而python中会在垃圾回收的时候自动调用term关闭。

Close or terminate the context. This can be called to close the context by hand. If this is not called, the context will automatically be closed when it is garbage collected.

Socket也是一样,使用完后,在C/C++中需要手动关闭,而python中会在垃圾回收的时候自动调用term关闭。

Close the socket. If linger is specified, LINGER sockopt will be set prior to closing. This can be called to close the socket by hand. If this is not called, the socket will automatically be closed when it is garbage collected.

具体查看官方文档: 还有需要注意的是Router <->Dealer模式,需要客户端先发一个数据帧(空白的也可以)到服务器端,之后服务器端才能指定客户端发消息(这是一个建立路由的过程)

(这个文件是从github的一个开源项目上下载下来的,用dump可以打印出帧的具体内容,方便调试,地址:)

# encoding: utf-8
"""
Helper module for example applications. Mimics ZeroMQ Guides zhelpers.h.
"""
from __future__ import print_function

import binascii
import os
from random import randint

import zmq

def socket_set_hwm(socket, hwm=-1):
    """libzmq 2/3/4 compatible sethwm"""
    try:
        socket.sndhwm = socket.rcvhwm = hwm
    except AttributeError:
        socket.hwm = hwm


def dump(msg_or_socket):
    """Receives all message parts from socket, printing each frame neatly"""
    if isinstance(msg_or_socket, zmq.Socket):
        # its a socket, call on current message
        msg = msg_or_socket.recv_multipart()
    else:
        msg = msg_or_socket
    print("----------------------------------------")
    for part in msg:
        print("[%03d]" % len(part), end= )
        is_text = True
        try:
            print(part.decode(ascii))
        except UnicodeDecodeError:
            print(r"0x%s" % (binascii.hexlify(part).decode(ascii)))


def set_id(zsocket):
    """Set simple random printable identity on socket"""
    identity = u"%04x-%04x" % (randint(0, 0x10000), randint(0, 0x10000))
    zsocket.setsockopt_string(zmq.IDENTITY, identity)


def zpipe(ctx):
    """build inproc pipe for talking to threads
    mimic pipe used in czmq zthread_fork.
    Returns a pair of PAIRs connected via inproc
    """
    a = ctx.socket(zmq.PAIR)
    b = ctx.socket(zmq.PAIR)
    a.linger = b.linger = 0
    a.hwm = b.hwm = 1
    iface = "inproc://%s" % binascii.hexlify(os.urandom(8))
    a.bind(iface)
    b.connect(iface)
    return a,b

python版本的ZeroMQ的参考资料: 官方文档: 实例代码: 另外就是参考《ZeroMQ 云时代极速消息通信库》这本书是用C/C++写的,但是里面的模型和设置参数的方式和python版本几乎是一样的,可以用来参考,他山之石可以攻玉。

原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/290813.html

(0)
上一篇 2022年10月9日
下一篇 2022年10月9日

相关推荐

发表回复

登录后才能评论