New Research: Supply Chain Attack on Axios Pulls Malicious Dependency from npm. Details →
Socket
Book a Demo · Sign in
Socket

queue-sqlite

Package Overview
Dependencies
Maintainers
1
Versions
5
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

queue-sqlite - pypi Package Compare versions

Comparing version
0.2.2
to
0.2.3
src/queue_sqlite_core/uv.lock

Sorry, the diff of this file is too big to display

Sorry, the diff of this file is too big to display

+0
-1

@@ -14,3 +14,2 @@ # Python-generated files

.arts
uv.lock
cache/

@@ -17,0 +16,0 @@

Metadata-Version: 2.4
Name: queue-sqlite
Version: 0.2.2
Version: 0.2.3
Summary: A high-performance, SQLite-based distributed task queue system with Rust-backed core operations. Supports task mounting, message listening, priority handling, retry mechanisms, and automatic cleanup of expired messages. Ideal for building reliable, scalable background task processing systems.

@@ -5,0 +5,0 @@ Author-email: chakcy <947105045@qq.com>

@@ -22,3 +22,3 @@ [project]

requires-python = ">=3.11"
version = "0.2.2"
version = "0.2.3"

@@ -25,0 +25,0 @@ [project.optional-dependencies]

@@ -680,2 +680,4 @@ // src/lib.rs

conn.execute_batch("VACUUM")
.map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(e.to_string()))?;
Ok(())

@@ -682,0 +684,0 @@ }

@@ -36,3 +36,3 @@ #!/usr/bin/env python

__title__ = "queue_sqlite"
__version__ = "0.2.2"
__version__ = "0.2.3"
__author__ = "chakcy"

@@ -39,0 +39,0 @@ __email__ = "947105045@qq.com"

@@ -44,2 +44,3 @@ #!/usr/bin/env python

):
super().__init__(config)
scheduler_class = SCHEDULER_TYPES.get(scheduler_type, None)

@@ -50,4 +51,4 @@ if scheduler_class is None:

# if self.scheduler:
# self.queue_operation = self.scheduler.queue_operation
if hasattr(self.scheduler, "queue_operation"):
self.queue_operation = self.scheduler.queue_operation

@@ -54,0 +55,0 @@ def send_message(self, message: MessageItem, callback: Callable):

@@ -27,2 +27,4 @@ #!/usr/bin/env python

self.listen_thread = None
self.process_lock = threading.Lock()
self.last_processed_id = 0

@@ -41,3 +43,3 @@ async def _process_listen_data(self, key, value, delete_id):

await loop.run_in_executor(executor, listen_function, value)
listen_function(value)
# listen_function(value)
except Exception as e:

@@ -53,7 +55,17 @@ ValueError(f"Error in {key} listener function: {e}")

tasks = []
if status:
for data in change_data_items:
if status and isinstance(change_data_items, list):
new_change_data_items = [
data
for data in change_data_items
if data[0] > self.last_processed_id
]
# 按ID升序排序,确保按顺序处理
new_change_data_items.sort(key=lambda x: x[0])
for data in new_change_data_items:
key = data[3]
new_value = data[5]
delete_id = data[0]
with self.process_lock:
if delete_id > self.last_processed_id:
self.last_processed_id = delete_id
tasks.append(

@@ -64,3 +76,3 @@ asyncio.create_task(

)
if tasks:
if tasks and self.is_running:
await asyncio.gather(*tasks, return_exceptions=True)

@@ -86,2 +98,3 @@

self.is_running = True
self.last_processed_id = 0
self.listen_thread = threading.Thread(target=self._run_listen_loop, daemon=True)

@@ -88,0 +101,0 @@ self.listen_thread.start()

@@ -5,2 +5,3 @@ from abc import ABC, abstractmethod

from queue_sqlite_core import ShardedQueueOperation
import logging

@@ -11,8 +12,20 @@

def __init__(self, config: SchedulerConfig = SchedulerConfig()):
self._queue_operation = None
@property
def queue_operation(self) -> ShardedQueueOperation: ...
def queue_operation(self) -> ShardedQueueOperation:
if self._queue_operation is None:
raise RuntimeError("请先设置队列操作对象")
return self._queue_operation
def __init__(self, config: SchedulerConfig = SchedulerConfig()):
pass
@queue_operation.setter
def queue_operation(self, value: ShardedQueueOperation | None):
"""设置队列操作对象
Args:
value (ShardedQueueOperation): 队列操作对象
"""
self._queue_operation = value
@abstractmethod

@@ -52,1 +65,12 @@ def send_message(self, message: MessageItem, callback: Callable):

pass
def get_queue_info(self) -> dict:
    """Return a snapshot of queue statistics.

    Returns:
        dict: ``queue_length`` (from ``queue_operation.get_queue_length()``),
        ``shard_num`` and ``db_dir`` (attributes of the queue operation
        object). Returns an empty dict if any lookup fails.
    """
    try:
        return {
            "queue_length": self.queue_operation.get_queue_length(),
            "shard_num": self.queue_operation.shard_num,
            "db_dir": self.queue_operation.db_dir,
        }
    except Exception as e:
        # Best-effort: log and degrade to an empty dict rather than raise.
        # (Message translates to: "Failed to get queue info".)
        logging.error(f"获取队列消息失败: {str(e)}")
        return {}

@@ -68,3 +68,3 @@ #!/usr/bin/env python

self.queue_operation.clean_old_messages(remove_days)
self.queue_operation.remove_expired_messages(remove_days)

@@ -71,0 +71,0 @@ except Exception as e:

@@ -61,3 +61,4 @@ #!/usr/bin/env python

is_delete integer DEFAULT 0,
timestamp DATETIME DEFAULT CURRENT_TIMESTAMP
timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
is_processed integer DEFAULT 0
)

