Socket
Socket
Sign inDemoInstall

zen-observable

Package Overview
Dependencies
Maintainers
1
Versions
37
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

zen-observable - npm Package Compare versions

Comparing version 0.7.1 to 0.8.0

.eslintrc.js

2

index.js

@@ -1,1 +0,1 @@

module.exports = require("./zen-observable.js").Observable;
module.exports = require("./zen-observable.js");
{
"name": "zen-observable",
"version": "0.7.1",
"version": "0.8.0",
"repository": "zenparsing/zen-observable",

@@ -10,2 +10,4 @@ "description": "An Implementation of ES Observables",

"esdown": "^1.2.8",
"eslint": "^4.16.0",
"mocha": "^5.0.0",
"moon-unit": "^0.2.1"

@@ -15,6 +17,7 @@ },

"scripts": {
"test": "esdown test",
"test": "mocha",
"lint": "eslint src/*",
"build": "esdown - src/Observable.js zen-observable.js -g '*'",
"prepublishOnly": "npm run build"
"prepublishOnly": "eslint src/* && mocha && npm run build"
}
}
# zen-observable
An implementation of [ES Observables](https://github.com/zenparsing/es-observable).
An implementation of Observables for Javascript. Requires Promises or a Promise polyfill.
Requires ES6 Promises or a Promise polyfill.
## Install

@@ -15,3 +13,3 @@

- [zen-observable.js](https://raw.githubusercontent.com/zenparsing/zen-observable/master/zen-observable.js)
https://unpkg.com/zen-observable/zen-observable.js

@@ -23,3 +21,4 @@ ## Usage

```js
var Observable = require("zen-observable");
var Observable = require('zen-observable');
Observable.of(1, 2, 3).subscribe(x => console.log(x));

@@ -31,12 +30,13 @@ ```

```html
<script src="zen-observable.js"></script>
<script src='zen-observable.js'></script>
<script>
Observable.of(1, 2, 3).subscribe(x => console.log(x));
Observable.of(1, 2, 3).subscribe(x => console.log(x));
</script>
```
ES Modules:
Modules:
```js
import Observable from "zen-observable";
import Observable from 'zen-observable';
Observable.of(1, 2, 3).subscribe(x => console.log(x));

@@ -47,14 +47,14 @@ ```

### new Observable ( subscribe )
### new Observable(subscribe)
```js
let observable = new Observable(observer => {
// Emit a single value after 1 second
let timer = setTimeout(_=> {
observer.next("hello");
observer.complete();
}, 1000);
// Emit a single value after 1 second
let timer = setTimeout(() => {
observer.next('hello');
observer.complete();
}, 1000);
// On unsubscription, cancel the timer
return _=> clearTimeout(timer);
// On unsubscription, cancel the timer
return () => clearTimeout(timer);
});

@@ -68,6 +68,7 @@ ```

- `complete()` Terminates the sequence successfully.
- `closed` A boolean property whose value is `true` if the observer's subscription is closed.
The subscriber function can optionally return either a cleanup function or a subscription object. If it returns a cleanup function, that function will be called when the subscription has closed. If it returns a subscription object, then the subscription's `unsubscribe` method will be invoked when the subscription has closed.
### Observable.of ( ...items )
### Observable.of(...items)

@@ -77,3 +78,3 @@ ```js

Observable.of(1, 2, 3).subscribe(x => {
console.log(x);
console.log(x);
});

@@ -84,3 +85,3 @@ ```

### Observable.from ( value )
### Observable.from(value)

@@ -92,3 +93,3 @@ ```js

Observable.from(list).subscribe(x => {
console.log(x);
console.log(x);
});

@@ -98,5 +99,5 @@ ```

```js
// Convert something "observable" to an Observable instance
// Convert something 'observable' to an Observable instance
Observable.from(otherObservable).subscribe(x => {
console.log(x);
console.log(x);
});

@@ -107,18 +108,17 @@ ```

- If `value` is an implementation of ES Observables, then it is converted to an instance of Observable as defined by this library.
- If `value` is an implementation of Observable, then it is converted to an instance of Observable as defined by this library.
- Otherwise, it is converted to an Observable which synchronously iterates over `value`.
### observable.subscribe ( observer )
### observable.subscribe([observer])
```js
let subscription = observable.subscribe({
next(x) { console.log(x) },
error(err) { console.log(`Finished with error: ${ err }`) },
complete() { console.log("Finished") }
})
next(x) { console.log(x) },
error(err) { console.log(`Finished with error: ${ err }`) },
complete() { console.log('Finished') }
});
```
Subscribes to the observable. The `observer` argument must be an object. It may have any of the following methods:
Subscribes to the observable. Observer objects may have any of the following methods:
- `start(subscription)` Receives the subscription object during initialization.
- `next(value)` Receives the next value of the sequence.

@@ -128,22 +128,25 @@ - `error(exception)` Receives the terminating error of the sequence.

The subscription object can be used to cancel the stream.
Returns a subscription object that can be used to cancel the stream.
### observable.subscribe(nextCallback[, errorCallback, completeCallback])
```js
// Stop receiving data from the stream
subscription.unsubscribe();
let subscription = observable.subscribe(
x => console.log(x),
err => console.log(`Finished with error: ${ err }`),
() => console.log('Finished')
);
```
## Extended API
Subscribes to the observable with callback functions. Returns a subscription object that can be used to cancel the stream.
*The following methods are not yet defined by the ES Observable specification.*
### observable.forEach(callback)
### observable.forEach ( callback )
```js
observable.forEach(x => {
console.log(`Received value: ${ x }`);
}).then(_=> {
console.log("Finished successfully")
console.log(`Received value: ${ x }`);
}).then(() => {
console.log('Finished successfully')
}).catch(err => {
console.log(`Finished with error: ${ err }`);
console.log(`Finished with error: ${ err }`);
})

@@ -154,9 +157,9 @@ ```

### observable.filter ( callback )
### observable.filter(callback)
```js
Observable.of(1, 2, 3).filter(value => {
return value > 2;
return value > 2;
}).subscribe(value => {
console.log(value);
console.log(value);
});

@@ -168,3 +171,3 @@ // 3

### observable.map ( callback )
### observable.map(callback)

@@ -175,5 +178,5 @@ Returns a new Observable that emits the results of calling the `callback` argument for every value in the stream.

Observable.of(1, 2, 3).map(value => {
return value * 2;
return value * 2;
}).subscribe(value => {
console.log(value);
console.log(value);
});

@@ -185,9 +188,9 @@ // 2

### observable.reduce ( callback [, initialValue] )
### observable.reduce(callback [,initialValue])
```js
Observable.of(0, 1, 2, 3, 4).reduce((previousValue, currentValue) => {
return previousValue + currentValue;
return previousValue + currentValue;
}).subscribe(result => {
console.log(result);
console.log(result);
});

@@ -198,1 +201,15 @@ // 10

Returns a new Observable that applies a function against an accumulator and each value of the stream to reduce it to a single value.
### observable.concat(...sources)
```js
Observable.of(1, 2, 3).concat(
Observable.of(4, 5, 6),
Observable.of(7, 8, 9)
).subscribe(result => {
console.log(result);
});
// 1, 2, 3, 4, 5, 6, 7, 8, 9
```
Merges the current observable with additional observables.
// === Symbol Support ===
function hasSymbol(name) {
return typeof Symbol === "function" && Boolean(Symbol[name]);
}
const hasSymbols = () => typeof Symbol === 'function';
const hasSymbol = name => hasSymbols() && Boolean(Symbol[name]);
const getSymbol = name => hasSymbol(name) ? Symbol[name] : '@@' + name;
function getSymbol(name) {
return hasSymbol(name) ? Symbol[name] : "@@" + name;
if (hasSymbols() && !hasSymbol('observable')) {
Symbol.observable = Symbol('observable');
}
// Ponyfill Symbol.observable for interoperability with other libraries
if (typeof Symbol === "function" && !Symbol.observable) {
Symbol.observable = Symbol("observable");
}
// === Abstract Operations ===
function hostReportError(e) {
setTimeout(() => { throw e });
}
function getMethod(obj, key) {

@@ -28,4 +19,4 @@ let value = obj[key];

if (typeof value !== "function")
throw new TypeError(value + " is not a function");
if (typeof value !== 'function')
throw new TypeError(value + ' is not a function');

@@ -38,3 +29,3 @@ return value;

if (ctor !== undefined) {
ctor = ctor[getSymbol("species")];
ctor = ctor[getSymbol('species')];
if (ctor === null) {

@@ -47,2 +38,6 @@ ctor = undefined;

function isObservable(x) {
return x instanceof Observable; // SPEC: Brand check
}
function addMethods(target, methods) {

@@ -56,85 +51,89 @@ Object.keys(methods).forEach(k => {

function hostReportError(e) {
if (hostReportError.log) {
hostReportError.log(e);
} else {
setTimeout(() => { throw e });
}
}
function enqueue(fn) {
Promise.resolve().then(() => {
try { fn() }
catch (e) { hostReportError(e) }
});
}
function cleanupSubscription(subscription) {
// Assert: observer._observer is undefined
// ASSERT: subscription._observer is undefined
let cleanup = subscription._cleanup;
if (!cleanup)
if (cleanup === undefined)
return;
// Drop the reference to the cleanup function so that we won't call it
// more than once
subscription._cleanup = undefined;
// Call the cleanup function
try { cleanup() }
catch (e) { hostReportError(e) }
if (!cleanup) {
return;
}
if (typeof cleanup === 'function') {
cleanup();
} else {
let unsubscribe = getMethod(cleanup, 'unsubscribe');
if (unsubscribe) {
unsubscribe.call(cleanup);
}
}
}
function subscriptionClosed(subscription) {
return subscription._observer === undefined;
return subscription._state === 'closed';
}
function closeSubscription(subscription) {
if (subscriptionClosed(subscription))
return;
subscription._observer = undefined;
cleanupSubscription(subscription);
subscription._state = 'closed';
}
function cleanupFromSubscription(subscription) {
return () => { subscription.unsubscribe() };
function validateSubscription(subscription) {
// ASSERT: subscription._state !== 'closed'
switch (subscription._state) {
case 'ready': break;
case 'initializing': throw new Error('Subscription is not initialized');
case 'running': throw new Error('Subscription observer is already running');
}
}
function Subscription(observer, subscriber) {
// Assert: subscriber is callable
// ASSERT: observer is an object
// ASSERT: subscriber is callable
// The observer must be an object
if (Object(observer) !== observer)
throw new TypeError("Observer must be an object");
this._cleanup = undefined;
this._observer = observer;
this._state = 'initializing';
try {
let start = getMethod(observer, "start");
if (start) start.call(observer, this);
} catch (e) {
hostReportError(e);
}
let subscriptionObserver = new SubscriptionObserver(this);
if (subscriptionClosed(this))
return;
observer = new SubscriptionObserver(this);
try {
// Call the subscriber function
let cleanup = subscriber.call(undefined, observer);
// The return value must be undefined, null, a subscription object, or a function
if (cleanup != null) {
if (typeof cleanup.unsubscribe === "function")
cleanup = cleanupFromSubscription(cleanup);
else if (typeof cleanup !== "function")
throw new TypeError(cleanup + " is not a function");
this._cleanup = cleanup;
}
this._cleanup = subscriber.call(undefined, subscriptionObserver);
} catch (e) {
// If an error occurs during startup, then attempt to send the error
// to the observer
observer.error(e);
return;
enqueue(() => subscriptionObserver.error(e));
}
// If the stream is already finished, then perform cleanup
if (subscriptionClosed(this))
cleanupSubscription(this);
this._state = 'ready';
}
addMethods(Subscription.prototype = {}, {
get closed() { return subscriptionClosed(this) },
unsubscribe() { closeSubscription(this) },
get closed() {
return subscriptionClosed(this);
},
unsubscribe() {
if (!subscriptionClosed(this)) {
closeSubscription(this);
try { cleanupSubscription(this) }
catch (e) { hostReportError(e) }
}
},
});

@@ -148,16 +147,18 @@

get closed() { return subscriptionClosed(this._subscription) },
get closed() {
return subscriptionClosed(this._subscription);
},
next(value) {
let subscription = this._subscription;
// If the stream is closed, then return undefined
if (subscriptionClosed(subscription))
return;
validateSubscription(subscription);
let observer = subscription._observer;
subscription._state = 'running';
try {
// If the observer has a "next" method, send the next value
let m = getMethod(observer, "next");
let m = getMethod(observer, 'next');
if (m) m.call(observer, value);

@@ -167,2 +168,5 @@ } catch (e) {

}
if (!subscriptionClosed(subscription))
subscription._state = 'ready';
},

@@ -172,14 +176,13 @@

let subscription = this._subscription;
// If the stream is closed, throw the error to the caller
if (subscriptionClosed(subscription)) {
hostReportError(value);
return;
}
validateSubscription(subscription);
let observer = subscription._observer;
subscription._observer = undefined;
closeSubscription(subscription);
try {
let m = getMethod(observer, "error");
let m = getMethod(observer, 'error');
if (m) m.call(observer, value);

@@ -196,11 +199,12 @@ else throw value;

let subscription = this._subscription;
if (subscriptionClosed(subscription))
return;
validateSubscription(subscription);
let observer = subscription._observer;
subscription._observer = undefined;
closeSubscription(subscription);
try {
let m = getMethod(observer, "complete");
let m = getMethod(observer, 'complete');
if (m) m.call(observer);

@@ -216,10 +220,8 @@ } catch (e) {

export function Observable(subscriber) {
// Constructor cannot be called as a function
function Observable(subscriber) {
if (!(this instanceof Observable))
throw new TypeError("Observable cannot be called as a function");
throw new TypeError('Observable cannot be called as a function');
// The stream subscriber must be a function
if (typeof subscriber !== "function")
throw new TypeError("Observable initializer must be a function");
if (typeof subscriber !== 'function')
throw new TypeError('Observable initializer must be a function');

@@ -231,13 +233,10 @@ this._subscriber = subscriber;

subscribe(observer, ...args) {
if (typeof observer === 'function') {
subscribe(observer) {
if (typeof observer !== 'object' || observer === null) {
observer = {
next: observer,
error: args[0],
complete: args[1],
error: arguments[1],
complete: arguments[2],
};
} else if (typeof observer !== 'object' || observer === null) {
observer = {};
}
return new Subscription(observer, this._subscriber);

@@ -248,29 +247,16 @@ },

return new Promise((resolve, reject) => {
if (typeof fn !== "function")
return Promise.reject(new TypeError(fn + " is not a function"));
if (typeof fn !== 'function') {
reject(new TypeError(fn + ' is not a function'));
return;
}
this.subscribe({
_subscription: null,
start(subscription) {
if (Object(subscription) !== subscription)
throw new TypeError(subscription + " is not an object");
this._subscription = subscription;
},
let subscription = this.subscribe({
next(value) {
let subscription = this._subscription;
if (subscription.closed)
return;
try {
fn(value);
} catch (err) {
reject(err);
} catch (e) {
reject(e);
subscription.unsubscribe();
}
},
error: reject,

@@ -283,4 +269,4 @@ complete: resolve,

map(fn) {
if (typeof fn !== "function")
throw new TypeError(fn + " is not a function");
if (typeof fn !== 'function')
throw new TypeError(fn + ' is not a function');

@@ -291,11 +277,6 @@ let C = getSpecies(this);

next(value) {
if (observer.closed)
return;
try { value = fn(value) }
catch (e) { return observer.error(e) }
observer.next(value);
},
error(e) { observer.error(e) },

@@ -307,4 +288,4 @@ complete() { observer.complete() },

filter(fn) {
if (typeof fn !== "function")
throw new TypeError(fn + " is not a function");
if (typeof fn !== 'function')
throw new TypeError(fn + ' is not a function');

@@ -315,11 +296,6 @@ let C = getSpecies(this);

next(value) {
if (observer.closed)
return;
try { if (!fn(value)) return }
try { if (!fn(value)) return; }
catch (e) { return observer.error(e) }
observer.next(value);
},
error(e) { observer.error(e) },

@@ -331,4 +307,4 @@ complete() { observer.complete() },

reduce(fn) {
if (typeof fn !== "function")
throw new TypeError(fn + " is not a function");
if (typeof fn !== 'function')
throw new TypeError(fn + ' is not a function');

@@ -344,5 +320,2 @@ let C = getSpecies(this);

next(value) {
if (observer.closed)
return;
let first = !hasValue;

@@ -362,5 +335,4 @@ hasValue = true;

complete() {
if (!hasValue && !hasSeed) {
return observer.error(new TypeError("Cannot reduce an empty sequence"));
}
if (!hasValue && !hasSeed)
return observer.error(new TypeError('Cannot reduce an empty sequence'));

@@ -374,5 +346,37 @@ observer.next(acc);

concat(...sources) {
let C = getSpecies(this);
return new C(observer => {
let subscription;
function startNext(next) {
subscription = next.subscribe({
next(v) { observer.next(v) },
error(e) { observer.error(e) },
complete() {
if (sources.length === 0) {
subscription = undefined;
observer.complete();
} else {
startNext(C.from(sources.shift()));
}
},
});
}
startNext(this);
return () => {
if (subscription) {
subscription = undefined;
subscription.unsubscribe();
}
};
});
},
});
Object.defineProperty(Observable.prototype, getSymbol("observable"), {
Object.defineProperty(Observable.prototype, getSymbol('observable'), {
value: function() { return this },

@@ -386,9 +390,8 @@ writable: true,

from(x) {
let C = typeof this === "function" ? this : Observable;
let C = typeof this === 'function' ? this : Observable;
if (x == null)
throw new TypeError(x + " is not an object");
throw new TypeError(x + ' is not an object');
let method = getMethod(x, getSymbol("observable"));
let method = getMethod(x, getSymbol('observable'));
if (method) {

@@ -398,5 +401,5 @@ let observable = method.call(x);

if (Object(observable) !== observable)
throw new TypeError(observable + " is not an object");
throw new TypeError(observable + ' is not an object');
if (observable.constructor === C)
if (isObservable(observable) && observable.constructor === C)
return observable;

@@ -407,12 +410,16 @@

if (hasSymbol("iterator") && (method = getMethod(x, getSymbol("iterator")))) {
return new C(observer => {
for (let item of method.call(x)) {
observer.next(item);
if (observer.closed)
return;
}
observer.complete();
});
if (hasSymbol('iterator')) {
method = getMethod(x, getSymbol('iterator'));
if (method) {
return new C(observer => {
enqueue(() => {
if (observer.closed) return;
for (let item of method.call(x)) {
observer.next(item);
if (observer.closed) return;
}
observer.complete();
});
});
}
}

@@ -422,26 +429,28 @@

return new C(observer => {
for (let i = 0; i < x.length; ++i) {
observer.next(x[i]);
if (observer.closed)
return;
}
observer.complete();
enqueue(() => {
if (observer.closed) return;
for (let i = 0; i < x.length; ++i) {
observer.next(x[i]);
if (observer.closed) return;
}
observer.complete();
});
});
}
throw new TypeError(x + " is not observable");
throw new TypeError(x + ' is not observable');
},
of(...items) {
let C = typeof this === "function" ? this : Observable;
let C = typeof this === 'function' ? this : Observable;
return new C(observer => {
for (let i = 0; i < items.length; ++i) {
observer.next(items[i]);
if (observer.closed)
return;
}
observer.complete();
enqueue(() => {
if (observer.closed) return;
for (let i = 0; i < items.length; ++i) {
observer.next(items[i]);
if (observer.closed) return;
}
observer.complete();
});
});

@@ -452,3 +461,3 @@ },

Object.defineProperty(Observable, getSymbol("species"), {
Object.defineProperty(Observable, getSymbol('species'), {
get() { return this },

@@ -458,7 +467,9 @@ configurable: true,

Object.defineProperty(Observable, "extensions", {
value: {
observableSymbol: getSymbol("observable"),
setHostReportError(fn) { hostReportError = fn },
},
});
if (hasSymbols()) {
Object.defineProperty(Observable, Symbol('extensions'), {
value: { symbol: getSymbol('observable'), hostReportError },
configurable: true,
});
}
module.exports = Observable;

@@ -1,13 +0,15 @@

export default {
const Observable = require('../src/Observable');
const assert = require('assert');
"Basics" (test, { Observable }) {
describe('filter', () => {
it('filters the results using the supplied callback', async () => {
let list = [];
return Observable.from([1, 2, 3, 4])
await Observable
.from([1, 2, 3, 4])
.filter(x => x > 2)
.forEach(x => list.push(x))
.then(() => test.equals(list, [3, 4]));
},
.forEach(x => list.push(x));
};
assert.deepEqual(list, [3, 4]);
});
});

@@ -1,12 +0,14 @@

export default {
const Observable = require('../src/Observable');
const assert = require('assert');
"Basics" (test, { Observable }) {
describe('map', () => {
it('maps the results using the supplied callback', async () => {
let list = [];
return Observable.from([1, 2, 3])
await Observable.from([1, 2, 3])
.map(x => x * 2)
.forEach(x => list.push(x))
.then(() => test.equals(list, [2, 4, 6]));
},
.forEach(x => list.push(x));
};
assert.deepEqual(list, [2, 4, 6]);
});
});

@@ -1,44 +0,39 @@

export default {
const Observable = require('../src/Observable');
const assert = require('assert');
"No seed" (test, { Observable }) {
return Observable.from([1, 2, 3, 4, 5, 6]).reduce((a, b) => {
describe('reduce', () => {
it('reduces without a seed', async () => {
await Observable.from([1, 2, 3, 4, 5, 6]).reduce((a, b) => {
return a + b;
}).forEach(x => {
test.equals(x, 21);
assert.equal(x, 21);
});
},
});
"No seed - one value" (test, { Observable }) {
return Observable.from([1]).reduce((a, b) => {
return a + b;
}).forEach(x => {
test.equals(x, 1);
});
},
it('errors if empty and no seed', async () => {
try {
await Observable.from([]).reduce((a, b) => {
return a + b;
}).forEach(() => null);
assert.ok(false);
} catch (err) {
assert.ok(true);
}
});
"No seed - empty (throws)" (test, { Observable }) {
return Observable.from([]).reduce((a, b) => {
it('reduces with a seed', async () => {
Observable.from([1, 2, 3, 4, 5, 6]).reduce((a, b) => {
return a + b;
}).forEach(() => null)
.then(
() => test.assert(false),
() => test.assert(true));
},
"Seed" (test, { Observable }) {
return Observable.from([1, 2, 3, 4, 5, 6]).reduce((a, b) => {
return a + b;
}, 100).forEach(x => {
test.equals(x, 121);
assert.equal(x, 121);
});
},
});
"Seed - empty" (test, { Observable }) {
return Observable.from([]).reduce((a, b) => {
it('reduces an empty list with a seed', async () => {
await Observable.from([]).reduce((a, b) => {
return a + b;
}, 100).forEach(x => {
test.equals(x, 100);
assert.equal(x, 100);
});
},
};
});
});

@@ -1,28 +0,29 @@

export default {
const Observable = require('../src/Observable');
const assert = require('assert');
"uses Observable when constructor is undefined" (test, { Observable }) {
describe('species', () => {
it('uses Observable when constructor is undefined', () => {
let instance = new Observable(() => {});
instance.constructor = undefined;
test.equals(instance.map(x => x) instanceof Observable, true);
},
assert.ok(instance.map(x => x) instanceof Observable);
});
"uses Observable if species is null" (test, { Observable }) {
it('uses Observable if species is null', () => {
let instance = new Observable(() => {});
instance.constructor = { [Symbol.species]: null };
test.equals(instance.map(x => x) instanceof Observable, true);
},
assert.ok(instance.map(x => x) instanceof Observable);
});
"uses Observable if species is undefined" (test, { Observable }) {
it('uses Observable if species is undefined', () => {
let instance = new Observable(() => {});
instance.constructor = { [Symbol.species]: undefined };
test.equals(instance.map(x => x) instanceof Observable, true);
},
assert.ok(instance.map(x => x) instanceof Observable);
});
"uses value of Symbol.species" (test, { Observable }) {
it('uses value of Symbol.species', () => {
function ctor() {}
let instance = new Observable(() => {});
instance.constructor = { [Symbol.species]: ctor };
test.equals(instance.map(x => x) instanceof ctor, true);
},
};
assert.ok(instance.map(x => x) instanceof ctor);
});
});
'use strict'; (function(fn, name) { if (typeof exports !== "undefined") { fn(exports, module); } else if (typeof self !== "undefined") { var e = name === "*" ? self : (name ? self[name] = {} : {}); fn(e, { exports: e }); } })(function(exports, module) { // === Symbol Support ===
function hasSymbol(name) {
return typeof Symbol === "function" && Boolean(Symbol[name]);
}
var hasSymbols = function() { return typeof Symbol === 'function'; };
var hasSymbol = function(name) { return hasSymbols() && Boolean(Symbol[name]); };
var getSymbol = function(name) { return hasSymbol(name) ? Symbol[name] : '@@' + name; };
function getSymbol(name) {
return hasSymbol(name) ? Symbol[name] : "@@" + name;
if (hasSymbols() && !hasSymbol('observable')) {
Symbol.observable = Symbol('observable');
}
// Ponyfill Symbol.observable for interoperability with other libraries
if (typeof Symbol === "function" && !Symbol.observable) {
Symbol.observable = Symbol("observable");
}
// === Abstract Operations ===
function hostReportError(e) {
setTimeout(function() { throw e });
}
function getMethod(obj, key) {

@@ -28,4 +19,4 @@ var value = obj[key];

if (typeof value !== "function")
throw new TypeError(value + " is not a function");
if (typeof value !== 'function')
throw new TypeError(value + ' is not a function');

@@ -38,3 +29,3 @@ return value;

if (ctor !== undefined) {
ctor = ctor[getSymbol("species")];
ctor = ctor[getSymbol('species')];
if (ctor === null) {

@@ -47,2 +38,6 @@ ctor = undefined;

function isObservable(x) {
return x instanceof Observable; // SPEC: Brand check
}
function addMethods(target, methods) {

@@ -56,85 +51,89 @@ Object.keys(methods).forEach(function(k) {

function hostReportError(e) {
if (hostReportError.log) {
hostReportError.log(e);
} else {
setTimeout(function() { throw e });
}
}
function enqueue(fn) {
Promise.resolve().then(function() {
try { fn() }
catch (e) { hostReportError(e) }
});
}
function cleanupSubscription(subscription) {
// Assert: observer._observer is undefined
// ASSERT: subscription._observer is undefined
var cleanup = subscription._cleanup;
if (!cleanup)
if (cleanup === undefined)
return;
// Drop the reference to the cleanup function so that we won't call it
// more than once
subscription._cleanup = undefined;
// Call the cleanup function
try { cleanup() }
catch (e) { hostReportError(e) }
if (!cleanup) {
return;
}
if (typeof cleanup === 'function') {
cleanup();
} else {
var unsubscribe$0 = getMethod(cleanup, 'unsubscribe');
if (unsubscribe$0) {
unsubscribe$0.call(cleanup);
}
}
}
function subscriptionClosed(subscription) {
return subscription._observer === undefined;
return subscription._state === 'closed';
}
function closeSubscription(subscription) {
if (subscriptionClosed(subscription))
return;
subscription._observer = undefined;
cleanupSubscription(subscription);
subscription._state = 'closed';
}
function cleanupFromSubscription(subscription) {
return function() { subscription.unsubscribe() };
function validateSubscription(subscription) {
// ASSERT: subscription._state !== 'closed'
switch (subscription._state) {
case 'ready': break;
case 'initializing': throw new Error('Subscription is not initialized');
case 'running': throw new Error('Subscription observer is already running');
}
}
function Subscription(observer, subscriber) {
// Assert: subscriber is callable
// ASSERT: observer is an object
// ASSERT: subscriber is callable
// The observer must be an object
if (Object(observer) !== observer)
throw new TypeError("Observer must be an object");
this._cleanup = undefined;
this._observer = observer;
this._state = 'initializing';
try {
var start$0 = getMethod(observer, "start");
if (start$0) start$0.call(observer, this);
} catch (e) {
hostReportError(e);
}
var subscriptionObserver = new SubscriptionObserver(this);
if (subscriptionClosed(this))
return;
observer = new SubscriptionObserver(this);
try {
// Call the subscriber function
var cleanup$0 = subscriber.call(undefined, observer);
// The return value must be undefined, null, a subscription object, or a function
if (cleanup$0 != null) {
if (typeof cleanup$0.unsubscribe === "function")
cleanup$0 = cleanupFromSubscription(cleanup$0);
else if (typeof cleanup$0 !== "function")
throw new TypeError(cleanup$0 + " is not a function");
this._cleanup = cleanup$0;
}
this._cleanup = subscriber.call(undefined, subscriptionObserver);
} catch (e) {
// If an error occurs during startup, then attempt to send the error
// to the observer
observer.error(e);
return;
enqueue(function() { return subscriptionObserver.error(e); });
}
// If the stream is already finished, then perform cleanup
if (subscriptionClosed(this))
cleanupSubscription(this);
this._state = 'ready';
}
addMethods(Subscription.prototype = {}, {
get closed() { return subscriptionClosed(this) },
unsubscribe: function() { closeSubscription(this) },
get closed() {
return subscriptionClosed(this);
},
unsubscribe: function() {
if (!subscriptionClosed(this)) {
closeSubscription(this);
try { cleanupSubscription(this) }
catch (e) { hostReportError(e) }
}
},
});

@@ -148,16 +147,18 @@

get closed() { return subscriptionClosed(this._subscription) },
get closed() {
return subscriptionClosed(this._subscription);
},
next: function(value) {
var subscription = this._subscription;
// If the stream is closed, then return undefined
if (subscriptionClosed(subscription))
return;
validateSubscription(subscription);
var observer = subscription._observer;
subscription._state = 'running';
try {
// If the observer has a "next" method, send the next value
var m$0 = getMethod(observer, "next");
var m$0 = getMethod(observer, 'next');
if (m$0) m$0.call(observer, value);

@@ -167,2 +168,5 @@ } catch (e) {

}
if (!subscriptionClosed(subscription))
subscription._state = 'ready';
},

@@ -172,14 +176,13 @@

var subscription = this._subscription;
// If the stream is closed, throw the error to the caller
if (subscriptionClosed(subscription)) {
hostReportError(value);
return;
}
validateSubscription(subscription);
var observer = subscription._observer;
subscription._observer = undefined;
closeSubscription(subscription);
try {
var m$1 = getMethod(observer, "error");
var m$1 = getMethod(observer, 'error');
if (m$1) m$1.call(observer, value);

@@ -196,11 +199,12 @@ else throw value;

var subscription = this._subscription;
if (subscriptionClosed(subscription))
return;
validateSubscription(subscription);
var observer = subscription._observer;
subscription._observer = undefined;
closeSubscription(subscription);
try {
var m$2 = getMethod(observer, "complete");
var m$2 = getMethod(observer, 'complete');
if (m$2) m$2.call(observer);

@@ -217,9 +221,7 @@ } catch (e) {

function Observable(subscriber) {
// Constructor cannot be called as a function
if (!(this instanceof Observable))
throw new TypeError("Observable cannot be called as a function");
throw new TypeError('Observable cannot be called as a function');
// The stream subscriber must be a function
if (typeof subscriber !== "function")
throw new TypeError("Observable initializer must be a function");
if (typeof subscriber !== 'function')
throw new TypeError('Observable initializer must be a function');

@@ -231,13 +233,10 @@ this._subscriber = subscriber;

subscribe: function(observer) { for (var args = [], __$0 = 1; __$0 < arguments.length; ++__$0) args.push(arguments[__$0]);
if (typeof observer === 'function') {
subscribe: function(observer) {
if (typeof observer !== 'object' || observer === null) {
observer = {
next: observer,
error: args[0],
complete: args[1],
error: arguments[1],
complete: arguments[2],
};
} else if (typeof observer !== 'object' || observer === null) {
observer = {};
}
return new Subscription(observer, this._subscriber);

@@ -248,29 +247,16 @@ },

return new Promise(function(resolve, reject) {
if (typeof fn !== "function")
return Promise.reject(new TypeError(fn + " is not a function"));
if (typeof fn !== 'function') {
reject(new TypeError(fn + ' is not a function'));
return;
}
__this.subscribe({
_subscription: null,
start: function(subscription) {
if (Object(subscription) !== subscription)
throw new TypeError(subscription + " is not an object");
this._subscription = subscription;
},
var subscription = __this.subscribe({
next: function(value) {
var subscription = this._subscription;
if (subscription.closed)
return;
try {
fn(value);
} catch (err) {
reject(err);
} catch (e) {
reject(e);
subscription.unsubscribe();
}
},
error: reject,

@@ -283,4 +269,4 @@ complete: resolve,

map: function(fn) { var __this = this;
if (typeof fn !== "function")
throw new TypeError(fn + " is not a function");
if (typeof fn !== 'function')
throw new TypeError(fn + ' is not a function');

@@ -291,11 +277,6 @@ var C = getSpecies(this);

next: function(value) {
if (observer.closed)
return;
try { value = fn(value) }
catch (e) { return observer.error(e) }
observer.next(value);
},
error: function(e) { observer.error(e) },

@@ -307,4 +288,4 @@ complete: function() { observer.complete() },

filter: function(fn) { var __this = this;
if (typeof fn !== "function")
throw new TypeError(fn + " is not a function");
if (typeof fn !== 'function')
throw new TypeError(fn + ' is not a function');

@@ -315,11 +296,6 @@ var C = getSpecies(this);

next: function(value) {
if (observer.closed)
return;
try { if (!fn(value)) return }
try { if (!fn(value)) return; }
catch (e) { return observer.error(e) }
observer.next(value);
},
error: function(e) { observer.error(e) },

@@ -331,4 +307,4 @@ complete: function() { observer.complete() },

reduce: function(fn) { var __this = this;
if (typeof fn !== "function")
throw new TypeError(fn + " is not a function");
if (typeof fn !== 'function')
throw new TypeError(fn + ' is not a function');

@@ -344,5 +320,2 @@ var C = getSpecies(this);

next: function(value) {
if (observer.closed)
return;
var first = !hasValue;

@@ -362,5 +335,4 @@ hasValue = true;

complete: function() {
if (!hasValue && !hasSeed) {
return observer.error(new TypeError("Cannot reduce an empty sequence"));
}
if (!hasValue && !hasSeed)
return observer.error(new TypeError('Cannot reduce an empty sequence'));

@@ -374,5 +346,37 @@ observer.next(acc);

concat: function() { var __this = this; for (var sources = [], __$0 = 0; __$0 < arguments.length; ++__$0) sources.push(arguments[__$0]);
var C = getSpecies(this);
return new C(function(observer) {
var subscription;
function startNext(next) {
subscription = next.subscribe({
next: function(v) { observer.next(v) },
error: function(e) { observer.error(e) },
complete: function() {
if (sources.length === 0) {
subscription = undefined;
observer.complete();
} else {
startNext(C.from(sources.shift()));
}
},
});
}
startNext(__this);
return function() {
if (subscription) {
subscription = undefined;
subscription.unsubscribe();
}
};
});
},
});
Object.defineProperty(Observable.prototype, getSymbol("observable"), {
Object.defineProperty(Observable.prototype, getSymbol('observable'), {
value: function() { return this },

@@ -386,9 +390,8 @@ writable: true,

from: function(x) {
var C = typeof this === "function" ? this : Observable;
var C = typeof this === 'function' ? this : Observable;
if (x == null)
throw new TypeError(x + " is not an object");
throw new TypeError(x + ' is not an object');
var method = getMethod(x, getSymbol("observable"));
var method = getMethod(x, getSymbol('observable'));
if (method) {

@@ -398,5 +401,5 @@ var observable$0 = method.call(x);

if (Object(observable$0) !== observable$0)
throw new TypeError(observable$0 + " is not an object");
throw new TypeError(observable$0 + ' is not an object');
if (observable$0.constructor === C)
if (isObservable(observable$0) && observable$0.constructor === C)
return observable$0;

@@ -407,12 +410,16 @@

if (hasSymbol("iterator") && (method = getMethod(x, getSymbol("iterator")))) {
return new C(function(observer) {
for (var __$0 = (method.call(x))[Symbol.iterator](), __$1; __$1 = __$0.next(), !__$1.done;) { var item$0 = __$1.value;
observer.next(item$0);
if (observer.closed)
return;
}
observer.complete();
});
if (hasSymbol('iterator')) {
method = getMethod(x, getSymbol('iterator'));
if (method) {
return new C(function(observer) {
enqueue(function() {
if (observer.closed) return;
for (var __$0 = (method.call(x))[Symbol.iterator](), __$1; __$1 = __$0.next(), !__$1.done;) { var item$0 = __$1.value;
observer.next(item$0);
if (observer.closed) return;
}
observer.complete();
});
});
}
}

@@ -422,26 +429,28 @@

return new C(function(observer) {
for (var i$0 = 0; i$0 < x.length; ++i$0) {
observer.next(x[i$0]);
if (observer.closed)
return;
}
observer.complete();
enqueue(function() {
if (observer.closed) return;
for (var i$0 = 0; i$0 < x.length; ++i$0) {
observer.next(x[i$0]);
if (observer.closed) return;
}
observer.complete();
});
});
}
throw new TypeError(x + " is not observable");
throw new TypeError(x + ' is not observable');
},
of: function() { for (var items = [], __$0 = 0; __$0 < arguments.length; ++__$0) items.push(arguments[__$0]);
var C = typeof this === "function" ? this : Observable;
var C = typeof this === 'function' ? this : Observable;
return new C(function(observer) {
for (var i$1 = 0; i$1 < items.length; ++i$1) {
observer.next(items[i$1]);
if (observer.closed)
return;
}
observer.complete();
enqueue(function() {
if (observer.closed) return;
for (var i$1 = 0; i$1 < items.length; ++i$1) {
observer.next(items[i$1]);
if (observer.closed) return;
}
observer.complete();
});
});

@@ -452,3 +461,3 @@ },

Object.defineProperty(Observable, getSymbol("species"), {
Object.defineProperty(Observable, getSymbol('species'), {
get: function() { return this },

@@ -458,12 +467,12 @@ configurable: true,

Object.defineProperty(Observable, "extensions", {
value: {
observableSymbol: getSymbol("observable"),
setHostReportError: function(fn) { hostReportError = fn },
},
});
if (hasSymbols()) {
Object.defineProperty(Observable, Symbol('extensions'), {
value: { symbol: getSymbol('observable'), hostReportError: hostReportError },
configurable: true,
});
}
exports.Observable = Observable;
module.exports = Observable;
}, "*");
SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc