Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

keuss

Package Overview
Dependencies
Maintainers
1
Versions
76
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

keuss - npm Package Compare versions

Comparing version 1.5.11 to 1.5.12

8

CHANGELOG.md
# Changelog
* v1.5.12
* corrected small pipeline-related issues
* v1.5.11 (void)
* v1.5.10
* pipelines overhaul
* mubsub change to @nodebb/mubsub
* v1.5.9:
* added some complete, meaningful examples
* v1.5.8

@@ -3,0 +11,0 @@ * added deadletter support

2

package.json
{
"name": "keuss",
"version": "1.5.11",
"version": "1.5.12",
"keywords": [

@@ -5,0 +5,0 @@ "queue",

@@ -106,2 +106,3 @@ const _ = require ('lodash');

this.src().ok (elem._id, err => {
if (err) this.emit ('error', {on: 'src-queue-commit-on-drop', elem, err});
debug ('%s: processed, marked to be dropped: %s', this._name, elem._id);

@@ -126,3 +127,3 @@ this._process (ondata);

debug ('error in next:', err);
this.emit ('error', {on: 'next-queue-on-error', elem, opts, err});
this.emit ('error', {on: 'next-queue', elem, opts, err});
}

@@ -129,0 +130,0 @@

@@ -21,2 +21,3 @@ # Keuss Pipelines

- [Semantic `this` in process function](#semantic-this-in-process-function)
- [Events](#events)
- [DirectLink](#directlink)

@@ -40,7 +41,7 @@ - [Creation](#creation-1)

# About
Pipelines is a Keuss extension for building [ETL](https://en.wikipedia.org/wiki/Extract,_transform,_load) processing graphs with ease while guaranteeing atomicity in the processing: whatever happens at the processing of an element, the element is guaranteed to be in either the source or i the destination queue; never in both, never in none.
Pipelines is a Keuss extension for building [ETL](https://en.wikipedia.org/wiki/Extract,_transform,_load) processing graphs with ease while guaranteeing atomicity in the processing: whatever happens at the processing of an element, the element is guaranteed to be in either the source or in the destination queue; never in both, never in none.
Keuss pipelines are build upon Keuss Queues with *pipeline* capacity, which means Pipeliens inherit all their advantges in terms of HA, durability and performance. So far, Keuss offers only one Queue backend with pipeline capacity, `pl-mongo`
Keuss pipelines are build upon Keuss Queues with *pipeline* capacity, which means Pipelines inherit all their advantages in terms of HA, durability and performance. So far, Keuss offers only one Queue backend with pipeline capacity, `pl-mongo`
Queues are linked together with processing units named *Processors*, which glue together a source queue with zero or more destination queues. Each processor encapsulates a loop that could be described -in its simplet form- as follows:
Queues are linked together with processing units named *Processors*, which glue together a source queue with zero or more destination queues. Each processor encapsulates a loop that could be described -in its simplest form- as follows:

@@ -155,3 +156,3 @@

```
the '`update` param of the second arg to `done()` is pased itnernally to `pl_step()` as `opts.update`: this would cause the message's `payload.passed` to be set to `true` even if there's no explicit mention of `payload`
the '`update` parameter of the second argument to `done()` is passed internally to `pl_step()` as `opts.update`: this would cause the message's `payload.passed` to be set to `true` even if there's no explicit mention of `payload`

@@ -180,4 +181,4 @@ The whole `pl_step()` operation is guaranteed to be atomic; this includes applying of `opts.payload` or `opts.update` if present

They default to `2` and `1` respectively
* `mature`: Date instance or unix timestamp (in milliseconds, as integer) expressing the not-before timestamp for the item, to be used when calling pl_step in the src queue
* `delay`: delay in seconds to calculate mature, if mature is not specified
* `mature`: Date instance or unix timestamp (in milliseconds, as integer) expressing the not-before timestamp for the item, to be used when calling `pl_step()` in the src queue
* `delay`: delay in seconds to calculate `mature`, if `mature` is not specified

@@ -215,3 +216,3 @@ ### Methods

* else the item is passed to the text queue in the pipeline (by means of `pl_next()`)
* if `res.mature` or `res.delay` exist (or they were specified at the processor's creation) they are used to calculate the delay/mature fo the element in the destination queue
* if `res.mature` or `res.delay` exist (or they were specified at the processor's creation) they are used to calculate the delay/mature of the element in the destination queue
* if `res.payload` exists it is used to replace the item's payload entirely

@@ -223,6 +224,18 @@ * else if `res.update` exists it is used as mongodb-update operations on the item's payload

#### Semantic `this` in process function
The function is bound to the processor, so the funcion can access and use processor's primitives. For example, it can insert copies of the item, or new items, in any of the source or destination queues
The function is bound to the processor, so the function can access and use processor's primitives. For example, it can insert copies of the item, or new items, in any of the source or destination queues
In order to use this functionality the process function can not be declared as an 'arrow' function, since thos can not be bound. Use the classic `function xxxx (item, cb) {...}` if you intend to access the underlying Processor
In order to use this functionality the process function can not be declared as an 'arrow' function, since those can not be bound. Use the classic `function xxxx (item, cb) {...}` if you intend to access the underlying Processor
### Events
BaseLink inherits from `EventEmitter` and publishes the following events:
* `error`: an error happened in the internal loop. It comes with one parameter, an object with the following fields:
* `on`: exact type of error:
* `src-queue-pop`: error while reserving an element on the src queue
* `src-queue-commit-on-error`: error while committing an element on the src queue when an error was passed and `err.drop==true`
* `src-queue-rollback-on-error`: error while rolling back an element on the src queue when an error was passed
* `src-queue-commit-on-drop`: error while committing an element an element on the src queue when processed ok and `res.drop==true`
* `next-queue`: error while atomically moving the element to the next queue
* `elem`: element that caused the error. Not present in `src-queue-pop`
* `error`: original error object
* `opts`: (only present in `next-queue`) options passed internally to `pl_step()`

@@ -280,6 +293,4 @@ ## DirectLink

### Process Function
==== BASE ====
ChoiceLink expects an `res.dst` in the callback invocation, which must fullfill one of those conditions:
* be an integer and resolv to a valid element when applied as index to the array of destination queues
==== BASE ====
* be an integer and resolve to a valid element when applied as index to the array of destination queues
* be a string and correspond to the name of one of the destination queues

@@ -286,0 +297,0 @@

@@ -146,3 +146,3 @@ const async = require ('async');

err.should.match ({
on: 'next-queue-on-error',
on: 'next-queue',
elem: {

@@ -196,3 +196,3 @@ payload: { a: 666, b: 'see it fail...' },

err.should.match ({
on: 'next-queue-on-error',
on: 'next-queue',
elem: {

@@ -199,0 +199,0 @@ payload: { a: 666, b: 'see it fail...' },

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc