New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

@agoric/network

Package Overview
Dependencies
Maintainers
9
Versions
1039
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@agoric/network - npm Package Compare versions

Comparing version 0.1.1-dev-f85d35b.0 to 0.1.1-dev-f8ea4a8.0

src/shapes.d.ts

3

./src/index.js
export * from './network.js';
export { default as makeRouter, makeRouterProtocol } from './router.js';
export * from './shapes.js';
export { prepareRouter, prepareRouterProtocol } from './router.js';
export * from './multiaddr.js';
export * from './bytes.js';
{
"name": "@agoric/network",
"version": "0.1.1-dev-f85d35b.0+f85d35b",
"version": "0.1.1-dev-f8ea4a8.0+f8ea4a8",
"description": "Agoric's network protocol API",

@@ -24,14 +24,18 @@ "type": "module",

"dependencies": {
"@agoric/assert": "0.6.1-dev-f85d35b.0+f85d35b",
"@agoric/internal": "0.3.3-dev-f85d35b.0+f85d35b",
"@agoric/store": "0.9.3-dev-f85d35b.0+f85d35b",
"@endo/base64": "^1.0.0",
"@endo/far": "^1.0.1",
"@endo/promise-kit": "^1.0.1"
"@agoric/assert": "0.6.1-dev-f8ea4a8.0+f8ea4a8",
"@agoric/internal": "0.3.3-dev-f8ea4a8.0+f8ea4a8",
"@agoric/store": "0.9.3-dev-f8ea4a8.0+f8ea4a8",
"@agoric/vat-data": "0.5.3-dev-f8ea4a8.0+f8ea4a8",
"@endo/base64": "^1.0.3",
"@endo/far": "^1.1.0",
"@endo/patterns": "^1.3.0",
"@endo/promise-kit": "^1.1.0"
},
"devDependencies": {
"@agoric/swingset-vat": "0.32.3-dev-f85d35b.0+f85d35b",
"@endo/bundle-source": "^3.0.1",
"@agoric/swingset-liveslots": "0.10.3-dev-f8ea4a8.0+f8ea4a8",
"@agoric/swingset-vat": "0.32.3-dev-f8ea4a8.0+f8ea4a8",
"@agoric/zone": "0.2.3-dev-f8ea4a8.0+f8ea4a8",
"@endo/bundle-source": "^3.2.1",
"ava": "^5.3.0",
"c8": "^7.13.0"
"c8": "^9.1.0"
},

@@ -55,3 +59,3 @@ "exports": {

"engines": {
"node": ">=14.15.0"
"node": "^18.12 || ^20.9"
},

@@ -69,5 +73,5 @@ "ava": {

"typeCoverage": {
"atLeast": 76.57
"atLeast": 91.68
},
"gitHead": "f85d35b675741f504026f4801071e052118c5538"
"gitHead": "f8ea4a8351d880722c3b33824402daed2716e47d"
}
/**
* Convert some data to bytes.
* Convert a Uint8Array or other sequence of octets to a string representation
* that `@endo/marshal` accepts as Passable.
*
* @param {Data} data
* @param {ByteSource} byteSource
* @returns {Bytes}
*/
export function toBytes(data: Data): Bytes;
export function toBytes(byteSource: ByteSource): Bytes;
/**

@@ -18,8 +19,8 @@ * Convert bytes to a String.

*
* @param {Data} data
* @param {ByteSource} byteSource
* @returns {string} base64 encoding
*/
export function dataToBase64(data: Data): string;
export function dataToBase64(byteSource: ByteSource): string;
/**
* Decodes a string into base64.
* Decodes a base64 string into bytes.
*

@@ -30,2 +31,3 @@ * @param {string} string Base64-encoded string

export function base64ToBytes(string: string): Bytes;
export type ByteSource = Bytes | Buffer | Uint8Array | Iterable<number>;
//# sourceMappingURL=bytes.d.ts.map

@@ -0,23 +1,41 @@

// @ts-check
/// <reference path="./types.js" />
import { Fail } from '@agoric/assert';
import { encodeBase64, decodeBase64 } from '@endo/base64';
/* eslint-disable no-bitwise */
/** @typedef {Bytes | Buffer | Uint8Array | Iterable<number>} ByteSource */
/**
* Convert some data to bytes.
* @param {ByteSource} contents
*/
const coerceToByteArray = contents => {
if (typeof contents === 'string') {
return Uint8Array.from(contents, c => {
const b = c.charCodeAt(0);
b <= 0xff || Fail`character cannot be coerced to an octet: ${c}`;
return b;
});
} else if (contents instanceof Uint8Array) {
// Reconstruct to ensure we have a Uint8Array and not a Buffer.
return new Uint8Array(
contents.buffer,
contents.byteOffset,
contents.byteLength,
);
}
return new Uint8Array(contents);
};
/**
* Convert a Uint8Array or other sequence of octets to a string representation
* that `@endo/marshal` accepts as Passable.
*
* @param {Data} data
* @param {ByteSource} byteSource
* @returns {Bytes}
*/
export function toBytes(data) {
/** @type {Data | number[]} */
let bytes = data;
// TODO: We really need marshallable TypedArrays.
if (typeof bytes === 'string') {
bytes = bytes.split('').map(c => c.charCodeAt(0));
}
export function toBytes(byteSource) {
// We return the raw characters in the lower half of
// the String's representation.
const buf = new Uint8Array(bytes);
return String.fromCharCode.apply(null, buf);
const buf = coerceToByteArray(byteSource);
return String.fromCharCode(...buf);
}

@@ -38,16 +56,7 @@

*
* @param {Data} data
* @param {ByteSource} byteSource
* @returns {string} base64 encoding
*/
export function dataToBase64(data) {
/** @type {Uint8Array} */
let bytes;
if (typeof data === 'string') {
bytes = new Uint8Array(data.length);
for (let i = 0; i < data.length; i += 1) {
bytes[i] = data.charCodeAt(i);
}
} else {
bytes = new Uint8Array(data);
}
export function dataToBase64(byteSource) {
const bytes = coerceToByteArray(byteSource);
return encodeBase64(bytes);

@@ -57,3 +66,3 @@ }

/**
* Decodes a string into base64.
* Decodes a base64 string into bytes.
*

@@ -60,0 +69,0 @@ * @param {string} string Base64-encoded string

export * from "./network.js";
export * from "./shapes.js";
export * from "./multiaddr.js";
export * from "./bytes.js";
export { default as makeRouter, makeRouterProtocol } from "./router.js";
export { prepareRouter, prepareRouterProtocol } from "./router.js";
//# sourceMappingURL=index.d.ts.map
export * from './network.js';
export { default as makeRouter, makeRouterProtocol } from './router.js';
export * from './shapes.js';
export { prepareRouter, prepareRouterProtocol } from './router.js';
export * from './multiaddr.js';
export * from './bytes.js';
/**
* @param {ConnectionHandler} handler0
* @param {Endpoint} addr0
* @param {ConnectionHandler} handler1
* @param {Endpoint} addr1
* @param {WeakSet<Connection>} [current]
* @returns {[Connection, Connection]}
*/
export function crossoverConnection(handler0: ConnectionHandler, addr0: Endpoint, handler1: ConnectionHandler, addr1: Endpoint, current?: WeakSet<Connection> | undefined): [Connection, Connection];
/**
* Get the list of prefixes from longest to shortest.

@@ -17,22 +8,37 @@ *

/**
* Create a protocol that has a handler.
*
* @param {ProtocolHandler} protocolHandler
* @returns {Protocol} the local capability for connecting and listening
*/
export function makeNetworkProtocol(protocolHandler: ProtocolHandler): Protocol;
/**
* Create a ConnectionHandler that just echoes its packets.
*
* @returns {ConnectionHandler}
*/
export function makeEchoConnectionHandler(): ConnectionHandler;
export function makeNonceMaker(prefix?: string, suffix?: string): () => Promise<string>;
/**
* Create a protocol handler that just connects to itself.
*
* @param {ProtocolHandler['onInstantiate']} [onInstantiate]
* @returns {ProtocolHandler} The localhost handler
* @param {import('@agoric/base-zone').Zone} zone
* @param {ReturnType<import('@agoric/vow').prepareVowTools>} powers
*/
export function makeLoopbackProtocolHandler(onInstantiate?: ProtocolHandler['onInstantiate']): ProtocolHandler;
export function prepareLoopbackProtocolHandler(zone: import('@agoric/base-zone').Zone, { watch, allVows }: ReturnType<(zone: import("@agoric/base-zone").Zone, powers?: {
isRetryableReason?: ((reason: any) => boolean) | undefined;
watchPromise?: ((p: PromiseLike<any>, watcher: import("@agoric/vow/src/watch-promise.js").PromiseWatcher, ...args: unknown[]) => void) | undefined;
} | undefined) => {
when: <T, TResult1 = import("@agoric/vow").Unwrap<T>, TResult2 = never>(specimenP: T, onFulfilled?: ((value: import("@agoric/vow").Unwrap<T>) => TResult1 | PromiseLike<TResult1>) | undefined, onRejected?: ((reason: any) => TResult2 | PromiseLike<TResult2>) | undefined) => Promise<TResult1 | TResult2>;
watch: <T_1 = any, TResult1_1 = T_1, TResult2_1 = T_1>(specimenP: import("@agoric/vow").ERef<T_1 | import("@agoric/vow").Vow<T_1>>, watcher?: import("@agoric/vow").Watcher<T_1, TResult1_1, TResult2_1> | undefined, watcherContext?: unknown) => import("@agoric/vow").Vow<TResult1_1 | TResult2_1>;
makeVowKit: <T_2>() => import("@agoric/vow").VowKit<T_2>;
allVows: (vows: any) => import("@agoric/vow").Vow<any>;
}>): (instancePrefix: any) => import("@endo/exo/src/exo-makers.js").Guarded<{
onCreate(_impl: any, _protocolHandler: any): Promise<void>;
generatePortID(_localAddr: any, _protocolHandler: any): Promise<string>;
onBind(_port: any, _localAddr: any, _protocolHandler: any): Promise<void>;
/**
* @param {*} _port
* @param {Endpoint} localAddr
* @param {Endpoint} remoteAddr
* @returns {PromiseVow<AttemptDescription>}}
*/
onConnect(_port: any, localAddr: Endpoint, remoteAddr: Endpoint): PromiseVow<AttemptDescription>;
onInstantiate(_port: any, _localAddr: any, _remote: any, _protocol: any): Promise<string>;
onListen(port: any, localAddr: any, listenHandler: any, _protocolHandler: any): Promise<void>;
/**
* @param {Remote<Port>} port
* @param {Endpoint} localAddr
* @param {Remote<ListenHandler>} listenHandler
* @param {*} _protocolHandler
*/
onListenRemove(port: Remote<Port>, localAddr: Endpoint, listenHandler: Remote<ListenHandler>, _protocolHandler: any): Promise<void>;
onRevoke(_port: any, _localAddr: any, _protocolHandler: any): Promise<void>;
}>;
/**

@@ -43,4 +49,54 @@ * Compatibility note: this must match what our peers use, so don't change it

export const ENDPOINT_SEPARATOR: "/";
export function rethrowUnlessMissing(err: any): undefined;
export function makeConnection(handler: ConnectionHandler, localAddr: Endpoint, remoteAddr: Endpoint, current?: Set<Closable> | undefined): Connection;
export function rethrowUnlessMissing(err: unknown): undefined;
export function crossoverConnection(zone: import('@agoric/zone').Zone, handler0: import('@agoric/vow').Remote<Required<ConnectionHandler>>, addr0: Endpoint, handler1: import('@agoric/vow').Remote<Required<ConnectionHandler>>, addr1: Endpoint, makeConnection: (opts: ConnectionOpts) => Connection, current?: WeakSetStore<Closable> | undefined): Connection[];
export function prepareNetworkProtocol(zone: import('@agoric/base-zone').Zone, powers: ReturnType<(zone: import("@agoric/base-zone").Zone, powers?: {
isRetryableReason?: ((reason: any) => boolean) | undefined;
watchPromise?: ((p: PromiseLike<any>, watcher: import("@agoric/vow/src/watch-promise.js").PromiseWatcher, ...args: unknown[]) => void) | undefined;
} | undefined) => {
when: <T, TResult1 = import("@agoric/vow").Unwrap<T>, TResult2 = never>(specimenP: T, onFulfilled?: ((value: import("@agoric/vow").Unwrap<T>) => TResult1 | PromiseLike<TResult1>) | undefined, onRejected?: ((reason: any) => TResult2 | PromiseLike<TResult2>) | undefined) => Promise<TResult1 | TResult2>;
watch: <T_1 = any, TResult1_1 = T_1, TResult2_1 = T_1>(specimenP: import("@agoric/vow").ERef<T_1 | import("@agoric/vow").Vow<T_1>>, watcher?: import("@agoric/vow").Watcher<T_1, TResult1_1, TResult2_1> | undefined, watcherContext?: unknown) => import("@agoric/vow").Vow<TResult1_1 | TResult2_1>;
makeVowKit: <T_2>() => import("@agoric/vow").VowKit<T_2>;
allVows: (vows: any) => import("@agoric/vow").Vow<any>;
}>): (protocolHandler: Remote<ProtocolHandler>) => Protocol;
export function prepareEchoConnectionKit(zone: import('@agoric/base-zone').Zone): () => import("@endo/exo/src/exo-makers.js").GuardedKit<{
handler: {
/**
* @param {Connection} _connection
* @param {Bytes} bytes
* @param {ConnectionHandler} _connectionHandler
*/
onReceive(_connection: Connection, bytes: Bytes, _connectionHandler: ConnectionHandler): Promise<string>;
/**
* @param {Connection} _connection
* @param {CloseReason} [_reason]
* @param {ConnectionHandler} [_connectionHandler]
*/
onClose(_connection: Connection, _reason?: CloseReason, _connectionHandler?: ConnectionHandler | undefined): Promise<void>;
};
listener: {
onAccept(_port: any, _localAddr: any, _remoteAddr: any, _listenHandler: any): Promise<import("@endo/exo/src/exo-makers.js").Guarded<{
/**
* @param {Connection} _connection
* @param {Bytes} bytes
* @param {ConnectionHandler} _connectionHandler
*/
onReceive(_connection: Connection, bytes: Bytes, _connectionHandler: ConnectionHandler): Promise<string>;
/**
* @param {Connection} _connection
* @param {CloseReason} [_reason]
* @param {ConnectionHandler} [_connectionHandler]
*/
onClose(_connection: Connection, _reason?: CloseReason, _connectionHandler?: ConnectionHandler | undefined): Promise<void>;
}>>;
onListen(port: any, _listenHandler: any): Promise<void>;
};
}>;
export type ConnectionOpts = {
addrs: Endpoint[];
handlers: import('@agoric/vow').Remote<Required<ConnectionHandler>>[];
conns: MapStore<number, Connection>;
current: WeakSetStore<Closable>;
l: 0 | 1;
r: 0 | 1;
};
//# sourceMappingURL=network.d.ts.map

@@ -1,7 +0,8 @@

import { makeScalarMapStore, makeLegacyMap } from '@agoric/store';
import { Far, E } from '@endo/far';
import { makePromiseKit } from '@endo/promise-kit';
// @ts-check
import { E } from '@endo/far';
import { M } from '@endo/patterns';
import { Fail } from '@agoric/assert';
import { whileTrue } from '@agoric/internal';
import { toBytes } from './bytes.js';
import { Shape } from './shapes.js';

@@ -17,2 +18,3 @@ import '@agoric/store/exported.js';

/** @param {unknown} err */
export const rethrowUnlessMissing = err => {

@@ -23,3 +25,3 @@ // Ugly hack rather than being able to determine if the function

!(err instanceof TypeError) ||
!err.message.match(/target has no method|is not a function$/)
!String(err.message).match(/target has no method|is not a function$/)
) {

@@ -32,85 +34,140 @@ throw err;

/**
* Create a handled Connection.
* Get the list of prefixes from longest to shortest.
*
* @param {ConnectionHandler} handler
* @param {Endpoint} localAddr
* @param {Endpoint} remoteAddr
* @param {Set<Closable>} [current]
* @returns {Connection}
* @param {string} addr
*/
export const makeConnection = (
handler,
localAddr,
remoteAddr,
current = new Set(),
) => {
let closed;
/** @type {Set<import('@endo/promise-kit').PromiseKit<Bytes>>} */
const pendingAcks = new Set();
/** @type {Connection} */
const connection = Far('Connection', {
getLocalAddress() {
return localAddr;
export function getPrefixes(addr) {
const parts = addr.split(ENDPOINT_SEPARATOR);
/** @type {string[]} */
const ret = [];
for (let i = parts.length; i > 0; i -= 1) {
// Try most specific match.
const prefix = parts.slice(0, i).join(ENDPOINT_SEPARATOR);
ret.push(prefix);
}
return ret;
}
/**
* @typedef {object} ConnectionOpts
* @property {Endpoint[]} addrs
* @property {import('@agoric/vow').Remote<Required<ConnectionHandler>>[]} handlers
* @property {MapStore<number, Connection>} conns
* @property {WeakSetStore<Closable>} current
* @property {0|1} l
* @property {0|1} r
*/
/**
* @param {import('@agoric/base-zone').Zone} zone
* @param {ReturnType<import('@agoric/vow').prepareVowTools>} powers
*/
const prepareHalfConnection = (zone, { watch }) => {
const makeHalfConnectionKit = zone.exoClassKit(
'Connection',
Shape.ConnectionI,
/** @param {ConnectionOpts} opts */
({ addrs, handlers, conns, current, l, r }) => {
return {
addrs,
handlers,
conns,
current,
l,
r,
/** @type {string | undefined} */
closed: undefined,
};
},
getRemoteAddress() {
return remoteAddr;
{
connection: {
getLocalAddress() {
const { addrs, l } = this.state;
return addrs[l];
},
getRemoteAddress() {
const { addrs, r } = this.state;
return addrs[r];
},
/** @param {Bytes} packetBytes */
async send(packetBytes) {
const { closed, handlers, r, conns } = this.state;
if (closed) {
throw Error(closed);
}
const innerVow = watch(
E(handlers[r]).onReceive(
conns.get(r),
toBytes(packetBytes),
handlers[r],
),
this.facets.openConnectionAckWatcher,
);
return watch(innerVow, this.facets.rethrowUnlessMissingWatcher);
},
async close() {
const { closed, current, conns, l, handlers } = this.state;
if (closed) {
throw Error(closed);
}
this.state.closed = 'Connection closed';
current.delete(conns.get(l));
const innerVow = watch(
E(this.state.handlers[l]).onClose(
conns.get(l),
undefined,
handlers[l],
),
this.facets.sinkWatcher,
);
return watch(innerVow, this.facets.rethrowUnlessMissingWatcher);
},
},
openConnectionAckWatcher: {
onFulfilled(ack) {
return toBytes(ack || '');
},
},
rethrowUnlessMissingWatcher: {
onRejected(e) {
rethrowUnlessMissing(e);
},
},
sinkWatcher: {
onFulfilled(_value) {
return undefined;
},
},
},
async close() {
if (closed) {
throw closed;
}
current.delete(connection);
closed = Error('Connection closed');
for (const ackDeferred of [...pendingAcks.values()]) {
pendingAcks.delete(ackDeferred);
ackDeferred.reject(closed);
}
await E(handler)
.onClose(connection, undefined, handler)
.catch(rethrowUnlessMissing);
},
async send(data, opts) {
// console.log('send', data, local === srcHandler);
if (closed) {
throw closed;
}
const bytes = toBytes(data);
const ackDeferred = makePromiseKit();
pendingAcks.add(ackDeferred);
E(handler)
.onReceive(connection, bytes, handler, opts)
.catch(err => {
rethrowUnlessMissing(err);
return '';
})
.then(
ack => {
pendingAcks.delete(ackDeferred);
ackDeferred.resolve(toBytes(ack));
},
err => {
pendingAcks.delete(ackDeferred);
ackDeferred.reject(err);
},
);
return ackDeferred.promise;
},
});
);
current.add(connection);
E(handler)
.onOpen(connection, localAddr, remoteAddr, handler)
.catch(rethrowUnlessMissing);
return connection;
const makeHalfConnection = ({ addrs, handlers, conns, current, l, r }) => {
const { connection } = makeHalfConnectionKit({
addrs,
handlers,
conns,
current,
l,
r,
});
return harden(connection);
};
return makeHalfConnection;
};
/**
* @param {ConnectionHandler} handler0
* @param {import('@agoric/zone').Zone} zone
* @param {import('@agoric/vow').Remote<Required<ConnectionHandler>>} handler0
* @param {Endpoint} addr0
* @param {ConnectionHandler} handler1
* @param {import('@agoric/vow').Remote<Required<ConnectionHandler>>} handler1
* @param {Endpoint} addr1
* @param {WeakSet<Connection>} [current]
* @returns {[Connection, Connection]}
* @param {(opts: ConnectionOpts) => Connection} makeConnection
* @param {WeakSetStore<Closable>} [current]
*/
export function crossoverConnection(
export const crossoverConnection = (
zone,
handler0,

@@ -120,143 +177,263 @@ addr0,

addr1,
current = new WeakSet(),
) {
/** @type {Connection[]} */
const conns = [];
/** @type {ConnectionHandler[]} */
const handlers = [handler0, handler1];
/** @type {Endpoint[]} */
const addrs = [addr0, addr1];
makeConnection,
current = zone.detached().weakSetStore('crossoverCurrentConnections'),
) => {
const detached = zone.detached();
function makeHalfConnection(l, r) {
let closed;
conns[l] = Far('Connection', {
getLocalAddress() {
return addrs[l];
},
getRemoteAddress() {
return addrs[r];
},
async send(packetBytes) {
if (closed) {
throw closed;
}
const ack = await E(handlers[r])
.onReceive(conns[r], toBytes(packetBytes), handlers[r])
.catch(rethrowUnlessMissing);
return toBytes(ack || '');
},
async close() {
if (closed) {
throw closed;
}
closed = Error('Connection closed');
current.delete(conns[l]);
await E(handlers[l])
.onClose(conns[l], undefined, handlers[l])
.catch(rethrowUnlessMissing);
},
});
}
/** @type {MapStore<number, Connection>} */
const conns = detached.mapStore('addrToConnections');
makeHalfConnection(0, 1);
makeHalfConnection(1, 0);
/** @type {import('@agoric/vow').Remote<Required<ConnectionHandler>>[]} */
const handlers = harden([handler0, handler1]);
/** @type {Endpoint[]} */
const addrs = harden([addr0, addr1]);
/**
* @param {0|1} l
* @param {0|1} r
*/
const makeHalfConnection = (l, r) => {
conns.init(l, makeConnection({ addrs, handlers, conns, current, l, r }));
};
/**
* @param {number} l local side of the connection
* @param {number} r remote side of the connection
*/
function openHalfConnection(l, r) {
current.add(conns[l]);
const openHalfConnection = (l, r) => {
current.add(conns.get(l));
E(handlers[l])
.onOpen(conns[l], addrs[l], addrs[r], handlers[l])
.onOpen(conns.get(l), addrs[l], addrs[r], handlers[l])
.catch(rethrowUnlessMissing);
}
};
makeHalfConnection(0, 1);
makeHalfConnection(1, 0);
openHalfConnection(0, 1);
openHalfConnection(1, 0);
const [conn0, conn1] = conns;
return [conn0, conn1];
}
return [conns.get(0), conns.get(1)];
};
/**
* Get the list of prefixes from longest to shortest.
*
* @param {string} addr
* @param {import('@agoric/zone').Zone} zone
* @param {(opts: ConnectionOpts) => Connection} makeConnection
* @param {ReturnType<import('@agoric/vow').prepareVowTools>} powers
*/
export function getPrefixes(addr) {
const parts = addr.split(ENDPOINT_SEPARATOR);
const prepareInboundAttempt = (zone, makeConnection, { watch }) => {
const makeInboundAttemptKit = zone.exoClassKit(
'InboundAttempt',
Shape.InboundAttemptI,
/**
* @param {object} opts
* @param {string} opts.localAddr
* @param {string} opts.remoteAddr
* @param {MapStore<Port, SetStore<Closable>>} opts.currentConnections
* @param {string} opts.listenPrefix
* @param {MapStore<Endpoint, [Port, import('@agoric/vow').Remote<Required<ListenHandler>>]>} opts.listening
*/
({
localAddr,
remoteAddr,
currentConnections,
listenPrefix,
listening,
}) => {
/** @type {string | undefined} */
let consummated;
/** @type {string[]} */
const ret = [];
for (let i = parts.length; i > 0; i -= 1) {
// Try most specific match.
const prefix = parts.slice(0, i).join(ENDPOINT_SEPARATOR);
ret.push(prefix);
}
return ret;
}
return {
localAddr,
remoteAddr,
consummated,
currentConnections,
listenPrefix,
listening,
};
},
{
inboundAttempt: {
getLocalAddress() {
// Return address metadata.
return this.state.localAddr;
},
getRemoteAddress() {
return this.state.remoteAddr;
},
async close() {
const { consummated, localAddr, remoteAddr } = this.state;
const { listening, listenPrefix, currentConnections } = this.state;
if (consummated) {
throw Error(consummated);
}
this.state.consummated = 'Already closed';
const [port, listener] = listening.get(listenPrefix);
const current = currentConnections.get(port);
current.delete(this.facets.inboundAttempt);
const innerVow = watch(
E(listener).onReject(port, localAddr, remoteAddr, listener),
this.facets.sinkWatcher,
);
return watch(innerVow, this.facets.rethrowUnlessMissingWatcher);
},
/**
* @param {object} opts
* @param {string} [opts.localAddress]
* @param {string} [opts.remoteAddress]
* @param {import('@agoric/vow').Remote<ConnectionHandler>} opts.handler
*/
async accept({ localAddress, remoteAddress, handler: rchandler }) {
const { consummated, localAddr, remoteAddr } = this.state;
const { listening, listenPrefix, currentConnections } = this.state;
if (consummated) {
throw Error(consummated);
}
if (localAddress === undefined) {
localAddress = localAddr;
}
this.state.consummated = `${localAddress} Already accepted`;
if (remoteAddress === undefined) {
remoteAddress = remoteAddr;
}
const [port, listener] = listening.get(listenPrefix);
const current = currentConnections.get(port);
current.delete(this.facets.inboundAttempt);
return watch(
E(listener).onAccept(port, localAddress, remoteAddress, listener),
this.facets.inboundAttemptAcceptWatcher,
{
localAddress,
rchandler,
remoteAddress,
current,
},
);
},
},
inboundAttemptAcceptWatcher: {
onFulfilled(lchandler, watchContext) {
const { localAddress, rchandler, remoteAddress, current } =
watchContext;
return crossoverConnection(
zone,
/** @type {import('@agoric/vow').Remote<Required<ConnectionHandler>>} */ (
lchandler
),
localAddress,
/** @type {import('@agoric/vow').Remote<Required<ConnectionHandler>>} */ (
rchandler
),
remoteAddress,
makeConnection,
current,
)[1];
},
},
rethrowUnlessMissingWatcher: {
onRejected(e) {
rethrowUnlessMissing(e);
},
},
sinkWatcher: {
onFulfilled(_value) {
return undefined;
},
},
},
);
const makeInboundAttempt = ({
localAddr,
remoteAddr,
currentConnections,
listenPrefix,
listening,
}) => {
const { inboundAttempt } = makeInboundAttemptKit({
localAddr,
remoteAddr,
currentConnections,
listenPrefix,
listening,
});
return harden(inboundAttempt);
};
return makeInboundAttempt;
};
/** @enum {number} */
const RevokeState = /** @type {const} */ ({
NOT_REVOKED: 0,
REVOKING: 1,
REVOKED: 2,
});
/**
* Create a protocol that has a handler.
*
* @param {ProtocolHandler} protocolHandler
* @returns {Protocol} the local capability for connecting and listening
* @param {import('@agoric/zone').Zone} zone
* @param {ReturnType<import('@agoric/vow').prepareVowTools>} powers
*/
export function makeNetworkProtocol(protocolHandler) {
/** @type {LegacyMap<Port, Set<Closable>>} */
// Legacy because we're storing a JS Set
const currentConnections = makeLegacyMap('port');
const preparePort = (zone, powers) => {
const makeIncapable = zone.exoClass('Incapable', undefined, () => ({}), {});
const { watch, allVows } = powers;
/**
* Currently must be a single listenHandler. TODO: Do something sensible with
* multiple handlers?
*
* @type {MapStore<Endpoint, [Port, ListenHandler]>}
* @param {object} opts
* @param {Endpoint} opts.localAddr
* @param {MapStore<Endpoint, [Port, import('@agoric/vow').Remote<Required<ListenHandler>>]>} opts.listening
* @param {SetStore<import('@agoric/vow').Remote<Connection>>} opts.openConnections
* @param {MapStore<Port, SetStore<Closable>>} opts.currentConnections
* @param {MapStore<string, Port>} opts.boundPorts
* @param {import('@agoric/vow').Remote<ProtocolHandler>} opts.protocolHandler
* @param {Remote<ProtocolImpl>} opts.protocolImpl
*/
const listening = makeScalarMapStore('localAddr');
/** @type {MapStore<string, Port>} */
const boundPorts = makeScalarMapStore('localAddr');
/** @param {Endpoint} localAddr */
const bind = async localAddr => {
// Check if we are underspecified (ends in slash)
const underspecified = localAddr.endsWith(ENDPOINT_SEPARATOR);
for await (const _ of whileTrue(() => underspecified)) {
const portID = await E(protocolHandler).generatePortID(
localAddr,
protocolHandler,
);
const newAddr = `${localAddr}${portID}`;
if (!boundPorts.has(newAddr)) {
localAddr = newAddr;
break;
}
}
if (boundPorts.has(localAddr)) {
return boundPorts.get(localAddr);
}
/** @enum {number} */
const RevokeState = {
NOT_REVOKED: 0,
REVOKING: 1,
REVOKED: 2,
const initPort = ({
localAddr,
listening,
openConnections,
currentConnections,
boundPorts,
protocolHandler,
protocolImpl,
}) => {
return {
listening,
openConnections,
currentConnections,
boundPorts,
localAddr,
protocolHandler,
protocolImpl,
/** @type {RevokeState | undefined} */
revoked: undefined,
};
};
/** @type {RevokeState} */
let revoked = RevokeState.NOT_REVOKED;
const openConnections = new Set();
/** @type {Port} */
const port = Far('Port', {
const makePortKit = zone.exoClassKit('Port', Shape.PortI, initPort, {
port: {
getLocalAddress() {
// Works even after revoke().
return localAddr;
return this.state.localAddr;
},
/** @param {import('@agoric/vow').Remote<ListenHandler>} listenHandler */
async addListener(listenHandler) {
!revoked || Fail`Port ${localAddr} is revoked`;
const { revoked, listening, localAddr, protocolHandler } = this.state;
!revoked || Fail`Port ${this.state.localAddr} is revoked`;
listenHandler || Fail`listenHandler is not defined`;
if (listening.has(localAddr)) {

@@ -268,21 +445,34 @@ // Last one wins.

}
listening.set(localAddr, [port, listenHandler]);
listening.set(localAddr, [
this.facets.port,
/** @type {Remote<Required<ListenHandler>>} */ (listenHandler),
]);
E(lhandler).onRemove(lport, lhandler).catch(rethrowUnlessMissing);
} else {
listening.init(localAddr, [port, listenHandler]);
listening.init(
localAddr,
harden([
this.facets.port,
/** @type {Remote<Required<ListenHandler>>} */ (listenHandler),
]),
);
}
// TODO: Check that the listener defines onAccept.
// ASSUME: that the listener defines onAccept.
await E(protocolHandler).onListen(
port,
localAddr,
listenHandler,
protocolHandler,
const innerVow = watch(
E(protocolHandler).onListen(
this.facets.port,
localAddr,
listenHandler,
protocolHandler,
),
this.facets.portAddListenerWatcher,
{ listenHandler },
);
await E(listenHandler)
.onListen(port, listenHandler)
.catch(rethrowUnlessMissing);
return watch(innerVow, this.facets.rethrowUnlessMissingWatcher);
},
/** @param {Remote<ListenHandler>} listenHandler */
async removeListener(listenHandler) {
const { listening, localAddr, protocolHandler } = this.state;
listening.has(localAddr) || Fail`Port ${localAddr} is not listening`;

@@ -292,18 +482,74 @@ listening.get(localAddr)[1] === listenHandler ||

listening.delete(localAddr);
await E(protocolHandler).onListenRemove(
port,
localAddr,
listenHandler,
protocolHandler,
const innerVow = watch(
E(protocolHandler).onListenRemove(
this.facets.port,
localAddr,
listenHandler,
protocolHandler,
),
this.facets.portRemoveListenerWatcher,
{ listenHandler },
);
await E(listenHandler)
.onRemove(port, listenHandler)
.catch(rethrowUnlessMissing);
return watch(innerVow, this.facets.rethrowUnlessMissingWatcher);
},
async connect(remotePort, connectionHandler = {}) {
/**
* @param {Endpoint} remotePort
* @param {Remote<ConnectionHandler>} [connectionHandler]
*/
async connect(
remotePort,
connectionHandler = /** @type {Remote<ConnectionHandler>} */ (
makeIncapable()
),
) {
const { revoked, localAddr, protocolImpl } = this.state;
!revoked || Fail`Port ${localAddr} is revoked`;
/** @type {Endpoint} */
const dst = harden(remotePort);
// eslint-disable-next-line no-use-before-define
const conn = await protocolImpl.outbound(port, dst, connectionHandler);
return watch(
E(protocolImpl).outbound(this.facets.port, dst, connectionHandler),
this.facets.portConnectWatcher,
{ revoked },
);
},
async revoke() {
const { revoked, localAddr } = this.state;
const { protocolHandler } = this.state;
revoked !== RevokeState.REVOKED ||
Fail`Port ${localAddr} is already revoked`;
this.state.revoked = RevokeState.REVOKING;
const revokeVow = watch(
E(protocolHandler).onRevoke(
this.facets.port,
localAddr,
protocolHandler,
),
this.facets.portRevokeWatcher,
);
return watch(revokeVow, this.facets.portRevokeCleanupWatcher);
},
},
portAddListenerWatcher: {
onFulfilled(_value, watcherContext) {
const { listenHandler } = watcherContext;
return E(listenHandler).onListen(this.facets.port, listenHandler);
},
},
portRemoveListenerWatcher: {
onFulfilled(_value, watcherContext) {
const { listenHandler } = watcherContext;
return E(listenHandler).onRemove(this.facets.port, listenHandler);
},
},
portConnectWatcher: {
onFulfilled(conn, watchContext) {
const { revoked } = watchContext;
const { openConnections } = this.state;
if (revoked) {

@@ -316,15 +562,20 @@ void E(conn).close();

},
async revoke() {
revoked !== RevokeState.REVOKED ||
Fail`Port ${localAddr} is already revoked`;
revoked = RevokeState.REVOKING;
await E(protocolHandler).onRevoke(port, localAddr, protocolHandler);
revoked = RevokeState.REVOKED;
},
portRevokeWatcher: {
onFulfilled(_value) {
const { currentConnections, listening, localAddr } = this.state;
const port = this.facets.port;
// Clean up everything we did.
const ps = [...currentConnections.get(port)].map(conn =>
E(conn)
.close()
.catch(_ => {}),
const values = [...currentConnections.get(port).values()];
/** @type {import('@agoric/vow').Specimen[]} */
const ps = [];
ps.push(
...values.map(conn =>
watch(E(conn).close(), this.facets.sinkWatcher),
),
);
if (listening.has(localAddr)) {

@@ -334,255 +585,839 @@ const listener = listening.get(localAddr)[1];

}
await Promise.all(ps);
currentConnections.delete(port);
return watch(allVows(ps), this.facets.rethrowUnlessMissingWatcher);
},
},
sinkWatcher: {
onFulfilled() {
return undefined;
},
onRejected() {
return undefined;
},
},
portRevokeCleanupWatcher: {
onFulfilled(_value) {
const { currentConnections, boundPorts, localAddr } = this.state;
this.state.revoked = RevokeState.REVOKED;
currentConnections.delete(this.facets.port);
boundPorts.delete(localAddr);
return `Port ${localAddr} revoked`;
},
},
rethrowUnlessMissingWatcher: {
onRejected(e) {
rethrowUnlessMissing(e);
},
},
});
const makePort = ({
localAddr,
listening,
openConnections,
currentConnections,
boundPorts,
protocolHandler,
protocolImpl,
}) => {
const { port } = makePortKit({
localAddr,
listening,
openConnections,
currentConnections,
boundPorts,
protocolHandler,
protocolImpl,
});
await E(protocolHandler).onBind(port, localAddr, protocolHandler);
boundPorts.init(localAddr, port);
currentConnections.init(port, new Set());
return port;
return harden(port);
};
/** @type {ProtocolImpl} */
const protocolImpl = Far('ProtocolImpl', {
bind,
async inbound(listenAddr, remoteAddr) {
let lastFailure = Error(`No listeners for ${listenAddr}`);
for await (const listenPrefix of getPrefixes(listenAddr)) {
if (!listening.has(listenPrefix)) {
continue;
}
const [port, listener] = listening.get(listenPrefix);
let localAddr;
await (async () => {
// See if our protocol is willing to receive this connection.
const localInstance = await E(protocolHandler)
.onInstantiate(port, listenPrefix, remoteAddr, protocolHandler)
.catch(rethrowUnlessMissing);
localAddr = localInstance
return makePort;
};
/**
* @param {import('@agoric/base-zone').Zone} zone
* @param {ReturnType<import('@agoric/vow').prepareVowTools>} powers
*/
const prepareBinder = (zone, powers) => {
const makeConnection = prepareHalfConnection(zone, powers);
const { watch } = powers;
const makeInboundAttempt = prepareInboundAttempt(
zone,
makeConnection,
powers,
);
const makePort = preparePort(zone, powers);
const detached = zone.detached();
const makeFullBinderKit = zone.exoClassKit(
'binder',
{
protocolImpl: Shape.ProtocolImplI,
binder: M.interface('Binder', {
bind: M.callWhen(Shape.Endpoint).returns(Shape.Vow$(Shape.Port)),
}),
binderInboundInstantiateWatcher: M.interface(
'BinderInboundInstantiateWatcher',
{
onFulfilled: M.call(M.any()).rest(M.any()).returns(M.any()),
},
),
binderInboundInstantiateCatchWatcher: M.interface(
'BinderInboundInstantiateCatchWatcher',
{
onRejected: M.call(M.any()).rest(M.any()).returns(M.any()),
},
),
binderOutboundInstantiateWatcher: M.interface(
'BinderOutboundInstantiateWatcher',
{
onFulfilled: M.call(M.any()).rest(M.any()).returns(M.any()),
},
),
binderOutboundConnectWatcher: M.interface(
'BinderOutboundConnectWatcher',
{
onFulfilled: M.call(M.any()).rest(M.any()).returns(M.any()),
},
),
binderOutboundCatchWatcher: M.interface('BinderOutboundCatchWatcher', {
onRejected: M.call(M.any()).rest(M.any()).returns(M.any()),
}),
binderOutboundInboundWatcher: M.interface(
'BinderOutboundInboundWatcher',
{
onFulfilled: M.call(M.any()).rest(M.any()).returns(M.any()),
},
),
binderOutboundAcceptWatcher: M.interface('BinderOutboundAcceptWatcher', {
onFulfilled: M.call(M.any()).rest(M.any()).returns(M.any()),
}),
binderBindGeneratePortWatcher: M.interface(
'BinderBindGeneratePortWatcher',
{
onFulfilled: M.call(M.any()).rest(M.any()).returns(M.any()),
},
),
binderPortWatcher: M.interface('BinderPortWatcher', {
onFulfilled: M.call(M.any()).rest(M.any()).returns(M.any()),
}),
binderBindWatcher: M.interface('BinderBindWatcher', {
onFulfilled: M.call(M.any()).rest(M.any()).returns(M.any()),
}),
rethrowUnlessMissingWatcher: M.interface('RethrowUnlessMissingWatcher', {
onRejected: M.call(M.any()).rest(M.any()).returns(M.any()),
}),
},
/**
* @param {object} opts
* @param {MapStore<Port, SetStore<Closable>>} opts.currentConnections
* @param {MapStore<string, Port>} opts.boundPorts
* @param {MapStore<Endpoint, [Port, Remote<Required<ListenHandler>>]>} opts.listening
* @param {Remote<ProtocolHandler>} opts.protocolHandler
*/
({ currentConnections, boundPorts, listening, protocolHandler }) => {
/** @type {SetStore<Connection>} */
const openConnections = detached.setStore('openConnections');
return {
currentConnections,
boundPorts,
listening,
revoked: RevokeState.NOT_REVOKED,
openConnections,
protocolHandler,
};
},
{
protocolImpl: {
/**
* @param {Endpoint} listenAddr
* @param {Endpoint} remoteAddr
*/
async inbound(listenAddr, remoteAddr) {
const { listening, protocolHandler } = this.state;
const prefixes = getPrefixes(listenAddr);
let listenPrefixIndex = 0;
let listenPrefix;
while (listenPrefixIndex < prefixes.length) {
listenPrefix = prefixes[listenPrefixIndex];
if (!listening.has(listenPrefix)) {
listenPrefixIndex += 1;
continue;
}
break;
}
if (listenPrefixIndex >= prefixes.length) {
throw Error(`No listeners for ${listenAddr}`);
}
const [port] = listening.get(/** @type {string} **/ (listenPrefix));
const innerVow = watch(
E(
/** @type {Remote<Required<ProtocolHandler>>} */ (
protocolHandler
),
).onInstantiate(
/** @type {Port} **/ (port),
prefixes[listenPrefixIndex],
remoteAddr,
protocolHandler,
),
this.facets.binderInboundInstantiateWatcher,
{
listenAddr,
remoteAddr,
port,
listenPrefixIndex,
},
);
return watch(
innerVow,
this.facets.binderInboundInstantiateCatchWatcher,
{
listenPrefixIndex,
listenAddr,
remoteAddr,
lastFailure: Error(`No listeners for ${listenAddr}`),
},
);
},
/**
* @param {Port} port
* @param {Endpoint} remoteAddr
* @param {ConnectionHandler} lchandler
*/
async outbound(port, remoteAddr, lchandler) {
const { protocolHandler } = this.state;
const localAddr = await E(port).getLocalAddress();
// Allocate a local address.
const instantiateInnerVow = watch(
E(
/** @type {Remote<Required<ProtocolHandler>>} */ (
protocolHandler
),
).onInstantiate(port, localAddr, remoteAddr, protocolHandler),
this.facets.binderOutboundInstantiateWatcher,
{
port,
localAddr,
remoteAddr,
protocolHandler,
},
);
const instantiateVow = watch(
instantiateInnerVow,
this.facets.rethrowUnlessMissingWatcher,
);
const attemptVow = watch(
instantiateVow,
this.facets.binderOutboundInboundWatcher,
{
localAddr,
remoteAddr,
},
);
const acceptedVow = watch(
attemptVow,
this.facets.binderOutboundAcceptWatcher,
{
handler: lchandler,
},
);
return watch(acceptedVow, this.facets.binderOutboundCatchWatcher, {
port,
remoteAddr,
lchandler,
localAddr,
});
},
async bind(localAddr) {
return this.facets.binder.bind(localAddr);
},
},
binder: {
/** @param {string} localAddr */
async bind(localAddr) {
const { protocolHandler } = this.state;
// Check if we are underspecified (ends in slash)
const underspecified = localAddr.endsWith(ENDPOINT_SEPARATOR);
const localAddrVow = watch(
E(protocolHandler).generatePortID(localAddr, protocolHandler),
this.facets.binderBindGeneratePortWatcher,
{
underspecified,
localAddr,
},
);
return watch(localAddrVow, this.facets.binderBindWatcher);
},
},
binderInboundInstantiateWatcher: {
onFulfilled(localInstance, watchContext) {
const { listenAddr, remoteAddr, port, listenPrefixIndex } =
watchContext;
const { listening, currentConnections } = this.state;
const prefixes = getPrefixes(listenAddr);
const localAddr = localInstance
? `${listenAddr}/${localInstance}`
: listenAddr;
})().catch(e => {
lastFailure = e;
});
if (!localAddr) {
continue;
}
// We have a legitimate inbound attempt.
let consummated;
const current = currentConnections.get(port);
const inboundAttempt = Far('InboundAttempt', {
getLocalAddress() {
// Return address metadata.
return localAddr;
},
getRemoteAddress() {
return remoteAddr;
},
async close() {
if (consummated) {
throw consummated;
const current = currentConnections.get(port);
const inboundAttempt = makeInboundAttempt({
localAddr,
remoteAddr,
currentConnections,
listenPrefix: prefixes[listenPrefixIndex],
listening,
});
current.add(inboundAttempt);
return inboundAttempt;
},
},
binderInboundInstantiateCatchWatcher: {
onRejected(e, watchContext) {
let { lastFailure, listenPrefixIndex } = watchContext;
try {
rethrowUnlessMissing(e);
} catch (innerE) {
lastFailure = innerE;
}
const { listenAddr, remoteAddr } = watchContext;
const { listening, protocolHandler } = this.state;
const prefixes = getPrefixes(listenAddr);
let listenPrefix;
listenPrefixIndex += 1;
while (listenPrefixIndex < prefixes.length) {
listenPrefix = prefixes[listenPrefixIndex];
if (!listening.has(listenPrefix)) {
listenPrefixIndex += 1;
continue;
}
consummated = Error(`Already closed`);
current.delete(inboundAttempt);
await E(listener)
.onReject(port, localAddr, remoteAddr, listener)
.catch(rethrowUnlessMissing);
},
async accept({
localAddress = localAddr,
remoteAddress = remoteAddr,
handler: rchandler,
}) {
if (consummated) {
throw consummated;
}
consummated = Error(`Already accepted`);
current.delete(inboundAttempt);
const lchandler = await E(listener).onAccept(
break;
}
if (listenPrefixIndex >= prefixes.length) {
throw lastFailure;
}
const [port] = listening.get(/** @type {string} */ (listenPrefix));
const innerVow = watch(
E(
/** @type {Remote<Required<ProtocolHandler>>} */ (
protocolHandler
),
).onInstantiate(
port,
prefixes[listenPrefixIndex],
remoteAddr,
protocolHandler,
),
this.facets.binderInboundInstantiateWatcher,
{
listenAddr,
remoteAddr,
port,
listenPrefixIndex,
},
);
return watch(
innerVow,
this.facets.binderInboundInstantiateCatchWatcher,
{
...watchContext,
lastFailure,
listenPrefixIndex,
},
);
},
},
binderOutboundInstantiateWatcher: {
onFulfilled(localInstance, watchContext) {
const { localAddr } = watchContext;
return localInstance ? `${localAddr}/${localInstance}` : localAddr;
},
},
binderOutboundConnectWatcher: {
onFulfilled({ handler: rchandler }, watchContext) {
const { lastFailure, remoteAddr, localAddr, lchandler, port } =
watchContext;
const { currentConnections } = this.state;
if (!rchandler) {
throw lastFailure;
}
const current = currentConnections.get(port);
return crossoverConnection(
zone,
/** @type {import('@agoric/vow').Remote<Required<ConnectionHandler>>} */ (
lchandler
),
localAddr,
/** @type {import('@agoric/vow').Remote<Required<ConnectionHandler>>} */ (
rchandler
),
remoteAddr,
makeConnection,
current,
)[0];
},
},
binderOutboundCatchWatcher: {
onRejected(e, watchContext) {
let lastFailure;
try {
rethrowUnlessMissing(e);
} catch (innerE) {
lastFailure = innerE;
}
const { port, remoteAddr, lchandler, localAddr } = watchContext;
const { protocolHandler } = this.state;
const connectVow = watch(
E(protocolHandler).onConnect(
port,
localAddr,
remoteAddr,
listener,
);
return crossoverConnection(
lchandler,
localAddress,
rchandler,
remoteAddress,
current,
)[1];
},
});
current.add(inboundAttempt);
return inboundAttempt;
}
throw lastFailure;
},
async outbound(port, remoteAddr, lchandler) {
const localAddr =
/** @type {string} */
(await E(port).getLocalAddress());
protocolHandler,
),
);
// Allocate a local address.
const initialLocalInstance = await E(protocolHandler)
.onInstantiate(port, localAddr, remoteAddr, protocolHandler)
.catch(rethrowUnlessMissing);
const initialLocalAddr = initialLocalInstance
? `${localAddr}/${initialLocalInstance}`
: localAddr;
return watch(connectVow, this.facets.binderOutboundConnectWatcher, {
lastFailure,
remoteAddr,
localAddr,
lchandler,
port,
});
},
},
binderOutboundInboundWatcher: {
onFulfilled(initialLocalAddress, watchContext) {
const { remoteAddr, localAddr } = watchContext;
let lastFailure;
let accepted;
await (async () => {
// Attempt the loopback connection.
const attempt = await protocolImpl.inbound(
remoteAddr,
initialLocalAddr,
);
accepted = await attempt.accept({ handler: lchandler });
})().catch(e => {
lastFailure = e;
});
if (accepted) {
return accepted;
}
if (initialLocalAddress === undefined) {
initialLocalAddress = localAddr;
}
const {
remoteAddress = remoteAddr,
handler: rchandler,
localAddress = localAddr,
} =
/** @type {Partial<AttemptDescription>} */
(
await E(protocolHandler).onConnect(
// Attempt the loopback connection.
return this.facets.protocolImpl.inbound(
remoteAddr,
initialLocalAddress,
);
},
},
binderOutboundAcceptWatcher: {
onFulfilled(attempt, watchContext) {
const { handler } = watchContext;
return E(attempt).accept({ handler });
},
},
binderBindGeneratePortWatcher: {
onFulfilled(portID, watchContext) {
const { localAddr, underspecified } = watchContext;
const { protocolHandler, boundPorts } = this.state;
if (!underspecified) {
return localAddr;
}
const newAddr = `${localAddr}${portID}`;
if (!boundPorts.has(newAddr)) {
return newAddr;
}
return watch(
E(protocolHandler).generatePortID(localAddr, protocolHandler),
this.facets.binderBindGeneratePortWatcher,
watchContext,
);
},
},
binderPortWatcher: {
onFulfilled(_value, watchContext) {
const { port, localAddr } = watchContext;
const { boundPorts, currentConnections } = this.state;
boundPorts.init(localAddr, port);
currentConnections.init(
port,
initialLocalAddr,
remoteAddr,
lchandler,
zone.detached().setStore('connections'),
);
return port;
},
},
binderBindWatcher: {
onFulfilled(localAddr) {
const {
boundPorts,
listening,
openConnections,
currentConnections,
protocolHandler,
)
);
} = this.state;
if (!rchandler) {
throw lastFailure;
}
if (boundPorts.has(localAddr)) {
return boundPorts.get(localAddr);
}
const current = currentConnections.get(port);
return crossoverConnection(
lchandler,
localAddress,
rchandler,
remoteAddress,
current,
)[0];
const port = makePort({
localAddr,
listening,
openConnections,
currentConnections,
boundPorts,
protocolHandler,
protocolImpl: this.facets.protocolImpl,
});
return watch(
E(protocolHandler).onBind(port, localAddr, protocolHandler),
this.facets.binderPortWatcher,
{
port,
localAddr,
},
);
},
},
rethrowUnlessMissingWatcher: {
onRejected(e) {
rethrowUnlessMissing(e);
},
},
},
});
);
// Wire up the local protocol to the handler.
void E(protocolHandler).onCreate(protocolImpl, protocolHandler);
const makeBinderKit = ({
currentConnections,
boundPorts,
listening,
protocolHandler,
}) => {
const { protocolImpl, binder } = makeFullBinderKit({
currentConnections,
boundPorts,
listening,
protocolHandler,
});
return harden({ protocolImpl, binder });
};
return makeBinderKit;
};
// Return the user-facing protocol.
return Far('binder', { bind });
}
/**
* @param {import('@agoric/base-zone').Zone} zone
* @param {ReturnType<import('@agoric/vow').prepareVowTools>} powers
*/
export const prepareNetworkProtocol = (zone, powers) => {
const makeBinderKit = prepareBinder(zone, powers);
/**
* @param {Remote<ProtocolHandler>} protocolHandler
* @returns {Protocol}
*/
const makeNetworkProtocol = protocolHandler => {
const detached = zone.detached();
/** @type {MapStore<Port, SetStore<Closable>>} */
const currentConnections = detached.mapStore('portToCurrentConnections');
/** @type {MapStore<string, Port>} */
const boundPorts = detached.mapStore('addrToPort');
/** @type {MapStore<Endpoint, [Port, Remote<Required<ListenHandler>>]>} */
const listening = detached.mapStore('listening');
const { binder, protocolImpl } = makeBinderKit({
currentConnections,
boundPorts,
listening,
protocolHandler,
});
// Wire up the local protocol to the handler.
void E(protocolHandler).onCreate(protocolImpl, protocolHandler);
return binder;
};
return makeNetworkProtocol;
};
/**
* Create a ConnectionHandler that just echoes its packets.
*
* @returns {ConnectionHandler}
* @param {import('@agoric/base-zone').Zone} zone
*/
export function makeEchoConnectionHandler() {
let closed;
/** @type {Connection} */
return Far('ConnectionHandler', {
async onReceive(_connection, bytes, _connectionHandler) {
if (closed) {
throw closed;
}
return bytes;
export const prepareEchoConnectionKit = zone => {
const makeEchoConnectionKit = zone.exoClassKit(
'EchoConnectionKit',
{
handler: M.interface('ConnectionHandler', {
onReceive: M.callWhen(
Shape.Connection,
Shape.Bytes,
Shape.ConnectionHandler,
)
.optional(Shape.Opts)
.returns(Shape.Data),
onClose: M.callWhen(Shape.Connection)
.optional(M.any(), Shape.ConnectionHandler)
.returns(M.undefined()),
}),
listener: M.interface('Listener', {
onListen: M.callWhen(Shape.Port, Shape.ListenHandler).returns(
Shape.Vow$(M.undefined()),
),
onAccept: M.callWhen(
Shape.Port,
Shape.Endpoint,
Shape.Endpoint,
Shape.ListenHandler,
).returns(Shape.Vow$(Shape.ConnectionHandler)),
}),
},
async onClose(_connection, _reason, _connectionHandler) {
if (closed) {
throw closed;
}
closed = Error('Connection closed');
() => {
return {
/** @type {string | undefined} */
closed: undefined,
};
},
});
}
{
handler: {
/**
* @param {Connection} _connection
* @param {Bytes} bytes
* @param {ConnectionHandler} _connectionHandler
*/
async onReceive(_connection, bytes, _connectionHandler) {
const { closed } = this.state;
export function makeNonceMaker(prefix = '', suffix = '') {
let nonce = 0;
return async () => {
nonce += 1;
return `${prefix}${nonce}${suffix}`;
};
}
if (closed) {
throw Error(closed);
}
return bytes;
},
/**
* @param {Connection} _connection
* @param {CloseReason} [_reason]
* @param {ConnectionHandler} [_connectionHandler]
*/
async onClose(_connection, _reason, _connectionHandler) {
const { closed } = this.state;
if (closed) {
throw Error(closed);
}
this.state.closed = 'Connection closed';
},
},
listener: {
async onAccept(_port, _localAddr, _remoteAddr, _listenHandler) {
return this.facets.handler;
},
async onListen(port, _listenHandler) {
console.debug(`listening on echo port: ${port}`);
},
},
},
);
return makeEchoConnectionKit;
};
/**
* Create a protocol handler that just connects to itself.
*
* @param {ProtocolHandler['onInstantiate']} [onInstantiate]
* @returns {ProtocolHandler} The localhost handler
* @param {import('@agoric/base-zone').Zone} zone
* @param {ReturnType<import('@agoric/vow').prepareVowTools>} powers
*/
export function makeLoopbackProtocolHandler(
onInstantiate = makeNonceMaker('nonce/'),
) {
/** @type {MapStore<string, [Port, ListenHandler]>} */
const listeners = makeScalarMapStore('localAddr');
export function prepareLoopbackProtocolHandler(zone, { watch, allVows }) {
const detached = zone.detached();
const makePortID = makeNonceMaker('port');
/** @param {string} [instancePrefix] */
const initHandler = (instancePrefix = 'nonce/') => {
/** @type {MapStore<string, [Remote<Port>, Remote<Required<ListenHandler>>]>} */
const listeners = detached.mapStore('localAddr');
return Far('ProtocolHandler', {
// eslint-disable-next-line no-empty-function
async onCreate(_impl, _protocolHandler) {
// TODO
return {
listeners,
portNonce: 0n,
instancePrefix,
instanceNonce: 0n,
};
};
const makeLoopbackProtocolHandlerKit = zone.exoClassKit(
'ProtocolHandler',
Shape.ProtocolHandlerI,
/** @param {string} [instancePrefix] */
initHandler,
{
protocolHandler: {
async onCreate(_impl, _protocolHandler) {
// noop
},
async generatePortID(_localAddr, _protocolHandler) {
this.state.portNonce += 1n;
return `port${this.state.portNonce}`;
},
async onBind(_port, _localAddr, _protocolHandler) {
// noop, for now; Maybe handle a bind?
},
/**
* @param {*} _port
* @param {Endpoint} localAddr
* @param {Endpoint} remoteAddr
* @returns {PromiseVow<AttemptDescription>}}
*/
async onConnect(_port, localAddr, remoteAddr) {
const { listeners } = this.state;
const [lport, lhandler] = listeners.get(remoteAddr);
const acceptVow = watch(
E(lhandler).onAccept(lport, remoteAddr, localAddr, lhandler),
this.facets.protocolHandlerAcceptWatcher,
);
const instantiateInnerVow = watch(
E(this.facets.protocolHandler).onInstantiate(
lport,
remoteAddr,
localAddr,
this.facets.protocolHandler,
),
this.facets.protocolHandlerInstantiateWatcher,
);
const instantiateVow = watch(
instantiateInnerVow,
this.facets.rethrowUnlessMissingWatcher,
);
return watch(
allVows([acceptVow, instantiateVow]),
this.facets.protocolHandlerConnectWatcher,
);
},
async onInstantiate(_port, _localAddr, _remote, _protocol) {
const { instancePrefix } = this.state;
this.state.instanceNonce += 1n;
return `${instancePrefix}${this.state.instanceNonce}`;
},
async onListen(port, localAddr, listenHandler, _protocolHandler) {
const { listeners } = this.state;
// This implementation has a simple last-one-wins replacement policy.
// Other handlers might use different policies.
if (listeners.has(localAddr)) {
const lhandler = listeners.get(localAddr)[1];
if (lhandler !== listenHandler) {
listeners.set(
localAddr,
harden([
port,
/** @type {Remote<Required<ListenHandler>>} */ (
listenHandler
),
]),
);
}
} else {
listeners.init(
localAddr,
harden([
port,
/** @type {Remote<Required<ListenHandler>>} */ (listenHandler),
]),
);
}
},
/**
* @param {Remote<Port>} port
* @param {Endpoint} localAddr
* @param {Remote<ListenHandler>} listenHandler
* @param {*} _protocolHandler
*/
async onListenRemove(port, localAddr, listenHandler, _protocolHandler) {
const { listeners } = this.state;
const [lport, lhandler] = listeners.get(localAddr);
lport === port || Fail`Port does not match listener on ${localAddr}`;
lhandler === listenHandler ||
Fail`Listen handler does not match listener on ${localAddr}`;
listeners.delete(localAddr);
},
async onRevoke(_port, _localAddr, _protocolHandler) {
// This is an opportunity to clean up resources.
},
},
protocolHandlerAcceptWatcher: {
onFulfilled(rchandler) {
return rchandler;
},
},
protocolHandlerConnectWatcher: {
onFulfilled(results) {
return {
remoteInstance: results[0],
handler: results[1],
};
},
},
protocolHandlerInstantiateWatcher: {
onFulfilled(remoteInstance) {
return remoteInstance;
},
},
rethrowUnlessMissingWatcher: {
onRejected(e) {
rethrowUnlessMissing(e);
},
},
},
async generatePortID(_protocolHandler) {
return makePortID();
},
async onBind(_port, _localAddr, _protocolHandler) {
// TODO: Maybe handle a bind?
},
async onConnect(_port, localAddr, remoteAddr, _chandler, protocolHandler) {
const [lport, lhandler] = listeners.get(remoteAddr);
const rchandler =
/** @type {ConnectionHandler} */
(await E(lhandler).onAccept(lport, remoteAddr, localAddr, lhandler));
// console.log(`rchandler is`, rchandler);
const remoteInstance = await E(protocolHandler)
.onInstantiate(lport, remoteAddr, localAddr, protocolHandler)
.catch(rethrowUnlessMissing);
return {
remoteInstance,
handler: rchandler,
};
},
onInstantiate,
async onListen(port, localAddr, listenHandler, _protocolHandler) {
// TODO: Implement other listener replacement policies.
if (listeners.has(localAddr)) {
const lhandler = listeners.get(localAddr)[1];
if (lhandler !== listenHandler) {
// Last-one-wins.
listeners.set(localAddr, [port, listenHandler]);
}
} else {
listeners.init(localAddr, [port, listenHandler]);
}
},
async onListenRemove(port, localAddr, listenHandler, _protocolHandler) {
const [lport, lhandler] = listeners.get(localAddr);
lport === port || Fail`Port does not match listener on ${localAddr}`;
lhandler === listenHandler ||
Fail`Listen handler does not match listener on ${localAddr}`;
listeners.delete(localAddr);
},
async onRevoke(_port, _localAddr, _protocolHandler) {
// TODO: maybe clean up?
},
});
);
const makeLoopbackProtocolHandler = instancePrefix => {
const { protocolHandler } = makeLoopbackProtocolHandlerKit(instancePrefix);
return harden(protocolHandler);
};
return makeLoopbackProtocolHandler;
}

@@ -11,31 +11,52 @@ /**

*/
/**
* Create a slash-delimited router.
*
* @template T
* @returns {Router<T>} a new Router
*/
export default function makeRouter<T>(): Router<T>;
/**
* @typedef {object} RouterProtocol
* @property {(prefix: string) => Promise<Port>} bind
* @property {(paths: string[], protocolHandler: ProtocolHandler) => void} registerProtocolHandler
* @property {(prefix: string, protocolHandler: ProtocolHandler) => void} unregisterProtocolHandler
*/
/**
* Create a router that behaves like a Protocol.
*
* @param {typeof defaultE} [E] Eventual sender
* @returns {RouterProtocol} The new delegated protocol
*/
export function makeRouterProtocol(E?: ((<T>(x: T) => import("@endo/eventual-send/src/E.js").ECallableOrMethods<import("@endo/eventual-send").RemoteFunctions<T>>) & {
readonly get: <T_1>(x: T_1) => import("@endo/eventual-send/src/E.js").EGetters<import("@endo/eventual-send").LocalRecord<T_1>>;
export const RouterI: import("@endo/patterns").InterfaceGuard<{
getRoutes: import("@endo/patterns").MethodGuard;
register: import("@endo/patterns").MethodGuard;
unregister: import("@endo/patterns").MethodGuard;
}>;
export function prepareRouter<T>(zone: import('@agoric/base-zone').Zone): () => import("@endo/exo/src/exo-makers.js").Guarded<{
/** @param {Endpoint} addr */
getRoutes(addr: Endpoint): [string, T][];
/**
* @param {string} prefix
* @param {T} route
*/
register(prefix: string, route: T): void;
/**
* @param {string} prefix
* @param {T} route
*/
unregister(prefix: string, route: T): void;
}>;
export function prepareRouterProtocol(zone: import('@agoric/base-zone').Zone, powers: ReturnType<(zone: import("@agoric/base-zone").Zone, powers?: {
isRetryableReason?: ((reason: any) => boolean) | undefined;
watchPromise?: ((p: PromiseLike<any>, watcher: import("@agoric/vow/src/watch-promise.js").PromiseWatcher, ...args: unknown[]) => void) | undefined;
} | undefined) => {
when: <T, TResult1 = import("@agoric/vow").Unwrap<T>, TResult2 = never>(specimenP: T, onFulfilled?: ((value: import("@agoric/vow").Unwrap<T>) => TResult1 | PromiseLike<TResult1>) | undefined, onRejected?: ((reason: any) => TResult2 | PromiseLike<TResult2>) | undefined) => Promise<TResult1 | TResult2>;
watch: <T_1 = any, TResult1_1 = T_1, TResult2_1 = T_1>(specimenP: import("@agoric/vow").ERef<T_1 | import("@agoric/vow").Vow<T_1>>, watcher?: import("@agoric/vow").Watcher<T_1, TResult1_1, TResult2_1> | undefined, watcherContext?: unknown) => import("@agoric/vow").Vow<TResult1_1 | TResult2_1>;
makeVowKit: <T_2>() => import("@agoric/vow").VowKit<T_2>;
allVows: (vows: any) => import("@agoric/vow").Vow<any>;
}>, E?: ((<T_3>(x: T_3) => import("@endo/eventual-send/src/E.js").ECallableOrMethods<import("@endo/eventual-send").RemoteFunctions<T_3>>) & {
readonly get: <T_1_1>(x: T_1_1) => import("@endo/eventual-send/src/E.js").EGetters<import("@endo/eventual-send").LocalRecord<T_1_1>>;
readonly resolve: {
(): Promise<void>;
<T_2>(value: T_2): Promise<Awaited<T_2>>;
<T_3>(value: T_3 | PromiseLike<T_3>): Promise<Awaited<T_3>>;
<T_2_1>(value: T_2_1): Promise<Awaited<T_2_1>>;
<T_3_1>(value: T_3_1 | PromiseLike<T_3_1>): Promise<Awaited<T_3_1>>;
};
readonly sendOnly: <T_4>(x: T_4) => import("@endo/eventual-send/src/E.js").ESendOnlyCallableOrMethods<import("@endo/eventual-send").RemoteFunctions<T_4>>;
readonly when: <T_5, U = T_5>(x: T_5 | PromiseLike<T_5>, onfulfilled?: ((value: T_5) => import("@endo/eventual-send").ERef<U>) | undefined, onrejected?: ((reason: any) => import("@endo/eventual-send").ERef<U>) | undefined) => Promise<U>;
}) | undefined): RouterProtocol;
}) | undefined): () => import("@endo/exo/src/exo-makers.js").Guarded<{
/**
* @param {string[]} paths
* @param {Remote<ProtocolHandler>} protocolHandler
*/
registerProtocolHandler(paths: string[], protocolHandler: Remote<ProtocolHandler>): void;
/**
* @param {string} prefix
* @param {Remote<ProtocolHandler>} protocolHandler
*/
unregisterProtocolHandler(prefix: string, protocolHandler: Remote<ProtocolHandler>): void;
/** @param {Endpoint} localAddr */
bind(localAddr: Endpoint): Promise<Port | import("@agoric/vow").Vow<Port>>;
}>;
/**

@@ -62,3 +83,3 @@ * A delimited string router implementation

export type RouterProtocol = {
bind: (prefix: string) => Promise<Port>;
bind: (prefix: string) => PromiseVow<Port>;
registerProtocolHandler: (paths: string[], protocolHandler: ProtocolHandler) => void;

@@ -65,0 +86,0 @@ unregisterProtocolHandler: (prefix: string, protocolHandler: ProtocolHandler) => void;

@@ -1,5 +0,7 @@

import { Far, E as defaultE } from '@endo/far';
import { makeScalarMapStore } from '@agoric/store';
// @ts-check
import { E as defaultE } from '@endo/far';
import { M } from '@endo/patterns';
import { Fail } from '@agoric/assert';
import { makeNetworkProtocol, ENDPOINT_SEPARATOR } from './network.js';
import { ENDPOINT_SEPARATOR, prepareNetworkProtocol } from './network.js';
import { Shape } from './shapes.js';

@@ -20,47 +22,77 @@ import '@agoric/store/exported.js';

export const RouterI = M.interface('Router', {
getRoutes: M.call(Shape.Endpoint).returns(M.arrayOf([M.string(), M.any()])),
register: M.call(M.string(), M.any()).returns(M.undefined()),
unregister: M.call(M.string(), M.any()).returns(M.undefined()),
});
/**
* Create a slash-delimited router.
*
* @template T
* @returns {Router<T>} a new Router
* @param {import('@agoric/base-zone').Zone} zone
*/
export default function makeRouter() {
/** @type {MapStore<string, T>} */
const prefixToRoute = makeScalarMapStore('prefix');
return Far('Router', {
getRoutes(addr) {
const parts = addr.split(ENDPOINT_SEPARATOR);
/** @type {[string, T][]} */
const ret = [];
for (let i = parts.length; i > 0; i -= 1) {
// Try most specific match.
const prefix = parts.slice(0, i).join(ENDPOINT_SEPARATOR);
if (prefixToRoute.has(prefix)) {
ret.push([prefix, prefixToRoute.get(prefix)]);
export const prepareRouter = zone => {
const detached = zone.detached();
const makeRouter = zone.exoClass(
'Router',
RouterI,
() => {
/** @type {MapStore<string, T>} */
const prefixToRoute = detached.mapStore('prefix');
return {
prefixToRoute,
};
},
{
/** @param {Endpoint} addr */
getRoutes(addr) {
const parts = addr.split(ENDPOINT_SEPARATOR);
/** @type {[string, T][]} */
const ret = [];
for (let i = parts.length; i > 0; i -= 1) {
// Try most specific match.
const prefix = parts.slice(0, i).join(ENDPOINT_SEPARATOR);
if (this.state.prefixToRoute.has(prefix)) {
ret.push([prefix, this.state.prefixToRoute.get(prefix)]);
}
// Trim off the last value (after the slash).
const defaultPrefix = prefix.slice(
0,
prefix.lastIndexOf(ENDPOINT_SEPARATOR) + 1,
);
if (this.state.prefixToRoute.has(defaultPrefix)) {
ret.push([
defaultPrefix,
this.state.prefixToRoute.get(defaultPrefix),
]);
}
}
// Trim off the last value (after the slash).
const defaultPrefix = prefix.substr(
0,
prefix.lastIndexOf(ENDPOINT_SEPARATOR) + 1,
);
if (prefixToRoute.has(defaultPrefix)) {
ret.push([defaultPrefix, prefixToRoute.get(defaultPrefix)]);
}
}
return harden(ret);
return harden(ret);
},
/**
* @param {string} prefix
* @param {T} route
*/
register(prefix, route) {
this.state.prefixToRoute.init(prefix, route);
},
/**
* @param {string} prefix
* @param {T} route
*/
unregister(prefix, route) {
this.state.prefixToRoute.get(prefix) === route ||
Fail`Router is not registered at prefix ${prefix}`;
this.state.prefixToRoute.delete(prefix);
},
},
register(prefix, route) {
prefixToRoute.init(prefix, route);
},
unregister(prefix, route) {
prefixToRoute.get(prefix) === route ||
Fail`Router is not registered at prefix ${prefix}`;
prefixToRoute.delete(prefix);
},
});
}
);
return makeRouter;
};
/**
* @typedef {object} RouterProtocol
* @property {(prefix: string) => Promise<Port>} bind
* @property {(prefix: string) => PromiseVow<Port>} bind
* @property {(paths: string[], protocolHandler: ProtocolHandler) => void} registerProtocolHandler

@@ -73,44 +105,79 @@ * @property {(prefix: string, protocolHandler: ProtocolHandler) => void} unregisterProtocolHandler

*
* @param {import('@agoric/base-zone').Zone} zone
* @param {ReturnType<import('@agoric/vow').prepareVowTools>} powers
* @param {typeof defaultE} [E] Eventual sender
* @returns {RouterProtocol} The new delegated protocol
*/
export function makeRouterProtocol(E = defaultE) {
const router = makeRouter();
/** @type {MapStore<string, Protocol>} */
const protocols = makeScalarMapStore('prefix');
/** @type {MapStore<string, ProtocolHandler>} */
const protocolHandlers = makeScalarMapStore('prefix');
export const prepareRouterProtocol = (zone, powers, E = defaultE) => {
const detached = zone.detached();
function registerProtocolHandler(paths, protocolHandler) {
const protocol = makeNetworkProtocol(protocolHandler);
for (const prefix of paths) {
router.register(prefix, protocol);
protocols.init(prefix, protocol);
protocolHandlers.init(prefix, protocolHandler);
}
}
const makeRouter = prepareRouter(zone);
const makeNetworkProtocol = prepareNetworkProtocol(zone, powers);
// FIXME: Buggy.
// Needs to account for multiple paths.
function unregisterProtocolHandler(prefix, protocolHandler) {
const ph = protocolHandlers.get(prefix);
ph === protocolHandler ||
Fail`Protocol handler is not registered at prefix ${prefix}`;
router.unregister(prefix, ph);
protocols.delete(prefix);
protocolHandlers.delete(prefix);
}
const makeRouterProtocol = zone.exoClass(
'RouterProtocol',
M.interface('RouterProtocol', {
registerProtocolHandler: M.call(
M.arrayOf(M.string()),
M.remotable(),
).returns(),
unregisterProtocolHandler: M.call(M.string(), M.remotable()).returns(),
bind: M.callWhen(Shape.Endpoint).returns(Shape.Vow$(Shape.Port)),
}),
() => {
/** @type {Router<Protocol>} */
const router = makeRouter();
/** @type {Protocol['bind']} */
async function bind(localAddr) {
const [route] = router.getRoutes(localAddr);
route !== undefined || Fail`No registered router for ${localAddr}`;
return E(route[1]).bind(localAddr);
}
/** @type {MapStore<string, Protocol>} */
const protocols = detached.mapStore('prefix');
return Far('RouterProtocol', {
bind,
registerProtocolHandler,
unregisterProtocolHandler,
});
}
/** @type {MapStore<string, Remote<ProtocolHandler>>} */
const protocolHandlers = detached.mapStore('prefix');
return {
router,
protocolHandlers,
protocols,
};
},
{
/**
* @param {string[]} paths
* @param {Remote<ProtocolHandler>} protocolHandler
*/
registerProtocolHandler(paths, protocolHandler) {
const protocol = makeNetworkProtocol(protocolHandler);
for (const prefix of paths) {
this.state.router.register(prefix, protocol);
this.state.protocols.init(prefix, protocol);
this.state.protocolHandlers.init(prefix, protocolHandler);
}
},
// FIXME: Buggy.
// Needs to account for multiple paths.
/**
* @param {string} prefix
* @param {Remote<ProtocolHandler>} protocolHandler
*/
unregisterProtocolHandler(prefix, protocolHandler) {
const ph = this.state.protocolHandlers.get(prefix);
ph === protocolHandler ||
Fail`Protocol handler is not registered at prefix ${prefix}`;
// TODO: unmap protocol handlers to their corresponding protocol
// e.g. using a map
// before unregistering
// @ts-expect-error note FIXME above
this.state.router.unregister(prefix, ph);
this.state.protocols.delete(prefix);
this.state.protocolHandlers.delete(prefix);
},
/** @param {Endpoint} localAddr */
async bind(localAddr) {
const [route] = this.state.router.getRoutes(localAddr);
route !== undefined || Fail`No registered router for ${localAddr}`;
return E(route[1]).bind(localAddr);
},
},
);
return makeRouterProtocol;
};

@@ -1,2 +0,13 @@

type Data = string | Buffer | ArrayBuffer;
type PromiseVow<T> = import('@agoric/vow').PromiseVow<T>;
type Remote<T> = import('@agoric/vow').Remote<T>;
/**
* Rearrange the exo types to make a cast of the methods (M) and init function (I) to a specific type.
*/
type ExoClassMethods<M extends import("@endo/exo/src/exo-tools").Methods, I extends (...args: any[]) => any> = M & ThisType<{
self: import('@endo/exo/src/exo-makers').Guarded<M>;
state: ReturnType<I>;
}>;
/**
* Each character code carries 8-bit octets. Eventually we want to use passable Uint8Arrays.
*/
type Bytes = string;

@@ -15,3 +26,3 @@ /**

*/
close: () => Promise<void>;
close: () => PromiseVow<void>;
};

@@ -26,3 +37,3 @@ /**

*/
bind: (prefix: Endpoint) => Promise<Port>;
bind: (prefix: Endpoint) => PromiseVow<Port>;
};

@@ -41,11 +52,11 @@ /**

*/
addListener: (acceptHandler: ListenHandler) => Promise<void>;
addListener: (acceptHandler: Remote<ListenHandler>) => PromiseVow<void>;
/**
* Make an outbound connection
*/
connect: (remote: Endpoint, connectionHandler?: ConnectionHandler) => Promise<Connection>;
connect: (remote: Endpoint, connectionHandler?: Remote<ConnectionHandler>) => PromiseVow<Connection>;
/**
* Remove the currently-bound listener
*/
removeListener: (acceptHandler: ListenHandler) => Promise<void>;
removeListener: (acceptHandler: Remote<ListenHandler>) => PromiseVow<void>;
/**

@@ -55,3 +66,3 @@ * Deallocate the port entirely, removing all

*/
revoke: () => void;
revoke: () => PromiseVow<void>;
};

@@ -63,18 +74,17 @@ /**

/**
* The
* listener has been registered
* The listener has been registered
*/
onListen?: ((port: Port, l: ListenHandler) => Promise<void>) | undefined;
onListen?: ((port: Remote<Port>, l: Remote<ListenHandler>) => PromiseVow<void>) | undefined;
/**
* A new connection is incoming
*/
onAccept: (port: Port, localAddr: Endpoint, remoteAddr: Endpoint, l: ListenHandler) => Promise<ConnectionHandler>;
onAccept: (port: Remote<Port>, localAddr: Endpoint, remoteAddr: Endpoint, l: Remote<ListenHandler>) => PromiseVow<Remote<ConnectionHandler>>;
/**
* The connection was rejected
*/
onReject?: ((port: Port, localAddr: Endpoint, remoteAddr: Endpoint, l: ListenHandler) => Promise<void>) | undefined;
onReject?: ((port: Remote<Port>, localAddr: Endpoint, remoteAddr: Endpoint, l: Remote<ListenHandler>) => PromiseVow<void>) | undefined;
/**
* There was an error while listening
*/
onError?: ((port: Port, rej: any, l: ListenHandler) => Promise<void>) | undefined;
onError?: ((port: Remote<Port>, rej: any, l: Remote<ListenHandler>) => PromiseVow<void>) | undefined;
/**

@@ -84,3 +94,3 @@ * The

*/
onRemove?: ((port: Port, l: ListenHandler) => Promise<void>) | undefined;
onRemove?: ((port: Remote<Port>, l: Remote<ListenHandler>) => PromiseVow<void>) | undefined;
};

@@ -91,7 +101,7 @@ type Connection = {

*/
send: (packetBytes: Data, opts?: Record<string, any>) => Promise<Bytes>;
send: (packetBytes: Bytes, opts?: Record<string, any>) => PromiseVow<Bytes>;
/**
* Close both ends of the connection
*/
close: () => Promise<void>;
close: () => PromiseVow<void>;
/**

@@ -114,11 +124,11 @@ * Get the locally bound name of this

*/
onOpen?: ((connection: Connection, localAddr: Endpoint, remoteAddr: Endpoint, c: ConnectionHandler) => void) | undefined;
onOpen?: ((connection: Remote<Connection>, localAddr: Endpoint, remoteAddr: Endpoint, c: Remote<ConnectionHandler>) => PromiseVow<void>) | undefined;
/**
* The connection received a packet
*/
onReceive?: ((connection: Connection, packetBytes: Bytes, c: ConnectionHandler, opts?: Record<string, any>) => Promise<Data>) | undefined;
onReceive?: ((connection: Remote<Connection>, ack: Bytes, c: Remote<ConnectionHandler>, opts?: Record<string, any>) => PromiseVow<Bytes>) | undefined;
/**
* The connection has been closed
*/
onClose?: ((connection: Connection, reason?: CloseReason, c?: ConnectionHandler) => Promise<void>) | undefined;
onClose?: ((connection: Remote<Connection>, reason?: CloseReason, c?: Remote<ConnectionHandler>) => PromiseVow<void>) | undefined;
};

@@ -130,3 +140,3 @@ /**

type AttemptDescription = {
handler: ConnectionHandler;
handler: Remote<ConnectionHandler>;
remoteAddress?: string | undefined;

@@ -143,31 +153,31 @@ localAddress?: string | undefined;

*/
onCreate: (protocol: ProtocolImpl, p: ProtocolHandler) => Promise<void>;
onCreate: (protocol: Remote<ProtocolImpl>, p: Remote<ProtocolHandler>) => PromiseVow<void>;
/**
* Create a fresh port identifier for this protocol
*/
generatePortID: (localAddr: Endpoint, p: ProtocolHandler) => Promise<string>;
generatePortID: (localAddr: Endpoint, p: Remote<ProtocolHandler>) => PromiseVow<string>;
/**
* A port will be bound
*/
onBind: (port: Port, localAddr: Endpoint, p: ProtocolHandler) => Promise<void>;
onBind: (port: Remote<Port>, localAddr: Endpoint, p: Remote<ProtocolHandler>) => PromiseVow<void>;
/**
* A port was listening
*/
onListen: (port: Port, localAddr: Endpoint, listenHandler: ListenHandler, p: ProtocolHandler) => Promise<void>;
onListen: (port: Remote<Port>, localAddr: Endpoint, listenHandler: Remote<ListenHandler>, p: Remote<ProtocolHandler>) => PromiseVow<void>;
/**
* A port listener has been reset
*/
onListenRemove: (port: Port, localAddr: Endpoint, listenHandler: ListenHandler, p: ProtocolHandler) => Promise<void>;
onListenRemove: (port: Remote<Port>, localAddr: Endpoint, listenHandler: Remote<ListenHandler>, p: Remote<ProtocolHandler>) => PromiseVow<void>;
/**
* Return unique suffix for local address
*/
onInstantiate?: ((port: Port, localAddr: Endpoint, remote: Endpoint, p: ProtocolHandler) => Promise<Endpoint>) | undefined;
onInstantiate?: ((port: Remote<Port>, localAddr: Endpoint, remote: Endpoint, p: Remote<ProtocolHandler>) => PromiseVow<Endpoint>) | undefined;
/**
* A port initiates an outbound connection
*/
onConnect: (port: Port, localAddr: Endpoint, remote: Endpoint, c: ConnectionHandler, p: ProtocolHandler) => Promise<AttemptDescription>;
onConnect: (port: Remote<Port>, localAddr: Endpoint, remote: Endpoint, c: Remote<ConnectionHandler>, p: Remote<ProtocolHandler>) => PromiseVow<AttemptDescription>;
/**
* The port is being completely destroyed
*/
onRevoke: (port: Port, localAddr: Endpoint, p: ProtocolHandler) => Promise<void>;
onRevoke: (port: Remote<Port>, localAddr: Endpoint, p: Remote<ProtocolHandler>) => PromiseVow<void>;
};

@@ -181,3 +191,3 @@ /**

*/
accept: (desc: AttemptDescription) => Promise<Connection>;
accept: (desc: AttemptDescription) => PromiseVow<Connection>;
/**

@@ -196,3 +206,3 @@ * Return the local address for this

*/
close: () => Promise<void>;
close: () => PromiseVow<void>;
};

@@ -207,12 +217,12 @@ /**

*/
bind: (prefix: Endpoint) => Promise<Port>;
bind: (prefix: Endpoint) => PromiseVow<Remote<Port>>;
/**
* Make an attempt to connect into this protocol
*/
inbound: (listenAddr: Endpoint, remoteAddr: Endpoint) => Promise<InboundAttempt>;
inbound: (listenAddr: Endpoint, remoteAddr: Endpoint) => PromiseVow<InboundAttempt>;
/**
* Create an outbound connection
*/
outbound: (port: Port, remoteAddr: Endpoint, connectionHandler: ConnectionHandler) => Promise<Connection>;
outbound: (port: Remote<Port>, remoteAddr: Endpoint, connectionHandler: Remote<ConnectionHandler>) => PromiseVow<Connection>;
};
//# sourceMappingURL=types.d.ts.map

@@ -0,8 +1,25 @@

// @ts-check
/**
* @typedef {string | Buffer | ArrayBuffer} Data
*
* @typedef {string} Bytes
* @template T
* @typedef {import('@agoric/vow').PromiseVow<T>} PromiseVow
*/
/**
* @template T
* @typedef {import('@agoric/vow').Remote<T>} Remote
*/
/**
* @template {import('@endo/exo/src/exo-makers').Methods} M
* @template {(...args: any[]) => any} I
* @typedef {M & ThisType<{ self: import('@endo/exo/src/exo-makers').Guarded<M>, state: ReturnType<I> }>} ExoClassMethods
* Rearrange the exo types to make a cast of the methods (M) and init function (I) to a specific type.
*/
/**
* @typedef {string} Bytes Each character code carries 8-bit octets. Eventually we want to use passable Uint8Arrays.
*/
/**
* @typedef {string} Endpoint A local or remote address See multiaddr.js for an

@@ -14,3 +31,3 @@ * opinionated router implementation

* @typedef {object} Closable A closable object
* @property {() => Promise<void>} close Terminate the object
* @property {() => PromiseVow<void>} close Terminate the object
*/

@@ -20,3 +37,3 @@

* @typedef {object} Protocol The network Protocol
* @property {(prefix: Endpoint) => Promise<Port>} bind Claim a port, or if
* @property {(prefix: Endpoint) => PromiseVow<Port>} bind Claim a port, or if
* ending in ENDPOINT_SEPARATOR, a fresh name

@@ -29,12 +46,12 @@ */

* port
* @property {(acceptHandler: ListenHandler) => Promise<void>} addListener
* @property {(acceptHandler: Remote<ListenHandler>) => PromiseVow<void>} addListener
* Begin accepting incoming connections
* @property {(
* remote: Endpoint,
* connectionHandler?: ConnectionHandler,
* ) => Promise<Connection>} connect
* connectionHandler?: Remote<ConnectionHandler>,
* ) => PromiseVow<Connection>} connect
* Make an outbound connection
* @property {(acceptHandler: ListenHandler) => Promise<void>} removeListener
* @property {(acceptHandler: Remote<ListenHandler>) => PromiseVow<void>} removeListener
* Remove the currently-bound listener
* @property {() => void} revoke Deallocate the port entirely, removing all
* @property {() => PromiseVow<void>} revoke Deallocate the port entirely, removing all
* listeners and closing all active connections

@@ -45,21 +62,20 @@ */

* @typedef {object} ListenHandler A handler for incoming connections
* @property {(port: Port, l: ListenHandler) => Promise<void>} [onListen] The
* listener has been registered
* @property {(port: Remote<Port>, l: Remote<ListenHandler>) => PromiseVow<void>} [onListen] The listener has been registered
* @property {(
* port: Port,
* port: Remote<Port>,
* localAddr: Endpoint,
* remoteAddr: Endpoint,
* l: ListenHandler,
* ) => Promise<ConnectionHandler>} onAccept
* l: Remote<ListenHandler>,
* ) => PromiseVow<Remote<ConnectionHandler>>} onAccept
* A new connection is incoming
* @property {(
* port: Port,
* port: Remote<Port>,
* localAddr: Endpoint,
* remoteAddr: Endpoint,
* l: ListenHandler,
* ) => Promise<void>} [onReject]
* l: Remote<ListenHandler>,
* ) => PromiseVow<void>} [onReject]
* The connection was rejected
* @property {(port: Port, rej: any, l: ListenHandler) => Promise<void>} [onError]
* @property {(port: Remote<Port>, rej: any, l: Remote<ListenHandler>) => PromiseVow<void>} [onError]
* There was an error while listening
* @property {(port: Port, l: ListenHandler) => Promise<void>} [onRemove] The
* @property {(port: Remote<Port>, l: Remote<ListenHandler>) => PromiseVow<void>} [onRemove] The
* listener has been removed

@@ -71,7 +87,7 @@ */

* @property {(
* packetBytes: Data,
* packetBytes: Bytes,
* opts?: Record<string, any>,
* ) => Promise<Bytes>} send
* ) => PromiseVow<Bytes>} send
* Send a packet on the connection
* @property {() => Promise<void>} close Close both ends of the connection
* @property {() => PromiseVow<void>} close Close both ends of the connection
* @property {() => Endpoint} getLocalAddress Get the locally bound name of this

@@ -85,20 +101,20 @@ * connection

* @property {(
* connection: Connection,
* connection: Remote<Connection>,
* localAddr: Endpoint,
* remoteAddr: Endpoint,
* c: ConnectionHandler,
* ) => void} [onOpen]
* c: Remote<ConnectionHandler>,
* ) => PromiseVow<void>} [onOpen]
* The connection has been opened
* @property {(
* connection: Connection,
* packetBytes: Bytes,
* c: ConnectionHandler,
* connection: Remote<Connection>,
* ack: Bytes,
* c: Remote<ConnectionHandler>,
* opts?: Record<string, any>,
* ) => Promise<Data>} [onReceive]
* ) => PromiseVow<Bytes>} [onReceive]
* The connection received a packet
* @property {(
* connection: Connection,
* connection: Remote<Connection>,
* reason?: CloseReason,
* c?: ConnectionHandler,
* ) => Promise<void>} [onClose]
* c?: Remote<ConnectionHandler>,
* ) => PromiseVow<void>} [onClose]
* The connection has been closed

@@ -111,3 +127,3 @@ *

* @typedef {object} AttemptDescription
* @property {ConnectionHandler} handler
* @property {Remote<ConnectionHandler>} handler
* @property {Endpoint} [remoteAddress]

@@ -120,50 +136,50 @@ * @property {Endpoint} [localAddress]

* implementation will invoke
* @property {(protocol: ProtocolImpl, p: ProtocolHandler) => Promise<void>} onCreate
* @property {(protocol: Remote<ProtocolImpl>, p: Remote<ProtocolHandler>) => PromiseVow<void>} onCreate
* This protocol is created
* @property {(localAddr: Endpoint, p: ProtocolHandler) => Promise<string>} generatePortID
* @property {(localAddr: Endpoint, p: Remote<ProtocolHandler>) => PromiseVow<string>} generatePortID
* Create a fresh port identifier for this protocol
* @property {(
* port: Port,
* port: Remote<Port>,
* localAddr: Endpoint,
* p: ProtocolHandler,
* ) => Promise<void>} onBind
* p: Remote<ProtocolHandler>,
* ) => PromiseVow<void>} onBind
* A port will be bound
* @property {(
* port: Port,
* port: Remote<Port>,
* localAddr: Endpoint,
* listenHandler: ListenHandler,
* p: ProtocolHandler,
* ) => Promise<void>} onListen
* listenHandler: Remote<ListenHandler>,
* p: Remote<ProtocolHandler>,
* ) => PromiseVow<void>} onListen
* A port was listening
* @property {(
* port: Port,
* port: Remote<Port>,
* localAddr: Endpoint,
* listenHandler: ListenHandler,
* p: ProtocolHandler,
* ) => Promise<void>} onListenRemove
* listenHandler: Remote<ListenHandler>,
* p: Remote<ProtocolHandler>,
* ) => PromiseVow<void>} onListenRemove
* A port listener has been reset
* @property {(
* port: Port,
* port: Remote<Port>,
* localAddr: Endpoint,
* remote: Endpoint,
* p: ProtocolHandler,
* ) => Promise<Endpoint>} [onInstantiate]
* p: Remote<ProtocolHandler>,
* ) => PromiseVow<Endpoint>} [onInstantiate]
* Return unique suffix for local address
* @property {(
* port: Port,
* port: Remote<Port>,
* localAddr: Endpoint,
* remote: Endpoint,
* c: ConnectionHandler,
* p: ProtocolHandler,
* ) => Promise<AttemptDescription>} onConnect
* c: Remote<ConnectionHandler>,
* p: Remote<ProtocolHandler>,
* ) => PromiseVow<AttemptDescription>} onConnect
* A port initiates an outbound connection
* @property {(
* port: Port,
* port: Remote<Port>,
* localAddr: Endpoint,
* p: ProtocolHandler,
* ) => Promise<void>} onRevoke
* p: Remote<ProtocolHandler>,
* ) => PromiseVow<void>} onRevoke
* The port is being completely destroyed
*
* @typedef {object} InboundAttempt An inbound connection attempt
* @property {(desc: AttemptDescription) => Promise<Connection>} accept
* @property {(desc: AttemptDescription) => PromiseVow<Connection>} accept
* Establish the connection

