Comparing version 0.4.5 to 0.4.6
185
index.js
@@ -5,2 +5,3 @@ var q = require('q'); | ||
//shallow clone | ||
//TODO: consider replacing with better one | ||
function clone(obj) { | ||
@@ -18,2 +19,10 @@ if (typeof obj === "object") { | ||
function mkDef(api) { | ||
return function (definition) { | ||
api.promises = q; | ||
return definition(api); | ||
} | ||
} | ||
function identity(i) { | ||
@@ -32,11 +41,49 @@ return i; | ||
function getByTopic(collection, topic) { | ||
return topicRecur(collection, topic).filter(identity).sort(function (a, b) { | ||
return ~~(a.order) - ~~(b.order) //ascending | ||
}).map(function (e) { | ||
return e.func | ||
}); | ||
return topicRecur(collection, topic).filter(identity) | ||
} | ||
function rmItemFromArray(item, array) { | ||
return array.splice(array.indexOf(item), 1); | ||
} | ||
//Factory of functions that can be used for defining subscribers, transforms etc. | ||
// that are in a 1 topic to many flowElements relation | ||
function flowElementRegistererFactory(elementsCollection) { //#sojava | ||
return function defineFlowElement(topic, func, order) { | ||
if (!elementsCollection[topic]) { | ||
elementsCollection[topic] = []; | ||
} | ||
elementsCollection[topic].push({ | ||
func: func, | ||
ord: order, //only used for transforms, but code-reuse is good, so subscribers have it too | ||
t: topic | ||
}); | ||
return { | ||
drop: function () { | ||
elementsCollection[topic] = rmItemFromArray(elementsCollection, elementsCollection[topic]) | ||
} | ||
} | ||
} | ||
} | ||
//Factory of functions that can be used for defining moderators etc. | ||
// that are in 1 to 1 relation with a topic | ||
function controlHandlerRegistererFactory(handlersCollection, name) { | ||
return function defineControlHandler(topic, controlHandler) { | ||
if (!handlersCollection[topic]) { | ||
handlersCollection[topic] = controlHandler; | ||
return { | ||
drop: function () { | ||
if (handlersCollection[topic] === controlHandler) { | ||
handlersCollection[topic] = null; | ||
} | ||
} | ||
} | ||
} else { | ||
throw new Error("There can be only one " + name + " for topic. " + topic); | ||
} | ||
} | ||
} | ||
function init() { | ||
@@ -48,32 +95,21 @@ | ||
moderators = {}, | ||
forwards = {}; | ||
forwards = {}, | ||
reporter; | ||
//subscribes a function to a topic | ||
function subscribe(topic, func, order) { | ||
if (!subscriptions[topic]) { | ||
subscriptions[topic] = []; | ||
} | ||
subscriptions[topic].push({ | ||
func: func, | ||
order: order | ||
}); | ||
} | ||
//clears the topic | ||
function unsubscribeAll(topic) { | ||
if (subscriptions[topic]) { | ||
subscriptions[topic] = []; | ||
} | ||
} | ||
//publishes in topic | ||
function publish(topic, input) { | ||
var report = ['pub:' + topic]; | ||
return q().then(function () { | ||
var selectedTransforms = getByTopic(transforms, topic); | ||
var selectedTransforms = getByTopic(transforms, topic).sort(function (a, b) { | ||
return (~~(a.ord) - ~~(b.ord)) //ascending | ||
}); | ||
var data = (input) ? clone(input) : {}; | ||
var promiseArgs = q(data); | ||
selectedTransforms.forEach(function (transform) { | ||
promiseArgs = promiseArgs.then(transform); | ||
promiseArgs = promiseArgs.then(function (data) { | ||
reporter && report.push('tr:' + transform.t); | ||
return transform.func(data); | ||
}); | ||
}); | ||
@@ -86,2 +122,3 @@ return promiseArgs; | ||
}).then(function (topicDetail) { | ||
reporter && report.push('mod:' + topic + '+.' + topicDetail); | ||
topic = topic + '.' + topicDetail; | ||
@@ -95,11 +132,44 @@ return data; | ||
var selectedSubs = getByTopic(subscriptions, topic); | ||
var todos = selectedSubs.map(function (subber) { | ||
return subber(data); | ||
}); | ||
for (var ch in forwards) { | ||
//TODO, this is kinda inconsequent, resolutions from there are going to be returned as arrays | ||
//but at least error handling is ok. | ||
todos.push(forwards[ch].publish(topic, input)); | ||
var currentRepot, todos; | ||
if (reporter) { | ||
currentRepot = report.length; | ||
report[currentRepot] = 'subs: '; | ||
todos = selectedSubs.map(function (subber) { | ||
return q(data).then(subber.func).then(function (a) { | ||
report[currentRepot] += (subber.t + ';'); | ||
return a; | ||
}, function (err) { | ||
report[currentRepot] += (subber.t + '(!);'); | ||
throw err; | ||
}); | ||
}); | ||
} else { | ||
todos = selectedSubs.map(function (subber) { | ||
return q(data).then(subber.func); | ||
}); | ||
} | ||
if (forwards.length > 0) { | ||
reporter && report.push('fwd:' + forwards.length); | ||
for (var ch in forwards) { | ||
//TODO, this is kinda inconsequent, resolutions from there are going to be returned as arrays | ||
//but at least error handling is ok. | ||
todos.push(forwards[ch].publish(topic, input)); | ||
} | ||
} | ||
return q.all(todos); | ||
}).then(function (resolutions) { | ||
reporter && reporter({ | ||
report: '[ok] ' + report.join(" >> ") | ||
}); | ||
return resolutions; | ||
}, function (err) { | ||
reporter && reporter({ | ||
report: '[!!] ' + report.join(" >> ") + " (!)" + err, | ||
input: input, | ||
error: err | ||
}); | ||
throw err; | ||
}); | ||
@@ -109,22 +179,6 @@ | ||
//register a transform function that gets called the same way as a subscribtion handler, but has to resolve to arguments that are supposed to be passed on | ||
//if transform function throws, the publish is instantly cancelled | ||
function transform(what, transform, order) { | ||
if (!transforms[what]) { | ||
transforms[what] = []; | ||
} | ||
transforms[what].push({ | ||
func: transform, | ||
order: order | ||
}); | ||
} | ||
function moderator(what, moderator) { | ||
if (!moderators[what]) { | ||
moderators[what] = moderator; | ||
} else { | ||
throw new Error("There can be only one moderator for topic. " + what); | ||
} | ||
} | ||
function forwardTo(chan) { | ||
@@ -143,2 +197,10 @@ if (chan.name !== name) { | ||
//clears the topic | ||
function unsubscribeAll(topic) { | ||
if (subscriptions[topic]) { | ||
subscriptions[topic] = []; | ||
} | ||
} | ||
function destroy() { | ||
@@ -151,10 +213,8 @@ subscriptions = {}; | ||
function mkDef(api) { | ||
return function (definition) { | ||
api.promises = q; | ||
return definition(api); | ||
} | ||
} | ||
return { | ||
name: name, | ||
report: function (func) { | ||
reporter = func; | ||
}, | ||
define: { | ||
@@ -165,10 +225,10 @@ publisher: mkDef({ | ||
subscriber: mkDef({ | ||
subscribe: subscribe, | ||
subscribe: flowElementRegistererFactory(subscriptions), | ||
unsubscribeAll: unsubscribeAll | ||
}), | ||
transform: mkDef({ | ||
transform: transform | ||
transform: flowElementRegistererFactory(transforms) | ||
}), | ||
moderator: mkDef({ | ||
moderator: moderator | ||
moderator: controlHandlerRegistererFactory(moderators, "moderator") | ||
}) | ||
@@ -178,3 +238,2 @@ }, | ||
dropForward: dropForward, | ||
name: name, | ||
destroy: destroy | ||
@@ -185,3 +244,3 @@ }; | ||
var globalSubscribtions = init(); | ||
var globalSubscriptions = init(); | ||
@@ -191,3 +250,3 @@ module.exports = { | ||
newChannel: init, | ||
global: globalSubscribtions | ||
global: globalSubscriptions | ||
}; |
{ | ||
"name": "axons", | ||
"version": "0.4.5", | ||
"version": "0.4.6", | ||
"description": "A communication channel you always wanted instead of pub-sub", | ||
@@ -5,0 +5,0 @@ "main": "index.js", |
var axons = require('../'); | ||
//just a demo for now | ||
axons.global.define.transform(function (fi) { | ||
fi.transform('test', function (a) { | ||
console.log('fil: ', a); | ||
axons.global.report(function debg(data) { | ||
console.log(data.report); | ||
}); | ||
axons.global.define.transform(function (tr) { | ||
console.log("expect 43120"); | ||
tr.transform('test.case', function (a) { | ||
console.log('inside transform 0: ', a); | ||
return a; | ||
}, 2) | ||
tr.transform('test', function (a) { | ||
console.log('inside transform 1: ', a); | ||
return a; | ||
}, -1) | ||
tr.transform('test.case', function (a) { | ||
console.log('inside transform 2: ', a); | ||
return a; | ||
}) | ||
tr.transform('test', function (a) { | ||
console.log('inside transform 3: ', a); | ||
return a; | ||
}, -2) | ||
tr.transform('test.case', function (a) { | ||
console.log('inside transform 4: ', a); | ||
return a; | ||
}, -3) | ||
@@ -13,13 +35,33 @@ }); | ||
axons.global.define.subscriber(function (sub) { | ||
sub.subscribe('test', function watcherSub(data) { | ||
console.log('sub: ', data); | ||
sub.subscribe('test.case', function problematicSub(data) { | ||
console.log('inside subber 1: ', data); | ||
throw new Error('An error in subscriber'); | ||
return sub.promises(); | ||
}); | ||
sub.subscribe('test', function justSub(data) { | ||
console.log('inside subber 2: ', data); | ||
return sub.promises(); | ||
}); | ||
sub.subscribe('test.case.moderated', function justSub(data) { | ||
console.log('inside subber 3: ', data); | ||
return sub.promises(); | ||
}); | ||
}); | ||
axons.global.define.moderator(function (mod) { | ||
mod.moderator('test.case', function (data) { | ||
//appends to topic based on data | ||
return 'moderated'; | ||
}); | ||
}); | ||
axons.global.define.publisher(function (pub) { | ||
console.log('pub: '); | ||
pub.publish('test', { | ||
console.log('publishing!'); | ||
pub.publish('test.case', { | ||
foo: 'bar' | ||
}).done(); | ||
}).fail(function (e) { | ||
console.error(e); | ||
console.error(e.stack); | ||
}); | ||
}); |
28373
696