MRSAL
Intro
Mrsal is a simple to use message broker abstraction on top of RabbitMQ and Pika. The goal is to make Mrsal trivial to re-use in all services of a distributed system and to make the use of advanced message queing protocols easy and safe. No more big chunks of repetive code across your services or bespoke solutions to handle dead letters.
Mrsal is Arabic for a small arrow and is used to describe something that performs a task with lightness and speed.
Quick Start guide
0. Install
First things first:
pip install mrsal
We need to install RabbitMQ to use Mrsal. Head over to install RabbitMQ. Make sure to stick to the configuration that you give the installation throughout this guide. You can also use the Dockerfile and the docker-compose that we are using in the full guide.
Next set the default username, password and servername for your RabbitMQ setup. It's advisable to use a .env
script or the rc file for persistence.
[RabbitEnvVars]
RABBITMQ_DEFAULT_USER=******
RABBITMQ_DEFAULT_PASS=******
RABBITMQ_DEFAULT_VHOST=******
RABBITMQ_DOMAIN=******
RABBITMQ_DOMAIN_TLS=******
RABBITMQ_GUI_PORT=******
RABBITMQ_PORT=******
RABBITMQ_PORT_TLS=******
RABBITMQ_CAFILE=/path/to/file
RABBITMQ_CERT=/path/to/file
RABBITMQ_KEY=/path/to/file
Please read the full guide to understand what Mrsal currently can and can't do.
Mrsal was first developed by NeoMedSys and the research group CRAI at the univeristy hospital of Oslo.
1. Setup and connect
The first thing we need to do is to setup our rabbit server before we can subscribe and publish to it. Lets set up a server on our localhost with the port and credentials we used when spinning up the docker-compose
import json
import pika
from mrsal.mrsal import Mrsal
SSL = False
port = RABBITMQ_PORT_TLS if SSL else RABBITMQ_PORT
host = RABBITMQ_DOMAIN_TLS if SSL else RABBITMQ_DOMAIN
credentials=(RABBITMQ_DEFAULT_USER, RABBITMQ_DEFAULT_PASS)
v_host = RABBITMQ_DEFAULT_VHOST
mrsal = Mrsal(
host=host,
port=port,
credentials=credentials,
virtual_host=v_host,
ssl=SSL
)
mrsal.connect_to_server()
2 Publish
Now lets publish our message of friendship on the friendship exchange.
Note: When fast_setup=True
that means Mrsal will create the specified exchange
and queue
, then bind them together using routing_key
.
prop = pika.BasicProperties(
app_id='friendship_app',
message_id='friendship_msg',
content_type='text/plain',
content_encoding='utf-8',
delivery_mode=pika.DeliveryMode.Persistent,
headers=None)
message_body = 'Hello'
mrsal.publish_message(exchange='friendship',
exchange_type='direct',
queue='friendship_queue',
routing_key='friendship_key',
message=json.dumps(message_body),
prop=prop,
fast_setup=True)
3 Consume
Now lets setup a consumer that will listen to our very important messages. If you are using scripts rather than notebooks then it's advisable to run consume and publish separately. We are going to need a callback function which is triggered upon receiving the message from the queue we subscribe to. You can use the callback function to activate something in your system.
Note:
- If you start a consumer with
callback_with_delivery_info=True
then your callback function should have at least these params (method_frame: pika.spec.Basic.Deliver, properties: pika.spec.BasicProperties, message_param: str)
. - If not, then it should have at least
(message_param: str)
import json
def consumer_callback_with_delivery_info(host_param: str, queue_param: str, method_frame: pika.spec.Basic.Deliver, properties: pika.spec.BasicProperties, message_param: str):
str_message = json.loads(message_param).replace('"', '')
if 'Hello' in str_message:
app_id = properties.app_id
msg_id = properties.message_id
print(f'app_id={app_id}, msg_id={msg_id}')
print('Hola habibi')
return True
return False
def consumer_callback(host_param: str, queue_param: str, message_param: str):
str_message = json.loads(message_param).replace('"', '')
if 'Hello' in str_message:
print('Hola habibi')
return True
return False
mrsal.start_consumer(
queue='friendship_queue',
callback=consumer_callback_with_delivery_info,
callback_args=(test_config.HOST, 'friendship_queue'),
inactivity_timeout=1,
requeue=False,
fast_setup=True,
callback_with_delivery_info=True
)
Done! Your first message of friendship has been sent to the friendship queue on the exchange of friendship.
3 Concurrent Consumers
Sometimes we need to start multiple consumers to listen to the same queue and process received messages concurrently.
You can do that by calling start_concurrence_consumer
which takes total_threads
param in addition to the same parameters used in start_consumer
.
This method will create a thread pool and spawn new Mrsal
object and start new consumer for every thread.
import json
import time
import pika
from pika.exchange_type import ExchangeType
import mrsal.config.config as config
import tests.config as test_config
from mrsal.mrsal import Mrsal
mrsal = Mrsal(host=test_config.HOST,
port=config.RABBITMQ_PORT,
credentials=config.RABBITMQ_CREDENTIALS,
virtual_host=config.V_HOST)
mrsal.connect_to_server()
APP_ID="TEST_CONCURRENT_CONSUMERS"
EXCHANGE="GoodFriends"
EXCHANGE_TYPE='direct'
QUEUE_EMERGENCY="alleSindInkludiert"
NUM_THREADS=3
NUM_MESSAGES=3
INACTIVITY_TIMEOUT=30
ROUTING_KEY="bleib-cool"
MESSAGE_ID="Bleib cool und alles wird besser"
def test_concurrent_consumer():
mrsal.start_concurrence_consumer(total_threads=NUM_THREADS, queue=QUEUE_EMERGENCY,
callback=consumer_callback_with_delivery_info,
callback_args=(test_config.HOST, QUEUE_EMERGENCY),
exchange=EXCHANGE, exchange_type=EXCHANGE_TYPE,
routing_key=ROUTING_KEY,
inactivity_timeout=INACTIVITY_TIMEOUT,
fast_setup=True,
callback_with_delivery_info=True)
mrsal.close_connection()
def consumer_callback_with_delivery_info(host_param: str, queue_param: str, method_frame: pika.spec.Basic.Deliver, properties: pika.spec.BasicProperties, message_param: str):
time.sleep(5)
return True
That simple! You have now setup a full advanced message queueing protocol that you can use to promote friendship or other necessary communication between your services.
Note! Please refer to the >>>FULL GUIDE
<<< on how to use customize Mrsal to meet specific needs. There are many parameters and settings that you can use to set up a more sophisticated communication protocol.
References