@@ -174,6 +190,6 @@ * @property {() => Endpoint} getLocalAddress Return the local address for this

* this attempt
* @property {() => Promise<void>} close Abort the attempt
* @property {() => PromiseVow<void>} close Abort the attempt
*
* @typedef {object} ProtocolImpl Things the protocol can do for us
* @property {(prefix: Endpoint) => Promise<Port>} bind Claim a port, or if
* @property {(prefix: Endpoint) => PromiseVow<Remote<Port>>} bind Claim a port, or if
* ending in ENDPOINT_SEPARATOR, a fresh name

@@ -183,10 +199,10 @@ * @property {(

* remoteAddr: Endpoint,
* ) => Promise<InboundAttempt>} inbound
* ) => PromiseVow<InboundAttempt>} inbound
* Make an attempt to connect into this protocol
* @property {(
* port: Port,
* port: Remote<Port>,
* remoteAddr: Endpoint,
* connectionHandler: ConnectionHandler,
* ) => Promise<Connection>} outbound
* connectionHandler: Remote<ConnectionHandler>,
* ) => PromiseVow<Connection>} outbound
* Create an outbound connection
*/

@@ -5,2 +5,3 @@ {

"checkJs": false,
"maxNodeModuleJsDepth": 1,
},

@@ -7,0 +8,0 @@ "include": [

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc