barnard59-core
Advanced tools
Comparing version 5.2.0 to 5.3.0
# barnard59-core | ||
## 5.3.0 | ||
### Minor Changes | ||
- ba328de: Support steps being implemented as async generators | ||
### Patch Changes | ||
- 70b50da: Fix: wrong `Context` type used for `Operation` | ||
- a172b45: Relax pointer arguments | ||
- 86131dc: `Pipeline#init` made public | ||
## 5.2.0 | ||
@@ -4,0 +16,0 @@ |
@@ -9,2 +9,4 @@ import type { Logger } from 'winston'; | ||
import Pipeline, { PipelineOptions } from './lib/Pipeline.js'; | ||
export type { default as Step } from './lib/Step.js'; | ||
export type { default as Pipeline, PipelineOptions } from './lib/Pipeline.js'; | ||
export interface Variables { | ||
@@ -11,0 +13,0 @@ } |
import * as winston from 'winston'; | ||
declare const levels: { | ||
error: number; | ||
warn: number; | ||
info: number; | ||
verbose: number; | ||
debug: number; | ||
trace: number; | ||
}; | ||
export type LogLevels = keyof typeof levels; | ||
declare module 'winston' { | ||
@@ -3,0 +12,0 @@ interface Logger { |
@@ -1,7 +0,5 @@ | ||
/// <reference types="node" /> | ||
import { Context } from 'vm'; | ||
import type { GraphPointer } from 'clownface'; | ||
import { Logger } from 'winston'; | ||
import { LoaderRegistry } from 'rdf-loaders-registry'; | ||
import { VariableMap } from '../../index.js'; | ||
import { Context, VariableMap } from '../../index.js'; | ||
declare function createArguments(ptr: GraphPointer, { basePath, context, loaderRegistry, variables }: { | ||
@@ -8,0 +6,0 @@ basePath: string; |
@@ -1,3 +0,1 @@ | ||
/// <reference types="node" /> | ||
import { Context } from 'vm'; | ||
import type { MultiPointer } from 'clownface'; | ||
@@ -7,4 +5,4 @@ import { Logger } from 'winston'; | ||
import { Stream } from 'readable-stream'; | ||
import { VariableMap } from '../../index.js'; | ||
export type Operation = (this: Context, ...args: unknown[]) => Promise<Stream> | Stream; | ||
import { Context, VariableMap } from '../../index.js'; | ||
export type Operation = (this: Context, ...args: unknown[]) => Promise<Stream | (() => AsyncGenerator)> | Stream | (() => AsyncGenerator); | ||
declare function createOperation(ptr: MultiPointer, { basePath, context, loaderRegistry, logger, variables }: { | ||
@@ -11,0 +9,0 @@ basePath: string; |
@@ -1,2 +0,2 @@ | ||
import type { GraphPointer } from 'clownface'; | ||
import type { DatasetCore, Term } from 'rdf-js'; | ||
import { Logger } from 'winston'; | ||
@@ -15,3 +15,6 @@ import { LoaderRegistry } from 'rdf-loaders-registry'; | ||
}; | ||
declare function createPipeline(ptr: GraphPointer, init: CreatePipeline): Pipeline; | ||
declare function createPipeline(maybePtr: { | ||
term?: Term; | ||
dataset?: DatasetCore; | ||
}, init: CreatePipeline): Pipeline; | ||
export default createPipeline; |
@@ -8,3 +8,3 @@ import defaultLoaderRegistry from '../defaultLoaderRegistry.js'; | ||
import createVariables from './variables.js'; | ||
function createPipelineContext(ptr, { basePath, context, logger, variables, error }) { | ||
function createPipelineContext({ basePath, context, logger, variables, error }) { | ||
return { error, ...context, basePath, logger, variables }; | ||
@@ -19,9 +19,9 @@ } | ||
} | ||
function createPipeline(ptr, init) { | ||
function createPipeline(maybePtr, init) { | ||
let context = init.context || { env: init.env }; | ||
let { basePath, loaderRegistry = defaultLoaderRegistry(context.env), logger = defaultLogger(), variables = new VariableMapImpl(), } = init; | ||
if (!ptr.term || !ptr.dataset) { | ||
if (!maybePtr.term || !maybePtr.dataset) { | ||
throw new Error('the given graph pointer is invalid'); | ||
} | ||
ptr = context.env.clownface({ dataset: ptr.dataset, term: ptr.term }); | ||
const ptr = context.env.clownface({ dataset: maybePtr.dataset, term: maybePtr.term }); | ||
const onInit = async (pipeline) => { | ||
@@ -35,3 +35,3 @@ function error(err) { | ||
variables = await createPipelineVariables(ptr, { basePath, context, loaderRegistry, logger, variables }); | ||
context = await createPipelineContext(ptr, { basePath, context, logger, variables, error }); | ||
context = await createPipelineContext({ basePath, context, logger, variables, error }); | ||
logVariables(ptr, context, variables); | ||
@@ -38,0 +38,0 @@ // add pipeline factory with current values as defaults |
@@ -8,3 +8,3 @@ import type { GraphPointer } from 'clownface'; | ||
basePath: string; | ||
context: Pick<Context, 'env'>; | ||
context: Context; | ||
loaderRegistry: LoaderRegistry; | ||
@@ -11,0 +11,0 @@ logger: Logger; |
@@ -0,1 +1,2 @@ | ||
import { Duplex } from 'node:stream'; | ||
import { SpanStatusCode } from '@opentelemetry/api'; | ||
@@ -13,3 +14,10 @@ import { isStream } from '../isStream.js'; | ||
const operation = await createOperation(ptr.out(context.env.ns.code.implementedBy), { basePath, context, loaderRegistry, logger, variables }); | ||
const stream = await operation.apply(context, args); | ||
let stream; | ||
const streamOrGenerator = await operation.apply(context, args); | ||
if (typeof streamOrGenerator === 'function') { | ||
stream = Duplex.from(streamOrGenerator); | ||
} | ||
else { | ||
stream = streamOrGenerator; | ||
} | ||
if (!stream || !isStream(stream)) { | ||
@@ -16,0 +24,0 @@ throw new Error(`${ptr.value} didn't return a stream`); |
@@ -20,3 +20,3 @@ import streams, { Stream } from 'readable-stream'; | ||
private readonly ctx; | ||
private readonly init; | ||
readonly init: () => Promise<void>; | ||
readonly read: (size: number) => Promise<void>; | ||
@@ -23,0 +23,0 @@ readonly write: (chunk: unknown, encoding: string, callback: (error?: (Error | null)) => void) => Promise<boolean>; |
{ | ||
"name": "barnard59-core", | ||
"version": "5.2.0", | ||
"version": "5.3.0", | ||
"description": "Core component of Barnard59 Linked Data pipelines", | ||
@@ -9,3 +9,4 @@ "type": "module", | ||
"test": "mocha", | ||
"prepack": "tsc" | ||
"build": "tsc", | ||
"prepack": "npm run build" | ||
}, | ||
@@ -18,3 +19,3 @@ "repository": { | ||
"keywords": [], | ||
"author": "Thomas Bergwinkl <bergi@axolotlfarm.org> (https://www.bergnet.org/people/bergi/card#me)", | ||
"author": "Zazuko GmbH", | ||
"license": "MIT", | ||
@@ -42,3 +43,3 @@ "bugs": { | ||
"@types/readable-stream": "^4.0.9", | ||
"barnard59-env": "^1.2.0", | ||
"barnard59-env": "^1.2.1", | ||
"barnard59-http": "^2.0.0", | ||
@@ -50,3 +51,2 @@ "barnard59-test-support": "^0.0.3", | ||
"lint-staged": "^13.2.2", | ||
"mocha": "^10.2.0", | ||
"nock": "^13.1.0", | ||
@@ -57,4 +57,4 @@ "sinon": "^15.0.4" | ||
"require": "../../test/mocha-setup.cjs", | ||
"loader": "ts-node/esm" | ||
"loader": "tsm" | ||
} | ||
} |
@@ -18,3 +18,3 @@ import { deepStrictEqual, strictEqual } from 'assert' | ||
it('should build key-value pair arguments', async () => { | ||
const definition = await loadPipelineDefinition('arguments') | ||
const { ptr: definition } = await loadPipelineDefinition('arguments') | ||
const ptr = [...definition.node(ns.ex.keyValues).out(ns.p.steps).out(ns.p.stepList).list()][0] | ||
@@ -31,3 +31,3 @@ | ||
it('should build key-value pair arguments with undefined variable', async () => { | ||
const definition = await loadPipelineDefinition('arguments') | ||
const { ptr: definition } = await loadPipelineDefinition('arguments') | ||
const ptr = [...definition.node(ns.ex.keyValueMissingVar).out(ns.p.steps).out(ns.p.stepList).list()][0] | ||
@@ -47,3 +47,3 @@ const variables = new VariableMap() | ||
it('should build list arguments', async () => { | ||
const definition = await loadPipelineDefinition('arguments') | ||
const { ptr: definition } = await loadPipelineDefinition('arguments') | ||
const ptr = [...definition.node(ns.ex.list).out(ns.p.steps).out(ns.p.stepList).list()][0] | ||
@@ -60,3 +60,3 @@ | ||
it('should build list arguments with undefined variable', async () => { | ||
const definition = await loadPipelineDefinition('arguments') | ||
const { ptr: definition } = await loadPipelineDefinition('arguments') | ||
const ptr = [...definition.node(ns.ex.listMissingVar).out(ns.p.steps).out(ns.p.stepList).list()][0] | ||
@@ -76,3 +76,3 @@ const variables = new VariableMap() | ||
it('should forward variables to the loader', async () => { | ||
const definition = await loadPipelineDefinition('arguments') | ||
const { ptr: definition } = await loadPipelineDefinition('arguments') | ||
const ptr = [...definition.node(ns.ex.variable).out(ns.p.steps).out(ns.p.stepList).list()][0] | ||
@@ -79,0 +79,0 @@ |
@@ -19,3 +19,3 @@ import { strictEqual } from 'assert' | ||
it('should load the given operation', async () => { | ||
const definition = await loadPipelineDefinition('plain') | ||
const { ptr: definition } = await loadPipelineDefinition('plain') | ||
const ptr = [...definition.node(ns.ex('')).out(ns.p.steps).out(ns.p.stepList).list()][0].out(ns.code.implementedBy) | ||
@@ -22,0 +22,0 @@ |
@@ -20,5 +20,5 @@ import { strictEqual, throws } from 'assert' | ||
it('should return a Pipeline object', async () => { | ||
const definition = await loadPipelineDefinition('plain') | ||
const { ptr } = await loadPipelineDefinition('plain') | ||
const pipeline = createPipeline(definition, { env, basePath: resolve('test') }) | ||
const pipeline = createPipeline(ptr, { env, basePath: resolve('test') }) | ||
@@ -29,3 +29,3 @@ strictEqual(pipeline instanceof Pipeline, true) | ||
it('should load the given pipeline from a plain graph pointer', async () => { | ||
const definition = await loadPipelineDefinition('plain') | ||
const { ptr: definition } = await loadPipelineDefinition('plain') | ||
const ptr = { dataset: definition.dataset, term: definition.term } | ||
@@ -40,6 +40,6 @@ | ||
it('should throw an error if the term property of the graph pointer is missing', async () => { | ||
const ptr = (await loadPipelineDefinition('read')).any() | ||
const { ptr: graph } = await loadPipelineDefinition('read') | ||
throws(() => { | ||
createPipeline(ptr, { env, basePath: resolve('test') }) | ||
createPipeline(graph.any(), { env, basePath: resolve('test') }) | ||
}) | ||
@@ -49,3 +49,3 @@ }) | ||
it('should throw an error if the dataset property of the graph pointer is missing', async () => { | ||
const ptr = (await loadPipelineDefinition('read')) | ||
const { ptr } = (await loadPipelineDefinition('read')) | ||
@@ -59,3 +59,3 @@ throws(() => { | ||
const basePath = resolve('test') | ||
const ptr = await loadPipelineDefinition('read') | ||
const { ptr } = await loadPipelineDefinition('read') | ||
@@ -69,3 +69,3 @@ const pipeline = createPipeline(ptr, { env, basePath }) | ||
const context = { abc: 'def', env } | ||
const ptr = await loadPipelineDefinition('read') | ||
const { ptr } = await loadPipelineDefinition('read') | ||
@@ -79,3 +79,3 @@ const pipeline = createPipeline(ptr, { env, basePath: resolve('test'), context }) | ||
it('should create a pipeline with readable interface matching the rdf:type', async () => { | ||
const ptr = await loadPipelineDefinition('read') | ||
const { ptr } = await loadPipelineDefinition('read') | ||
@@ -89,3 +89,3 @@ const pipeline = createPipeline(ptr, { env, basePath: resolve('test') }) | ||
it('should create a pipeline with readable object mode interface matching the rdf:type', async () => { | ||
const ptr = await loadPipelineDefinition('read-object-mode') | ||
const { ptr } = await loadPipelineDefinition('read-object-mode') | ||
@@ -98,3 +98,3 @@ const pipeline = createPipeline(ptr, { env, basePath: resolve('test') }) | ||
it('should create a pipeline with writable interface matching the rdf:type', async () => { | ||
const ptr = await loadPipelineDefinition('write') | ||
const { ptr } = await loadPipelineDefinition('write') | ||
@@ -108,3 +108,3 @@ const pipeline = createPipeline(ptr, { env, basePath: resolve('test') }) | ||
it('should create a pipeline with writable object mode interface matching the rdf:type', async () => { | ||
const ptr = await loadPipelineDefinition('write-object-mode') | ||
const { ptr } = await loadPipelineDefinition('write-object-mode') | ||
@@ -117,5 +117,5 @@ const pipeline = createPipeline(ptr, { env, basePath: resolve('test') }) | ||
it('should attach createPipeline to the context', async () => { | ||
const definition = await loadPipelineDefinition('plain') | ||
const { ptr } = await loadPipelineDefinition('plain') | ||
const pipeline = createPipeline(definition, { env, basePath: resolve('test') }) | ||
const pipeline = createPipeline(ptr, { env, basePath: resolve('test') }) | ||
await pipeline.init() | ||
@@ -128,3 +128,3 @@ | ||
// given | ||
const definition = await loadPipelineDefinition('nested') | ||
const { ptr } = await loadPipelineDefinition('nested') | ||
const logger = { | ||
@@ -139,3 +139,3 @@ debug: sinon.spy(), | ||
// when | ||
const pipeline = createPipeline(definition, { | ||
const pipeline = createPipeline(ptr, { | ||
env, | ||
@@ -142,0 +142,0 @@ basePath: resolve('test'), |
@@ -21,3 +21,3 @@ import { strictEqual, rejects } from 'assert' | ||
it('should load the given step', async () => { | ||
const definition = await loadPipelineDefinition('plain') | ||
const { ptr: definition } = await loadPipelineDefinition('plain') | ||
const ptr = [...definition.node(ns.ex('')).out(ns.p.steps).out(ns.p.stepList).list()][0] | ||
@@ -36,3 +36,3 @@ | ||
it('should forward errors thrown by the loader', async () => { | ||
const definition = await loadPipelineDefinition('step-operation-missing-error') | ||
const { ptr: definition } = await loadPipelineDefinition('step-operation-missing-error') | ||
const ptr = [...definition.node(ns.ex('')).out(ns.p.steps).out(ns.p.stepList).list()][0] | ||
@@ -56,3 +56,3 @@ | ||
it('should attach step to the context', async () => { | ||
const definition = await loadPipelineDefinition('step-ptr') | ||
const { ptr: definition } = await loadPipelineDefinition('step-ptr') | ||
const ptr = [...definition.node(ns.ex('')).out(ns.p.steps).out(ns.p.stepList).list()][0] | ||
@@ -59,0 +59,0 @@ |
@@ -15,3 +15,3 @@ import { resolve } from 'path' | ||
it('should return a VariableMap', async () => { | ||
const definition = await loadPipelineDefinition('plain') | ||
const { ptr: definition } = await loadPipelineDefinition('plain') | ||
const ptr = definition.node(ns.ex('')).out(ns.p.variables) | ||
@@ -29,3 +29,3 @@ | ||
it('should load "required" annotation', async () => { | ||
const definition = await loadPipelineDefinition('variables') | ||
const { ptr: definition } = await loadPipelineDefinition('variables') | ||
const ptr = definition.node(ns.ex.inline).out(ns.p.variables) | ||
@@ -43,3 +43,3 @@ | ||
it('should load the given inline variables', async () => { | ||
const definition = await loadPipelineDefinition('variables') | ||
const { ptr: definition } = await loadPipelineDefinition('variables') | ||
const ptr = definition.node(ns.ex.inline).out(ns.p.variables) | ||
@@ -57,3 +57,3 @@ | ||
it('should load the given variables sets', async () => { | ||
const definition = await loadPipelineDefinition('variables') | ||
const { ptr: definition } = await loadPipelineDefinition('variables') | ||
const ptr = definition.node(ns.ex.multiset).out(ns.p.variables) | ||
@@ -60,0 +60,0 @@ |
@@ -14,3 +14,3 @@ import { strictEqual, rejects } from 'assert' | ||
const basePath = resolve('test') | ||
const ptr = await loadPipelineDefinition('plain') | ||
const { ptr } = await loadPipelineDefinition('plain') | ||
@@ -17,0 +17,0 @@ const variables = new Map([ |
@@ -23,3 +23,3 @@ import { strictEqual, rejects } from 'assert' | ||
it('should process the given pipeline definition', async () => { | ||
const ptr = await loadPipelineDefinition('read') | ||
const { ptr } = await loadPipelineDefinition('read') | ||
@@ -34,3 +34,3 @@ const pipeline = createPipeline(ptr, { env, basePath: resolve('test') }) | ||
it('should support writable pipelines', async () => { | ||
const ptr = await loadPipelineDefinition('write') | ||
const { ptr } = await loadPipelineDefinition('write') | ||
@@ -47,3 +47,3 @@ const pipeline = createPipeline(ptr, { env, basePath: resolve('test') }) | ||
it('should support nested pipelines', async () => { | ||
const ptr = await loadPipelineDefinition('nested') | ||
const { ptr } = await loadPipelineDefinition('nested') | ||
@@ -58,3 +58,3 @@ const pipeline = createPipeline(ptr, { env, basePath: resolve('test') }) | ||
it('should emit error when nested pipeline step errors immediately', async () => { | ||
const ptr = await loadPipelineDefinition('nestedErrorBeforeInit') | ||
const { ptr } = await loadPipelineDefinition('nestedErrorBeforeInit') | ||
@@ -69,3 +69,3 @@ const pipeline = createPipeline(ptr, { env, basePath: resolve('test') }) | ||
it('should support nested writable pipelines', async () => { | ||
const ptr = await loadPipelineDefinition('nested-write') | ||
const { ptr } = await loadPipelineDefinition('nested-write') | ||
const result = [] | ||
@@ -85,3 +85,3 @@ | ||
it('should assign the pipeline stream to the .stream property', async () => { | ||
const ptr = await loadPipelineDefinition('nested') | ||
const { ptr } = await loadPipelineDefinition('nested') | ||
@@ -94,3 +94,3 @@ const pipeline = createPipeline(ptr, { env, basePath: resolve('test') }) | ||
it('should assign the pipeline to the .pipeline property of the stream', async () => { | ||
const ptr = await loadPipelineDefinition('nested') | ||
const { ptr } = await loadPipelineDefinition('nested') | ||
@@ -103,3 +103,3 @@ const pipeline = createPipeline(ptr, { env, basePath: resolve('test') }) | ||
it('should have a basePath string property', async () => { | ||
const ptr = await loadPipelineDefinition('read') | ||
const { ptr } = await loadPipelineDefinition('read') | ||
@@ -112,3 +112,3 @@ const pipeline = createPipeline(ptr, { env, basePath: resolve('test') }) | ||
it('should have a context object property', async () => { | ||
const ptr = await loadPipelineDefinition('read') | ||
const { ptr } = await loadPipelineDefinition('read') | ||
@@ -121,3 +121,3 @@ const pipeline = createPipeline(ptr, { env, basePath: resolve('test') }) | ||
it('should emit an error if the Pipeline contains no steps', async () => { | ||
const ptr = await loadPipelineDefinition('empty') | ||
const { ptr } = await loadPipelineDefinition('empty') | ||
@@ -132,3 +132,3 @@ const pipeline = createPipeline(ptr, { env, basePath: resolve('test') }) | ||
it('should have a ptr clownface property', async () => { | ||
const ptr = await loadPipelineDefinition('read') | ||
const { ptr } = await loadPipelineDefinition('read') | ||
@@ -143,3 +143,3 @@ const pipeline = createPipeline(ptr, { env, basePath: resolve('test') }) | ||
it('should have a ptr variables Map property', async () => { | ||
const ptr = await loadPipelineDefinition('read') | ||
const { ptr } = await loadPipelineDefinition('read') | ||
@@ -152,3 +152,3 @@ const pipeline = createPipeline(ptr, { env, basePath: resolve('test') }) | ||
it('should emit an error if an operation returns an invalid stream', async () => { | ||
const ptr = await loadPipelineDefinition('step-invalid') | ||
const { ptr } = await loadPipelineDefinition('step-invalid') | ||
@@ -163,3 +163,3 @@ const pipeline = createPipeline(ptr, { env, basePath: resolve('test') }) | ||
it('should emit an error if an operation rejects with error', async () => { | ||
const ptr = await loadPipelineDefinition('step-operation-error') | ||
const { ptr } = await loadPipelineDefinition('step-operation-error') | ||
@@ -174,3 +174,3 @@ const pipeline = createPipeline(ptr, { env, basePath: resolve('test') }) | ||
it('should emit step stream errors', async () => { | ||
const ptr = await loadPipelineDefinition('step-stream-error') | ||
const { ptr } = await loadPipelineDefinition('step-stream-error') | ||
@@ -185,3 +185,3 @@ const pipeline = createPipeline(ptr, { env, basePath: resolve('test') }) | ||
it('should catch and emit step stream errors', async () => { | ||
const ptr = await loadPipelineDefinition('step-stream-throw') | ||
const { ptr } = await loadPipelineDefinition('step-stream-throw') | ||
@@ -197,3 +197,3 @@ const pipeline = createPipeline(ptr, { env, basePath: resolve('test') }) | ||
it('should emit an end event', async () => { | ||
const ptr = await loadPipelineDefinition('plain') | ||
const { ptr } = await loadPipelineDefinition('plain') | ||
@@ -212,3 +212,3 @@ const pipeline = createPipeline(ptr, { env, basePath: resolve('test') }) | ||
it('should emit an end event', async () => { | ||
const ptr = await loadPipelineDefinition('read') | ||
const { ptr } = await loadPipelineDefinition('read') | ||
@@ -225,3 +225,3 @@ const pipeline = createPipeline(ptr, { env, basePath: resolve('test') }) | ||
it('should emit an error if the last step doesn\'t have a readable interface', async () => { | ||
const ptr = await loadPipelineDefinition('read-step-not-read') | ||
const { ptr } = await loadPipelineDefinition('read-step-not-read') | ||
@@ -238,3 +238,3 @@ const pipeline = createPipeline(ptr, { env, basePath: resolve('test') }) | ||
it('should emit an finish event', async () => { | ||
const ptr = await loadPipelineDefinition('write') | ||
const { ptr } = await loadPipelineDefinition('write') | ||
@@ -241,0 +241,0 @@ const pipeline = createPipeline(ptr, { env, basePath: resolve('test') }) |
99801
11
2075