New Research: Supply Chain Attack on Axios Pulls Malicious Dependency from npm.Details →
Socket
Book a DemoSign in
Socket

queue-sqlite

Package Overview
Dependencies
Maintainers
1
Versions
5
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

queue-sqlite - pypi Package Compare versions

Comparing version
0.2.0
to
0.2.1
+68
src/queue_sqlite/model/scheduler_config.py
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
"""
@File : scheduler_config.py
@Time : 2025-10-28 10:21:21
@Author : chakcy
@Email : 947105045@qq.com
@description : 队列调度器配置
"""
from dataclasses import dataclass, field
from typing import Dict, Any
import logging
@dataclass
class SchedulerConfig:
receive_thread_num: int = field(
default=1,
metadata={"help": "接收消息的线程数"},
)
task_thread_num: int = field(
default=4,
metadata={"help": "任务执行线程数"},
)
shard_num: int = field(
default=4,
metadata={"help": "分片数"},
)
queue_name: str = field(
default="default",
metadata={"help": "队列名称"},
)
meta: Dict[str, Any] = field(
default_factory=dict,
metadata={"help": "队列元数据"},
)
@classmethod
def get_field_descriptions(cls) -> Dict[str, str]:
"""获取所有字段的描述信息
Returns:
Dict[str, str]: 字段名到描述信息的映射
"""
from dataclasses import fields
descriptions = {}
for field_info in fields(cls):
if "help" in field_info.metadata:
descriptions[field_info.name] = field_info.metadata["help"]
else:
descriptions[field_info.name] = "无描述信息"
return descriptions
@classmethod
def print_field_info(cls):
"""打印所有字段的信息,包括类型、默认值和描述"""
from dataclasses import fields
logging.info(f"=== {cls.__name__} 字段信息 ===")
for field_info in fields(cls):
logging.info(
f"{field_info.name}: {field_info.type} (默认值: {field_info.default})"
)
logging.info(f" 描述: {field_info.metadata.get('help', '无描述信息')}")
logging.info("-" * 50)
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
"""
@File : __init__.py
@Time : 2025-10-26 10:26:16
@Author : chakcy
@Email : 947105045@qq.com
@description : Qt调度器 - 独立实现
"""
import logging
import os
from ..base import BaseScheduler
from ...model import MessageItem, SchedulerConfig
from queue_sqlite_core import ShardedQueueOperation
from ...queue_operation.listen_operation import ListenOperation
from .receive_scheduler import QtReceiveScheduler
from .listen_scheduler import QtListenScheduler
from .task_scheduler import QtTaskScheduler
from ..cleanup_scheduler import CleanupScheduler as QtCleanupScheduler
from qtpy.QtCore import QThreadPool
from qtpy.QtWidgets import QApplication
from typing import Callable, Dict, Any, List
class QtQueueScheduler(BaseScheduler):
"""完全独立的 Qt 队列调度器 - 修复版"""
qt_type = None
def __init__(
self,
config: SchedulerConfig = SchedulerConfig(),
):
# 初始化队列操作
self.queue_operation = ShardedQueueOperation(
config.shard_num, config.queue_name
)
# 初始化监听操作
db_dir = f"cache/{config.queue_name}"
os.makedirs(db_dir, exist_ok=True)
self.listen_operation = ListenOperation(f"{db_dir}/listen.db")
# 确保监听表被创建
self._ensure_listen_operation_tables()
# 初始化各个独立的调度器组件
self.receive_scheduler = QtReceiveScheduler(
self.queue_operation, config.receive_thread_num
)
self.task_scheduler = QtTaskScheduler(
self.queue_operation, config.task_thread_num
)
self.listen_scheduler = QtListenScheduler(self.listen_operation)
self.cleanup_scheduler = QtCleanupScheduler(self.queue_operation)
@classmethod
def is_available(cls) -> bool:
# 检查 PySide6/PyQt6/PyQt5/PySide2 模块中的一个
import importlib
qt_modules = ["PySide6", "PyQt6", "PyQt5", "PySide2"]
for module in qt_modules:
try:
importlib.import_module(module)
cls.qt_type = module
logging.info(f"已找到 Qt 模块: {module}")
print(f"已找到 Qt 模块: {module}")
return True
except ImportError: # 模块未找到
continue
return False
def _ensure_qapplication(self):
"""确保 QApplication 实例存在"""
try:
if not QApplication.instance():
# 对于非GUI应用,创建无窗口的QApplication
import sys
self.app = QApplication(sys.argv if hasattr(sys, "argv") else [])
logging.info("已创建 QApplication 实例")
except Exception as e:
logging.warning(f"创建 QApplication 失败: {str(e)}")
def _ensure_listen_operation_tables(self):
"""确保监听操作的表被正确创建"""
try:
# 强制重新创建表
self.listen_operation.create_table()
# 额外检查 change_log 表
conn = self.listen_operation._get_connection()
cursor = conn.execute(
"SELECT name FROM sqlite_master WHERE type='table' AND name='change_log'"
)
if cursor.fetchone() is None:
logging.warning("change_log 表不存在,尝试手动创建")
conn.execute(
"""
CREATE TABLE change_log (
id INTEGER PRIMARY KEY AUTOINCREMENT,
table_name TEXT,
row_id INTEGER,
column_name TEXT,
old_value TEXT,
new_value TEXT,
is_delete integer DEFAULT 0,
timestamp DATETIME DEFAULT CURRENT_TIMESTAMP
)
"""
)
conn.commit()
except Exception as e:
logging.error(f"确保监听表存在失败: {str(e)}")
def send_message(self, message: MessageItem, callback: Callable):
"""发送消息到队列"""
self.receive_scheduler.send_message(message, callback)
def update_listen_data(self, key: str, value: str):
"""更新监听数据"""
self.listen_operation.update_listen_data(key, value)
def get_listen_datas(self) -> List:
"""获取所有监听数据"""
return self.listen_operation.get_values()
def get_listen_data(self, key: str):
"""获取单个监听数据"""
return self.listen_operation.get_value(key)
def start(self):
"""启动所有调度器组件"""
# 确保有 QApplication 实例(对于 GUI 应用)
self._ensure_qapplication()
self.receive_scheduler.start()
self.task_scheduler.start()
self.cleanup_scheduler.start_cleanup()
self.listen_scheduler.start()
logging.info("Qt 队列调度器已完全启动")
def stop(self):
"""停止所有调度器组件"""
self.listen_scheduler.stop()
self.cleanup_scheduler.stop_cleanup()
self.task_scheduler.stop()
self.receive_scheduler.stop()
logging.info("Qt 队列调度器已完全停止")
def get_queue_info(self) -> Dict[str, Any]:
"""获取队列信息"""
try:
return {
"queue_length": self.queue_operation.get_queue_length(),
"shard_num": self.queue_operation.shard_num,
"db_dir": self.queue_operation.db_dir,
"active_threads": QThreadPool.globalInstance().activeThreadCount(), # type: ignore
"max_threads": QThreadPool.globalInstance().maxThreadCount(), # type: ignore
}
except Exception as e:
logging.error(f"获取队列信息失败: {str(e)}")
return {}
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
"""
@File : callback_task.py
@Time : 2025-10-28 11:40:02
@Author : chakcy
@Email : 947105045@qq.com
@description : 回调任务
"""
import asyncio
import logging
from queue_sqlite_core import ShardedQueueOperation
from ...model import MessageItem
from typing import Callable
from qtpy.QtCore import QRunnable
class QtCallbackTask(QRunnable):
"""Qt 回调任务"""
def __init__(
self,
callback: Callable,
message: MessageItem,
queue_operation: ShardedQueueOperation,
):
super().__init__()
self.callback = callback
self.message = message
self.queue_operation = queue_operation
self.setAutoDelete(True)
def run(self):
"""执行回调函数"""
try:
# 检查是否是协程函数
if asyncio.iscoroutinefunction(self.callback):
# 对于异步回调,在当前线程中运行事件循环
asyncio.run(self._run_async_callback())
else:
self.callback(self.message)
except Exception as e:
logging.error(f"回调执行错误: {str(e)}")
finally:
# 删除消息
try:
self.queue_operation.delete_message(self.message.id)
except Exception as e:
logging.error(f"删除消息失败 {self.message.id}: {str(e)}")
async def _run_async_callback(self):
"""运行异步回调"""
try:
await self.callback(self.message)
except Exception as e:
logging.error(f"异步回调执行错误: {str(e)}")
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
"""
@File : listen_scheduler.py
@Time : 2025-10-28 14:22:38
@Author : chakcy
@Email : 947105045@qq.com
@description : 监听调度器 - 改进版,解决数据库锁定问题
"""
import logging
import time
import sqlite3
from .listen_task import QtListenTask
from ...queue_operation.listen_operation import ListenOperation
from qtpy.QtCore import QThreadPool, QMutex, QMutexLocker
import threading
class QtListenScheduler:
"""Qt 监听调度器 - 改进版,解决数据库锁定问题"""
def __init__(self, listen_operation: ListenOperation):
self.listen_operation = listen_operation
self.is_running = False
self.listen_thread = None
self.thread_pool = QThreadPool.globalInstance()
self.db_mutex = QMutex() # 数据库操作互斥锁
# 监听配置
self.poll_interval = 0.1 # 轮询间隔(秒)
self.max_retries = 5 # 最大重试次数
self.retry_delay = 0.2 # 重试延迟(秒)
# 确保监听表存在
self._ensure_listen_tables()
def _ensure_listen_tables(self):
"""确保监听相关的表存在"""
try:
# 使用互斥锁保护数据库操作
with QMutexLocker(self.db_mutex):
# 检查表是否存在,如果不存在则创建
conn = self.listen_operation._get_connection()
# 检查 change_log 表是否存在
cursor = conn.execute(
"SELECT name FROM sqlite_master WHERE type='table' AND name='change_log'"
)
if cursor.fetchone() is None:
logging.info("创建缺失的 change_log 表")
conn.execute(
"""
CREATE TABLE change_log (
id INTEGER PRIMARY KEY AUTOINCREMENT,
table_name TEXT,
row_id INTEGER,
column_name TEXT,
old_value TEXT,
new_value TEXT,
is_delete integer DEFAULT 0,
timestamp DATETIME DEFAULT CURRENT_TIMESTAMP
)
"""
)
conn.commit()
except Exception as e:
logging.error(f"确保监听表存在失败: {str(e)}")
def _safe_listen_data(self):
"""安全地获取监听数据,带重试机制"""
for attempt in range(self.max_retries):
try:
# 使用互斥锁保护数据库操作
with QMutexLocker(self.db_mutex):
status, change_data_items = self.listen_operation.listen_data()
return status, change_data_items
except sqlite3.OperationalError as e:
if "database is locked" in str(e) and attempt < self.max_retries - 1:
logging.warning(
f"数据库锁定,第 {attempt + 1} 次重试获取监听数据..."
)
time.sleep(self.retry_delay)
else:
logging.error(f"获取监听数据失败: {str(e)}")
return False, []
except Exception as e:
logging.error(f"获取监听数据时发生未知错误: {str(e)}")
return False, []
return False, []
def _process_listen_data(self, change_data_items):
"""处理监听数据"""
processed_count = 0
for data in change_data_items:
try:
# 修正字段索引
if len(data) >= 8:
delete_id = data[0] # id
table_name = data[1] # table_name
row_id = data[2] # row_id
column_name = data[3] # column_name
old_value = data[4] # old_value
new_value = data[5] # new_value
# 使用线程池执行监听任务
task = QtListenTask(
column_name,
new_value,
int(delete_id),
self.listen_operation,
self.db_mutex,
self.max_retries,
self.retry_delay,
)
self.thread_pool.start(task) # type: ignore
processed_count += 1
except Exception as e:
logging.error(f"处理监听数据失败: {str(e)}")
return processed_count
def _listen_loop(self):
"""监听数据变化循环 - 改进版"""
consecutive_errors = 0
max_consecutive_errors = 10
while self.is_running:
try:
status, change_data_items = self._safe_listen_data()
if status and change_data_items:
processed_count = self._process_listen_data(change_data_items)
consecutive_errors = 0 # 重置连续错误计数
if processed_count > 0:
logging.debug(f"成功处理 {processed_count} 个监听数据项")
elif not status:
# 没有数据或获取失败,短暂休眠
time.sleep(self.poll_interval)
consecutive_errors += 1
# 如果连续错误太多,延长休眠时间
if consecutive_errors >= max_consecutive_errors:
# logging.warning("连续多次获取监听数据失败,延长休眠时间")
time.sleep(self.poll_interval * 5)
consecutive_errors = 0 # 重置计数
else:
# 有状态但没有数据,正常休眠
time.sleep(self.poll_interval)
consecutive_errors = 0
except Exception as e:
logging.error(f"监听循环错误: {str(e)}")
consecutive_errors += 1
time.sleep(self.poll_interval * 2) # 出错时延长休眠
def start(self):
"""启动监听调度器"""
if self.is_running:
return
self.is_running = True
self.listen_thread = threading.Thread(target=self._listen_loop, daemon=True)
self.listen_thread.start()
logging.info("Qt 监听调度器已启动")
def stop(self):
"""停止监听调度器"""
if not self.is_running:
return
self.is_running = False
if self.listen_thread and self.listen_thread.is_alive():
self.listen_thread.join(timeout=3.0)
logging.info("Qt 监听调度器已停止")
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
"""
@File : listen_task.py
@Time : 2025-10-28 14:30:41
@Author : chakcy
@Email : 947105045@qq.com
@description : 监听任务 - 改进版,解决数据库锁定问题
"""
from queue_sqlite.queue_operation.listen_operation import ListenOperation
from ...mounter.listen_mounter import ListenMounter
import logging
from qtpy.QtCore import QRunnable, QMutexLocker
import asyncio
from typing import Callable
import time
import sqlite3
class QtListenTask(QRunnable):
"""Qt 监听任务 - 改进版,解决数据库锁定问题"""
def __init__(
self,
key: str,
value: str,
delete_id: int,
listen_operation: ListenOperation,
db_mutex,
max_retries=5,
retry_delay=0.2,
):
super().__init__()
self.key = key
self.value = value
self.delete_id = delete_id
self.listen_operation = listen_operation
self.db_mutex = db_mutex
self.max_retries = max_retries
self.retry_delay = retry_delay
self.setAutoDelete(True)
def run(self):
"""执行监听回调"""
try:
listen_function = ListenMounter.get_Listener_function(self.key)
if listen_function:
if asyncio.iscoroutinefunction(listen_function):
# 异步监听函数
asyncio.run(self._run_async_listener(listen_function))
else:
# 同步监听函数
listen_function(self.value)
except Exception as e:
logging.error(f"监听函数执行错误 {self.key}: {str(e)}")
finally:
# 安全删除变更日志
self._safe_delete_change_log()
def _safe_delete_change_log(self):
"""安全删除变更日志,带重试机制"""
for attempt in range(self.max_retries):
try:
# 使用互斥锁保护数据库操作
with QMutexLocker(self.db_mutex):
self.listen_operation.delete_change_log(self.delete_id)
break # 成功则退出循环
except sqlite3.OperationalError as e:
if "database is locked" in str(e) and attempt < self.max_retries - 1:
logging.debug(
f"删除变更日志时数据库锁定,第 {attempt + 1} 次重试..."
)
time.sleep(self.retry_delay)
else:
logging.error(f"删除变更日志失败 {self.delete_id}: {str(e)}")
break
except Exception as e:
logging.error(f"删除变更日志失败 {self.delete_id}: {str(e)}")
break
async def _run_async_listener(self, listen_function: Callable):
"""执行异步监听函数"""
try:
await listen_function(self.value)
except Exception as e:
logging.error(f"异步监听函数执行错误 {self.key}: {str(e)}")
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
"""
@File : receive_scheduler.py
@Time : 2025-10-28 14:31:06
@Author : chakcy
@Email : 947105045@qq.com
@description : 接收调度器
"""
from typing import Callable, Optional
from queue_sqlite_core import ShardedQueueOperation
from qtpy.QtCore import QThreadPool
import threading
from ...model import MessageItem
import time
import logging
from .callback_task import QtCallbackTask
class QtReceiveScheduler:
"""Qt 接收调度器 - 独立实现"""
def __init__(
self, queue_operation: ShardedQueueOperation, receive_thread_num: int = 1
):
self.callbacks = {}
self.is_running = False
self.lock = threading.Lock()
self.queue_operation = queue_operation
self.receive_thread = None
self.thread_pool = QThreadPool.globalInstance()
def send_message(self, message: MessageItem, callback: Optional[Callable] = None):
"""发送消息到队列"""
if callback is None:
callback = lambda message: logging.info(f"收到消息: {message.id}")
# 入队消息
self.queue_operation.enqueue(message.to_dict_by_core())
with self.lock:
self.callbacks[message.id] = callback
def _receive_loop(self):
"""接收消息循环"""
while self.is_running:
try:
# 获取已完成的消息
message_list = self.queue_operation.get_completed_messages()
if message_list:
for message_data in message_list:
try:
message = MessageItem.from_dict(message_data)
with self.lock:
callback = self.callbacks.pop(message.id, None)
if callback is None:
callback = lambda msg: None
# 使用线程池执行回调
task = QtCallbackTask(
callback, message, self.queue_operation
)
self.thread_pool.start(task) # type: ignore
except Exception as e:
logging.error(f"处理完成消息失败: {str(e)}")
# 即使处理失败也尝试删除消息
try:
self.queue_operation.delete_message(message_data["id"])
except:
pass
else:
time.sleep(0.05) # 短暂休眠避免CPU空转
except Exception as e:
logging.error(f"接收消息循环错误: {str(e)}")
time.sleep(0.1) # 出错时稍长休眠
def start(self):
"""启动接收调度器"""
if self.is_running:
return
self.is_running = True
self.receive_thread = threading.Thread(target=self._receive_loop, daemon=True)
self.receive_thread.start()
logging.info("Qt 接收调度器已启动")
def stop(self):
"""停止接收调度器"""
if not self.is_running:
return
self.is_running = False
if self.receive_thread and self.receive_thread.is_alive():
self.receive_thread.join(timeout=3.0)
logging.info("Qt 接收调度器已停止")
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
"""
@File : task_executor.py
@Time : 2025-10-28 14:31:25
@Author : chakcy
@Email : 947105045@qq.com
@description : 任务执行器
"""
import asyncio
import logging
from typing import Any, Callable, Dict
from datetime import datetime
from ...constant import MessageStatus
from ...mounter.task_mounter import TaskMounter
from queue_sqlite_core import ShardedQueueOperation
from ...model.message_item import MessageItem
from qtpy.QtCore import QRunnable
class QtTaskExecutor(QRunnable):
"""Qt 任务执行器 - 修复版"""
def __init__(
self, message_data: Dict[str, Any], queue_operation: ShardedQueueOperation
):
super().__init__()
self.message_data = message_data
self.queue_operation = queue_operation
self.setAutoDelete(True)
def run(self):
"""执行任务"""
try:
message = MessageItem.from_dict(self.message_data)
if message.destination == "client":
self._process_client_message(message)
return
task_function = TaskMounter.get_task_function(message.destination)
if task_function is None:
raise ValueError(f"任务函数未找到: {message.destination}")
# 执行任务
if asyncio.iscoroutinefunction(task_function):
# 异步任务 - 在当前线程运行事件循环
asyncio.run(self._execute_async_task(message, task_function))
else:
# 同步任务
self._execute_sync_task(message, task_function)
except Exception as e:
logging.error(f"任务执行错误: {str(e)}")
# 更新任务状态为失败
try:
self.queue_operation.update_status(
self.message_data["id"], MessageStatus.FAILED.value
)
self.queue_operation.update_result(
self.message_data["id"], f'{{"error": "{str(e)}"}}'
)
except Exception as update_error:
logging.error(f"更新任务状态失败: {str(update_error)}")
def _process_client_message(self, message: MessageItem):
"""处理客户端消息"""
message.status = MessageStatus.COMPLETED
message.result = {"result": "success"}
message.updatetime = datetime.now()
self._update_task_result(message)
def _execute_sync_task(self, message: MessageItem, task_function: Callable):
"""执行同步任务"""
from ...cycle.task_cycle import TaskCycle
try:
task_cycle = TaskCycle(message, task_function)
task_cycle.run()
status = task_cycle.get_task_status()
if not status:
raise ValueError("任务未完成")
message.status = status
if message.status == MessageStatus.FAILED:
message.result = {"error": task_cycle.get_task_error()}
else:
# 获取序列化后的结果
result_str = task_cycle.get_task_result()
try:
import json
message.result = json.loads(result_str)
except:
message.result = {"result": result_str}
self._update_task_result(message)
except Exception as e:
logging.error(f"同步任务执行失败 {message.id}: {str(e)}")
message.status = MessageStatus.FAILED
message.result = {"error": str(e)}
self._update_task_result(message)
async def _execute_async_task(self, message: MessageItem, task_function: Callable):
"""执行异步任务"""
from ...cycle.async_task_cycle import AsyncTaskCycle
try:
task_cycle = AsyncTaskCycle(message, task_function)
await task_cycle.run()
status = task_cycle.get_task_status()
if not status:
raise ValueError("任务未完成")
message.status = status
if message.status == MessageStatus.FAILED:
message.result = {"error": task_cycle.get_task_error()}
else:
# 获取序列化后的结果
result_str = task_cycle.get_task_result()
try:
import json
message.result = json.loads(result_str)
except:
message.result = {"result": result_str}
self._update_task_result(message)
except Exception as e:
logging.error(f"异步任务执行失败 {message.id}: {str(e)}")
message.status = MessageStatus.FAILED
message.result = {"error": str(e)}
self._update_task_result(message)
def _update_task_result(self, message: MessageItem):
"""更新任务结果到数据库"""
import json
try:
# 序列化结果
result_str = json.dumps(message.result) if message.result else "{}"
self.queue_operation.update_result(message.id, result_str)
self.queue_operation.update_status(message.id, message.status.value)
except Exception as e:
logging.error(f"更新任务结果失败 {message.id}: {str(e)}")
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
"""
@File : task_scheduler.py
@Time : 2025-10-28 14:19:39
@Author : chakcy
@Email : 947105045@qq.com
@description : Qt 任务调度器
"""
import logging
from qtpy.QtCore import QThreadPool
from .task_executor import QtTaskExecutor
from queue_sqlite_core import ShardedQueueOperation
import time
import threading
class QtTaskScheduler:
"""Qt 任务调度器 - 独立实现"""
def __init__(
self, queue_operation: ShardedQueueOperation, task_thread_num: int = 4
):
self.is_running = False
self.queue_operation = queue_operation
self.task_thread = None
self.thread_pool = QThreadPool.globalInstance()
# 设置最大线程数
self.thread_pool.setMaxThreadCount(max(1, task_thread_num)) # type: ignore
def _task_loop(self):
"""任务处理循环"""
while self.is_running:
try:
# 出队消息进行处理
message_list = self.queue_operation.dequeue(
size=self.thread_pool.maxThreadCount() * 2 # type: ignore
)
if message_list:
for message_data in message_list:
# 使用线程池执行任务
task = QtTaskExecutor(message_data, self.queue_operation)
self.thread_pool.start(task) # type: ignore
else:
time.sleep(0.05) # 短暂休眠
except Exception as e:
logging.error(f"任务调度循环错误: {str(e)}")
time.sleep(0.1)
def start(self):
"""启动任务调度器"""
if self.is_running:
return
self.is_running = True
self.task_thread = threading.Thread(target=self._task_loop, daemon=True)
self.task_thread.start()
logging.info("Qt 任务调度器已启动")
def stop(self):
"""停止任务调度器"""
if not self.is_running:
return
self.is_running = False
if self.task_thread and self.task_thread.is_alive():
self.task_thread.join(timeout=3.0)
logging.info("Qt 任务调度器已停止")
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
"""
@File : test_pyside6_scheduler.py
@Time : 2025-10-26 10:40:00
@Author : chakcy
@description : 独立 PySide6 调度器测试
"""
import time
from queue_sqlite.scheduler import QueueScheduler
from queue_sqlite.model import MessageItem
from queue_sqlite.mounter import task
@task(meta={"task_name": "pyside6_example"})
def pyside6_example_task(message_item: MessageItem):
"""PySide6 调度器测试任务"""
print(f"处理任务: {message_item.id}")
# 模拟一些工作
result = sum(i * i for i in range(1000))
message_item.result = {
"status": "completed",
"result": result,
"task_id": message_item.id,
}
return message_item
@task(meta={"task_name": "async_pyside6_task"})
async def async_pyside6_task(message_item: MessageItem):
"""异步任务测试"""
import asyncio
print(f"处理异步任务: {message_item.id}")
await asyncio.sleep(0.1) # 模拟异步操作
message_item.result = {"status": "async_completed", "task_id": message_item.id}
return message_item
def task_callback(message_item: MessageItem):
"""任务完成回调"""
print(f"任务完成回调: {message_item.id}, 结果: {message_item.result}")
def test_pyside6_scheduler():
"""测试独立的 PySide6 调度器"""
# 创建独立的 PySide6 调度器
scheduler = QueueScheduler(
scheduler_type="qt", # 指定使用 PySide6 调度器
)
print("启动 qt 调度器...")
scheduler.start()
try:
# 发送同步任务
for i in range(5):
message = MessageItem(
content={"task_index": i, "type": "sync"},
destination="pyside6_example_task",
)
scheduler.send_message(message, task_callback)
print(f"发送同步消息: {message.id}")
# 发送异步任务
for i in range(5):
message = MessageItem(
content={"task_index": i, "type": "async"},
destination="async_pyside6_task",
)
scheduler.send_message(message, task_callback)
print(f"发送异步消息: {message.id}")
# 等待任务处理
print("等待任务处理...")
for i in range(10):
queue_info = scheduler.scheduler.get_queue_info()
print(f"队列状态: {queue_info}")
time.sleep(1)
if queue_info.get("queue_length", 0) == 0:
break
finally:
print("停止 PySide6 调度器...")
scheduler.stop()
print("测试完成")
if __name__ == "__main__":
test_pyside6_scheduler()
+4
-0

@@ -8,2 +8,3 @@ # Python-generated files

*.egg-info
.python-version

@@ -28,1 +29,4 @@ # Virtual environments

*.pyd
# log
log/
+7
-2
Metadata-Version: 2.4
Name: queue-sqlite
Version: 0.2.0
Version: 0.2.1
Summary: A high-performance, SQLite-based distributed task queue system with Rust-backed core operations. Supports task mounting, message listening, priority handling, retry mechanisms, and automatic cleanup of expired messages. Ideal for building reliable, scalable background task processing systems.

@@ -15,8 +15,13 @@ Author-email: chakcy <947105045@qq.com>

Classifier: Typing :: Typed
Requires-Python: >=3.7
Requires-Python: >=3.11
Requires-Dist: queue-sqlite-core>=0.2.0
Provides-Extra: pyqt5
Requires-Dist: pyqt5; extra == 'pyqt5'
Requires-Dist: qtpy; extra == 'pyqt5'
Provides-Extra: pyqt6
Requires-Dist: pyqt6; extra == 'pyqt6'
Requires-Dist: qtpy; extra == 'pyqt6'
Provides-Extra: pyside6
Requires-Dist: pyside6; extra == 'pyside6'
Requires-Dist: qtpy; extra == 'pyside6'
Description-Content-Type: text/markdown

@@ -23,0 +28,0 @@

@@ -16,3 +16,3 @@ [project]

dependencies = [
"queue_sqlite_core>=0.2.0",
"queue-sqlite-core>=0.2.0",
]

@@ -22,12 +22,13 @@ description = "A high-performance, SQLite-based distributed task queue system with Rust-backed core operations. Supports task mounting, message listening, priority handling, retry mechanisms, and automatic cleanup of expired messages. Ideal for building reliable, scalable background task processing systems."

readme = "README.md"
requires-python = ">=3.7"
version = "0.2.0"
requires-python = ">=3.11"
version = "0.2.1"
[project.optional-dependencies]
pyside6 = ["PySide6"]
pyqt6 = ["PyQt6"]
pyqt5 = ["qtpy", "PyQt5"]
pyqt6 = ["qtpy", "PyQt6"]
pyside6 = ["qtpy", "PySide6"]
[[tool.uv.index]]
default = true
url = "https://pypi.tuna.tsinghua.edu.cn/simple"
# [[tool.uv.index]]
# default = true
# url = "https://pypi.tuna.tsinghua.edu.cn/simple"

@@ -40,6 +41,6 @@ [build-system]

dev = [
"maturin>=1.9.4",
"psutil>=7.0.0",
"pytest>=7.4.4",
"twine>=4.0.2",
"maturin>=1.9.4",
"psutil>=7.0.0",
"pytest>=7.4.4",
"twine>=4.0.2",
]

