流式推理框架
目录结构说明
- config: 作业级别配置相关的解析代码
- pipeline:任务流水线相关的代码,包括线程的创建、推理客户端封装、推理请求metrics生成等
- products:实际的各个推理场景的代码
- example:这里是一些示例代码
- soe: 仿真优化推理任务
- streamz_ext: 对streamz的扩展,实现一些额外的source、窗口、sink算子
- tasks: 任务级别的配置管理、任务发现
- supervisor.py: 管理者,用于创建WorkerActor和维护WorkerActor中的流水线
- worker.py: 实际任务流水线运行的一个容器,一个Worker可以包含多个任务流水线
- main.py: ray作业提交的driver入口
开发步骤
对于一个新的推理场景,开发流水线的步骤如下:
-
在products目录下创建一个对应产品的目录(以example为例)),并且在products/init.py中加上import. 后续步骤创建的类的文件都放在此目录中, 且必须在example/init.py中import, 否则可能会初始化失败。
from . import example
-
开发一个InferTaskConfig的子类,一般放在task.py。 其中包含一个任务需要的配置信息。包括:
1) 数据源信息:jetstream的subject、消费者等信息
2)输出目标信息:可能是数据库、jetstream
-
开发一个TaskManager的子类,从数据库或其他来源获取上述InferTaskConfig的全部对象。
-
开发一个InferPipeline的子类,用于实现推理任务的流水线。 一般只需要实现其中的__exec__方法。exec 目前没有标准化的模板,主要包括以下几步:
1) source:数据源。一般都是jetstream,已经内置好。
2) 预处理:将数据源的消息转换成内部对象。 需要自行开发,一般就是一个map算子。
3)窗口算子: 如果有窗口需求,可以增加一个窗口算子,包含时间窗口、计数窗口等。 窗口算子都已经内置实现,只需要引用即可。
4) 推理预处理:将一个窗口中的元素集合转换成推理服务的请求。 定制开发,要看推理服务的请求需要如何组织。
5) 调用推理服务。 这一般是一个map算子,是个通用的算子。
6) 写入目标。 根据具体需求引用sink算子将数据发送给下游。
example的运行方法
在本地ray中运行
export RAY_PARAMETERS='{"task_type": "example"}'; python3.9 -m streaming_infer.main
其中任务的配置是在example/task_manager.py中写死的,可以用nats命令向写死的nats发送消息
cur_sec=`date '+%s'`; nats -s "ns://10.31.76.16:8221" pub js-test-7 --count=3 --sleep 1s "{\"time\": $cur_sec}"
在本地运行单个任务
这种情况下不会启动ray, 因为pipeline的运行本身是不依赖ray的。 但是目前只运行单个线程,这种情况下可以添加断点来调试。
RAY_PARAMETERS='{"task_type": "example"}'; python3.9 -m streaming_infer.local_test --id ex_1
发送消息的方法同上。