queue-sqlite
Advanced tools
| #!/usr/bin/env python | ||
| # -*- encoding: utf-8 -*- | ||
| """ | ||
| @File : scheduler_config.py | ||
| @Time : 2025-10-28 10:21:21 | ||
| @Author : chakcy | ||
| @Email : 947105045@qq.com | ||
| @description : 队列调度器配置 | ||
| """ | ||
| from dataclasses import dataclass, field | ||
| from typing import Dict, Any | ||
| import logging | ||
@dataclass
class SchedulerConfig:
    """Queue scheduler configuration.

    Every field stores a human-readable description under the ``"help"``
    key of its ``metadata``; the classmethods below expose those
    descriptions for introspection and logging.
    """

    # Number of message-receiving threads.
    receive_thread_num: int = field(
        default=1,
        metadata={"help": "接收消息的线程数"},
    )
    # Number of task-execution threads.
    task_thread_num: int = field(
        default=4,
        metadata={"help": "任务执行线程数"},
    )
    # Number of queue shards.
    shard_num: int = field(
        default=4,
        metadata={"help": "分片数"},
    )
    # Queue name (also used for on-disk cache paths elsewhere).
    queue_name: str = field(
        default="default",
        metadata={"help": "队列名称"},
    )
    # Arbitrary queue metadata.
    meta: Dict[str, Any] = field(
        default_factory=dict,
        metadata={"help": "队列元数据"},
    )

    @classmethod
    def get_field_descriptions(cls) -> Dict[str, str]:
        """Return a mapping of field name -> help text.

        Returns:
            Dict[str, str]: fields without a ``"help"`` metadata entry
            map to the placeholder "无描述信息".
        """
        from dataclasses import fields

        return {f.name: f.metadata.get("help", "无描述信息") for f in fields(cls)}

    @classmethod
    def print_field_info(cls):
        """Log name, type, default value and description of every field.

        Bug fix: fields declared with ``default_factory`` (e.g. ``meta``)
        previously logged the ``MISSING`` sentinel as their default; the
        factory is now invoked so the real default value is shown.
        """
        from dataclasses import fields, MISSING

        logging.info(f"=== {cls.__name__} 字段信息 ===")
        for field_info in fields(cls):
            default = field_info.default
            if default is MISSING and field_info.default_factory is not MISSING:  # type: ignore[misc]
                default = field_info.default_factory()  # type: ignore[misc]
            logging.info(
                f"{field_info.name}: {field_info.type} (默认值: {default})"
            )
            logging.info(f"  描述: {field_info.metadata.get('help', '无描述信息')}")
            logging.info("-" * 50)
| #!/usr/bin/env python | ||
| # -*- encoding: utf-8 -*- | ||
| """ | ||
| @File : __init__.py | ||
| @Time : 2025-10-26 10:26:16 | ||
| @Author : chakcy | ||
| @Email : 947105045@qq.com | ||
| @description : Qt调度器 - 独立实现 | ||
| """ | ||
| import logging | ||
| import os | ||
| from ..base import BaseScheduler | ||
| from ...model import MessageItem, SchedulerConfig | ||
| from queue_sqlite_core import ShardedQueueOperation | ||
| from ...queue_operation.listen_operation import ListenOperation | ||
| from .receive_scheduler import QtReceiveScheduler | ||
| from .listen_scheduler import QtListenScheduler | ||
| from .task_scheduler import QtTaskScheduler | ||
| from ..cleanup_scheduler import CleanupScheduler as QtCleanupScheduler | ||
| from qtpy.QtCore import QThreadPool | ||
| from qtpy.QtWidgets import QApplication | ||
| from typing import Callable, Dict, Any, List | ||
class QtQueueScheduler(BaseScheduler):
    """Fully self-contained Qt queue scheduler.

    Composes four independent components: a receive scheduler (message
    intake and completion callbacks), a task scheduler (task execution on
    the Qt thread pool), a listen scheduler (change-log polling) and a
    cleanup scheduler (expired-message removal).
    """

    # Name of the Qt binding detected by is_available() ("PySide6", "PyQt6", ...).
    qt_type = None

    def __init__(
        self,
        config: SchedulerConfig | None = None,
    ):
        """Build all sub-schedulers from *config*.

        Args:
            config: Scheduler settings; a fresh default ``SchedulerConfig``
                is created when omitted.  (The previous signature used
                ``config=SchedulerConfig()`` — a single shared mutable
                instance evaluated once at definition time, the classic
                mutable-default-argument bug.)
        """
        if config is None:
            config = SchedulerConfig()
        # Sharded queue storage.
        self.queue_operation = ShardedQueueOperation(
            config.shard_num, config.queue_name
        )
        # The listen (key/value change) store lives in its own SQLite file.
        db_dir = f"cache/{config.queue_name}"
        os.makedirs(db_dir, exist_ok=True)
        self.listen_operation = ListenOperation(f"{db_dir}/listen.db")
        # Make sure the listen tables exist before any component touches them.
        self._ensure_listen_operation_tables()
        # Independent scheduler components.
        self.receive_scheduler = QtReceiveScheduler(
            self.queue_operation, config.receive_thread_num
        )
        self.task_scheduler = QtTaskScheduler(
            self.queue_operation, config.task_thread_num
        )
        self.listen_scheduler = QtListenScheduler(self.listen_operation)
        self.cleanup_scheduler = QtCleanupScheduler(self.queue_operation)

    @classmethod
    def is_available(cls) -> bool:
        """Return True when any supported Qt binding can be imported.

        Side effect: records the first binding found in ``cls.qt_type``.
        (A debug ``print`` duplicating the log line has been removed.)
        """
        import importlib

        qt_modules = ["PySide6", "PyQt6", "PyQt5", "PySide2"]
        for module in qt_modules:
            try:
                importlib.import_module(module)
                cls.qt_type = module
                logging.info(f"已找到 Qt 模块: {module}")
                return True
            except ImportError:  # binding not installed; try the next one
                continue
        return False

    def _ensure_qapplication(self):
        """Create a QApplication if none exists (required before Qt threading)."""
        try:
            if not QApplication.instance():
                # Non-GUI usage still needs a (windowless) QApplication.
                import sys

                self.app = QApplication(sys.argv if hasattr(sys, "argv") else [])
                logging.info("已创建 QApplication 实例")
        except Exception as e:
            logging.warning(f"创建 QApplication 失败: {str(e)}")

    def _ensure_listen_operation_tables(self):
        """Create the listen tables, including ``change_log``, if missing."""
        try:
            # Let the operation object (re)create its own tables first.
            self.listen_operation.create_table()
            # Then double-check that change_log really exists.
            conn = self.listen_operation._get_connection()
            cursor = conn.execute(
                "SELECT name FROM sqlite_master WHERE type='table' AND name='change_log'"
            )
            if cursor.fetchone() is None:
                logging.warning("change_log 表不存在,尝试手动创建")
                conn.execute(
                    """
                    CREATE TABLE change_log (
                        id INTEGER PRIMARY KEY AUTOINCREMENT,
                        table_name TEXT,
                        row_id INTEGER,
                        column_name TEXT,
                        old_value TEXT,
                        new_value TEXT,
                        is_delete integer DEFAULT 0,
                        timestamp DATETIME DEFAULT CURRENT_TIMESTAMP
                    )
                    """
                )
                conn.commit()
        except Exception as e:
            logging.error(f"确保监听表存在失败: {str(e)}")

    def send_message(self, message: MessageItem, callback: Callable):
        """Enqueue *message*; *callback* fires once it completes."""
        self.receive_scheduler.send_message(message, callback)

    def update_listen_data(self, key: str, value: str):
        """Update one listened key/value pair."""
        self.listen_operation.update_listen_data(key, value)

    def get_listen_datas(self) -> List:
        """Return all listened values."""
        return self.listen_operation.get_values()

    def get_listen_data(self, key: str):
        """Return the value stored under *key* in the listen table."""
        return self.listen_operation.get_value(key)

    def start(self):
        """Start every component (receive, task, cleanup, listen)."""
        # Make sure a QApplication exists (needed for GUI hosts).
        self._ensure_qapplication()
        self.receive_scheduler.start()
        self.task_scheduler.start()
        self.cleanup_scheduler.start_cleanup()
        self.listen_scheduler.start()
        logging.info("Qt 队列调度器已完全启动")

    def stop(self):
        """Stop every component, in reverse start order."""
        self.listen_scheduler.stop()
        self.cleanup_scheduler.stop_cleanup()
        self.task_scheduler.stop()
        self.receive_scheduler.stop()
        logging.info("Qt 队列调度器已完全停止")

    def get_queue_info(self) -> Dict[str, Any]:
        """Return queue length, shard count, db dir and thread-pool stats.

        Returns an empty dict (after logging) on any failure.
        """
        try:
            return {
                "queue_length": self.queue_operation.get_queue_length(),
                "shard_num": self.queue_operation.shard_num,
                "db_dir": self.queue_operation.db_dir,
                "active_threads": QThreadPool.globalInstance().activeThreadCount(),  # type: ignore
                "max_threads": QThreadPool.globalInstance().maxThreadCount(),  # type: ignore
            }
        except Exception as e:
            logging.error(f"获取队列信息失败: {str(e)}")
            return {}
| #!/usr/bin/env python | ||
| # -*- encoding: utf-8 -*- | ||
| """ | ||
| @File : callback_task.py | ||
| @Time : 2025-10-28 11:40:02 | ||
| @Author : chakcy | ||
| @Email : 947105045@qq.com | ||
| @description : 回调任务 | ||
| """ | ||
| import asyncio | ||
| import logging | ||
| from queue_sqlite_core import ShardedQueueOperation | ||
| from ...model import MessageItem | ||
| from typing import Callable | ||
| from qtpy.QtCore import QRunnable | ||
class QtCallbackTask(QRunnable):
    """Thread-pool task that invokes a completion callback for a message.

    Runs the (sync or async) callback and always deletes the message
    from the queue afterwards, even when the callback raises.
    """

    def __init__(
        self,
        callback: Callable,
        message: MessageItem,
        queue_operation: ShardedQueueOperation,
    ):
        super().__init__()
        self.callback = callback
        self.message = message
        self.queue_operation = queue_operation
        # Let Qt reclaim the runnable object once run() returns.
        self.setAutoDelete(True)

    def run(self):
        """Invoke the callback, then remove the message from the queue."""
        try:
            is_coroutine = asyncio.iscoroutinefunction(self.callback)
            if is_coroutine:
                # Async callback: spin up a private event loop on this worker thread.
                asyncio.run(self._run_async_callback())
            else:
                self.callback(self.message)
        except Exception as e:
            logging.error(f"回调执行错误: {str(e)}")
        finally:
            # The message counts as consumed regardless of callback outcome.
            try:
                self.queue_operation.delete_message(self.message.id)
            except Exception as e:
                logging.error(f"删除消息失败 {self.message.id}: {str(e)}")

    async def _run_async_callback(self):
        """Await the asynchronous callback, logging any failure."""
        try:
            await self.callback(self.message)
        except Exception as e:
            logging.error(f"异步回调执行错误: {str(e)}")
| #!/usr/bin/env python | ||
| # -*- encoding: utf-8 -*- | ||
| """ | ||
| @File : listen_scheduler.py | ||
| @Time : 2025-10-28 14:22:38 | ||
| @Author : chakcy | ||
| @Email : 947105045@qq.com | ||
| @description : 监听调度器 - 改进版,解决数据库锁定问题 | ||
| """ | ||
| import logging | ||
| import time | ||
| import sqlite3 | ||
| from .listen_task import QtListenTask | ||
| from ...queue_operation.listen_operation import ListenOperation | ||
| from qtpy.QtCore import QThreadPool, QMutex, QMutexLocker | ||
| import threading | ||
class QtListenScheduler:
    """Qt listen scheduler — improved version that tolerates SQLite
    "database is locked" errors.

    A daemon thread polls the listen database's change log; each changed
    row is handed to a QtListenTask on the global Qt thread pool.  All
    direct database access is serialized through a shared QMutex and
    retried a bounded number of times while the database is locked.
    """

    def __init__(self, listen_operation: ListenOperation):
        self.listen_operation = listen_operation
        self.is_running = False
        self.listen_thread = None
        self.thread_pool = QThreadPool.globalInstance()
        self.db_mutex = QMutex()  # serializes all database operations
        # Polling configuration.
        self.poll_interval = 0.1  # poll interval (seconds)
        self.max_retries = 5  # max retries on "database is locked"
        self.retry_delay = 0.2  # delay between retries (seconds)
        # Make sure the listen tables exist before polling starts.
        self._ensure_listen_tables()

    def _ensure_listen_tables(self):
        """Create the listen-related tables if they are missing."""
        try:
            # Guard the database work with the shared mutex.
            with QMutexLocker(self.db_mutex):
                # Check whether the table exists; create it when absent.
                conn = self.listen_operation._get_connection()
                # Does the change_log table exist?
                cursor = conn.execute(
                    "SELECT name FROM sqlite_master WHERE type='table' AND name='change_log'"
                )
                if cursor.fetchone() is None:
                    logging.info("创建缺失的 change_log 表")
                    conn.execute(
                        """
                        CREATE TABLE change_log (
                            id INTEGER PRIMARY KEY AUTOINCREMENT,
                            table_name TEXT,
                            row_id INTEGER,
                            column_name TEXT,
                            old_value TEXT,
                            new_value TEXT,
                            is_delete integer DEFAULT 0,
                            timestamp DATETIME DEFAULT CURRENT_TIMESTAMP
                        )
                        """
                    )
                    conn.commit()
        except Exception as e:
            logging.error(f"确保监听表存在失败: {str(e)}")

    def _safe_listen_data(self):
        """Fetch change-log rows under the mutex, with bounded retries.

        Returns:
            (status, items) — ``(False, [])`` on failure or exhausted retries.
        """
        for attempt in range(self.max_retries):
            try:
                # Guard the database read with the shared mutex.
                with QMutexLocker(self.db_mutex):
                    status, change_data_items = self.listen_operation.listen_data()
                    return status, change_data_items
            except sqlite3.OperationalError as e:
                if "database is locked" in str(e) and attempt < self.max_retries - 1:
                    logging.warning(
                        f"数据库锁定,第 {attempt + 1} 次重试获取监听数据..."
                    )
                    time.sleep(self.retry_delay)
                else:
                    logging.error(f"获取监听数据失败: {str(e)}")
                    return False, []
            except Exception as e:
                logging.error(f"获取监听数据时发生未知错误: {str(e)}")
                return False, []
        return False, []

    def _process_listen_data(self, change_data_items):
        """Dispatch one QtListenTask per change-log row; return the count started."""
        processed_count = 0
        for data in change_data_items:
            try:
                # Rows shorter than 8 columns are skipped (unexpected schema).
                if len(data) >= 8:
                    delete_id = data[0]  # id
                    table_name = data[1]  # table_name
                    row_id = data[2]  # row_id
                    column_name = data[3]  # column_name
                    old_value = data[4]  # old_value
                    new_value = data[5]  # new_value
                    # Run the listener on the Qt thread pool.
                    task = QtListenTask(
                        column_name,
                        new_value,
                        int(delete_id),
                        self.listen_operation,
                        self.db_mutex,
                        self.max_retries,
                        self.retry_delay,
                    )
                    self.thread_pool.start(task)  # type: ignore
                    processed_count += 1
            except Exception as e:
                logging.error(f"处理监听数据失败: {str(e)}")
        return processed_count

    def _listen_loop(self):
        """Poll for changes until stopped, backing off after repeated failures."""
        consecutive_errors = 0
        max_consecutive_errors = 10
        while self.is_running:
            try:
                status, change_data_items = self._safe_listen_data()
                if status and change_data_items:
                    processed_count = self._process_listen_data(change_data_items)
                    consecutive_errors = 0  # reset the error streak
                    if processed_count > 0:
                        logging.debug(f"成功处理 {processed_count} 个监听数据项")
                elif not status:
                    # Fetch failed: nap briefly, counting consecutive failures.
                    time.sleep(self.poll_interval)
                    consecutive_errors += 1
                    # Too many failures in a row: lengthen the nap.
                    if consecutive_errors >= max_consecutive_errors:
                        time.sleep(self.poll_interval * 5)
                        consecutive_errors = 0  # reset the counter
                else:
                    # Healthy but nothing changed: normal nap.
                    time.sleep(self.poll_interval)
                    consecutive_errors = 0
            except Exception as e:
                logging.error(f"监听循环错误: {str(e)}")
                consecutive_errors += 1
                time.sleep(self.poll_interval * 2)  # longer nap after an error

    def start(self):
        """Start the polling thread (idempotent)."""
        if self.is_running:
            return
        self.is_running = True
        self.listen_thread = threading.Thread(target=self._listen_loop, daemon=True)
        self.listen_thread.start()
        logging.info("Qt 监听调度器已启动")

    def stop(self):
        """Stop the polling thread and join it briefly."""
        if not self.is_running:
            return
        self.is_running = False
        if self.listen_thread and self.listen_thread.is_alive():
            self.listen_thread.join(timeout=3.0)
        logging.info("Qt 监听调度器已停止")
| #!/usr/bin/env python | ||
| # -*- encoding: utf-8 -*- | ||
| """ | ||
| @File : listen_task.py | ||
| @Time : 2025-10-28 14:30:41 | ||
| @Author : chakcy | ||
| @Email : 947105045@qq.com | ||
| @description : 监听任务 - 改进版,解决数据库锁定问题 | ||
| """ | ||
| from queue_sqlite.queue_operation.listen_operation import ListenOperation | ||
| from ...mounter.listen_mounter import ListenMounter | ||
| import logging | ||
| from qtpy.QtCore import QRunnable, QMutexLocker | ||
| import asyncio | ||
| from typing import Callable | ||
| import time | ||
| import sqlite3 | ||
class QtListenTask(QRunnable):
    """Qt listen task — improved version that tolerates database locking.

    Runs the listener registered under *key* (sync or async) with the new
    value, then deletes the originating ``change_log`` row, retrying when
    SQLite reports "database is locked".
    """

    def __init__(
        self,
        key: str,
        value: str,
        delete_id: int,
        listen_operation: ListenOperation,
        db_mutex,
        max_retries=5,
        retry_delay=0.2,
    ):
        super().__init__()
        self.key = key  # listener lookup key (the change-log column_name)
        self.value = value  # new value passed to the listener
        self.delete_id = delete_id  # change_log row id to delete afterwards
        self.listen_operation = listen_operation
        self.db_mutex = db_mutex  # shared QMutex guarding database access
        self.max_retries = max_retries
        self.retry_delay = retry_delay
        # Let Qt reclaim the runnable once run() returns.
        self.setAutoDelete(True)

    def run(self):
        """Invoke the listener, then delete the change-log entry."""
        try:
            listen_function = ListenMounter.get_Listener_function(self.key)
            if listen_function:
                if asyncio.iscoroutinefunction(listen_function):
                    # Async listener: run an event loop on this pool thread.
                    asyncio.run(self._run_async_listener(listen_function))
                else:
                    # Sync listener: call it directly.
                    listen_function(self.value)
        except Exception as e:
            logging.error(f"监听函数执行错误 {self.key}: {str(e)}")
        finally:
            # Always clean up the change-log row, even on listener failure.
            self._safe_delete_change_log()

    def _safe_delete_change_log(self):
        """Delete the change-log row, retrying on "database is locked"."""
        for attempt in range(self.max_retries):
            try:
                # Guard the delete with the shared mutex.
                with QMutexLocker(self.db_mutex):
                    self.listen_operation.delete_change_log(self.delete_id)
                break  # success — stop retrying
            except sqlite3.OperationalError as e:
                if "database is locked" in str(e) and attempt < self.max_retries - 1:
                    logging.debug(
                        f"删除变更日志时数据库锁定,第 {attempt + 1} 次重试..."
                    )
                    time.sleep(self.retry_delay)
                else:
                    logging.error(f"删除变更日志失败 {self.delete_id}: {str(e)}")
                    break
            except Exception as e:
                logging.error(f"删除变更日志失败 {self.delete_id}: {str(e)}")
                break

    async def _run_async_listener(self, listen_function: Callable):
        """Await an asynchronous listener, logging any failure."""
        try:
            await listen_function(self.value)
        except Exception as e:
            logging.error(f"异步监听函数执行错误 {self.key}: {str(e)}")
| #!/usr/bin/env python | ||
| # -*- encoding: utf-8 -*- | ||
| """ | ||
| @File : receive_scheduler.py | ||
| @Time : 2025-10-28 14:31:06 | ||
| @Author : chakcy | ||
| @Email : 947105045@qq.com | ||
| @description : 接收调度器 | ||
| """ | ||
| from typing import Callable, Optional | ||
| from queue_sqlite_core import ShardedQueueOperation | ||
| from qtpy.QtCore import QThreadPool | ||
| import threading | ||
| from ...model import MessageItem | ||
| import time | ||
| import logging | ||
| from .callback_task import QtCallbackTask | ||
class QtReceiveScheduler:
    """Qt receive scheduler — standalone implementation.

    Accepts outgoing messages (registering a per-message completion
    callback) and polls the queue for completed messages, dispatching
    each callback on the global Qt thread pool.
    """

    def __init__(
        self, queue_operation: ShardedQueueOperation, receive_thread_num: int = 1
    ):
        # message-id -> completion callback.
        self.callbacks = {}
        self.is_running = False
        self.lock = threading.Lock()
        self.queue_operation = queue_operation
        # NOTE(review): receive_thread_num was previously accepted but
        # silently discarded; it is now stored, though a single polling
        # thread is still used — confirm intended semantics.
        self.receive_thread_num = receive_thread_num
        self.receive_thread = None
        self.thread_pool = QThreadPool.globalInstance()

    def send_message(self, message: MessageItem, callback: Optional[Callable] = None):
        """Enqueue *message* and remember *callback* for its completion.

        Args:
            message: Item to enqueue.
            callback: Invoked with the completed message; defaults to a
                logger-only callback when omitted.
        """
        if callback is None:

            def callback(message):
                logging.info(f"收到消息: {message.id}")

        # Enqueue the message payload.
        self.queue_operation.enqueue(message.to_dict_by_core())
        with self.lock:
            self.callbacks[message.id] = callback

    def _receive_loop(self):
        """Poll completed messages and dispatch their callbacks until stopped."""
        while self.is_running:
            try:
                # Fetch messages the task side has finished with.
                message_list = self.queue_operation.get_completed_messages()
                if message_list:
                    for message_data in message_list:
                        try:
                            message = MessageItem.from_dict(message_data)
                            with self.lock:
                                callback = self.callbacks.pop(message.id, None)
                            if callback is None:
                                # No registered callback (e.g. after restart): no-op.
                                def callback(msg):
                                    return None

                            # Run the callback on the Qt thread pool.
                            task = QtCallbackTask(
                                callback, message, self.queue_operation
                            )
                            self.thread_pool.start(task)  # type: ignore
                        except Exception as e:
                            logging.error(f"处理完成消息失败: {str(e)}")
                            # Best-effort removal so a bad message cannot wedge
                            # the loop (was a bare `except: pass`).
                            try:
                                self.queue_operation.delete_message(
                                    message_data["id"]
                                )
                            except Exception as delete_error:
                                logging.debug(
                                    f"删除消息失败: {str(delete_error)}"
                                )
                else:
                    time.sleep(0.05)  # idle: brief nap to avoid CPU spinning
            except Exception as e:
                logging.error(f"接收消息循环错误: {str(e)}")
                time.sleep(0.1)  # back off a little on errors

    def start(self):
        """Start the polling thread (idempotent)."""
        if self.is_running:
            return
        self.is_running = True
        self.receive_thread = threading.Thread(target=self._receive_loop, daemon=True)
        self.receive_thread.start()
        logging.info("Qt 接收调度器已启动")

    def stop(self):
        """Stop the polling thread and join it briefly."""
        if not self.is_running:
            return
        self.is_running = False
        if self.receive_thread and self.receive_thread.is_alive():
            self.receive_thread.join(timeout=3.0)
        logging.info("Qt 接收调度器已停止")
| #!/usr/bin/env python | ||
| # -*- encoding: utf-8 -*- | ||
| """ | ||
| @File : task_executor.py | ||
| @Time : 2025-10-28 14:31:25 | ||
| @Author : chakcy | ||
| @Email : 947105045@qq.com | ||
| @description : 任务执行器 | ||
| """ | ||
| import asyncio | ||
| import logging | ||
| from typing import Any, Callable, Dict | ||
| from datetime import datetime | ||
| from ...constant import MessageStatus | ||
| from ...mounter.task_mounter import TaskMounter | ||
| from queue_sqlite_core import ShardedQueueOperation | ||
| from ...model.message_item import MessageItem | ||
| from qtpy.QtCore import QRunnable | ||
class QtTaskExecutor(QRunnable):
    """Qt task executor.

    Looks up the task function mounted under the message's destination,
    runs it (sync or async) and writes the status/result back to the
    queue storage.
    """

    def __init__(
        self, message_data: Dict[str, Any], queue_operation: ShardedQueueOperation
    ):
        super().__init__()
        self.message_data = message_data
        self.queue_operation = queue_operation
        # Let Qt reclaim the runnable once run() returns.
        self.setAutoDelete(True)

    def run(self):
        """Execute the task described by ``message_data``."""
        try:
            message = MessageItem.from_dict(self.message_data)
            if message.destination == "client":
                self._process_client_message(message)
                return
            task_function = TaskMounter.get_task_function(message.destination)
            if task_function is None:
                raise ValueError(f"任务函数未找到: {message.destination}")
            # Execute the task.
            if asyncio.iscoroutinefunction(task_function):
                # Async task — run a private event loop on this pool thread.
                asyncio.run(self._execute_async_task(message, task_function))
            else:
                # Sync task.
                self._execute_sync_task(message, task_function)
        except Exception as e:
            logging.error(f"任务执行错误: {str(e)}")
            # Mark the task failed.  json.dumps keeps the error payload
            # valid JSON even when str(e) contains quotes — the previous
            # hand-built f-string produced broken JSON in that case.
            import json

            try:
                self.queue_operation.update_status(
                    self.message_data["id"], MessageStatus.FAILED.value
                )
                self.queue_operation.update_result(
                    self.message_data["id"], json.dumps({"error": str(e)})
                )
            except Exception as update_error:
                logging.error(f"更新任务状态失败: {str(update_error)}")

    def _process_client_message(self, message: MessageItem):
        """Client-addressed messages complete immediately with a stub result."""
        message.status = MessageStatus.COMPLETED
        message.result = {"result": "success"}
        message.updatetime = datetime.now()
        self._update_task_result(message)

    @staticmethod
    def _parse_result(result_str):
        """Decode a task-cycle result string.

        Returns the parsed JSON value, or wraps the raw value in
        ``{"result": ...}`` when it is not valid JSON (previously this
        logic was duplicated in the sync and async paths behind bare
        ``except:`` clauses).
        """
        import json

        try:
            return json.loads(result_str)
        except (TypeError, ValueError):
            return {"result": result_str}

    def _execute_sync_task(self, message: MessageItem, task_function: Callable):
        """Run a synchronous task through TaskCycle and persist its outcome."""
        from ...cycle.task_cycle import TaskCycle

        try:
            task_cycle = TaskCycle(message, task_function)
            task_cycle.run()
            status = task_cycle.get_task_status()
            if not status:
                raise ValueError("任务未完成")
            message.status = status
            if message.status == MessageStatus.FAILED:
                message.result = {"error": task_cycle.get_task_error()}
            else:
                # Decode the serialized task result.
                message.result = self._parse_result(task_cycle.get_task_result())
            self._update_task_result(message)
        except Exception as e:
            logging.error(f"同步任务执行失败 {message.id}: {str(e)}")
            message.status = MessageStatus.FAILED
            message.result = {"error": str(e)}
            self._update_task_result(message)

    async def _execute_async_task(self, message: MessageItem, task_function: Callable):
        """Run an asynchronous task through AsyncTaskCycle and persist its outcome."""
        from ...cycle.async_task_cycle import AsyncTaskCycle

        try:
            task_cycle = AsyncTaskCycle(message, task_function)
            await task_cycle.run()
            status = task_cycle.get_task_status()
            if not status:
                raise ValueError("任务未完成")
            message.status = status
            if message.status == MessageStatus.FAILED:
                message.result = {"error": task_cycle.get_task_error()}
            else:
                # Decode the serialized task result.
                message.result = self._parse_result(task_cycle.get_task_result())
            self._update_task_result(message)
        except Exception as e:
            logging.error(f"异步任务执行失败 {message.id}: {str(e)}")
            message.status = MessageStatus.FAILED
            message.result = {"error": str(e)}
            self._update_task_result(message)

    def _update_task_result(self, message: MessageItem):
        """Serialize and write the message result and status to storage."""
        import json

        try:
            # Serialize the result payload.
            result_str = json.dumps(message.result) if message.result else "{}"
            self.queue_operation.update_result(message.id, result_str)
            self.queue_operation.update_status(message.id, message.status.value)
        except Exception as e:
            logging.error(f"更新任务结果失败 {message.id}: {str(e)}")
