netaio
This is designed to be a simple and easy to use asyncio-based TCP client and
server and UDP node library inspired by fastapi but for non-HTTP use cases.
Status
This is currently a work-in-progress. Remaining work before the v0.1.0 release:
Issues are tracked here.
Usage
Install with pip install netaio
. To use the optional asymmetric cryptography
plugins, install with pip install netaio[asymmetric]
.
Brief examples are shown below. For more documentation, see the
dox.md file generated by
autodox.
TCPServer
from netaio import TCPServer, Body, Message, MessageType, HMACAuthPlugin
import asyncio
server = TCPServer(port=8888, auth_plugin=HMACAuthPlugin(config={"secret": "test"}))
@server.on((MessageType.REQUEST_URI, b'something'))
async def something(msg: Message, writer: asyncio.StreamWriter):
body = Body.prepare(b'This is it.', uri=b'something')
return Message.prepare(body, MessageType.RESPOND_URI)
@server.on(MessageType.SUBSCRIBE_URI)
async def subscribe(msg: Message, writer: asyncio.StreamWriter):
server.subscribe(msg.body.uri, writer)
return Message.prepare(Body.prepare(b'', uri=msg.body.uri), MessageType.CONFIRM_SUBSCRIBE)
@server.on(MessageType.UNSUBSCRIBE_URI)
async def unsubscribe(msg: Message, writer: asyncio.StreamWriter):
server.unsubscribe(msg.body.uri, writer)
return Message.prepare(Body.prepare(b'', uri=msg.body.uri), MessageType.CONFIRM_UNSUBSCRIBE)
asyncio.run(server.start())
TCPClient
from netaio import TCPClient, Body, Message, MessageType, HMACAuthPlugin
import asyncio
client = TCPClient("127.0.0.1", 8888, auth_plugin=HMACAuthPlugin(config={"secret": "test"}))
received_resources = {}
@client.on(MessageType.RESPOND_URI)
def echo(msg: Message, writer: asyncio.StreamWriter):
received_resources[msg.body.uri] = msg.body.content
async def run_client():
request_body = Body.prepare(b'pls gibs me dat', uri=b'something')
request_message = Message.prepare(request_body, MessageType.REQUEST_URI)
await client.connect()
await client.send(request_message)
await client.receive_once()
asyncio.run(run_client())
print(received_resources)
UDPNode
from netaio import UDPNode, Peer, Body, Message, MessageType, HMACAuthPlugin
from os import urandom
import asyncio
local_peer = Peer(
addrs={('127.0.0.1', 8888)},
id=urandom(16),
data=b''
)
echo_node = UDPNode(
local_peer=local_peer,
auth_plugin=HMACAuthPlugin(config={"secret": "test"})
)
@echo_node.on(MessageType.REQUEST_URI)
def request_uri(msg: Message, addr: tuple[str, int]):
echo_node.logger.info("Sending echo to %s...", addr)
return Message.prepare(msg.body, MessageType.OK)
@echo_node.on(MessageType.OK)
def echo(msg: Message, addr: tuple[str, int]):
echo_node.logger.info("Received echo from %s.", addr)
echo_msg = Message.prepare(Body.prepare(b'echo'), MessageType.REQUEST_URI)
async def main(local_addr: tuple[str, int], remote_addr: tuple[str, int]|None = None):
echo_node.interface = local_addr[0]
echo_node.port = local_addr[1]
await echo_node.start()
await echo_node.manage_peers_automatically(advertise_every=1, peer_timeout=3)
while True:
await asyncio.sleep(1)
if remote_addr:
echo_node.logger.info("Sending message to %s...", remote_addr)
echo_node.send(echo_msg, remote_addr)
else:
if len(echo_node.peers) > 0:
echo_node.logger.info("Broadcasting message to all known peers...")
echo_node.broadcast(echo_msg)
else:
echo_node.logger.info("No peers known, waiting to discover peers...")
local_addr = ("0.0.0.0", 8888)
remote_addr = None
asyncio.run(main(local_addr, remote_addr))
Note that to run this example on a single machine, the port must be different
in the second node instance, e.g. local_addr = ("127.0.0.1", 8889)
, and then
the remote address must be set to the first node's address, e.g.
remote_addr = ("127.0.0.1", 8888)
. Multicast will not work locally because of
the different ports. If the interface is set to "0.0.0.0", multicast will work
across the LAN, but this will result in the node hearing its own multicast
messages; hence, the request_uri
handler ignores messages from the local
machine.
(It is technically possible to get multicast to work in one direction on a
single machine by changing the .port
property after one has started.)
Note also that when a peer is removed from the node's peer list, it is also
unsubscribed from all URIs.
Plugin System
The plugin system is designed to be simple and easy to understand. Each plugin
implements a specific protocol, and the TCPServer
, TCPClient
, and
UDPNode
classes automatically apply plugins to messages in the correct order
to preserve the encapsulation model (see below).
There are three types of plugins defined with typing.Protocol
interfaces:
AuthPluginProtocol
CipherPluginProtocol
PeerPluginProtocol
Authentication/Authorization
TCPServer
, TCPClient
, and UDPNode
support an optional
authentication/authorization plugin.
Each plugin is instantiated with a dict of configuration parameters, and it must
implement the AuthPluginProtocol
. Once the plugin has been instantiated, it
can be passed to the TCPServer
, TCPClient
, and UDPNode
constructors or set
on the instances themselves. An auth plugin can also be set on a per-handler
basis by passing the plugin as a second argument (or as the keyword argument
auth_plugin
) to the on
or add_handler
method. Currently, if an auth plugin
is set both on the instance and per-handler, both will be checked before the
handler function is called, and both will be applied to the response body; the
per-handler plugin will be able to overwrite any auth fields set by the instance
plugin, which may break communication -- each plugin instantiation should be
configured to use its own writeable auth data fields.
Currently, netaio includes an HMACAuthPlugin
that can be used by the server
and client to authenticate and authorize requests. This uses a shared secret to
generate and check HMACs over message bodies.
The TapescriptAuthPlugin
is included in the optional netaio.asymmetric
submodule, which requires the tapescript
as a dependency and allows for customizable authentication/authorization models
using the tapescript DSL for access controls.
Example of additional auth layer
from netaio import TCPServer, TCPClient, HMACAuthPlugin, MessageType, Body, Message
import asyncio
outer_auth_plugin = HMACAuthPlugin(config={"secret": "test"})
inner_auth_plugin = HMACAuthPlugin(config={"secret": "tset", "hmac_field": "camh"})
server = TCPServer(port=8888, auth_plugin=outer_auth_plugin)
client = TCPClient(host="127.0.0.1", port=8888, auth_plugin=outer_auth_plugin)
@server.on(MessageType.CREATE_URI, auth_plugin=inner_auth_plugin)
async def put_uri(msg: Message, writer: asyncio.StreamWriter):
body = Body.prepare(b'Resource saved.', uri=msg.body.uri)
return Message.prepare(body, MessageType.OK)
async def main():
task = asyncio.create_task(server.start())
await asyncio.sleep(0.1)
await client.connect()
await client.send(
Message.prepare(Body.prepare(b'test'), MessageType.CREATE_URI),
auth_plugin=inner_auth_plugin
)
result = await client.receive_once()
await client.close()
task.cancel()
return result
response = asyncio.run(main())
print(response)
Cipher (encryption/decryption)
TCPServer
, TCPClient
, and UDPNode
support an optional cipher plugin. Each
plugin is instantiated with a dict of configuration parameters, and it must
implement the CipherPluginProtocol
. Once the plugin has been instantiated, it
can be passed to the TCPServer
, TCPClient
, and UDPNode
constructors or set
on the instances themselves. A cipher plugin can also be set on a per-handler
basis by passing the plugin as a third argument (or as the keyword argument
auth_plugin
) to the on
or add_handler
method. If a cipher plugin is set
both on the instance and per-handler, both will be applied to the message.
Currently, netaio includes a Sha256StreamCipherPlugin
that can be used by
the server and client to encrypt and decrypt messages using a simple symmetric
stream cipher. This uses a shared secret key and per-message IVs. Note that the
encrypt_uri
config option should be False
to prevent the URI from being
encrypted when using this as an additional, inner layer of encryption, else the
URI will not be usable for routing requests/determining responses when the
default key extractor is used (i.e. handlers set on the tuple of MessageType
and URI).
The X25519CipherPlugin
is included in the optional netaio.asymmetric
submodule, which requires PyNaCl as a
dependency and allows for asymmetric encryption using ECDHE (Elliptic Curve
Diffie-Hellman Exchange). Note that this plugin should be used as an inner
layer of encryption with automatic peer management and local peer data including
{'pubkey': SigningKey(seed).verify_key}
.
Example of additional encryption layer
from netaio import TCPServer, TCPClient, Sha256StreamCipherPlugin, MessageType, Body, Message
import asyncio
outer_cipher_plugin = Sha256StreamCipherPlugin(config={"key": "test"})
inner_cipher_plugin = Sha256StreamCipherPlugin(config={"key": "tset", "iv_field": "iv2"})
server = TCPServer(port=8888, cipher_plugin=outer_cipher_plugin)
client = TCPClient(host="127.0.0.1", port=8888, cipher_plugin=outer_cipher_plugin)
@server.on(MessageType.REQUEST_URI, cipher_plugin=inner_cipher_plugin)
async def request_uri(msg: Message, writer: asyncio.StreamWriter):
body = Body.prepare(b'Super secret data.', uri=msg.body.uri)
return Message.prepare(body, MessageType.RESPOND_URI)
async def main():
task = asyncio.create_task(server.start())
await asyncio.sleep(0.1)
await client.connect()
await client.send(
Message.prepare(
Body.prepare(b'psst gimme the secret', uri=b'something'),
MessageType.REQUEST_URI
),
cipher_plugin=inner_cipher_plugin
)
result = await client.receive_once(cipher_plugin=inner_cipher_plugin)
await client.close()
task.cancel()
return result
response = asyncio.run(main())
print(response)
Peer Data Encoding/Decoding
To use the automatic peer management system, nodes accept a peer plugin. Each
plugin is instantiated with a dict of configuration parameters, and it must
implement the PeerPluginProtocol
. Once the plugin has been instantiated, it
can be passed to the TCPServer
, TCPClient
, and UDPNode
constructors or set
on the instances themselves. Unlike the auth and cipher plugins, this is used
only for encoding and decoding the data stored in Peer.data
, and it is not
settable on a per-handler basis.
Currently, netaio includes a DefaultPeerPlugin
that is used to encode and
decode peer data using the packify library
to encode/decode dicts.
Included Plugins
The following plugins are included for convenience and as references for making
custom plugins:
HMACAuthPlugin
Sha256StreamCipherPlugin
DefaultPeerPlugin
TapescriptAuthPlugin
X25519CipherPlugin
Example instantiation of HMACAuthPlugin
from netaio import HMACAuthPlugin
auth_plugin = HMACAuthPlugin({
"secret": "we attack at dawn",
"hmac_field": "hmac",
"nonce_field": "nonce",
"ts_field": "ts",
})
Example instantiation of Sha256StreamCipherPlugin
from netaio import Sha256StreamCipherPlugin
cipher_plugin = Sha256StreamCipherPlugin({
"key": "the key to success is held by people better than you",
"iv_field": "iv",
"encrypt_uri": True,
})
Example instantiation of TapescriptAuthPlugin
from nacl.signing import SigningKey
from netaio.asymmetric import TapescriptAuthPlugin
import os
import tapescript
seed = os.urandom(32)
vkey = SigningKey(seed).verify_key
if os.path.exists('admin.vkey.hex'):
with open('admin.vkey.hex', 'r') as f:
admin_vkey = bytes.fromhex(f.read())
else:
admin_vkey = vkey
inner_script = tapescript.make_single_sig_lock(admin_vkey)
lock = tapescript.make_taproot_lock(vkey, script=inner_script)
witness_func = lambda seed, sigfields: tapescript.make_taproot_witness_keyspend(
seed, sigfields, committed_script=inner_script
)
auth_plugin = TapescriptAuthPlugin({
'seed': seed,
'lock': lock,
'nonce_field': 'nonce',
'ts_field': 'ts',
'witness_field': 'witness',
'witness_func': witness_func,
'contracts': {},
'plugins': {},
})
Example instantiation of X25519CipherPlugin
from netaio.asymmetric import X25519CipherPlugin, Peer, DefaultPeerPlugin
import os
seed = os.urandom(32)
cipher_plugin = X25519CipherPlugin({
'seed': seed,
'encrypt_uri': False,
})
local_peer = Peer(
addrs={('0.0.0.0', 8888)},
id=bytes(cipher_plugin.pubk),
data=DefaultPeerPlugin().encode_data({
"pubkey": bytes(cipher_plugin.pubk),
"vkey": bytes(cipher_plugin.vkey),
}),
)
More documentation on the asymmetric plugins can be found in
asymmetric.md.
Encapsulation Model
The encapsulation model for plugin interactions with messages is as follows:
Send
- Per-handler/injected
cipher_plugin.encrypt
- Per-handler/injected
auth_plugin.make
- Instance
self.cipher_plugin.encrypt
- Instance
self.auth_plugin.make
Receive
- Instance
self.auth_plugin.check
- Instance
self.cipher_plugin.decrypt
- Per-handler/injected
auth_plugin.check
- Per-handler/injected
cipher_plugin.decrypt
Peer Management
This package includes an optional automatic peer management system, which can be
enabled on TCPServer
, TCPClient
, and UDPNode
by awaiting a call to the
manage_peers_automatically()
method. For TCP, this will cause clients and
servers to send ADVERTISE_PEER
/PEER_DISCOVERED
messages to each other upon
connection or upon enabling of peer management for existing connections. For UDP,
this will cause nodes to multicast ADVERTISE_PEER
messages every 20 seconds by
default, and any node that receives such a message will respond to that peer
with a PEER_DISCOVERED
message that includes its own information. This will
populate the local peer lists on each server/client/node, enabling the
broadcast
method to send messages to all known peers.
The UDPNode.manage_peers_automatically
method can accept optional arguments
advertise_every: int = 20
and peer_timeout: int = 60
. All three node types
will accept the following optional arguments:
app_id: bytes = b'netaio'
auth_plugin: AuthPluginProtocol|None = None
cipher_plugin: CipherPluginProtocol|None = None
The auth_plugin
and cipher_plugin
provided here will be used only for the
peer advertisement and response traffic.
Upon calling await client_or_server_or_node.stop_peer_management()
, a
DISCONNECT
message will be sent by TCP nodes or multicast by UDP nodes; any
node that receives a DISCONNECT
message will remove that peer from the local
peer lists and all subscriptions.
Testing
To test, clone the repo, install the dependencies (preferably within a virtual
environment) using pip install -r requirements.txt
, and run
python -m unittest discover -s tests
. Or to run the individual tests and see
the output separated by test file, instead run
find tests/ -name test_*.py -print -exec python {} \;
.
Currently, there are 13 unit tests and 12 e2e tests. The unit tests cover the
bundled plugins and miscellaneous features. The e2e tests start a server and
client (or 2 clients), then send messages from the client to the server and
receive responses; the UDP e2e test suite starts 2 nodes and treats them like a
server and client to make testing a bit simpler and easier to follow. The
automatic peer management system is also tested in both TCP and UDP. The bundled
plugins are used for the e2e tests, and authentication failure cases are also
tested.
License
Copyright (c) 2025 Jonathan Voss (k98kurz)
Permission to use, copy, modify, and/or distribute this software
for any purpose with or without fee is hereby granted, provided
that the above copyright notice and this permission notice appear in
all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL
WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE
AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT, OR
CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS
OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.