Redisify
Redisify is a lightweight Python library that provides Redis-backed data structures and distributed synchronization primitives. It is designed for distributed systems where persistent, shared, and async-compatible data structures are needed.

🚀 Features
📦 Data Structures
- RedisDict: A dictionary-like interface backed by Redis hash with full CRUD operations
- RedisList: A list-like structure supporting indexing, insertion, deletion, and iteration
- RedisQueue: A FIFO queue with blocking and async operations
- RedisSet: A set-like structure with union, intersection, difference operations
🔐 Distributed Synchronization
- RedisLock: Distributed locking mechanism with automatic cleanup
- RedisSemaphore: Semaphore for controlling concurrent access
- RedisLimiter: Rate limiting with a token bucket algorithm
⚡ Advanced Features
- Async/Await Support: All operations are async-compatible
- Smart Serialization: Automatic serialization of complex objects using dill
- Context Manager Support: Use with `async with` statements
- Comprehensive Testing: Full test coverage for all components
- Type Safety: Full type hints and documentation
- Thread-Safe: All operations are thread and process safe
📦 Installation
```bash
pip install redisify
```
Or for development and testing:
```bash
git clone https://github.com/Hambaobao/redisify.git
cd redisify
pip install -e .[test]
```
⚡ Quick Start
```python
import asyncio

from redisify import (
    RedisDict, RedisList, RedisQueue, RedisSet,
    RedisLock, RedisSemaphore, RedisLimiter,
    connect_to_redis,
)


async def main():
    # Configure the shared Redis connection once
    connect_to_redis(host="localhost", port=6379, db=0)

    # Dictionary-like structure
    rdict = RedisDict("example:dict")
    # "await rdict[key] = value" is not valid Python; an async set() is assumed here
    await rdict.set("user:1", {"name": "Alice", "age": 30})
    user = await rdict["user:1"]
    print(user)

    # List-like structure
    rlist = RedisList("example:list")
    await rlist.append("item1")
    await rlist.append("item2")
    first_item = await rlist[0]
    print(first_item)

    # FIFO queue
    rqueue = RedisQueue("example:queue")
    await rqueue.put("task1")
    await rqueue.put("task2")
    task = await rqueue.get()
    print(task)

    # Set
    rset = RedisSet("example:set")
    await rset.add("item1")
    await rset.add("item2")
    items = await rset.to_set()
    print(items)


asyncio.run(main())
```
📚 Detailed Usage
RedisDict
A distributed dictionary that supports any serializable Python objects as keys and values.
```python
from redisify import RedisDict

rdict = RedisDict("users")

# Store values ("await rdict[key] = value" is not valid Python; an async set() is assumed)
await rdict.set("user1", {"name": "Alice", "age": 30})
await rdict.set("user2", {"name": "Bob", "age": 25})

# Retrieve values
user1 = await rdict["user1"]
print(user1)

# Membership check
if "user1" in rdict:
    print("User exists")

# Delete a key (an async delete() is assumed; "del" cannot be awaited)
await rdict.delete("user2")

# Iterate over items
async for key, value in rdict.items():
    print(f"{key}: {value}")

# Get with a default value
user = await rdict.get("user3", {"name": "Default", "age": 0})
```
RedisList
A distributed list with full indexing and slicing support.
```python
from redisify import RedisList

rlist = RedisList("tasks")

await rlist.append("task1")
await rlist.append("task2")
await rlist.insert(0, "priority_task")

# Indexing and slicing
first_task = await rlist[0]
print(first_task)
tasks = await rlist[1:3]

# Length ("await len(rlist)" is not valid Python; an async size() helper is assumed)
length = await rlist.size()
print(length)

# Iteration
async for item in rlist:
    print(item)

# Remove the first matching occurrence
await rlist.remove("task1", count=1)
```
RedisQueue
A distributed FIFO queue with blocking and non-blocking operations.
```python
import asyncio

from redisify import RedisQueue

# Assumes connect_to_redis(...) has been called, as in the Quick Start
rqueue = RedisQueue("job_queue", maxsize=100)

await rqueue.put("job1")
await rqueue.put("job2")

# Blocking get
job = await rqueue.get()
print(job)

# Non-blocking get
try:
    job = await rqueue.get_nowait()
except asyncio.QueueEmpty:
    print("Queue is empty")

# Inspect the queue without consuming items
next_job = await rqueue.peek()
size = await rqueue.qsize()
is_empty = await rqueue.empty()
```
RedisSet
A distributed set with full set operations support.
```python
from redisify import RedisSet

set1 = RedisSet("set1")
set2 = RedisSet("set2")

await set1.add("item1")
await set1.add("item2")
await set2.add("item2")
await set2.add("item3")

# Set algebra
union = await set1.union(set2)
intersection = await set1.intersection(set2)
difference = await set1.difference(set2)
print(union)
print(intersection)
print(difference)

# Membership check
if "item1" in set1:
    print("Item exists")

# Convert to a regular Python set
python_set = await set1.to_set()
```
RedisLock
A distributed lock for critical section protection.
```python
from redisify import RedisLock

lock = RedisLock("resource_lock")

# Manual acquire/release
await lock.acquire()
try:
    print("Resource locked")
finally:
    await lock.release()

# Context manager usage
async with RedisLock("resource_lock"):
    print("Resource locked automatically")
```
RedisSemaphore
A distributed semaphore for controlling concurrent access.
```python
import asyncio

from redisify import RedisSemaphore

# Allow up to 3 concurrent holders
semaphore = RedisSemaphore("api_limit", 3)

async def api_call():
    async with semaphore:
        print("API call executing")
        await asyncio.sleep(1)

# At most 3 of these run at the same time
tasks = [api_call() for _ in range(10)]
await asyncio.gather(*tasks)

# Inspect current usage
current_value = await semaphore.value()
print(f"Currently {current_value} permits are acquired")

# Acquire only if capacity is available
if await semaphore.can_acquire():
    await semaphore.acquire()
```
RedisLimiter
A distributed rate limiter using the token bucket algorithm.
```python
from redisify import RedisLimiter

# e.g. 10 requests per 60 seconds (parameter order assumed from this example)
limiter = RedisLimiter("api_rate", 10, 60)

async def make_request():
    if await limiter.acquire():
        print("Request allowed")
    else:
        print("Rate limit exceeded")

# Context manager usage
async with RedisLimiter("api_rate", 10, 60):
    print("Request allowed")
```
🔧 Serialization
Redisify includes a smart serializer that handles complex objects using dill:
```python
from pydantic import BaseModel

from redisify import RedisDict

class User(BaseModel):
    name: str
    age: int

user = User(name="Alice", age=30)

rdict = RedisDict("users")

# Pydantic models round-trip transparently (an async set() is assumed, as above)
await rdict.set("user1", user)
retrieved_user = await rdict["user1"]
print(type(retrieved_user))
print(retrieved_user.name)

# Arbitrary custom classes work as well
class CustomObject:
    def __init__(self, data):
        self.data = data

    def __repr__(self):
        return f"CustomObject({self.data})"

obj = CustomObject("test")
await rdict.set("custom", obj)
retrieved_obj = await rdict["custom"]
print(retrieved_obj)
```
📖 API Documentation
For detailed API documentation, see the docstrings in the source code.
⚡ Performance Considerations
Memory Usage
- All objects are serialized before storage, which increases memory usage
- Consider using simple data types for large datasets
- Use the `clear()` method to free memory when structures are no longer needed, as shown below
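For example, a long-lived worker can clear temporary structures once a batch is done. A minimal sketch (the key names are illustrative, and it assumes `clear()` is available on each structure as noted above):

```python
from redisify import RedisDict, RedisList

async def finish_batch():
    # Illustrative, temporary keys
    results = RedisDict("batch:results")
    staging = RedisList("batch:staging")

    # ... process the batch ...

    # Release the underlying Redis keys once the data is no longer needed
    await results.clear()
    await staging.clear()
```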
Network Latency
- All operations are async and non-blocking
- Use connection pooling for better performance
- Consider using Redis clusters for high-availability setups
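Because every call is awaitable, independent operations can also be overlapped rather than issued one at a time; each call is a separate network round trip, so batching them with `asyncio.gather` hides much of the latency. A rough sketch using the Quick Start API (the key name is illustrative):

```python
import asyncio

from redisify import RedisQueue, connect_to_redis

async def enqueue_many(items):
    connect_to_redis(host="localhost", port=6379, db=0)
    rqueue = RedisQueue("example:bulk_queue")

    # Issue all puts concurrently; their round trips overlap instead of serializing
    await asyncio.gather(*(rqueue.put(item) for item in items))

asyncio.run(enqueue_many([f"task-{i}" for i in range(100)]))
```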
Serialization Overhead
- Complex objects take longer to serialize/deserialize
- Consider using simple data structures for frequently accessed data
- The dill serializer handles most Python objects efficiently
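If serialization cost matters for a hot path, you can time dill directly on representative payloads before deciding how to store them. An illustrative micro-benchmark, independent of Redisify itself:

```python
import time

import dill

class Node:
    def __init__(self, value):
        self.value = value

payloads = {
    "simple dict": {"name": "Alice", "age": 30},
    "nested objects": [Node(i) for i in range(1000)],
}

for label, payload in payloads.items():
    start = time.perf_counter()
    blob = dill.dumps(payload)
    dill.loads(blob)
    elapsed_ms = (time.perf_counter() - start) * 1000
    print(f"{label}: {len(blob)} bytes, {elapsed_ms:.2f} ms round trip")
```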
🧪 Testing
Make sure you have Redis running (locally or via Docker), then:
```bash
# Run all tests
pytest -v tests

# Run with coverage
pytest --cov=redisify tests

# Run a single test module
pytest tests/test_redis_dict.py -v

# Run with explicit asyncio mode
pytest --asyncio-mode=auto tests/
```
Running Redis with Docker
```bash
# Plain Redis
docker run -d -p 6379:6379 redis:latest

# Redis Stack (RedisInsight UI on port 8001)
docker run -d -p 6379:6379 -p 8001:8001 redis/redis-stack:latest
```
🤝 Contributing
We welcome contributions! Please see our Contributing Guide for details.
- Fork the repository
- Create a feature branch (`git checkout -b feature/amazing-feature`)
- Make your changes
- Add tests for new functionality
- Run the test suite (`pytest tests/`)
- Commit your changes (`git commit -m 'Add amazing feature'`)
- Push to the branch (`git push origin feature/amazing-feature`)
- Open a Pull Request
Development Setup
```bash
git clone https://github.com/Hambaobao/redisify.git
cd redisify
pip install -e .[test]
pre-commit install
```
📄 License
This project is licensed under the MIT License - see the LICENSE file for details.
📋 Requirements
- Python 3.10+
- Redis server (local or remote)
- redis Python client (redis-py)
- dill (for object serialization)
🙏 Acknowledgments
📞 Support