Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

barnard59-base

Package Overview
Dependencies
Maintainers
1
Versions
21
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

barnard59-base - npm Package Compare versions

Comparing version 2.2.0 to 2.3.0

combine.d.ts

11

CHANGELOG.md
# barnard59-base
## 2.3.0
### Minor Changes
- 0c0245d: Bundle TypeScript type declarations
- 464b09e: Added an operation which creates a readable from given values (closes #199)
### Patch Changes
- ba328de: Simplify `base:limit` and `base:offset` by using async generators
## 2.2.0

@@ -4,0 +15,0 @@

import duplexify from 'duplexify'
/**
* Limit the amount of chunks in a pipe.
* @returns {import('stream').Duplex} A transform stream.
* @param {(import('stream').Duplex)[]} streams
* @param {*} options
*/
function combine(streams, options) {

@@ -4,0 +10,0 @@ if (streams.length === 0) {

import { finished, Readable } from 'readable-stream'
class ConcatStream extends Readable {
/**
* @param {(import('stream').Duplex)[]} streams
* @param {{
* objectMode?: boolean
* }} [options]
*/
constructor(streams, { objectMode = false } = {}) {

@@ -13,2 +19,5 @@ super({ objectMode })

/**
* @return {void|boolean|unknown}
*/
_read() {

@@ -41,2 +50,6 @@ if (!this.current) {

/**
* @param {(import('stream').Duplex)[]} streams
* @return {Readable}
*/
function factory(...streams) {

@@ -46,2 +59,6 @@ return new ConcatStream(streams)

/**
* @param {(import('stream').Duplex)[]} streams
* @return {Readable}
*/
const object = (...streams) => {

@@ -48,0 +65,0 @@ return new ConcatStream(streams, { objectMode: true })

import { obj } from 'through2'
/**
* @typedef {(this: import('barnard59-core').Context, chunk: T, encoding: string) => boolean | Promise<boolean>} Filter<T>
* @template T
*/
/**
* @template T
* @this {import('barnard59-core').Context}
* @param {Filter<T>} func
* @return {import('stream').Transform}
*/
function filter(func) {

@@ -4,0 +15,0 @@ return obj((chunk, encoding, callback) => {

17

flatten.js
import { obj } from 'through2'
/**
* @callback ForEachCallback
* @param {unknown} item
* @return {void}
*/
/**
* @typedef {Iterable<unknown> | { forEach(cb: ForEachCallback): void }} IterableLike
*/
/**
* @return {import('stream').Transform} Transform
*/
function flatten() {
return obj(function (chunk, encoding, callback) {
return obj(function (/** IterableLike */ chunk, encoding, callback) {
if (typeof chunk[Symbol.iterator] === 'function') {

@@ -14,3 +27,3 @@ for (const item of chunk) {

if (typeof chunk.forEach === 'function') {
chunk.forEach(item => this.push(item))
chunk.forEach((/** @type {unknown} */ item) => this.push(item))

@@ -17,0 +30,0 @@ return callback()

@@ -1,2 +0,2 @@

import { promisify } from 'util'
import { promisify } from 'node:util'
import { context } from '@opentelemetry/api'

@@ -8,2 +8,13 @@ import stream from 'readable-stream'

/**
* @typedef {Pick<import('barnard59-core').Context, 'createPipeline' | 'variables'> & {
* pipeline: PipelineStream
* variable: string
* }} ForEachOptions
*
* @typedef {import('stream').Duplex & {
* pipeline: import('barnard59-core').Pipeline
* }} PipelineStream
*/
async function nextLoop() {

@@ -14,3 +25,6 @@ return new Promise(resolve => setTimeout(resolve, 0))

class ForEach extends Duplex {
constructor({ createPipeline, pipeline, step, variable, variables }) {
/**
* @param {ForEachOptions} context
*/
constructor({ createPipeline, pipeline, variable, variables }) {
super({ objectMode: true })

@@ -23,3 +37,2 @@

this.createPipeline = createPipeline
this.step = step

@@ -37,3 +50,7 @@ // we only need the ptr of the pipeline to create new copies...

/**
* @type import('barnard59-core').Pipeline
*/
get subPipeline() {
// @ts-ignore
return this.step.children[0]

@@ -43,6 +60,13 @@ }

set subPipeline(subPipeline) {
// @ts-ignore
this.step.children[0] = subPipeline
}
/**
* @param {*} chunk
* @param {string} encoding
* @param {(error?: (Error | null)) => void} callback
*/
async _write(chunk, encoding, callback) {
// @ts-ignore
try {

@@ -57,5 +81,7 @@ const variables = new Map(this.variables)

// @ts-ignore
this.pull = ReadableToReadable.readFrom(this.subPipeline.stream, { end: false })
if (this.subPipeline.writable) {
// @ts-ignore
this.subPipeline.stream.end(chunk)

@@ -69,3 +95,3 @@ }

return callback()
} catch (cause) {
} catch (/** @type {any} */ cause) {
const err = new Error(`error in forEach sub-pipeline ${this.ptr.value}`)

@@ -94,2 +120,8 @@

/**
* @this {import('barnard59-core').Context}
* @param {PipelineStream} pipeline
* @param {string} variable
* @return {ForEach}
*/
function factory(pipeline, variable) {

@@ -99,3 +131,2 @@ return new ForEach({

createPipeline: this.createPipeline,
step: this.step,
variable,

@@ -102,0 +133,0 @@ variables: this.variables,

14

glob.js

@@ -1,2 +0,2 @@

import { promisify } from 'util'
import { promisify } from 'node:util'
import { SpanStatusCode } from '@opentelemetry/api'

@@ -8,5 +8,11 @@ import globFn from 'glob'

/**
* @this {import('barnard59-core').Context}
* @param {{ pattern: string } & import('glob').IOptions} options
* @return {Readable}
*/
function glob({ pattern, ...options }) {
const { logger } = this
let filenames = null
/** @type {string[]} */
let filenames = []

@@ -21,2 +27,3 @@ const span = tracer.startSpan('glob')

/** @type {Readable} */
const stream = new Readable({

@@ -39,4 +46,5 @@ objectMode: true,

// @ts-ignore
stream._read()
} catch (err) {
} catch (/** @type {any} */ err) {
span.recordException(err)

@@ -43,0 +51,0 @@ span.setStatus({ code: SpanStatusCode.ERROR, message: err.message })

@@ -5,2 +5,3 @@ export { default as combine } from './combine.js'

export { default as flatten } from './flatten.js'
export { default as forEach } from './forEach.js'
export { default as glob } from './glob.js'

@@ -7,0 +8,0 @@ export { parse as jsonParse, stringify as jsonStringify } from './json.js'

@@ -11,2 +11,7 @@ import { Transform } from 'readable-stream'

/**
* @param {*} chunk
* @param {string} encoding
* @param {(error?: Error | null, data?: any) => void} callback
*/
_transform(chunk, encoding, callback) {

@@ -25,2 +30,7 @@ callback(null, JSON.parse(chunk.toString()))

/**
* @param {*} chunk
* @param {string} encoding
* @param {(error?: Error | null, data?: any) => void} callback
*/
_transform(chunk, encoding, callback) {

@@ -31,2 +41,5 @@ callback(null, JSON.stringify(chunk))

/**
* @return {Transform}
*/
function parse() {

@@ -36,2 +49,5 @@ return new JsonParse()

/**
* @return {Transform}
*/
function stringify() {

@@ -38,0 +54,0 @@ return new JsonStringify()

@@ -1,25 +0,18 @@

import { obj } from 'through2'
/**
* Limit the amount of chunks in a pipe.
* @param {number} limit Limit the amount of chunks passed through the pipe.
* @memberof module:barnard59
*/
function limit(limit) {
const t = obj(function (chunk, encoding, callback) {
t.count++
if (t.count <= t.limit) {
this.push(chunk)
export default function limit(limit) {
/**
* @param {AsyncIterable<*>} stream
*/
return async function * (stream) {
let count = 0
for await (const chunk of stream) {
count++
if (count <= limit) {
yield chunk
}
}
callback()
})
t.limit = limit
t.count = 0
return t
}
}
export default limit
import transform from 'parallel-transform'
/**
* @typedef {(this: import('barnard59-core').Context, chunk: From) => Promise<To> | To} MapCallback
* @template From, To
*/
/**
* @typedef {{
* map: MapCallback<From, To>
* concurrency?: number
* ordered?: boolean
* objectMode?: boolean
* }|MapCallback<From, To>} MapOptions
* @template From, To
*/
/**
* Processes chunks with a transform function
*
* @param {Object|Function} options Transform function or complex options
* @params {Function} options.map Transform function
* @param {Number} [options.concurrency=1] The max number of concurrent chunks being transformed
* @param {Boolean} [options.ordered=true] Option to keep order of asynchronously transformed chunks
* @param {Boolean} [options.objectMode=true] Option to transform chunks in object mode
* @return {ReadableStream}
* @this {import('barnard59-core').Context}
* @param {MapOptions<From, To>} options Transform function or complex options
* @return {import('stream').Transform}
* @template From, To
*/
export default function map(options) {
/**
* @type {MapCallback<*, *>}
*/
let func

@@ -15,0 +31,0 @@ let concurrency = 1

@@ -8,2 +8,7 @@ import { Writable } from 'readable-stream'

/**
* @param {any} chunk
* @param {string} encoding
* @param {() => void} callback
*/
_write(chunk, encoding, callback) {

@@ -14,2 +19,5 @@ callback()

/**
* @return {Writable}
*/
function factory() {

@@ -16,0 +24,0 @@ return new Nul()

@@ -1,20 +0,19 @@

import { obj } from 'through2'
/**
* Limit the amount of chunks in a pipe.
* @param {number} offset
*/
export default function (offset) {
/**
* @param {AsyncIterable<*>} stream
*/
return async function * (stream) {
let count = 0
function offset(offset) {
const t = obj(function (chunk, encoding, callback) {
t.count++
if (t.count > t.offset) {
this.push(chunk)
for await (const chunk of stream) {
count++
if (count > offset) {
yield chunk
}
}
callback()
})
t.offset = offset
t.count = 0
return t
}
}
export default offset
{
"name": "barnard59-base",
"version": "2.2.0",
"version": "2.3.0",
"description": "Linked Data pipelines",

@@ -8,3 +8,6 @@ "main": "index.js",

"scripts": {
"test": "mocha"
"test": "mocha",
"prebuild": "rimraf *.d.ts",
"build": "tsc",
"prepack": "npm run build"
},

@@ -17,3 +20,3 @@ "repository": {

"keywords": [],
"author": "Thomas Bergwinkl <bergi@axolotlfarm.org> (https://www.bergnet.org/people/bergi/card#me)",
"author": "Zazuko GmbH",
"license": "MIT",

@@ -35,8 +38,14 @@ "bugs": {

"devDependencies": {
"barnard59-core": "^5.0.0",
"@types/duplexify": "^3.6.4",
"@types/glob": "^7.2.0",
"@types/parallel-transform": "^1.1.4",
"@types/readable-stream": "^4.0.10",
"@types/readable-to-readable": "^0.1.0",
"@types/through2": "^2.0.41",
"barnard59-core": "^5.3.0",
"chai": "^4.3.10",
"get-stream": "^6.0.1",
"into-stream": "^7.0.0",
"isstream": "^0.1.2",
"mocha": "^9.0.1",
"is-stream": "^3.0.0",
"rimraf": "^3.0.2",
"sinon": "^17.0.0"

@@ -48,4 +57,5 @@ },

"mocha": {
"require": "../../test/mocha-setup.cjs"
"require": "../../test/mocha-setup.cjs",
"loader": "tsm"
}
}
import { Transform } from 'readable-stream'
class StdOut extends Transform {
/**
* @param {Uint8Array | string} chunk
* @param {BufferEncoding} encoding
* @param {(error?: Error | null, data?: any) => void} callback
*/
_transform(chunk, encoding, callback) {

@@ -11,2 +16,5 @@ process.stdout.write(chunk, encoding)

/**
* @return {Transform}
*/
export function stdout() {

@@ -16,4 +24,7 @@ return new StdOut()

/**
* @return {import('stream').Readable}
*/
export function stdin() {
return process.stdin
}
import { deepStrictEqual, strictEqual, rejects } from 'assert'
import getStream, { array } from 'get-stream'
import { isReadable, isWritable } from 'isstream'
import { isReadableStream as isReadable, isWritableStream as isWritable } from 'is-stream'
import { Readable } from 'readable-stream'

@@ -5,0 +5,0 @@ import concat, { object } from '../concat.js'

import { deepStrictEqual, strictEqual } from 'assert'
import { array } from 'get-stream'
import { isReadable, isWritable } from 'isstream'
import { isReadableStream as isReadable, isWritableStream as isWritable } from 'is-stream'
import { Readable } from 'readable-stream'

@@ -5,0 +5,0 @@ import flatten from '../flatten.js'

import { deepStrictEqual, strictEqual } from 'assert'
import { array } from 'get-stream'
import { isReadable, isWritable } from 'isstream'
import { isReadableStream as isReadable, isWritableStream as isWritable } from 'is-stream'
import sinon from 'sinon'

@@ -5,0 +5,0 @@ import { expect } from 'chai'

@@ -15,2 +15,5 @@ import Readable from 'readable-stream'

/**
* @deprecated Use `base:streamValues` instead.
*/
function stringToReadable(str) {

@@ -20,2 +23,5 @@ return new ToReadable(str)

/**
* @deprecated Use `base:streamValues` instead.
*/
function objectToReadable(obj) {

@@ -22,0 +28,0 @@ return new ToReadable(obj, { objectMode: true })

@@ -11,2 +11,7 @@ import { Transform } from 'readable-stream'

/**
* @param {Uint8Array | string} chunk
* @param {BufferEncoding} encoding
* @param {import('stream').TransformCallback} callback
*/
_transform(chunk, encoding, callback) {

@@ -17,2 +22,5 @@ callback(null, chunk.toString())

/**
* @return {Transform}
*/
function factory() {

@@ -19,0 +27,0 @@ return new ToString()

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc