New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

@ndn/segmented-object

Package Overview
Dependencies
Maintainers
0
Versions
10
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@ndn/segmented-object - npm Package Compare versions

Comparing version 0.0.20240113 to 0.0.20240630

lib/fetch/unverified_browser.js

39

lib/discover-version_browser.js

@@ -1,30 +0,35 @@

import { Endpoint } from "@ndn/endpoint";
import { consume } from "@ndn/endpoint";
import { Interest } from "@ndn/packet";
import { defaultSegmentConvention, defaultVersionConvention } from "./convention_browser.js";
/** Discover version with CanBePrefix. */
export async function discoverVersion(name, { endpoint = new Endpoint(), describe, versionConvention = defaultVersionConvention, segmentNumConvention = defaultSegmentConvention, conventions: conventionsInput = [], expectedSuffixLen = 2, modifyInterest, retxLimit = 2, signal, verifier, } = {}) {
const conventions = conventionsInput.length > 0 ? conventionsInput : [[versionConvention, segmentNumConvention]];
/**
* Discover version with CanBePrefix.
* @param name - Name without version component.
* @returns Promise that resolves to versioned name annotated with identified conventions.
*/
export async function discoverVersion(name, { cOpts, versionConvention = defaultVersionConvention, segmentNumConvention = defaultSegmentConvention, conventions: conventionsInput = [], expectedSuffixLen = 2, } = {}) {
const interest = new Interest(name, Interest.CanBePrefix, Interest.MustBeFresh);
const data = await endpoint.consume(interest, {
describe: describe ?? `discoverVersion(${name})`,
modifyInterest,
retx: retxLimit,
signal,
verifier,
const data = await consume(interest, {
describe: `discoverVersion(${name})`,
...cOpts,
});
const conventions = conventionsInput.length > 0 ? conventionsInput : [[versionConvention, segmentNumConvention]];
const vComp = data.name.get(-2);
const sComp = data.name.get(-1);
let conventionIndex;
if ((expectedSuffixLen !== discoverVersion.ANY_SUFFIX_LEN &&
data.name.length !== name.length + expectedSuffixLen) ||
(conventionIndex = conventions.findIndex(([v, s]) => v.match(vComp) && s.match(sComp))) < 0) {
let index;
if (!checkSuffixLength(expectedSuffixLen, data.name.length - name.length) ||
(index = conventions.findIndex(([v, s]) => v.match(vComp) && s.match(sComp))) < 0) {
throw new Error(`cannot extract version from ${data.name}`);
}
return Object.defineProperties(data.name.getPrefix(-1), {
versionConvention: { value: conventions[conventionIndex][0] },
segmentNumConvention: { value: conventions[conventionIndex][1] },
versionConvention: { value: conventions[index][0] },
segmentNumConvention: { value: conventions[index][1] },
});
}
(function (discoverVersion) {
discoverVersion.ANY_SUFFIX_LEN = Symbol("discoverVersion.ANY_SUFFIX_LEN");
discoverVersion.ANY_SUFFIX_LEN = Symbol("@ndn/segmented-object#discoverVersion.ANY_SUFFIX_LEN");
})(discoverVersion || (discoverVersion = {}));
function checkSuffixLength(expected, actual) {
return (expected === discoverVersion.ANY_SUFFIX_LEN) ||
(Array.isArray(expected) && expected.includes(actual)) ||
expected === actual;
}

@@ -1,30 +0,35 @@

import { Endpoint } from "@ndn/endpoint";
import { consume } from "@ndn/endpoint";
import { Interest } from "@ndn/packet";
import { defaultSegmentConvention, defaultVersionConvention } from "./convention_node.js";
/** Discover version with CanBePrefix. */
export async function discoverVersion(name, { endpoint = new Endpoint(), describe, versionConvention = defaultVersionConvention, segmentNumConvention = defaultSegmentConvention, conventions: conventionsInput = [], expectedSuffixLen = 2, modifyInterest, retxLimit = 2, signal, verifier, } = {}) {
const conventions = conventionsInput.length > 0 ? conventionsInput : [[versionConvention, segmentNumConvention]];
/**
* Discover version with CanBePrefix.
* @param name - Name without version component.
* @returns Promise that resolves to versioned name annotated with identified conventions.
*/
export async function discoverVersion(name, { cOpts, versionConvention = defaultVersionConvention, segmentNumConvention = defaultSegmentConvention, conventions: conventionsInput = [], expectedSuffixLen = 2, } = {}) {
const interest = new Interest(name, Interest.CanBePrefix, Interest.MustBeFresh);
const data = await endpoint.consume(interest, {
describe: describe ?? `discoverVersion(${name})`,
modifyInterest,
retx: retxLimit,
signal,
verifier,
const data = await consume(interest, {
describe: `discoverVersion(${name})`,
...cOpts,
});
const conventions = conventionsInput.length > 0 ? conventionsInput : [[versionConvention, segmentNumConvention]];
const vComp = data.name.get(-2);
const sComp = data.name.get(-1);
let conventionIndex;
if ((expectedSuffixLen !== discoverVersion.ANY_SUFFIX_LEN &&
data.name.length !== name.length + expectedSuffixLen) ||
(conventionIndex = conventions.findIndex(([v, s]) => v.match(vComp) && s.match(sComp))) < 0) {
let index;
if (!checkSuffixLength(expectedSuffixLen, data.name.length - name.length) ||
(index = conventions.findIndex(([v, s]) => v.match(vComp) && s.match(sComp))) < 0) {
throw new Error(`cannot extract version from ${data.name}`);
}
return Object.defineProperties(data.name.getPrefix(-1), {
versionConvention: { value: conventions[conventionIndex][0] },
segmentNumConvention: { value: conventions[conventionIndex][1] },
versionConvention: { value: conventions[index][0] },
segmentNumConvention: { value: conventions[index][1] },
});
}
(function (discoverVersion) {
discoverVersion.ANY_SUFFIX_LEN = Symbol("discoverVersion.ANY_SUFFIX_LEN");
discoverVersion.ANY_SUFFIX_LEN = Symbol("@ndn/segmented-object#discoverVersion.ANY_SUFFIX_LEN");
})(discoverVersion || (discoverVersion = {}));
function checkSuffixLength(expected, actual) {
return (expected === discoverVersion.ANY_SUFFIX_LEN) ||
(Array.isArray(expected) && expected.includes(actual)) ||
expected === actual;
}

@@ -0,17 +1,38 @@

import { type ConsumerOptions } from "@ndn/endpoint";
import { type Name } from "@ndn/packet";
import { type SegmentConvention, type VersionConvention } from "./convention.js";
import type { fetch } from "./fetch/mod.js";
/** Discover version with CanBePrefix. */
export declare function discoverVersion(name: Name, { endpoint, describe, versionConvention, segmentNumConvention, conventions: conventionsInput, expectedSuffixLen, modifyInterest, retxLimit, signal, verifier, }?: discoverVersion.Options): Promise<discoverVersion.Result>;
/**
* Discover version with CanBePrefix.
* @param name - Name without version component.
* @returns Promise that resolves to versioned name annotated with identified conventions.
*/
export declare function discoverVersion(name: Name, { cOpts, versionConvention, segmentNumConvention, conventions: conventionsInput, expectedSuffixLen, }?: discoverVersion.Options): Promise<discoverVersion.Result>;
export declare namespace discoverVersion {
const ANY_SUFFIX_LEN: unique symbol;
interface Options extends fetch.Options {
interface Options {
/**
* Consumer options.
*
* @remarks
* - `.describe` defaults to "discoverVersion" + name.
* - `.retx` defaults to 2.
* - `.verifier` is recommended.
*/
cOpts?: ConsumerOptions;
/**
* Choose a version naming convention.
* Default is Version from @ndn/naming-convention2 package.
* @defaultValue `Version3`
*/
versionConvention?: VersionConvention;
/**
* Choose a segment number naming convention.
* @defaultValue `Segment3`
*/
segmentNumConvention?: SegmentConvention;
/**
* List of acceptable version+segment naming convention combinations.
* If this is specified and non-empty, it overrides versionConvention,segmentNumConvention.
*
* @remarks
* If this is specified and non-empty, it overrides `.versionConvention` and
* `.segmentNumConvention`.
*/

@@ -21,6 +42,10 @@ conventions?: ReadonlyArray<[VersionConvention, SegmentConvention]>;

* Expected number of suffix components, including Version and Segment.
* @defaultValue 2
*
* @remarks
* Minimum and default are 2, i.e. Version and Segment components.
* ANY_SUFFIX_LEN allows any suffix length.
* This can be a single number or an array of acceptable numbers.
* {@link ANY_SUFFIX_LEN} allows any suffix length.
*/
expectedSuffixLen?: number | typeof ANY_SUFFIX_LEN;
expectedSuffixLen?: number | readonly number[] | typeof ANY_SUFFIX_LEN;
}

@@ -27,0 +52,0 @@ type Result = Name & {

@@ -6,2 +6,6 @@ import { assert } from "@ndn/util";

cwnd_;
/**
* Constructor.
* @param initialCwnd - Initial congestion window.
*/
constructor(initialCwnd) {

@@ -11,2 +15,3 @@ super();

}
/** Congestion window. */
get cwnd() { return this.cwnd_; }

@@ -13,0 +18,0 @@ updateCwnd(v) {

@@ -6,2 +6,6 @@ import { assert } from "@ndn/util";

cwnd_;
/**
* Constructor.
* @param initialCwnd - Initial congestion window.
*/
constructor(initialCwnd) {

@@ -11,2 +15,3 @@ super();

}
/** Congestion window. */
get cwnd() { return this.cwnd_; }

@@ -13,0 +18,0 @@ updateCwnd(v) {

@@ -8,8 +8,22 @@ import { TypedEventTarget } from "typescript-event-target";

private cwnd_;
/**
* Constructor.
* @param initialCwnd - Initial congestion window.
*/
constructor(initialCwnd: number);
/** Congestion window. */
get cwnd(): number;
protected updateCwnd(v: number): void;
/**
* Increase congestion window.
* @param now - Timestamp of positive feedback (successful retrieval without congestion mark).
* @param rtt - Round-trip time when the positive feedback is received.
*/
abstract increase(now: number, rtt: number): void;
/**
* Decrease congestion window upon negative feedback (loss or congestion mark).
* @param now - Timestamp of negative feedback.
*/
abstract decrease(now: number): void;
}
export {};

@@ -1,7 +0,5 @@

import { __importDefault, __importStar } from "tslib";
import { Name } from "@ndn/packet";
import { assert, concatBuffers, Reorder } from "@ndn/util";
import _cjsDefaultImport0 from "event-iterator"; const EventIterator = __importDefault(_cjsDefaultImport0).default;
import { collect, map, writeToStream } from "streaming-iterables";
import { Fetcher } from "./fetcher_browser.js";
import { collect, map, parallelMap, writeToStream } from "streaming-iterables";
import { UnverifiedFetcher } from "./unverified_browser.js";
class FetchResult {

@@ -14,29 +12,23 @@ name;

}
get count() { return this.ctx?.count ?? 0; }
ctx;
get count() { return this.uvf?.count ?? 0; }
uvf;
promise;
startFetcher() {
assert(!this.ctx, "fetch.Result is already used");
const ctx = new Fetcher(this.name, this.opts);
this.ctx = ctx;
return new EventIterator(({ push, stop, fail, on }) => {
let resume;
on("highWater", () => { resume = ctx.pause(); });
on("lowWater", () => { resume?.(); });
const abort = new AbortController();
ctx.addEventListener("segment", push, { signal: abort.signal });
ctx.addEventListener("end", stop, { signal: abort.signal });
ctx.addEventListener("error", ({ detail }) => fail(detail), { signal: abort.signal });
return () => {
resume?.();
abort.abort();
};
});
assert(!this.uvf, "fetch.Result is already used");
const opts = {
...this.opts.cOpts,
...this.opts,
};
this.uvf = new UnverifiedFetcher(this.name, opts);
return parallelMap(16, async ({ seg, data }) => {
await opts.verifier?.verify(data);
return { seg, data };
}, this.uvf.fetch());
}
unordered() {
return map(({ data }) => data, this.startFetcher());
return map(({ data, seg: segNum }) => Object.assign(data, { segNum }), this.startFetcher());
}
async *ordered() {
const reorder = new Reorder(this.opts.segmentRange?.[0]);
for await (const { segNum, data } of this.startFetcher()) {
for await (const { seg: segNum, data } of this.startFetcher()) {
reorder.push(segNum, data);

@@ -66,5 +58,13 @@ yield* reorder.shift();

}
/** Fetch a segmented object. */
/**
* Fetch a segmented object.
*
* @remarks
* This function does not perform version discovery. If the segmented object is versioned, `name`
* must include the version component. You can perform version discovery with
* {@link discoverVersion} function and pass its result to this function for fetching the
* versioned and segmented object.
*/
export function fetch(name, opts = {}) {
return new FetchResult(Name.from(name), opts);
}

@@ -1,7 +0,5 @@

import { __importDefault, __importStar } from "tslib";
import { Name } from "@ndn/packet";
import { assert, concatBuffers, Reorder } from "@ndn/util";
import _cjsDefaultImport0 from "event-iterator"; const EventIterator = __importDefault(_cjsDefaultImport0).default;
import { collect, map, writeToStream } from "streaming-iterables";
import { Fetcher } from "./fetcher_node.js";
import { collect, map, parallelMap, writeToStream } from "streaming-iterables";
import { UnverifiedFetcher } from "./unverified_node.js";
class FetchResult {

@@ -14,29 +12,23 @@ name;

}
get count() { return this.ctx?.count ?? 0; }
ctx;
get count() { return this.uvf?.count ?? 0; }
uvf;
promise;
startFetcher() {
assert(!this.ctx, "fetch.Result is already used");
const ctx = new Fetcher(this.name, this.opts);
this.ctx = ctx;
return new EventIterator(({ push, stop, fail, on }) => {
let resume;
on("highWater", () => { resume = ctx.pause(); });
on("lowWater", () => { resume?.(); });
const abort = new AbortController();
ctx.addEventListener("segment", push, { signal: abort.signal });
ctx.addEventListener("end", stop, { signal: abort.signal });
ctx.addEventListener("error", ({ detail }) => fail(detail), { signal: abort.signal });
return () => {
resume?.();
abort.abort();
};
});
assert(!this.uvf, "fetch.Result is already used");
const opts = {
...this.opts.cOpts,
...this.opts,
};
this.uvf = new UnverifiedFetcher(this.name, opts);
return parallelMap(16, async ({ seg, data }) => {
await opts.verifier?.verify(data);
return { seg, data };
}, this.uvf.fetch());
}
unordered() {
return map(({ data }) => data, this.startFetcher());
return map(({ data, seg: segNum }) => Object.assign(data, { segNum }), this.startFetcher());
}
async *ordered() {
const reorder = new Reorder(this.opts.segmentRange?.[0]);
for await (const { segNum, data } of this.startFetcher()) {
for await (const { seg: segNum, data } of this.startFetcher()) {
reorder.push(segNum, data);

@@ -66,5 +58,13 @@ yield* reorder.shift();

}
/** Fetch a segmented object. */
/**
* Fetch a segmented object.
*
* @remarks
* This function does not perform version discovery. If the segmented object is versioned, `name`
* must include the version component. You can perform version discovery with
* {@link discoverVersion} function and pass its result to this function for fetching the
* versioned and segmented object.
*/
export function fetch(name, opts = {}) {
return new FetchResult(Name.from(name), opts);
}

@@ -1,11 +0,43 @@

import { type Data, type NameLike } from "@ndn/packet";
import type { ConsumerOptions } from "@ndn/endpoint";
import { type Data, type NameLike, type Verifier } from "@ndn/packet";
import { type WritableStreamish } from "streaming-iterables";
import { Fetcher } from "./fetcher.js";
/** Fetch a segmented object. */
import { type UnverifiedFetcherOptions } from "./unverified.js";
/**
* Fetch a segmented object.
*
* @remarks
* This function does not perform version discovery. If the segmented object is versioned, `name`
* must include the version component. You can perform version discovery with
* {@link discoverVersion} function and pass its result to this function for fetching the
* versioned and segmented object.
*/
export declare function fetch(name: NameLike, opts?: fetch.Options): fetch.Result;
export declare namespace fetch {
type Options = Fetcher.Options;
/** {@link fetch} options. */
interface Options extends UnverifiedFetcherOptions {
/**
* Inherit fetcher options from consumer options.
*
* @remarks
* These options are inherited if the corresponding fetcher option is unset:
* - `describe`
* - `fw`
* - `modifyInterest`
* - `signal`
* - `verifier`
*
* Other options cannot be inherited, notably:
* - `retx`
*/
cOpts?: ConsumerOptions;
/**
* Data verifier.
* @defaultValue noopSigning
*/
verifier?: Verifier;
}
/**
* Return type of fetch() function.
* Return type of {@link fetch} function.
*
* @remarks
* Fetch output may be accessed in one of several formats:

@@ -22,3 +54,5 @@ * - `await result` resolves to the reassembled object as Uint8Array.

/** Iterate over Data packets as they arrive, not sorted in segment number order. */
unordered: () => AsyncIterable<Data>;
unordered: () => AsyncIterable<Data & {
readonly segNum: number;
}>;
/** Iterate over payload chunks in segment number order. */

@@ -25,0 +59,0 @@ chunks: () => AsyncIterable<Uint8Array>;

@@ -11,3 +11,3 @@ const defaultParameters = {

* RTT estimator.
* @see https://datatracker.ietf.org/doc/html/rfc6298
* @see {@link https://datatracker.ietf.org/doc/html/rfc6298}
*/

@@ -14,0 +14,0 @@ export class RttEstimator {

@@ -11,3 +11,3 @@ const defaultParameters = {

* RTT estimator.
* @see https://datatracker.ietf.org/doc/html/rfc6298
* @see {@link https://datatracker.ietf.org/doc/html/rfc6298}
*/

@@ -14,0 +14,0 @@ export class RttEstimator {

@@ -11,3 +11,3 @@ interface Parameters {

* RTT estimator.
* @see https://datatracker.ietf.org/doc/html/rfc6298
* @see {@link https://datatracker.ietf.org/doc/html/rfc6298}
*/

@@ -14,0 +14,0 @@ export declare class RttEstimator {

import { CongestionAvoidance } from "./congestion-avoidance_browser.js";
/**
* TCP CUBIC algorithm.
* @see https://datatracker.ietf.org/doc/html/rfc8312
* @see {@link https://datatracker.ietf.org/doc/html/rfc8312}
*/

@@ -6,0 +6,0 @@ export class TcpCubic extends CongestionAvoidance {

import { CongestionAvoidance } from "./congestion-avoidance_node.js";
/**
* TCP CUBIC algorithm.
* @see https://datatracker.ietf.org/doc/html/rfc8312
* @see {@link https://datatracker.ietf.org/doc/html/rfc8312}
*/

@@ -6,0 +6,0 @@ export class TcpCubic extends CongestionAvoidance {

import { CongestionAvoidance } from "./congestion-avoidance.js";
/**
* TCP CUBIC algorithm.
* @see https://datatracker.ietf.org/doc/html/rfc8312
* @see {@link https://datatracker.ietf.org/doc/html/rfc8312}
*/

@@ -21,9 +21,18 @@ export declare class TcpCubic extends CongestionAvoidance {

interface Options {
/** Initial window. Default is 2. */
/**
* Initial congestion window.
* @defaultValue 2
*/
iw?: number;
/** CUBIC parameter C. Default is 0.4. */
/**
* CUBIC parameter C.
* @defaultValue 0.4
*/
c?: number;
/** CUBIC parameter beta_cubic. Default is 0.7. */
/**
* CUBIC parameter beta_cubic.
* @defaultValue 0.7
*/
betaCubic?: number;
}
}

@@ -6,18 +6,12 @@ import { Component, Name } from "@ndn/packet";

* Start serving a segmented object with support of CanBePrefix version discovery.
* @param prefixInput Data prefix excluding version and segment components.
* @param source where to read segment payload chunks.
* @param opts other options.
* @param prefix - Data prefix excluding version and segment components.
* @param source - Where to read segment payload chunks.
* @param opts - Other options.
*/
export function serveVersioned(prefixInput, source, opts = {}) {
let versionComp;
let { version = Date.now(), producerPrefix } = opts;
if (typeof version === "number") {
const { versionConvention = defaultVersionConvention } = opts;
versionComp = versionConvention.create(version);
}
else {
versionComp = Component.from(version);
}
const prefix = Name.from(prefixInput);
producerPrefix ??= prefix;
export function serveVersioned(prefix, source, opts = {}) {
const { version = Date.now(), versionConvention = defaultVersionConvention } = opts;
const versionComp = typeof version === "number" ?
versionConvention.create(version) : Component.from(version);
prefix = Name.from(prefix);
const { producerPrefix = prefix } = opts;
return serve(prefix.append(versionComp), source, {

@@ -24,0 +18,0 @@ ...opts,

@@ -6,18 +6,12 @@ import { Component, Name } from "@ndn/packet";

* Start serving a segmented object with support of CanBePrefix version discovery.
* @param prefixInput Data prefix excluding version and segment components.
* @param source where to read segment payload chunks.
* @param opts other options.
* @param prefix - Data prefix excluding version and segment components.
* @param source - Where to read segment payload chunks.
* @param opts - Other options.
*/
export function serveVersioned(prefixInput, source, opts = {}) {
let versionComp;
let { version = Date.now(), producerPrefix } = opts;
if (typeof version === "number") {
const { versionConvention = defaultVersionConvention } = opts;
versionComp = versionConvention.create(version);
}
else {
versionComp = Component.from(version);
}
const prefix = Name.from(prefixInput);
producerPrefix ??= prefix;
export function serveVersioned(prefix, source, opts = {}) {
const { version = Date.now(), versionConvention = defaultVersionConvention } = opts;
const versionComp = typeof version === "number" ?
versionConvention.create(version) : Component.from(version);
prefix = Name.from(prefix);
const { producerPrefix = prefix } = opts;
return serve(prefix.append(versionComp), source, {

@@ -24,0 +18,0 @@ ...opts,

import { type ComponentLike, type NameLike } from "@ndn/packet";
import { type VersionConventionFromNumber } from "./convention.js";
import { type ChunkSource, serve, type Server } from "./serve/mod.js";
type GivenVersionOptions = {
/** Version number component. */
version: ComponentLike;
};
type MakeVersionOptions = {
/**
* Choose a version number naming convention.
* Default is Version from @ndn/naming-convention2 package.
*/
versionConvention?: VersionConventionFromNumber;
/**
* Version number.
* Default is current Unix timestamp (milliseconds).
*/
version?: number;
};
/**
* Start serving a segmented object with support of CanBePrefix version discovery.
* @param prefixInput Data prefix excluding version and segment components.
* @param source where to read segment payload chunks.
* @param opts other options.
* @param prefix - Data prefix excluding version and segment components.
* @param source - Where to read segment payload chunks.
* @param opts - Other options.
*/
export declare function serveVersioned(prefixInput: NameLike, source: ChunkSource, opts?: serveVersioned.Options): Server;
export declare function serveVersioned(prefix: NameLike, source: ChunkSource, opts?: serveVersioned.Options): Server;
export declare namespace serveVersioned {
type Options = serve.Options & (GivenVersionOptions | MakeVersionOptions);
interface Options extends serve.Options {
/**
* Version component or version number.
* @defaultValue `Date.now())`
*/
version?: ComponentLike | number;
/**
* Choose a version number naming convention.
* @defaultValue `Version3`
*
* @remarks
* If `.version` is a number, it's encoded with this convention.
*/
versionConvention?: VersionConventionFromNumber;
}
}
export {};

@@ -1,2 +0,1 @@

/// <reference types="node" />
import type { Blob as NodeBlob } from "node:buffer";

@@ -3,0 +2,0 @@ import { type ChunkOptions, type ChunkSource, KnownSizeChunkSource } from "./common.js";

@@ -0,1 +1,2 @@

/** ChunkSource where total size is known. */
export class KnownSizeChunkSource {

@@ -2,0 +3,0 @@ chunkSize;

@@ -0,1 +1,2 @@

/** ChunkSource where total size is known. */
export class KnownSizeChunkSource {

@@ -2,0 +3,0 @@ chunkSize;

@@ -14,3 +14,3 @@ /** Index and payload of a chunk. */

* Generate chunks sequentially.
* @returns an AsyncIterable of chunks in order.
* @returns AsyncIterable of chunks in order.
*/

@@ -20,4 +20,4 @@ listChunks: () => AsyncIterable<Chunk>;

* Generate a chunk on-demand.
* @param i chunk number, starting from zero.
* @returns a Promise that resolves to requested chunk, or undefined if out of range.
* @param i - Chunk number, starting from zero.
* @returns Promise that resolves to requested chunk, or undefined if out of range.
*/

@@ -27,2 +27,3 @@ getChunk?: (i: number) => Promise<Chunk | undefined>;

}
/** ChunkSource where total size is known. */
export declare abstract class KnownSizeChunkSource implements ChunkSource {

@@ -42,3 +43,3 @@ protected readonly chunkSize: number;

* Minimum chunk size.
* @default 64
* @defaultValue 64
*/

@@ -48,3 +49,3 @@ minChunkSize?: number;

* Maximum chunk size.
* @default 4096
* @defaultValue 4096
*/

@@ -51,0 +52,0 @@ maxChunkSize?: number;

@@ -1,1 +0,50 @@

export {};
import PLazy from "p-lazy";
import { getMaxChunkSize, KnownSizeChunkSource } from "./common_browser.js";
import { fsOpen } from "./platform_browser.js";
class FileHandleChunkSource extends KnownSizeChunkSource {
fh;
constructor(fh, chunkSize, totalSize) {
super(chunkSize, totalSize);
this.fh = fh;
}
async getPayload(i, offset, chunkSize) {
void i;
const payload = new Uint8Array(chunkSize);
await this.fh.read(payload, 0, chunkSize, offset);
return payload;
}
async close() {
await this.fh.close();
}
}
/**
* Generate chunks from a file.
*
* @remarks
* Warning: modifying the file while FileChunkSource is active may cause undefined behavior.
*/
export class FileChunkSource {
constructor(path, opts = {}) {
const chunkSize = getMaxChunkSize(opts);
this.opening = PLazy.from(async () => {
const fh = await fsOpen(path, opts);
const { size } = await fh.stat();
return new FileHandleChunkSource(fh, chunkSize, size);
});
}
opening;
/* c8 ignore start: not used when getChunk is present */
async *listChunks() {
const h = await this.opening;
yield* h.listChunks();
}
/* c8 ignore stop */
async getChunk(i) {
const h = await this.opening;
return h.getChunk(i);
}
async close() {
const h = await this.opening;
await h.close();
}
}

@@ -1,4 +0,4 @@

import fs from "node:fs/promises";
import PLazy from "p-lazy";
import { getMaxChunkSize, KnownSizeChunkSource } from "./common_node.js";
import { fsOpen } from "./platform_node.js";
class FileHandleChunkSource extends KnownSizeChunkSource {

@@ -23,2 +23,3 @@ fh;

*
* @remarks
* Warning: modifying the file while FileChunkSource is active may cause undefined behavior.

@@ -30,3 +31,3 @@ */

this.opening = PLazy.from(async () => {
const fh = await fs.open(path, "r");
const fh = await fsOpen(path, opts);
const { size } = await fh.stat();

@@ -33,0 +34,0 @@ return new FileHandleChunkSource(fh, chunkSize, size);

@@ -1,56 +0,46 @@

import { assert } from "@ndn/util";
import { assert, concatBuffers } from "@ndn/util";
import { collect } from "streaming-iterables";
import { getMaxChunkSize, getMinChunkSize } from "./common_browser.js";
/** Gather chunks of acceptable size from scattered buffers. */
class ScatteredChunk {
minSize;
maxSize;
constructor(minSize, maxSize) {
this.minSize = minSize;
this.maxSize = maxSize;
}
vector = [];
length = 0;
append(buf) {
this.vector.push(buf);
this.length += buf.byteLength;
}
gather(ignoreMinSize = false) {
if (!ignoreMinSize && this.length < this.minSize) {
return undefined;
function resize(min, max) {
let vec = [];
let length = 0;
return function* (buf) {
if (!buf) { // final chunk
return yield concatBuffers(vec, length);
}
if (this.length === 0) { // implies ignoreMinSize
return new Uint8Array();
}
// fast path when first buffer has acceptable size
let buf = this.vector[0];
if (buf.byteLength >= this.minSize && buf.byteLength <= this.maxSize) {
this.length -= buf.byteLength;
return this.vector.shift();
}
// fast path when first buffer has enough payload
if (buf.byteLength > this.maxSize) {
const output = buf.subarray(0, this.maxSize);
this.length -= this.maxSize;
this.vector[0] = buf.subarray(this.maxSize);
return output;
}
// slow path that combines multiple buffers
const output = new Uint8Array(Math.min(this.maxSize, this.length));
for (let offset = 0; offset < output.byteLength;) {
buf = this.vector[0];
const rem = output.byteLength - offset;
if (buf.byteLength > rem) {
output.set(buf.subarray(0, rem), offset);
offset += rem;
this.vector[0] = buf.subarray(rem);
const total = length + buf.length;
if (total >= min && total <= max) {
if (length === 0) {
yield buf;
}
else {
output.set(buf, offset);
offset += buf.byteLength;
this.vector.shift();
vec.push(buf);
yield concatBuffers(vec, total);
vec = [];
length = 0;
}
return;
}
this.length -= output.byteLength;
return output;
}
if (total < min) {
vec.push(buf);
length = total;
return;
}
// assert total > max
let wanted = max - length;
vec.push(buf.subarray(0, wanted));
yield concatBuffers(vec, max);
let off = wanted;
let rem = buf.length - wanted;
while (rem >= min) {
wanted = Math.min(rem, max);
const end = off + wanted;
yield buf.subarray(off, end);
off = end;
rem -= wanted;
}
vec = [buf.subarray(off)];
length = rem;
};
}

@@ -62,2 +52,3 @@ /**

export class IterableChunkSource {
input;
constructor(input, opts = {}) {

@@ -68,3 +59,2 @@ this.input = input;

}
input;
minSize;

@@ -74,8 +64,6 @@ maxSize;

let i = -1;
const scattered = new ScatteredChunk(this.minSize, this.maxSize);
const resizer = resize(this.minSize, this.maxSize);
for await (const buf of this.input) {
assert(buf instanceof Uint8Array);
scattered.append(buf);
let payload;
while (payload = scattered.gather()) { // eslint-disable-line no-cond-assign
for (const payload of resizer(buf)) {
++i;

@@ -86,3 +74,3 @@ yield { i, payload };

++i;
yield { i, final: i, payload: scattered.gather(true) };
yield { i, final: i, payload: collect(resizer())[0] };
}

@@ -89,0 +77,0 @@ }

@@ -1,56 +0,46 @@

import { assert } from "@ndn/util";
import { assert, concatBuffers } from "@ndn/util";
import { collect } from "streaming-iterables";
import { getMaxChunkSize, getMinChunkSize } from "./common_node.js";
/** Gather chunks of acceptable size from scattered buffers. */
class ScatteredChunk {
minSize;
maxSize;
constructor(minSize, maxSize) {
this.minSize = minSize;
this.maxSize = maxSize;
}
vector = [];
length = 0;
append(buf) {
this.vector.push(buf);
this.length += buf.byteLength;
}
gather(ignoreMinSize = false) {
if (!ignoreMinSize && this.length < this.minSize) {
return undefined;
function resize(min, max) {
let vec = [];
let length = 0;
return function* (buf) {
if (!buf) { // final chunk
return yield concatBuffers(vec, length);
}
if (this.length === 0) { // implies ignoreMinSize
return new Uint8Array();
}
// fast path when first buffer has acceptable size
let buf = this.vector[0];
if (buf.byteLength >= this.minSize && buf.byteLength <= this.maxSize) {
this.length -= buf.byteLength;
return this.vector.shift();
}
// fast path when first buffer has enough payload
if (buf.byteLength > this.maxSize) {
const output = buf.subarray(0, this.maxSize);
this.length -= this.maxSize;
this.vector[0] = buf.subarray(this.maxSize);
return output;
}
// slow path that combines multiple buffers
const output = new Uint8Array(Math.min(this.maxSize, this.length));
for (let offset = 0; offset < output.byteLength;) {
buf = this.vector[0];
const rem = output.byteLength - offset;
if (buf.byteLength > rem) {
output.set(buf.subarray(0, rem), offset);
offset += rem;
this.vector[0] = buf.subarray(rem);
const total = length + buf.length;
if (total >= min && total <= max) {
if (length === 0) {
yield buf;
}
else {
output.set(buf, offset);
offset += buf.byteLength;
this.vector.shift();
vec.push(buf);
yield concatBuffers(vec, total);
vec = [];
length = 0;
}
return;
}
this.length -= output.byteLength;
return output;
}
if (total < min) {
vec.push(buf);
length = total;
return;
}
// assert total > max
let wanted = max - length;
vec.push(buf.subarray(0, wanted));
yield concatBuffers(vec, max);
let off = wanted;
let rem = buf.length - wanted;
while (rem >= min) {
wanted = Math.min(rem, max);
const end = off + wanted;
yield buf.subarray(off, end);
off = end;
rem -= wanted;
}
vec = [buf.subarray(off)];
length = rem;
};
}

@@ -62,2 +52,3 @@ /**

export class IterableChunkSource {
input;
constructor(input, opts = {}) {

@@ -68,3 +59,2 @@ this.input = input;

}
input;
minSize;

@@ -74,8 +64,6 @@ maxSize;

let i = -1;
const scattered = new ScatteredChunk(this.minSize, this.maxSize);
const resizer = resize(this.minSize, this.maxSize);
for await (const buf of this.input) {
assert(buf instanceof Uint8Array);
scattered.append(buf);
let payload;
while (payload = scattered.gather()) { // eslint-disable-line no-cond-assign
for (const payload of resizer(buf)) {
++i;

@@ -86,3 +74,3 @@ yield { i, payload };

++i;
yield { i, final: i, payload: scattered.gather(true) };
yield { i, final: i, payload: collect(resizer())[0] };
}

@@ -89,0 +77,0 @@ }

@@ -1,3 +0,2 @@

/// <reference types="node" />
import type { AnyIterable } from "streaming-iterables";
import { type AnyIterable } from "streaming-iterables";
import { type Chunk, type ChunkOptions, type ChunkSource } from "./common.js";

@@ -9,4 +8,4 @@ /**

export declare class IterableChunkSource implements ChunkSource {
private readonly input;
constructor(input: AnyIterable<Uint8Array> | NodeJS.ReadableStream, opts?: ChunkOptions);
private readonly input;
private readonly minSize;

@@ -13,0 +12,0 @@ private readonly maxSize;

@@ -5,2 +5,1 @@ export * from "./blob_browser.js";

export * from "./iterable_browser.js";
export * from "./make_browser.js";

@@ -5,2 +5,1 @@ export * from "./blob_node.js";

export * from "./iterable_node.js";
export * from "./make_node.js";
export type { Chunk, ChunkSource, ChunkOptions } from "./common.js";
export * from "./blob.js";
export * from "./buffer.js";
export * from "./file_node.js";
export * from "./file.js";
export * from "./iterable.js";
export * from "./make.js";

@@ -8,3 +8,6 @@ import { __importDefault, __importStar } from "tslib";

import { defaultSegmentConvention } from "../convention_browser.js";
/** Produce Data for requested segment. */
/**
* Produce Data for requested segment.
* @see {@link DataProducer.create}
*/
export class DataProducer {

@@ -11,0 +14,0 @@ source;

@@ -8,3 +8,6 @@ import { __importDefault, __importStar } from "tslib";

import { defaultSegmentConvention } from "../convention_node.js";
/** Produce Data for requested segment. */
/**
* Produce Data for requested segment.
* @see {@link DataProducer.create}
*/
export class DataProducer {

@@ -11,0 +14,0 @@ source;

@@ -5,3 +5,6 @@ import type { ProducerHandler } from "@ndn/endpoint";

import type { Chunk, ChunkSource } from "./chunk-source/mod.js";
/** Produce Data for requested segment. */
/**
* Produce Data for requested segment.
* @see {@link DataProducer.create}
*/
export declare abstract class DataProducer {

@@ -14,5 +17,5 @@ protected readonly source: ChunkSource;

private readonly signer;
constructor(source: ChunkSource, prefix: Name, { segmentNumConvention, contentType, freshnessPeriod, signer, }: DataProducer.Options);
protected constructor(source: ChunkSource, prefix: Name, { segmentNumConvention, contentType, freshnessPeriod, signer, }: DataProducer.Options);
listData(): AsyncIterable<Data>;
processInterest: ProducerHandler;
readonly processInterest: ProducerHandler;
private parseInterest;

@@ -27,3 +30,3 @@ protected makeData({ i, final, payload }: Chunk): Promise<Data>;

* Choose a segment number naming convention.
* Default is Segment from @ndn/naming-convention2 package.
* @defaultValue `Segment3`
*/

@@ -33,3 +36,3 @@ segmentNumConvention?: SegmentConvention;

* Data ContentType.
* @default 0
* @defaultValue 0
*/

@@ -39,8 +42,8 @@ contentType?: number;

* Data FreshnessPeriod (in milliseconds).
* @default 60000
* @defaultValue 60000
*/
freshnessPeriod?: number;
/**
* A private key to sign Data.
* Default is SHA256 digest.
* Data signer.
* @defaultValue digestSigning
*/

@@ -50,4 +53,7 @@ signer?: Signer;

* How many chunks behind latest request to store in buffer.
* This is ignored if the ChunkSource supports getChunk() function.
* @defaultValue Infinity
*
* @remarks
* This is ignored if the ChunkSource supports `getChunk()` function.
*
* After processing an Interest requesting segment `i`, subsequent Interests requesting

@@ -60,4 +66,2 @@ * segment before `i - bufferBehind` cannot be answered.

* which would become a problem in the presence of multiple consumers.
*
* @default Infinity
*/

@@ -67,8 +71,9 @@ bufferBehind?: number;

* How many chunks ahead of latest request to store in buffer.
* This is ignored if the ChunkSource supports getChunk() function.
* @defaultValue 16
*
* @remarks
* This is ignored if the ChunkSource supports `getChunk()` function.
*
* A larger number can reduce latency of fulfilling Interests if ChunkSource is slow.
* A smaller number reduces memory usage.
*
* @default 16
*/

@@ -75,0 +80,0 @@ bufferAhead?: number;

@@ -1,2 +0,2 @@

import { Endpoint } from "@ndn/endpoint";
import { produce } from "@ndn/endpoint";
import { Name } from "@ndn/packet";

@@ -6,18 +6,20 @@ import { DataProducer } from "./data-producer_browser.js";

* Start serving a segmented object.
* @param prefixInput Data prefix excluding segment number.
* @param source where to read segment payload chunks.
* @param opts other options.
* @param prefix - Data prefix excluding segment number.
* @param source - Where to read segment payload chunks.
* @param opts - Other options.
*
* @remarks
* This function does not automatically add a version component to the name prefix.
* If a version component is desired, use serveVersioned() function instead.
* If a version component is desired, use {@link serveVersioned} function instead.
*/
export function serve(prefixInput, source, opts = {}) {
const prefix = Name.from(prefixInput);
const { endpoint = new Endpoint() } = opts;
const dp = DataProducer.create(source, prefix, opts);
const ep = endpoint.produce(opts.producerPrefix ?? prefix, dp.processInterest, {
export function serve(prefix, source, opts = {}) {
prefix = Name.from(prefix);
const { producerPrefix = prefix, pOpts: pOptsInput, } = opts;
const pOpts = {
describe: `serve(${prefix})`,
concurrency: 16,
describe: opts.describe ?? `serve(${prefix})`,
announcement: opts.announcement,
});
...pOptsInput,
};
const dp = DataProducer.create(source, prefix, { signer: pOpts.dataSigner, ...opts });
const ep = produce(producerPrefix, dp.processInterest, pOpts);
return {

@@ -24,0 +26,0 @@ prefix,

@@ -1,2 +0,2 @@

import { Endpoint } from "@ndn/endpoint";
import { produce } from "@ndn/endpoint";
import { Name } from "@ndn/packet";

@@ -6,18 +6,20 @@ import { DataProducer } from "./data-producer_node.js";

* Start serving a segmented object.
* @param prefixInput Data prefix excluding segment number.
* @param source where to read segment payload chunks.
* @param opts other options.
* @param prefix - Data prefix excluding segment number.
* @param source - Where to read segment payload chunks.
* @param opts - Other options.
*
* @remarks
* This function does not automatically add a version component to the name prefix.
* If a version component is desired, use serveVersioned() function instead.
* If a version component is desired, use {@link serveVersioned} function instead.
*/
export function serve(prefixInput, source, opts = {}) {
const prefix = Name.from(prefixInput);
const { endpoint = new Endpoint() } = opts;
const dp = DataProducer.create(source, prefix, opts);
const ep = endpoint.produce(opts.producerPrefix ?? prefix, dp.processInterest, {
export function serve(prefix, source, opts = {}) {
prefix = Name.from(prefix);
const { producerPrefix = prefix, pOpts: pOptsInput, } = opts;
const pOpts = {
describe: `serve(${prefix})`,
concurrency: 16,
describe: opts.describe ?? `serve(${prefix})`,
announcement: opts.announcement,
});
...pOptsInput,
};
const dp = DataProducer.create(source, prefix, { signer: pOpts.dataSigner, ...opts });
const ep = produce(producerPrefix, dp.processInterest, pOpts);
return {

@@ -24,0 +26,0 @@ prefix,

@@ -1,2 +0,2 @@

import { Endpoint } from "@ndn/endpoint";
import { type ProducerOptions } from "@ndn/endpoint";
import { type Data, type Interest, Name, type NameLike } from "@ndn/packet";

@@ -12,4 +12,5 @@ import type { ChunkSource } from "./chunk-source/mod.js";

*
* The producer handler is already attached to the Endpoint and will react to incoming Interests.
* It's usually unnecessary to call this function manually.
* @remarks
* The producer handler is already attached to the logical forwarder and will respond to incoming
* Interests. It's usually unnecessary to call this function manually.
*/

@@ -22,18 +23,18 @@ processInterest: (interest: Interest) => Promise<Data | undefined>;

* Start serving a segmented object.
* @param prefixInput Data prefix excluding segment number.
* @param source where to read segment payload chunks.
* @param opts other options.
* @param prefix - Data prefix excluding segment number.
* @param source - Where to read segment payload chunks.
* @param opts - Other options.
*
* @remarks
* This function does not automatically add a version component to the name prefix.
* If a version component is desired, use serveVersioned() function instead.
* If a version component is desired, use {@link serveVersioned} function instead.
*/
export declare function serve(prefixInput: NameLike, source: ChunkSource, opts?: serve.Options): Server;
export declare function serve(prefix: NameLike, source: ChunkSource, opts?: serve.Options): Server;
export declare namespace serve {
type Options = DataProducer.Options & {
/** Use specified Endpoint instead of default. */
endpoint?: Endpoint;
/** FwFace description. */
describe?: string;
interface Options extends DataProducer.Options {
/**
* Producer name prefix, if differs from Data prefix.
* Producer name prefix.
* @defaultValue Data prefix.
*
* @remarks
* Specifying a shorter prefix enables name discovery.

@@ -43,8 +44,12 @@ */

/**
* Prefix announcement.
* Default is same as producer name prefix or Data prefix.
* False disables announcement.
* Producer options.
*
* @remarks
* - `.describe` defaults to "serve" + Data prefix.
* - `.concurrency` defaults to 16.
* - `.announcement` defaults to `producerPrefix`.
* - {@link DataProducer.Options.signer} defaults to `.dataSigner`.
*/
announcement?: Endpoint.RouteAnnouncement;
};
pOpts?: ProducerOptions;
}
}
{
"name": "@ndn/segmented-object",
"version": "0.0.20240113",
"version": "0.0.20240630",
"description": "NDNts: Segmented Object",

@@ -22,21 +22,22 @@ "keywords": [

"url": "https://github.com/yoursunny/NDNts.git",
"directory": "packages/segmented-object"
"directory": "pkg/segmented-object"
},
"dependencies": {
"@ndn/endpoint": "0.0.20240113",
"@ndn/fw": "0.0.20240113",
"@ndn/naming-convention2": "0.0.20240113",
"@ndn/packet": "0.0.20240113",
"@ndn/util": "0.0.20240113",
"event-iterator": "^2.0.0",
"hirestime": "^7.0.3",
"mnemonist": "^0.39.7",
"p-defer": "^4.0.0",
"p-event": "^6.0.0",
"@ndn/endpoint": "0.0.20240630",
"@ndn/fw": "0.0.20240630",
"@ndn/naming-convention2": "0.0.20240630",
"@ndn/packet": "0.0.20240630",
"@ndn/util": "0.0.20240630",
"@zenfs/core": "^0.12.10",
"hirestime": "^7.0.4",
"it-keepalive": "^1.2.0",
"mnemonist": "^0.39.8",
"obliterator": "^2.0.4",
"p-defer": "^4.0.1",
"p-lazy": "^4.0.0",
"streaming-iterables": "^8.0.1",
"tslib": "^2.6.2",
"typescript-event-target": "^1.1.0"
"tslib": "^2.6.3",
"typescript-event-target": "^1.1.1"
},
"types": "lib/mod.d.ts"
}

@@ -11,3 +11,3 @@ # @ndn/segmented-object

* [X] supports version discovery via CanBePrefix.
* [X] supports version discovery via RDR protocol (in `@ndn/rdr` package).
* [X] supports version discovery by requesting metadata (in `@ndn/rdr` package).
* [ ] supports manifest.

@@ -20,3 +20,3 @@ * [X] allows specifying segment range.

* [X] verifies packets with a `Verifier` (fixed key or trust schema).
* [X] emits events as segments arrive.
* [ ] emits events as segments arrive.
* [X] outputs in-order data chunks as a readable stream.

@@ -29,7 +29,7 @@ * [X] outputs completely reassembled object via Promise.

* [X] takes input from readable streams.
* [X] takes input from files (filename in Node.js, `Blob` in browser).
* [X] takes input from files or `Blob`.
* [X] generates segments of fixed size.
* [ ] generates segments of available data as Interest arrives, to minimize delivery latency.
* [X] responds to version discovery Interests with CanBePrefix.
* [X] responds to RDR protocol (in `@ndn/rdr` package).
* [X] responds to metadata requests (in `@ndn/rdr` package).
* [ ] generates manifest.

@@ -36,0 +36,0 @@ * [X] supports segment numbers.

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc