New Research: Supply Chain Attack on Axios Pulls Malicious Dependency from npm.Details →
Socket
Book a DemoSign in
Socket

queue-sqlite

Package Overview
Dependencies
Maintainers
1
Versions
5
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

queue-sqlite - pypi Package Compare versions

Comparing version
0.2.1
to
0.2.2
tests/test_message_batch.py
+270
-149
Metadata-Version: 2.4
Name: queue-sqlite
Version: 0.2.1
Version: 0.2.2
Summary: A high-performance, SQLite-based distributed task queue system with Rust-backed core operations. Supports task mounting, message listening, priority handling, retry mechanisms, and automatic cleanup of expired messages. Ideal for building reliable, scalable background task processing systems.

@@ -28,226 +28,347 @@ Author-email: chakcy <947105045@qq.com>

# SQLite 任务队列系统
# Queue SQLite - 高性能 SQLite 任务队列系统
这个项目是一个基于 SQLite 的高性能任务队列系统,使用 Rust 和 Python 混合编程实现。系统提供了任务调度、消息队列管理和任务生命周期管理的完整解决方案。
![python-3.11+](https://img.shields.io/badge/python-3.11+-blue.svg)
![rust-1.65+](https://img.shields.io/badge/rust-1.65+-red.svg)
![license-MIT](https://img.shields.io/badge/license-MIT-green.svg)
![version-0.2.1](https://img.shields.io/badge/version-0.2.1-orange.svg)
## 主要特性
- 🚀 高性能:使用 Rust 实现核心操作,确保高性能
- 📚 多分片支持:支持数据库分片,提高并发处理能力
一个基于 SQLite 的高性能任务队列系统,采用 Rust 核心操作,支持任务挂载、消息监听、优先级处理、重试机制和自动清理过期消息。适合构建可靠、可扩展的后台任务处理系统。
- ⏱️ 智能调度:提供接收调度器、任务调度器和清理调度器
## 🌟 特性
- 🔒 任务生命周期管理:支持任务状态跟踪、重试机制和过期处理
### 核心优势
- 📊 监控支持:内置资源监控功能
- 🚀 高性能:Rust 核心提供毫秒级任务处理
- 💾 持久化存储:基于 SQLite 的可靠消息存储
- 🔄 多调度器支持:标准、异步、Qt 三种调度模式
- 🎯 智能分片:自动哈希分片,支持横向扩展
- 📊 全面监控:内置资源使用监控和队列状态查看
- 🧩 任务挂载系统:通过装饰器轻松添加新任务
### 功能亮点
## 项目结构
```
src/
├── core/ # Rust 核心实现
│ ├── lib.rs # 主模块
│ ├── queue_operation.rs # 队列操作实现
│ └── task_mounter.rs # 任务挂载实现
├── queue_sqlite/ # Python 实现
│ ├── constant/ # 常量定义(消息优先级、状态、类型)
│ ├── core/ # 核心接口
│ ├── model/ # 数据模型
│ ├── queue_operation/ # 队列操作封装
│ ├── scheduler/ # 调度器实现
│ └── task_cycle/ # 任务生命周期管理
tests/ # 测试代码
```
- ✅ 任务装饰器:使用 @task 装饰器轻松注册任务
- ✅ 监听装饰器:使用 @listener 装饰器实现数据变更监听
- ✅ 优先级队列:支持 LOW/NORMAL/HIGH/URGENT 四级优先级
- ✅ 重试机制:可配置的最大重试次数和延迟重试
- ✅ 过期清理:自动清理过期和完成的消息
- ✅ 批量操作:支持消息批量入队和处理
- ✅ 异步支持:原生支持 async/await 异步任务
- ✅ Qt 集成:可选 Qt 调度器用于 GUI 应用
## 核心组件
## 📦 安装
1. 消息模型 (MessageItem)
定义了任务消息的数据结构,包含:
### 前置要求
- 消息ID、类型、状态
- Python 3.11+
- Rust 1.65+ (用于编译核心扩展)
- SQLite 3.35+
- 内容、创建时间、更新时间
### 安装方式
- 优先级、来源、目标
#### 方式一:从源码安装(推荐)
- 重试次数、过期时间
```shell
# 克隆仓库
git clone https://github.com/chakcy/queue_sqlite.git
cd queue_sqlite
- 标签和元数据
# 安装 Rust(如果未安装)
curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh
2. 队列操作 (QueueOperation)
提供对 SQLite 队列的基本操作:
# 安装 Python 依赖
pip install -r requirements.txt
- 初始化数据库
# 安装开发模式
pip install -e .
```
- 入队和出队操作
#### 方式二:从 PyPI 安装
- 获取队列长度和已完成消息
```shell
pip install queue-sqlite
```
- 更新状态和结果
## 🚀 快速开始
- 删除消息和清理过期消息
### 基本使用
3. 调度系统
```python
from queue_sqlite.scheduler import QueueScheduler
from queue_sqlite.model import MessageItem
from queue_sqlite.constant import MessagePriority
from queue_sqlite.mounter import task
包含三个主要调度器:
***接收调度器 (ReceiveScheduler)***
- 处理消息发送
# 1. 注册任务
@task(meta={"max_retries": 3, "delay": 1})
def process_image(message_item):
"""处理图片任务"""
data = message_item.content
# 处理逻辑
return {"status": "success", "processed": data["image_id"]}
- 管理回调函数
- 接收已完成消息
# 2. 创建调度器
scheduler = QueueScheduler(scheduler_type="standard")
***任务调度器 (TaskScheduler)***
- 从队列中获取任务
# 3. 启动调度器
scheduler.start()
- 调用任务函数
# 4. 发送任务
for i in range(10):
message = MessageItem(
content={"image_id": i, "path": f"/images/{i}.jpg"},
destination="process_image", # 任务函数名
priority=MessagePriority.HIGH, # HIGH 优先级
tags="image_processing",
)
- 更新任务状态和结果
def callback(result_message):
print(f"任务完成: {result_message.id}, 结果: {result_message.result}")
***清理调度器 (CleanupScheduler)***
- 清理过期消息
scheduler.send_message(message, callback)
- 删除旧消息(默认清理7天前的消息)
# 5. 等待任务完成
import time
4. 任务挂载系统 (TaskMounter)
提供装饰器挂载任务函数:
while scheduler.queue_operation.get_queue_length() > 0:
print(f"剩余任务: {scheduler.queue_operation.get_queue_length()}")
time.sleep(1)
```python
@TaskMounter.task(meta={"task_name": "example"})
def example_task(message_item: MessageItem):
# 任务逻辑
return result
```
## 使用示例
# 6. 停止调度器
scheduler.stop()
```
### 基本使用
### 异步任务示例
```python
import asyncio
from queue_sqlite.scheduler import QueueScheduler
from queue_sqlite.model import MessageItem
from queue_sqlite.mounter import task
# 初始化调度器
scheduler = QueueScheduler(
receive_thread_num=4,
task_thread_num=8,
shard_num=12
)
# 启动调度器
scheduler.start_queue_scheduler()
@task(meta={"name": "async_processor", "max_retries": 2})
async def async_data_fetcher(message_item):
"""异步数据获取任务"""
url = message_item.content["url"]
# 模拟异步 HTTP 请求
await asyncio.sleep(0.5)
return {"url": url, "data": "fetched", "status": 200}
# 创建消息
message = MessageItem(
content={"data": "example"},
destination="task_name"
)
# 定义回调函数
def callback(message_item):
print(f"任务完成: {message_item.id}")
async def main():
scheduler = QueueScheduler(scheduler_type="async")
scheduler.start()
# 发送消息
scheduler.send_message(message, callback)
# 发送异步任务
message = MessageItem(
content={"url": "https://api.example.com/data"},
destination="async_data_fetcher",
)
# ... 程序运行 ...
scheduler.send_message(message, lambda m: print(f"完成: {m.id}"))
# 停止调度器
scheduler.stop_queue_scheduler()
await asyncio.sleep(5)
scheduler.stop()
asyncio.run(main())
```
### 压力测试
### 数据监听示例
```python
# tests/test_stress.py
class TestStress:
@classmethod
def test_stress(cls):
TASK_NUM = 10000
scheduler = QueueScheduler(receive_thread_num=4, task_thread_num=8, shard_num=12)
scheduler.start_queue_scheduler()
# 发送大量任务
for i in range(TASK_NUM):
message = MessageItem(content={"num": i}, destination="example")
scheduler.send_message(message, cls._callback)
# 等待所有任务完成
while scheduler.queue_operation.get_queue_length() > 0:
time.sleep(0.5)
scheduler.stop_queue_scheduler()
from queue_sqlite import QueueScheduler
from queue_sqlite.mounter import listener
# 注册监听器
@listener()
def user_activity_log(data):
"""监听用户活动数据"""
print(f"用户活动: {data}")
@listener()
def system_alert(data):
"""监听系统告警"""
print(f"系统告警: {data}")
# 创建调度器
scheduler = QueueScheduler()
scheduler.start()
# 更新监听数据(会自动触发监听函数)
scheduler.update_listen_data("user_activity_log", "用户登录")
scheduler.update_listen_data("user_activity_log", "用户购买")
scheduler.update_listen_data("system_alert", "CPU使用率过高")
```
## 安装与运行
## ⚙️ 配置选项
### 前提条件
Python 3.7+
### 调度器配置
Rust 工具链
```python
from queue_sqlite import SchedulerConfig, QueueScheduler
SQLite 开发文件
config = SchedulerConfig(
receive_thread_num=2, # 接收线程数
task_thread_num=8, # 任务执行线程数
shard_num=4, # 数据库分片数
queue_name="production", # 队列名称
meta={"app": "myapp"} # 自定义元数据
)
### 安装步骤
1. 克隆仓库:
scheduler = QueueScheduler(
scheduler_type="standard", # standard | async | qt
config=config
)
```
```bash
git clone https://gitee.com/cai-xinpenge/queue_sqlite.git
cd sqlite-task-queue
```
### 消息配置
2. 安装 Python 依赖:
```python
from queue_sqlite import MessageItem
from queue_sqlite.constant import MessagePriority, MessageType
from datetime import datetime, timedelta
```bash
pip install -r requirements.txt
```
message = MessageItem(
# 必需字段
content={"data": "任务数据"},
destination="task_function_name",
# 可选字段
id="custom-uuid", # 默认自动生成
type=MessageType.TASK,
priority=MessagePriority.HIGH,
source="web_api",
tags="urgent,processing",
# 时间控制
expire_time=datetime.now() + timedelta(hours=1), # 1小时后过期
retry_count=0,
# 自定义元数据
metadata={"user_id": 123, "request_id": "abc123"}
)
```
3. 构建 Rust 核心模块:
## 📊 系统架构
```bash
cd src/core
maturin develop --release
```
将会在 `src/core/target/release` 目录下生成 `core.dll` 或 `core.so` 文件。
将该文件复制到 `queue_sqlite/core` 目录下(dll文件需改名为pyd后缀)。
### 架构图
4. 运行测试:
```text
┌─────────────────────────────────────────────────────────┐
│ Python application |
│ ┌─────────────┐ ┌─────────────┐ ┌────────────────┐ │
│ │ @task │ │ @listener │ │ QueueScheduler │ │
│ │ │ │ │ │ │ │
│ └─────────────┘ └─────────────┘ └────────────────┘ │
└─────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────┐
│ Python Service │
│ ┌──────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ TaskMounter │ │ TaskCycle │ │ Schedulers │ │
│ │ ListenMounter│ │ AsyncCycle │ │ │ │
│ └──────────────┘ └─────────────┘ └─────────────┘ │
└─────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────┐
│ Rust core │
│ ┌─────────────────────────────────────────┐ │
│ │ queue_sqlite_core │ │
│ │ • shared sqlite database │ │
│ │ • SQLite Optimization │ │
│ │ • Connection pool │ │
│ └─────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────┐
│ SQLite database │
│ ┌───────────────────────────────────────────────┐ │
│ │ shared database (cache/queue_name/) │ │
│ │ • queue_shard_0.db │ │
│ │ • queue_shard_1.db │ │
│ │ • listen.db │ │
│ └───────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────┘
```
```bash
pytest tests/
```
### 组件说明
5. 性能指标
1. **MessageIte**: 核心数据模型,包含消息的所有属性和方法
2. **TaskMounter**: 任务过载器,通过装饰器注册任务函数
3. **ListenMounter**:监听挂载器,通过装饰器注册监听函数
4. **TaskCycle**:任务生命周期管理器,处理重试和状态更新
5. **QueueScheduler**:统一调度器接口,支持三种实现:
- **StandardQueueScheduler**:统一调度器接口,支持三种实现:
- **AsyncQueueScheduler**:异步/等待实现
- **QtQueueScheduler**:Qt 线程池实现(GUI应用)
6. **CleanupScheduler**:自动清理过期消息
7. **ShardedQueueOperation**:Rust 实现的高性能分片队列操作
在标准开发机器上(8核CPU,16GB内存):
## 🧪 测试
可处理 10,000+ 任务(斐波那契数列前500项计算)/分钟
### 运行测试套件
平均任务延迟 < 50ms
```bash
# 运行所有测试
python -m -v -s pytest tests/
CPU 使用率 < 60%
# 运行特定测试
python -m pytest tests/test_stress.py -v
python -m pytest tests/test_async_scheduler.py -v
```
内存占用 < 500MB
### 性能测试示例
### 贡献指南
```python
from tests.test_stress import TestStress
欢迎贡献代码!请遵循以下步骤:
# 压力测试:处理 10000 个任务
TestStress.test_stress()
1. Fork 仓库
# 异步调度器测试
from tests.test_async_scheduler import TestAsyncScheduler
TestAsyncScheduler.test_async_scheduler()
```
创建新分支 (git checkout -b feature/your-feature)
## 📈 性能指标
提交更改 (git commit -am 'Add some feature')
### 基准测试结果
推送到分支 (git push origin feature/your-feature)
| 指标 | 标准调度器 | 异步调度器 | Qt 调度器 |
| --- | ---------- | ---------- | --------- |
| 单核 QPS | 5,000+ | 8,000+ | 6,000+ |
| 内存占用 | 50-100MB | 60-120MB | 70-150MB |
| 延迟(p95) | <50ms | <30ms | <40ms |
| 最大并发 | 1,000+ | 2,000+ | 1500+ |
2. 创建 Pull Request
### 扩展性测试
### 许可证
- 10 分片:支持 50,000+ 并发任务
- 自动负载均衡:分片间任务均匀分布
- 线性扩展:增加分片数可线性提升吞吐量
本项目采用 MIT 许可证。
## 📄 许可证
本项目采用 MIT 许可证 - 查看 LICENSE 文件了解详情。
## 📞 联系与支持
- **作者**: chakcy
- **邮箱**: 947105045@qq.com
## 🙏 致谢
感谢以下开源项目:
- [SQLite](https://www.sqlite.org/) - 轻量级嵌入式数据库
- [PyO3](https://pyo3.rs/) - Rust-Python 绑定
- [r2d2](https://github.com/sfackler/r2d2) - Rust 数据库连接池
---
**Queue SQLite** - 为您的应用提供可靠、高效的任务队列解决方案。

@@ -22,3 +22,3 @@ [project]

requires-python = ">=3.11"
version = "0.2.1"
version = "0.2.2"

@@ -30,5 +30,5 @@ [project.optional-dependencies]

# [[tool.uv.index]]
# default = true
# url = "https://pypi.tuna.tsinghua.edu.cn/simple"
[[tool.uv.index]]
default = true
url = "https://pypi.tuna.tsinghua.edu.cn/simple"

@@ -35,0 +35,0 @@ [build-system]

+269
-148

@@ -1,225 +0,346 @@

# SQLite 任务队列系统
# Queue SQLite - 高性能 SQLite 任务队列系统
这个项目是一个基于 SQLite 的高性能任务队列系统,使用 Rust 和 Python 混合编程实现。系统提供了任务调度、消息队列管理和任务生命周期管理的完整解决方案。
![python-3.11+](https://img.shields.io/badge/python-3.11+-blue.svg)
![rust-1.65+](https://img.shields.io/badge/rust-1.65+-red.svg)
![license-MIT](https://img.shields.io/badge/license-MIT-green.svg)
![version-0.2.1](https://img.shields.io/badge/version-0.2.1-orange.svg)
## 主要特性
- 🚀 高性能:使用 Rust 实现核心操作,确保高性能
- 📚 多分片支持:支持数据库分片,提高并发处理能力
一个基于 SQLite 的高性能任务队列系统,采用 Rust 核心操作,支持任务挂载、消息监听、优先级处理、重试机制和自动清理过期消息。适合构建可靠、可扩展的后台任务处理系统。
- ⏱️ 智能调度:提供接收调度器、任务调度器和清理调度器
## 🌟 特性
- 🔒 任务生命周期管理:支持任务状态跟踪、重试机制和过期处理
### 核心优势
- 📊 监控支持:内置资源监控功能
- 🚀 高性能:Rust 核心提供毫秒级任务处理
- 💾 持久化存储:基于 SQLite 的可靠消息存储
- 🔄 多调度器支持:标准、异步、Qt 三种调度模式
- 🎯 智能分片:自动哈希分片,支持横向扩展
- 📊 全面监控:内置资源使用监控和队列状态查看
- 🧩 任务挂载系统:通过装饰器轻松添加新任务
### 功能亮点
## 项目结构
```
src/
├── core/ # Rust 核心实现
│ ├── lib.rs # 主模块
│ ├── queue_operation.rs # 队列操作实现
│ └── task_mounter.rs # 任务挂载实现
├── queue_sqlite/ # Python 实现
│ ├── constant/ # 常量定义(消息优先级、状态、类型)
│ ├── core/ # 核心接口
│ ├── model/ # 数据模型
│ ├── queue_operation/ # 队列操作封装
│ ├── scheduler/ # 调度器实现
│ └── task_cycle/ # 任务生命周期管理
tests/ # 测试代码
```
- ✅ 任务装饰器:使用 @task 装饰器轻松注册任务
- ✅ 监听装饰器:使用 @listener 装饰器实现数据变更监听
- ✅ 优先级队列:支持 LOW/NORMAL/HIGH/URGENT 四级优先级
- ✅ 重试机制:可配置的最大重试次数和延迟重试
- ✅ 过期清理:自动清理过期和完成的消息
- ✅ 批量操作:支持消息批量入队和处理
- ✅ 异步支持:原生支持 async/await 异步任务
- ✅ Qt 集成:可选 Qt 调度器用于 GUI 应用
## 核心组件
## 📦 安装
1. 消息模型 (MessageItem)
定义了任务消息的数据结构,包含:
### 前置要求
- 消息ID、类型、状态
- Python 3.11+
- Rust 1.65+ (用于编译核心扩展)
- SQLite 3.35+
- 内容、创建时间、更新时间
### 安装方式
- 优先级、来源、目标
#### 方式一:从源码安装(推荐)
- 重试次数、过期时间
```shell
# 克隆仓库
git clone https://github.com/chakcy/queue_sqlite.git
cd queue_sqlite
- 标签和元数据
# 安装 Rust(如果未安装)
curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh
2. 队列操作 (QueueOperation)
提供对 SQLite 队列的基本操作:
# 安装 Python 依赖
pip install -r requirements.txt
- 初始化数据库
# 安装开发模式
pip install -e .
```
- 入队和出队操作
#### 方式二:从 PyPI 安装
- 获取队列长度和已完成消息
```shell
pip install queue-sqlite
```
- 更新状态和结果
## 🚀 快速开始
- 删除消息和清理过期消息
### 基本使用
3. 调度系统
```python
from queue_sqlite.scheduler import QueueScheduler
from queue_sqlite.model import MessageItem
from queue_sqlite.constant import MessagePriority
from queue_sqlite.mounter import task
包含三个主要调度器:
***接收调度器 (ReceiveScheduler)***
- 处理消息发送
# 1. 注册任务
@task(meta={"max_retries": 3, "delay": 1})
def process_image(message_item):
"""处理图片任务"""
data = message_item.content
# 处理逻辑
return {"status": "success", "processed": data["image_id"]}
- 管理回调函数
- 接收已完成消息
# 2. 创建调度器
scheduler = QueueScheduler(scheduler_type="standard")
***任务调度器 (TaskScheduler)***
- 从队列中获取任务
# 3. 启动调度器
scheduler.start()
- 调用任务函数
# 4. 发送任务
for i in range(10):
message = MessageItem(
content={"image_id": i, "path": f"/images/{i}.jpg"},
destination="process_image", # 任务函数名
priority=MessagePriority.HIGH, # HIGH 优先级
tags="image_processing",
)
- 更新任务状态和结果
def callback(result_message):
print(f"任务完成: {result_message.id}, 结果: {result_message.result}")
***清理调度器 (CleanupScheduler)***
- 清理过期消息
scheduler.send_message(message, callback)
- 删除旧消息(默认清理7天前的消息)
# 5. 等待任务完成
import time
4. 任务挂载系统 (TaskMounter)
提供装饰器挂载任务函数:
while scheduler.queue_operation.get_queue_length() > 0:
print(f"剩余任务: {scheduler.queue_operation.get_queue_length()}")
time.sleep(1)
```python
@TaskMounter.task(meta={"task_name": "example"})
def example_task(message_item: MessageItem):
# 任务逻辑
return result
```
## 使用示例
# 6. 停止调度器
scheduler.stop()
```
### 基本使用
### 异步任务示例
```python
import asyncio
from queue_sqlite.scheduler import QueueScheduler
from queue_sqlite.model import MessageItem
from queue_sqlite.mounter import task
# 初始化调度器
scheduler = QueueScheduler(
receive_thread_num=4,
task_thread_num=8,
shard_num=12
)
# 启动调度器
scheduler.start_queue_scheduler()
@task(meta={"name": "async_processor", "max_retries": 2})
async def async_data_fetcher(message_item):
"""异步数据获取任务"""
url = message_item.content["url"]
# 模拟异步 HTTP 请求
await asyncio.sleep(0.5)
return {"url": url, "data": "fetched", "status": 200}
# 创建消息
message = MessageItem(
content={"data": "example"},
destination="task_name"
)
# 定义回调函数
def callback(message_item):
print(f"任务完成: {message_item.id}")
async def main():
scheduler = QueueScheduler(scheduler_type="async")
scheduler.start()
# 发送消息
scheduler.send_message(message, callback)
# 发送异步任务
message = MessageItem(
content={"url": "https://api.example.com/data"},
destination="async_data_fetcher",
)
# ... 程序运行 ...
scheduler.send_message(message, lambda m: print(f"完成: {m.id}"))
# 停止调度器
scheduler.stop_queue_scheduler()
await asyncio.sleep(5)
scheduler.stop()
asyncio.run(main())
```
### 压力测试
### 数据监听示例
```python
# tests/test_stress.py
class TestStress:
@classmethod
def test_stress(cls):
TASK_NUM = 10000
scheduler = QueueScheduler(receive_thread_num=4, task_thread_num=8, shard_num=12)
scheduler.start_queue_scheduler()
# 发送大量任务
for i in range(TASK_NUM):
message = MessageItem(content={"num": i}, destination="example")
scheduler.send_message(message, cls._callback)
# 等待所有任务完成
while scheduler.queue_operation.get_queue_length() > 0:
time.sleep(0.5)
scheduler.stop_queue_scheduler()
from queue_sqlite import QueueScheduler
from queue_sqlite.mounter import listener
# 注册监听器
@listener()
def user_activity_log(data):
"""监听用户活动数据"""
print(f"用户活动: {data}")
@listener()
def system_alert(data):
"""监听系统告警"""
print(f"系统告警: {data}")
# 创建调度器
scheduler = QueueScheduler()
scheduler.start()
# 更新监听数据(会自动触发监听函数)
scheduler.update_listen_data("user_activity_log", "用户登录")
scheduler.update_listen_data("user_activity_log", "用户购买")
scheduler.update_listen_data("system_alert", "CPU使用率过高")
```
## 安装与运行
## ⚙️ 配置选项
### 前提条件
Python 3.7+
### 调度器配置
Rust 工具链
```python
from queue_sqlite import SchedulerConfig, QueueScheduler
SQLite 开发文件
config = SchedulerConfig(
receive_thread_num=2, # 接收线程数
task_thread_num=8, # 任务执行线程数
shard_num=4, # 数据库分片数
queue_name="production", # 队列名称
meta={"app": "myapp"} # 自定义元数据
)
### 安装步骤
1. 克隆仓库:
scheduler = QueueScheduler(
scheduler_type="standard", # standard | async | qt
config=config
)
```
```bash
git clone https://gitee.com/cai-xinpenge/queue_sqlite.git
cd sqlite-task-queue
```
### 消息配置
2. 安装 Python 依赖:
```python
from queue_sqlite import MessageItem
from queue_sqlite.constant import MessagePriority, MessageType
from datetime import datetime, timedelta
```bash
pip install -r requirements.txt
```
message = MessageItem(
# 必需字段
content={"data": "任务数据"},
destination="task_function_name",
# 可选字段
id="custom-uuid", # 默认自动生成
type=MessageType.TASK,
priority=MessagePriority.HIGH,
source="web_api",
tags="urgent,processing",
# 时间控制
expire_time=datetime.now() + timedelta(hours=1), # 1小时后过期
retry_count=0,
# 自定义元数据
metadata={"user_id": 123, "request_id": "abc123"}
)
```
3. 构建 Rust 核心模块:
## 📊 系统架构
```bash
cd src/core
maturin develop --release
```
将会在 `src/core/target/release` 目录下生成 `core.dll` 或 `core.so` 文件。
将该文件复制到 `queue_sqlite/core` 目录下(dll文件需改名为pyd后缀)。
### 架构图
4. 运行测试:
```text
┌─────────────────────────────────────────────────────────┐
│ Python application |
│ ┌─────────────┐ ┌─────────────┐ ┌────────────────┐ │
│ │ @task │ │ @listener │ │ QueueScheduler │ │
│ │ │ │ │ │ │ │
│ └─────────────┘ └─────────────┘ └────────────────┘ │
└─────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────┐
│ Python Service │
│ ┌──────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ TaskMounter │ │ TaskCycle │ │ Schedulers │ │
│ │ ListenMounter│ │ AsyncCycle │ │ │ │
│ └──────────────┘ └─────────────┘ └─────────────┘ │
└─────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────┐
│ Rust core │
│ ┌─────────────────────────────────────────┐ │
│ │ queue_sqlite_core │ │
│ │ • shared sqlite database │ │
│ │ • SQLite Optimization │ │
│ │ • Connection pool │ │
│ └─────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────┐
│ SQLite database │
│ ┌───────────────────────────────────────────────┐ │
│ │ shared database (cache/queue_name/) │ │
│ │ • queue_shard_0.db │ │
│ │ • queue_shard_1.db │ │
│ │ • listen.db │ │
│ └───────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────┘
```
```bash
pytest tests/
```
### 组件说明
5. 性能指标
1. **MessageIte**: 核心数据模型,包含消息的所有属性和方法
2. **TaskMounter**: 任务过载器,通过装饰器注册任务函数
3. **ListenMounter**:监听挂载器,通过装饰器注册监听函数
4. **TaskCycle**:任务生命周期管理器,处理重试和状态更新
5. **QueueScheduler**:统一调度器接口,支持三种实现:
- **StandardQueueScheduler**:统一调度器接口,支持三种实现:
- **AsyncQueueScheduler**:异步/等待实现
- **QtQueueScheduler**:Qt 线程池实现(GUI应用)
6. **CleanupScheduler**:自动清理过期消息
7. **ShardedQueueOperation**:Rust 实现的高性能分片队列操作
在标准开发机器上(8核CPU,16GB内存):
## 🧪 测试
可处理 10,000+ 任务(斐波那契数列前500项计算)/分钟
### 运行测试套件
平均任务延迟 < 50ms
```bash
# 运行所有测试
python -m -v -s pytest tests/
CPU 使用率 < 60%
# 运行特定测试
python -m pytest tests/test_stress.py -v
python -m pytest tests/test_async_scheduler.py -v
```
内存占用 < 500MB
### 性能测试示例
### 贡献指南
```python
from tests.test_stress import TestStress
欢迎贡献代码!请遵循以下步骤:
# 压力测试:处理 10000 个任务
TestStress.test_stress()
1. Fork 仓库
# 异步调度器测试
from tests.test_async_scheduler import TestAsyncScheduler
TestAsyncScheduler.test_async_scheduler()
```
创建新分支 (git checkout -b feature/your-feature)
## 📈 性能指标
提交更改 (git commit -am 'Add some feature')
### 基准测试结果
推送到分支 (git push origin feature/your-feature)
| 指标 | 标准调度器 | 异步调度器 | Qt 调度器 |
| --- | ---------- | ---------- | --------- |
| 单核 QPS | 5,000+ | 8,000+ | 6,000+ |
| 内存占用 | 50-100MB | 60-120MB | 70-150MB |
| 延迟(p95) | <50ms | <30ms | <40ms |
| 最大并发 | 1,000+ | 2,000+ | 1500+ |
2. 创建 Pull Request
### 扩展性测试
### 许可证
- 10 分片:支持 50,000+ 并发任务
- 自动负载均衡:分片间任务均匀分布
- 线性扩展:增加分片数可线性提升吞吐量
本项目采用 MIT 许可证。
## 📄 许可证
本项目采用 MIT 许可证 - 查看 LICENSE 文件了解详情。
## 📞 联系与支持
- **作者**: chakcy
- **邮箱**: 947105045@qq.com
## 🙏 致谢
感谢以下开源项目:
- [SQLite](https://www.sqlite.org/) - 轻量级嵌入式数据库
- [PyO3](https://pyo3.rs/) - Rust-Python 绑定
- [r2d2](https://github.com/sfackler/r2d2) - Rust 数据库连接池
---
**Queue SQLite** - 为您的应用提供可靠、高效的任务队列解决方案。
import os
import shutil
# 需要遍历的目录

@@ -4,0 +5,0 @@ root_dir = "./"

@@ -489,3 +489,3 @@ # This file is automatically @generated by Cargo.

name = "queue_sqlite_core"
version = "0.2.0"
version = "0.2.1"
dependencies = [

@@ -492,0 +492,0 @@ "chrono",

[package]
name = "queue_sqlite_core"
version = "0.2.0"
version = "0.2.1"
edition = "2021"

@@ -5,0 +5,0 @@

@@ -7,2 +7,3 @@ from typing import Callable

def enqueue(self, message: dict) -> str: ...
def enqueue_batch(self, messages: list[dict]) -> list[str]: ...
def dequeue(self, size: int = 1) -> list[dict]: ...

@@ -27,2 +28,3 @@ def get_queue_length(self) -> int: ...

def enqueue(self, message: dict) -> str: ...
def enqueue_batch(self, messages: list[dict]) -> list[str]: ...
def dequeue(self, size: int = 1) -> list[dict]: ...

@@ -29,0 +31,0 @@ def get_queue_length(self) -> int: ...

@@ -259,2 +259,100 @@ // src/lib.rs

pub fn enqueue_batch(&self, messages: Vec<Bound<'_, PyDict>>) -> PyResult<Vec<String>> {
let mut conn = self.get_connection()?;
let tx = conn
.transaction()
.map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(e.to_string()))?;
let mut inserted_ids = Vec::new();
{
let mut stmt = tx
.prepare(
"INSERT INTO messages (
id, type, status, content, createtime, updatetime, result,
priority, source, destination, retry_count, expire_time, tags, metadata
) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14)",
)
.map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(e.to_string()))?;
for message in messages {
let id: String = message.get_item("id")?.unwrap().extract()?;
let message_type: String = message.get_item("type")?.unwrap().extract()?;
let status: i32 = message.get_item("status")?.unwrap().extract()?;
let content: String = message.get_item("content")?.unwrap().extract()?;
let createtime: String = message.get_item("createtime")?.unwrap().extract()?;
let updatetime: String = message.get_item("updatetime")?.unwrap().extract()?;
let priority: i32 = message.get_item("priority")?.unwrap().extract()?;
let source: String = message.get_item("source")?.unwrap().extract()?;
let destination: String = message.get_item("destination")?.unwrap().extract()?;
let retry_count: i32 = message.get_item("retry_count")?.unwrap().extract()?;
let result: Option<String> = match message.get_item("result")? {
Some(item) => {
if item.is_none() {
None
} else {
Some(item.extract()?)
}
}
None => None,
};
let expire_time: Option<String> = match message.get_item("expire_time")? {
Some(item) => {
if item.is_none() {
None
} else {
Some(item.extract()?)
}
}
None => None,
};
let tags: Option<String> = match message.get_item("tags")? {
Some(item) => {
if item.is_none() {
None
} else {
Some(item.extract()?)
}
}
None => None,
};
let metadata: Option<String> = match message.get_item("metadata")? {
Some(item) => {
if item.is_none() {
None
} else {
Some(item.extract()?)
}
}
None => None,
};
stmt.execute(params![
id,
message_type,
status,
content,
createtime,
updatetime,
result,
priority,
source,
destination,
retry_count,
expire_time,
tags,
metadata,
])
.map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(e.to_string()))?;
inserted_ids.push(id);
}
}
tx.commit()
.map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(e.to_string()))?;
Ok(inserted_ids)
}
/// 出队

@@ -665,2 +763,24 @@ /// Args:

pub fn enqueue_batch(&self, messages: Vec<Bound<'_, PyDict>>) -> PyResult<Vec<String>> {
let mut shard_groups: HashMap<usize, Vec<Bound<'_, PyDict>>> = HashMap::new();
for message in messages {
let message_id_bound = message
.get_item("id")?
.ok_or_else(|| PyErr::new::<pyo3::exceptions::PyKeyError, _>("Missing is field"))?;
let message_id: String = message_id_bound.extract()?;
let shard_index = self._get_shard_index(&message_id);
shard_groups
.entry(shard_index)
.or_insert_with(Vec::new)
.push(message);
}
let mut all_ids = Vec::new();
for (shard_index, shard_messages) in shard_groups {
let shard_ids = self.shards[shard_index].enqueue_batch(shard_messages)?;
all_ids.extend(shard_ids);
}
Ok(all_ids)
}
/// 获取消息

@@ -667,0 +787,0 @@ /// Args:

@@ -36,3 +36,3 @@ #!/usr/bin/env python

__title__ = "queue_sqlite"
__version__ = "0.2.1"
__version__ = "0.2.2"
__author__ = "chakcy"

@@ -39,0 +39,0 @@ __email__ = "947105045@qq.com"

@@ -22,2 +22,3 @@ #!/usr/bin/env python

delay: float = field(default=1, metadata={"help": "任务延迟执行时间"})
other: dict = field(default_factory=dict, metadata={"help": "其他参数"})

@@ -40,4 +41,12 @@

delay = meta.get("delay", 1)
meta_other = {}
for key, value in meta.items():
if key not in TaskMeta.__annotations__.keys():
meta_other[key] = value
task_meta = TaskMeta(
name=name, description=description, max_retries=max_retries, delay=delay
name=name,
description=description,
max_retries=max_retries,
delay=delay,
other=meta_other,
)

@@ -44,0 +53,0 @@ function.meta = task_meta # type: ignore

@@ -81,3 +81,3 @@ #!/usr/bin/env python

INSERT INTO change_log (table_name, row_id, column_name, old_value, new_value)
VALUES ('listen_table', NEW.id, 'key', OLD.key, NEW.key);
VALUES ('listen_table', NEW.id, OLD.key, OLD.value, NEW.value);
END

@@ -84,0 +84,0 @@ """

@@ -15,7 +15,10 @@ #!/usr/bin/env python

from ..model import MessageItem, SchedulerConfig
from typing import Callable, Literal
from typing import Callable, Literal, Dict
import logging
SCHEDULER_TYPES = {"standard": StandardQueueScheduler, "async": AsyncQueueScheduler}
SCHEDULER_TYPES: Dict[str, type[BaseScheduler]] = {
"standard": StandardQueueScheduler,
"async": AsyncQueueScheduler,
}

@@ -43,8 +46,8 @@ try:

scheduler_class = SCHEDULER_TYPES.get(scheduler_type, None)
if not scheduler_class:
if scheduler_class is None:
raise ValueError(f"Invalid scheduler type: {scheduler_type}")
self.scheduler = scheduler_class(config)
if self.scheduler:
self.queue_operation = self.scheduler.queue_operation
# if self.scheduler:
# self.queue_operation = self.scheduler.queue_operation

@@ -51,0 +54,0 @@ def send_message(self, message: MessageItem, callback: Callable):

@@ -62,2 +62,3 @@ #!/usr/bin/env python

self.task_scheduler.start_task_thread()
self.listen_scheduler.start_listen_data()

@@ -64,0 +65,0 @@ def stop(self):

@@ -18,2 +18,3 @@ #!/usr/bin/env python

import concurrent.futures
import logging

@@ -53,4 +54,4 @@

for data in change_data_items:
key = data[6]
new_value = data[7]
key = data[3]
new_value = data[5]
delete_id = data[0]

@@ -68,2 +69,13 @@ tasks.append(

def _run_listen_loop(self):
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
loop.run_until_complete(self.listen())
except Exception as e:
logging.error(f"监听调度出错: {str(e)}")
finally:
loop.close()
def start_listen_data(self):

@@ -73,3 +85,3 @@ if self.is_running:

self.is_running = True
self.listen_thread = threading.Thread(target=self.listen)
self.listen_thread = threading.Thread(target=self._run_listen_loop, daemon=True)
self.listen_thread.start()

@@ -76,0 +88,0 @@

@@ -19,4 +19,7 @@ #!/usr/bin/env python

import logging
from collections import namedtuple
SendMessageCallback = namedtuple("send_message_callback", ["message", "callback"])
class AsyncReceiveScheduler:

@@ -41,2 +44,17 @@ def __init__(

def send_message_batch(self, message_callback_list: list[SendMessageCallback]):
message_list = [
message_callback.message.to_dict_by_core()
for message_callback in message_callback_list
]
if message_list:
self.queue_operation.enqueue_batch(message_list)
with self.lock:
for message_callback in message_callback_list:
message = message_callback.message
callback = message_callback.callback
if callback is None:
callback = lambda m: logging.info(f"receive message: {m.id}")
self.callbacks[message.id] = callback
async def receive_message(self):

@@ -43,0 +61,0 @@ """单一轮询线程,并行执行回调"""

from abc import ABC, abstractmethod
from typing import Callable
from ...model import MessageItem, SchedulerConfig
from queue_sqlite_core import ShardedQueueOperation

@@ -9,2 +10,5 @@

@property
def queue_operation(self) -> ShardedQueueOperation: ...
def __init__(self, config: SchedulerConfig = SchedulerConfig()):

@@ -11,0 +15,0 @@ pass

@@ -17,2 +17,3 @@ #!/usr/bin/env python

import logging
import os

@@ -24,3 +25,3 @@

queue_operation: ShardedQueueOperation,
interval_minutes=60,
interval_minutes=1,
remove_days=30,

@@ -33,4 +34,4 @@ ):

self.remove_days = remove_days
self.max_db_size_mb = 500
# for i in range(self.queue_operation.shard_num):
# 清理过期但未处理的消息

@@ -46,2 +47,3 @@ self.queue_operation.clean_expired_messages()

self.queue_operation.clean_expired_messages()
self.remove_days = self._check_and_optimize_db(self.remove_days)

@@ -57,2 +59,33 @@ except Exception as e:

def _check_and_optimize_db(self, remove_days):
"""检查数据库大小并优化"""
try:
# 获取数据库文件大小
db_size_mb = self._get_database_size()
if db_size_mb > self.max_db_size_mb:
logging.info(f"数据库大小 {db_size_mb}MB 超过限制,执行优化...")
print(f"数据库大小 {db_size_mb}MB 超过限制,执行优化...")
if remove_days > 1:
remove_days = remove_days - 1
self.queue_operation.clean_old_messages(remove_days)
except Exception as e:
logging.error(f"数据库优化错误: {str(e)}")
return remove_days
def _get_database_size(self) -> int:
"""获取数据库文件大小"""
# 实现获取数据库文件大小的逻辑
total_size = 0
db_path = self.queue_operation.db_dir
for root, _, files in os.walk(db_path):
for file in files:
file_path = os.path.join(root, file)
total_size += os.path.getsize(file_path)
return total_size // (1024 * 1024)
def start_cleanup(self):

@@ -59,0 +92,0 @@ if self.is_running:

@@ -20,4 +20,7 @@ #!/usr/bin/env python

from .callback_task import QtCallbackTask
from collections import namedtuple
SendMessageCallback = namedtuple("send_message_callback", ["message", "callback"])
class QtReceiveScheduler:

@@ -46,2 +49,17 @@ """Qt 接收调度器 - 独立实现"""

def send_message_batch(self, message_callback_list: list[SendMessageCallback]):
message_list = [
message_callback.message.to_dict_by_core()
for message_callback in message_callback_list
]
if message_list:
self.queue_operation.enqueue_batch(message_list)
with self.lock:
for message_callback in message_callback_list:
message = message_callback.message
callback = message_callback.callback
if callback is None:
callback = lambda m: logging.info(f"receive message: {m.id}")
self.callbacks[message.id] = callback
def _receive_loop(self):

@@ -48,0 +66,0 @@ """接收消息循环"""

@@ -70,5 +70,4 @@ #!/usr/bin/env python

self.cleanup_scheduler.stop_cleanup()
# self.queue_operation.close_all_connections()
__all__ = ["StandardQueueScheduler"]

@@ -41,8 +41,9 @@ #!/usr/bin/env python

for data in change_data_items:
key = data[6]
new_value = data[7]
key = data[3]
new_value = data[5]
delete_id = data[0]
self.executor.submit(
self._process_listen_data, key, new_value, delete_id
)
if self.is_running:
self.executor.submit(
self._process_listen_data, key, new_value, delete_id
)

@@ -49,0 +50,0 @@ def start_listen_data(self):

@@ -19,4 +19,7 @@ #!/usr/bin/env python

import logging
from collections import namedtuple
SendMessageCallback = namedtuple("send_message_callback", ["message", "callback"])
class ReceiveScheduler:

@@ -39,3 +42,2 @@ def __init__(

callback = lambda message: logging.info(f"receive message: {message.id}")
# self.queue_operation.enqueue(message.to_dict())
self.queue_operation.enqueue(message.to_dict_by_core())

@@ -45,2 +47,17 @@ with self.lock:

def send_message_batch(self, message_callback_list: list[SendMessageCallback]):
message_list = [
message_callback.message.to_dict_by_core()
for message_callback in message_callback_list
]
if message_list:
self.queue_operation.enqueue_batch(message_list)
with self.lock:
for message_callback in message_callback_list:
message = message_callback.message
callback = message_callback.callback
if callback is None:
callback = lambda m: logging.info(f"receive message: {m.id}")
self.callbacks[message.id] = callback
def receive_message(self):

@@ -47,0 +64,0 @@ """单一轮询线程,并行执行回调"""

@@ -14,5 +14,3 @@ import queue_sqlite_core as core

print(
task_mounter.get_task_function("task")({"num": 1})(lambda x: x + 1).meta[ # type: ignore
"num"
]
f"task: {task_mounter.get_task_function("task")({"num": 1})(lambda x: x + 1).meta}"
)

@@ -22,3 +20,3 @@ from tasks import example

task_mounter.get_task_list()
print(task_mounter.get_task_function("<lambda>"))
print(task_mounter.get_task_function("<lambda>")(1))

@@ -25,0 +23,0 @@ @classmethod

@@ -8,3 +8,3 @@ from listen import *

def test_listen_data(self):
scheduler = QueueScheduler(scheduler_type="qt")
scheduler = QueueScheduler()
scheduler.start()

@@ -11,0 +11,0 @@ scheduler.update_listen_data("key_1", "value_1")