| #!/usr/bin/env python | ||
| # -*- encoding: utf-8 -*- | ||
| """ | ||
| @File : task_scheduler.py | ||
| @Time : 2025-10-28 14:19:39 | ||
| @Author : chakcy | ||
| @Email : 947105045@qq.com | ||
| @description : Qt 任务调度器 | ||
| """ | ||
| import logging | ||
| from qtpy.QtCore import QThreadPool | ||
| from .task_executor import QtTaskExecutor | ||
| from queue_sqlite_core import ShardedQueueOperation | ||
| import time | ||
| import threading | ||
class QtTaskScheduler:
    """Qt task scheduler — standalone implementation.

    A single daemon thread dequeues pending messages and fans them out
    to the global Qt thread pool as QtTaskExecutor runnables.
    """

    def __init__(
        self, queue_operation: ShardedQueueOperation, task_thread_num: int = 4
    ):
        self.is_running = False
        self.queue_operation = queue_operation
        self.task_thread = None
        self.thread_pool = QThreadPool.globalInstance()
        # Cap the pool at the requested worker count (at least one).
        self.thread_pool.setMaxThreadCount(max(1, task_thread_num))  # type: ignore

    def _task_loop(self):
        """Dequeue-and-dispatch loop; exits once is_running goes False."""
        while self.is_running:
            try:
                # Pull up to twice the pool size to keep workers fed.
                batch_size = self.thread_pool.maxThreadCount() * 2  # type: ignore
                pending = self.queue_operation.dequeue(size=batch_size)
                if not pending:
                    time.sleep(0.05)  # idle: brief nap instead of spinning
                    continue
                for message_data in pending:
                    runnable = QtTaskExecutor(message_data, self.queue_operation)
                    self.thread_pool.start(runnable)  # type: ignore
            except Exception as e:
                logging.error(f"任务调度循环错误: {str(e)}")
                time.sleep(0.1)

    def start(self):
        """Start the dispatch thread (idempotent)."""
        if self.is_running:
            return
        self.is_running = True
        self.task_thread = threading.Thread(target=self._task_loop, daemon=True)
        self.task_thread.start()
        logging.info("Qt 任务调度器已启动")

    def stop(self):
        """Stop the dispatch thread and wait briefly for it to exit."""
        if not self.is_running:
            return
        self.is_running = False
        if self.task_thread and self.task_thread.is_alive():
            self.task_thread.join(timeout=3.0)
        logging.info("Qt 任务调度器已停止")
| #!/usr/bin/env python | ||
| # -*- encoding: utf-8 -*- | ||
| """ | ||
| @File : test_pyside6_scheduler.py | ||
| @Time : 2025-10-26 10:40:00 | ||
| @Author : chakcy | ||
| @description : 独立 PySide6 调度器测试 | ||
| """ | ||
| import time | ||
| from queue_sqlite.scheduler import QueueScheduler | ||
| from queue_sqlite.model import MessageItem | ||
| from queue_sqlite.mounter import task | ||
@task(meta={"task_name": "pyside6_example"})
def pyside6_example_task(message_item: MessageItem):
    """Synchronous test task: burns a little CPU and records the result."""
    print(f"处理任务: {message_item.id}")
    # Simulate some CPU-bound work.
    squares_total = sum(n * n for n in range(1000))
    message_item.result = {
        "status": "completed",
        "result": squares_total,
        "task_id": message_item.id,
    }
    return message_item
@task(meta={"task_name": "async_pyside6_task"})
async def async_pyside6_task(message_item: MessageItem):
    """Asynchronous test task: awaits briefly and records a stub result."""
    import asyncio

    print(f"处理异步任务: {message_item.id}")
    # Simulate an asynchronous operation.
    await asyncio.sleep(0.1)
    message_item.result = {"status": "async_completed", "task_id": message_item.id}
    return message_item
def task_callback(message_item: MessageItem):
    """Completion callback: print the finished task's id and result."""
    print(f"任务完成回调: {message_item.id}, 结果: {message_item.result}")
def test_pyside6_scheduler():
    """Exercise the standalone Qt scheduler end to end.

    Sends five sync and five async tasks, polls the queue status once per
    second until it drains (or ~10s elapse), then shuts the scheduler down.
    """
    # Create a scheduler using the Qt backend.
    scheduler = QueueScheduler(
        scheduler_type="qt",  # select the Qt-based scheduler
    )
    print("启动 qt 调度器...")
    scheduler.start()
    try:
        # Send synchronous tasks.
        for i in range(5):
            message = MessageItem(
                content={"task_index": i, "type": "sync"},
                destination="pyside6_example_task",
            )
            scheduler.send_message(message, task_callback)
            print(f"发送同步消息: {message.id}")
        # Send asynchronous tasks.
        for i in range(5):
            message = MessageItem(
                content={"task_index": i, "type": "async"},
                destination="async_pyside6_task",
            )
            scheduler.send_message(message, task_callback)
            print(f"发送异步消息: {message.id}")
        # Wait for processing, polling the queue status each second.
        print("等待任务处理...")
        for i in range(10):
            queue_info = scheduler.scheduler.get_queue_info()
            print(f"队列状态: {queue_info}")
            time.sleep(1)
            if queue_info.get("queue_length", 0) == 0:
                break
    finally:
        print("停止 PySide6 调度器...")
        scheduler.stop()
        print("测试完成")


if __name__ == "__main__":
    test_pyside6_scheduler()
