DataMux
DataMux is a library to stream data into real-time analytics pipelines.
It provides three modes:
- Proxy Mode: to interface and proxy live data from sensors
- Replay Mode: to replay stored data from datasets
- Simulate Mode: to stream guided/unguided mock data for testing
Installation
First, install datamux as a pip package.
pip install streaminghub-datamux
Initialization
Next, configure where datamux should look for data and metadata. We use $HOME/streaminghub
by default. This configuration will be stored at $HOME/.streaminghubrc.
python -m streaminghub_datamux init --data_dir="$HOME/streaminghub" --meta_dir="$HOME/streaminghub"
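Before running init, it can help to see what the defaults resolve to on your machine. The sketch below only computes the default locations named above and reports whether they exist. The helper name `default_locations` is hypothetical and not part of datamux, and nothing is assumed about the contents of the rc file.

```python
from pathlib import Path


def default_locations(home: Path) -> dict[str, Path]:
    """Return the default datamux locations described above (hypothetical helper)."""
    base = home / "streaminghub"
    return {
        "data_dir": base,   # default passed to --data_dir
        "meta_dir": base,   # default passed to --meta_dir
        "rc_file": home / ".streaminghubrc",
    }


# Report which of the default locations already exist for the current user.
for name, path in default_locations(Path.home()).items():
    status = "exists" if path.exists() else "missing"
    print(f"{name}: {path} ({status})")
```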
Usage
Import and Setup
In your Python script, first import datamux as follows.
import streaminghub_datamux as dm
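Next, instantiate the DataMux API. Here, you have two options: use the API directly in your script, or connect to a remote API server (see Start a Remote API).
# Option 1: local API
api = dm.API()
# Option 2: remote API, connected to a running server
api = dm.RemoteAPI(rpc_name="<rpc>", codec_name="<codec>")
await api.connect(server_host="<host>", server_port=<port>)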
Replay Recordings from Collections
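import asyncio

# List the available collections, then the streams in one collection
collections = await api.list_collections()
streams = await api.list_collection_streams(collection_id="<id>")
# Attributes that select which recording to replay
attrs = {"subject": "A", "session": "1", "task": "1"}
sink = asyncio.Queue()
# Arguments: collection id, stream id, attributes, sink
ack = await api.replay_collection_stream("<id>", "<id>", attrs, sink)
assert ack.randseq is not None
# Consume items until the end-of-stream marker (util.END_OF_STREAM) arrives
while True:
    item = await sink.get()
    if item == util.END_OF_STREAM:
        break
await api.stop_task(ack.randseq)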
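The consume loop above can be tried without any dataset. The following is a minimal sketch that swaps the datamux API for a mock producer: `mock_replay`, `drain`, and the local `END_OF_STREAM` object are hypothetical stand-ins for illustration, not datamux names.

```python
import asyncio

# Stand-in for util.END_OF_STREAM; the real marker comes from datamux.
END_OF_STREAM = object()


async def mock_replay(sink: asyncio.Queue, n: int) -> None:
    """Hypothetical producer: pushes n mock records, then the end marker."""
    for i in range(n):
        await sink.put({"index": i, "value": i * 0.5})
    await sink.put(END_OF_STREAM)


async def drain(sink: asyncio.Queue) -> list:
    """Collect items from the sink until the end-of-stream marker arrives."""
    items = []
    while True:
        item = await sink.get()
        if item == END_OF_STREAM:
            break
        items.append(item)
    return items


async def main() -> list:
    sink = asyncio.Queue()
    # Run the producer concurrently, as a replay task would.
    producer = asyncio.create_task(mock_replay(sink, 3))
    items = await drain(sink)
    await producer
    return items


received = asyncio.run(main())
print(len(received))
```

Collecting items into a list is only for demonstration; a real pipeline would more likely process each item as it arrives.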
Proxy Live Streams from Devices
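import asyncio

# List the available live nodes, then the streams on one node
nodes = await api.list_live_nodes()
streams = await api.list_live_streams(node_id="<id>")
# Attributes to attach to the proxied stream
attrs = {"subject": "A", "session": "1", "task": "1"}
sink = asyncio.Queue()
# Arguments: node id, stream id, attributes, sink
ack = await api.proxy_live_stream("<id>", "<id>", attrs, sink)
assert ack.randseq is not None
# Consume items until the end-of-stream marker (util.END_OF_STREAM) arrives
while True:
    item = await sink.get()
    if item == util.END_OF_STREAM:
        break
await api.stop_task(ack.randseq)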
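A live device may go quiet without ever sending the end-of-stream marker, in which case the loop above waits indefinitely. One possible guard, sketched here with a mock queue rather than a real device, is to wrap each read in `asyncio.wait_for`. The names `read_with_timeout` and `demo`, the local `END_OF_STREAM` object, and the 0.2 second timeout are illustrative assumptions, not part of datamux.

```python
import asyncio

# Stand-in for util.END_OF_STREAM; the real marker comes from datamux.
END_OF_STREAM = object()


async def read_with_timeout(sink: asyncio.Queue, timeout: float) -> list:
    """Drain the sink, stopping early if no item arrives within `timeout` seconds."""
    items = []
    while True:
        try:
            item = await asyncio.wait_for(sink.get(), timeout=timeout)
        except asyncio.TimeoutError:
            break  # the source stalled; stop waiting
        if item == END_OF_STREAM:
            break
        items.append(item)
    return items


async def demo() -> list:
    sink = asyncio.Queue()
    # Mock device: two samples, then silence (no end marker is ever sent).
    await sink.put({"t": 0.0, "x": 1})
    await sink.put({"t": 0.1, "x": 2})
    return await read_with_timeout(sink, timeout=0.2)


samples = asyncio.run(demo())
print(samples)
```

After a timeout, the task would still need to be stopped with `api.stop_task(ack.randseq)`, as in the snippet above.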
Start a Remote API
You can start a remote API using the command below.
python -m streaminghub_datamux serve -H "<host_name>" -p <port> -r <rpc_name> -c <codec_name>
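If you prefer to launch the server from a Python script, the same command can be assembled as an argument list. This is a sketch using only the flags shown above; `build_serve_command` is a hypothetical helper, and the host, port, rpc, and codec values are placeholders to replace with your own.

```python
import sys


def build_serve_command(host: str, port: int, rpc_name: str, codec_name: str) -> list[str]:
    """Assemble the argv list for the serve command shown above (hypothetical helper)."""
    return [
        sys.executable, "-m", "streaminghub_datamux", "serve",
        "-H", host,
        "-p", str(port),
        "-r", rpc_name,
        "-c", codec_name,
    ]


# Placeholder values; valid rpc and codec names depend on your datamux setup.
cmd = build_serve_command("localhost", 3300, "<rpc_name>", "<codec_name>")
print(" ".join(cmd))
# To actually launch the server: subprocess.run(cmd, check=True)
```

Using `sys.executable` keeps the server in the same Python environment as the calling script.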
For Developers
To work on datamux itself, clone the repository and install the package in editable mode.
git clone https://github.com/nirdslab/streaminghub.git
cd streaminghub/
python -m pip install -e streaminghub_datamux/