
Redisify

Redisify is a lightweight Python library that provides Redis-backed data structures and distributed synchronization primitives. It is designed for distributed systems where persistent, shared, and async-compatible data structures are needed.

Python 3.10+ | License: MIT | PyPI version: 0.2.0

🚀 Features

📦 Data Structures

  • RedisDict: A dictionary-like interface backed by a Redis hash, with full CRUD operations
  • RedisList: A list-like structure supporting indexing, insertion, deletion, and iteration
  • RedisQueue: A FIFO queue with blocking and async operations
  • RedisSet: A set-like structure with union, intersection, difference operations

🔐 Distributed Synchronization

  • RedisLock: Distributed locking mechanism with automatic cleanup
  • RedisSemaphore: Semaphore for controlling concurrent access
  • RedisLimiter: Rate limiting with the token bucket algorithm

⚡ Advanced Features

  • Async/Await Support: All operations are async-compatible
  • Smart Serialization: Automatic serialization of complex objects using dill
  • Context Manager Support: Use with async with statements
  • Comprehensive Testing: Full test coverage for all components
  • Type Safety: Full type hints and documentation
  • Thread-Safe: All operations are thread and process safe

📦 Installation

pip install redisify

Or for development and testing:

git clone https://github.com/Hambaobao/redisify.git
cd redisify
pip install -e ".[test]"

⚡ Quick Start

import asyncio
from redisify import RedisDict, RedisList, RedisQueue, RedisSet, RedisLock, RedisSemaphore, RedisLimiter, connect_to_redis

async def main():
    # Connect to Redis
    connect_to_redis(host="localhost", port=6379, db=0)
    
    # Dictionary operations
    rdict = RedisDict("example:dict")
    # Item assignment cannot be awaited in Python, so writes go through set()
    await rdict.set("user:1", {"name": "Alice", "age": 30})
    user = await rdict.get("user:1")
    print(user)  # {'name': 'Alice', 'age': 30}
    
    # List operations
    rlist = RedisList("example:list")
    await rlist.append("item1")
    await rlist.append("item2")
    first_item = await rlist[0]
    print(first_item)  # item1
    
    # Queue operations
    rqueue = RedisQueue("example:queue")
    await rqueue.put("task1")
    await rqueue.put("task2")
    task = await rqueue.get()
    print(task)  # task1
    
    # Set operations
    rset = RedisSet("example:set")
    await rset.add("item1")
    await rset.add("item2")
    items = await rset.to_set()
    print(items)  # {'item1', 'item2'}

asyncio.run(main())

📚 Detailed Usage

RedisDict

A distributed dictionary that accepts any serializable Python object as a key or value.

from redisify import RedisDict

rdict = RedisDict("users")

# Basic operations ("await d[k] = v" is not valid Python, so writes use set())
await rdict.set("user1", {"name": "Alice", "age": 30})
await rdict.set("user2", {"name": "Bob", "age": 25})

# Get values
user1 = await rdict["user1"]
print(user1)  # {'name': 'Alice', 'age': 30}

# Check existence (a sync "in" test cannot await; an async contains() helper is assumed)
if await rdict.contains("user1"):
    print("User exists")

# Delete items ("del" cannot be combined with await; an async delete() is assumed)
await rdict.delete("user2")

# Iterate over items
async for key, value in rdict.items():
    print(f"{key}: {value}")

# Get with default
user = await rdict.get("user3", {"name": "Default", "age": 0})

RedisList

A distributed list with full indexing and slicing support.

from redisify import RedisList

rlist = RedisList("tasks")

# Add items
await rlist.append("task1")
await rlist.append("task2")
await rlist.insert(0, "priority_task")

# Access by index
first_task = await rlist[0]
print(first_task)  # priority_task

# Slicing support
tasks = await rlist[1:3]  # Get items at index 1 and 2

# Get length (len() cannot be awaited; an async size() helper is assumed)
length = await rlist.size()
print(length)  # 3

# Iterate
async for item in rlist:
    print(item)

# Remove items
await rlist.remove("task1", count=1)

RedisQueue

A distributed FIFO queue with blocking and non-blocking operations.

import asyncio

from redisify import RedisQueue

rqueue = RedisQueue("job_queue", maxsize=100)

# Producer
await rqueue.put("job1")
await rqueue.put("job2")

# Consumer (blocking)
job = await rqueue.get()  # Blocks until item available
print(job)  # job1

# Non-blocking get
try:
    job = await rqueue.get_nowait()
except asyncio.QueueEmpty:
    print("Queue is empty")

# Peek at next item without removing
next_job = await rqueue.peek()

# Check queue status
size = await rqueue.qsize()
is_empty = await rqueue.empty()

RedisSet

A distributed set with full set operations support.

from redisify import RedisSet

set1 = RedisSet("set1")
set2 = RedisSet("set2")

# Add items
await set1.add("item1")
await set1.add("item2")
await set2.add("item2")
await set2.add("item3")

# Set operations
union = await set1.union(set2)
intersection = await set1.intersection(set2)
difference = await set1.difference(set2)

print(union)  # {'item1', 'item2', 'item3'}
print(intersection)  # {'item2'}
print(difference)  # {'item1'}

# Membership testing (a sync "in" test cannot await; an async contains() helper is assumed)
if await set1.contains("item1"):
    print("Item exists")

# Convert to Python set
python_set = await set1.to_set()

RedisLock

A distributed lock for critical section protection.

from redisify import RedisLock

lock = RedisLock("resource_lock")

# Manual lock/unlock
await lock.acquire()
try:
    # Critical section
    print("Resource locked")
finally:
    await lock.release()

# Context manager (recommended)
async with RedisLock("resource_lock"):
    print("Resource locked automatically")
    # Lock is automatically released

RedisSemaphore

A distributed semaphore for controlling concurrent access.

from redisify import RedisSemaphore

# Limit to 3 concurrent operations
semaphore = RedisSemaphore("api_limit", 3)

async def api_call():
    async with semaphore:
        print("API call executing")
        await asyncio.sleep(1)

# Run multiple concurrent calls
tasks = [api_call() for _ in range(10)]
await asyncio.gather(*tasks)

# Check current semaphore value
current_value = await semaphore.value()
print(f"Currently {current_value} permits are held")

# Non-blocking check
if await semaphore.can_acquire():
    await semaphore.acquire()

RedisLimiter

A distributed rate limiter using the token bucket algorithm.

from redisify import RedisLimiter

# Rate limit: 10 requests per minute
limiter = RedisLimiter("api_rate", 10, 60)

async def make_request():
    if await limiter.acquire():
        print("Request allowed")
        # Make API call
    else:
        print("Rate limit exceeded")

# Context manager with automatic retry
async with RedisLimiter("api_rate", 10, 60):
    print("Request allowed")
    # Make API call
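The token bucket behind a limiter like this is simple: a bucket holds up to `capacity` tokens, refills at `capacity / period` tokens per second, and each request spends one token. The single-process sketch below shows the algorithm only; it is not Redisify's implementation, where the bucket state would live in Redis and be updated atomically:

```python
import time

class TokenBucket:
    """Minimal token-bucket sketch: capacity tokens, refilled at
    capacity / period tokens per second (illustrative, not Redisify's code)."""

    def __init__(self, capacity: int, period: float):
        self.capacity = capacity
        self.rate = capacity / period        # tokens added per second
        self.tokens = float(capacity)
        self.last = time.monotonic()

    def acquire(self) -> bool:
        now = time.monotonic()
        # Refill proportionally to elapsed time, capped at capacity.
        self.tokens = min(self.capacity, self.tokens + (now - self.last) * self.rate)
        self.last = now
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False

# 10 requests per 60 seconds, as in the example above
bucket = TokenBucket(capacity=10, period=60)
allowed = sum(bucket.acquire() for _ in range(15))
print(allowed)  # 10 — the burst drains the bucket; the remaining 5 are limited
```

Because unused capacity accumulates as tokens, the bucket allows short bursts up to `capacity` while enforcing the average rate over time.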

🔧 Serialization

Redisify includes a smart serializer that handles complex objects using dill:

from pydantic import BaseModel
from redisify import RedisDict

class User(BaseModel):
    name: str
    age: int

user = User(name="Alice", age=30)
rdict = RedisDict("users")

# Pydantic models are automatically serialized
# (writes use set(), since item assignment cannot be awaited)
await rdict.set("user1", user)

# And automatically deserialized
retrieved_user = await rdict["user1"]
print(type(retrieved_user))  # <class '__main__.User'>
print(retrieved_user.name)  # Alice

# Custom objects work too
class CustomObject:
    def __init__(self, data):
        self.data = data
    
    def __repr__(self):
        return f"CustomObject({self.data})"

obj = CustomObject("test")
await rdict.set("custom", obj)
retrieved_obj = await rdict["custom"]
print(retrieved_obj)  # CustomObject(test)

📖 API Documentation

For detailed API documentation, see the docstrings in the source code.

⚡ Performance Considerations

Memory Usage

  • All objects are serialized before storage, which increases memory usage
  • Consider using simple data types for large datasets
  • Use clear() method to free memory when structures are no longer needed

Network Latency

  • All operations are async and non-blocking
  • Use connection pooling for better performance
  • Consider using Redis clusters for high-availability setups

Serialization Overhead

  • Complex objects take longer to serialize/deserialize
  • Consider using simple data structures for frequently accessed data
  • The dill serializer handles most Python objects efficiently
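dill exposes the same `dumps`/`loads` interface as the standard-library pickle module (it extends pickle to cover lambdas, closures, and other objects pickle rejects), so the overhead is easy to inspect. The sketch below uses stdlib pickle to stay dependency-free:

```python
import pickle
from dataclasses import dataclass

@dataclass
class User:
    name: str
    age: int

user = User("Alice", 30)
payload = pickle.dumps(user)      # the bytes that would be stored in Redis
restored = pickle.loads(payload)  # what a read would deserialize

print(restored == user)   # True
print(len(payload))       # serialized size in bytes, i.e. the per-value overhead
```

Measuring `len(payload)` for your own objects is a quick way to decide whether a value is cheap enough to store as-is or should be flattened into simpler types first.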

🧪 Testing

Make sure you have Redis running (locally or via Docker), then:

# Run all tests
pytest -v tests

# Run with coverage
pytest --cov=redisify tests

# Run specific test file
pytest tests/test_redis_dict.py -v

# Run with async support
pytest --asyncio-mode=auto tests/

Running Redis with Docker

# Start Redis server
docker run -d -p 6379:6379 redis:latest

# Or with Redis Stack (includes RedisInsight)
docker run -d -p 6379:6379 -p 8001:8001 redis/redis-stack:latest

🤝 Contributing

We welcome contributions! Please see our Contributing Guide for details.

  • Fork the repository
  • Create a feature branch (git checkout -b feature/amazing-feature)
  • Make your changes
  • Add tests for new functionality
  • Run the test suite (pytest tests/)
  • Commit your changes (git commit -m 'Add amazing feature')
  • Push to the branch (git push origin feature/amazing-feature)
  • Open a Pull Request

Development Setup

git clone https://github.com/Hambaobao/redisify.git
cd redisify
pip install -e ".[test]"
pre-commit install  # Optional: for code formatting

📄 License

This project is licensed under the MIT License - see the LICENSE file for details.

📋 Requirements

  • Python 3.10+
  • Redis server (local or remote)
  • redis Python client (redis-py)
  • dill (for object serialization)
