pycluon

A python wrapper around libcluon

0.2.3
pycluon

A Python wrapper around libcluon. pycluon aims to follow the libcluon API as closely as possible to avoid surprises.

So far, pycluon wraps the following concepts from libcluon:

  • Envelope
  • OD4Session
  • UDPSender
  • UDPReceiver
  • TCPConnection
  • TCPServer
  • SharedMemory

It also bundles the following command-line applications:

  • protoc
  • cluon-msc
  • cluon-OD4toStdout
  • cluon-OD4toJSON
  • cluon-LCMtoJSON
  • cluon-filter
  • cluon-livefeed
  • cluon-rec2csv
  • cluon-replay
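
To check which of the bundled applications are reachable from the current environment, a minimal sketch along the following lines may help. It assumes that installing pycluon places the executables on `PATH`, which the text above does not state explicitly; `locate_bundled_tools` is a hypothetical helper name, not part of pycluon.

```python
import shutil

# Tool names copied from the list of bundled command-line applications.
BUNDLED_TOOLS = [
    "protoc",
    "cluon-msc",
    "cluon-OD4toStdout",
    "cluon-OD4toJSON",
    "cluon-LCMtoJSON",
    "cluon-filter",
    "cluon-livefeed",
    "cluon-rec2csv",
    "cluon-replay",
]


def locate_bundled_tools(names=BUNDLED_TOOLS):
    """Map each tool name to its resolved path, or None if it is not on PATH.

    Hypothetical helper: it assumes the tools are installed as executables
    on PATH, which is an assumption rather than documented behaviour.
    """
    return {name: shutil.which(name) for name in names}


if __name__ == "__main__":
    for name, path in locate_bundled_tools().items():
        print(f"{name:20} {path or 'not found on PATH'}")
```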

Versioning

pycluon version   libcluon version   python versions
0.1.0             0.0.140            3.7, 3.8, 3.9, 3.10
0.1.1             0.0.140            3.7, 3.8, 3.9, 3.10
0.1.2             0.0.140            3.7, 3.8, 3.9, 3.10
0.1.3             0.0.140            3.7, 3.8, 3.9, 3.10
0.2.0             0.0.140            3.7, 3.8, 3.9, 3.10
0.2.1             0.0.140            3.7, 3.8, 3.9, 3.10, 3.11
0.2.2             0.0.145            3.7, 3.8, 3.9, 3.10, 3.11
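
The versioning table can also be expressed as a small lookup, for example to check an interpreter against a pinned pycluon release before installing. This is only a sketch: the data is copied from the table, while `COMPATIBILITY` and `supports` are hypothetical names and not part of pycluon.

```python
import sys

# Rows copied from the versioning table:
# pycluon version -> (libcluon version, supported Python versions).
_PY_37_TO_310 = ((3, 7), (3, 8), (3, 9), (3, 10))
_PY_37_TO_311 = _PY_37_TO_310 + ((3, 11),)

COMPATIBILITY = {
    "0.1.0": ("0.0.140", _PY_37_TO_310),
    "0.1.1": ("0.0.140", _PY_37_TO_310),
    "0.1.2": ("0.0.140", _PY_37_TO_310),
    "0.1.3": ("0.0.140", _PY_37_TO_310),
    "0.2.0": ("0.0.140", _PY_37_TO_310),
    "0.2.1": ("0.0.140", _PY_37_TO_311),
    "0.2.2": ("0.0.145", _PY_37_TO_311),
}


def supports(pycluon_version, python_version=None):
    """Return True if the table lists python_version for pycluon_version.

    python_version is a (major, minor) pair and defaults to the running
    interpreter. Versions missing from the table raise KeyError.
    """
    if python_version is None:
        python_version = sys.version_info[:2]
    _, pythons = COMPATIBILITY[pycluon_version]
    return tuple(python_version) in pythons
```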

Installation

pycluon is available on PyPI:

pip install pycluon

Examples

Import an odvd specification into a python module

from pycluon.importer import import_odvd

my_module = import_odvd("/path/to/my/odvd/specification.odvd")

Send an envelope

import time
from pycluon import OD4Session, Envelope

session = OD4Session(111)

# my_module is the module returned by import_odvd in the previous example
message = my_module.MyMessage()

envelope = Envelope()
envelope.sent = envelope.sampled = time.time()
envelope.serialized_data = message.SerializeToString()
envelope.data_type = 13
envelope.sender_stamp = 13

session.send(envelope)
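
When several messages are sent, the envelope setup above can be folded into one function. The sketch below is a hypothetical helper (`fill_envelope` is not part of pycluon) that sets only the attributes used in the example and works on any object that accepts them. The `now` parameter is an assumption added to make the timestamp injectable.

```python
import time


def fill_envelope(envelope, message, data_type, sender_stamp=0, now=None):
    """Populate an envelope-like object with a serialized message.

    Hypothetical helper: it sets only the attributes used in the example
    above (sent, sampled, serialized_data, data_type, sender_stamp).
    """
    # Use the caller-supplied timestamp if given, else the current time.
    timestamp = time.time() if now is None else now
    envelope.sent = envelope.sampled = timestamp
    envelope.serialized_data = message.SerializeToString()
    envelope.data_type = data_type
    envelope.sender_stamp = sender_stamp
    return envelope
```

With this in place, the example reduces to `session.send(fill_envelope(Envelope(), my_module.MyMessage(), 13, 13))`.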

Receive an envelope

import time
from pycluon import OD4Session

session = OD4Session(111)

def callback(envelope):
    # my_module is the module returned by import_odvd in the first example
    message = my_module.MyMessage()
    message.ParseFromString(envelope.serialized_data)
    print(f"Received at {envelope.received} seconds since epoch")

session.add_data_trigger(13, callback)

while session.is_running():
    time.sleep(0.01)
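
If the callback may run on a thread other than the main one (an assumption; the text above does not say which thread invokes it), one common pattern is to keep the callback minimal and hand envelopes to the main loop through a queue. The sketch below uses only the standard library; `make_queueing_callback` and `drain` are hypothetical names.

```python
import queue


def make_queueing_callback(inbox):
    """Return a callback that only enqueues the envelope it is given."""
    def callback(envelope):
        inbox.put(envelope)
    return callback


def drain(inbox, handle, timeout=0.01):
    """Pass queued envelopes to `handle` until the queue stays empty.

    Returns the number of envelopes handled once no new item arrives
    within `timeout` seconds.
    """
    handled = 0
    while True:
        try:
            envelope = inbox.get(timeout=timeout)
        except queue.Empty:
            return handled
        handle(envelope)
        handled += 1
```

In the example above, this would mean registering `make_queueing_callback(inbox)` with `session.add_data_trigger(13, ...)` and calling `drain(inbox, handle)` inside the `while session.is_running():` loop in place of `time.sleep(0.01)`.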

Write to a shared memory area

from datetime import datetime
from pycluon import SharedMemory

sm = SharedMemory("frame.argb", 640*480)

sm.lock()
sm.timestamp = datetime.now()
sm.data = b"<bytes>"
sm.unlock()
sm.notify_all()
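
If the code between `lock()` and `unlock()` raises, the lock stays held. A small context manager is one way to guard against that. This is a sketch under the assumption that `lock()` and `unlock()` behave as in the example above; `locked` is a hypothetical helper and not part of pycluon.

```python
from contextlib import contextmanager


@contextmanager
def locked(shared_memory):
    """Hold the shared memory lock for the duration of a `with` block."""
    shared_memory.lock()
    try:
        yield shared_memory
    finally:
        # Runs even if the body raises, so the lock is always released.
        shared_memory.unlock()


# Usage with the example above (sm and datetime as defined there):
#
#     with locked(sm):
#         sm.timestamp = datetime.now()
#         sm.data = b"<bytes>"
#     sm.notify_all()
```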

Read from an existing shared memory area

from pycluon import SharedMemory

sm = SharedMemory("frame.argb")

sm.wait() # Wait for notification from writing process
sm.lock()
print(sm.timestamp)
print(sm.data)
sm.unlock()

See the tests for usage of UDPSender, UDPReceiver, TCPConnection and TCPServer.