@@ -49,1 +50,9 @@

packages = ["src/queue_sqlite"]
[tool.uv.workspace]
members = [
"src/queue_sqlite_core",
]
[tool.uv.sources]
queue-sqlite-core = { workspace = true }

@@ -0,0 +0,0 @@ # This file is automatically @generated by Cargo.

@@ -0,0 +0,0 @@ [package]

@@ -22,3 +22,3 @@ [project]

default = true
url = "http://124.71.68.6:3141/root/pypi"
url = "https://pypi.tuna.tsinghua.edu.cn/simple"

@@ -30,4 +30,1 @@ [dependency-groups]

]
[[tool.uv.index]]
url = "https://pypi.tuna.tsinghua.edu.cn/simple"

@@ -20,2 +20,4 @@ from typing import Callable

@property
def shard_num(self) -> int: ...
@property
def db_dir(self) -> str: ...

@@ -22,0 +24,0 @@ def __init__(self, shard_num: int, queue_name: str): ...

@@ -20,2 +20,3 @@ #!/usr/bin/env python

import logging
import os

@@ -25,6 +26,14 @@ logging.basicConfig(

)
logger = logging.getLogger(__name__).addHandler(logging.NullHandler())
# 设置 error 级别日志写入 error.log 文件
if not os.path.exists("log"):
os.mkdir("log")
error_log_handler = logging.FileHandler("log/error.log", encoding="utf-8")
error_log_handler.setLevel(logging.ERROR)
error_log_handler.setFormatter(
logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
)
logging.getLogger().addHandler(error_log_handler)
__title__ = "queue_sqlite"
__version__ = "0.1.0"
__version__ = "0.2.1"
__author__ = "chakcy"

