
finalsa-sqs-consumer
High-performance SQS message consumer with worker-based concurrency, dependency injection, and async support for Python applications
A Python package for creating SQS message consumers in FastAPI applications with built-in dependency injection, interceptors, and async support.
```bash
pip install finalsa-sqs-consumer
```
```python
from finalsa.sqs.consumer import SqsApp, SqsDepends

# Create app instance with worker-based concurrency
app = SqsApp(
    app_name="my-consumer",
    queue_url="https://sqs.region.amazonaws.com/account/queue-name",
    max_number_of_messages=10,
    workers=8,  # 8 concurrent workers for high throughput
)


# Define a simple handler
@app.handler("user.created")
async def handle_user_created(message: dict):
    print(f"User created: {message}")


# Define handler with dependencies
# (DatabaseService is a class defined by your own application)
@app.handler("order.created")
async def handle_order_created(
    message: dict,
    db_service: DatabaseService = SqsDepends(DatabaseService),
):
    await db_service.process_order(message)


# Run the consumer with concurrent workers
if __name__ == "__main__":
    app.run()  # Starts 8 worker processes
```
Main application class that manages message consumption and routing.
```python
app = SqsApp(
    app_name="my-app",            # Application identifier
    queue_url="...",              # SQS queue URL
    max_number_of_messages=10,    # Max messages per batch
    workers=8,                    # Number of concurrent workers (like uvicorn)
    message_timeout=300.0,        # Message processing timeout in seconds (default: 5 minutes)
    interceptors=[],              # List of interceptor classes
)
```
Worker-based Processing:
Message Timeout:
Register handlers for specific message topics:
```python
@app.handler("topic.name")
async def my_handler(message: dict, context: dict = None):
    # Process message
    pass
```
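To make the routing idea concrete, here is a minimal sketch of how a topic-to-handler registry of this kind could be built with a decorator and a dictionary. `HandlerRegistry` and `dispatch` are hypothetical names used for illustration only; this is not the library's implementation.

```python
import asyncio
from typing import Awaitable, Callable, Dict

Handler = Callable[[dict], Awaitable[object]]


class HandlerRegistry:
    """Hypothetical stand-in for the routing part of an SQS app."""

    def __init__(self) -> None:
        self._handlers: Dict[str, Handler] = {}

    def handler(self, topic: str) -> Callable[[Handler], Handler]:
        """Decorator that stores the coroutine function under its topic."""
        def register(func: Handler) -> Handler:
            if topic in self._handlers:
                raise ValueError(f"topic already registered: {topic}")
            self._handlers[topic] = func
            return func
        return register

    async def dispatch(self, topic: str, message: dict) -> object:
        """Look up the handler for a topic and await it."""
        if topic not in self._handlers:
            raise KeyError(f"no handler for topic: {topic}")
        return await self._handlers[topic](message)


registry = HandlerRegistry()


@registry.handler("topic.name")
async def my_handler(message: dict) -> str:
    return f"handled {message['id']}"
```

Calling `asyncio.run(registry.dispatch("topic.name", {"id": 7}))` awaits the registered coroutine; an unknown topic raises `KeyError` in this sketch, which plays the role the README assigns to `TopicNotFoundException`.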
### Dependency Injection

Use `SqsDepends` for dependency injection:

```python
class MyService:
    def process(self, data): ...


@app.handler("topic")
async def handler(
    message: dict,
    service: MyService = SqsDepends(MyService),
):
    service.process(message)
```
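One common way to implement default-value injection of this style is to inspect the handler's signature and build a value for each parameter whose default is a marker object. The sketch below assumes that approach; `Depends`, `resolve_dependencies`, and `call_with_dependencies` are hypothetical names, and the library's own `SqsDepends` may work differently.

```python
import asyncio
import inspect
from typing import Any, Callable, Dict


class Depends:
    """Hypothetical marker that records which factory builds a parameter."""

    def __init__(self, factory: Callable[[], Any]) -> None:
        self.factory = factory


def resolve_dependencies(func: Callable[..., Any]) -> Dict[str, Any]:
    """Build a value for every parameter whose default is a Depends marker."""
    resolved: Dict[str, Any] = {}
    for name, param in inspect.signature(func).parameters.items():
        if isinstance(param.default, Depends):
            resolved[name] = param.default.factory()
    return resolved


class MyService:
    def process(self, data: dict) -> str:
        return f"processed {data['id']}"


async def handler(message: dict, service: MyService = Depends(MyService)) -> str:
    return service.process(message)


async def call_with_dependencies(func: Callable[..., Any], message: dict) -> Any:
    """Inject resolved dependencies as keyword arguments, then await the handler."""
    return await func(message, **resolve_dependencies(func))
```

Because the dependency is created at call time rather than at import time, a test can swap `MyService` for a fake by passing a different factory to the marker.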
Create custom interceptors for cross-cutting concerns:
```python
from finalsa.sqs.consumer import AsyncConsumerInterceptor


class LoggingInterceptor(AsyncConsumerInterceptor):
    async def before_consume(self, topic: str, message: dict):
        print(f"Processing {topic}: {message}")

    async def after_consume(self, topic: str, result):
        print(f"Completed {topic}")


app = SqsApp(interceptors=[LoggingInterceptor])
```
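As a rough illustration of what "cross-cutting" means here, the sketch below wraps a handler call with `before_consume` and `after_consume` hooks. The `consume` function and `RecordingInterceptor` class are hypothetical, and running the after hooks in reverse order is an assumption of this sketch rather than documented library behavior.

```python
import asyncio
from typing import Any, Awaitable, Callable, List


class RecordingInterceptor:
    """Hypothetical interceptor that records events instead of printing."""

    def __init__(self) -> None:
        self.events: List[str] = []

    async def before_consume(self, topic: str, message: dict) -> None:
        self.events.append(f"before {topic}")

    async def after_consume(self, topic: str, result: Any) -> None:
        self.events.append(f"after {topic}")


async def consume(
    topic: str,
    message: dict,
    handler: Callable[[dict], Awaitable[Any]],
    interceptors: List[RecordingInterceptor],
) -> Any:
    """Run before hooks in order, the handler, then after hooks in reverse order."""
    for interceptor in interceptors:
        await interceptor.before_consume(topic, message)
    result = await handler(message)
    for interceptor in reversed(interceptors):
        await interceptor.after_consume(topic, result)
    return result
```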
Configure timeout limits for message processing to prevent workers from being blocked:
```python
# Fast operations (API calls, simple DB operations)
fast_app = SqsApp(
    app_name="fast-processor",
    queue_url="...",
    workers=5,
    message_timeout=30.0,  # 30 seconds
)

# Data processing operations
data_app = SqsApp(
    app_name="data-processor",
    queue_url="...",
    workers=3,
    message_timeout=300.0,  # 5 minutes (default)
)

# Heavy computation operations
heavy_app = SqsApp(
    app_name="heavy-processor",
    queue_url="...",
    workers=2,
    message_timeout=1800.0,  # 30 minutes
)
```
When a message handler exceeds the timeout:
```python
# This handler has a 2-minute timeout
app = SqsApp(message_timeout=120.0)


@app.handler("data.process")
async def process_data(message: dict):
    # This operation must complete within 2 minutes
    # or it will be cancelled and logged as timeout
    await heavy_data_processing(message)
```
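The cancel-and-log behavior described in the comments can be reproduced with the standard library alone. The following is a minimal sketch using `asyncio.wait_for`; `run_with_timeout` is a hypothetical helper, and it is an assumption that the library enforces `message_timeout` in a similar way.

```python
import asyncio
import logging

logger = logging.getLogger("timeout-sketch")


async def run_with_timeout(handler, message: dict, timeout: float):
    """Await the handler, cancelling it if it runs longer than `timeout` seconds."""
    try:
        return await asyncio.wait_for(handler(message), timeout=timeout)
    except asyncio.TimeoutError:
        # wait_for has already cancelled the handler task at this point.
        logger.warning("handler timed out after %.2fs", timeout)
        return None


async def slow_handler(message: dict) -> str:
    await asyncio.sleep(1.0)
    return "done"


async def fast_handler(message: dict) -> str:
    return "done"
```

Note that cancellation only takes effect at an `await` point, so a handler that blocks the event loop with synchronous work cannot be interrupted this way.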
Use `SqsAppTest` for testing message handlers:

```python
from finalsa.sqs.consumer import SqsAppTest


def test_user_handler():
    test_app = SqsAppTest(app)

    # Test handler
    result = test_app.test_handler(
        "user.created",
        {"user_id": 123, "name": "John"},
    )

    assert result is not None
```
The library provides specific exceptions:

- `TopicNotFoundException`: Handler not found for topic
- `InvalidMessageException`: Message format validation failed
- `TopicAlreadyRegisteredException`: Duplicate topic registration

Environment variables:

- `AWS_REGION`: AWS region for SQS
- `AWS_ACCESS_KEY_ID`: AWS access key
- `AWS_SECRET_ACCESS_KEY`: AWS secret key

Expected SQS message format:
```json
{
  "topic": "user.created",
  "data": {
    "user_id": 123,
    "name": "John Doe"
  },
  "metadata": {
    "correlation_id": "uuid",
    "timestamp": "2024-01-01T00:00:00Z"
  }
}
```
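A producer or a test can check a message body against this shape before sending it. The sketch below is one way to do that with the standard library; `Envelope` and `parse_envelope` are hypothetical names, and treating `metadata` as optional is an assumption, since the README does not say which fields are required.

```python
import json
from dataclasses import dataclass, field
from typing import Any, Dict


@dataclass
class Envelope:
    topic: str
    data: Dict[str, Any]
    metadata: Dict[str, Any] = field(default_factory=dict)


def parse_envelope(body: str) -> Envelope:
    """Parse a JSON message body and check the fields shown in the format above."""
    payload = json.loads(body)
    if not isinstance(payload, dict):
        raise ValueError("message body must be a JSON object")
    topic = payload.get("topic")
    if not isinstance(topic, str) or not topic:
        raise ValueError("message must carry a non-empty string 'topic'")
    data = payload.get("data")
    if not isinstance(data, dict):
        raise ValueError("message must carry an object under 'data'")
    # Assumption: metadata may be absent, in which case it defaults to {}.
    metadata = payload.get("metadata") or {}
    return Envelope(topic=topic, data=data, metadata=metadata)
```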
```python
from finalsa.sqs.consumer import SignalHandler

signal_handler = SignalHandler(logger)
# Automatic graceful shutdown on SIGTERM/SIGINT
```
Configure workers for high-throughput message processing:
```python
# High throughput configuration
app = SqsApp(
    app_name="high-throughput-service",
    queue_url="...",
    max_number_of_messages=10,  # Receive multiple messages per batch
    workers=16,                 # 16 concurrent workers
)


@app.handler("bulk.process")
async def process_bulk_data(message: dict):
    # Each message processed by available worker
    await process_large_dataset(message)
```
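To show what "each message processed by an available worker" can look like, here is a small sketch of a fixed-size worker pool built on `asyncio.Queue`. `run_workers` is a hypothetical helper, and modelling workers as asyncio tasks is an assumption; the library may use threads or processes instead.

```python
import asyncio
from typing import Any, Awaitable, Callable, List


async def run_workers(
    messages: List[dict],
    handler: Callable[[dict], Awaitable[Any]],
    workers: int,
) -> List[Any]:
    """Process messages with a fixed number of concurrent asyncio tasks."""
    queue: asyncio.Queue = asyncio.Queue()
    for message in messages:
        queue.put_nowait(message)
    results: List[Any] = []

    async def worker() -> None:
        # Each worker pulls the next message until the queue is drained.
        while True:
            try:
                message = queue.get_nowait()
            except asyncio.QueueEmpty:
                return
            results.append(await handler(message))

    await asyncio.gather(*(worker() for _ in range(workers)))
    return results
```

Results arrive in completion order rather than input order, so callers that need ordering should carry an identifier in each message.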
```python
app = SqsApp(workers=10)  # Process messages with 10 concurrent workers
```
Benefits of Worker-based Processing:
```python
app = SqsApp(max_number_of_messages=10)  # Receive up to 10 messages per batch
```
```bash
pytest
ruff check .
coverage run -m pytest
coverage report
```
MIT License - see LICENSE.md for details.
- `finalsa-common-models`: Shared data models
- `finalsa-sqs-client`: SQS client implementation
- `finalsa-sns-client`: SNS client for notifications
- `finalsa-dependency-injector`: Dependency injection framework