queue-sqlite
Advanced tools
Sorry, the diff of this file is too big to display
Sorry, the diff of this file is too big to display
+0
-1
@@ -14,3 +14,2 @@ # Python-generated files | ||
| .arts | ||
| uv.lock | ||
| cache/ | ||
@@ -17,0 +16,0 @@ |
+1
-1
| Metadata-Version: 2.4 | ||
| Name: queue-sqlite | ||
| Version: 0.2.2 | ||
| Version: 0.2.3 | ||
| Summary: A high-performance, SQLite-based distributed task queue system with Rust-backed core operations. Supports task mounting, message listening, priority handling, retry mechanisms, and automatic cleanup of expired messages. Ideal for building reliable, scalable background task processing systems. | ||
@@ -5,0 +5,0 @@ Author-email: chakcy <947105045@qq.com> |
+1
-1
@@ -22,3 +22,3 @@ [project] | ||
| requires-python = ">=3.11" | ||
| version = "0.2.2" | ||
| version = "0.2.3" | ||
@@ -25,0 +25,0 @@ [project.optional-dependencies] |
@@ -680,2 +680,4 @@ // src/lib.rs | ||
| conn.execute_batch("VACUUM") | ||
| .map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(e.to_string()))?; | ||
| Ok(()) | ||
@@ -682,0 +684,0 @@ } |
@@ -36,3 +36,3 @@ #!/usr/bin/env python | ||
| __title__ = "queue_sqlite" | ||
| __version__ = "0.2.2" | ||
| __version__ = "0.2.3" | ||
| __author__ = "chakcy" | ||
@@ -39,0 +39,0 @@ __email__ = "947105045@qq.com" |
@@ -44,2 +44,3 @@ #!/usr/bin/env python | ||
| ): | ||
| super().__init__(config) | ||
| scheduler_class = SCHEDULER_TYPES.get(scheduler_type, None) | ||
@@ -50,4 +51,4 @@ if scheduler_class is None: | ||
| # if self.scheduler: | ||
| # self.queue_operation = self.scheduler.queue_operation | ||
| if hasattr(self.scheduler, "queue_operation"): | ||
| self.queue_operation = self.scheduler.queue_operation | ||
@@ -54,0 +55,0 @@ def send_message(self, message: MessageItem, callback: Callable): |
@@ -27,2 +27,4 @@ #!/usr/bin/env python | ||
| self.listen_thread = None | ||
| self.process_lock = threading.Lock() | ||
| self.last_processed_id = 0 | ||
@@ -41,3 +43,3 @@ async def _process_listen_data(self, key, value, delete_id): | ||
| await loop.run_in_executor(executor, listen_function, value) | ||
| listen_function(value) | ||
| # listen_function(value) | ||
| except Exception as e: | ||
@@ -53,7 +55,17 @@ ValueError(f"Error in {key} listener function: {e}") | ||
| tasks = [] | ||
| if status: | ||
| for data in change_data_items: | ||
| if status and isinstance(change_data_items, list): | ||
| new_change_data_items = [ | ||
| data | ||
| for data in change_data_items | ||
| if data[0] > self.last_processed_id | ||
| ] | ||
| # 按ID升序排序,确保按顺序处理 | ||
| new_change_data_items.sort(key=lambda x: x[0]) | ||
| for data in new_change_data_items: | ||
| key = data[3] | ||
| new_value = data[5] | ||
| delete_id = data[0] | ||
| with self.process_lock: | ||
| if delete_id > self.last_processed_id: | ||
| self.last_processed_id = delete_id | ||
| tasks.append( | ||
@@ -64,3 +76,3 @@ asyncio.create_task( | ||
| ) | ||
| if tasks: | ||
| if tasks and self.is_running: | ||
| await asyncio.gather(*tasks, return_exceptions=True) | ||
@@ -86,2 +98,3 @@ | ||
| self.is_running = True | ||
| self.last_processed_id = 0 | ||
| self.listen_thread = threading.Thread(target=self._run_listen_loop, daemon=True) | ||
@@ -88,0 +101,0 @@ self.listen_thread.start() |
@@ -5,2 +5,3 @@ from abc import ABC, abstractmethod | ||
| from queue_sqlite_core import ShardedQueueOperation | ||
| import logging | ||
@@ -11,8 +12,20 @@ | ||
| def __init__(self, config: SchedulerConfig = SchedulerConfig()): | ||
| self._queue_operation = None | ||
| @property | ||
| def queue_operation(self) -> ShardedQueueOperation: ... | ||
| def queue_operation(self) -> ShardedQueueOperation: | ||
| if self._queue_operation is None: | ||
| raise RuntimeError("请先设置队列操作对象") | ||
| return self._queue_operation | ||
| def __init__(self, config: SchedulerConfig = SchedulerConfig()): | ||
| pass | ||
| @queue_operation.setter | ||
| def queue_operation(self, value: ShardedQueueOperation | None): | ||
| """设置队列操作对象 | ||
| Args: | ||
| value (ShardedQueueOperation): 队列操作对象 | ||
| """ | ||
| self._queue_operation = value | ||
| @abstractmethod | ||
@@ -52,1 +65,12 @@ def send_message(self, message: MessageItem, callback: Callable): | ||
| pass | ||
| def get_queue_info(self) -> dict: | ||
| try: | ||
| return { | ||
| "queue_length": self.queue_operation.get_queue_length(), | ||
| "shard_num": self.queue_operation.shard_num, | ||
| "db_dir": self.queue_operation.db_dir, | ||
| } | ||
| except Exception as e: | ||
| logging.error(f"获取队列消息失败: {str(e)}") | ||
| return {} |
@@ -68,3 +68,3 @@ #!/usr/bin/env python | ||
| self.queue_operation.clean_old_messages(remove_days) | ||
| self.queue_operation.remove_expired_messages(remove_days) | ||
@@ -71,0 +71,0 @@ except Exception as e: |
@@ -61,3 +61,4 @@ #!/usr/bin/env python | ||
| is_delete integer DEFAULT 0, | ||
| timestamp DATETIME DEFAULT CURRENT_TIMESTAMP | ||
| timestamp DATETIME DEFAULT CURRENT_TIMESTAMP, | ||
| is_processed integer DEFAULT 0 | ||
| ) | ||
@@ -67,2 +68,11 @@ """ | ||
| conn.commit() | ||
| else: | ||
| # 如果表存在,检查是否已有 is_processed 列 | ||
| cursor = conn.execute("PRAGMA table_info(change_log)") | ||
| columns = [row[1] for row in cursor.fetchall()] | ||
| if "is_processed" not in columns: | ||
| conn.execute( | ||
| "ALTER TABLE change_log ADD COLUMN is_processed INTEGER DEFAULT 0" | ||
| ) | ||
| conn.commit() | ||
@@ -78,4 +88,9 @@ except Exception as e: | ||
| with QMutexLocker(self.db_mutex): | ||
| status, change_data_items = self.listen_operation.listen_data() | ||
| return status, change_data_items | ||
| conn = self.listen_operation._get_connection() | ||
| cursor = conn.execute( | ||
| "SELECT id, table_name, row_id, column_name, old_value, new_value, is_delete, timestamp " | ||
| "FROM change_log WHERE is_processed = 0 ORDER BY id ASC" | ||
| ) | ||
| change_data_items = cursor.fetchall() | ||
| return True, change_data_items | ||
| except sqlite3.OperationalError as e: | ||
@@ -96,2 +111,24 @@ if "database is locked" in str(e) and attempt < self.max_retries - 1: | ||
| def _mark_processed(self, log_id): | ||
| """标记指定ID的日志为已处理""" | ||
| try: | ||
| with QMutexLocker(self.db_mutex): | ||
| conn = self.listen_operation._get_connection() | ||
| # 添加 is_processed 字段来标记是否已经处理 | ||
| # 首先检查表结构是否包含该字段,如果没有则添加 | ||
| cursor = conn.execute("PRAGMA table_info(change_log)") | ||
| columns = [row[1] for row in cursor.fetchall()] | ||
| if "is_processed" not in columns: | ||
| conn.execute( | ||
| "ALTER TABLE change_log ADD COLUMN is_processed INTEGER DEFAULT 0" | ||
| ) | ||
| # 更新指定ID的记录为已处理 | ||
| conn.execute( | ||
| "UPDATE change_log SET is_processed = 1 WHERE id = ?", (log_id,) | ||
| ) | ||
| conn.commit() | ||
| except Exception as e: | ||
| logging.error(f"标记已处理记录失败: {str(e)}") | ||
| def _process_listen_data(self, change_data_items): | ||
@@ -122,2 +159,3 @@ """处理监听数据""" | ||
| self.thread_pool.start(task) # type: ignore | ||
| self._mark_processed(int(delete_id)) | ||
| processed_count += 1 | ||
@@ -124,0 +162,0 @@ |
@@ -17,2 +17,4 @@ #!/usr/bin/env python | ||
| import multiprocessing | ||
| import time | ||
| import logging | ||
@@ -26,2 +28,6 @@ | ||
| self.listen_thread = None | ||
| # 使用一个线程锁来确保监听逻辑的原子性 | ||
| self.process_lock = threading.Lock() | ||
| # 记录最后处理的ID,避免重复处理 | ||
| self.last_processed_id = 0 | ||
@@ -36,13 +42,30 @@ def _process_listen_data(self, key, value, delete_id): | ||
| finally: | ||
| self.listen_operation.delete_change_log(delete_id=delete_id) | ||
| # 确保变更日志被删除 | ||
| try: | ||
| self.listen_operation.delete_change_log(delete_id=delete_id) | ||
| except Exception as e: | ||
| # 记录错误但不抛出异常 | ||
| logging.error(f"Error in deleting change log: {e}") | ||
| def listen(self): | ||
| while self.is_running: | ||
| # 获取比上次处理ID更大的变更记录,避免重复处理 | ||
| status, change_data_items = self.listen_operation.listen_data() | ||
| if status: | ||
| for data in change_data_items: | ||
| if status and isinstance(change_data_items, list): | ||
| # 过滤出ID大于上次处理ID的记录 | ||
| new_change_data_items = [ | ||
| data | ||
| for data in change_data_items | ||
| if data[0] > self.last_processed_id | ||
| ] | ||
| # 按ID升序排序,确保按顺序处理 | ||
| new_change_data_items.sort(key=lambda x: x[0]) | ||
| for data in new_change_data_items: | ||
| key = data[3] | ||
| new_value = data[5] | ||
| delete_id = data[0] | ||
| if self.is_running: | ||
| # 提交任务处理 | ||
| self.executor.submit( | ||
@@ -52,2 +75,10 @@ self._process_listen_data, key, new_value, delete_id | ||
| # 更新最后处理ID | ||
| with self.process_lock: | ||
| if delete_id > self.last_processed_id: | ||
| self.last_processed_id = delete_id | ||
| # 添加小延迟,避免过度占用CPU | ||
| time.sleep(0.001) | ||
| def start_listen_data(self): | ||
@@ -57,2 +88,3 @@ if self.is_running: | ||
| self.is_running = True | ||
| self.last_processed_id = 0 # 重置最后处理ID | ||
| self.listen_thread = threading.Thread(target=self.listen) | ||
@@ -59,0 +91,0 @@ self.listen_thread.start() |
@@ -8,3 +8,3 @@ from listen import * | ||
| def test_listen_data(self): | ||
| scheduler = QueueScheduler() | ||
| scheduler = QueueScheduler(scheduler_type="qt") | ||
| scheduler.start() | ||
@@ -20,3 +20,6 @@ scheduler.update_listen_data("key_1", "value_1") | ||
| scheduler.stop() | ||
| print(scheduler.get_listen_datas()) | ||
| print(scheduler.get_listen_data("key_1")) | ||
| # print(scheduler.get_listen_datas()) | ||
| # print(scheduler.get_listen_data("key_1")) | ||
| # TestListen().test_listen_data() |
Alert delta unavailable
Currently unable to show alert delta for PyPI packages.
532258
150.03%67
3.08%3079
3.36%