@@ -31,0 +40,0 @@ __email__ = "947105045@qq.com"

@@ -11,3 +11,2 @@ #!/usr/bin/env python

from enum import Enum

@@ -14,0 +13,0 @@

@@ -14,2 +14,3 @@ #!/usr/bin/env python

from ..constant import MessageStatus
from ..mounter.task_mounter import TaskMeta
import json

@@ -20,3 +21,3 @@ import asyncio

def retry_async(max_retries=3, delay=1):
def retry_async(max_retries=3):
"""

@@ -35,4 +36,6 @@ 异步重试装饰器

# 使用message_item中的retry_count作为最大重试次数
retries = getattr(self.message_item, "retry_count", max_retries)
task_meta: TaskMeta = self.callback.meta
retries = task_meta.max_retries
# retries = getattr(self.message_item, "retry_count", max_retries)
delay_time = task_meta.delay
# 至少尝试一次(retries+1),最多尝试max_retries+1次

@@ -47,3 +50,3 @@ actual_retries = min(retries, max_retries) if max_retries > 0 else retries

if attempt < actual_retries:
await asyncio.sleep(delay)
await asyncio.sleep(delay_time)
else:

@@ -69,3 +72,3 @@ break

@retry_async(max_retries=3, delay=1)
@retry_async(max_retries=3)
async def run(self):

@@ -84,19 +87,28 @@ try:

def get_task_result(self):
if isinstance(self.task_result, (dict, list)):
try:
return json.dumps(self.task_result)
except:
return json.dumps({"result": str(self.task_result)})
"""获取任务结果 - 优化版本"""
if self.task_result is None:
return json.dumps({"result": None})
elif isinstance(self.task_result, str):
# 如果已经是字符串,尝试解析
if isinstance(self.task_result, str):
try:
# 如果是 JSON 字符串,直接返回
json.loads(self.task_result)
return self.task_result
except:
# 如果不是 JSON,包装成 JSON
return json.dumps({"result": self.task_result})
elif isinstance(self.task_result, (int, float, bool)):
return json.dumps({"result": self.task_result})
elif self.task_result is None:
return "null"
else:
# 如果是 MessageItem 对象,提取其中的 result 字段
if isinstance(self.task_result, MessageItem):
result_data = self.task_result.result
if isinstance(result_data, (dict, list)):
return json.dumps(result_data)
else:
return json.dumps({"result": result_data})
# 其他情况正常序列化
try:
return json.dumps(self.task_result)
except:
return json.dumps({"result": str(self.task_result)})

@@ -103,0 +115,0 @@

@@ -15,2 +15,3 @@ #!/usr/bin/env python

from ..constant import MessageStatus
from ..mounter.task_mounter import TaskMeta
import json

@@ -22,3 +23,3 @@

def retry_sync(max_retries=3, delay=1):
def retry_sync(max_retries=3):
"""

@@ -37,4 +38,6 @@ 同步重试装饰器

# 使用message_item中的retry_count作为最大重试次数
retries = getattr(self.message_item, "retry_count", max_retries)
task_meta: TaskMeta = self.callback.meta
retries = task_meta.max_retries
# retries = getattr(self.message_item, "retry_count", max_retries)
delay_time = task_meta.delay
# 至少尝试一次(retries+1),最多尝试max_retries+1次

@@ -48,4 +51,5 @@ actual_retries = min(retries, max_retries) if max_retries > 0 else retries

last_exception = e
self.message_item.retry_count = attempt
if attempt < actual_retries:
time.sleep(delay)
time.sleep(delay_time)
else:

@@ -73,3 +77,3 @@ break

@retry_sync(max_retries=3, delay=1)
@retry_sync(max_retries=3)
def run(self):

@@ -88,19 +92,28 @@ try:

def get_task_result(self):
if isinstance(self.task_result, (dict, list)):
try:
return json.dumps(self.task_result)
except:
return json.dumps({"result": str(self.task_result)})
"""获取任务结果 - 优化版本"""
if self.task_result is None:
return json.dumps({"result": None})
elif isinstance(self.task_result, str):
# 如果已经是字符串,尝试解析
if isinstance(self.task_result, str):
try:
# 如果是 JSON 字符串,直接返回
json.loads(self.task_result)
return self.task_result
except:
# 如果不是 JSON,包装成 JSON
return json.dumps({"result": self.task_result})
elif isinstance(self.task_result, (int, float, bool)):
return json.dumps({"result": self.task_result})
elif self.task_result is None:
return "null"
else:
# 如果是 MessageItem 对象,提取其中的 result 字段
if isinstance(self.task_result, MessageItem):
result_data = self.task_result.result
if isinstance(result_data, (dict, list)):
return json.dumps(result_data)
else:
return json.dumps({"result": result_data})
# 其他情况正常序列化
try:
return json.dumps(self.task_result)
except:
return json.dumps({"result": str(self.task_result)})

@@ -107,0 +120,0 @@

@@ -13,4 +13,5 @@ #!/usr/bin/env python

from .message_item import MessageItem
from .scheduler_config import SchedulerConfig
__all__ = ["MessageItem"]
__all__ = ["MessageItem", "SchedulerConfig"]

@@ -11,3 +11,2 @@ #!/usr/bin/env python

from dataclasses import dataclass, field

@@ -26,3 +25,3 @@ from typing import Optional, Dict, Any

# 必需字段
content: dict = field(
content: dict = field( # type ignore
metadata={"description": "消息内容,包含具体的任务数据或信息"}

@@ -87,3 +86,3 @@ )

@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "MessageItem":
def from_dict(cls, data: dict) -> "MessageItem":
"""从字典创建消息对象"""

@@ -90,0 +89,0 @@ # 处理日期时间字段

@@ -13,4 +13,13 @@ #!/usr/bin/env python

from typing import Callable, List
from dataclasses import dataclass, field
@dataclass
class TaskMeta:
name: str = field(default="", metadata={"help": "任务名称"})
description: str = field(default="", metadata={"help": "任务描述"})
max_retries: int = field(default=3, metadata={"help": "任务最大重试次数"})
delay: float = field(default=1, metadata={"help": "任务延迟执行时间"})
class TaskMounter:

@@ -27,3 +36,10 @@ @classmethod

# 使用自定义名称或函数原名
function.meta = meta # type: ignore
name = meta.get("name", function.__name__)
description = meta.get("description", "")
max_retries = meta.get("max_retries", 3)
delay = meta.get("delay", 1)
task_meta = TaskMeta(
name=name, description=description, max_retries=max_retries, delay=delay
)
function.meta = task_meta # type: ignore
setattr(TaskMounter, function.__name__, function)

@@ -39,2 +55,8 @@ return function

@classmethod
def get_task_meta(cls, task_name: str):
task_function = cls.get_task_function(task_name)
if task_function:
return getattr(task_function, "meta", {})
@classmethod
def get_task_list(cls) -> List[str]:

@@ -41,0 +63,0 @@ """获取所有挂载的任务函数名称列表"""

@@ -8,3 +8,3 @@ #!/usr/bin/env python

@Email : 947105045@qq.com
@description : 监听操作模块
@description : 监听操作模块 - 改进版,优化数据库连接
"""

@@ -16,2 +16,3 @@

from typing import List, Tuple, Union
import logging

@@ -26,8 +27,16 @@

def _get_connection(self):
"""获取数据库连接"""
return sqlite3.connect(
self.db_dir,
check_same_thread=False,
"""获取数据库连接 - 改进版"""
conn = sqlite3.connect(
self.db_dir, check_same_thread=False, timeout=30.0 # 增加超时时间
)
# 设置SQLite优化参数
conn.execute("PRAGMA journal_mode=WAL;") # 使用WAL模式提高并发
conn.execute("PRAGMA synchronous=NORMAL;")
conn.execute("PRAGMA cache_size=-20000;")
conn.execute("PRAGMA busy_timeout=5000;") # 设置忙时超时
conn.execute("PRAGMA mmap_size=268435456;") # 256MB内存映射
return conn
def create_table(self):

@@ -38,105 +47,146 @@ if len(self.listen_fields) == 0:

# 分别执行每个SQL语句而不是使用executescript
conn.execute(
try:
# 分别执行每个SQL语句而不是使用executescript
conn.execute(
"""
CREATE TABLE IF NOT EXISTS listen_table (
id INTEGER PRIMARY KEY AUTOINCREMENT,
key Text,
value JSON
)
"""
CREATE TABLE IF NOT EXISTS listen_table (
id INTEGER PRIMARY KEY AUTOINCREMENT,
key Text,
value JSON
)
"""
)
conn.execute(
"""
CREATE TABLE IF NOT EXISTS change_log (
id INTEGER PRIMARY KEY AUTOINCREMENT,
table_name TEXT,
row_id INTEGER,
column_name TEXT,
old_value TEXT,
new_value TEXT,
is_delete integer DEFAULT 0,
timestamp DATETIME DEFAULT CURRENT_TIMESTAMP
)
"""
)
# 检查触发器是否已存在,如果不存在则创建
try:
conn.execute(
"""
CREATE TRIGGER IF NOT EXISTS track_value_change
AFTER UPDATE OF value ON listen_table
FOR EACH ROW
WHEN OLD.value <> NEW.value
BEGIN
INSERT INTO change_log (table_name, row_id, column_name, old_value, new_value)
VALUES ('listen_table', NEW.id, 'key', OLD.key, NEW.key);
END
CREATE TABLE IF NOT EXISTS change_log (
id INTEGER PRIMARY KEY AUTOINCREMENT,
table_name TEXT,
row_id INTEGER,
column_name TEXT,
old_value TEXT,
new_value TEXT,
is_delete integer DEFAULT 0,
timestamp DATETIME DEFAULT CURRENT_TIMESTAMP
)
"""
)
except sqlite3.Error:
# 如果触发器创建失败,我们继续执行其他操作
pass
conn.execute("PRAGMA journal_mode=WAL;")
conn.execute("PRAGMA synchronous=NORMAL;")
conn.execute("PRAGMA cache_size=-20000;")
conn.execute("PRAGMA mmap_size=1073741824;")
conn.execute("PRAGMA temp_store=MEMORY;")
conn.execute("PRAGMA busy_timeout=5000;")
conn.commit()
# 检查触发器是否已存在,如果不存在则创建
try:
conn.execute(
"""
CREATE TRIGGER IF NOT EXISTS track_value_change
AFTER UPDATE OF value ON listen_table
FOR EACH ROW
WHEN OLD.value <> NEW.value
BEGIN
INSERT INTO change_log (table_name, row_id, column_name, old_value, new_value)
VALUES ('listen_table', NEW.id, 'key', OLD.key, NEW.key);
END
"""
)
except sqlite3.Error:
# 如果触发器创建失败,我们继续执行其他操作
pass
for listen_field in self.listen_fields:
sql = """
INSERT INTO
listen_table (key, value)
VALUES
(?, ?)
"""
conn.execute(sql, (listen_field, "null"))
conn.commit()
for listen_field in self.listen_fields:
# 检查是否已存在该key
cursor = conn.execute(
"SELECT id FROM listen_table WHERE key = ?", (listen_field,)
)
if cursor.fetchone() is None:
sql = """
INSERT INTO
listen_table (key, value)
VALUES
(?, ?)
"""
conn.execute(sql, (listen_field, "null"))
conn.commit()
except Exception as e:
logging.error(f"创建监听表失败: {str(e)}")
finally:
conn.close()
def listen_data(self) -> Tuple[bool, Union[List[Tuple], str]]:
sql = f"""
SELECT * FROM change_log where is_delete = 0 ORDER BY id DESC LIMIT 100
"""
"""获取监听数据 - 改进版"""
conn = self._get_connection()
result = conn.execute(sql).fetchall()
if len(result) == 0:
return False, "No data found"
return True, result
try:
sql = """
SELECT * FROM change_log where is_delete = 0 ORDER BY id DESC LIMIT 100
"""
result = conn.execute(sql).fetchall()
if len(result) == 0:
return False, "No data found"
return True, result
except Exception as e:
logging.error(f"获取监听数据失败: {str(e)}")
return False, str(e)
finally:
conn.close()
def delete_change_log(self, delete_id):
sql = f"""
DELETE FROM change_log WHERE id = {delete_id}
"""
"""删除变更日志 - 改进版"""
conn = self._get_connection()
conn.execute(sql)
try:
sql = """
DELETE FROM change_log WHERE id = ?
"""
conn.execute(sql, (delete_id,))
conn.commit()
except Exception as e:
logging.error(f"删除变更日志失败 {delete_id}: {str(e)}")
raise e # 重新抛出异常以便上层处理
finally:
conn.close()
def update_listen_data(self, key, value):
sql = f"""
UPDATE listen_table SET value = '{value}' WHERE key = '{key}'
"""
"""更新监听数据 - 改进版"""
conn = self._get_connection()
conn.execute(sql)
conn.commit()
try:
sql = """
UPDATE listen_table SET value = ? WHERE key = ?
"""
conn.execute(sql, (value, key))
conn.commit()
except Exception as e:
logging.error(f"更新监听数据失败 {key}: {str(e)}")
raise e
finally:
conn.close()
def get_value(self, key):
sql = f"""
SELECT value FROM listen_table WHERE key = '{key}'
"""
"""获取值 - 改进版"""
conn = self._get_connection()
result = conn.execute(sql).fetchone()
if result is None:
try:
sql = """
SELECT value FROM listen_table WHERE key = ?
"""
result = conn.execute(sql, (key,)).fetchone()
if result is None:
return None
return result[0]
except Exception as e:
logging.error(f"获取值失败 {key}: {str(e)}")
return None
return result[0]
finally:
conn.close()
def get_values(self):
sql = f"""
SELECT key, value FROM listen_table
"""
"""获取所有值 - 改进版"""
conn = self._get_connection()
result = conn.execute(sql).fetchall()
return result
try:
sql = """
SELECT key, value FROM listen_table
"""
result = conn.execute(sql).fetchall()
return result
except Exception as e:
logging.error(f"获取所有值失败: {str(e)}")
return []
finally:
conn.close()