+4
-0
@@ -8,2 +8,3 @@ # Python-generated files | ||
| *.egg-info | ||
| .python-version | ||
@@ -28,1 +29,4 @@ # Virtual environments | ||
| *.pyd | ||
| # log | ||
| log/ |
+7
-2
| Metadata-Version: 2.4 | ||
| Name: queue-sqlite | ||
| Version: 0.2.0 | ||
| Version: 0.2.1 | ||
| Summary: A high-performance, SQLite-based distributed task queue system with Rust-backed core operations. Supports task mounting, message listening, priority handling, retry mechanisms, and automatic cleanup of expired messages. Ideal for building reliable, scalable background task processing systems. | ||
@@ -15,8 +15,13 @@ Author-email: chakcy <947105045@qq.com> | ||
| Classifier: Typing :: Typed | ||
| Requires-Python: >=3.7 | ||
| Requires-Python: >=3.11 | ||
| Requires-Dist: queue-sqlite-core>=0.2.0 | ||
| Provides-Extra: pyqt5 | ||
| Requires-Dist: pyqt5; extra == 'pyqt5' | ||
| Requires-Dist: qtpy; extra == 'pyqt5' | ||
| Provides-Extra: pyqt6 | ||
| Requires-Dist: pyqt6; extra == 'pyqt6' | ||
| Requires-Dist: qtpy; extra == 'pyqt6' | ||
| Provides-Extra: pyside6 | ||
| Requires-Dist: pyside6; extra == 'pyside6' | ||
| Requires-Dist: qtpy; extra == 'pyside6' | ||
| Description-Content-Type: text/markdown | ||
@@ -23,0 +28,0 @@ |
+21
-12
@@ -16,3 +16,3 @@ [project] | ||
| dependencies = [ | ||
| "queue_sqlite_core>=0.2.0", | ||
| "queue-sqlite-core>=0.2.0", | ||
| ] | ||
@@ -22,12 +22,13 @@ description = "A high-performance, SQLite-based distributed task queue system with Rust-backed core operations. Supports task mounting, message listening, priority handling, retry mechanisms, and automatic cleanup of expired messages. Ideal for building reliable, scalable background task processing systems." | ||
| readme = "README.md" | ||
| requires-python = ">=3.7" | ||
| version = "0.2.0" | ||
| requires-python = ">=3.11" | ||
| version = "0.2.1" | ||
| [project.optional-dependencies] | ||
| pyside6 = ["PySide6"] | ||
| pyqt6 = ["PyQt6"] | ||
| pyqt5 = ["qtpy", "PyQt5"] | ||
| pyqt6 = ["qtpy", "PyQt6"] | ||
| pyside6 = ["qtpy", "PySide6"] | ||
| [[tool.uv.index]] | ||
| default = true | ||
| url = "https://pypi.tuna.tsinghua.edu.cn/simple" | ||
| # [[tool.uv.index]] | ||
| # default = true | ||
| # url = "https://pypi.tuna.tsinghua.edu.cn/simple" | ||
@@ -40,6 +41,6 @@ [build-system] | ||
| dev = [ | ||
| "maturin>=1.9.4", | ||
| "psutil>=7.0.0", | ||
| "pytest>=7.4.4", | ||
| "twine>=4.0.2", | ||
| "maturin>=1.9.4", | ||
| "psutil>=7.0.0", | ||
| "pytest>=7.4.4", | ||
| "twine>=4.0.2", | ||
| ] | ||
@@ -49,1 +50,9 @@ | ||
| packages = ["src/queue_sqlite"] | ||
| [tool.uv.workspace] | ||
| members = [ | ||
| "src/queue_sqlite_core", | ||
| ] | ||
| [tool.uv.sources] | ||
| queue-sqlite-core = { workspace = true } |
@@ -0,0 +0,0 @@ # This file is automatically @generated by Cargo. |
@@ -0,0 +0,0 @@ [package] |
@@ -22,3 +22,3 @@ [project] | ||
| default = true | ||
| url = "http://124.71.68.6:3141/root/pypi" | ||
| url = "https://pypi.tuna.tsinghua.edu.cn/simple" | ||
@@ -30,4 +30,1 @@ [dependency-groups] | ||
| ] | ||
| [[tool.uv.index]] | ||
| url = "https://pypi.tuna.tsinghua.edu.cn/simple" |
@@ -20,2 +20,4 @@ from typing import Callable | ||
| @property | ||
| def shard_num(self) -> int: ... | ||
| @property | ||
| def db_dir(self) -> str: ... | ||
@@ -22,0 +24,0 @@ def __init__(self, shard_num: int, queue_name: str): ... |
@@ -20,2 +20,3 @@ #!/usr/bin/env python | ||
| import logging | ||
| import os | ||
@@ -25,6 +26,14 @@ logging.basicConfig( | ||
| ) | ||
| logger = logging.getLogger(__name__).addHandler(logging.NullHandler()) | ||
| # 设置 error 级别日志写入 error.log 文件 | ||
| if not os.path.exists("log"): | ||
| os.mkdir("log") | ||
| error_log_handler = logging.FileHandler("log/error.log", encoding="utf-8") | ||
| error_log_handler.setLevel(logging.ERROR) | ||
| error_log_handler.setFormatter( | ||
| logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") | ||
| ) | ||
| logging.getLogger().addHandler(error_log_handler) | ||
| __title__ = "queue_sqlite" | ||
| __version__ = "0.1.0" | ||
| __version__ = "0.2.1" | ||
| __author__ = "chakcy" | ||
@@ -31,0 +40,0 @@ __email__ = "947105045@qq.com" |
@@ -11,3 +11,2 @@ #!/usr/bin/env python | ||
| from enum import Enum | ||
@@ -14,0 +13,0 @@ |
@@ -14,2 +14,3 @@ #!/usr/bin/env python | ||
| from ..constant import MessageStatus | ||
| from ..mounter.task_mounter import TaskMeta | ||
| import json | ||
@@ -20,3 +21,3 @@ import asyncio | ||
| def retry_async(max_retries=3, delay=1): | ||
| def retry_async(max_retries=3): | ||
| """ | ||
@@ -35,4 +36,6 @@ 异步重试装饰器 | ||
| # 使用message_item中的retry_count作为最大重试次数 | ||
| retries = getattr(self.message_item, "retry_count", max_retries) | ||
| task_meta: TaskMeta = self.callback.meta | ||
| retries = task_meta.max_retries | ||
| # retries = getattr(self.message_item, "retry_count", max_retries) | ||
| delay_time = task_meta.delay | ||
| # 至少尝试一次(retries+1),最多尝试max_retries+1次 | ||
@@ -47,3 +50,3 @@ actual_retries = min(retries, max_retries) if max_retries > 0 else retries | ||
| if attempt < actual_retries: | ||
| await asyncio.sleep(delay) | ||
| await asyncio.sleep(delay_time) | ||
| else: | ||
@@ -69,3 +72,3 @@ break | ||
| @retry_async(max_retries=3, delay=1) | ||
| @retry_async(max_retries=3) | ||
| async def run(self): | ||
@@ -84,19 +87,28 @@ try: | ||
def get_task_result(self):
    """Serialize ``self.task_result`` into a JSON string.

    Handling by type:
      * ``None``           -> ``{"result": null}``
      * ``str``            -> returned as-is when it already parses as JSON,
                              otherwise wrapped as ``{"result": <str>}``
      * ``int/float/bool`` -> ``{"result": <value>}``
      * ``MessageItem``    -> its ``result`` field; wrapped unless dict/list
      * anything else      -> ``json.dumps``, falling back to ``str()``

    Returns:
        str: a JSON document describing the task result.
    """
    result = self.task_result
    if result is None:
        return json.dumps({"result": None})
    if isinstance(result, str):
        try:
            json.loads(result)  # already valid JSON -> pass through untouched
            return result
        except (ValueError, TypeError):
            # Not JSON: wrap so the stored value is always a JSON document.
            return json.dumps({"result": result})
    if isinstance(result, (int, float, bool)):
        return json.dumps({"result": result})
    if isinstance(result, MessageItem):
        # Unwrap the result field carried by a MessageItem payload.
        data = result.result
        if isinstance(data, (dict, list)):
            return json.dumps(data)
        return json.dumps({"result": data})
    try:
        return json.dumps(result)
    except (TypeError, ValueError):
        # Last resort: stringify objects json cannot serialize.
        return json.dumps({"result": str(result)})
@@ -103,0 +115,0 @@ |
@@ -15,2 +15,3 @@ #!/usr/bin/env python | ||
| from ..constant import MessageStatus | ||
| from ..mounter.task_mounter import TaskMeta | ||
| import json | ||
@@ -22,3 +23,3 @@ | ||
| def retry_sync(max_retries=3, delay=1): | ||
| def retry_sync(max_retries=3): | ||
| """ | ||
@@ -37,4 +38,6 @@ 同步重试装饰器 | ||
| # 使用message_item中的retry_count作为最大重试次数 | ||
| retries = getattr(self.message_item, "retry_count", max_retries) | ||
| task_meta: TaskMeta = self.callback.meta | ||
| retries = task_meta.max_retries | ||
| # retries = getattr(self.message_item, "retry_count", max_retries) | ||
| delay_time = task_meta.delay | ||
| # 至少尝试一次(retries+1),最多尝试max_retries+1次 | ||
@@ -48,4 +51,5 @@ actual_retries = min(retries, max_retries) if max_retries > 0 else retries | ||
| last_exception = e | ||
| self.message_item.retry_count = attempt | ||
| if attempt < actual_retries: | ||
| time.sleep(delay) | ||
| time.sleep(delay_time) | ||
| else: | ||
@@ -73,3 +77,3 @@ break | ||
| @retry_sync(max_retries=3, delay=1) | ||
| @retry_sync(max_retries=3) | ||
| def run(self): | ||
@@ -88,19 +92,28 @@ try: | ||
def get_task_result(self):
    """Serialize ``self.task_result`` into a JSON string.

    Handling by type:
      * ``None``           -> ``{"result": null}``
      * ``str``            -> returned as-is when it already parses as JSON,
                              otherwise wrapped as ``{"result": <str>}``
      * ``int/float/bool`` -> ``{"result": <value>}``
      * ``MessageItem``    -> its ``result`` field; wrapped unless dict/list
      * anything else      -> ``json.dumps``, falling back to ``str()``

    Returns:
        str: a JSON document describing the task result.
    """
    result = self.task_result
    if result is None:
        return json.dumps({"result": None})
    if isinstance(result, str):
        try:
            json.loads(result)  # already valid JSON -> pass through untouched
            return result
        except (ValueError, TypeError):
            # Not JSON: wrap so the stored value is always a JSON document.
            return json.dumps({"result": result})
    if isinstance(result, (int, float, bool)):
        return json.dumps({"result": result})
    if isinstance(result, MessageItem):
        # Unwrap the result field carried by a MessageItem payload.
        data = result.result
        if isinstance(data, (dict, list)):
            return json.dumps(data)
        return json.dumps({"result": data})
    try:
        return json.dumps(result)
    except (TypeError, ValueError):
        # Last resort: stringify objects json cannot serialize.
        return json.dumps({"result": str(result)})
