Developed by Yeison Nolberto Cardona Álvarez
Andrés Marino Álvarez Meza, PhD.
César Germán Castellanos Dominguez, PhD.
Digital Signal Processing and Control Group | Grupo de Control y Procesamiento Digital de Señales (GCPDS)
Universidad Nacional de Colombia sede Manizales
OpenBCI-Stream
High-level Python module for EEG/EMG/ECG acquisition and distributed streaming with the OpenBCI Cyton board.

OpenBCI-Stream comprises a set of scripts that handle the configuration of, and the connection to, the board. It is compatible with both connection modes supported by Cyton: RFduino (serial dongle) and Wi-Fi (with the OpenBCI Wi-Fi Shield). The drivers form a stand-alone library that can control the board from three different endpoints: (i) a Command-Line Interface (CLI) with simple instructions to configure, start, and stop data acquisition, debug the stream status, and register event markers; (ii) a Python module with high-level instructions and asynchronous acquisition; (iii) object proxying through Remote Python Call (RPyC) for distributed implementations, which lets a client manipulate the Python modules as if they were local. This last mode requires a daemon running on the remote host that listens for connections and executes the incoming instructions.
The main purpose of the drivers is to provide real-time, distributed access to the data flow, even in single-machine implementations. This is achieved with Kafka and its ability to create multiple topics for classifying the stream. The topics separate the neurophysiological data from the event markers, so clients can subscribe to a specific topic to inject or read content. It is therefore possible to implement an event register in a separate process that streams markers to all clients in real time without handling dense time-series data. Time synchronization remains a crucial issue: all system components in the network should share the same real-time protocol (RTP) server reference.
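The topic separation described above can be pictured with a small in-memory stand-in for a broker. This is only a sketch of the publish/subscribe idea; it is neither Kafka nor the OpenBCI-Stream API. The `TopicBus` class, its methods, and the payload fields are hypothetical names chosen for illustration.

```python
from collections import defaultdict
import time


class TopicBus:
    """Hypothetical in-memory stand-in for a topic-based broker."""

    def __init__(self):
        # topic name -> list of callbacks subscribed to that topic
        self._subscribers = defaultdict(list)

    def subscribe(self, topic, callback):
        self._subscribers[topic].append(callback)

    def publish(self, topic, value):
        # Deliver the message only to the subscribers of this topic.
        for callback in self._subscribers[topic]:
            callback({'topic': topic, 'value': value})


bus = TopicBus()
eeg_log, marker_log = [], []
bus.subscribe('eeg', eeg_log.append)
bus.subscribe('marker', marker_log.append)

# Dense time-series blocks go to one topic...
for block in range(3):
    bus.publish('eeg', {'samples': 250, 'timestamp': time.time()})

# ...while sparse event markers travel on another.
bus.publish('marker', {'marker': 'stimulus_on', 'timestamp': time.time()})

print(len(eeg_log), len(marker_log))  # 3 1
```

A client that only registers events subscribes to `'marker'` and never sees the `'eeg'` blocks, which is the property the paragraph above relies on.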
Main features
- Asynchronous acquisition: Acquisition and deserialization run in uninterrupted parallel processes, so the sampling rate stays as stable as possible.
- Distributed streaming system: The acquisition, processing, visualization, and any other system that needs to be fed with real-time EEG/EMG/ECG data can each run on their own architecture.
- Remote board handle: The same code syntax is used to develop for and debug Cyton boards connected to any node in the distributed system.
- Command-line interface: A simple interface to start, stop, and access the data stream directly from the command line.
- Markers/Events handler: Besides the marker boardmode available in Cyton, a stream channel for the reading and writing of markers is available for use in any development.
- Multiple boards: It is possible to use multiple OpenBCI boards just by adding multiple endpoints to the commands.
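The asynchronous acquisition feature listed above rests on decoupling the reader from the parser. A minimal sketch of that idea follows, using threads and a queue from the standard library rather than the separate processes the library itself describes. The packets are synthetic and laid out like the 33-byte Cyton binary format (header byte `0xA0`, sample counter, eight 24-bit channels, six auxiliary bytes, footer); every function name here is hypothetical and not part of OpenBCI-Stream.

```python
import queue
import threading

SENTINEL = None  # signals the end of the stream


def fake_packet(sample_number):
    """Build a synthetic 33-byte packet laid out like the Cyton binary format."""
    header = bytes([0xA0, sample_number % 256])
    # Eight channels, each a 24-bit signed big-endian integer.
    channels = b''.join(
        (sample_number * (ch + 1)).to_bytes(3, 'big', signed=True)
        for ch in range(8)
    )
    aux = bytes(6)
    footer = bytes([0xC0])
    return header + channels + aux + footer


def acquire(raw_queue, n_packets):
    """Producer: only builds and enqueues packets, so it never waits on parsing."""
    for n in range(n_packets):
        raw_queue.put(fake_packet(n))
    raw_queue.put(SENTINEL)


def deserialize(raw_queue, samples):
    """Consumer: turns raw packets into lists of eight channel counts."""
    while True:
        packet = raw_queue.get()
        if packet is SENTINEL:
            break
        body = packet[2:26]  # skip header and counter, drop aux and footer
        samples.append([
            int.from_bytes(body[i:i + 3], 'big', signed=True)
            for i in range(0, 24, 3)
        ])


raw_queue = queue.Queue()
samples = []
reader = threading.Thread(target=acquire, args=(raw_queue, 5))
parser = threading.Thread(target=deserialize, args=(raw_queue, samples))
reader.start()
parser.start()
reader.join()
parser.join()

print(len(samples), samples[2])  # 5 [2, 4, 6, 8, 10, 12, 14, 16]
```

Because the reader does nothing but enqueue, a slow parsing step delays the decoded output without stalling the read loop.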
Examples
# Serial acquisition (RFduino dongle): stream for 15 seconds
from openbci_stream.acquisition import Cyton

openbci = Cyton('serial', endpoint='/dev/ttyUSB0', capture_stream=True)
openbci.stream(15)

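# Wi-Fi acquisition (OpenBCI Wi-Fi Shield)
import time
from openbci_stream.acquisition import Cyton

openbci = Cyton('wifi', endpoint='192.68.1.113', capture_stream=True)
# Blocking call: stream for 15 seconds
openbci.stream(15)
# Non-blocking alternative: start, wait 15 seconds, then stop
openbci.start_stream()
time.sleep(15)
openbci.stop_stream()
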
# Remote board: the dongle is attached to the host given in `host`
from openbci_stream.acquisition import Cyton

openbci = Cyton('serial', endpoint='/dev/ttyUSB0', host='192.168.1.1', capture_stream=True)
openbci.stream(15)

# Consume an existing stream and stop after ten messages
from openbci_stream.acquisition import OpenBCIConsumer

with OpenBCIConsumer() as stream:
    for i, message in enumerate(stream):
        if message.topic == 'eeg':
            print(f"received {message.value['samples']} samples")
        if i == 9:
            break

# Start the acquisition and consume it in the same context manager
from openbci_stream.acquisition import OpenBCIConsumer

with OpenBCIConsumer(mode='serial', endpoint='/dev/ttyUSB0', streaming_package_size=250) as (stream, openbci):
    for i, message in enumerate(stream):
        if message.topic == 'eeg':
            print(f"{i}: received {message.value['samples']} samples")
        if i == 9:
            break

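With `streaming_package_size=250`, as in the example above, each message presumably carries a block of samples rather than a single one, so the message rate depends on the board's sampling rate. A quick back-of-the-envelope check follows, assuming that the package size is counted in samples and that the board runs at the Cyton default of 250 samples per second. The helper `package_duration` is hypothetical and not part of the library.

```python
def package_duration(package_size, sample_rate):
    """Seconds of signal contained in one package of `package_size` samples."""
    return package_size / sample_rate


# Assumed sampling rate: 250 Hz.
print(package_duration(250, 250))       # 1.0
print(package_duration(100, 250))       # 0.4
print(10 * package_duration(250, 250))  # 10.0
```

Under these assumptions, a loop that stops after ten packages of 250 samples would cover roughly ten seconds of signal.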
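# Multiple boards: pass a list of endpoints
import time
from openbci_stream.acquisition import Cyton

openbci = Cyton('wifi', endpoint=['192.68.1.113', '192.68.1.185'], capture_stream=True)
# Blocking call: stream for 15 seconds
openbci.stream(15)
# Non-blocking alternative: start, wait 15 seconds, then stop
openbci.start_stream()
time.sleep(15)
openbci.stop_stream()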