good-redis
A fast-depends compatible Redis client with sync and async versions.
It also adds "prioritized queue" Redis commands.
Prioritized Stream Implementation in Redis Client Library
The Prioritized Stream is a custom data structure built on Redis and backed by Lua scripts that handle operations with priority-based ordering. Its key features and behaviors are outlined below:
Key Components
- Redis Keys:
  - :ps:keys: Set containing all active prioritized stream names.
  - <name>:t: Hash table where each key corresponds to a specific item, and its value represents the stored data.
  - <name>:s: Sorted set where the score represents the priority of each key.
  - <name>:exc: Set of keys that are excluded from re-adding, typically after being processed and removed.
  - <name>:exp: Sorted set for tracking the expiration of keys.
- Lua Scripts:
  - psadd: Adds a key-value pair to the stream with a specified priority score. The item is added only if it’s not in the exclusion or expiration sets.
  - pspull: Retrieves a specified number of items from the stream in order of highest priority. The items can be purged (removed) or retained with their priority decreased, and they can also be gated (prevented from being re-added).
  - psdecrementall: Decreases the priority of all items in the stream by a specified amount.
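To make the behavior of the three scripts more concrete, here is a minimal in-memory sketch in plain Python. It is not the library's Lua code. The class name `InMemoryPrioritizedStream`, the method signatures, the `gate` flag, and the `retain_penalty` amount by which a retained item's priority drops are all assumptions made for illustration; only the roles of the table, the score set, and the exclusion set come from the description above.

```python
from typing import Dict, List, Set, Tuple


class InMemoryPrioritizedStream:
    """Toy model of one prioritized stream (hypothetical, not the library's code)."""

    def __init__(self) -> None:
        self.table: Dict[str, str] = {}     # plays the role of <name>:t
        self.scores: Dict[str, float] = {}  # plays the role of <name>:s
        self.excluded: Set[str] = set()     # plays the role of <name>:exc

    def psadd(self, key: str, value: str, score: float) -> bool:
        # Gated keys are rejected, mirroring the exclusion check described for psadd.
        if key in self.excluded:
            return False
        self.table[key] = value
        self.scores[key] = score
        return True

    def pspull(self, count: int, purge: bool = True, gate: bool = False,
               retain_penalty: float = 1.0) -> List[Tuple[str, str]]:
        # Highest score first; ties are broken by key so the order is deterministic.
        ordered = sorted(self.scores, key=lambda k: (-self.scores[k], k))[:count]
        pulled = [(k, self.table[k]) for k in ordered]
        for k in ordered:
            if purge:
                del self.table[k]
                del self.scores[k]
            else:
                # Retained items stay in the stream with a lower priority.
                self.scores[k] -= retain_penalty
            if gate:
                self.excluded.add(k)
        return pulled

    def psdecrementall(self, decrement: float) -> None:
        # Lower every item's priority by the same amount.
        for k in self.scores:
            self.scores[k] -= decrement


stream = InMemoryPrioritizedStream()
stream.psadd("task_1", "Process data", 10)
stream.psadd("task_2", "Send email", 8)
stream.psadd("task_3", "Generate report", 12)

first = stream.pspull(count=1, purge=True, gate=True)   # removes and gates task_3
readded = stream.psadd("task_3", "Generate report", 12)  # rejected because task_3 is gated
retained = stream.pspull(count=2, purge=False)           # items stay, priorities drop
stream.psdecrementall(2)
```

In this model a gated key stays excluded forever; the real implementation also tracks expirations in `<name>:exp`, which this sketch leaves out.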
Stream Operations
Usage Scenarios
- Task Queues: The prioritized stream can be used to implement task queues where tasks with higher priorities are processed first.
- Rate Limiting: Items can be gated and expired to prevent frequent re-processing.
- Deferred Processing: Items with lower priority can be retained and re-processed later when their priority increases.
This prioritized stream implementation allows for efficient management of time-sensitive and prioritized workloads in a Redis-backed environment, using Lua scripts so that each multi-step operation runs atomically.
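The rate-limiting scenario relies on gates that eventually lift. The source does not spell out the exact rules, so the following is only a sketch under the assumption that each gated key carries an expiration time and that a periodic sweep removes gates whose time has passed. `ExpiringGate` and its methods are hypothetical names, and timestamps are passed in explicitly to keep the example deterministic.

```python
from typing import Dict, List


class ExpiringGate:
    """Toy model of the <name>:exc / <name>:exp pair (hypothetical helper)."""

    def __init__(self) -> None:
        # key -> time at which the gate lifts (assumed semantics)
        self.expires_at: Dict[str, float] = {}

    def gate(self, key: str, now: float, ttl: float) -> None:
        # Block re-adding of `key` until `now + ttl`.
        self.expires_at[key] = now + ttl

    def is_gated(self, key: str) -> bool:
        return key in self.expires_at

    def expire(self, now: float) -> List[str]:
        # Lift every gate whose expiration time has passed and report the keys.
        lifted = sorted(k for k, t in self.expires_at.items() if t <= now)
        for k in lifted:
            del self.expires_at[k]
        return lifted


gates = ExpiringGate()
gates.gate("task_1", now=100.0, ttl=60.0)

early = gates.expire(now=130.0)          # too early, nothing is lifted
still_gated = gates.is_gated("task_1")
late = gates.expire(now=160.0)           # the gate on task_1 is lifted
gated_after = gates.is_gated("task_1")
```

With a model like this, an item that was just processed cannot be re-queued until its gate expires, which caps how often the same key is handled.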
1. Adding Items to the Stream
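# Synchronous version.
# Redis and RedisProvider come from good-redis (import path not shown here).
from fast_depends import inject

@inject
def add_items_to_stream(
    redis: Redis = RedisProvider(),
):
    # Add a single item with priority 10
    redis.psadd(name="task_queue", key="task_1", value="Process data", score=10)

    # Add several items at once; each tuple mirrors psadd's key, value, score
    items = [
        ("task_2", "Send email", 8),
        ("task_3", "Generate report", 12),
    ]
    redis.psaddmany(name="task_queue", items=items)

    # Add task_1 again with a higher score, passing readd=True
    redis.psadd(name="task_queue", key="task_1", value="Process data", score=15, readd=True)

add_items_to_stream()

# Asynchronous version.
# AsyncRedis and AsyncRedisProvider come from good-redis (import path not shown here).
import asyncio
from fast_depends import inject

@inject
async def add_items_to_stream_async(
    rc: AsyncRedis = AsyncRedisProvider(),
):
    async with rc as redis:
        await redis.psadd(name="task_queue", key="task_1", value="Process data", score=10)
        items = [
            ("task_2", "Send email", 8),
            ("task_3", "Generate report", 12),
        ]
        await redis.psaddmany(name="task_queue", items=items)
        await redis.psadd(name="task_queue", key="task_1", value="Process data", score=15, readd=True)

# A bare top-level await only works in a REPL or notebook; use asyncio.run in a script
asyncio.run(add_items_to_stream_async())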
2. Retrieving and Processing Items from the Stream
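# Synchronous version.
# Redis and RedisProvider come from good-redis (import path not shown here).
from fast_depends import inject

@inject
def process_items_from_stream(
    redis: Redis = RedisProvider(),
):
    # Pull the single highest-priority item and remove it from the stream
    item = redis.pspull(name="task_queue", count=1, purge=True)
    print(item)

    # Pull the next two items but retain them (their priority is decreased)
    items = redis.pspull(name="task_queue", count=2, purge=False)
    print(items)

process_items_from_stream()

# Asynchronous version.
# AsyncRedis and AsyncRedisProvider come from good-redis (import path not shown here).
import asyncio
from fast_depends import inject

@inject
async def process_items_from_stream_async(
    rc: AsyncRedis = AsyncRedisProvider(),
):
    async with rc as redis:
        item = await redis.pspull(name="task_queue", count=1, purge=True)
        print(item)
        items = await redis.pspull(name="task_queue", count=2, purge=False)
        print(items)

# A bare top-level await only works in a REPL or notebook; use asyncio.run in a script
asyncio.run(process_items_from_stream_async())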
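For the task-queue scenario, a consumer typically pulls in batches until the stream is empty. The sketch below shows that loop against a stand-in client so it runs without a Redis server. `PullClient`, `FakeClient`, and `drain` are hypothetical names, and the assumption that `pspull` returns a list of `(key, value)` pairs is not confirmed by the source; adapt the unpacking to whatever the real client returns.

```python
from typing import Callable, List, Protocol, Tuple


class PullClient(Protocol):
    """Anything with a pspull method shaped like the one used in the examples."""

    def pspull(self, name: str, count: int, purge: bool) -> List[Tuple[str, str]]: ...


def drain(client: PullClient, name: str,
          handler: Callable[[str, str], None], batch_size: int = 10) -> int:
    """Pull and handle items in priority order until the stream is empty."""
    processed = 0
    while True:
        batch = client.pspull(name=name, count=batch_size, purge=True)
        if not batch:
            return processed
        for key, value in batch:
            handler(key, value)
            processed += 1


class FakeClient:
    """In-memory stand-in for the Redis client (assumed return shape)."""

    def __init__(self, items: List[Tuple[str, str, int]]) -> None:
        # Keep items sorted by descending score, like the priority sorted set.
        self._items = sorted(items, key=lambda item: -item[2])

    def pspull(self, name: str, count: int, purge: bool) -> List[Tuple[str, str]]:
        batch = self._items[:count]
        if purge:
            self._items = self._items[count:]
        return [(key, value) for key, value, _ in batch]


seen: List[str] = []
client = FakeClient([
    ("task_1", "Process data", 10),
    ("task_2", "Send email", 8),
    ("task_3", "Generate report", 12),
])
total = drain(client, "task_queue", lambda key, value: seen.append(key), batch_size=2)
```

Passing `purge=True` matters here: with `purge=False` the items would stay in the stream and the loop would never see an empty batch.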
3. Incrementing the Priority of a Specific Item
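# Synchronous version.
# Redis and RedisProvider come from good-redis (import path not shown here).
from fast_depends import inject

@inject
def increment_priority(
    redis: Redis = RedisProvider(),
):
    # Raise the priority of task_2 by 5
    redis.psincrement(name="task_queue", key="task_2", score=5)

increment_priority()

# Asynchronous version.
# AsyncRedis and AsyncRedisProvider come from good-redis (import path not shown here).
import asyncio
from fast_depends import inject

@inject
async def increment_priority_async(
    rc: AsyncRedis = AsyncRedisProvider(),
):
    async with rc as redis:
        await redis.psincrement(name="task_queue", key="task_2", score=5)

# A bare top-level await only works in a REPL or notebook; use asyncio.run in a script
asyncio.run(increment_priority_async())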
4. Deleting Items Below a Certain Priority Threshold
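# Synchronous version.
# Redis and RedisProvider come from good-redis (import path not shown here).
from fast_depends import inject

@inject
def delete_below_threshold(
    redis: Redis = RedisProvider(),
):
    # Delete every item whose priority is below 10
    redis.psdeletebelowthreshold(name="task_queue", threshold=10)

delete_below_threshold()

# Asynchronous version.
# AsyncRedis and AsyncRedisProvider come from good-redis (import path not shown here).
import asyncio
from fast_depends import inject

@inject
async def delete_below_threshold_async(
    rc: AsyncRedis = AsyncRedisProvider(),
):
    async with rc as redis:
        await redis.psdeletebelowthreshold(name="task_queue", threshold=10)

# A bare top-level await only works in a REPL or notebook; use asyncio.run in a script
asyncio.run(delete_below_threshold_async())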
5. Viewing All Keys and Values in the Stream
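# Synchronous version.
# Redis and RedisProvider come from good-redis (import path not shown here).
from fast_depends import inject

@inject
def view_keys_and_values(
    redis: Redis = RedisProvider(),
):
    # List the keys currently in the stream
    keys = redis.pskeys(name="task_queue")
    print(keys)

    # List the values stored in the stream
    values = redis.psvalues(name="task_queue")
    print(values)

view_keys_and_values()

# Asynchronous version.
# AsyncRedis and AsyncRedisProvider come from good-redis (import path not shown here).
import asyncio
from fast_depends import inject

@inject
async def view_keys_and_values_async(
    rc: AsyncRedis = AsyncRedisProvider(),
):
    async with rc as redis:
        keys = await redis.pskeys(name="task_queue")
        print(keys)
        values = await redis.psvalues(name="task_queue")
        print(values)

# A bare top-level await only works in a REPL or notebook; use asyncio.run in a script
asyncio.run(view_keys_and_values_async())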
6. Decrementing the Priority of All Items in the Stream
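# Synchronous version.
# Redis and RedisProvider come from good-redis (import path not shown here).
from fast_depends import inject

@inject
def decrement_all_priorities(
    redis: Redis = RedisProvider(),
):
    # Lower the priority of every item in the stream by 2
    redis.psdecrement_all(name="task_queue", decrement=2)

decrement_all_priorities()

# Asynchronous version.
# AsyncRedis and AsyncRedisProvider come from good-redis (import path not shown here).
import asyncio
from fast_depends import inject

@inject
async def decrement_all_priorities_async(
    rc: AsyncRedis = AsyncRedisProvider(),
):
    async with rc as redis:
        await redis.psdecrement_all(name="task_queue", decrement=2)

# A bare top-level await only works in a REPL or notebook; use asyncio.run in a script
asyncio.run(decrement_all_priorities_async())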
7. Handling Expired Items
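# Synchronous version.
# Redis and RedisProvider come from good-redis (import path not shown here).
from fast_depends import inject

@inject
def handle_expired_items(
    redis: Redis = RedisProvider(),
):
    # Handle expired items; note that no stream name is passed
    redis.psexpire()

handle_expired_items()

# Asynchronous version.
# AsyncRedis and AsyncRedisProvider come from good-redis (import path not shown here).
import asyncio
from fast_depends import inject

@inject
async def handle_expired_items_async(
    rc: AsyncRedis = AsyncRedisProvider(),
):
    async with rc as redis:
        await redis.psexpire()

# A bare top-level await only works in a REPL or notebook; use asyncio.run in a script
asyncio.run(handle_expired_items_async())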
These examples demonstrate how to use the prioritized stream functionality in a dependency injection style with fast-depends. This approach makes it easy to manage Redis clients and stream operations in both synchronous and asynchronous contexts.