
A framework for processing streaming data through CPU-intensive tasks while maintaining order and tracking latency.
Combines async I/O with threaded CPU processing:
```mermaid
sequenceDiagram
    participant Input as Async Input Stream
    participant Main as Main Thread<br/>(Asyncio Event Loop)
    participant Q1 as Input Queue
    participant T1 as Thread 1<br/>(Stage 1: Validate)
    participant Q2 as Queue 1
    participant T2 as Thread 2<br/>(Stage 2: Transform)
    participant Q3 as Queue 2
    participant T3 as Thread 3<br/>(Stage 3: Serialize)
    participant Q4 as Output Queue
    participant Output as Async Output Stream

    Note over Main: Pipeline Parallelism - Multiple items processed simultaneously

    Input->>Main: yield Item A
    Main->>Q1: put Item A
    Q1->>T1: get Item A
    Input->>Main: yield Item B
    Main->>Q1: put Item B
    Q1->>T1: get Item B

    par Item A flows through pipeline
        T1->>Q2: put processed Item A
        Q2->>T2: get Item A
        T2->>Q3: put processed Item A
        Q3->>T3: get Item A
        T3->>Q4: put processed Item A
    and Item B follows behind
        T1->>Q2: put processed Item B
        Q2->>T2: get Item B
        T2->>Q3: put processed Item B
    and Item C enters pipeline
        Input->>Main: yield Item C
        Main->>Q1: put Item C
        Q1->>T1: get Item C
        T1->>Q2: put processed Item C
    end

    Q4->>Main: get Item A (ordered)
    Main->>Output: yield Item A
    Q4->>Main: get Item B (ordered)
    Main->>Output: yield Item B

    Note over Main,Output: Output buffer ensures<br/>items maintain input order
```
The asyncio event loop handles I/O operations while each pipeline stage runs in its own thread for true CPU parallelism.
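The threaded-stage pattern in the diagram can be sketched with the standard library alone. This is an illustrative reconstruction, not the library's internals: each stage runs in its own thread, FIFO queues connect the stages, a `None` sentinel propagates shutdown, and the stage functions below are placeholders standing in for validate/transform/serialize.

```python
import queue
import threading

def make_stage(fn, q_in, q_out):
    # Each stage runs in its own thread, pulling from q_in and pushing to q_out.
    def worker():
        while True:
            item = q_in.get()
            if item is None:          # sentinel: shut down and propagate
                q_out.put(None)
                break
            q_out.put(fn(item))
    t = threading.Thread(target=worker, daemon=True)
    t.start()
    return t

# Placeholder stage functions (validate / transform / serialize)
stages = [lambda x: x, lambda x: x * 2, lambda x: str(x)]

# One queue before, between, and after the stages
qs = [queue.Queue(maxsize=100) for _ in range(len(stages) + 1)]
threads = [make_stage(fn, qs[i], qs[i + 1]) for i, fn in enumerate(stages)]

for item in [1, 2, 3]:
    qs[0].put(item)
qs[0].put(None)

results = []
while (out := qs[-1].get()) is not None:
    results.append(out)

print(results)  # ['2', '4', '6']
```

Because every stage is a single thread reading from a FIFO queue, items can never overtake one another, which is what preserves input order end to end.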
```python
import asyncio

from async_task_pipeline import AsyncTaskPipeline

# Create pipeline
pipeline = AsyncTaskPipeline(max_queue_size=100)

# Add processing stages
pipeline.add_stage("validate", validate_function)
pipeline.add_stage("transform", transform_function)
pipeline.add_stage("serialize", serialize_function)

# Start and run
await pipeline.start()

# Process streams concurrently
await asyncio.gather(
    pipeline.process_input_stream(your_input_stream()),
    consume_output(pipeline.generate_output_stream()),
)

await pipeline.stop()
```
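The key requirement behind an API like this is that the event loop hand items to blocking queues without stalling itself. A minimal sketch of that bridge, using only the standard library (the `cpu_stage` function and queue wiring are illustrative assumptions, not the library's actual implementation): the feeder yields control after each item, and the drain side awaits the blocking `Queue.get` via `run_in_executor` so the loop stays responsive.

```python
import asyncio
import queue
import threading

def cpu_stage(q_in, q_out):
    # CPU-bound work runs in a plain thread so it never blocks the event loop.
    while (item := q_in.get()) is not None:
        q_out.put(item ** 2)
    q_out.put(None)                   # propagate shutdown sentinel

async def main():
    loop = asyncio.get_running_loop()
    q_in, q_out = queue.Queue(), queue.Queue()
    threading.Thread(target=cpu_stage, args=(q_in, q_out), daemon=True).start()

    async def feed():
        for item in range(4):
            q_in.put(item)            # unbounded queue: never blocks here
            await asyncio.sleep(0)    # yield control back to the loop
        q_in.put(None)

    async def drain():
        results = []
        # run_in_executor turns the blocking get() into an awaitable
        while (out := await loop.run_in_executor(None, q_out.get)) is not None:
            results.append(out)
        return results

    _, results = await asyncio.gather(feed(), drain())
    return results

print(asyncio.run(main()))  # [0, 1, 4, 9]
```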
```python
def cpu_intensive_task(data):
    # Your CPU-heavy computation here
    result = complex_computation(data)
    return result

async def input_stream():
    for item in data_source:
        yield item
        await asyncio.sleep(0)  # Yield control to the event loop

async def consume_output(output_stream):
    async for result in output_stream:
        # Handle processed result
        print(f"Processed: {result}")
```
```python
# Clear pipeline state
pipeline.clear()

# Stop gracefully
await pipeline.stop()

# Get performance metrics
summary = pipeline.get_latency_summary()
```
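Per-stage latency of the kind `get_latency_summary()` reports can be approximated by wrapping each stage function with a timer. A hedged sketch, not the library's API: the `timed` wrapper and the `stats` dict layout are assumptions for illustration.

```python
import time

def timed(name, fn, stats):
    # Wrap a stage callable so each call records its wall-clock latency.
    # Sketch only: the library presumably aggregates similar per-stage
    # timings internally for its latency summary.
    def wrapper(item):
        start = time.perf_counter()
        result = fn(item)
        stats.setdefault(name, []).append(time.perf_counter() - start)
        return result
    return wrapper

stats = {}
transform = timed("transform", lambda x: x * 2, stats)
print(transform(21), len(stats["transform"]))  # 42 1
```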
```shell
python example.py --enable-timing
```
The example demonstrates a 4-stage pipeline processing 50 items with simulated CPU-intensive tasks.
This project uses modern Python development tools, managed through a Makefile and uv.
```shell
# Install development dependencies and set up pre-commit hooks
make dev-setup

# Run all quality checks
make check
```
```shell
# Development setup
make install             # Install the package
make install-dev         # Install with development dependencies
make dev-setup           # Complete development environment setup

# Code quality
make format              # Format code with ruff
make lint                # Lint code with ruff
make type-check          # Run type checking with mypy
make test                # Run tests with pytest
make test-cov            # Run tests with coverage
make check               # Run all quality checks

# Pre-commit
make pre-commit-install  # Install pre-commit hooks
make pre-commit          # Run pre-commit on all files

# Building and publishing
make build               # Build the package
make publish-test        # Publish to TestPyPI
make publish             # Publish to PyPI

# Version management
make version-patch       # Bump patch version
make version-minor       # Bump minor version
make version-major       # Bump major version

# Utilities
make clean               # Clean up cache and build files
make watch-test          # Run tests in watch mode
make help                # Show all available commands
```
This project enforces high code quality standards:

- `ruff format` for consistent code style
- `ruff check` for code quality and best practices
- `mypy` for static type analysis
- `pytest` with coverage reporting
- `bandit` for security vulnerability scanning

Make your changes and ensure all tests pass:
```shell
make check
```
Bump the version:

```shell
make version-patch  # or version-minor / version-major
```

Build and publish:

```shell
make publish  # or publish-test for TestPyPI
```
FAQs
We found that async-task-pipeline demonstrated a healthy version release cadence and project activity because the last version was released less than a year ago. It has 1 open source maintainer collaborating on the project.