
amqpipe
.. image:: https://travis-ci.org/Fatal1ty/amqpipe.svg?branch=master
    :target: https://travis-ci.org/Fatal1ty/amqpipe

.. image:: https://requires.io/github/Fatal1ty/amqpipe/requirements.svg?branch=master
    :target: https://requires.io/github/Fatal1ty/amqpipe/requirements/?branch=master
    :alt: Requirements Status

.. image:: https://img.shields.io/pypi/v/amqpipe.svg
    :target: https://pypi.python.org/pypi/amqpipe

.. image:: https://img.shields.io/pypi/pyversions/amqpipe.svg
    :target: https://pypi.python.org/pypi/amqpipe/

.. image:: https://img.shields.io/badge/license-MIT-blue.svg
    :target: https://raw.githubusercontent.com/Fatal1ty/amqpipe/master/LICENSE

Twisted-based pipeline framework for AMQP. It allows you to create fast asynchronous services that follow a simple ideology: get a message from a queue, do something with it, and publish the result to an exchange.

Install via pip:

::

    pip install amqpipe

The minimal module based on AMQPipe is:

.. code:: python

    from amqpipe import AMQPipe

    pipe = AMQPipe()
    pipe.run()

It simply gets every message from one RabbitMQ queue and publishes it to another RabbitMQ exchange.

Now define an action to apply to each message:

.. code:: python

    import hashlib
    from amqpipe import AMQPipe

    def action(message):
        return hashlib.md5(message).hexdigest()

    pipe = AMQPipe(action=action)
    pipe.run()

It publishes the MD5 checksum of every message as the result.

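On Python 3, ``hashlib.md5`` accepts only bytes, so an action like the one above fails if it is handed a ``str``. The following is a minimal sketch of an action that tolerates both types. It assumes text messages should be hashed as UTF-8, which is an assumption of this example and not something AMQPipe is documented to require.

```python
import hashlib


def action(message):
    """Return the MD5 hex digest of a message given as str or bytes."""
    # Assumption: str messages are hashed as their UTF-8 encoding.
    if isinstance(message, str):
        message = message.encode("utf-8")
    return hashlib.md5(message).hexdigest()


print(action("hello"))  # 5d41402abc4b2a76b9719d911017c592
```

The function can be passed to ``AMQPipe(action=action)`` in the same way as the action above.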
If messages in the input queue have a predefined format, you can define a converter function that extracts the value passed to the action:

.. code:: python

    import hashlib
    from amqpipe import AMQPipe

    def converter(message):
        return message['text']

    def action(text):
        return hashlib.md5(text).hexdigest()

    pipe = AMQPipe(converter=converter, action=action)
    pipe.run()

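To make the data flow concrete, here is a rough sketch of how a converter and an action compose for a single message, written without AMQPipe so it can be run on its own. The ``process`` helper is hypothetical, and the JSON decoding step is an assumption made only because the converter indexes the message like a dictionary; the real framework may deserialize messages differently.

```python
import hashlib
import json


def converter(message):
    # Pick the field of interest out of the decoded message.
    return message["text"]


def action(text):
    # Encode to bytes first, as hashlib.md5 requires on Python 3.
    return hashlib.md5(text.encode("utf-8")).hexdigest()


def process(body):
    """Hypothetical stand-in for the per-message step of the pipeline."""
    message = json.loads(body)  # assumption: the message body is JSON
    return action(converter(message))


print(process(b'{"text": "hello"}'))  # 5d41402abc4b2a76b9719d911017c592
```

Keeping the converter separate from the action means the action only ever sees the extracted value and stays independent of the message format.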
You can define service-specific arguments:

.. code:: python

    import hashlib
    from amqpipe import AMQPipe

    class Processor:
        def set_field(self, field):
            self.field = field

    processor = Processor()

    def init(args):
        processor.set_field(args.field)

    def converter(message):
        return message.get(processor.field)

    def action(text):
        return hashlib.md5(text).hexdigest()

    pipe = AMQPipe(converter, action, init)
    pipe.parser.add_argument('--field', default='text', help='Field name for retrieving message value')
    pipe.run()

You can connect to a database in the init function or perform any other initialization there.

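The argument handling above can be tried in isolation. This sketch assumes that ``pipe.parser`` behaves like a standard ``argparse.ArgumentParser`` and that the parsed namespace is what gets passed to ``init``; both are inferred from the example above, not confirmed from the AMQPipe source.

```python
import argparse


class Processor:
    field = None

    def set_field(self, field):
        self.field = field


processor = Processor()


def init(args):
    # Copy the parsed command-line value into the shared processor.
    processor.set_field(args.field)


# Assumption: a plain ArgumentParser stands in for pipe.parser here.
parser = argparse.ArgumentParser()
parser.add_argument("--field", default="text",
                    help="Field name for retrieving message value")

init(parser.parse_args(["--field", "title"]))
print(processor.field)  # title
```

When ``--field`` is omitted, the default ``'text'`` is used.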
If your action returns a Deferred, the result is published to RabbitMQ once that Deferred fires:

.. code:: python

    import logging
    from twisted.internet import defer
    from amqpipe import AMQPipe

    logger = logging.getLogger(__name__)

    class Processor:
        def set_field(self, field):
            self.field = field

    processor = Processor()

    # connect_to_db and db_query are placeholders for your own database helpers
    def init(args):
        connect_to_db()
        ...

    def converter(message):
        return message.get(processor.field)

    @defer.inlineCallbacks
    def action(text):
        result = yield db_query(text)
        logger.info('Get from db: %s', result)
        defer.returnValue(result)

    pipe = AMQPipe(converter, action, init)
    pipe.parser.add_argument('--field', default='text', help='Field name for retrieving message value')
    pipe.run()

The init function may return a Deferred too.