@@ -107,0 +120,0 @@ |
@@ -13,4 +13,5 @@ #!/usr/bin/env python | ||
| from .message_item import MessageItem | ||
| from .scheduler_config import SchedulerConfig | ||
| __all__ = ["MessageItem"] | ||
| __all__ = ["MessageItem", "SchedulerConfig"] |
@@ -11,3 +11,2 @@ #!/usr/bin/env python | ||
| from dataclasses import dataclass, field | ||
@@ -26,3 +25,3 @@ from typing import Optional, Dict, Any | ||
| # 必需字段 | ||
| content: dict = field( | ||
| content: dict = field( # type ignore | ||
| metadata={"description": "消息内容,包含具体的任务数据或信息"} | ||
@@ -87,3 +86,3 @@ ) | ||
| @classmethod | ||
| def from_dict(cls, data: Dict[str, Any]) -> "MessageItem": | ||
| def from_dict(cls, data: dict) -> "MessageItem": | ||
| """从字典创建消息对象""" | ||
@@ -90,0 +89,0 @@ # 处理日期时间字段 |
@@ -13,4 +13,13 @@ #!/usr/bin/env python | ||
| from typing import Callable, List | ||
| from dataclasses import dataclass, field | ||
@dataclass
class TaskMeta:
    """Metadata attached to a mounted task function.

    Carries the task's display name and description plus the retry policy
    (``max_retries`` attempts with ``delay`` seconds between them) that the
    sync/async retry decorators read back at execution time.
    """

    # Task name; the mounter falls back to the function name when unset.
    name: str = field(default="", metadata={"help": "任务名称"})
    # Human-readable task description.
    description: str = field(default="", metadata={"help": "任务描述"})
    # Maximum number of retries for a failing task.
    max_retries: int = field(default=3, metadata={"help": "任务最大重试次数"})
    # Delay in seconds between retry attempts (passed to sleep).
    delay: float = field(default=1, metadata={"help": "任务延迟执行时间"})
| class TaskMounter: | ||
@@ -27,3 +36,10 @@ @classmethod | ||
| # 使用自定义名称或函数原名 | ||
| function.meta = meta # type: ignore | ||
| name = meta.get("name", function.__name__) | ||
| description = meta.get("description", "") | ||
| max_retries = meta.get("max_retries", 3) | ||
| delay = meta.get("delay", 1) | ||
| task_meta = TaskMeta( | ||
| name=name, description=description, max_retries=max_retries, delay=delay | ||
| ) | ||
| function.meta = task_meta # type: ignore | ||
| setattr(TaskMounter, function.__name__, function) | ||
@@ -39,2 +55,8 @@ return function | ||
| @classmethod | ||
| def get_task_meta(cls, task_name: str): | ||
| task_function = cls.get_task_function(task_name) | ||
| if task_function: | ||
| return getattr(task_function, "meta", {}) | ||
| @classmethod | ||
| def get_task_list(cls) -> List[str]: | ||
@@ -41,0 +63,0 @@ """获取所有挂载的任务函数名称列表""" |
@@ -8,3 +8,3 @@ #!/usr/bin/env python | ||
| @Email : 947105045@qq.com | ||
| @description : 监听操作模块 | ||
| @description : 监听操作模块 - 改进版,优化数据库连接 | ||
| """ | ||
@@ -16,2 +16,3 @@ | ||
| from typing import List, Tuple, Union | ||
| import logging | ||
@@ -26,8 +27,16 @@ | ||
| def _get_connection(self): | ||
| """获取数据库连接""" | ||
| return sqlite3.connect( | ||
| self.db_dir, | ||
| check_same_thread=False, | ||
| """获取数据库连接 - 改进版""" | ||
| conn = sqlite3.connect( | ||
| self.db_dir, check_same_thread=False, timeout=30.0 # 增加超时时间 | ||
| ) | ||
| # 设置SQLite优化参数 | ||
| conn.execute("PRAGMA journal_mode=WAL;") # 使用WAL模式提高并发 | ||
| conn.execute("PRAGMA synchronous=NORMAL;") | ||
| conn.execute("PRAGMA cache_size=-20000;") | ||
| conn.execute("PRAGMA busy_timeout=5000;") # 设置忙时超时 | ||
| conn.execute("PRAGMA mmap_size=268435456;") # 256MB内存映射 | ||
| return conn | ||
| def create_table(self): | ||
@@ -38,105 +47,146 @@ if len(self.listen_fields) == 0: | ||
| # 分别执行每个SQL语句而不是使用executescript | ||
| conn.execute( | ||
| try: | ||
| # 分别执行每个SQL语句而不是使用executescript | ||
| conn.execute( | ||
| """ | ||
| CREATE TABLE IF NOT EXISTS listen_table ( | ||
| id INTEGER PRIMARY KEY AUTOINCREMENT, | ||
| key Text, | ||
| value JSON | ||
| ) | ||
| """ | ||
| CREATE TABLE IF NOT EXISTS listen_table ( | ||
| id INTEGER PRIMARY KEY AUTOINCREMENT, | ||
| key Text, | ||
| value JSON | ||
| ) | ||
| """ | ||
| ) | ||
| conn.execute( | ||
| """ | ||
| CREATE TABLE IF NOT EXISTS change_log ( | ||
| id INTEGER PRIMARY KEY AUTOINCREMENT, | ||
| table_name TEXT, | ||
| row_id INTEGER, | ||
| column_name TEXT, | ||
| old_value TEXT, | ||
| new_value TEXT, | ||
| is_delete integer DEFAULT 0, | ||
| timestamp DATETIME DEFAULT CURRENT_TIMESTAMP | ||
| ) | ||
| """ | ||
| ) | ||
| # 检查触发器是否已存在,如果不存在则创建 | ||
| try: | ||
| conn.execute( | ||
| """ | ||
| CREATE TRIGGER IF NOT EXISTS track_value_change | ||
| AFTER UPDATE OF value ON listen_table | ||
| FOR EACH ROW | ||
| WHEN OLD.value <> NEW.value | ||
| BEGIN | ||
| INSERT INTO change_log (table_name, row_id, column_name, old_value, new_value) | ||
| VALUES ('listen_table', NEW.id, 'key', OLD.key, NEW.key); | ||
| END | ||
| CREATE TABLE IF NOT EXISTS change_log ( | ||
| id INTEGER PRIMARY KEY AUTOINCREMENT, | ||
| table_name TEXT, | ||
| row_id INTEGER, | ||
| column_name TEXT, | ||
| old_value TEXT, | ||
| new_value TEXT, | ||
| is_delete integer DEFAULT 0, | ||
| timestamp DATETIME DEFAULT CURRENT_TIMESTAMP | ||
| ) | ||
| """ | ||
| ) | ||
| except sqlite3.Error: | ||
| # 如果触发器创建失败,我们继续执行其他操作 | ||
| pass | ||
| conn.execute("PRAGMA journal_mode=WAL;") | ||
| conn.execute("PRAGMA synchronous=NORMAL;") | ||
| conn.execute("PRAGMA cache_size=-20000;") | ||
| conn.execute("PRAGMA mmap_size=1073741824;") | ||
| conn.execute("PRAGMA temp_store=MEMORY;") | ||
| conn.execute("PRAGMA busy_timeout=5000;") | ||
| conn.commit() | ||
| # 检查触发器是否已存在,如果不存在则创建 | ||
| try: | ||
| conn.execute( | ||
| """ | ||
| CREATE TRIGGER IF NOT EXISTS track_value_change | ||
| AFTER UPDATE OF value ON listen_table | ||
| FOR EACH ROW | ||
| WHEN OLD.value <> NEW.value | ||
| BEGIN | ||
| INSERT INTO change_log (table_name, row_id, column_name, old_value, new_value) | ||
| VALUES ('listen_table', NEW.id, 'key', OLD.key, NEW.key); | ||
| END | ||
| """ | ||
| ) | ||
| except sqlite3.Error: | ||
| # 如果触发器创建失败,我们继续执行其他操作 | ||
| pass | ||
| for listen_field in self.listen_fields: | ||
| sql = """ | ||
| INSERT INTO | ||
| listen_table (key, value) | ||
| VALUES | ||
| (?, ?) | ||
| """ | ||
| conn.execute(sql, (listen_field, "null")) | ||
| conn.commit() | ||
| for listen_field in self.listen_fields: | ||
| # 检查是否已存在该key | ||
| cursor = conn.execute( | ||
| "SELECT id FROM listen_table WHERE key = ?", (listen_field,) | ||
| ) | ||
| if cursor.fetchone() is None: | ||
| sql = """ | ||
| INSERT INTO | ||
| listen_table (key, value) | ||
| VALUES | ||
| (?, ?) | ||
| """ | ||
| conn.execute(sql, (listen_field, "null")) | ||
| conn.commit() | ||
| except Exception as e: | ||
| logging.error(f"创建监听表失败: {str(e)}") | ||
| finally: | ||
| conn.close() | ||
def listen_data(self) -> Tuple[bool, Union[List[Tuple], str]]:
    """Fetch up to 100 undeleted change_log rows, newest first.

    Returns:
        ``(True, rows)`` when data is present, otherwise ``(False, message)``
        for an empty table or on a database error.
    """
    conn = self._get_connection()
    try:
        sql = """
            SELECT * FROM change_log where is_delete = 0 ORDER BY id DESC LIMIT 100
        """
        rows = conn.execute(sql).fetchall()
        if not rows:
            return False, "No data found"
        return True, rows
    except Exception as e:
        logging.error(f"获取监听数据失败: {str(e)}")
        return False, str(e)
    finally:
        conn.close()
