Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

flume-core

Package Overview
Dependencies
Maintainers
1
Versions
3
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

flume-core - npm Package Compare versions

Comparing version 0.1.1 to 0.1.3

89

flume-core.js

@@ -46,5 +46,5 @@ ;(function(root, factory) {

function trap(process) {
function trap(transform) {
return {
process: conj({'*': trapFallbackMsgHandler}, process)
transform: conj({'*': trapFallbackMsgHandler}, transform)
};

@@ -67,6 +67,6 @@ }

function ProcessorDef(opts) {
this.defType = 'processor';
function TransformDef(opts) {
this.defType = 'transform';
this.init = ensure(opts.init, noop);
this.process = opts.process;
this.transform = opts.transform;
}

@@ -91,4 +91,4 @@

// tail
if (i) child = new Node(graph, createProcessorDef(defs[i]), child, index);
while (--i > 0) child = new Node(graph, createProcessorDef(defs[i]), child, 0);
if (i) child = new Node(graph, createTransformDef(defs[i]), child, index);
while (--i > 0) child = new Node(graph, createTransformDef(defs[i]), child, 0);

@@ -123,27 +123,27 @@ return buildGraphHead(graph, defs[0], child);

function createProcessorDef(obj) {
function createTransformDef(obj) {
if (typeof obj === 'function') {
obj = {
process: {'flume:value': obj}
transform: {'flume:value': obj}
};
}
else if (typeof (obj || 0).process === 'function') {
else if (typeof (obj || 0).transform === 'function') {
obj = conj(obj, {
process: {'flume:value': obj.process}
transform: {'flume:value': obj.transform}
});
}
else if (typeOf((obj || 0).process) !== 'object') {
else if (typeOf((obj || 0).transform) !== 'object') {
throw new Error(
"Expected function or object matching processor shape but got " +
"Expected function or object matching transform shape but got " +
typeOf(obj));
}
return new ProcessorDef(obj);
return new TransformDef(obj);
}
// message processing
// message transforming
function trapFallbackMsgHandler(state, v) {
return [v];
return {state: v};
}

@@ -155,3 +155,3 @@

input: createInputHandler,
processor: createProcessorHandler
transform: createTransformHandler
}[node.def.defType](node);

@@ -162,4 +162,4 @@ }

function createInputHandler(node) {
return function handle(msgs, end) {
node.child.handle(msgs, node, node, end);
return function handle(msg, end) {
node.child.handle([msg], node, node, end);
};

@@ -169,3 +169,3 @@ }

function createProcessorHandler(node) {
function createTransformHandler(node) {
var queue = [];

@@ -175,7 +175,6 @@ var isBusy = false;

var state = node.def.init();
var processAsync = maybeAsync(process);
var transformAsync = maybeAsync(transform);
return function handle(msgs, parent, source, end) {
msgs = castArray(msgs).map(castMessage);
msgs = msgs.map(castMessage)
var i = -1;

@@ -213,3 +212,3 @@ var n = msgs.length - 1;

return processAsync()
return transformAsync()
.then(resolveSeq)

@@ -219,7 +218,7 @@ .then(success, failure);

function process() {
var fns = node.def.process;
function transform() {
var fns = node.def.transform;
var fn = fns[task.msg.type] || fns['*'];
return !fn
? [state, task.msg]
? {value: task.msg}
: fn(state, task.msg.value, {

@@ -244,15 +243,5 @@ source: task.source.def,

function success(res) {
var msg;
if (Array.isArray(res)) {
state = res[0];
msg = res.length > 1
? res[1]
: state;
}
else {
msg = nil;
}
done(msg);
res = normalizeResult(state, res);
state = res.state;
done(res.values);
}

@@ -262,3 +251,3 @@

if (!node.child) throw new UnhandledError(e);
done(message('flume:error', e));
done([message('flume:error', e)]);
}

@@ -331,2 +320,19 @@ }

function normalizeResult(state, raw) {
var hasState = raw.hasOwnProperty('state');
var values;
if (raw.hasOwnProperty('value')) values = [raw.value];
else if (raw.hasOwnProperty('values')) values = raw.values;
else if (hasState) values = [raw.state];
return {
values: values,
state: hasState
? raw.state
: state
};
}
function castArray(v) {

@@ -460,4 +466,5 @@ return !Array.isArray(v)

maybeAsync: maybeAsync,
castThenable: castThenable,
resolveSeq: resolveSeq
};
});
{
"name": "flume-core",
"version": "0.1.1",
"version": "0.1.3",
"description": "core library for flume",

@@ -5,0 +5,0 @@ "license": "MIT",

@@ -7,5 +7,3 @@ # flume-core

```
Sorry about the lack of documentation, I know its really not great. Will add this as soon as I can.
```
See [flume](https://github.com/justinvdm/flume) for docs.

@@ -12,0 +10,0 @@ ## install

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc