zen-observable
Advanced tools
Comparing version 0.8.3 to 0.8.5
@@ -75,4 +75,2 @@ 'use strict'; | ||
function cleanupSubscription(subscription) { | ||
// ASSERT: subscription._observer is undefined | ||
var cleanup = subscription._cleanup; | ||
@@ -87,33 +85,69 @@ if (cleanup === undefined) return; | ||
if (typeof cleanup === 'function') { | ||
cleanup(); | ||
} else { | ||
var unsubscribe = getMethod(cleanup, 'unsubscribe'); | ||
if (unsubscribe) { | ||
unsubscribe.call(cleanup); | ||
try { | ||
if (typeof cleanup === 'function') { | ||
cleanup(); | ||
} else { | ||
var unsubscribe = getMethod(cleanup, 'unsubscribe'); | ||
if (unsubscribe) { | ||
unsubscribe.call(cleanup); | ||
} | ||
} | ||
} catch (e) { | ||
hostReportError(e); | ||
} | ||
} | ||
function subscriptionClosed(subscription) { | ||
return subscription._state === 'closed'; | ||
} | ||
function closeSubscription(subscription) { | ||
subscription._observer = undefined; | ||
subscription._queue = undefined; | ||
subscription._state = 'closed'; | ||
} | ||
function validateSubscription(subscription) { | ||
// ASSERT: subscription._state !== 'closed' | ||
switch (subscription._state) { | ||
case 'ready': | ||
break; | ||
case 'initializing': | ||
throw new Error('Subscription is not initialized'); | ||
case 'running': | ||
throw new Error('Subscription observer is already running'); | ||
function flushSubscription(subscription) { | ||
var queue = subscription._queue; | ||
subscription._queue = undefined; | ||
subscription._state = 'ready'; | ||
for (var i = 0; i < queue.length; ++i) { | ||
notifySubscription(subscription, queue[i].type, queue[i].value); | ||
} | ||
} | ||
function notifySubscription(subscription, type, value) { | ||
if (subscription._state === 'closed') return; | ||
if (subscription._state !== 'ready') { | ||
if (!subscription._queue) { | ||
enqueue(function () { | ||
return flushSubscription(subscription); | ||
}); | ||
subscription._queue = []; | ||
} | ||
subscription._queue.push({ type: type, value: value }); | ||
return; | ||
} | ||
var observer = subscription._observer; | ||
try { | ||
var m = getMethod(observer, type); | ||
switch (type) { | ||
case 'next': | ||
if (m) m.call(observer, value); | ||
break; | ||
case 'error': | ||
closeSubscription(subscription); | ||
if (m) m.call(observer, value);else throw value; | ||
break; | ||
case 'complete': | ||
closeSubscription(subscription); | ||
if (m) m.call(observer); | ||
break; | ||
} | ||
} catch (e) { | ||
hostReportError(e); | ||
} | ||
if (subscription._state === 'closed') cleanupSubscription(subscription); | ||
} | ||
var Subscription = function () { | ||
@@ -128,2 +162,3 @@ function Subscription(observer, subscriber) { | ||
this._observer = observer; | ||
this._queue = undefined; | ||
this._state = 'initializing'; | ||
@@ -136,8 +171,6 @@ | ||
} catch (e) { | ||
enqueue(function () { | ||
return subscriptionObserver.error(e); | ||
}); | ||
subscriptionObserver.error(e); | ||
} | ||
this._state = 'ready'; | ||
if (!this._queue) this._state = 'ready'; | ||
} | ||
@@ -148,9 +181,5 @@ | ||
value: function unsubscribe() { | ||
if (!subscriptionClosed(this)) { | ||
if (this._state !== 'closed') { | ||
closeSubscription(this); | ||
try { | ||
cleanupSubscription(this); | ||
} catch (e) { | ||
hostReportError(e); | ||
} | ||
cleanupSubscription(this); | ||
} | ||
@@ -161,3 +190,3 @@ } | ||
get: function () { | ||
return subscriptionClosed(this); | ||
return this._state === 'closed'; | ||
} | ||
@@ -179,18 +208,3 @@ }]); | ||
value: function next(value) { | ||
var subscription = this._subscription; | ||
if (subscriptionClosed(subscription)) return; | ||
validateSubscription(subscription); | ||
var observer = subscription._observer; | ||
subscription._state = 'running'; | ||
try { | ||
var m = getMethod(observer, 'next'); | ||
if (m) m.call(observer, value); | ||
} catch (e) { | ||
hostReportError(e); | ||
} | ||
if (!subscriptionClosed(subscription)) subscription._state = 'ready'; | ||
notifySubscription(this._subscription, 'next', value); | ||
} | ||
@@ -200,20 +214,3 @@ }, { | ||
value: function error(value) { | ||
var subscription = this._subscription; | ||
if (subscriptionClosed(subscription)) { | ||
return; | ||
} | ||
validateSubscription(subscription); | ||
var observer = subscription._observer; | ||
closeSubscription(subscription); | ||
try { | ||
var m = getMethod(observer, 'error'); | ||
if (m) m.call(observer, value);else throw value; | ||
} catch (e) { | ||
hostReportError(e); | ||
} | ||
cleanupSubscription(subscription); | ||
notifySubscription(this._subscription, 'error', value); | ||
} | ||
@@ -223,18 +220,3 @@ }, { | ||
value: function complete() { | ||
var subscription = this._subscription; | ||
if (subscriptionClosed(subscription)) return; | ||
validateSubscription(subscription); | ||
var observer = subscription._observer; | ||
closeSubscription(subscription); | ||
try { | ||
var m = getMethod(observer, 'complete'); | ||
if (m) m.call(observer); | ||
} catch (e) { | ||
hostReportError(e); | ||
} | ||
cleanupSubscription(subscription); | ||
notifySubscription(this._subscription, 'complete'); | ||
} | ||
@@ -244,3 +226,3 @@ }, { | ||
get: function () { | ||
return subscriptionClosed(this._subscription); | ||
return this._subscription._state === 'closed'; | ||
} | ||
@@ -286,6 +268,11 @@ }]); | ||
function done() { | ||
subscription.unsubscribe(); | ||
resolve(); | ||
} | ||
var subscription = _this.subscribe({ | ||
next: function (value) { | ||
try { | ||
fn(value); | ||
fn(value, done); | ||
} catch (e) { | ||
@@ -292,0 +279,0 @@ reject(e); |
{ | ||
"name": "zen-observable", | ||
"version": "0.8.3", | ||
"version": "0.8.5", | ||
"repository": "zenparsing/zen-observable", | ||
@@ -5,0 +5,0 @@ "description": "An Implementation of ES Observables", |
@@ -56,4 +56,2 @@ // === Symbol Support === | ||
function cleanupSubscription(subscription) { | ||
// ASSERT: subscription._observer is undefined | ||
let cleanup = subscription._cleanup; | ||
@@ -69,30 +67,70 @@ if (cleanup === undefined) | ||
if (typeof cleanup === 'function') { | ||
cleanup(); | ||
} else { | ||
let unsubscribe = getMethod(cleanup, 'unsubscribe'); | ||
if (unsubscribe) { | ||
unsubscribe.call(cleanup); | ||
try { | ||
if (typeof cleanup === 'function') { | ||
cleanup(); | ||
} else { | ||
let unsubscribe = getMethod(cleanup, 'unsubscribe'); | ||
if (unsubscribe) { | ||
unsubscribe.call(cleanup); | ||
} | ||
} | ||
} catch (e) { | ||
hostReportError(e); | ||
} | ||
} | ||
function subscriptionClosed(subscription) { | ||
return subscription._state === 'closed'; | ||
} | ||
function closeSubscription(subscription) { | ||
subscription._observer = undefined; | ||
subscription._queue = undefined; | ||
subscription._state = 'closed'; | ||
} | ||
function validateSubscription(subscription) { | ||
// ASSERT: subscription._state !== 'closed' | ||
switch (subscription._state) { | ||
case 'ready': break; | ||
case 'initializing': throw new Error('Subscription is not initialized'); | ||
case 'running': throw new Error('Subscription observer is already running'); | ||
function flushSubscription(subscription) { | ||
let queue = subscription._queue; | ||
subscription._queue = undefined; | ||
subscription._state = 'ready'; | ||
for (let i = 0; i < queue.length; ++i) { | ||
notifySubscription(subscription, queue[i].type, queue[i].value); | ||
} | ||
} | ||
function notifySubscription(subscription, type, value) { | ||
if (subscription._state === 'closed') | ||
return; | ||
if (subscription._state !== 'ready') { | ||
if (!subscription._queue) { | ||
enqueue(() => flushSubscription(subscription)); | ||
subscription._queue = []; | ||
} | ||
subscription._queue.push({ type, value }); | ||
return; | ||
} | ||
let observer = subscription._observer; | ||
try { | ||
let m = getMethod(observer, type); | ||
switch (type) { | ||
case 'next': | ||
if (m) m.call(observer, value); | ||
break; | ||
case 'error': | ||
closeSubscription(subscription); | ||
if (m) m.call(observer, value); | ||
else throw value; | ||
break; | ||
case 'complete': | ||
closeSubscription(subscription); | ||
if (m) m.call(observer); | ||
break; | ||
} | ||
} catch (e) { | ||
hostReportError(e); | ||
} | ||
if (subscription._state === 'closed') | ||
cleanupSubscription(subscription); | ||
} | ||
class Subscription { | ||
@@ -106,2 +144,3 @@ | ||
this._observer = observer; | ||
this._queue = undefined; | ||
this._state = 'initializing'; | ||
@@ -114,17 +153,17 @@ | ||
} catch (e) { | ||
enqueue(() => subscriptionObserver.error(e)); | ||
subscriptionObserver.error(e); | ||
} | ||
this._state = 'ready'; | ||
if (!this._queue) | ||
this._state = 'ready'; | ||
} | ||
get closed() { | ||
return subscriptionClosed(this); | ||
return this._state === 'closed'; | ||
} | ||
unsubscribe() { | ||
if (!subscriptionClosed(this)) { | ||
if (this._state !== 'closed') { | ||
closeSubscription(this); | ||
try { cleanupSubscription(this) } | ||
catch (e) { hostReportError(e) } | ||
cleanupSubscription(this); | ||
} | ||
@@ -135,74 +174,7 @@ } | ||
class SubscriptionObserver { | ||
constructor(subscription) { | ||
this._subscription = subscription; | ||
} | ||
get closed() { | ||
return subscriptionClosed(this._subscription); | ||
} | ||
next(value) { | ||
let subscription = this._subscription; | ||
if (subscriptionClosed(subscription)) | ||
return; | ||
validateSubscription(subscription); | ||
let observer = subscription._observer; | ||
subscription._state = 'running'; | ||
try { | ||
let m = getMethod(observer, 'next'); | ||
if (m) m.call(observer, value); | ||
} catch (e) { | ||
hostReportError(e); | ||
} | ||
if (!subscriptionClosed(subscription)) | ||
subscription._state = 'ready'; | ||
} | ||
error(value) { | ||
let subscription = this._subscription; | ||
if (subscriptionClosed(subscription)) { | ||
return; | ||
} | ||
validateSubscription(subscription); | ||
let observer = subscription._observer; | ||
closeSubscription(subscription); | ||
try { | ||
let m = getMethod(observer, 'error'); | ||
if (m) m.call(observer, value); | ||
else throw value; | ||
} catch (e) { | ||
hostReportError(e); | ||
} | ||
cleanupSubscription(subscription); | ||
} | ||
complete() { | ||
let subscription = this._subscription; | ||
if (subscriptionClosed(subscription)) | ||
return; | ||
validateSubscription(subscription); | ||
let observer = subscription._observer; | ||
closeSubscription(subscription); | ||
try { | ||
let m = getMethod(observer, 'complete'); | ||
if (m) m.call(observer); | ||
} catch (e) { | ||
hostReportError(e); | ||
} | ||
cleanupSubscription(subscription); | ||
} | ||
constructor(subscription) { this._subscription = subscription } | ||
get closed() { return this._subscription._state === 'closed' } | ||
next(value) { notifySubscription(this._subscription, 'next', value) } | ||
error(value) { notifySubscription(this._subscription, 'error', value) } | ||
complete() { notifySubscription(this._subscription, 'complete') } | ||
} | ||
@@ -240,6 +212,11 @@ | ||
function done() { | ||
subscription.unsubscribe(); | ||
resolve(); | ||
} | ||
let subscription = this.subscribe({ | ||
next(value) { | ||
try { | ||
fn(value); | ||
fn(value, done); | ||
} catch (e) { | ||
@@ -246,0 +223,0 @@ reject(e); |
export function parse(string) { | ||
return new Observable(observer => { | ||
(async () => { | ||
return new Observable(async observer => { | ||
await null; | ||
for (let char of string) { | ||
if (observer.closed) return; | ||
else if (char !== '-') observer.next(char); | ||
await null; | ||
for (let char of string) { | ||
if (observer.closed) return; | ||
else if (char !== '-') observer.next(char); | ||
await null; | ||
} | ||
observer.complete(); | ||
})(); | ||
} | ||
observer.complete(); | ||
}); | ||
} |
@@ -58,2 +58,14 @@ import assert from 'assert'; | ||
it('provides a cancellation function as the second argument', async () => { | ||
let observer; | ||
let results = []; | ||
await Observable.of(1, 2, 3).forEach((value, cancel) => { | ||
results.push(value); | ||
if (value > 1) { | ||
return cancel(); | ||
} | ||
}); | ||
assert.deepEqual(results, [1, 2]); | ||
}); | ||
}); |
@@ -52,21 +52,21 @@ import assert from 'assert'; | ||
it('throws if the subscription is not initialized', async () => { | ||
new Observable(x => { | ||
try { | ||
x.complete(); | ||
assert.ok(false); | ||
} catch (e) { | ||
assert.ok(true); | ||
} | ||
}).subscribe(); | ||
it('queues if the subscription is not initialized', async () => { | ||
let completed = false; | ||
new Observable(x => { x.complete() }).subscribe({ | ||
complete() { completed = true }, | ||
}); | ||
assert.equal(completed, false); | ||
await null; | ||
assert.equal(completed, true); | ||
}); | ||
it('throws if the observer is running', () => { | ||
it('does not queue if the observer is running', () => { | ||
let observer; | ||
let completed = false | ||
new Observable(x => { observer = x }).subscribe({ | ||
next() { | ||
assert.throws(() => observer.complete()); | ||
}, | ||
next() { observer.complete() }, | ||
complete() { completed = true }, | ||
}); | ||
observer.next(); | ||
assert.equal(completed, true); | ||
}); | ||
@@ -132,3 +132,3 @@ | ||
it('throws error if the cleanup function throws', () => { | ||
it('reports error if the cleanup function throws', () => { | ||
let error = {}; | ||
@@ -140,8 +140,5 @@ let observer; | ||
}).subscribe(); | ||
try { | ||
observer.complete(); | ||
} catch (err) { | ||
assert.equal(err, error); | ||
} | ||
observer.complete(); | ||
assert.equal(hostError, error); | ||
}); | ||
}); |
@@ -46,27 +46,24 @@ import assert from 'assert'; | ||
observer.error(1); | ||
assert.ok(!hostError); | ||
}); | ||
it('throws if the subscription is not initialized', async () => { | ||
it('queues if the subscription is not initialized', async () => { | ||
let error; | ||
new Observable(x => { x.error() }).subscribe({ | ||
new Observable(x => { x.error({}) }).subscribe({ | ||
error(err) { error = err }, | ||
}); | ||
assert.equal(error, undefined); | ||
await null; | ||
assert.ok(error instanceof Error); | ||
assert.ok(error); | ||
}); | ||
it('throws if the observer is running', () => { | ||
it('does not queue if the observer is running', () => { | ||
let observer; | ||
let error; | ||
new Observable(x => { observer = x }).subscribe({ | ||
next() { | ||
try { | ||
observer.error(); | ||
assert.ok(false); | ||
} catch (e) { | ||
assert.ok(true); | ||
} | ||
}, | ||
error() {}, | ||
next() { observer.error({}) }, | ||
error(e) { error = e }, | ||
}); | ||
observer.next(); | ||
assert.ok(error); | ||
}); | ||
@@ -136,3 +133,3 @@ | ||
it('throws if the cleanup function throws', () => { | ||
it('reports error if the cleanup function throws', () => { | ||
let error = {}; | ||
@@ -143,13 +140,7 @@ let observer; | ||
return () => { throw error }; | ||
}).subscribe({ | ||
error() {}, | ||
}); | ||
try { | ||
observer.error(1); | ||
assert.ok(false); | ||
} catch (err) { | ||
assert.equal(err, error); | ||
} | ||
}).subscribe(); | ||
observer.error(1); | ||
assert.equal(hostError, error); | ||
}); | ||
}); |
@@ -61,25 +61,26 @@ import assert from 'assert'; | ||
it('throws if the subscription is not initialized', async () => { | ||
let error; | ||
new Observable(x => { x.next() }).subscribe({ | ||
error(err) { error = err }, | ||
it('queues if the subscription is not initialized', async () => { | ||
let values = []; | ||
let observer; | ||
new Observable(x => { observer = x, x.next(1) }).subscribe({ | ||
next(val) { values.push(val) }, | ||
}); | ||
assert.deepEqual(values, []); | ||
observer.next(2); | ||
assert.deepEqual(values, []); | ||
await null; | ||
assert.ok(error instanceof Error); | ||
assert.deepEqual(values, [1, 2]); | ||
}); | ||
it('throws if the observer is running', () => { | ||
it('does not queue if the observer is running', () => { | ||
let observer; | ||
let values = []; | ||
new Observable(x => { observer = x }).subscribe({ | ||
next() { | ||
try { | ||
observer.next(); | ||
assert.ok(false); | ||
} catch (e) { | ||
assert.ok(true); | ||
} | ||
next(val) { | ||
values.push(val); | ||
if (val === 1) observer.next(2); | ||
}, | ||
error() {}, | ||
}); | ||
observer.next(); | ||
observer.next(1); | ||
assert.deepEqual(values, [1, 2]); | ||
}); | ||
@@ -86,0 +87,0 @@ |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
66031
1956