rococo
A Python library to help build things the way we want them built.
Anything worth doing is worth doing well. Anything worth doing twice is worth doing in rococo.
Decision Log -
How to document new decision
Basic Usage
Installation
Install using pip:
pip install rococo
Example
Models
from rococo.models import Person
someone = Person(first_name="John", last_name="Doe")
someone.prepare_for_save(changed_by_id=UUID("b30884cb-5127-457c-a633-4a800ad3c44b"))
someone.as_dict()
OUTPUT:
{
'active': True,
'changed_by_id': 'b30884cb-5127-457c-a633-4a800ad3c44b',
'changed_on': datetime.datetime(2023, 9, 20, 19, 50, 23, 532875),
'entity_id': 'ba68b3b1-fccd-4035-92f6-0ac2b29d71a1',
'first_name': 'John',
'last_name': 'Doe',
'previous_version': '3261fc4d-7db4-4945-91b5-9fb6a4b7dbc5',
'version': '4b6d92de-64bc-4dfb-a824-2151e8f11b73'
}
Messaging
RabbitMQ
from rococo.messaging import RabbitMqConnection
with RabbitMqConnection('host', 'port', 'username', 'password', 'virtual_host') as conn:
conn.send_message('queue_name', {'message': 'data'})
from rococo.messaging import RabbitMqConnection
def process_message(message_data: dict):
print(f"Processing message {message_data}...")
with RabbitMqConnection('host', 'port', 'username', 'password', 'virtual_host') as conn:
conn.consume_messages('queue_name', process_message)
SQS
from rococo.messaging import SqsConnection
with SqsConnection(region_name='us-east-1') as conn:
conn.send_message('queue_name', {'message': 'data'})
from rococo.messaging import SqsConnection
def process_message(message_data: dict):
print(f"Processing message {message_data}...")
with SqsConnection(region_name='us-east-1') as conn:
conn.consume_messages('queue_name', process_message)
conn = SqsConnection(region_name='us-east-1')
conn.send_message('queue_name', {'message': 'data'})
conn.consume_messages('queue_name', process_message)
Processing
Processing data from messages can be achieved by implementing the abstract class BaseServiceProcessor
within messaging/base.py
Data
SurrealDB
from rococo.data import SurrealDbAdapter
def get_db_connection():
endpoint = "ws://localhost:8000/rpc"
username = "myuser"
password = "mypassword"
namespace = "test"
database = "test"
return SurrealDbAdapter(endpoint, username, password, namespace, database)
with get_db_connection() as db:
db.execute_query("""insert into person {
user: 'me',
pass: 'very_safe',
tags: ['python', 'documentation']
};""")
print(db.execute_query("SELECT * FROM person;", {}))
Relationships in Surreal DB
Consider the following example models:
from dataclasses import field, dataclass
from rococo.repositories import SurrealDbRepository
from rococo.models import VersionedModel
from rococo.data import SurrealDbAdapter
@dataclass
class Email(VersionedModel):
email_address: str = None
@dataclass
class LoginMethod(VersionedModel):
email: str = field(default=None, metadata={
'relationship': {'model': Email, 'type': 'direct'},
'field_type': 'record_id'
})
method_type: str = None
@dataclass
class Person(VersionedModel):
login_method: str = field(default=None, metadata={
'relationship': {'model': LoginMethod, 'type': 'direct'},
'field_type': 'record_id'
})
name: str = None
@dataclass
class Organization(VersionedModel):
person: str = field(default=None, metadata={
'relationship': {'model': Person, 'type': 'direct'},
'field_type': 'record_id'
})
name: str = None
def get_db_connection():
endpoint = "ws://localhost:8000/rpc"
username = "root"
password = "root"
namespace = "breton1"
database = "bretondb1"
return SurrealDbAdapter(endpoint, username, password, namespace, database)
email = Email(email_address="test@example.com")
login_method = LoginMethod(
method_type="email-password",
email=email
)
person = Person(
name="Person1",
login_method=login_method.entity_id
)
organization = Organization(
name="Organization1",
person=str(person.entity_id)
)
with get_db_connection() as adapter:
person_repo = SurrealDbRepository(adapter, Person, None, None)
organization_repo = SurrealDbRepository(adapter, Organization, None, None)
login_method_repo = SurrealDbRepository(adapter, LoginMethod, None, None)
email_repo = SurrealDbRepository(adapter, Email, None, None)
organization_repo.save(organization)
person_repo.save(person)
login_method_repo.save(login_method)
email_repo.save(email)
organization = organization_repo.get_one({"entity_id": organization.entity_id}, fetch_related=['person'])
print(organization.person.entity_id)
print(organization.person.name)
organization = organization_repo.get_one({"entity_id": organization.entity_id})
print(organization.person.entity_id)
try:
print(organization.person.name)
except AttributeError:
pass
print(organization.as_dict(True))
organization = organization_repo.get_one({"entity_id": organization.entity_id}, fetch_related=['person', 'person.login_method', 'person.login_method.email'])
print(organization.entity_id)
print(organization.person.entity_id)
print(organization.person.login_method.entity_id)
print(organization.person.login_method.email.entity_id)
print(organization.person.login_method.email.email_address)
print(organization.as_dict(True))
Many-to-many relationships
from typing import List
@dataclass
class Investor(VersionedModel):
name: str = None
person: str = field(default=None, metadata={
'relationship': {'model': Person, 'type': 'direct'},
'field_type': 'record_id'
})
investments: List[VersionedModel] = field(default=None, metadata={
'relationship': {'model': 'Investment', 'type': 'associative', 'name': 'investswith', 'direction': 'out'},
'field_type': 'm2m_list'
})
@dataclass
class Investment(VersionedModel):
name: str = None
investors: List[VersionedModel] = field(default=None, metadata={
'relationship': {'model': 'Investor', 'type': 'associative', 'name': 'investswith', 'direction': 'in'},
'field_type': 'm2m_list'
})
investor1 = Investor(name="Investor1", person=person)
investor2 = Investor(name="Investor2", person=person.entity_id)
investor3 = Investor(name="Investor3", person=Person(entity_id=person.entity_id))
investment1 = Investment(name="Investment1")
investment2 = Investment(name="Investment2")
investment3 = Investment(name="Investment3")
with get_db_connection() as adapter:
investor_repo = SurrealDbRepository(adapter, Investor, None, None)
investment_repo = SurrealDbRepository(adapter, Investment, None, None)
investor_repo.save(investor1)
investor_repo.save(investor2)
investor_repo.save(investor3)
investment_repo.save(investment1)
investment_repo.save(investment2)
investment_repo.save(investment3)
investor_repo.relate(investor1, 'investswith', investment2)
investment_repo.relate(investor1, 'investswith', Investment(entity_id=investment3.entity_id))
investor_repo.relate(Investor(entity_id=investor2.entity_id), 'investswith', investment1)
investor_repo.relate(investor2, 'investswith', investment3)
investment_repo.relate(investor3, 'investswith', investment1)
investment_repo.relate(investor3, 'investswith', investment2)
for investment in investment_repo.get_many({}, fetch_related=['investors']):
print("Investment: ", investment.as_dict(True))
print()
for investment in investment_repo.get_many({}, fetch_related=['investors', 'investors.person', 'investors.person.login_method', 'investors.person.login_method.email']):
print("Investment: ", investment.as_dict(True))
print()
investor_with_investments = investor_repo.get_one({'entity_id': investor1.entity_id}, fetch_related=['investments'])
investments = investor_with_investments.investments
for investment in investments:
print(investment.as_dict())
Relationships in MySQL
Consider the following example models:
from dataclasses import field, dataclass
from rococo.repositories.mysql import MySqlRepository
from rococo.models import VersionedModel
from rococo.data import MySqlAdapter
@dataclass
class Email(VersionedModel):
email_address: str = None
@dataclass
class LoginMethod(VersionedModel):
email_id: str = None
method_type: str = None
@dataclass
class Person(VersionedModel):
login_method_id: str = None
name: str = None
@dataclass
class Organization(VersionedModel):
person_id: str = None
name: str = None
def get_db_connection():
return MySqlAdapter('localhost', 3306, 'root', 'ransomsnare_root_pass', 'testdb')
email = Email(email_address="test@example.com")
login_method = LoginMethod(
method_type="email-password",
email=email.entity_id
)
person = Person(
name="Person1",
login_method=login_method.entity_id
)
organization = Organization(
name="Organization1",
person=person.entity_id
)
with get_db_connection() as adapter:
person_repo = MySqlRepository(adapter, Person, None, None)
organization_repo = MySqlRepository(adapter, Organization, None, None)
login_method_repo = MySqlRepository(adapter, LoginMethod, None, None)
email_repo = MySqlRepository(adapter, Email, None, None)
organization_repo.save(organization)
person_repo.save(person)
login_method_repo.save(login_method)
email_repo.save(email)
organization = organization_repo.get_one({"entity_id": organization.entity_id})
print(organization.person_id)
print(organization.as_dict(True))
person = person_repo.get_one({"entity_id": organization.person})
person_orgs = organization_repo.get_many({
"person_id": person.entity_id
})
for org in person_orgs:
print(org.as_dict(True))
How to use the adapter and base Repository in another projects
class LoginMethodRepository(BaseRepository):
def __init__(self, adapter, message_adapter, queue_name):
super().__init__(adapter, LoginMethod, message_adapter, queue_name)
def save(self, login_method: LoginMethod, send_message: bool = False):
with self.adapter:
return super().save(login_method,send_message)
def get_one(self, conditions: Dict[str, Any]):
with self.adapter:
return super().get_one(conditions)
def get_many(self, conditions: Dict[str, Any]):
with self.adapter:
return super().get_many(conditions)
-
The LoginMethodRepository class is a concrete implementation of the BaseRepository class. It is responsible for managing LoginMethod objects in the database.
The init() method takes an adapter object as input. This adapter object is responsible for communicating with the database. The adapter object is passed to the super().init() method, which initializes the base repository class.
It also takes in a message adapter and queue name for RabbitMQ and SQS messaging which can later be used in the save() method by passing a boolean.
The save() method takes a LoginMethod object as input and saves it to the database. The get_one() method takes a dictionary of conditions as input and returns a single LoginMethod object that matches those conditions. The get_many() method takes a dictionary of conditions as input and returns a list of LoginMethod objects that match those conditions.
RepositoryFactory
class RepositoryFactory:
_repositories = {}
@classmethod
def _get_db_connection(cls):
endpoint = "ws://localhost:8000/rpc"
username = "myuser"
password = "mypassword"
namespace = "hell"
db_name = "abclolo"
return SurrealDbAdapter(endpoint, username, password, namespace, db_name)
@classmethod
def get_repository(cls, repo_class: Type[BaseRepository]):
if repo_class not in cls._repositories:
adapter = cls._get_db_connection()
cls._repositories[repo_class] = repo_class(adapter)
return cls._repositories[repo_class]
-
The RepositoryFactory class is a singleton class that is responsible for creating and managing repositories. It uses a cache to store the repositories that it has already created. This allows it to avoid creating the same repository multiple times.
The _get_db_connection() method creates a new database connection using the specified endpoint, username, password, namespace, and database name. The get_repository() method takes a repository class as input and returns the corresponding repository object. If the repository object does not already exist in the cache, then the factory will create a new one and add it to the cache.
Sample usage
sample_data = LoginMethod(
person_id="asd123123",
method_type="email",
method_data={},
email="user@example.com",
password="hashed_password",
)
repo = RepositoryFactory.get_repository(LoginMethodRepository)
result = repo.save(sample_data)
print("Done", repo.get_one({}))
-
The above code creates a new LoginMethod object and saves it to the database using the LoginMethodRepository object. It then retrieves the saved object from the database and prints it to the console.
This is just a simple example of how to use the LoginMethodRepository and RepositoryFactory classes. You can use these classes to manage any type of object in a database.
Rococo MySQL CLI (rococo-mysql
)
This CLI interface provides commands for managing MySQL migrations using the Rococo module. It supports creating new migrations, running forward and backward migrations, and retrieving the current database version. The CLI also handles environment variables from .env
files for database connection configurations.
Usage
rococo-mysql [OPTIONS] COMMAND
Options
--migrations-dir
(optional): Path to the migrations directory of your project. Defaults to checking standard directories (flask/app/migrations
, api/app/migrations
, app/migrations
).--env-files
(optional): Paths to environment files containing database connection details (e.g., .env.secrets
, <APP_ENV>.env
).
Commands
new
Creates a new migration file in the specified migrations directory.
rococo-mysql new
rf
Runs the forward migration, applying all unapplied migrations in sequence.
rococo-mysql rf
rb
Runs the backward migration, rolling back the last applied migration.
rococo-mysql rb
version
Displays the current database version.
rococo-mysql version
Environment Configuration
- If no
--env-files
are provided, the CLI attempts to load environment variables from .env.secrets
and an environment-specific <APP_ENV>.env
file. - The environment variables required for the database connection are:
MYSQL_HOST
MYSQL_PORT
MYSQL_USER
MYSQL_PASSWORD
MYSQL_DATABASE
Example
Running a forward migration:
rococo-mysql --migrations-dir=app/migrations --env-files=.env .env.secrets rf
This command runs all pending migrations using the specified environment files and migrations directory.
-
Create an email_transmitter
directory in your project under services
directory.
-
Add a config.json
file in the email_transmitter
directory. No other file is needed in this directory.
-
The config.json
should contain a configuration object with the following keys:
configurations
: A list of configuration objects. Currently, we are only using mailjet
provider. An example config for mailjet
provider looks like:
[
{
"provider": "mailjet",
"sourceEmail": "EcorRouge <system@ecorrouge.com>",
"errorReportingEmail": "system@ecorrouge.com"
}
]
events
: An object whose keys represent an event name and the value represents an object that represents the email to be sent when that event is received. An example events
object looks like:
{
"USER_CREATED": {
"subject": "Welcome {{var:recipient_name}}",
"templateName": "Welcome (PROD and TEST)",
"id": {
"mailjet": 4777555
}
}
}
-
Example config.json
:
{
"configurations": [
{
"provider": "mailjet",
"sourceEmail": "EcorRouge <system@ecorrouge.com>",
"errorReportingEmail": "system@ecorrouge.com"
}
],
"events": {
"USER_CREATED": {
"subject": "Welcome {{var:recipient_name}}",
"templateName": "Welcome (PROD and TEST)",
"id": {
"mailjet": 4777555
}
}
}
}
-
Add the email_transmitter
service to docker-compose.yml
. A simple definition looks like:
services:
email_transmitter:
image: ecorrouge/email-transmitter:latest
container_name: project_email_transmitter
restart: unless-stopped
env_file:
- ../.env
volumes:
- <path_to_email_transmitter_service>/config.json:/app/src/services/email_transmitter/src/config.json
-
Make sure MAILJET_API_KEY
and MAILJET_API_SECRET
are available in the provided env_file
file(s).
-
Make sure EmailServiceProcessor_QUEUE_NAME
and QUEUE_NAME_PREFIX
are available in the provided env_file
file(s).
-
Make sure the following variables are also available in the provided env_file
file(s):
RABBITMQ_HOST
RABBITMQ_PORT
RABBITMQ_USER
RABBITMQ_PASSWORD
RABBITMQ_VIRTUAL_HOST
-
Make sure the service is added to the same network as rest of the services that are to going to be calling this service.
-
How to call email-transmitter to send an email from application code:
from rococo.messaging import RabbitMqConnection
EMAIL_TRANSMITTER_QUEUE_NAME = os.getenv('QUEUE_NAME_PREFIX') + os.getenv('EmailServiceProcessor_QUEUE_NAME')
user_created = User(...)
message = {
"event": "USER_CREATED",
"data": {
"confirmation_link": confirmation_link,
"recipient_name": user.name,
},
"to_emails": [user.email],
}
with RabbitMqConnection('host', 'port', 'username', 'password', 'virtual_host') as conn:
conn.send_message(EMAIL_TRANSMITTER_QUEUE_NAME, message)
Deployment
The process described is a Continuous Integration (CI) and Continuous Deployment (CD) pipeline for a Python package using GitHub Actions. Here's the breakdown:
Development Phase
Developers push their changes directly to the main branch.
This branch is likely used for ongoing development work.
Staging/Testing Phase
When the team is ready to test a potential release, they push the code to a staging branch.
Once the code is pushed to this branch, GitHub Actions automatically publishes the package to the test PyPi server.
The package can then be reviewed and tested by visiting https://test.pypi.org/project/rococo/.
This step ensures that the package works as expected on the PyPi platform without affecting the live package.
Release/Publish Phase
When the team is satisfied with the testing and wants to release the package to the public, they create and publish a release on the GitHub repository.
Following this action, GitHub Actions takes over and automatically publishes the package to the official PyPi server.
The package can then be accessed and downloaded by the public at https://pypi.org/project/rococo/.
In essence, there are three primary phases:
- Development (main branch)
- Testing (staging branch with test PyPi server)
- Release (triggered by a GitHub release and published to the official PyPi server).
Local Development
To install local Rococo version in other project, upload to your PyPi:
- Run command "python setup.py sdist" to generate tar.gz file that will be uploaded to PyPi
- create ./pypirc file in the root of the directory and add:
[pypi]
username = token
password = THE_TOKEN_PROVIDED_BY_PYPI
- run the command: twine upload --config-file=./.pypirc dist/*