AML models are typically deployed to GPU instances to provide an inference service. If the code that operates the model uses the GPU to run inference for each request separately, the overall performance of the model will be quite inefficient. This SDK provides APIs that gather inference requests into batches and run each batch on the GPU in a separate thread, considerably improving GPU utilization and making the model more performant.
The SDK also collects telemetry data for each request so that the performance of the model can be evaluated and tracked, and it provides logging primitives that can be used to produce additional troubleshooting information.
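The core idea can be illustrated with a small, self-contained sketch. This is not pyraisdk's actual implementation, only the general pattern described above: callers submit individual items, and one background thread drains the pending queue and runs the model once per batch instead of once per request.

import queue
import threading
from typing import Any, Tuple

# Minimal illustration of dynamic batching (not pyraisdk's internal code):
# callers enqueue single items, and one background thread drains the queue
# and runs the model once per batch instead of once per request.
class TinyBatcher:
    def __init__(self, predict_batch, max_batch_size: int = 8):
        self.predict_batch = predict_batch  # function running inference on a list of items
        self.max_batch_size = max_batch_size
        self.pending: "queue.Queue[Tuple[Any, queue.Queue]]" = queue.Queue()
        threading.Thread(target=self._worker, daemon=True).start()

    def predict_one(self, item: Any) -> Any:
        reply: "queue.Queue" = queue.Queue(maxsize=1)
        self.pending.put((item, reply))
        return reply.get()  # block until the batch containing this item has been processed

    def _worker(self) -> None:
        while True:
            batch = [self.pending.get()]  # wait for at least one pending item
            while len(batch) < self.max_batch_size and not self.pending.empty():
                batch.append(self.pending.get_nowait())
            items = [item for item, _ in batch]
            results = self.predict_batch(items)  # a single call for the whole batch
            for (_, reply), result in zip(batch, results):
                reply.put(result)

# usage: TinyBatcher(lambda xs: [len(x) for x in xs]).predict_one('abc') == 3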
For AML models deployed with the CD pipeline, logs generated by pyraisdk can be found in AuxUnstructuredEventTable and metrics in AuxStructuredEventTable. For more details, please refer to raidri.
From the remote model dashboard, you can view the data emitted by pyraisdk and debug model-side metrics.
Follow the steps below to integrate pyraisdk.
1. Import the SDK modules:

from pyraisdk import rlog
from pyraisdk.dynbatch import BaseModel, DynamicBatchModel

2. Implement a class that inherits the BaseModel class, which includes the methods preprocess() and predict(), to define how data must be preprocessed and how inferences are made.

3. In your model's entrypoint (the init() function if deploying with a scoring script), initialize logging, instantiate an object of the class that implements BaseModel, and define a global variable for the batched model as shown below:

rlog.initialize()

global batched_model
malware_model = MalwareModel()
batched_model = DynamicBatchModel(malware_model)

4. In your request handler (the run() function if deploying with a scoring script), pass the list of requests to the batched model's predict() method and return the results. Under the hood, pyraisdk creates batches and uses your model's preprocess() and predict() methods to generate results:

return batched_model.predict(json.loads(request_data)['data'])
5. Log custom events and metrics using EventLogger methods if desired. It is not necessary to log latency information, as pyraisdk does so automatically.
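Putting steps 1 through 4 together, a minimal scoring script might look like the sketch below. The MalwareModel body here is only a stand-in for your own BaseModel subclass; the init() / run() entrypoints follow the AML scoring-script convention referenced in the steps above.

import json
from typing import List

from pyraisdk import rlog
from pyraisdk.dynbatch import BaseModel, DynamicBatchModel

# Stand-in model for illustration; replace with your own preprocessing and inference logic.
class MalwareModel(BaseModel):
    def preprocess(self, items: List[str]) -> List[str]:
        return items

    def predict(self, items: List[str]) -> List[int]:
        return [len(item) for item in items]

def init():
    global batched_model
    rlog.initialize()
    malware_model = MalwareModel()
    batched_model = DynamicBatchModel(malware_model)

def run(request_data):
    # pyraisdk batches the incoming items and calls preprocess()/predict() under the hood
    return batched_model.predict(json.loads(request_data)['data'])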
In addition to following the CD pipeline's developer guidelines to onboard your model to RAI's deployment infrastructure, you must configure BatchingConfig for each deployment target to which your model will be deployed. To do so, edit the corresponding deployment-target-configs/../*.json file and set the Version, InstanceType / SKU, and BatchingConfig as shown below:
{
"Name": "MalwareNeural",
"Version": 19,
"InstanceType": "Standard_NC6s_v3",
"BatchingConfig": {
"MaxBatchSize": 12,
"IdleBatchSize": 5,
"MaxBatchInterval": 0.002
}
}
When the CD pipeline deploys the model to an AML online deployment, it exports environment variables for each BatchingConfig
field, which pyraisdk utilizes at process start to configure batching parameters. The CD pipeline also exports environment variables to enable pyraisdk to log to RAI's Azure Data Explorer clusters.
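The exact variable names exported by the pipeline are not spelled out here, but judging from the batching variables used in the init() example below, the correspondence is presumably along these lines (a sketch; the PYRAISDK_* names are taken from that example):

import os

# Presumed correspondence between BatchingConfig fields in the deployment JSON and
# the environment variables pyraisdk reads at process start; the PYRAISDK_* names
# are taken from the init() example later in this document.
batching_env = {
    "MaxBatchSize": os.environ.get("PYRAISDK_MAX_BATCH_SIZE"),          # e.g. "12"
    "IdleBatchSize": os.environ.get("PYRAISDK_IDLE_BATCH_SIZE"),        # e.g. "5"
    "MaxBatchInterval": os.environ.get("PYRAISDK_MAX_BATCH_INTERVAL"),  # e.g. "0.002"
}
print(batching_env)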
To test or deploy independently, outside of the RAI CD pipeline, you must set the environment variables for BatchingConfig manually before the DynamicBatchModel object is created. Refer to Batching Parameter.
The recommended way to set these variables is to call os.environ.setdefault in init(), as shown below:
def init():
os.environ.setdefault('PYRAISDK_MAX_BATCH_SIZE', '12')
os.environ.setdefault('PYRAISDK_IDLE_BATCH_SIZE', '5')
os.environ.setdefault('PYRAISDK_MAX_BATCH_INTERVAL', '0.002')
...
batch_model = DynamicBatchModel(malware_model)
This works well whether or not the model is deployed through the RAI CD pipeline, and you do not need to remove these three lines when switching to deployment via the CD pipeline.
Note, however, that code like os.environ['PYRAISDK_MAX_BATCH_SIZE'] = '12' should be avoided: it overrides the configuration provided by the CD pipeline, which is not desirable in most cases.
To enable log publishing (to Event Hub), several additional environment variables must be set; refer to the logging section below. This is optional.
There are APIs you must implement in your model to support batching of inference requests for best model performance. These APIs allow the SDK to distribute load efficiently across the GPU instances. The APIs are:

preprocess: Modifies the input to the model, if necessary. For example, if your model needs the input in a special JSON format instead of as a list of strings, you can do that modification in the preprocess method.
predict: Executes the model inference for a list of input strings.

Batching parameters are mandatory and must be set before the DynamicBatchModel is created. They are set through environment variables: PYRAISDK_MAX_BATCH_SIZE, PYRAISDK_IDLE_BATCH_SIZE, and PYRAISDK_MAX_BATCH_INTERVAL.
Build a YourModel class that inherits from pyraisdk.dynbatch.BaseModel.
from typing import List
from pyraisdk.dynbatch import BaseModel
class YourModel(BaseModel):
def predict(self, items: List[str]) -> List[int]:
rs = []
for item in items:
rs.append(len(item))
return rs
def preprocess(self, items: List[str]) -> List[str]:
rs = []
for item in items:
rs.append(f'[{item}]')
return rs
Initialize a pyraisdk.dynbatch.DynamicBatchModel with a YourModel instance, and call predict / predict_one for inference.
from pyraisdk.dynbatch import DynamicBatchModel
# prepare model
simple_model = YourModel()
batch_model = DynamicBatchModel(simple_model)
# predict
items = ['abc', '123456', 'xyzcccffaffaaa']
predictions = batch_model.predict(items)
assert predictions == [5, 8, 16]
# predict_one
item = 'abc'
prediction = batch_model.predict_one(item)
assert prediction == 5
Concurrent requests to predict / predict_one can be made from different threads:
from threading import Thread
from pyraisdk.dynbatch import DynamicBatchModel
# prepare model
simple_model = YourModel()
batch_model = DynamicBatchModel(simple_model)
# thread run function
def run(name, num):
for step in range(num):
item = f'{name}-{step}'
prediction = batch_model.predict_one(item)
assert prediction == len(item) + 2
# start concurrent inference
threads = [Thread(target=run, args=(f'{tid}', 100)) for tid in range(20)]
for t in threads:
t.start()
for t in threads:
t.join()
The rlog module is for logging and event tracing.
def initialize(
eh_hostname: Optional[str] = None,
client_id: Optional[str] = None,
eh_conn_str: Optional[str] = None,
eh_structured: Optional[str] = None,
eh_unstructured: Optional[str] = None,
role: Optional[str] = None,
instance: Optional[str] = None,
sys_metrics_enable: bool = True,
)
Parameter description for initialize:

eh_hostname: Event Hub hostname; the default is ${EVENTHUB_NAMESPACE}.servicebus.windows.net
role: the default is RemoteModel
instance: the default is ${MODEL_NAME}|${ENDPOINT_VERSION}|{hostname} or ${MODEL_NAME}|${ENDPOINT_VERSION}|{_probably_unique_id()}
EventLogger methods:

def event(self, key: str, code: str, numeric: float, detail: str='', corr_id: str='', elem: int=-1)
def infof(self, format: str, *args: Any)
def infocf(self, corr_id: str, elem: int, format: str, *args: Any)
def warnf(self, format: str, *args: Any)
def warncf(self, corr_id: str, elem: int, format: str, *args: Any)
def errorf(self, format: str, *args: Any)
def errorcf(self, corr_id: str, elem: int, ex: Optional[Exception], format: str, *args: Any)
def fatalf(self, format: str, *args: Any)
def fatalcf(self, corr_id: str, elem: int, ex: Optional[Exception], format: str, *args: Any)
Usage examples:

# export EVENTHUB_AUX_UNSTRUCTURED='ehunstruct'
# export EVENTHUB_AUX_STRUCTURED='ehstruct'
# export UAI_CLIENT_ID='xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx'
# export EVENTHUB_NAMESPACE='raieusdev-eh-namespace'
from pyraisdk import rlog
rlog.initialize()
rlog.infof('this is a info message %s', 123)
rlog.event('LifetimeEvent', 'STOP_GRACEFUL_SIGNAL', 0, 'detail info')
# export EVENTHUB_AUX_UNSTRUCTURED='ehunstruct'
# export EVENTHUB_AUX_STRUCTURED='ehstruct'
# export EVENTHUB_CONN_STRING='<connection string>'
from pyraisdk import rlog
rlog.initialize()
rlog.infocf('corrid', -1, 'this is a info message: %s', 123)
rlog.event('RequestDuration', '200', 0.01, 'this is duration in seconds')
from pyraisdk import rlog
rlog.initialize(eh_structured='ehstruct', eh_unstructured='ehunstruct', eh_conn_str='<eventhub-conn-str>')
rlog.errorcf('corrid', -1, Exception('error msg'), 'error message: %s %s', 1,2)
rlog.event('CpuUsage', '', 0.314, detail='cpu usage', corr_id='corrid', elem=-1)
The otel module is for emitting metrics using OpenTelemetry (OTEL).
def initialize(
service_name: Optional[str] = None,
service_namespace: Optional[str] = None,
service_version: Optional[str] = None,
service_instance_id: Optional[str] = None,
host_name: Optional[str] = None
)
def shutdown()
from pyraisdk import otel
from opentelemetry.metrics import get_meter
# Initialize the SDK
otel.initialize()
# Note that meter name is not stored in Geneva. So the name can be anything.
meter = get_meter("rai-models")
session_counter = meter.create_counter("rai.model.session")
session_counter.add(1, {
"model": "tms",
"version": "54",
})
# call shutdown before process exit to allow exporting any pending metrics to the collector
# Not needed for SIGTERM and SIGINT signals, as the SDK internally handles these signals and shuts down automatically
otel.shutdown()