New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

@agoric/network

Package Overview
Dependencies
Maintainers
0
Versions
958
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@agoric/network - npm Package Compare versions

Comparing version 0.1.1-dev-780a461.0 to 0.1.1-dev-785e426.0

src/shapes.d.ts

5

./src/index.js
export * from './network.js';
export { default as makeRouter, makeRouterProtocol } from './router.js';
export * from './shapes.js';
export { prepareRouter, prepareRouterProtocol } from './router.js';
export * from './multiaddr.js';
export * from './bytes.js';
// eslint-disable-next-line import/export -- doesn't know types
export * from './types.js';

46

package.json
{
"name": "@agoric/network",
"version": "0.1.1-dev-780a461.0+780a461",
"version": "0.1.1-dev-785e426.0+785e426",
"description": "Agoric's network protocol API",

@@ -11,5 +11,5 @@ "type": "module",

"prepack": "tsc --build tsconfig.build.json",
"postpack": "git clean -f '*.d.ts*'",
"postpack": "git clean -f '*.d.ts*' '*.tsbuildinfo'",
"test": "ava",
"test:c8": "c8 $C8_OPTIONS ava",
"test:c8": "c8 --all $C8_OPTIONS ava",
"test:xs": "exit 0",

@@ -25,18 +25,23 @@ "lint-fix": "yarn lint:eslint --fix",

"dependencies": {
"@agoric/assert": "0.6.1-dev-780a461.0+780a461",
"@agoric/internal": "0.3.3-dev-780a461.0+780a461",
"@agoric/store": "0.9.3-dev-780a461.0+780a461",
"@endo/base64": "^1.0.0",
"@endo/far": "^1.0.1",
"@endo/promise-kit": "^1.0.1"
"@agoric/internal": "0.3.3-dev-785e426.0+785e426",
"@agoric/store": "0.9.3-dev-785e426.0+785e426",
"@agoric/vat-data": "0.5.3-dev-785e426.0+785e426",
"@endo/base64": "^1.0.9",
"@endo/errors": "^1.2.8",
"@endo/far": "^1.1.9",
"@endo/pass-style": "^1.4.7",
"@endo/patterns": "^1.4.7",
"@endo/promise-kit": "^1.1.8"
},
"devDependencies": {
"@agoric/swingset-vat": "0.32.3-dev-780a461.0+780a461",
"@endo/bundle-source": "^3.0.1",
"@agoric/swingset-liveslots": "0.10.3-dev-785e426.0+785e426",
"@agoric/swingset-vat": "0.32.3-dev-785e426.0+785e426",
"@agoric/vow": "0.1.1-dev-785e426.0+785e426",
"@agoric/zone": "0.2.3-dev-785e426.0+785e426",
"@endo/bundle-source": "^3.5.0",
"ava": "^5.3.0",
"c8": "^7.13.0"
"c8": "^10.1.2"
},
"exports": {
".": "./src/index.js",
"./exported.js": "./exported.js"
".": "./src/index.js"
},

@@ -47,6 +52,3 @@ "files": [

"scripts/",
"tools/",
"*.json",
"globals.d.ts",
"exported.js"
"tools/"
],

@@ -57,7 +59,7 @@ "publishConfig": {

"engines": {
"node": ">=14.15.0"
"node": "^18.12 || ^20.9"
},
"ava": {
"files": [
"test/**/test-*.js"
"test/**/*.test.*"
],

@@ -71,5 +73,5 @@ "require": [

"typeCoverage": {
"atLeast": 76.57
"atLeast": 91.16
},
"gitHead": "780a4613a6a5f805cedf2686cb2b3c0412f7ffff"
"gitHead": "785e426312297a21731f67c9e36815502cbf9fb9"
}

@@ -60,12 +60,9 @@ # Network API

To get a listening port, you need a `NetworkInterface` object (such as the one on your `ag-solo` under `home.network`) and ask it to `bind()` to an endpoint. You can either provide a specific port name, or allow the API to allocate a random one for you. The endpoint specifies the type of connection that this port will be able to accept (IBC, TCP, etc), and some properties of that connection. `bind()` uses a "multiaddress" to encode this information.
To get a listening port, you need a `NetworkInterface` object (such as the one on your `ag-solo` under `home.network`) and ask it for a port, via the `PortAllocator`.
```js
// ask for a random allocation - ends with a slash
E(home.network).bind('/ibc-port/')
E(home.network).getPortAllocator()
.then(portAllocator => E(portAllocator).allocateCustomIBCPort())
.then(port => usePort(port));
// or ask for a specific port name
E(home.network).bind('/ibc-port/my-cool-port-name')
.then(port => usePort(port));
```

@@ -151,3 +148,3 @@

Removing a listener doesn't release the port address to make it available for other `bind()` requests. You can call:
Removing a listener doesn't release the port address to make it available for other `PortAllocator` requests. You can call:

@@ -154,0 +151,0 @@ ```js

/**
* Convert some data to bytes.
* @import {Bytes} from './types.js';
*/
/** @typedef {Bytes | Buffer | Uint8Array | Iterable<number>} ByteSource */
/**
* This function is a coercer instead of an asserter because in a future where
* binary data has better support across vats and potentially its own type, we
* might allow more `specimen`s than just `ByteSource`.
*
* @param {Data} data
* @param {unknown} specimen
* @returns {ByteSource}
*/
export function coerceToByteSource(specimen: unknown): ByteSource;
/**
* Convert a Uint8Array or other sequence of octets to a string representation
* that `@endo/marshal` accepts as Passable.
*
* @param {ByteSource} byteSource
* @returns {Bytes}
*/
export function toBytes(data: Data): Bytes;
export function toBytes(byteSource: ByteSource): Bytes;
/**

@@ -18,8 +32,8 @@ * Convert bytes to a String.

*
* @param {Data} data
* @param {ByteSource} byteSource
* @returns {string} base64 encoding
*/
export function dataToBase64(data: Data): string;
export function byteSourceToBase64(byteSource: ByteSource): string;
/**
* Decodes a string into base64.
* Decodes a base64 string into bytes.
*

@@ -30,2 +44,5 @@ * @param {string} string Base64-encoded string

export function base64ToBytes(string: string): Bytes;
export type ByteSource = Bytes | Buffer | Uint8Array | Iterable<number>;
import type { Bytes } from './types.js';
import type { Bytes as Bytes_1 } from './types.js';
//# sourceMappingURL=bytes.d.ts.map

@@ -1,23 +0,69 @@

/// <reference path="./types.js" />
// @ts-check
import { X, Fail } from '@endo/errors';
import { encodeBase64, decodeBase64 } from '@endo/base64';
import { isObject } from '@endo/pass-style';
/* eslint-disable no-bitwise */
/**
* Convert some data to bytes.
* @import {Bytes} from './types.js';
*/
/** @typedef {Bytes | Buffer | Uint8Array | Iterable<number>} ByteSource */
/**
* This function is a coercer instead of an asserter because in a future where
* binary data has better support across vats and potentially its own type, we
* might allow more `specimen`s than just `ByteSource`.
*
* @param {Data} data
* @returns {Bytes}
* @param {unknown} specimen
* @returns {ByteSource}
*/
export function toBytes(data) {
/** @type {Data | number[]} */
let bytes = data;
// TODO: We really need marshallable TypedArrays.
if (typeof bytes === 'string') {
bytes = bytes.split('').map(c => c.charCodeAt(0));
export function coerceToByteSource(specimen) {
if (typeof specimen === 'string') {
return specimen;
}
isObject(specimen) ||
assert.fail(X`non-object ${specimen} is not a ByteSource`, TypeError);
const obj = /** @type {{}} */ (specimen);
typeof obj[Symbol.iterator] === 'function' ||
assert.fail(X`non-iterable ${specimen} is not a ByteSource`, TypeError);
// Good enough... it's iterable and can be converted later.
return /** @type {ByteSource} */ (specimen);
}
/**
* @param {ByteSource} contents
*/
const coerceToByteArray = contents => {
if (typeof contents === 'string') {
return Uint8Array.from(contents, c => {
const b = c.charCodeAt(0);
b <= 0xff || Fail`character cannot be coerced to an octet: ${c}`;
return b;
});
} else if (contents instanceof Uint8Array) {
// Reconstruct to ensure we have a Uint8Array and not a Buffer.
return new Uint8Array(
contents.buffer,
contents.byteOffset,
contents.byteLength,
);
}
return new Uint8Array(contents);
};
/**
* Convert a Uint8Array or other sequence of octets to a string representation
* that `@endo/marshal` accepts as Passable.
*
* @param {ByteSource} byteSource
* @returns {Bytes}
*/
export function toBytes(byteSource) {
// We return the raw characters in the lower half of
// the String's representation.
const buf = new Uint8Array(bytes);
return String.fromCharCode.apply(null, buf);
const buf = coerceToByteArray(byteSource);
return String.fromCharCode(...buf);
}

@@ -38,16 +84,7 @@

*
* @param {Data} data
* @param {ByteSource} byteSource
* @returns {string} base64 encoding
*/
export function dataToBase64(data) {
/** @type {Uint8Array} */
let bytes;
if (typeof data === 'string') {
bytes = new Uint8Array(data.length);
for (let i = 0; i < data.length; i += 1) {
bytes[i] = data.charCodeAt(i);
}
} else {
bytes = new Uint8Array(data);
}
export function byteSourceToBase64(byteSource) {
const bytes = coerceToByteArray(byteSource);
return encodeBase64(bytes);

@@ -57,3 +94,3 @@ }

/**
* Decodes a string into base64.
* Decodes a base64 string into bytes.
*

@@ -60,0 +97,0 @@ * @param {string} string Base64-encoded string

export * from "./network.js";
export * from "./shapes.js";
export * from "./multiaddr.js";
export * from "./bytes.js";
export { default as makeRouter, makeRouterProtocol } from "./router.js";
export * from "./types.js";
export { prepareRouter, prepareRouterProtocol } from "./router.js";
//# sourceMappingURL=index.d.ts.map
export * from './network.js';
export { default as makeRouter, makeRouterProtocol } from './router.js';
export * from './shapes.js';
export { prepareRouter, prepareRouterProtocol } from './router.js';
export * from './multiaddr.js';
export * from './bytes.js';
// eslint-disable-next-line import/export -- doesn't know types
export * from './types.js';
/**
* @param {ConnectionHandler} handler0
* @param {Endpoint} addr0
* @param {ConnectionHandler} handler1
* @param {Endpoint} addr1
* @param {WeakSet<Connection>} [current]
* @returns {[Connection, Connection]}
*/
export function crossoverConnection(handler0: ConnectionHandler, addr0: Endpoint, handler1: ConnectionHandler, addr1: Endpoint, current?: WeakSet<Connection> | undefined): [Connection, Connection];
/**
* Get the list of prefixes from longest to shortest.

@@ -16,23 +7,31 @@ *

export function getPrefixes(addr: string): string[];
/** @typedef {ReturnType<typeof prepareEchoConnectionKit>} MakeEchoConnectionKit */
/**
* Create a protocol that has a handler.
*
* @param {ProtocolHandler} protocolHandler
* @returns {Protocol} the local capability for connecting and listening
*/
export function makeNetworkProtocol(protocolHandler: ProtocolHandler): Protocol;
/**
* Create a ConnectionHandler that just echoes its packets.
*
* @returns {ConnectionHandler}
*/
export function makeEchoConnectionHandler(): ConnectionHandler;
export function makeNonceMaker(prefix?: string, suffix?: string): () => Promise<string>;
/**
* Create a protocol handler that just connects to itself.
*
* @param {ProtocolHandler['onInstantiate']} [onInstantiate]
* @returns {ProtocolHandler} The localhost handler
* @param {import('@agoric/base-zone').Zone} zone
* @param {VowTools} powers
*/
export function makeLoopbackProtocolHandler(onInstantiate?: ProtocolHandler['onInstantiate']): ProtocolHandler;
export function prepareLoopbackProtocolHandler(zone: import("@agoric/base-zone").Zone, { watch, allVows }: VowTools): (instancePrefix?: string) => import("@endo/exo").Guarded<{
onCreate(_impl: any, _protocolHandler: any): Promise<void>;
generatePortID(_localAddr: any, _protocolHandler: any): Promise<string>;
onBind(_port: any, _localAddr: any, _protocolHandler: any): Promise<void>;
/**
* @param {*} _port
* @param {Endpoint} localAddr
* @param {Endpoint} remoteAddr
* @returns {import('@agoric/vow').PromiseVow<AttemptDescription>}}
*/
onConnect(_port: any, localAddr: Endpoint, remoteAddr: Endpoint): import("@agoric/vow").PromiseVow<AttemptDescription>;
onInstantiate(_port: any, _localAddr: any, _remote: any, _protocol: any): Promise<string>;
onListen(port: any, localAddr: any, listenHandler: any, _protocolHandler: any): Promise<void>;
/**
* @param {Remote<Port>} port
* @param {Endpoint} localAddr
* @param {Remote<ListenHandler>} listenHandler
* @param {*} _protocolHandler
*/
onListenRemove(port: Remote<Port>, localAddr: Endpoint, listenHandler: Remote<ListenHandler>, _protocolHandler: any): Promise<void>;
onRevoke(_port: any, _localAddr: any, _protocolHandler: any): Promise<void>;
}>;
/**

@@ -43,4 +42,109 @@ * Compatibility note: this must match what our peers use, so don't change it

export const ENDPOINT_SEPARATOR: "/";
export function rethrowUnlessMissing(err: any): undefined;
export function makeConnection(handler: ConnectionHandler, localAddr: Endpoint, remoteAddr: Endpoint, current?: Set<Closable> | undefined): Connection;
export const CLOSE_REASON_FINALIZER: "closed-by-finalizer";
export function rethrowUnlessMissing(err: unknown): undefined;
export function crossoverConnection(zone: import("@agoric/zone").Zone, handler0: Remote<Required<ConnectionHandler>>, addr0: Endpoint, handler1: Remote<Required<ConnectionHandler>>, addr1: Endpoint, makeConnection: (opts: ConnectionOpts) => Connection, finalizer: Finalizer, current?: WeakSetStore<Closable>): Connection[];
export function prepareNetworkProtocol(zone: import("@agoric/base-zone").Zone, powers: Powers): (protocolHandler: Remote<ProtocolHandler>) => Protocol;
export function prepareEchoConnectionKit(zone: import("@agoric/base-zone").Zone): () => import("@endo/exo").GuardedKit<{
handler: {
/**
* @param {Connection} _connection
* @param {Bytes} bytes
* @param {ConnectionHandler} _connectionHandler
*/
onReceive(_connection: Connection, bytes: Bytes, _connectionHandler: ConnectionHandler): Promise<string>;
/**
* @param {Connection} _connection
* @param {CloseReason} [reason]
* @param {ConnectionHandler} [_connectionHandler]
*/
onClose(_connection: Connection, reason?: CloseReason, _connectionHandler?: ConnectionHandler): Promise<void>;
};
listener: {
onAccept(_port: any, _localAddr: any, _remoteAddr: any, _listenHandler: any): Promise<import("@endo/exo").Guarded<{
/**
* @param {Connection} _connection
* @param {Bytes} bytes
* @param {ConnectionHandler} _connectionHandler
*/
onReceive(_connection: Connection, bytes: Bytes, _connectionHandler: ConnectionHandler): Promise<string>;
/**
* @param {Connection} _connection
* @param {CloseReason} [reason]
* @param {ConnectionHandler} [_connectionHandler]
*/
onClose(_connection: Connection, reason?: CloseReason, _connectionHandler?: ConnectionHandler): Promise<void>;
}>>;
onListen(port: any, _listenHandler: any): Promise<void>;
};
}>;
export function preparePortAllocator(zone: import("@agoric/base-zone").Zone, { watch }: Powers): (args_0: {
protocol: Protocol;
}) => import("@endo/exo").Guarded<{
allocateCustomIBCPort(specifiedName?: string): Promise<import("@agoric/vow").Vow<Port>>;
allocateICAControllerPort(): Promise<import("@agoric/vow").Vow<Port>>;
allocateICQControllerPort(): Promise<import("@agoric/vow").Vow<Port>>;
allocateCustomLocalPort(specifiedName?: string): Promise<import("@agoric/vow").Vow<Port>>;
}>;
export function prepareNetworkPowers(zone: import("@agoric/base-zone").Zone, vowTools: VowTools): Powers;
export type MakeEchoConnectionKit = ReturnType<typeof prepareEchoConnectionKit>;
export type Powers = VowTools & {
finalizer: Finalizer;
};
export type ConnectionOpts = {
addrs: Endpoint[];
handlers: Remote<Required<ConnectionHandler>>[];
conns: MapStore<number, Connection>;
current: WeakSetStore<Closable>;
l: 0 | 1;
r: 0 | 1;
};
export type PortAllocator = ReturnType<ReturnType<typeof preparePortAllocator>>;
export type Finalizer = ReturnType<typeof prepareFinalizer>;
import type { VowTools } from '@agoric/vow';
import type { Endpoint } from './types.js';
import type { AttemptDescription } from './types.js';
import type { Port } from './types.js';
import type { Remote } from '@agoric/vow';
import type { ListenHandler } from './types.js';
import type { ConnectionHandler } from './types.js';
import type { Connection } from './types.js';
import type { Closable } from './types.js';
import type { ProtocolHandler } from './types.js';
import type { Protocol } from './types.js';
import type { Bytes } from './types.js';
import type { CloseReason } from './types.js';
/** @typedef {ReturnType<ReturnType<typeof preparePortAllocator>>} PortAllocator */
/**
* Return a package-specific singleton that pins objects until they are
* explicitly unpinned or finalized. It needs to pin objects only because they
* are resources that need to be released.
*
* The reason this functionality wasn't just baked into the other network exos
* is to maintain upgrade-compatible with minimal additional changes.
*
* @param {import('@agoric/base-zone').Zone} zone
* @param {VowTools} vowTools
*/
declare function prepareFinalizer(zone: import("@agoric/base-zone").Zone, { watch }: VowTools): import("@endo/exo").Guarded<{
has(obj: any): boolean;
/**
* Add a connection and handler for an `onClose` method to be called upon
* finalization.
* @param {Remote<Connection>} conn
* @param {Remote<Required<ConnectionHandler>>} handler
*/
initConnection(conn: Remote<Connection>, handler: Remote<Required<ConnectionHandler>>): void;
/**
* Add an object with a `close` method to be called (such as an
* `inboundAttempt`) upon finalization.
* @param {Remote<{ close(): PromiseVow<any> }>} closer
*/
initCloser(closer: Remote<{
close(): PromiseVow<any>;
}>): void;
finalize(obj: any): import("@agoric/vow").Vow<any> | undefined;
unpin(obj: any): void;
}>;
import type { PromiseVow } from '@agoric/vow';
export {};
//# sourceMappingURL=network.d.ts.map

@@ -1,11 +0,22 @@

import { makeScalarMapStore, makeLegacyMap } from '@agoric/store';
import { Far, E } from '@endo/far';
import { makePromiseKit } from '@endo/promise-kit';
import { Fail } from '@agoric/assert';
import { whileTrue } from '@agoric/internal';
// @ts-check
/// <reference types="@agoric/store/exported.js" />
import { Fail } from '@endo/errors';
import { E } from '@endo/far';
import { M } from '@endo/patterns';
import { toBytes } from './bytes.js';
import { Shape } from './shapes.js';
import '@agoric/store/exported.js';
/// <reference path="./types.js" />
/**
* @import {AttemptDescription, Bytes, CloseReason, Closable, Connection, ConnectionHandler, Endpoint, ListenHandler, Port, Protocol, ProtocolHandler, ProtocolImpl} from './types.js';
* @import {PromiseVow, Remote, VowTools} from '@agoric/vow';
*/
/** @typedef {VowTools & { finalizer: Finalizer }} Powers */
const sink = () => {};
harden(sink);
/**

@@ -17,2 +28,6 @@ * Compatibility note: this must match what our peers use, so don't change it

// Mark the finalizer close reason.
export const CLOSE_REASON_FINALIZER = 'closed-by-finalizer';
/** @param {unknown} err */
export const rethrowUnlessMissing = err => {

@@ -23,3 +38,3 @@ // Ugly hack rather than being able to determine if the function

!(err instanceof TypeError) ||
!err.message.match(/target has no method|is not a function$/)
!String(err.message).match(/target has no method|is not a function$/)
) {

@@ -32,85 +47,157 @@ throw err;

/**
* Create a handled Connection.
* Get the list of prefixes from longest to shortest.
*
* @param {ConnectionHandler} handler
* @param {Endpoint} localAddr
* @param {Endpoint} remoteAddr
* @param {Set<Closable>} [current]
* @returns {Connection}
* @param {string} addr
*/
export const makeConnection = (
handler,
localAddr,
remoteAddr,
current = new Set(),
) => {
let closed;
/** @type {Set<import('@endo/promise-kit').PromiseKit<Bytes>>} */
const pendingAcks = new Set();
/** @type {Connection} */
const connection = Far('Connection', {
getLocalAddress() {
return localAddr;
export function getPrefixes(addr) {
const parts = addr.split(ENDPOINT_SEPARATOR);
/** @type {string[]} */
const ret = [];
for (let i = parts.length; i > 0; i -= 1) {
// Try most specific match.
const prefix = parts.slice(0, i).join(ENDPOINT_SEPARATOR);
ret.push(prefix);
}
return ret;
}
/**
* Validate IBC port name
* @param {string} specifiedName
*/
function throwIfInvalidPortName(specifiedName) {
// Contains between 2 and 128 characters
// Can contain alphanumeric characters
// Valid symbols: ., ,, _, +, -, #, [, ], <, >
const portNameRegex = new RegExp('^[a-zA-Z0-9.,_+\\-#<>\\[\\]]{2,128}$');
if (!portNameRegex.test(specifiedName)) {
throw Error(`Invalid IBC port name: ${specifiedName}`);
}
}
/**
* @typedef {object} ConnectionOpts
* @property {Endpoint[]} addrs
* @property {Remote<Required<ConnectionHandler>>[]} handlers
* @property {MapStore<number, Connection>} conns
* @property {WeakSetStore<Closable>} current
* @property {0|1} l
* @property {0|1} r
*/
/**
* @param {import('@agoric/base-zone').Zone} zone
* @param {Powers} powers
*/
const prepareHalfConnection = (zone, { watch, allVows, finalizer }) => {
const makeHalfConnectionKit = zone.exoClassKit(
'Connection',
Shape.ConnectionI,
/** @param {ConnectionOpts} opts */
({ addrs, handlers, conns, current, l, r }) => {
return {
addrs,
handlers,
conns,
current,
l,
r,
/** @type {string | undefined} */
closed: undefined,
};
},
getRemoteAddress() {
return remoteAddr;
{
connection: {
getLocalAddress() {
const { addrs, l } = this.state;
return addrs[l];
},
getRemoteAddress() {
const { addrs, r } = this.state;
return addrs[r];
},
/** @param {Bytes} packetBytes */
async send(packetBytes) {
const { closed, handlers, r, conns } = this.state;
if (closed) {
throw Error(closed);
}
const innerVow = watch(
E(handlers[r]).onReceive(
conns.get(r),
toBytes(packetBytes),
handlers[r],
),
this.facets.openConnectionAckWatcher,
);
return watch(innerVow, this.facets.rethrowUnlessMissingWatcher);
},
async close() {
const { closed, current, conns, l, r } = this.state;
if (closed) {
throw Error(closed);
}
this.state.closed = 'Connection closed';
// Tear down both sides.
const lconn = conns.get(l);
const rconn = conns.get(r);
current.delete(lconn);
current.delete(rconn);
const innerVow = watch(
allVows([finalizer.finalize(lconn), finalizer.finalize(rconn)]),
this.facets.sinkWatcher,
);
return watch(innerVow, this.facets.rethrowUnlessMissingWatcher);
},
},
openConnectionAckWatcher: {
onFulfilled(ack) {
return toBytes(ack || '');
},
},
rethrowUnlessMissingWatcher: {
onRejected(e) {
rethrowUnlessMissing(e);
},
},
sinkWatcher: {
onFulfilled(_value) {
return undefined;
},
},
},
async close() {
if (closed) {
throw closed;
}
current.delete(connection);
closed = Error('Connection closed');
for (const ackDeferred of [...pendingAcks.values()]) {
pendingAcks.delete(ackDeferred);
ackDeferred.reject(closed);
}
await E(handler)
.onClose(connection, undefined, handler)
.catch(rethrowUnlessMissing);
},
async send(data, opts) {
// console.log('send', data, local === srcHandler);
if (closed) {
throw closed;
}
const bytes = toBytes(data);
const ackDeferred = makePromiseKit();
pendingAcks.add(ackDeferred);
E(handler)
.onReceive(connection, bytes, handler, opts)
.catch(err => {
rethrowUnlessMissing(err);
return '';
})
.then(
ack => {
pendingAcks.delete(ackDeferred);
ackDeferred.resolve(toBytes(ack));
},
err => {
pendingAcks.delete(ackDeferred);
ackDeferred.reject(err);
},
);
return ackDeferred.promise;
},
});
);
current.add(connection);
E(handler)
.onOpen(connection, localAddr, remoteAddr, handler)
.catch(rethrowUnlessMissing);
return connection;
const makeHalfConnection = ({ addrs, handlers, conns, current, l, r }) => {
const { connection } = makeHalfConnectionKit({
addrs,
handlers,
conns,
current,
l,
r,
});
return harden(connection);
};
return makeHalfConnection;
};
/**
* @param {ConnectionHandler} handler0
* @param {import('@agoric/zone').Zone} zone
* @param {Remote<Required<ConnectionHandler>>} handler0
* @param {Endpoint} addr0
* @param {ConnectionHandler} handler1
* @param {Remote<Required<ConnectionHandler>>} handler1
* @param {Endpoint} addr1
* @param {WeakSet<Connection>} [current]
* @returns {[Connection, Connection]}
* @param {(opts: ConnectionOpts) => Connection} makeConnection
* @param {Finalizer} finalizer
* @param {WeakSetStore<Closable>} [current]
*/
export function crossoverConnection(
export const crossoverConnection = (
zone,
handler0,

@@ -120,143 +207,267 @@ addr0,

addr1,
current = new WeakSet(),
) {
/** @type {Connection[]} */
const conns = [];
/** @type {ConnectionHandler[]} */
const handlers = [handler0, handler1];
/** @type {Endpoint[]} */
const addrs = [addr0, addr1];
makeConnection,
finalizer,
current = zone.detached().weakSetStore('crossoverCurrentConnections'),
) => {
const detached = zone.detached();
function makeHalfConnection(l, r) {
let closed;
conns[l] = Far('Connection', {
getLocalAddress() {
return addrs[l];
},
getRemoteAddress() {
return addrs[r];
},
async send(packetBytes) {
if (closed) {
throw closed;
}
const ack = await E(handlers[r])
.onReceive(conns[r], toBytes(packetBytes), handlers[r])
.catch(rethrowUnlessMissing);
return toBytes(ack || '');
},
async close() {
if (closed) {
throw closed;
}
closed = Error('Connection closed');
current.delete(conns[l]);
await E(handlers[l])
.onClose(conns[l], undefined, handlers[l])
.catch(rethrowUnlessMissing);
},
});
}
/** @type {MapStore<number, Connection>} */
const conns = detached.mapStore('addrToConnections');
makeHalfConnection(0, 1);
makeHalfConnection(1, 0);
/** @type {Remote<Required<ConnectionHandler>>[]} */
const handlers = harden([handler0, handler1]);
/** @type {Endpoint[]} */
const addrs = harden([addr0, addr1]);
/**
* @param {0|1} l
* @param {0|1} r
*/
const makeHalfConnection = (l, r) => {
conns.init(l, makeConnection({ addrs, handlers, conns, current, l, r }));
};
/**
* @param {number} l local side of the connection
* @param {number} r remote side of the connection
*/
function openHalfConnection(l, r) {
current.add(conns[l]);
const openHalfConnection = (l, r) => {
const lconn = conns.get(l);
current.add(lconn);
if (!finalizer.has(lconn)) {
finalizer.initConnection(lconn, handlers[l]);
}
E(handlers[l])
.onOpen(conns[l], addrs[l], addrs[r], handlers[l])
.onOpen(lconn, addrs[l], addrs[r], handlers[l])
.catch(rethrowUnlessMissing);
}
};
makeHalfConnection(0, 1);
makeHalfConnection(1, 0);
openHalfConnection(0, 1);
openHalfConnection(1, 0);
const [conn0, conn1] = conns;
return [conn0, conn1];
}
return [conns.get(0), conns.get(1)];
};
/**
* Get the list of prefixes from longest to shortest.
*
* @param {string} addr
* @param {import('@agoric/zone').Zone} zone
* @param {(opts: ConnectionOpts) => Connection} makeConnection
* @param {Powers} powers
*/
export function getPrefixes(addr) {
const parts = addr.split(ENDPOINT_SEPARATOR);
const prepareInboundAttempt = (zone, makeConnection, { watch, finalizer }) => {
const makeInboundAttemptKit = zone.exoClassKit(
'InboundAttempt',
Shape.InboundAttemptI,
/**
* @param {object} opts
* @param {string} opts.localAddr
* @param {string} opts.remoteAddr
* @param {MapStore<Port, SetStore<Closable>>} opts.currentConnections
* @param {string} opts.listenPrefix
* @param {MapStore<Endpoint, [Port, Remote<Required<ListenHandler>>]>} opts.listening
*/
({
localAddr,
remoteAddr,
currentConnections,
listenPrefix,
listening,
}) => {
/** @type {string | undefined} */
let consummated;
/** @type {string[]} */
const ret = [];
for (let i = parts.length; i > 0; i -= 1) {
// Try most specific match.
const prefix = parts.slice(0, i).join(ENDPOINT_SEPARATOR);
ret.push(prefix);
}
return ret;
}
return {
localAddr,
remoteAddr,
consummated,
currentConnections,
listenPrefix,
listening,
};
},
{
inboundAttempt: {
getLocalAddress() {
// Return address metadata.
return this.state.localAddr;
},
getRemoteAddress() {
return this.state.remoteAddr;
},
async close() {
const { consummated, localAddr, remoteAddr } = this.state;
const { listening, listenPrefix, currentConnections } = this.state;
if (consummated) {
throw Error(consummated);
}
this.state.consummated = 'Already closed';
const [port, listener] = listening.get(listenPrefix);
const current = currentConnections.get(port);
current.delete(this.facets.inboundAttempt);
finalizer.unpin(this.facets.inboundAttempt);
const innerVow = watch(
E(listener).onReject(port, localAddr, remoteAddr, listener),
this.facets.sinkWatcher,
);
return watch(innerVow, this.facets.rethrowUnlessMissingWatcher);
},
/**
* @param {object} opts
* @param {string} [opts.localAddress]
* @param {string} [opts.remoteAddress]
* @param {Remote<ConnectionHandler>} opts.handler
*/
async accept({ localAddress, remoteAddress, handler: rchandler }) {
const { consummated, localAddr, remoteAddr } = this.state;
const { listening, listenPrefix, currentConnections } = this.state;
if (consummated) {
throw Error(consummated);
}
if (localAddress === undefined) {
localAddress = localAddr;
}
this.state.consummated = `${localAddress} Already accepted`;
if (remoteAddress === undefined) {
remoteAddress = remoteAddr;
}
const [port, listener] = listening.get(listenPrefix);
const current = currentConnections.get(port);
current.delete(this.facets.inboundAttempt);
return watch(
E(listener).onAccept(port, localAddress, remoteAddress, listener),
this.facets.inboundAttemptAcceptWatcher,
{
localAddress,
rchandler,
remoteAddress,
current,
},
);
},
},
inboundAttemptAcceptWatcher: {
onFulfilled(lchandler, watchContext) {
const { localAddress, rchandler, remoteAddress, current } =
watchContext;
return crossoverConnection(
zone,
/** @type {Remote<Required<ConnectionHandler>>} */ (lchandler),
localAddress,
/** @type {Remote<Required<ConnectionHandler>>} */ (rchandler),
remoteAddress,
makeConnection,
finalizer,
current,
)[1];
},
},
rethrowUnlessMissingWatcher: {
onRejected(e) {
rethrowUnlessMissing(e);
},
},
sinkWatcher: {
onFulfilled(_value) {
return undefined;
},
},
},
);
const makeInboundAttempt = ({
localAddr,
remoteAddr,
currentConnections,
listenPrefix,
listening,
}) => {
const { inboundAttempt } = makeInboundAttemptKit({
localAddr,
remoteAddr,
currentConnections,
listenPrefix,
listening,
});
return harden(inboundAttempt);
};
return makeInboundAttempt;
};
/** @enum {typeof RevokeState[keyof typeof RevokeState]} */
const RevokeState = /** @type {const} */ ({
NOT_REVOKED: 0,
REVOKING: 1,
REVOKED: 2,
});
harden(RevokeState);
/**
* Create a protocol that has a handler.
*
* @param {ProtocolHandler} protocolHandler
* @returns {Protocol} the local capability for connecting and listening
* @param {import('@agoric/zone').Zone} zone
* @param {Powers} powers
*/
export function makeNetworkProtocol(protocolHandler) {
/** @type {LegacyMap<Port, Set<Closable>>} */
// Legacy because we're storing a JS Set
const currentConnections = makeLegacyMap('port');
const preparePort = (zone, powers) => {
const makeIncapable = zone.exoClass('Incapable', undefined, () => ({}), {});
const { finalizer, watch, allVows } = powers;
/**
* Currently must be a single listenHandler. TODO: Do something sensible with
* multiple handlers?
*
* @type {MapStore<Endpoint, [Port, ListenHandler]>}
* @param {object} opts
* @param {Endpoint} opts.localAddr
* @param {MapStore<Endpoint, [Port, Remote<Required<ListenHandler>>]>} opts.listening
* @param {SetStore<Remote<Connection>>} opts.openConnections
* @param {MapStore<Port, SetStore<Closable>>} opts.currentConnections
* @param {MapStore<string, Port>} opts.boundPorts
* @param {Remote<ProtocolHandler>} opts.protocolHandler
* @param {Remote<ProtocolImpl>} opts.protocolImpl
*/
const listening = makeScalarMapStore('localAddr');
/** @type {MapStore<string, Port>} */
const boundPorts = makeScalarMapStore('localAddr');
/** @param {Endpoint} localAddr */
const bind = async localAddr => {
// Check if we are underspecified (ends in slash)
const underspecified = localAddr.endsWith(ENDPOINT_SEPARATOR);
for await (const _ of whileTrue(() => underspecified)) {
const portID = await E(protocolHandler).generatePortID(
localAddr,
protocolHandler,
);
const newAddr = `${localAddr}${portID}`;
if (!boundPorts.has(newAddr)) {
localAddr = newAddr;
break;
}
}
if (boundPorts.has(localAddr)) {
return boundPorts.get(localAddr);
}
/** @enum {number} */
const RevokeState = {
NOT_REVOKED: 0,
REVOKING: 1,
REVOKED: 2,
const initPort = ({
localAddr,
listening,
openConnections,
currentConnections,
boundPorts,
protocolHandler,
protocolImpl,
}) => {
return {
listening,
openConnections,
currentConnections,
boundPorts,
localAddr,
protocolHandler,
protocolImpl,
/** @type {RevokeState | undefined} */
revoked: undefined,
};
};
/** @type {RevokeState} */
let revoked = RevokeState.NOT_REVOKED;
const openConnections = new Set();
/** @type {Port} */
const port = Far('Port', {
const makePortKit = zone.exoClassKit('Port', Shape.PortI, initPort, {
port: {
getLocalAddress() {
// Works even after revoke().
return localAddr;
return this.state.localAddr;
},
/** @param {Remote<ListenHandler>} listenHandler */
async addListener(listenHandler) {
!revoked || Fail`Port ${localAddr} is revoked`;
const { revoked, listening, localAddr, protocolHandler } = this.state;
!revoked || Fail`Port ${this.state.localAddr} is revoked`;
listenHandler || Fail`listenHandler is not defined`;
if (listening.has(localAddr)) {

@@ -268,21 +479,34 @@ // Last one wins.

}
listening.set(localAddr, [port, listenHandler]);
listening.set(localAddr, [
this.facets.port,
/** @type {Remote<Required<ListenHandler>>} */ (listenHandler),
]);
E(lhandler).onRemove(lport, lhandler).catch(rethrowUnlessMissing);
} else {
listening.init(localAddr, [port, listenHandler]);
listening.init(
localAddr,
harden([
this.facets.port,
/** @type {Remote<Required<ListenHandler>>} */ (listenHandler),
]),
);
}
// TODO: Check that the listener defines onAccept.
// ASSUME: that the listener defines onAccept.
await E(protocolHandler).onListen(
port,
localAddr,
listenHandler,
protocolHandler,
const innerVow = watch(
E(protocolHandler).onListen(
this.facets.port,
localAddr,
listenHandler,
protocolHandler,
),
this.facets.portAddListenerWatcher,
{ listenHandler },
);
await E(listenHandler)
.onListen(port, listenHandler)
.catch(rethrowUnlessMissing);
return watch(innerVow, this.facets.rethrowUnlessMissingWatcher);
},
/** @param {Remote<ListenHandler>} listenHandler */
async removeListener(listenHandler) {
const { listening, localAddr, protocolHandler } = this.state;
listening.has(localAddr) || Fail`Port ${localAddr} is not listening`;

@@ -292,38 +516,97 @@ listening.get(localAddr)[1] === listenHandler ||

listening.delete(localAddr);
await E(protocolHandler).onListenRemove(
port,
localAddr,
listenHandler,
protocolHandler,
const innerVow = watch(
E(protocolHandler).onListenRemove(
this.facets.port,
localAddr,
listenHandler,
protocolHandler,
),
this.facets.portRemoveListenerWatcher,
{ listenHandler },
);
await E(listenHandler)
.onRemove(port, listenHandler)
.catch(rethrowUnlessMissing);
return watch(innerVow, this.facets.rethrowUnlessMissingWatcher);
},
async connect(remotePort, connectionHandler = {}) {
/**
* @param {Endpoint} remotePort
* @param {Remote<ConnectionHandler>} [connectionHandler]
*/
async connect(
remotePort,
connectionHandler = /** @type {Remote<ConnectionHandler>} */ (
makeIncapable()
),
) {
const { revoked, localAddr, protocolImpl } = this.state;
!revoked || Fail`Port ${localAddr} is revoked`;
/** @type {Endpoint} */
const dst = harden(remotePort);
// eslint-disable-next-line no-use-before-define
const conn = await protocolImpl.outbound(port, dst, connectionHandler);
if (revoked) {
void E(conn).close();
} else {
openConnections.add(conn);
}
return conn;
return watch(
E(protocolImpl).outbound(this.facets.port, dst, connectionHandler),
this.facets.portConnectWatcher,
{ chandler: connectionHandler },
);
},
async revoke() {
const { revoked, localAddr } = this.state;
const { protocolHandler } = this.state;
revoked !== RevokeState.REVOKED ||
Fail`Port ${localAddr} is already revoked`;
revoked = RevokeState.REVOKING;
await E(protocolHandler).onRevoke(port, localAddr, protocolHandler);
revoked = RevokeState.REVOKED;
this.state.revoked = RevokeState.REVOKING;
const revokeVow = watch(
E(protocolHandler).onRevoke(
this.facets.port,
localAddr,
protocolHandler,
),
this.facets.portRevokeWatcher,
);
return watch(revokeVow, this.facets.portRevokeCleanupWatcher);
},
},
portAddListenerWatcher: {
onFulfilled(_value, watcherContext) {
const { listenHandler } = watcherContext;
return E(listenHandler).onListen(this.facets.port, listenHandler);
},
},
portRemoveListenerWatcher: {
onFulfilled(_value, watcherContext) {
const { listenHandler } = watcherContext;
return E(listenHandler).onRemove(this.facets.port, listenHandler);
},
},
portConnectWatcher: {
onFulfilled(conn, { chandler }) {
const { openConnections, revoked } = this.state;
if (!finalizer.has(conn)) {
finalizer.initConnection(conn, chandler);
}
if (revoked) {
return finalizer.finalize(conn);
}
openConnections.add(conn);
return conn;
},
},
portRevokeWatcher: {
onFulfilled(_value) {
const { currentConnections, listening, localAddr } = this.state;
const port = this.facets.port;
// Clean up everything we did.
const ps = [...currentConnections.get(port)].map(conn =>
E(conn)
.close()
.catch(_ => {}),
const values = [...currentConnections.get(port).values()];
const ps = [];
ps.push(
...values.map(obj =>
watch(finalizer.finalize(obj), this.facets.sinkWatcher),
),
);
if (listening.has(localAddr)) {

@@ -333,255 +616,1001 @@ const listener = listening.get(localAddr)[1];

}
await Promise.all(ps);
currentConnections.delete(port);
return watch(allVows(ps), this.facets.rethrowUnlessMissingWatcher);
},
},
sinkWatcher: {
onFulfilled() {
return undefined;
},
onRejected() {
return undefined;
},
},
portRevokeCleanupWatcher: {
onFulfilled(_value) {
const { currentConnections, boundPorts, localAddr } = this.state;
this.state.revoked = RevokeState.REVOKED;
currentConnections.delete(this.facets.port);
boundPorts.delete(localAddr);
return `Port ${localAddr} revoked`;
},
},
rethrowUnlessMissingWatcher: {
onRejected(e) {
rethrowUnlessMissing(e);
},
},
});
const makePort = ({
localAddr,
listening,
openConnections,
currentConnections,
boundPorts,
protocolHandler,
protocolImpl,
}) => {
const { port } = makePortKit({
localAddr,
listening,
openConnections,
currentConnections,
boundPorts,
protocolHandler,
protocolImpl,
});
await E(protocolHandler).onBind(port, localAddr, protocolHandler);
boundPorts.init(localAddr, port);
currentConnections.init(port, new Set());
return port;
return harden(port);
};
/** @type {ProtocolImpl} */
const protocolImpl = Far('ProtocolImpl', {
bind,
async inbound(listenAddr, remoteAddr) {
let lastFailure = Error(`No listeners for ${listenAddr}`);
for await (const listenPrefix of getPrefixes(listenAddr)) {
if (!listening.has(listenPrefix)) {
continue;
}
const [port, listener] = listening.get(listenPrefix);
let localAddr;
await (async () => {
// See if our protocol is willing to receive this connection.
const localInstance = await E(protocolHandler)
.onInstantiate(port, listenPrefix, remoteAddr, protocolHandler)
.catch(rethrowUnlessMissing);
localAddr = localInstance
return makePort;
};
/**
* @param {import('@agoric/base-zone').Zone} zone
* @param {Powers} powers
*/
const prepareBinder = (zone, powers) => {
const makeConnection = prepareHalfConnection(zone, powers);
const { watch, finalizer } = powers;
const makeInboundAttempt = prepareInboundAttempt(
zone,
makeConnection,
powers,
);
const makePort = preparePort(zone, powers);
const detached = zone.detached();
const makeFullBinderKit = zone.exoClassKit(
'binder',
{
protocolImpl: Shape.ProtocolImplI,
binder: M.interface('Binder', {
bindPort: M.callWhen(Shape.Endpoint).returns(Shape.Vow$(Shape.Port)),
}),
binderInboundInstantiateWatcher: M.interface(
'BinderInboundInstantiateWatcher',
{
onFulfilled: M.call(M.any()).rest(M.any()).returns(M.any()),
},
),
binderInboundInstantiateCatchWatcher: M.interface(
'BinderInboundInstantiateCatchWatcher',
{
onRejected: M.call(M.any()).rest(M.any()).returns(M.any()),
},
),
binderOutboundInstantiateWatcher: M.interface(
'BinderOutboundInstantiateWatcher',
{
onFulfilled: M.call(M.any()).rest(M.any()).returns(M.any()),
},
),
binderOutboundConnectWatcher: M.interface(
'BinderOutboundConnectWatcher',
{
onFulfilled: M.call(M.any()).rest(M.any()).returns(M.any()),
},
),
binderOutboundCatchWatcher: M.interface('BinderOutboundCatchWatcher', {
onRejected: M.call(M.any()).rest(M.any()).returns(M.any()),
}),
binderOutboundInboundWatcher: M.interface(
'BinderOutboundInboundWatcher',
{
onFulfilled: M.call(M.any()).rest(M.any()).returns(M.any()),
},
),
binderOutboundAcceptWatcher: M.interface('BinderOutboundAcceptWatcher', {
onFulfilled: M.call(M.any()).rest(M.any()).returns(M.any()),
}),
binderBindGeneratePortWatcher: M.interface(
'BinderBindGeneratePortWatcher',
{
onFulfilled: M.call(M.any()).rest(M.any()).returns(M.any()),
},
),
binderPortWatcher: M.interface('BinderPortWatcher', {
onFulfilled: M.call(M.any()).rest(M.any()).returns(M.any()),
}),
binderBindWatcher: M.interface('BinderBindWatcher', {
onFulfilled: M.call(M.any()).rest(M.any()).returns(M.any()),
}),
rethrowUnlessMissingWatcher: M.interface('RethrowUnlessMissingWatcher', {
onRejected: M.call(M.any()).rest(M.any()).returns(M.any()),
}),
},
/**
* @param {object} opts
* @param {MapStore<Port, SetStore<Closable>>} opts.currentConnections
* @param {MapStore<string, Port>} opts.boundPorts
* @param {MapStore<Endpoint, [Port, Remote<Required<ListenHandler>>]>} opts.listening
* @param {Remote<ProtocolHandler>} opts.protocolHandler
*/
({ currentConnections, boundPorts, listening, protocolHandler }) => {
/** @type {SetStore<Connection>} */
const openConnections = detached.setStore('openConnections');
return {
currentConnections,
boundPorts,
listening,
revoked: RevokeState.NOT_REVOKED,
openConnections,
protocolHandler,
};
},
{
protocolImpl: {
/**
* @param {Endpoint} listenAddr
* @param {Endpoint} remoteAddr
*/
async inbound(listenAddr, remoteAddr) {
const { listening, protocolHandler } = this.state;
const prefixes = getPrefixes(listenAddr);
let listenPrefixIndex = 0;
let listenPrefix;
while (listenPrefixIndex < prefixes.length) {
listenPrefix = prefixes[listenPrefixIndex];
if (!listening.has(listenPrefix)) {
listenPrefixIndex += 1;
continue;
}
break;
}
if (listenPrefixIndex >= prefixes.length) {
throw Error(`No listeners for ${listenAddr}`);
}
const [port] = listening.get(/** @type {string} **/ (listenPrefix));
const innerVow = watch(
E(
/** @type {Remote<Required<ProtocolHandler>>} */ (
protocolHandler
),
).onInstantiate(
/** @type {Port} **/ (port),
prefixes[listenPrefixIndex],
remoteAddr,
protocolHandler,
),
this.facets.binderInboundInstantiateWatcher,
{
listenAddr,
remoteAddr,
port,
listenPrefixIndex,
},
);
return watch(
innerVow,
this.facets.binderInboundInstantiateCatchWatcher,
{
listenPrefixIndex,
listenAddr,
remoteAddr,
lastFailure: Error(`No listeners for ${listenAddr}`),
},
);
},
/**
* @param {Port} port
* @param {Endpoint} remoteAddr
* @param {ConnectionHandler} lchandler
*/
async outbound(port, remoteAddr, lchandler) {
const { protocolHandler } = this.state;
const localAddr = await E(port).getLocalAddress();
// Allocate a local address.
const instantiateInnerVow = watch(
E(
/** @type {Remote<Required<ProtocolHandler>>} */ (
protocolHandler
),
).onInstantiate(port, localAddr, remoteAddr, protocolHandler),
this.facets.binderOutboundInstantiateWatcher,
{
port,
localAddr,
remoteAddr,
protocolHandler,
},
);
const instantiateVow = watch(
instantiateInnerVow,
this.facets.rethrowUnlessMissingWatcher,
);
const attemptVow = watch(
instantiateVow,
this.facets.binderOutboundInboundWatcher,
{
localAddr,
remoteAddr,
},
);
const acceptedVow = watch(
attemptVow,
this.facets.binderOutboundAcceptWatcher,
{
handler: lchandler,
},
);
return watch(acceptedVow, this.facets.binderOutboundCatchWatcher, {
port,
remoteAddr,
lchandler,
localAddr,
});
},
async bindPort(localAddr) {
return this.facets.binder.bindPort(localAddr);
},
},
binder: {
/** @param {string} localAddr */
async bindPort(localAddr) {
const { protocolHandler } = this.state;
// Check if we are underspecified (ends in slash)
const underspecified = localAddr.endsWith(ENDPOINT_SEPARATOR);
const localAddrVow = watch(
E(protocolHandler).generatePortID(localAddr, protocolHandler),
this.facets.binderBindGeneratePortWatcher,
{
underspecified,
localAddr,
},
);
return watch(localAddrVow, this.facets.binderBindWatcher);
},
},
binderInboundInstantiateWatcher: {
onFulfilled(localInstance, watchContext) {
const { listenAddr, remoteAddr, port, listenPrefixIndex } =
watchContext;
const { listening, currentConnections } = this.state;
const prefixes = getPrefixes(listenAddr);
const localAddr = localInstance
? `${listenAddr}/${localInstance}`
: listenAddr;
})().catch(e => {
lastFailure = e;
});
if (!localAddr) {
continue;
}
// We have a legitimate inbound attempt.
let consummated;
const current = currentConnections.get(port);
const inboundAttempt = Far('InboundAttempt', {
getLocalAddress() {
// Return address metadata.
return localAddr;
},
getRemoteAddress() {
return remoteAddr;
},
async close() {
if (consummated) {
throw consummated;
const current = currentConnections.get(port);
const inboundAttempt = makeInboundAttempt({
localAddr,
remoteAddr,
currentConnections,
listenPrefix: prefixes[listenPrefixIndex],
listening,
});
current.add(inboundAttempt);
finalizer.initCloser(inboundAttempt);
return inboundAttempt;
},
},
binderInboundInstantiateCatchWatcher: {
onRejected(e, watchContext) {
let { lastFailure, listenPrefixIndex } = watchContext;
try {
rethrowUnlessMissing(e);
} catch (innerE) {
lastFailure = innerE;
}
const { listenAddr, remoteAddr } = watchContext;
const { listening, protocolHandler } = this.state;
const prefixes = getPrefixes(listenAddr);
let listenPrefix;
listenPrefixIndex += 1;
while (listenPrefixIndex < prefixes.length) {
listenPrefix = prefixes[listenPrefixIndex];
if (!listening.has(listenPrefix)) {
listenPrefixIndex += 1;
continue;
}
consummated = Error(`Already closed`);
current.delete(inboundAttempt);
await E(listener)
.onReject(port, localAddr, remoteAddr, listener)
.catch(rethrowUnlessMissing);
break;
}
if (listenPrefixIndex >= prefixes.length) {
throw lastFailure;
}
const [port] = listening.get(/** @type {string} */ (listenPrefix));
const innerVow = watch(
E(
/** @type {Remote<Required<ProtocolHandler>>} */ (
protocolHandler
),
).onInstantiate(
port,
prefixes[listenPrefixIndex],
remoteAddr,
protocolHandler,
),
this.facets.binderInboundInstantiateWatcher,
{
listenAddr,
remoteAddr,
port,
listenPrefixIndex,
},
);
return watch(
innerVow,
this.facets.binderInboundInstantiateCatchWatcher,
{
...watchContext,
lastFailure,
listenPrefixIndex,
},
);
},
},
binderOutboundInstantiateWatcher: {
onFulfilled(localInstance, watchContext) {
const { localAddr } = watchContext;
return localInstance ? `${localAddr}/${localInstance}` : localAddr;
},
},
binderOutboundConnectWatcher: {
onFulfilled(
{
handler: rchandler,
remoteAddress: negotiatedRemoteAddress,
localAddress: negotiatedLocalAddress,
},
async accept({
localAddress = localAddr,
remoteAddress = remoteAddr,
handler: rchandler,
}) {
if (consummated) {
throw consummated;
}
consummated = Error(`Already accepted`);
current.delete(inboundAttempt);
watchContext,
) {
const {
lastFailure,
lchandler,
localAddr: requestedLocalAddress,
remoteAddr: requestedRemoteAddress,
port,
} = watchContext;
const lchandler = await E(listener).onAccept(
const { currentConnections } = this.state;
if (!rchandler) {
throw lastFailure;
}
const current = currentConnections.get(port);
return crossoverConnection(
zone,
/** @type {Remote<Required<ConnectionHandler>>} */ (lchandler),
negotiatedLocalAddress || requestedLocalAddress,
/** @type {Remote<Required<ConnectionHandler>>} */ (rchandler),
negotiatedRemoteAddress || requestedRemoteAddress,
makeConnection,
finalizer,
current,
)[0];
},
},
binderOutboundCatchWatcher: {
onRejected(e, watchContext) {
let lastFailure;
try {
rethrowUnlessMissing(e);
} catch (innerE) {
lastFailure = innerE;
}
const { port, remoteAddr, lchandler, localAddr } = watchContext;
const { protocolHandler } = this.state;
const connectVow = watch(
E(protocolHandler).onConnect(
port,
localAddr,
remoteAddr,
listener,
);
return crossoverConnection(
lchandler,
localAddress,
rchandler,
remoteAddress,
current,
)[1];
},
});
current.add(inboundAttempt);
return inboundAttempt;
}
throw lastFailure;
},
async outbound(port, remoteAddr, lchandler) {
const localAddr =
/** @type {string} */
(await E(port).getLocalAddress());
protocolHandler,
),
);
// Allocate a local address.
const initialLocalInstance = await E(protocolHandler)
.onInstantiate(port, localAddr, remoteAddr, protocolHandler)
.catch(rethrowUnlessMissing);
const initialLocalAddr = initialLocalInstance
? `${localAddr}/${initialLocalInstance}`
: localAddr;
return watch(connectVow, this.facets.binderOutboundConnectWatcher, {
lastFailure,
remoteAddr,
localAddr,
lchandler,
port,
});
},
},
binderOutboundInboundWatcher: {
onFulfilled(initialLocalAddress, watchContext) {
const { remoteAddr, localAddr } = watchContext;
let lastFailure;
let accepted;
await (async () => {
// Attempt the loopback connection.
const attempt = await protocolImpl.inbound(
remoteAddr,
initialLocalAddr,
);
accepted = await attempt.accept({ handler: lchandler });
})().catch(e => {
lastFailure = e;
});
if (accepted) {
return accepted;
}
if (initialLocalAddress === undefined) {
initialLocalAddress = localAddr;
}
const {
remoteAddress = remoteAddr,
handler: rchandler,
localAddress = localAddr,
} =
/** @type {Partial<AttemptDescription>} */
(
await E(protocolHandler).onConnect(
// Attempt the loopback connection.
return this.facets.protocolImpl.inbound(
remoteAddr,
initialLocalAddress,
);
},
},
binderOutboundAcceptWatcher: {
onFulfilled(attempt, watchContext) {
const { handler } = watchContext;
return E(attempt).accept({ handler });
},
},
binderBindGeneratePortWatcher: {
onFulfilled(portID, watchContext) {
const { localAddr, underspecified } = watchContext;
const { protocolHandler, boundPorts } = this.state;
if (!underspecified) {
return localAddr;
}
const newAddr = `${localAddr}${portID}`;
if (!boundPorts.has(newAddr)) {
return newAddr;
}
return watch(
E(protocolHandler).generatePortID(localAddr, protocolHandler),
this.facets.binderBindGeneratePortWatcher,
watchContext,
);
},
},
binderPortWatcher: {
onFulfilled(_value, watchContext) {
const { port, localAddr } = watchContext;
const { boundPorts, currentConnections } = this.state;
boundPorts.init(localAddr, port);
currentConnections.init(
port,
initialLocalAddr,
remoteAddr,
lchandler,
zone.detached().setStore('connections'),
);
return port;
},
},
binderBindWatcher: {
onFulfilled(localAddr) {
const {
boundPorts,
listening,
openConnections,
currentConnections,
protocolHandler,
)
);
} = this.state;
if (!rchandler) {
throw lastFailure;
}
if (boundPorts.has(localAddr)) {
return boundPorts.get(localAddr);
}
const current = currentConnections.get(port);
return crossoverConnection(
lchandler,
localAddress,
rchandler,
remoteAddress,
current,
)[0];
const port = makePort({
localAddr,
listening,
openConnections,
currentConnections,
boundPorts,
protocolHandler,
protocolImpl: this.facets.protocolImpl,
});
return watch(
E(protocolHandler).onBind(port, localAddr, protocolHandler),
this.facets.binderPortWatcher,
{
port,
localAddr,
},
);
},
},
rethrowUnlessMissingWatcher: {
onRejected(e) {
rethrowUnlessMissing(e);
},
},
},
});
);
// Wire up the local protocol to the handler.
void E(protocolHandler).onCreate(protocolImpl, protocolHandler);
const makeBinderKit = ({
currentConnections,
boundPorts,
listening,
protocolHandler,
}) => {
const { protocolImpl, binder } = makeFullBinderKit({
currentConnections,
boundPorts,
listening,
protocolHandler,
});
return harden({ protocolImpl, binder });
};
return makeBinderKit;
};
// Return the user-facing protocol.
return Far('binder', { bind });
}
/**
* @param {import('@agoric/base-zone').Zone} zone
* @param {Powers} powers
*/
export const prepareNetworkProtocol = (zone, powers) => {
const makeBinderKit = prepareBinder(zone, powers);
/**
* @param {Remote<ProtocolHandler>} protocolHandler
* @returns {Protocol}
*/
const makeNetworkProtocol = protocolHandler => {
const detached = zone.detached();
/** @type {MapStore<Port, SetStore<Closable>>} */
const currentConnections = detached.mapStore('portToCurrentConnections');
/** @type {MapStore<string, Port>} */
const boundPorts = detached.mapStore('addrToPort');
/** @type {MapStore<Endpoint, [Port, Remote<Required<ListenHandler>>]>} */
const listening = detached.mapStore('listening');
const { binder, protocolImpl } = makeBinderKit({
currentConnections,
boundPorts,
listening,
protocolHandler,
});
// Wire up the local protocol to the handler.
void E(protocolHandler).onCreate(protocolImpl, protocolHandler);
return binder;
};
return makeNetworkProtocol;
};
/**
* Create a ConnectionHandler that just echoes its packets.
*
* @returns {ConnectionHandler}
* @param {import('@agoric/base-zone').Zone} zone
*/
export function makeEchoConnectionHandler() {
let closed;
/** @type {Connection} */
return Far('ConnectionHandler', {
async onReceive(_connection, bytes, _connectionHandler) {
if (closed) {
throw closed;
}
return bytes;
export const prepareEchoConnectionKit = zone => {
const makeEchoConnectionKit = zone.exoClassKit(
'EchoConnectionKit',
{
handler: M.interface('ConnectionHandler', {
onReceive: M.callWhen(
Shape.Connection,
Shape.Bytes,
Shape.ConnectionHandler,
)
.optional(Shape.Opts)
.returns(Shape.Data),
onClose: M.callWhen(Shape.Connection)
.optional(M.any(), Shape.ConnectionHandler)
.returns(M.undefined()),
}),
listener: M.interface('Listener', {
onListen: M.callWhen(Shape.Port, Shape.ListenHandler).returns(
Shape.Vow$(M.undefined()),
),
onAccept: M.callWhen(
Shape.Port,
Shape.Endpoint,
Shape.Endpoint,
Shape.ListenHandler,
).returns(Shape.Vow$(Shape.ConnectionHandler)),
}),
},
async onClose(_connection, _reason, _connectionHandler) {
if (closed) {
throw closed;
}
closed = Error('Connection closed');
() => {
return {
/** @type {string | undefined} */
closed: undefined,
};
},
});
}
{
handler: {
/**
* @param {Connection} _connection
* @param {Bytes} bytes
* @param {ConnectionHandler} _connectionHandler
*/
async onReceive(_connection, bytes, _connectionHandler) {
const { closed } = this.state;
export function makeNonceMaker(prefix = '', suffix = '') {
let nonce = 0;
return async () => {
nonce += 1;
return `${prefix}${nonce}${suffix}`;
if (closed) {
throw Error(closed);
}
return bytes;
},
/**
* @param {Connection} _connection
* @param {CloseReason} [reason]
* @param {ConnectionHandler} [_connectionHandler]
*/
async onClose(_connection, reason, _connectionHandler) {
const { closed } = this.state;
if (closed) {
throw Error(closed);
}
this.state.closed = reason || 'Connection closed';
},
},
listener: {
async onAccept(_port, _localAddr, _remoteAddr, _listenHandler) {
return this.facets.handler;
},
async onListen(port, _listenHandler) {
console.debug(`listening on echo port: ${port}`);
},
},
},
);
return makeEchoConnectionKit;
};
/** @typedef {ReturnType<typeof prepareEchoConnectionKit>} MakeEchoConnectionKit */
/**
* Create a protocol handler that just connects to itself.
*
* @param {import('@agoric/base-zone').Zone} zone
* @param {VowTools} powers
*/
export function prepareLoopbackProtocolHandler(zone, { watch, allVows }) {
const detached = zone.detached();
/** @param {string} [instancePrefix] */
const initHandler = (instancePrefix = 'nonce/') => {
/** @type {MapStore<string, [Remote<Port>, Remote<Required<ListenHandler>>]>} */
const listeners = detached.mapStore('localAddr');
return {
listeners,
portNonce: 0n,
instancePrefix,
instanceNonce: 0n,
};
};
const makeLoopbackProtocolHandlerKit = zone.exoClassKit(
'ProtocolHandler',
Shape.ProtocolHandlerI,
/** @param {string} [instancePrefix] */
initHandler,
{
protocolHandler: {
async onCreate(_impl, _protocolHandler) {
// noop
},
async generatePortID(_localAddr, _protocolHandler) {
this.state.portNonce += 1n;
return `port${this.state.portNonce}`;
},
async onBind(_port, _localAddr, _protocolHandler) {
// noop, for now; Maybe handle a bind?
},
/**
* @param {*} _port
* @param {Endpoint} localAddr
* @param {Endpoint} remoteAddr
* @returns {import('@agoric/vow').PromiseVow<AttemptDescription>}}
*/
async onConnect(_port, localAddr, remoteAddr) {
const { listeners } = this.state;
const [lport, lhandler] = listeners.get(remoteAddr);
const acceptVow = watch(
E(lhandler).onAccept(lport, remoteAddr, localAddr, lhandler),
this.facets.protocolHandlerAcceptWatcher,
);
const instantiateInnerVow = watch(
E(this.facets.protocolHandler).onInstantiate(
lport,
remoteAddr,
localAddr,
this.facets.protocolHandler,
),
this.facets.protocolHandlerInstantiateWatcher,
);
const instantiateVow = watch(
instantiateInnerVow,
this.facets.rethrowUnlessMissingWatcher,
);
return watch(
allVows([acceptVow, instantiateVow]),
this.facets.protocolHandlerConnectWatcher,
);
},
async onInstantiate(_port, _localAddr, _remote, _protocol) {
const { instancePrefix } = this.state;
this.state.instanceNonce += 1n;
return `${instancePrefix}${this.state.instanceNonce}`;
},
async onListen(port, localAddr, listenHandler, _protocolHandler) {
const { listeners } = this.state;
// This implementation has a simple last-one-wins replacement policy.
// Other handlers might use different policies.
if (listeners.has(localAddr)) {
const lhandler = listeners.get(localAddr)[1];
if (lhandler !== listenHandler) {
listeners.set(
localAddr,
harden([
port,
/** @type {Remote<Required<ListenHandler>>} */ (
listenHandler
),
]),
);
}
} else {
listeners.init(
localAddr,
harden([
port,
/** @type {Remote<Required<ListenHandler>>} */ (listenHandler),
]),
);
}
},
/**
* @param {Remote<Port>} port
* @param {Endpoint} localAddr
* @param {Remote<ListenHandler>} listenHandler
* @param {*} _protocolHandler
*/
async onListenRemove(port, localAddr, listenHandler, _protocolHandler) {
const { listeners } = this.state;
const [lport, lhandler] = listeners.get(localAddr);
lport === port || Fail`Port does not match listener on ${localAddr}`;
lhandler === listenHandler ||
Fail`Listen handler does not match listener on ${localAddr}`;
listeners.delete(localAddr);
},
async onRevoke(_port, _localAddr, _protocolHandler) {
// This is an opportunity to clean up resources.
},
},
protocolHandlerAcceptWatcher: {
onFulfilled(rchandler) {
return rchandler;
},
},
protocolHandlerConnectWatcher: {
onFulfilled(results) {
return {
remoteInstance: results[0],
handler: results[1],
};
},
},
protocolHandlerInstantiateWatcher: {
onFulfilled(remoteInstance) {
return remoteInstance;
},
},
rethrowUnlessMissingWatcher: {
onRejected(e) {
rethrowUnlessMissing(e);
},
},
},
);
/** @param {string} [instancePrefix] */
const makeLoopbackProtocolHandler = instancePrefix => {
const { protocolHandler } = makeLoopbackProtocolHandlerKit(instancePrefix);
return harden(protocolHandler);
};
return makeLoopbackProtocolHandler;
}
/**
* Create a protocol handler that just connects to itself.
*
* @param {ProtocolHandler['onInstantiate']} [onInstantiate]
* @returns {ProtocolHandler} The localhost handler
* @param {import('@agoric/base-zone').Zone} zone
* @param {Powers} powers
*/
export function makeLoopbackProtocolHandler(
onInstantiate = makeNonceMaker('nonce/'),
) {
/** @type {MapStore<string, [Port, ListenHandler]>} */
const listeners = makeScalarMapStore('localAddr');
export const preparePortAllocator = (zone, { watch }) =>
zone.exoClass(
'PortAllocator',
M.interface('PortAllocator', {
allocateCustomIBCPort: M.callWhen()
.optional(M.string())
.returns(Shape.Vow$(Shape.Port)),
allocateICAControllerPort: M.callWhen().returns(Shape.Vow$(Shape.Port)),
allocateICQControllerPort: M.callWhen().returns(Shape.Vow$(Shape.Port)),
allocateCustomLocalPort: M.callWhen()
.optional(M.string())
.returns(Shape.Vow$(Shape.Port)),
}),
/**
*
* @param {object} opts
* @param {Protocol} opts.protocol
*/
({ protocol }) => ({ protocol, lastICAPortNum: 0n, lastICQPortNum: 0n }),
{
async allocateCustomIBCPort(specifiedName = '') {
const { state } = this;
let localAddr = `/ibc-port/`;
const makePortID = makeNonceMaker('port');
if (specifiedName) {
throwIfInvalidPortName(specifiedName);
return Far('ProtocolHandler', {
// eslint-disable-next-line no-empty-function
async onCreate(_impl, _protocolHandler) {
// TODO
localAddr = `/ibc-port/custom-${specifiedName}`;
}
// Allocate an IBC port with a unique generated name.
return watch(E(state.protocol).bindPort(localAddr));
},
async allocateICAControllerPort() {
const { state } = this;
state.lastICAPortNum += 1n;
return watch(
E(state.protocol).bindPort(
`/ibc-port/icacontroller-${state.lastICAPortNum}`,
),
);
},
async allocateICQControllerPort() {
const { state } = this;
state.lastICQPortNum += 1n;
return watch(
E(state.protocol).bindPort(
`/ibc-port/icqcontroller-${state.lastICQPortNum}`,
),
);
},
async allocateCustomLocalPort(specifiedName = '') {
const { state } = this;
let localAddr = `/local/`;
if (specifiedName) {
throwIfInvalidPortName(specifiedName);
localAddr = `/local/custom-${specifiedName}`;
}
// Allocate a local port with a unique generated name.
return watch(E(state.protocol).bindPort(localAddr));
},
},
async generatePortID(_protocolHandler) {
return makePortID();
);
/** @typedef {ReturnType<ReturnType<typeof preparePortAllocator>>} PortAllocator */
/**
* Return a package-specific singleton that pins objects until they are
* explicitly unpinned or finalized. It needs to pin objects only because they
* are resources that need to be released.
*
* The reason this functionality wasn't just baked into the other network exos
* is to maintain upgrade-compatible with minimal additional changes.
*
* @param {import('@agoric/base-zone').Zone} zone
* @param {VowTools} vowTools
*/
const prepareFinalizer = (zone, { watch }) => {
/**
* @type {MapStore<{},
* { conn: Remote<Connection>, handler: Remote<Required<ConnectionHandler>>} |
* { closer: Remote<{ close(): PromiseVow<any> }> }
* >}
*/
const objToFinalizerInfo = zone.mapStore('objToFinalizerInfo');
return zone.exo('NetworkFinalizer', undefined, {
has(obj) {
return objToFinalizerInfo.has(obj);
},
async onBind(_port, _localAddr, _protocolHandler) {
// TODO: Maybe handle a bind?
/**
* Add a connection and handler for an `onClose` method to be called upon
* finalization.
* @param {Remote<Connection>} conn
* @param {Remote<Required<ConnectionHandler>>} handler
*/
initConnection(conn, handler) {
objToFinalizerInfo.init(conn, harden({ conn, handler }));
},
async onConnect(_port, localAddr, remoteAddr, _chandler, protocolHandler) {
const [lport, lhandler] = listeners.get(remoteAddr);
const rchandler =
/** @type {ConnectionHandler} */
(await E(lhandler).onAccept(lport, remoteAddr, localAddr, lhandler));
// console.log(`rchandler is`, rchandler);
const remoteInstance = await E(protocolHandler)
.onInstantiate(lport, remoteAddr, localAddr, protocolHandler)
.catch(rethrowUnlessMissing);
return {
remoteInstance,
handler: rchandler,
};
/**
* Add an object with a `close` method to be called (such as an
* `inboundAttempt`) upon finalization.
* @param {Remote<{ close(): PromiseVow<any> }>} closer
*/
initCloser(closer) {
objToFinalizerInfo.init(closer, harden({ closer }));
},
onInstantiate,
async onListen(port, localAddr, listenHandler, _protocolHandler) {
// TODO: Implement other listener replacement policies.
if (listeners.has(localAddr)) {
const lhandler = listeners.get(localAddr)[1];
if (lhandler !== listenHandler) {
// Last-one-wins.
listeners.set(localAddr, [port, listenHandler]);
}
} else {
listeners.init(localAddr, [port, listenHandler]);
finalize(obj) {
if (!objToFinalizerInfo.has(obj)) {
return;
}
const disposeInfo = objToFinalizerInfo.get(obj);
if ('conn' in disposeInfo) {
// A connection+handler.
const { conn, handler } = disposeInfo;
objToFinalizerInfo.delete(obj);
return watch(E(handler).onClose(conn, CLOSE_REASON_FINALIZER, handler));
} else if ('closer' in disposeInfo) {
// Just something with a `close` method.
const { closer } = disposeInfo;
objToFinalizerInfo.delete(obj);
return watch(E(closer).close());
}
},
async onListenRemove(port, localAddr, listenHandler, _protocolHandler) {
const [lport, lhandler] = listeners.get(localAddr);
lport === port || Fail`Port does not match listener on ${localAddr}`;
lhandler === listenHandler ||
Fail`Listen handler does not match listener on ${localAddr}`;
listeners.delete(localAddr);
unpin(obj) {
objToFinalizerInfo.delete(obj);
},
async onRevoke(_port, _localAddr, _protocolHandler) {
// TODO: maybe clean up?
},
});
}
};
harden(prepareFinalizer);
/**
* @param {import('@agoric/base-zone').Zone} zone
* @param {VowTools} vowTools
* @returns {Powers}
*/
export const prepareNetworkPowers = (zone, vowTools) => {
const finalizer = prepareFinalizer(zone, vowTools);
return harden({ ...vowTools, finalizer });
};
/** @typedef {ReturnType<typeof prepareFinalizer>} Finalizer */
/**
* @import {Endpoint, Port, Protocol, ProtocolHandler} from './types.js';
* @import {PromiseVow, Remote, VowTools} from '@agoric/vow';
*/
/**
* @template T

@@ -11,32 +15,36 @@ * @typedef {object} Router A delimited string router implementation

*/
export const RouterI: import("@endo/patterns").InterfaceGuard<{
getRoutes: import("@endo/patterns").MethodGuard;
register: import("@endo/patterns").MethodGuard;
unregister: import("@endo/patterns").MethodGuard;
}>;
export function prepareRouter<T>(zone: import("@agoric/base-zone").Zone): () => import("@endo/exo").Guarded<{
/** @param {Endpoint} addr */
getRoutes(addr: Endpoint): [string, T][];
/**
* @param {string} prefix
* @param {T} route
*/
register(prefix: string, route: T): void;
/**
* @param {string} prefix
* @param {T} route
*/
unregister(prefix: string, route: T): void;
}>;
export function prepareRouterProtocol(zone: import("@agoric/base-zone").Zone, powers: import("./network.js").Powers, E?: typeof defaultE): () => import("@endo/exo").Guarded<{
/**
* @param {string[]} paths
* @param {Remote<ProtocolHandler>} protocolHandler
*/
registerProtocolHandler(paths: string[], protocolHandler: Remote<ProtocolHandler>): void;
/**
* @param {string} prefix
* @param {Remote<ProtocolHandler>} protocolHandler
*/
unregisterProtocolHandler(prefix: string, protocolHandler: Remote<ProtocolHandler>): void;
/** @param {Endpoint} localAddr */
bindPort(localAddr: Endpoint): Promise<Port | import("@agoric/vow").Vow<Port>>;
}>;
/**
* Create a slash-delimited router.
*
* @template T
* @returns {Router<T>} a new Router
*/
export default function makeRouter<T>(): Router<T>;
/**
* @typedef {object} RouterProtocol
* @property {(prefix: string) => Promise<Port>} bind
* @property {(paths: string[], protocolHandler: ProtocolHandler) => void} registerProtocolHandler
* @property {(prefix: string, protocolHandler: ProtocolHandler) => void} unregisterProtocolHandler
*/
/**
* Create a router that behaves like a Protocol.
*
* @param {typeof defaultE} [E] Eventual sender
* @returns {RouterProtocol} The new delegated protocol
*/
export function makeRouterProtocol(E?: ((<T>(x: T) => import("@endo/eventual-send/src/E.js").ECallableOrMethods<import("@endo/eventual-send").RemoteFunctions<T>>) & {
readonly get: <T_1>(x: T_1) => import("@endo/eventual-send/src/E.js").EGetters<import("@endo/eventual-send").LocalRecord<T_1>>;
readonly resolve: {
(): Promise<void>;
<T_2>(value: T_2): Promise<Awaited<T_2>>;
<T_3>(value: T_3 | PromiseLike<T_3>): Promise<Awaited<T_3>>;
};
readonly sendOnly: <T_4>(x: T_4) => import("@endo/eventual-send/src/E.js").ESendOnlyCallableOrMethods<import("@endo/eventual-send").RemoteFunctions<T_4>>;
readonly when: <T_5, U = T_5>(x: T_5 | PromiseLike<T_5>, onfulfilled?: ((value: T_5) => import("@endo/eventual-send").ERef<U>) | undefined, onrejected?: ((reason: any) => import("@endo/eventual-send").ERef<U>) | undefined) => Promise<U>;
}) | undefined): RouterProtocol;
/**
* A delimited string router implementation

@@ -62,6 +70,12 @@ */

export type RouterProtocol = {
bind: (prefix: string) => Promise<Port>;
bindPort: (prefix: string) => PromiseVow<Port>;
registerProtocolHandler: (paths: string[], protocolHandler: ProtocolHandler) => void;
unregisterProtocolHandler: (prefix: string, protocolHandler: ProtocolHandler) => void;
};
import type { Endpoint } from './types.js';
import { E as defaultE } from '@endo/far';
import type { ProtocolHandler } from './types.js';
import type { Remote } from '@agoric/vow';
import type { Port } from './types.js';
import type { PromiseVow } from '@agoric/vow';
//# sourceMappingURL=router.d.ts.map

@@ -1,10 +0,18 @@

import { Far, E as defaultE } from '@endo/far';
import { makeScalarMapStore } from '@agoric/store';
import { Fail } from '@agoric/assert';
import { makeNetworkProtocol, ENDPOINT_SEPARATOR } from './network.js';
// @ts-check
import '@agoric/store/exported.js';
/// <reference types="@agoric/store/exported.js" />
/// <reference path="./types.js" />
import { Fail } from '@endo/errors';
import { E as defaultE } from '@endo/far';
import { M } from '@endo/patterns';
import { ENDPOINT_SEPARATOR, prepareNetworkProtocol } from './network.js';
import { Shape } from './shapes.js';
/**
* @import {Endpoint, Port, Protocol, ProtocolHandler} from './types.js';
* @import {PromiseVow, Remote, VowTools} from '@agoric/vow';
*/
/**
* @template T

@@ -20,47 +28,77 @@ * @typedef {object} Router A delimited string router implementation

export const RouterI = M.interface('Router', {
getRoutes: M.call(Shape.Endpoint).returns(M.arrayOf([M.string(), M.any()])),
register: M.call(M.string(), M.any()).returns(M.undefined()),
unregister: M.call(M.string(), M.any()).returns(M.undefined()),
});
/**
* Create a slash-delimited router.
*
* @template T
* @returns {Router<T>} a new Router
* @param {import('@agoric/base-zone').Zone} zone
*/
export default function makeRouter() {
/** @type {MapStore<string, T>} */
const prefixToRoute = makeScalarMapStore('prefix');
return Far('Router', {
getRoutes(addr) {
const parts = addr.split(ENDPOINT_SEPARATOR);
/** @type {[string, T][]} */
const ret = [];
for (let i = parts.length; i > 0; i -= 1) {
// Try most specific match.
const prefix = parts.slice(0, i).join(ENDPOINT_SEPARATOR);
if (prefixToRoute.has(prefix)) {
ret.push([prefix, prefixToRoute.get(prefix)]);
export const prepareRouter = zone => {
const detached = zone.detached();
const makeRouter = zone.exoClass(
'Router',
RouterI,
() => {
/** @type {MapStore<string, T>} */
const prefixToRoute = detached.mapStore('prefix');
return {
prefixToRoute,
};
},
{
/** @param {Endpoint} addr */
getRoutes(addr) {
const parts = addr.split(ENDPOINT_SEPARATOR);
/** @type {[string, T][]} */
const ret = [];
for (let i = parts.length; i > 0; i -= 1) {
// Try most specific match.
const prefix = parts.slice(0, i).join(ENDPOINT_SEPARATOR);
if (this.state.prefixToRoute.has(prefix)) {
ret.push([prefix, this.state.prefixToRoute.get(prefix)]);
}
// Trim off the last value (after the slash).
const defaultPrefix = prefix.slice(
0,
prefix.lastIndexOf(ENDPOINT_SEPARATOR) + 1,
);
if (this.state.prefixToRoute.has(defaultPrefix)) {
ret.push([
defaultPrefix,
this.state.prefixToRoute.get(defaultPrefix),
]);
}
}
// Trim off the last value (after the slash).
const defaultPrefix = prefix.substr(
0,
prefix.lastIndexOf(ENDPOINT_SEPARATOR) + 1,
);
if (prefixToRoute.has(defaultPrefix)) {
ret.push([defaultPrefix, prefixToRoute.get(defaultPrefix)]);
}
}
return harden(ret);
return harden(ret);
},
/**
* @param {string} prefix
* @param {T} route
*/
register(prefix, route) {
this.state.prefixToRoute.init(prefix, route);
},
/**
* @param {string} prefix
* @param {T} route
*/
unregister(prefix, route) {
this.state.prefixToRoute.get(prefix) === route ||
Fail`Router is not registered at prefix ${prefix}`;
this.state.prefixToRoute.delete(prefix);
},
},
register(prefix, route) {
prefixToRoute.init(prefix, route);
},
unregister(prefix, route) {
prefixToRoute.get(prefix) === route ||
Fail`Router is not registered at prefix ${prefix}`;
prefixToRoute.delete(prefix);
},
});
}
);
return makeRouter;
};
/**
* @typedef {object} RouterProtocol
* @property {(prefix: string) => Promise<Port>} bind
* @property {(prefix: string) => PromiseVow<Port>} bindPort
* @property {(paths: string[], protocolHandler: ProtocolHandler) => void} registerProtocolHandler

@@ -73,44 +111,79 @@ * @property {(prefix: string, protocolHandler: ProtocolHandler) => void} unregisterProtocolHandler

*
* @param {import('@agoric/base-zone').Zone} zone
* @param {import('./network.js').Powers} powers
* @param {typeof defaultE} [E] Eventual sender
* @returns {RouterProtocol} The new delegated protocol
*/
export function makeRouterProtocol(E = defaultE) {
const router = makeRouter();
/** @type {MapStore<string, Protocol>} */
const protocols = makeScalarMapStore('prefix');
/** @type {MapStore<string, ProtocolHandler>} */
const protocolHandlers = makeScalarMapStore('prefix');
export const prepareRouterProtocol = (zone, powers, E = defaultE) => {
const detached = zone.detached();
function registerProtocolHandler(paths, protocolHandler) {
const protocol = makeNetworkProtocol(protocolHandler);
for (const prefix of paths) {
router.register(prefix, protocol);
protocols.init(prefix, protocol);
protocolHandlers.init(prefix, protocolHandler);
}
}
const makeRouter = prepareRouter(zone);
const makeNetworkProtocol = prepareNetworkProtocol(zone, powers);
// FIXME: Buggy.
// Needs to account for multiple paths.
function unregisterProtocolHandler(prefix, protocolHandler) {
const ph = protocolHandlers.get(prefix);
ph === protocolHandler ||
Fail`Protocol handler is not registered at prefix ${prefix}`;
router.unregister(prefix, ph);
protocols.delete(prefix);
protocolHandlers.delete(prefix);
}
const makeRouterProtocol = zone.exoClass(
'RouterProtocol',
M.interface('RouterProtocol', {
registerProtocolHandler: M.call(
M.arrayOf(M.string()),
M.remotable(),
).returns(),
unregisterProtocolHandler: M.call(M.string(), M.remotable()).returns(),
bindPort: M.callWhen(Shape.Endpoint).returns(Shape.Vow$(Shape.Port)),
}),
() => {
/** @type {Router<Protocol>} */
const router = makeRouter();
/** @type {Protocol['bind']} */
async function bind(localAddr) {
const [route] = router.getRoutes(localAddr);
route !== undefined || Fail`No registered router for ${localAddr}`;
return E(route[1]).bind(localAddr);
}
/** @type {MapStore<string, Protocol>} */
const protocols = detached.mapStore('prefix');
return Far('RouterProtocol', {
bind,
registerProtocolHandler,
unregisterProtocolHandler,
});
}
/** @type {MapStore<string, Remote<ProtocolHandler>>} */
const protocolHandlers = detached.mapStore('prefix');
return {
router,
protocolHandlers,
protocols,
};
},
{
/**
* @param {string[]} paths
* @param {Remote<ProtocolHandler>} protocolHandler
*/
registerProtocolHandler(paths, protocolHandler) {
const protocol = makeNetworkProtocol(protocolHandler);
for (const prefix of paths) {
this.state.router.register(prefix, protocol);
this.state.protocols.init(prefix, protocol);
this.state.protocolHandlers.init(prefix, protocolHandler);
}
},
// FIXME: Buggy.
// Needs to account for multiple paths.
/**
* @param {string} prefix
* @param {Remote<ProtocolHandler>} protocolHandler
*/
unregisterProtocolHandler(prefix, protocolHandler) {
const ph = this.state.protocolHandlers.get(prefix);
ph === protocolHandler ||
Fail`Protocol handler is not registered at prefix ${prefix}`;
// TODO: unmap protocol handlers to their corresponding protocol
// e.g. using a map
// before unregistering
// @ts-expect-error note FIXME above
this.state.router.unregister(prefix, ph);
this.state.protocols.delete(prefix);
this.state.protocolHandlers.delete(prefix);
},
/** @param {Endpoint} localAddr */
async bindPort(localAddr) {
const [route] = this.state.router.getRoutes(localAddr);
route !== undefined || Fail`No registered router for ${localAddr}`;
return E(route[1]).bindPort(localAddr);
},
},
);
return makeRouterProtocol;
};

@@ -1,21 +0,31 @@

type Data = string | Buffer | ArrayBuffer;
type Bytes = string;
/**
* Rearrange the exo types to make a cast of the methods (M) and init function (I) to a specific type.
*/
export type ExoClassMethods<M extends import("@endo/exo").Methods, I extends (...args: any[]) => any> = M & ThisType<{
self: import("@endo/exo").Guarded<M>;
state: ReturnType<I>;
}>;
/**
* Each character code carries 8-bit octets. Eventually we want to use passable Uint8Arrays.
*/
export type Bytes = string;
/**
* A local or remote address See multiaddr.js for an
* opinionated router implementation
*/
type Endpoint = string;
export type Endpoint = string;
/**
* A closable object
*/
type Closable = {
export type ClosableI = {
/**
* Terminate the object
*/
close: () => Promise<void>;
close: () => PromiseVow<void>;
};
export type Closable = RemotableObject & ClosableI;
/**
* The network Protocol
*/
type Protocol = {
export type Protocol = {
/**

@@ -25,3 +35,3 @@ * Claim a port, or if

*/
bind: (prefix: Endpoint) => Promise<Port>;
bindPort: (prefix: Endpoint) => PromiseVow<Port>;
};

@@ -31,3 +41,3 @@ /**

*/
type Port = {
export type Port = {
/**

@@ -41,11 +51,11 @@ * Get the locally bound name of this

*/
addListener: (acceptHandler: ListenHandler) => Promise<void>;
addListener: (acceptHandler: Remote<ListenHandler>) => PromiseVow<void>;
/**
* Make an outbound connection
*/
connect: (remote: Endpoint, connectionHandler?: ConnectionHandler) => Promise<Connection>;
connect: (remote: Endpoint, connectionHandler?: Remote<ConnectionHandler>) => PromiseVow<Connection>;
/**
* Remove the currently-bound listener
*/
removeListener: (acceptHandler: ListenHandler) => Promise<void>;
removeListener: (acceptHandler: Remote<ListenHandler>) => PromiseVow<void>;
/**

@@ -55,3 +65,3 @@ * Deallocate the port entirely, removing all

*/
revoke: () => void;
revoke: () => PromiseVow<void>;
};

@@ -61,20 +71,19 @@ /**

*/
type ListenHandler = {
export type ListenHandler = {
/**
* The
* listener has been registered
* The listener has been registered
*/
onListen?: ((port: Port, l: ListenHandler) => Promise<void>) | undefined;
onListen?: ((port: Remote<Port>, l: Remote<ListenHandler>) => PromiseVow<void>) | undefined;
/**
* A new connection is incoming
*/
onAccept: (port: Port, localAddr: Endpoint, remoteAddr: Endpoint, l: ListenHandler) => Promise<ConnectionHandler>;
onAccept: (port: Remote<Port>, localAddr: Endpoint, remoteAddr: Endpoint, l: Remote<ListenHandler>) => PromiseVow<Remote<ConnectionHandler>>;
/**
* The connection was rejected
*/
onReject?: ((port: Port, localAddr: Endpoint, remoteAddr: Endpoint, l: ListenHandler) => Promise<void>) | undefined;
onReject?: ((port: Remote<Port>, localAddr: Endpoint, remoteAddr: Endpoint, l: Remote<ListenHandler>) => PromiseVow<void>) | undefined;
/**
* There was an error while listening
*/
onError?: ((port: Port, rej: any, l: ListenHandler) => Promise<void>) | undefined;
onError?: ((port: Remote<Port>, rej: any, l: Remote<ListenHandler>) => PromiseVow<void>) | undefined;
/**

@@ -84,13 +93,13 @@ * The

*/
onRemove?: ((port: Port, l: ListenHandler) => Promise<void>) | undefined;
onRemove?: ((port: Remote<Port>, l: Remote<ListenHandler>) => PromiseVow<void>) | undefined;
};
type Connection = {
export type ConnectionI = {
/**
* Send a packet on the connection
*/
send: (packetBytes: Data, opts?: Record<string, any>) => Promise<Bytes>;
send: (packetBytes: Bytes, opts?: Record<string, any>) => PromiseVow<Bytes>;
/**
* Close both ends of the connection
*/
close: () => Promise<void>;
close: () => PromiseVow<void>;
/**

@@ -106,18 +115,19 @@ * Get the locally bound name of this

};
export type Connection = RemotableObject & ConnectionI;
/**
* A handler for a given Connection
*/
type ConnectionHandler = {
export type ConnectionHandler = {
/**
* The connection has been opened
*/
onOpen?: ((connection: Connection, localAddr: Endpoint, remoteAddr: Endpoint, c: ConnectionHandler) => void) | undefined;
onOpen?: ((connection: Remote<Connection>, localAddr: Endpoint, remoteAddr: Endpoint, c: Remote<ConnectionHandler>) => PromiseVow<void>) | undefined;
/**
* The connection received a packet
*/
onReceive?: ((connection: Connection, packetBytes: Bytes, c: ConnectionHandler, opts?: Record<string, any>) => Promise<Data>) | undefined;
onReceive?: ((connection: Remote<Connection>, ack: Bytes, c: Remote<ConnectionHandler>, opts?: Record<string, any>) => PromiseVow<Bytes>) | undefined;
/**
* The connection has been closed
*/
onClose?: ((connection: Connection, reason?: CloseReason, c?: ConnectionHandler) => Promise<void>) | undefined;
onClose?: ((connection: Remote<Connection>, reason?: CloseReason, c?: Remote<ConnectionHandler>) => PromiseVow<void>) | undefined;
};

@@ -127,5 +137,5 @@ /**

*/
type CloseReason = any | null;
type AttemptDescription = {
handler: ConnectionHandler;
export type CloseReason = any | null;
export type AttemptDescription = {
handler: Remote<ConnectionHandler>;
remoteAddress?: string | undefined;

@@ -138,35 +148,35 @@ localAddress?: string | undefined;

*/
type ProtocolHandler = {
export type ProtocolHandler = {
/**
* This protocol is created
*/
onCreate: (protocol: ProtocolImpl, p: ProtocolHandler) => Promise<void>;
onCreate: (protocol: Remote<ProtocolImpl>, p: Remote<ProtocolHandler>) => PromiseVow<void>;
/**
* Create a fresh port identifier for this protocol
*/
generatePortID: (localAddr: Endpoint, p: ProtocolHandler) => Promise<string>;
generatePortID: (localAddr: Endpoint, p: Remote<ProtocolHandler>) => PromiseVow<string>;
/**
* A port will be bound
*/
onBind: (port: Port, localAddr: Endpoint, p: ProtocolHandler) => Promise<void>;
onBind: (port: Remote<Port>, localAddr: Endpoint, p: Remote<ProtocolHandler>) => PromiseVow<void>;
/**
* A port was listening
*/
onListen: (port: Port, localAddr: Endpoint, listenHandler: ListenHandler, p: ProtocolHandler) => Promise<void>;
onListen: (port: Remote<Port>, localAddr: Endpoint, listenHandler: Remote<ListenHandler>, p: Remote<ProtocolHandler>) => PromiseVow<void>;
/**
* A port listener has been reset
*/
onListenRemove: (port: Port, localAddr: Endpoint, listenHandler: ListenHandler, p: ProtocolHandler) => Promise<void>;
onListenRemove: (port: Remote<Port>, localAddr: Endpoint, listenHandler: Remote<ListenHandler>, p: Remote<ProtocolHandler>) => PromiseVow<void>;
/**
* Return unique suffix for local address
*/
onInstantiate?: ((port: Port, localAddr: Endpoint, remote: Endpoint, p: ProtocolHandler) => Promise<Endpoint>) | undefined;
onInstantiate?: ((port: Remote<Port>, localAddr: Endpoint, remote: Endpoint, p: Remote<ProtocolHandler>) => PromiseVow<Endpoint>) | undefined;
/**
* A port initiates an outbound connection
*/
onConnect: (port: Port, localAddr: Endpoint, remote: Endpoint, c: ConnectionHandler, p: ProtocolHandler) => Promise<AttemptDescription>;
onConnect: (port: Remote<Port>, localAddr: Endpoint, remote: Endpoint, c: Remote<ConnectionHandler>, p: Remote<ProtocolHandler>) => PromiseVow<AttemptDescription>;
/**
* The port is being completely destroyed
*/
onRevoke: (port: Port, localAddr: Endpoint, p: ProtocolHandler) => Promise<void>;
onRevoke: (port: Remote<Port>, localAddr: Endpoint, p: Remote<ProtocolHandler>) => PromiseVow<void>;
};

@@ -176,7 +186,7 @@ /**

*/
type InboundAttempt = {
export type InboundAttempt = {
/**
* Establish the connection
*/
accept: (desc: AttemptDescription) => Promise<Connection>;
accept: (desc: AttemptDescription) => PromiseVow<Connection>;
/**

@@ -195,3 +205,3 @@ * Return the local address for this

*/
close: () => Promise<void>;
close: () => PromiseVow<void>;
};

@@ -201,3 +211,3 @@ /**

*/
type ProtocolImpl = {
export type ProtocolImpl = {
/**

@@ -207,12 +217,15 @@ * Claim a port, or if

*/
bind: (prefix: Endpoint) => Promise<Port>;
bindPort: (prefix: Endpoint) => PromiseVow<Remote<Port>>;
/**
* Make an attempt to connect into this protocol
*/
inbound: (listenAddr: Endpoint, remoteAddr: Endpoint) => Promise<InboundAttempt>;
inbound: (listenAddr: Endpoint, remoteAddr: Endpoint) => PromiseVow<InboundAttempt>;
/**
* Create an outbound connection
*/
outbound: (port: Port, remoteAddr: Endpoint, connectionHandler: ConnectionHandler) => Promise<Connection>;
outbound: (port: Remote<Port>, remoteAddr: Endpoint, connectionHandler: Remote<ConnectionHandler>) => PromiseVow<Connection>;
};
import type { PromiseVow } from '@agoric/vow';
import type { RemotableObject } from '@endo/pass-style';
import type { Remote } from '@agoric/vow';
//# sourceMappingURL=types.d.ts.map

@@ -0,8 +1,23 @@

// @ts-check
// Ensure this is a module.
export {};
/**
* @typedef {string | Buffer | ArrayBuffer} Data
*
* @typedef {string} Bytes
* @import {Passable, RemotableObject} from '@endo/pass-style';
* @import {PromiseVow, Remote} from '@agoric/vow';
*/
/**
* @template {import('@endo/exo').Methods} M
* @template {(...args: any[]) => any} I
* @typedef {M & ThisType<{ self: import('@endo/exo').Guarded<M>, state: ReturnType<I> }>} ExoClassMethods
* Rearrange the exo types to make a cast of the methods (M) and init function (I) to a specific type.
*/
/**
* @typedef {string} Bytes Each character code carries 8-bit octets. Eventually we want to use passable Uint8Arrays.
*/
/**
* @typedef {string} Endpoint A local or remote address See multiaddr.js for an

@@ -13,9 +28,12 @@ * opinionated router implementation

/**
* @typedef {object} Closable A closable object
* @property {() => Promise<void>} close Terminate the object
* @typedef {object} ClosableI A closable object
* @property {() => PromiseVow<void>} close Terminate the object
*/
/**
* @typedef {RemotableObject & ClosableI} Closable
*/
/**
* @typedef {object} Protocol The network Protocol
* @property {(prefix: Endpoint) => Promise<Port>} bind Claim a port, or if
* @property {(prefix: Endpoint) => PromiseVow<Port>} bindPort Claim a port, or if
* ending in ENDPOINT_SEPARATOR, a fresh name

@@ -28,12 +46,12 @@ */

* port
* @property {(acceptHandler: ListenHandler) => Promise<void>} addListener
* @property {(acceptHandler: Remote<ListenHandler>) => PromiseVow<void>} addListener
* Begin accepting incoming connections
* @property {(
* remote: Endpoint,
* connectionHandler?: ConnectionHandler,
* ) => Promise<Connection>} connect
* connectionHandler?: Remote<ConnectionHandler>,
* ) => PromiseVow<Connection>} connect
* Make an outbound connection
* @property {(acceptHandler: ListenHandler) => Promise<void>} removeListener
* @property {(acceptHandler: Remote<ListenHandler>) => PromiseVow<void>} removeListener
* Remove the currently-bound listener
* @property {() => void} revoke Deallocate the port entirely, removing all
* @property {() => PromiseVow<void>} revoke Deallocate the port entirely, removing all
* listeners and closing all active connections

@@ -44,21 +62,20 @@ */

* @typedef {object} ListenHandler A handler for incoming connections
* @property {(port: Port, l: ListenHandler) => Promise<void>} [onListen] The
* listener has been registered
* @property {(port: Remote<Port>, l: Remote<ListenHandler>) => PromiseVow<void>} [onListen] The listener has been registered
* @property {(
* port: Port,
* port: Remote<Port>,
* localAddr: Endpoint,
* remoteAddr: Endpoint,
* l: ListenHandler,
* ) => Promise<ConnectionHandler>} onAccept
* l: Remote<ListenHandler>,
* ) => PromiseVow<Remote<ConnectionHandler>>} onAccept
* A new connection is incoming
* @property {(
* port: Port,
* port: Remote<Port>,
* localAddr: Endpoint,
* remoteAddr: Endpoint,
* l: ListenHandler,
* ) => Promise<void>} [onReject]
* l: Remote<ListenHandler>,
* ) => PromiseVow<void>} [onReject]
* The connection was rejected
* @property {(port: Port, rej: any, l: ListenHandler) => Promise<void>} [onError]
* @property {(port: Remote<Port>, rej: any, l: Remote<ListenHandler>) => PromiseVow<void>} [onError]
* There was an error while listening
* @property {(port: Port, l: ListenHandler) => Promise<void>} [onRemove] The
* @property {(port: Remote<Port>, l: Remote<ListenHandler>) => PromiseVow<void>} [onRemove] The
* listener has been removed

@@ -68,9 +85,9 @@ */

/**
* @typedef {object} Connection
* @typedef {object} ConnectionI
* @property {(
* packetBytes: Data,
* packetBytes: Bytes,
* opts?: Record<string, any>,
* ) => Promise<Bytes>} send
* ) => PromiseVow<Bytes>} send
* Send a packet on the connection
* @property {() => Promise<void>} close Close both ends of the connection
* @property {() => PromiseVow<void>} close Close both ends of the connection
* @property {() => Endpoint} getLocalAddress Get the locally bound name of this

@@ -80,2 +97,5 @@ * connection

*/
/**
* @typedef {RemotableObject & ConnectionI} Connection
*/

@@ -85,20 +105,20 @@ /**

* @property {(
* connection: Connection,
* connection: Remote<Connection>,
* localAddr: Endpoint,
* remoteAddr: Endpoint,
* c: ConnectionHandler,
* ) => void} [onOpen]
* c: Remote<ConnectionHandler>,
* ) => PromiseVow<void>} [onOpen]
* The connection has been opened
* @property {(
* connection: Connection,
* packetBytes: Bytes,
* c: ConnectionHandler,
* connection: Remote<Connection>,
* ack: Bytes,
* c: Remote<ConnectionHandler>,
* opts?: Record<string, any>,
* ) => Promise<Data>} [onReceive]
* ) => PromiseVow<Bytes>} [onReceive]
* The connection received a packet
* @property {(
* connection: Connection,
* connection: Remote<Connection>,
* reason?: CloseReason,
* c?: ConnectionHandler,
* ) => Promise<void>} [onClose]
* c?: Remote<ConnectionHandler>,
* ) => PromiseVow<void>} [onClose]
* The connection has been closed

@@ -111,3 +131,3 @@ *

* @typedef {object} AttemptDescription
* @property {ConnectionHandler} handler
* @property {Remote<ConnectionHandler>} handler
* @property {Endpoint} [remoteAddress]

@@ -120,50 +140,50 @@ * @property {Endpoint} [localAddress]

* implementation will invoke
* @property {(protocol: ProtocolImpl, p: ProtocolHandler) => Promise<void>} onCreate
* @property {(protocol: Remote<ProtocolImpl>, p: Remote<ProtocolHandler>) => PromiseVow<void>} onCreate
* This protocol is created
* @property {(localAddr: Endpoint, p: ProtocolHandler) => Promise<string>} generatePortID
* @property {(localAddr: Endpoint, p: Remote<ProtocolHandler>) => PromiseVow<string>} generatePortID
* Create a fresh port identifier for this protocol
* @property {(
* port: Port,
* port: Remote<Port>,
* localAddr: Endpoint,
* p: ProtocolHandler,
* ) => Promise<void>} onBind
* p: Remote<ProtocolHandler>,
* ) => PromiseVow<void>} onBind
* A port will be bound
* @property {(
* port: Port,
* port: Remote<Port>,
* localAddr: Endpoint,
* listenHandler: ListenHandler,
* p: ProtocolHandler,
* ) => Promise<void>} onListen
* listenHandler: Remote<ListenHandler>,
* p: Remote<ProtocolHandler>,
* ) => PromiseVow<void>} onListen
* A port was listening
* @property {(
* port: Port,
* port: Remote<Port>,
* localAddr: Endpoint,
* listenHandler: ListenHandler,
* p: ProtocolHandler,
* ) => Promise<void>} onListenRemove
* listenHandler: Remote<ListenHandler>,
* p: Remote<ProtocolHandler>,
* ) => PromiseVow<void>} onListenRemove
* A port listener has been reset
* @property {(
* port: Port,
* port: Remote<Port>,
* localAddr: Endpoint,
* remote: Endpoint,
* p: ProtocolHandler,
* ) => Promise<Endpoint>} [onInstantiate]
* p: Remote<ProtocolHandler>,
* ) => PromiseVow<Endpoint>} [onInstantiate]
* Return unique suffix for local address
* @property {(
* port: Port,
* port: Remote<Port>,
* localAddr: Endpoint,
* remote: Endpoint,
* c: ConnectionHandler,
* p: ProtocolHandler,
* ) => Promise<AttemptDescription>} onConnect
* c: Remote<ConnectionHandler>,
* p: Remote<ProtocolHandler>,
* ) => PromiseVow<AttemptDescription>} onConnect
* A port initiates an outbound connection
* @property {(
* port: Port,
* port: Remote<Port>,
* localAddr: Endpoint,
* p: ProtocolHandler,
* ) => Promise<void>} onRevoke
* p: Remote<ProtocolHandler>,
* ) => PromiseVow<void>} onRevoke
* The port is being completely destroyed
*
* @typedef {object} InboundAttempt An inbound connection attempt
* @property {(desc: AttemptDescription) => Promise<Connection>} accept
* @property {(desc: AttemptDescription) => PromiseVow<Connection>} accept
* Establish the connection

@@ -174,6 +194,6 @@ * @property {() => Endpoint} getLocalAddress Return the local address for this

* this attempt
* @property {() => Promise<void>} close Abort the attempt
* @property {() => PromiseVow<void>} close Abort the attempt
*
* @typedef {object} ProtocolImpl Things the protocol can do for us
* @property {(prefix: Endpoint) => Promise<Port>} bind Claim a port, or if
* @property {(prefix: Endpoint) => PromiseVow<Remote<Port>>} bindPort Claim a port, or if
* ending in ENDPOINT_SEPARATOR, a fresh name

@@ -183,10 +203,10 @@ * @property {(

* remoteAddr: Endpoint,
* ) => Promise<InboundAttempt>} inbound
* ) => PromiseVow<InboundAttempt>} inbound
* Make an attempt to connect into this protocol
* @property {(
* port: Port,
* port: Remote<Port>,
* remoteAddr: Endpoint,
* connectionHandler: ConnectionHandler,
* ) => Promise<Connection>} outbound
* connectionHandler: Remote<ConnectionHandler>,
* ) => PromiseVow<Connection>} outbound
* Create an outbound connection
*/

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc