Comparing version 0.1.1 to 0.1.3
@@ -46,5 +46,5 @@ ;(function(root, factory) { | ||
function trap(process) { | ||
function trap(transform) { | ||
return { | ||
process: conj({'*': trapFallbackMsgHandler}, process) | ||
transform: conj({'*': trapFallbackMsgHandler}, transform) | ||
}; | ||
@@ -67,6 +67,6 @@ } | ||
function ProcessorDef(opts) { | ||
this.defType = 'processor'; | ||
function TransformDef(opts) { | ||
this.defType = 'transform'; | ||
this.init = ensure(opts.init, noop); | ||
this.process = opts.process; | ||
this.transform = opts.transform; | ||
} | ||
@@ -91,4 +91,4 @@ | ||
// tail | ||
if (i) child = new Node(graph, createProcessorDef(defs[i]), child, index); | ||
while (--i > 0) child = new Node(graph, createProcessorDef(defs[i]), child, 0); | ||
if (i) child = new Node(graph, createTransformDef(defs[i]), child, index); | ||
while (--i > 0) child = new Node(graph, createTransformDef(defs[i]), child, 0); | ||
@@ -123,27 +123,27 @@ return buildGraphHead(graph, defs[0], child); | ||
function createProcessorDef(obj) { | ||
function createTransformDef(obj) { | ||
if (typeof obj === 'function') { | ||
obj = { | ||
process: {'flume:value': obj} | ||
transform: {'flume:value': obj} | ||
}; | ||
} | ||
else if (typeof (obj || 0).process === 'function') { | ||
else if (typeof (obj || 0).transform === 'function') { | ||
obj = conj(obj, { | ||
process: {'flume:value': obj.process} | ||
transform: {'flume:value': obj.transform} | ||
}); | ||
} | ||
else if (typeOf((obj || 0).process) !== 'object') { | ||
else if (typeOf((obj || 0).transform) !== 'object') { | ||
throw new Error( | ||
"Expected function or object matching processor shape but got " + | ||
"Expected function or object matching transform shape but got " + | ||
typeOf(obj)); | ||
} | ||
return new ProcessorDef(obj); | ||
return new TransformDef(obj); | ||
} | ||
// message processing | ||
// message transforming | ||
function trapFallbackMsgHandler(state, v) { | ||
return [v]; | ||
return {state: v}; | ||
} | ||
@@ -155,3 +155,3 @@ | ||
input: createInputHandler, | ||
processor: createProcessorHandler | ||
transform: createTransformHandler | ||
}[node.def.defType](node); | ||
@@ -162,4 +162,4 @@ } | ||
function createInputHandler(node) { | ||
return function handle(msgs, end) { | ||
node.child.handle(msgs, node, node, end); | ||
return function handle(msg, end) { | ||
node.child.handle([msg], node, node, end); | ||
}; | ||
@@ -169,3 +169,3 @@ } | ||
function createProcessorHandler(node) { | ||
function createTransformHandler(node) { | ||
var queue = []; | ||
@@ -175,7 +175,6 @@ var isBusy = false; | ||
var state = node.def.init(); | ||
var processAsync = maybeAsync(process); | ||
var transformAsync = maybeAsync(transform); | ||
return function handle(msgs, parent, source, end) { | ||
msgs = castArray(msgs).map(castMessage); | ||
msgs = msgs.map(castMessage) | ||
var i = -1; | ||
@@ -213,3 +212,3 @@ var n = msgs.length - 1; | ||
return processAsync() | ||
return transformAsync() | ||
.then(resolveSeq) | ||
@@ -219,7 +218,7 @@ .then(success, failure); | ||
function process() { | ||
var fns = node.def.process; | ||
function transform() { | ||
var fns = node.def.transform; | ||
var fn = fns[task.msg.type] || fns['*']; | ||
return !fn | ||
? [state, task.msg] | ||
? {value: task.msg} | ||
: fn(state, task.msg.value, { | ||
@@ -244,15 +243,5 @@ source: task.source.def, | ||
function success(res) { | ||
var msg; | ||
if (Array.isArray(res)) { | ||
state = res[0]; | ||
msg = res.length > 1 | ||
? res[1] | ||
: state; | ||
} | ||
else { | ||
msg = nil; | ||
} | ||
done(msg); | ||
res = normalizeResult(state, res); | ||
state = res.state; | ||
done(res.values); | ||
} | ||
@@ -262,3 +251,3 @@ | ||
if (!node.child) throw new UnhandledError(e); | ||
done(message('flume:error', e)); | ||
done([message('flume:error', e)]); | ||
} | ||
@@ -331,2 +320,19 @@ } | ||
function normalizeResult(state, raw) { | ||
var hasState = raw.hasOwnProperty('state'); | ||
var values; | ||
if (raw.hasOwnProperty('value')) values = [raw.value]; | ||
else if (raw.hasOwnProperty('values')) values = raw.values; | ||
else if (hasState) values = [raw.state]; | ||
return { | ||
values: values, | ||
state: hasState | ||
? raw.state | ||
: state | ||
}; | ||
} | ||
function castArray(v) { | ||
@@ -460,4 +466,5 @@ return !Array.isArray(v) | ||
maybeAsync: maybeAsync, | ||
castThenable: castThenable, | ||
resolveSeq: resolveSeq | ||
}; | ||
}); |
{ | ||
"name": "flume-core", | ||
"version": "0.1.1", | ||
"version": "0.1.3", | ||
"description": "core library for flume", | ||
@@ -5,0 +5,0 @@ "license": "MIT", |
@@ -7,5 +7,3 @@ # flume-core | ||
``` | ||
Sorry about the lack of documentation, I know its really not great. Will add this as soon as I can. | ||
``` | ||
See [flume](https://github.com/justinvdm/flume) for docs. | ||
@@ -12,0 +10,0 @@ ## install |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
10673
338
14