@@ -11,9 +11,8 @@ #!/usr/bin/env python

from .standard import StandardQueueScheduler
from ._async import AsyncQueueScheduler
from .base import BaseScheduler
from ..model import MessageItem
from typing import Callable
import multiprocessing
from ..model import MessageItem, SchedulerConfig
from typing import Callable, Literal
import logging

@@ -23,10 +22,21 @@

try:
from .qt import QtQueueScheduler
print(SCHEDULER_TYPES)
if QtQueueScheduler.is_available():
SCHEDULER_TYPES["qt"] = QtQueueScheduler
except ImportError:
logging.warning(
"Qt is not available, if you want to use it, please install PyQt5/PyQt6/PySide6"
)
SchedulerType = Literal["standard", "async", "qt"]
class QueueScheduler(BaseScheduler):
def __init__(
self,
receive_thread_num=1,
task_thread_num=multiprocessing.cpu_count() * 2,
shard_num=4,
scheduler_type="standard",
scheduler_type: SchedulerType = "standard",
config: SchedulerConfig = SchedulerConfig(),
):

@@ -36,3 +46,3 @@ scheduler_class = SCHEDULER_TYPES.get(scheduler_type, None)

raise ValueError(f"Invalid scheduler type: {scheduler_type}")
self.scheduler = scheduler_class(receive_thread_num, task_thread_num, shard_num)
self.scheduler = scheduler_class(config)

@@ -39,0 +49,0 @@ if self.scheduler:

@@ -22,2 +22,3 @@ #!/usr/bin/env python

from ..cleanup_scheduler import CleanupScheduler
from ...model import SchedulerConfig

@@ -28,7 +29,10 @@

def __init__(
self, receive_thread_num=1, task_thread_num=1, shard_num=4, queue_name="default"
self,
config: SchedulerConfig = SchedulerConfig(),
):
self.queue_operation = ShardedQueueOperation(shard_num, queue_name=queue_name)
self.queue_operation = ShardedQueueOperation(
config.shard_num, queue_name=config.queue_name
)
self.receive_scheduler = AsyncReceiveScheduler(
self.queue_operation, receive_thread_num
self.queue_operation, config.receive_thread_num
)

@@ -40,3 +44,5 @@ self.listen_operation = ListenOperation(

self.listen_scheduler = AsyncListenDataScheduler(self.listen_operation)
self.task_scheduler = AsyncTaskScheduler(self.queue_operation, task_thread_num)
self.task_scheduler = AsyncTaskScheduler(
self.queue_operation, config.task_thread_num
)
self.cleanup_scheduler = CleanupScheduler(self.queue_operation)

@@ -43,0 +49,0 @@

@@ -22,2 +22,3 @@ #!/usr/bin/env python

import logging
import json

@@ -71,3 +72,3 @@

try:
self.queue_operation.update_result(message.id, message.result)
self.queue_operation.update_result(message.id, json.dumps(message.result))
self.queue_operation.update_status(message.id, message.status)

@@ -74,0 +75,0 @@ except Exception as e:

from abc import ABC, abstractmethod
from typing import Callable
from ...model import MessageItem
from ...model import MessageItem, SchedulerConfig

@@ -9,2 +9,5 @@

def __init__(self, config: SchedulerConfig = SchedulerConfig()):
pass
@abstractmethod

@@ -11,0 +14,0 @@ def send_message(self, message: MessageItem, callback: Callable):

@@ -22,2 +22,3 @@ #!/usr/bin/env python

from ..base import BaseScheduler
from ...model import SchedulerConfig

@@ -27,5 +28,8 @@

def __init__(
self, receive_thread_num=1, task_thread_num=1, shard_num=4, queue_name="default"
self,
config: SchedulerConfig = SchedulerConfig(),
):
self.queue_operation = ShardedQueueOperation(shard_num, queue_name)
self.queue_operation = ShardedQueueOperation(
config.shard_num, config.queue_name
)
self.listen_operation = ListenOperation(

@@ -38,5 +42,7 @@ os.path.join(self.queue_operation.db_dir, "listen.db")

self.receive_scheduler = ReceiveScheduler(
self.queue_operation, receive_thread_num
self.queue_operation, config.receive_thread_num
)
self.task_scheduler = TaskScheduler(self.queue_operation, task_thread_num)
self.task_scheduler = TaskScheduler(
self.queue_operation, config.task_thread_num
)
self.cleanup_scheduler = CleanupScheduler(self.queue_operation)

@@ -43,0 +49,0 @@

@@ -30,3 +30,3 @@ from queue_sqlite.model import MessageItem

# print(f"callback: {cls.callback_counter}")
print(f"callback: {message_item}")
# print(f"callback: {message_item}")
# print(f"callback: {message_item.id}")

@@ -49,5 +49,2 @@ # print(f"callback: {message_item.expire_time}")

queue_scheduler = QueueScheduler(
receive_thread_num=4,
task_thread_num=8,
shard_num=12,
scheduler_type="async",

@@ -54,0 +51,0 @@ )

@@ -8,3 +8,3 @@ from listen import *

def test_listen_data(self):
scheduler = QueueScheduler()
scheduler = QueueScheduler(scheduler_type="qt")
scheduler.start()

@@ -11,0 +11,0 @@ scheduler.update_listen_data("key_1", "value_1")

@@ -30,3 +30,3 @@ from queue_sqlite.model import MessageItem

# print(f"callback: {cls.callback_counter}")
print(message_item.id)
# print(message_item.id)
# print(f"callback: {message_item}")

@@ -60,5 +60,3 @@ # print(f"callback: {message_item.id}")

monitor_thread.start()
queue_scheduler = QueueScheduler(
receive_thread_num=4, task_thread_num=8, shard_num=12
)
queue_scheduler = QueueScheduler()

@@ -65,0 +63,0 @@ # threads = []

Sorry, the diff of this file is not supported yet

#!/usr/bin/env python
# -*- encoding: utf-8 -*-
"""
@File : to_json_utils.py
@Time : 2025-09-27 16:36:18
@Author : chakcy
@Email : 947105045@qq.com
@description : xxxxxxxxx
"""
import sqlite3
import json
import tkinter as tk
from tkinter import ttk, filedialog, messagebox
from datetime import datetime
class SQLiteToJSONConverter:
def __init__(self, root):
self.root = root
self.root.title("SQLite 表转 JSON 工具")
self.root.geometry("800x600")
# 变量
self.db_path = tk.StringVar()
self.tables = []
self.selected_table = tk.StringVar()
self.create_widgets()
def create_widgets(self):
# 主框架
main_frame = ttk.Frame(self.root, padding="10")
main_frame.grid(row=0, column=0, sticky=(tk.W, tk.E, tk.N, tk.S)) # type: ignore
# 数据库选择部分
db_frame = ttk.LabelFrame(main_frame, text="数据库选择", padding="5")
db_frame.grid(row=0, column=0, columnspan=2, sticky=(tk.W, tk.E), pady=5) # type: ignore
ttk.Entry(db_frame, textvariable=self.db_path, width=70).grid(
row=0, column=0, padx=5
)
ttk.Button(db_frame, text="浏览", command=self.browse_db).grid(
row=0, column=1, padx=5
)
ttk.Button(db_frame, text="连接", command=self.connect_db).grid(
row=0, column=2, padx=5
)
# 表选择部分
table_frame = ttk.LabelFrame(main_frame, text="表选择", padding="5")
table_frame.grid(row=1, column=0, columnspan=2, sticky=(tk.W, tk.E), pady=5) # type: ignore
ttk.Label(table_frame, text="选择表:").grid(row=0, column=0, sticky=tk.W)
self.table_combo = ttk.Combobox(
table_frame, textvariable=self.selected_table, state="readonly"
)
self.table_combo.grid(row=0, column=1, sticky=(tk.W, tk.E), padx=5) # type: ignore
ttk.Button(table_frame, text="加载数据", command=self.load_table_data).grid(
row=0, column=2, padx=5
)
# 数据显示部分
data_frame = ttk.LabelFrame(main_frame, text="表数据", padding="5")
data_frame.grid(
row=2, column=0, columnspan=2, sticky=(tk.W, tk.E, tk.N, tk.S), pady=5 # type: ignore
)
# 创建Treeview显示数据
columns = ("#0",)
self.tree = ttk.Treeview(data_frame, columns=columns, show="headings")
vsb = ttk.Scrollbar(data_frame, orient="vertical", command=self.tree.yview)
hsb = ttk.Scrollbar(data_frame, orient="horizontal", command=self.tree.xview)
self.tree.configure(yscrollcommand=vsb.set, xscrollcommand=hsb.set)
self.tree.grid(row=0, column=0, sticky=(tk.W, tk.E, tk.N, tk.S)) # type: ignore
vsb.grid(row=0, column=1, sticky=(tk.N, tk.S)) # type: ignore
hsb.grid(row=1, column=0, sticky=(tk.W, tk.E)) # type: ignore
# 按钮部分
button_frame = ttk.Frame(main_frame)
button_frame.grid(row=3, column=0, columnspan=2, pady=10)
ttk.Button(button_frame, text="导出为JSON", command=self.export_to_json).pack(
side=tk.LEFT, padx=5
)
ttk.Button(button_frame, text="清空", command=self.clear_all).pack(
side=tk.LEFT, padx=5
)
ttk.Button(button_frame, text="退出", command=self.root.quit).pack(
side=tk.LEFT, padx=5
)
# 配置权重
self.root.columnconfigure(0, weight=1)
self.root.rowconfigure(0, weight=1)
main_frame.columnconfigure(0, weight=1)
main_frame.rowconfigure(2, weight=1)
data_frame.columnconfigure(0, weight=1)
data_frame.rowconfigure(0, weight=1)
def browse_db(self):
file_path = filedialog.askopenfilename(
title="选择SQLite数据库文件",
filetypes=[
("SQLite数据库", "*.db *.sqlite *.sqlite3"),
("所有文件", "*.*"),
],
)
if file_path:
self.db_path.set(file_path)
def connect_db(self):
if not self.db_path.get():
messagebox.showerror("错误", "请先选择数据库文件")
return
try:
conn = sqlite3.connect(self.db_path.get())
cursor = conn.cursor()
# 获取所有表名
cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
self.tables = [table[0] for table in cursor.fetchall()]
self.table_combo["values"] = self.tables
if self.tables:
self.selected_table.set(self.tables[0])
conn.close()
messagebox.showinfo(
"成功", f"成功连接到数据库,找到 {len(self.tables)} 个表"
)
except sqlite3.Error as e:
messagebox.showerror("数据库错误", f"连接数据库时出错: {str(e)}")
def load_table_data(self):
if not self.selected_table.get():
messagebox.showerror("错误", "请先选择表")
return
try:
conn = sqlite3.connect(self.db_path.get())
conn.row_factory = sqlite3.Row # 这样可以使用列名访问数据
cursor = conn.cursor()
# 获取表数据
cursor.execute(f"SELECT * FROM {self.selected_table.get()}")
rows = cursor.fetchall()
# 清空Treeview
for item in self.tree.get_children():
self.tree.delete(item)
# 设置列
columns = [description[0] for description in cursor.description]
self.tree["columns"] = columns
for col in columns:
self.tree.heading(col, text=col)
self.tree.column(col, width=100, minwidth=50)
# 添加数据
for row in rows:
self.tree.insert("", "end", values=tuple(row))
conn.close()
except sqlite3.Error as e:
messagebox.showerror("数据库错误", f"加载数据时出错: {str(e)}")
def export_to_json(self):
if not self.selected_table.get():
messagebox.showerror("错误", "请先选择表")
return
# 选择保存位置
file_path = filedialog.asksaveasfilename(
title="保存JSON文件",
defaultextension=".json",
filetypes=[("JSON文件", "*.json"), ("所有文件", "*.*")],
)
if not file_path:
return
try:
conn = sqlite3.connect(self.db_path.get())
conn.row_factory = sqlite3.Row # 使用列名访问数据
cursor = conn.cursor()
# 获取表数据
cursor.execute(f"SELECT * FROM {self.selected_table.get()}")
rows = cursor.fetchall()
# 转换为字典列表
result = [dict(row) for row in rows]
# 写入JSON文件
with open(file_path, "w", encoding="utf-8") as f:
json.dump(result, f, indent=4, ensure_ascii=False)
conn.close()
messagebox.showinfo("成功", f"数据已成功导出到 {file_path}")
except sqlite3.Error as e:
messagebox.showerror("数据库错误", f"导出数据时出错: {str(e)}")
except Exception as e:
messagebox.showerror("错误", f"保存文件时出错: {str(e)}")
def clear_all(self):
self.db_path.set("")
self.tables = []
self.table_combo["values"] = []
self.selected_table.set("")
for item in self.tree.get_children():
self.tree.delete(item)
self.tree["columns"] = ("#0",)
if __name__ == "__main__":
root = tk.Tk()
app = SQLiteToJSONConverter(root)
root.mainloop()

Sorry, the diff of this file is not supported yet