@libp2p/multistream-select
Advanced tools
Comparing version
@@ -1,7 +0,6 @@ | ||
import type { AbortOptions } from '@libp2p/interfaces'; | ||
import { Uint8ArrayList } from 'uint8arraylist'; | ||
import type { Duplex } from 'it-stream-types'; | ||
export declare function handle(stream: Duplex<Uint8Array>, protocols: string | string[], options?: AbortOptions): Promise<{ | ||
stream: Duplex<Uint8Array, Uint8Array, Promise<void>>; | ||
protocol: string; | ||
}>; | ||
import type { ByteArrayInit, ByteListInit, ProtocolStream } from './index.js'; | ||
export declare function handle(stream: Duplex<Uint8Array>, protocols: string | string[], options: ByteArrayInit): Promise<ProtocolStream<Uint8Array>>; | ||
export declare function handle(stream: Duplex<Uint8ArrayList, Uint8ArrayList | Uint8Array>, protocols: string | string[], options?: ByteListInit): Promise<ProtocolStream<Uint8ArrayList, Uint8ArrayList | Uint8Array>>; | ||
//# sourceMappingURL=handle.d.ts.map |
@@ -16,7 +16,7 @@ import { logger } from '@libp2p/logger'; | ||
log('respond with "%s" for "%s"', PROTOCOL_ID, protocol); | ||
multistream.write(writer, uint8ArrayFromString(PROTOCOL_ID)); | ||
multistream.write(writer, uint8ArrayFromString(PROTOCOL_ID), options); | ||
continue; | ||
} | ||
if (protocols.includes(protocol)) { | ||
multistream.write(writer, uint8ArrayFromString(protocol)); | ||
multistream.write(writer, uint8ArrayFromString(protocol), options); | ||
log('respond with "%s" for "%s"', protocol, protocol); | ||
@@ -28,7 +28,8 @@ rest(); | ||
// <varint-msg-len><varint-proto-name-len><proto-name>\n<varint-proto-name-len><proto-name>\n\n | ||
multistream.write(writer, new Uint8ArrayList(...protocols.map(p => multistream.encode(uint8ArrayFromString(p))))); | ||
multistream.write(writer, new Uint8ArrayList(...protocols.map(p => multistream.encode(uint8ArrayFromString(p)))), options); | ||
// multistream.writeAll(writer, protocols.map(p => uint8ArrayFromString(p))) | ||
log('respond with "%s" for %s', protocols, protocol); | ||
continue; | ||
} | ||
multistream.write(writer, uint8ArrayFromString('na')); | ||
multistream.write(writer, uint8ArrayFromString('na'), options); | ||
log('respond with "na" for "%s"', protocol); | ||
@@ -35,0 +36,0 @@ } |
@@ -5,23 +5,17 @@ import { PROTOCOL_ID } from './constants.js'; | ||
export { PROTOCOL_ID }; | ||
export interface ProtocolStream { | ||
stream: Duplex<Uint8Array>; | ||
export interface ProtocolStream<TSource, TSink = TSource> { | ||
stream: Duplex<TSource, TSink>; | ||
protocol: string; | ||
} | ||
declare class MultistreamSelect { | ||
protected stream: Duplex<Uint8Array>; | ||
protected shaken: boolean; | ||
constructor(stream: Duplex<Uint8Array>); | ||
/** | ||
* Perform the multistream-select handshake | ||
* | ||
* @param {AbortOptions} [options] | ||
*/ | ||
_handshake(options?: AbortOptions): Promise<void>; | ||
export interface ByteArrayInit extends AbortOptions { | ||
writeBytes: true; | ||
} | ||
export declare class Dialer extends MultistreamSelect { | ||
select(protocols: string | string[], options?: AbortOptions): Promise<ProtocolStream>; | ||
export interface ByteListInit extends AbortOptions { | ||
writeBytes?: false; | ||
} | ||
export declare class Listener extends MultistreamSelect { | ||
handle(protocols: string | string[], options?: AbortOptions): Promise<ProtocolStream>; | ||
export interface MultistreamSelectInit extends AbortOptions { | ||
writeBytes?: boolean; | ||
} | ||
export { select } from './select.js'; | ||
export { handle } from './handle.js'; | ||
//# sourceMappingURL=index.d.ts.map |
@@ -1,34 +0,5 @@ | ||
import { select } from './select.js'; | ||
import { handle } from './handle.js'; | ||
import { PROTOCOL_ID } from './constants.js'; | ||
export { PROTOCOL_ID }; | ||
class MultistreamSelect { | ||
constructor(stream) { | ||
this.stream = stream; | ||
this.shaken = false; | ||
} | ||
/** | ||
* Perform the multistream-select handshake | ||
* | ||
* @param {AbortOptions} [options] | ||
*/ | ||
async _handshake(options) { | ||
if (this.shaken) { | ||
return; | ||
} | ||
const { stream } = await select(this.stream, PROTOCOL_ID, undefined, options); | ||
this.stream = stream; | ||
this.shaken = true; | ||
} | ||
} | ||
export class Dialer extends MultistreamSelect { | ||
async select(protocols, options) { | ||
return await select(this.stream, protocols, this.shaken ? undefined : PROTOCOL_ID, options); | ||
} | ||
} | ||
export class Listener extends MultistreamSelect { | ||
async handle(protocols, options) { | ||
return await handle(this.stream, protocols, options); | ||
} | ||
} | ||
export { select } from './select.js'; | ||
export { handle } from './handle.js'; | ||
//# sourceMappingURL=index.js.map |
@@ -5,13 +5,14 @@ import { Uint8ArrayList } from 'uint8arraylist'; | ||
import type { Reader } from 'it-reader'; | ||
export declare function encode(buffer: Uint8Array | Uint8ArrayList): Uint8Array; | ||
import type { MultistreamSelectInit } from '.'; | ||
export declare function encode(buffer: Uint8Array | Uint8ArrayList): Uint8ArrayList; | ||
/** | ||
* `write` encodes and writes a single buffer | ||
*/ | ||
export declare function write(writer: Pushable<Uint8Array>, buffer: Uint8Array | Uint8ArrayList): void; | ||
export declare function write(writer: Pushable<any>, buffer: Uint8Array | Uint8ArrayList, options?: MultistreamSelectInit): void; | ||
/** | ||
* `writeAll` behaves like `write`, except it encodes an array of items as a single write | ||
*/ | ||
export declare function writeAll(writer: Pushable<Uint8Array>, buffers: Uint8Array[]): void; | ||
export declare function read(reader: Reader, options?: AbortOptions): Promise<Uint8Array>; | ||
export declare function writeAll(writer: Pushable<any>, buffers: Uint8Array[], options?: MultistreamSelectInit): void; | ||
export declare function read(reader: Reader, options?: AbortOptions): Promise<Uint8ArrayList>; | ||
export declare function readString(reader: Reader, options?: AbortOptions): Promise<string>; | ||
//# sourceMappingURL=multistream.d.ts.map |
@@ -12,3 +12,3 @@ import { Uint8ArrayList } from 'uint8arraylist'; | ||
const list = new Uint8ArrayList(buffer, NewLine); | ||
return lp.encode.single(list).subarray(); | ||
return lp.encode.single(list); | ||
} | ||
@@ -18,4 +18,10 @@ /** | ||
*/ | ||
export function write(writer, buffer) { | ||
writer.push(encode(buffer).subarray()); | ||
export function write(writer, buffer, options = {}) { | ||
const encoded = encode(buffer); | ||
if (options.writeBytes === true) { | ||
writer.push(encoded.subarray()); | ||
} | ||
else { | ||
writer.push(encoded); | ||
} | ||
} | ||
@@ -25,3 +31,3 @@ /** | ||
*/ | ||
export function writeAll(writer, buffers) { | ||
export function writeAll(writer, buffers, options = {}) { | ||
const list = new Uint8ArrayList(); | ||
@@ -31,3 +37,8 @@ for (const buf of buffers) { | ||
} | ||
writer.push(list.slice()); | ||
if (options.writeBytes === true) { | ||
writer.push(list.subarray()); | ||
} | ||
else { | ||
writer.push(list); | ||
} | ||
} | ||
@@ -47,3 +58,5 @@ export async function read(reader, options) { | ||
// Once the length has been parsed, read chunk for that length | ||
const onLength = (l) => { byteLength = l; }; | ||
const onLength = (l) => { | ||
byteLength = l; | ||
}; | ||
const buf = await pipe(input, lp.decode({ onLength }), async (source) => await first(source)); | ||
@@ -56,8 +69,8 @@ if (buf == null) { | ||
} | ||
return buf.slice(0, -1); // Remove newline | ||
return buf.sublist(0, -1); // Remove newline | ||
} | ||
export async function readString(reader, options) { | ||
const buf = await read(reader, options); | ||
return uint8ArrayToString(buf); | ||
return uint8ArrayToString(buf.subarray()); | ||
} | ||
//# sourceMappingURL=multistream.js.map |
@@ -1,7 +0,6 @@ | ||
import type { AbortOptions } from '@libp2p/interfaces'; | ||
import type { Duplex } from 'it-stream-types'; | ||
export declare function select(stream: Duplex<Uint8Array>, protocols: string | string[], protocolId?: string, options?: AbortOptions): Promise<{ | ||
stream: Duplex<Uint8Array, Uint8Array, Promise<void>>; | ||
protocol: string; | ||
}>; | ||
import type { Uint8ArrayList } from 'uint8arraylist'; | ||
import type { ByteArrayInit, ByteListInit, ProtocolStream } from './index.js'; | ||
export declare function select(stream: Duplex<Uint8Array>, protocols: string | string[], options: ByteArrayInit): Promise<ProtocolStream<Uint8Array>>; | ||
export declare function select(stream: Duplex<Uint8ArrayList, Uint8ArrayList | Uint8Array>, protocols: string | string[], options?: ByteListInit): Promise<ProtocolStream<Uint8ArrayList, Uint8ArrayList | Uint8Array>>; | ||
//# sourceMappingURL=select.d.ts.map |
@@ -6,4 +6,5 @@ import { logger } from '@libp2p/logger'; | ||
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'; | ||
import { PROTOCOL_ID } from './index.js'; | ||
const log = logger('libp2p:mss:select'); | ||
export async function select(stream, protocols, protocolId, options) { | ||
export async function select(stream, protocols, options = {}) { | ||
protocols = Array.isArray(protocols) ? [...protocols] : [protocols]; | ||
@@ -15,14 +16,10 @@ const { reader, writer, rest, stream: shakeStream } = handshake(stream); | ||
} | ||
if (protocolId != null) { | ||
log('select: write ["%s", "%s"]', protocolId, protocol); | ||
multistream.writeAll(writer, [uint8ArrayFromString(protocolId), uint8ArrayFromString(protocol)]); | ||
} | ||
else { | ||
log('select: write "%s"', protocol); | ||
multistream.write(writer, uint8ArrayFromString(protocol)); | ||
} | ||
log('select: write ["%s", "%s"]', PROTOCOL_ID, protocol); | ||
const p1 = uint8ArrayFromString(PROTOCOL_ID); | ||
const p2 = uint8ArrayFromString(protocol); | ||
multistream.writeAll(writer, [p1, p2], options); | ||
let response = await multistream.readString(reader, options); | ||
log('select: read "%s"', response); | ||
// Read the protocol response if we got the protocolId in return | ||
if (response === protocolId) { | ||
if (response === PROTOCOL_ID) { | ||
response = await multistream.readString(reader, options); | ||
@@ -39,3 +36,3 @@ log('select: read "%s"', response); | ||
log('select: write "%s"', protocol); | ||
multistream.write(writer, uint8ArrayFromString(protocol)); | ||
multistream.write(writer, uint8ArrayFromString(protocol), options); | ||
const response = await multistream.readString(reader, options); | ||
@@ -42,0 +39,0 @@ log('select: read "%s" for "%s"', response, protocol); |
{ | ||
"name": "@libp2p/multistream-select", | ||
"version": "2.0.2", | ||
"version": "3.0.0", | ||
"description": "JavaScript implementation of multistream-select", | ||
@@ -157,3 +157,3 @@ "license": "Apache-2.0 OR MIT", | ||
"p-defer": "^4.0.0", | ||
"uint8arraylist": "^2.0.0", | ||
"uint8arraylist": "^2.3.1", | ||
"uint8arrays": "^3.0.0" | ||
@@ -160,0 +160,0 @@ }, |
115
README.md
@@ -21,22 +21,10 @@ # @libp2p/multistream-select <!-- omit in toc --> | ||
- [API](#api) | ||
- [`new Dialer(duplex)`](#new-dialerduplex) | ||
- [`mss.select(dulpex, protocols, [options])`](#mssselectdulpex-protocols-options) | ||
- [Parameters](#parameters) | ||
- [Returns](#returns) | ||
- [Examples](#examples) | ||
- [`dialer.select(protocols, [options])`](#dialerselectprotocols-options) | ||
- [`mss.handle(duplex, protocols, [options])`](#msshandleduplex-protocols-options) | ||
- [Parameters](#parameters-1) | ||
- [Returns](#returns-1) | ||
- [Examples](#examples-1) | ||
- [`dialer.ls([options])`](#dialerlsoptions) | ||
- [Parameters](#parameters-2) | ||
- [Returns](#returns-2) | ||
- [Examples](#examples-2) | ||
- [`new Listener(duplex)`](#new-listenerduplex) | ||
- [Parameters](#parameters-3) | ||
- [Returns](#returns-3) | ||
- [Examples](#examples-3) | ||
- [`listener.handle(protocols, [options])`](#listenerhandleprotocols-options) | ||
- [Parameters](#parameters-4) | ||
- [Returns](#returns-4) | ||
- [Examples](#examples-4) | ||
- [License](#license) | ||
@@ -73,11 +61,9 @@ - [Contribution](#contribution) | ||
This mode also packs a `ls` option, so that the callee can list the protocols it currently supports | ||
## Usage | ||
```js | ||
import { Dialer, Listener } from '@libp2p/multistream-select' | ||
import { select, handle } from '@libp2p/multistream-select' | ||
// You can now use | ||
// Dialer - actively select a protocol with a remote | ||
// Listener - handle a protocol with a remote | ||
// select - actively select a protocol with a remote | ||
// handle - handle a protocol with a remote | ||
``` | ||
@@ -89,3 +75,3 @@ | ||
import { pipe } from 'it-pipe' | ||
import { Dialer } from '@libp2p/multistream-select' | ||
import * as mss from '@libp2p/multistream-select' | ||
import { Mplex } from '@libp2p/mplex' | ||
@@ -96,8 +82,6 @@ | ||
const mss = new Dialer(muxedStream) | ||
// mss.select(protocol(s)) | ||
// Select from one of the passed protocols (in priority order) | ||
// Returns selected stream and protocol | ||
const { stream: dhtStream, protocol } = await mss.select([ | ||
const { stream: dhtStream, protocol } = await mss.select(muxedStream, [ | ||
// This might just be different versions of DHT, but could be different impls | ||
@@ -129,3 +113,3 @@ '/ipfs-dht/2.0.0', // Most of the time this will probably just be one item. | ||
import { pipe } from 'it-pipe' | ||
import { Listener } from '@libp2p/multistream-select' | ||
import * as mss from '@libp2p/multistream-select' | ||
import { Mplex } from '@libp2p/mplex' | ||
@@ -135,7 +119,5 @@ | ||
async onStream (muxedStream) { | ||
const mss = new Listener(muxedStream) | ||
// mss.handle(handledProtocols) | ||
// Returns selected stream and protocol | ||
const { stream, protocol } = await mss.handle([ | ||
const { stream, protocol } = await mss.handle(muxedStream, [ | ||
'/ipfs-dht/1.0.0', | ||
@@ -168,22 +150,4 @@ '/ipfs-bitswap/1.0.0' | ||
### `new Dialer(duplex)` | ||
### `mss.select(dulpex, protocols, [options])` | ||
Create a new multistream select "dialer" instance which can be used to negotiate a protocol to use, list all available protocols the remote supports, or do both. | ||
#### Parameters | ||
- `duplex` (`Object`) - A [duplex iterable stream](https://gist.github.com/alanshaw/591dc7dd54e4f99338a347ef568d6ee9#duplex-it) to dial on. | ||
#### Returns | ||
A new multistream select dialer instance. | ||
#### Examples | ||
```js | ||
const dialer = new MSS.Dialer(duplex) | ||
``` | ||
### `dialer.select(protocols, [options])` | ||
Negotiate a protocol to use from a list of protocols. | ||
@@ -193,8 +157,9 @@ | ||
- `protocols` (`String[]`/`String`) - A list of protocols (or single protocol) to negotiate with. Protocols are attempted in order until a match is made. | ||
- `options` (`{ signal: AbortSignal }`) - an options object containing an AbortSignal | ||
- `duplex` (`Duplex`) - A [duplex iterable stream](https://gist.github.com/alanshaw/591dc7dd54e4f99338a347ef568d6ee9#duplex-it) to dial on. | ||
- `protocols` (`string[]`/`string`) - A list of protocols (or single protocol) to negotiate with. Protocols are attempted in order until a match is made. | ||
- `options` (`{ signal: AbortSignal, writeBytes?: boolean }`) - an options object containing an AbortSignal and an optional boolean `writeBytes` - if this is true, `Uint8Array`s will be written into `duplex`, otherwise `Uint8ArrayList`s will | ||
#### Returns | ||
`Promise<{ stream<Object>, protocol<String> }>` - A stream for the selected protocol and the protocol that was selected from the list of protocols provided to `select`. | ||
`Promise<{ stream<Duplex>, protocol<string> }>` - A stream for the selected protocol and the protocol that was selected from the list of protocols provided to `select`. | ||
@@ -214,47 +179,4 @@ Note that after a protocol is selected `dialer` can no longer be used. | ||
### `dialer.ls([options])` | ||
### `mss.handle(duplex, protocols, [options])` | ||
List protocols that the remote supports. | ||
#### Parameters | ||
- `options` (`{ signal: AbortSignal }`) - an options object containing an AbortSignal | ||
#### Returns | ||
`String[]` - A list of all the protocols the remote supports. | ||
#### Examples | ||
```js | ||
const protocols = await dialer.ls() | ||
const wantedProto = '/ipfs-dht/2.0.0' | ||
if (!protocols.includes(wantedProto)) { | ||
throw new Error('remote does not support ' + wantedProto) | ||
} | ||
// Now use dialer.select to use wantedProto, safe in the knowledge it is supported | ||
``` | ||
### `new Listener(duplex)` | ||
Construct a new multistream select "listener" instance which can be used to handle multistream protocol selections for particular protocols. | ||
#### Parameters | ||
- `duplex` (`Object`) - A [duplex iterable stream](https://gist.github.com/alanshaw/591dc7dd54e4f99338a347ef568d6ee9#duplex-it) to listen on. | ||
#### Returns | ||
A new multistream select listener instance. | ||
#### Examples | ||
```js | ||
const listener = new MSS.Listener(duplex) | ||
``` | ||
### `listener.handle(protocols, [options])` | ||
Handle multistream protocol selections for the given list of protocols. | ||
@@ -264,8 +186,9 @@ | ||
- `duplex` (`Duplex`) - A [duplex iterable stream](https://gist.github.com/alanshaw/591dc7dd54e4f99338a347ef568d6ee9#duplex-it) to listen on. | ||
- `protocols` (`String[]`/`String`) - A list of protocols (or single protocol) that this listener is able to speak. | ||
- `options` (`{ signal: AbortSignal }`) - an options object containing an AbortSignal | ||
- `options` (`{ signal: AbortSignal, writeBytes?: boolean }`) - an options object containing an AbortSignal and an optional boolean `writeBytes` - if this is true, `Uint8Array`s will be written into `duplex`, otherwise `Uint8ArrayList`s will | ||
#### Returns | ||
`Promise<{ stream<Object>, protocol<String> }>` - A stream for the selected protocol and the protocol that was selected from the list of protocols provided to `select`. | ||
`Promise<{ stream<Duplex>, protocol<string> }>` - A stream for the selected protocol and the protocol that was selected from the list of protocols provided to `select`. | ||
@@ -277,3 +200,3 @@ Note that after a protocol is handled `listener` can no longer be used. | ||
```js | ||
const { stream, protocol } = await listener.handle([ | ||
const { stream, protocol } = await mss.handle(duplex, [ | ||
'/ipfs-dht/1.0.0', | ||
@@ -280,0 +203,0 @@ '/ipfs-bitswap/1.0.0' |
@@ -7,8 +7,10 @@ import { logger } from '@libp2p/logger' | ||
import { Uint8ArrayList } from 'uint8arraylist' | ||
import type { AbortOptions } from '@libp2p/interfaces' | ||
import type { Duplex } from 'it-stream-types' | ||
import type { ByteArrayInit, ByteListInit, MultistreamSelectInit, ProtocolStream } from './index.js' | ||
const log = logger('libp2p:mss:handle') | ||
export async function handle (stream: Duplex<Uint8Array>, protocols: string | string[], options?: AbortOptions) { | ||
export async function handle (stream: Duplex<Uint8Array>, protocols: string | string[], options: ByteArrayInit): Promise<ProtocolStream<Uint8Array>> | ||
export async function handle (stream: Duplex<Uint8ArrayList, Uint8ArrayList | Uint8Array>, protocols: string | string[], options?: ByteListInit): Promise<ProtocolStream<Uint8ArrayList, Uint8ArrayList | Uint8Array>> | ||
export async function handle (stream: Duplex<any>, protocols: string | string[], options?: MultistreamSelectInit): Promise<ProtocolStream<any>> { | ||
protocols = Array.isArray(protocols) ? protocols : [protocols] | ||
@@ -23,3 +25,3 @@ const { writer, reader, rest, stream: shakeStream } = handshake(stream) | ||
log('respond with "%s" for "%s"', PROTOCOL_ID, protocol) | ||
multistream.write(writer, uint8ArrayFromString(PROTOCOL_ID)) | ||
multistream.write(writer, uint8ArrayFromString(PROTOCOL_ID), options) | ||
continue | ||
@@ -29,3 +31,3 @@ } | ||
if (protocols.includes(protocol)) { | ||
multistream.write(writer, uint8ArrayFromString(protocol)) | ||
multistream.write(writer, uint8ArrayFromString(protocol), options) | ||
log('respond with "%s" for "%s"', protocol, protocol) | ||
@@ -38,3 +40,4 @@ rest() | ||
// <varint-msg-len><varint-proto-name-len><proto-name>\n<varint-proto-name-len><proto-name>\n\n | ||
multistream.write(writer, new Uint8ArrayList(...protocols.map(p => multistream.encode(uint8ArrayFromString(p))))) | ||
multistream.write(writer, new Uint8ArrayList(...protocols.map(p => multistream.encode(uint8ArrayFromString(p)))), options) | ||
// multistream.writeAll(writer, protocols.map(p => uint8ArrayFromString(p))) | ||
log('respond with "%s" for %s', protocols, protocol) | ||
@@ -44,5 +47,5 @@ continue | ||
multistream.write(writer, uint8ArrayFromString('na')) | ||
multistream.write(writer, uint8ArrayFromString('na'), options) | ||
log('respond with "na" for "%s"', protocol) | ||
} | ||
} |
@@ -1,3 +0,1 @@ | ||
import { select } from './select.js' | ||
import { handle } from './handle.js' | ||
import { PROTOCOL_ID } from './constants.js' | ||
@@ -9,42 +7,20 @@ import type { Duplex } from 'it-stream-types' | ||
export interface ProtocolStream { | ||
stream: Duplex<Uint8Array> | ||
export interface ProtocolStream<TSource, TSink = TSource> { | ||
stream: Duplex<TSource, TSink> | ||
protocol: string | ||
} | ||
class MultistreamSelect { | ||
protected stream: Duplex<Uint8Array> | ||
protected shaken: boolean | ||
export interface ByteArrayInit extends AbortOptions { | ||
writeBytes: true | ||
} | ||
constructor (stream: Duplex<Uint8Array>) { | ||
this.stream = stream | ||
this.shaken = false | ||
} | ||
/** | ||
* Perform the multistream-select handshake | ||
* | ||
* @param {AbortOptions} [options] | ||
*/ | ||
async _handshake (options?: AbortOptions): Promise<void> { | ||
if (this.shaken) { | ||
return | ||
} | ||
const { stream } = await select(this.stream, PROTOCOL_ID, undefined, options) | ||
this.stream = stream | ||
this.shaken = true | ||
} | ||
export interface ByteListInit extends AbortOptions { | ||
writeBytes?: false | ||
} | ||
export class Dialer extends MultistreamSelect { | ||
async select (protocols: string | string[], options?: AbortOptions): Promise<ProtocolStream> { | ||
return await select(this.stream, protocols, this.shaken ? undefined : PROTOCOL_ID, options) | ||
} | ||
export interface MultistreamSelectInit extends AbortOptions { | ||
writeBytes?: boolean | ||
} | ||
export class Listener extends MultistreamSelect { | ||
async handle (protocols: string | string[], options?: AbortOptions): Promise<ProtocolStream> { | ||
return await handle(this.stream, protocols, options) | ||
} | ||
} | ||
export { select } from './select.js' | ||
export { handle } from './handle.js' |
@@ -14,9 +14,10 @@ | ||
import type { Reader } from 'it-reader' | ||
import type { MultistreamSelectInit } from '.' | ||
const NewLine = uint8ArrayFromString('\n') | ||
export function encode (buffer: Uint8Array | Uint8ArrayList): Uint8Array { | ||
export function encode (buffer: Uint8Array | Uint8ArrayList): Uint8ArrayList { | ||
const list = new Uint8ArrayList(buffer, NewLine) | ||
return lp.encode.single(list).subarray() | ||
return lp.encode.single(list) | ||
} | ||
@@ -27,4 +28,10 @@ | ||
*/ | ||
export function write (writer: Pushable<Uint8Array>, buffer: Uint8Array | Uint8ArrayList) { | ||
writer.push(encode(buffer).subarray()) | ||
export function write (writer: Pushable<any>, buffer: Uint8Array | Uint8ArrayList, options: MultistreamSelectInit = {}) { | ||
const encoded = encode(buffer) | ||
if (options.writeBytes === true) { | ||
writer.push(encoded.subarray()) | ||
} else { | ||
writer.push(encoded) | ||
} | ||
} | ||
@@ -35,3 +42,3 @@ | ||
*/ | ||
export function writeAll (writer: Pushable<Uint8Array>, buffers: Uint8Array[]) { | ||
export function writeAll (writer: Pushable<any>, buffers: Uint8Array[], options: MultistreamSelectInit = {}) { | ||
const list = new Uint8ArrayList() | ||
@@ -43,6 +50,10 @@ | ||
writer.push(list.slice()) | ||
if (options.writeBytes === true) { | ||
writer.push(list.subarray()) | ||
} else { | ||
writer.push(list) | ||
} | ||
} | ||
export async function read (reader: Reader, options?: AbortOptions) { | ||
export async function read (reader: Reader, options?: AbortOptions): Promise<Uint8ArrayList> { | ||
let byteLength = 1 // Read single byte chunks until the length is known | ||
@@ -63,3 +74,5 @@ const varByteSource = { // No return impl - we want the reader to remain readable | ||
// Once the length has been parsed, read chunk for that length | ||
const onLength = (l: number) => { byteLength = l } | ||
const onLength = (l: number) => { | ||
byteLength = l | ||
} | ||
@@ -80,3 +93,3 @@ const buf = await pipe( | ||
return buf.slice(0, -1) // Remove newline | ||
return buf.sublist(0, -1) // Remove newline | ||
} | ||
@@ -87,3 +100,3 @@ | ||
return uint8ArrayToString(buf) | ||
return uint8ArrayToString(buf.subarray()) | ||
} |
@@ -6,8 +6,12 @@ import { logger } from '@libp2p/logger' | ||
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string' | ||
import type { AbortOptions } from '@libp2p/interfaces' | ||
import { PROTOCOL_ID } from './index.js' | ||
import type { Duplex } from 'it-stream-types' | ||
import type { Uint8ArrayList } from 'uint8arraylist' | ||
import type { ByteArrayInit, ByteListInit, MultistreamSelectInit, ProtocolStream } from './index.js' | ||
const log = logger('libp2p:mss:select') | ||
export async function select (stream: Duplex<Uint8Array>, protocols: string | string[], protocolId?: string, options?: AbortOptions) { | ||
export async function select (stream: Duplex<Uint8Array>, protocols: string | string[], options: ByteArrayInit): Promise<ProtocolStream<Uint8Array>> | ||
export async function select (stream: Duplex<Uint8ArrayList, Uint8ArrayList | Uint8Array>, protocols: string | string[], options?: ByteListInit): Promise<ProtocolStream<Uint8ArrayList, Uint8ArrayList | Uint8Array>> | ||
export async function select (stream: Duplex<any>, protocols: string | string[], options: MultistreamSelectInit = {}): Promise<ProtocolStream<any>> { | ||
protocols = Array.isArray(protocols) ? [...protocols] : [protocols] | ||
@@ -22,9 +26,6 @@ const { reader, writer, rest, stream: shakeStream } = handshake(stream) | ||
if (protocolId != null) { | ||
log('select: write ["%s", "%s"]', protocolId, protocol) | ||
multistream.writeAll(writer, [uint8ArrayFromString(protocolId), uint8ArrayFromString(protocol)]) | ||
} else { | ||
log('select: write "%s"', protocol) | ||
multistream.write(writer, uint8ArrayFromString(protocol)) | ||
} | ||
log('select: write ["%s", "%s"]', PROTOCOL_ID, protocol) | ||
const p1 = uint8ArrayFromString(PROTOCOL_ID) | ||
const p2 = uint8ArrayFromString(protocol) | ||
multistream.writeAll(writer, [p1, p2], options) | ||
@@ -35,3 +36,3 @@ let response = await multistream.readString(reader, options) | ||
// Read the protocol response if we got the protocolId in return | ||
if (response === protocolId) { | ||
if (response === PROTOCOL_ID) { | ||
response = await multistream.readString(reader, options) | ||
@@ -50,3 +51,3 @@ log('select: read "%s"', response) | ||
log('select: write "%s"', protocol) | ||
multistream.write(writer, uint8ArrayFromString(protocol)) | ||
multistream.write(writer, uint8ArrayFromString(protocol), options) | ||
const response = await multistream.readString(reader, options) | ||
@@ -53,0 +54,0 @@ log('select: read "%s" for "%s"', response, protocol) |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
New author
Supply chain riskA new npm collaborator published a version of the package for the first time. New collaborators are usually benign additions to a project, but do indicate a change to the security surface area of a package.
Found 1 instance in 1 package
39041
-2.63%383
-7.49%207
-27.11%1
Infinity%Updated