Security News
Supply Chain Attack Detected in Solana's web3.js Library
A supply chain attack has been detected in versions 1.95.6 and 1.95.7 of the popular @solana/web3.js library.
message-queue-sqlite
Advanced tools
[TOC]
这是一个基于消息队列的任务处理系统。使用 SQLite 作为中间件实现了异步任务的提交与处理,支持多线程处理任务和结果的回调。
message_queue_sqlite/
├── cache/
│ └── __init__.py # 缓存模块
├── config/
│ └── __init__.py # 配置模块
├── constants/ # 常量模块
│ ├── __init__.py
│ └── task_status.py # 任务状态常量
├── middleware/ # 中间件模块
│ ├── __init__.py
│ ├── client.py # 客户端中间件
│ └── server.py # 服务器中间件
├── model/ # 结果模型模块
│ ├── __init__.py │
| └── task_result.py # 任务结果模型
├── task_service/ # 任务服务模块
│ ├── service/
│ │ ├── __init__.py
│ │ └── services.py # 服务回调模块
│ ├── task/
│ │ ├── __init__.py
│ │ ├── discover.py # 动态任务挂载模块
│ │ ├── task_base.py # 任务基类
│ │ └── tasks.py # 任务函数管理模块
│ └── __init__.py
└── __init__.py # 初始化模块
创建虚拟环境
mkdir demo
cd demo
python -m venv venv
source venv/bin/activate
克隆仓库
git clone https://gitee.com/cai-xinpenge/message_queue.git
cd message_queue
安装
pip install .
请参考以下示例代码以了解如何使用 message_queue_sqlite
:
demo/
├── app/
| ├── engine/
| | ├── __init__.py
| | └── test.py # 回调函数
| └── __init__.py
main.py
app/engine/test.py
from message_queue_sqlite import task_function
@task_function(use_cache=True)
def test(message):
print(message)
return "test"
@task_function()
def test1(message):
print(f"{message}1")
return "test1"
app/engine/__init__.py
from .test import test, test1
__all__ = ["test", "test1"]
app/__init__.py
from message_queue_sqlite import discover_and_mount_ts, init_server
from message_queue_sqlite.config import Config
# 动态挂在 app.engine 目录下的任务
tasks = discover_and_mount_ts("app.engine")
tasks.get_all_task_names()
# 初始化服务
config = Config(middleware_path="message_queue", max_workers=5, middleware_num=1)
server, client, send_message = init_server(tasks, config)
# config 参数可选,默认 middleware_path 为 message_queue,max_workers 为 5,middleware_num 为 1
__all__ = ["model", "server"]
Config 参数可选
main.py
from app import server, client, send_message
from message_queue_sqlite import stop_server, start_server
import sys
import time
from threading import Thread
if __name__ == '__main__':
# 启动服务
Thread(target=start_server, args=(server, client)).start()
# 循环发送任务
while(True):
try:
keyworded_args = {'message': 'hello world'}
callback = lambda x: print(x)
send_message('test', keyworded_args, callback, 1, True)
send_message('test1', keyworded_args, callback, 2)
time.sleep(0.002)
except KeyboardInterrupt:
break
# 停止服务
sys.exit(stop_server(server, client))
# 缓存模式
send_message('test', keyworded_args, callback, priority=1, use_cache=True) # priority 值越大优先级越高
# 非缓存模式
send_message('test1', keyworded_args, callback, priority=2)
当task任务的参数类型不方便被序列化时,可以选择使用缓存模式。该模式会将参数存入 cache 模块中,但执行任务时会先从 cache 中获取参数,由于 cache 只在运行时存在,所以该模式不支持持久化,若程序意外退出,缓存数据也会丢失,该模式主要针对参数类型不方便被序列化的场景。
区别于 1.x 版本,2.x 版本支持多中间件负载均衡,可以根据任务的执行时间和任务的负载情况,动态调整中间件的数量,以提高任务处理的效率。send_message 函数会根据指定的中间件数量,将任务发送到不同的中间件,以降低 sqlite 并发处理的压力。
任务的生命周期通过枚举类型 TaskStatus 管理,包括:
如有问题,请联系作者:
该项目遵循 MIT 许可证。请查看 LICENSE 文件以获取更多信息。
FAQs
A simple message queue demo
We found that message-queue-sqlite demonstrated a healthy version release cadence and project activity because the last version was released less than a year ago. It has 1 open source maintainer collaborating on the project.
Did you know?
Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.
Security News
A supply chain attack has been detected in versions 1.95.6 and 1.95.7 of the popular @solana/web3.js library.
Research
Security News
A malicious npm package targets Solana developers, rerouting funds in 2% of transactions to a hardcoded address.
Security News
Research
Socket researchers have discovered malicious npm packages targeting crypto developers, stealing credentials and wallet data using spyware delivered through typosquats of popular cryptographic libraries.