python-compton
An abstract data-flow framework for quantitative trading, which decouples data initialization, data composition and data processing.
Install
pip install compton
Usage
from compton import (
Orchestrator,
Provider,
Reducer,
Consumer
)
Vector
We call a tuple of hashable parameters as a vector which is used to identify a certain kind of data.
from enum import Enum
class DataType(Enum):
KLINE = 1
ORDER_BOOK = 2
class TimeSpan(Enum):
DAY = 1
WEEK = 2
vector = (DataType.KLINE, TimeSpan.DAY)
Orchestrator(reducers, loop=None)
- reducers
List[Reducer]
reducers to compose data - loop?
Optional[EventLoop]
The event loop object to use. In most cases, you should NOT pass this argument, unless you exact know what you are doing.
Orchestrator(
reducers
).connect(
provider
).subscribe(
consumer
).add('TSLA')
orchestrator.connect(provider: Provider) -> self
Connects to a data provider
orchestrator.subscribe(consumer: Consumer) -> self
Subscribes the consumer to orchestrator.
orchestrator.add(symbol: str) -> self
Adds a new symbol to orchestrator, and start the data flow for symbol
Provider
Provider
is an abstract class which provides initial data and data updates.
A provider should be implemented to support many symbols
We must inherit class Provider
and implement some abstract method before use.
@property vector
returns an Vector
async def init()
method returns the initial data- There is an protected method
self.dispatch(symbol, payload)
to set the payload updated, which should only be called in a coroutine, or a RuntimeError
is raised.
class MyProvider(Provider):
@property
def vector(self):
return (DataType.KLINE, TimeSpan.DAY)
async def init(self, symbol):
return {}
Reducer
Another abstract class which handles data composition.
The reducer.vector
could be a generic vector which applies partial match to other vectors
class MyReducer(Reducer):
@property
def vector(self):
return (DataType.KLINE,)
def merge(self, old, new):
if old is None:
return clean(new)
return {**old, **new}
Consumer
A consumer could subscribes to more than one kind of data types
class MyConsumer(Consumer):
@property
def vectors(self):
return [
(DataType.KLINE, TimeSpan.DAY),
(DataType.KLINE, TimeSpan.WEEK)
]
@property
def all(self) -> bool:
"""
`True` indicates that the consumer will only go processing
if both of the data corresponds with the two vectors have changes
And by default, `Consumer::all` is False
"""
return True
@property
def concurrency(self) -> int:
"""
Concurrency limit for method `process()`
By default, `Consumer::concurrency` is `0` which means no limit
"""
return 1
def should_process(self, *payloads) -> bool:
"""
If this method returns `False`, then the data update will not be processed
"""
return True
async def process(self, symbol, kline_day, kline_week):
await doSomething(symbol, kline_day, kline_week)
License
MIT