clp-ffi-py

This module provides Python packages to interface with CLP Core Features
through CLP's FFI (foreign function interface). At present, this library
supplies built-in functions for serializing/deserializing log messages using CLP.
Quick Start
Install with pip:
python3 -m pip install --upgrade clp-ffi-py
Note:
- Python 3.7 or higher is required.
- Tested on Linux, macOS and Windows.
To install an older version or download a prebuilt wheel (whl) package, see the project
homepage on PyPI.
Compatibility
Tested on Python 3.7, 3.8, 3.11, 3.12, and 3.13, and it should work on any Python
version >= 3.7.
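The version requirement can also be checked programmatically before the package is imported. The
snippet below is a minimal sketch using only the standard library; the helper name
meets_minimum_python is hypothetical and not part of clp-ffi-py.

```python
import sys
from typing import Tuple

# Minimum interpreter version stated in this README.
MINIMUM_PYTHON: Tuple[int, int] = (3, 7)


def meets_minimum_python(version_info: Tuple[int, ...] = tuple(sys.version_info[:3])) -> bool:
    """Return True if `version_info` is at least MINIMUM_PYTHON."""
    # Only the major and minor components matter for the comparison.
    return tuple(version_info[:2]) >= MINIMUM_PYTHON


if not meets_minimum_python():
    raise RuntimeError("clp-ffi-py requires Python 3.7 or higher.")
```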
API Reference
The API reference for this library can be found on our docs hub.
Building/Packaging
To manually build a package for distribution, follow the steps below.
Requirements
- A C++ compiler that supports C++20 and std::span, e.g.:
  - clang++ >= 7
  - g++ >= 10
  - MSVC >= 1930 (included in Visual Studio 2022)
- python3
- python3-dev
- python3-venv
- Task >= 3.38.0
Set up
Build commands
Using Key-Value Pair IR Streams
The CLP key-value pair IR stream, introduced in version 0.0.14, is a new IR stream format that
enables efficient serialization of key-value pair (kv-pair) log events.
We categorize the kv-pairs of a log event into two categories:
- Auto-generated kv-pairs: KV-pairs (e.g., timestamps, log levels, other metadata) that are
automatically generated by the logging library.
- User-generated kv-pairs: Custom kv-pairs (e.g., log messages).
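As an illustration of the two categories, the sketch below splits a flat log record into an
auto-generated dictionary and a user-generated dictionary. Which keys count as auto-generated is
decided by the logging library or the application; the helper split_kv_pairs and the key set used
here are hypothetical and not part of clp-ffi-py.

```python
from typing import Any, Dict, FrozenSet, Tuple

# Hypothetical set of keys that a logging library fills in on its own.
AUTO_GENERATED_KEYS: FrozenSet[str] = frozenset({"timestamp", "level"})


def split_kv_pairs(
    record: Dict[str, Any], auto_generated_keys: FrozenSet[str] = AUTO_GENERATED_KEYS
) -> Tuple[Dict[str, Any], Dict[str, Any]]:
    """Split `record` into (auto-generated, user-generated) kv-pairs."""
    auto_gen = {k: v for k, v in record.items() if k in auto_generated_keys}
    user_gen = {k: v for k, v in record.items() if k not in auto_generated_keys}
    return auto_gen, user_gen


auto_gen_kv_pairs, user_gen_kv_pairs = split_kv_pairs(
    {"timestamp": 1480366800000, "level": "INFO", "message": "Service started."}
)
print(auto_gen_kv_pairs)
print(user_gen_kv_pairs)
```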
Requirements
The serialization interface requires kv-pairs to be passed as MessagePack-encoded Map objects
whose keys and values are restricted to the MessagePack types described below.
Supported key types
Keys must be UTF-8-encoded strings.
Supported value types
Values must be one of the following MessagePack types:
- Primitives:
  - Integer
  - Float
  - String
  - Boolean
  - Null
- Maps with keys and values that have the same supported types described here.
- Arrays containing a sequence of supported primitives, arrays, or maps.
Unsupported value types
MessagePack's Binary and Extension types are not supported.
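The type rules above can be checked on the Python side before encoding. The following is a minimal
sketch, assuming the usual mapping between Python and MessagePack types (None, bool, int, float,
and str for the primitives, dict for Map, list or tuple for Array, bytes for Binary). The helpers
is_supported_value and is_supported_kv_pairs are hypothetical and not part of clp-ffi-py.

```python
from typing import Any


def is_supported_value(value: Any) -> bool:
    """Return True if `value` only uses the value types listed above."""
    # Primitives: Null, Boolean, Integer, Float, String.
    if value is None or isinstance(value, (bool, int, float, str)):
        return True
    # Maps: keys must be strings and values must be supported recursively.
    if isinstance(value, dict):
        return all(
            isinstance(key, str) and is_supported_value(item) for key, item in value.items()
        )
    # Arrays: every element must be a supported primitive, array, or map.
    if isinstance(value, (list, tuple)):
        return all(is_supported_value(item) for item in value)
    # Everything else (e.g. bytes, which maps to Binary) is rejected.
    return False


def is_supported_kv_pairs(kv_pairs: Any) -> bool:
    """Return True if `kv_pairs` is a dict that satisfies the rules above."""
    return isinstance(kv_pairs, dict) and is_supported_value(kv_pairs)
```

For example, is_supported_kv_pairs({"uid": 12345, "ip": "127.0.0.1"}) is accepted, while a
dictionary containing a bytes value is rejected.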
Example Code: Using Serializer to serialize key-value pair log events into an IR stream
from clp_ffi_py.ir import Serializer
from clp_ffi_py.utils import serialize_dict_to_msgpack

# Open the output file in binary mode and wrap it with a Serializer.
with open("example.clp", "wb") as ir_stream, Serializer(ir_stream) as serializer:
    # Each log event takes one auto-generated map and one user-generated map.
    serializer.serialize_log_event_from_msgpack_map(
        auto_gen_msgpack_map=serialize_dict_to_msgpack({"level": "INFO"}),
        user_gen_msgpack_map=serialize_dict_to_msgpack({"message": "Service started."}),
    )
    serializer.serialize_log_event_from_msgpack_map(
        auto_gen_msgpack_map=serialize_dict_to_msgpack({"level": "WARN"}),
        user_gen_msgpack_map=serialize_dict_to_msgpack({"uid": 12345, "ip": "127.0.0.1"}),
    )
clp_ffi_py.utils.serialize_dict_to_msgpack can be used to serialize a Python dictionary object
into a MessagePack object.
Example Code: Using Deserializer to read KeyValuePairLogEvents from an IR stream
from clp_ffi_py.ir import Deserializer, KeyValuePairLogEvent
from typing import Optional

with open("example.clp", "rb") as ir_stream:
    deserializer = Deserializer(ir_stream)
    while True:
        log_event: Optional[KeyValuePairLogEvent] = deserializer.deserialize_log_event()
        # Stop once the deserializer returns no further log event.
        if log_event is None:
            break
        auto_gen_kv_pairs, user_gen_kv_pairs = log_event.to_dict()
        print(auto_gen_kv_pairs)
        print(user_gen_kv_pairs)
Deserializer.deserialize_log_event can be used to read from the IR stream and output
KeyValuePairLogEvent objects.
KeyValuePairLogEvent.to_dict can be used to convert the underlying deserialized results into
Python dictionaries.
[!IMPORTANT]
The current Deserializer does not support reading the previous IR stream format. Backward
compatibility will be added in future releases.
CLP IR Readers
CLP IR Readers provide a convenient interface for CLP IR deserialization and search
methods.
[!IMPORTANT]
The readers below do not support reading or searching CLP key-value pair IR streams.
ClpIrStreamReader
- Reads and deserializes any arbitrary CLP IR stream (as an instance of IO[bytes]).
- Can be used as an iterator that returns each log event as a LogEvent object.
- Can search for target log events by giving a search query:
  - Searching log events within a certain time range.
  - Searching log messages that match certain wildcard queries.
ClpIrFileReader
- Simple wrapper around ClpIrStreamReader that calls open with a given local path.
Example Code: Using ClpIrFileReader to iterate and print log events
from pathlib import Path
from clp_ffi_py.ir import ClpIrFileReader

with ClpIrFileReader(Path("example.clp.zst")) as clp_reader:
    for log_event in clp_reader:
        print(log_event.get_formatted_message())
Each log event is represented by a LogEvent object, which offers methods to retrieve its
underlying details, such as the timestamp and the log message. For more information, use the
following code to see all the available methods and the associated docstrings.
from clp_ffi_py.ir import LogEvent
help(LogEvent)
Example Code: Using Query to search log events by specifying a certain time range
from typing import List
from clp_ffi_py.ir import ClpIrStreamReader, LogEvent, Query, QueryBuilder

query_builder: QueryBuilder = QueryBuilder()
time_range_query: Query = (
    query_builder
    .set_search_time_lower_bound(1480366800000)
    .set_search_time_upper_bound(1480388400000)
    .build()
)
log_events: List[LogEvent] = []
with open("example.clp.zst", "rb") as compressed_log_file:
    with ClpIrStreamReader(compressed_log_file) as clp_reader:
        for log_event in clp_reader.search(time_range_query):
            log_events.append(log_event)
Example Code: Using Query to search log messages of certain pattern(s) specified by wildcard queries.
from pathlib import Path
from typing import List, Tuple
from clp_ffi_py.ir import ClpIrFileReader, Query, QueryBuilder
from clp_ffi_py.wildcard_query import FullStringWildcardQuery, SubstringWildcardQuery

query_builder: QueryBuilder = QueryBuilder()
query_builder.add_wildcard_query(SubstringWildcardQuery("uid=*,status=failed"))
query_builder.add_wildcard_query(
    FullStringWildcardQuery("*UID=*,Status=KILLED*", case_sensitive=True)
)
wildcard_search_query: Query = query_builder.build()
matched_log_messages: List[Tuple[int, str]] = []
with ClpIrFileReader(Path("example.clp.zst")) as clp_reader:
    for log_event in clp_reader.search(wildcard_search_query):
        matched_log_messages.append((log_event.get_timestamp(), log_event.get_log_message()))
A Query object may have both the search time range and the wildcard queries (WildcardQuery)
specified to support more complex search scenarios. QueryBuilder can be used to conveniently
construct Query objects. For more details, use the following code to access the related
docstrings.
from clp_ffi_py.ir import Query, QueryBuilder
from clp_ffi_py import FullStringWildcardQuery, SubstringWildcardQuery, WildcardQuery
help(Query)
help(QueryBuilder)
help(WildcardQuery)
help(FullStringWildcardQuery)
help(SubstringWildcardQuery)
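To build intuition for the difference between the two wildcard query types, the sketch below
approximates wildcard matching in pure Python. It assumes that * matches zero or more characters,
that ? matches exactly one character, that matching is case-insensitive unless requested
otherwise, and that a substring query behaves like a full-string query wrapped in a leading and a
trailing *. These semantics are assumptions not stated in this README, escape sequences are not
handled, and the helpers full_string_match and substring_match are hypothetical stand-ins rather
than clp-ffi-py APIs.

```python
import re


def _wildcard_to_regex(wildcard: str) -> str:
    """Translate a wildcard string into an equivalent regular expression."""
    parts = []
    for char in wildcard:
        if char == "*":
            parts.append(".*")  # Zero or more characters.
        elif char == "?":
            parts.append(".")  # Exactly one character.
        else:
            parts.append(re.escape(char))  # Literal character.
    return "".join(parts)


def full_string_match(wildcard: str, message: str, case_sensitive: bool = False) -> bool:
    """Return True if the wildcard pattern matches the entire message."""
    flags = re.DOTALL if case_sensitive else re.DOTALL | re.IGNORECASE
    return re.fullmatch(_wildcard_to_regex(wildcard), message, flags) is not None


def substring_match(wildcard: str, message: str, case_sensitive: bool = False) -> bool:
    """Return True if the wildcard pattern matches any substring of the message."""
    return full_string_match("*" + wildcard + "*", message, case_sensitive)


message = "job 7: uid=1001,status=failed (retry)"
print(substring_match("uid=*,status=failed", message))
print(full_string_match("uid=*,status=failed", message))
```

Under these assumptions, the substring pattern matches the sample message, while the same pattern
used as a full-string query does not, because the message has text before and after the match.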
Streaming Deserialize/Search Directly from S3 Remote Storage
When working with CLP IR files stored on S3-compatible storage systems, smart_open can be used
to open and read the IR stream, with the following benefits:
- It streams the data and does not download the file to disk.
- It issues only a single GET request, so the API access cost is minimized.
Here is an example:
from clp_ffi_py.ir import ClpIrStreamReader
import boto3
import os
import smart_open

# Credentials are read from the environment.
session = boto3.Session(
    aws_access_key_id=os.environ['AWS_ACCESS_KEY_ID'],
    aws_secret_access_key=os.environ['AWS_SECRET_ACCESS_KEY'],
)
url = 's3://clp-example-s3-bucket/example.clp.zst'
with smart_open.open(
    url, mode="rb", compression="disable", transport_params={"client": session.client("s3")}
) as istream:
    with ClpIrStreamReader(istream, allow_incomplete_stream=True) as clp_reader:
        for log_event in clp_reader:
            print(log_event.get_formatted_message())
Note:
- Setting compression="disable" is necessary so that smart_open doesn't undo the IR file's
  Zstandard compression (based on the file's extension) before streaming it to
  ClpIrStreamReader; ClpIrStreamReader expects the input stream to be Zstandard-compressed.
- When allow_incomplete_stream is set to False (default), the reader will raise
  clp_ffi_py.ir.IncompleteStreamError if the stream is incomplete (it doesn't end with the byte
  sequence indicating the stream's end). In practice, this can occur if you're reading a stream
  that is still being written or wasn't properly closed.
Parallel Processing
The Query and LogEvent classes can be serialized by pickle. Therefore, deserializing and
searching can be parallelized across streams/files using libraries such as multiprocessing and
tqdm.
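As a sketch of this approach, the snippet below counts matching log events in several IR files by
distributing the files across a pool of worker processes. It relies only on calls shown earlier in
this README (ClpIrFileReader and its search method) plus the standard multiprocessing module; the
helper names count_matches and count_matches_in_parallel are hypothetical.

```python
import multiprocessing
from pathlib import Path
from typing import Any, Dict, List, Optional


def count_matches(path: str, query: Any) -> int:
    """Count the log events in the IR file at `path` that match `query`."""
    # Imported inside the worker so that each process loads the package itself.
    from clp_ffi_py.ir import ClpIrFileReader

    with ClpIrFileReader(Path(path)) as clp_reader:
        return sum(1 for _ in clp_reader.search(query))


def count_matches_in_parallel(
    paths: List[str], query: Any, processes: Optional[int] = None
) -> Dict[str, int]:
    """Run `count_matches` on every path in a process pool."""
    if not paths:
        return {}
    # The query is pickled and sent to the worker processes along with each path.
    with multiprocessing.Pool(processes) as pool:
        counts = pool.starmap(count_matches, [(path, query) for path in paths])
    return dict(zip(paths, counts))
```

On platforms where multiprocessing starts workers with the spawn method (for example Windows and
macOS), call count_matches_in_parallel from within an if __name__ == "__main__": guard.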
Testing
python -m venv venv && . ./venv/bin/activate
pip install -r requirements-dev.txt
git submodule update --init --recursive
pip install -e .
python -m unittest -bv
Note: If the package is installed from a whl file into the site packages, rather than installed
locally (pip install -e .), the tester cannot be launched from the project's root directory. If
unittest is run from the root directory, the local clp_ffi_py directory will shadow the
installed clp_ffi_py module. To run the tester with the installed package, try the following:
cd tests
python -m unittest -bv
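To check which copy of clp_ffi_py the interpreter would import from the current working
directory, the module's origin can be inspected with the standard library. This is a minimal
sketch; the helper module_origin is hypothetical.

```python
import importlib.util
from typing import Optional


def module_origin(module_name: str) -> Optional[str]:
    """Return the file a top-level module would be loaded from, or None if it is not found."""
    spec = importlib.util.find_spec(module_name)
    return None if spec is None else spec.origin


# A path inside the project's root directory means the local source tree is
# shadowing the installed package.
print(module_origin("clp_ffi_py"))
```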
Build and Test with cibuildwheel
This project is configured to build with cibuildwheel. Whenever changes are committed to
GitHub, the cibuildwheel Action runs automatically, building this library for several Python
environments across various operating systems and architectures. You can access the build
outcomes (wheel files) via the GitHub Action page. For instructions on customizing the build
targets or running cibuildwheel locally, please refer to the official cibuildwheel
documentation.
Adding files
Certain file types need to be added to our linting rules manually:
- CMake. If adding a CMake file, add it (or its parent directory) as an argument to the gersemi
  command in lint-tasks.yaml.
  - If adding a directory, the file must be named CMakeLists.txt or use the .cmake extension.
- YAML. If adding a YAML file (regardless of its extension), add it as an argument to the
  yamllint command in lint-tasks.yaml.
Linting
Before submitting a pull request, ensure you’ve run the linting commands below and either fixed any
violations or suppressed the warning.
To run all linting checks:
task lint:check
To run all linting checks AND automatically fix any fixable issues:
task lint:fix
Running specific linters
The commands above run all linting checks, but for performance you may want to run a subset (e.g.,
if you only changed C++ files, you don't need to run the YAML linting checks) using one of the tasks
in the table below.
Task | Description |
lint:cmake-check | Runs the CMake linters. |
lint:cmake-fix | Runs the CMake linters and fixes any violations. |
lint:cpp-check | Runs the C++ linters (formatters and static analyzers). |
lint:cpp-fix | Runs the C++ linters and fixes some violations. |
lint:cpp-format-check | Runs the C++ formatters. |
lint:cpp-format-fix | Runs the C++ formatters and fixes some violations. |
lint:cpp-static-check | Runs the C++ static analyzers. |
lint:py-check | Runs the Python linters. |
lint:py-fix | Runs the Python linters and fixes some violations. |
lint:yml-check | Runs the YAML linters. |
lint:yml-fix | Runs the YAML linters and fixes some violations. |