def delete_change_log(self, delete_id):
    """Delete a single ``change_log`` row by primary key.

    Args:
        delete_id: ``change_log.id`` of the row to remove.

    Raises:
        Exception: database errors are logged and re-raised for the caller.
    """
    conn = self._get_connection()
    try:
        # Parameterized statement: no string interpolation into SQL.
        conn.execute("DELETE FROM change_log WHERE id = ?", (delete_id,))
        conn.commit()
    except Exception as e:
        logging.error(f"删除变更日志失败 {delete_id}: {str(e)}")
        raise  # bare raise preserves the original traceback (was `raise e`)
    finally:
        conn.close()
def update_listen_data(self, key, value):
    """Set ``listen_table.value`` for the row whose key is *key*.

    Args:
        key: listen field name to update.
        value: new value (stored as-is in the JSON column).

    Raises:
        Exception: database errors are logged and re-raised for the caller.
    """
    conn = self._get_connection()
    try:
        # Parameterized statement: no string interpolation into SQL.
        conn.execute(
            "UPDATE listen_table SET value = ? WHERE key = ?", (value, key)
        )
        conn.commit()
    except Exception as e:
        logging.error(f"更新监听数据失败 {key}: {str(e)}")
        raise  # bare raise preserves the original traceback (was `raise e`)
    finally:
        conn.close()
def get_value(self, key):
    """Return the stored value for *key*, or ``None`` when the key is
    missing or a database error occurs (errors are logged)."""
    conn = self._get_connection()
    try:
        row = conn.execute(
            "SELECT value FROM listen_table WHERE key = ?", (key,)
        ).fetchone()
        return None if row is None else row[0]
    except Exception as e:
        logging.error(f"获取值失败 {key}: {str(e)}")
        return None
    finally:
        conn.close()
def get_values(self):
    """Return every ``(key, value)`` pair from listen_table.

    Returns an empty list on database error (the error is logged)."""
    conn = self._get_connection()
    try:
        return conn.execute("SELECT key, value FROM listen_table").fetchall()
    except Exception as e:
        logging.error(f"获取所有值失败: {str(e)}")
        return []
    finally:
        conn.close()
