barnard59-base
Advanced tools
Comparing version 2.2.0 to 2.3.0
# barnard59-base | ||
## 2.3.0 | ||
### Minor Changes | ||
- 0c0245d: Bundle TypeScript type declarations | ||
- 464b09e: Added an operation which creates a readable from given values (closes #199) | ||
### Patch Changes | ||
- ba328de: Simplify `base:limit` and `base:offset` by using async generators | ||
## 2.2.0 | ||
@@ -4,0 +15,0 @@ |
import duplexify from 'duplexify' | ||
/** | ||
 * Combine multiple streams into a single duplex stream. | ||
* @returns {import('stream').Duplex} A transform stream. | ||
* @param {(import('stream').Duplex)[]} streams | ||
* @param {*} options | ||
*/ | ||
function combine(streams, options) { | ||
@@ -4,0 +10,0 @@ if (streams.length === 0) { |
import { finished, Readable } from 'readable-stream' | ||
class ConcatStream extends Readable { | ||
/** | ||
* @param {(import('stream').Duplex)[]} streams | ||
* @param {{ | ||
* objectMode?: boolean | ||
* }} [options] | ||
*/ | ||
constructor(streams, { objectMode = false } = {}) { | ||
@@ -13,2 +19,5 @@ super({ objectMode }) | ||
/** | ||
* @return {void|boolean|unknown} | ||
*/ | ||
_read() { | ||
@@ -41,2 +50,6 @@ if (!this.current) { | ||
/** | ||
* @param {(import('stream').Duplex)[]} streams | ||
* @return {Readable} | ||
*/ | ||
function factory(...streams) { | ||
@@ -46,2 +59,6 @@ return new ConcatStream(streams) | ||
/** | ||
* @param {(import('stream').Duplex)[]} streams | ||
* @return {Readable} | ||
*/ | ||
const object = (...streams) => { | ||
@@ -48,0 +65,0 @@ return new ConcatStream(streams, { objectMode: true }) |
import { obj } from 'through2' | ||
/** | ||
* @typedef {(this: import('barnard59-core').Context, chunk: T, encoding: string) => boolean | Promise<boolean>} Filter<T> | ||
* @template T | ||
*/ | ||
/** | ||
* @template T | ||
* @this {import('barnard59-core').Context} | ||
* @param {Filter<T>} func | ||
* @return {import('stream').Transform} | ||
*/ | ||
function filter(func) { | ||
@@ -4,0 +15,0 @@ return obj((chunk, encoding, callback) => { |
import { obj } from 'through2' | ||
/** | ||
* @callback ForEachCallback | ||
* @param {unknown} item | ||
* @return {void} | ||
*/ | ||
/** | ||
* @typedef {Iterable<unknown> | { forEach(cb: ForEachCallback): void }} IterableLike | ||
*/ | ||
/** | ||
* @return {import('stream').Transform} Transform | ||
*/ | ||
function flatten() { | ||
return obj(function (chunk, encoding, callback) { | ||
return obj(function (/** IterableLike */ chunk, encoding, callback) { | ||
if (typeof chunk[Symbol.iterator] === 'function') { | ||
@@ -14,3 +27,3 @@ for (const item of chunk) { | ||
if (typeof chunk.forEach === 'function') { | ||
chunk.forEach(item => this.push(item)) | ||
chunk.forEach((/** @type {unknown} */ item) => this.push(item)) | ||
@@ -17,0 +30,0 @@ return callback() |
@@ -1,2 +0,2 @@ | ||
import { promisify } from 'util' | ||
import { promisify } from 'node:util' | ||
import { context } from '@opentelemetry/api' | ||
@@ -8,2 +8,13 @@ import stream from 'readable-stream' | ||
/** | ||
* @typedef {Pick<import('barnard59-core').Context, 'createPipeline' | 'variables'> & { | ||
* pipeline: PipelineStream | ||
* variable: string | ||
* }} ForEachOptions | ||
* | ||
* @typedef {import('stream').Duplex & { | ||
* pipeline: import('barnard59-core').Pipeline | ||
* }} PipelineStream | ||
*/ | ||
async function nextLoop() { | ||
@@ -14,3 +25,6 @@ return new Promise(resolve => setTimeout(resolve, 0)) | ||
class ForEach extends Duplex { | ||
constructor({ createPipeline, pipeline, step, variable, variables }) { | ||
/** | ||
* @param {ForEachOptions} context | ||
*/ | ||
constructor({ createPipeline, pipeline, variable, variables }) { | ||
super({ objectMode: true }) | ||
@@ -23,3 +37,2 @@ | ||
this.createPipeline = createPipeline | ||
this.step = step | ||
@@ -37,3 +50,7 @@ // we only need the ptr of the pipeline to create new copies... | ||
/** | ||
* @type import('barnard59-core').Pipeline | ||
*/ | ||
get subPipeline() { | ||
// @ts-ignore | ||
return this.step.children[0] | ||
@@ -43,6 +60,13 @@ } | ||
set subPipeline(subPipeline) { | ||
// @ts-ignore | ||
this.step.children[0] = subPipeline | ||
} | ||
/** | ||
* @param {*} chunk | ||
* @param {string} encoding | ||
* @param {(error?: (Error | null)) => void} callback | ||
*/ | ||
async _write(chunk, encoding, callback) { | ||
// @ts-ignore | ||
try { | ||
@@ -57,5 +81,7 @@ const variables = new Map(this.variables) | ||
// @ts-ignore | ||
this.pull = ReadableToReadable.readFrom(this.subPipeline.stream, { end: false }) | ||
if (this.subPipeline.writable) { | ||
// @ts-ignore | ||
this.subPipeline.stream.end(chunk) | ||
@@ -69,3 +95,3 @@ } | ||
return callback() | ||
} catch (cause) { | ||
} catch (/** @type {any} */ cause) { | ||
const err = new Error(`error in forEach sub-pipeline ${this.ptr.value}`) | ||
@@ -94,2 +120,8 @@ | ||
/** | ||
* @this {import('barnard59-core').Context} | ||
* @param {PipelineStream} pipeline | ||
* @param {string} variable | ||
* @return {ForEach} | ||
*/ | ||
function factory(pipeline, variable) { | ||
@@ -99,3 +131,2 @@ return new ForEach({ | ||
createPipeline: this.createPipeline, | ||
step: this.step, | ||
variable, | ||
@@ -102,0 +133,0 @@ variables: this.variables, |
14
glob.js
@@ -1,2 +0,2 @@ | ||
import { promisify } from 'util' | ||
import { promisify } from 'node:util' | ||
import { SpanStatusCode } from '@opentelemetry/api' | ||
@@ -8,5 +8,11 @@ import globFn from 'glob' | ||
/** | ||
* @this {import('barnard59-core').Context} | ||
* @param {{ pattern: string } & import('glob').IOptions} options | ||
* @return {Readable} | ||
*/ | ||
function glob({ pattern, ...options }) { | ||
const { logger } = this | ||
let filenames = null | ||
/** @type {string[]} */ | ||
let filenames = [] | ||
@@ -21,2 +27,3 @@ const span = tracer.startSpan('glob') | ||
/** @type {Readable} */ | ||
const stream = new Readable({ | ||
@@ -39,4 +46,5 @@ objectMode: true, | ||
// @ts-ignore | ||
stream._read() | ||
} catch (err) { | ||
} catch (/** @type {any} */ err) { | ||
span.recordException(err) | ||
@@ -43,0 +51,0 @@ span.setStatus({ code: SpanStatusCode.ERROR, message: err.message }) |
@@ -5,2 +5,3 @@ export { default as combine } from './combine.js' | ||
export { default as flatten } from './flatten.js' | ||
export { default as forEach } from './forEach.js' | ||
export { default as glob } from './glob.js' | ||
@@ -7,0 +8,0 @@ export { parse as jsonParse, stringify as jsonStringify } from './json.js' |
16
json.js
@@ -11,2 +11,7 @@ import { Transform } from 'readable-stream' | ||
/** | ||
* @param {*} chunk | ||
* @param {string} encoding | ||
* @param {(error?: Error | null, data?: any) => void} callback | ||
*/ | ||
_transform(chunk, encoding, callback) { | ||
@@ -25,2 +30,7 @@ callback(null, JSON.parse(chunk.toString())) | ||
/** | ||
* @param {*} chunk | ||
* @param {string} encoding | ||
* @param {(error?: Error | null, data?: any) => void} callback | ||
*/ | ||
_transform(chunk, encoding, callback) { | ||
@@ -31,2 +41,5 @@ callback(null, JSON.stringify(chunk)) | ||
/** | ||
* @return {Transform} | ||
*/ | ||
function parse() { | ||
@@ -36,2 +49,5 @@ return new JsonParse() | ||
/** | ||
* @return {Transform} | ||
*/ | ||
function stringify() { | ||
@@ -38,0 +54,0 @@ return new JsonStringify() |
31
limit.js
@@ -1,25 +0,18 @@ | ||
import { obj } from 'through2' | ||
/** | ||
* Limit the amount of chunks in a pipe. | ||
* @param {number} limit Limit the amount of chunks passed through the pipe. | ||
* @memberof module:barnard59 | ||
*/ | ||
function limit(limit) { | ||
const t = obj(function (chunk, encoding, callback) { | ||
t.count++ | ||
if (t.count <= t.limit) { | ||
this.push(chunk) | ||
export default function limit(limit) { | ||
/** | ||
* @param {AsyncIterable<*>} stream | ||
*/ | ||
return async function * (stream) { | ||
let count = 0 | ||
for await (const chunk of stream) { | ||
count++ | ||
if (count <= limit) { | ||
yield chunk | ||
} | ||
} | ||
callback() | ||
}) | ||
t.limit = limit | ||
t.count = 0 | ||
return t | ||
} | ||
} | ||
export default limit |
28
map.js
import transform from 'parallel-transform' | ||
/** | ||
* @typedef {(this: import('barnard59-core').Context, chunk: From) => Promise<To> | To} MapCallback | ||
* @template From, To | ||
*/ | ||
/** | ||
* @typedef {{ | ||
* map: MapCallback<From, To> | ||
* concurrency?: number | ||
* ordered?: boolean | ||
* objectMode?: boolean | ||
* }|MapCallback<From, To>} MapOptions | ||
* @template From, To | ||
*/ | ||
/** | ||
* Processes chunks with a transform function | ||
* | ||
* @param {Object|Function} options Transform function or complex options | ||
* @params {Function} options.map Transform function | ||
* @param {Number} [options.concurrency=1] The max number of concurrent chunks being transformed | ||
* @param {Boolean} [options.ordered=true] Option to keep order of asynchronously transformed chunks | ||
* @param {Boolean} [options.objectMode=true] Option to transform chunks in object mode | ||
* @return {ReadableStream} | ||
* @this {import('barnard59-core').Context} | ||
* @param {MapOptions<From, To>} options Transform function or complex options | ||
* @return {import('stream').Transform} | ||
* @template From, To | ||
*/ | ||
export default function map(options) { | ||
/** | ||
* @type {MapCallback<*, *>} | ||
*/ | ||
let func | ||
@@ -15,0 +31,0 @@ let concurrency = 1 |
@@ -8,2 +8,7 @@ import { Writable } from 'readable-stream' | ||
/** | ||
* @param {any} chunk | ||
* @param {string} encoding | ||
* @param {() => void} callback | ||
*/ | ||
_write(chunk, encoding, callback) { | ||
@@ -14,2 +19,5 @@ callback() | ||
/** | ||
* @return {Writable} | ||
*/ | ||
function factory() { | ||
@@ -16,0 +24,0 @@ return new Nul() |
@@ -1,20 +0,19 @@ | ||
import { obj } from 'through2' | ||
/** | ||
 * Skip the first `offset` chunks in a pipe. | ||
* @param {number} offset | ||
*/ | ||
export default function (offset) { | ||
/** | ||
* @param {AsyncIterable<*>} stream | ||
*/ | ||
return async function * (stream) { | ||
let count = 0 | ||
function offset(offset) { | ||
const t = obj(function (chunk, encoding, callback) { | ||
t.count++ | ||
if (t.count > t.offset) { | ||
this.push(chunk) | ||
for await (const chunk of stream) { | ||
count++ | ||
if (count > offset) { | ||
yield chunk | ||
} | ||
} | ||
callback() | ||
}) | ||
t.offset = offset | ||
t.count = 0 | ||
return t | ||
} | ||
} | ||
export default offset |
{ | ||
"name": "barnard59-base", | ||
"version": "2.2.0", | ||
"version": "2.3.0", | ||
"description": "Linked Data pipelines", | ||
@@ -8,3 +8,6 @@ "main": "index.js", | ||
"scripts": { | ||
"test": "mocha" | ||
"test": "mocha", | ||
"prebuild": "rimraf *.d.ts", | ||
"build": "tsc", | ||
"prepack": "npm run build" | ||
}, | ||
@@ -17,3 +20,3 @@ "repository": { | ||
"keywords": [], | ||
"author": "Thomas Bergwinkl <bergi@axolotlfarm.org> (https://www.bergnet.org/people/bergi/card#me)", | ||
"author": "Zazuko GmbH", | ||
"license": "MIT", | ||
@@ -35,8 +38,14 @@ "bugs": { | ||
"devDependencies": { | ||
"barnard59-core": "^5.0.0", | ||
"@types/duplexify": "^3.6.4", | ||
"@types/glob": "^7.2.0", | ||
"@types/parallel-transform": "^1.1.4", | ||
"@types/readable-stream": "^4.0.10", | ||
"@types/readable-to-readable": "^0.1.0", | ||
"@types/through2": "^2.0.41", | ||
"barnard59-core": "^5.3.0", | ||
"chai": "^4.3.10", | ||
"get-stream": "^6.0.1", | ||
"into-stream": "^7.0.0", | ||
"isstream": "^0.1.2", | ||
"mocha": "^9.0.1", | ||
"is-stream": "^3.0.0", | ||
"rimraf": "^3.0.2", | ||
"sinon": "^17.0.0" | ||
@@ -48,4 +57,5 @@ }, | ||
"mocha": { | ||
"require": "../../test/mocha-setup.cjs" | ||
"require": "../../test/mocha-setup.cjs", | ||
"loader": "tsm" | ||
} | ||
} |
import { Transform } from 'readable-stream' | ||
class StdOut extends Transform { | ||
/** | ||
* @param {Uint8Array | string} chunk | ||
* @param {BufferEncoding} encoding | ||
* @param {(error?: Error | null, data?: any) => void} callback | ||
*/ | ||
_transform(chunk, encoding, callback) { | ||
@@ -11,2 +16,5 @@ process.stdout.write(chunk, encoding) | ||
/** | ||
* @return {Transform} | ||
*/ | ||
export function stdout() { | ||
@@ -16,4 +24,7 @@ return new StdOut() | ||
/** | ||
* @return {import('stream').Readable} | ||
*/ | ||
export function stdin() { | ||
return process.stdin | ||
} |
import { deepStrictEqual, strictEqual, rejects } from 'assert' | ||
import getStream, { array } from 'get-stream' | ||
import { isReadable, isWritable } from 'isstream' | ||
import { isReadableStream as isReadable, isWritableStream as isWritable } from 'is-stream' | ||
import { Readable } from 'readable-stream' | ||
@@ -5,0 +5,0 @@ import concat, { object } from '../concat.js' |
import { deepStrictEqual, strictEqual } from 'assert' | ||
import { array } from 'get-stream' | ||
import { isReadable, isWritable } from 'isstream' | ||
import { isReadableStream as isReadable, isWritableStream as isWritable } from 'is-stream' | ||
import { Readable } from 'readable-stream' | ||
@@ -5,0 +5,0 @@ import flatten from '../flatten.js' |
import { deepStrictEqual, strictEqual } from 'assert' | ||
import { array } from 'get-stream' | ||
import { isReadable, isWritable } from 'isstream' | ||
import { isReadableStream as isReadable, isWritableStream as isWritable } from 'is-stream' | ||
import sinon from 'sinon' | ||
@@ -5,0 +5,0 @@ import { expect } from 'chai' |
@@ -15,2 +15,5 @@ import Readable from 'readable-stream' | ||
/** | ||
* @deprecated Use `base:streamValues` instead. | ||
*/ | ||
function stringToReadable(str) { | ||
@@ -20,2 +23,5 @@ return new ToReadable(str) | ||
/** | ||
* @deprecated Use `base:streamValues` instead. | ||
*/ | ||
function objectToReadable(obj) { | ||
@@ -22,0 +28,0 @@ return new ToReadable(obj, { objectMode: true }) |
@@ -11,2 +11,7 @@ import { Transform } from 'readable-stream' | ||
/** | ||
* @param {Uint8Array | string} chunk | ||
* @param {BufferEncoding} encoding | ||
* @param {import('stream').TransformCallback} callback | ||
*/ | ||
_transform(chunk, encoding, callback) { | ||
@@ -17,2 +22,5 @@ callback(null, chunk.toString()) | ||
/** | ||
* @return {Transform} | ||
*/ | ||
function factory() { | ||
@@ -19,0 +27,0 @@ return new ToString() |
Sorry, the diff of this file is not supported yet
License Policy Violation
License: This package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
License: This package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
41187
45
1157
13