Asyncio Reliable Queue (based on redis)
Inspired by Tom DeWire's article "Reliable Queueing in Redis (Part 1)" [1] [2] and the "torrelque" Python module [3].
- Asynchronous: built on asyncio and aioredis
- Reliable: task data is stored in the Redis database at all times
- Throttling: limits the number of tasks in execution at once
- Delayed queue: defers task availability
- Dead letters: moves task data to a failed queue after a predefined number of retry attempts
- Tested on Python 3.7 and Redis server '>=3.0.6', '<=5.0.5'
- Used in containerized applications (managed by Kubernetes) in high-load environments
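To make the feature list concrete, here is a minimal in-memory sketch of the same ideas: a pending queue with delays, a capped working set, and a failed list for dead letters. It is a conceptual model only, not arque's Redis-backed implementation. The class name `ToyReliableQueue`, the `max_retries` parameter, and the explicit `now` argument are assumptions made for illustration.

```python
import heapq
import itertools


class ToyReliableQueue:
    """Single-process model of a reliable queue (illustrative, not arque's code)."""

    def __init__(self, working_limit=2, max_retries=2):
        self.working_limit = working_limit  # throttling: max tasks in execution
        self.max_retries = max_retries      # hypothetical retry budget per task
        self.data = {}        # task_id -> payload, kept until the task is released
        self.pending = []     # heap of (available_at, seq, task_id)
        self.working = set()  # ids of tasks currently being processed
        self.failed = []      # dead letters: ids that exhausted their retries
        self.retries = {}     # task_id -> number of requeues so far
        self._seq = itertools.count()  # tie-breaker for equal timestamps

    def enqueue(self, task_id, payload, now, delay=0):
        self.data[task_id] = payload
        self.retries.setdefault(task_id, 0)
        heapq.heappush(self.pending, (now + delay, next(self._seq), task_id))

    def dequeue(self, now):
        if len(self.working) >= self.working_limit:
            return None  # throttled: too many tasks in execution
        if not self.pending or self.pending[0][0] > now:
            return None  # nothing is available yet (empty or still delayed)
        _, _, task_id = heapq.heappop(self.pending)
        self.working.add(task_id)
        return task_id, self.data[task_id]

    def release(self, task_id):
        # Processing succeeded: only now is the payload dropped.
        self.working.discard(task_id)
        self.data.pop(task_id, None)
        self.retries.pop(task_id, None)

    def requeue(self, task_id, now, delay=0):
        # Processing failed: retry later, or dead-letter once the budget is spent.
        self.working.discard(task_id)
        self.retries[task_id] += 1
        if self.retries[task_id] > self.max_retries:
            self.failed.append(task_id)  # payload stays in self.data for inspection
            return False
        heapq.heappush(self.pending, (now + delay, next(self._seq), task_id))
        return True


if __name__ == "__main__":
    q = ToyReliableQueue(working_limit=1, max_retries=1)
    q.enqueue("a", {"value": 1}, now=0, delay=5)  # delayed task
    q.enqueue("b", {"value": 2}, now=0)
    print(q.dequeue(now=0))  # "b" is returned first because "a" is still delayed
```

Passing `now` explicitly keeps the model deterministic. A real implementation would read the clock itself and keep all of this state in Redis so that it survives a worker crash.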
pip install arque
import signal
import random
import logging
import asyncio
import aioredis
import time
from functools import wraps
from arque import Arque
logger = logging.getLogger(__name__)


async def shutdown(signal, loop):
    """Cleanup tasks tied to the service's shutdown."""
    logging.info(f"Received exit signal {signal.name}...")
    tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]
    for task in tasks:
        task.cancel()
    logging.info(f"Cancelling {len(tasks)} outstanding tasks")
    # return_exceptions=True keeps CancelledError from propagating out of gather()
    await asyncio.gather(*tasks, return_exceptions=True)
    logging.info("Flushing metrics")
    loop.stop()


def aioredis_pool(host='redis://localhost', encoding='utf8'):
    def wrapper(func):
        @wraps(func)
        async def wrapped():
            redis = await aioredis.create_redis_pool(host, encoding=encoding)
            try:
                return await func(redis=redis)
            finally:
                redis.close()
                await redis.wait_closed()
        return wrapped
    return wrapper


@aioredis_pool(host='redis://localhost', encoding='utf8')
async def produce_task(redis=None):
    logger.info('Starting producing...')
    queue = Arque(redis=redis)
    while True:
        for _ in range(1):
            task = {'value': random.randint(0, 99)}
            task_id = f"custom_{task['value']}_{time.time()}"
            logger.debug('Produced task %s', task)
            await queue.enqueue(task, task_id=task_id, task_timeout=10, delay=1)
        await asyncio.sleep(1)


async def process(task_data):
    logger.debug('Consumed task %s', task_data)
    await asyncio.sleep(1)


@aioredis_pool(host='redis://localhost', encoding='utf8')
async def consume_task(redis=None):
    logger.info('Starting consuming...')
    queue = Arque(redis=redis, working_limit=3)
    while True:
        task_id, task_data = await queue.dequeue()
        if task_id == '__not_found__':
            continue
        if task_id == '__overloaded__':
            print(f'TASK ID: {task_id}')
            await asyncio.sleep(1)
            continue
        if task_id == '__marked_as_failed___':
            print(f'FAILED ID: {task_id}')
            continue
        try:
            await process(task_data)
            await queue.release(task_id)
        except Exception:
            logger.exception('Job processing has failed')
            await queue.requeue(task_id, delay=5)
            stats = await queue.get_stats()
            logger.info(stats)


@aioredis_pool(host='redis://localhost', encoding='utf8')
async def sweep_task(redis=None):
    logger.info('Starting sweeping...')
    queue = Arque(redis=redis, sweep_interval=5)
    await queue.schedule_sweep()


@aioredis_pool(host='redis://localhost', encoding='utf8')
async def stats_task(redis=None):
    logger.info('Starting stats...')
    queue = Arque(redis=redis)
    while True:
        stats = await queue.get_stats()
        logger.info(stats)
        await asyncio.sleep(5)


async def example():
    tasks = []
    for _ in range(5):
        tasks.append(consume_task())
    tasks.append(produce_task())
    tasks.append(sweep_task())
    tasks.append(stats_task())
    await asyncio.gather(*tasks)


if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(message)s')
    loop = asyncio.get_event_loop()
    signals = (signal.SIGHUP, signal.SIGTERM, signal.SIGINT, signal.SIGUSR1)
    for s in signals:
        loop.add_signal_handler(s, lambda s=s: asyncio.create_task(shutdown(s, loop)))
    try:
        loop.run_until_complete(example())
    finally:
        loop.close()
        logging.info("Successfully shutdown...")
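
The `shutdown` handler in the example cancels every outstanding task and then waits for them to stop. The standalone sketch below isolates that pattern so it can be run without Redis. The names `worker`, `cancel_all`, and `demo` are illustrative and not part of arque. It assumes Python 3.7+ for `asyncio.run` and `asyncio.create_task`.

```python
import asyncio


async def worker(name):
    """Stand-in for a long-running consumer; sleeps until it is cancelled."""
    while True:
        await asyncio.sleep(3600)


async def cancel_all(tasks):
    """Cancel every task, then wait until all of them have actually stopped."""
    for task in tasks:
        task.cancel()
    # With return_exceptions=True, each cancelled task contributes a
    # CancelledError object to the result list instead of raising it here.
    return await asyncio.gather(*tasks, return_exceptions=True)


async def demo():
    tasks = [asyncio.create_task(worker(i)) for i in range(3)]
    await asyncio.sleep(0)  # yield once so the workers start running
    return await cancel_all(tasks)


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Awaiting the cancelled tasks matters because `task.cancel()` only requests cancellation. The `gather` call gives each task a chance to run its cleanup (for instance the `finally` block in `aioredis_pool` that closes the connection pool) before the loop is stopped.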
[1] Reliable Queueing in Redis (Part 1)
[2] DEWIRE Redis as a Reliable Work Queue.pdf
[3] torrelque