zen-observable
Advanced tools
Comparing version 0.8.5 to 0.8.6
@@ -110,2 +110,3 @@ 'use strict'; | ||
notifySubscription(subscription, queue[i].type, queue[i].value); | ||
if (subscription._state === 'closed') break; | ||
} | ||
@@ -115,15 +116,4 @@ } | ||
function notifySubscription(subscription, type, value) { | ||
if (subscription._state === 'closed') return; | ||
subscription._state = 'running'; | ||
if (subscription._state !== 'ready') { | ||
if (!subscription._queue) { | ||
enqueue(function () { | ||
return flushSubscription(subscription); | ||
}); | ||
subscription._queue = []; | ||
} | ||
subscription._queue.push({ type: type, value: value }); | ||
return; | ||
} | ||
var observer = subscription._observer; | ||
@@ -150,5 +140,25 @@ | ||
if (subscription._state === 'closed') cleanupSubscription(subscription); | ||
if (subscription._state === 'closed') cleanupSubscription(subscription);else if (subscription._state === 'running') subscription._state = 'ready'; | ||
} | ||
function onNotify(subscription, type, value) { | ||
if (subscription._state === 'closed') return; | ||
if (subscription._state === 'buffering') { | ||
subscription._queue.push({ type: type, value: value }); | ||
return; | ||
} | ||
if (subscription._state !== 'ready') { | ||
subscription._state = 'buffering'; | ||
subscription._queue = [{ type: type, value: value }]; | ||
enqueue(function () { | ||
return flushSubscription(subscription); | ||
}); | ||
return; | ||
} | ||
notifySubscription(subscription, type, value); | ||
} | ||
var Subscription = function () { | ||
@@ -174,3 +184,3 @@ function Subscription(observer, subscriber) { | ||
if (!this._queue) this._state = 'ready'; | ||
if (this._state === 'initializing') this._state = 'ready'; | ||
} | ||
@@ -206,3 +216,3 @@ | ||
value: function next(value) { | ||
notifySubscription(this._subscription, 'next', value); | ||
onNotify(this._subscription, 'next', value); | ||
} | ||
@@ -212,3 +222,3 @@ }, { | ||
value: function error(value) { | ||
notifySubscription(this._subscription, 'error', value); | ||
onNotify(this._subscription, 'error', value); | ||
} | ||
@@ -218,3 +228,3 @@ }, { | ||
value: function complete() { | ||
notifySubscription(this._subscription, 'complete'); | ||
onNotify(this._subscription, 'complete'); | ||
} | ||
@@ -221,0 +231,0 @@ }, { |
{ | ||
"name": "zen-observable", | ||
"version": "0.8.5", | ||
"version": "0.8.6", | ||
"repository": "zenparsing/zen-observable", | ||
@@ -5,0 +5,0 @@ "description": "An Implementation of ES Observables", |
@@ -92,2 +92,4 @@ // === Symbol Support === | ||
notifySubscription(subscription, queue[i].type, queue[i].value); | ||
if (subscription._state === 'closed') | ||
break; | ||
} | ||
@@ -97,14 +99,4 @@ } | ||
function notifySubscription(subscription, type, value) { | ||
if (subscription._state === 'closed') | ||
return; | ||
subscription._state = 'running'; | ||
if (subscription._state !== 'ready') { | ||
if (!subscription._queue) { | ||
enqueue(() => flushSubscription(subscription)); | ||
subscription._queue = []; | ||
} | ||
subscription._queue.push({ type, value }); | ||
return; | ||
} | ||
let observer = subscription._observer; | ||
@@ -134,4 +126,26 @@ | ||
cleanupSubscription(subscription); | ||
else if (subscription._state === 'running') | ||
subscription._state = 'ready'; | ||
} | ||
function onNotify(subscription, type, value) { | ||
if (subscription._state === 'closed') | ||
return; | ||
if (subscription._state === 'buffering') { | ||
subscription._queue.push({ type, value }); | ||
return; | ||
} | ||
if (subscription._state !== 'ready') { | ||
subscription._state = 'buffering'; | ||
subscription._queue = [{ type, value }]; | ||
enqueue(() => flushSubscription(subscription)); | ||
return; | ||
} | ||
notifySubscription(subscription, type, value); | ||
} | ||
class Subscription { | ||
@@ -156,3 +170,3 @@ | ||
if (!this._queue) | ||
if (this._state === 'initializing') | ||
this._state = 'ready'; | ||
@@ -176,5 +190,5 @@ } | ||
get closed() { return this._subscription._state === 'closed' } | ||
next(value) { notifySubscription(this._subscription, 'next', value) } | ||
error(value) { notifySubscription(this._subscription, 'error', value) } | ||
complete() { notifySubscription(this._subscription, 'complete') } | ||
next(value) { onNotify(this._subscription, 'next', value) } | ||
error(value) { onNotify(this._subscription, 'error', value) } | ||
complete() { onNotify(this._subscription, 'complete') } | ||
} | ||
@@ -181,0 +195,0 @@ |
@@ -62,3 +62,3 @@ import assert from 'assert'; | ||
it('does not queue if the observer is running', () => { | ||
it('queues if the observer is running', async () => { | ||
let observer; | ||
@@ -71,2 +71,4 @@ let completed = false | ||
observer.next(); | ||
assert.equal(completed, false); | ||
await null; | ||
assert.equal(completed, true); | ||
@@ -73,0 +75,0 @@ }); |
@@ -59,3 +59,3 @@ import assert from 'assert'; | ||
it('does not queue if the observer is running', () => { | ||
it('queues if the observer is running', async () => { | ||
let observer; | ||
@@ -68,2 +68,4 @@ let error; | ||
observer.next(); | ||
assert.ok(!error); | ||
await null; | ||
assert.ok(error); | ||
@@ -70,0 +72,0 @@ }); |
@@ -65,5 +65,9 @@ import assert from 'assert'; | ||
new Observable(x => { observer = x, x.next(1) }).subscribe({ | ||
next(val) { values.push(val) }, | ||
next(val) { | ||
values.push(val); | ||
if (val === 1) { | ||
observer.next(3); | ||
} | ||
}, | ||
}); | ||
assert.deepEqual(values, []); | ||
observer.next(2); | ||
@@ -73,5 +77,7 @@ assert.deepEqual(values, []); | ||
assert.deepEqual(values, [1, 2]); | ||
await null; | ||
assert.deepEqual(values, [1, 2, 3]); | ||
}); | ||
it('does not queue if the observer is running', () => { | ||
it('queues if the observer is running', async () => { | ||
let observer; | ||
@@ -86,2 +92,4 @@ let values = []; | ||
observer.next(1); | ||
assert.deepEqual(values, [1]); | ||
await null; | ||
assert.deepEqual(values, [1, 2]); | ||
@@ -88,0 +96,0 @@ }); |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
66931
1985