Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

pycluon

Package Overview
Dependencies
Maintainers
1
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

pycluon

A python wrapper around libcluon

  • 0.2.3
  • PyPI
  • Socket score

Maintainers
1

pycluon

A python wrapper around libcluon. pycluon aims to follow the API of libcluon as closely as possible to avoid surprises.

So far, pycluon wraps the following concepts from libcluon:

  • Envelope
  • OD4Session
  • UDPSender
  • UDPReceiver
  • TCPConnection
  • TCPServer
  • SharedMemory

It also bundles the following command-line applications:

  • protoc
  • cluon-msc
  • cluon-OD4toStdout
  • cluon-OD4toJSON
  • cluon-LCMtoJSON
  • cluon-filter
  • cluon-livefeed
  • cluon-rec2csv
  • cluon-replay

Versioning

pycluon versionlibcluon versionpython versions
0.1.00.0.1403.7, 3.8, 3.9, 3.10
0.1.10.0.1403.7, 3.8, 3.9, 3.10
0.1.20.0.1403.7, 3.8, 3.9, 3.10
0.1.30.0.1403.7, 3.8, 3.9, 3.10
0.2.00.0.1403.7, 3.8, 3.9, 3.10
0.2.10.0.1403.7, 3.8, 3.9, 3.10, 3.11
0.2.20.0.1453.7, 3.8, 3.9, 3.10, 3.11

Installation

pycluon is available on PyPI:

pip install pycluon

Examples

Import an odvd specification into a python module

from pycluon.importer import import_odvd

my_module = import_odvd("/path/to/my/odvd/specification.odvd")

Send an envelope

import time
from pycluon import OD4Session, Envelope

session = OD4Session(111)

message = my_module.MyMessage()

envelope = Envelope()
envelope.sent = envelope.sampled = time.time()
envelope.serialized_data = message.SerializeToString()
envelope.data_type = 13
envelope.sender_stamp = 13

session.send(envelope)

Receive an envelope

import time
from pycluon import OD4Session

session = OD4Session(111)

def callback(envelope):
    message = my_module.MyMessage()
    message.ParseFromString(envelope.serialized_data)
    print(f"Received at {envelope.received} seconds since epoch")

session.add_data_trigger(13, callback)

while session.is_running():
    time.sleep(0.01)

Write to a shared memory area

from datetime import datetime
from pycluon import SharedMemory

sm = SharedMemory("frame.argb", 640*480)

sm.lock()
sm.timestamp = datetime.now()
sm.data = b"<bytes>"
sm.unlock()
sm.notify_all()

Read from an existing shared memory area

from pycluon import SharedMemory

sm = SharedMemory("frame.argb")

sm.wait() # Wait for notification from writing process
sm.lock()
print(sm.timestamp)
print(sm.data)
sm.unlock()

See the tests for usage of UDPSender, UDPReceiver, TCPConnection and TCPServer.

FAQs


Did you know?

Socket

Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.

Install

Related posts

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc