Comparing version 1.5.11 to 1.5.12
# Changelog | ||
* v1.5.12 | ||
* corrected small pipeline-related issues | ||
* v1.5.11 (void) | ||
* v1.5.10 | ||
* pipelines overhaul | ||
* mubsub change to @nodebb/mubsub | ||
* v1.5.9: | ||
* added some complete, meaningful examples | ||
* v1.5.8 | ||
@@ -3,0 +11,0 @@ * added deadletter support |
{ | ||
"name": "keuss", | ||
"version": "1.5.11", | ||
"version": "1.5.12", | ||
"keywords": [ | ||
@@ -5,0 +5,0 @@ "queue", |
@@ -106,2 +106,3 @@ const _ = require ('lodash'); | ||
this.src().ok (elem._id, err => { | ||
if (err) this.emit ('error', {on: 'src-queue-commit-on-drop', elem, err}); | ||
debug ('%s: processed, marked to be dropped: %s', this._name, elem._id); | ||
@@ -126,3 +127,3 @@ this._process (ondata); | ||
debug ('error in next:', err); | ||
this.emit ('error', {on: 'next-queue-on-error', elem, opts, err}); | ||
this.emit ('error', {on: 'next-queue', elem, opts, err}); | ||
} | ||
@@ -129,0 +130,0 @@ |
@@ -21,2 +21,3 @@ # Keuss Pipelines | ||
- [Semantic `this` in process function](#semantic-this-in-process-function) | ||
- [Events](#events) | ||
- [DirectLink](#directlink) | ||
@@ -40,7 +41,7 @@ - [Creation](#creation-1) | ||
# About | ||
Pipelines is a Keuss extension for building [ETL](https://en.wikipedia.org/wiki/Extract,_transform,_load) processing graphs with ease while guaranteeing atomicity in the processing: whatever happens at the processing of an element, the element is guaranteed to be in either the source or i the destination queue; never in both, never in none. | ||
Pipelines is a Keuss extension for building [ETL](https://en.wikipedia.org/wiki/Extract,_transform,_load) processing graphs with ease while guaranteeing atomicity in the processing: whatever happens at the processing of an element, the element is guaranteed to be in either the source or in the destination queue; never in both, never in none. | ||
Keuss pipelines are build upon Keuss Queues with *pipeline* capacity, which means Pipeliens inherit all their advantges in terms of HA, durability and performance. So far, Keuss offers only one Queue backend with pipeline capacity, `pl-mongo` | ||
Keuss pipelines are build upon Keuss Queues with *pipeline* capacity, which means Pipelines inherit all their advantages in terms of HA, durability and performance. So far, Keuss offers only one Queue backend with pipeline capacity, `pl-mongo` | ||
Queues are linked together with processing units named *Processors*, which glue together a source queue with zero or more destination queues. Each processor encapsulates a loop that could be described -in its simplet form- as follows: | ||
Queues are linked together with processing units named *Processors*, which glue together a source queue with zero or more destination queues. Each processor encapsulates a loop that could be described -in its simplest form- as follows: | ||
@@ -155,3 +156,3 @@ | ||
``` | ||
the '`update` param of the second arg to `done()` is pased itnernally to `pl_step()` as `opts.update`: this would cause the message's `payload.passed` to be set to `true` even if there's no explicit mention of `payload` | ||
the '`update` parameter of the second argument to `done()` is passed internally to `pl_step()` as `opts.update`: this would cause the message's `payload.passed` to be set to `true` even if there's no explicit mention of `payload` | ||
@@ -180,4 +181,4 @@ The whole `pl_step()` operation is guaranteed to be atomic; this includes applying of `opts.payload` or `opts.update` if present | ||
They default to `2` and `1` respectively | ||
* `mature`: Date instance or unix timestamp (in milliseconds, as integer) expressing the not-before timestamp for the item, to be used when calling pl_step in the src queue | ||
* `delay`: delay in seconds to calculate mature, if mature is not specified | ||
* `mature`: Date instance or unix timestamp (in milliseconds, as integer) expressing the not-before timestamp for the item, to be used when calling `pl_step()` in the src queue | ||
* `delay`: delay in seconds to calculate `mature`, if `mature` is not specified | ||
@@ -215,3 +216,3 @@ ### Methods | ||
* else the item is passed to the text queue in the pipeline (by means of `pl_next()`) | ||
* if `res.mature` or `res.delay` exist (or they were specified at the processor's creation) they are used to calculate the delay/mature fo the element in the destination queue | ||
* if `res.mature` or `res.delay` exist (or they were specified at the processor's creation) they are used to calculate the delay/mature of the element in the destination queue | ||
* if `res.payload` exists it is used to replace the item's payload entirely | ||
@@ -223,6 +224,18 @@ * else if `res.update` exists it is used as mongodb-update operations on the item's payload | ||
#### Semantic `this` in process function | ||
The function is bound to the processor, so the funcion can access and use processor's primitives. For example, it can insert copies of the item, or new items, in any of the source or destination queues | ||
The function is bound to the processor, so the function can access and use processor's primitives. For example, it can insert copies of the item, or new items, in any of the source or destination queues | ||
In order to use this functionality the process function can not be declared as an 'arrow' function, since thos can not be bound. Use the classic `function xxxx (item, cb) {...}` if you intend to access the underlying Processor | ||
In order to use this functionality the process function can not be declared as an 'arrow' function, since those can not be bound. Use the classic `function xxxx (item, cb) {...}` if you intend to access the underlying Processor | ||
### Events | ||
BaseLink inherits from `EventEmitter` and publishes the following events: | ||
* `error`: an error happened in the internal loop. It comes with one parameter, an object with the following fields: | ||
* `on`: exact type of error: | ||
* `src-queue-pop`: error while reserving an element on the src queue | ||
* `src-queue-commit-on-error`: error while committing an element on the src queue when an error was passed and `err.drop==true` | ||
* `src-queue-rollback-on-error`: error while rolling back an element on the src queue when an error was passed | ||
* `src-queue-commit-on-drop`: error while committing an element an element on the src queue when processed ok and `res.drop==true` | ||
* `next-queue`: error while atomically moving the element to the next queue | ||
* `elem`: element that caused the error. Not present in `src-queue-pop` | ||
* `error`: original error object | ||
* `opts`: (only present in `next-queue`) options passed internally to `pl_step()` | ||
@@ -280,6 +293,4 @@ ## DirectLink | ||
### Process Function | ||
==== BASE ==== | ||
ChoiceLink expects an `res.dst` in the callback invocation, which must fullfill one of those conditions: | ||
* be an integer and resolv to a valid element when applied as index to the array of destination queues | ||
==== BASE ==== | ||
* be an integer and resolve to a valid element when applied as index to the array of destination queues | ||
* be a string and correspond to the name of one of the destination queues | ||
@@ -286,0 +297,0 @@ |
@@ -146,3 +146,3 @@ const async = require ('async'); | ||
err.should.match ({ | ||
on: 'next-queue-on-error', | ||
on: 'next-queue', | ||
elem: { | ||
@@ -196,3 +196,3 @@ payload: { a: 666, b: 'see it fail...' }, | ||
err.should.match ({ | ||
on: 'next-queue-on-error', | ||
on: 'next-queue', | ||
elem: { | ||
@@ -199,0 +199,0 @@ payload: { a: 666, b: 'see it fail...' }, |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
Major refactor
Supply chain riskPackage has recently undergone a major refactor. It may be unstable or indicate significant internal changes. Use caution when updating to versions that include significant changes.
Found 1 instance in 1 package
1503101
8966
0