vega-dataflow
Advanced tools
Comparing version 5.7.0 to 5.7.1
@@ -1,1 +0,1 @@ | ||
!function(t,n){"object"==typeof exports&&"undefined"!=typeof module?n(exports,require("vega-util"),require("vega-loader"),require("vega-format")):"function"==typeof define&&define.amd?define(["exports","vega-util","vega-loader","vega-format"],n):n((t=t||self).vega={},t.vega,t.vega,t.vega)}(this,(function(t,n,e,r){"use strict";function i(t){var e=t||n.identity,r=[],i={};return r.add=function(t){var n=e(t);return i[n]||(i[n]=1,r.push(t)),r},r.remove=function(t){var n,s=e(t);return i[s]&&(i[s]=0,(n=r.indexOf(t))>=0&&r.splice(n,1)),r},r}async function s(t,n){try{await n(t)}catch(n){t.error(n)}}var a=Symbol("vega_id"),u=1;function o(t){return t[a]}function l(t,n){return t[a]=n,t}function c(t){var n=t===Object(t)?t:{data:t};return o(n)?n:l(n,u++)}function h(t,n){for(var e in t)n[e]=t[e];return n}function f(t){return t&&t.constructor===d}function d(){var t=[],e=[],r=[],i=[],s=[],a=null,u=!1;return{constructor:d,insert:function(e){for(var r=n.array(e),i=0,s=r.length;i<s;++i)t.push(r[i]);return this},remove:function(t){for(var r=n.isFunction(t)?i:e,s=n.array(t),a=0,u=s.length;a<u;++a)r.push(s[a]);return this},modify:function(t,e,i){var a={field:e,value:n.constant(i)};return n.isFunction(t)?(a.filter=t,s.push(a)):(a.tuple=t,r.push(a)),this},encode:function(t,e){return n.isFunction(t)?s.push({filter:t,field:e}):r.push({tuple:t,field:e}),this},clean:function(t){return a=t,this},reflow:function(){return u=!0,this},pulse:function(n,l){var h,f,d,p,g,v,m={},_={};for(h=0,f=l.length;h<f;++h)m[o(l[h])]=1;for(h=0,f=e.length;h<f;++h)m[o(g=e[h])]=-1;for(h=0,f=i.length;h<f;++h)p=i[h],l.forEach((function(t){p(t)&&(m[o(t)]=-1)}));for(h=0,f=t.length;h<f;++h)v=o(g=t[h]),m[v]?m[v]=1:n.add.push(c(t[h]));for(h=0,f=l.length;h<f;++h)g=l[h],m[o(g)]<0&&n.rem.push(g);function y(t,e,r){r?t[e]=r(t):n.encode=e,u||(_[o(t)]=t)}for(h=0,f=r.length;h<f;++h)g=(d=r[h]).tuple,p=d.field,(v=m[o(g)])>0&&(y(g,p,d.value),n.modifies(p));for(h=0,f=s.length;h<f;++h)d=s[h],p=d.filter,l.forEach((function(t){p(t)&&m[o(t)]>0&&y(t,d.field,d.value)})),n.modifies(d.field);if(u)n.mod=e.length||i.length?l.filter((function(t){return m[o(t)]>0})):l.slice();else for(v in _)n.mod.push(_[v]);return(a||null==a&&(e.length||i.length))&&n.clean(!0),n}}}var p="_:mod:_";function g(){Object.defineProperty(this,p,{writable:!0,value:{}})}var v=g.prototype;v.set=function(t,e,r,i){var s=this,a=s[t],u=s[p];return null!=e&&e>=0?(a[e]!==r||i)&&(a[e]=r,u[e+":"+t]=-1,u[t]=-1):(a!==r||i)&&(s[t]=r,u[t]=n.isArray(r)?1+r.length:-1),s},v.modified=function(t,e){var r,i=this[p];if(!arguments.length){for(r in i)if(i[r])return!0;return!1}if(n.isArray(t)){for(r=0;r<t.length;++r)if(i[t[r]])return!0;return!1}return null!=e&&e>=0?e+1<i[t]||!!i[e+":"+t]:!!i[t]},v.clear=function(){return this[p]={},this};var m=0,_=new g;function y(t,n,e,r){this.id=++m,this.value=t,this.stamp=-1,this.rank=-1,this.qrank=-1,this.flags=0,n&&(this._update=n),e&&this.parameters(e,r)}var k=y.prototype;function w(t){return function(n){var e=this.flags;return 0===arguments.length?!!(e&t):(this.flags=n?e|t:e&~t,this)}}k.targets=function(){return this._targets||(this._targets=i(n.id))},k.set=function(t){return this.value!==t?(this.value=t,1):0},k.skip=w(1),k.modified=w(2),k.parameters=function(t,e,r){e=!1!==e;var i,s,a,u,o=this,l=o._argval=o._argval||new g,c=o._argops=o._argops||[],h=[];function f(t,n,r){r instanceof y?(r!==o&&(e&&r.targets().add(o),h.push(r)),c.push({op:r,name:t,index:n})):l.set(t,n,r)}for(i in t)if(s=t[i],"pulse"===i)n.array(s).forEach((function(t){t instanceof y?t!==o&&(t.targets().add(o),h.push(t)):n.error("Pulse parameters must be operator instances.")})),o.source=s;else if(n.isArray(s))for(l.set(i,-1,Array(a=s.length)),u=0;u<a;++u)f(i,u,s[u]);else f(i,-1,s);return this.marshall().clear(),r&&(c.initonly=!0),h},k.marshall=function(t){var n,e,r,i,s,a=this._argval||_,u=this._argops;if(u){for(e=0,r=u.length;e<r;++e)s=(i=(n=u[e]).op).modified()&&i.stamp===t,a.set(n.name,n.index,i.value,s);if(u.initonly){for(e=0;e<r;++e)(n=u[e]).op.targets().remove(this);this._argops=null,this._update=null}}return a},k.detach=function(){var t,n,e,r=this._argops;if(r)for(t=0,n=r.length;t<n;++t)(e=r[t].op)._targets&&e._targets.remove(this)},k.evaluate=function(t){var n=this._update;if(n){var e=this.marshall(t.stamp),r=n.call(this,e,t);if(e.clear(),r!==this.value)this.value=r;else if(!this.modified())return t.StopPropagation}},k.run=function(t){return t.stamp<this.stamp?t.StopPropagation:(this.skip()?(this.skip(!1),n=0):n=this.evaluate(t),this.pulse=n||t);var n};var F=0;function A(t,n,e){this.id=++F,this.value=null,e&&(this.receive=e),t&&(this._filter=t),n&&(this._apply=n)}function D(t,n,e){return new A(t,n,e)}var E=A.prototype;E._filter=n.truthy,E._apply=n.identity,E.targets=function(){return this._targets||(this._targets=i(n.id))},E.consume=function(t){return arguments.length?(this._consume=!!t,this):!!this._consume},E.receive=function(t){if(this._filter(t)){for(var n=this.value=this._apply(t),e=this._targets,r=e?e.length:0,i=0;i<r;++i)e[i].receive(n);this._consume&&(t.preventDefault(),t.stopPropagation())}},E.filter=function(t){var n=D(t);return this.targets().add(n),n},E.apply=function(t){var n=D(null,t);return this.targets().add(n),n},E.merge=function(){var t=D();this.targets().add(t);for(var n=0,e=arguments.length;n<e;++n)arguments[n].targets().add(t);return t},E.throttle=function(t){var n=-1;return this.filter((function(){var e=Date.now();return e-n>t?(n=e,1):0}))},E.debounce=function(t){var e=D();return this.targets().add(D(null,null,n.debounce(t,(function(t){var n=t.dataflow;e.receive(t),n&&n.run&&n.run()})))),e},E.between=function(t,n){var e=!1;return t.targets().add(D(null,null,(function(){e=!0}))),n.targets().add(D(null,null,(function(){e=!1}))),this.filter((function(){return e}))};var P={skip:!0};function q(t,e,r,i,s,a){var u,o,l=n.extend({},a,P);n.isFunction(r)||(r=n.constant(r)),void 0===i?u=n=>t.touch(r(n)):n.isFunction(i)?(o=new y(null,i,s,!1),u=n=>{o.evaluate(n);const e=r(n),i=o.value;f(i)?t.pulse(e,i,a):t.update(e,i,l)}):u=n=>t.update(r(n),i,l),e.apply(u)}function O(t,e,r,i,s,a){if(void 0===i)e.targets().add(r);else{const u=a||{},o=new y(null,function(t,e){return e=n.isFunction(e)?e:n.constant(e),t?function(n,r){const i=e(n,r);return t.skip()||(t.skip(i!==this.value).value=i),i}:e}(r,i),s,!1);o.modified(u.force),o.rank=e.rank,e.targets().add(o),r&&(o.skip(!0),o.value=r.value,o.targets().add(r),t.connect(r,[o]))}}const b={};function M(t,n,e){this.dataflow=t,this.stamp=null==n?-1:n,this.add=[],this.rem=[],this.mod=[],this.fields=null,this.encode=e||null}const S=M.prototype;function L(t,n){return t?(e,r)=>t(e,r)&&n(e,r):n}function R(t,e){var r=[];return n.visitArray(t,e,t=>r.push(t)),r}function x(t,n){var e={};return t.visit(n,(function(t){e[o(t)]=1})),t=>e[o(t)]?null:t}function C(t,n,e,r){var i,s,a,u,o,l=this,c=0;for(this.dataflow=t,this.stamp=n,this.fields=null,this.encode=r||null,this.pulses=e,a=0,u=e.length;a<u;++a)if((i=e[a]).stamp===n){if(i.fields)for(o in s=l.fields||(l.fields={}),i.fields)s[o]=1;i.changed(l.ADD)&&(c|=l.ADD),i.changed(l.REM)&&(c|=l.REM),i.changed(l.MOD)&&(c|=l.MOD)}this.changes=c}S.StopPropagation=b,S.ADD=1,S.REM=2,S.MOD=4,S.ADD_REM=3,S.ADD_MOD=5,S.ALL=7,S.REFLOW=8,S.SOURCE=16,S.NO_SOURCE=32,S.NO_FIELDS=64,S.fork=function(t){return new M(this.dataflow).init(this,t)},S.clone=function(){const t=this.fork(7);return t.add=t.add.slice(),t.rem=t.rem.slice(),t.mod=t.mod.slice(),t.source&&(t.source=t.source.slice()),t.materialize(23)},S.addAll=function(){let t=this;return t.source&&t.source.length!==t.add.length?(t=new M(this.dataflow).init(this),t.add=t.source,t):t},S.init=function(t,n){const e=this;return e.stamp=t.stamp,e.encode=t.encode,!t.fields||64&n||(e.fields=t.fields),1&n?(e.addF=t.addF,e.add=t.add):(e.addF=null,e.add=[]),2&n?(e.remF=t.remF,e.rem=t.rem):(e.remF=null,e.rem=[]),4&n?(e.modF=t.modF,e.mod=t.mod):(e.modF=null,e.mod=[]),32&n?(e.srcF=null,e.source=null):(e.srcF=t.srcF,e.source=t.source,t.cleans&&(e.cleans=t.cleans)),e},S.runAfter=function(t){this.dataflow.runAfter(t)},S.changed=function(t){var n=t||7;return 1&n&&this.add.length||2&n&&this.rem.length||4&n&&this.mod.length},S.reflow=function(t){if(t)return this.fork(7).reflow();var n=this.add.length,e=this.source&&this.source.length;return e&&e!==n&&(this.mod=this.source,n&&this.filter(4,x(this,1))),this},S.clean=function(t){return arguments.length?(this.cleans=!!t,this):this.cleans},S.modifies=function(t){var e=this.fields||(this.fields={});return n.isArray(t)?t.forEach(t=>e[t]=!0):e[t]=!0,this},S.modified=function(t,e){var r=this.fields;return!(!e&&!this.mod.length||!r)&&(arguments.length?n.isArray(t)?t.some(t=>r[t]):r[t]:!!r)},S.filter=function(t,n){var e=this;return 1&t&&(e.addF=L(e.addF,n)),2&t&&(e.remF=L(e.remF,n)),4&t&&(e.modF=L(e.modF,n)),16&t&&(e.srcF=L(e.srcF,n)),e},S.materialize=function(t){var n=this;return 1&(t=t||7)&&n.addF&&(n.add=R(n.add,n.addF),n.addF=null),2&t&&n.remF&&(n.rem=R(n.rem,n.remF),n.remF=null),4&t&&n.modF&&(n.mod=R(n.mod,n.modF),n.modF=null),16&t&&n.srcF&&(n.source=n.source.filter(n.srcF),n.srcF=null),n},S.visit=function(t,e){var r,i,s=this,a=e;return 16&t?(n.visitArray(s.source,s.srcF,a),s):(1&t&&n.visitArray(s.add,s.addF,a),2&t&&n.visitArray(s.rem,s.remF,a),4&t&&n.visitArray(s.mod,s.modF,a),8&t&&(r=s.source)&&((i=s.add.length+s.mod.length)===r.length||(i?n.visitArray(r,x(s,5),a):n.visitArray(r,s.srcF,a))),s)};var z=n.inherits(C,M);function U(t){return t.error("Dataflow already running. Use runAsync() to chain invocations."),t}z.fork=function(t){var n=new M(this.dataflow).init(this,t&this.NO_FIELDS);return void 0!==t&&(t&n.ADD&&this.visit(n.ADD,(function(t){return n.add.push(t)})),t&n.REM&&this.visit(n.REM,(function(t){return n.rem.push(t)})),t&n.MOD&&this.visit(n.MOD,(function(t){return n.mod.push(t)}))),n},z.changed=function(t){return this.changes&t},z.modified=function(t){var e=this,r=e.fields;return r&&e.changes&e.MOD?n.isArray(t)?t.some((function(t){return r[t]})):r[t]:0},z.filter=function(){n.error("MultiPulse does not support filtering.")},z.materialize=function(){n.error("MultiPulse does not support materialization.")},z.visit=function(t,n){var e=this,r=e.pulses,i=r.length,s=0;if(t&e.SOURCE)for(;s<i;++s)r[s].visit(t,n);else for(;s<i;++s)r[s].stamp===e.stamp&&r[s].visit(t,n);return e};var j={skip:!1,force:!1};function T(t){var n=[];return{clear:()=>n=[],size:()=>n.length,peek:()=>n[0],push:e=>(n.push(e),N(n,0,n.length-1,t)),pop:()=>{var e,r=n.pop();return n.length?(e=n[0],n[0]=r,function(t,n,e){var r,i=n,s=t.length,a=t[n],u=1+(n<<1);for(;u<s;)(r=u+1)<s&&e(t[u],t[r])>=0&&(u=r),t[n]=t[u],u=1+((n=u)<<1);t[n]=a,N(t,i,n,e)}(n,0,t)):e=r,e}}}function N(t,n,e,r){var i,s,a;for(i=t[e];e>n&&r(i,s=t[a=e-1>>1])<0;)t[e]=s,e=a;return t[e]=i}function I(){this.logger(n.logger()),this.logLevel(n.Error),this._clock=0,this._rank=0,this._locale=r.defaultLocale();try{this._loader=e.loader()}catch(t){}this._touched=i(n.id),this._input={},this._pulse=null,this._heap=T((t,n)=>t.qrank-n.qrank),this._postrun=[]}var $=I.prototype;function W(t){return function(){return this._log[t].apply(this,arguments)}}function B(t,n){y.call(this,t,null,n)}$.stamp=function(){return this._clock},$.loader=function(t){return arguments.length?(this._loader=t,this):this._loader},$.locale=function(t){return arguments.length?(this._locale=t,this):this._locale},$.cleanThreshold=1e4,$.add=function(t,e,r,i){var s,a=1;return t instanceof y?s=t:t&&t.prototype instanceof y?s=new t:n.isFunction(t)?s=new y(null,t):(a=0,s=new y(t,e)),this.rank(s),a&&(i=r,r=e),r&&this.connect(s,s.parameters(r,i)),this.touch(s),s},$.connect=function(t,n){var e,r,i=t.rank;for(e=0,r=n.length;e<r;++e)if(i<n[e].rank)return void this.rerank(t)},$.rank=function(t){t.rank=++this._rank},$.rerank=function(t){for(var e,r,i,s=[t];s.length;)if(this.rank(e=s.pop()),r=e._targets)for(i=r.length;--i>=0;)s.push(e=r[i]),e===t&&n.error("Cycle detected in dataflow graph.")},$.pulse=function(t,n,e){this.touch(t,e||j);var r=new M(this,this._clock+(this._pulse?0:1)),i=t.pulse&&t.pulse.source||[];return r.target=t,this._input[t.id]=n.pulse(r,i),this},$.touch=function(t,n){var e=n||j;return this._pulse?this._enqueue(t):this._touched.add(t),e.skip&&t.skip(!0),this},$.update=function(t,n,e){var r=e||j;return(t.set(n)||r.force)&&this.touch(t,r),this},$.changeset=d,$.ingest=function(t,n,e){return n=this.parse(n,e),this.pulse(t,this.changeset().insert(n))},$.parse=function(t,n){const r=this.locale();return e.read(t,n,r.timeParse,r.utcParse)},$.preload=async function(t,e,r){const i=this,s=i._pending||function(t){var n,e=new Promise((function(t){n=t}));return e.requests=0,e.done=function(){0==--e.requests&&(t._pending=null,n(t))},t._pending=e}(i);s.requests+=1;const a=await i.request(e,r);return i.pulse(t,i.changeset().remove(n.truthy).insert(a.data||[])),s.done(),a},$.request=async function(t,n){const r=this;let i,s=0;try{i=await r.loader().load(t,{context:"dataflow",response:e.responseType(n&&n.type)});try{i=r.parse(i,n)}catch(n){s=-2,r.warn("Data ingestion failed",t,n)}}catch(n){s=-1,r.warn("Loading failed",t,n)}return{data:i,status:s}},$.events=function(t,e,r,i){for(var s,a=this,u=D(r,i),o=function(t){t.dataflow=a;try{u.receive(t)}catch(t){a.error(t)}finally{a.run()}},l=0,c=(s="string"==typeof t&&"undefined"!=typeof document?document.querySelectorAll(t):n.array(t)).length;l<c;++l)s[l].addEventListener(e,o);return u},$.on=function(t,n,e,r,i){return(t instanceof y?O:q)(this,t,n,e,r,i),this},$.evaluate=async function(t,e,r){const a=this,u=[];if(a._pulse)return U(a);if(a._pending&&await a._pending,e&&await s(a,e),!a._touched.length)return a.debug("Dataflow invoked, but nothing to do."),a;const o=++a._clock;a._pulse=new M(a,o,t),a._touched.forEach(t=>a._enqueue(t,!0)),a._touched=i(n.id);let l,c,h,f=0;try{for(;a._heap.size()>0;)l=a._heap.pop(),l.rank===l.qrank?(c=l.run(a._getPulse(l,t)),c.then?c=await c:c.async&&(u.push(c.async),c=b),c!==b&&l._targets&&l._targets.forEach(t=>a._enqueue(t)),++f):a._enqueue(l,!0)}catch(t){a._heap.clear(),h=t}if(a._input={},a._pulse=null,a.debug(`Pulse ${o}: ${f} operators`),h&&(a._postrun=[],a.error(h)),a._postrun.length){const t=a._postrun.sort((t,n)=>n.priority-t.priority);a._postrun=[];for(let n=0;n<t.length;++n)await s(a,t[n].callback)}return r&&await s(a,r),u.length&&Promise.all(u).then(t=>a.runAsync(null,()=>{t.forEach(t=>{try{t(a)}catch(t){a.error(t)}})})),a},$.run=function(t,n,e){return this._pulse?U(this):(this.evaluate(t,n,e),this)},$.runAsync=async function(t,n,e){for(;this._running;)await this._running;const r=()=>this._running=null;return(this._running=this.evaluate(t,n,e)).then(r,r),this._running},$.runAfter=function(t,n,e){if(this._pulse||n)this._postrun.push({priority:e||0,callback:t});else try{t(this)}catch(t){this.error(t)}},$._enqueue=function(t,n){var e=t.stamp<this._clock;e&&(t.stamp=this._clock),(e||n)&&(t.qrank=t.rank,this._heap.push(t))},$._getPulse=function(t,e){var r=t.source,i=this._clock;return r&&n.isArray(r)?new C(this,i,r.map(t=>t.pulse),e):this._input[t.id]||function(t,n){if(n&&n.stamp===t.stamp)return n;t=t.fork(),n&&n!==b&&(t.source=n.source);return t}(this._pulse,r&&r.pulse)},$.logger=function(t){return arguments.length?(this._log=t,this):this._log},$.error=W("error"),$.warn=W("warn"),$.info=W("info"),$.debug=W("debug"),$.logLevel=W("level");var G=n.inherits(B,y);G.run=function(t){return t.stamp<this.stamp?t.StopPropagation:(this.skip()?this.skip(!1):n=this.evaluate(t),(n=n||t).then?n=n.then(t=>this.pulse=t):n!==t.StopPropagation&&(this.pulse=n),n);var n},G.evaluate=function(t){var n=this.marshall(t.stamp),e=this.transform(n,t);return n.clear(),e},G.transform=function(){};var H={};function J(t){return t=t&&t.toLowerCase(),n.hasOwnProperty(H,t)?H[t]:null}t.Dataflow=I,t.EventStream=A,t.MultiPulse=C,t.Operator=y,t.Parameters=g,t.Pulse=M,t.Transform=B,t.UniqueList=i,t.asyncCallback=s,t.changeset=d,t.definition=function(t){var n=J(t);return n&&n.Definition||null},t.derive=function(t){return h(t,c({}))},t.ingest=c,t.isChangeSet=f,t.isTuple=function(t){return!(!t||!o(t))},t.rederive=h,t.replace=function(t,n){return l(n,o(t))},t.stableCompare=function(t,n){return t?n?(e,r)=>t(e,r)||o(n(e))-o(n(r)):(n,e)=>t(n,e)||o(n)-o(e):null},t.transform=J,t.transforms=H,t.tupleid=o,Object.defineProperty(t,"__esModule",{value:!0})})); | ||
!function(t,e){"object"==typeof exports&&"undefined"!=typeof module?e(exports,require("vega-util"),require("vega-loader"),require("vega-format")):"function"==typeof define&&define.amd?define(["exports","vega-util","vega-loader","vega-format"],e):e((t="undefined"!=typeof globalThis?globalThis:t||self).vega={},t.vega,t.vega,t.vega)}(this,(function(t,e,n,r){"use strict";function s(t){const n=t||e.identity,r=[],s={};return r.add=t=>{const e=n(t);return s[e]||(s[e]=1,r.push(t)),r},r.remove=t=>{const e=n(t);if(s[e]){s[e]=0;const n=r.indexOf(t);n>=0&&r.splice(n,1)}return r},r}async function i(t,e){try{await e(t)}catch(e){t.error(e)}}const o=Symbol("vega_id");let a=1;function u(t){return t[o]}function l(t,e){return t[o]=e,t}function h(t){const e=t===Object(t)?t:{data:t};return u(e)?e:l(e,a++)}function c(t,e){for(const n in t)e[n]=t[n];return e}function d(t){return t&&t.constructor===f}function f(){let t=[],n=[],r=[],s=[],i=[],o=null,a=!1;return{constructor:f,insert(n){let r=e.array(n),s=0,i=r.length;for(;s<i;++s)t.push(r[s]);return this},remove(t){let r=e.isFunction(t)?s:n,i=e.array(t),o=0,a=i.length;for(;o<a;++o)r.push(i[o]);return this},modify(t,n,s){let o={field:n,value:e.constant(s)};return e.isFunction(t)?(o.filter=t,i.push(o)):(o.tuple=t,r.push(o)),this},encode(t,n){return e.isFunction(t)?i.push({filter:t,field:n}):r.push({tuple:t,field:n}),this},clean(t){return o=t,this},reflow(){return a=!0,this},pulse(e,l){let c,d,f,p,g,m,_={},v={};for(c=0,d=l.length;c<d;++c)_[u(l[c])]=1;for(c=0,d=n.length;c<d;++c)g=n[c],_[u(g)]=-1;for(c=0,d=s.length;c<d;++c)p=s[c],l.forEach(t=>{p(t)&&(_[u(t)]=-1)});for(c=0,d=t.length;c<d;++c)g=t[c],m=u(g),_[m]?_[m]=1:e.add.push(h(t[c]));for(c=0,d=l.length;c<d;++c)g=l[c],_[u(g)]<0&&e.rem.push(g);function y(t,n,r){r?t[n]=r(t):e.encode=n,a||(v[u(t)]=t)}for(c=0,d=r.length;c<d;++c)f=r[c],g=f.tuple,p=f.field,m=_[u(g)],m>0&&(y(g,p,f.value),e.modifies(p));for(c=0,d=i.length;c<d;++c)f=i[c],p=f.filter,l.forEach(t=>{p(t)&&_[u(t)]>0&&y(t,f.field,f.value)}),e.modifies(f.field);if(a)e.mod=n.length||s.length?l.filter(t=>_[u(t)]>0):l.slice();else for(m in v)e.mod.push(v[m]);return(o||null==o&&(n.length||s.length))&&e.clean(!0),e}}}function p(){Object.defineProperty(this,"_:mod:_",{writable:!0,value:{}})}p.prototype={set(t,n,r,s){const i=this,o=i[t],a=i["_:mod:_"];return null!=n&&n>=0?(o[n]!==r||s)&&(o[n]=r,a[n+":"+t]=-1,a[t]=-1):(o!==r||s)&&(i[t]=r,a[t]=e.isArray(r)?1+r.length:-1),i},modified(t,n){let r,s=this["_:mod:_"];if(!arguments.length){for(r in s)if(s[r])return!0;return!1}if(e.isArray(t)){for(r=0;r<t.length;++r)if(s[t[r]])return!0;return!1}return null!=n&&n>=0?n+1<s[t]||!!s[n+":"+t]:!!s[t]},clear(){return this["_:mod:_"]={},this}};let g=0;const m=new p;function _(t,e,n,r){this.id=++g,this.value=t,this.stamp=-1,this.rank=-1,this.qrank=-1,this.flags=0,e&&(this._update=e),n&&this.parameters(n,r)}function v(t){return function(e){const n=this.flags;return 0===arguments.length?!!(n&t):(this.flags=e?n|t:n&~t,this)}}_.prototype={targets(){return this._targets||(this._targets=s(e.id))},set(t){return this.value!==t?(this.value=t,1):0},skip:v(1),modified:v(2),parameters(t,n,r){n=!1!==n;let s,i,o,a,u=this._argval=this._argval||new p,l=this._argops=this._argops||[],h=[];const c=(t,e,r)=>{r instanceof _?(r!==this&&(n&&r.targets().add(this),h.push(r)),l.push({op:r,name:t,index:e})):u.set(t,e,r)};for(s in t)if(i=t[s],"pulse"===s)e.array(i).forEach(t=>{t instanceof _?t!==this&&(t.targets().add(this),h.push(t)):e.error("Pulse parameters must be operator instances.")}),this.source=i;else if(e.isArray(i))for(u.set(s,-1,Array(o=i.length)),a=0;a<o;++a)c(s,a,i[a]);else c(s,-1,i);return this.marshall().clear(),r&&(l.initonly=!0),h},marshall(t){let e,n,r,s,i,o=this._argval||m,a=this._argops;if(a){for(n=0,r=a.length;n<r;++n)e=a[n],s=e.op,i=s.modified()&&s.stamp===t,o.set(e.name,e.index,s.value,i);if(a.initonly){for(n=0;n<r;++n)e=a[n],e.op.targets().remove(this);this._argops=null,this._update=null}}return o},detach(){let t,e,n,r,s=this._argops;if(s)for(t=0,e=s.length;t<e;++t)n=s[t],r=n.op,r._targets&&r._targets.remove(this)},evaluate(t){const e=this._update;if(e){const n=this.marshall(t.stamp),r=e.call(this,n,t);if(n.clear(),r!==this.value)this.value=r;else if(!this.modified())return t.StopPropagation}},run(t){if(t.stamp<this.stamp)return t.StopPropagation;let e;return this.skip()?(this.skip(!1),e=0):e=this.evaluate(t),this.pulse=e||t}};let y=0;function k(t,e,n){this.id=++y,this.value=null,n&&(this.receive=n),t&&(this._filter=t),e&&(this._apply=e)}function w(t,e,n){return new k(t,e,n)}k.prototype={_filter:e.truthy,_apply:e.identity,targets(){return this._targets||(this._targets=s(e.id))},consume(t){return arguments.length?(this._consume=!!t,this):!!this._consume},receive(t){if(this._filter(t)){let e=this.value=this._apply(t),n=this._targets,r=n?n.length:0,s=0;for(;s<r;++s)n[s].receive(e);this._consume&&(t.preventDefault(),t.stopPropagation())}},filter(t){const e=w(t);return this.targets().add(e),e},apply(t){const e=w(null,t);return this.targets().add(e),e},merge(){const t=w();this.targets().add(t);for(let e=0,n=arguments.length;e<n;++e)arguments[e].targets().add(t);return t},throttle(t){let e=-1;return this.filter(()=>{const n=Date.now();return n-e>t?(e=n,1):0})},debounce(t){const n=w();return this.targets().add(w(null,null,e.debounce(t,t=>{const e=t.dataflow;n.receive(t),e&&e.run&&e.run()}))),n},between(t,e){let n=!1;return t.targets().add(w(null,null,()=>n=!0)),e.targets().add(w(null,null,()=>n=!1)),this.filter(()=>n)},detach(){}};const F={skip:!0};function A(t,n,r,s,i,o){let a,u,l=e.extend({},o,F);e.isFunction(r)||(r=e.constant(r)),void 0===s?a=e=>t.touch(r(e)):e.isFunction(s)?(u=new _(null,s,i,!1),a=e=>{u.evaluate(e);const n=r(e),s=u.value;d(s)?t.pulse(n,s,o):t.update(n,s,l)}):a=e=>t.update(r(e),s,l),n.apply(a)}function D(t,n,r,s,i,o){if(void 0===s)n.targets().add(r);else{const a=o||{},u=new _(null,function(t,n){return n=e.isFunction(n)?n:e.constant(n),t?function(e,r){const s=n(e,r);return t.skip()||(t.skip(s!==this.value).value=s),s}:n}(r,s),i,!1);u.modified(a.force),u.rank=n.rank,n.targets().add(u),r&&(u.skip(!0),u.value=r.value,u.targets().add(r),t.connect(r,[u]))}}const E={};function P(t,e,n){this.dataflow=t,this.stamp=null==e?-1:e,this.add=[],this.rem=[],this.mod=[],this.fields=null,this.encode=n||null}function b(t,n){const r=[];return e.visitArray(t,n,t=>r.push(t)),r}function q(t,e){const n={};return t.visit(e,(function(t){n[u(t)]=1})),t=>n[u(t)]?null:t}function O(t,e){return t?(n,r)=>t(n,r)&&e(n,r):e}function M(t,e,n,r){let s,i,o,a,u,l=this,h=0;for(this.dataflow=t,this.stamp=e,this.fields=null,this.encode=r||null,this.pulses=n,o=0,a=n.length;o<a;++o)if(s=n[o],s.stamp===e){if(s.fields)for(u in i=l.fields||(l.fields={}),s.fields)i[u]=1;s.changed(l.ADD)&&(h|=l.ADD),s.changed(l.REM)&&(h|=l.REM),s.changed(l.MOD)&&(h|=l.MOD)}this.changes=h}function S(t){return t.error("Dataflow already running. Use runAsync() to chain invocations."),t}P.prototype={StopPropagation:E,ADD:1,REM:2,MOD:4,ADD_REM:3,ADD_MOD:5,ALL:7,REFLOW:8,SOURCE:16,NO_SOURCE:32,NO_FIELDS:64,fork(t){return new P(this.dataflow).init(this,t)},clone(){const t=this.fork(7);return t.add=t.add.slice(),t.rem=t.rem.slice(),t.mod=t.mod.slice(),t.source&&(t.source=t.source.slice()),t.materialize(23)},addAll(){let t=this;return!t.source||t.add===t.rem||!t.rem.length&&t.source.length===t.add.length||(t=new P(this.dataflow).init(this),t.add=t.source,t.rem=[]),t},init(t,e){const n=this;return n.stamp=t.stamp,n.encode=t.encode,!t.fields||64&e||(n.fields=t.fields),1&e?(n.addF=t.addF,n.add=t.add):(n.addF=null,n.add=[]),2&e?(n.remF=t.remF,n.rem=t.rem):(n.remF=null,n.rem=[]),4&e?(n.modF=t.modF,n.mod=t.mod):(n.modF=null,n.mod=[]),32&e?(n.srcF=null,n.source=null):(n.srcF=t.srcF,n.source=t.source,t.cleans&&(n.cleans=t.cleans)),n},runAfter(t){this.dataflow.runAfter(t)},changed(t){const e=t||7;return 1&e&&this.add.length||2&e&&this.rem.length||4&e&&this.mod.length},reflow(t){if(t)return this.fork(7).reflow();const e=this.add.length,n=this.source&&this.source.length;return n&&n!==e&&(this.mod=this.source,e&&this.filter(4,q(this,1))),this},clean(t){return arguments.length?(this.cleans=!!t,this):this.cleans},modifies(t){const n=this.fields||(this.fields={});return e.isArray(t)?t.forEach(t=>n[t]=!0):n[t]=!0,this},modified(t,n){const r=this.fields;return!(!n&&!this.mod.length||!r)&&(arguments.length?e.isArray(t)?t.some(t=>r[t]):r[t]:!!r)},filter(t,e){const n=this;return 1&t&&(n.addF=O(n.addF,e)),2&t&&(n.remF=O(n.remF,e)),4&t&&(n.modF=O(n.modF,e)),16&t&&(n.srcF=O(n.srcF,e)),n},materialize(t){const e=this;return 1&(t=t||7)&&e.addF&&(e.add=b(e.add,e.addF),e.addF=null),2&t&&e.remF&&(e.rem=b(e.rem,e.remF),e.remF=null),4&t&&e.modF&&(e.mod=b(e.mod,e.modF),e.modF=null),16&t&&e.srcF&&(e.source=e.source.filter(e.srcF),e.srcF=null),e},visit(t,n){let r,s,i=this,o=n;return 16&t?(e.visitArray(i.source,i.srcF,o),i):(1&t&&e.visitArray(i.add,i.addF,o),2&t&&e.visitArray(i.rem,i.remF,o),4&t&&e.visitArray(i.mod,i.modF,o),8&t&&(r=i.source)&&(s=i.add.length+i.mod.length,s===r.length||(s?e.visitArray(r,q(i,5),o):e.visitArray(r,i.srcF,o))),i)}},e.inherits(M,P,{fork(t){const e=new P(this.dataflow).init(this,t&this.NO_FIELDS);return void 0!==t&&(t&e.ADD&&this.visit(e.ADD,t=>e.add.push(t)),t&e.REM&&this.visit(e.REM,t=>e.rem.push(t)),t&e.MOD&&this.visit(e.MOD,t=>e.mod.push(t))),e},changed(t){return this.changes&t},modified(t){const n=this,r=n.fields;return r&&n.changes&n.MOD?e.isArray(t)?t.some((function(t){return r[t]})):r[t]:0},filter(){e.error("MultiPulse does not support filtering.")},materialize(){e.error("MultiPulse does not support materialization.")},visit(t,e){let n=this,r=n.pulses,s=r.length,i=0;if(t&n.SOURCE)for(;i<s;++i)r[i].visit(t,e);else for(;i<s;++i)r[i].stamp===n.stamp&&r[i].visit(t,e);return n}});const L={skip:!1,force:!1};function R(t){let e=[];return{clear:()=>e=[],size:()=>e.length,peek:()=>e[0],push:n=>(e.push(n),x(e,0,e.length-1,t)),pop:()=>{let n,r=e.pop();return e.length?(n=e[0],e[0]=r,function(t,e,n){let r,s=e,i=t.length,o=t[e],a=1+(e<<1);for(;a<i;)r=a+1,r<i&&n(t[a],t[r])>=0&&(a=r),t[e]=t[a],a=1+((e=a)<<1);t[e]=o,x(t,s,e,n)}(e,0,t)):n=r,n}}}function x(t,e,n,r){let s,i,o;for(s=t[n];n>e&&(o=n-1>>1,i=t[o],r(s,i)<0);)t[n]=i,n=o;return t[n]=s}function C(){this.logger(e.logger()),this.logLevel(e.Error),this._clock=0,this._rank=0,this._locale=r.defaultLocale();try{this._loader=n.loader()}catch(t){}this._touched=s(e.id),this._input={},this._pulse=null,this._heap=R((t,e)=>t.qrank-e.qrank),this._postrun=[]}function z(t){return function(){return this._log[t].apply(this,arguments)}}function T(t,e){_.call(this,t,null,e)}C.prototype={stamp(){return this._clock},loader(t){return arguments.length?(this._loader=t,this):this._loader},locale(t){return arguments.length?(this._locale=t,this):this._locale},logger(t){return arguments.length?(this._log=t,this):this._log},error:z("error"),warn:z("warn"),info:z("info"),debug:z("debug"),logLevel:z("level"),cleanThreshold:1e4,add:function(t,n,r,s){let i,o=1;return t instanceof _?i=t:t&&t.prototype instanceof _?i=new t:e.isFunction(t)?i=new _(null,t):(o=0,i=new _(t,n)),this.rank(i),o&&(s=r,r=n),r&&this.connect(i,i.parameters(r,s)),this.touch(i),i},connect:function(t,e){let n,r,s=t.rank;for(n=0,r=e.length;n<r;++n)if(s<e[n].rank)return void this.rerank(t)},rank:function(t){t.rank=++this._rank},rerank:function(t){let n,r,s,i=[t];for(;i.length;)if(this.rank(n=i.pop()),r=n._targets)for(s=r.length;--s>=0;)i.push(n=r[s]),n===t&&e.error("Cycle detected in dataflow graph.")},pulse:function(t,e,n){this.touch(t,n||L);const r=new P(this,this._clock+(this._pulse?0:1)),s=t.pulse&&t.pulse.source||[];return r.target=t,this._input[t.id]=e.pulse(r,s),this},touch:function(t,e){const n=e||L;return this._pulse?this._enqueue(t):this._touched.add(t),n.skip&&t.skip(!0),this},update:function(t,e,n){const r=n||L;return(t.set(e)||r.force)&&this.touch(t,r),this},changeset:f,ingest:function(t,e,n){return e=this.parse(e,n),this.pulse(t,this.changeset().insert(e))},parse:function(t,e){const r=this.locale();return n.read(t,e,r.timeParse,r.utcParse)},preload:async function(t,n,r){const s=this,i=s._pending||function(t){let e,n=new Promise(t=>e=t);return n.requests=0,n.done=()=>{0==--n.requests&&(t._pending=null,e(t))},t._pending=n}(s);i.requests+=1;const o=await s.request(n,r);return s.pulse(t,s.changeset().remove(e.truthy).insert(o.data||[])),i.done(),o},request:async function(t,e){const r=this;let s,i=0;try{s=await r.loader().load(t,{context:"dataflow",response:n.responseType(e&&e.type)});try{s=r.parse(s,e)}catch(e){i=-2,r.warn("Data ingestion failed",t,e)}}catch(e){i=-1,r.warn("Loading failed",t,e)}return{data:s,status:i}},events:function(t,n,r,s){let i,o=this,a=w(r,s),u=function(t){t.dataflow=o;try{a.receive(t)}catch(t){o.error(t)}finally{o.run()}};i="string"==typeof t&&"undefined"!=typeof document?document.querySelectorAll(t):e.array(t);for(let t=0,e=i.length;t<e;++t)i[t].addEventListener(n,u);return a},on:function(t,e,n,r,s){return(t instanceof _?D:A)(this,t,e,n,r,s),this},evaluate:async function(t,n,r){const o=this,a=[];if(o._pulse)return S(o);if(o._pending&&await o._pending,n&&await i(o,n),!o._touched.length)return o.debug("Dataflow invoked, but nothing to do."),o;const u=++o._clock;o._pulse=new P(o,u,t),o._touched.forEach(t=>o._enqueue(t,!0)),o._touched=s(e.id);let l,h,c,d=0;try{for(;o._heap.size()>0;)l=o._heap.pop(),l.rank===l.qrank?(h=l.run(o._getPulse(l,t)),h.then?h=await h:h.async&&(a.push(h.async),h=E),h!==E&&l._targets&&l._targets.forEach(t=>o._enqueue(t)),++d):o._enqueue(l,!0)}catch(t){o._heap.clear(),c=t}if(o._input={},o._pulse=null,o.debug(`Pulse ${u}: ${d} operators`),c&&(o._postrun=[],o.error(c)),o._postrun.length){const t=o._postrun.sort((t,e)=>e.priority-t.priority);o._postrun=[];for(let e=0;e<t.length;++e)await i(o,t[e].callback)}return r&&await i(o,r),a.length&&Promise.all(a).then(t=>o.runAsync(null,()=>{t.forEach(t=>{try{t(o)}catch(t){o.error(t)}})})),o},run:function(t,e,n){return this._pulse?S(this):(this.evaluate(t,e,n),this)},runAsync:async function(t,e,n){for(;this._running;)await this._running;const r=()=>this._running=null;return(this._running=this.evaluate(t,e,n)).then(r,r),this._running},runAfter:function(t,e,n){if(this._pulse||e)this._postrun.push({priority:n||0,callback:t});else try{t(this)}catch(t){this.error(t)}},_enqueue:function(t,e){const n=t.stamp<this._clock;n&&(t.stamp=this._clock),(n||e)&&(t.qrank=t.rank,this._heap.push(t))},_getPulse:function(t,n){const r=t.source,s=this._clock;return r&&e.isArray(r)?new M(this,s,r.map(t=>t.pulse),n):this._input[t.id]||function(t,e){if(e&&e.stamp===t.stamp)return e;t=t.fork(),e&&e!==E&&(t.source=e.source);return t}(this._pulse,r&&r.pulse)}},e.inherits(T,_,{run(t){if(t.stamp<this.stamp)return t.StopPropagation;let e;return this.skip()?this.skip(!1):e=this.evaluate(t),e=e||t,e.then?e=e.then(t=>this.pulse=t):e!==t.StopPropagation&&(this.pulse=e),e},evaluate(t){const e=this.marshall(t.stamp),n=this.transform(e,t);return e.clear(),n},transform(){}});const U={};function j(t){return t=t&&t.toLowerCase(),e.hasOwnProperty(U,t)?U[t]:null}t.Dataflow=C,t.EventStream=k,t.MultiPulse=M,t.Operator=_,t.Parameters=p,t.Pulse=P,t.Transform=T,t.UniqueList=s,t.asyncCallback=i,t.changeset=f,t.definition=function(t){const e=j(t);return e&&e.Definition||null},t.derive=function(t){return c(t,h({}))},t.ingest=h,t.isChangeSet=d,t.isTuple=function(t){return!(!t||!u(t))},t.rederive=c,t.replace=function(t,e){return l(e,u(t))},t.stableCompare=function(t,e){return t?e?(n,r)=>t(n,r)||u(e(n))-u(e(r)):(e,n)=>t(e,n)||u(e)-u(n):null},t.transform=j,t.transforms=U,t.tupleid=u,Object.defineProperty(t,"__esModule",{value:!0})})); |
{ | ||
"name": "vega-dataflow", | ||
"version": "5.7.0", | ||
"version": "5.7.1", | ||
"description": "Reactive dataflow processing.", | ||
@@ -28,7 +28,7 @@ "keywords": [ | ||
"dependencies": { | ||
"vega-format": "^1.0.0", | ||
"vega-loader": "^4.3.0", | ||
"vega-util": "^1.14.0" | ||
"vega-format": "^1.0.2", | ||
"vega-loader": "^4.3.1", | ||
"vega-util": "^1.15.0" | ||
}, | ||
"gitHead": "62565bbe084a422c4a0cbc6e19c6f7c45a3e5137" | ||
"gitHead": "28db83352e43e321dfe55fc5cb6489b211e45662" | ||
} |
@@ -9,3 +9,3 @@ import {ingest, tupleid} from './Tuple'; | ||
export default function changeset() { | ||
var add = [], // insert tuples | ||
let add = [], // insert tuples | ||
rem = [], // remove tuples | ||
@@ -20,9 +20,9 @@ mod = [], // modify tuples | ||
constructor: changeset, | ||
insert: function(t) { | ||
var d = array(t), i = 0, n = d.length; | ||
insert(t) { | ||
let d = array(t), i = 0, n = d.length; | ||
for (; i<n; ++i) add.push(d[i]); | ||
return this; | ||
}, | ||
remove: function(t) { | ||
var a = isFunction(t) ? remp : rem, | ||
remove(t) { | ||
let a = isFunction(t) ? remp : rem, | ||
d = array(t), i = 0, n = d.length; | ||
@@ -32,4 +32,4 @@ for (; i<n; ++i) a.push(d[i]); | ||
}, | ||
modify: function(t, field, value) { | ||
var m = {field: field, value: constant(value)}; | ||
modify(t, field, value) { | ||
let m = {field: field, value: constant(value)}; | ||
if (isFunction(t)) { | ||
@@ -44,3 +44,3 @@ m.filter = t; | ||
}, | ||
encode: function(t, set) { | ||
encode(t, set) { | ||
if (isFunction(t)) modp.push({filter: t, field: set}); | ||
@@ -50,12 +50,12 @@ else mod.push({tuple: t, field: set}); | ||
}, | ||
clean: function(value) { | ||
clean(value) { | ||
clean = value; | ||
return this; | ||
}, | ||
reflow: function() { | ||
reflow() { | ||
reflow = true; | ||
return this; | ||
}, | ||
pulse: function(pulse, tuples) { | ||
var cur = {}, out = {}, i, n, m, f, t, id; | ||
pulse(pulse, tuples) { | ||
let cur = {}, out = {}, i, n, m, f, t, id; | ||
@@ -76,3 +76,3 @@ // build lookup table of current tuples | ||
f = remp[i]; | ||
tuples.forEach(function(t) { | ||
tuples.forEach(t => { | ||
if (f(t)) cur[tupleid(t)] = -1; | ||
@@ -128,3 +128,3 @@ }); | ||
f = m.filter; | ||
tuples.forEach(function(t) { | ||
tuples.forEach(t => { | ||
if (f(t) && cur[tupleid(t)] > 0) { | ||
@@ -141,3 +141,3 @@ modify(t, m.field, m.value); | ||
pulse.mod = rem.length || remp.length | ||
? tuples.filter(function(t) { return cur[tupleid(t)] > 0; }) | ||
? tuples.filter(t => cur[tupleid(t)] > 0) | ||
: tuples.slice(); | ||
@@ -144,0 +144,0 @@ } else { |
@@ -21,4 +21,4 @@ import Operator from '../Operator'; | ||
export default function(init, update, params, react) { | ||
var shift = 1, | ||
op; | ||
let shift = 1, | ||
op; | ||
@@ -25,0 +25,0 @@ if (init instanceof Operator) { |
@@ -10,3 +10,3 @@ /** | ||
export default function(target, sources) { | ||
var targetRank = target.rank, i, n; | ||
let targetRank = target.rank, i, n; | ||
@@ -13,0 +13,0 @@ for (i=0, n=sources.length; i<n; ++i) { |
@@ -41,150 +41,149 @@ import add from './add'; | ||
var prototype = Dataflow.prototype; | ||
function logMethod(method) { | ||
return function() { | ||
return this._log[method].apply(this, arguments); | ||
}; | ||
} | ||
/** | ||
* The current timestamp of this dataflow. This value reflects the | ||
* timestamp of the previous dataflow run. The dataflow is initialized | ||
* with a stamp value of 0. The initial run of the dataflow will have | ||
* a timestap of 1, and so on. This value will match the | ||
* {@link Pulse.stamp} property. | ||
* @return {number} - The current timestamp value. | ||
*/ | ||
prototype.stamp = function() { | ||
return this._clock; | ||
}; | ||
Dataflow.prototype = { | ||
/** | ||
* Gets or sets the loader instance to use for data file loading. A | ||
* loader object must provide a "load" method for loading files and a | ||
* "sanitize" method for checking URL/filename validity. Both methods | ||
* should accept a URI and options hash as arguments, and return a Promise | ||
* that resolves to the loaded file contents (load) or a hash containing | ||
* sanitized URI data with the sanitized url assigned to the "href" property | ||
* (sanitize). | ||
* @param {object} _ - The loader instance to use. | ||
* @return {object|Dataflow} - If no arguments are provided, returns | ||
* the current loader instance. Otherwise returns this Dataflow instance. | ||
*/ | ||
prototype.loader = function(_) { | ||
if (arguments.length) { | ||
this._loader = _; | ||
return this; | ||
} else { | ||
return this._loader; | ||
} | ||
}; | ||
/** | ||
* The current timestamp of this dataflow. This value reflects the | ||
* timestamp of the previous dataflow run. The dataflow is initialized | ||
* with a stamp value of 0. The initial run of the dataflow will have | ||
* a timestap of 1, and so on. This value will match the | ||
* {@link Pulse.stamp} property. | ||
* @return {number} - The current timestamp value. | ||
*/ | ||
stamp() { | ||
return this._clock; | ||
}, | ||
/** | ||
* Gets or sets the locale instance to use for formatting and parsing | ||
* string values. The locale object should be provided by the | ||
* vega-format library, and include methods such as format, timeFormat, | ||
* utcFormat, timeParse, and utcParse. | ||
* @param {object} _ - The locale instance to use. | ||
* @return {object|Dataflow} - If no arguments are provided, returns | ||
* the current locale instance. Otherwise returns this Dataflow instance. | ||
*/ | ||
prototype.locale = function(_) { | ||
if (arguments.length) { | ||
this._locale = _; | ||
return this; | ||
} else { | ||
return this._locale; | ||
} | ||
}; | ||
/** | ||
* Gets or sets the loader instance to use for data file loading. A | ||
* loader object must provide a "load" method for loading files and a | ||
* "sanitize" method for checking URL/filename validity. Both methods | ||
* should accept a URI and options hash as arguments, and return a Promise | ||
* that resolves to the loaded file contents (load) or a hash containing | ||
* sanitized URI data with the sanitized url assigned to the "href" property | ||
* (sanitize). | ||
* @param {object} _ - The loader instance to use. | ||
* @return {object|Dataflow} - If no arguments are provided, returns | ||
* the current loader instance. Otherwise returns this Dataflow instance. | ||
*/ | ||
loader(_) { | ||
if (arguments.length) { | ||
this._loader = _; | ||
return this; | ||
} else { | ||
return this._loader; | ||
} | ||
}, | ||
/** | ||
* Empty entry threshold for garbage cleaning. Map data structures will | ||
* perform cleaning once the number of empty entries exceeds this value. | ||
*/ | ||
prototype.cleanThreshold = 1e4; | ||
/** | ||
* Gets or sets the locale instance to use for formatting and parsing | ||
* string values. The locale object should be provided by the | ||
* vega-format library, and include methods such as format, timeFormat, | ||
* utcFormat, timeParse, and utcParse. | ||
* @param {object} _ - The locale instance to use. | ||
* @return {object|Dataflow} - If no arguments are provided, returns | ||
* the current locale instance. Otherwise returns this Dataflow instance. | ||
*/ | ||
locale(_) { | ||
if (arguments.length) { | ||
this._locale = _; | ||
return this; | ||
} else { | ||
return this._locale; | ||
} | ||
}, | ||
// OPERATOR REGISTRATION | ||
prototype.add = add; | ||
prototype.connect = connect; | ||
prototype.rank = rank; | ||
prototype.rerank = rerank; | ||
/** | ||
* Get or set the logger instance used to log messages. If no arguments are | ||
* provided, returns the current logger instance. Otherwise, sets the logger | ||
* and return this Dataflow instance. Provided loggers must support the full | ||
* API of logger objects generated by the vega-util logger method. Note that | ||
* by default the log level of the new logger will be used; use the logLevel | ||
* method to adjust the log level as needed. | ||
*/ | ||
logger(logger) { | ||
if (arguments.length) { | ||
this._log = logger; | ||
return this; | ||
} else { | ||
return this._log; | ||
} | ||
}, | ||
// OPERATOR UPDATES | ||
prototype.pulse = pulse; | ||
prototype.touch = touch; | ||
prototype.update = update; | ||
prototype.changeset = changeset; | ||
/** | ||
* Logs an error message. By default, logged messages are written to console | ||
* output. The message will only be logged if the current log level is high | ||
* enough to permit error messages. | ||
*/ | ||
error: logMethod('error'), | ||
// DATA LOADING | ||
prototype.ingest = ingest; | ||
prototype.parse = parse; | ||
prototype.preload = preload; | ||
prototype.request = request; | ||
/** | ||
* Logs a warning message. By default, logged messages are written to console | ||
* output. The message will only be logged if the current log level is high | ||
* enough to permit warning messages. | ||
*/ | ||
warn: logMethod('warn'), | ||
// EVENT HANDLING | ||
prototype.events = events; | ||
prototype.on = on; | ||
/** | ||
* Logs a information message. By default, logged messages are written to | ||
* console output. The message will only be logged if the current log level is | ||
* high enough to permit information messages. | ||
*/ | ||
info: logMethod('info'), | ||
// PULSE PROPAGATION | ||
prototype.evaluate = evaluate; | ||
prototype.run = run; | ||
prototype.runAsync = runAsync; | ||
prototype.runAfter = runAfter; | ||
prototype._enqueue = enqueue; | ||
prototype._getPulse = getPulse; | ||
/** | ||
* Logs a debug message. By default, logged messages are written to console | ||
* output. The message will only be logged if the current log level is high | ||
* enough to permit debug messages. | ||
*/ | ||
debug: logMethod('debug'), | ||
// LOGGING AND ERROR HANDLING | ||
/** | ||
* Get or set the current log level. If an argument is provided, it | ||
* will be used as the new log level. | ||
* @param {number} [level] - Should be one of None, Warn, Info | ||
* @return {number} - The current log level. | ||
*/ | ||
logLevel: logMethod('level'), | ||
function logMethod(method) { | ||
return function() { | ||
return this._log[method].apply(this, arguments); | ||
}; | ||
} | ||
/** | ||
* Empty entry threshold for garbage cleaning. Map data structures will | ||
* perform cleaning once the number of empty entries exceeds this value. | ||
*/ | ||
cleanThreshold: 1e4, | ||
/** | ||
* Get or set the logger instance used to log messages. If no arguments are | ||
* provided, returns the current logger instance. Otherwise, sets the logger | ||
* and return this Dataflow instance. Provided loggers must support the full | ||
* API of logger objects generated by the vega-util logger method. Note that | ||
* by default the log level of the new logger will be used; use the logLevel | ||
* method to adjust the log level as needed. | ||
*/ | ||
prototype.logger = function(logger) { | ||
if (arguments.length) { | ||
this._log = logger; | ||
return this; | ||
} else { | ||
return this._log; | ||
} | ||
}; | ||
// OPERATOR REGISTRATION | ||
add, | ||
connect, | ||
rank, | ||
rerank, | ||
/** | ||
* Logs an error message. By default, logged messages are written to console | ||
* output. The message will only be logged if the current log level is high | ||
* enough to permit error messages. | ||
*/ | ||
prototype.error = logMethod('error'); | ||
// OPERATOR UPDATES | ||
pulse, | ||
touch, | ||
update, | ||
changeset, | ||
/** | ||
* Logs a warning message. By default, logged messages are written to console | ||
* output. The message will only be logged if the current log level is high | ||
* enough to permit warning messages. | ||
*/ | ||
prototype.warn = logMethod('warn'); | ||
// DATA LOADING | ||
ingest, | ||
parse, | ||
preload, | ||
request, | ||
/** | ||
* Logs a information message. By default, logged messages are written to | ||
* console output. The message will only be logged if the current log level is | ||
* high enough to permit information messages. | ||
*/ | ||
prototype.info = logMethod('info'); | ||
// EVENT HANDLING | ||
events, | ||
on, | ||
/** | ||
* Logs a debug message. By default, logged messages are written to console | ||
* output. The message will only be logged if the current log level is high | ||
* enough to permit debug messages. | ||
*/ | ||
prototype.debug = logMethod('debug'); | ||
/** | ||
* Get or set the current log level. If an argument is provided, it | ||
* will be used as the new log level. | ||
* @param {number} [level] - Should be one of None, Warn, Info | ||
* @return {number} - The current log level. | ||
*/ | ||
prototype.logLevel = logMethod('level'); | ||
// PULSE PROPAGATION | ||
evaluate, | ||
run, | ||
runAsync, | ||
runAfter, | ||
_enqueue: enqueue, | ||
_getPulse: getPulse | ||
}; |
@@ -16,3 +16,3 @@ import {stream} from '../EventStream'; | ||
export default function(source, type, filter, apply) { | ||
var df = this, | ||
let df = this, | ||
s = stream(filter, apply), | ||
@@ -37,3 +37,3 @@ send = function(e) { | ||
for (var i=0, n=sources.length; i<n; ++i) { | ||
for (let i=0, n=sources.length; i<n; ++i) { | ||
sources[i].addEventListener(type, send); | ||
@@ -40,0 +40,0 @@ } |
@@ -73,8 +73,8 @@ import {read, responseType} from 'vega-loader'; | ||
function loadPending(df) { | ||
var pending = new Promise(function(a) { accept = a; }), | ||
accept; | ||
let accept, | ||
pending = new Promise(a => accept = a); | ||
pending.requests = 0; | ||
pending.done = function() { | ||
pending.done = () => { | ||
if (--pending.requests === 0) { | ||
@@ -81,0 +81,0 @@ df._pending = null; |
@@ -5,3 +5,3 @@ import Operator from '../Operator'; | ||
var SKIP = {skip: true}; | ||
const SKIP = {skip: true}; | ||
@@ -35,3 +35,3 @@ /** | ||
export default function(source, target, update, params, options) { | ||
var fn = source instanceof Operator ? onOperator : onStream; | ||
const fn = source instanceof Operator ? onOperator : onStream; | ||
fn(this, source, target, update, params, options); | ||
@@ -42,3 +42,3 @@ return this; | ||
function onStream(df, stream, target, update, params, options) { | ||
var opt = extend({}, options, SKIP), func, op; | ||
let opt = extend({}, options, SKIP), func, op; | ||
@@ -45,0 +45,0 @@ if (!isFunction(target)) target = constant(target); |
@@ -19,3 +19,3 @@ import {error} from 'vega-util'; | ||
export function rerank(op) { | ||
var queue = [op], | ||
let queue = [op], | ||
cur, list, i; | ||
@@ -22,0 +22,0 @@ |
@@ -237,3 +237,3 @@ /* eslint-disable require-atomic-updates */ | ||
export function enqueue(op, force) { | ||
var q = op.stamp < this._clock; | ||
const q = op.stamp < this._clock; | ||
if (q) op.stamp = this._clock; | ||
@@ -259,4 +259,4 @@ if (q || force) { | ||
export function getPulse(op, encode) { | ||
var s = op.source, | ||
stamp = this._clock; | ||
const s = op.source, | ||
stamp = this._clock; | ||
@@ -263,0 +263,0 @@ return s && isArray(s) |
import Pulse from '../Pulse'; | ||
var NO_OPT = {skip: false, force: false}; | ||
const NO_OPT = {skip: false, force: false}; | ||
@@ -18,3 +18,3 @@ /** | ||
export function touch(op, options) { | ||
var opt = options || NO_OPT; | ||
const opt = options || NO_OPT; | ||
if (this._pulse) { | ||
@@ -43,3 +43,3 @@ // if in midst of propagation, add to priority queue | ||
export function update(op, value, options) { | ||
var opt = options || NO_OPT; | ||
const opt = options || NO_OPT; | ||
if (op.set(value) || opt.force) { | ||
@@ -68,4 +68,4 @@ this.touch(op, opt); | ||
var p = new Pulse(this, this._clock + (this._pulse ? 0 : 1)), | ||
t = op.pulse && op.pulse.source || []; | ||
const p = new Pulse(this, this._clock + (this._pulse ? 0 : 1)), | ||
t = op.pulse && op.pulse.source || []; | ||
@@ -72,0 +72,0 @@ p.target = op; |
import UniqueList from './util/UniqueList'; | ||
import {debounce, id, identity, truthy} from 'vega-util'; | ||
var STREAM_ID = 0; | ||
let STREAM_ID = 0; | ||
@@ -37,89 +37,94 @@ /** | ||
var prototype = EventStream.prototype; | ||
EventStream.prototype = { | ||
_filter: truthy, | ||
prototype._filter = truthy; | ||
_apply: identity, | ||
prototype._apply = identity; | ||
targets() { | ||
return this._targets || (this._targets = UniqueList(id)); | ||
}, | ||
prototype.targets = function() { | ||
return this._targets || (this._targets = UniqueList(id)); | ||
}; | ||
consume(_) { | ||
if (!arguments.length) return !!this._consume; | ||
this._consume = !!_; | ||
return this; | ||
}, | ||
prototype.consume = function(_) { | ||
if (!arguments.length) return !!this._consume; | ||
this._consume = !!_; | ||
return this; | ||
}; | ||
receive(evt) { | ||
if (this._filter(evt)) { | ||
let val = (this.value = this._apply(evt)), | ||
trg = this._targets, | ||
n = trg ? trg.length : 0, | ||
i = 0; | ||
prototype.receive = function(evt) { | ||
if (this._filter(evt)) { | ||
var val = (this.value = this._apply(evt)), | ||
trg = this._targets, | ||
n = trg ? trg.length : 0, | ||
i = 0; | ||
for (; i<n; ++i) trg[i].receive(val); | ||
for (; i<n; ++i) trg[i].receive(val); | ||
if (this._consume) { | ||
evt.preventDefault(); | ||
evt.stopPropagation(); | ||
if (this._consume) { | ||
evt.preventDefault(); | ||
evt.stopPropagation(); | ||
} | ||
} | ||
} | ||
}; | ||
}, | ||
prototype.filter = function(filter) { | ||
var s = stream(filter); | ||
this.targets().add(s); | ||
return s; | ||
}; | ||
filter(filter) { | ||
const s = stream(filter); | ||
this.targets().add(s); | ||
return s; | ||
}, | ||
prototype.apply = function(apply) { | ||
var s = stream(null, apply); | ||
this.targets().add(s); | ||
return s; | ||
}; | ||
apply(apply) { | ||
const s = stream(null, apply); | ||
this.targets().add(s); | ||
return s; | ||
}, | ||
prototype.merge = function() { | ||
var s = stream(); | ||
merge() { | ||
const s = stream(); | ||
this.targets().add(s); | ||
for (var i=0, n=arguments.length; i<n; ++i) { | ||
arguments[i].targets().add(s); | ||
} | ||
this.targets().add(s); | ||
for (let i=0, n=arguments.length; i<n; ++i) { | ||
arguments[i].targets().add(s); | ||
} | ||
return s; | ||
}; | ||
return s; | ||
}, | ||
prototype.throttle = function(pause) { | ||
var t = -1; | ||
return this.filter(function() { | ||
var now = Date.now(); | ||
if ((now - t) > pause) { | ||
t = now; | ||
return 1; | ||
} else { | ||
return 0; | ||
} | ||
}); | ||
}; | ||
throttle(pause) { | ||
let t = -1; | ||
return this.filter(() => { | ||
const now = Date.now(); | ||
if ((now - t) > pause) { | ||
t = now; | ||
return 1; | ||
} else { | ||
return 0; | ||
} | ||
}); | ||
}, | ||
prototype.debounce = function(delay) { | ||
var s = stream(); | ||
debounce(delay) { | ||
const s = stream(); | ||
this.targets().add(stream(null, null, | ||
debounce(delay, function(e) { | ||
var df = e.dataflow; | ||
s.receive(e); | ||
if (df && df.run) df.run(); | ||
}) | ||
)); | ||
this.targets().add(stream(null, null, | ||
debounce(delay, e => { | ||
const df = e.dataflow; | ||
s.receive(e); | ||
if (df && df.run) df.run(); | ||
}) | ||
)); | ||
return s; | ||
}; | ||
return s; | ||
}, | ||
prototype.between = function(a, b) { | ||
var active = false; | ||
a.targets().add(stream(null, null, function() { active = true; })); | ||
b.targets().add(stream(null, null, function() { active = false; })); | ||
return this.filter(function() { return active; }); | ||
between(a, b) { | ||
let active = false; | ||
a.targets().add(stream(null, null, () => active = true)); | ||
b.targets().add(stream(null, null, () => active = false)); | ||
return this.filter(() => active); | ||
}, | ||
detach() { | ||
// no-op for handling detach requests | ||
// ensures compatibility with operators (#2753) | ||
} | ||
}; |
@@ -17,3 +17,3 @@ import Pulse from './Pulse'; | ||
export default function MultiPulse(dataflow, stamp, pulses, encode) { | ||
var p = this, | ||
let p = this, | ||
c = 0, | ||
@@ -45,63 +45,57 @@ pulse, hash, i, n, f; | ||
var prototype = inherits(MultiPulse, Pulse); | ||
/** | ||
* Creates a new pulse based on the values of this pulse. | ||
* The dataflow, time stamp and field modification values are copied over. | ||
* @return {Pulse} | ||
*/ | ||
prototype.fork = function(flags) { | ||
var p = new Pulse(this.dataflow).init(this, flags & this.NO_FIELDS); | ||
if (flags !== undefined) { | ||
if (flags & p.ADD) { | ||
this.visit(p.ADD, function(t) { return p.add.push(t); }); | ||
inherits(MultiPulse, Pulse, { | ||
/** | ||
* Creates a new pulse based on the values of this pulse. | ||
* The dataflow, time stamp and field modification values are copied over. | ||
* @return {Pulse} | ||
*/ | ||
fork(flags) { | ||
const p = new Pulse(this.dataflow).init(this, flags & this.NO_FIELDS); | ||
if (flags !== undefined) { | ||
if (flags & p.ADD) this.visit(p.ADD, t => p.add.push(t)); | ||
if (flags & p.REM) this.visit(p.REM, t => p.rem.push(t)); | ||
if (flags & p.MOD) this.visit(p.MOD, t => p.mod.push(t)); | ||
} | ||
if (flags & p.REM) { | ||
this.visit(p.REM, function(t) { return p.rem.push(t); }); | ||
} | ||
if (flags & p.MOD) { | ||
this.visit(p.MOD, function(t) { return p.mod.push(t); }); | ||
} | ||
} | ||
return p; | ||
}; | ||
return p; | ||
}, | ||
prototype.changed = function(flags) { | ||
return this.changes & flags; | ||
}; | ||
changed(flags) { | ||
return this.changes & flags; | ||
}, | ||
prototype.modified = function(_) { | ||
var p = this, fields = p.fields; | ||
return !(fields && (p.changes & p.MOD)) ? 0 | ||
: isArray(_) ? _.some(function(f) { return fields[f]; }) | ||
: fields[_]; | ||
}; | ||
modified(_) { | ||
const p = this, fields = p.fields; | ||
return !(fields && (p.changes & p.MOD)) ? 0 | ||
: isArray(_) ? _.some(function(f) { return fields[f]; }) | ||
: fields[_]; | ||
}, | ||
prototype.filter = function() { | ||
error('MultiPulse does not support filtering.'); | ||
}; | ||
filter() { | ||
error('MultiPulse does not support filtering.'); | ||
}, | ||
prototype.materialize = function() { | ||
error('MultiPulse does not support materialization.'); | ||
}; | ||
materialize() { | ||
error('MultiPulse does not support materialization.'); | ||
}, | ||
prototype.visit = function(flags, visitor) { | ||
var p = this, | ||
pulses = p.pulses, | ||
n = pulses.length, | ||
i = 0; | ||
visit(flags, visitor) { | ||
let p = this, | ||
pulses = p.pulses, | ||
n = pulses.length, | ||
i = 0; | ||
if (flags & p.SOURCE) { | ||
for (; i<n; ++i) { | ||
pulses[i].visit(flags, visitor); | ||
} | ||
} else { | ||
for (; i<n; ++i) { | ||
if (pulses[i].stamp === p.stamp) { | ||
if (flags & p.SOURCE) { | ||
for (; i<n; ++i) { | ||
pulses[i].visit(flags, visitor); | ||
} | ||
} else { | ||
for (; i<n; ++i) { | ||
if (pulses[i].stamp === p.stamp) { | ||
pulses[i].visit(flags, visitor); | ||
} | ||
} | ||
} | ||
return p; | ||
} | ||
return p; | ||
}; | ||
}); |
@@ -5,9 +5,10 @@ import Parameters from './Parameters'; | ||
var OP_ID = 0; | ||
var PULSE = 'pulse'; | ||
var NO_PARAMS = new Parameters(); | ||
let OP_ID = 0; | ||
const PULSE = 'pulse', | ||
NO_PARAMS = new Parameters(); | ||
// Boolean Flags | ||
var SKIP = 1, | ||
MODIFIED = 2; | ||
const SKIP = 1, | ||
MODIFIED = 2; | ||
@@ -45,31 +46,5 @@ /** | ||
var prototype = Operator.prototype; | ||
/** | ||
* Returns a list of target operators dependent on this operator. | ||
* If this list does not exist, it is created and then returned. | ||
* @return {UniqueList} | ||
*/ | ||
prototype.targets = function() { | ||
return this._targets || (this._targets = UniqueList(id)); | ||
}; | ||
/** | ||
* Sets the value of this operator. | ||
* @param {*} value - the value to set. | ||
* @return {Number} Returns 1 if the operator value has changed | ||
* according to strict equality, returns 0 otherwise. | ||
*/ | ||
prototype.set = function(value) { | ||
if (this.value !== value) { | ||
this.value = value; | ||
return 1; | ||
} else { | ||
return 0; | ||
} | ||
}; | ||
function flag(bit) { | ||
return function(state) { | ||
var f = this.flags; | ||
const f = this.flags; | ||
if (arguments.length === 0) return !!(f & bit); | ||
@@ -81,177 +56,204 @@ this.flags = state ? (f | bit) : (f & ~bit); | ||
/** | ||
* Indicates that operator evaluation should be skipped on the next pulse. | ||
* This operator will still propagate incoming pulses, but its update function | ||
* will not be invoked. The skip flag is reset after every pulse, so calling | ||
* this method will affect processing of the next pulse only. | ||
*/ | ||
prototype.skip = flag(SKIP); | ||
Operator.prototype = { | ||
/** | ||
* Indicates that this operator's value has been modified on its most recent | ||
* pulse. Normally modification is checked via strict equality; however, in | ||
* some cases it is more efficient to update the internal state of an object. | ||
* In those cases, the modified flag can be used to trigger propagation. Once | ||
* set, the modification flag persists across pulses until unset. The flag can | ||
* be used with the last timestamp to test if a modification is recent. | ||
*/ | ||
prototype.modified = flag(MODIFIED); | ||
/** | ||
* Returns a list of target operators dependent on this operator. | ||
* If this list does not exist, it is created and then returned. | ||
* @return {UniqueList} | ||
*/ | ||
targets() { | ||
return this._targets || (this._targets = UniqueList(id)); | ||
}, | ||
/** | ||
* Sets the parameters for this operator. The parameter values are analyzed for | ||
* operator instances. If found, this operator will be added as a dependency | ||
* of the parameterizing operator. Operator values are dynamically marshalled | ||
* from each operator parameter prior to evaluation. If a parameter value is | ||
* an array, the array will also be searched for Operator instances. However, | ||
* the search does not recurse into sub-arrays or object properties. | ||
* @param {object} params - A hash of operator parameters. | ||
* @param {boolean} [react=true] - A flag indicating if this operator should | ||
* automatically update (react) when parameter values change. In other words, | ||
* this flag determines if the operator registers itself as a listener on | ||
* any upstream operators included in the parameters. | ||
* @param {boolean} [initonly=false] - A flag indicating if this operator | ||
* should calculate an update only upon its initiatal evaluation, then | ||
* deregister dependencies and suppress all future update invocations. | ||
* @return {Operator[]} - An array of upstream dependencies. | ||
*/ | ||
prototype.parameters = function(params, react, initonly) { | ||
react = react !== false; | ||
var self = this, | ||
argval = (self._argval = self._argval || new Parameters()), | ||
argops = (self._argops = self._argops || []), | ||
deps = [], | ||
name, value, n, i; | ||
function add(name, index, value) { | ||
if (value instanceof Operator) { | ||
if (value !== self) { | ||
if (react) value.targets().add(self); | ||
deps.push(value); | ||
} | ||
argops.push({op:value, name:name, index:index}); | ||
/** | ||
* Sets the value of this operator. | ||
* @param {*} value - the value to set. | ||
* @return {Number} Returns 1 if the operator value has changed | ||
* according to strict equality, returns 0 otherwise. | ||
*/ | ||
set(value) { | ||
if (this.value !== value) { | ||
this.value = value; | ||
return 1; | ||
} else { | ||
argval.set(name, index, value); | ||
return 0; | ||
} | ||
} | ||
}, | ||
for (name in params) { | ||
value = params[name]; | ||
/** | ||
* Indicates that operator evaluation should be skipped on the next pulse. | ||
* This operator will still propagate incoming pulses, but its update function | ||
* will not be invoked. The skip flag is reset after every pulse, so calling | ||
* this method will affect processing of the next pulse only. | ||
*/ | ||
skip: flag(SKIP), | ||
if (name === PULSE) { | ||
array(value).forEach(function(op) { | ||
if (!(op instanceof Operator)) { | ||
error('Pulse parameters must be operator instances.'); | ||
} else if (op !== self) { | ||
op.targets().add(self); | ||
deps.push(op); | ||
/** | ||
* Indicates that this operator's value has been modified on its most recent | ||
* pulse. Normally modification is checked via strict equality; however, in | ||
* some cases it is more efficient to update the internal state of an object. | ||
* In those cases, the modified flag can be used to trigger propagation. Once | ||
* set, the modification flag persists across pulses until unset. The flag can | ||
* be used with the last timestamp to test if a modification is recent. | ||
*/ | ||
modified: flag(MODIFIED), | ||
/** | ||
* Sets the parameters for this operator. The parameter values are analyzed for | ||
* operator instances. If found, this operator will be added as a dependency | ||
* of the parameterizing operator. Operator values are dynamically marshalled | ||
* from each operator parameter prior to evaluation. If a parameter value is | ||
* an array, the array will also be searched for Operator instances. However, | ||
* the search does not recurse into sub-arrays or object properties. | ||
* @param {object} params - A hash of operator parameters. | ||
* @param {boolean} [react=true] - A flag indicating if this operator should | ||
* automatically update (react) when parameter values change. In other words, | ||
* this flag determines if the operator registers itself as a listener on | ||
* any upstream operators included in the parameters. | ||
* @param {boolean} [initonly=false] - A flag indicating if this operator | ||
* should calculate an update only upon its initiatal evaluation, then | ||
* deregister dependencies and suppress all future update invocations. | ||
* @return {Operator[]} - An array of upstream dependencies. | ||
*/ | ||
parameters(params, react, initonly) { | ||
react = react !== false; | ||
let argval = (this._argval = this._argval || new Parameters()), | ||
argops = (this._argops = this._argops || []), | ||
deps = [], | ||
name, value, n, i; | ||
const add = (name, index, value) => { | ||
if (value instanceof Operator) { | ||
if (value !== this) { | ||
if (react) value.targets().add(this); | ||
deps.push(value); | ||
} | ||
}); | ||
self.source = value; | ||
} else if (isArray(value)) { | ||
argval.set(name, -1, Array(n = value.length)); | ||
for (i=0; i<n; ++i) add(name, i, value[i]); | ||
} else { | ||
add(name, -1, value); | ||
argops.push({op:value, name:name, index:index}); | ||
} else { | ||
argval.set(name, index, value); | ||
} | ||
}; | ||
for (name in params) { | ||
value = params[name]; | ||
if (name === PULSE) { | ||
array(value).forEach(op => { | ||
if (!(op instanceof Operator)) { | ||
error('Pulse parameters must be operator instances.'); | ||
} else if (op !== this) { | ||
op.targets().add(this); | ||
deps.push(op); | ||
} | ||
}); | ||
this.source = value; | ||
} else if (isArray(value)) { | ||
argval.set(name, -1, Array(n = value.length)); | ||
for (i=0; i<n; ++i) add(name, i, value[i]); | ||
} else { | ||
add(name, -1, value); | ||
} | ||
} | ||
} | ||
this.marshall().clear(); // initialize values | ||
if (initonly) argops.initonly = true; | ||
this.marshall().clear(); // initialize values | ||
if (initonly) argops.initonly = true; | ||
return deps; | ||
}; | ||
return deps; | ||
}, | ||
/** | ||
* Internal method for marshalling parameter values. | ||
* Visits each operator dependency to pull the latest value. | ||
* @return {Parameters} A Parameters object to pass to the update function. | ||
*/ | ||
prototype.marshall = function(stamp) { | ||
var argval = this._argval || NO_PARAMS, | ||
argops = this._argops, item, i, n, op, mod; | ||
/** | ||
* Internal method for marshalling parameter values. | ||
* Visits each operator dependency to pull the latest value. | ||
* @return {Parameters} A Parameters object to pass to the update function. | ||
*/ | ||
marshall(stamp) { | ||
let argval = this._argval || NO_PARAMS, | ||
argops = this._argops, | ||
item, i, n, op, mod; | ||
if (argops) { | ||
for (i=0, n=argops.length; i<n; ++i) { | ||
item = argops[i]; | ||
op = item.op; | ||
mod = op.modified() && op.stamp === stamp; | ||
argval.set(item.name, item.index, op.value, mod); | ||
if (argops) { | ||
for (i=0, n=argops.length; i<n; ++i) { | ||
item = argops[i]; | ||
op = item.op; | ||
mod = op.modified() && op.stamp === stamp; | ||
argval.set(item.name, item.index, op.value, mod); | ||
} | ||
if (argops.initonly) { | ||
for (i=0; i<n; ++i) { | ||
item = argops[i]; | ||
item.op.targets().remove(this); | ||
} | ||
this._argops = null; | ||
this._update = null; | ||
} | ||
} | ||
return argval; | ||
}, | ||
if (argops.initonly) { | ||
for (i=0; i<n; ++i) { | ||
/** | ||
* Detach this operator from the dataflow. | ||
* Unregisters listeners on upstream dependencies. | ||
*/ | ||
detach() { | ||
let argops = this._argops, | ||
i, n, item, op; | ||
if (argops) { | ||
for (i=0, n=argops.length; i<n; ++i) { | ||
item = argops[i]; | ||
item.op.targets().remove(this); | ||
op = item.op; | ||
if (op._targets) { | ||
op._targets.remove(this); | ||
} | ||
} | ||
this._argops = null; | ||
this._update = null; | ||
} | ||
} | ||
return argval; | ||
}; | ||
}, | ||
/** | ||
* Detach this operator from the dataflow. | ||
* Unregisters listeners on upstream dependencies. | ||
*/ | ||
prototype.detach = function() { | ||
var argops = this._argops, | ||
i, n, item, op; | ||
/** | ||
* Delegate method to perform operator processing. | ||
* Subclasses can override this method to perform custom processing. | ||
* By default, it marshalls parameters and calls the update function | ||
* if that function is defined. If the update function does not | ||
* change the operator value then StopPropagation is returned. | ||
* If no update function is defined, this method does nothing. | ||
* @param {Pulse} pulse - the current dataflow pulse. | ||
* @return The output pulse or StopPropagation. A falsy return value | ||
* (including undefined) will let the input pulse pass through. | ||
*/ | ||
evaluate(pulse) { | ||
const update = this._update; | ||
if (update) { | ||
const params = this.marshall(pulse.stamp), | ||
v = update.call(this, params, pulse); | ||
if (argops) { | ||
for (i=0, n=argops.length; i<n; ++i) { | ||
item = argops[i]; | ||
op = item.op; | ||
if (op._targets) { | ||
op._targets.remove(this); | ||
params.clear(); | ||
if (v !== this.value) { | ||
this.value = v; | ||
} else if (!this.modified()) { | ||
return pulse.StopPropagation; | ||
} | ||
} | ||
} | ||
}; | ||
}, | ||
/** | ||
* Delegate method to perform operator processing. | ||
* Subclasses can override this method to perform custom processing. | ||
* By default, it marshalls parameters and calls the update function | ||
* if that function is defined. If the update function does not | ||
* change the operator value then StopPropagation is returned. | ||
* If no update function is defined, this method does nothing. | ||
* @param {Pulse} pulse - the current dataflow pulse. | ||
* @return The output pulse or StopPropagation. A falsy return value | ||
* (including undefined) will let the input pulse pass through. | ||
*/ | ||
prototype.evaluate = function(pulse) { | ||
var update = this._update; | ||
if (update) { | ||
var params = this.marshall(pulse.stamp), | ||
v = update.call(this, params, pulse); | ||
params.clear(); | ||
if (v !== this.value) { | ||
this.value = v; | ||
} else if (!this.modified()) { | ||
return pulse.StopPropagation; | ||
/** | ||
* Run this operator for the current pulse. If this operator has already | ||
* been run at (or after) the pulse timestamp, returns StopPropagation. | ||
* Internally, this method calls {@link evaluate} to perform processing. | ||
* If {@link evaluate} returns a falsy value, the input pulse is returned. | ||
* This method should NOT be overridden, instead overrride {@link evaluate}. | ||
* @param {Pulse} pulse - the current dataflow pulse. | ||
* @return the output pulse for this operator (or StopPropagation) | ||
*/ | ||
run(pulse) { | ||
if (pulse.stamp < this.stamp) return pulse.StopPropagation; | ||
let rv; | ||
if (this.skip()) { | ||
this.skip(false); | ||
rv = 0; | ||
} else { | ||
rv = this.evaluate(pulse); | ||
} | ||
return (this.pulse = rv || pulse); | ||
} | ||
}; | ||
/** | ||
* Run this operator for the current pulse. If this operator has already | ||
* been run at (or after) the pulse timestamp, returns StopPropagation. | ||
* Internally, this method calls {@link evaluate} to perform processing. | ||
* If {@link evaluate} returns a falsy value, the input pulse is returned. | ||
* This method should NOT be overridden, instead overrride {@link evaluate}. | ||
* @param {Pulse} pulse - the current dataflow pulse. | ||
* @return the output pulse for this operator (or StopPropagation) | ||
*/ | ||
prototype.run = function(pulse) { | ||
if (pulse.stamp < this.stamp) return pulse.StopPropagation; | ||
var rv; | ||
if (this.skip()) { | ||
this.skip(false); | ||
rv = 0; | ||
} else { | ||
rv = this.evaluate(pulse); | ||
} | ||
return (this.pulse = rv || pulse); | ||
}; |
import {isArray} from 'vega-util'; | ||
var CACHE = '_:mod:_'; | ||
const CACHE = '_:mod:_'; | ||
@@ -13,67 +13,67 @@ /** | ||
var prototype = Parameters.prototype; | ||
Parameters.prototype = { | ||
/** | ||
* Set a parameter value. If the parameter value changes, the parameter | ||
* will be recorded as modified. | ||
* @param {string} name - The parameter name. | ||
* @param {number} index - The index into an array-value parameter. Ignored if | ||
* the argument is undefined, null or less than zero. | ||
* @param {*} value - The parameter value to set. | ||
* @param {boolean} [force=false] - If true, records the parameter as modified | ||
* even if the value is unchanged. | ||
* @return {Parameters} - This parameter object. | ||
*/ | ||
set(name, index, value, force) { | ||
const o = this, | ||
v = o[name], | ||
mod = o[CACHE]; | ||
/** | ||
* Set a parameter value. If the parameter value changes, the parameter | ||
* will be recorded as modified. | ||
* @param {string} name - The parameter name. | ||
* @param {number} index - The index into an array-value parameter. Ignored if | ||
* the argument is undefined, null or less than zero. | ||
* @param {*} value - The parameter value to set. | ||
* @param {boolean} [force=false] - If true, records the parameter as modified | ||
* even if the value is unchanged. | ||
* @return {Parameters} - This parameter object. | ||
*/ | ||
prototype.set = function(name, index, value, force) { | ||
var o = this, | ||
v = o[name], | ||
mod = o[CACHE]; | ||
if (index != null && index >= 0) { | ||
if (v[index] !== value || force) { | ||
v[index] = value; | ||
mod[index + ':' + name] = -1; | ||
mod[name] = -1; | ||
if (index != null && index >= 0) { | ||
if (v[index] !== value || force) { | ||
v[index] = value; | ||
mod[index + ':' + name] = -1; | ||
mod[name] = -1; | ||
} | ||
} else if (v !== value || force) { | ||
o[name] = value; | ||
mod[name] = isArray(value) ? 1 + value.length : -1; | ||
} | ||
} else if (v !== value || force) { | ||
o[name] = value; | ||
mod[name] = isArray(value) ? 1 + value.length : -1; | ||
} | ||
return o; | ||
}; | ||
return o; | ||
}, | ||
/** | ||
* Tests if one or more parameters has been modified. If invoked with no | ||
* arguments, returns true if any parameter value has changed. If the first | ||
* argument is array, returns trues if any parameter name in the array has | ||
* changed. Otherwise, tests if the given name and optional array index has | ||
* changed. | ||
* @param {string} name - The parameter name to test. | ||
* @param {number} [index=undefined] - The parameter array index to test. | ||
* @return {boolean} - Returns true if a queried parameter was modified. | ||
*/ | ||
prototype.modified = function(name, index) { | ||
var mod = this[CACHE], k; | ||
if (!arguments.length) { | ||
for (k in mod) { if (mod[k]) return true; } | ||
return false; | ||
} else if (isArray(name)) { | ||
for (k=0; k<name.length; ++k) { | ||
if (mod[name[k]]) return true; | ||
/** | ||
* Tests if one or more parameters has been modified. If invoked with no | ||
* arguments, returns true if any parameter value has changed. If the first | ||
* argument is array, returns trues if any parameter name in the array has | ||
* changed. Otherwise, tests if the given name and optional array index has | ||
* changed. | ||
* @param {string} name - The parameter name to test. | ||
* @param {number} [index=undefined] - The parameter array index to test. | ||
* @return {boolean} - Returns true if a queried parameter was modified. | ||
*/ | ||
modified(name, index) { | ||
let mod = this[CACHE], k; | ||
if (!arguments.length) { | ||
for (k in mod) { if (mod[k]) return true; } | ||
return false; | ||
} else if (isArray(name)) { | ||
for (k=0; k<name.length; ++k) { | ||
if (mod[name[k]]) return true; | ||
} | ||
return false; | ||
} | ||
return false; | ||
return (index != null && index >= 0) | ||
? (index + 1 < mod[name] || !!mod[index + ':' + name]) | ||
: !!mod[name]; | ||
}, | ||
/** | ||
* Clears the modification records. After calling this method, | ||
* all parameters are considered unmodified. | ||
*/ | ||
clear() { | ||
this[CACHE] = {}; | ||
return this; | ||
} | ||
return (index != null && index >= 0) | ||
? (index + 1 < mod[name] || !!mod[index + ':' + name]) | ||
: !!mod[name]; | ||
}; | ||
/** | ||
* Clears the modification records. After calling this method, | ||
* all parameters are considered unmodified. | ||
*/ | ||
prototype.clear = function() { | ||
this[CACHE] = {}; | ||
return this; | ||
}; |
652
src/Pulse.js
@@ -54,364 +54,370 @@ import {tupleid} from './Tuple'; | ||
const prototype = Pulse.prototype; | ||
function materialize(data, filter) { | ||
const out = []; | ||
visitArray(data, filter, _ => out.push(_)); | ||
return out; | ||
} | ||
/** | ||
* Sentinel value indicating pulse propagation should stop. | ||
*/ | ||
prototype.StopPropagation = StopPropagation; | ||
function filter(pulse, flags) { | ||
const map = {}; | ||
pulse.visit(flags, function(t) { map[tupleid(t)] = 1; }); | ||
return t => map[tupleid(t)] ? null : t; | ||
} | ||
/** | ||
* Boolean flag indicating ADD (added) tuples. | ||
*/ | ||
prototype.ADD = ADD; | ||
function addFilter(a, b) { | ||
return a | ||
? (t, i) => a(t, i) && b(t, i) | ||
: b; | ||
} | ||
/** | ||
* Boolean flag indicating REM (removed) tuples. | ||
*/ | ||
prototype.REM = REM; | ||
Pulse.prototype = { | ||
/** | ||
* Boolean flag indicating MOD (modified) tuples. | ||
*/ | ||
prototype.MOD = MOD; | ||
/** | ||
* Sentinel value indicating pulse propagation should stop. | ||
*/ | ||
StopPropagation, | ||
/** | ||
* Boolean flag indicating ADD (added) and REM (removed) tuples. | ||
*/ | ||
prototype.ADD_REM = ADD_REM; | ||
/** | ||
* Boolean flag indicating ADD (added) tuples. | ||
*/ | ||
ADD, | ||
/** | ||
* Boolean flag indicating ADD (added) and MOD (modified) tuples. | ||
*/ | ||
prototype.ADD_MOD = ADD_MOD; | ||
/** | ||
* Boolean flag indicating REM (removed) tuples. | ||
*/ | ||
REM, | ||
/** | ||
* Boolean flag indicating ADD, REM and MOD tuples. | ||
*/ | ||
prototype.ALL = ALL; | ||
/** | ||
* Boolean flag indicating MOD (modified) tuples. | ||
*/ | ||
MOD, | ||
/** | ||
* Boolean flag indicating all tuples in a data source | ||
* except for the ADD, REM and MOD tuples. | ||
*/ | ||
prototype.REFLOW = REFLOW; | ||
/** | ||
* Boolean flag indicating ADD (added) and REM (removed) tuples. | ||
*/ | ||
ADD_REM, | ||
/** | ||
* Boolean flag indicating a 'pass-through' to a | ||
* backing data source, ignoring ADD, REM and MOD tuples. | ||
*/ | ||
prototype.SOURCE = SOURCE; | ||
/** | ||
* Boolean flag indicating ADD (added) and MOD (modified) tuples. | ||
*/ | ||
ADD_MOD, | ||
/** | ||
* Boolean flag indicating that source data should be | ||
* suppressed when creating a forked pulse. | ||
*/ | ||
prototype.NO_SOURCE = NO_SOURCE; | ||
/** | ||
* Boolean flag indicating ADD, REM and MOD tuples. | ||
*/ | ||
ALL, | ||
/** | ||
* Boolean flag indicating that field modifications should be | ||
* suppressed when creating a forked pulse. | ||
*/ | ||
prototype.NO_FIELDS = NO_FIELDS; | ||
/** | ||
* Boolean flag indicating all tuples in a data source | ||
* except for the ADD, REM and MOD tuples. | ||
*/ | ||
REFLOW, | ||
/** | ||
* Creates a new pulse based on the values of this pulse. | ||
* The dataflow, time stamp and field modification values are copied over. | ||
* By default, new empty ADD, REM and MOD arrays are created. | ||
* @param {number} flags - Integer of boolean flags indicating which (if any) | ||
* tuple arrays should be copied to the new pulse. The supported flag values | ||
* are ADD, REM and MOD. Array references are copied directly: new array | ||
* instances are not created. | ||
* @return {Pulse} - The forked pulse instance. | ||
* @see init | ||
*/ | ||
prototype.fork = function(flags) { | ||
return new Pulse(this.dataflow).init(this, flags); | ||
}; | ||
/** | ||
* Boolean flag indicating a 'pass-through' to a | ||
* backing data source, ignoring ADD, REM and MOD tuples. | ||
*/ | ||
SOURCE, | ||
/** | ||
* Creates a copy of this pulse with new materialized array | ||
* instances for the ADD, REM, MOD, and SOURCE arrays. | ||
* The dataflow, time stamp and field modification values are copied over. | ||
* @return {Pulse} - The cloned pulse instance. | ||
* @see init | ||
*/ | ||
prototype.clone = function() { | ||
const p = this.fork(ALL); | ||
p.add = p.add.slice(); | ||
p.rem = p.rem.slice(); | ||
p.mod = p.mod.slice(); | ||
if (p.source) p.source = p.source.slice(); | ||
return p.materialize(ALL | SOURCE); | ||
}; | ||
/** | ||
* Boolean flag indicating that source data should be | ||
* suppressed when creating a forked pulse. | ||
*/ | ||
NO_SOURCE, | ||
/** | ||
* Returns a pulse that adds all tuples from a backing source. This is | ||
* useful for cases where operators are added to a dataflow after an | ||
* upstream data pipeline has already been processed, ensuring that | ||
* new operators can observe all tuples within a stream. | ||
* @return {Pulse} - A pulse instance with all source tuples included | ||
* in the add array. If the current pulse already has all source | ||
* tuples in its add array, it is returned directly. If the current | ||
* pulse does not have a backing source, it is returned directly. | ||
*/ | ||
prototype.addAll = function() { | ||
let p = this; | ||
if (!p.source || p.source.length === p.add.length) { | ||
return p; | ||
} else { | ||
p = new Pulse(this.dataflow).init(this); | ||
p.add = p.source; | ||
return p; | ||
} | ||
}; | ||
/** | ||
* Boolean flag indicating that field modifications should be | ||
* suppressed when creating a forked pulse. | ||
*/ | ||
NO_FIELDS, | ||
/** | ||
* Initialize this pulse based on the values of another pulse. This method | ||
* is used internally by {@link fork} to initialize a new forked tuple. | ||
* The dataflow, time stamp and field modification values are copied over. | ||
* By default, new empty ADD, REM and MOD arrays are created. | ||
* @param {Pulse} src - The source pulse to copy from. | ||
* @param {number} flags - Integer of boolean flags indicating which (if any) | ||
* tuple arrays should be copied to the new pulse. The supported flag values | ||
* are ADD, REM and MOD. Array references are copied directly: new array | ||
* instances are not created. By default, source data arrays are copied | ||
* to the new pulse. Use the NO_SOURCE flag to enforce a null source. | ||
* @return {Pulse} - Returns this Pulse instance. | ||
*/ | ||
prototype.init = function(src, flags) { | ||
const p = this; | ||
p.stamp = src.stamp; | ||
p.encode = src.encode; | ||
/** | ||
* Creates a new pulse based on the values of this pulse. | ||
* The dataflow, time stamp and field modification values are copied over. | ||
* By default, new empty ADD, REM and MOD arrays are created. | ||
* @param {number} flags - Integer of boolean flags indicating which (if any) | ||
* tuple arrays should be copied to the new pulse. The supported flag values | ||
* are ADD, REM and MOD. Array references are copied directly: new array | ||
* instances are not created. | ||
* @return {Pulse} - The forked pulse instance. | ||
* @see init | ||
*/ | ||
fork(flags) { | ||
return new Pulse(this.dataflow).init(this, flags); | ||
}, | ||
if (src.fields && !(flags & NO_FIELDS)) { | ||
p.fields = src.fields; | ||
} | ||
/** | ||
* Creates a copy of this pulse with new materialized array | ||
* instances for the ADD, REM, MOD, and SOURCE arrays. | ||
* The dataflow, time stamp and field modification values are copied over. | ||
* @return {Pulse} - The cloned pulse instance. | ||
* @see init | ||
*/ | ||
clone() { | ||
const p = this.fork(ALL); | ||
p.add = p.add.slice(); | ||
p.rem = p.rem.slice(); | ||
p.mod = p.mod.slice(); | ||
if (p.source) p.source = p.source.slice(); | ||
return p.materialize(ALL | SOURCE); | ||
}, | ||
if (flags & ADD) { | ||
p.addF = src.addF; | ||
p.add = src.add; | ||
} else { | ||
p.addF = null; | ||
p.add = []; | ||
} | ||
/** | ||
* Returns a pulse that adds all tuples from a backing source. This is | ||
* useful for cases where operators are added to a dataflow after an | ||
* upstream data pipeline has already been processed, ensuring that | ||
* new operators can observe all tuples within a stream. | ||
* @return {Pulse} - A pulse instance with all source tuples included | ||
* in the add array. If the current pulse already has all source | ||
* tuples in its add array, it is returned directly. If the current | ||
* pulse does not have a backing source, it is returned directly. | ||
*/ | ||
addAll() { | ||
let p = this; | ||
const reuse = !p.source | ||
|| p.add === p.rem // special case for indexed set (e.g., crossfilter) | ||
|| (!p.rem.length && p.source.length === p.add.length); | ||
if (flags & REM) { | ||
p.remF = src.remF; | ||
p.rem = src.rem; | ||
} else { | ||
p.remF = null; | ||
p.rem = []; | ||
} | ||
if (reuse) { | ||
return p; | ||
} else { | ||
p = new Pulse(this.dataflow).init(this); | ||
p.add = p.source; | ||
p.rem = []; // new operators can ignore rem #2769 | ||
return p; | ||
} | ||
}, | ||
if (flags & MOD) { | ||
p.modF = src.modF; | ||
p.mod = src.mod; | ||
} else { | ||
p.modF = null; | ||
p.mod = []; | ||
} | ||
/** | ||
* Initialize this pulse based on the values of another pulse. This method | ||
* is used internally by {@link fork} to initialize a new forked tuple. | ||
* The dataflow, time stamp and field modification values are copied over. | ||
* By default, new empty ADD, REM and MOD arrays are created. | ||
* @param {Pulse} src - The source pulse to copy from. | ||
* @param {number} flags - Integer of boolean flags indicating which (if any) | ||
* tuple arrays should be copied to the new pulse. The supported flag values | ||
* are ADD, REM and MOD. Array references are copied directly: new array | ||
* instances are not created. By default, source data arrays are copied | ||
* to the new pulse. Use the NO_SOURCE flag to enforce a null source. | ||
* @return {Pulse} - Returns this Pulse instance. | ||
*/ | ||
init(src, flags) { | ||
const p = this; | ||
p.stamp = src.stamp; | ||
p.encode = src.encode; | ||
if (flags & NO_SOURCE) { | ||
p.srcF = null; | ||
p.source = null; | ||
} else { | ||
p.srcF = src.srcF; | ||
p.source = src.source; | ||
if (src.cleans) p.cleans = src.cleans; | ||
} | ||
if (src.fields && !(flags & NO_FIELDS)) { | ||
p.fields = src.fields; | ||
} | ||
return p; | ||
}; | ||
if (flags & ADD) { | ||
p.addF = src.addF; | ||
p.add = src.add; | ||
} else { | ||
p.addF = null; | ||
p.add = []; | ||
} | ||
/** | ||
* Schedules a function to run after pulse propagation completes. | ||
* @param {function} func - The function to run. | ||
*/ | ||
prototype.runAfter = function(func) { | ||
this.dataflow.runAfter(func); | ||
}; | ||
if (flags & REM) { | ||
p.remF = src.remF; | ||
p.rem = src.rem; | ||
} else { | ||
p.remF = null; | ||
p.rem = []; | ||
} | ||
/** | ||
* Indicates if tuples have been added, removed or modified. | ||
* @param {number} [flags] - The tuple types (ADD, REM or MOD) to query. | ||
* Defaults to ALL, returning true if any tuple type has changed. | ||
* @return {boolean} - Returns true if one or more queried tuple types have | ||
* changed, false otherwise. | ||
*/ | ||
prototype.changed = function(flags) { | ||
var f = flags || ALL; | ||
return ((f & ADD) && this.add.length) | ||
|| ((f & REM) && this.rem.length) | ||
|| ((f & MOD) && this.mod.length); | ||
}; | ||
if (flags & MOD) { | ||
p.modF = src.modF; | ||
p.mod = src.mod; | ||
} else { | ||
p.modF = null; | ||
p.mod = []; | ||
} | ||
/** | ||
* Forces a "reflow" of tuple values, such that all tuples in the backing | ||
* source are added to the MOD set, unless already present in the ADD set. | ||
* @param {boolean} [fork=false] - If true, returns a forked copy of this | ||
* pulse, and invokes reflow on that derived pulse. | ||
* @return {Pulse} - The reflowed pulse instance. | ||
*/ | ||
prototype.reflow = function(fork) { | ||
if (fork) return this.fork(ALL).reflow(); | ||
if (flags & NO_SOURCE) { | ||
p.srcF = null; | ||
p.source = null; | ||
} else { | ||
p.srcF = src.srcF; | ||
p.source = src.source; | ||
if (src.cleans) p.cleans = src.cleans; | ||
} | ||
var len = this.add.length, | ||
src = this.source && this.source.length; | ||
if (src && src !== len) { | ||
this.mod = this.source; | ||
if (len) this.filter(MOD, filter(this, ADD)); | ||
} | ||
return this; | ||
}; | ||
return p; | ||
}, | ||
/** | ||
* Get/set metadata to pulse requesting garbage collection | ||
* to reclaim currently unused resources. | ||
*/ | ||
prototype.clean = function(value) { | ||
if (arguments.length) { | ||
this.cleans = !!value; | ||
return this; | ||
} else { | ||
return this.cleans; | ||
} | ||
}; | ||
/** | ||
* Schedules a function to run after pulse propagation completes. | ||
* @param {function} func - The function to run. | ||
*/ | ||
runAfter(func) { | ||
this.dataflow.runAfter(func); | ||
}, | ||
/** | ||
* Marks one or more data field names as modified to assist dependency | ||
* tracking and incremental processing by transform operators. | ||
* @param {string|Array<string>} _ - The field(s) to mark as modified. | ||
* @return {Pulse} - This pulse instance. | ||
*/ | ||
prototype.modifies = function(_) { | ||
var hash = this.fields || (this.fields = {}); | ||
if (isArray(_)) { | ||
_.forEach(f => hash[f] = true); | ||
} else { | ||
hash[_] = true; | ||
} | ||
return this; | ||
}; | ||
/** | ||
* Indicates if tuples have been added, removed or modified. | ||
* @param {number} [flags] - The tuple types (ADD, REM or MOD) to query. | ||
* Defaults to ALL, returning true if any tuple type has changed. | ||
* @return {boolean} - Returns true if one or more queried tuple types have | ||
* changed, false otherwise. | ||
*/ | ||
changed(flags) { | ||
const f = flags || ALL; | ||
return ((f & ADD) && this.add.length) | ||
|| ((f & REM) && this.rem.length) | ||
|| ((f & MOD) && this.mod.length); | ||
}, | ||
/** | ||
* Checks if one or more data fields have been modified during this pulse | ||
* propagation timestamp. | ||
* @param {string|Array<string>} _ - The field(s) to check for modified. | ||
* @param {boolean} nomod - If true, will check the modified flag even if | ||
* no mod tuples exist. If false (default), mod tuples must be present. | ||
* @return {boolean} - Returns true if any of the provided fields has been | ||
* marked as modified, false otherwise. | ||
*/ | ||
prototype.modified = function(_, nomod) { | ||
var fields = this.fields; | ||
return !((nomod || this.mod.length) && fields) ? false | ||
: !arguments.length ? !!fields | ||
: isArray(_) ? _.some(f => fields[f]) | ||
: fields[_]; | ||
}; | ||
/** | ||
* Forces a "reflow" of tuple values, such that all tuples in the backing | ||
* source are added to the MOD set, unless already present in the ADD set. | ||
* @param {boolean} [fork=false] - If true, returns a forked copy of this | ||
* pulse, and invokes reflow on that derived pulse. | ||
* @return {Pulse} - The reflowed pulse instance. | ||
*/ | ||
reflow(fork) { | ||
if (fork) return this.fork(ALL).reflow(); | ||
/** | ||
* Adds a filter function to one more tuple sets. Filters are applied to | ||
* backing tuple arrays, to determine the actual set of tuples considered | ||
* added, removed or modified. They can be used to delay materialization of | ||
* a tuple set in order to avoid expensive array copies. In addition, the | ||
* filter functions can serve as value transformers: unlike standard predicate | ||
* function (which return boolean values), Pulse filters should return the | ||
* actual tuple value to process. If a tuple set is already filtered, the | ||
* new filter function will be appended into a conjuntive ('and') query. | ||
* @param {number} flags - Flags indicating the tuple set(s) to filter. | ||
* @param {function(*):object} filter - Filter function that will be applied | ||
* to the tuple set array, and should return a data tuple if the value | ||
* should be included in the tuple set, and falsy (or null) otherwise. | ||
* @return {Pulse} - Returns this pulse instance. | ||
*/ | ||
prototype.filter = function(flags, filter) { | ||
var p = this; | ||
if (flags & ADD) p.addF = addFilter(p.addF, filter); | ||
if (flags & REM) p.remF = addFilter(p.remF, filter); | ||
if (flags & MOD) p.modF = addFilter(p.modF, filter); | ||
if (flags & SOURCE) p.srcF = addFilter(p.srcF, filter); | ||
return p; | ||
}; | ||
const len = this.add.length, | ||
src = this.source && this.source.length; | ||
if (src && src !== len) { | ||
this.mod = this.source; | ||
if (len) this.filter(MOD, filter(this, ADD)); | ||
} | ||
return this; | ||
}, | ||
function addFilter(a, b) { | ||
return a | ||
? (t, i) => a(t, i) && b(t, i) | ||
: b; | ||
} | ||
/** | ||
* Get/set metadata to pulse requesting garbage collection | ||
* to reclaim currently unused resources. | ||
*/ | ||
clean(value) { | ||
if (arguments.length) { | ||
this.cleans = !!value; | ||
return this; | ||
} else { | ||
return this.cleans; | ||
} | ||
}, | ||
/** | ||
* Materialize one or more tuple sets in this pulse. If the tuple set(s) have | ||
* a registered filter function, it will be applied and the tuple set(s) will | ||
* be replaced with materialized tuple arrays. | ||
* @param {number} flags - Flags indicating the tuple set(s) to materialize. | ||
* @return {Pulse} - Returns this pulse instance. | ||
*/ | ||
prototype.materialize = function(flags) { | ||
flags = flags || ALL; | ||
var p = this; | ||
if ((flags & ADD) && p.addF) { | ||
p.add = materialize(p.add, p.addF); | ||
p.addF = null; | ||
} | ||
if ((flags & REM) && p.remF) { | ||
p.rem = materialize(p.rem, p.remF); | ||
p.remF = null; | ||
} | ||
if ((flags & MOD) && p.modF) { | ||
p.mod = materialize(p.mod, p.modF); | ||
p.modF = null; | ||
} | ||
if ((flags & SOURCE) && p.srcF) { | ||
p.source = p.source.filter(p.srcF); | ||
p.srcF = null; | ||
} | ||
return p; | ||
}; | ||
/** | ||
* Marks one or more data field names as modified to assist dependency | ||
* tracking and incremental processing by transform operators. | ||
* @param {string|Array<string>} _ - The field(s) to mark as modified. | ||
* @return {Pulse} - This pulse instance. | ||
*/ | ||
modifies(_) { | ||
const hash = this.fields || (this.fields = {}); | ||
if (isArray(_)) { | ||
_.forEach(f => hash[f] = true); | ||
} else { | ||
hash[_] = true; | ||
} | ||
return this; | ||
}, | ||
function materialize(data, filter) { | ||
var out = []; | ||
visitArray(data, filter, _ => out.push(_)); | ||
return out; | ||
} | ||
/** | ||
* Checks if one or more data fields have been modified during this pulse | ||
* propagation timestamp. | ||
* @param {string|Array<string>} _ - The field(s) to check for modified. | ||
* @param {boolean} nomod - If true, will check the modified flag even if | ||
* no mod tuples exist. If false (default), mod tuples must be present. | ||
* @return {boolean} - Returns true if any of the provided fields has been | ||
* marked as modified, false otherwise. | ||
*/ | ||
modified(_, nomod) { | ||
const fields = this.fields; | ||
return !((nomod || this.mod.length) && fields) ? false | ||
: !arguments.length ? !!fields | ||
: isArray(_) ? _.some(f => fields[f]) | ||
: fields[_]; | ||
}, | ||
function filter(pulse, flags) { | ||
var map = {}; | ||
pulse.visit(flags, function(t) { map[tupleid(t)] = 1; }); | ||
return t => map[tupleid(t)] ? null : t; | ||
} | ||
/** | ||
* Adds a filter function to one more tuple sets. Filters are applied to | ||
* backing tuple arrays, to determine the actual set of tuples considered | ||
* added, removed or modified. They can be used to delay materialization of | ||
* a tuple set in order to avoid expensive array copies. In addition, the | ||
* filter functions can serve as value transformers: unlike standard predicate | ||
* function (which return boolean values), Pulse filters should return the | ||
* actual tuple value to process. If a tuple set is already filtered, the | ||
* new filter function will be appended into a conjuntive ('and') query. | ||
* @param {number} flags - Flags indicating the tuple set(s) to filter. | ||
* @param {function(*):object} filter - Filter function that will be applied | ||
* to the tuple set array, and should return a data tuple if the value | ||
* should be included in the tuple set, and falsy (or null) otherwise. | ||
* @return {Pulse} - Returns this pulse instance. | ||
*/ | ||
filter(flags, filter) { | ||
const p = this; | ||
if (flags & ADD) p.addF = addFilter(p.addF, filter); | ||
if (flags & REM) p.remF = addFilter(p.remF, filter); | ||
if (flags & MOD) p.modF = addFilter(p.modF, filter); | ||
if (flags & SOURCE) p.srcF = addFilter(p.srcF, filter); | ||
return p; | ||
}, | ||
/** | ||
* Visit one or more tuple sets in this pulse. | ||
* @param {number} flags - Flags indicating the tuple set(s) to visit. | ||
* Legal values are ADD, REM, MOD and SOURCE (if a backing data source | ||
* has been set). | ||
* @param {function(object):*} - Visitor function invoked per-tuple. | ||
* @return {Pulse} - Returns this pulse instance. | ||
*/ | ||
prototype.visit = function(flags, visitor) { | ||
var p = this, v = visitor, src, sum; | ||
if (flags & SOURCE) { | ||
visitArray(p.source, p.srcF, v); | ||
/** | ||
* Materialize one or more tuple sets in this pulse. If the tuple set(s) have | ||
* a registered filter function, it will be applied and the tuple set(s) will | ||
* be replaced with materialized tuple arrays. | ||
* @param {number} flags - Flags indicating the tuple set(s) to materialize. | ||
* @return {Pulse} - Returns this pulse instance. | ||
*/ | ||
materialize(flags) { | ||
flags = flags || ALL; | ||
const p = this; | ||
if ((flags & ADD) && p.addF) { | ||
p.add = materialize(p.add, p.addF); | ||
p.addF = null; | ||
} | ||
if ((flags & REM) && p.remF) { | ||
p.rem = materialize(p.rem, p.remF); | ||
p.remF = null; | ||
} | ||
if ((flags & MOD) && p.modF) { | ||
p.mod = materialize(p.mod, p.modF); | ||
p.modF = null; | ||
} | ||
if ((flags & SOURCE) && p.srcF) { | ||
p.source = p.source.filter(p.srcF); | ||
p.srcF = null; | ||
} | ||
return p; | ||
} | ||
}, | ||
if (flags & ADD) visitArray(p.add, p.addF, v); | ||
if (flags & REM) visitArray(p.rem, p.remF, v); | ||
if (flags & MOD) visitArray(p.mod, p.modF, v); | ||
/** | ||
* Visit one or more tuple sets in this pulse. | ||
* @param {number} flags - Flags indicating the tuple set(s) to visit. | ||
* Legal values are ADD, REM, MOD and SOURCE (if a backing data source | ||
* has been set). | ||
* @param {function(object):*} - Visitor function invoked per-tuple. | ||
* @return {Pulse} - Returns this pulse instance. | ||
*/ | ||
visit(flags, visitor) { | ||
let p = this, v = visitor, src, sum; | ||
if ((flags & REFLOW) && (src = p.source)) { | ||
sum = p.add.length + p.mod.length; | ||
if (sum === src.length) { | ||
// do nothing | ||
} else if (sum) { | ||
visitArray(src, filter(p, ADD_MOD), v); | ||
} else { | ||
// if no add/rem/mod tuples, visit source | ||
visitArray(src, p.srcF, v); | ||
if (flags & SOURCE) { | ||
visitArray(p.source, p.srcF, v); | ||
return p; | ||
} | ||
if (flags & ADD) visitArray(p.add, p.addF, v); | ||
if (flags & REM) visitArray(p.rem, p.remF, v); | ||
if (flags & MOD) visitArray(p.mod, p.modF, v); | ||
if ((flags & REFLOW) && (src = p.source)) { | ||
sum = p.add.length + p.mod.length; | ||
if (sum === src.length) { | ||
// do nothing | ||
} else if (sum) { | ||
visitArray(src, filter(p, ADD_MOD), v); | ||
} else { | ||
// if no add/rem/mod tuples, visit source | ||
visitArray(src, p.srcF, v); | ||
} | ||
} | ||
return p; | ||
} | ||
return p; | ||
}; |
import {hasOwnProperty} from 'vega-util'; | ||
export var transforms = {}; | ||
export const transforms = {}; | ||
export function definition(type) { | ||
var t = transform(type); | ||
const t = transform(type); | ||
return t && t.Definition || null; | ||
@@ -8,0 +8,0 @@ } |
@@ -16,54 +16,54 @@ import Operator from './Operator'; | ||
var prototype = inherits(Transform, Operator); | ||
inherits(Transform, Operator, { | ||
/** | ||
* Overrides {@link Operator.evaluate} for transform operators. | ||
* Internally, this method calls {@link evaluate} to perform processing. | ||
* If {@link evaluate} returns a falsy value, the input pulse is returned. | ||
* This method should NOT be overridden, instead overrride {@link evaluate}. | ||
* @param {Pulse} pulse - the current dataflow pulse. | ||
* @return the output pulse for this operator (or StopPropagation) | ||
*/ | ||
run(pulse) { | ||
if (pulse.stamp < this.stamp) return pulse.StopPropagation; | ||
/** | ||
* Overrides {@link Operator.evaluate} for transform operators. | ||
* Internally, this method calls {@link evaluate} to perform processing. | ||
* If {@link evaluate} returns a falsy value, the input pulse is returned. | ||
* This method should NOT be overridden, instead overrride {@link evaluate}. | ||
* @param {Pulse} pulse - the current dataflow pulse. | ||
* @return the output pulse for this operator (or StopPropagation) | ||
*/ | ||
prototype.run = function(pulse) { | ||
if (pulse.stamp < this.stamp) return pulse.StopPropagation; | ||
let rv; | ||
if (this.skip()) { | ||
this.skip(false); | ||
} else { | ||
rv = this.evaluate(pulse); | ||
} | ||
rv = rv || pulse; | ||
var rv; | ||
if (this.skip()) { | ||
this.skip(false); | ||
} else { | ||
rv = this.evaluate(pulse); | ||
} | ||
rv = rv || pulse; | ||
if (rv.then) { | ||
rv = rv.then(_ => this.pulse =_); | ||
} else if (rv !== pulse.StopPropagation) { | ||
this.pulse = rv; | ||
} | ||
if (rv.then) { | ||
rv = rv.then(_ => this.pulse =_); | ||
} else if (rv !== pulse.StopPropagation) { | ||
this.pulse = rv; | ||
} | ||
return rv; | ||
}, | ||
return rv; | ||
}; | ||
/** | ||
* Overrides {@link Operator.evaluate} for transform operators. | ||
* Marshalls parameter values and then invokes {@link transform}. | ||
* @param {Pulse} pulse - the current dataflow pulse. | ||
* @return {Pulse} The output pulse (or StopPropagation). A falsy return | ||
value (including undefined) will let the input pulse pass through. | ||
*/ | ||
evaluate(pulse) { | ||
const params = this.marshall(pulse.stamp), | ||
out = this.transform(params, pulse); | ||
params.clear(); | ||
return out; | ||
}, | ||
/** | ||
* Overrides {@link Operator.evaluate} for transform operators. | ||
* Marshalls parameter values and then invokes {@link transform}. | ||
* @param {Pulse} pulse - the current dataflow pulse. | ||
* @return {Pulse} The output pulse (or StopPropagation). A falsy return | ||
value (including undefined) will let the input pulse pass through. | ||
*/ | ||
prototype.evaluate = function(pulse) { | ||
var params = this.marshall(pulse.stamp), | ||
out = this.transform(params, pulse); | ||
params.clear(); | ||
return out; | ||
}; | ||
/** | ||
* Process incoming pulses. | ||
* Subclasses should override this method to implement transforms. | ||
* @param {Parameters} _ - The operator parameter values. | ||
* @param {Pulse} pulse - The current dataflow pulse. | ||
* @return {Pulse} The output pulse (or StopPropagation). A falsy return | ||
* value (including undefined) will let the input pulse pass through. | ||
*/ | ||
prototype.transform = function() {}; | ||
/** | ||
* Process incoming pulses. | ||
* Subclasses should override this method to implement transforms. | ||
* @param {Parameters} _ - The operator parameter values. | ||
* @param {Pulse} pulse - The current dataflow pulse. | ||
* @return {Pulse} The output pulse (or StopPropagation). A falsy return | ||
* value (including undefined) will let the input pulse pass through. | ||
*/ | ||
transform() {} | ||
}); |
@@ -1,3 +0,3 @@ | ||
var TUPLE_ID_KEY = Symbol('vega_id'), | ||
TUPLE_ID = 1; | ||
const TUPLE_ID_KEY = Symbol('vega_id'); | ||
let TUPLE_ID = 1; | ||
@@ -50,3 +50,3 @@ /** | ||
export function ingest(datum) { | ||
var t = (datum === Object(datum)) ? datum : {data: datum}; | ||
const t = (datum === Object(datum)) ? datum : {data: datum}; | ||
return tupleid(t) ? t : setid(t, TUPLE_ID++); | ||
@@ -71,3 +71,3 @@ } | ||
export function rederive(t, d) { | ||
for (var k in t) d[k] = t[k]; | ||
for (const k in t) d[k] = t[k]; | ||
return d; | ||
@@ -74,0 +74,0 @@ } |
export default function Heap(cmp) { | ||
var nodes = []; | ||
let nodes = []; | ||
return { | ||
@@ -12,3 +12,3 @@ clear: () => nodes = [], | ||
pop: () => { | ||
var last = nodes.pop(), item; | ||
let last = nodes.pop(), item; | ||
if (nodes.length) { | ||
@@ -27,3 +27,3 @@ item = nodes[0]; | ||
function siftdown(array, start, idx, cmp) { | ||
var item, parent, pidx; | ||
let item, parent, pidx; | ||
@@ -45,3 +45,3 @@ item = array[idx]; | ||
function siftup(array, idx, cmp) { | ||
var start = idx, | ||
let start = idx, | ||
end = array.length, | ||
@@ -48,0 +48,0 @@ item = array[idx], |
import {identity} from 'vega-util'; | ||
export default function UniqueList(idFunc) { | ||
var $ = idFunc || identity, | ||
list = [], | ||
ids = {}; | ||
const $ = idFunc || identity, | ||
list = [], | ||
ids = {}; | ||
list.add = function(_) { | ||
var id = $(_); | ||
list.add = _ => { | ||
const id = $(_); | ||
if (!ids[id]) { | ||
@@ -17,9 +17,8 @@ ids[id] = 1; | ||
list.remove = function(_) { | ||
var id = $(_), idx; | ||
list.remove = _ => { | ||
const id = $(_); | ||
if (ids[id]) { | ||
ids[id] = 0; | ||
if ((idx = list.indexOf(_)) >= 0) { | ||
list.splice(idx, 1); | ||
} | ||
const idx = list.indexOf(_); | ||
if (idx >= 0) list.splice(idx, 1); | ||
} | ||
@@ -26,0 +25,0 @@ return list; |
Sorry, the diff of this file is too big to display
4016
161943
Updatedvega-format@^1.0.2
Updatedvega-loader@^4.3.1
Updatedvega-util@^1.15.0