netaio

Simple asyncio TCP client and server and UDP node library inspired by fastapi

0.0.8
netaio

netaio is a simple, easy-to-use asyncio-based TCP client and server and UDP node library, inspired by fastapi but intended for non-HTTP use cases.

Status

This is currently a work-in-progress. Remaining work before the v0.1.0 release:

  • Authorization plugin system
  • Cipher plugin system
  • Optional authorization plugin using HMAC
  • Optional cipher plugin using simple symmetric stream cipher
  • UDP node with multicast
  • Automatic peer advertisement/discovery/management for UDP node
  • Error/errored message handling system
  • Optional authorization plugin using tapescript
  • Optional cipher plugin using Curve25519 asymmetric encryption
  • Optional authorization plugin using Hashcash/PoW for anti-spam DoS protection
  • Ephemeral handlers (i.e. handlers that are removed after first use)
  • IPv6 support
  • Core daemon to proxy traffic for local apps
  • E2e encrypted chat app example

Issues are tracked here.

Usage

Install with pip install netaio. To use the optional asymmetric cryptography plugins, install with pip install netaio[asymmetric].

Brief examples are shown below. For more documentation, see the dox.md file generated by autodox.

TCPServer

from netaio import TCPServer, Body, Message, MessageType, HMACAuthPlugin
import asyncio


server = TCPServer(port=8888, auth_plugin=HMACAuthPlugin(config={"secret": "test"}))

@server.on((MessageType.REQUEST_URI, b'something'))
async def something(msg: Message, writer: asyncio.StreamWriter):
    body = Body.prepare(b'This is it.', uri=b'something')
    return Message.prepare(body, MessageType.RESPOND_URI)

@server.on(MessageType.SUBSCRIBE_URI)
async def subscribe(msg: Message, writer: asyncio.StreamWriter):
    server.subscribe(msg.body.uri, writer)
    return Message.prepare(Body.prepare(b'', uri=msg.body.uri), MessageType.CONFIRM_SUBSCRIBE)

@server.on(MessageType.UNSUBSCRIBE_URI)
async def unsubscribe(msg: Message, writer: asyncio.StreamWriter):
    server.unsubscribe(msg.body.uri, writer)
    return Message.prepare(Body.prepare(b'', uri=msg.body.uri), MessageType.CONFIRM_UNSUBSCRIBE)

asyncio.run(server.start())

TCPClient

from netaio import TCPClient, Body, Message, MessageType, HMACAuthPlugin
import asyncio


client = TCPClient("127.0.0.1", 8888, auth_plugin=HMACAuthPlugin(config={"secret": "test"}))
received_resources = {}

@client.on(MessageType.RESPOND_URI)
def echo(msg: Message, writer: asyncio.StreamWriter):
    received_resources[msg.body.uri] = msg.body.content

async def run_client():
    request_body = Body.prepare(b'pls gibs me dat', uri=b'something')
    request_message = Message.prepare(request_body, MessageType.REQUEST_URI)
    await client.connect()
    await client.send(request_message)
    await client.receive_once()

asyncio.run(run_client())

print(received_resources)

UDPNode

from netaio import UDPNode, Peer, Body, Message, MessageType, HMACAuthPlugin
from os import urandom
import asyncio

local_peer = Peer(
    addrs={('127.0.0.1', 8888)},
    id=urandom(16),
    data=b''
)

echo_node = UDPNode(
    local_peer=local_peer,
    auth_plugin=HMACAuthPlugin(config={"secret": "test"})
)

@echo_node.on(MessageType.REQUEST_URI)
def request_uri(msg: Message, addr: tuple[str, int]):
    echo_node.logger.info("Sending echo to %s...", addr)
    return Message.prepare(msg.body, MessageType.OK)

@echo_node.on(MessageType.OK)
def echo(msg: Message, addr: tuple[str, int]):
    echo_node.logger.info("Received echo from %s.", addr)

echo_msg = Message.prepare(Body.prepare(b'echo'), MessageType.REQUEST_URI)

async def main(local_addr: tuple[str, int], remote_addr: tuple[str, int]|None = None):
    echo_node.interface = local_addr[0]
    echo_node.port = local_addr[1]
    await echo_node.start()
    await echo_node.manage_peers_automatically(advertise_every=1, peer_timeout=3)
    while True:
        await asyncio.sleep(1)
        if remote_addr:
            echo_node.logger.info("Sending message to %s...", remote_addr)
            echo_node.send(echo_msg, remote_addr)
        else:
            if len(echo_node.peers) > 0:
                echo_node.logger.info("Broadcasting message to all known peers...")
                echo_node.broadcast(echo_msg)
            else:
                echo_node.logger.info("No peers known, waiting to discover peers...")

local_addr = ("0.0.0.0", 8888)
remote_addr = None
asyncio.run(main(local_addr, remote_addr))

To run this example on a single machine, the second node instance must use a different port, e.g. local_addr = ("127.0.0.1", 8889), and its remote address must be set to the first node's address, e.g. remote_addr = ("127.0.0.1", 8888). Multicast will not work locally because of the different ports. If the interface is set to "0.0.0.0", multicast will work across the LAN, but the node will also hear its own multicast messages, so a handler that should not react to them needs to ignore messages from the local machine; the request_uri handler shown above does not filter them.

(It is technically possible to get multicast to work in one direction on a single machine by changing the .port property after one has started.)

Note also that when a peer is removed from the node's peer list, it is also unsubscribed from all URIs.
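The relationship between peer removal and subscriptions can be pictured with a small registry. This is only an illustrative sketch using the standard library: the Subscriptions class and its method names are hypothetical and are not netaio's internal data structures.

```python
from collections import defaultdict


class Subscriptions:
    """Hypothetical registry; not netaio's internal data structure."""

    def __init__(self) -> None:
        # Maps each URI to the set of peer ids subscribed to it.
        self._by_uri: dict[bytes, set] = defaultdict(set)

    def subscribe(self, uri: bytes, peer_id: bytes) -> None:
        self._by_uri[uri].add(peer_id)

    def unsubscribe(self, uri: bytes, peer_id: bytes) -> None:
        self._by_uri[uri].discard(peer_id)

    def subscribers(self, uri: bytes) -> set:
        # Return a copy so callers cannot mutate the registry.
        return set(self._by_uri.get(uri, ()))

    def remove_peer(self, peer_id: bytes) -> None:
        """Drops the peer from every URI it was subscribed to."""
        for members in self._by_uri.values():
            members.discard(peer_id)


subs = Subscriptions()
subs.subscribe(b"news", b"peer-a")
subs.subscribe(b"chat", b"peer-a")
subs.subscribe(b"chat", b"peer-b")
subs.remove_peer(b"peer-a")  # peer-a leaves both URIs at once
print(subs.subscribers(b"news"), subs.subscribers(b"chat"))
```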

Plugin System

The plugin system is designed to be simple and easy to understand. Each plugin implements a specific protocol, and the TCPServer, TCPClient, and UDPNode classes automatically apply plugins to messages in the correct order to preserve the encapsulation model (see below).

There are three types of plugins defined with typing.Protocol interfaces:

  • AuthPluginProtocol
  • CipherPluginProtocol
  • PeerPluginProtocol
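As a reminder of how typing.Protocol interfaces behave in general, the sketch below defines a hypothetical CipherLike protocol and a toy class that satisfies it structurally, without inheriting from it. The names CipherLike and XorCipher and the method signatures are assumptions for illustration; the real signatures are defined by netaio's CipherPluginProtocol.

```python
from typing import Protocol, runtime_checkable


@runtime_checkable
class CipherLike(Protocol):
    """Hypothetical stand-in; not netaio's CipherPluginProtocol."""

    def encrypt(self, data: bytes) -> bytes: ...
    def decrypt(self, data: bytes) -> bytes: ...


class XorCipher:
    """Toy cipher (not secure); satisfies CipherLike without subclassing it."""

    def __init__(self, config: dict):
        self.key = config["key"]  # a single byte value, 0-255

    def encrypt(self, data: bytes) -> bytes:
        return bytes(b ^ self.key for b in data)

    def decrypt(self, data: bytes) -> bytes:
        # XOR is its own inverse.
        return self.encrypt(data)


plugin = XorCipher({"key": 0x5A})
# runtime_checkable protocols only check that the methods exist.
print(isinstance(plugin, CipherLike))
print(plugin.decrypt(plugin.encrypt(b"hello")))
```

Because the check is structural, any class with matching methods can be used where the protocol is expected, which is what allows custom plugins to be written without importing a base class.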

Authentication/Authorization

TCPServer, TCPClient, and UDPNode support an optional authentication/authorization plugin. Each plugin is instantiated with a dict of configuration parameters and must implement the AuthPluginProtocol. Once instantiated, it can be passed to the TCPServer, TCPClient, and UDPNode constructors or set on the instances themselves. An auth plugin can also be set on a per-handler basis by passing the plugin as a second argument (or as the keyword argument auth_plugin) to the on or add_handler method.

Currently, if an auth plugin is set both on the instance and per-handler, both will be checked before the handler function is called, and both will be applied to the response body. The per-handler plugin is able to overwrite any auth fields set by the instance plugin, which may break communication, so each plugin instance should be configured to use its own writeable auth data fields.

Currently, netaio includes an HMACAuthPlugin that can be used by the server and client to authenticate and authorize requests. This uses a shared secret to generate and check HMACs over message bodies.
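The general idea of authenticating a message body with a shared-secret HMAC can be sketched with the standard library alone. This is not HMACAuthPlugin's implementation: the functions make_auth and check_auth, the max_age parameter, and the choice to MAC the nonce, timestamp, and body together are assumptions for illustration. Only the field names hmac, nonce, and ts mirror the plugin's documented defaults.

```python
import hashlib
import hmac
import os
import time

SECRET = b"test"  # shared secret, as in the examples above


def _tag(secret: bytes, nonce: bytes, ts: int, body: bytes) -> bytes:
    # Assumed layout: MAC over nonce || timestamp || body.
    data = nonce + ts.to_bytes(8, "big") + body
    return hmac.new(secret, data, hashlib.sha256).digest()


def make_auth(auth_data: dict, body: bytes, secret: bytes = SECRET) -> None:
    """Adds nonce, timestamp, and HMAC fields to auth_data in place."""
    auth_data["nonce"] = os.urandom(16)
    auth_data["ts"] = int(time.time())
    auth_data["hmac"] = _tag(secret, auth_data["nonce"], auth_data["ts"], body)


def check_auth(auth_data: dict, body: bytes, secret: bytes = SECRET,
               max_age: int = 30) -> bool:
    """Returns True if the HMAC verifies and the timestamp is recent."""
    try:
        nonce, ts, tag = auth_data["nonce"], auth_data["ts"], auth_data["hmac"]
    except KeyError:
        return False
    if abs(int(time.time()) - ts) > max_age:
        return False
    # compare_digest avoids leaking information through timing.
    return hmac.compare_digest(_tag(secret, nonce, ts, body), tag)


auth = {}
make_auth(auth, b"This is it.")
print(check_auth(auth, b"This is it."))      # untampered body
print(check_auth(auth, b"This is not it."))  # altered body
```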

The TapescriptAuthPlugin is included in the optional netaio.asymmetric submodule, which requires tapescript as a dependency and allows for customizable authentication/authorization models using the tapescript DSL for access controls.

Example of additional auth layer

from netaio import TCPServer, TCPClient, HMACAuthPlugin, MessageType, Body, Message
import asyncio

outer_auth_plugin = HMACAuthPlugin(config={"secret": "test"})
inner_auth_plugin = HMACAuthPlugin(config={"secret": "tset", "hmac_field": "camh"})
server = TCPServer(port=8888, auth_plugin=outer_auth_plugin)
client = TCPClient(host="127.0.0.1", port=8888, auth_plugin=outer_auth_plugin)

@server.on(MessageType.CREATE_URI, auth_plugin=inner_auth_plugin)
async def put_uri(msg: Message, writer: asyncio.StreamWriter):
    body = Body.prepare(b'Resource saved.', uri=msg.body.uri)
    return Message.prepare(body, MessageType.OK)

async def main():
    task = asyncio.create_task(server.start())
    await asyncio.sleep(0.1)
    await client.connect()
    await client.send(
        Message.prepare(Body.prepare(b'test'), MessageType.CREATE_URI),
        auth_plugin=inner_auth_plugin
    )
    result = await client.receive_once()
    await client.close()
    task.cancel()
    return result

response = asyncio.run(main())
print(response)

Cipher (encryption/decryption)

TCPServer, TCPClient, and UDPNode support an optional cipher plugin. Each plugin is instantiated with a dict of configuration parameters and must implement the CipherPluginProtocol. Once instantiated, it can be passed to the TCPServer, TCPClient, and UDPNode constructors or set on the instances themselves. A cipher plugin can also be set on a per-handler basis by passing the plugin as a third argument (or as the keyword argument cipher_plugin) to the on or add_handler method. If a cipher plugin is set both on the instance and per-handler, both will be applied to the message.

Currently, netaio includes a Sha256StreamCipherPlugin that can be used by the server and client to encrypt and decrypt messages using a simple symmetric stream cipher. This uses a shared secret key and per-message IVs. Note that the encrypt_uri config option should be False to prevent the URI from being encrypted when using this as an additional, inner layer of encryption, else the URI will not be usable for routing requests/determining responses when the default key extractor is used (i.e. handlers set on the tuple of MessageType and URI).
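For readers unfamiliar with hash-based stream ciphers, the following is a generic sketch of the technique: a keystream is derived by hashing the key, a per-message IV, and a block counter, then XORed with the data. The exact construction used by Sha256StreamCipherPlugin is not described here, so the functions keystream, encrypt, and decrypt below are assumptions for illustration and are not the plugin's algorithm.

```python
import hashlib
import os


def keystream(key: bytes, iv: bytes, length: int) -> bytes:
    """Derives `length` bytes by hashing key, IV, and a block counter."""
    out = b""
    counter = 0
    while len(out) < length:
        out += hashlib.sha256(key + iv + counter.to_bytes(8, "big")).digest()
        counter += 1
    return out[:length]


def encrypt(key: bytes, plaintext: bytes) -> tuple[bytes, bytes]:
    """Returns (iv, ciphertext); a fresh random IV is used per message."""
    iv = os.urandom(16)
    ks = keystream(key, iv, len(plaintext))
    return iv, bytes(p ^ k for p, k in zip(plaintext, ks))


def decrypt(key: bytes, iv: bytes, ciphertext: bytes) -> bytes:
    """Regenerates the same keystream and XORs it back out."""
    ks = keystream(key, iv, len(ciphertext))
    return bytes(c ^ k for c, k in zip(ciphertext, ks))


iv, ct = encrypt(b"test", b"Super secret data.")
print(decrypt(b"test", iv, ct))
```

A construction like this provides confidentiality only; it does not detect tampering, which is one reason to pair a cipher layer with an auth layer.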

The X25519CipherPlugin is included in the optional netaio.asymmetric submodule, which requires PyNaCl as a dependency and allows for asymmetric encryption using ECDHE (Elliptic Curve Diffie-Hellman Ephemeral). Note that this plugin should be used as an inner layer of encryption with automatic peer management and local peer data including {'pubkey': SigningKey(seed).verify_key}.

Example of additional encryption layer

from netaio import TCPServer, TCPClient, Sha256StreamCipherPlugin, MessageType, Body, Message
import asyncio

outer_cipher_plugin = Sha256StreamCipherPlugin(config={"key": "test"})
inner_cipher_plugin = Sha256StreamCipherPlugin(config={"key": "tset", "iv_field": "iv2"})
server = TCPServer(port=8888, cipher_plugin=outer_cipher_plugin)
client = TCPClient(host="127.0.0.1", port=8888, cipher_plugin=outer_cipher_plugin)

@server.on(MessageType.REQUEST_URI, cipher_plugin=inner_cipher_plugin)
async def request_uri(msg: Message, writer: asyncio.StreamWriter):
    body = Body.prepare(b'Super secret data.', uri=msg.body.uri)
    return Message.prepare(body, MessageType.RESPOND_URI)

async def main():
    task = asyncio.create_task(server.start())
    await asyncio.sleep(0.1)
    await client.connect()
    await client.send(
        Message.prepare(
            Body.prepare(b'psst gimme the secret', uri=b'something'),
            MessageType.REQUEST_URI
        ),
        cipher_plugin=inner_cipher_plugin
    )
    result = await client.receive_once(cipher_plugin=inner_cipher_plugin)
    await client.close()
    task.cancel()
    return result

response = asyncio.run(main())
print(response)

Peer Data Encoding/Decoding

To use the automatic peer management system, nodes accept a peer plugin. Each plugin is instantiated with a dict of configuration parameters, and it must implement the PeerPluginProtocol. Once the plugin has been instantiated, it can be passed to the TCPServer, TCPClient, and UDPNode constructors or set on the instances themselves. Unlike the auth and cipher plugins, this is used only for encoding and decoding the data stored in Peer.data, and it is not settable on a per-handler basis.

Currently, netaio includes a DefaultPeerPlugin that is used to encode and decode peer data using the packify library to encode/decode dicts.

Included Plugins

The following plugins are included for convenience and as references for making custom plugins:

  • HMACAuthPlugin
  • Sha256StreamCipherPlugin
  • DefaultPeerPlugin
  • TapescriptAuthPlugin
  • X25519CipherPlugin

Example instantiation of HMACAuthPlugin

from netaio import HMACAuthPlugin
auth_plugin = HMACAuthPlugin({
    "secret": "we attack at dawn",
    "hmac_field": "hmac", # default
    "nonce_field": "nonce", # default
    "ts_field": "ts", # default
})

Example instantiation of Sha256StreamCipherPlugin

from netaio import Sha256StreamCipherPlugin
cipher_plugin = Sha256StreamCipherPlugin({
    "key": "the key to success is held by people better than you",
    "iv_field": "iv", # default
    "encrypt_uri": True, # default; should be False for inner cipher plugins
})

Example instantiation of TapescriptAuthPlugin

from nacl.signing import SigningKey
from netaio.asymmetric import TapescriptAuthPlugin
import os
import tapescript

seed = os.urandom(32)
vkey = SigningKey(seed).verify_key

# pretending there is an admin key to allow admin auth bypass
if os.path.exists('admin.vkey.hex'):
    with open('admin.vkey.hex', 'r') as f:
        admin_vkey = bytes.fromhex(f.read())
else:
    admin_vkey = vkey

inner_script = tapescript.make_single_sig_lock(admin_vkey)

lock = tapescript.make_taproot_lock(vkey, script=inner_script)
witness_func = lambda seed, sigfields: tapescript.make_taproot_witness_keyspend(
    seed, sigfields, committed_script=inner_script
)
auth_plugin = TapescriptAuthPlugin({
    'seed': seed,
    'lock': lock,
    'nonce_field': 'nonce', # default
    'ts_field': 'ts', # default
    'witness_field': 'witness', # default
    'witness_func': witness_func,
    'contracts': {}, # default
    'plugins': {}, # default
})

Example instantiation of X25519CipherPlugin

from netaio.asymmetric import X25519CipherPlugin, Peer, DefaultPeerPlugin
import os

seed = os.urandom(32)
cipher_plugin = X25519CipherPlugin({
    'seed': seed,
    'encrypt_uri': False, # default
    # should not be True unless peers are set manually and this is the outer cipher
})

# for use with peer management, the pubkey should be sent to peers
local_peer = Peer(
    addrs={('0.0.0.0', 8888)},
    id=bytes(cipher_plugin.pubk),
    data=DefaultPeerPlugin().encode_data({
        "pubkey": bytes(cipher_plugin.pubk),
        "vkey": bytes(cipher_plugin.vkey), # optional
    }),
)

More documentation on the asymmetric plugins can be found in asymmetric.md.

Encapsulation Model

The encapsulation model for plugin interactions with messages is as follows:

Send

  • Per-handler/injected cipher_plugin.encrypt
  • Per-handler/injected auth_plugin.make
  • Instance self.cipher_plugin.encrypt
  • Instance self.auth_plugin.make

Receive

  • Instance self.auth_plugin.check
  • Instance self.cipher_plugin.decrypt
  • Per-handler/injected auth_plugin.check
  • Per-handler/injected cipher_plugin.decrypt
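The ordering above can be visualized with a toy model in which each plugin step wraps the data in a labelled envelope. This is a simplification for illustration only: real auth plugins add and check fields rather than wrapping bytes, and the wrap, send, and receive helpers are hypothetical names, not netaio functions.

```python
def wrap(tag: str):
    """Returns (apply, remove) functions that add/strip a labelled envelope."""
    prefix = f"{tag}(".encode()

    def apply(data: bytes) -> bytes:
        return prefix + data + b")"

    def remove(data: bytes) -> bytes:
        if not (data.startswith(prefix) and data.endswith(b")")):
            raise ValueError(f"expected {tag} layer")
        return data[len(prefix):-1]

    return apply, remove


# Order follows the Send list above: innermost layer first.
LAYERS = [
    wrap("handler_cipher"),
    wrap("handler_auth"),
    wrap("instance_cipher"),
    wrap("instance_auth"),
]


def send(data: bytes) -> bytes:
    for apply, _ in LAYERS:
        data = apply(data)
    return data


def receive(data: bytes) -> bytes:
    # Receiving peels the layers off in the reverse order.
    for _, remove in reversed(LAYERS):
        data = remove(data)
    return data


wire = send(b"hello")
print(wire)
# b'instance_auth(instance_cipher(handler_auth(handler_cipher(hello))))'
print(receive(wire))  # b'hello'
```

The receive order is the exact reverse of the send order, which is why the outermost (instance) plugins are checked first on receipt.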

Peer Management

This package includes an optional automatic peer management system, which can be enabled on TCPServer, TCPClient, and UDPNode by awaiting a call to the manage_peers_automatically() method. For TCP, this will cause clients and servers to send ADVERTISE_PEER/PEER_DISCOVERED messages to each other upon connection or upon enabling of peer management for existing connections. For UDP, this will cause nodes to multicast ADVERTISE_PEER messages every 20 seconds by default, and any node that receives such a message will respond to that peer with a PEER_DISCOVERED message that includes its own information. This will populate the local peer lists on each server/client/node, enabling the broadcast method to send messages to all known peers.

The UDPNode.manage_peers_automatically method can accept optional arguments advertise_every: int = 20 and peer_timeout: int = 60. All three node types will accept the following optional arguments:

  • app_id: bytes = b'netaio'
  • auth_plugin: AuthPluginProtocol|None = None
  • cipher_plugin: CipherPluginProtocol|None = None

The auth_plugin and cipher_plugin provided here will be used only for the peer advertisement and response traffic.

Upon calling await client_or_server_or_node.stop_peer_management(), a DISCONNECT message will be sent by TCP nodes or multicast by UDP nodes; any node that receives a DISCONNECT message will remove that peer from the local peer lists and all subscriptions.
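The bookkeeping implied by advertisement, timeout, and DISCONNECT handling can be sketched as a table of last-seen times. This is a hypothetical illustration, not netaio's implementation: the PeerTable class, its method names, and the injectable clock are assumptions, while peer_timeout mirrors the argument named above.

```python
import time


class PeerTable:
    """Hypothetical illustration; not netaio's internal peer bookkeeping."""

    def __init__(self, peer_timeout: float = 60.0, clock=time.monotonic):
        self.peer_timeout = peer_timeout
        self.clock = clock  # injectable so the example is deterministic
        self.last_seen: dict[bytes, float] = {}

    def on_advertise(self, peer_id: bytes) -> None:
        """Record a peer when it advertises or responds to discovery."""
        self.last_seen[peer_id] = self.clock()

    def on_disconnect(self, peer_id: bytes) -> None:
        """Forget a peer immediately when it announces a disconnect."""
        self.last_seen.pop(peer_id, None)

    def expire(self) -> list[bytes]:
        """Remove and return peers not heard from within peer_timeout."""
        now = self.clock()
        stale = [peer for peer, seen in self.last_seen.items()
                 if now - seen > self.peer_timeout]
        for peer in stale:
            del self.last_seen[peer]
        return stale


fake_now = [0.0]  # simulated clock, in seconds
table = PeerTable(peer_timeout=3, clock=lambda: fake_now[0])
table.on_advertise(b"peer-a")
fake_now[0] = 2.0
table.on_advertise(b"peer-b")
fake_now[0] = 4.0
print(table.expire())         # peer-a was last seen 4 seconds ago
print(list(table.last_seen))  # peer-b remains
```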

Testing

To test, clone the repo, install the dependencies (preferably within a virtual environment) using pip install -r requirements.txt, and run python -m unittest discover -s tests. Alternatively, to run each test file individually and see the output separated by file, run find tests/ -name 'test_*.py' -print -exec python {} \; (the pattern is quoted so that the shell does not expand it before find sees it).

Currently, there are 13 unit tests and 12 e2e tests. The unit tests cover the bundled plugins and miscellaneous features. The e2e tests start a server and client (or 2 clients), then send messages from the client to the server and receive responses; the UDP e2e test suite starts 2 nodes and treats them like a server and client to make testing a bit simpler and easier to follow. The automatic peer management system is also tested in both TCP and UDP. The bundled plugins are used for the e2e tests, and authentication failure cases are also tested.

License

Copyright (c) 2025 Jonathan Voss (k98kurz)

Permission to use, copy, modify, and/or distribute this software for any purpose with or without fee is hereby granted, provided that the above copyright notice and this permission notice appear in all copies.

THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
