async-stream-emitter
Advanced tools
Comparing version 6.0.1 to 7.0.0
@@ -1,1 +0,1 @@ | ||
function e(e){return e&&e.__esModule&&Object.prototype.hasOwnProperty.call(e,"default")?e.default:e}var t=class{async next(e){let t=this.createConsumer(e),s=await t.next();return t.return(),s}async once(e){let t=await this.next(e);if(t.done){if(null!=e){let e=new Error("Stream consumer operation timed out early because stream ended");throw e.name="TimeoutError",e}await new Promise((()=>{}))}return t.value}createConsumer(){throw new TypeError("Method must be overriden by subclass")}[Symbol.asyncIterator](){return this.createConsumer()}};let s=class{constructor(e,t,s,r){this.id=t,this._backpressure=0,this.currentNode=s,this.timeout=r,this.isAlive=!0,this.stream=e,this.stream.setConsumer(this.id,this)}getStats(){let e={id:this.id,backpressure:this._backpressure};return null!=this.timeout&&(e.timeout=this.timeout),e}_resetBackpressure(){this._backpressure=0}applyBackpressure(e){this._backpressure++}releaseBackpressure(e){this._backpressure--}getBackpressure(){return this._backpressure}clearActiveTimeout(){clearTimeout(this._timeoutId),delete this._timeoutId}write(e){void 0!==this._timeoutId&&this.clearActiveTimeout(e),this.applyBackpressure(e),this._resolve&&(this._resolve(),delete this._resolve)}kill(e){this._killPacket={value:e,done:!0},void 0!==this._timeoutId&&this.clearActiveTimeout(this._killPacket),this._destroy(),this._resolve&&(this._resolve(),delete this._resolve)}_destroy(){this.isAlive=!1,this._resetBackpressure(),this.stream.removeConsumer(this.id)}async _waitForNextItem(e){return new Promise(((t,s)=>{let r;if(this._resolve=t,void 0!==e){let t=new Error("Stream consumer iteration timed out");(async()=>{let i=function(e){let t,s=new Promise((s=>{t=setTimeout(s,e)}));return{timeoutId:t,promise:s}}(e);r=i.timeoutId,await i.promise,t.name="TimeoutError",delete this._resolve,s(t)})()}this._timeoutId=r}))}async next(){for(this.stream.setConsumer(this.id,this);;){if(!this.currentNode.next)try{await this._waitForNextItem(this.timeout)}catch(e){throw this._destroy(),e}if(this._killPacket){this._destroy();let e=this._killPacket;return delete this._killPacket,e}if(this.currentNode=this.currentNode.next,this.releaseBackpressure(this.currentNode.data),!this.currentNode.consumerId||this.currentNode.consumerId===this.id)return this.currentNode.data.done&&this._destroy(),this.currentNode.data}}return(){return delete this.currentNode,this._destroy(),{}}[Symbol.asyncIterator](){return this}};const r=t,i=s;var n=class extends r{constructor(){super(),this.nextConsumerId=1,this._consumers=new Map,this.tailNode={next:null,data:{value:void 0,done:!1}}}_write(e,t,s){let r={data:{value:e,done:t},next:null};s&&(r.consumerId=s),this.tailNode.next=r,this.tailNode=r;for(let e of this._consumers.values())e.write(r.data)}write(e){this._write(e,!1)}close(e){this._write(e,!0)}writeToConsumer(e,t){this._write(t,!1,e)}closeConsumer(e,t){this._write(t,!0,e)}kill(e){for(let t of this._consumers.keys())this.killConsumer(t,e)}killConsumer(e,t){let s=this._consumers.get(e);s&&s.kill(t)}getBackpressure(){let e=0;for(let t of this._consumers.values()){let s=t.getBackpressure();s>e&&(e=s)}return e}getConsumerBackpressure(e){let t=this._consumers.get(e);return t?t.getBackpressure():0}hasConsumer(e){return this._consumers.has(e)}setConsumer(e,t){this._consumers.set(e,t),t.currentNode||(t.currentNode=this.tailNode)}removeConsumer(e){return this._consumers.delete(e)}getConsumerStats(e){let t=this._consumers.get(e);if(t)return t.getStats()}getConsumerStatsList(){let e=[];for(let t of this._consumers.values())e.push(t.getStats());return e}createConsumer(e){return new i(this,this.nextConsumerId++,this.tailNode,e)}getConsumerList(){return[...this._consumers.values()]}getConsumerCount(){return this._consumers.size}};const o=t;var u=class extends o{constructor(e,t,s){super(),this._streamDemux=e,this.name=t,this.usabilityMode=!!s}createConsumer(e){return this._streamDemux.createConsumer(this.name,e,this.usabilityMode)}};let a=class{constructor(e,t,s,r,i,n){this.id=t,this._backpressure=0,this.currentNode=s,this.timeout=i,this.isAlive=!0,this._mainStream=e,this._mainStream.setConsumer(this.id,this),this.streamName=r,this.usabilityMode=!!n}getStats(){let e={id:this.id,backpressure:this._backpressure,stream:this.streamName};return null!=this.timeout&&(e.timeout=this.timeout),e}_resetBackpressure(){this._backpressure=0}applyBackpressure(e){this._backpressure++}releaseBackpressure(e){this._backpressure--}getBackpressure(){return this._backpressure}clearActiveTimeout(){clearTimeout(this._timeoutId),delete this._timeoutId}write(e){(e.done||e.value.stream===this.streamName)&&(void 0!==this._timeoutId&&this.clearActiveTimeout(e),this.applyBackpressure(e)),this._resolve&&(this._resolve(),delete this._resolve)}kill(e){this._killPacket={value:e,done:!0},void 0!==this._timeoutId&&this.clearActiveTimeout(this._killPacket),this._destroy(),this._resolve&&(this._resolve(),delete this._resolve)}_destroy(){this.isAlive=!1,this._resetBackpressure(),this._mainStream.removeConsumer(this.id)}async _waitForNextItem(e){return new Promise(((t,s)=>{let r;if(this._resolve=t,null!=e){let t=new Error("Stream consumer iteration timed out");(async()=>{let i=l(e);r=i.timeoutId,await i.promise,t.name="TimeoutError",delete this._resolve,s(t)})()}this._timeoutId=r}))}async next(){for(this._mainStream.setConsumer(this.id,this);;){if(!this.currentNode.next)try{await this._waitForNextItem(this.timeout)}catch(e){throw this._destroy(),e}if(this._killPacket){this._destroy();let e=this._killPacket;return delete this._killPacket,e}for(;this.currentNode.next?.data?.value&&this.currentNode.next.data.value.stream!==this.streamName&&this.currentNode.next.consumerId!==this.id&&!this.currentNode.next.data.done;)this.currentNode=this.currentNode.next,this.usabilityMode&&await l(0);if(this.currentNode.next)return this.currentNode=this.currentNode.next,this.releaseBackpressure(this.currentNode.data),this.currentNode.data?.value?.data?.done?(this._destroy(),this.currentNode.data.value.data):this.currentNode.data.done?(this._destroy(),this.currentNode.data):this.currentNode.data.value.data}}return(){return delete this.currentNode,this._destroy(),{}}[Symbol.asyncIterator](){return this}};function l(e){let t,s=new Promise((s=>{t=setTimeout(s,e)}));return{timeoutId:t,promise:s}}const m=n,h=u,c=a;var d=class{constructor(){this._mainStream=new m}write(e,t){this._mainStream.write({stream:e,data:{value:t,done:!1}})}close(e,t){this._mainStream.write({stream:e,data:{value:t,done:!0}})}closeAll(e){this._mainStream.close(e)}writeToConsumer(e,t){this._mainStream.writeToConsumer(e,{consumerId:e,data:{value:t,done:!1}})}closeConsumer(e,t){this._mainStream.closeConsumer(e,{consumerId:e,data:{value:t,done:!0}})}getConsumerStats(e){return this._mainStream.getConsumerStats(e)}getConsumerStatsList(e){return this._mainStream.getConsumerStatsList().filter((t=>t.stream===e))}getConsumerStatsListAll(){return this._mainStream.getConsumerStatsList()}kill(e,t){let s=this.getConsumerStatsList(e),r=s.length;for(let e=0;e<r;e++)this.killConsumer(s[e].id,t)}killAll(e){this._mainStream.kill(e)}killConsumer(e,t){this._mainStream.killConsumer(e,t)}getBackpressure(e){let t=this.getConsumerStatsList(e),s=t.length,r=0;for(let e=0;e<s;e++){let s=t[e];s.backpressure>r&&(r=s.backpressure)}return r}getBackpressureAll(){return this._mainStream.getBackpressure()}getConsumerBackpressure(e){return this._mainStream.getConsumerBackpressure(e)}hasConsumer(e,t){let s=this._mainStream.getConsumerStats(t);return!!s&&s.stream===e}hasConsumerAll(e){return this._mainStream.hasConsumer(e)}getConsumerCount(e){return this.getConsumerStatsList(e).length}getConsumerCountAll(){return this.getConsumerStatsListAll().length}createConsumer(e,t,s){return new c(this._mainStream,this._mainStream.nextConsumerId++,this._mainStream.tailNode,e,t,s)}stream(e,t){return new h(this,e,t)}};const _=d;function p(e){let{usabilityMode:t}=e||{};this.usabilityMode=t,this._listenerDemux=new _}p.prototype.emit=function(e,t){this._listenerDemux.write(e,t)},p.prototype.listener=function(e,t){return this._listenerDemux.stream(e,t)},p.prototype.closeListener=function(e){this._listenerDemux.close(e)},p.prototype.closeAllListeners=function(){this._listenerDemux.closeAll()},p.prototype.getListenerConsumerStats=function(e){return this._listenerDemux.getConsumerStats(e)},p.prototype.getListenerConsumerStatsList=function(e){return this._listenerDemux.getConsumerStatsList(e)},p.prototype.getAllListenersConsumerStatsList=function(){return this._listenerDemux.getConsumerStatsListAll()},p.prototype.getListenerConsumerCount=function(e){return this._listenerDemux.getConsumerCount(e)},p.prototype.getAllListenersConsumerCount=function(){return this._listenerDemux.getConsumerCountAll()},p.prototype.killListener=function(e){this._listenerDemux.kill(e)},p.prototype.killAllListeners=function(){this._listenerDemux.killAll()},p.prototype.killListenerConsumer=function(e){this._listenerDemux.killConsumer(e)},p.prototype.getListenerBackpressure=function(e){return this._listenerDemux.getBackpressure(e)},p.prototype.getAllListenersBackpressure=function(){return this._listenerDemux.getBackpressureAll()},p.prototype.getListenerConsumerBackpressure=function(e){return this._listenerDemux.getConsumerBackpressure(e)},p.prototype.hasListenerConsumer=function(e,t){return this._listenerDemux.hasConsumer(e,t)},p.prototype.hasAnyListenerConsumer=function(e){return this._listenerDemux.hasConsumerAll(e)};var k=e(p);export{k as default}; | ||
function e(e){return e&&e.__esModule&&Object.prototype.hasOwnProperty.call(e,"default")?e.default:e}var t=class{async next(e){let t=this.createConsumer(e),s=await t.next();return t.return(),s}async once(e){let t=await this.next(e);if(t.done){if(null!=e){let e=new Error("Stream consumer operation timed out early because stream ended");throw e.name="TimeoutError",e}await new Promise((()=>{}))}return t.value}createConsumer(){throw new TypeError("Method must be overriden by subclass")}[Symbol.asyncIterator](){return this.createConsumer()}};let s=class{constructor(e,t,s,r){this.id=t,this._backpressure=0,this.currentNode=s,this.timeout=r,this.isAlive=!0,this.stream=e,this.stream.setConsumer(this.id,this)}getStats(){let e={id:this.id,backpressure:this._backpressure};return null!=this.timeout&&(e.timeout=this.timeout),e}_resetBackpressure(){this._backpressure=0}applyBackpressure(e){this._backpressure++}releaseBackpressure(e){this._backpressure--}getBackpressure(){return this._backpressure}clearActiveTimeout(){clearTimeout(this._timeoutId),delete this._timeoutId}write(e){void 0!==this._timeoutId&&this.clearActiveTimeout(e),this.applyBackpressure(e),this._resolve&&(this._resolve(),delete this._resolve)}kill(e){this._killPacket={value:e,done:!0},void 0!==this._timeoutId&&this.clearActiveTimeout(this._killPacket),this._destroy(),this._resolve&&(this._resolve(),delete this._resolve)}_destroy(){this.isAlive=!1,this._resetBackpressure(),this.stream.removeConsumer(this.id)}async _waitForNextItem(e){return new Promise(((t,s)=>{let r;if(this._resolve=t,void 0!==e){let t=new Error("Stream consumer iteration timed out");(async()=>{let i=function(e){let t,s=new Promise((s=>{t=setTimeout(s,e)}));return{timeoutId:t,promise:s}}(e);r=i.timeoutId,await i.promise,t.name="TimeoutError",delete this._resolve,s(t)})()}this._timeoutId=r}))}async next(){for(this.stream.setConsumer(this.id,this);;){if(!this.currentNode.next)try{await this._waitForNextItem(this.timeout)}catch(e){throw this._destroy(),e}if(this._killPacket){this._destroy();let e=this._killPacket;return delete this._killPacket,e}if(this.currentNode=this.currentNode.next,this.releaseBackpressure(this.currentNode.data),!this.currentNode.consumerId||this.currentNode.consumerId===this.id)return this.currentNode.data.done&&this._destroy(),this.currentNode.data}}return(){return delete this.currentNode,this._destroy(),{}}[Symbol.asyncIterator](){return this}};const r=t,i=s;var n=class extends r{constructor(e){super(),e=e||{},this._nextConsumerId=1,this.generateConsumerId=e.generateConsumerId,this.generateConsumerId||(this.generateConsumerId=()=>this._nextConsumerId++),this.removeConsumerCallback=e.removeConsumerCallback,this._consumers=new Map,this.tailNode={next:null,data:{value:void 0,done:!1}}}_write(e,t,s){let r={data:{value:e,done:t},next:null};s&&(r.consumerId=s),this.tailNode.next=r,this.tailNode=r;for(let e of this._consumers.values())e.write(r.data)}write(e){this._write(e,!1)}close(e){this._write(e,!0)}writeToConsumer(e,t){this._write(t,!1,e)}closeConsumer(e,t){this._write(t,!0,e)}kill(e){for(let t of this._consumers.keys())this.killConsumer(t,e)}killConsumer(e,t){let s=this._consumers.get(e);s&&s.kill(t)}getBackpressure(){let e=0;for(let t of this._consumers.values()){let s=t.getBackpressure();s>e&&(e=s)}return e}getConsumerBackpressure(e){let t=this._consumers.get(e);return t?t.getBackpressure():0}hasConsumer(e){return this._consumers.has(e)}setConsumer(e,t){this._consumers.set(e,t),t.currentNode||(t.currentNode=this.tailNode)}removeConsumer(e){let t=this._consumers.delete(e);return this.removeConsumerCallback&&this.removeConsumerCallback(e),t}getConsumerStats(e){let t=this._consumers.get(e);if(t)return t.getStats()}getConsumerStatsList(){let e=[];for(let t of this._consumers.values())e.push(t.getStats());return e}createConsumer(e){return new i(this,this.generateConsumerId(),this.tailNode,e)}getConsumerList(){return[...this._consumers.values()]}getConsumerCount(){return this._consumers.size}};const o=class{async next(e){let t=this.createConsumer(e),s=await t.next();return t.return(),s}async once(e){let t=await this.next(e);if(t.done){if(null!=e){let e=new Error("Stream consumer operation timed out early because stream ended");throw e.name="TimeoutError",e}await new Promise((()=>{}))}return t.value}createConsumer(){throw new TypeError("Method must be overriden by subclass")}[Symbol.asyncIterator](){return this.createConsumer()}};var u=class extends o{constructor(e,t){super(),this._streamDemux=e,this.name=t}createConsumer(e){return this._streamDemux.createConsumer(this.name,e)}};const a=n,l=u;const m=class{constructor(){this.streams={},this._nextConsumerId=1,this.generateConsumerId=()=>this._nextConsumerId++}write(e,t){this.streams[e]&&this.streams[e].write(t)}close(e,t){this.streams[e]&&this.streams[e].close(t)}closeAll(e){for(let t of Object.values(this.streams))t.close(e)}writeToConsumer(e,t){for(let s of Object.values(this.streams))if(s.hasConsumer(e))return s.writeToConsumer(e,t)}closeConsumer(e,t){for(let s of Object.values(this.streams))if(s.hasConsumer(e))return s.closeConsumer(e,t)}getConsumerStats(e){for(let[t,s]of Object.entries(this.streams))if(s.hasConsumer(e))return{...s.getConsumerStats(e),stream:t}}getConsumerStatsList(e){return this.streams[e]?this.streams[e].getConsumerStatsList().map((t=>({...t,stream:e}))):[]}getConsumerStatsListAll(){let e=[];for(let t of Object.keys(this.streams)){let s=this.getConsumerStatsList(t);for(let t of s)e.push(t)}return e}kill(e,t){this.streams[e]&&this.streams[e].kill(t)}killAll(e){for(let t of Object.values(this.streams))t.kill(e)}killConsumer(e,t){for(let s of Object.values(this.streams))if(s.hasConsumer(e))return s.killConsumer(e,t)}getBackpressure(e){return this.streams[e]?this.streams[e].getBackpressure():0}getBackpressureAll(){return Object.values(this.streams).reduce(((e,t)=>Math.max(e,t.getBackpressure())),0)}getConsumerBackpressure(e){for(let t of Object.values(this.streams))if(t.hasConsumer(e))return t.getConsumerBackpressure(e);return 0}hasConsumer(e,t){return!!this.streams[e]&&this.streams[e].hasConsumer(t)}hasConsumerAll(e){return Object.values(this.streams).some((t=>t.hasConsumer(e)))}getConsumerCount(e){return this.streams[e]?this.streams[e].getConsumerCount():0}getConsumerCountAll(){return Object.values(this.streams).reduce(((e,t)=>e+t.getConsumerCount()),0)}createConsumer(e,t){return this.streams[e]||(this.streams[e]=new a({generateConsumerId:this.generateConsumerId,removeConsumerCallback:()=>{this.getConsumerCount(e)||delete this.streams[e]}})),this.streams[e].createConsumer(t)}stream(e){return new l(this,e)}unstream(e){delete this.streams[e]}};function h(e){this._listenerDemux=new m}h.prototype.emit=function(e,t){this._listenerDemux.write(e,t)},h.prototype.listener=function(e){return this._listenerDemux.stream(e)},h.prototype.closeListener=function(e){this._listenerDemux.close(e)},h.prototype.closeAllListeners=function(){this._listenerDemux.closeAll()},h.prototype.removeListener=function(e){this._listenerDemux.unstream(e)},h.prototype.getListenerConsumerStats=function(e){return this._listenerDemux.getConsumerStats(e)},h.prototype.getListenerConsumerStatsList=function(e){return this._listenerDemux.getConsumerStatsList(e)},h.prototype.getAllListenersConsumerStatsList=function(){return this._listenerDemux.getConsumerStatsListAll()},h.prototype.getListenerConsumerCount=function(e){return this._listenerDemux.getConsumerCount(e)},h.prototype.getAllListenersConsumerCount=function(){return this._listenerDemux.getConsumerCountAll()},h.prototype.killListener=function(e){this._listenerDemux.kill(e)},h.prototype.killAllListeners=function(){this._listenerDemux.killAll()},h.prototype.killListenerConsumer=function(e){this._listenerDemux.killConsumer(e)},h.prototype.getListenerBackpressure=function(e){return this._listenerDemux.getBackpressure(e)},h.prototype.getAllListenersBackpressure=function(){return this._listenerDemux.getBackpressureAll()},h.prototype.getListenerConsumerBackpressure=function(e){return this._listenerDemux.getConsumerBackpressure(e)},h.prototype.hasListenerConsumer=function(e,t){return this._listenerDemux.hasConsumer(e,t)},h.prototype.hasAnyListenerConsumer=function(e){return this._listenerDemux.hasConsumerAll(e)};var c=e(h);export{c as default}; |
const StreamDemux = require('stream-demux'); | ||
function AsyncStreamEmitter(options) { | ||
let { usabilityMode } = options || {}; | ||
this.usabilityMode = usabilityMode || false; | ||
this._listenerDemux = new StreamDemux(); | ||
@@ -14,3 +12,3 @@ } | ||
AsyncStreamEmitter.prototype.listener = function (eventName) { | ||
return this._listenerDemux.stream(eventName, this.usabilityMode); | ||
return this._listenerDemux.stream(eventName); | ||
}; | ||
@@ -26,2 +24,6 @@ | ||
AsyncStreamEmitter.prototype.removeListener = function (eventName) { | ||
this._listenerDemux.unstream(eventName); | ||
}; | ||
AsyncStreamEmitter.prototype.getListenerConsumerStats = function (consumerId) { | ||
@@ -28,0 +30,0 @@ return this._listenerDemux.getConsumerStats(consumerId); |
{ | ||
"name": "async-stream-emitter", | ||
"version": "6.0.1", | ||
"version": "7.0.0", | ||
"description": "An alternantive to EventEmitter using consumable streams.", | ||
@@ -36,4 +36,4 @@ "main": "index.js", | ||
"dependencies": { | ||
"stream-demux": "^9.0.2" | ||
"stream-demux": "^10.0.0" | ||
} | ||
} |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
18477
236
+ Addedstream-demux@10.0.1(transitive)
- Removedstream-demux@9.0.2(transitive)
Updatedstream-demux@^10.0.0