arch-stream
Advanced tools
Comparing version 0.0.2 to 0.0.3
# Changelog | ||
## 0.0.3 | ||
Update | ||
## 0.0.2 | ||
@@ -4,0 +8,0 @@ |
@@ -24,3 +24,3 @@ /** | ||
interface Exportable<T> { | ||
export(): IModularStream<T> | ||
export(limit?: number): IModularStream<T> | ||
} | ||
@@ -40,3 +40,3 @@ interface IArchTransformStream<T> extends Transform<T>, Exportable<T> { | ||
import(module: IModularStream<T>): ComposeStream<T> | ||
export(): IModularStream<T> | ||
export(limit?: number): IModularStream<T> | ||
} | ||
@@ -59,6 +59,2 @@ | ||
export interface ICallback<T> { | ||
(data: T): T|Message<T>|IThenable<T> | ||
} | ||
export interface ICallbackVoid<T> { | ||
(data: T): any | ||
@@ -68,3 +64,3 @@ } | ||
export interface IThenable<T> { | ||
then(done: (data: T) => T|Message<T>|IThenable<T>, fail?: (data: T) => T|Message<T>|IThenable<T>, progress?: (data: T) => T|Message<T>|IThenable<T>): IThenable<T> | ||
then(done: ICallback<T>, fail?: ICallback<T>): IThenable<T> | ||
} | ||
@@ -74,24 +70,16 @@ | ||
constructor() | ||
send(data: T): Message<T> | ||
send(data: T, async?: boolean): Message<T> | ||
recv(callback: (data: T) => any): Message<T> | ||
trans(callback: ICallback<T>): Message<T> | ||
} | ||
export class ArchStreamReflectable<T> extends ArchStream<T> { | ||
constructor(parent: ArchStream<T>, msg: Message<T>) | ||
message_: Message<T> | ||
} | ||
export var Proxy: { | ||
State<T>(definition: { index: (string|number)[], indexer: (data: T) => string|number }): IStateProxy<T, ICallbackVoid<T>> | ||
State<T>(definition: { index: (string|number)[], indexer: (data: T) => string|number, void?: boolean }): IStateProxy<T, ICallbackVoid<T>> | ||
Void<T>(): IVoidProxy<T> | ||
State<T>(definition: { index: (string|number)[], indexer: (data: T) => string|number }): IStateProxy<T, ICallback<T>> | ||
} | ||
export interface IProxyCallback<T> { | ||
(callback: (data: T, args?: any[]) => IProxyCallback<T>): IProxyCallback<T> | ||
} | ||
interface IStateProxy<T, U extends Function> { | ||
trans(...callbacks: U[]): ArchStream<T> | ||
} | ||
interface IVoidProxy<T> { | ||
trans(callback: ICallbackVoid<T>): ArchStream<T> | ||
} | ||
} |
@@ -6,3 +6,3 @@ /** | ||
* @name arch-stream | ||
* @version 0.0.2 | ||
* @version 0.0.3 | ||
* --- | ||
@@ -36,4 +36,8 @@ * @author falsandtru https://github.com/falsandtru/arch-stream | ||
this.send = function (self) { | ||
return function (data) { | ||
self.trampoline_(data); | ||
return function (data, async) { | ||
if (!async) { | ||
self.trampoline_(data); | ||
} else { | ||
tick_1.Tick.queue(self.trampoline_, [data], self); | ||
} | ||
return self; | ||
@@ -44,10 +48,2 @@ }; | ||
return function (callback) { | ||
return self.trans(function (data) { | ||
callback(data); | ||
return data; | ||
}); | ||
}; | ||
}(this); | ||
this.trans = function (self) { | ||
return function (callback) { | ||
self.listener_ = callback; | ||
@@ -64,3 +60,3 @@ self.collection_ = void 0; | ||
return function (done, fail) { | ||
return self.trans(done || fail); | ||
return self.recv(done || fail); | ||
}; | ||
@@ -86,15 +82,21 @@ }(this); | ||
var collection = this.collection_ ? this.collection_ : this.collect_(); | ||
var result = data; | ||
for (var i = 0; i < collection.length; i++) { | ||
var msg = collection[i]; | ||
msg.memory_ = result; | ||
result = msg.listener_(result); | ||
msg.memory_ = data; | ||
var result = msg.listener_(data); | ||
if (isThenable(result)) { | ||
result.then(msg.cascade_, msg.cascade_, msg.cascade_); | ||
var callback = cascade(msg, data); | ||
result.then(callback, callback); | ||
break; | ||
} else if (msg.depth_ % MAX_RESCURSIVE_DEPTH === 0) { | ||
msg.cascade_(result, true); | ||
msg.cascade_(data, true); | ||
break; | ||
} | ||
} | ||
return; | ||
function cascade(msg, data) { | ||
return function () { | ||
return void msg.cascade_(data, true); | ||
}; | ||
} | ||
}; | ||
@@ -108,4 +110,4 @@ Message.prototype.clone_ = function (msg) { | ||
Message.prototype.connect = function (msg) { | ||
this.child_ = msg; | ||
msg.parent_ = this; | ||
this.parent_.child_ = msg; | ||
msg.parent_ = this.parent_; | ||
return this; | ||
@@ -157,3 +159,3 @@ }; | ||
if (this.isDrainable_()) { | ||
this.processing_.push(isUniqueElement(data) ? data : void 0); | ||
this.processing_.push(data); | ||
this.listener_(data); | ||
@@ -186,6 +188,2 @@ return true; | ||
exports.default = Throttle; | ||
function isUniqueElement(elm) { | ||
var type = typeof elm; | ||
return type === 'object' || type === 'function'; | ||
} | ||
},{}],3:[function(require,module,exports){ | ||
@@ -236,6 +234,4 @@ "use strict"; | ||
"use strict"; | ||
var arch_stream_1 = require('arch-stream'); | ||
function default_1(definition) { | ||
var index = definition.index, indexer = definition.indexer; | ||
var void_ = definition.void === false ? false : true; | ||
var map = Object.create(null); | ||
@@ -246,49 +242,20 @@ index.forEach(function (method, i) { | ||
return { trans: cascade }; | ||
function cascade() { | ||
var fs = []; | ||
for (var _i = 0; _i < arguments.length; _i++) { | ||
fs[_i - 0] = arguments[_i]; | ||
} | ||
var self = this; | ||
return new arch_stream_1.ArchStreamReflectable(self, self.message_.trans(function (data) { | ||
function cascade(registered) { | ||
return registered(function (data, args) { | ||
var index = map[indexer(data)]; | ||
var result = (index < fs.length && fs[index] || identity)(data); | ||
if (!result || typeof result !== 'object') { | ||
return void_ ? data : result; | ||
} else if ('then' in result) { | ||
var msg = new arch_stream_1.ArchStream.Message(); | ||
result.then(msg.send, msg.send); | ||
return msg; | ||
} | ||
return void_ ? data : result; | ||
})); | ||
return index < args.length ? args[index] : noop; | ||
}); | ||
} | ||
function identity(data) { | ||
return data; | ||
function noop() { | ||
; | ||
} | ||
} | ||
exports.default = default_1; | ||
},{"arch-stream":"arch-stream"}],5:[function(require,module,exports){ | ||
},{}],5:[function(require,module,exports){ | ||
"use strict"; | ||
var arch_stream_1 = require('arch-stream'); | ||
function default_1() { | ||
return { trans: cascade }; | ||
function cascade(f) { | ||
var self = this; | ||
return new arch_stream_1.ArchStreamReflectable(self, self.message_.trans(function (data) { | ||
f(data); | ||
return data; | ||
})); | ||
} | ||
} | ||
exports.default = default_1; | ||
},{"arch-stream":"arch-stream"}],6:[function(require,module,exports){ | ||
"use strict"; | ||
var message_1 = require('../lib/message'); | ||
var compose_1 = require('./compose'); | ||
var state_1 = require('../proxy/state'); | ||
var void_1 = require('../proxy/void'); | ||
var ArchStream = function () { | ||
function ArchStream(parent_, message_, proxies) { | ||
var _this = this; | ||
if (message_ === void 0) { | ||
@@ -306,20 +273,37 @@ message_ = new message_1.default(); | ||
} | ||
proxies.forEach(function (proxy) { | ||
return _this.proxy(proxy); | ||
}); | ||
this.proxy(proxies); | ||
} | ||
ArchStream.prototype.proxy = function (proxy) { | ||
ArchStream.prototype.proxy_ = function (proxy) { | ||
var _this = this; | ||
this.proxies_.push(proxy); | ||
Object.keys(proxy).filter(function (prop) { | ||
return typeof proxy[prop] === 'function'; | ||
}).filter(function (method) { | ||
return !(method in _this) || _this[method] === ArchStream.prototype[method]; | ||
}).forEach(function (method) { | ||
return _this[method] = proxy[method]; | ||
return typeof proxy[prop] === 'function' && prop !== 'import' && prop !== 'export'; | ||
}).map(function (method) { | ||
return [ | ||
method, | ||
method in _this && _this[method] !== ArchStream.prototype[method] ? _this[method] : ArchStream.prototype.trans | ||
]; | ||
}).forEach(function (tuple) { | ||
return _this[tuple[0]] = function () { | ||
var args = []; | ||
for (var _i = 0; _i < arguments.length; _i++) { | ||
args[_i - 0] = arguments[_i]; | ||
} | ||
return tuple[1] !== ArchStream.prototype.trans ? proxy[tuple[0]](tuple[1]) : tuple[1].call(_this, function (callback) { | ||
return function (data) { | ||
return callback(data, args)(data); | ||
}; | ||
}(proxy[tuple[0]](identity))); | ||
}; | ||
}); | ||
return this; | ||
}; | ||
ArchStream.prototype.proxy = function (proxies) { | ||
var _this = this; | ||
return proxies instanceof Array ? proxies.reduce(function (_, proxy) { | ||
return _this.proxy_(proxy); | ||
}, this) : this.proxy_(proxies); | ||
}; | ||
ArchStream.prototype.trans = function (callback) { | ||
return new ArchStream(this, this.message_.trans(callback)); | ||
return new ArchStream(this, this.message_.recv(callback)); | ||
}; | ||
@@ -329,14 +313,14 @@ ArchStream.prototype.import = function (module) { | ||
}; | ||
ArchStream.prototype.export = function () { | ||
return new compose_1.default([this.message_]).export(); | ||
ArchStream.prototype.export = function (limit) { | ||
return new compose_1.default([this.message_]).export(limit); | ||
}; | ||
ArchStream.Message = message_1.default; | ||
ArchStream.Proxy = { | ||
State: state_1.default, | ||
Void: void_1.default | ||
}; | ||
ArchStream.Proxy = { State: state_1.default }; | ||
return ArchStream; | ||
}(); | ||
exports.default = ArchStream; | ||
},{"../lib/message":1,"../proxy/state":4,"../proxy/void":5,"./compose":7}],7:[function(require,module,exports){ | ||
function identity(arg) { | ||
return arg; | ||
} | ||
},{"../lib/message":1,"../proxy/state":4,"./compose":6}],6:[function(require,module,exports){ | ||
"use strict"; | ||
@@ -356,7 +340,11 @@ var modular_1 = require('./modular'); | ||
ComposeStream.prototype.import = function (module) { | ||
this.register_(module.procs); | ||
if (module.limit-- > 0) { | ||
this.register_(module.procs); | ||
} else { | ||
throw new Error('Import count has exceeded the limit.'); | ||
} | ||
return this; | ||
}; | ||
ComposeStream.prototype.export = function () { | ||
return new modular_1.default(this.procs_); | ||
ComposeStream.prototype.export = function (limit) { | ||
return new modular_1.default(this.procs_, limit); | ||
}; | ||
@@ -366,3 +354,3 @@ return ComposeStream; | ||
exports.default = ComposeStream; | ||
},{"./modular":8}],8:[function(require,module,exports){ | ||
},{"./modular":7}],7:[function(require,module,exports){ | ||
"use strict"; | ||
@@ -372,4 +360,8 @@ var throttle_1 = require('../lib/throttle'); | ||
var ModularStream = function () { | ||
function ModularStream(procs) { | ||
function ModularStream(procs, limit) { | ||
if (limit === void 0) { | ||
limit = Infinity; | ||
} | ||
this.procs = procs; | ||
this.limit = limit; | ||
} | ||
@@ -398,3 +390,3 @@ ModularStream.prototype.compose_ = function () { | ||
this.compose_(); | ||
this.last_.trans(callback); | ||
this.last_.recv(callback); | ||
}; | ||
@@ -446,3 +438,3 @@ ModularStream.prototype.read = function (callback) { | ||
exports.default = arch_1.default; | ||
},{"./stream/arch":6}]},{},["arch-stream",1,2,3,4,5,6,7,8]); | ||
},{"./stream/arch":5}]},{},["arch-stream",1,2,3,4,5,6,7]); | ||
typeof module === "object" && module && "exports" in module && (module.exports = require("arch-stream")); |
@@ -1,2 +0,2 @@ | ||
/*! arch-stream v0.0.2 | (c) 2015, falsandtru | MIT Licence */ | ||
require=function a(b,c,d){function e(g,h){if(!c[g]){if(!b[g]){var i="function"==typeof require&&require;if(!h&&i)return i(g,!0);if(f)return f(g,!0);var j=new Error("Cannot find module '"+g+"'");throw j.code="MODULE_NOT_FOUND",j}var k=c[g]={exports:{}};b[g][0].call(k.exports,function(a){var c=b[g][1][a];return e(c?c:a)},k,k.exports,a,b,c,d)}return c[g].exports}for(var f="function"==typeof require&&require,g=0;g<d.length;g++)e(d[g]);return e}({1:[function(a,b,c){"use strict";function d(a){return a}function e(a){return!!a&&"object"==typeof a&&"then"in a}var f=a("./tick"),g=100,h=function(){function a(b,c){void 0===c&&(c=d),this.parent_=b,this.listener_=c,this.depth_=1,this.cascade_=function(a){return function(b,c){return!a.child_||c&&a.rest_(b)||a.child_.send(b),b}}(this),this.send=function(a){return function(b){return a.trampoline_(b),a}}(this),this.recv=function(a){return function(b){return a.trans(function(a){return b(a),a})}}(this),this.trans=function(b){return function(c){b.listener_=c,b.collection_=void 0;var d=new a(b);return void 0!==b.memory_&&b.send(b.memory_),d}}(this),this.then=function(a){return function(b,c){return a.trans(b||c)}}(this),b&&(this.parent_.child_=this)}return a.prototype.root=function(){return this.parent_?this.parent_.root():this},a.prototype.collect_=function(){var a=this,b=[];do b.push(a),a=a.child_;while(a);return this.collection_=b,b},a.prototype.trampoline_=function(a){for(var b=this.collection_?this.collection_:this.collect_(),c=a,d=0;d<b.length;d++){var f=b[d];if(f.memory_=c,c=f.listener_(c),e(c)){c.then(f.cascade_,f.cascade_,f.cascade_);break}if(f.depth_%g===0){f.cascade_(c,!0);break}}},a.prototype.clone_=function(b){return b.parent_?new a(this.clone_(b.parent_),b.listener_):new a(void 0,b.listener_)},a.prototype.clone=function(){return this.clone_(this)},a.prototype.connect=function(a){return this.child_=a,a.parent_=this,this},a.prototype.rest_=function(a){return this.depth_=this.parent_?this.parent_.depth_+1:this.depth_,this.depth_%g===0?(f.Tick.queue(this.cascade_,[a]),!0):!1},a}();c["default"]=h},{"./tick":3}],2:[function(a,b,c){"use strict";function d(a){var b=typeof a;return"object"===b||"function"===b}var e=function(){function a(){this.queue_=[],this.processing_=[],this.limit_=1/0,this.volume_=1/0}return a.prototype.isDrainable_=function(){return this.processing_.length<this.volume_},a.prototype.discharger_=function(a,b){},a.prototype.flow=function(a,b,c){void 0===b&&(b=1/0),this.volume_=a,this.limit_=b,this.discharger_=c||function(){return null}},a.prototype.enqueue=function(a){if(this.isDrainable_())return this.processing_.push(d(a)?a:void 0),this.listener_(a),!0;this.queue_.push(a);do this.discharger_(this.queue_,this.processing_);while(this.processing_.length+this.queue_.length>this.limit_);return!1},a.prototype.dequeue=function(a){var b=this.processing_.indexOf(a);b>0?this.processing_.splice(b,1):this.processing_.shift(),this.queue_.length>0&&this.isDrainable_()&&this.enqueue(this.queue_.shift())},a.prototype.subscribe=function(a){this.listener_=a},a}();c["default"]=e},{}],3:[function(a,b,c){"use strict";var d;!function(a){function b(a,b,c){d(),e.push([a,b,c])}function c(){for(g.shift(),d();e.length>0;){var a=e.shift();a[1]?a[0].apply(a[2],a[1]):a[0]()}}function d(){0===e.length&&g.length<f.length+1&&g.push(setTimeout(c,f[0]));for(var a=g.length;a<f.length;a++)g.push(setTimeout(c,f[a%f.length]))}var e=[],f=[0,4,10,20,25],g=[];a.queue=b}(d=c.Tick||(c.Tick={}))},{}],4:[function(a,b,c){"use strict";function d(a){function b(){for(var a=[],b=0;b<arguments.length;b++)a[b-0]=arguments[b];var d=this;return new e.ArchStreamReflectable(d,d.message_.trans(function(b){var d=h[f(b)],i=(d<a.length&&a[d]||c)(b);if(!i||"object"!=typeof i)return g?b:i;if("then"in i){var j=new e.ArchStream.Message;return i.then(j.send,j.send),j}return g?b:i}))}function c(a){return a}var d=a.index,f=a.indexer,g=a["void"]===!1?!1:!0,h=Object.create(null);return d.forEach(function(a,b){return h[a]=b}),{trans:b}}var e=a("arch-stream");c["default"]=d},{"arch-stream":"arch-stream"}],5:[function(a,b,c){"use strict";function d(){function a(a){var b=this;return new e.ArchStreamReflectable(b,b.message_.trans(function(b){return a(b),b}))}return{trans:a}}var e=a("arch-stream");c["default"]=d},{"arch-stream":"arch-stream"}],6:[function(a,b,c){"use strict";var d=a("../lib/message"),e=a("./compose"),f=a("../proxy/state"),g=a("../proxy/void"),h=function(){function a(a,b,c){var e=this;void 0===b&&(b=new d["default"]),void 0===c&&(c=[]),this.parent_=a,this.message_=b,this.proxies_=[],this.parent_&&c.unshift.apply(c,a.proxies_),c.forEach(function(a){return e.proxy(a)})}return a.prototype.proxy=function(b){var c=this;return this.proxies_.push(b),Object.keys(b).filter(function(a){return"function"==typeof b[a]}).filter(function(b){return!(b in c)||c[b]===a.prototype[b]}).forEach(function(a){return c[a]=b[a]}),this},a.prototype.trans=function(b){return new a(this,this.message_.trans(b))},a.prototype["import"]=function(a){return new e["default"](a.procs)},a.prototype["export"]=function(){return new e["default"]([this.message_])["export"]()},a.Message=d["default"],a.Proxy={State:f["default"],Void:g["default"]},a}();c["default"]=h},{"../lib/message":1,"../proxy/state":4,"../proxy/void":5,"./compose":7}],7:[function(a,b,c){"use strict";var d=a("./modular"),e=function(){function a(a){this.procs_=[],this.register_(a)}return a.prototype.register_=function(a){for(var b=this.procs_,c=this.procs_.length,d=a.length;d--;)b[d+c]=a[d]},a.prototype["import"]=function(a){return this.register_(a.procs),this},a.prototype["export"]=function(){return new d["default"](this.procs_)},a}();c["default"]=e},{"./modular":8}],8:[function(a,b,c){"use strict";var d=a("../lib/throttle"),e=a("../lib/tick"),f=function(){function a(a){this.procs=a}return a.prototype.compose_=function(){this.last_=this.procs.map(function(a){return a.clone()}).reduce(function(a,b){return a.connect(b.root())&&b}),this.first_=this.last_.root()},a.prototype.flow=function(a,b,c){var e=this;return void 0===b&&(b=1/0),this.flow_=new d["default"],this.flow_.flow(a,b,c),this.flow_.subscribe(function(a){return e.write_(a)}),this},a.prototype.read_=function(a){this.compose_(),this.last_.trans(a)},a.prototype.read=function(a){var b=this,c=!1;return this.read_(function(d){b.flow_&&e.Tick.queue(function(){return b.flow_.dequeue(d)}),c&&a(d)}),c=!0,this},a.prototype.write_=function(a){this.first_.send(a)},a.prototype.write=function(a){return this.flow_?this.flow_.enqueue(a):this.write_(a),this},a}();c["default"]=f},{"../lib/throttle":2,"../lib/tick":3}],"arch-stream":[function(a,b,c){"use strict";var d=a("./stream/arch");c.ArchStream=d["default"],c.ArchStreamReflectable=d["default"],c.Message=d["default"].Message,c.Proxy=d["default"].Proxy,c.A=function(){return new c.ArchStream},c.A.Msg=function(){return new c.Message},c.A.Proxy=c.Proxy,c["default"]=d["default"]},{"./stream/arch":6}]},{},["arch-stream",1,2,3,4,5,6,7,8]),"object"==typeof module&&module&&"exports"in module&&(module.exports=require("arch-stream")); | ||
/*! arch-stream v0.0.3 | (c) 2015, falsandtru | MIT Licence */ | ||
require=function a(b,c,d){function e(g,h){if(!c[g]){if(!b[g]){var i="function"==typeof require&&require;if(!h&&i)return i(g,!0);if(f)return f(g,!0);var j=new Error("Cannot find module '"+g+"'");throw j.code="MODULE_NOT_FOUND",j}var k=c[g]={exports:{}};b[g][0].call(k.exports,function(a){var c=b[g][1][a];return e(c?c:a)},k,k.exports,a,b,c,d)}return c[g].exports}for(var f="function"==typeof require&&require,g=0;g<d.length;g++)e(d[g]);return e}({1:[function(a,b,c){"use strict";function d(a){return a}function e(a){return!!a&&"object"==typeof a&&"then"in a}var f=a("./tick"),g=100,h=function(){function a(b,c){void 0===c&&(c=d),this.parent_=b,this.listener_=c,this.depth_=1,this.cascade_=function(a){return function(b,c){return!a.child_||c&&a.rest_(b)||a.child_.send(b),b}}(this),this.send=function(a){return function(b,c){return c?f.Tick.queue(a.trampoline_,[b],a):a.trampoline_(b),a}}(this),this.recv=function(b){return function(c){b.listener_=c,b.collection_=void 0;var d=new a(b);return void 0!==b.memory_&&b.send(b.memory_),d}}(this),this.then=function(a){return function(b,c){return a.recv(b||c)}}(this),b&&(this.parent_.child_=this)}return a.prototype.root=function(){return this.parent_?this.parent_.root():this},a.prototype.collect_=function(){var a=this,b=[];do b.push(a),a=a.child_;while(a);return this.collection_=b,b},a.prototype.trampoline_=function(a){function b(a,b){return function(){return void a.cascade_(b,!0)}}for(var c=this.collection_?this.collection_:this.collect_(),d=0;d<c.length;d++){var f=c[d];f.memory_=a;var h=f.listener_(a);if(e(h)){var i=b(f,a);h.then(i,i);break}if(f.depth_%g===0){f.cascade_(a,!0);break}}},a.prototype.clone_=function(b){return b.parent_?new a(this.clone_(b.parent_),b.listener_):new a(void 0,b.listener_)},a.prototype.clone=function(){return this.clone_(this)},a.prototype.connect=function(a){return this.parent_.child_=a,a.parent_=this.parent_,this},a.prototype.rest_=function(a){return this.depth_=this.parent_?this.parent_.depth_+1:this.depth_,this.depth_%g===0?(f.Tick.queue(this.cascade_,[a]),!0):!1},a}();c["default"]=h},{"./tick":3}],2:[function(a,b,c){"use strict";var d=function(){function a(){this.queue_=[],this.processing_=[],this.limit_=1/0,this.volume_=1/0}return a.prototype.isDrainable_=function(){return this.processing_.length<this.volume_},a.prototype.discharger_=function(a,b){},a.prototype.flow=function(a,b,c){void 0===b&&(b=1/0),this.volume_=a,this.limit_=b,this.discharger_=c||function(){return null}},a.prototype.enqueue=function(a){if(this.isDrainable_())return this.processing_.push(a),this.listener_(a),!0;this.queue_.push(a);do this.discharger_(this.queue_,this.processing_);while(this.processing_.length+this.queue_.length>this.limit_);return!1},a.prototype.dequeue=function(a){var b=this.processing_.indexOf(a);b>0?this.processing_.splice(b,1):this.processing_.shift(),this.queue_.length>0&&this.isDrainable_()&&this.enqueue(this.queue_.shift())},a.prototype.subscribe=function(a){this.listener_=a},a}();c["default"]=d},{}],3:[function(a,b,c){"use strict";var d;!function(a){function b(a,b,c){d(),e.push([a,b,c])}function c(){for(g.shift(),d();e.length>0;){var a=e.shift();a[1]?a[0].apply(a[2],a[1]):a[0]()}}function d(){0===e.length&&g.length<f.length+1&&g.push(setTimeout(c,f[0]));for(var a=g.length;a<f.length;a++)g.push(setTimeout(c,f[a%f.length]))}var e=[],f=[0,4,10,20,25],g=[];a.queue=b}(d=c.Tick||(c.Tick={}))},{}],4:[function(a,b,c){"use strict";function d(a){function b(a){return a(function(a,b){var d=f[e(a)];return d<b.length?b[d]:c})}function c(){}var d=a.index,e=a.indexer,f=Object.create(null);return d.forEach(function(a,b){return f[a]=b}),{trans:b}}c["default"]=d},{}],5:[function(a,b,c){"use strict";function d(a){return a}var e=a("../lib/message"),f=a("./compose"),g=a("../proxy/state"),h=function(){function a(a,b,c){void 0===b&&(b=new e["default"]),void 0===c&&(c=[]),this.parent_=a,this.message_=b,this.proxies_=[],this.parent_&&c.unshift.apply(c,a.proxies_),this.proxy(c)}return a.prototype.proxy_=function(b){var c=this;return this.proxies_.push(b),Object.keys(b).filter(function(a){return"function"==typeof b[a]&&"import"!==a&&"export"!==a}).map(function(b){return[b,b in c&&c[b]!==a.prototype[b]?c[b]:a.prototype.trans]}).forEach(function(e){return c[e[0]]=function(){for(var f=[],g=0;g<arguments.length;g++)f[g-0]=arguments[g];return e[1]!==a.prototype.trans?b[e[0]](e[1]):e[1].call(c,function(a){return function(b){return a(b,f)(b)}}(b[e[0]](d)))}}),this},a.prototype.proxy=function(a){var b=this;return a instanceof Array?a.reduce(function(a,c){return b.proxy_(c)},this):this.proxy_(a)},a.prototype.trans=function(b){return new a(this,this.message_.recv(b))},a.prototype["import"]=function(a){return new f["default"](a.procs)},a.prototype["export"]=function(a){return new f["default"]([this.message_])["export"](a)},a.Message=e["default"],a.Proxy={State:g["default"]},a}();c["default"]=h},{"../lib/message":1,"../proxy/state":4,"./compose":6}],6:[function(a,b,c){"use strict";var d=a("./modular"),e=function(){function a(a){this.procs_=[],this.register_(a)}return a.prototype.register_=function(a){for(var b=this.procs_,c=this.procs_.length,d=a.length;d--;)b[d+c]=a[d]},a.prototype["import"]=function(a){if(!(a.limit-->0))throw new Error("Import count has exceeded the limit.");return this.register_(a.procs),this},a.prototype["export"]=function(a){return new d["default"](this.procs_,a)},a}();c["default"]=e},{"./modular":7}],7:[function(a,b,c){"use strict";var d=a("../lib/throttle"),e=a("../lib/tick"),f=function(){function a(a,b){void 0===b&&(b=1/0),this.procs=a,this.limit=b}return a.prototype.compose_=function(){this.last_=this.procs.map(function(a){return a.clone()}).reduce(function(a,b){return a.connect(b.root())&&b}),this.first_=this.last_.root()},a.prototype.flow=function(a,b,c){var e=this;return void 0===b&&(b=1/0),this.flow_=new d["default"],this.flow_.flow(a,b,c),this.flow_.subscribe(function(a){return e.write_(a)}),this},a.prototype.read_=function(a){this.compose_(),this.last_.recv(a)},a.prototype.read=function(a){var b=this,c=!1;return this.read_(function(d){b.flow_&&e.Tick.queue(function(){return b.flow_.dequeue(d)}),c&&a(d)}),c=!0,this},a.prototype.write_=function(a){this.first_.send(a)},a.prototype.write=function(a){return this.flow_?this.flow_.enqueue(a):this.write_(a),this},a}();c["default"]=f},{"../lib/throttle":2,"../lib/tick":3}],"arch-stream":[function(a,b,c){"use strict";var d=a("./stream/arch");c.ArchStream=d["default"],c.ArchStreamReflectable=d["default"],c.Message=d["default"].Message,c.Proxy=d["default"].Proxy,c.A=function(){return new c.ArchStream},c.A.Msg=function(){return new c.Message},c.A.Proxy=c.Proxy,c["default"]=d["default"]},{"./stream/arch":5}]},{},["arch-stream",1,2,3,4,5,6,7]),"object"==typeof module&&module&&"exports"in module&&(module.exports=require("arch-stream")); |
{ | ||
"name": "arch-stream", | ||
"version": "0.0.2", | ||
"version": "0.0.3", | ||
"description": "Modular stream for domain and data oriented program architecture design.", | ||
@@ -62,3 +62,3 @@ "private": false, | ||
"mocha": "^2.3.2", | ||
"power-assert": "^1.0.0", | ||
"power-assert": "^1.0.1", | ||
"requirejs": "^2.1.20", | ||
@@ -65,0 +65,0 @@ "strictify": "^0.2.0", |
@@ -7,3 +7,3 @@ # ArchStream | ||
Modular stream for domain and data oriented program architecture design. | ||
Transport only modular stream for domain and data oriented program architecture design. | ||
@@ -35,7 +35,7 @@ ## API | ||
const add1 = | ||
ArchStream<number>() | ||
.trans(n => ++n) | ||
ArchStream<{val: number}>() | ||
.trans(e => ++e.val) | ||
.export(), | ||
add2 = | ||
ArchStream<number>() | ||
ArchStream<{val: number}>() | ||
.import(add1) | ||
@@ -45,3 +45,3 @@ .import(add1) | ||
add3 = | ||
ArchStream<number>() | ||
ArchStream<{val: number}>() | ||
.import(add1) | ||
@@ -51,3 +51,3 @@ .import(add2) | ||
ArchStream<number>() | ||
ArchStream<{val: number}>() | ||
.import(add1) | ||
@@ -57,5 +57,5 @@ .import(add2) | ||
.export() | ||
.read(n => console.log(n)) | ||
.write(0) // => 6 | ||
.write(9); // => 15 | ||
.read(e => console.log(e.val)) | ||
.write({val: 0}) // => 6 | ||
.write({val: 9}); // => 15 | ||
``` | ||
@@ -68,8 +68,8 @@ | ||
ArchStream<any>() | ||
.trans(n => new Promise(resolve => setTimeout(_ => resolve(n / 10), 100 - n))) | ||
ArchStream<number>() | ||
.trans(n => new Promise(resolve => setTimeout(_ => resolve(), 100 - n))) | ||
.export() | ||
.read(n => console.log(n)) | ||
.write(10) // => 1 | ||
.write(20); // => 2 | ||
.write(10) // => 10 | ||
.write(20); // => 20 | ||
``` | ||
@@ -86,9 +86,9 @@ | ||
ArchStream<number>() | ||
.proxy(A.Proxy.State({index: [0, 1], indexer: n => n % 2, void: false})) | ||
.trans(n => n, n => -n) | ||
ArchStream<{val: number}>() | ||
.proxy(A.Proxy.State({index: [0, 1], indexer: e => e.val % 2})) | ||
.trans(_ => _, e => e.val = -e.val) | ||
.export() | ||
.read(n => console.log(n)) | ||
.write(1) // => -1 | ||
.write(2); // => 2 | ||
.read(e => console.log(e.val)) | ||
.write({val: 1}) // => -1 | ||
.write({val: 2}); // => 2 | ||
``` | ||
@@ -95,0 +95,0 @@ |
@@ -66,6 +66,2 @@ import Arch from 'arch-stream'; | ||
it('Void', function () { | ||
assert(typeof Proxy.Void === 'function'); | ||
}); | ||
}); | ||
@@ -72,0 +68,0 @@ |
@@ -11,3 +11,3 @@ import { Message as IMessage, ICallback, IThenable } from 'arch-stream'; | ||
// mutable once | ||
private listener_ = identity | ||
private listener_: ICallback<T> = identity | ||
) { | ||
@@ -46,17 +46,22 @@ if (parent_) { | ||
const collection = this.collection_ ? this.collection_ : this.collect_(); | ||
var result: T|IThenable<T> = data; | ||
for (let i = 0; i < collection.length; i++) { | ||
const msg = collection[i]; | ||
msg.memory_ = <T>result; | ||
result = msg.listener_(result); | ||
msg.memory_ = <T>data; | ||
const result = msg.listener_(data); | ||
if (isThenable(result)) { | ||
(<IThenable<T>>result) | ||
.then(msg.cascade_, msg.cascade_, msg.cascade_); | ||
const callback = cascade(msg, data); | ||
(<IThenable<void>>result) | ||
.then(callback, callback); | ||
break; | ||
} | ||
else if (msg.depth_ % MAX_RESCURSIVE_DEPTH === 0) { | ||
msg.cascade_(<T>result, true); | ||
msg.cascade_(data, true); | ||
break; | ||
} | ||
} | ||
return; | ||
function cascade<T>(msg: Message<T>, data: T) { | ||
return () => void msg.cascade_(data, true); | ||
} | ||
} | ||
@@ -77,4 +82,5 @@ | ||
assert(msg.parent_ === void 0); | ||
this.child_ = msg; | ||
msg.parent_ = this; | ||
assert(this.listener_ === identity); | ||
this.parent_.child_ = msg; | ||
msg.parent_ = this.parent_; | ||
return this; | ||
@@ -103,4 +109,9 @@ } | ||
public send = ((self: Message<T>) => | ||
function (data: T): Message<T> { | ||
self.trampoline_(data); | ||
function (data: T, async?: boolean): Message<T> { | ||
if (!async) { | ||
self.trampoline_(data); | ||
} | ||
else { | ||
Tick.queue(self.trampoline_, [data], self); | ||
} | ||
/* | ||
@@ -123,11 +134,2 @@ const result = self.listener_(data); | ||
function (callback: (data: T) => any): Message<T> { | ||
return self.trans(data => { | ||
callback(data); | ||
return data; | ||
}); | ||
} | ||
)(this); | ||
public trans = ((self: Message<T>) => | ||
function (callback: ICallback<T>): Message<T> { | ||
assert(self.listener_ === identity); | ||
@@ -146,3 +148,3 @@ self.listener_ = callback; | ||
function (done: ICallback<T>, fail?: ICallback<T>): Message<T> { | ||
return self.trans(done || fail); | ||
return self.recv(done || fail); | ||
} | ||
@@ -153,3 +155,3 @@ )(this); | ||
function identity<T>(data: T): T|IThenable<T> { | ||
function identity<T>(data: T): T { | ||
return data; | ||
@@ -156,0 +158,0 @@ } |
@@ -20,3 +20,3 @@ | ||
if (this.isDrainable_()) { | ||
this.processing_.push(isUniqueElement(data) ? data : void 0); | ||
this.processing_.push(data); | ||
this.listener_(data); | ||
@@ -51,6 +51,1 @@ return true; | ||
} | ||
function isUniqueElement(elm): boolean { | ||
const type = typeof elm; | ||
return type === 'object' || type === 'function'; | ||
} |
@@ -1,2 +0,2 @@ | ||
import { ArchStream, ArchStreamReflectable, ICallback, IThenable } from 'arch-stream'; | ||
import { IProxyCallback } from 'arch-stream'; | ||
@@ -6,8 +6,6 @@ export default function <T, U>( | ||
index: (string|number)[], | ||
indexer: (data: T) => string|number, | ||
void?: boolean | ||
indexer: (data: T) => string|number | ||
} | ||
) { | ||
const { index, indexer } = definition; | ||
const void_ = definition.void === false ? false : true; | ||
const map: { [index: string]: number } = Object.create(null); | ||
@@ -20,21 +18,13 @@ index | ||
function cascade(...fs: ICallback<T>[]): ArchStream<T> { | ||
const self: ArchStreamReflectable<T> = this; | ||
return new ArchStreamReflectable<T>(self, self.message_.trans((data: T) => { | ||
function cascade(registered: IProxyCallback<T>): IProxyCallback<T> { | ||
return registered((data: T, args: IProxyCallback<T>[]) => { | ||
const index = map[indexer(data)]; | ||
const result = (index < fs.length && fs[index] || identity)(data); | ||
if (!result || typeof result !== 'object') { | ||
return void_ ? data : result; | ||
} | ||
else if ('then' in result) { | ||
const msg = new ArchStream.Message<T>(); | ||
(<IThenable<T>>result).then(msg.send, msg.send); | ||
return msg; // ignore void option | ||
} | ||
return void_ ? data : result; | ||
})); | ||
return index < args.length | ||
? args[index] | ||
: <IProxyCallback<T>>noop; | ||
}); | ||
} | ||
function identity(data: T): T { | ||
return data; | ||
function noop(): any { | ||
; | ||
} | ||
} |
@@ -6,6 +6,5 @@ import { ArchStream as IArchStream, ICallback } from 'arch-stream'; | ||
import StateProxy from '../proxy/state'; | ||
import VoidProxy from '../proxy/void'; | ||
interface IProxy<T> { | ||
[method: string]: (...args: ICallback<T>[]) => ArchStream<T>; | ||
[method: string]: (registered: ICallback<T>) => ArchStream<T>; | ||
} | ||
@@ -15,4 +14,3 @@ export default class ArchStream<T> implements IArchStream<T> { | ||
public static Proxy = { | ||
State: StateProxy, | ||
Void: VoidProxy | ||
State: StateProxy | ||
}; | ||
@@ -30,17 +28,29 @@ | ||
} | ||
proxies | ||
.forEach(proxy => this.proxy(proxy)); | ||
this.proxy(proxies); | ||
} | ||
// mutable | ||
private proxies_: IProxy<T>[] = []; | ||
public proxy<U extends IProxy<T>>(proxy: U): ArchStream<T>/*&U*/ { | ||
private proxy_<U extends IProxy<T>>(proxy: U): ArchStream<T>/*&U*/ { | ||
this.proxies_.push(proxy); | ||
Object.keys(proxy) | ||
.filter((prop: string) => typeof proxy[prop] === 'function') | ||
.filter((method: string) => !(method in this) || this[method] === ArchStream.prototype[method]) | ||
.forEach((method: string) => this[method] = proxy[method]); | ||
.filter(prop => typeof proxy[prop] === 'function' && prop !== 'import' && prop !== 'export') | ||
.map<[string, typeof identity]>(method => [ | ||
method, | ||
method in this && this[method] !== ArchStream.prototype[method] ? this[method] : ArchStream.prototype.trans | ||
]) | ||
.forEach(tuple => | ||
this[tuple[0]] = (...args) => | ||
tuple[1] !== ArchStream.prototype.trans | ||
? proxy[tuple[0]](tuple[1]) | ||
: tuple[1].call(this, (callback => (data: T) => callback(data, args)(data))(proxy[tuple[0]](identity))) | ||
); | ||
return this; | ||
} | ||
public proxy<U extends IProxy<T>>(proxies: U|U[]): ArchStream<T>/*&U*/ { | ||
return proxies instanceof Array | ||
? proxies.reduce((_, proxy) => this.proxy_(proxy), this) | ||
: this.proxy_(<U>proxies); | ||
} | ||
public trans(callback: ICallback<T>): ArchStream<T> { | ||
return new ArchStream(this, this.message_.trans(callback)); | ||
return new ArchStream(this, this.message_.recv(callback)); | ||
} | ||
@@ -50,5 +60,9 @@ public import(module: ModularStream<T>): ComposeStream<T> { | ||
} | ||
public export(): ModularStream<T> { | ||
return new ComposeStream<T>([this.message_]).export(); | ||
public export(limit?: number): ModularStream<T> { | ||
return new ComposeStream<T>([this.message_]).export(limit); | ||
} | ||
} | ||
function identity<T>(arg: T): T { | ||
return arg; | ||
} |
@@ -18,8 +18,13 @@ import { ComposeStream as IComposeStream } from 'arch-stream'; | ||
public import(module: ModularStream<T>): ComposeStream<T> { | ||
this.register_(module.procs); | ||
if (module.limit-- > 0) { | ||
this.register_(module.procs); | ||
} | ||
else { | ||
throw new Error('Import count has exceeded the limit.'); | ||
} | ||
return this; | ||
} | ||
public export(): ModularStream<T> { | ||
return new ModularStream<T>(this.procs_); | ||
public export(limit?: number): ModularStream<T> { | ||
return new ModularStream<T>(this.procs_, limit); | ||
} | ||
} |
@@ -1,2 +0,2 @@ | ||
import { IModularStream, IModularWritableStream, IModularReadableStream, ICallbackVoid } from 'arch-stream'; | ||
import { IModularStream, IModularWritableStream, IModularReadableStream, ICallback } from 'arch-stream'; | ||
import Message from '../lib/message'; | ||
@@ -7,3 +7,3 @@ import Throttle from '../lib/throttle'; | ||
export default class ModularStream<T> implements IModularStream<T>, IModularWritableStream<T>, IModularReadableStream<T> { | ||
constructor(public procs: Message<T>[]) { | ||
constructor(public procs: Message<T>[], public limit = Infinity) { | ||
assert(procs instanceof Array); | ||
@@ -30,7 +30,7 @@ assert(procs.length > 0); | ||
private read_(callback: ICallbackVoid<T>): void { | ||
private read_(callback: ICallback<T>): void { | ||
this.compose_(); | ||
this.last_.trans(callback); | ||
this.last_.recv(callback); | ||
} | ||
public read(callback: ICallbackVoid<T>): IModularWritableStream<T> { | ||
public read(callback: ICallback<T>): IModularWritableStream<T> { | ||
var active = false; | ||
@@ -37,0 +37,0 @@ this.read_((data: T) => { |
import { A } from 'arch-stream'; | ||
interface Entity<T> { | ||
val: T; | ||
} | ||
describe('Performance:', function () { | ||
@@ -8,7 +12,7 @@ describe('Message', function () { | ||
it('trans 1k chains * 1k msgs', function (done) { | ||
const sender = A.Msg<number>(), | ||
add1 = n => n + 1; | ||
const sender = A.Msg<Entity<number>>(), | ||
add1 = e => ++e.val; | ||
var tmp = sender; | ||
for (let i = 0; i < 1e3; i++) { | ||
tmp = tmp.trans(add1); | ||
tmp = tmp.recv(add1); | ||
} | ||
@@ -18,6 +22,6 @@ | ||
tmp | ||
.recv(n => n < 1e3 + 1e3 || console.log(`Message: 1k trans chains * 1k msgs ${time()}ms`) || done()); | ||
.recv(e => e.val < 1e3 + 1e3 || console.log(`Message: 1k trans chains * 1k msgs ${time()}ms`) || done()); | ||
for (let i = 0; i < 1e3; i++) { | ||
sender.send(i + 1); | ||
sender.send({val: i + 1}); | ||
} | ||
@@ -27,7 +31,7 @@ }); | ||
it('defer 100 chains * 1k msgs', function (done) { | ||
const sender = A.Msg<number>(), | ||
add1 = n => A.Msg<number>().send(n + 1); | ||
const sender = A.Msg<Entity<number>>(), | ||
add1 = e => A.Msg<number>().send(++e.val); | ||
var tmp = sender; | ||
for (let i = 0; i < 1e2; i++) { | ||
tmp = tmp.trans(add1); | ||
tmp = tmp.recv(add1); | ||
} | ||
@@ -37,6 +41,6 @@ | ||
tmp | ||
.recv(n => n < 1e2 + 1e3 || console.log(`Message: 100 defer chains * 1k msgs ${time()}ms`) || done()); | ||
.recv(e => e.val < 1e2 + 1e3 || console.log(`Message: 100 defer chains * 1k msgs ${time()}ms`) || done()); | ||
for (let i = 0; i < 1e3; i++) { | ||
sender.send(i + 1); | ||
sender.send({val: i + 1}); | ||
} | ||
@@ -43,0 +47,0 @@ }); |
import { A } from 'arch-stream'; | ||
interface Entity<T> { | ||
val: T; | ||
} | ||
describe('Performance:', function () { | ||
@@ -8,9 +12,8 @@ describe('State', function () { | ||
it('state 1k chains * 1k data', function (done) { | ||
const proxy = A.Proxy.State<number>({ | ||
indexer: n => 1, | ||
index: [1, 2, 3], | ||
void: false | ||
const proxy = A.Proxy.State<Entity<number>>({ | ||
indexer: e => 1, | ||
index: [1, 2, 3] | ||
}); | ||
var stm = A<number>().proxy(proxy), | ||
add1 = n => ++n; | ||
var stm = A<Entity<number>>().proxy(proxy), | ||
add1 = e => ++e.val; | ||
for (let i = 0; i < 1e3; i++) { | ||
@@ -23,6 +26,6 @@ stm = stm.trans(add1); | ||
.export() | ||
.read(n => n < 1e3 + 1e3 || console.log(`State: 1k trans chains * 1k data ${time()}ms`) || done()); | ||
.read(e => e.val < 1e3 + 1e3 || console.log(`State: 1k trans chains * 1k data ${time()}ms`) || done()); | ||
for (let i = 0; i < 1e3; i++) { | ||
mdl.write(i + 1); | ||
mdl.write({val: i + 1}); | ||
} | ||
@@ -32,9 +35,8 @@ }); | ||
it('imported state 1k chains * 1k data', function (done) { | ||
const proxy = A.Proxy.State<number>({ | ||
indexer: n => 1, | ||
index: [1, 2, 3], | ||
void: false | ||
const proxy = A.Proxy.State<Entity<number>>({ | ||
indexer: e => 1, | ||
index: [1, 2, 3] | ||
}); | ||
var stm = A<number>().import(A<number>().trans(_ => _).export()), | ||
add1 = A<number>().proxy(proxy).trans(n => ++n).export(); | ||
var stm = A<Entity<number>>().import(A<Entity<number>>().trans(_ => _).export()), | ||
add1 = A<Entity<number>>().proxy(proxy).trans(e => ++e.val).export(); | ||
for (let i = 0; i < 1e3; i++) { | ||
@@ -47,6 +49,6 @@ stm = stm.import(add1); | ||
.export() | ||
.read(n => n < 1e3 + 1e3 || console.log(`State: 1k imported trans chains * 1k data ${time()}ms`) || done()); | ||
.read(e => e.val < 1e3 + 1e3 || console.log(`State: 1k imported trans chains * 1k data ${time()}ms`) || done()); | ||
for (let i = 0; i < 1e3; i++) { | ||
mdl.write(i + 1); | ||
mdl.write({val: i + 1}); | ||
} | ||
@@ -53,0 +55,0 @@ }); |
import { A } from 'arch-stream'; | ||
interface Entity<T> { | ||
val: T; | ||
} | ||
describe('Performance:', function () { | ||
@@ -8,4 +12,4 @@ describe('Stream', function () { | ||
it('trans 1k chains * 1k data', function (done) { | ||
var stm = A<number>().trans(n => n), | ||
add1 = n => n + 1; | ||
var stm = A<Entity<number>>().trans(_ => _), | ||
add1 = e => ++e.val; | ||
for (let i = 0; i < 1e3; i++) { | ||
@@ -18,6 +22,6 @@ stm = stm.trans(add1); | ||
.export() | ||
.read(n => n < 1e3 + 1e3 || console.log(`Stream: 1k trans chains * 1k data ${time()}ms`) || done()); | ||
.read(e => e.val < 1e3 + 1e3 || console.log(`Stream: 1k trans chains * 1k data ${time()}ms`) || done()); | ||
for (let i = 0; i < 1e3; i++) { | ||
mdl.write(i + 1); | ||
mdl.write({val: i + 1}); | ||
} | ||
@@ -27,4 +31,4 @@ }); | ||
it('imported trans 1k chains * 1k data', function (done) { | ||
var stm = A<number>().import(A<number>().trans(_ => _).export()), | ||
add1 = A<number>().trans(n => n + 1).export(); | ||
var stm = A<Entity<number>>().import(A<Entity<number>>().trans(_ => _).export()), | ||
add1 = A<Entity<number>>().trans(e => ++e.val).export(); | ||
for (let i = 0; i < 1e3; i++) { | ||
@@ -37,6 +41,6 @@ stm = stm.import(add1); | ||
.export() | ||
.read(n => n < 1e3 + 1e3 || console.log(`Stream: imported 1k trans chains * 1k data ${time()}ms`) || done()); | ||
.read(e => e.val < 1e3 + 1e3 || console.log(`Stream: imported 1k trans chains * 1k data ${time()}ms`) || done()); | ||
for (let i = 0; i < 1e3; i++) { | ||
mdl.write(i + 1); | ||
mdl.write({val: i + 1}); | ||
} | ||
@@ -46,4 +50,4 @@ }); | ||
it('imported 100 flow trans 1k chains * 1k data', function (done) { | ||
var stm = A<number>().import(A<number>().trans(_ => _).export()), | ||
add1 = A<number>().trans(n => n + 1).export(); | ||
var stm = A<Entity<number>>().import(A<Entity<number>>().trans(_ => _).export()), | ||
add1 = A<Entity<number>>().trans(e => ++e.val).export(); | ||
for (let i = 0; i < 1e3; i++) { | ||
@@ -56,6 +60,6 @@ stm = stm.import(add1); | ||
.export() | ||
.flow(1e3).read(n => n < 1e3 + 1e3 || console.log(`Stream: imported 100 flow 1k trans chains * 1k data ${time()}ms`) || done()); | ||
.flow(1e3).read(e => e.val < 1e3 + 1e3 || console.log(`Stream: imported 100 flow 1k trans chains * 1k data ${time()}ms`) || done()); | ||
for (let i = 0; i < 1e3; i++) { | ||
mdl.write(i + 1); | ||
mdl.write({val: i + 1}); | ||
} | ||
@@ -65,4 +69,4 @@ }); | ||
it('imported 1k flow trans 1k chains * 1k data', function (done) { | ||
var stm = A<number>().import(A<number>().trans(_ => _).export()), | ||
add1 = A<number>().trans(n => n + 1).export(); | ||
var stm = A<Entity<number>>().import(A<Entity<number>>().trans(_ => _).export()), | ||
add1 = A<Entity<number>>().trans(e => ++e.val).export(); | ||
for (let i = 0; i < 1e3; i++) { | ||
@@ -75,6 +79,6 @@ stm = stm.import(add1); | ||
.export() | ||
.flow(1e3).read(n => n < 1e3 + 1e3 || console.log(`Stream: imported 1k flow 1k trans chains * 1k data ${time()}ms`) || done()); | ||
.flow(1e3).read(e => e.val < 1e3 + 1e3 || console.log(`Stream: imported 1k flow 1k trans chains * 1k data ${time()}ms`) || done()); | ||
for (let i = 0; i < 1e3; i++) { | ||
mdl.write(i + 1); | ||
mdl.write({val: i + 1}); | ||
} | ||
@@ -84,4 +88,4 @@ }); | ||
it('defer 100 chains * 1k data', function (done) { | ||
var stm = A<number>().trans(n => n), | ||
add1 = n => A.Msg<number>().send(n + 1); | ||
var stm = A<Entity<number>>().trans(_ => _), | ||
add1 = e => A.Msg<number>().send(++e.val); | ||
for (let i = 0; i < 1e2; i++) { | ||
@@ -94,6 +98,6 @@ stm = stm.trans(add1); | ||
.export() | ||
.read(n => n < 1e2 + 1e3 || console.log(`Stream: defer 100 chains * 1k data ${time()}ms`) || done()); | ||
.read(e => e.val < 1e2 + 1e3 || console.log(`Stream: defer 100 chains * 1k data ${time()}ms`) || done()); | ||
for (let i = 0; i < 1e3; i++) { | ||
mdl.write(i + 1); | ||
mdl.write({val: i + 1}); | ||
} | ||
@@ -103,4 +107,4 @@ }); | ||
it('imported defer 100 chains * 1k data', function (done) { | ||
var stm = A<number>().import(A<number>().trans(_ => _).export()), | ||
add1 = A<number>().trans(n => A.Msg<number>().send(n + 1)).export(); | ||
var stm = A<Entity<number>>().import(A<Entity<number>>().trans(_ => _).export()), | ||
add1 = A<Entity<number>>().trans(e => A.Msg<number>().send(++e.val)).export(); | ||
for (let i = 0; i < 1e2; i++) { | ||
@@ -113,6 +117,6 @@ stm = stm.import(add1); | ||
.export() | ||
.read(n => n < 1e2 + 1e3 || console.log(`Stream: imported defer 100 chains * 1k data ${time()}ms`) || done()); | ||
.read(e => e.val < 1e2 + 1e3 || console.log(`Stream: imported defer 100 chains * 1k data ${time()}ms`) || done()); | ||
for (let i = 0; i < 1e3; i++) { | ||
mdl.write(i + 1); | ||
mdl.write({val: i + 1}); | ||
} | ||
@@ -122,4 +126,4 @@ }); | ||
it('imported 100 flow defer 100 chains * 1k data', function (done) { | ||
var stm = A<number>().import(A<number>().trans(_ => _).export()), | ||
add1 = A<number>().trans(n => A.Msg<number>().send(n + 1)).export(); | ||
var stm = A<Entity<number>>().import(A<Entity<number>>().trans(_ => _).export()), | ||
add1 = A<Entity<number>>().trans(e => A.Msg<number>().send(++e.val)).export(); | ||
for (let i = 0; i < 1e2; i++) { | ||
@@ -132,6 +136,6 @@ stm = stm.import(add1); | ||
.export() | ||
.flow(1e2).read(n => n < 1e2 + 1e3 || console.log(`Stream: imported 100 flow defer 100 chains * 1k data ${time()}ms`) || done()); | ||
.flow(1e2).read(e => e.val < 1e2 + 1e3 || console.log(`Stream: imported 100 flow defer 100 chains * 1k data ${time()}ms`) || done()); | ||
for (let i = 0; i < 1e3; i++) { | ||
mdl.write(i + 1); | ||
mdl.write({val: i + 1}); | ||
} | ||
@@ -138,0 +142,0 @@ }); |
@@ -6,3 +6,3 @@ /** | ||
* @name arch-stream | ||
* @version 0.0.2 | ||
* @version 0.0.3 | ||
* --- | ||
@@ -36,4 +36,8 @@ * @author falsandtru https://github.com/falsandtru/arch-stream | ||
this.send = function (self) { | ||
return function (data) { | ||
self.trampoline_(data); | ||
return function (data, async) { | ||
if (!async) { | ||
self.trampoline_(data); | ||
} else { | ||
tick_1.Tick.queue(self.trampoline_, [data], self); | ||
} | ||
return self; | ||
@@ -44,10 +48,2 @@ }; | ||
return function (callback) { | ||
return self.trans(function (data) { | ||
callback(data); | ||
return data; | ||
}); | ||
}; | ||
}(this); | ||
this.trans = function (self) { | ||
return function (callback) { | ||
self.listener_ = callback; | ||
@@ -64,3 +60,3 @@ self.collection_ = void 0; | ||
return function (done, fail) { | ||
return self.trans(done || fail); | ||
return self.recv(done || fail); | ||
}; | ||
@@ -86,15 +82,21 @@ }(this); | ||
var collection = this.collection_ ? this.collection_ : this.collect_(); | ||
var result = data; | ||
for (var i = 0; i < collection.length; i++) { | ||
var msg = collection[i]; | ||
msg.memory_ = result; | ||
result = msg.listener_(result); | ||
msg.memory_ = data; | ||
var result = msg.listener_(data); | ||
if (isThenable(result)) { | ||
result.then(msg.cascade_, msg.cascade_, msg.cascade_); | ||
var callback = cascade(msg, data); | ||
result.then(callback, callback); | ||
break; | ||
} else if (msg.depth_ % MAX_RESCURSIVE_DEPTH === 0) { | ||
msg.cascade_(result, true); | ||
msg.cascade_(data, true); | ||
break; | ||
} | ||
} | ||
return; | ||
function cascade(msg, data) { | ||
return function () { | ||
return void msg.cascade_(data, true); | ||
}; | ||
} | ||
}; | ||
@@ -108,4 +110,4 @@ Message.prototype.clone_ = function (msg) { | ||
Message.prototype.connect = function (msg) { | ||
this.child_ = msg; | ||
msg.parent_ = this; | ||
this.parent_.child_ = msg; | ||
msg.parent_ = this.parent_; | ||
return this; | ||
@@ -157,3 +159,3 @@ }; | ||
if (this.isDrainable_()) { | ||
this.processing_.push(isUniqueElement(data) ? data : void 0); | ||
this.processing_.push(data); | ||
this.listener_(data); | ||
@@ -186,6 +188,2 @@ return true; | ||
exports.default = Throttle; | ||
function isUniqueElement(elm) { | ||
var type = typeof elm; | ||
return type === 'object' || type === 'function'; | ||
} | ||
},{}],3:[function(require,module,exports){ | ||
@@ -236,6 +234,4 @@ "use strict"; | ||
"use strict"; | ||
var arch_stream_1 = require('arch-stream'); | ||
function default_1(definition) { | ||
var index = definition.index, indexer = definition.indexer; | ||
var void_ = definition.void === false ? false : true; | ||
var map = Object.create(null); | ||
@@ -246,49 +242,20 @@ index.forEach(function (method, i) { | ||
return { trans: cascade }; | ||
function cascade() { | ||
var fs = []; | ||
for (var _i = 0; _i < arguments.length; _i++) { | ||
fs[_i - 0] = arguments[_i]; | ||
} | ||
var self = this; | ||
return new arch_stream_1.ArchStreamReflectable(self, self.message_.trans(function (data) { | ||
function cascade(registered) { | ||
return registered(function (data, args) { | ||
var index = map[indexer(data)]; | ||
var result = (index < fs.length && fs[index] || identity)(data); | ||
if (!result || typeof result !== 'object') { | ||
return void_ ? data : result; | ||
} else if ('then' in result) { | ||
var msg = new arch_stream_1.ArchStream.Message(); | ||
result.then(msg.send, msg.send); | ||
return msg; | ||
} | ||
return void_ ? data : result; | ||
})); | ||
return index < args.length ? args[index] : noop; | ||
}); | ||
} | ||
function identity(data) { | ||
return data; | ||
function noop() { | ||
; | ||
} | ||
} | ||
exports.default = default_1; | ||
},{"arch-stream":"arch-stream"}],5:[function(require,module,exports){ | ||
},{}],5:[function(require,module,exports){ | ||
"use strict"; | ||
var arch_stream_1 = require('arch-stream'); | ||
function default_1() { | ||
return { trans: cascade }; | ||
function cascade(f) { | ||
var self = this; | ||
return new arch_stream_1.ArchStreamReflectable(self, self.message_.trans(function (data) { | ||
f(data); | ||
return data; | ||
})); | ||
} | ||
} | ||
exports.default = default_1; | ||
},{"arch-stream":"arch-stream"}],6:[function(require,module,exports){ | ||
"use strict"; | ||
var message_1 = require('../lib/message'); | ||
var compose_1 = require('./compose'); | ||
var state_1 = require('../proxy/state'); | ||
var void_1 = require('../proxy/void'); | ||
var ArchStream = function () { | ||
function ArchStream(parent_, message_, proxies) { | ||
var _this = this; | ||
if (message_ === void 0) { | ||
@@ -306,20 +273,37 @@ message_ = new message_1.default(); | ||
} | ||
proxies.forEach(function (proxy) { | ||
return _this.proxy(proxy); | ||
}); | ||
this.proxy(proxies); | ||
} | ||
ArchStream.prototype.proxy = function (proxy) { | ||
ArchStream.prototype.proxy_ = function (proxy) { | ||
var _this = this; | ||
this.proxies_.push(proxy); | ||
Object.keys(proxy).filter(function (prop) { | ||
return typeof proxy[prop] === 'function'; | ||
}).filter(function (method) { | ||
return !(method in _this) || _this[method] === ArchStream.prototype[method]; | ||
}).forEach(function (method) { | ||
return _this[method] = proxy[method]; | ||
return typeof proxy[prop] === 'function' && prop !== 'import' && prop !== 'export'; | ||
}).map(function (method) { | ||
return [ | ||
method, | ||
method in _this && _this[method] !== ArchStream.prototype[method] ? _this[method] : ArchStream.prototype.trans | ||
]; | ||
}).forEach(function (tuple) { | ||
return _this[tuple[0]] = function () { | ||
var args = []; | ||
for (var _i = 0; _i < arguments.length; _i++) { | ||
args[_i - 0] = arguments[_i]; | ||
} | ||
return tuple[1] !== ArchStream.prototype.trans ? proxy[tuple[0]](tuple[1]) : tuple[1].call(_this, function (callback) { | ||
return function (data) { | ||
return callback(data, args)(data); | ||
}; | ||
}(proxy[tuple[0]](identity))); | ||
}; | ||
}); | ||
return this; | ||
}; | ||
ArchStream.prototype.proxy = function (proxies) { | ||
var _this = this; | ||
return proxies instanceof Array ? proxies.reduce(function (_, proxy) { | ||
return _this.proxy_(proxy); | ||
}, this) : this.proxy_(proxies); | ||
}; | ||
ArchStream.prototype.trans = function (callback) { | ||
return new ArchStream(this, this.message_.trans(callback)); | ||
return new ArchStream(this, this.message_.recv(callback)); | ||
}; | ||
@@ -329,14 +313,14 @@ ArchStream.prototype.import = function (module) { | ||
}; | ||
ArchStream.prototype.export = function () { | ||
return new compose_1.default([this.message_]).export(); | ||
ArchStream.prototype.export = function (limit) { | ||
return new compose_1.default([this.message_]).export(limit); | ||
}; | ||
ArchStream.Message = message_1.default; | ||
ArchStream.Proxy = { | ||
State: state_1.default, | ||
Void: void_1.default | ||
}; | ||
ArchStream.Proxy = { State: state_1.default }; | ||
return ArchStream; | ||
}(); | ||
exports.default = ArchStream; | ||
},{"../lib/message":1,"../proxy/state":4,"../proxy/void":5,"./compose":7}],7:[function(require,module,exports){ | ||
function identity(arg) { | ||
return arg; | ||
} | ||
},{"../lib/message":1,"../proxy/state":4,"./compose":6}],6:[function(require,module,exports){ | ||
"use strict"; | ||
@@ -356,7 +340,11 @@ var modular_1 = require('./modular'); | ||
ComposeStream.prototype.import = function (module) { | ||
this.register_(module.procs); | ||
if (module.limit-- > 0) { | ||
this.register_(module.procs); | ||
} else { | ||
throw new Error('Import count has exceeded the limit.'); | ||
} | ||
return this; | ||
}; | ||
ComposeStream.prototype.export = function () { | ||
return new modular_1.default(this.procs_); | ||
ComposeStream.prototype.export = function (limit) { | ||
return new modular_1.default(this.procs_, limit); | ||
}; | ||
@@ -366,3 +354,3 @@ return ComposeStream; | ||
exports.default = ComposeStream; | ||
},{"./modular":8}],8:[function(require,module,exports){ | ||
},{"./modular":7}],7:[function(require,module,exports){ | ||
"use strict"; | ||
@@ -372,4 +360,8 @@ var throttle_1 = require('../lib/throttle'); | ||
var ModularStream = function () { | ||
function ModularStream(procs) { | ||
function ModularStream(procs, limit) { | ||
if (limit === void 0) { | ||
limit = Infinity; | ||
} | ||
this.procs = procs; | ||
this.limit = limit; | ||
} | ||
@@ -398,3 +390,3 @@ ModularStream.prototype.compose_ = function () { | ||
this.compose_(); | ||
this.last_.trans(callback); | ||
this.last_.recv(callback); | ||
}; | ||
@@ -446,3 +438,3 @@ ModularStream.prototype.read = function (callback) { | ||
exports.default = arch_1.default; | ||
},{"./stream/arch":6}]},{},["arch-stream",1,2,3,4,5,6,7,8]); | ||
},{"./stream/arch":5}]},{},["arch-stream",1,2,3,4,5,6,7]); | ||
typeof module === "object" && module && "exports" in module && (module.exports = require("arch-stream")); |
import { Message } from 'arch-stream'; | ||
interface Entity<T> { | ||
val: T; | ||
} | ||
describe('Unit: Message', function () { | ||
@@ -17,72 +21,37 @@ describe('definition', function () { | ||
it('recv > send', function (done) { | ||
const sender = new Message<number>(); | ||
const sender = new Message<Entity<number>>(); | ||
sender | ||
.send(1); | ||
.send({val: 1}); | ||
sender | ||
.recv(data => assert(data === 1) || done()); | ||
.recv(e => assert(e.val === 1) || done()); | ||
}); | ||
it('send > recv', function (done) { | ||
const sender = new Message<number>(); | ||
const sender = new Message<Entity<number>>(); | ||
sender | ||
.recv(data => assert(data === 1) || done()); | ||
.recv(e => assert(e.val === 1) || done()); | ||
sender | ||
.send(1); | ||
.send({val: 1}); | ||
}); | ||
it('trans', function (done) { | ||
new Message<number>() | ||
.send(1) | ||
.trans(data => data + 1) | ||
.recv(data => assert(data === 2) || done()); | ||
new Message<Entity<number>>() | ||
.send({val: 1}) | ||
.recv(e => ++e.val) | ||
.recv(e => assert(e.val === 2) || done()); | ||
}); | ||
it('defer message', function (done) { | ||
new Message<number>() | ||
.send(1) | ||
.trans(data => { | ||
const msg = new Message<number>(); | ||
setTimeout(() => msg.send(data + 1), 1); | ||
new Message<Entity<number>>() | ||
.send({val: 1}) | ||
.recv(e => { | ||
const msg = new Message<any>(); | ||
setTimeout(() => msg.send(++e.val), 1); | ||
return msg; | ||
}) | ||
.trans(data => data + 1) | ||
.recv(data => assert(data === 3) || done()); | ||
.recv(e => assert(e.val === 2) || done()); | ||
}); | ||
it('trans > defer', function (done) { | ||
new Message<number>() | ||
.send(1) | ||
.trans(data => data + 1) | ||
.trans(data => new Message<number>().send(data + 1)) | ||
.trans(data => data + 1) | ||
.trans(data => new Message<number>().send(data + 1)) | ||
.recv(data => assert(data === 5) || done()); | ||
}); | ||
it('defer > trans', function (done) { | ||
new Message<number>() | ||
.send(1) | ||
.trans(data => new Message<number>().send(data + 1)) | ||
.trans(data => data + 1) | ||
.trans(data => new Message<number>().send(data + 1)) | ||
.trans(data => data + 1) | ||
.recv(data => assert(data === 5) || done()); | ||
}); | ||
it('clone', function (done) { | ||
(<any>new Message<number>() | ||
.recv(data => assert(false)) | ||
.trans(data => data + 1) | ||
.recv(data => assert(false)) | ||
.trans(data => new Message<number>().send(data + 1)) | ||
.recv(data => assert(false)) | ||
.trans(data => data + 1) | ||
.recv(data => assert(false))) | ||
.clone() | ||
.send(1) | ||
.recv(data => assert(data === 1) || done()); | ||
}); | ||
}); | ||
}); |
import { ArchStream } from 'arch-stream'; | ||
interface Entity { | ||
state: string; | ||
} | ||
describe('State', function () { | ||
describe('standard', function () { | ||
it('state', function (done) { | ||
const proxy = ArchStream.Proxy.State<{state: string}>({ | ||
const proxy = ArchStream.Proxy.State<Entity>({ | ||
indexer: data => data.state, | ||
index: ['pending', 'resolved', 'rejected'] | ||
}); | ||
(<any>new ArchStream<{state: string}>() | ||
(<any>new ArchStream<Entity>() | ||
.proxy(proxy)) | ||
@@ -22,7 +26,7 @@ .trans(e => e.state = 'resolved', e => e.state = 'resolved', e => e.state = 'rejected') | ||
it('imported state', function (done) { | ||
const proxy = ArchStream.Proxy.State<{state: string}>({ | ||
const proxy = ArchStream.Proxy.State<Entity>({ | ||
indexer: data => data.state, | ||
index: ['pending', 'resolved', 'rejected'] | ||
}); | ||
const mdl = (<any>(new ArchStream<{ state: string }>()) | ||
const mdl = (<any>(new ArchStream<Entity>()) | ||
.proxy(proxy)) | ||
@@ -34,3 +38,3 @@ .trans(e => e.state = 'resolved', e => e.state = 'resolved', e => e.state = 'rejected') | ||
.export(); | ||
new ArchStream<{state: string}>() | ||
new ArchStream<Entity>() | ||
.import(mdl) | ||
@@ -37,0 +41,0 @@ .export() |
import { ArchStream, Message } from 'arch-stream'; | ||
interface Entity<T> { | ||
val: T; | ||
} | ||
describe('Unit: ArchStream', function () { | ||
@@ -17,44 +21,44 @@ describe('definition', function () { | ||
it('read/write', function (done) { | ||
new ArchStream<number>() | ||
new ArchStream<Entity<number>>() | ||
.trans(_ => _) | ||
.export() | ||
.read(data => assert(data === 1) || done()) | ||
.write(1); | ||
.read(e => assert(e.val === 1) || done()) | ||
.write({val: 1}); | ||
}); | ||
it('trans', function (done) { | ||
new ArchStream<number>() | ||
.trans(data => data + 1) | ||
new ArchStream<Entity<number>>() | ||
.trans(e => ++e.val) | ||
.export() | ||
.read(data => assert(data === 2) || done()) | ||
.write(1); | ||
.read(e => assert(e.val === 2) || done()) | ||
.write({val: 1}); | ||
}); | ||
it('defer message', function (done) { | ||
new ArchStream<number>() | ||
.trans(data => { | ||
new ArchStream<Entity<number>>() | ||
.trans(e => { | ||
const msg = new Message<number>(); | ||
setTimeout(() => msg.send(data + 1), 1); | ||
setTimeout(() => msg.send(++e.val), 1); | ||
return msg; | ||
}) | ||
.trans(data => data + 1) | ||
.trans(e => ++e.val) | ||
.export() | ||
.read(data => assert(data === 3) || done()) | ||
.write(1); | ||
.read(e => assert(e.val === 3) || done()) | ||
.write({val: 1}); | ||
}); | ||
it('import trans', function (done) { | ||
const m = new ArchStream<number>() | ||
.trans(data => data + 1) | ||
const m = new ArchStream<Entity<number>>() | ||
.trans(e => ++e.val) | ||
.export(); | ||
new ArchStream<number>() | ||
new ArchStream<Entity<number>>() | ||
.import(m) | ||
.export() | ||
.read(_ => assert(false)); | ||
const stm = new ArchStream<number>() | ||
const stm = new ArchStream<Entity<number>>() | ||
.import(m) | ||
.export() | ||
.read(data => assert(data === 2) || done()); | ||
new ArchStream<number>() | ||
.read(e => assert(e.val === 2) || done()); | ||
new ArchStream<Entity<number>>() | ||
.import(m) | ||
@@ -64,24 +68,24 @@ .export() | ||
stm.write(1); | ||
stm.write({val: 1}); | ||
}); | ||
it('import defer message', function (done) { | ||
const m = new ArchStream<number>() | ||
.trans(data => { | ||
const m = new ArchStream<Entity<number>>() | ||
.trans(e => { | ||
const msg = new Message<number>(); | ||
setTimeout(() => msg.send(data + 1), 1); | ||
setTimeout(() => msg.send(++e.val), 1); | ||
return msg; | ||
}) | ||
.trans(data => data + 1) | ||
.trans(e => ++e.val) | ||
.export(); | ||
new ArchStream<number>() | ||
new ArchStream<Entity<number>>() | ||
.import(m) | ||
.export() | ||
.read(_ => assert(false)); | ||
const stm = new ArchStream<number>() | ||
const stm = new ArchStream<Entity<number>>() | ||
.import(m) | ||
.export() | ||
.read(data => assert(data === 3) || done()); | ||
new ArchStream<number>() | ||
.read(e => assert(e.val === 3) || done()); | ||
new ArchStream<Entity<number>>() | ||
.import(m) | ||
@@ -92,10 +96,10 @@ .export() | ||
stm | ||
.write(1); | ||
.write({val: 1}); | ||
}); | ||
it('import many', function (done) { | ||
var astm = new ArchStream<number>() | ||
.trans(data => { | ||
var astm = new ArchStream<Entity<number>>() | ||
.trans(e => { | ||
const msg = new Message<number>(); | ||
setTimeout(() => msg.send(data + 1), 1); | ||
setTimeout(() => msg.send(++e.val), 1); | ||
return msg; | ||
@@ -110,3 +114,3 @@ }); | ||
new ArchStream<number>() | ||
new ArchStream<Entity<number>>() | ||
.import(add1) | ||
@@ -116,7 +120,7 @@ .import(add1) | ||
.read(_ => assert(false)); | ||
const add2 = new ArchStream<number>() | ||
const add2 = new ArchStream<Entity<number>>() | ||
.import(add1) | ||
.import(add1) | ||
.export(); | ||
new ArchStream<number>() | ||
new ArchStream<Entity<number>>() | ||
.import(add1) | ||
@@ -127,3 +131,3 @@ .import(add1) | ||
new ArchStream<number>() | ||
new ArchStream<Entity<number>>() | ||
.import(add2) | ||
@@ -133,7 +137,7 @@ .import(add2) | ||
.read(_ => assert(false)); | ||
const add4 = new ArchStream<number>() | ||
const add4 = new ArchStream<Entity<number>>() | ||
.import(add2) | ||
.import(add2) | ||
.export(); | ||
new ArchStream<number>() | ||
new ArchStream<Entity<number>>() | ||
.import(add2) | ||
@@ -144,3 +148,3 @@ .import(add2) | ||
new ArchStream<number>() | ||
new ArchStream<Entity<number>>() | ||
.import(add4) | ||
@@ -150,7 +154,7 @@ .import(add4) | ||
.read(_ => assert(false)); | ||
const add8 = new ArchStream<number>() | ||
const add8 = new ArchStream<Entity<number>>() | ||
.import(add4) | ||
.import(add4) | ||
.export(); | ||
new ArchStream<number>() | ||
new ArchStream<Entity<number>>() | ||
.import(add4) | ||
@@ -161,3 +165,3 @@ .import(add4) | ||
new ArchStream<number>() | ||
new ArchStream<Entity<number>>() | ||
.import(add2) | ||
@@ -168,15 +172,41 @@ .import(add8) | ||
.export() | ||
.read(data => assert(data === 16) || done()) | ||
.write(1); | ||
.read(e => assert(e.val === 16) || done()) | ||
.write({val: 1}); | ||
}); | ||
it('limited export', function (done) { | ||
var stm0 = new ArchStream<void>().trans(_ => _).export(0); | ||
try { | ||
new ArchStream<void>().import(stm0); | ||
assert(false); | ||
} | ||
catch (e) { | ||
; | ||
} | ||
var stm1 = new ArchStream<void>().trans(_ => _).export(1); | ||
try { | ||
new ArchStream<void>().import(stm1); | ||
} | ||
catch (e) { | ||
assert(false); | ||
} | ||
try { | ||
new ArchStream<void>().import(stm1); | ||
assert(false); | ||
} | ||
catch (e) { | ||
; | ||
} | ||
done(); | ||
}); | ||
it('throttle', function (done) { | ||
var async = false; | ||
new ArchStream<number>() | ||
.trans(data => data + 1) | ||
new ArchStream<Entity<number>>() | ||
.trans(e => ++e.val) | ||
.export() | ||
.flow(1) | ||
.read(data => assert(data === 2 && async === false || data === 3 && async === true) || data === 3 && done()) | ||
.write(1) | ||
.write(2); | ||
.read(e => assert(e.val === 2 && async === false || e.val === 3 && async === true) || e.val === 3 && done()) | ||
.write({val: 1}) | ||
.write({val: 2}); | ||
async = true; | ||
@@ -183,0 +213,0 @@ }); |
@@ -24,3 +24,3 @@ /** | ||
interface Exportable<T> { | ||
export(): IModularStream<T> | ||
export(limit?: number): IModularStream<T> | ||
} | ||
@@ -40,3 +40,3 @@ interface IArchTransformStream<T> extends Transform<T>, Exportable<T> { | ||
import(module: IModularStream<T>): ComposeStream<T> | ||
export(): IModularStream<T> | ||
export(limit?: number): IModularStream<T> | ||
} | ||
@@ -59,6 +59,2 @@ | ||
export interface ICallback<T> { | ||
(data: T): T|Message<T>|IThenable<T> | ||
} | ||
export interface ICallbackVoid<T> { | ||
(data: T): any | ||
@@ -68,3 +64,3 @@ } | ||
export interface IThenable<T> { | ||
then(done: (data: T) => T|Message<T>|IThenable<T>, fail?: (data: T) => T|Message<T>|IThenable<T>, progress?: (data: T) => T|Message<T>|IThenable<T>): IThenable<T> | ||
then(done: ICallback<T>, fail?: ICallback<T>): IThenable<T> | ||
} | ||
@@ -74,24 +70,16 @@ | ||
constructor() | ||
send(data: T): Message<T> | ||
send(data: T, async?: boolean): Message<T> | ||
recv(callback: (data: T) => any): Message<T> | ||
trans(callback: ICallback<T>): Message<T> | ||
} | ||
export class ArchStreamReflectable<T> extends ArchStream<T> { | ||
constructor(parent: ArchStream<T>, msg: Message<T>) | ||
message_: Message<T> | ||
} | ||
export var Proxy: { | ||
State<T>(definition: { index: (string|number)[], indexer: (data: T) => string|number }): IStateProxy<T, ICallbackVoid<T>> | ||
State<T>(definition: { index: (string|number)[], indexer: (data: T) => string|number, void?: boolean }): IStateProxy<T, ICallbackVoid<T>> | ||
Void<T>(): IVoidProxy<T> | ||
State<T>(definition: { index: (string|number)[], indexer: (data: T) => string|number }): IStateProxy<T, ICallback<T>> | ||
} | ||
export interface IProxyCallback<T> { | ||
(callback: (data: T, args?: any[]) => IProxyCallback<T>): IProxyCallback<T> | ||
} | ||
interface IStateProxy<T, U extends Function> { | ||
trans(...callbacks: U[]): ArchStream<T> | ||
} | ||
interface IVoidProxy<T> { | ||
trans(callback: ICallbackVoid<T>): ArchStream<T> | ||
} | ||
} |
99846
38
2748