Latest Threat Research:SANDWORM_MODE: Shai-Hulud-Style npm Worm Hijacks CI Workflows and Poisons AI Toolchains.Details
Socket
Book a DemoInstallSign in
Socket

vxutils

Package Overview
Dependencies
Maintainers
1
Versions
68
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

vxutils - npm Package Compare versions

Comparing version
20251230
to
20260114
+31
.trae/documents/修复 dataframe_cache.py 中的 SQL 语法及相关 Bug.md
# 修复 dataframe_cache.py 中的 SQL 语法和属性错误
## 1. 修正 SQL 语句 (src/vxutils/dataframe_cache.py)
将 `set` 方法中的 `INSERT` 语句修正为包含 `VALUES` 子句,并使用正确的列名 `data`。
```python
cursor.execute(
"""
INSERT INTO cache_data
(cache_key, data, data_type, ttl, expires_at, created_at)
VALUES (?, ?, ?, ?, ?, ?)
ON CONFLICT(cache_key) DO UPDATE SET
data = excluded.data,
data_type = excluded.data_type,
ttl = excluded.ttl,
expires_at = excluded.expires_at,
created_at = excluded.created_at;
""",
(
cache_key,
data_bytes,
# ...
),
)
```
## 2. 修复属性访问错误
将 `clear`, `cleanup_expired`, `close` 方法中错误的 `self.conn` 修正为 `self._conn`,与 `__init__` 中保持一致。
## 3. 验证修复
创建一个简单的测试脚本,调用 `DataCache.set` 和 `DataCache.get` 以及 `DataCache.clear`,确保修复后功能正常且无报错。
"""SQLite缓存管理器"""
import io
import pickle
import logging
import sqlite3
import json
import hashlib
import time
from pathlib import Path
from functools import singledispatch
from typing import Optional, Any, Tuple
@singledispatch
def _serialize_data(data: Any) -> Tuple[bytes, str]:
"""序列化数据为字节流和数据类型"""
data_bytes = io.BytesIO()
pickle.dump(data, data_bytes)
return data_bytes.getvalue(), "python"
def _deserialize_data(data_bytes: bytes, data_type: str) -> Any:
"""反序列化数据为原始对象"""
if data_type == "polars":
import polars as pl
return pl.DataFrame(pl.read_parquet(io.BytesIO(data_bytes)))
elif data_type == "pandas":
import pandas as pd
return pd.DataFrame(pl.read_parquet(io.BytesIO(data_bytes)))
return pickle.loads(data_bytes)
"""反序列化数据"""
try:
import pandas as pd
@_serialize_data.register
def _(data: pd.DataFrame) -> Tuple[bytes, str]:
"""序列化DataFrame为字节流"""
data_bytes = io.BytesIO()
data.to_parquet(data_bytes)
return data_bytes.getvalue(), "pandas"
except ImportError:
pass
try:
import polars as pl
@_serialize_data.register
def _(data: pl.DataFrame) -> Tuple[bytes, str]:
"""序列化Polars DataFrame为Parquet"""
data_bytes = io.BytesIO()
data.write_parquet(data_bytes)
return data_bytes.getvalue(), "polars"
except ImportError:
pass
class Cache:
"""TTL缓存管理器"""
def __init__(self, db_path: Path = ":memory:"):
"""初始化缓存管理器,连接SQLite数据库"""
self._db_path = db_path
self._conn = sqlite3.connect(db_path, check_same_thread=False)
# 启用WAL模式提升并发性能
self._conn.execute("PRAGMA journal_mode=WAL")
self._init_database()
def _init_database(self):
"""创建表和索引,支持版本管理"""
try:
cursor = self._conn.cursor()
# 检查表是否存在
cursor.execute("""
SELECT name FROM sqlite_master
WHERE type='table' AND name='cache_data'
""")
table_exists = cursor.fetchone() is not None
if not table_exists:
# 创建新表(支持版本管理)
cursor.execute("""
CREATE TABLE cache_data (
cache_key TEXT NOT NULL,
data BLOB NOT NULL,
data_type TEXT NOT NULL DEFAULT 'python',
ttl REAL NOT NULL DEFAULT 0,
expires_at REAL NOT NULL,
created_at REAL NOT NULL,
PRIMARY KEY (cache_key)
)
""")
# 创建索引
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_cache_key ON cache_data(cache_key)
""")
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_expires_at ON cache_data(expires_at)
""")
self._conn.commit()
except sqlite3.Error as e:
logging.error(f"初始化数据库失败: {e}")
self._conn.rollback()
def _generate_cache_key(self, **params) -> str:
"""生成缓存键"""
# 将参数排序后序列化,确保相同参数生成相同键
param_str = json.dumps(params, sort_keys=True, ensure_ascii=False)
key_str = f"CACHEKEY_{param_str}"
# 使用 MD5 生成短键名
return hashlib.md5(key_str.encode("utf-8")).hexdigest()
def get(self, **params) -> Optional[pl.DataFrame]:
"""获取缓存的DataFrame,即使过期也返回(数据永久保留)"""
cache_key = self._generate_cache_key(**params)
current_time = time.time()
cursor = self._conn.cursor()
# 查询最新版本的数据(不过滤过期时间,数据永久保留)
cursor.execute(
"""
SELECT data,data_type,ttl,expires_at FROM cache_data
WHERE cache_key = ? AND expires_at > ?;
""",
(cache_key, current_time),
)
row = cursor.fetchone()
if row is None:
return None
data, data_type, ttl, expires_at = row
if ttl > 0:
expires_at = current_time + ttl
# 更新访问统计(更新最新版本)
cursor.execute(
"""
UPDATE cache_data
SET expires_at = ?
WHERE cache_key = ?
""",
(expires_at, cache_key),
)
self._conn.commit()
return _deserialize_data(data, data_type)
def set(
self,
data: Any,
ttl: float = 0,
expires_at: float = float("inf"),
**params,
) -> str:
"""保存DataFrame到缓存,设置过期时间(秒),默认0表示永不过期
Args:
data (Any): 要缓存的数据
ttl (float, optional): 缓存过期时间(秒),默认0表示永不过期
**params: 缓存键的参数
Returns:
str: 缓存键,如果过期时间无效则返回空字符串
"""
cache_key = self._generate_cache_key(**params)
current_time = time.time()
if ttl > 0:
expires_at = current_time + ttl
if expires_at <= current_time:
return ""
# DataFrame转为Parquet字节流
try:
data_bytes, data_type = _serialize_data(data)
# 插入或更新缓存数据(保留所有历史版本)
cursor = self._conn.cursor()
cursor.execute(
"""
INSERT INTO cache_data
(cache_key,data,data_type,ttl,expires_at,created_at)
VALUES (?, ?, ?, ?, ?, ?)
ON CONFLICT(cache_key) DO UPDATE SET
data = excluded.data,
data_type = excluded.data_type,
ttl = excluded.ttl,
expires_at = excluded.expires_at,
created_at = excluded.created_at;
""",
(
cache_key,
data_bytes,
data_type,
ttl,
expires_at,
current_time,
),
)
self._conn.commit()
return cache_key
except (TypeError, ValueError, sqlite3.Error) as e:
logging.error(
f"缓存数据{params=},{ttl=},{expires_at=},{data=}设置失败: {e}"
)
self._conn.rollback()
return ""
def clear(self) -> int:
"""清理缓存"""
try:
cursor = self.conn.cursor()
cursor.execute("DELETE FROM cache_data")
count = cursor.rowcount
self.conn.commit()
return count
except sqlite3.Error as e:
logging.error(f"清理缓存失败: {e}")
self._conn.rollback()
return 0
def cleanup_expired(self) -> int:
"""清楚过期缓存"""
try:
current_time = time.time()
cursor = self._conn.cursor()
# 只标记为过期,不删除数据
cursor.execute(
"""
DELETE FROM cache_data
WHERE expires_at <= ?
""",
(current_time,),
)
count = cursor.rowcount
self._conn.commit()
return count
except sqlite3.Error as e:
logging.error(f"清理过期缓存失败: {e}")
self._conn.rollback()
return 0
def close(self):
"""关闭数据库连接"""
if self._conn:
self._conn.close()
if __name__ == "__main__":
cache_manager = Cache()
data = pl.DataFrame({"a": [1, 2, 3]})
data = 1234556
cache_key = cache_manager.set(data, ttl=4, ods="test", table="calendar")
print(cache_key)
# 等待过期
time.sleep(2)
data = cache_manager.get(ods="test", table="calendar")
print(data)
time.sleep(4.5)
data = cache_manager.get(ods="test", table="calendar")
print(data)
+1
-1
Metadata-Version: 2.4
Name: vxutils
Version: 20251230
Version: 20260114
Summary: A toolbox for vxquant

@@ -5,0 +5,0 @@ Author-email: libao <libao@vxquant.com>

[project]
name = "vxutils"
version = "20251230"
version = "20260114"
description = "A toolbox for vxquant"

@@ -5,0 +5,0 @@ readme = "README.md"

@@ -17,3 +17,3 @@ from .executor import VXThreadPoolExecutor, DynamicThreadPoolExecutor

retry,
timer,
Timer,
log_exception,

@@ -53,3 +53,3 @@ singleton,

"retry",
"timer",
"Timer",
"log_exception",

@@ -56,0 +56,0 @@ "singleton",

@@ -11,3 +11,3 @@ import logging

"retry",
"timer",
"Timer",
"log_exception",

@@ -50,3 +50,3 @@ "singleton",

class timer:
class Timer:
"""计时器"""

@@ -67,3 +67,3 @@

def __enter__(self) -> "timer":
def __enter__(self) -> "Timer":
if self._verbose:

@@ -70,0 +70,0 @@ logging.info(f"{self._descriptions} start...")

{
"TEST_KEY": "test"
}