@@ -67,2 +68,11 @@ """

conn.commit()
else:
# 如果表存在,检查是否已有 is_processed 列
cursor = conn.execute("PRAGMA table_info(change_log)")
columns = [row[1] for row in cursor.fetchall()]
if "is_processed" not in columns:
conn.execute(
"ALTER TABLE change_log ADD COLUMN is_processed INTEGER DEFAULT 0"
)
conn.commit()

@@ -78,4 +88,9 @@ except Exception as e:

with QMutexLocker(self.db_mutex):
status, change_data_items = self.listen_operation.listen_data()
return status, change_data_items
conn = self.listen_operation._get_connection()
cursor = conn.execute(
"SELECT id, table_name, row_id, column_name, old_value, new_value, is_delete, timestamp "
"FROM change_log WHERE is_processed = 0 ORDER BY id ASC"
)
change_data_items = cursor.fetchall()
return True, change_data_items
except sqlite3.OperationalError as e:

@@ -96,2 +111,24 @@ if "database is locked" in str(e) and attempt < self.max_retries - 1:

def _mark_processed(self, log_id):
    """Mark the change_log row with the given id as processed.

    Args:
        log_id: Primary-key id of the change_log row to flag.
    """
    try:
        with QMutexLocker(self.db_mutex):
            conn = self.listen_operation._get_connection()
            # The is_processed flag column may be missing on databases
            # created by an older schema; check the table layout first
            # and add the column if absent.
            cursor = conn.execute("PRAGMA table_info(change_log)")
            columns = [row[1] for row in cursor.fetchall()]
            if "is_processed" not in columns:
                conn.execute(
                    "ALTER TABLE change_log ADD COLUMN is_processed INTEGER DEFAULT 0"
                )
            # Flag the record with this id as processed.
            conn.execute(
                "UPDATE change_log SET is_processed = 1 WHERE id = ?", (log_id,)
            )
            conn.commit()
    except Exception as e:
        # Best-effort: log and continue; a failed mark is retried on the
        # next listen pass. (Message: "Failed to mark record processed".)
        logging.error(f"标记已处理记录失败: {str(e)}")
def _process_listen_data(self, change_data_items):

@@ -122,2 +159,3 @@ """处理监听数据"""

self.thread_pool.start(task) # type: ignore
self._mark_processed(int(delete_id))
processed_count += 1

@@ -124,0 +162,0 @@

@@ -17,2 +17,4 @@ #!/usr/bin/env python

import multiprocessing
import time
import logging

@@ -26,2 +28,6 @@

self.listen_thread = None
# 使用一个线程锁来确保监听逻辑的原子性
self.process_lock = threading.Lock()
# 记录最后处理的ID,避免重复处理
self.last_processed_id = 0

@@ -36,13 +42,30 @@ def _process_listen_data(self, key, value, delete_id):

finally:
self.listen_operation.delete_change_log(delete_id=delete_id)
# 确保变更日志被删除
try:
self.listen_operation.delete_change_log(delete_id=delete_id)
except Exception as e:
# 记录错误但不抛出异常
logging.error(f"Error in deleting change log: {e}")
def listen(self):
while self.is_running:
# 获取比上次处理ID更大的变更记录,避免重复处理
status, change_data_items = self.listen_operation.listen_data()
if status:
for data in change_data_items:
if status and isinstance(change_data_items, list):
# 过滤出ID大于上次处理ID的记录
new_change_data_items = [
data
for data in change_data_items
if data[0] > self.last_processed_id
]
# 按ID升序排序,确保按顺序处理
new_change_data_items.sort(key=lambda x: x[0])
for data in new_change_data_items:
key = data[3]
new_value = data[5]
delete_id = data[0]
if self.is_running:
# 提交任务处理
self.executor.submit(

@@ -52,2 +75,10 @@ self._process_listen_data, key, new_value, delete_id

# 更新最后处理ID
with self.process_lock:
if delete_id > self.last_processed_id:
self.last_processed_id = delete_id
# 添加小延迟,避免过度占用CPU
time.sleep(0.001)
def start_listen_data(self):

@@ -57,2 +88,3 @@ if self.is_running:

self.is_running = True
self.last_processed_id = 0 # 重置最后处理ID
self.listen_thread = threading.Thread(target=self.listen)

@@ -59,0 +91,0 @@ self.listen_thread.start()

@@ -8,3 +8,3 @@ from listen import *

def test_listen_data(self):
scheduler = QueueScheduler()
scheduler = QueueScheduler(scheduler_type="qt")
scheduler.start()

@@ -20,3 +20,6 @@ scheduler.update_listen_data("key_1", "value_1")

scheduler.stop()
print(scheduler.get_listen_datas())
print(scheduler.get_listen_data("key_1"))
# print(scheduler.get_listen_datas())
# print(scheduler.get_listen_data("key_1"))
# TestListen().test_listen_data()