介绍
本项目提供Python进程间通信的基础功能,适用于多进程编程中一主多从的编程模型。
项目中的Server可在主进程启动,Client在(多个)子进程中启动,客户端发送的消息能够被服务端接收,
并且对于不同的消息类型,可以注册不同的回调逻辑。
项目预置了一些常用通信协议,同时也提供简便的自定义通信协议的接口。
进程间通信基于unix socket实现,因此不能在windows系统中运行。
Quick Start
- 启动服务端
from dip import Server
import asyncio
def cb(req, mtype, data):
print(data['hello'])
server = Server('/tmp/run.sock', callback=cb)
asyncio.run(server.serve_forever())
- 使用客户端发送消息
from dip import Client
cli = Client('/tmp/run.sock')
cli.send_json({'hello': 'world'})
服务端将会输出
>>> 'world'
自定义通信协议
如果需要自定义进程间通信时传输字节流的格式,可以使用自定义Protocol
import typing
from dip import Protocol
class NewProtocol(Protocol, mtype=b'N'):
@classmethod
def _decode(cls, buf: bytes) -> typing.Any:
pass
@classmethod
def _encode(cls, data: typing.Any) -> bytes:
pass
自定义通信协议需要使用一个byte作为标识(mtype),并且必须实现_decode
和_encode
两个方法。其中:
_encode
用于将python对象序列化为字节流_decode
用于将字节流反序列化为python对象
两者一般互为逆操作,即对于python对象obj
:
obj == Protocol.decode(Protocol.encode(obj))
注: 这并不是硬性规定,只是一种通常做法
下述是一个非常粗糙的固定字符串压缩协议的实现
from dip import Protocol
from dip import errors
class StringMappingProto(Protocol, mtype=b'S'):
MAP = {
'Hello World': b'h',
}
MAP_REV = {
v: k for k, v in MAP.items()
}
@classmethod
def _decode(cls, buf: bytes) -> str:
data = cls.MAP_REV.get(buf)
if data is not None:
return data
else:
raise errors.ProtoDecodeError(f'Cannot decode byte: {buf!r}')
@classmethod
def _encode(cls, data: str) -> bytes:
compressed = cls.MAP.get(data)
if compressed is not None:
return compressed
else:
raise errors.ProtoEncodeError(f'Cannot encode string: {data!r}')
使用客户端发送消息:
cli.send_msg('S', 'Hello World')
将会在服务端接收到消息 Hello World