Table of Contents
1. Introduction
This repository is a library for creating th2-data-services applications.
The library is used to analyze stream data from the "Report Data Provider" using aggregate operations. Data Services allows you to manipulate the stream-data processing workflow using pipelining.
The library allows you to:
- Natively connect to the "Report Data Provider" via the `ProviderDataSource` class and extract TH2 Events/Messages via commands
- Work with iterable objects (list, tuple, etc., including files) via the `Data` object and its features
- Manipulate the workflow for analysis using `Data` object methods
- Build event trees (the `EventsTreeCollection` class)
Workflow manipulation tools allow you to:
- Filter stream data (the `Data.filter` method)
- Transform stream data (the `Data.map` method)
- Limit the number of processed stream records (the `Data.limit` method)
There is also another part of Data Services.
2. Getting started
2.1. Installation
Core

- From PyPI (pip)

  This package can be found on PyPI.

  ```shell
  pip install th2-data-services
  ```

- From Source

  ```shell
  git clone https://github.com/th2-net/th2-data-services
  pip install th2-data-services/
  ```
Data sources (providers)

Since v1.3.0, the library doesn't provide data source dependencies. You should provide them manually during installation: just add square brackets after the library name and put the dependency name inside.

```shell
pip install th2-data-services[dependency_name]
```

Dependencies list

| dependency name | provider version |
|---|---|
| RDP5 | 5 |
| RDP6 | 6 |

Example

```shell
pip install th2-data-services[rdp5]
```
GRPC provider warning

This library can interact with several versions of gRPC providers, but it is limited by the installed version of the `th2_grpc_data_provider` package. You can only use the provider API version that is compatible with the installed version of `th2_grpc_data_provider`.

By default, `th2_data_services` uses the latest available version of the provider API.
Reasons for the restriction

- Two different versions of `th2_grpc_data_provider` can't be installed in the same virtual environment;
- Two different versions of `th2_grpc_data_provider` may depend on different versions of `th2_grpc_common`;
- If another package that also depends on `th2_grpc_common` (for example, `th2_common`) is used together with `th2_data_services`, a version conflict may occur (both at the Python level and at the Protobuf level).
2.2. Example
A good, short example is worth a thousand words.
This example works with Events, but you can do the same with Messages.

The following example is also available as a file.
```python
from collections.abc import Generator
from typing import Tuple, List, Optional
from datetime import datetime

from th2_data_services import Data
from th2_data_services.events_tree import EventsTree
from th2_data_services.provider.v5.data_source.http import HTTPProvider5DataSource
from th2_data_services.provider.v5.commands import http as commands
from th2_data_services.provider.v5.events_tree import (
    EventsTreeCollectionProvider5,
    ParentEventsTreeCollectionProvider5,
)
from th2_data_services.provider.v5.filters.event_filters import NameFilter, TypeFilter, FailedStatusFilter
from th2_data_services.provider.v5.filters.message_filters import BodyFilter

import th2_data_services

th2_data_services.INTERACTIVE_MODE = True

# [1] Create a data source object to connect to the Report Data Provider.
DEMO_HOST = "10.100.66.66"
DEMO_PORT = "30999"
data_source = HTTPProvider5DataSource(f"http://{DEMO_HOST}:{DEMO_PORT}")

START_TIME = datetime(
    year=2021, month=6, day=17, hour=9, minute=44, second=41, microsecond=692724
)
END_TIME = datetime(year=2021, month=6, day=17, hour=12, minute=45, second=50)

# [2] Get events or messages from the data source as Data objects.
events: Data = data_source.command(
    commands.GetEvents(
        start_timestamp=START_TIME,
        end_timestamp=END_TIME,
        attached_messages=True,
        filters=[
            TypeFilter("Send message"),
            NameFilter(["ExecutionReport", "NewOrderSingle"]),
            FailedStatusFilter(),
        ],
    )
)

messages: Data = data_source.command(
    commands.GetMessages(
        start_timestamp=START_TIME,
        end_timestamp=END_TIME,
        attached_events=True,
        stream=["demo-conn2"],
        filters=BodyFilter("195"),
    )
)

# [3] Work with the Data object.
# [3.1] Filter.
filtered_events: Data = events.filter(lambda e: e["body"] != [])

# [3.2] Map.
def transform_function(record):
    return {"eventName": record["eventName"], "successful": record["successful"]}

filtered_and_mapped_events = filtered_events.map(transform_function)

# [3.3] Data pipeline: chaining filter and map gives the same result.
filtered_and_mapped_events_by_pipeline = events.filter(lambda e: e["body"] != []).map(transform_function)
assert list(filtered_and_mapped_events) == list(filtered_and_mapped_events_by_pipeline)

# [3.4] Sift: skip the first N records or limit the output.
events_from_11_to_end: Generator = events.sift(skip=10)
only_first_10_events: Generator = events.sift(limit=10)

# [3.5] Use caching to avoid repeated requests to the data source.
events.use_cache(True)
events.use_cache()  # True by default

# [3.6] Iterate the Data object.
for event in events:
    print(event)

# [3.7] Get the number of elements.
number_of_events = events.len
assert events.is_empty is False

# [3.8] Convert to a list.
events_list = list(events)

# [4] Get events or messages by their ids.
desired_event = "9ce8a2ff-d600-4366-9aba-2082cfc69901:ef1d722e-cf5e-11eb-bcd0-ced60009573f"
desired_events = [
    "deea079b-4235-4421-abf6-6a3ac1d04c76:ef1d3a20-cf5e-11eb-bcd0-ced60009573f",
    "a34e3cb4-c635-4a90-8f42-37dd984209cb:ef1c5cea-cf5e-11eb-bcd0-ced60009573f",
]
desired_message = "demo-conn1:first:1619506157132265837"
desired_messages = [
    "demo-conn1:first:1619506157132265836",
    "demo-conn1:first:1619506157132265833",
]

data_source.command(commands.GetEventById(desired_event))
data_source.command(commands.GetEventsById(desired_events))
data_source.command(commands.GetMessageById(desired_message))
data_source.command(commands.GetMessagesById(desired_messages))

# [5] Chained Data objects can each use their own cache.
events_filtered: Data = events.filter(lambda record: record.get("batchId"))
events_filtered.use_cache()
list(events_filtered)  # iterate to build the cache

filtered_events_types = events_filtered.map(lambda record: {"eventType": record.get("eventType")})
events_without_types_with_batch = filtered_events_types.filter(lambda record: not record.get("eventType"))
events_without_types_with_batch.use_cache()

# [6] Join Data objects (and plain iterables) into a new Data object.
d1 = Data([1, 2, 3])
d2 = Data(["a", {"id": 123}, "c"])
d3 = Data([7, 8, 9])

data_via_init = Data([d1, d2, d3])
data_via_add = d1 + d2 + d3
data_with_non_data_obj_via_init = Data([d1, ["a", {"id": 123}, "c"], d3])
data_with_non_data_obj_via_add = d1 + ["a", {"id": 123}, "c"] + d3

# [7] Build a cache file and restore a Data object from it.
events.build_cache("cache_filename_or_path")
data_obj_from_cache = Data.from_cache_file("cache_filename_or_path")

# [8] Build EventsTree collections.
# Without a data source, unresolved parent events stay detached.
collection = EventsTreeCollectionProvider5(events)
assert collection.detached_events

# With a data source, missing parent events are recovered from it.
collection = EventsTreeCollectionProvider5(events, data_source=data_source)
assert not collection.detached_events

# [9] Work with the collection.
leaves: Tuple[dict] = collection.get_leaves()
roots: List[str] = collection.get_roots_ids()
find_event: Optional[dict] = collection.find(lambda event: "Send message" in event["eventType"])
find_events: List[dict] = collection.findall(lambda event: event["successful"] is True)
ancestor: Optional[dict] = collection.find_ancestor(
    "8bbe3717-cf59-11eb-a3f7-094f904c3a62", filter=lambda event: "RootEvent" in event["eventName"]
)
children: Tuple[dict] = collection.get_children("814422e1-9c68-11eb-8598-691ebd7f413d")
subtree: EventsTree = collection.get_subtree("8e23774d-cf59-11eb-a6e3-55bfdb2b3f21")
event_path: List[dict] = collection.get_full_path("8e2524fa-cf59-11eb-a3f7-094f904c3a62")
parent = collection.get_parent("8e2524fa-cf59-11eb-a3f7-094f904c3a62")

collection.append_event(
    event={
        "eventId": "a20f5ef4-c3fe-bb10-a29c-dd3d784909eb",
        "parentEventId": "8e2524fa-cf59-11eb-a3f7-094f904c3a62",
        "eventName": "StubEvent",
    }
)

collection.show()

trees: List[EventsTree] = collection.get_trees()
tree: EventsTree = trees[0]
parentless_trees: List[EventsTree] = collection.get_parentless_trees()

# [10] ParentEventsTreeCollection keeps only referenced parent events.
collection = ParentEventsTreeCollectionProvider5(events, data_source=data_source)
collection.show()
```
2.3. Short theory
The library provides tools for handling stream data. What’s a stream? It's a sequence of elements from a source that
supports aggregate operations.
Terms

- Data object: an instance of the `Data` class, which is a wrapper around a stream.
- Sequence of elements: a Data object provides an interface to a sequenced set of values of a specific element type. The stream inside the Data object doesn't actually store elements; they are computed on demand.
- DataSource: any source of data, e.g. Report Data Provider, collections, arrays, or I/O resources.
- ProviderDataSource: a DataSource object whose source is the Report Data Provider.
- SourceAPI: each source has its own API to retrieve data; SourceAPI is a class that provides the API for some data source.
- Commands: objects that provide user-friendly interfaces for getting data from a DataSource. Commands use SourceAPI to achieve this.
- Adapters: similar to functions for the `Data.map` method; adaptable commands use them to update the data stream.
- Aggregate operations: common operations such as filter, map, limit, and so on.
- Workflow: an ordered set of aggregate operations.
Concept

The library describes the high-level interfaces `ISourceAPI`, `IDataSource`, `ICommand`, and `IAdapter`.

Any data source must be described by the `IDataSource` abstract class. These can be FileDataSource, CSVDataSource, DBDataSource, and others.
Usually, data sources have some kind of API. Databases provide SQL; when working with a file, you can read it line by line, etc. This API is described by the `ISourceAPI` class. Because different versions of the same data source may have different APIs, it is better to create a class for each version.
Generally, data source APIs are hidden behind convenient interfaces. The role of these interfaces is played by `ICommand` classes.

`IAdapter` classes transform the data stream like functions for the `Data.map` method. Essentially it's the same thing, but more flexible.
Thus, the native `ProviderDataSource` and the set of commands for it are described. This approach provides great opportunities for extension. You can easily create your own unique commands for ProviderDataSource, as well as entire DataSource classes.
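The way these interfaces fit together can be sketched as Python abstract base classes. Everything here except the four interface names is hypothetical, and the library's real signatures may differ:

```python
from abc import ABC, abstractmethod

class ISourceAPI(ABC):
    """Low-level API of a concrete data source (one class per API version)."""

class IDataSource(ABC):
    """A source of data; executes commands against its SourceAPI."""
    @abstractmethod
    def command(self, cmd: "ICommand"):
        ...

class ICommand(ABC):
    """User-friendly wrapper around SourceAPI calls."""
    @abstractmethod
    def handle(self, data_source: IDataSource):
        ...

class IAdapter(ABC):
    """Transforms a data stream, like a function passed to Data.map."""
    @abstractmethod
    def handle(self, record):
        ...

# Hypothetical in-memory source: the command pulls data through the source.
class ReadLines(ICommand):
    def handle(self, data_source):
        return data_source.lines

class ListDataSource(IDataSource):
    def __init__(self, lines):
        self.lines = lines
    def command(self, cmd):
        return cmd.handle(self)  # the source delegates to the command

src = ListDataSource(["a", "b"])
print(src.command(ReadLines()))  # ['a', 'b']
```

The key design point is the double dispatch: the user calls `data_source.command(cmd)`, and the command in turn uses the source's API, so new commands can be added without touching the source class.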
Stream operations
Furthermore, stream operations have two fundamental characteristics that make them very different from collection
operations: Pipelining and Internal iteration.
Pipelining
Many stream operations return a stream themselves. This allows operations to be chained to form a larger pipeline.
Internal iteration
In contrast to collections, which are iterated explicitly (external iteration), stream operations do the iteration behind the scenes for you. Note that this doesn't mean you cannot iterate the Data object yourself.
Data caching

The Data object provides the ability to use a cache. The cache works per Data object, that is, you choose which Data objects you want to save. A Data object's cache is saved after the first iteration, but the iteration source may differ.

If you don't use the cache, your source will be the data source you have in the Data object. But if you use the cache, your source can be the data source, a parent cache, or the object's own cache:

- The data source: if the Data object has neither a parent cache nor its own cache.
- A parent cache: if the Data object has a parent cache. It doesn't matter what position the parent cache has in the inheritance chain; the Data object understands whose cache it is and executes the part of the workflow that was not yet executed.
- Its own cache: if it is not the first iteration of this Data object.

Note that the cache state of the Data object is not inherited.
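The own-cache case can be sketched with a minimal Data-like wrapper that records elements on the first pass and replays them afterwards. This is an illustration only; the real Data object also handles parent caches and cache files:

```python
class CachingStream:
    """Runs the source on the first iteration, serves the cache afterwards."""
    def __init__(self, source_factory):
        self._source_factory = source_factory  # callable returning a fresh iterator
        self._cache = None

    def __iter__(self):
        if self._cache is not None:          # own cache: not the first iteration
            yield from self._cache
            return
        cache = []
        for item in self._source_factory():  # first iteration: hit the data source
            cache.append(item)
            yield item
        self._cache = cache                  # cache is saved after a full pass

calls = []
def expensive_source():
    calls.append(1)                          # count how often the source is hit
    return iter([1, 2, 3])

data = CachingStream(expensive_source)
assert list(data) == [1, 2, 3]
assert list(data) == [1, 2, 3]
print(len(calls))  # 1 -> the source was iterated only once
```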
Forced caching

You can tell DS to cache data to a specific cache file, which won't be deleted after the script ends:
```python
import datetime

from th2_data_services import Data
from th2_data_services.provider.v5.commands import http
from th2_data_services.provider.v5.data_source import HTTPProvider5DataSource

data_source = HTTPProvider5DataSource("http://HOST:PORT")

events: Data = data_source.command(
    http.GetEvents(
        start_timestamp=datetime.datetime.utcnow() - datetime.timedelta(minutes=5),
        end_timestamp=datetime.datetime.utcnow(),
        attached_messages=True,
        cache=True,
    )
)

# Persist the cache to a file that survives the script.
events.build_cache("my_cache.pickle")
```
Later, you can create a Data object from this cache file and use it as usual:
```python
from th2_data_services import Data

events = Data.from_cache_file("my_cache.pickle")

for event_id in events.filter(lambda x: x["eventType"] == "Verification").map(lambda x: x["eventId"]):
    print(event_id)
```
EventsTree and collections

EventsTree

EventsTree is a tree-based data structure of events. It allows you to get the children and parents of an event, display the tree, get the full path to an event, etc.

Details:

- EventsTree keeps all events in memory.
- To reduce memory usage, an EventsTreeCollection deletes the 'body' field from events, but you can keep it by specifying 'preserve_body'.
- The tree uses some important terms:
  - Ancestor: any relative of the event up the tree (grandparent, parent, etc.).
  - Parent: only the first relative of the event up the tree.
  - Child: the first relative of the event down the tree.
Take a look at the following HTML tree to understand them.

```html
<body> <!-- ancestor (grandparent), but not parent -->
    <div> <!-- parent & ancestor -->
        <p>Hello, world!</p> <!-- child -->
        <p>Goodbye!</p> <!-- sibling -->
    </div>
</body>
```
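With events stored as child-to-parent links, these terms map onto simple lookups. A sketch under that assumption (not the EventsTree API):

```python
parents = {          # child id -> parent id, mirroring the HTML tree above
    "p1": "div",
    "p2": "div",
    "div": "body",
    "body": None,    # root has no parent
}

def get_parent(event_id):
    """Parent: only the first relative up the tree."""
    return parents[event_id]

def get_ancestors(event_id):
    """Ancestors: all relatives up the tree (parent, grandparent, ...)."""
    chain = []
    current = parents[event_id]
    while current is not None:
        chain.append(current)
        current = parents[current]
    return chain

def get_children(event_id):
    """Children: only the first relatives down the tree."""
    return [child for child, parent in parents.items() if parent == event_id]

print(get_parent("p1"))     # div
print(get_ancestors("p1"))  # ['div', 'body']
print(get_children("div"))  # ['p1', 'p2']
```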
Collections

EventsTreeCollection is a collection of EventsTrees. The collection builds several EventsTrees from the passed Data object. Although you can change a tree directly, it's better to do it through the collection, because the collection is aware of `detached_events` and can resolve some event dependencies. The collection offers features similar to a single EventsTree, but applies them to all EventsTrees.

ParentEventsTreeCollection is a collection similar to EventsTreeCollection, but it contains only the parent events that are referenced in the data stream; these become the working data of the collection and its trees. The collection has features similar to EventsTreeCollection.
Details:

- The collection can recover events: all events that are not in the received data stream but are referenced will be loaded from the data source.
- If you haven't passed a DataSource object, event recovery will not occur.
- You can inspect `detached_events` to see which events are missing. It looks like `{parent_id: [events that reference it]}`.
- If you want, you can build parentless trees where the missing events are replaced with stubs. Just use `get_parentless_trees()`.
Requirements:

- Events must have `event_name`, `event_id`, and `parent_event_id` fields, which are described in the passed `event_struct` object.
Hints

- Remove all unnecessary fields from events before passing them to a collection to reduce memory usage.
- Use the `show()` method to print the tree in a tree-like view.
- Note that the `get_x` methods will raise an exception if you pass an unknown event id, unlike the `find_x` methods (they return None).
- To check whether a specified event exists, use the Python `in` keyword (e.g. `'event-id' in events_tree`).
- Use the Python `len` function to get the number of events in the tree.
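The `in` and `len` hints work because of Python's container protocol; a minimal tree-like class only needs `__contains__` and `__len__` to support them. A sketch, not the EventsTree implementation:

```python
class TinyTree:
    def __init__(self, events):
        self._events = {e["eventId"]: e for e in events}

    def __contains__(self, event_id):  # enables: 'event-id' in tree
        return event_id in self._events

    def __len__(self):                 # enables: len(tree)
        return len(self._events)

tree = TinyTree([{"eventId": "root"}, {"eventId": "child-1"}])
print("root" in tree, len(tree))  # True 2
```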
2.4. Links
3. API
If you are looking for class descriptions, see the API Documentation.
4. Examples
4.1. Notebooks
4.2. *.py