Product
Introducing SSO
Streamline your login process and enhance security by enabling Single Sign-On (SSO) on the Socket platform, now available for all customers on the Enterprise plan, supporting 20+ identity providers.
Readme
Asyncio Reliable Queue (based on redis)
Inspired by Tom DeWire's article "Reliable Queueing in Redis (Part 1)" [1] [2] and the "torrelque" python module [3].
- Asynchronous: based on asyncio and aioredis
- Reliable: at any moment task data stored in redis database
- Throttling: controls number of tasks in execution
- Delayed queue: defers task availability
- Dead letters: put task data in failed queue after number of predefined retry attempts
- Tested on Python 3.7 and redis server '>=3.0.6', '<=5.0.5'
- Used in containerized applications (managed by kubernetes) in high load environments
pip install arque
import signal
import random
import logging
import asyncio
import aioredis
import time
from functools import wraps
from arque import Arque
logger = logging.getLogger(__name__)
async def shutdown(signal, loop):
"""Cleanup tasks tied to the service's shutdown."""
logging.info(f"Received exit signal {signal.name}...")
tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]
[task.cancel() for task in tasks]
logging.info(f"Cancelling {len(tasks)}outstanding tasks")
await asyncio.gather(*tasks)
logging.info(f"Flushing metrics")
loop.stop()
def aioredis_pool(host='redis://localhost', encoding='utf8'):
def wrapper(func):
@wraps(func)
async def wrapped():
redis = await aioredis.create_redis_pool(host, encoding=encoding)
try:
return await func(redis=redis)
finally:
redis.close()
await redis.wait_closed()
return wrapped
return wrapper
@aioredis_pool(host='redis://localhost', encoding='utf8')
async def produce_task(redis=None):
logger.info('Starting producing...')
queue = Arque(redis=redis)
while True:
for _ in range(1):
task = {'value': random.randint(0, 99)}
task_id = f"custom_{task['value']}_{time.time()}"
logger.debug('Produced task %s', task)
await queue.enqueue(task, task_id=task_id, task_timeout=10, delay=1)
await asyncio.sleep(1)
async def process(task_data):
logger.debug('Consumed task %s', task_data)
await asyncio.sleep(1)
@aioredis_pool(host='redis://localhost', encoding='utf8')
async def consume_task(redis=None):
logger.info('Starting consuming...')
queue = Arque(redis=redis, working_limit=3)
while True:
task_id, task_data = await queue.dequeue()
if task_id == '__not_found__':
continue
if task_id == '__overloaded__':
print(f'TASK ID: {task_id}')
await asyncio.sleep(1)
continue
if task_id == '__marked_as_failed___':
print(f'FAILED ID: {task_id}')
continue
try:
await process(task_data)
await queue.release(task_id)
except Exception:
logger.exception('Job processing has failed')
await queue.requeue(task_id, delay=5)
stats = await queue.get_stats()
logger.info(stats)
@aioredis_pool(host='redis://localhost', encoding='utf8')
async def sweep_task(redis=None):
logger.info('Starting sweeping...')
queue = Arque(redis=redis, sweep_interval=5)
await queue.schedule_sweep()
@aioredis_pool(host='redis://localhost', encoding='utf8')
async def stats_task(redis=None):
logger.info('Starting stats...')
queue = Arque(redis=redis)
while True:
stats = await queue.get_stats()
logger.info(stats)
await asyncio.sleep(5)
async def example():
tasks = []
for _ in range(5):
tasks.append(consume_task())
tasks.append(produce_task())
tasks.append(sweep_task())
tasks.append(stats_task())
await asyncio.gather(*tasks)
if __name__ == '__main__':
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(message)s')
loop = asyncio.get_event_loop()
signals = (signal.SIGHUP, signal.SIGTERM, signal.SIGINT, signal.SIGUSR1)
for s in signals:
loop.add_signal_handler(s, lambda s=s: asyncio.create_task(shutdown(s, loop)))
try:
loop.run_until_complete(example())
finally:
loop.close()
logging.info("Successfully shutdown...")
[1] Reliable Queueing in Redis (Part 1)
[2] DEWIRE Redis as a Reliable Work Queue.pdf
[3] torrelque
FAQs
Asyncio Reliable Queue (based on redis)
We found that arque demonstrated a healthy version release cadence and project activity because the last version was released less than a year ago. It has 1 open source maintainer collaborating on the project.
Did you know?
Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.
Product
Streamline your login process and enhance security by enabling Single Sign-On (SSO) on the Socket platform, now available for all customers on the Enterprise plan, supporting 20+ identity providers.
Security News
Tea.xyz, a crypto project aimed at rewarding open source contributions, is once again facing backlash due to an influx of spam packages flooding public package registries.
Security News
As cyber threats become more autonomous, AI-powered defenses are crucial for businesses to stay ahead of attackers who can exploit software vulnerabilities at scale.