datarobot-model-metrics
datarobot-model-metrics provides a framework to compute model ML metrics over time and produce aggregated metrics.
The datarobot-model-metrics (dmm) library provides the tools necessary to create custom metrics, including the following operations:
Review the following documentation to learn more about the datarobot-model-metrics interfaces used to perform custom metric operations:
For end-to-end examples, you can review the ./examples directory.
There are two primary ecosystems where you can develop a custom metric:
Python modules: the dmm module (built in this repository) updates the Python environment with all the required modules.
Setting parameters:
Initializing your application, as shown below, allows setting parameters using either method (environment variables or command-line arguments) without significant changes.
The CustomMetricArgumentParser class wraps the standard argparse.ArgumentParser. It provides convenience functions that allow reading values from the environment or via normal argument parsing. When CustomMetricArgumentParser.parse_args() is called, it checks for missing values.
The log_manager provides a set of functions to help with logging. The DMM and the DataRobot public API client use standard Python logging primitives. A complete list of log classes with their current levels is available using get_log_levels(). The initialize_loggers() function initializes all the loggers with a format like the one shown below:
2024-08-09 02:19:50 PM - dmm.data_source.datarobot_source - INFO - fetching the next predictions dataframe... 2024-07-15 00:00:00 - 2024-08-09 14:19:46.643722
2024-08-09 02:19:56 PM - urllib3.connectionpool - DEBUG - https://staging.datarobot.com:443 "POST /api/v2/deployments/66a90a712ad81645df8c469c/predictionDataExports/ HTTP/1.1" 202 368
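The same look can be reproduced with standard Python logging primitives; the format and date strings below are assumptions inferred from the sample output above, not necessarily what initialize_loggers() uses internally:

```python
import logging

# Format assumed from the sample output above; dmm's exact format string may differ.
formatter = logging.Formatter(
    fmt="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    datefmt="%Y-%m-%d %I:%M:%S %p",
)

handler = logging.StreamHandler()
handler.setFormatter(formatter)

logger = logging.getLogger("dmm.data_source.datarobot_source")
logger.addHandler(handler)
logger.setLevel(logging.INFO)

logger.info("fetching the next predictions dataframe...")
```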
The following snippet shows how to set up your runtime environment using the previously mentioned classes:
import sys
from dmm import CustomMetricArgumentParser
from dmm.log_manager import initialize_loggers
parser = CustomMetricArgumentParser(description="My new custom metric")
parser.add_base_args() # adds standard arguments
# Add more with standard ArgumentParser primitives, or some convenience functions such as add_environment_arg()
# Parse the program arguments (if any) to an argparse.Namespace.
args = parser.parse_args(sys.argv[1:])
# Initialize the logging based on the 'LOG' environment variable, or the --log option
initialize_loggers(args.log)
The standard/base arguments include the options shown in the help output below (--api-key, --base-url, --deployment-id, and so on). By default, logging sets the dmm and datarobot modules to WARNING. Here's an example of the help when using the CustomMetricArgumentParser:
(model-runner) $ python3 custom.py --help
usage: custom.py [-h] [--api-key KEY] [--base-url URL] [--deployment-id ID] [--custom-metric-id ID] [--dry-run] [--start-ts TIMESTAMP] [--end-ts TIMESTAMP] [--max-rows ROWS] [--required] [--log [[NAME:]LEVEL ...]]
My new custom metric
optional arguments:
-h, --help show this help message and exit
--api-key KEY API key used to authenticate to server. Settable via 'API_KEY', required.
--base-url URL URL for server. Settable via 'BASE_URL' (default: https://staging.datarobot.com/api/v2), required.
--deployment-id ID Deployment ID. Settable via 'DEPLOYMENT_ID' (default: None), required.
--custom-metric-id ID
Custom metric ID. Settable via 'CUSTOM_METRIC_ID' (default: None), required.
--dry-run Dry run. Settable via 'DRY_RUN' (default: False).
--start-ts TIMESTAMP Start timestamp. Settable with 'START_TS', or 'LAST_SUCCESSFUL_RUN_TS' (when not dry run). Default is 2024-08-08 14:27:55.493027
--end-ts TIMESTAMP End timestamp. Settable with 'END_TS' or 'CURRENT_RUN_TS'. Default is 2024-08-09 14:27:55.493044.
--max-rows ROWS Maximum number of rows. Settable via 'MAX_ROWS' (default: 100000).
--required List the required properties and exit.
--log [[NAME:]LEVEL ...]
Logging level list. Settable via 'LOG' (default: WARNING).
(model-runner) $
This section describes utilities that may help during development.
During development, it is common to run your code over the same data multiple times to see how your changes impact the results.
The save_to_csv() utility allows you to save your results to a CSV file, so you can compare the results between successive runs on the same data.
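The compare-between-runs workflow can be sketched with plain pandas; save_to_csv()'s exact signature isn't shown here, so this sketch uses DataFrame.to_csv / read_csv directly, and run_1.csv is a hypothetical file name:

```python
import pandas as pd

# Results from a previous run, saved for later comparison (hypothetical file name).
previous = pd.DataFrame({"timestamp": ["2005-06-01 13:00"], "metric": [0.50]})
previous.to_csv("run_1.csv", index=False)

# Results from the current run over the same data.
current = pd.DataFrame({"timestamp": ["2005-06-01 13:00"], "metric": [0.52]})

# Compare the two runs row by row.
baseline = pd.read_csv("run_1.csv")
diff = current["metric"] - baseline["metric"]
print(diff.tolist())
```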
The most commonly used data source is DataRobotSource. This data source connects to DataRobot to fetch selected data from the DataRobot platform. Initialize DataRobotSource with the following mandatory parameters:
from dmm.data_source import DataRobotSource

source = DataRobotSource(
    base_url=DATAROBOT_ENDPOINT,
    token=DATAROBOT_API_TOKEN,
    deployment_id=deployment_id,
    start=start_of_export_window,
    end=end_of_export_window,
)
You can also provide the base_url and token parameters as environment variables: os.environ["DATAROBOT_ENDPOINT"] and os.environ["DATAROBOT_API_TOKEN"]
from dmm.data_source import DataRobotSource

source = DataRobotSource(
    deployment_id=deployment_id,
    start=start_of_export_window,
    end=end_of_export_window,
)
The following example initializes DataRobotSource with all parameters:
from dmm.data_source import DataRobotSource

source = DataRobotSource(
    base_url=DATAROBOT_ENDPOINT,
    token=DATAROBOT_API_TOKEN,
    client=None,
    deployment_id=deployment_id,
    model_id=model_id,
    start=start_of_export_window,
    end=end_of_export_window,
    max_rows=10000,
    delete_exports=False,
    use_cache=False,
    actuals_with_matched_predictions=True,
)
| Parameter | Description |
|---|---|
| base_url: str | The DataRobot API URL; for example, https://app.datarobot.com/api/v2. |
| token: str | A DataRobot API token from Developer Tools. |
| client: Optional[DataRobotClient] | Use a DataRobotClient object instead of base_url and token. |
| deployment_id: str | The ID of the deployment evaluated by the custom metric. |
| model_id: Optional[str] | The ID of the model evaluated by the custom metric. If you don't specify a model ID, the champion model ID is used. |
| start: datetime | The start of the export window; the date from which to start retrieving data. |
| end: datetime | The end of the export window; the date until which to retrieve data. |
| max_rows: Optional[int] | The maximum number of rows to fetch at once when the requested data doesn't fit into memory. |
| delete_exports: Optional[bool] | If True, datasets with exported data created in the AI Catalog are automatically deleted. The default value is False. |
| use_cache: Optional[bool] | If True, use existing datasets stored in the AI Catalog for time ranges included in previous exports. The default value is False. |
| actuals_with_matched_predictions: Optional[bool] | If False, allow actuals export without matched predictions. The default value is True. |
The get_prediction_data method returns a chunk of prediction data with the appropriate chunk ID; the returned data chunk is a pandas DataFrame with the number of rows respecting the max_rows parameter. This method returns data until the data source is exhausted.
prediction_df_1, prediction_chunk_id_1 = source.get_prediction_data()
print(prediction_df_1.head(5).to_string())
print(f"chunk id: {prediction_chunk_id_1}")
DR_RESERVED_PREDICTION_TIMESTAMP DR_RESERVED_PREDICTION_VALUE_high DR_RESERVED_PREDICTION_VALUE_low date_non_unique date_random id 年月日
0 2023-09-13 11:02:51.248000+00:00 0.697782 0.302218 1950-10-01 1949-01-27 1 1949-01-01
1 2023-09-13 11:02:51.252000+00:00 0.581351 0.418649 1959-04-01 1949-02-03 2 1949-02-01
2 2023-09-13 11:02:51.459000+00:00 0.639347 0.360653 1954-05-01 1949-03-28 3 1949-03-01
3 2023-09-13 11:02:51.459000+00:00 0.627727 0.372273 1951-09-01 1949-04-07 4 1949-04-01
4 2023-09-13 11:02:51.664000+00:00 0.591612 0.408388 1951-03-01 1949-05-16 5 1949-05-01
chunk id: 0
When the data source is exhausted, None and -1 are returned:
prediction_df_2, prediction_chunk_id_2 = source.get_prediction_data()
print(prediction_df_2)
print(f"chunk id: {prediction_chunk_id_2}")
None
chunk id: -1
The reset method resets the exhausted data source, allowing it to iterate from the beginning:
source.reset()
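The exhaustion protocol above (chunks until (None, -1), then reset()) can be exercised end to end with a minimal stand-in; ChunkedSource below is a hypothetical stub, not a dmm class:

```python
import pandas as pd

class ChunkedSource:
    """Hypothetical stub mimicking the dmm chunk protocol."""

    def __init__(self, chunks):
        self._chunks = chunks
        self._pos = 0

    def get_prediction_data(self):
        # Return (DataFrame, chunk_id) until exhausted, then (None, -1).
        if self._pos >= len(self._chunks):
            return None, -1
        chunk_id = self._pos
        df = self._chunks[self._pos]
        self._pos += 1
        return df, chunk_id

    def reset(self):
        # Allow iterating from the beginning again.
        self._pos = 0

source = ChunkedSource([
    pd.DataFrame({"predictions": [0.3, 0.7]}),
    pd.DataFrame({"predictions": [0.6]}),
])

# Drain the source: loop until the (None, -1) sentinel is returned.
while True:
    df, chunk_id = source.get_prediction_data()
    if chunk_id == -1:
        break
    print(f"chunk id: {chunk_id}, rows: {len(df)}")

source.reset()  # the source can now be drained again
```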
The get_all_prediction_data method returns all prediction data available for a data source object in a single DataFrame:
prediction_df = source.get_all_prediction_data()
The get_actuals_data method returns a chunk of actuals data with the appropriate chunk ID; the returned data chunk is a pandas DataFrame with the number of rows respecting the max_rows parameter. This method returns data until the data source is exhausted.
actuals_df_1, actuals_chunk_id_1 = source.get_actuals_data()
print(actuals_df_1.head(5).to_string())
print(f"chunk id: {actuals_chunk_id_1}")
association_id timestamp label actuals predictions predicted_class
0 1 2023-09-13 11:00:00+00:00 low 0 0.302218 high
194 57 2023-09-13 11:00:00+00:00 low 1 0.568564 low
192 56 2023-09-13 11:00:00+00:00 low 1 0.569865 low
190 55 2023-09-13 11:00:00+00:00 low 0 0.473282 high
196 58 2023-09-13 11:00:00+00:00 low 1 0.573861 low
chunk id: 0
To return raw data with the original column names (as exported from the source database), set the return_original_column_names parameter to True:
actuals_df_1, actuals_chunk_id_1 = source.get_actuals_data(return_original_column_names=True)
print(actuals_df_1.head(5).to_string())
print(f"chunk id: {actuals_chunk_id_1}")
id timestamp label actuals y predicted_class
0 1 2023-09-13 11:00:00+00:00 low 0 0.302218 high
194 57 2023-09-13 11:00:00+00:00 low 1 0.568564 low
192 56 2023-09-13 11:00:00+00:00 low 1 0.569865 low
190 55 2023-09-13 11:00:00+00:00 low 0 0.473282 high
196 58 2023-09-13 11:00:00+00:00 low 1 0.573861 low
chunk id: 0
To return all actuals data available for a source object in a single DataFrame, use the get_all_actuals_data method:
actuals_df = source.get_all_actuals_data()
When the data source is exhausted, None and -1 are returned:
actuals_df_2, actuals_chunk_id_2 = source.get_actuals_data()
print(actuals_df_2)
print(f"chunk id: {actuals_chunk_id_2}")
None
chunk id: -1
The reset method resets the exhausted data source, allowing it to iterate from the beginning:
source.reset()
The get_training_data method returns all data used for training in one call. The returned data is a pandas DataFrame:
train_df = source.get_training_data()
print(train_df.head(5).to_string())
y date_random date_non_unique 年月日
0 high 1949-01-27 1950-10-01 1949-01-01
1 high 1949-02-03 1959-04-01 1949-02-01
2 low 1949-03-28 1954-05-01 1949-03-01
3 high 1949-04-07 1951-09-01 1949-04-01
4 high 1949-05-16 1951-03-01 1949-05-01
The get_data method returns combined_data, which includes merged scoring data, predictions, and matched actuals. The MetricEvaluator uses this method as the main data export method.
df, chunk_id_1 = source.get_data()
print(df.head(5).to_string())
print(f"chunk id: {chunk_id_1}")
timestamp predictions date_non_unique date_random association_id 年月日 predicted_class label actuals
0 2023-09-13 11:02:51.248000+00:00 0.302218 1950-10-01 1949-01-27 1 1949-01-01 high low 0
1 2023-09-13 11:02:51.252000+00:00 0.418649 1959-04-01 1949-02-03 2 1949-02-01 high low 0
2 2023-09-13 11:02:51.459000+00:00 0.360653 1954-05-01 1949-03-28 3 1949-03-01 high low 1
3 2023-09-13 11:02:51.459000+00:00 0.372273 1951-09-01 1949-04-07 4 1949-04-01 high low 0
4 2023-09-13 11:02:51.664000+00:00 0.408388 1951-03-01 1949-05-16 5 1949-05-01 high low 0
chunk id: 0
The get_all_data method returns all combined data available for that source object in a single DataFrame:
df = source.get_all_data()
The BatchDataRobotSource interface is for batch deployments. The following example initializes BatchDataRobotSource with all parameters:
from dmm.data_source import BatchDataRobotSource

source = BatchDataRobotSource(
    base_url=DATAROBOT_ENDPOINT,
    token=DATAROBOT_API_TOKEN,
    client=None,
    deployment_id=deployment_id,
    model_id=model_id,
    batch_ids=batch_ids,
    max_rows=10000,
    delete_exports=False,
    use_cache=False,
)
The parameters are analogous to those of DataRobotSource. The most important difference is that instead of a time range (start and end), you must provide batch IDs. In addition, a batch source doesn't support actuals export.
The get_prediction_data method returns a chunk of prediction data with the appropriate chunk ID; the returned data chunk is a pandas DataFrame with the number of rows respecting the max_rows parameter. This method returns data until the data source is exhausted.
prediction_df_1, prediction_chunk_id_1 = source.get_prediction_data()
print(prediction_df_1.head(5).to_string())
print(f"chunk id: {prediction_chunk_id_1}")
AGE B CHAS CRIM DIS batch_id DR_RESERVED_BATCH_NAME timestamp INDUS LSTAT MEDV NOX PTRATIO RAD RM TAX ZN id
0 65.2 396.90 0 0.00632 4.0900 <batch_id> batch1 2023-06-23 09:47:47.060000+00:00 2.31 4.98 24.0 0.538 15.3 1 6.575 296 18.0 1
1 78.9 396.90 0 0.02731 4.9671 <batch_id> batch1 2023-06-23 09:47:47.060000+00:00 7.07 9.14 21.6 0.469 17.8 2 6.421 242 0.0 2
2 61.1 392.83 0 0.02729 4.9671 <batch_id> batch1 2023-06-23 09:47:47.060000+00:00 7.07 4.03 34.7 0.469 17.8 2 7.185 242 0.0 3
3 45.8 394.63 0 0.03237 6.0622 <batch_id> batch1 2023-06-23 09:47:47.060000+00:00 2.18 2.94 33.4 0.458 18.7 3 6.998 222 0.0 4
4 54.2 396.90 0 0.06905 6.0622 <batch_id> batch1 2023-06-23 09:47:47.060000+00:00 2.18 5.33 36.2 0.458 18.7 3 7.147 222 0.0 5
chunk id: 0
prediction_df = source.get_all_prediction_data()
source.reset()
df, chunk_id_1 = source.get_data()
The get_training_data method returns all data used for training in one call. The returned data is a pandas DataFrame:
train_df = source.get_training_data()
Note: actuals export for batches is not implemented yet.
If you aren't exporting data directly from DataRobot and instead have it downloaded locally (for example), you can load the dataset into DataFrameSource. The DataFrameSource class wraps any pd.DataFrame to create a library-compatible source. This is the easiest way to interact with the library when bringing your own data:
import pandas as pd

from dmm.data_source import DataFrameSource

source = DataFrameSource(
    df=pd.read_csv("./data_hour_of_week.csv"),
    max_rows=10000,
    timestamp_col="date",
)
df, chunk_id_1 = source.get_data()
print(df.head(5).to_string())
print(f"chunk id: {chunk_id_1}")
date y
0 1959-12-31 23:59:57 -0.183669
1 1960-01-01 01:00:02 0.283993
2 1960-01-01 01:59:52 0.020663
3 1960-01-01 03:00:14 0.404304
4 1960-01-01 03:59:58 1.005252
chunk id: 0
In addition, it is possible to create new data source definitions. To define a new data source, you can customize and implement the DataSourceBase interface.
The TimeBucket enum defines the required data aggregation granularity over time. By default, TimeBucket is set to TimeBucket.ALL. You can specify any of the following values: SECOND, MINUTE, HOUR, DAY, WEEK, MONTH, QUARTER, or ALL. To change the TimeBucket value, use the init method: source.init(time_bucket):
# let's generate a dummy DataFrame with 2 rows per time bucket (Hour in this scenario)
test_df = gen_dataframe_for_accuracy_metric(
    nr_rows=10,
    rows_per_time_bucket=2,
    prediction_value=1,
    with_actuals=True,
    with_predictions=True,
    time_bucket=TimeBucket.HOUR,
)
print(test_df)
timestamp predictions actuals
0 01/06/2005 13:00:00.000000 1 0.999
1 01/06/2005 13:00:00.000000 1 0.999
2 01/06/2005 14:00:00.000000 1 0.999
3 01/06/2005 14:00:00.000000 1 0.999
4 01/06/2005 15:00:00.000000 1 0.999
5 01/06/2005 15:00:00.000000 1 0.999
6 01/06/2005 16:00:00.000000 1 0.999
7 01/06/2005 16:00:00.000000 1 0.999
8 01/06/2005 17:00:00.000000 1 0.999
9 01/06/2005 17:00:00.000000 1 0.999
# let's use DataFrameSource and load the created DataFrame
source = DataFrameSource(
    df=test_df,
    max_rows=10000,
    timestamp_col="timestamp",
)
# init source with the selected TimeBucket
source.init(TimeBucket.HOUR)
df, _ = source.get_data()
print(df)
timestamp predictions actuals
0 01/06/2005 13:00:00.000000 1 0.999
1 01/06/2005 13:00:00.000000 1 0.999
df, _ = source.get_data()
print(df)
timestamp predictions actuals
2 01/06/2005 14:00:00.000000 1 0.999
3 01/06/2005 14:00:00.000000 1 0.999
source.init(TimeBucket.DAY)
df, _ = source.get_data()
print(df)
timestamp predictions actuals
0 01/06/2005 13:00:00.000000 1 0.999
1 01/06/2005 13:00:00.000000 1 0.999
2 01/06/2005 14:00:00.000000 1 0.999
3 01/06/2005 14:00:00.000000 1 0.999
4 01/06/2005 15:00:00.000000 1 0.999
5 01/06/2005 15:00:00.000000 1 0.999
6 01/06/2005 16:00:00.000000 1 0.999
7 01/06/2005 16:00:00.000000 1 0.999
8 01/06/2005 17:00:00.000000 1 0.999
9 01/06/2005 17:00:00.000000 1 0.999
The returned data chunks follow the selected TimeBucket. This is helpful in the MetricEvaluator. In addition to TimeBucket, the source respects the max_rows parameter when generating data chunks; for example, using the same dataset as in the example above, but with max_rows set to 3:
source = DataFrameSource(
    df=test_df,
    max_rows=3,
    timestamp_col="timestamp",
)
source.init(TimeBucket.DAY)
df, chunk_id = source.get_data()
print(df)
timestamp predictions actuals
0 01/06/2005 13:00:00.000000 1 0.999
1 01/06/2005 13:00:00.000000 1 0.999
2 01/06/2005 14:00:00.000000 1 0.999
In DataRobotSource, you can specify the TimeBucket and max_rows parameters for all export types except training data export, which is returned in one piece.
The Deployment class is a helper class that provides access to relevant deployment properties. It is used inside DataRobotSource to select the appropriate workflow for the data.
import datarobot as dr
from dmm.data_source.datarobot.deployment import Deployment
DataRobotClient()
deployment = Deployment(deployment_id=deployment_id)
deployment_type = deployment.type()
target_column = deployment.target_column()
positive_class_label = deployment.positive_class_label()
negative_class_label = deployment.negative_class_label()
prediction_threshold = deployment.prediction_threshold()
...
MetricBase provides an interface to define your own custom metrics. See the examples of custom metrics located in the metric directory. In MetricBase, you must define the type of data a metric requires, and the custom metric must inherit that definition:
class MetricBase(object):
    def __init__(
        self,
        name: str,
        description: str = None,
        need_predictions: bool = False,
        need_actuals: bool = False,
        need_scoring_data: bool = False,
        need_training_data: bool = False,
    ):
        self.name = name
        self.description = description
        self._need_predictions = need_predictions
        self._need_actuals = need_actuals
        self._need_scoring_data = need_scoring_data
        self._need_training_data = need_training_data
In addition, you must implement the scoring and reduction methods in MetricBase:
score(): uses the initialized data types to calculate a metric value.
reduce_func(): reduces multiple values in the same TimeBucket to one value.
def score(
    self,
    scoring_data: pd.DataFrame,
    predictions: np.ndarray,
    actuals: np.ndarray,
    fit_ctx=None,
    metadata=None,
) -> float:
    raise NotImplementedError

def reduce_func(self) -> callable:
    return np.mean
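The division of labor between score() and reduce_func() can be sketched without the library: score each data chunk, then reduce the per-chunk values that land in the same TimeBucket to a single value (np.mean, matching the default reducer above); the metric used here is an illustrative mean absolute error, not a dmm class:

```python
import numpy as np

def score(predictions: np.ndarray, actuals: np.ndarray) -> float:
    # Example metric: mean absolute error over one data chunk.
    return float(np.mean(np.abs(predictions - actuals)))

# Two chunks that landed in the same time bucket.
chunk_scores = [
    score(np.array([0.8, 0.6]), np.array([1.0, 1.0])),  # ~0.3
    score(np.array([0.9, 0.7]), np.array([1.0, 1.0])),  # ~0.2
]

# reduce_func collapses the per-chunk scores into one value per bucket.
reduce_func = np.mean
bucket_value = float(reduce_func(chunk_scores))
print(bucket_value)
```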
Two default classes can help you create your own custom metrics: ModelMetricBase and DataMetricBase. ModelMetricBase is the base class for metrics that require actuals and predictions for metric calculation.
class ModelMetricBase(MetricBase):
    def __init__(
        self, name: str, description: str = None, need_training_data: bool = False
    ):
        super().__init__(
            name=name,
            description=description,
            need_scoring_data=False,
            need_predictions=True,
            need_actuals=True,
            need_training_data=need_training_data,
        )

    def score(
        self,
        prediction: np.ndarray,
        actuals: np.ndarray,
        fit_context=None,
        metadata=None,
        scoring_data=None,
    ) -> float:
        raise NotImplementedError
DataMetricBase is the base class for metrics that require scoring data for metric calculation.
class DataMetricBase(MetricBase):
    def __init__(
        self, name: str, description: str = None, need_training_data: bool = False
    ):
        super().__init__(
            name=name,
            description=description,
            need_scoring_data=True,
            need_predictions=False,
            need_actuals=False,
            need_training_data=need_training_data,
        )

    def score(
        self,
        scoring_data: pd.DataFrame,
        fit_ctx=None,
        metadata=None,
        predictions=None,
        actuals=None,
    ) -> float:
        raise NotImplementedError
For LLM support, a new type of metric was introduced: LLMMetricBase is the base class for metrics that require scoring data and predictions for metric calculation, which in the LLM world translates to prompts (user input) and completions (LLM responses).
class LLMMetricBase(MetricBase):
    def __init__(
        self, name: str, description: str = None, need_training_data: bool = False
    ):
        super().__init__(
            name=name,
            description=description,
            need_scoring_data=True,
            need_predictions=True,
            need_actuals=False,
            need_training_data=need_training_data,
        )

    def score(
        self,
        scoring_data: pd.DataFrame,
        predictions: np.ndarray,
        fit_ctx=None,
        metadata=None,
        actuals=None,
    ) -> float:
        raise NotImplementedError
To accelerate the implementation of custom metrics, you can use ready-made, proven metrics from scikit-learn. To create a custom metric this way, provide the name of the metric, using the SklearnMetric class as the base class. See the example below:
from dmm.metric import SklearnMetric
class MedianAbsoluteError(SklearnMetric):
    """
    Metric that calculates the median absolute error of the difference between predictions and actuals.
    """

    def __init__(self):
        super().__init__(
            metric="median_absolute_error",
        )
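As a sanity check of what this wrapper computes, scikit-learn's median_absolute_error is the median of the absolute residuals |prediction - actual|; a plain NumPy version:

```python
import numpy as np

predictions = np.array([1.0, 1.0, 1.0, 1.0])
actuals = np.array([0.999, 0.8, 1.2, 0.999])

# median_absolute_error = median of the absolute residuals
residuals = np.abs(predictions - actuals)
mae_median = float(np.median(residuals))
print(mae_median)
```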
PromptSimilarityMetricBase is designed to make it easy to compare LLM prompt and context vectors. This class is generally used with TextGen models, where the prompt and context vectors are populated as described below.
The base class takes care of pulling the vectors from the scoring_data and iterating over each entry. The prompt vector is pulled from the prompt_column (which defaults to _LLM_PROMPT_VECTOR) of the scoring_data. The context vectors are pulled from the context_column (which defaults to _LLM_CONTEXT) of the scoring_data. The context column contains a list of context dictionaries, and each context needs to have a vector element. Both the prompt_column and context_column are expected to be JSON-encoded data.
A derived class must implement calculate_distance(); for this class, score() is already implemented. The calculate_distance function returns a single floating-point value based on a single prompt_vector and a list of context_vectors.
Using the PromptSimilarityMetricBase to calculate the minimum Euclidean distance is shown below:
import numpy as np
from typing import List

from dmm.metric import PromptSimilarityMetricBase

class EuclideanMinMetric(PromptSimilarityMetricBase):
    """Calculate the minimum Euclidean distance between a prompt vector and a list of context vectors"""

    def calculate_distance(self, prompt_vector: np.ndarray, context_vectors: List[np.ndarray]) -> float:
        distances = [
            np.linalg.norm(prompt_vector - context_vector)
            for context_vector in context_vectors
        ]
        return min(distances)

# an instantiation would potentially look like this
scorer = EuclideanMinMetric(
    name=custom_metric.name,
    description="Euclidean minimum distance between prompt and context vectors",
)
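The calculate_distance() logic can be checked in isolation with plain NumPy (no base class required); this is a standalone sketch, not the dmm API:

```python
import numpy as np

def euclidean_min(prompt_vector, context_vectors):
    # Distance from the prompt vector to each context vector; keep the smallest.
    distances = [np.linalg.norm(prompt_vector - cv) for cv in context_vectors]
    return float(min(distances))

prompt = np.array([0.0, 0.0])
contexts = [np.array([3.0, 4.0]), np.array([1.0, 0.0])]
print(euclidean_min(prompt, contexts))  # → 1.0
```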
The metrics described above can serve as the source of custom metric definitions in the DataRobot platform.
The CustomMetric interface retrieves the metadata of an existing custom metric in DataRobot so you can report data to that custom metric. You can initialize the metric by providing the parameters explicitly (metric_id, deployment_id, model_id, and a DataRobotClient()):
from dmm import CustomMetric
cm = CustomMetric.from_id(metric_id=METRIC_ID, deployment_id=DEPLOYMENT_ID, model_id=MODEL_ID, client=CLIENT)
You can also define these parameters as environment variables: os.environ["DEPLOYMENT_ID"], os.environ["CUSTOM_METRIC_ID"], os.environ["BASE_URL"], and os.environ["DATAROBOT_ENDPOINT"]:
from dmm import CustomMetric
cm = CustomMetric.from_id()
In batch mode, you must specify it explicitly:
from dmm import CustomMetric
cm = CustomMetric.from_id(is_batch=True)
The report method submits custom metric values to a custom metric defined in DataRobot. To use this method, report a DataFrame in the shape of the output from the metric evaluator. For more information, see MetricEvaluator.
print(aggregated_metric_per_time_bucket.to_string())
timestamp samples median_absolute_error
1 01/06/2005 14:00:00.000000 2 0.001
response = cm.report(df=aggregated_metric_per_time_bucket)
print(response.status_code)
202
The dry_run parameter determines whether the custom metric values transfer is a dry run (where the values aren't saved in the database) or a production data transfer. This parameter is set to False by default.
response = cm.report(df=aggregated_metric_per_time_bucket, dry_run=True)
print(response.status_code)
202
The MetricEvaluator class calculates metric values over time using the selected source.
This class is used to "stream" data through the metric object, generating metric values.
Initialize the MetricEvaluator with the following mandatory parameters:
from datetime import datetime, timedelta

from dmm import MetricEvaluator, TimeBucket
from dmm import DataRobotSource
from dmm.metric import MedianAbsoluteError

source = DataRobotSource(
    deployment_id=DEPLOYMENT_ID,
    start=datetime.utcnow() - timedelta(weeks=1),
    end=datetime.utcnow(),
)
metric = MedianAbsoluteError()
metric_evaluator = MetricEvaluator(metric=metric, source=source, time_bucket=TimeBucket.MINUTE)
To use MetricEvaluator, create a metric class implementing the MetricBase interface and a source implementing DataSourceBase, and then specify the level of aggregation granularity. Initialize MetricEvaluator with all parameters:
from dmm import ColumnName, MetricEvaluator, TimeBucket

metric_evaluator = MetricEvaluator(
    metric=metric,
    source=source,
    time_bucket=TimeBucket.HOUR,
    prediction_col=ColumnName.PREDICTIONS,
    actuals_col=ColumnName.ACTUALS,
    timestamp_col=ColumnName.TIMESTAMP,
    filter_actuals=False,
    filter_predictions=False,
    filter_scoring_data=False,
    segment_attribute=None,
    segment_value=None,
)
| Parameter | Description |
|---|---|
| metric: Union[str, MetricBase, List[str], List[MetricBase]] | If a string or list of strings is passed, MetricEvaluator looks for matching sklearn metrics; if an object or list of objects is passed, they must implement the MetricBase interface. |
| source: DataSourceBase | The source to pull the data from: DataRobotSource, DataFrameSource, or another source that implements the DataSourceBase interface. |
| time_bucket: TimeBucket | The time bucket size to use for evaluating metrics; determines the granularity of aggregation. |
| prediction_col: Optional[str] | The name of the column that contains predictions. |
| actuals_col: Optional[str] | The name of the column that contains actuals. |
| timestamp_col: Optional[str] | The name of the column that contains timestamps. |
| filter_actuals: Optional[bool] | If True, the metric evaluator removes missing actuals values before scoring. The default value is False. |
| filter_predictions: Optional[bool] | If True, the metric evaluator removes missing predictions values before scoring. The default value is False. |
| filter_scoring_data: Optional[bool] | If True, the metric evaluator removes missing scoring values before scoring. The default value is False. |
| segment_attribute: Optional[str] | The name of the column with segment values. |
| segment_value: Optional[Union[str, List[str]]] | A single value or a list of values of the segment attribute to segment on. |
The score method returns the metric aggregated as defined by TimeBucket. The output, returned as a pandas DataFrame, contains the results per time bucket for all data from the source.
source = DataRobotSource(
    deployment_id=DEPLOYMENT_ID,
    start=datetime.utcnow() - timedelta(hours=3),
    end=datetime.utcnow(),
)
metric = LogLossFromSklearn()
me = MetricEvaluator(metric=metric, source=source, time_bucket=TimeBucket.HOUR)
aggregated_metric_per_time_bucket = me.score()
print(aggregated_metric_per_time_bucket.to_string())
timestamp samples log_loss
0 2023-09-14 13:29:48.065000+00:00 499 0.539315
1 2023-09-14 14:01:51.484000+00:00 499 0.539397
# we can see the evaluator's statistics
stats = me.stats()
print(stats)
total rows: 998, score calls: 2, reduce calls: 2
To pass more than one metric at a time, you can do the following:
metrics = [LogLossFromSklearn(), AsymmetricError(), RocAuc()]
me = MetricEvaluator(metric=metrics, source=source, time_bucket=TimeBucket.HOUR)
aggregated_metric_per_time_bucket = me.score()
stats = me.stats()
print(aggregated_metric_per_time_bucket.to_string())
print(stats)
timestamp samples log_loss Asymmetric Error roc_auc_score
0 2023-09-14 13:29:48.065000+00:00 499 0.539315 0.365571 0.787030
1 2023-09-14 14:01:51.484000+00:00 499 0.539397 0.365636 0.786837
total rows: 998, score calls: 6, reduce calls: 6
For your own data, you can provide the names of the columns to evaluate:
test_df = gen_dataframe_for_accuracy_metric(
    nr_rows=5,
    rows_per_time_bucket=1,
    prediction_value=1,
    time_bucket=TimeBucket.DAY,
    prediction_col="my_pred_col",
    actuals_col="my_actuals_col",
    timestamp_col="my_timestamp_col",
)
print(test_df)
my_timestamp_col my_pred_col my_actuals_col
0 01/06/2005 13:00:00.000000 1 0.999
1 02/06/2005 13:00:00.000000 1 0.999
2 03/06/2005 13:00:00.000000 1 0.999
3 04/06/2005 13:00:00.000000 1 0.999
4 05/06/2005 13:00:00.000000 1 0.999
source = DataFrameSource(
    df=test_df,
    max_rows=10000,
    timestamp_col="my_timestamp_col",
)
metric = LogLossFromSklearn()
me = MetricEvaluator(
    metric=metric,
    source=source,
    time_bucket=TimeBucket.DAY,
    prediction_col="my_pred_col",
    actuals_col="my_actuals_col",
    timestamp_col="my_timestamp_col",
)
aggregated_metric_per_time_bucket = me.score()
If some data is missing, use the filtering flags. For example, the following example uses data with missing actuals; in this scenario, without the flag, an exception is raised:
test_df = gen_dataframe_for_accuracy_metric(
    nr_rows=10,
    rows_per_time_bucket=5,
    prediction_value=1,
    time_bucket=TimeBucket.HOUR,
)
test_df.loc[2, "actuals"] = None
test_df.loc[5, "actuals"] = None
print(test_df)
timestamp predictions actuals
0 01/06/2005 13:00:00.000000 1 0.999
1 01/06/2005 13:00:00.000000 1 0.999
2 01/06/2005 13:00:00.000000 1 NaN
3 01/06/2005 13:00:00.000000 1 0.999
4 01/06/2005 13:00:00.000000 1 0.999
5 01/06/2005 14:00:00.000000 1 NaN
6 01/06/2005 14:00:00.000000 1 0.999
7 01/06/2005 14:00:00.000000 1 0.999
8 01/06/2005 14:00:00.000000 1 0.999
9 01/06/2005 14:00:00.000000 1 0.999
source = DataFrameSource(df=test_df)
metric = MedianAbsoluteError()
me = MetricEvaluator(metric=metric, source=source, time_bucket=TimeBucket.HOUR)
aggregated_metric_per_time_bucket = me.score()
"ValueError: Could not apply metric median_absolute_error, make sure you are passing the right data (see the sklearn docs).
The error message was: Input contains NaN."
For the same dataset, compare the previous result with the result when you enable the filter_actuals flag:
me = MetricEvaluator(metric=metric, source=source, time_bucket=TimeBucket.HOUR, filter_actuals=True)
aggregated_metric_per_time_bucket = me.score()
"removed 1 rows out of 5 in the data chunk before scoring, due to missing values in ['actuals'] data"
"removed 1 rows out of 5 in the data chunk before scoring, due to missing values in ['actuals'] data"
print(aggregated_metric_per_time_bucket.to_string())
timestamp samples median_absolute_error
0 01/06/2005 13:00:00.000000 4 0.001
1 01/06/2005 14:00:00.000000 4 0.001
Using the filter_actuals, filter_predictions, and filter_scoring_data flags, you can filter out missing values from the data before calculating the metric. By default, these flags are set to False.
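Conceptually, each flag applies a per-column dropna to the data chunk before scoring; a pandas sketch of what filter_actuals=True does:

```python
import pandas as pd

chunk = pd.DataFrame({
    "predictions": [1, 1, 1],
    "actuals": [0.999, None, 0.999],
})

# filter_actuals=True: drop rows whose actuals are missing before scoring.
filtered = chunk.dropna(subset=["actuals"])
removed = len(chunk) - len(filtered)
print(f"removed {removed} rows out of {len(chunk)} in the data chunk before scoring")
```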
If all the data needed to calculate the metric is missing in a data chunk, that chunk is skipped with an appropriate log:
test_df = gen_dataframe_for_accuracy_metric(
    nr_rows=4,
    rows_per_time_bucket=2,
    prediction_value=1,
    time_bucket=TimeBucket.HOUR,
)
test_df.loc[0, "actuals"] = None
test_df.loc[1, "actuals"] = None
print(test_df)
timestamp predictions actuals
0 01/06/2005 13:00:00.000000 1 NaN
1 01/06/2005 13:00:00.000000 1 NaN
2 01/06/2005 14:00:00.000000 1 0.999
3 01/06/2005 14:00:00.000000 1 0.999
source = DataFrameSource(df=test_df)
metric = MedianAbsoluteError()
me = MetricEvaluator(metric=metric, source=source, time_bucket=TimeBucket.HOUR, filter_actuals=True)
aggregated_metric_per_time_bucket = me.score()
"removed 2 rows out of 2 in the data chunk before scoring, due to missing values in ['actuals'] data"
"data chunk is empty, skipping scoring..."
print(aggregated_metric_per_time_bucket.to_string())
timestamp samples median_absolute_error
1 01/06/2005 14:00:00.000000 2 0.001
Perform segmented analysis by defining the segment_attribute and one or more segment_value entries:
metric = LogLossFromSklearn()
me = MetricEvaluator(metric=metric,
source=source,
time_bucket=TimeBucket.HOUR,
segment_attribute="insulin",
segment_value="Down",
)
aggregated_metric_per_time_bucket = me.score()
print(aggregated_metric_per_time_bucket.to_string())
timestamp samples log_loss [Down]
0 2023-09-14 13:29:49.737000+00:00 49 0.594483
1 2023-09-14 14:01:52.437000+00:00 49 0.594483
# passing more than one segment value
me = MetricEvaluator(metric=metric,
source=source,
time_bucket=TimeBucket.HOUR,
segment_attribute="insulin",
segment_value=["Down", "Steady"],
)
aggregated_metric_per_time_bucket = me.score()
print(aggregated_metric_per_time_bucket.to_string())
timestamp samples log_loss [Down] log_loss [Steady]
0 2023-09-14 13:29:48.502000+00:00 199 0.594483 0.515811
1 2023-09-14 14:01:51.758000+00:00 199 0.594483 0.515811
# passing more than one segment value and more than one metric
me = MetricEvaluator(metric=[LogLossFromSklearn(), RocAuc()],
source=source,
time_bucket=TimeBucket.HOUR,
segment_attribute="insulin",
segment_value=["Down", "Steady"],
)
aggregated_metric_per_time_bucket = me.score()
print(aggregated_metric_per_time_bucket.to_string())
timestamp samples log_loss [Down] log_loss [Steady] roc_auc_score [Down] roc_auc_score [Steady]
0 2023-09-14 13:29:48.502000+00:00 199 0.594483 0.515811 0.783333 0.826632
1 2023-09-14 14:01:51.758000+00:00 199 0.594483 0.515811 0.783333 0.826632
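Conceptually, segmented analysis scores each subset of rows whose segment attribute matches a given value, producing one column per segment, as in the "log_loss [Down]" headers above. A plain-pandas sketch of that filtering step (toy data, not the library's internals):

```python
import pandas as pd

# Toy frame with a segment attribute, mirroring the "insulin" column above
df = pd.DataFrame({
    "insulin": ["Down", "Steady", "Down", "Steady", "Down"],
    "predictions": [0.2, 0.7, 0.3, 0.8, 0.4],
})

# One subset (and therefore one metric column) per segment value
for segment_value in ["Down", "Steady"]:
    segment_rows = df[df["insulin"] == segment_value]
    print(segment_value, len(segment_rows))
```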
The BatchMetricEvaluator class aggregates per batch instead of over time. For batches, you don't define a TimeBucket:
from dmm.batch_metric_evaluator import BatchMetricEvaluator
from dmm.data_source.datarobot_source import BatchDataRobotSource
from dmm.metric import MissingValuesFraction
source = BatchDataRobotSource(
deployment_id=DEPLOYMENT_ID,
batch_ids=BATCH_IDS,
model_id=MODEL_ID,
)
feature_name = 'RAD'
metric = MissingValuesFraction(feature_name=feature_name)
missing_values_fraction_evaluator = BatchMetricEvaluator(metric=metric, source=source)
aggregated_metric_per_batch = missing_values_fraction_evaluator.score()
print(aggregated_metric_per_batch.to_string())
batch_id samples Missing Values Fraction
0 <batch_id> 506 0.0
1 <batch_id> 506 0.0
2 <batch_id> 506 0.0
Note: For batches, actuals and multiple segments are not supported.
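The missing values fraction for a feature is simply the share of missing entries in that column. A minimal sketch of the computation for one batch (toy data; the 'RAD' name mirrors the feature_name used above):

```python
import numpy as np
import pandas as pd

# Toy batch with two missing entries in the monitored feature column
batch = pd.DataFrame({"RAD": [1.0, np.nan, 3.0, np.nan]})

# Fraction of missing values in the feature column
fraction = batch["RAD"].isna().mean()
print(fraction)  # 0.5
```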
The IndividualMetricEvaluator class evaluates metrics without data aggregation. It performs the metric calculation on all exported data and returns a list of individual results. This evaluator lets you submit individual data points with their corresponding association IDs, which is useful when you want to visualize metric results alongside predictions and actuals. To use this evaluator with a custom metric, the metric's score method must accept, among other parameters, timestamps and association_ids.
from itertools import zip_longest
from typing import List
from datetime import datetime
from datetime import timedelta
from dmm import CustomMetric
from dmm import DataRobotSource
from dmm import SingleMetricResult
from dmm.individual_metric_evaluator import IndividualMetricEvaluator
from dmm.metric import LLMMetricBase
from nltk import sent_tokenize
import numpy as np
import pandas as pd
source = DataRobotSource(
deployment_id=DEPLOYMENT_ID,
start=datetime.utcnow() - timedelta(weeks=1),
end=datetime.utcnow(),
)
custom_metric = CustomMetric.from_id()
class SentenceCount(LLMMetricBase):
"""
Calculates the total number of sentences created while working with the LLM model.
Returns the sum of the number of sentences from prompts and completions.
"""
def __init__(self):
super().__init__(
name=custom_metric.name,
description="Calculates the total number of sentences created while working with the LLM model.",
need_training_data=False,
)
self.prompt_column = "promptColumn"
def score(
self,
scoring_data: pd.DataFrame,
predictions: np.ndarray,
timestamps: np.ndarray,
association_ids: np.ndarray,
**kwargs,
) -> List[SingleMetricResult]:
if self.prompt_column not in scoring_data.columns:
raise ValueError(
f"Prompt column {self.prompt_column} not found in the exported data, "
f"modify 'PROMPT_COLUMN' runtime parameter"
)
prompts = scoring_data[self.prompt_column].to_numpy()
sentence_count = []
for prompt, completion, ts, a_id in zip_longest(
prompts, predictions, timestamps, association_ids
):
if not isinstance(prompt, str) or not isinstance(completion, str):
continue
value = len(sent_tokenize(prompt)) + len(sent_tokenize(completion))
sentence_count.append(
SingleMetricResult(value=value, timestamp=ts, association_id=a_id)
)
return sentence_count
sentence_count_evaluator = IndividualMetricEvaluator(
metric=SentenceCount(),
source=source,
)
metric_results = sentence_count_evaluator.score()
The DR Custom Metrics module allows better synchronization with existing metrics on the DataRobot side. The logic of this module is based on unique names for custom metrics, so you can operate on metrics without knowing their IDs. This lets you define a metric ahead of time (e.g., before creating the deployment) and synchronize it with DataRobot at the appropriate time.
The DRCustomMetric class allows you to create new metrics or fetch existing ones from DataRobot. The logic is as follows:
- The DRCustomMetric.sync() method retrieves information about existing custom metrics on the DataRobot side; if a metric is defined locally but does not exist on the DataRobot side, it is created there.
- The DRCustomMetric.report() method allows you to report a single value based on a unique name.

Example:
dr_cm = DRCustomMetric(
dr_client=client, deployment_id=deployment_id, model_package_id=model_package_id
)
metric_config_yaml = f"""
customMetrics:
- name: new metric
description: foo bar
type: average
timeStep: hour
units: count
directionality: lowerIsBetter
isModelSpecific: yes
baselineValue: 0
"""
dr_cm.set_config(config_yaml=metric_config_yaml)
dr_cm.sync()
dr_cm.get_dr_custom_metrics()
> [{"name": "existing metric", "id": "65ef19410239ff8015f05a94", ...},
> {"name": "new metric", "id": "65ef197ce5d7b2176ceecf3a", ...}]
dr_cm.report_value("existing metric", 1)
dr_cm.report_value("new metric", 9)
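The name-based synchronization can be pictured as a simple reconcile step: anything defined locally but absent remotely gets created. This is a hypothetical sketch of the idea, not the library's internals:

```python
# Locally defined metric names (from the YAML config) vs. metrics already on the server
local = ["existing metric", "new metric"]
remote = {"existing metric": "65ef19410239ff8015f05a94"}

# Reconcile by unique name: create anything missing remotely
for name in local:
    if name not in remote:
        remote[name] = f"created-id-for-{name}"  # stand-in for the API call

print(sorted(remote))  # ['existing metric', 'new metric']
```

Because names, not IDs, drive the reconciliation, the same local config can be reused across deployments.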