
A lightweight, async Redis-based queue for small applications, alternative to Kafka.
redisaq is a Python library for distributed job queuing and processing using Redis Streams. It provides a robust, scalable solution for handling distributed workloads with features like consumer groups, automatic partition rebalancing, and fault tolerance.
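To make the idea of partition rebalancing concrete, here is a minimal sketch of how partitions might be redistributed when a consumer joins a group. This is an illustration only: the `assign_partitions` helper and the round-robin policy are assumptions, not redisaq's actual algorithm or API.

```python
from typing import Dict, List


def assign_partitions(consumers: List[str], partitions: int) -> Dict[str, List[int]]:
    """Spread partition indices across consumers round-robin (illustrative only)."""
    if not consumers:
        return {}
    # Sort so that every node computing the plan arrives at the same result.
    ordered = sorted(consumers)
    assignment: Dict[str, List[int]] = {name: [] for name in ordered}
    for partition in range(partitions):
        owner = ordered[partition % len(ordered)]
        assignment[owner].append(partition)
    return assignment


# Two consumers share four partitions, then a third consumer joins.
before = assign_partitions(["worker-a", "worker-b"], partitions=4)
after = assign_partitions(["worker-a", "worker-b", "worker-c"], partitions=4)
print(before)
print(after)
```

Every partition keeps exactly one owner before and after the change. That is the property a rebalance has to preserve, whatever assignment policy is used.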
Install redisaq from PyPI:
pip install redisaq
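Single-message enqueueing with enqueue(payload)
Batch enqueueing with batch_enqueue(payloads)
Configurable stream length (maxlen) and trimming behavior (approximate)
Partition scaling with request_partition_increase()
Serialization with orjson
Consumer-group reads with XREADGROUP
Built on asyncio for non-blocking operations
Redis access through aioredis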
Warning: Unbounded streams (maxlen=None) can consume significant Redis memory. Set maxlen (e.g., 1000) to limit stream size in production.
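The effect of a stream cap can be pictured with a small pure-Python stand-in. The sketch below uses `collections.deque` rather than Redis, so it only models the "oldest entries are dropped first" behavior. It is not how redisaq or Redis implement trimming.

```python
from collections import deque

# Stand-in for a capped stream: once full, appending discards the oldest entry.
stream = deque(maxlen=3)
for i in range(5):
    stream.append({"id": i})

remaining_ids = [entry["id"] for entry in stream]
print(remaining_ids)  # the two oldest entries (0 and 1) are gone
```

With approximate trimming in Redis itself, the stream may temporarily hold somewhat more entries than the configured limit, so treat maxlen as a soft bound rather than an exact count.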
from redisaq import Producer, Consumer
import asyncio


async def process_message(message):
    print(f"Processing message {message.msg_id}: {message.payload}")
    await asyncio.sleep(1)  # Simulate work


async def main():
    # Initialize producer with topic and max stream length
    producer = Producer(
        topic="notifications",
        maxlen=1000,
        redis_url="redis://localhost:6379/0"
    )
    await producer.connect()

    # Send some messages
    await producer.batch_enqueue([
        {"type": "email", "to": "user1@example.com", "subject": "Hello"},
        {"type": "sms", "to": "+1234567890", "text": "Hi there"}
    ])

    # Initialize consumer
    consumer = Consumer(
        topic="notifications",
        group_name="notification_processors",
        batch_size=10,
        heartbeat_interval=3.0,
        redis_url="redis://localhost:6379/0"
    )

    # Connect and start processing
    await consumer.connect()
    await consumer.consume(process_message)

    # Cleanup
    await producer.close()
    await consumer.close()


if __name__ == "__main__":
    asyncio.run(main())
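import asyncio
from redisaq import Producer


async def send_notifications():
    # Create a producer whose topic starts with three partitions
    producer = Producer(topic="notifications", init_partitions=3)
    await producer.connect()
    await producer.enqueue({"user_id": "123", "content": "Hello"})
    await producer.close()


asyncio.run(send_notifications())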
import asyncio
from typing import List

from redisaq import Consumer, Message


async def process_batch(messages: List[Message]):
    print(f"Processing batch of {len(messages)} messages")
    for msg in messages:
        # Process each message in the batch
        print(f"Message {msg.msg_id}: {msg.payload}")


async def main():
    consumer = Consumer(
        topic="notifications",
        batch_size=10,  # Process up to 10 messages at once
        heartbeat_interval=3.0
    )
    await consumer.connect()  # Connect before consuming, as in the first example
    await consumer.consume_batch(process_batch)


asyncio.run(main())
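When each message in a batch involves I/O-bound work, the handler can process the batch concurrently instead of one message at a time. The sketch below shows that pattern with `asyncio.gather`. `FakeMessage` is a hypothetical stand-in that only mirrors the `msg_id` and `payload` attributes used above, so the snippet runs without Redis or redisaq.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Dict, List


@dataclass
class FakeMessage:
    """Hypothetical stand-in exposing msg_id and payload, for illustration only."""
    msg_id: str
    payload: Dict[str, Any]


async def handle_one(msg: FakeMessage) -> str:
    await asyncio.sleep(0.01)  # Simulate I/O-bound work
    return f"{msg.msg_id}:{msg.payload['type']}"


async def process_batch_concurrently(messages: List[FakeMessage]) -> List[str]:
    # gather runs the handlers concurrently and returns results in input order
    return await asyncio.gather(*(handle_one(m) for m in messages))


batch = [
    FakeMessage("1-0", {"type": "email"}),
    FakeMessage("2-0", {"type": "sms"}),
]
results = asyncio.run(process_batch_concurrently(batch))
print(results)
```

A handler with this shape could presumably be passed to consume_batch in place of a sequential loop, though whether that suits a given workload depends on ordering and error-handling requirements.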
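import base64

import msgpack
from redisaq import Producer, Consumer


# Custom serializer/deserializer.
# msgpack output is binary and generally not valid UTF-8, so it cannot be
# decoded to text directly. Wrapping it in base64 yields a plain ASCII string
# (this assumes the serializer hooks exchange str values).
def msgpack_serializer(data):
    return base64.b64encode(msgpack.packb(data)).decode("ascii")


def msgpack_deserializer(data):
    return msgpack.unpackb(base64.b64decode(data))


# Use custom serialization
producer = Producer(
    topic="data",
    serializer=msgpack_serializer
)
consumer = Consumer(
    topic="data",
    deserializer=msgpack_deserializer
)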
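A custom serializer and deserializer should round-trip cleanly, and this is easy to check without a running queue. The sketch below assumes the hooks exchange `str` values (inferred from the example above, not confirmed by the library documentation). It uses only the standard library, with zlib-compressed JSON standing in for a binary format such as msgpack and base64 turning the bytes into text.

```python
import base64
import json
import zlib
from typing import Any


def binary_serializer(data: Any) -> str:
    # Compress the JSON bytes, then base64-encode so the result is ASCII text.
    raw = zlib.compress(json.dumps(data).encode("utf-8"))
    return base64.b64encode(raw).decode("ascii")


def binary_deserializer(text: str) -> Any:
    # Reverse the steps: base64-decode, decompress, parse JSON.
    raw = base64.b64decode(text.encode("ascii"))
    return json.loads(zlib.decompress(raw).decode("utf-8"))


payload = {"user_id": "123", "content": "Hello"}
encoded = binary_serializer(payload)
decoded = binary_deserializer(encoded)
print(decoded == payload)
```

Running a round-trip check like this on representative payloads before wiring the functions into a producer and consumer catches encoding mismatches early.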
See examples/fastapi for a full-featured FastAPI integration. The example combines redisaq with a FastAPI application for job submission and processing. See examples/fastapi/README.md.
Run the tests with:
poetry run pytest
MIT