CogniHub PyEffectRef
CogniHub 项目的响应式编程库,提供类似 Vue 3 Composition API 的响应式系统.本库分为底层接口和高级接口两个层次,满足不同复杂度的使用需求.
特性
- 🔄 响应式编程: 类似 Vue 3 Composition API 的响应式系统
- 🔒 线程安全: 支持多线程环境下的安全操作
- ⚡ 异步支持: 完整支持 asyncio 协程环境
- 🎯 类型提示: 完整的 TypeScript 风格类型提示支持
- 🏗️ 分层设计: 底层接口(Ref/effect) + 高级接口(ReactiveDict/ReadOnlyView)
- 🎛️ 执行控制: 支持同步、异步、顺序执行等多种模式
安装
pip install cognihub-pyeffectref
或从源码安装:
git clone https://github.com/hsz1273327/cognihub_pyeffectref.git
cd cognihub_pyeffectref
pip install -e .
📚 架构概览
本库采用分层设计,提供两个层次的接口:
🔧 底层接口 (Low-level APIs)
Ref[T]: 响应式数据容器,支持泛型类型指定
effect: 副作用装饰器,自动追踪依赖关系
ReadOnlyRef[T]: 只读响应式引用
特点: 直接使用泛型指定类型,适合简单数据结构和性能敏感场景
🏗️ 高级接口 (High-level APIs)
ReactiveDict: 响应式字典,支持嵌套结构,配合 TypedDict 和 cast 使用
ReadOnlyView: 只读视图,配合 Protocol 和 cast 使用
特点: 需要结合 TypedDict 和 Protocol 指定复杂类型结构,适合复杂应用场景
🚀 快速开始
1️⃣ 底层接口示例
基本用法 - 泛型类型指定
from cognihub_pyeffectref import Ref, effect
from typing import List, Dict
count: Ref[int] = Ref(0)
name: Ref[str] = Ref("Alice")
items: Ref[List[str]] = Ref(["apple", "banana"])
@effect
def log_count() -> None:
print(f"Count is: {count.value}")
@effect
def log_greeting() -> None:
print(f"Hello, {name.value}!")
log_count()
log_greeting()
count.value = 5
name.value = "Bob"
同步/异步/多线程支持
默认单线程同步执行,通过Ref.configure_sync_task_executor可配置为异步或多线程执行.
注意:
- 请仅在需要时使用异步或多线程,因为这会增加复杂性和调试难度.
- 仅在应用入口处设置一次执行器配置即可,不需要在每个模块中重复设置.设置接口是一次性的,一旦设置过了就无法改变
import asyncio
import threading
import time
from cognihub_pyeffectref import Ref, effect
data: Ref[str] = Ref("initial")
@effect
def sync_effect() -> None:
print(f"Sync effect: {data.value}")
@effect
async def async_effect() -> None:
print(f"Async effect: {data.value}")
await asyncio.sleep(0.1)
def thread_worker(thread_id: int) -> None:
@effect
def thread_effect() -> None:
print(f"Thread {thread_id} effect: {data.value}")
thread_effect()
time.sleep(0.1)
async def main():
sync_effect()
await async_effect()
threads = []
for i in range(3):
thread = threading.Thread(target=thread_worker, args=(i,))
thread.start()
threads.append(thread)
data.value = "updated"
await asyncio.sleep(0.2)
for thread in threads:
thread.join()
执行器配置 - 控制回调执行方式
默认的回调执行方式:
-
在未设置执行器时
- 同步回调在当前线程同步执行.
- 异步回调在 asyncio 事件循环中异步执行.
-
在设置了执行器的情况下
- 同步回调会在执行器中多线程后台执行.
- 异步回调会在 asyncio 事件循环中异步执行.
实例的精细化回调执行控制
实例可以在初始化时额外指定其用subscribe注册的同步回调函数的执行方式.
subscribe_immediate=True: 强制在当前线程同步执行回调,忽略全局执行器配置.
subscribe_sequential=True: 保证同步回调函数按注册顺序在后台执行,适用于需要顺序执行的场景.
import concurrent.futures
from cognihub_pyeffectref import Ref
Ref.configure_sync_task_executor('asyncio')
executor = concurrent.futures.ThreadPoolExecutor(max_workers=4)
Ref.configure_sync_task_executor(executor)
immediate_ref = Ref(0, subscribe_immediate=True)
sequential_ref = Ref(0, subscribe_sequential=True)
2️⃣ 高级接口示例
ReactiveDict - 结合 TypedDict 指定结构
from cognihub_pyeffectref import ReactiveDict, effect
from typing import TypedDict, cast
class UserData(TypedDict):
name: str
email: str
age: int
is_active: bool
user_dict = ReactiveDict({
'name': 'Alice',
'email': 'alice@example.com',
'age': 25,
'is_active': True
})
user: UserData = cast(UserData, user_dict)
@effect
def watch_user() -> None:
print(f"User: {user['name']} ({user['age']})")
print(f"Email: {user['email']}")
print(f"Active: {user['is_active']}")
watch_user()
user['name'] = 'Bob'
user['age'] = 26
ReadOnlyView - 结合 Protocol 创建只读视图
from cognihub_pyeffectref import ReactiveDict, ReadOnlyView, ReadOnlyRef, effect
from typing import Protocol, cast, Any
class UserViewProtocol(Protocol):
name: ReadOnlyRef[str]
email: ReadOnlyRef[str]
age: ReadOnlyRef[int]
is_active: ReadOnlyRef[bool]
def __call__(self) -> dict[str, Any]: ...
user_data = ReactiveDict({
'name': 'Alice',
'email': 'alice@example.com',
'age': 25,
'is_active': True
})
user_view = cast(UserViewProtocol, ReadOnlyView(user_data))
@effect
def watch_user_view() -> None:
print(f"Name: {user_view.name.value}")
print(f"Email: {user_view.email.value}")
print(f"Age: {user_view.age.value}")
watch_user_view()
user_data.name = "Bob"
复杂嵌套数据结构
from cognihub_pyeffectref import ReactiveDict, ReadOnlyView, ReadOnlyRef, effect
from typing import Protocol, cast, Any
class DatabaseConfig(Protocol):
host: ReadOnlyRef[str]
port: ReadOnlyRef[int]
name: ReadOnlyRef[str]
class ApiConfig(Protocol):
base_url: ReadOnlyRef[str]
timeout: ReadOnlyRef[int]
retry_count: ReadOnlyRef[int]
class AppConfig(Protocol):
database: DatabaseConfig
api: ApiConfig
debug_mode: ReadOnlyRef[bool]
def __call__(self) -> dict[str, Any]: ...
config_data = ReactiveDict({
'database': {
'host': 'localhost',
'port': 5432,
'name': 'myapp'
},
'api': {
'base_url': 'https://api.example.com',
'timeout': 30,
'retry_count': 3
},
'debug_mode': False
})
config_view = cast(AppConfig, ReadOnlyView(config_data))
@effect
def watch_config() -> None:
db_host = config_view.database.host.value
api_url = config_view.api.base_url.value
debug = config_view.debug_mode.value
print(f"Database: {db_host}")
print(f"API: {api_url}")
print(f"Debug: {debug}")
watch_config()
config_data.database.host = 'production-db'
config_data.debug_mode = True
📖 API 参考
🔧 底层接口 (Low-level APIs)
Ref[T]
响应式数据容器类,支持泛型类型指定.
构造函数
Ref(initial_value: T, subscribe_immediate: bool = False, subscribe_sequential: bool = False)
initial_value: 初始值
subscribe_immediate: 是否强制在当前线程同步执行回调 (忽略全局执行器配置)
subscribe_sequential: 是否保证回调按注册顺序执行
属性
方法
subscribe(callback: Callable[[T, T], None]): 订阅值变化
unsubscribe(callback: Callable[[T, T], None]): 取消订阅
configure_sync_task_executor(executor): 配置全局同步任务执行器
类方法
Ref.configure_sync_task_executor('asyncio' | ThreadPoolExecutor): 配置全局执行器
ReadOnlyRef[T]
只读响应式引用,从 Ref 创建.
构造函数
ReadOnlyRef(ref: Ref[T]): 从现有 Ref 创建只读引用
属性
value: T: 只读访问引用的值 (无 setter)
方法
subscribe(callback: Callable[[T, T], None]): 订阅值变化
unsubscribe(callback: Callable[[T, T], None]): 取消订阅
effect
副作用装饰器,自动追踪 Ref 依赖关系.
@effect
def my_effect() -> None:
pass
effect_wrapper = effect(my_function)
effect_wrapper()
EffectWrapper
effect 装饰器返回的包装器类.
方法
🏗️ 高级接口 (High-level APIs)
ReactiveDict
响应式字典,支持嵌套结构和动态键.
构造函数
ReactiveDict(initial_data: dict = None): 创建响应式字典
方法
to_dict() -> dict: 转换为普通字典
keys(), values(), items(): 字典接口方法
get(key, default=None): 获取值
pop(key, default=None): 删除并返回值
clear(): 清空字典
update(other): 更新字典
特性
- 支持嵌套结构自动转换为 ReactiveDict
- 动态属性访问:
obj.key 等价于 obj['key']
- 与 TypedDict 结合使用获得类型提示
ReadOnlyView
只读视图,从 ReactiveDict 创建结构化的只读访问.
构造函数
ReadOnlyView(reactive_dict: ReactiveDict): 创建只读视图
特性
- 递归将所有值转换为 ReadOnlyRef
- 支持嵌套结构的只读访问
- 与 Protocol 结合获得完整类型提示
- 调用
view() 返回当前状态的字典快照
使用模式
class MyDataProtocol(Protocol):
field: ReadOnlyRef[str]
def __call__(self) -> dict[str, Any]: ...
data = ReactiveDict({'field': 'value'})
view = cast(MyDataProtocol, ReadOnlyView(data))
print(view.field.value)
snapshot = view()
⚡ 执行模式详解
同步执行 (默认)
from cognihub_pyeffectref import Ref, effect
data = Ref("initial")
@effect
def sync_effect() -> None:
print(f"Current value: {data.value}")
sync_effect()
data.value = "changed"
异步执行 (asyncio 环境)
import asyncio
from cognihub_pyeffectref import Ref, effect
Ref.configure_sync_task_executor('asyncio')
data = Ref("initial")
@effect
async def async_effect() -> None:
print(f"Async value: {data.value}")
await asyncio.sleep(0.1)
async def main():
await async_effect()
data.value = "changed"
await asyncio.sleep(0.2)
asyncio.run(main())
多线程执行
import threading
import concurrent.futures
from cognihub_pyeffectref import Ref, effect
executor = concurrent.futures.ThreadPoolExecutor(max_workers=4)
Ref.configure_sync_task_executor(executor)
data = Ref(0)
@effect
def thread_effect() -> None:
thread_id = threading.get_ident()
value = data.value
print(f"Thread {thread_id}: {value}")
threads = []
for i in range(3):
thread = threading.Thread(target=thread_effect)
thread.start()
threads.append(thread)
data.value = 42
for thread in threads:
thread.join()
executor.shutdown(wait=True)
执行控制选项
from cognihub_pyeffectref import Ref
immediate_ref = Ref(0, subscribe_immediate=True)
sequential_ref = Ref(0, subscribe_sequential=True)
combined_ref = Ref(0, subscribe_immediate=True, subscribe_sequential=True)
🔒 线程安全特性
本库在设计时充分考虑了线程安全:
内部锁机制
Ref 使用内部锁保护订阅者集合的并发修改
- 支持在多线程环境中安全地读写响应式数据
ReactiveDict 的嵌套操作也是线程安全的
上下文隔离
- 异步环境使用
contextvars 隔离 effect 上下文
- 多线程环境使用
threading.local 进行线程隔离
- 确保副作用函数在正确的上下文中执行
执行器配置
- 全局执行器配置支持
'asyncio' 和自定义 ThreadPoolExecutor
subscribe_immediate=True 可强制在当前线程同步执行,提供确定性行为
subscribe_sequential=True 保证回调按注册顺序执行,避免竞态条件
🎯 使用场景推荐
何时使用底层接口 (Ref/effect)
✅ 适合场景:
- 简单的响应式状态管理
- 性能敏感的应用
- 需要精确控制执行时机
- 与现有代码集成
counter: Ref[int] = Ref(0)
user_name: Ref[str] = Ref("Anonymous")
何时使用高级接口 (ReactiveDict/ReadOnlyView)
✅ 适合场景:
- 复杂的嵌套数据结构
- 需要结构化的数据访问
- 配置管理系统
- 大型应用的状态管理
app_config = ReactiveDict({
'database': {'host': 'localhost', 'port': 5432},
'cache': {'ttl': 3600, 'max_size': 1000},
'features': {'debug': False, 'analytics': True}
})
混合使用模式
from cognihub_pyeffectref import Ref, ReactiveDict, ReadOnlyView, effect
from typing import Protocol, cast
app_state: Ref[str] = Ref("initializing")
user_count: Ref[int] = Ref(0)
config_data = ReactiveDict({
'ui': {'theme': 'dark', 'language': 'en'},
'api': {'timeout': 30, 'retries': 3}
})
class ConfigProtocol(Protocol):
ui: dict[str, str]
api: dict[str, int]
config = cast(ConfigProtocol, config_data)
@effect
def sync_state() -> None:
state = app_state.value
theme = config['ui']['theme']
print(f"App {state} with {theme} theme")
sync_state()
🛠️ 开发指南
环境设置
git clone https://github.com/hsz1273327/cognihub_pyeffectref.git
cd cognihub_pyeffectref
pip install -e ".[dev]"
运行测试
pytest
pytest --cov=cognihub_pyeffectref --cov-report=html
pytest tests/test_ref.py -v
代码质量检查
black .
isort .
mypy cognihub_pyeffectref
flake8 cognihub_pyeffectref
项目结构
cognihub_pyeffectref/
├── cognihub_pyeffectref/ # 主要源码
│ ├── __init__.py # 公共接口导出
│ ├── ref.py # 底层接口:Ref, ReadOnlyRef
│ ├── effect.py # effect 装饰器和 EffectWrapper
│ ├── reactive_dict.py # 高级接口:ReactiveDict
│ └── local.py # 上下文管理(threading.local 等)
├── tests/ # 测试文件
│ ├── test_ref.py # Ref 相关测试
│ ├── test_effect.py # effect 相关测试
│ └── test_reactive_dict.py # ReactiveDict 相关测试
├── examples/ # 使用示例
└── docs/ # 文档
🤝 贡献指南
欢迎贡献代码!请遵循以下流程:
开发流程
- Fork 项目并创建功能分支
- 编写代码并确保测试通过
- 添加测试覆盖新功能
- 更新文档如有必要
- 提交 Pull Request
代码规范
- 使用 类型提示 (
typing 模块)
- 遵循 PEP 8 代码风格
- 编写 docstring 描述函数和类
- 保持 测试覆盖率 > 90%
- 确保所有测试通过
提交信息规范
使用语义化提交信息:
feat: 添加新功能
fix: 修复 bug
docs: 更新文档
test: 添加测试
refactor: 重构代码
style: 代码格式调整
Issue 和 PR 模板
- Bug 报告: 提供复现步骤、预期行为、实际行为
- 功能请求: 描述用例、期望 API、实现建议
- Pull Request: 关联 Issue、说明变更、添加测试
📄 许可证
本项目采用 MIT 许可证.详见 LICENSE 文件.
📚 相关资源
🔄 变更日志
查看 CHANGELOG.md 了解完整变更历史.
感谢使用 CogniHub PyEffectRef! 🚀
如有问题或建议,欢迎提交 Issue 或参与讨论.