queue-sqlite
+270
-149
Metadata-Version: 2.4
Name: queue-sqlite
Version: 0.2.1
Version: 0.2.2
Summary: A high-performance, SQLite-based distributed task queue system with Rust-backed core operations. Supports task mounting, message listening, priority handling, retry mechanisms, and automatic cleanup of expired messages. Ideal for building reliable, scalable background task processing systems.
@@ -28,226 +28,347 @@ Author-email: chakcy <947105045@qq.com>
# Queue SQLite - High-Performance SQLite Task Queue System





A high-performance, SQLite-based task queue system with a Rust core. It supports task mounting, message listening, priority handling, retry mechanisms, and automatic cleanup of expired messages - ideal for building reliable, scalable background task processing systems.

## 🌟 Features

### Core Strengths

- 🚀 High performance: the Rust core delivers millisecond-level task processing
- 💾 Persistent storage: reliable SQLite-backed message storage
- 🔄 Multiple schedulers: standard, async, and Qt scheduling modes
- 🎯 Smart sharding: automatic hash-based sharding for horizontal scaling
- 📊 Comprehensive monitoring: built-in resource usage monitoring and queue status inspection

### Feature Highlights

- ✅ Task decorator: register tasks easily with the `@task` decorator
- ✅ Listener decorator: react to data changes with the `@listener` decorator
- ✅ Priority queue: four priority levels (LOW/NORMAL/HIGH/URGENT)
- ✅ Retry mechanism: configurable maximum retry count and delayed retries
- ✅ Expiry cleanup: automatic cleanup of expired and completed messages
- ✅ Batch operations: batch enqueueing and processing of messages
- ✅ Async support: native async/await task support
- ✅ Qt integration: optional Qt scheduler for GUI applications

## 📦 Installation

### Prerequisites

- Python 3.11+
- Rust 1.65+ (for compiling the core extension)
- SQLite 3.35+

### Installation Methods

#### Option 1: Install from source (recommended)

```shell
# Clone the repository
git clone https://github.com/chakcy/queue_sqlite.git
cd queue_sqlite

# Install Rust (if not installed)
curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh

# Install Python dependencies
pip install -r requirements.txt

# Install in development mode
pip install -e .
```

#### Option 2: Install from PyPI

```shell
pip install queue-sqlite
```

## 🚀 Quick Start

### Basic Usage

```python
from queue_sqlite.scheduler import QueueScheduler
from queue_sqlite.model import MessageItem
from queue_sqlite.constant import MessagePriority
from queue_sqlite.mounter import task

# 1. Register a task
@task(meta={"max_retries": 3, "delay": 1})
def process_image(message_item):
    """Image processing task"""
    data = message_item.content
    # Processing logic
    return {"status": "success", "processed": data["image_id"]}

# 2. Create the scheduler
scheduler = QueueScheduler(scheduler_type="standard")

# 3. Start the scheduler
scheduler.start()

# 4. Send tasks
def callback(result_message):
    print(f"Task completed: {result_message.id}, result: {result_message.result}")

for i in range(10):
    message = MessageItem(
        content={"image_id": i, "path": f"/images/{i}.jpg"},
        destination="process_image",   # task function name
        priority=MessagePriority.HIGH, # HIGH priority
        tags="image_processing",
    )
    scheduler.send_message(message, callback)

# 5. Wait for tasks to finish
import time

while scheduler.queue_operation.get_queue_length() > 0:
    print(f"Remaining tasks: {scheduler.queue_operation.get_queue_length()}")
    time.sleep(1)

# 6. Stop the scheduler
scheduler.stop()
```

### Async Task Example

```python
import asyncio
from queue_sqlite.scheduler import QueueScheduler
from queue_sqlite.model import MessageItem
from queue_sqlite.mounter import task

@task(meta={"name": "async_processor", "max_retries": 2})
async def async_data_fetcher(message_item):
    """Asynchronous data-fetching task"""
    url = message_item.content["url"]
    # Simulate an async HTTP request
    await asyncio.sleep(0.5)
    return {"url": url, "data": "fetched", "status": 200}

async def main():
    scheduler = QueueScheduler(scheduler_type="async")
    scheduler.start()

    # Send an async task
    message = MessageItem(
        content={"url": "https://api.example.com/data"},
        destination="async_data_fetcher",
    )
    scheduler.send_message(message, lambda m: print(f"Done: {m.id}"))

    await asyncio.sleep(5)
    scheduler.stop()

asyncio.run(main())
```

### Data Listening Example

```python
from queue_sqlite import QueueScheduler
from queue_sqlite.mounter import listener

# Register listeners
@listener()
def user_activity_log(data):
    """Listen for user activity data"""
    print(f"User activity: {data}")

@listener()
def system_alert(data):
    """Listen for system alerts"""
    print(f"System alert: {data}")

# Create the scheduler
scheduler = QueueScheduler()
scheduler.start()

# Update listened data (automatically triggers the listener functions)
scheduler.update_listen_data("user_activity_log", "user login")
scheduler.update_listen_data("user_activity_log", "user purchase")
scheduler.update_listen_data("system_alert", "high CPU usage")
```

## ⚙️ Configuration Options

### Scheduler Configuration

```python
from queue_sqlite import SchedulerConfig, QueueScheduler

config = SchedulerConfig(
    receive_thread_num=2,     # number of receive threads
    task_thread_num=8,        # number of task execution threads
    shard_num=4,              # number of database shards
    queue_name="production",  # queue name
    meta={"app": "myapp"}     # custom metadata
)

scheduler = QueueScheduler(
    scheduler_type="standard",  # standard | async | qt
    config=config
)
```

### Message Configuration

```python
from queue_sqlite import MessageItem
from queue_sqlite.constant import MessagePriority, MessageType
from datetime import datetime, timedelta

message = MessageItem(
    # Required fields
    content={"data": "task payload"},
    destination="task_function_name",

    # Optional fields
    id="custom-uuid",  # auto-generated by default
    type=MessageType.TASK,
    priority=MessagePriority.HIGH,
    source="web_api",
    tags="urgent,processing",

    # Time control
    expire_time=datetime.now() + timedelta(hours=1),  # expires in 1 hour
    retry_count=0,

    # Custom metadata
    metadata={"user_id": 123, "request_id": "abc123"}
)
```

## 📊 System Architecture

### Architecture Diagram

```text
┌─────────────────────────────────────────────────────────┐
│ Python application                                      │
│  ┌─────────────┐  ┌─────────────┐  ┌────────────────┐   │
│  │  @task      │  │  @listener  │  │ QueueScheduler │   │
│  └─────────────┘  └─────────────┘  └────────────────┘   │
└─────────────────────────────────────────────────────────┘
                           │
┌─────────────────────────────────────────────────────────┐
│ Python Service                                          │
│  ┌──────────────┐  ┌─────────────┐  ┌─────────────┐     │
│  │ TaskMounter  │  │ TaskCycle   │  │ Schedulers  │     │
│  │ ListenMounter│  │ AsyncCycle  │  │             │     │
│  └──────────────┘  └─────────────┘  └─────────────┘     │
└─────────────────────────────────────────────────────────┘
                           │
┌─────────────────────────────────────────────────────────┐
│ Rust core                                               │
│  ┌─────────────────────────────────────────┐            │
│  │ queue_sqlite_core                       │            │
│  │  • shared sqlite database               │            │
│  │  • SQLite optimization                  │            │
│  │  • connection pool                      │            │
│  └─────────────────────────────────────────┘            │
└─────────────────────────────────────────────────────────┘
                           │
┌─────────────────────────────────────────────────────────┐
│ SQLite database                                         │
│  ┌───────────────────────────────────────────────┐      │
│  │ shared database (cache/queue_name/)           │      │
│  │  • queue_shard_0.db                           │      │
│  │  • queue_shard_1.db                           │      │
│  │  • listen.db                                  │      │
│  └───────────────────────────────────────────────┘      │
└─────────────────────────────────────────────────────────┘
```

### Component Overview

1. **MessageItem**: core data model containing all message attributes and methods
2. **TaskMounter**: task mounter that registers task functions via decorators
3. **ListenMounter**: listener mounter that registers listener functions via decorators
4. **TaskCycle**: task lifecycle manager that handles retries and status updates
5. **QueueScheduler**: unified scheduler interface with three implementations:
   - **StandardQueueScheduler**: standard thread-based implementation
   - **AsyncQueueScheduler**: async/await implementation
   - **QtQueueScheduler**: Qt thread-pool implementation (GUI applications)
6. **CleanupScheduler**: automatic cleanup of expired messages (old messages removed after 7 days by default)
7. **ShardedQueueOperation**: high-performance sharded queue operations implemented in Rust

## 🧪 Testing

### Running the Test Suite

```bash
# Run all tests
python -m pytest -v -s tests/

# Run specific tests
python -m pytest tests/test_stress.py -v
python -m pytest tests/test_async_scheduler.py -v
```

### Performance Test Example

```python
from tests.test_stress import TestStress

# Stress test: process 10000 tasks
TestStress.test_stress()

# Async scheduler test
from tests.test_async_scheduler import TestAsyncScheduler
TestAsyncScheduler.test_async_scheduler()
```

## 📈 Performance Metrics

### Benchmark Results

| Metric | Standard scheduler | Async scheduler | Qt scheduler |
| --- | ---------- | ---------- | --------- |
| Single-core QPS | 5,000+ | 8,000+ | 6,000+ |
| Memory usage | 50-100MB | 60-120MB | 70-150MB |
| Latency (p95) | <50ms | <30ms | <40ms |
| Max concurrency | 1,000+ | 2,000+ | 1,500+ |

### Scalability Tests

- 10 shards: supports 50,000+ concurrent tasks
- Automatic load balancing: tasks are distributed evenly across shards
- Linear scaling: adding shards increases throughput linearly

## 📄 License

This project is licensed under the MIT License - see the LICENSE file for details.

## 📞 Contact & Support

- **Author**: chakcy
- **Email**: 947105045@qq.com

## 🙏 Acknowledgements

Thanks to the following open-source projects:

- [SQLite](https://www.sqlite.org/) - lightweight embedded database
- [PyO3](https://pyo3.rs/) - Rust-Python bindings
- [r2d2](https://github.com/sfackler/r2d2) - Rust database connection pool

---

**Queue SQLite** - a reliable, efficient task queue solution for your applications.
+4
-4
@@ -22,3 +22,3 @@ [project]
requires-python = ">=3.11"
version = "0.2.1"
version = "0.2.2"
@@ -30,5 +30,5 @@ [project.optional-dependencies]
# [[tool.uv.index]]
# default = true
# url = "https://pypi.tuna.tsinghua.edu.cn/simple"
[[tool.uv.index]]
default = true
url = "https://pypi.tuna.tsinghua.edu.cn/simple"
@@ -35,0 +35,0 @@ [build-system]
import os
import shutil
# Directories to traverse
@@ -4,0 +5,0 @@ root_dir = "./"
@@ -489,3 +489,3 @@ # This file is automatically @generated by Cargo.
name = "queue_sqlite_core"
version = "0.2.0"
version = "0.2.1"
dependencies = [
@@ -492,0 +492,0 @@ "chrono",
[package]
name = "queue_sqlite_core"
version = "0.2.0"
version = "0.2.1"
edition = "2021"
@@ -5,0 +5,0 @@
@@ -7,2 +7,3 @@ from typing import Callable
def enqueue(self, message: dict) -> str: ...
def enqueue_batch(self, messages: list[dict]) -> list[str]: ...
def dequeue(self, size: int = 1) -> list[dict]: ...
@@ -27,2 +28,3 @@ def get_queue_length(self) -> int: ...
def enqueue(self, message: dict) -> str: ...
def enqueue_batch(self, messages: list[dict]) -> list[str]: ...
def dequeue(self, size: int = 1) -> list[dict]: ...
@@ -29,0 +31,0 @@ def get_queue_length(self) -> int: ...
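The new `enqueue_batch` stub takes a list of message dicts and returns the inserted ids. Since the real class lives in the compiled Rust core, the sketch below is a pure-Python stand-in (a hypothetical `FakeQueueOperation`, not the library's API) that only mirrors the call shapes declared in the stub:

```python
import uuid

class FakeQueueOperation:
    """Pure-Python stand-in mirroring the enqueue/enqueue_batch stub shapes.

    Illustrative only; the real implementation is in the Rust extension.
    """

    def __init__(self):
        self._messages = []

    def enqueue(self, message: dict) -> str:
        msg = dict(message)
        # The real core expects an id; generate one here for convenience.
        msg.setdefault("id", str(uuid.uuid4()))
        self._messages.append(msg)
        return msg["id"]

    def enqueue_batch(self, messages: list) -> list:
        # Semantically N enqueues; the Rust version additionally wraps
        # them in a single transaction for speed.
        return [self.enqueue(m) for m in messages]

    def get_queue_length(self) -> int:
        return len(self._messages)

q = FakeQueueOperation()
ids = q.enqueue_batch([{"content": {"n": i}, "destination": "example"} for i in range(3)])
print(len(ids), q.get_queue_length())  # 3 3
```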
@@ -259,2 +259,100 @@ // src/lib.rs
pub fn enqueue_batch(&self, messages: Vec<Bound<'_, PyDict>>) -> PyResult<Vec<String>> {
    let mut conn = self.get_connection()?;
    let tx = conn
        .transaction()
        .map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(e.to_string()))?;
    let mut inserted_ids = Vec::new();
    {
        let mut stmt = tx
            .prepare(
                "INSERT INTO messages (
                    id, type, status, content, createtime, updatetime, result,
                    priority, source, destination, retry_count, expire_time, tags, metadata
                ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14)",
            )
            .map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(e.to_string()))?;
        for message in messages {
            let id: String = message.get_item("id")?.unwrap().extract()?;
            let message_type: String = message.get_item("type")?.unwrap().extract()?;
            let status: i32 = message.get_item("status")?.unwrap().extract()?;
            let content: String = message.get_item("content")?.unwrap().extract()?;
            let createtime: String = message.get_item("createtime")?.unwrap().extract()?;
            let updatetime: String = message.get_item("updatetime")?.unwrap().extract()?;
            let priority: i32 = message.get_item("priority")?.unwrap().extract()?;
            let source: String = message.get_item("source")?.unwrap().extract()?;
            let destination: String = message.get_item("destination")?.unwrap().extract()?;
            let retry_count: i32 = message.get_item("retry_count")?.unwrap().extract()?;
            let result: Option<String> = match message.get_item("result")? {
                Some(item) => {
                    if item.is_none() {
                        None
                    } else {
                        Some(item.extract()?)
                    }
                }
                None => None,
            };
            let expire_time: Option<String> = match message.get_item("expire_time")? {
                Some(item) => {
                    if item.is_none() {
                        None
                    } else {
                        Some(item.extract()?)
                    }
                }
                None => None,
            };
            let tags: Option<String> = match message.get_item("tags")? {
                Some(item) => {
                    if item.is_none() {
                        None
                    } else {
                        Some(item.extract()?)
                    }
                }
                None => None,
            };
            let metadata: Option<String> = match message.get_item("metadata")? {
                Some(item) => {
                    if item.is_none() {
                        None
                    } else {
                        Some(item.extract()?)
                    }
                }
                None => None,
            };
            stmt.execute(params![
                id,
                message_type,
                status,
                content,
                createtime,
                updatetime,
                result,
                priority,
                source,
                destination,
                retry_count,
                expire_time,
                tags,
                metadata,
            ])
            .map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(e.to_string()))?;
            inserted_ids.push(id);
        }
    }
    tx.commit()
        .map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(e.to_string()))?;
    Ok(inserted_ids)
}
/// Dequeue
@@ -665,2 +763,24 @@ /// Args:
pub fn enqueue_batch(&self, messages: Vec<Bound<'_, PyDict>>) -> PyResult<Vec<String>> {
    let mut shard_groups: HashMap<usize, Vec<Bound<'_, PyDict>>> = HashMap::new();
    for message in messages {
        let message_id_bound = message
            .get_item("id")?
            .ok_or_else(|| PyErr::new::<pyo3::exceptions::PyKeyError, _>("Missing id field"))?;
        let message_id: String = message_id_bound.extract()?;
        let shard_index = self._get_shard_index(&message_id);
        shard_groups
            .entry(shard_index)
            .or_insert_with(Vec::new)
            .push(message);
    }
    let mut all_ids = Vec::new();
    for (shard_index, shard_messages) in shard_groups {
        let shard_ids = self.shards[shard_index].enqueue_batch(shard_messages)?;
        all_ids.extend(shard_ids);
    }
    Ok(all_ids)
}
/// Fetch messages
@@ -667,0 +787,0 @@ /// Args:
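The sharded `enqueue_batch` above buckets messages by a hash of their `id` before handing each bucket to its shard's batch insert. The same grouping logic can be sketched in Python (the shard count and the use of MD5 here are assumptions for illustration; the Rust core's `_get_shard_index` may use a different hash):

```python
from collections import defaultdict
import hashlib

def shard_index(message_id: str, shard_num: int) -> int:
    # Stable hash so the same id always maps to the same shard.
    digest = hashlib.md5(message_id.encode()).hexdigest()
    return int(digest, 16) % shard_num

def group_by_shard(messages: list, shard_num: int) -> dict:
    """Group message dicts by target shard, mirroring the Rust grouping step."""
    groups = defaultdict(list)
    for message in messages:
        if "id" not in message:
            raise KeyError("Missing id field")
        groups[shard_index(message["id"], shard_num)].append(message)
    return groups

groups = group_by_shard([{"id": f"msg-{i}"} for i in range(100)], shard_num=4)
print(sum(len(v) for v in groups.values()))  # 100
```

Grouping first means each shard receives exactly one transactional batch insert instead of many single-row inserts.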
@@ -36,3 +36,3 @@ #!/usr/bin/env python
__title__ = "queue_sqlite"
__version__ = "0.2.1"
__version__ = "0.2.2"
__author__ = "chakcy"
@@ -39,0 +39,0 @@ __email__ = "947105045@qq.com"
@@ -22,2 +22,3 @@ #!/usr/bin/env python
delay: float = field(default=1, metadata={"help": "Task execution delay"})
other: dict = field(default_factory=dict, metadata={"help": "Additional parameters"})
@@ -40,4 +41,12 @@
delay = meta.get("delay", 1)
meta_other = {}
for key, value in meta.items():
    if key not in TaskMeta.__annotations__.keys():
        meta_other[key] = value
task_meta = TaskMeta(
    name=name, description=description, max_retries=max_retries, delay=delay
    name=name,
    description=description,
    max_retries=max_retries,
    delay=delay,
    other=meta_other,
)
@@ -44,0 +53,0 @@ function.meta = task_meta  # type: ignore
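The change above routes any meta keys not declared on `TaskMeta` into a catch-all `other` dict by checking `TaskMeta.__annotations__`. A minimal self-contained sketch of that pattern (field names follow the diff; defaults and the `build_task_meta` helper are assumptions):

```python
from dataclasses import dataclass, field

@dataclass
class TaskMeta:
    name: str = ""
    description: str = ""
    max_retries: int = 3
    delay: float = 1
    other: dict = field(default_factory=dict)

def build_task_meta(meta: dict) -> TaskMeta:
    # Known keys fill the declared fields; everything else lands in `other`,
    # so user-supplied extras survive without widening the dataclass.
    known = TaskMeta.__annotations__.keys()
    extra = {k: v for k, v in meta.items() if k not in known}
    return TaskMeta(
        name=meta.get("name", ""),
        description=meta.get("description", ""),
        max_retries=meta.get("max_retries", 3),
        delay=meta.get("delay", 1),
        other=extra,
    )

tm = build_task_meta({"max_retries": 5, "priority_hint": "high"})
print(tm.max_retries, tm.other)  # 5 {'priority_hint': 'high'}
```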
@@ -81,3 +81,3 @@ #!/usr/bin/env python
INSERT INTO change_log (table_name, row_id, column_name, old_value, new_value)
VALUES ('listen_table', NEW.id, 'key', OLD.key, NEW.key);
VALUES ('listen_table', NEW.id, OLD.key, OLD.value, NEW.value);
END
@@ -84,0 +84,0 @@ """
@@ -15,7 +15,10 @@ #!/usr/bin/env python | ||
| from ..model import MessageItem, SchedulerConfig | ||
| from typing import Callable, Literal | ||
| from typing import Callable, Literal, Dict | ||
| import logging | ||
| SCHEDULER_TYPES = {"standard": StandardQueueScheduler, "async": AsyncQueueScheduler} | ||
| SCHEDULER_TYPES: Dict[str, type[BaseScheduler]] = { | ||
| "standard": StandardQueueScheduler, | ||
| "async": AsyncQueueScheduler, | ||
| } | ||
@@ -43,8 +46,8 @@ try: | ||
| scheduler_class = SCHEDULER_TYPES.get(scheduler_type, None) | ||
| if not scheduler_class: | ||
| if scheduler_class is None: | ||
| raise ValueError(f"Invalid scheduler type: {scheduler_type}") | ||
| self.scheduler = scheduler_class(config) | ||
| if self.scheduler: | ||
| self.queue_operation = self.scheduler.queue_operation | ||
| # if self.scheduler: | ||
| # self.queue_operation = self.scheduler.queue_operation | ||
@@ -51,0 +54,0 @@ def send_message(self, message: MessageItem, callback: Callable): |
@@ -62,2 +62,3 @@ #!/usr/bin/env python | ||
| self.task_scheduler.start_task_thread() | ||
| self.listen_scheduler.start_listen_data() | ||
@@ -64,0 +65,0 @@ def stop(self): |
@@ -18,2 +18,3 @@ #!/usr/bin/env python | ||
| import concurrent.futures | ||
| import logging | ||
@@ -53,4 +54,4 @@ | ||
| for data in change_data_items: | ||
| key = data[6] | ||
| new_value = data[7] | ||
| key = data[3] | ||
| new_value = data[5] | ||
| delete_id = data[0] | ||
@@ -68,2 +69,13 @@ tasks.append( | ||
| def _run_listen_loop(self): | ||
| loop = asyncio.new_event_loop() | ||
| asyncio.set_event_loop(loop) | ||
| try: | ||
| loop.run_until_complete(self.listen()) | ||
| except Exception as e: | ||
| logging.error(f"Listen scheduler error: {str(e)}") | ||
| finally: | ||
| loop.close() | ||
| def start_listen_data(self): | ||
@@ -73,3 +85,3 @@ if self.is_running: | ||
| self.is_running = True | ||
| self.listen_thread = threading.Thread(target=self.listen) | ||
| self.listen_thread = threading.Thread(target=self._run_listen_loop, daemon=True) | ||
| self.listen_thread.start() | ||
@@ -76,0 +88,0 @@ |
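The new `_run_listen_loop` runs the async listener on a dedicated event loop inside a daemon thread, closing the loop when the coroutine finishes. A standalone sketch of that pattern, with a stub coroutine standing in for the real `listen()`:

```python
import asyncio
import threading

results = []


async def listen_once():
    # Stand-in for the real listen() coroutine
    await asyncio.sleep(0.01)
    results.append("polled")


def run_listen_loop():
    # Each thread needs its own event loop; always close it on exit
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        loop.run_until_complete(listen_once())
    finally:
        loop.close()


t = threading.Thread(target=run_listen_loop, daemon=True)
t.start()
t.join()
```

Marking the thread as a daemon (as the diff does) lets the process exit even if the listener is still blocked in its loop.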
@@ -19,4 +19,7 @@ #!/usr/bin/env python | ||
| import logging | ||
| from collections import namedtuple | ||
| SendMessageCallback = namedtuple("SendMessageCallback", ["message", "callback"]) | ||
| class AsyncReceiveScheduler: | ||
@@ -41,2 +44,17 @@ def __init__( | ||
| def send_message_batch(self, message_callback_list: list[SendMessageCallback]): | ||
| message_list = [ | ||
| message_callback.message.to_dict_by_core() | ||
| for message_callback in message_callback_list | ||
| ] | ||
| if message_list: | ||
| self.queue_operation.enqueue_batch(message_list) | ||
| with self.lock: | ||
| for message_callback in message_callback_list: | ||
| message = message_callback.message | ||
| callback = message_callback.callback | ||
| if callback is None: | ||
| callback = lambda m: logging.info(f"receive message: {m.id}") | ||
| self.callbacks[message.id] = callback | ||
| async def receive_message(self): | ||
@@ -43,0 +61,0 @@ """Single polling thread; callbacks run in parallel""" |
| from abc import ABC, abstractmethod | ||
| from typing import Callable | ||
| from ...model import MessageItem, SchedulerConfig | ||
| from queue_sqlite_core import ShardedQueueOperation | ||
@@ -9,2 +10,5 @@ | ||
| @property | ||
| def queue_operation(self) -> ShardedQueueOperation: ... | ||
| def __init__(self, config: SchedulerConfig = SchedulerConfig()): | ||
@@ -11,0 +15,0 @@ pass |
@@ -17,2 +17,3 @@ #!/usr/bin/env python | ||
| import logging | ||
| import os | ||
@@ -24,3 +25,3 @@ | ||
| queue_operation: ShardedQueueOperation, | ||
| interval_minutes=60, | ||
| interval_minutes=1, | ||
| remove_days=30, | ||
@@ -33,4 +34,4 @@ ): | ||
| self.remove_days = remove_days | ||
| self.max_db_size_mb = 500 | ||
| # for i in range(self.queue_operation.shard_num): | ||
| # Clean up expired but unprocessed messages | ||
@@ -46,2 +47,3 @@ self.queue_operation.clean_expired_messages() | ||
| self.queue_operation.clean_expired_messages() | ||
| self.remove_days = self._check_and_optimize_db(self.remove_days) | ||
@@ -57,2 +59,33 @@ except Exception as e: | ||
| def _check_and_optimize_db(self, remove_days): | ||
| """Check the database size and optimize if it exceeds the limit""" | ||
| try: | ||
| # Get the total size of the database files | ||
| db_size_mb = self._get_database_size() | ||
| if db_size_mb > self.max_db_size_mb: | ||
| logging.info(f"Database size {db_size_mb}MB exceeds the limit; optimizing...") | ||
| print(f"Database size {db_size_mb}MB exceeds the limit; optimizing...") | ||
| if remove_days > 1: | ||
| remove_days = remove_days - 1 | ||
| self.queue_operation.clean_old_messages(remove_days) | ||
| except Exception as e: | ||
| logging.error(f"Database optimization error: {str(e)}") | ||
| return remove_days | ||
| def _get_database_size(self) -> int: | ||
| """Get the total size of the database files in MB""" | ||
| # Sum the sizes of all files under the database directory | ||
| total_size = 0 | ||
| db_path = self.queue_operation.db_dir | ||
| for root, _, files in os.walk(db_path): | ||
| for file in files: | ||
| file_path = os.path.join(root, file) | ||
| total_size += os.path.getsize(file_path) | ||
| return total_size // (1024 * 1024) | ||
| def start_cleanup(self): | ||
@@ -59,0 +92,0 @@ if self.is_running: |
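The `_check_and_optimize_db` change shrinks the retention window by one day each time the database grows past its size limit, never going below one day. The backoff rule can be isolated as a pure function (a sketch; the names are illustrative, not the package's API):

```python
def adjust_retention(db_size_mb: int, max_db_size_mb: int, remove_days: int) -> int:
    # Mirror of _check_and_optimize_db's backoff: when the database
    # exceeds the limit, retain one fewer day of old messages,
    # but never drop below a one-day retention window
    if db_size_mb > max_db_size_mb and remove_days > 1:
        return remove_days - 1
    return remove_days
```

Because the cleanup scheduler stores the returned value back into `self.remove_days`, repeated oversize checks ratchet retention down gradually instead of purging everything at once.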
@@ -20,4 +20,7 @@ #!/usr/bin/env python | ||
| from .callback_task import QtCallbackTask | ||
| from collections import namedtuple | ||
| SendMessageCallback = namedtuple("SendMessageCallback", ["message", "callback"]) | ||
| class QtReceiveScheduler: | ||
@@ -46,2 +49,17 @@ """Qt receive scheduler - standalone implementation""" | ||
| def send_message_batch(self, message_callback_list: list[SendMessageCallback]): | ||
| message_list = [ | ||
| message_callback.message.to_dict_by_core() | ||
| for message_callback in message_callback_list | ||
| ] | ||
| if message_list: | ||
| self.queue_operation.enqueue_batch(message_list) | ||
| with self.lock: | ||
| for message_callback in message_callback_list: | ||
| message = message_callback.message | ||
| callback = message_callback.callback | ||
| if callback is None: | ||
| callback = lambda m: logging.info(f"receive message: {m.id}") | ||
| self.callbacks[message.id] = callback | ||
| def _receive_loop(self): | ||
@@ -48,0 +66,0 @@ """Message receive loop""" |
@@ -70,5 +70,4 @@ #!/usr/bin/env python | ||
| self.cleanup_scheduler.stop_cleanup() | ||
| # self.queue_operation.close_all_connections() | ||
| __all__ = ["StandardQueueScheduler"] |
@@ -41,8 +41,9 @@ #!/usr/bin/env python | ||
| for data in change_data_items: | ||
| key = data[6] | ||
| new_value = data[7] | ||
| key = data[3] | ||
| new_value = data[5] | ||
| delete_id = data[0] | ||
| self.executor.submit( | ||
| self._process_listen_data, key, new_value, delete_id | ||
| ) | ||
| if self.is_running: | ||
| self.executor.submit( | ||
| self._process_listen_data, key, new_value, delete_id | ||
| ) | ||
@@ -49,0 +50,0 @@ def start_listen_data(self): |
@@ -19,4 +19,7 @@ #!/usr/bin/env python | ||
| import logging | ||
| from collections import namedtuple | ||
| SendMessageCallback = namedtuple("SendMessageCallback", ["message", "callback"]) | ||
| class ReceiveScheduler: | ||
@@ -39,3 +42,2 @@ def __init__( | ||
| callback = lambda message: logging.info(f"receive message: {message.id}") | ||
| # self.queue_operation.enqueue(message.to_dict()) | ||
| self.queue_operation.enqueue(message.to_dict_by_core()) | ||
@@ -45,2 +47,17 @@ with self.lock: | ||
| def send_message_batch(self, message_callback_list: list[SendMessageCallback]): | ||
| message_list = [ | ||
| message_callback.message.to_dict_by_core() | ||
| for message_callback in message_callback_list | ||
| ] | ||
| if message_list: | ||
| self.queue_operation.enqueue_batch(message_list) | ||
| with self.lock: | ||
| for message_callback in message_callback_list: | ||
| message = message_callback.message | ||
| callback = message_callback.callback | ||
| if callback is None: | ||
| callback = lambda m: logging.info(f"receive message: {m.id}") | ||
| self.callbacks[message.id] = callback | ||
| def receive_message(self): | ||
@@ -47,0 +64,0 @@ """Single polling thread; callbacks run in parallel""" |
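The batched send API added across the schedulers pairs each message with an optional callback, enqueues all messages in one call, then registers callbacks under the lock. A hypothetical usage sketch against stripped-down stand-ins (`FakeMessage` and `FakeScheduler` are stubs for illustration, not the package's classes):

```python
import logging
from collections import namedtuple

SendMessageCallback = namedtuple("SendMessageCallback", ["message", "callback"])


class FakeMessage:
    # Stand-in for MessageItem; only the fields the batch path touches
    def __init__(self, mid):
        self.id = mid

    def to_dict_by_core(self):
        return {"id": self.id}


class FakeScheduler:
    # Stripped-down mirror of send_message_batch's control flow
    def __init__(self):
        self.enqueued = []   # stands in for queue_operation.enqueue_batch
        self.callbacks = {}

    def send_message_batch(self, pairs):
        batch = [p.message.to_dict_by_core() for p in pairs]
        if batch:
            self.enqueued.extend(batch)
        for p in pairs:
            cb = p.callback
            if cb is None:
                # Default callback just logs the received message id
                cb = lambda m: logging.info(f"receive message: {m.id}")
            self.callbacks[p.message.id] = cb


sched = FakeScheduler()
sched.send_message_batch([
    SendMessageCallback(FakeMessage("a"), None),
    SendMessageCallback(FakeMessage("b"), lambda m: None),
])
```

One enqueue call for the whole batch is what distinguishes this path from looping over `send_message`, which hits the queue once per message.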
@@ -14,5 +14,3 @@ import queue_sqlite_core as core | ||
| print( | ||
| task_mounter.get_task_function("task")({"num": 1})(lambda x: x + 1).meta[ # type: ignore | ||
| "num" | ||
| ] | ||
| f"task: {task_mounter.get_task_function('task')({'num': 1})(lambda x: x + 1).meta}" | ||
| ) | ||
@@ -22,3 +20,3 @@ from tasks import example | ||
| task_mounter.get_task_list() | ||
| print(task_mounter.get_task_function("<lambda>")) | ||
| print(task_mounter.get_task_function("<lambda>")(1)) | ||
@@ -25,0 +23,0 @@ @classmethod |
@@ -8,3 +8,3 @@ from listen import * | ||
| def test_listen_data(self): | ||
| scheduler = QueueScheduler(scheduler_type="qt") | ||
| scheduler = QueueScheduler() | ||
| scheduler.start() | ||
@@ -11,0 +11,0 @@ scheduler.update_listen_data("key_1", "value_1") |