Chronon Python API
Overview
Chronon Python API for materializing configs to be run by the Chronon Engine. It contains Python helpers that help manage a repo of feature and join definitions to be executed by the Chronon Scala engine.
User API Overview
Sources
Most fields are self-explanatory. Time columns are expected to be Unix time in milliseconds.
from ai.chronon.query import (
    Query,
    select,
)
from ai.chronon.api.ttypes import Source, EventSource, EntitySource
Query(
    selects=select(
        user="user_id",
        created_at="created_at",
    ),
    wheres=["has_availability = 1"],
    start_partition="2021-01-01",
    setups=["...UDF..."],
    time_column="ts",  # expected to be Unix time in milliseconds
    end_partition=None,
    mutation_time_column="mutation_timestamp",
    reversal_column="CASE WHEN mutation_type IN ('DELETE', 'UPDATE_BEFORE') THEN true ELSE false END"
)
user_activity = Source(entities=EntitySource(
    snapshotTable="db_exports.table",
    mutationTable="mutations_namespace.table_mutations",
    mutationTopic="mutationsKafkaTopic",
    query=Query(...)
))
website__views = Source(events=EventSource(
    table="namespace.table",
    topic="kafkaTopicForEvents",
))
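Because time columns are expected in Unix milliseconds, upstream timestamps often need converting before they reach a source table. The following is a minimal sketch using only the Python standard library; the helper name `to_epoch_millis` is hypothetical and not part of the Chronon API, and treating naive datetimes as UTC is an assumption of this example.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_epoch_millis(dt: datetime) -> int:
    """Convert a datetime to Unix time in milliseconds.

    Naive datetimes are assumed to be in UTC (an assumption of this sketch).
    Integer timedelta division avoids float rounding errors.
    """
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=timezone.utc)
    return (dt - EPOCH) // timedelta(milliseconds=1)


print(to_epoch_millis(datetime(2021, 1, 1, tzinfo=timezone.utc)))  # 1609459200000
```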
Group By (Features)
Group Bys are aggregations over sources that define features. For example:
from ai.chronon.group_by import (
    GroupBy,
    Window,
    TimeUnit,
    Accuracy,
    Operation,
    Aggregations,
    Aggregation,
    DefaultAggregation,
)
from sources import test_sources
sum_cols = [f"active_{x}_days" for x in [30, 90, 120]]
v0 = GroupBy(
    sources=test_sources.user_activity,
    keys=["user"],
    aggregations=Aggregations(
        user_active_1_day=Aggregation(operation=Operation.LAST),
        second_feature=Aggregation(
            input_column="active_7_days",
            operation=Operation.SUM,
            windows=[
                Window(n, TimeUnit.DAYS) for n in [3, 5, 9]
            ]
        ),
    ) + [
        Aggregation(
            input_column=col,
            operation=Operation.SUM
        ) for col in sum_cols
    ] + [
        Aggregation(
            input_column="device",
            operation=Operation.LAST_K(10)
        )
    ],
    dependencies=[
        "db_exports.table/ds={{ ds }}"
    ],
    accuracy=Accuracy.SNAPSHOT,
    env={
        "backfill": {
            "EXECUTOR_MEMORY": "4G"
        },
    },
    online=True,
    production=False
)
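To make the windowed SUM and LAST_K aggregations above more concrete, here is a conceptual sketch in plain Python. It is not how the Chronon engine computes features; the function names, the half-open window boundaries, and the newest-first ordering are all assumptions chosen for illustration.

```python
from typing import List, Tuple

MILLIS_PER_DAY = 24 * 60 * 60 * 1000

# Each event is a (timestamp_in_millis, value) pair.
Event = Tuple[int, float]


def windowed_sum(events: List[Event], as_of_ms: int, days: int) -> float:
    """Sum values of events that fall in [as_of_ms - days, as_of_ms)."""
    start = as_of_ms - days * MILLIS_PER_DAY
    return sum(value for ts, value in events if start <= ts < as_of_ms)


def last_k(events: List[Event], as_of_ms: int, k: int) -> List[float]:
    """Return up to k of the most recent values before as_of_ms, newest first."""
    eligible = sorted(
        (e for e in events if e[0] < as_of_ms),
        key=lambda e: e[0],
        reverse=True,
    )
    return [value for _, value in eligible[:k]]


events = [
    (1 * MILLIS_PER_DAY, 1.0),
    (4 * MILLIS_PER_DAY, 2.0),
    (8 * MILLIS_PER_DAY, 4.0),
]
as_of = 9 * MILLIS_PER_DAY

# One output per window, mirroring the 3, 5, and 9 day windows above.
for n in [3, 5, 9]:
    print(n, windowed_sum(events, as_of, n))
print(last_k(events, as_of, 2))
```

Each window yields its own feature value from the same input column, which is why a single Aggregation with several windows produces several features.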
Join
A Join is a collection of feature values for the keys (and times, if applicable) defined on the left (source). Example:
from ai.chronon.join import Join, JoinPart
from sources import test_sources
from group_bys.example_team import example_group_by
v1 = Join(
    left=test_sources.website__views,
    right_parts=[
        JoinPart(group_by=example_group_by.v0),
    ],
    online=True,
    production=False,
    env={"backfill": {"PARALLELISM": "10"}, "streaming": {"STREAMING_ENV_VAR": "VALUE"}},
)
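The idea of attaching feature values to the keys and times on the left can be illustrated with a small plain-Python sketch. This is a conceptual model only, not Chronon's implementation: the function name `point_in_time_counts`, the `view_count` feature, and the strict "events before the left timestamp" cutoff are assumptions made for this example.

```python
from bisect import bisect_left
from typing import Dict, List


def point_in_time_counts(
    left_rows: List[dict],
    events_by_user: Dict[str, List[int]],
) -> List[dict]:
    """For each left row (user, ts), count that user's events strictly before ts."""
    # Sort each user's event timestamps once so lookups can use binary search.
    sorted_events = {user: sorted(ts_list) for user, ts_list in events_by_user.items()}
    enriched = []
    for row in left_rows:
        timestamps = sorted_events.get(row["user"], [])
        # bisect_left returns the number of timestamps strictly less than row["ts"].
        count = bisect_left(timestamps, row["ts"])
        enriched.append({**row, "view_count": count})
    return enriched


left = [
    {"user": "a", "ts": 100},
    {"user": "a", "ts": 300},
    {"user": "b", "ts": 200},
]
events = {"a": [50, 150, 250], "b": [500]}

for row in point_in_time_counts(left, events):
    print(row)
```

The same key can receive different feature values at different left-side times, which is what distinguishes this kind of join from a plain key lookup.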
Pre-commit Setup
- Install pre-commit and other dev libraries:
pip install -r requirements/dev.txt
- Run the following command under api/py to install the git hook scripts:
pre-commit install
To support more pre-commit hooks, add them to the .pre-commit-config.yaml file.