@agoric/network
Advanced tools
Comparing version 0.1.1-dev-780a461.0 to 0.1.1-dev-7ba058a.0
export * from './network.js'; | ||
export { default as makeRouter, makeRouterProtocol } from './router.js'; | ||
export { prepareRouter, prepareRouterProtocol } from './router.js'; | ||
export * from './multiaddr.js'; | ||
export * from './bytes.js'; |
{ | ||
"name": "@agoric/network", | ||
"version": "0.1.1-dev-780a461.0+780a461", | ||
"version": "0.1.1-dev-7ba058a.0+7ba058a", | ||
"description": "Agoric's network protocol API", | ||
@@ -24,12 +24,16 @@ "type": "module", | ||
"dependencies": { | ||
"@agoric/assert": "0.6.1-dev-780a461.0+780a461", | ||
"@agoric/internal": "0.3.3-dev-780a461.0+780a461", | ||
"@agoric/store": "0.9.3-dev-780a461.0+780a461", | ||
"@endo/base64": "^1.0.0", | ||
"@endo/far": "^1.0.1", | ||
"@endo/promise-kit": "^1.0.1" | ||
"@agoric/assert": "0.6.1-dev-7ba058a.0+7ba058a", | ||
"@agoric/internal": "0.3.3-dev-7ba058a.0+7ba058a", | ||
"@agoric/store": "0.9.3-dev-7ba058a.0+7ba058a", | ||
"@agoric/vat-data": "0.5.3-dev-7ba058a.0+7ba058a", | ||
"@endo/base64": "^1.0.2", | ||
"@endo/far": "^1.0.4", | ||
"@endo/patterns": "^1.1.0", | ||
"@endo/promise-kit": "^1.0.4" | ||
}, | ||
"devDependencies": { | ||
"@agoric/swingset-vat": "0.32.3-dev-780a461.0+780a461", | ||
"@endo/bundle-source": "^3.0.1", | ||
"@agoric/swingset-liveslots": "0.10.3-dev-7ba058a.0+7ba058a", | ||
"@agoric/swingset-vat": "0.32.3-dev-7ba058a.0+7ba058a", | ||
"@agoric/zone": "0.2.3-dev-7ba058a.0+7ba058a", | ||
"@endo/bundle-source": "^3.1.0", | ||
"ava": "^5.3.0", | ||
@@ -68,5 +72,5 @@ "c8": "^7.13.0" | ||
"typeCoverage": { | ||
"atLeast": 76.57 | ||
"atLeast": 91.68 | ||
}, | ||
"gitHead": "780a4613a6a5f805cedf2686cb2b3c0412f7ffff" | ||
"gitHead": "7ba058ab6142e26fb3ca1bfd31a0a9756735ac17" | ||
} |
/// <reference path="./types.js" /> | ||
import { encodeBase64, decodeBase64 } from '@endo/base64'; | ||
/* eslint-disable no-bitwise */ | ||
/** | ||
@@ -6,0 +5,0 @@ * Convert some data to bytes. |
export * from "./network.js"; | ||
export * from "./multiaddr.js"; | ||
export * from "./bytes.js"; | ||
export { default as makeRouter, makeRouterProtocol } from "./router.js"; | ||
export { prepareRouter, prepareRouterProtocol } from "./router.js"; | ||
//# sourceMappingURL=index.d.ts.map |
export * from './network.js'; | ||
export { default as makeRouter, makeRouterProtocol } from './router.js'; | ||
export { prepareRouter, prepareRouterProtocol } from './router.js'; | ||
export * from './multiaddr.js'; | ||
export * from './bytes.js'; |
/** | ||
* @param {ConnectionHandler} handler0 | ||
* @param {Endpoint} addr0 | ||
* @param {ConnectionHandler} handler1 | ||
* @param {Endpoint} addr1 | ||
* @param {WeakSet<Connection>} [current] | ||
* @returns {[Connection, Connection]} | ||
*/ | ||
export function crossoverConnection(handler0: ConnectionHandler, addr0: Endpoint, handler1: ConnectionHandler, addr1: Endpoint, current?: WeakSet<Connection> | undefined): [Connection, Connection]; | ||
/** | ||
* Get the list of prefixes from longest to shortest. | ||
@@ -17,22 +8,33 @@ * | ||
/** | ||
* Create a protocol that has a handler. | ||
* | ||
* @param {ProtocolHandler} protocolHandler | ||
* @returns {Protocol} the local capability for connecting and listening | ||
*/ | ||
export function makeNetworkProtocol(protocolHandler: ProtocolHandler): Protocol; | ||
/** | ||
* Create a ConnectionHandler that just echoes its packets. | ||
* | ||
* @returns {ConnectionHandler} | ||
*/ | ||
export function makeEchoConnectionHandler(): ConnectionHandler; | ||
export function makeNonceMaker(prefix?: string, suffix?: string): () => Promise<string>; | ||
/** | ||
* Create a protocol handler that just connects to itself. | ||
* | ||
* @param {ProtocolHandler['onInstantiate']} [onInstantiate] | ||
* @returns {ProtocolHandler} The localhost handler | ||
* @param {import('@agoric/base-zone').Zone} zone | ||
* @param {ReturnType<import('@agoric/vow').prepareVowTools>} powers | ||
*/ | ||
export function makeLoopbackProtocolHandler(onInstantiate?: ProtocolHandler['onInstantiate']): ProtocolHandler; | ||
export function prepareLoopbackProtocolHandler(zone: import('@agoric/base-zone').Zone, { when }: ReturnType<(zone: import("@agoric/base-zone").Zone, powers?: { | ||
isRetryableReason?: ((reason: any) => boolean) | undefined; | ||
watchPromise?: ((p: PromiseLike<any>, watcher: import("@agoric/vow/src/watch-promise.js").PromiseWatcher, ...args: unknown[]) => void) | undefined; | ||
} | undefined) => { | ||
when: <T = any, TResult1 = import("@agoric/vow/src/E.js").Unwrap<T>, TResult2 = never>(specimenP: T, onFulfilled?: ((value: import("@agoric/vow/src/E.js").Unwrap<T>) => TResult1 | PromiseLike<TResult1>) | undefined, onRejected?: ((reason: any) => TResult2 | PromiseLike<TResult2>) | undefined) => Promise<TResult1 | TResult2>; | ||
watch: <T_1 = any, TResult1_1 = T_1, TResult2_1 = T_1>(specimenP: import("@agoric/vow").ERef<T_1 | import("@agoric/vow").Vow<T_1>>, watcher?: import("@agoric/vow").Watcher<T_1, TResult1_1, TResult2_1> | undefined, watcherContext?: unknown) => import("@agoric/vow").Vow<TResult1_1 | TResult2_1>; | ||
makeVowKit: <T_2>() => import("@agoric/vow").VowKit<T_2>; | ||
}>): (instancePrefix?: string | undefined) => import("@endo/exo/src/exo-makers.js").Guarded<{ | ||
onCreate(_impl: any, _protocolHandler: any): Promise<void>; | ||
generatePortID(_localAddr: any, _protocolHandler: any): Promise<string>; | ||
onBind(_port: any, _localAddr: any, _protocolHandler: any): Promise<void>; | ||
onConnect(_port: any, localAddr: any, remoteAddr: any, _chandler: any, protocolHandler: any): Promise<{ | ||
remoteInstance: any; | ||
handler: ConnectionHandler; | ||
}>; | ||
onInstantiate(_port: any, _localAddr: any, _remote: any, _protocol: any): Promise<string>; | ||
onListen(port: any, localAddr: any, listenHandler: any, _protocolHandler: any): Promise<void>; | ||
/** | ||
* @param {Port} port | ||
* @param {Endpoint} localAddr | ||
* @param {ListenHandler} listenHandler | ||
* @param {*} _protocolHandler | ||
*/ | ||
onListenRemove(port: Port, localAddr: Endpoint, listenHandler: ListenHandler, _protocolHandler: any): Promise<void>; | ||
onRevoke(_port: any, _localAddr: any, _protocolHandler: any): Promise<void>; | ||
}>; | ||
/** | ||
@@ -43,4 +45,104 @@ * Compatibility note: this must match what our peers use, so don't change it | ||
export const ENDPOINT_SEPARATOR: "/"; | ||
export function rethrowUnlessMissing(err: any): undefined; | ||
export function makeConnection(handler: ConnectionHandler, localAddr: Endpoint, remoteAddr: Endpoint, current?: Set<Closable> | undefined): Connection; | ||
export namespace Shape { | ||
let ConnectionI: import("@endo/patterns").InterfaceGuard<{ | ||
send: import("@endo/patterns").MethodGuard; | ||
close: import("@endo/patterns").MethodGuard; | ||
getLocalAddress: import("@endo/patterns").MethodGuard; | ||
getRemoteAddress: import("@endo/patterns").MethodGuard; | ||
}>; | ||
let InboundAttemptI: import("@endo/patterns").InterfaceGuard<{ | ||
accept: import("@endo/patterns").MethodGuard; | ||
getLocalAddress: import("@endo/patterns").MethodGuard; | ||
getRemoteAddress: import("@endo/patterns").MethodGuard; | ||
close: import("@endo/patterns").MethodGuard; | ||
}>; | ||
let PortI: import("@endo/patterns").InterfaceGuard<{ | ||
getLocalAddress: import("@endo/patterns").MethodGuard; | ||
addListener: import("@endo/patterns").MethodGuard; | ||
connect: import("@endo/patterns").MethodGuard; | ||
removeListener: import("@endo/patterns").MethodGuard; | ||
revoke: import("@endo/patterns").MethodGuard; | ||
}>; | ||
let ProtocolHandlerI: import("@endo/patterns").InterfaceGuard<{ | ||
onCreate: import("@endo/patterns").MethodGuard; | ||
generatePortID: import("@endo/patterns").MethodGuard; | ||
onBind: import("@endo/patterns").MethodGuard; | ||
onListen: import("@endo/patterns").MethodGuard; | ||
onListenRemove: import("@endo/patterns").MethodGuard; | ||
onInstantiate: import("@endo/patterns").MethodGuard; | ||
onConnect: import("@endo/patterns").MethodGuard; | ||
onRevoke: import("@endo/patterns").MethodGuard; | ||
}>; | ||
let ProtocolImplI: import("@endo/patterns").InterfaceGuard<{ | ||
bind: import("@endo/patterns").MethodGuard; | ||
inbound: import("@endo/patterns").MethodGuard; | ||
outbound: import("@endo/patterns").MethodGuard; | ||
}>; | ||
function Vow$(shape: any): import("@endo/patterns").Matcher; | ||
let AttemptDescription: import("@endo/patterns").Matcher; | ||
let Opts: import("@endo/patterns").Matcher; | ||
let Data: import("@endo/patterns").Matcher; | ||
let Bytes: import("@endo/patterns").Matcher; | ||
let Endpoint: import("@endo/patterns").Matcher; | ||
let Vow: import("@endo/patterns").Matcher; | ||
let ConnectionHandler: import("@endo/patterns").Matcher; | ||
let Connection: import("@endo/patterns").Matcher; | ||
let InboundAttempt: import("@endo/patterns").Matcher; | ||
let Listener: import("@endo/patterns").Matcher; | ||
let ListenHandler: import("@endo/patterns").Matcher; | ||
let Port: import("@endo/patterns").Matcher; | ||
let ProtocolHandler: import("@endo/patterns").Matcher; | ||
let ProtocolImpl: import("@endo/patterns").Matcher; | ||
} | ||
export function rethrowUnlessMissing(err: unknown): undefined; | ||
export function crossoverConnection(zone: import('@agoric/zone').Zone, handler0: ConnectionHandler, addr0: Endpoint, handler1: ConnectionHandler, addr1: Endpoint, makeConnection: (opts: ConnectionOpts) => Connection, current?: WeakSetStore<Closable>): Connection[]; | ||
export function prepareNetworkProtocol(zone: import('@agoric/base-zone').Zone, powers: ReturnType<(zone: import("@agoric/base-zone").Zone, powers?: { | ||
isRetryableReason?: ((reason: any) => boolean) | undefined; | ||
watchPromise?: ((p: PromiseLike<any>, watcher: import("@agoric/vow/src/watch-promise.js").PromiseWatcher, ...args: unknown[]) => void) | undefined; | ||
} | undefined) => { | ||
when: <T = any, TResult1 = import("@agoric/vow/src/E.js").Unwrap<T>, TResult2 = never>(specimenP: T, onFulfilled?: ((value: import("@agoric/vow/src/E.js").Unwrap<T>) => TResult1 | PromiseLike<TResult1>) | undefined, onRejected?: ((reason: any) => TResult2 | PromiseLike<TResult2>) | undefined) => Promise<TResult1 | TResult2>; | ||
watch: <T_1 = any, TResult1_1 = T_1, TResult2_1 = T_1>(specimenP: import("@agoric/vow").ERef<T_1 | import("@agoric/vow").Vow<T_1>>, watcher?: import("@agoric/vow").Watcher<T_1, TResult1_1, TResult2_1> | undefined, watcherContext?: unknown) => import("@agoric/vow").Vow<TResult1_1 | TResult2_1>; | ||
makeVowKit: <T_2>() => import("@agoric/vow").VowKit<T_2>; | ||
}>): (protocolHandler: ProtocolHandler) => Protocol; | ||
export function prepareEchoConnectionKit(zone: import('@agoric/base-zone').Zone): () => import("@endo/exo/src/exo-makers.js").GuardedKit<{ | ||
handler: { | ||
/** | ||
* @param {Connection} _connection | ||
* @param {Bytes} bytes | ||
* @param {ConnectionHandler} _connectionHandler | ||
*/ | ||
onReceive(_connection: Connection, bytes: Bytes, _connectionHandler: ConnectionHandler): Promise<string>; | ||
/** | ||
* @param {Connection} _connection | ||
* @param {CloseReason} [_reason] | ||
* @param {ConnectionHandler} [_connectionHandler] | ||
*/ | ||
onClose(_connection: Connection, _reason?: CloseReason, _connectionHandler?: ConnectionHandler | undefined): Promise<void>; | ||
}; | ||
listener: { | ||
onAccept(_port: any, _localAddr: any, _remoteAddr: any, _listenHandler: any): Promise<import("@endo/exo/src/exo-makers.js").Guarded<{ | ||
/** | ||
* @param {Connection} _connection | ||
* @param {Bytes} bytes | ||
* @param {ConnectionHandler} _connectionHandler | ||
*/ | ||
onReceive(_connection: Connection, bytes: Bytes, _connectionHandler: ConnectionHandler): Promise<string>; | ||
/** | ||
* @param {Connection} _connection | ||
* @param {CloseReason} [_reason] | ||
* @param {ConnectionHandler} [_connectionHandler] | ||
*/ | ||
onClose(_connection: Connection, _reason?: CloseReason, _connectionHandler?: ConnectionHandler | undefined): Promise<void>; | ||
}>>; | ||
onListen(port: any, _listenHandler: any): Promise<void>; | ||
}; | ||
}>; | ||
export type ConnectionOpts = { | ||
addrs: Endpoint[]; | ||
handlers: ConnectionHandler[]; | ||
conns: MapStore<number, Connection>; | ||
current: WeakSetStore<Closable>; | ||
l: 0 | 1; | ||
r: 0 | 1; | ||
}; | ||
//# sourceMappingURL=network.d.ts.map |
1285
src/network.js
@@ -1,4 +0,5 @@ | ||
import { makeScalarMapStore, makeLegacyMap } from '@agoric/store'; | ||
import { Far, E } from '@endo/far'; | ||
import { makePromiseKit } from '@endo/promise-kit'; | ||
// @ts-check | ||
import { E } from '@endo/far'; | ||
import { M } from '@endo/patterns'; | ||
import { Fail } from '@agoric/assert'; | ||
@@ -17,2 +18,126 @@ import { whileTrue } from '@agoric/internal'; | ||
const Shape1 = /** @type {const} */ ({ | ||
/** | ||
* Data is string | Buffer | ArrayBuffer | ||
* but only string is passable | ||
*/ | ||
Data: M.string(), | ||
Bytes: M.string(), | ||
Endpoint: M.string(), | ||
// TODO: match on "Vow" tag | ||
// @endo/patterns supports it as of | ||
// https://github.com/endojs/endo/pull/2091 | ||
// but that's not in agoric-sdk yet. | ||
// For now, use M.any() to avoid: | ||
// cannot check unrecognized tag "Vow": "[Vow]" | ||
Vow: M.any(), | ||
ConnectionHandler: M.remotable('ConnectionHandler'), | ||
Connection: M.remotable('Connection'), | ||
InboundAttempt: M.remotable('InboundAttempt'), | ||
Listener: M.remotable('Listener'), | ||
ListenHandler: M.remotable('ListenHandler'), | ||
Port: M.remotable('Port'), | ||
ProtocolHandler: M.remotable('ProtocolHandler'), | ||
ProtocolImpl: M.remotable('ProtocolImpl'), | ||
}); | ||
const Shape2 = /** @type {const} */ ({ | ||
...Shape1, | ||
Vow$: shape => M.or(shape, Shape1.Vow), | ||
AttemptDescription: M.splitRecord( | ||
{ handler: Shape1.ConnectionHandler }, | ||
{ remoteAddress: Shape1.Endpoint, localAddress: Shape1.Endpoint }, | ||
), | ||
Opts: M.recordOf(M.string(), M.any()), | ||
}); | ||
export const Shape = /** @type {const} */ harden({ | ||
...Shape2, | ||
ConnectionI: M.interface('Connection', { | ||
send: M.callWhen(Shape2.Data) | ||
.optional(Shape2.Opts) | ||
.returns(Shape2.Vow$(Shape2.Bytes)), | ||
close: M.callWhen().returns(Shape2.Vow$(M.undefined())), | ||
getLocalAddress: M.call().returns(Shape2.Endpoint), | ||
getRemoteAddress: M.call().returns(Shape2.Endpoint), | ||
}), | ||
InboundAttemptI: M.interface('InboundAttempt', { | ||
accept: M.callWhen(Shape2.AttemptDescription).returns( | ||
Shape2.Vow$(Shape2.Connection), | ||
), | ||
getLocalAddress: M.call().returns(Shape2.Endpoint), | ||
getRemoteAddress: M.call().returns(Shape2.Endpoint), | ||
close: M.callWhen().returns(Shape2.Vow$(M.undefined())), | ||
}), | ||
PortI: M.interface('Port', { | ||
getLocalAddress: M.call().returns(Shape2.Endpoint), | ||
addListener: M.callWhen(Shape2.Listener).returns( | ||
Shape2.Vow$(M.undefined()), | ||
), | ||
connect: M.callWhen(Shape2.Endpoint) | ||
.optional(Shape2.ConnectionHandler) | ||
.returns(Shape2.Vow$(Shape2.Connection)), | ||
removeListener: M.callWhen(Shape2.Listener).returns( | ||
Shape2.Vow$(M.undefined()), | ||
), | ||
revoke: M.callWhen().returns(M.undefined()), | ||
}), | ||
ProtocolHandlerI: M.interface('ProtocolHandler', { | ||
onCreate: M.callWhen(M.remotable(), Shape2.ProtocolHandler).returns( | ||
Shape2.Vow$(M.undefined()), | ||
), | ||
generatePortID: M.callWhen(Shape2.Endpoint, Shape2.ProtocolHandler).returns( | ||
Shape2.Vow$(M.string()), | ||
), | ||
onBind: M.callWhen( | ||
Shape2.Port, | ||
Shape2.Endpoint, | ||
Shape2.ProtocolHandler, | ||
).returns(Shape2.Vow$(M.undefined())), | ||
onListen: M.callWhen( | ||
Shape2.Port, | ||
Shape2.Endpoint, | ||
Shape2.ListenHandler, | ||
Shape2.ProtocolHandler, | ||
).returns(Shape2.Vow$(M.undefined())), | ||
onListenRemove: M.callWhen( | ||
Shape2.Port, | ||
Shape2.Endpoint, | ||
Shape2.ListenHandler, | ||
Shape2.ProtocolHandler, | ||
).returns(Shape2.Vow$(M.undefined())), | ||
onInstantiate: M.callWhen( | ||
Shape2.Port, | ||
Shape2.Endpoint, | ||
Shape2.Endpoint, | ||
Shape2.ProtocolHandler, | ||
).returns(Shape2.Vow$(Shape2.Endpoint)), | ||
onConnect: M.callWhen( | ||
Shape2.Port, | ||
Shape2.Endpoint, | ||
Shape2.Endpoint, | ||
Shape2.ConnectionHandler, | ||
Shape2.ProtocolHandler, | ||
).returns(Shape2.Vow$(Shape2.AttemptDescription)), | ||
onRevoke: M.callWhen( | ||
Shape2.Port, | ||
Shape2.Endpoint, | ||
Shape2.ProtocolHandler, | ||
).returns(Shape2.Vow$(M.undefined())), | ||
}), | ||
ProtocolImplI: M.interface('ProtocolImpl', { | ||
bind: M.callWhen(Shape2.Endpoint).returns(Shape2.Vow$(Shape2.Port)), | ||
inbound: M.callWhen(Shape2.Endpoint, Shape2.Endpoint).returns( | ||
Shape2.Vow$(Shape2.InboundAttempt), | ||
), | ||
outbound: M.callWhen( | ||
Shape2.Port, | ||
Shape2.Endpoint, | ||
Shape2.ConnectionHandler, | ||
).returns(Shape2.Vow$(Shape2.Connection)), | ||
}), | ||
}); | ||
/** @param {unknown} err */ | ||
export const rethrowUnlessMissing = err => { | ||
@@ -31,230 +156,322 @@ // Ugly hack rather than being able to determine if the function | ||
/** | ||
* Create a handled Connection. | ||
* Get the list of prefixes from longest to shortest. | ||
* | ||
* @param {ConnectionHandler} handler | ||
* @param {Endpoint} localAddr | ||
* @param {Endpoint} remoteAddr | ||
* @param {Set<Closable>} [current] | ||
* @returns {Connection} | ||
* @param {string} addr | ||
*/ | ||
export const makeConnection = ( | ||
handler, | ||
localAddr, | ||
remoteAddr, | ||
current = new Set(), | ||
) => { | ||
let closed; | ||
/** @type {Set<import('@endo/promise-kit').PromiseKit<Bytes>>} */ | ||
const pendingAcks = new Set(); | ||
/** @type {Connection} */ | ||
const connection = Far('Connection', { | ||
getLocalAddress() { | ||
return localAddr; | ||
}, | ||
getRemoteAddress() { | ||
return remoteAddr; | ||
}, | ||
async close() { | ||
if (closed) { | ||
throw closed; | ||
} | ||
current.delete(connection); | ||
closed = Error('Connection closed'); | ||
for (const ackDeferred of [...pendingAcks.values()]) { | ||
pendingAcks.delete(ackDeferred); | ||
ackDeferred.reject(closed); | ||
} | ||
await E(handler) | ||
.onClose(connection, undefined, handler) | ||
.catch(rethrowUnlessMissing); | ||
}, | ||
async send(data, opts) { | ||
// console.log('send', data, local === srcHandler); | ||
if (closed) { | ||
throw closed; | ||
} | ||
const bytes = toBytes(data); | ||
const ackDeferred = makePromiseKit(); | ||
pendingAcks.add(ackDeferred); | ||
E(handler) | ||
.onReceive(connection, bytes, handler, opts) | ||
.catch(err => { | ||
rethrowUnlessMissing(err); | ||
return ''; | ||
}) | ||
.then( | ||
ack => { | ||
pendingAcks.delete(ackDeferred); | ||
ackDeferred.resolve(toBytes(ack)); | ||
}, | ||
err => { | ||
pendingAcks.delete(ackDeferred); | ||
ackDeferred.reject(err); | ||
}, | ||
); | ||
return ackDeferred.promise; | ||
}, | ||
}); | ||
export function getPrefixes(addr) { | ||
const parts = addr.split(ENDPOINT_SEPARATOR); | ||
current.add(connection); | ||
E(handler) | ||
.onOpen(connection, localAddr, remoteAddr, handler) | ||
.catch(rethrowUnlessMissing); | ||
return connection; | ||
}; | ||
/** @type {string[]} */ | ||
const ret = []; | ||
for (let i = parts.length; i > 0; i -= 1) { | ||
// Try most specific match. | ||
const prefix = parts.slice(0, i).join(ENDPOINT_SEPARATOR); | ||
ret.push(prefix); | ||
} | ||
return ret; | ||
} | ||
/** | ||
* @param {ConnectionHandler} handler0 | ||
* @param {Endpoint} addr0 | ||
* @param {ConnectionHandler} handler1 | ||
* @param {Endpoint} addr1 | ||
* @param {WeakSet<Connection>} [current] | ||
* @returns {[Connection, Connection]} | ||
* @typedef {object} ConnectionOpts | ||
* @property {Endpoint[]} addrs | ||
* @property {ConnectionHandler[]} handlers | ||
* @property {MapStore<number, Connection>} conns | ||
* @property {WeakSetStore<Closable>} current | ||
* @property {0|1} l | ||
* @property {0|1} r | ||
*/ | ||
export function crossoverConnection( | ||
handler0, | ||
addr0, | ||
handler1, | ||
addr1, | ||
current = new WeakSet(), | ||
) { | ||
/** @type {Connection[]} */ | ||
const conns = []; | ||
/** @type {ConnectionHandler[]} */ | ||
const handlers = [handler0, handler1]; | ||
/** @type {Endpoint[]} */ | ||
const addrs = [addr0, addr1]; | ||
function makeHalfConnection(l, r) { | ||
let closed; | ||
conns[l] = Far('Connection', { | ||
/** | ||
* @param {import('@agoric/base-zone').Zone} zone | ||
* @param {ReturnType<import('@agoric/vow').prepareVowTools>} powers | ||
*/ | ||
const prepareHalfConnection = (zone, { when }) => { | ||
const makeHalfConnection = zone.exoClass( | ||
'Connection', | ||
Shape.ConnectionI, | ||
/** @param {ConnectionOpts} opts */ | ||
({ addrs, handlers, conns, current, l, r }) => { | ||
/** @type {string | undefined} */ | ||
let closed; | ||
return { | ||
addrs, | ||
handlers, | ||
conns, | ||
current, | ||
l, | ||
r, | ||
closed, | ||
}; | ||
}, | ||
{ | ||
getLocalAddress() { | ||
const { addrs, l } = this.state; | ||
return addrs[l]; | ||
}, | ||
getRemoteAddress() { | ||
const { addrs, r } = this.state; | ||
return addrs[r]; | ||
}, | ||
/** @param {Data} packetBytes */ | ||
async send(packetBytes) { | ||
const { closed, handlers, r, conns } = this.state; | ||
if (closed) { | ||
throw closed; | ||
} | ||
const ack = await E(handlers[r]) | ||
.onReceive(conns[r], toBytes(packetBytes), handlers[r]) | ||
.catch(rethrowUnlessMissing); | ||
const ack = await when( | ||
E(handlers[r]) | ||
.onReceive(conns.get(r), toBytes(packetBytes), handlers[r]) | ||
.catch(rethrowUnlessMissing), | ||
); | ||
return toBytes(ack || ''); | ||
}, | ||
async close() { | ||
const { closed, current, conns, l, handlers } = this.state; | ||
if (closed) { | ||
throw closed; | ||
throw Error(closed); | ||
} | ||
closed = Error('Connection closed'); | ||
current.delete(conns[l]); | ||
await E(handlers[l]) | ||
.onClose(conns[l], undefined, handlers[l]) | ||
.catch(rethrowUnlessMissing); | ||
this.state.closed = 'Connection closed'; | ||
current.delete(conns.get(l)); | ||
await when( | ||
E(this.state.handlers[l]).onClose( | ||
conns.get(l), | ||
undefined, | ||
handlers[l], | ||
), | ||
).catch(rethrowUnlessMissing); | ||
}, | ||
}); | ||
} | ||
}, | ||
); | ||
makeHalfConnection(0, 1); | ||
makeHalfConnection(1, 0); | ||
return makeHalfConnection; | ||
}; | ||
/** | ||
* @param {import('@agoric/zone').Zone} zone | ||
* @param {ConnectionHandler} handler0 | ||
* @param {Endpoint} addr0 | ||
* @param {ConnectionHandler} handler1 | ||
* @param {Endpoint} addr1 | ||
* @param {(opts: ConnectionOpts) => Connection} makeConnection | ||
* @param {WeakSetStore<Closable>} current | ||
*/ | ||
export const crossoverConnection = ( | ||
zone, | ||
handler0, | ||
addr0, | ||
handler1, | ||
addr1, | ||
makeConnection, | ||
current = zone.detached().weakSetStore('crossoverCurrentConnections'), | ||
) => { | ||
const detached = zone.detached(); | ||
/** @type {MapStore<number, Connection>} */ | ||
const conns = detached.mapStore('addrToConnections'); | ||
/** @type {ConnectionHandler[]} */ | ||
const handlers = harden([handler0, handler1]); | ||
/** @type {Endpoint[]} */ | ||
const addrs = harden([addr0, addr1]); | ||
/** | ||
* @param {0|1} l | ||
* @param {0|1} r | ||
*/ | ||
const makeHalfConnection = (l, r) => { | ||
conns.init(l, makeConnection({ addrs, handlers, conns, current, l, r })); | ||
}; | ||
/** | ||
* @param {number} l local side of the connection | ||
* @param {number} r remote side of the connection | ||
*/ | ||
function openHalfConnection(l, r) { | ||
current.add(conns[l]); | ||
const openHalfConnection = (l, r) => { | ||
current.add(conns.get(l)); | ||
E(handlers[l]) | ||
.onOpen(conns[l], addrs[l], addrs[r], handlers[l]) | ||
.onOpen(conns.get(l), addrs[l], addrs[r], handlers[l]) | ||
.catch(rethrowUnlessMissing); | ||
} | ||
}; | ||
makeHalfConnection(0, 1); | ||
makeHalfConnection(1, 0); | ||
openHalfConnection(0, 1); | ||
openHalfConnection(1, 0); | ||
const [conn0, conn1] = conns; | ||
return [conn0, conn1]; | ||
} | ||
return [conns.get(0), conns.get(1)]; | ||
}; | ||
/** | ||
* Get the list of prefixes from longest to shortest. | ||
* | ||
* @param {string} addr | ||
* @param {import('@agoric/zone').Zone} zone | ||
* @param {(opts: ConnectionOpts) => Connection} makeConnection | ||
* @param {ReturnType<import('@agoric/vow').prepareVowTools>} powers | ||
*/ | ||
export function getPrefixes(addr) { | ||
const parts = addr.split(ENDPOINT_SEPARATOR); | ||
const prepareInboundAttempt = (zone, makeConnection, { when }) => { | ||
const makeInboundAttempt = zone.exoClass( | ||
'InboundAttempt', | ||
Shape.InboundAttemptI, | ||
/** | ||
* @param {object} opts | ||
* @param {string} opts.localAddr | ||
* @param {string} opts.remoteAddr | ||
* @param { MapStore<Port, SetStore<Closable>> } opts.currentConnections | ||
* @param {string} opts.listenPrefix | ||
* @param {MapStore<Endpoint, [Port, ListenHandler]>} opts.listening | ||
*/ | ||
({ | ||
localAddr, | ||
remoteAddr, | ||
currentConnections, | ||
listenPrefix, | ||
listening, | ||
}) => { | ||
/** @type {string | undefined} */ | ||
let consummated; | ||
/** @type {string[]} */ | ||
const ret = []; | ||
for (let i = parts.length; i > 0; i -= 1) { | ||
// Try most specific match. | ||
const prefix = parts.slice(0, i).join(ENDPOINT_SEPARATOR); | ||
ret.push(prefix); | ||
} | ||
return ret; | ||
} | ||
return { | ||
localAddr, | ||
remoteAddr, | ||
consummated, | ||
currentConnections, | ||
listenPrefix, | ||
listening, | ||
}; | ||
}, | ||
{ | ||
getLocalAddress() { | ||
// Return address metadata. | ||
return this.state.localAddr; | ||
}, | ||
getRemoteAddress() { | ||
return this.state.remoteAddr; | ||
}, | ||
async close() { | ||
const { consummated, localAddr, remoteAddr } = this.state; | ||
const { listening, listenPrefix, currentConnections } = this.state; | ||
/** | ||
* Create a protocol that has a handler. | ||
* | ||
* @param {ProtocolHandler} protocolHandler | ||
* @returns {Protocol} the local capability for connecting and listening | ||
*/ | ||
export function makeNetworkProtocol(protocolHandler) { | ||
/** @type {LegacyMap<Port, Set<Closable>>} */ | ||
// Legacy because we're storing a JS Set | ||
const currentConnections = makeLegacyMap('port'); | ||
if (consummated) { | ||
throw Error(consummated); | ||
} | ||
this.state.consummated = 'Already closed'; | ||
/** | ||
* Currently must be a single listenHandler. TODO: Do something sensible with | ||
* multiple handlers? | ||
* | ||
* @type {MapStore<Endpoint, [Port, ListenHandler]>} | ||
*/ | ||
const listening = makeScalarMapStore('localAddr'); | ||
const [port, listener] = listening.get(listenPrefix); | ||
/** @type {MapStore<string, Port>} */ | ||
const boundPorts = makeScalarMapStore('localAddr'); | ||
const current = currentConnections.get(port); | ||
current.delete(this.self); | ||
/** @param {Endpoint} localAddr */ | ||
const bind = async localAddr => { | ||
// Check if we are underspecified (ends in slash) | ||
const underspecified = localAddr.endsWith(ENDPOINT_SEPARATOR); | ||
for await (const _ of whileTrue(() => underspecified)) { | ||
const portID = await E(protocolHandler).generatePortID( | ||
localAddr, | ||
protocolHandler, | ||
); | ||
const newAddr = `${localAddr}${portID}`; | ||
if (!boundPorts.has(newAddr)) { | ||
localAddr = newAddr; | ||
break; | ||
} | ||
} | ||
await when( | ||
E(listener).onReject(port, localAddr, remoteAddr, listener), | ||
).catch(rethrowUnlessMissing); | ||
}, | ||
/** | ||
* @param {object} opts | ||
* @param {string} [opts.localAddress] | ||
* @param {string} [opts.remoteAddress] | ||
* @param {ConnectionHandler} opts.handler | ||
*/ | ||
async accept({ localAddress, remoteAddress, handler: rchandler }) { | ||
const { consummated, localAddr, remoteAddr } = this.state; | ||
const { listening, listenPrefix, currentConnections } = this.state; | ||
if (consummated) { | ||
throw Error(consummated); | ||
} | ||
this.state.consummated = 'Already accepted'; | ||
if (boundPorts.has(localAddr)) { | ||
return boundPorts.get(localAddr); | ||
} | ||
if (localAddress === undefined) { | ||
localAddress = localAddr; | ||
} | ||
/** @enum {number} */ | ||
const RevokeState = { | ||
NOT_REVOKED: 0, | ||
REVOKING: 1, | ||
REVOKED: 2, | ||
}; | ||
if (remoteAddress === undefined) { | ||
remoteAddress = remoteAddr; | ||
} | ||
/** @type {RevokeState} */ | ||
let revoked = RevokeState.NOT_REVOKED; | ||
const openConnections = new Set(); | ||
const [port, listener] = listening.get(listenPrefix); | ||
const current = currentConnections.get(port); | ||
/** @type {Port} */ | ||
const port = Far('Port', { | ||
current.delete(this.self); | ||
const lchandler = await when( | ||
E(listener).onAccept(port, localAddress, remoteAddress, listener), | ||
); | ||
return crossoverConnection( | ||
zone, | ||
lchandler, | ||
localAddress, | ||
rchandler, | ||
remoteAddress, | ||
makeConnection, | ||
current, | ||
)[1]; | ||
}, | ||
}, | ||
); | ||
return makeInboundAttempt; | ||
}; | ||
/** @enum {number} */ | ||
const RevokeState = /** @type {const} */ ({ | ||
NOT_REVOKED: 0, | ||
REVOKING: 1, | ||
REVOKED: 2, | ||
}); | ||
/** | ||
* @param {import('@agoric/zone').Zone} zone | ||
* @param {ReturnType<import('@agoric/vow').prepareVowTools>} powers | ||
*/ | ||
const preparePort = (zone, { when }) => { | ||
const makeIncapable = zone.exoClass('Incapable', undefined, () => ({}), {}); | ||
const makePort = zone.exoClass( | ||
'Port', | ||
Shape.PortI, | ||
/** | ||
* @param {object} opts | ||
* @param {Endpoint} opts.localAddr | ||
* @param {MapStore<Endpoint, [Port, ListenHandler]>} opts.listening | ||
* @param {SetStore<Connection>} opts.openConnections | ||
* @param {MapStore<Port, SetStore<Closable>>} opts.currentConnections | ||
* @param {MapStore<string, Port>} opts.boundPorts | ||
* @param {ProtocolHandler} opts.protocolHandler | ||
* @param {ProtocolImpl} opts.protocolImpl | ||
*/ | ||
({ | ||
localAddr, | ||
listening, | ||
openConnections, | ||
currentConnections, | ||
boundPorts, | ||
protocolHandler, | ||
protocolImpl, | ||
}) => { | ||
return { | ||
listening, | ||
openConnections, | ||
currentConnections, | ||
boundPorts, | ||
localAddr, | ||
protocolHandler, | ||
protocolImpl, | ||
/** @type {RevokeState | undefined} */ | ||
revoked: undefined, | ||
}; | ||
}, | ||
{ | ||
getLocalAddress() { | ||
// Works even after revoke(). | ||
return localAddr; | ||
return this.state.localAddr; | ||
}, | ||
/** @param {ListenHandler} listenHandler */ | ||
async addListener(listenHandler) { | ||
!revoked || Fail`Port ${localAddr} is revoked`; | ||
const { revoked, listening, localAddr, protocolHandler } = this.state; | ||
!revoked || Fail`Port ${this.state.localAddr} is revoked`; | ||
listenHandler || Fail`listenHandler is not defined`; | ||
if (listening.has(localAddr)) { | ||
@@ -266,21 +483,25 @@ // Last one wins. | ||
} | ||
listening.set(localAddr, [port, listenHandler]); | ||
listening.set(localAddr, [this.self, listenHandler]); | ||
E(lhandler).onRemove(lport, lhandler).catch(rethrowUnlessMissing); | ||
} else { | ||
listening.init(localAddr, [port, listenHandler]); | ||
listening.init(localAddr, harden([this.self, listenHandler])); | ||
} | ||
// TODO: Check that the listener defines onAccept. | ||
// ASSUME: that the listener defines onAccept. | ||
await E(protocolHandler).onListen( | ||
port, | ||
localAddr, | ||
listenHandler, | ||
protocolHandler, | ||
await when( | ||
E(protocolHandler).onListen( | ||
this.self, | ||
localAddr, | ||
listenHandler, | ||
protocolHandler, | ||
), | ||
); | ||
await E(listenHandler) | ||
.onListen(port, listenHandler) | ||
.catch(rethrowUnlessMissing); | ||
await when(E(listenHandler).onListen(this.self, listenHandler)).catch( | ||
rethrowUnlessMissing, | ||
); | ||
}, | ||
/** @param {ListenHandler} listenHandler */ | ||
async removeListener(listenHandler) { | ||
const { listening, localAddr, protocolHandler } = this.state; | ||
listening.has(localAddr) || Fail`Port ${localAddr} is not listening`; | ||
@@ -290,18 +511,32 @@ listening.get(localAddr)[1] === listenHandler || | ||
listening.delete(localAddr); | ||
await E(protocolHandler).onListenRemove( | ||
port, | ||
localAddr, | ||
listenHandler, | ||
protocolHandler, | ||
await when( | ||
E(protocolHandler).onListenRemove( | ||
this.self, | ||
localAddr, | ||
listenHandler, | ||
protocolHandler, | ||
), | ||
); | ||
await E(listenHandler) | ||
.onRemove(port, listenHandler) | ||
.catch(rethrowUnlessMissing); | ||
await when(E(listenHandler).onRemove(this.self, listenHandler)).catch( | ||
rethrowUnlessMissing, | ||
); | ||
}, | ||
async connect(remotePort, connectionHandler = {}) { | ||
/** | ||
* @param {Endpoint} remotePort | ||
* @param {ConnectionHandler} connectionHandler | ||
*/ | ||
async connect( | ||
remotePort, | ||
connectionHandler = /** @type {any} */ (makeIncapable()), | ||
) { | ||
const { revoked, localAddr, protocolImpl, openConnections } = | ||
this.state; | ||
!revoked || Fail`Port ${localAddr} is revoked`; | ||
/** @type {Endpoint} */ | ||
const dst = harden(remotePort); | ||
// eslint-disable-next-line no-use-before-define | ||
const conn = await protocolImpl.outbound(port, dst, connectionHandler); | ||
const conn = await when( | ||
protocolImpl.outbound(this.self, dst, connectionHandler), | ||
); | ||
if (revoked) { | ||
@@ -315,271 +550,479 @@ void E(conn).close(); | ||
async revoke() { | ||
const { revoked, localAddr } = this.state; | ||
const { protocolHandler, currentConnections, listening, boundPorts } = | ||
this.state; | ||
revoked !== RevokeState.REVOKED || | ||
Fail`Port ${localAddr} is already revoked`; | ||
revoked = RevokeState.REVOKING; | ||
await E(protocolHandler).onRevoke(port, localAddr, protocolHandler); | ||
revoked = RevokeState.REVOKED; | ||
this.state.revoked = RevokeState.REVOKING; | ||
await when( | ||
E(protocolHandler).onRevoke(this.self, localAddr, protocolHandler), | ||
); | ||
this.state.revoked = RevokeState.REVOKED; | ||
// Clean up everything we did. | ||
const ps = [...currentConnections.get(port)].map(conn => | ||
E(conn) | ||
.close() | ||
.catch(_ => {}), | ||
); | ||
const values = [...currentConnections.get(this.self).values()]; | ||
const ps = values.map(conn => when(E(conn).close()).catch(_ => {})); | ||
if (listening.has(localAddr)) { | ||
const listener = listening.get(localAddr)[1]; | ||
ps.push(port.removeListener(listener)); | ||
ps.push(this.self.removeListener(listener)); | ||
} | ||
await Promise.all(ps); | ||
currentConnections.delete(port); | ||
currentConnections.delete(this.self); | ||
boundPorts.delete(localAddr); | ||
return `Port ${localAddr} revoked`; | ||
}, | ||
}); | ||
}, | ||
); | ||
await E(protocolHandler).onBind(port, localAddr, protocolHandler); | ||
boundPorts.init(localAddr, port); | ||
currentConnections.init(port, new Set()); | ||
return port; | ||
}; | ||
return makePort; | ||
}; | ||
/** @type {ProtocolImpl} */ | ||
const protocolImpl = Far('ProtocolImpl', { | ||
bind, | ||
async inbound(listenAddr, remoteAddr) { | ||
let lastFailure = Error(`No listeners for ${listenAddr}`); | ||
for await (const listenPrefix of getPrefixes(listenAddr)) { | ||
if (!listening.has(listenPrefix)) { | ||
continue; | ||
} | ||
const [port, listener] = listening.get(listenPrefix); | ||
let localAddr; | ||
await (async () => { | ||
// See if our protocol is willing to receive this connection. | ||
const localInstance = await E(protocolHandler) | ||
.onInstantiate(port, listenPrefix, remoteAddr, protocolHandler) | ||
.catch(rethrowUnlessMissing); | ||
localAddr = localInstance | ||
? `${listenAddr}/${localInstance}` | ||
: listenAddr; | ||
})().catch(e => { | ||
lastFailure = e; | ||
}); | ||
if (!localAddr) { | ||
continue; | ||
} | ||
// We have a legitimate inbound attempt. | ||
let consummated; | ||
const current = currentConnections.get(port); | ||
const inboundAttempt = Far('InboundAttempt', { | ||
getLocalAddress() { | ||
// Return address metadata. | ||
return localAddr; | ||
}, | ||
getRemoteAddress() { | ||
return remoteAddr; | ||
}, | ||
async close() { | ||
if (consummated) { | ||
throw consummated; | ||
/** | ||
* @param {import('@agoric/base-zone').Zone} zone | ||
* @param {ReturnType<import('@agoric/vow').prepareVowTools>} powers | ||
*/ | ||
const prepareBinder = (zone, powers) => { | ||
const makeConnection = prepareHalfConnection(zone, powers); | ||
const { when } = powers; | ||
const makeInboundAttempt = prepareInboundAttempt( | ||
zone, | ||
makeConnection, | ||
powers, | ||
); | ||
const makePort = preparePort(zone, powers); | ||
const detached = zone.detached(); | ||
const makeBinderKit = zone.exoClassKit( | ||
'binder', | ||
{ | ||
protocolImpl: Shape.ProtocolImplI, | ||
binder: M.interface('Binder', { | ||
bind: M.callWhen(Shape.Endpoint).returns(Shape.Port), | ||
}), | ||
}, | ||
/** | ||
* @param {object} opts | ||
* @param { MapStore<Port, SetStore<Closable>> } opts.currentConnections | ||
* @param {MapStore<string, Port>} opts.boundPorts | ||
* @param {MapStore<Endpoint, [Port, ListenHandler]>} opts.listening | ||
* @param {ProtocolHandler} opts.protocolHandler | ||
*/ | ||
({ currentConnections, boundPorts, listening, protocolHandler }) => { | ||
/** @type {SetStore<Connection>} */ | ||
const openConnections = detached.setStore('openConnections'); | ||
return { | ||
currentConnections, | ||
boundPorts, | ||
listening, | ||
revoked: RevokeState.NOT_REVOKED, | ||
openConnections, | ||
protocolHandler, | ||
/** @type {Endpoint | undefined} */ | ||
localAddr: undefined, | ||
}; | ||
}, | ||
{ | ||
protocolImpl: { | ||
/** | ||
* @param {Endpoint} listenAddr | ||
* @param {Endpoint} remoteAddr | ||
*/ | ||
async inbound(listenAddr, remoteAddr) { | ||
const { listening, protocolHandler, currentConnections } = this.state; | ||
let lastFailure = Error(`No listeners for ${listenAddr}`); | ||
for await (const listenPrefix of getPrefixes(listenAddr)) { | ||
if (!listening.has(listenPrefix)) { | ||
continue; | ||
} | ||
consummated = Error(`Already closed`); | ||
current.delete(inboundAttempt); | ||
await E(listener) | ||
.onReject(port, localAddr, remoteAddr, listener) | ||
.catch(rethrowUnlessMissing); | ||
}, | ||
async accept({ | ||
localAddress = localAddr, | ||
remoteAddress = remoteAddr, | ||
handler: rchandler, | ||
}) { | ||
if (consummated) { | ||
throw consummated; | ||
const [port, _] = listening.get(listenPrefix); | ||
let localAddr; | ||
await (async () => { | ||
// See if our protocol is willing to receive this connection. | ||
const localInstance = await when( | ||
E(protocolHandler).onInstantiate( | ||
port, | ||
listenPrefix, | ||
remoteAddr, | ||
protocolHandler, | ||
), | ||
).catch(rethrowUnlessMissing); | ||
localAddr = localInstance | ||
? `${listenAddr}/${localInstance}` | ||
: listenAddr; | ||
})().catch(e => { | ||
lastFailure = e; | ||
}); | ||
if (!localAddr) { | ||
continue; | ||
} | ||
consummated = Error(`Already accepted`); | ||
current.delete(inboundAttempt); | ||
// We have a legitimate inbound attempt. | ||
const current = currentConnections.get(port); | ||
const inboundAttempt = makeInboundAttempt({ | ||
localAddr, | ||
remoteAddr, | ||
currentConnections, | ||
listenPrefix, | ||
listening, | ||
}); | ||
const lchandler = await E(listener).onAccept( | ||
current.add(inboundAttempt); | ||
return inboundAttempt; | ||
} | ||
throw lastFailure; | ||
}, | ||
/** | ||
* @param {Port} port | ||
* @param {Endpoint} remoteAddr | ||
* @param {ConnectionHandler} lchandler | ||
*/ | ||
async outbound(port, remoteAddr, lchandler) { | ||
const { protocolHandler, currentConnections } = this.state; | ||
const localAddr = await E(port).getLocalAddress(); | ||
// Allocate a local address. | ||
const initialLocalInstance = await when( | ||
E(protocolHandler).onInstantiate( | ||
port, | ||
localAddr, | ||
remoteAddr, | ||
listener, | ||
protocolHandler, | ||
), | ||
).catch(rethrowUnlessMissing); | ||
const initialLocalAddr = initialLocalInstance | ||
? `${localAddr}/${initialLocalInstance}` | ||
: localAddr; | ||
let lastFailure; | ||
let accepted; | ||
await (async () => { | ||
// Attempt the loopback connection. | ||
const attempt = await when( | ||
this.facets.protocolImpl.inbound(remoteAddr, initialLocalAddr), | ||
); | ||
accepted = await when(attempt.accept({ handler: lchandler })); | ||
})().catch(e => { | ||
lastFailure = e; | ||
}); | ||
if (accepted) { | ||
return accepted; | ||
} | ||
return crossoverConnection( | ||
lchandler, | ||
localAddress, | ||
rchandler, | ||
remoteAddress, | ||
current, | ||
)[1]; | ||
}, | ||
}); | ||
current.add(inboundAttempt); | ||
return inboundAttempt; | ||
} | ||
throw lastFailure; | ||
}, | ||
async outbound(port, remoteAddr, lchandler) { | ||
const localAddr = | ||
/** @type {string} */ | ||
(await E(port).getLocalAddress()); | ||
const { | ||
remoteAddress = remoteAddr, | ||
handler: rchandler, | ||
localAddress = localAddr, | ||
} = | ||
/** @type {Partial<AttemptDescription>} */ | ||
( | ||
await when( | ||
E(protocolHandler).onConnect( | ||
port, | ||
initialLocalAddr, | ||
remoteAddr, | ||
lchandler, | ||
protocolHandler, | ||
), | ||
) | ||
); | ||
// Allocate a local address. | ||
const initialLocalInstance = await E(protocolHandler) | ||
.onInstantiate(port, localAddr, remoteAddr, protocolHandler) | ||
.catch(rethrowUnlessMissing); | ||
const initialLocalAddr = initialLocalInstance | ||
? `${localAddr}/${initialLocalInstance}` | ||
: localAddr; | ||
if (!rchandler) { | ||
throw lastFailure; | ||
} | ||
let lastFailure; | ||
let accepted; | ||
await (async () => { | ||
// Attempt the loopback connection. | ||
const attempt = await protocolImpl.inbound( | ||
remoteAddr, | ||
initialLocalAddr, | ||
); | ||
accepted = await attempt.accept({ handler: lchandler }); | ||
})().catch(e => { | ||
lastFailure = e; | ||
}); | ||
if (accepted) { | ||
return accepted; | ||
} | ||
const { | ||
remoteAddress = remoteAddr, | ||
handler: rchandler, | ||
localAddress = localAddr, | ||
} = | ||
/** @type {Partial<AttemptDescription>} */ | ||
( | ||
await E(protocolHandler).onConnect( | ||
port, | ||
initialLocalAddr, | ||
remoteAddr, | ||
const current = currentConnections.get(port); | ||
return crossoverConnection( | ||
zone, | ||
lchandler, | ||
localAddress, | ||
rchandler, | ||
remoteAddress, | ||
makeConnection, | ||
current, | ||
)[0]; | ||
}, | ||
async bind(localAddr) { | ||
return this.facets.binder.bind(localAddr); | ||
}, | ||
}, | ||
binder: { | ||
/** @param {string} localAddr */ | ||
async bind(localAddr) { | ||
const { | ||
protocolHandler, | ||
) | ||
); | ||
boundPorts, | ||
listening, | ||
openConnections, | ||
currentConnections, | ||
} = this.state; | ||
if (!rchandler) { | ||
throw lastFailure; | ||
} | ||
// Check if we are underspecified (ends in slash) | ||
const underspecified = localAddr.endsWith(ENDPOINT_SEPARATOR); | ||
for await (const _ of whileTrue(() => underspecified)) { | ||
const portID = await when( | ||
E(protocolHandler).generatePortID(localAddr, protocolHandler), | ||
); | ||
const newAddr = `${localAddr}${portID}`; | ||
if (!boundPorts.has(newAddr)) { | ||
localAddr = newAddr; | ||
break; | ||
} | ||
} | ||
const current = currentConnections.get(port); | ||
return crossoverConnection( | ||
lchandler, | ||
localAddress, | ||
rchandler, | ||
remoteAddress, | ||
current, | ||
)[0]; | ||
this.state.localAddr = localAddr; | ||
if (boundPorts.has(localAddr)) { | ||
return boundPorts.get(localAddr); | ||
} | ||
const port = makePort({ | ||
localAddr, | ||
listening, | ||
openConnections, | ||
currentConnections, | ||
boundPorts, | ||
protocolHandler, | ||
protocolImpl: this.facets.protocolImpl, | ||
}); | ||
await when( | ||
E(protocolHandler).onBind(port, localAddr, protocolHandler), | ||
); | ||
boundPorts.init(localAddr, harden(port)); | ||
currentConnections.init(port, detached.setStore('connections')); | ||
return port; | ||
}, | ||
}, | ||
}, | ||
}); | ||
); | ||
// Wire up the local protocol to the handler. | ||
void E(protocolHandler).onCreate(protocolImpl, protocolHandler); | ||
return makeBinderKit; | ||
}; | ||
// Return the user-facing protocol. | ||
return Far('binder', { bind }); | ||
} | ||
/** | ||
* @param {import('@agoric/base-zone').Zone} zone | ||
* @param {ReturnType<import('@agoric/vow').prepareVowTools>} powers | ||
*/ | ||
export const prepareNetworkProtocol = (zone, powers) => { | ||
const makeBinderKit = prepareBinder(zone, powers); | ||
/** | ||
* @param {ProtocolHandler} protocolHandler | ||
* @returns {Protocol} | ||
*/ | ||
const makeNetworkProtocol = protocolHandler => { | ||
const detached = zone.detached(); | ||
/** @type {MapStore<Port, SetStore<Closable>>} */ | ||
const currentConnections = detached.mapStore('portToCurrentConnections'); | ||
/** @type {MapStore<string, Port>} */ | ||
const boundPorts = detached.mapStore('addrToPort'); | ||
/** @type {MapStore<Endpoint, [Port, ListenHandler]>} */ | ||
const listening = detached.mapStore('listening'); | ||
const { binder, protocolImpl } = makeBinderKit({ | ||
currentConnections, | ||
boundPorts, | ||
listening, | ||
protocolHandler, | ||
}); | ||
// Wire up the local protocol to the handler. | ||
void E(protocolHandler).onCreate(protocolImpl, protocolHandler); | ||
return binder; | ||
}; | ||
return makeNetworkProtocol; | ||
}; | ||
/** | ||
* Create a ConnectionHandler that just echoes its packets. | ||
* | ||
* @returns {ConnectionHandler} | ||
* @param {import('@agoric/base-zone').Zone} zone | ||
*/ | ||
export function makeEchoConnectionHandler() { | ||
let closed; | ||
/** @type {Connection} */ | ||
return Far('ConnectionHandler', { | ||
async onReceive(_connection, bytes, _connectionHandler) { | ||
if (closed) { | ||
throw closed; | ||
} | ||
return bytes; | ||
export const prepareEchoConnectionKit = zone => { | ||
const makeEchoConnectionKit = zone.exoClassKit( | ||
'EchoConnectionKit', | ||
{ | ||
handler: M.interface('ConnectionHandler', { | ||
onReceive: M.callWhen( | ||
Shape2.Connection, | ||
Shape2.Bytes, | ||
Shape2.ConnectionHandler, | ||
) | ||
.optional(Shape2.Opts) | ||
.returns(Shape2.Data), | ||
onClose: M.callWhen(Shape2.Connection) | ||
.optional(M.any(), Shape2.ConnectionHandler) | ||
.returns(M.undefined()), | ||
}), | ||
listener: M.interface('Listener', { | ||
onListen: M.callWhen(Shape.Port, Shape.ListenHandler).returns( | ||
Shape.Vow$(M.undefined()), | ||
), | ||
onAccept: M.callWhen( | ||
Shape.Port, | ||
Shape.Endpoint, | ||
Shape.Endpoint, | ||
Shape.ListenHandler, | ||
).returns(Shape.Vow$(Shape.ConnectionHandler)), | ||
}), | ||
}, | ||
async onClose(_connection, _reason, _connectionHandler) { | ||
if (closed) { | ||
throw closed; | ||
} | ||
closed = Error('Connection closed'); | ||
() => { | ||
/** @type {string | undefined} */ | ||
let closed; | ||
return { | ||
closed, | ||
}; | ||
}, | ||
}); | ||
} | ||
{ | ||
handler: { | ||
/** | ||
* @param {Connection} _connection | ||
* @param {Bytes} bytes | ||
* @param {ConnectionHandler} _connectionHandler | ||
*/ | ||
async onReceive(_connection, bytes, _connectionHandler) { | ||
const { closed } = this.state; | ||
export function makeNonceMaker(prefix = '', suffix = '') { | ||
let nonce = 0; | ||
return async () => { | ||
nonce += 1; | ||
return `${prefix}${nonce}${suffix}`; | ||
}; | ||
} | ||
if (closed) { | ||
throw closed; | ||
} | ||
return bytes; | ||
}, | ||
/** | ||
* @param {Connection} _connection | ||
* @param {CloseReason} [_reason] | ||
* @param {ConnectionHandler} [_connectionHandler] | ||
*/ | ||
async onClose(_connection, _reason, _connectionHandler) { | ||
const { closed } = this.state; | ||
if (closed) { | ||
throw Error(closed); | ||
} | ||
this.state.closed = 'Connection closed'; | ||
}, | ||
}, | ||
listener: { | ||
async onAccept(_port, _localAddr, _remoteAddr, _listenHandler) { | ||
return harden(this.facets.handler); | ||
}, | ||
async onListen(port, _listenHandler) { | ||
console.debug(`listening on echo port: ${port}`); | ||
}, | ||
}, | ||
}, | ||
); | ||
return makeEchoConnectionKit; | ||
}; | ||
/** | ||
* Create a protocol handler that just connects to itself. | ||
* | ||
* @param {ProtocolHandler['onInstantiate']} [onInstantiate] | ||
* @returns {ProtocolHandler} The localhost handler | ||
* @param {import('@agoric/base-zone').Zone} zone | ||
* @param {ReturnType<import('@agoric/vow').prepareVowTools>} powers | ||
*/ | ||
export function makeLoopbackProtocolHandler( | ||
onInstantiate = makeNonceMaker('nonce/'), | ||
) { | ||
/** @type {MapStore<string, [Port, ListenHandler]>} */ | ||
const listeners = makeScalarMapStore('localAddr'); | ||
export function prepareLoopbackProtocolHandler(zone, { when }) { | ||
const detached = zone.detached(); | ||
const makePortID = makeNonceMaker('port'); | ||
const makeLoopbackProtocolHandler = zone.exoClass( | ||
'ProtocolHandler', | ||
Shape.ProtocolHandlerI, | ||
/** @param {string} [instancePrefix] */ | ||
(instancePrefix = 'nonce/') => { | ||
/** @type {MapStore<string, [Port, ListenHandler]>} */ | ||
const listeners = detached.mapStore('localAddr'); | ||
return Far('ProtocolHandler', { | ||
// eslint-disable-next-line no-empty-function | ||
async onCreate(_impl, _protocolHandler) { | ||
// TODO | ||
}, | ||
async generatePortID(_protocolHandler) { | ||
return makePortID(); | ||
}, | ||
async onBind(_port, _localAddr, _protocolHandler) { | ||
// TODO: Maybe handle a bind? | ||
}, | ||
async onConnect(_port, localAddr, remoteAddr, _chandler, protocolHandler) { | ||
const [lport, lhandler] = listeners.get(remoteAddr); | ||
const rchandler = | ||
/** @type {ConnectionHandler} */ | ||
(await E(lhandler).onAccept(lport, remoteAddr, localAddr, lhandler)); | ||
// console.log(`rchandler is`, rchandler); | ||
const remoteInstance = await E(protocolHandler) | ||
.onInstantiate(lport, remoteAddr, localAddr, protocolHandler) | ||
.catch(rethrowUnlessMissing); | ||
return { | ||
remoteInstance, | ||
handler: rchandler, | ||
listeners, | ||
portNonce: 0n, | ||
instancePrefix, | ||
instanceNonce: 0n, | ||
}; | ||
}, | ||
onInstantiate, | ||
async onListen(port, localAddr, listenHandler, _protocolHandler) { | ||
// TODO: Implement other listener replacement policies. | ||
if (listeners.has(localAddr)) { | ||
const lhandler = listeners.get(localAddr)[1]; | ||
if (lhandler !== listenHandler) { | ||
// Last-one-wins. | ||
listeners.set(localAddr, [port, listenHandler]); | ||
{ | ||
async onCreate(_impl, _protocolHandler) { | ||
// noop | ||
}, | ||
async generatePortID(_localAddr, _protocolHandler) { | ||
this.state.portNonce += 1n; | ||
return `port${this.state.portNonce}`; | ||
}, | ||
async onBind(_port, _localAddr, _protocolHandler) { | ||
// noop, for now; Maybe handle a bind? | ||
}, | ||
async onConnect( | ||
_port, | ||
localAddr, | ||
remoteAddr, | ||
_chandler, | ||
protocolHandler, | ||
) { | ||
const { listeners } = this.state; | ||
const [lport, lhandler] = listeners.get(remoteAddr); | ||
const rchandler = await when( | ||
E(lhandler).onAccept(lport, remoteAddr, localAddr, lhandler), | ||
); | ||
// console.log(`rchandler is`, rchandler); | ||
const remoteInstance = await when( | ||
E(protocolHandler).onInstantiate( | ||
lport, | ||
remoteAddr, | ||
localAddr, | ||
protocolHandler, | ||
), | ||
).catch(rethrowUnlessMissing); | ||
return { | ||
remoteInstance, | ||
handler: rchandler, | ||
}; | ||
}, | ||
async onInstantiate(_port, _localAddr, _remote, _protocol) { | ||
const { instancePrefix } = this.state; | ||
this.state.instanceNonce += 1n; | ||
return `${instancePrefix}${this.state.instanceNonce}`; | ||
}, | ||
async onListen(port, localAddr, listenHandler, _protocolHandler) { | ||
const { listeners } = this.state; | ||
// This implementation has a simple last-one-wins replacement policy. | ||
// Other handlers might use different policies. | ||
if (listeners.has(localAddr)) { | ||
const lhandler = listeners.get(localAddr)[1]; | ||
if (lhandler !== listenHandler) { | ||
listeners.set(localAddr, [port, listenHandler]); | ||
} | ||
} else { | ||
listeners.init(localAddr, harden([port, listenHandler])); | ||
} | ||
} else { | ||
listeners.init(localAddr, [port, listenHandler]); | ||
} | ||
}, | ||
/** | ||
* @param {Port} port | ||
* @param {Endpoint} localAddr | ||
* @param {ListenHandler} listenHandler | ||
* @param {*} _protocolHandler | ||
*/ | ||
async onListenRemove(port, localAddr, listenHandler, _protocolHandler) { | ||
const { listeners } = this.state; | ||
const [lport, lhandler] = listeners.get(localAddr); | ||
lport === port || Fail`Port does not match listener on ${localAddr}`; | ||
lhandler === listenHandler || | ||
Fail`Listen handler does not match listener on ${localAddr}`; | ||
listeners.delete(localAddr); | ||
}, | ||
async onRevoke(_port, _localAddr, _protocolHandler) { | ||
// This is an opportunity to clean up resources. | ||
}, | ||
}, | ||
async onListenRemove(port, localAddr, listenHandler, _protocolHandler) { | ||
const [lport, lhandler] = listeners.get(localAddr); | ||
lport === port || Fail`Port does not match listener on ${localAddr}`; | ||
lhandler === listenHandler || | ||
Fail`Listen handler does not match listener on ${localAddr}`; | ||
listeners.delete(localAddr); | ||
}, | ||
async onRevoke(_port, _localAddr, _protocolHandler) { | ||
// TODO: maybe clean up? | ||
}, | ||
}); | ||
); | ||
return makeLoopbackProtocolHandler; | ||
} |
@@ -11,31 +11,51 @@ /** | ||
*/ | ||
/** | ||
* Create a slash-delimited router. | ||
* | ||
* @template T | ||
* @returns {Router<T>} a new Router | ||
*/ | ||
export default function makeRouter<T>(): Router<T>; | ||
/** | ||
* @typedef {object} RouterProtocol | ||
* @property {(prefix: string) => Promise<Port>} bind | ||
* @property {(paths: string[], protocolHandler: ProtocolHandler) => void} registerProtocolHandler | ||
* @property {(prefix: string, protocolHandler: ProtocolHandler) => void} unregisterProtocolHandler | ||
*/ | ||
/** | ||
* Create a router that behaves like a Protocol. | ||
* | ||
* @param {typeof defaultE} [E] Eventual sender | ||
* @returns {RouterProtocol} The new delegated protocol | ||
*/ | ||
export function makeRouterProtocol(E?: ((<T>(x: T) => import("@endo/eventual-send/src/E.js").ECallableOrMethods<import("@endo/eventual-send").RemoteFunctions<T>>) & { | ||
readonly get: <T_1>(x: T_1) => import("@endo/eventual-send/src/E.js").EGetters<import("@endo/eventual-send").LocalRecord<T_1>>; | ||
export const RouterI: import("@endo/patterns").InterfaceGuard<{ | ||
getRoutes: import("@endo/patterns").MethodGuard; | ||
register: import("@endo/patterns").MethodGuard; | ||
unregister: import("@endo/patterns").MethodGuard; | ||
}>; | ||
export function prepareRouter<T>(zone: import('@agoric/base-zone').Zone): () => import("@endo/exo/src/exo-makers.js").Guarded<{ | ||
/** @param {Endpoint} addr */ | ||
getRoutes(addr: Endpoint): [string, T][]; | ||
/** | ||
* @param {string} prefix | ||
* @param {T} route | ||
*/ | ||
register(prefix: string, route: T): void; | ||
/** | ||
* @param {string} prefix | ||
* @param {T} route | ||
*/ | ||
unregister(prefix: string, route: T): void; | ||
}>; | ||
export function prepareRouterProtocol(zone: import('@agoric/base-zone').Zone, powers: ReturnType<(zone: import("@agoric/base-zone").Zone, powers?: { | ||
isRetryableReason?: ((reason: any) => boolean) | undefined; | ||
watchPromise?: ((p: PromiseLike<any>, watcher: import("@agoric/vow/src/watch-promise.js").PromiseWatcher, ...args: unknown[]) => void) | undefined; | ||
} | undefined) => { | ||
when: <T = any, TResult1 = import("@agoric/vow/src/E.js").Unwrap<T>, TResult2 = never>(specimenP: T, onFulfilled?: ((value: import("@agoric/vow/src/E.js").Unwrap<T>) => TResult1 | PromiseLike<TResult1>) | undefined, onRejected?: ((reason: any) => TResult2 | PromiseLike<TResult2>) | undefined) => Promise<TResult1 | TResult2>; | ||
watch: <T_1 = any, TResult1_1 = T_1, TResult2_1 = T_1>(specimenP: import("@agoric/vow").ERef<T_1 | import("@agoric/vow").Vow<T_1>>, watcher?: import("@agoric/vow").Watcher<T_1, TResult1_1, TResult2_1> | undefined, watcherContext?: unknown) => import("@agoric/vow").Vow<TResult1_1 | TResult2_1>; | ||
makeVowKit: <T_2>() => import("@agoric/vow").VowKit<T_2>; | ||
}>, E?: ((<T_3>(x: T_3) => import("@endo/eventual-send/src/E.js").ECallableOrMethods<import("@endo/eventual-send").RemoteFunctions<T_3>>) & { | ||
readonly get: <T_1_1>(x: T_1_1) => import("@endo/eventual-send/src/E.js").EGetters<import("@endo/eventual-send").LocalRecord<T_1_1>>; | ||
readonly resolve: { | ||
(): Promise<void>; | ||
<T_2>(value: T_2): Promise<Awaited<T_2>>; | ||
<T_3>(value: T_3 | PromiseLike<T_3>): Promise<Awaited<T_3>>; | ||
<T_2_1>(value: T_2_1): Promise<Awaited<T_2_1>>; | ||
<T_3_1>(value: T_3_1 | PromiseLike<T_3_1>): Promise<Awaited<T_3_1>>; | ||
}; | ||
readonly sendOnly: <T_4>(x: T_4) => import("@endo/eventual-send/src/E.js").ESendOnlyCallableOrMethods<import("@endo/eventual-send").RemoteFunctions<T_4>>; | ||
readonly when: <T_5, U = T_5>(x: T_5 | PromiseLike<T_5>, onfulfilled?: ((value: T_5) => import("@endo/eventual-send").ERef<U>) | undefined, onrejected?: ((reason: any) => import("@endo/eventual-send").ERef<U>) | undefined) => Promise<U>; | ||
}) | undefined): RouterProtocol; | ||
}) | undefined): () => import("@endo/exo/src/exo-makers.js").Guarded<{ | ||
/** | ||
* @param {string[]} paths | ||
* @param {ProtocolHandler} protocolHandler | ||
*/ | ||
registerProtocolHandler(paths: string[], protocolHandler: ProtocolHandler): void; | ||
/** | ||
* @param {string} prefix | ||
* @param {ProtocolHandler} protocolHandler | ||
*/ | ||
unregisterProtocolHandler(prefix: string, protocolHandler: ProtocolHandler): void; | ||
/** @param {Endpoint} localAddr */ | ||
bind(localAddr: Endpoint): Promise<Port | import("@agoric/vow").Vow<Port>>; | ||
}>; | ||
/** | ||
@@ -62,3 +82,3 @@ * A delimited string router implementation | ||
export type RouterProtocol = { | ||
bind: (prefix: string) => Promise<Port>; | ||
bind: (prefix: string) => PromiseVow<Port>; | ||
registerProtocolHandler: (paths: string[], protocolHandler: ProtocolHandler) => void; | ||
@@ -65,0 +85,0 @@ unregisterProtocolHandler: (prefix: string, protocolHandler: ProtocolHandler) => void; |
@@ -1,5 +0,10 @@ | ||
import { Far, E as defaultE } from '@endo/far'; | ||
import { makeScalarMapStore } from '@agoric/store'; | ||
// @ts-check | ||
import { E as defaultE } from '@endo/far'; | ||
import { M } from '@endo/patterns'; | ||
import { Fail } from '@agoric/assert'; | ||
import { makeNetworkProtocol, ENDPOINT_SEPARATOR } from './network.js'; | ||
import { | ||
ENDPOINT_SEPARATOR, | ||
Shape, | ||
prepareNetworkProtocol, | ||
} from './network.js'; | ||
@@ -20,47 +25,77 @@ import '@agoric/store/exported.js'; | ||
export const RouterI = M.interface('Router', { | ||
getRoutes: M.call(Shape.Endpoint).returns(M.arrayOf([M.string(), M.any()])), | ||
register: M.call(M.string(), M.any()).returns(M.undefined()), | ||
unregister: M.call(M.string(), M.any()).returns(M.undefined()), | ||
}); | ||
/** | ||
* Create a slash-delimited router. | ||
* | ||
* @template T | ||
* @returns {Router<T>} a new Router | ||
* @param {import('@agoric/base-zone').Zone} zone | ||
*/ | ||
export default function makeRouter() { | ||
/** @type {MapStore<string, T>} */ | ||
const prefixToRoute = makeScalarMapStore('prefix'); | ||
return Far('Router', { | ||
getRoutes(addr) { | ||
const parts = addr.split(ENDPOINT_SEPARATOR); | ||
/** @type {[string, T][]} */ | ||
const ret = []; | ||
for (let i = parts.length; i > 0; i -= 1) { | ||
// Try most specific match. | ||
const prefix = parts.slice(0, i).join(ENDPOINT_SEPARATOR); | ||
if (prefixToRoute.has(prefix)) { | ||
ret.push([prefix, prefixToRoute.get(prefix)]); | ||
export const prepareRouter = zone => { | ||
const detached = zone.detached(); | ||
const makeRouter = zone.exoClass( | ||
'Router', | ||
RouterI, | ||
() => { | ||
/** @type {MapStore<string, T>} */ | ||
const prefixToRoute = detached.mapStore('prefix'); | ||
return { | ||
prefixToRoute, | ||
}; | ||
}, | ||
{ | ||
/** @param {Endpoint} addr */ | ||
getRoutes(addr) { | ||
const parts = addr.split(ENDPOINT_SEPARATOR); | ||
/** @type {[string, T][]} */ | ||
const ret = []; | ||
for (let i = parts.length; i > 0; i -= 1) { | ||
// Try most specific match. | ||
const prefix = parts.slice(0, i).join(ENDPOINT_SEPARATOR); | ||
if (this.state.prefixToRoute.has(prefix)) { | ||
ret.push([prefix, this.state.prefixToRoute.get(prefix)]); | ||
} | ||
// Trim off the last value (after the slash). | ||
const defaultPrefix = prefix.substr( | ||
0, | ||
prefix.lastIndexOf(ENDPOINT_SEPARATOR) + 1, | ||
); | ||
if (this.state.prefixToRoute.has(defaultPrefix)) { | ||
ret.push([ | ||
defaultPrefix, | ||
this.state.prefixToRoute.get(defaultPrefix), | ||
]); | ||
} | ||
} | ||
// Trim off the last value (after the slash). | ||
const defaultPrefix = prefix.substr( | ||
0, | ||
prefix.lastIndexOf(ENDPOINT_SEPARATOR) + 1, | ||
); | ||
if (prefixToRoute.has(defaultPrefix)) { | ||
ret.push([defaultPrefix, prefixToRoute.get(defaultPrefix)]); | ||
} | ||
} | ||
return harden(ret); | ||
return harden(ret); | ||
}, | ||
/** | ||
* @param {string} prefix | ||
* @param {T} route | ||
*/ | ||
register(prefix, route) { | ||
this.state.prefixToRoute.init(prefix, route); | ||
}, | ||
/** | ||
* @param {string} prefix | ||
* @param {T} route | ||
*/ | ||
unregister(prefix, route) { | ||
this.state.prefixToRoute.get(prefix) === route || | ||
Fail`Router is not registered at prefix ${prefix}`; | ||
this.state.prefixToRoute.delete(prefix); | ||
}, | ||
}, | ||
register(prefix, route) { | ||
prefixToRoute.init(prefix, route); | ||
}, | ||
unregister(prefix, route) { | ||
prefixToRoute.get(prefix) === route || | ||
Fail`Router is not registered at prefix ${prefix}`; | ||
prefixToRoute.delete(prefix); | ||
}, | ||
}); | ||
} | ||
); | ||
return makeRouter; | ||
}; | ||
/** | ||
* @typedef {object} RouterProtocol | ||
* @property {(prefix: string) => Promise<Port>} bind | ||
* @property {(prefix: string) => PromiseVow<Port>} bind | ||
* @property {(paths: string[], protocolHandler: ProtocolHandler) => void} registerProtocolHandler | ||
@@ -73,44 +108,79 @@ * @property {(prefix: string, protocolHandler: ProtocolHandler) => void} unregisterProtocolHandler | ||
* | ||
* @param {import('@agoric/base-zone').Zone} zone | ||
* @param {ReturnType<import('@agoric/vow').prepareVowTools>} powers | ||
* @param {typeof defaultE} [E] Eventual sender | ||
* @returns {RouterProtocol} The new delegated protocol | ||
*/ | ||
export function makeRouterProtocol(E = defaultE) { | ||
const router = makeRouter(); | ||
/** @type {MapStore<string, Protocol>} */ | ||
const protocols = makeScalarMapStore('prefix'); | ||
/** @type {MapStore<string, ProtocolHandler>} */ | ||
const protocolHandlers = makeScalarMapStore('prefix'); | ||
export const prepareRouterProtocol = (zone, powers, E = defaultE) => { | ||
const detached = zone.detached(); | ||
function registerProtocolHandler(paths, protocolHandler) { | ||
const protocol = makeNetworkProtocol(protocolHandler); | ||
for (const prefix of paths) { | ||
router.register(prefix, protocol); | ||
protocols.init(prefix, protocol); | ||
protocolHandlers.init(prefix, protocolHandler); | ||
} | ||
} | ||
const makeRouter = prepareRouter(zone); | ||
const makeNetworkProtocol = prepareNetworkProtocol(zone, powers); | ||
// FIXME: Buggy. | ||
// Needs to account for multiple paths. | ||
function unregisterProtocolHandler(prefix, protocolHandler) { | ||
const ph = protocolHandlers.get(prefix); | ||
ph === protocolHandler || | ||
Fail`Protocol handler is not registered at prefix ${prefix}`; | ||
router.unregister(prefix, ph); | ||
protocols.delete(prefix); | ||
protocolHandlers.delete(prefix); | ||
} | ||
const makeRouterProtocol = zone.exoClass( | ||
'RouterProtocol', | ||
M.interface('RouterProtocol', { | ||
registerProtocolHandler: M.call( | ||
M.arrayOf(M.string()), | ||
M.remotable(), | ||
).returns(), | ||
unregisterProtocolHandler: M.call(M.string(), M.remotable()).returns(), | ||
bind: M.callWhen(Shape.Endpoint).returns(Shape.Vow$(Shape.Port)), | ||
}), | ||
() => { | ||
/** @type {Router<Protocol>} */ | ||
const router = makeRouter(); | ||
/** @type {Protocol['bind']} */ | ||
async function bind(localAddr) { | ||
const [route] = router.getRoutes(localAddr); | ||
route !== undefined || Fail`No registered router for ${localAddr}`; | ||
return E(route[1]).bind(localAddr); | ||
} | ||
/** @type {MapStore<string, Protocol>} */ | ||
const protocols = detached.mapStore('prefix'); | ||
return Far('RouterProtocol', { | ||
bind, | ||
registerProtocolHandler, | ||
unregisterProtocolHandler, | ||
}); | ||
} | ||
/** @type {MapStore<string, ProtocolHandler>} */ | ||
const protocolHandlers = detached.mapStore('prefix'); | ||
return { | ||
router, | ||
protocolHandlers, | ||
protocols, | ||
}; | ||
}, | ||
{ | ||
/** | ||
* @param {string[]} paths | ||
* @param {ProtocolHandler} protocolHandler | ||
*/ | ||
registerProtocolHandler(paths, protocolHandler) { | ||
const protocol = makeNetworkProtocol(protocolHandler); | ||
for (const prefix of paths) { | ||
this.state.router.register(prefix, protocol); | ||
this.state.protocols.init(prefix, protocol); | ||
this.state.protocolHandlers.init(prefix, protocolHandler); | ||
} | ||
}, | ||
// FIXME: Buggy. | ||
// Needs to account for multiple paths. | ||
/** | ||
* @param {string} prefix | ||
* @param {ProtocolHandler} protocolHandler | ||
*/ | ||
unregisterProtocolHandler(prefix, protocolHandler) { | ||
const ph = this.state.protocolHandlers.get(prefix); | ||
ph === protocolHandler || | ||
Fail`Protocol handler is not registered at prefix ${prefix}`; | ||
// TODO: unmap protocol hanlders to their corresponding protocol | ||
// e.g. using a map | ||
// before unregistering | ||
// @ts-expect-error note FIXME above | ||
this.state.router.unregister(prefix, ph); | ||
this.state.protocols.delete(prefix); | ||
this.state.protocolHandlers.delete(prefix); | ||
}, | ||
/** @param {Endpoint} localAddr */ | ||
async bind(localAddr) { | ||
const [route] = this.state.router.getRoutes(localAddr); | ||
route !== undefined || Fail`No registered router for ${localAddr}`; | ||
return E(route[1]).bind(localAddr); | ||
}, | ||
}, | ||
); | ||
return makeRouterProtocol; | ||
}; |
@@ -0,1 +1,2 @@ | ||
type PromiseVow<T> = Promise<T | import('@agoric/vow').Vow<T>>; | ||
type Data = string | Buffer | ArrayBuffer; | ||
@@ -15,3 +16,3 @@ type Bytes = string; | ||
*/ | ||
close: () => Promise<void>; | ||
close: () => PromiseVow<void>; | ||
}; | ||
@@ -26,3 +27,3 @@ /** | ||
*/ | ||
bind: (prefix: Endpoint) => Promise<Port>; | ||
bind: (prefix: Endpoint) => PromiseVow<Port>; | ||
}; | ||
@@ -41,11 +42,11 @@ /** | ||
*/ | ||
addListener: (acceptHandler: ListenHandler) => Promise<void>; | ||
addListener: (acceptHandler: ListenHandler) => PromiseVow<void>; | ||
/** | ||
* Make an outbound connection | ||
*/ | ||
connect: (remote: Endpoint, connectionHandler?: ConnectionHandler) => Promise<Connection>; | ||
connect: (remote: Endpoint, connectionHandler?: ConnectionHandler) => PromiseVow<Connection>; | ||
/** | ||
* Remove the currently-bound listener | ||
*/ | ||
removeListener: (acceptHandler: ListenHandler) => Promise<void>; | ||
removeListener: (acceptHandler: ListenHandler) => PromiseVow<void>; | ||
/** | ||
@@ -65,15 +66,15 @@ * Deallocate the port entirely, removing all | ||
*/ | ||
onListen?: ((port: Port, l: ListenHandler) => Promise<void>) | undefined; | ||
onListen?: ((port: Port, l: ListenHandler) => PromiseVow<void>) | undefined; | ||
/** | ||
* A new connection is incoming | ||
*/ | ||
onAccept: (port: Port, localAddr: Endpoint, remoteAddr: Endpoint, l: ListenHandler) => Promise<ConnectionHandler>; | ||
onAccept: (port: Port, localAddr: Endpoint, remoteAddr: Endpoint, l: ListenHandler) => PromiseVow<ConnectionHandler>; | ||
/** | ||
* The connection was rejected | ||
*/ | ||
onReject?: ((port: Port, localAddr: Endpoint, remoteAddr: Endpoint, l: ListenHandler) => Promise<void>) | undefined; | ||
onReject?: ((port: Port, localAddr: Endpoint, remoteAddr: Endpoint, l: ListenHandler) => PromiseVow<void>) | undefined; | ||
/** | ||
* There was an error while listening | ||
*/ | ||
onError?: ((port: Port, rej: any, l: ListenHandler) => Promise<void>) | undefined; | ||
onError?: ((port: Port, rej: any, l: ListenHandler) => PromiseVow<void>) | undefined; | ||
/** | ||
@@ -83,3 +84,3 @@ * The | ||
*/ | ||
onRemove?: ((port: Port, l: ListenHandler) => Promise<void>) | undefined; | ||
onRemove?: ((port: Port, l: ListenHandler) => PromiseVow<void>) | undefined; | ||
}; | ||
@@ -90,7 +91,7 @@ type Connection = { | ||
*/ | ||
send: (packetBytes: Data, opts?: Record<string, any>) => Promise<Bytes>; | ||
send: (packetBytes: Data, opts?: Record<string, any>) => PromiseVow<Bytes>; | ||
/** | ||
* Close both ends of the connection | ||
*/ | ||
close: () => Promise<void>; | ||
close: () => PromiseVow<void>; | ||
/** | ||
@@ -113,11 +114,11 @@ * Get the locally bound name of this | ||
*/ | ||
onOpen?: ((connection: Connection, localAddr: Endpoint, remoteAddr: Endpoint, c: ConnectionHandler) => void) | undefined; | ||
onOpen?: ((connection: Connection, localAddr: Endpoint, remoteAddr: Endpoint, c: ConnectionHandler) => PromiseVow<void>) | undefined; | ||
/** | ||
* The connection received a packet | ||
*/ | ||
onReceive?: ((connection: Connection, packetBytes: Bytes, c: ConnectionHandler, opts?: Record<string, any>) => Promise<Data>) | undefined; | ||
onReceive?: ((connection: Connection, ack: Bytes, c: ConnectionHandler, opts?: Record<string, any>) => PromiseVow<Data>) | undefined; | ||
/** | ||
* The connection has been closed | ||
*/ | ||
onClose?: ((connection: Connection, reason?: CloseReason, c?: ConnectionHandler) => Promise<void>) | undefined; | ||
onClose?: ((connection: Connection, reason?: CloseReason, c?: ConnectionHandler) => PromiseVow<void>) | undefined; | ||
}; | ||
@@ -141,31 +142,31 @@ /** | ||
*/ | ||
onCreate: (protocol: ProtocolImpl, p: ProtocolHandler) => Promise<void>; | ||
onCreate: (protocol: ProtocolImpl, p: ProtocolHandler) => PromiseVow<void>; | ||
/** | ||
* Create a fresh port identifier for this protocol | ||
*/ | ||
generatePortID: (localAddr: Endpoint, p: ProtocolHandler) => Promise<string>; | ||
generatePortID: (localAddr: Endpoint, p: ProtocolHandler) => PromiseVow<string>; | ||
/** | ||
* A port will be bound | ||
*/ | ||
onBind: (port: Port, localAddr: Endpoint, p: ProtocolHandler) => Promise<void>; | ||
onBind: (port: Port, localAddr: Endpoint, p: ProtocolHandler) => PromiseVow<void>; | ||
/** | ||
* A port was listening | ||
*/ | ||
onListen: (port: Port, localAddr: Endpoint, listenHandler: ListenHandler, p: ProtocolHandler) => Promise<void>; | ||
onListen: (port: Port, localAddr: Endpoint, listenHandler: ListenHandler, p: ProtocolHandler) => PromiseVow<void>; | ||
/** | ||
* A port listener has been reset | ||
*/ | ||
onListenRemove: (port: Port, localAddr: Endpoint, listenHandler: ListenHandler, p: ProtocolHandler) => Promise<void>; | ||
onListenRemove: (port: Port, localAddr: Endpoint, listenHandler: ListenHandler, p: ProtocolHandler) => PromiseVow<void>; | ||
/** | ||
* Return unique suffix for local address | ||
*/ | ||
onInstantiate?: ((port: Port, localAddr: Endpoint, remote: Endpoint, p: ProtocolHandler) => Promise<Endpoint>) | undefined; | ||
onInstantiate?: ((port: Port, localAddr: Endpoint, remote: Endpoint, p: ProtocolHandler) => PromiseVow<Endpoint>) | undefined; | ||
/** | ||
* A port initiates an outbound connection | ||
*/ | ||
onConnect: (port: Port, localAddr: Endpoint, remote: Endpoint, c: ConnectionHandler, p: ProtocolHandler) => Promise<AttemptDescription>; | ||
onConnect: (port: Port, localAddr: Endpoint, remote: Endpoint, c: ConnectionHandler, p: ProtocolHandler) => PromiseVow<AttemptDescription>; | ||
/** | ||
* The port is being completely destroyed | ||
*/ | ||
onRevoke: (port: Port, localAddr: Endpoint, p: ProtocolHandler) => Promise<void>; | ||
onRevoke: (port: Port, localAddr: Endpoint, p: ProtocolHandler) => PromiseVow<void>; | ||
}; | ||
@@ -179,3 +180,3 @@ /** | ||
*/ | ||
accept: (desc: AttemptDescription) => Promise<Connection>; | ||
accept: (desc: AttemptDescription) => PromiseVow<Connection>; | ||
/** | ||
@@ -194,3 +195,3 @@ * Return the local address for this | ||
*/ | ||
close: () => Promise<void>; | ||
close: () => PromiseVow<void>; | ||
}; | ||
@@ -205,12 +206,12 @@ /** | ||
*/ | ||
bind: (prefix: Endpoint) => Promise<Port>; | ||
bind: (prefix: Endpoint) => PromiseVow<Port>; | ||
/** | ||
* Make an attempt to connect into this protocol | ||
*/ | ||
inbound: (listenAddr: Endpoint, remoteAddr: Endpoint) => Promise<InboundAttempt>; | ||
inbound: (listenAddr: Endpoint, remoteAddr: Endpoint) => PromiseVow<InboundAttempt>; | ||
/** | ||
* Create an outbound connection | ||
*/ | ||
outbound: (port: Port, remoteAddr: Endpoint, connectionHandler: ConnectionHandler) => Promise<Connection>; | ||
outbound: (port: Port, remoteAddr: Endpoint, connectionHandler: ConnectionHandler) => PromiseVow<Connection>; | ||
}; | ||
//# sourceMappingURL=types.d.ts.map |
@@ -0,2 +1,9 @@ | ||
// @ts-check | ||
/** | ||
* @template T | ||
* @typedef {Promise<T | import('@agoric/vow').Vow<T>>} PromiseVow | ||
*/ | ||
/** | ||
* @typedef {string | Buffer | ArrayBuffer} Data | ||
@@ -14,3 +21,3 @@ * | ||
* @typedef {object} Closable A closable object | ||
* @property {() => Promise<void>} close Terminate the object | ||
* @property {() => PromiseVow<void>} close Terminate the object | ||
*/ | ||
@@ -20,3 +27,3 @@ | ||
* @typedef {object} Protocol The network Protocol | ||
* @property {(prefix: Endpoint) => Promise<Port>} bind Claim a port, or if | ||
* @property {(prefix: Endpoint) => PromiseVow<Port>} bind Claim a port, or if | ||
* ending in ENDPOINT_SEPARATOR, a fresh name | ||
@@ -29,3 +36,3 @@ */ | ||
* port | ||
* @property {(acceptHandler: ListenHandler) => Promise<void>} addListener | ||
* @property {(acceptHandler: ListenHandler) => PromiseVow<void>} addListener | ||
* Begin accepting incoming connections | ||
@@ -35,5 +42,5 @@ * @property {( | ||
* connectionHandler?: ConnectionHandler, | ||
* ) => Promise<Connection>} connect | ||
* ) => PromiseVow<Connection>} connect | ||
* Make an outbound connection | ||
* @property {(acceptHandler: ListenHandler) => Promise<void>} removeListener | ||
* @property {(acceptHandler: ListenHandler) => PromiseVow<void>} removeListener | ||
* Remove the currently-bound listener | ||
@@ -46,3 +53,3 @@ * @property {() => void} revoke Deallocate the port entirely, removing all | ||
* @typedef {object} ListenHandler A handler for incoming connections | ||
* @property {(port: Port, l: ListenHandler) => Promise<void>} [onListen] The | ||
* @property {(port: Port, l: ListenHandler) => PromiseVow<void>} [onListen] The | ||
* listener has been registered | ||
@@ -54,3 +61,3 @@ * @property {( | ||
* l: ListenHandler, | ||
* ) => Promise<ConnectionHandler>} onAccept | ||
* ) => PromiseVow<ConnectionHandler>} onAccept | ||
* A new connection is incoming | ||
@@ -62,7 +69,7 @@ * @property {( | ||
* l: ListenHandler, | ||
* ) => Promise<void>} [onReject] | ||
* ) => PromiseVow<void>} [onReject] | ||
* The connection was rejected | ||
* @property {(port: Port, rej: any, l: ListenHandler) => Promise<void>} [onError] | ||
* @property {(port: Port, rej: any, l: ListenHandler) => PromiseVow<void>} [onError] | ||
* There was an error while listening | ||
* @property {(port: Port, l: ListenHandler) => Promise<void>} [onRemove] The | ||
* @property {(port: Port, l: ListenHandler) => PromiseVow<void>} [onRemove] The | ||
* listener has been removed | ||
@@ -76,5 +83,5 @@ */ | ||
* opts?: Record<string, any>, | ||
* ) => Promise<Bytes>} send | ||
* ) => PromiseVow<Bytes>} send | ||
* Send a packet on the connection | ||
* @property {() => Promise<void>} close Close both ends of the connection | ||
* @property {() => PromiseVow<void>} close Close both ends of the connection | ||
* @property {() => Endpoint} getLocalAddress Get the locally bound name of this | ||
@@ -92,10 +99,10 @@ * connection | ||
* c: ConnectionHandler, | ||
* ) => void} [onOpen] | ||
* ) => PromiseVow<void>} [onOpen] | ||
* The connection has been opened | ||
* @property {( | ||
* connection: Connection, | ||
* packetBytes: Bytes, | ||
* ack: Bytes, | ||
* c: ConnectionHandler, | ||
* opts?: Record<string, any>, | ||
* ) => Promise<Data>} [onReceive] | ||
* ) => PromiseVow<Data>} [onReceive] | ||
* The connection received a packet | ||
@@ -106,3 +113,3 @@ * @property {( | ||
* c?: ConnectionHandler, | ||
* ) => Promise<void>} [onClose] | ||
* ) => PromiseVow<void>} [onClose] | ||
* The connection has been closed | ||
@@ -123,5 +130,5 @@ * | ||
* implementation will invoke | ||
* @property {(protocol: ProtocolImpl, p: ProtocolHandler) => Promise<void>} onCreate | ||
* @property {(protocol: ProtocolImpl, p: ProtocolHandler) => PromiseVow<void>} onCreate | ||
* This protocol is created | ||
* @property {(localAddr: Endpoint, p: ProtocolHandler) => Promise<string>} generatePortID | ||
* @property {(localAddr: Endpoint, p: ProtocolHandler) => PromiseVow<string>} generatePortID | ||
* Create a fresh port identifier for this protocol | ||
@@ -132,3 +139,3 @@ * @property {( | ||
* p: ProtocolHandler, | ||
* ) => Promise<void>} onBind | ||
* ) => PromiseVow<void>} onBind | ||
* A port will be bound | ||
@@ -140,3 +147,3 @@ * @property {( | ||
* p: ProtocolHandler, | ||
* ) => Promise<void>} onListen | ||
* ) => PromiseVow<void>} onListen | ||
* A port was listening | ||
@@ -148,3 +155,3 @@ * @property {( | ||
* p: ProtocolHandler, | ||
* ) => Promise<void>} onListenRemove | ||
* ) => PromiseVow<void>} onListenRemove | ||
* A port listener has been reset | ||
@@ -156,3 +163,3 @@ * @property {( | ||
* p: ProtocolHandler, | ||
* ) => Promise<Endpoint>} [onInstantiate] | ||
* ) => PromiseVow<Endpoint>} [onInstantiate] | ||
* Return unique suffix for local address | ||
@@ -165,3 +172,3 @@ * @property {( | ||
* p: ProtocolHandler, | ||
* ) => Promise<AttemptDescription>} onConnect | ||
* ) => PromiseVow<AttemptDescription>} onConnect | ||
* A port initiates an outbound connection | ||
@@ -172,7 +179,7 @@ * @property {( | ||
* p: ProtocolHandler, | ||
* ) => Promise<void>} onRevoke | ||
* ) => PromiseVow<void>} onRevoke | ||
* The port is being completely destroyed | ||
* | ||
* @typedef {object} InboundAttempt An inbound connection attempt | ||
* @property {(desc: AttemptDescription) => Promise<Connection>} accept | ||
* @property {(desc: AttemptDescription) => PromiseVow<Connection>} accept | ||
* Establish the connection | ||
@@ -183,6 +190,6 @@ * @property {() => Endpoint} getLocalAddress Return the local address for this | ||
* this attempt | ||
* @property {() => Promise<void>} close Abort the attempt | ||
* @property {() => PromiseVow<void>} close Abort the attempt | ||
* | ||
* @typedef {object} ProtocolImpl Things the protocol can do for us | ||
* @property {(prefix: Endpoint) => Promise<Port>} bind Claim a port, or if | ||
* @property {(prefix: Endpoint) => PromiseVow<Port>} bind Claim a port, or if | ||
* ending in ENDPOINT_SEPARATOR, a fresh name | ||
@@ -192,3 +199,3 @@ * @property {( | ||
* remoteAddr: Endpoint, | ||
* ) => Promise<InboundAttempt>} inbound | ||
* ) => PromiseVow<InboundAttempt>} inbound | ||
* Make an attempt to connect into this protocol | ||
@@ -199,4 +206,4 @@ * @property {( | ||
* connectionHandler: ConnectionHandler, | ||
* ) => Promise<Connection>} outbound | ||
* ) => PromiseVow<Connection>} outbound | ||
* Create an outbound connection | ||
*/ |
@@ -5,2 +5,3 @@ { | ||
"checkJs": false, | ||
"maxNodeModuleJsDepth": 1, | ||
}, | ||
@@ -7,0 +8,0 @@ "include": [ |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Manifest confusion
Supply chain riskThis package has inconsistent metadata. This could be malicious or caused by an error when publishing the package.
Found 1 instance in 1 package
Manifest confusion
Supply chain riskThis package has inconsistent metadata. This could be malicious or caused by an error when publishing the package.
Found 1 instance in 1 package
97564
27
1988
8
6
+ Added@endo/patterns@^1.1.0
+ Added@endo/common@1.2.9(transitive)
+ Added@endo/marshal@1.6.3(transitive)
+ Added@endo/nat@5.0.14(transitive)
+ Added@endo/patterns@1.4.8(transitive)
Updated@endo/base64@^1.0.2
Updated@endo/far@^1.0.4
Updated@endo/promise-kit@^1.0.4