@@ -11,9 +11,8 @@ #!/usr/bin/env python | ||
| from .standard import StandardQueueScheduler | ||
| from ._async import AsyncQueueScheduler | ||
| from .base import BaseScheduler | ||
| from ..model import MessageItem | ||
| from typing import Callable | ||
| import multiprocessing | ||
| from ..model import MessageItem, SchedulerConfig | ||
| from typing import Callable, Literal | ||
| import logging | ||
@@ -23,10 +22,21 @@ | ||
| try: | ||
| from .qt import QtQueueScheduler | ||
| print(SCHEDULER_TYPES) | ||
| if QtQueueScheduler.is_available(): | ||
| SCHEDULER_TYPES["qt"] = QtQueueScheduler | ||
| except ImportError: | ||
| logging.warning( | ||
| "Qt is not available, if you want to use it, please install PyQt5/PyQt6/PySide6" | ||
| ) | ||
| SchedulerType = Literal["standard", "async", "qt"] | ||
| class QueueScheduler(BaseScheduler): | ||
| def __init__( | ||
| self, | ||
| receive_thread_num=1, | ||
| task_thread_num=multiprocessing.cpu_count() * 2, | ||
| shard_num=4, | ||
| scheduler_type="standard", | ||
| scheduler_type: SchedulerType = "standard", | ||
| config: SchedulerConfig = SchedulerConfig(), | ||
| ): | ||
@@ -36,3 +46,3 @@ scheduler_class = SCHEDULER_TYPES.get(scheduler_type, None) | ||
| raise ValueError(f"Invalid scheduler type: {scheduler_type}") | ||
| self.scheduler = scheduler_class(receive_thread_num, task_thread_num, shard_num) | ||
| self.scheduler = scheduler_class(config) | ||
@@ -39,0 +49,0 @@ if self.scheduler: |
@@ -22,2 +22,3 @@ #!/usr/bin/env python | ||
| from ..cleanup_scheduler import CleanupScheduler | ||
| from ...model import SchedulerConfig | ||
@@ -28,7 +29,10 @@ | ||
| def __init__( | ||
| self, receive_thread_num=1, task_thread_num=1, shard_num=4, queue_name="default" | ||
| self, | ||
| config: SchedulerConfig = SchedulerConfig(), | ||
| ): | ||
| self.queue_operation = ShardedQueueOperation(shard_num, queue_name=queue_name) | ||
| self.queue_operation = ShardedQueueOperation( | ||
| config.shard_num, queue_name=config.queue_name | ||
| ) | ||
| self.receive_scheduler = AsyncReceiveScheduler( | ||
| self.queue_operation, receive_thread_num | ||
| self.queue_operation, config.receive_thread_num | ||
| ) | ||
@@ -40,3 +44,5 @@ self.listen_operation = ListenOperation( | ||
| self.listen_scheduler = AsyncListenDataScheduler(self.listen_operation) | ||
| self.task_scheduler = AsyncTaskScheduler(self.queue_operation, task_thread_num) | ||
| self.task_scheduler = AsyncTaskScheduler( | ||
| self.queue_operation, config.task_thread_num | ||
| ) | ||
| self.cleanup_scheduler = CleanupScheduler(self.queue_operation) | ||
@@ -43,0 +49,0 @@ |
@@ -22,2 +22,3 @@ #!/usr/bin/env python | ||
| import logging | ||
| import json | ||
@@ -71,3 +72,3 @@ | ||
| try: | ||
| self.queue_operation.update_result(message.id, message.result) | ||
| self.queue_operation.update_result(message.id, json.dumps(message.result)) | ||
| self.queue_operation.update_status(message.id, message.status) | ||
@@ -74,0 +75,0 @@ except Exception as e: |
| from abc import ABC, abstractmethod | ||
| from typing import Callable | ||
| from ...model import MessageItem | ||
| from ...model import MessageItem, SchedulerConfig | ||
@@ -9,2 +9,5 @@ | ||
| def __init__(self, config: SchedulerConfig = SchedulerConfig()): | ||
| pass | ||
| @abstractmethod | ||
@@ -11,0 +14,0 @@ def send_message(self, message: MessageItem, callback: Callable): |
@@ -22,2 +22,3 @@ #!/usr/bin/env python | ||
| from ..base import BaseScheduler | ||
| from ...model import SchedulerConfig | ||
@@ -27,5 +28,8 @@ | ||
| def __init__( | ||
| self, receive_thread_num=1, task_thread_num=1, shard_num=4, queue_name="default" | ||
| self, | ||
| config: SchedulerConfig = SchedulerConfig(), | ||
| ): | ||
| self.queue_operation = ShardedQueueOperation(shard_num, queue_name) | ||
| self.queue_operation = ShardedQueueOperation( | ||
| config.shard_num, config.queue_name | ||
| ) | ||
| self.listen_operation = ListenOperation( | ||
@@ -38,5 +42,7 @@ os.path.join(self.queue_operation.db_dir, "listen.db") | ||
| self.receive_scheduler = ReceiveScheduler( | ||
| self.queue_operation, receive_thread_num | ||
| self.queue_operation, config.receive_thread_num | ||
| ) | ||
| self.task_scheduler = TaskScheduler(self.queue_operation, task_thread_num) | ||
| self.task_scheduler = TaskScheduler( | ||
| self.queue_operation, config.task_thread_num | ||
| ) | ||
| self.cleanup_scheduler = CleanupScheduler(self.queue_operation) | ||
@@ -43,0 +49,0 @@ |
@@ -30,3 +30,3 @@ from queue_sqlite.model import MessageItem | ||
| # print(f"callback: {cls.callback_counter}") | ||
| print(f"callback: {message_item}") | ||
| # print(f"callback: {message_item}") | ||
| # print(f"callback: {message_item.id}") | ||
@@ -49,5 +49,2 @@ # print(f"callback: {message_item.expire_time}") | ||
| queue_scheduler = QueueScheduler( | ||
| receive_thread_num=4, | ||
| task_thread_num=8, | ||
| shard_num=12, | ||
| scheduler_type="async", | ||
@@ -54,0 +51,0 @@ ) |
@@ -8,3 +8,3 @@ from listen import * | ||
| def test_listen_data(self): | ||
| scheduler = QueueScheduler() | ||
| scheduler = QueueScheduler(scheduler_type="qt") | ||
| scheduler.start() | ||
@@ -11,0 +11,0 @@ scheduler.update_listen_data("key_1", "value_1") |
@@ -30,3 +30,3 @@ from queue_sqlite.model import MessageItem | ||
| # print(f"callback: {cls.callback_counter}") | ||
| print(message_item.id) | ||
| # print(message_item.id) | ||
| # print(f"callback: {message_item}") | ||
@@ -60,5 +60,3 @@ # print(f"callback: {message_item.id}") | ||
| monitor_thread.start() | ||
| queue_scheduler = QueueScheduler( | ||
| receive_thread_num=4, task_thread_num=8, shard_num=12 | ||
| ) | ||
| queue_scheduler = QueueScheduler() | ||
@@ -65,0 +63,0 @@ # threads = [] |
Sorry, the diff of this file is not supported yet
| #!/usr/bin/env python | ||
| # -*- encoding: utf-8 -*- | ||
| """ | ||
| @File : to_json_utils.py | ||
| @Time : 2025-09-27 16:36:18 | ||
| @Author : chakcy | ||
| @Email : 947105045@qq.com | ||
| @description   : GUI tool for browsing a SQLite database and exporting tables to JSON files | ||
| """ | ||
| import sqlite3 | ||
| import json | ||
| import tkinter as tk | ||
| from tkinter import ttk, filedialog, messagebox | ||
| from datetime import datetime | ||
class SQLiteToJSONConverter:
    """Tk GUI for browsing a SQLite database and exporting one table to JSON."""

    def __init__(self, root):
        """Build the main window and its widgets.

        Args:
            root: the Tk root window the UI attaches to.
        """
        self.root = root
        self.root.title("SQLite 表转 JSON 工具")
        self.root.geometry("800x600")
        # GUI state: chosen database file, discovered tables, selected table.
        self.db_path = tk.StringVar()
        self.tables = []
        self.selected_table = tk.StringVar()
        self.create_widgets()

    def create_widgets(self):
        """Lay out the database chooser, table picker, data preview and buttons."""
        main_frame = ttk.Frame(self.root, padding="10")
        main_frame.grid(row=0, column=0, sticky=(tk.W, tk.E, tk.N, tk.S))  # type: ignore
        # --- Database selection row ---
        db_frame = ttk.LabelFrame(main_frame, text="数据库选择", padding="5")
        db_frame.grid(row=0, column=0, columnspan=2, sticky=(tk.W, tk.E), pady=5)  # type: ignore
        ttk.Entry(db_frame, textvariable=self.db_path, width=70).grid(
            row=0, column=0, padx=5
        )
        ttk.Button(db_frame, text="浏览", command=self.browse_db).grid(
            row=0, column=1, padx=5
        )
        ttk.Button(db_frame, text="连接", command=self.connect_db).grid(
            row=0, column=2, padx=5
        )
        # --- Table selection row ---
        table_frame = ttk.LabelFrame(main_frame, text="表选择", padding="5")
        table_frame.grid(row=1, column=0, columnspan=2, sticky=(tk.W, tk.E), pady=5)  # type: ignore
        ttk.Label(table_frame, text="选择表:").grid(row=0, column=0, sticky=tk.W)
        self.table_combo = ttk.Combobox(
            table_frame, textvariable=self.selected_table, state="readonly"
        )
        self.table_combo.grid(row=0, column=1, sticky=(tk.W, tk.E), padx=5)  # type: ignore
        ttk.Button(table_frame, text="加载数据", command=self.load_table_data).grid(
            row=0, column=2, padx=5
        )
        # --- Data preview: Treeview with both scrollbars ---
        data_frame = ttk.LabelFrame(main_frame, text="表数据", padding="5")
        data_frame.grid(
            row=2, column=0, columnspan=2, sticky=(tk.W, tk.E, tk.N, tk.S), pady=5  # type: ignore
        )
        columns = ("#0",)
        self.tree = ttk.Treeview(data_frame, columns=columns, show="headings")
        vsb = ttk.Scrollbar(data_frame, orient="vertical", command=self.tree.yview)
        hsb = ttk.Scrollbar(data_frame, orient="horizontal", command=self.tree.xview)
        self.tree.configure(yscrollcommand=vsb.set, xscrollcommand=hsb.set)
        self.tree.grid(row=0, column=0, sticky=(tk.W, tk.E, tk.N, tk.S))  # type: ignore
        vsb.grid(row=0, column=1, sticky=(tk.N, tk.S))  # type: ignore
        hsb.grid(row=1, column=0, sticky=(tk.W, tk.E))  # type: ignore
        # --- Action buttons ---
        button_frame = ttk.Frame(main_frame)
        button_frame.grid(row=3, column=0, columnspan=2, pady=10)
        ttk.Button(button_frame, text="导出为JSON", command=self.export_to_json).pack(
            side=tk.LEFT, padx=5
        )
        ttk.Button(button_frame, text="清空", command=self.clear_all).pack(
            side=tk.LEFT, padx=5
        )
        ttk.Button(button_frame, text="退出", command=self.root.quit).pack(
            side=tk.LEFT, padx=5
        )
        # Let the data preview area absorb resize space.
        self.root.columnconfigure(0, weight=1)
        self.root.rowconfigure(0, weight=1)
        main_frame.columnconfigure(0, weight=1)
        main_frame.rowconfigure(2, weight=1)
        data_frame.columnconfigure(0, weight=1)
        data_frame.rowconfigure(0, weight=1)

    def _quoted_table(self):
        """Return the selected table name as a safely quoted SQL identifier.

        Table names cannot be bound as SQL parameters, so the name is first
        validated against the list read from sqlite_master and then
        double-quoted with embedded quotes doubled — this closes the SQL
        injection hole the previous f-string interpolation had.

        Raises:
            ValueError: if the selection is not a known table.
        """
        name = self.selected_table.get()
        if name not in self.tables:
            raise ValueError(f"未知的表: {name}")
        return '"' + name.replace('"', '""') + '"'

    def browse_db(self):
        """Open a file dialog and store the chosen database path."""
        file_path = filedialog.askopenfilename(
            title="选择SQLite数据库文件",
            filetypes=[
                ("SQLite数据库", "*.db *.sqlite *.sqlite3"),
                ("所有文件", "*.*"),
            ],
        )
        if file_path:
            self.db_path.set(file_path)

    def connect_db(self):
        """Connect to the selected database and populate the table list."""
        if not self.db_path.get():
            messagebox.showerror("错误", "请先选择数据库文件")
            return
        try:
            conn = sqlite3.connect(self.db_path.get())
            try:
                cursor = conn.cursor()
                cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
                self.tables = [table[0] for table in cursor.fetchall()]
            finally:
                # Close even when the query fails (the original leaked here).
                conn.close()
            self.table_combo["values"] = self.tables
            if self.tables:
                self.selected_table.set(self.tables[0])
            messagebox.showinfo(
                "成功", f"成功连接到数据库,找到 {len(self.tables)} 个表"
            )
        except sqlite3.Error as e:
            messagebox.showerror("数据库错误", f"连接数据库时出错: {str(e)}")

    def load_table_data(self):
        """Load the selected table's rows into the Treeview preview."""
        if not self.selected_table.get():
            messagebox.showerror("错误", "请先选择表")
            return
        try:
            conn = sqlite3.connect(self.db_path.get())
            conn.row_factory = sqlite3.Row  # access columns by name
            try:
                cursor = conn.cursor()
                cursor.execute(f"SELECT * FROM {self._quoted_table()}")
                rows = cursor.fetchall()
                columns = [description[0] for description in cursor.description]
            finally:
                conn.close()
            # Reset the Treeview before showing the new table.
            for item in self.tree.get_children():
                self.tree.delete(item)
            self.tree["columns"] = columns
            for col in columns:
                self.tree.heading(col, text=col)
                self.tree.column(col, width=100, minwidth=50)
            for row in rows:
                self.tree.insert("", "end", values=tuple(row))
        except (sqlite3.Error, ValueError) as e:
            messagebox.showerror("数据库错误", f"加载数据时出错: {str(e)}")

    def export_to_json(self):
        """Dump the selected table to a user-chosen JSON file."""
        if not self.selected_table.get():
            messagebox.showerror("错误", "请先选择表")
            return
        file_path = filedialog.asksaveasfilename(
            title="保存JSON文件",
            defaultextension=".json",
            filetypes=[("JSON文件", "*.json"), ("所有文件", "*.*")],
        )
        if not file_path:
            return
        try:
            conn = sqlite3.connect(self.db_path.get())
            conn.row_factory = sqlite3.Row  # access columns by name
            try:
                cursor = conn.cursor()
                cursor.execute(f"SELECT * FROM {self._quoted_table()}")
                result = [dict(row) for row in cursor.fetchall()]
            finally:
                conn.close()
            with open(file_path, "w", encoding="utf-8") as f:
                json.dump(result, f, indent=4, ensure_ascii=False)
            messagebox.showinfo("成功", f"数据已成功导出到 {file_path}")
        except (sqlite3.Error, ValueError) as e:
            messagebox.showerror("数据库错误", f"导出数据时出错: {str(e)}")
        except Exception as e:
            messagebox.showerror("错误", f"保存文件时出错: {str(e)}")

    def clear_all(self):
        """Reset the UI: path, table list, selection and data preview."""
        self.db_path.set("")
        self.tables = []
        self.table_combo["values"] = []
        self.selected_table.set("")
        for item in self.tree.get_children():
            self.tree.delete(item)
        self.tree["columns"] = ("#0",)
if __name__ == "__main__":
    # Script entry point: create the Tk root, attach the converter GUI,
    # and hand control to the Tk event loop until the user quits.
    root = tk.Tk()
    app = SQLiteToJSONConverter(root)
    root.mainloop()
Sorry, the diff of this file is not supported yet
Alert delta unavailable
Currently unable to show alert delta for PyPI packages.
190089
20.88%64
6.67%2880
36.23%