torch-system
The beta version of IA training system, created using domain driven design and an event driven architecture.
Installation
Make sure you have a pytorch distribution installed. If you don't, go to the official website and follow the instructions.
Then, you can install the package using pip:
pip install torchsystem
Soon I will be adding the package to conda-forge when the package is more stable. Meanwhile you can install it with conda pip
conda install pip
pip install torchsystem
Introduction
Machine learning systems are getting more and more complex, and the need for a more organized and structured way to build and maintain them is becoming more evident. Training a neural network requires to define a cluster of related objects that should be treated as a single unit, this defines an aggregate. The training process mutates the state of the aggregate producing data that should be stored alongside the state of the aggregate in a transactional way. This establishes a clear bounded context that should be modeled using Domain Driven Design (DDD) principles.
The torch-system is a framework based on DDD and Event Driven Architecture (EDA) principles, using the pybondi library. It aims to provide a way to model complex machine models using aggregates and training flows using commands and events, and persist states and results using the repositories, the unit of work pattern and pub/sub.
It also provides out of the box tools for managing the training process, model compilation, centralized settings with enviroments variables using pydantic-settings, automatic parameter tracking using mlregistry.
Getting Started
The main concepts of the torch-system are:
- Aggregate: A cluster of related objects, for example neural networks, optimizers, optimizers, etc.. Each aggregate has a unique identifier and a root that can publish domain events. For example, let's say we need to model a classifier, we can define an aggregate called
Classifier
that contains a neural network, an optimizer, a loss function, etc.
from typing import Any
from typing import Callable
from torch import Tensor
from torch import inferece_mode
from torch.nn import Module
from torch.optim import Optimizer
from torchsystem import Aggregate
from torchsystem import Loader
class Classifier(Aggregate):
def __init__(self, id: Any, model: Module, criterion: Module, optimizer: Optimizer):
super().__init__(id)
self.model = model
self.criterion = criterion
self.optimizer = optimizer
def forward(self, input: Tensor):
return self.model(input)
def loss(self, output: Tensor, target: Tensor) -> Tensor:
return self.criterion(output, target)
def fit(self, loader: Loader, callback: Callable):
for batch, (input, target) in enumerate(loader, start=1):
self.optimizer.zero_grad()
output = self(input)
loss = self.loss(output, target)
loss.backward()
self.optimizer.step()
callback(self.id, batch, loss.item(), output, target)
@inference_mode()
def evaluate(self, loader: Loader, callback: Callable):
for batch, (input, target) in enumerate(loader, start=1):
output = self(input)
loss = self.loss(output, target)
callback(self.id, batch, loss.item(), output, target)
In DDD, aggregates can be complex, with multiple fields or dependencies that require specific rules for instantiation. Factories manage this complexity by encapsulating the creation logic. In modern machine learning frameworks, model creating go hand in hand with model compilation, so it makes sense to encapsulate the compilation process as a factory alike object that produces compiled aggregates.
In torchsystem can create a compiled instance of the aggregate using the compiler class. Let's say we have a model named MLP
.
model = MLP(784, 128, 10, p=0.2, activation='relu')
criterion = CrossEntropyLoss()
optimizer = Adam(model.parameters(), lr=0.001)
compiler = Compiler(Classifier)
classifier = compiler.compile('1', model, criterion, optimizer)
But what about configuration? torch configurations can be very complex to manage, for example compilers have a set of parameters
that can be configured, but we are not seeing them in the example above.
Every object in the torchsytem has a settings object instance initialized by default that can be used to configure them. For example, if you want to change the configuration of the compiler, you can do it like this:
from torchsystem.settings import Settings, CompilerSettings
settings = Settings(compiler=CompilerSettings(fullgraph=True))
compiler = Compiler(Classifier, settings=settings)
But if you don't want to be messing around passing settings objects to every object you create, you can define enviroment variables
that will be readed automatically thanks to pydantic-settings. For example, you can define a .env
file in the root of your project.
COMPILER_FULLGRAPH=True
COMPILER_RAISE_ON_ERROR=True
LOADER_PIN_MEMORY=True
LOADER_PIN_MEMORY_DEVICE='cuda:0'
LOADER_NUM_WORKERS=4
And that's it, the settings will be readed automatically when you create the compiler object, without manually passing the settings object to every object you create.
For more complex usage cases, you will need to create your own settings object to pass to your
defined aggregates. This can be done simply by inheriting from the BaseSettings
class and defining the settings you need.
By default, the Settings object has an AggregateSettings
object specifying the device for your aggregate. So you can refactor your aggregate to use it in it's training loop.
from torchsystem.settings import Settings
class Classifier(Aggregate):
def __init__(self, id: Any, model: Module, criterion: Module, optimizer: Optimizer, settings: Settings = None):
super().__init__(id)
self.model = model
self.criterion = criterion
self.optimizer = optimizer
self.settings = settings or Settings()
...
def fit(self, loader: Loader, callback: Callable):
device = self.settings.aggregate.device
for batch, (input, target) in enumerate(loader, start=1):
input, target = input.to(device), target.to(device)
...
And in your .env
file you can define the device for your aggregate.
AGGREGATE_DEVICE='cuda:0'
Now let's train the classifier using loaders. The Loaders
are a way to encapsulate the data loading process. You can use raw
data loaders from pytorch if you want, just like you were doing:
from torch.utils.data import DataLoader
loaders = [
('train', DataLoader(Digits(train=True), batch_size=32, shuffle=True)),
('eval', DataLoader(Digits(train=False), batch_size=32, shuffle=False))
]
But if you decide to use the Loaders
class will automatically track the parameters of you dataloaders using
the mlregistry
library.
from torchsystem import Loaders
loaders = Loaders()
loaders.add('train', Digits(train=True), batch_size=32, shuffle=True)
settings = Settings(loaders=LoadersSettings(num_of_workers=2))
loaders.add('eval', Digits(train=False), batch_size=32, shuffle=False, settings=settings)
Finally, you can train the classifier using predefined training and evaluation loops in the commands (or something defined by you in
a command handler). Let's start a training session.
from torchsystem import Session
from torchsystem.commands import Iterate
from torchsystem.commands import Train, Evaluate
with Session() as session:
session.add(classifier)
for epoch in range(1, 10):
session.execute(Iterate(classifier, loaders))
The Session
class is a context manager that will automatically start and stop a pub/sub system, commit or rollback it in case of errors,
store or restore the state of the aggregates given a defined repository, handle the events produced by the aggregates during the execution
of the commands using a messagebus and handlers you define, and will not be restricted to the default command handlers, you can also use your own with handlers you define.
"But what about metrics? I'm just seeing the loss being logged in my terminal". That's what callbacks are for. By default, torchsystem tracks the loss of your model, but this can be extended to any metric you want with callbacks. There are some predefined callbacks in the torchsystem.callbacks
.
from torchsystem.callbacks import Callbacks
from torchsystem.callbacks.average import Loss, Accuracy
callbacks = Callbacks([Loss(), Accuracy()])
...
for epoch in range(1, 10):
session.execute(Iterate(classifier, loaders, callbacks))
You can publish the metrics produced in the callbacks using a publisher. The publisher will publish the metrics in a topic, and you can subscribe to that topic to get the metrics. A publisher will be automatically created by the session and you can add subscribers fron there, but you can also pass your own publisher to the session, as you can with a repository or even a messagebus.
from torchsystem import Publisher
publisher = Publisher()
publisher.subscribe('metrics', lambda metric: print(metric))
repository = MyRepository()
callbacks.bind(publisher)
with Session(repository, publisher) as session:
session.add(classifier)
for epoch in range(1, 10):
session.execute(Iterate(classifier, loaders, callbacks))
if epoch % 4 == 0:
session.commit()
if epoch % 5 == 0:
raise Exception('Something went wrong')
There are even more stuff you can do with the torchsystem. Let's say you don't want to use repositories but persist your aggregates with events instead. Let's see an example of how you can do this using also the mlregistry
library for metadata tracking.
from mlregistry import get_metadata
from torchsystem.storage import Models, Criterions, Optimizers
from torchsystem.events import Added, RolledBack, Saved
from torchsystem.events import Iterated
Models.register(MLP)
Criterions.register(CrossEntropyLoss)
Optimizers.register(Adam)
model = MLP(784, 128, 10, p=0.2, activation='relu')
criterion = CrossEntropyLoss()
optimizer = Adam(model.parameters(), lr=0.001)
def persist_model(event: Added[Classifier]):
metadata = get_metadata(event.aggregate.model)
print(f'Persisting model {metadata.name} with parameters {metadata.parameters}')
models = Models()
models.store(event.aggregate.model)
...
def persist_optimizer(event: Added[Classifier], optimizers: Optimizers):
metadata = get_metadata(event.aggregate.criterion)
criterions.store(event.aggregate.criterion)
...
def print_datasets(event: Iterated[Classifier]):
for phase, loader in event.loaders:
metadata = get_metadata(loader.dataset)
...
Session.add_event_handler(Added, persist_model)
Session.add_event_handler(Added, lambda event: persist_optimizer(event, Optimizers()))
with Session() as session:
session.add(classifier)
for epoch in range(1, 10):
session.execute(Iterate(classifier, loaders, callbacks))
session.commit()
And that's it, you have a complete training system using DDD and EDA principles. You can define your own aggregates, commands, events, repositories, and handlers to create a complex training system that can be easily maintained and extended. There are event more stuff you can do with the torchsystem.
This is a very first working version of the torchsystem. The idea is to deploy real time machine learning systems not only for inference, but also for training in servers using REST apis. A lot of wild ideas come to my mind when I think about the possibilities of this system, like deploy distributed multi-model agents that can train them selves, models that raise events containing embeddings similar to a brain reacting to something, etc.
Any feedback or contribution is welcome.