zen-observable
Advanced tools
Comparing version 0.7.1 to 0.8.0
@@ -1,1 +0,1 @@ | ||
module.exports = require("./zen-observable.js").Observable; | ||
module.exports = require("./zen-observable.js"); |
{ | ||
"name": "zen-observable", | ||
"version": "0.7.1", | ||
"version": "0.8.0", | ||
"repository": "zenparsing/zen-observable", | ||
@@ -10,2 +10,4 @@ "description": "An Implementation of ES Observables", | ||
"esdown": "^1.2.8", | ||
"eslint": "^4.16.0", | ||
"mocha": "^5.0.0", | ||
"moon-unit": "^0.2.1" | ||
@@ -15,6 +17,7 @@ }, | ||
"scripts": { | ||
"test": "esdown test", | ||
"test": "mocha", | ||
"lint": "eslint src/*", | ||
"build": "esdown - src/Observable.js zen-observable.js -g '*'", | ||
"prepublishOnly": "npm run build" | ||
"prepublishOnly": "eslint src/* && mocha && npm run build" | ||
} | ||
} |
119
README.md
# zen-observable | ||
An implementation of [ES Observables](https://github.com/zenparsing/es-observable). | ||
An implementation of Observables for Javascript. Requires Promises or a Promise polyfill. | ||
Requires ES6 Promises or a Promise polyfill. | ||
## Install | ||
@@ -15,3 +13,3 @@ | ||
- [zen-observable.js](https://raw.githubusercontent.com/zenparsing/zen-observable/master/zen-observable.js) | ||
https://unpkg.com/zen-observable/zen-observable.js | ||
@@ -23,3 +21,4 @@ ## Usage | ||
```js | ||
var Observable = require("zen-observable"); | ||
var Observable = require('zen-observable'); | ||
Observable.of(1, 2, 3).subscribe(x => console.log(x)); | ||
@@ -31,12 +30,13 @@ ``` | ||
```html | ||
<script src="zen-observable.js"></script> | ||
<script src='zen-observable.js'></script> | ||
<script> | ||
Observable.of(1, 2, 3).subscribe(x => console.log(x)); | ||
Observable.of(1, 2, 3).subscribe(x => console.log(x)); | ||
</script> | ||
``` | ||
ES Modules: | ||
Modules: | ||
```js | ||
import Observable from "zen-observable"; | ||
import Observable from 'zen-observable'; | ||
Observable.of(1, 2, 3).subscribe(x => console.log(x)); | ||
@@ -47,14 +47,14 @@ ``` | ||
### new Observable ( subscribe ) | ||
### new Observable(subscribe) | ||
```js | ||
let observable = new Observable(observer => { | ||
// Emit a single value after 1 second | ||
let timer = setTimeout(_=> { | ||
observer.next("hello"); | ||
observer.complete(); | ||
}, 1000); | ||
// Emit a single value after 1 second | ||
let timer = setTimeout(() => { | ||
observer.next('hello'); | ||
observer.complete(); | ||
}, 1000); | ||
// On unsubscription, cancel the timer | ||
return _=> clearTimeout(timer); | ||
// On unsubscription, cancel the timer | ||
return () => clearTimeout(timer); | ||
}); | ||
@@ -68,6 +68,7 @@ ``` | ||
- `complete()` Terminates the sequence successfully. | ||
- `closed` A boolean property whose value is `true` if the observer's subscription is closed. | ||
The subscriber function can optionally return either a cleanup function or a subscription object. If it returns a cleanup function, that function will be called when the subscription has closed. If it returns a subscription object, then the subscription's `unsubscribe` method will be invoked when the subscription has closed. | ||
### Observable.of ( ...items ) | ||
### Observable.of(...items) | ||
@@ -77,3 +78,3 @@ ```js | ||
Observable.of(1, 2, 3).subscribe(x => { | ||
console.log(x); | ||
console.log(x); | ||
}); | ||
@@ -84,3 +85,3 @@ ``` | ||
### Observable.from ( value ) | ||
### Observable.from(value) | ||
@@ -92,3 +93,3 @@ ```js | ||
Observable.from(list).subscribe(x => { | ||
console.log(x); | ||
console.log(x); | ||
}); | ||
@@ -98,5 +99,5 @@ ``` | ||
```js | ||
// Convert something "observable" to an Observable instance | ||
// Convert something 'observable' to an Observable instance | ||
Observable.from(otherObservable).subscribe(x => { | ||
console.log(x); | ||
console.log(x); | ||
}); | ||
@@ -107,18 +108,17 @@ ``` | ||
- If `value` is an implementation of ES Observables, then it is converted to an instance of Observable as defined by this library. | ||
- If `value` is an implementation of Observable, then it is converted to an instance of Observable as defined by this library. | ||
- Otherwise, it is converted to an Observable which synchronously iterates over `value`. | ||
### observable.subscribe ( observer ) | ||
### observable.subscribe([observer]) | ||
```js | ||
let subscription = observable.subscribe({ | ||
next(x) { console.log(x) }, | ||
error(err) { console.log(`Finished with error: ${ err }`) }, | ||
complete() { console.log("Finished") } | ||
}) | ||
next(x) { console.log(x) }, | ||
error(err) { console.log(`Finished with error: ${ err }`) }, | ||
complete() { console.log('Finished') } | ||
}); | ||
``` | ||
Subscribes to the observable. The `observer` argument must be an object. It may have any of the following methods: | ||
Subscribes to the observable. Observer objects may have any of the following methods: | ||
- `start(subscription)` Receives the subscription object during initialization. | ||
- `next(value)` Receives the next value of the sequence. | ||
@@ -128,22 +128,25 @@ - `error(exception)` Receives the terminating error of the sequence. | ||
The subscription object can be used to cancel the stream. | ||
Returns a subscription object that can be used to cancel the stream. | ||
### observable.subscribe(nextCallback[, errorCallback, completeCallback]) | ||
```js | ||
// Stop receiving data from the stream | ||
subscription.unsubscribe(); | ||
let subscription = observable.subscribe( | ||
x => console.log(x), | ||
err => console.log(`Finished with error: ${ err }`), | ||
() => console.log('Finished') | ||
); | ||
``` | ||
## Extended API | ||
Subscribes to the observable with callback functions. Returns a subscription object that can be used to cancel the stream. | ||
*The following methods are not yet defined by the ES Observable specification.* | ||
### observable.forEach(callback) | ||
### observable.forEach ( callback ) | ||
```js | ||
observable.forEach(x => { | ||
console.log(`Received value: ${ x }`); | ||
}).then(_=> { | ||
console.log("Finished successfully") | ||
console.log(`Received value: ${ x }`); | ||
}).then(() => { | ||
console.log('Finished successfully') | ||
}).catch(err => { | ||
console.log(`Finished with error: ${ err }`); | ||
console.log(`Finished with error: ${ err }`); | ||
}) | ||
@@ -154,9 +157,9 @@ ``` | ||
### observable.filter ( callback ) | ||
### observable.filter(callback) | ||
```js | ||
Observable.of(1, 2, 3).filter(value => { | ||
return value > 2; | ||
return value > 2; | ||
}).subscribe(value => { | ||
console.log(value); | ||
console.log(value); | ||
}); | ||
@@ -168,3 +171,3 @@ // 3 | ||
### observable.map ( callback ) | ||
### observable.map(callback) | ||
@@ -175,5 +178,5 @@ Returns a new Observable that emits the results of calling the `callback` argument for every value in the stream. | ||
Observable.of(1, 2, 3).map(value => { | ||
return value * 2; | ||
return value * 2; | ||
}).subscribe(value => { | ||
console.log(value); | ||
console.log(value); | ||
}); | ||
@@ -185,9 +188,9 @@ // 2 | ||
### observable.reduce ( callback [, initialValue] ) | ||
### observable.reduce(callback [,initialValue]) | ||
```js | ||
Observable.of(0, 1, 2, 3, 4).reduce((previousValue, currentValue) => { | ||
return previousValue + currentValue; | ||
return previousValue + currentValue; | ||
}).subscribe(result => { | ||
console.log(result); | ||
console.log(result); | ||
}); | ||
@@ -198,1 +201,15 @@ // 10 | ||
Returns a new Observable that applies a function against an accumulator and each value of the stream to reduce it to a single value. | ||
### observable.concat(...sources) | ||
```js | ||
Observable.of(1, 2, 3).concat( | ||
Observable.of(4, 5, 6), | ||
Observable.of(7, 8, 9) | ||
).subscribe(result => { | ||
console.log(result); | ||
}); | ||
// 1, 2, 3, 4, 5, 6, 7, 8, 9 | ||
``` | ||
Merges the current observable with additional observables. |
// === Symbol Support === | ||
function hasSymbol(name) { | ||
return typeof Symbol === "function" && Boolean(Symbol[name]); | ||
} | ||
const hasSymbols = () => typeof Symbol === 'function'; | ||
const hasSymbol = name => hasSymbols() && Boolean(Symbol[name]); | ||
const getSymbol = name => hasSymbol(name) ? Symbol[name] : '@@' + name; | ||
function getSymbol(name) { | ||
return hasSymbol(name) ? Symbol[name] : "@@" + name; | ||
if (hasSymbols() && !hasSymbol('observable')) { | ||
Symbol.observable = Symbol('observable'); | ||
} | ||
// Ponyfill Symbol.observable for interoperability with other libraries | ||
if (typeof Symbol === "function" && !Symbol.observable) { | ||
Symbol.observable = Symbol("observable"); | ||
} | ||
// === Abstract Operations === | ||
function hostReportError(e) { | ||
setTimeout(() => { throw e }); | ||
} | ||
function getMethod(obj, key) { | ||
@@ -28,4 +19,4 @@ let value = obj[key]; | ||
if (typeof value !== "function") | ||
throw new TypeError(value + " is not a function"); | ||
if (typeof value !== 'function') | ||
throw new TypeError(value + ' is not a function'); | ||
@@ -38,3 +29,3 @@ return value; | ||
if (ctor !== undefined) { | ||
ctor = ctor[getSymbol("species")]; | ||
ctor = ctor[getSymbol('species')]; | ||
if (ctor === null) { | ||
@@ -47,2 +38,6 @@ ctor = undefined; | ||
function isObservable(x) { | ||
return x instanceof Observable; // SPEC: Brand check | ||
} | ||
function addMethods(target, methods) { | ||
@@ -56,85 +51,89 @@ Object.keys(methods).forEach(k => { | ||
function hostReportError(e) { | ||
if (hostReportError.log) { | ||
hostReportError.log(e); | ||
} else { | ||
setTimeout(() => { throw e }); | ||
} | ||
} | ||
function enqueue(fn) { | ||
Promise.resolve().then(() => { | ||
try { fn() } | ||
catch (e) { hostReportError(e) } | ||
}); | ||
} | ||
function cleanupSubscription(subscription) { | ||
// Assert: observer._observer is undefined | ||
// ASSERT: subscription._observer is undefined | ||
let cleanup = subscription._cleanup; | ||
if (!cleanup) | ||
if (cleanup === undefined) | ||
return; | ||
// Drop the reference to the cleanup function so that we won't call it | ||
// more than once | ||
subscription._cleanup = undefined; | ||
// Call the cleanup function | ||
try { cleanup() } | ||
catch (e) { hostReportError(e) } | ||
if (!cleanup) { | ||
return; | ||
} | ||
if (typeof cleanup === 'function') { | ||
cleanup(); | ||
} else { | ||
let unsubscribe = getMethod(cleanup, 'unsubscribe'); | ||
if (unsubscribe) { | ||
unsubscribe.call(cleanup); | ||
} | ||
} | ||
} | ||
function subscriptionClosed(subscription) { | ||
return subscription._observer === undefined; | ||
return subscription._state === 'closed'; | ||
} | ||
function closeSubscription(subscription) { | ||
if (subscriptionClosed(subscription)) | ||
return; | ||
subscription._observer = undefined; | ||
cleanupSubscription(subscription); | ||
subscription._state = 'closed'; | ||
} | ||
function cleanupFromSubscription(subscription) { | ||
return () => { subscription.unsubscribe() }; | ||
function validateSubscription(subscription) { | ||
// ASSERT: subscription._state !== 'closed' | ||
switch (subscription._state) { | ||
case 'ready': break; | ||
case 'initializing': throw new Error('Subscription is not initialized'); | ||
case 'running': throw new Error('Subscription observer is already running'); | ||
} | ||
} | ||
function Subscription(observer, subscriber) { | ||
// Assert: subscriber is callable | ||
// ASSERT: observer is an object | ||
// ASSERT: subscriber is callable | ||
// The observer must be an object | ||
if (Object(observer) !== observer) | ||
throw new TypeError("Observer must be an object"); | ||
this._cleanup = undefined; | ||
this._observer = observer; | ||
this._state = 'initializing'; | ||
try { | ||
let start = getMethod(observer, "start"); | ||
if (start) start.call(observer, this); | ||
} catch (e) { | ||
hostReportError(e); | ||
} | ||
let subscriptionObserver = new SubscriptionObserver(this); | ||
if (subscriptionClosed(this)) | ||
return; | ||
observer = new SubscriptionObserver(this); | ||
try { | ||
// Call the subscriber function | ||
let cleanup = subscriber.call(undefined, observer); | ||
// The return value must be undefined, null, a subscription object, or a function | ||
if (cleanup != null) { | ||
if (typeof cleanup.unsubscribe === "function") | ||
cleanup = cleanupFromSubscription(cleanup); | ||
else if (typeof cleanup !== "function") | ||
throw new TypeError(cleanup + " is not a function"); | ||
this._cleanup = cleanup; | ||
} | ||
this._cleanup = subscriber.call(undefined, subscriptionObserver); | ||
} catch (e) { | ||
// If an error occurs during startup, then attempt to send the error | ||
// to the observer | ||
observer.error(e); | ||
return; | ||
enqueue(() => subscriptionObserver.error(e)); | ||
} | ||
// If the stream is already finished, then perform cleanup | ||
if (subscriptionClosed(this)) | ||
cleanupSubscription(this); | ||
this._state = 'ready'; | ||
} | ||
addMethods(Subscription.prototype = {}, { | ||
get closed() { return subscriptionClosed(this) }, | ||
unsubscribe() { closeSubscription(this) }, | ||
get closed() { | ||
return subscriptionClosed(this); | ||
}, | ||
unsubscribe() { | ||
if (!subscriptionClosed(this)) { | ||
closeSubscription(this); | ||
try { cleanupSubscription(this) } | ||
catch (e) { hostReportError(e) } | ||
} | ||
}, | ||
}); | ||
@@ -148,16 +147,18 @@ | ||
get closed() { return subscriptionClosed(this._subscription) }, | ||
get closed() { | ||
return subscriptionClosed(this._subscription); | ||
}, | ||
next(value) { | ||
let subscription = this._subscription; | ||
// If the stream is closed, then return undefined | ||
if (subscriptionClosed(subscription)) | ||
return; | ||
validateSubscription(subscription); | ||
let observer = subscription._observer; | ||
subscription._state = 'running'; | ||
try { | ||
// If the observer has a "next" method, send the next value | ||
let m = getMethod(observer, "next"); | ||
let m = getMethod(observer, 'next'); | ||
if (m) m.call(observer, value); | ||
@@ -167,2 +168,5 @@ } catch (e) { | ||
} | ||
if (!subscriptionClosed(subscription)) | ||
subscription._state = 'ready'; | ||
}, | ||
@@ -172,14 +176,13 @@ | ||
let subscription = this._subscription; | ||
// If the stream is closed, throw the error to the caller | ||
if (subscriptionClosed(subscription)) { | ||
hostReportError(value); | ||
return; | ||
} | ||
validateSubscription(subscription); | ||
let observer = subscription._observer; | ||
subscription._observer = undefined; | ||
closeSubscription(subscription); | ||
try { | ||
let m = getMethod(observer, "error"); | ||
let m = getMethod(observer, 'error'); | ||
if (m) m.call(observer, value); | ||
@@ -196,11 +199,12 @@ else throw value; | ||
let subscription = this._subscription; | ||
if (subscriptionClosed(subscription)) | ||
return; | ||
validateSubscription(subscription); | ||
let observer = subscription._observer; | ||
subscription._observer = undefined; | ||
closeSubscription(subscription); | ||
try { | ||
let m = getMethod(observer, "complete"); | ||
let m = getMethod(observer, 'complete'); | ||
if (m) m.call(observer); | ||
@@ -216,10 +220,8 @@ } catch (e) { | ||
export function Observable(subscriber) { | ||
// Constructor cannot be called as a function | ||
function Observable(subscriber) { | ||
if (!(this instanceof Observable)) | ||
throw new TypeError("Observable cannot be called as a function"); | ||
throw new TypeError('Observable cannot be called as a function'); | ||
// The stream subscriber must be a function | ||
if (typeof subscriber !== "function") | ||
throw new TypeError("Observable initializer must be a function"); | ||
if (typeof subscriber !== 'function') | ||
throw new TypeError('Observable initializer must be a function'); | ||
@@ -231,13 +233,10 @@ this._subscriber = subscriber; | ||
subscribe(observer, ...args) { | ||
if (typeof observer === 'function') { | ||
subscribe(observer) { | ||
if (typeof observer !== 'object' || observer === null) { | ||
observer = { | ||
next: observer, | ||
error: args[0], | ||
complete: args[1], | ||
error: arguments[1], | ||
complete: arguments[2], | ||
}; | ||
} else if (typeof observer !== 'object' || observer === null) { | ||
observer = {}; | ||
} | ||
return new Subscription(observer, this._subscriber); | ||
@@ -248,29 +247,16 @@ }, | ||
return new Promise((resolve, reject) => { | ||
if (typeof fn !== "function") | ||
return Promise.reject(new TypeError(fn + " is not a function")); | ||
if (typeof fn !== 'function') { | ||
reject(new TypeError(fn + ' is not a function')); | ||
return; | ||
} | ||
this.subscribe({ | ||
_subscription: null, | ||
start(subscription) { | ||
if (Object(subscription) !== subscription) | ||
throw new TypeError(subscription + " is not an object"); | ||
this._subscription = subscription; | ||
}, | ||
let subscription = this.subscribe({ | ||
next(value) { | ||
let subscription = this._subscription; | ||
if (subscription.closed) | ||
return; | ||
try { | ||
fn(value); | ||
} catch (err) { | ||
reject(err); | ||
} catch (e) { | ||
reject(e); | ||
subscription.unsubscribe(); | ||
} | ||
}, | ||
error: reject, | ||
@@ -283,4 +269,4 @@ complete: resolve, | ||
map(fn) { | ||
if (typeof fn !== "function") | ||
throw new TypeError(fn + " is not a function"); | ||
if (typeof fn !== 'function') | ||
throw new TypeError(fn + ' is not a function'); | ||
@@ -291,11 +277,6 @@ let C = getSpecies(this); | ||
next(value) { | ||
if (observer.closed) | ||
return; | ||
try { value = fn(value) } | ||
catch (e) { return observer.error(e) } | ||
observer.next(value); | ||
}, | ||
error(e) { observer.error(e) }, | ||
@@ -307,4 +288,4 @@ complete() { observer.complete() }, | ||
filter(fn) { | ||
if (typeof fn !== "function") | ||
throw new TypeError(fn + " is not a function"); | ||
if (typeof fn !== 'function') | ||
throw new TypeError(fn + ' is not a function'); | ||
@@ -315,11 +296,6 @@ let C = getSpecies(this); | ||
next(value) { | ||
if (observer.closed) | ||
return; | ||
try { if (!fn(value)) return } | ||
try { if (!fn(value)) return; } | ||
catch (e) { return observer.error(e) } | ||
observer.next(value); | ||
}, | ||
error(e) { observer.error(e) }, | ||
@@ -331,4 +307,4 @@ complete() { observer.complete() }, | ||
reduce(fn) { | ||
if (typeof fn !== "function") | ||
throw new TypeError(fn + " is not a function"); | ||
if (typeof fn !== 'function') | ||
throw new TypeError(fn + ' is not a function'); | ||
@@ -344,5 +320,2 @@ let C = getSpecies(this); | ||
next(value) { | ||
if (observer.closed) | ||
return; | ||
let first = !hasValue; | ||
@@ -362,5 +335,4 @@ hasValue = true; | ||
complete() { | ||
if (!hasValue && !hasSeed) { | ||
return observer.error(new TypeError("Cannot reduce an empty sequence")); | ||
} | ||
if (!hasValue && !hasSeed) | ||
return observer.error(new TypeError('Cannot reduce an empty sequence')); | ||
@@ -374,5 +346,37 @@ observer.next(acc); | ||
concat(...sources) { | ||
let C = getSpecies(this); | ||
return new C(observer => { | ||
let subscription; | ||
function startNext(next) { | ||
subscription = next.subscribe({ | ||
next(v) { observer.next(v) }, | ||
error(e) { observer.error(e) }, | ||
complete() { | ||
if (sources.length === 0) { | ||
subscription = undefined; | ||
observer.complete(); | ||
} else { | ||
startNext(C.from(sources.shift())); | ||
} | ||
}, | ||
}); | ||
} | ||
startNext(this); | ||
return () => { | ||
if (subscription) { | ||
subscription = undefined; | ||
subscription.unsubscribe(); | ||
} | ||
}; | ||
}); | ||
}, | ||
}); | ||
Object.defineProperty(Observable.prototype, getSymbol("observable"), { | ||
Object.defineProperty(Observable.prototype, getSymbol('observable'), { | ||
value: function() { return this }, | ||
@@ -386,9 +390,8 @@ writable: true, | ||
from(x) { | ||
let C = typeof this === "function" ? this : Observable; | ||
let C = typeof this === 'function' ? this : Observable; | ||
if (x == null) | ||
throw new TypeError(x + " is not an object"); | ||
throw new TypeError(x + ' is not an object'); | ||
let method = getMethod(x, getSymbol("observable")); | ||
let method = getMethod(x, getSymbol('observable')); | ||
if (method) { | ||
@@ -398,5 +401,5 @@ let observable = method.call(x); | ||
if (Object(observable) !== observable) | ||
throw new TypeError(observable + " is not an object"); | ||
throw new TypeError(observable + ' is not an object'); | ||
if (observable.constructor === C) | ||
if (isObservable(observable) && observable.constructor === C) | ||
return observable; | ||
@@ -407,12 +410,16 @@ | ||
if (hasSymbol("iterator") && (method = getMethod(x, getSymbol("iterator")))) { | ||
return new C(observer => { | ||
for (let item of method.call(x)) { | ||
observer.next(item); | ||
if (observer.closed) | ||
return; | ||
} | ||
observer.complete(); | ||
}); | ||
if (hasSymbol('iterator')) { | ||
method = getMethod(x, getSymbol('iterator')); | ||
if (method) { | ||
return new C(observer => { | ||
enqueue(() => { | ||
if (observer.closed) return; | ||
for (let item of method.call(x)) { | ||
observer.next(item); | ||
if (observer.closed) return; | ||
} | ||
observer.complete(); | ||
}); | ||
}); | ||
} | ||
} | ||
@@ -422,26 +429,28 @@ | ||
return new C(observer => { | ||
for (let i = 0; i < x.length; ++i) { | ||
observer.next(x[i]); | ||
if (observer.closed) | ||
return; | ||
} | ||
observer.complete(); | ||
enqueue(() => { | ||
if (observer.closed) return; | ||
for (let i = 0; i < x.length; ++i) { | ||
observer.next(x[i]); | ||
if (observer.closed) return; | ||
} | ||
observer.complete(); | ||
}); | ||
}); | ||
} | ||
throw new TypeError(x + " is not observable"); | ||
throw new TypeError(x + ' is not observable'); | ||
}, | ||
of(...items) { | ||
let C = typeof this === "function" ? this : Observable; | ||
let C = typeof this === 'function' ? this : Observable; | ||
return new C(observer => { | ||
for (let i = 0; i < items.length; ++i) { | ||
observer.next(items[i]); | ||
if (observer.closed) | ||
return; | ||
} | ||
observer.complete(); | ||
enqueue(() => { | ||
if (observer.closed) return; | ||
for (let i = 0; i < items.length; ++i) { | ||
observer.next(items[i]); | ||
if (observer.closed) return; | ||
} | ||
observer.complete(); | ||
}); | ||
}); | ||
@@ -452,3 +461,3 @@ }, | ||
Object.defineProperty(Observable, getSymbol("species"), { | ||
Object.defineProperty(Observable, getSymbol('species'), { | ||
get() { return this }, | ||
@@ -458,7 +467,9 @@ configurable: true, | ||
Object.defineProperty(Observable, "extensions", { | ||
value: { | ||
observableSymbol: getSymbol("observable"), | ||
setHostReportError(fn) { hostReportError = fn }, | ||
}, | ||
}); | ||
if (hasSymbols()) { | ||
Object.defineProperty(Observable, Symbol('extensions'), { | ||
value: { symbol: getSymbol('observable'), hostReportError }, | ||
configurable: true, | ||
}); | ||
} | ||
module.exports = Observable; |
@@ -1,13 +0,15 @@ | ||
export default { | ||
const Observable = require('../src/Observable'); | ||
const assert = require('assert'); | ||
"Basics" (test, { Observable }) { | ||
describe('filter', () => { | ||
it('filters the results using the supplied callback', async () => { | ||
let list = []; | ||
return Observable.from([1, 2, 3, 4]) | ||
await Observable | ||
.from([1, 2, 3, 4]) | ||
.filter(x => x > 2) | ||
.forEach(x => list.push(x)) | ||
.then(() => test.equals(list, [3, 4])); | ||
}, | ||
.forEach(x => list.push(x)); | ||
}; | ||
assert.deepEqual(list, [3, 4]); | ||
}); | ||
}); |
@@ -1,12 +0,14 @@ | ||
export default { | ||
const Observable = require('../src/Observable'); | ||
const assert = require('assert'); | ||
"Basics" (test, { Observable }) { | ||
describe('map', () => { | ||
it('maps the results using the supplied callback', async () => { | ||
let list = []; | ||
return Observable.from([1, 2, 3]) | ||
await Observable.from([1, 2, 3]) | ||
.map(x => x * 2) | ||
.forEach(x => list.push(x)) | ||
.then(() => test.equals(list, [2, 4, 6])); | ||
}, | ||
.forEach(x => list.push(x)); | ||
}; | ||
assert.deepEqual(list, [2, 4, 6]); | ||
}); | ||
}); |
@@ -1,44 +0,39 @@ | ||
export default { | ||
const Observable = require('../src/Observable'); | ||
const assert = require('assert'); | ||
"No seed" (test, { Observable }) { | ||
return Observable.from([1, 2, 3, 4, 5, 6]).reduce((a, b) => { | ||
describe('reduce', () => { | ||
it('reduces without a seed', async () => { | ||
await Observable.from([1, 2, 3, 4, 5, 6]).reduce((a, b) => { | ||
return a + b; | ||
}).forEach(x => { | ||
test.equals(x, 21); | ||
assert.equal(x, 21); | ||
}); | ||
}, | ||
}); | ||
"No seed - one value" (test, { Observable }) { | ||
return Observable.from([1]).reduce((a, b) => { | ||
return a + b; | ||
}).forEach(x => { | ||
test.equals(x, 1); | ||
}); | ||
}, | ||
it('errors if empty and no seed', async () => { | ||
try { | ||
await Observable.from([]).reduce((a, b) => { | ||
return a + b; | ||
}).forEach(() => null); | ||
assert.ok(false); | ||
} catch (err) { | ||
assert.ok(true); | ||
} | ||
}); | ||
"No seed - empty (throws)" (test, { Observable }) { | ||
return Observable.from([]).reduce((a, b) => { | ||
it('reduces with a seed', async () => { | ||
Observable.from([1, 2, 3, 4, 5, 6]).reduce((a, b) => { | ||
return a + b; | ||
}).forEach(() => null) | ||
.then( | ||
() => test.assert(false), | ||
() => test.assert(true)); | ||
}, | ||
"Seed" (test, { Observable }) { | ||
return Observable.from([1, 2, 3, 4, 5, 6]).reduce((a, b) => { | ||
return a + b; | ||
}, 100).forEach(x => { | ||
test.equals(x, 121); | ||
assert.equal(x, 121); | ||
}); | ||
}, | ||
}); | ||
"Seed - empty" (test, { Observable }) { | ||
return Observable.from([]).reduce((a, b) => { | ||
it('reduces an empty list with a seed', async () => { | ||
await Observable.from([]).reduce((a, b) => { | ||
return a + b; | ||
}, 100).forEach(x => { | ||
test.equals(x, 100); | ||
assert.equal(x, 100); | ||
}); | ||
}, | ||
}; | ||
}); | ||
}); |
@@ -1,28 +0,29 @@ | ||
export default { | ||
const Observable = require('../src/Observable'); | ||
const assert = require('assert'); | ||
"uses Observable when constructor is undefined" (test, { Observable }) { | ||
describe('species', () => { | ||
it('uses Observable when constructor is undefined', () => { | ||
let instance = new Observable(() => {}); | ||
instance.constructor = undefined; | ||
test.equals(instance.map(x => x) instanceof Observable, true); | ||
}, | ||
assert.ok(instance.map(x => x) instanceof Observable); | ||
}); | ||
"uses Observable if species is null" (test, { Observable }) { | ||
it('uses Observable if species is null', () => { | ||
let instance = new Observable(() => {}); | ||
instance.constructor = { [Symbol.species]: null }; | ||
test.equals(instance.map(x => x) instanceof Observable, true); | ||
}, | ||
assert.ok(instance.map(x => x) instanceof Observable); | ||
}); | ||
"uses Observable if species is undefined" (test, { Observable }) { | ||
it('uses Observable if species is undefined', () => { | ||
let instance = new Observable(() => {}); | ||
instance.constructor = { [Symbol.species]: undefined }; | ||
test.equals(instance.map(x => x) instanceof Observable, true); | ||
}, | ||
assert.ok(instance.map(x => x) instanceof Observable); | ||
}); | ||
"uses value of Symbol.species" (test, { Observable }) { | ||
it('uses value of Symbol.species', () => { | ||
function ctor() {} | ||
let instance = new Observable(() => {}); | ||
instance.constructor = { [Symbol.species]: ctor }; | ||
test.equals(instance.map(x => x) instanceof ctor, true); | ||
}, | ||
}; | ||
assert.ok(instance.map(x => x) instanceof ctor); | ||
}); | ||
}); |
'use strict'; (function(fn, name) { if (typeof exports !== "undefined") { fn(exports, module); } else if (typeof self !== "undefined") { var e = name === "*" ? self : (name ? self[name] = {} : {}); fn(e, { exports: e }); } })(function(exports, module) { // === Symbol Support === | ||
function hasSymbol(name) { | ||
return typeof Symbol === "function" && Boolean(Symbol[name]); | ||
} | ||
var hasSymbols = function() { return typeof Symbol === 'function'; }; | ||
var hasSymbol = function(name) { return hasSymbols() && Boolean(Symbol[name]); }; | ||
var getSymbol = function(name) { return hasSymbol(name) ? Symbol[name] : '@@' + name; }; | ||
function getSymbol(name) { | ||
return hasSymbol(name) ? Symbol[name] : "@@" + name; | ||
if (hasSymbols() && !hasSymbol('observable')) { | ||
Symbol.observable = Symbol('observable'); | ||
} | ||
// Ponyfill Symbol.observable for interoperability with other libraries | ||
if (typeof Symbol === "function" && !Symbol.observable) { | ||
Symbol.observable = Symbol("observable"); | ||
} | ||
// === Abstract Operations === | ||
function hostReportError(e) { | ||
setTimeout(function() { throw e }); | ||
} | ||
function getMethod(obj, key) { | ||
@@ -28,4 +19,4 @@ var value = obj[key]; | ||
if (typeof value !== "function") | ||
throw new TypeError(value + " is not a function"); | ||
if (typeof value !== 'function') | ||
throw new TypeError(value + ' is not a function'); | ||
@@ -38,3 +29,3 @@ return value; | ||
if (ctor !== undefined) { | ||
ctor = ctor[getSymbol("species")]; | ||
ctor = ctor[getSymbol('species')]; | ||
if (ctor === null) { | ||
@@ -47,2 +38,6 @@ ctor = undefined; | ||
function isObservable(x) { | ||
return x instanceof Observable; // SPEC: Brand check | ||
} | ||
function addMethods(target, methods) { | ||
@@ -56,85 +51,89 @@ Object.keys(methods).forEach(function(k) { | ||
function hostReportError(e) { | ||
if (hostReportError.log) { | ||
hostReportError.log(e); | ||
} else { | ||
setTimeout(function() { throw e }); | ||
} | ||
} | ||
function enqueue(fn) { | ||
Promise.resolve().then(function() { | ||
try { fn() } | ||
catch (e) { hostReportError(e) } | ||
}); | ||
} | ||
function cleanupSubscription(subscription) { | ||
// Assert: observer._observer is undefined | ||
// ASSERT: subscription._observer is undefined | ||
var cleanup = subscription._cleanup; | ||
if (!cleanup) | ||
if (cleanup === undefined) | ||
return; | ||
// Drop the reference to the cleanup function so that we won't call it | ||
// more than once | ||
subscription._cleanup = undefined; | ||
// Call the cleanup function | ||
try { cleanup() } | ||
catch (e) { hostReportError(e) } | ||
if (!cleanup) { | ||
return; | ||
} | ||
if (typeof cleanup === 'function') { | ||
cleanup(); | ||
} else { | ||
var unsubscribe$0 = getMethod(cleanup, 'unsubscribe'); | ||
if (unsubscribe$0) { | ||
unsubscribe$0.call(cleanup); | ||
} | ||
} | ||
} | ||
function subscriptionClosed(subscription) { | ||
return subscription._observer === undefined; | ||
return subscription._state === 'closed'; | ||
} | ||
function closeSubscription(subscription) { | ||
if (subscriptionClosed(subscription)) | ||
return; | ||
subscription._observer = undefined; | ||
cleanupSubscription(subscription); | ||
subscription._state = 'closed'; | ||
} | ||
function cleanupFromSubscription(subscription) { | ||
return function() { subscription.unsubscribe() }; | ||
function validateSubscription(subscription) { | ||
// ASSERT: subscription._state !== 'closed' | ||
switch (subscription._state) { | ||
case 'ready': break; | ||
case 'initializing': throw new Error('Subscription is not initialized'); | ||
case 'running': throw new Error('Subscription observer is already running'); | ||
} | ||
} | ||
function Subscription(observer, subscriber) { | ||
// Assert: subscriber is callable | ||
// ASSERT: observer is an object | ||
// ASSERT: subscriber is callable | ||
// The observer must be an object | ||
if (Object(observer) !== observer) | ||
throw new TypeError("Observer must be an object"); | ||
this._cleanup = undefined; | ||
this._observer = observer; | ||
this._state = 'initializing'; | ||
try { | ||
var start$0 = getMethod(observer, "start"); | ||
if (start$0) start$0.call(observer, this); | ||
} catch (e) { | ||
hostReportError(e); | ||
} | ||
var subscriptionObserver = new SubscriptionObserver(this); | ||
if (subscriptionClosed(this)) | ||
return; | ||
observer = new SubscriptionObserver(this); | ||
try { | ||
// Call the subscriber function | ||
var cleanup$0 = subscriber.call(undefined, observer); | ||
// The return value must be undefined, null, a subscription object, or a function | ||
if (cleanup$0 != null) { | ||
if (typeof cleanup$0.unsubscribe === "function") | ||
cleanup$0 = cleanupFromSubscription(cleanup$0); | ||
else if (typeof cleanup$0 !== "function") | ||
throw new TypeError(cleanup$0 + " is not a function"); | ||
this._cleanup = cleanup$0; | ||
} | ||
this._cleanup = subscriber.call(undefined, subscriptionObserver); | ||
} catch (e) { | ||
// If an error occurs during startup, then attempt to send the error | ||
// to the observer | ||
observer.error(e); | ||
return; | ||
enqueue(function() { return subscriptionObserver.error(e); }); | ||
} | ||
// If the stream is already finished, then perform cleanup | ||
if (subscriptionClosed(this)) | ||
cleanupSubscription(this); | ||
this._state = 'ready'; | ||
} | ||
addMethods(Subscription.prototype = {}, { | ||
get closed() { return subscriptionClosed(this) }, | ||
unsubscribe: function() { closeSubscription(this) }, | ||
get closed() { | ||
return subscriptionClosed(this); | ||
}, | ||
unsubscribe: function() { | ||
if (!subscriptionClosed(this)) { | ||
closeSubscription(this); | ||
try { cleanupSubscription(this) } | ||
catch (e) { hostReportError(e) } | ||
} | ||
}, | ||
}); | ||
@@ -148,16 +147,18 @@ | ||
get closed() { return subscriptionClosed(this._subscription) }, | ||
get closed() { | ||
return subscriptionClosed(this._subscription); | ||
}, | ||
next: function(value) { | ||
var subscription = this._subscription; | ||
// If the stream is closed, then return undefined | ||
if (subscriptionClosed(subscription)) | ||
return; | ||
validateSubscription(subscription); | ||
var observer = subscription._observer; | ||
subscription._state = 'running'; | ||
try { | ||
// If the observer has a "next" method, send the next value | ||
var m$0 = getMethod(observer, "next"); | ||
var m$0 = getMethod(observer, 'next'); | ||
if (m$0) m$0.call(observer, value); | ||
@@ -167,2 +168,5 @@ } catch (e) { | ||
} | ||
if (!subscriptionClosed(subscription)) | ||
subscription._state = 'ready'; | ||
}, | ||
@@ -172,14 +176,13 @@ | ||
var subscription = this._subscription; | ||
// If the stream is closed, throw the error to the caller | ||
if (subscriptionClosed(subscription)) { | ||
hostReportError(value); | ||
return; | ||
} | ||
validateSubscription(subscription); | ||
var observer = subscription._observer; | ||
subscription._observer = undefined; | ||
closeSubscription(subscription); | ||
try { | ||
var m$1 = getMethod(observer, "error"); | ||
var m$1 = getMethod(observer, 'error'); | ||
if (m$1) m$1.call(observer, value); | ||
@@ -196,11 +199,12 @@ else throw value; | ||
var subscription = this._subscription; | ||
if (subscriptionClosed(subscription)) | ||
return; | ||
validateSubscription(subscription); | ||
var observer = subscription._observer; | ||
subscription._observer = undefined; | ||
closeSubscription(subscription); | ||
try { | ||
var m$2 = getMethod(observer, "complete"); | ||
var m$2 = getMethod(observer, 'complete'); | ||
if (m$2) m$2.call(observer); | ||
@@ -217,9 +221,7 @@ } catch (e) { | ||
function Observable(subscriber) { | ||
// Constructor cannot be called as a function | ||
if (!(this instanceof Observable)) | ||
throw new TypeError("Observable cannot be called as a function"); | ||
throw new TypeError('Observable cannot be called as a function'); | ||
// The stream subscriber must be a function | ||
if (typeof subscriber !== "function") | ||
throw new TypeError("Observable initializer must be a function"); | ||
if (typeof subscriber !== 'function') | ||
throw new TypeError('Observable initializer must be a function'); | ||
@@ -231,13 +233,10 @@ this._subscriber = subscriber; | ||
subscribe: function(observer) { for (var args = [], __$0 = 1; __$0 < arguments.length; ++__$0) args.push(arguments[__$0]); | ||
if (typeof observer === 'function') { | ||
subscribe: function(observer) { | ||
if (typeof observer !== 'object' || observer === null) { | ||
observer = { | ||
next: observer, | ||
error: args[0], | ||
complete: args[1], | ||
error: arguments[1], | ||
complete: arguments[2], | ||
}; | ||
} else if (typeof observer !== 'object' || observer === null) { | ||
observer = {}; | ||
} | ||
return new Subscription(observer, this._subscriber); | ||
@@ -248,29 +247,16 @@ }, | ||
return new Promise(function(resolve, reject) { | ||
if (typeof fn !== "function") | ||
return Promise.reject(new TypeError(fn + " is not a function")); | ||
if (typeof fn !== 'function') { | ||
reject(new TypeError(fn + ' is not a function')); | ||
return; | ||
} | ||
__this.subscribe({ | ||
_subscription: null, | ||
start: function(subscription) { | ||
if (Object(subscription) !== subscription) | ||
throw new TypeError(subscription + " is not an object"); | ||
this._subscription = subscription; | ||
}, | ||
var subscription = __this.subscribe({ | ||
next: function(value) { | ||
var subscription = this._subscription; | ||
if (subscription.closed) | ||
return; | ||
try { | ||
fn(value); | ||
} catch (err) { | ||
reject(err); | ||
} catch (e) { | ||
reject(e); | ||
subscription.unsubscribe(); | ||
} | ||
}, | ||
error: reject, | ||
@@ -283,4 +269,4 @@ complete: resolve, | ||
map: function(fn) { var __this = this; | ||
if (typeof fn !== "function") | ||
throw new TypeError(fn + " is not a function"); | ||
if (typeof fn !== 'function') | ||
throw new TypeError(fn + ' is not a function'); | ||
@@ -291,11 +277,6 @@ var C = getSpecies(this); | ||
next: function(value) { | ||
if (observer.closed) | ||
return; | ||
try { value = fn(value) } | ||
catch (e) { return observer.error(e) } | ||
observer.next(value); | ||
}, | ||
error: function(e) { observer.error(e) }, | ||
@@ -307,4 +288,4 @@ complete: function() { observer.complete() }, | ||
filter: function(fn) { var __this = this; | ||
if (typeof fn !== "function") | ||
throw new TypeError(fn + " is not a function"); | ||
if (typeof fn !== 'function') | ||
throw new TypeError(fn + ' is not a function'); | ||
@@ -315,11 +296,6 @@ var C = getSpecies(this); | ||
next: function(value) { | ||
if (observer.closed) | ||
return; | ||
try { if (!fn(value)) return } | ||
try { if (!fn(value)) return; } | ||
catch (e) { return observer.error(e) } | ||
observer.next(value); | ||
}, | ||
error: function(e) { observer.error(e) }, | ||
@@ -331,4 +307,4 @@ complete: function() { observer.complete() }, | ||
reduce: function(fn) { var __this = this; | ||
if (typeof fn !== "function") | ||
throw new TypeError(fn + " is not a function"); | ||
if (typeof fn !== 'function') | ||
throw new TypeError(fn + ' is not a function'); | ||
@@ -344,5 +320,2 @@ var C = getSpecies(this); | ||
next: function(value) { | ||
if (observer.closed) | ||
return; | ||
var first = !hasValue; | ||
@@ -362,5 +335,4 @@ hasValue = true; | ||
complete: function() { | ||
if (!hasValue && !hasSeed) { | ||
return observer.error(new TypeError("Cannot reduce an empty sequence")); | ||
} | ||
if (!hasValue && !hasSeed) | ||
return observer.error(new TypeError('Cannot reduce an empty sequence')); | ||
@@ -374,5 +346,37 @@ observer.next(acc); | ||
concat: function() { var __this = this; for (var sources = [], __$0 = 0; __$0 < arguments.length; ++__$0) sources.push(arguments[__$0]); | ||
var C = getSpecies(this); | ||
return new C(function(observer) { | ||
var subscription; | ||
function startNext(next) { | ||
subscription = next.subscribe({ | ||
next: function(v) { observer.next(v) }, | ||
error: function(e) { observer.error(e) }, | ||
complete: function() { | ||
if (sources.length === 0) { | ||
subscription = undefined; | ||
observer.complete(); | ||
} else { | ||
startNext(C.from(sources.shift())); | ||
} | ||
}, | ||
}); | ||
} | ||
startNext(__this); | ||
return function() { | ||
if (subscription) { | ||
subscription = undefined; | ||
subscription.unsubscribe(); | ||
} | ||
}; | ||
}); | ||
}, | ||
}); | ||
Object.defineProperty(Observable.prototype, getSymbol("observable"), { | ||
Object.defineProperty(Observable.prototype, getSymbol('observable'), { | ||
value: function() { return this }, | ||
@@ -386,9 +390,8 @@ writable: true, | ||
from: function(x) { | ||
var C = typeof this === "function" ? this : Observable; | ||
var C = typeof this === 'function' ? this : Observable; | ||
if (x == null) | ||
throw new TypeError(x + " is not an object"); | ||
throw new TypeError(x + ' is not an object'); | ||
var method = getMethod(x, getSymbol("observable")); | ||
var method = getMethod(x, getSymbol('observable')); | ||
if (method) { | ||
@@ -398,5 +401,5 @@ var observable$0 = method.call(x); | ||
if (Object(observable$0) !== observable$0) | ||
throw new TypeError(observable$0 + " is not an object"); | ||
throw new TypeError(observable$0 + ' is not an object'); | ||
if (observable$0.constructor === C) | ||
if (isObservable(observable$0) && observable$0.constructor === C) | ||
return observable$0; | ||
@@ -407,12 +410,16 @@ | ||
if (hasSymbol("iterator") && (method = getMethod(x, getSymbol("iterator")))) { | ||
return new C(function(observer) { | ||
for (var __$0 = (method.call(x))[Symbol.iterator](), __$1; __$1 = __$0.next(), !__$1.done;) { var item$0 = __$1.value; | ||
observer.next(item$0); | ||
if (observer.closed) | ||
return; | ||
} | ||
observer.complete(); | ||
}); | ||
if (hasSymbol('iterator')) { | ||
method = getMethod(x, getSymbol('iterator')); | ||
if (method) { | ||
return new C(function(observer) { | ||
enqueue(function() { | ||
if (observer.closed) return; | ||
for (var __$0 = (method.call(x))[Symbol.iterator](), __$1; __$1 = __$0.next(), !__$1.done;) { var item$0 = __$1.value; | ||
observer.next(item$0); | ||
if (observer.closed) return; | ||
} | ||
observer.complete(); | ||
}); | ||
}); | ||
} | ||
} | ||
@@ -422,26 +429,28 @@ | ||
return new C(function(observer) { | ||
for (var i$0 = 0; i$0 < x.length; ++i$0) { | ||
observer.next(x[i$0]); | ||
if (observer.closed) | ||
return; | ||
} | ||
observer.complete(); | ||
enqueue(function() { | ||
if (observer.closed) return; | ||
for (var i$0 = 0; i$0 < x.length; ++i$0) { | ||
observer.next(x[i$0]); | ||
if (observer.closed) return; | ||
} | ||
observer.complete(); | ||
}); | ||
}); | ||
} | ||
throw new TypeError(x + " is not observable"); | ||
throw new TypeError(x + ' is not observable'); | ||
}, | ||
of: function() { for (var items = [], __$0 = 0; __$0 < arguments.length; ++__$0) items.push(arguments[__$0]); | ||
var C = typeof this === "function" ? this : Observable; | ||
var C = typeof this === 'function' ? this : Observable; | ||
return new C(function(observer) { | ||
for (var i$1 = 0; i$1 < items.length; ++i$1) { | ||
observer.next(items[i$1]); | ||
if (observer.closed) | ||
return; | ||
} | ||
observer.complete(); | ||
enqueue(function() { | ||
if (observer.closed) return; | ||
for (var i$1 = 0; i$1 < items.length; ++i$1) { | ||
observer.next(items[i$1]); | ||
if (observer.closed) return; | ||
} | ||
observer.complete(); | ||
}); | ||
}); | ||
@@ -452,3 +461,3 @@ }, | ||
Object.defineProperty(Observable, getSymbol("species"), { | ||
Object.defineProperty(Observable, getSymbol('species'), { | ||
get: function() { return this }, | ||
@@ -458,12 +467,12 @@ configurable: true, | ||
Object.defineProperty(Observable, "extensions", { | ||
value: { | ||
observableSymbol: getSymbol("observable"), | ||
setHostReportError: function(fn) { hostReportError = fn }, | ||
}, | ||
}); | ||
if (hasSymbols()) { | ||
Object.defineProperty(Observable, Symbol('extensions'), { | ||
value: { symbol: getSymbol('observable'), hostReportError: hostReportError }, | ||
configurable: true, | ||
}); | ||
} | ||
exports.Observable = Observable; | ||
module.exports = Observable; | ||
}, "*"); |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
Major refactor
Supply chain riskPackage has recently undergone a major refactor. It may be unstable or indicate significant internal changes. Use caution when updating to versions that include significant changes.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
26
1683
200
58740
4
1