Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

zen-observable-ts

Package Overview
Dependencies
Maintainers
3
Versions
40
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

zen-observable-ts - npm Package Compare versions

Comparing version 0.5.0 to 0.8.6

CHANGELOG.md

401

lib/bundle.umd.js
(function (global, factory) {
typeof exports === 'object' && typeof module !== 'undefined' ? factory(exports) :
typeof define === 'function' && define.amd ? define(['exports'], factory) :
(factory((global.zenObservable = {})));
(factory((global.apolloLink = global.apolloLink || {}, global.apolloLink.zenObservable = {})));
}(this, (function (exports) { 'use strict';
function cleanupSubscription(subscription) {
var cleanup = subscription._cleanup;
if (!cleanup) {
return;
}
subscription._cleanup = undefined;
cleanup();
}
function subscriptionClosed(subscription) {
return subscription._observer === undefined;
}
function closeSubscription(subscription) {
if (subscriptionClosed(subscription)) {
return;
}
subscription._observer = undefined;
cleanupSubscription(subscription);
}
function cleanupFromSubscription(subscription) {
return function () {
subscription.unsubscribe();
};
}
var Subscription = (function () {
function Subscription(observer, subscriber) {
if (Object(observer) !== observer) {
throw new TypeError('Observer must be an object');
}
this._cleanup = undefined;
this._observer = observer;
if (observer.start) {
observer.start(this);
}
if (subscriptionClosed(this)) {
return;
}
var _observer = new SubscriptionObserver(this);
try {
var cleanup = subscriber(_observer);
if (cleanup != null) {
if (typeof cleanup.unsubscribe ===
'function') {
cleanup = cleanupFromSubscription(cleanup);
}
else if (typeof cleanup !== 'function') {
throw new TypeError(cleanup + ' is not a function');
}
this._cleanup = cleanup;
}
}
catch (e) {
if (_observer.error) {
_observer.error(e);
}
return;
}
if (subscriptionClosed(this)) {
cleanupSubscription(this);
}
}
Object.defineProperty(Subscription.prototype, "closed", {
get: function () {
return subscriptionClosed(this);
},
enumerable: true,
configurable: true
});
Subscription.prototype.unsubscribe = function () {
closeSubscription(this);
};
return Subscription;
}());
var SubscriptionObserver = (function () {
function SubscriptionObserver(subscription) {
this._subscription = subscription;
}
Object.defineProperty(SubscriptionObserver.prototype, "closed", {
get: function () {
return subscriptionClosed(this._subscription);
},
enumerable: true,
configurable: true
});
SubscriptionObserver.prototype.next = function (value) {
var subscription = this._subscription;
if (subscriptionClosed(subscription)) {
return;
}
var observer = subscription._observer;
if (!observer.next) {
return;
}
observer.next(value);
return;
};
SubscriptionObserver.prototype.error = function (value) {
var subscription = this._subscription;
if (subscriptionClosed(subscription)) {
throw value;
}
var observer = subscription._observer;
subscription._observer = undefined;
try {
if (!observer.error) {
throw value;
}
observer.error(value);
}
catch (e) {
try {
cleanupSubscription(subscription);
}
finally {
throw e;
}
}
cleanupSubscription(subscription);
};
SubscriptionObserver.prototype.complete = function () {
var subscription = this._subscription;
if (subscriptionClosed(subscription)) {
return;
}
var observer = subscription._observer;
subscription._observer = undefined;
try {
if (observer.complete) {
observer.complete();
}
}
catch (e) {
try {
cleanupSubscription(subscription);
}
finally {
throw e;
}
}
cleanupSubscription(subscription);
};
return SubscriptionObserver;
}());
var Observable = (function () {
function Observable(subscriber) {
if (typeof subscriber !== 'function') {
throw new TypeError('Observable initializer must be a function');
}
this._subscriber = subscriber;
}
Observable.from = function (observable) {
if (observable.subscribe) {
return new Observable(function (observer) {
return observable.subscribe(observer);
});
}
if (Array.isArray(observable)) {
return new Observable(function (observer) {
for (var i = 0; i < observable.length; ++i) {
observer.next(observable[i]);
if (observer.closed) {
return;
}
}
if (observer.complete) {
observer.complete();
}
});
}
throw new TypeError(observable + ' is not observable');
};
Observable.of = function () {
var items = [];
for (var _i = 0; _i < arguments.length; _i++) {
items[_i] = arguments[_i];
}
return new Observable(function (observer) {
for (var i = 0; i < items.length; ++i) {
observer.next(items[i]);
if (observer.closed) {
return;
}
}
if (observer.complete) {
observer.complete();
}
});
};
Observable.prototype.subscribe = function (observerOrNext, error, complete) {
if (typeof observerOrNext === 'function') {
return new Subscription({
next: observerOrNext,
error: error,
complete: complete,
}, this._subscriber);
}
return new Subscription(observerOrNext, this._subscriber);
};
Observable.prototype.forEach = function (fn) {
var _this = this;
return new Promise(function (resolve, reject) {
if (typeof fn !== 'function') {
return Promise.reject(new TypeError(fn + ' is not a function'));
}
_this.subscribe({
start: function (subscription) {
this._subscription = subscription;
},
next: function (value) {
var subscription = this._subscription;
if (subscription.closed) {
return;
}
try {
fn(value);
return;
}
catch (err) {
reject(err);
subscription.unsubscribe();
}
},
error: reject,
complete: resolve,
});
});
};
Observable.prototype.map = function (fn) {
var _this = this;
if (typeof fn !== 'function') {
throw new TypeError(fn + ' is not a function');
}
return new Observable(function (observer) {
return _this.subscribe({
next: function (value) {
if (observer.closed) {
return;
}
var _value;
try {
_value = fn(value);
}
catch (e) {
observer.error(e);
return;
}
observer.next(_value);
},
error: function (e) {
observer.error(e);
},
complete: function () {
observer.complete();
},
});
});
};
Observable.prototype.filter = function (fn) {
var _this = this;
if (typeof fn !== 'function') {
throw new TypeError(fn + ' is not a function');
}
return new Observable(function (observer) {
_this.subscribe({
next: function (value) {
if (observer.closed) {
return;
}
try {
if (!fn(value)) {
return;
}
}
catch (e) {
if (observer.error) {
observer.error(e);
}
return;
}
observer.next(value);
},
error: function (e) {
observer.error(e);
},
complete: function () {
observer.complete();
},
});
});
};
Observable.prototype.reduce = function (fn, initialValue) {
var _this = this;
if (typeof fn !== 'function') {
throw new TypeError(fn + ' is not a function');
}
var hasSeed = arguments.length > 1;
var hasValue = false;
var seed = arguments[1];
var acc = seed;
return new Observable(function (observer) {
_this.subscribe({
next: function (value) {
if (observer.closed) {
return;
}
var first = !hasValue;
hasValue = true;
if (!first || hasSeed) {
try {
acc = fn(acc, value);
}
catch (e) {
observer.error(e);
return;
}
}
else {
acc = value;
}
},
error: function (e) {
observer.error(e);
},
complete: function () {
if (!hasValue && !hasSeed) {
observer.error(new TypeError('Cannot reduce an empty sequence'));
return;
}
observer.next(acc);
observer.complete();
},
});
});
};
Observable.prototype.flatMap = function (fn) {
var _this = this;
if (typeof fn !== 'function') {
throw new TypeError(fn + ' is not a function');
}
return new Observable(function (observer) {
var completed = false;
var subscriptions = [];
var outer = _this.subscribe({
next: function (value) {
var _value;
if (fn) {
try {
_value = fn(value);
}
catch (x) {
observer.error(x);
return;
}
}
Observable.from(_value).subscribe({
start: function (s) {
subscriptions.push((this._subscription = s));
},
next: function (data) {
observer.next(data);
},
error: function (e) {
observer.error(e);
},
complete: function () {
var i = subscriptions.indexOf(this._subscription);
if (i >= 0) {
subscriptions.splice(i, 1);
}
closeIfDone();
},
});
},
error: function (e) {
observer.error(e);
},
complete: function () {
completed = true;
closeIfDone();
},
});
function closeIfDone() {
if (completed && subscriptions.length === 0) {
observer.complete();
}
}
return function () {
subscriptions.forEach(function (s) { return s.unsubscribe(); });
outer.unsubscribe();
};
});
};
return Observable;
}());
var Observable = require('zen-observable');
exports.Subscription = Subscription;
exports.SubscriptionObserver = SubscriptionObserver;
exports['default'] = Observable;
exports.default = Observable;
exports.Observable = Observable;

@@ -405,0 +12,0 @@ Object.defineProperty(exports, '__esModule', { value: true });

@@ -6,22 +6,8 @@ import { ZenObservable } from './types';

export declare type ObservableLike<T> = ZenObservable.ObservableLike<T>;
export declare class Subscription implements ZenObservable.Subscription {
_observer?: ZenObservable.Observer<any>;
_cleanup: () => void;
constructor(observer: ZenObservable.Observer<any>, subscriber: ZenObservable.Subscriber<any>);
readonly closed: boolean;
unsubscribe(): void;
}
export declare class SubscriptionObserver<T> implements ZenObservable.SubscriptionObserver<T> {
private _subscription;
constructor(subscription: Subscription);
readonly closed: boolean;
next(value: T): void;
error(value: T): void;
complete(): void;
}
export default class Observable<T> {
private _subscriber;
static from<R>(observable: Observable<R> | ZenObservable.ObservableLike<R> | ArrayLike<R>): Observable<R>;
static of<R>(...items: R[]): Observable<R>;
constructor(subscriber: ZenObservable.Subscriber<T>);
export declare const Observable: {
new <T>(subscriber: Subscriber<T>): Observable<T>;
from<R>(...args: Array<R>): Observable<R>;
of<R>(...args: Array<R>): Observable<R>;
};
export interface Observable<T> {
subscribe(observerOrNext: ((value: T) => void) | ZenObservable.Observer<T>, error?: (error: any) => void, complete?: () => void): ZenObservable.Subscription;

@@ -32,3 +18,6 @@ forEach(fn: (value: T) => void): Promise<void>;

reduce<R = T>(fn: (previousValue: R | T, currentValue: T) => R | T, initialValue?: R | T): Observable<R | T>;
concat(...sources: Array<Observable<T>>): any;
flatMap<R>(fn: (value: T) => ZenObservable.ObservableLike<R>): Observable<R>;
from<R>(...args: Array<R>): Observable<R>;
of<R>(...args: Array<R>): Observable<R>;
}

@@ -1,397 +0,2 @@

function cleanupSubscription(subscription) {
var cleanup = subscription._cleanup;
if (!cleanup) {
return;
}
subscription._cleanup = undefined;
cleanup();
}
function subscriptionClosed(subscription) {
return subscription._observer === undefined;
}
function closeSubscription(subscription) {
if (subscriptionClosed(subscription)) {
return;
}
subscription._observer = undefined;
cleanupSubscription(subscription);
}
function cleanupFromSubscription(subscription) {
return function () {
subscription.unsubscribe();
};
}
var Subscription = (function () {
function Subscription(observer, subscriber) {
if (Object(observer) !== observer) {
throw new TypeError('Observer must be an object');
}
this._cleanup = undefined;
this._observer = observer;
if (observer.start) {
observer.start(this);
}
if (subscriptionClosed(this)) {
return;
}
var _observer = new SubscriptionObserver(this);
try {
var cleanup = subscriber(_observer);
if (cleanup != null) {
if (typeof cleanup.unsubscribe ===
'function') {
cleanup = cleanupFromSubscription(cleanup);
}
else if (typeof cleanup !== 'function') {
throw new TypeError(cleanup + ' is not a function');
}
this._cleanup = cleanup;
}
}
catch (e) {
if (_observer.error) {
_observer.error(e);
}
return;
}
if (subscriptionClosed(this)) {
cleanupSubscription(this);
}
}
Object.defineProperty(Subscription.prototype, "closed", {
get: function () {
return subscriptionClosed(this);
},
enumerable: true,
configurable: true
});
Subscription.prototype.unsubscribe = function () {
closeSubscription(this);
};
return Subscription;
}());
export { Subscription };
var SubscriptionObserver = (function () {
function SubscriptionObserver(subscription) {
this._subscription = subscription;
}
Object.defineProperty(SubscriptionObserver.prototype, "closed", {
get: function () {
return subscriptionClosed(this._subscription);
},
enumerable: true,
configurable: true
});
SubscriptionObserver.prototype.next = function (value) {
var subscription = this._subscription;
if (subscriptionClosed(subscription)) {
return;
}
var observer = subscription._observer;
if (!observer.next) {
return;
}
observer.next(value);
return;
};
SubscriptionObserver.prototype.error = function (value) {
var subscription = this._subscription;
if (subscriptionClosed(subscription)) {
throw value;
}
var observer = subscription._observer;
subscription._observer = undefined;
try {
if (!observer.error) {
throw value;
}
observer.error(value);
}
catch (e) {
try {
cleanupSubscription(subscription);
}
finally {
throw e;
}
}
cleanupSubscription(subscription);
};
SubscriptionObserver.prototype.complete = function () {
var subscription = this._subscription;
if (subscriptionClosed(subscription)) {
return;
}
var observer = subscription._observer;
subscription._observer = undefined;
try {
if (observer.complete) {
observer.complete();
}
}
catch (e) {
try {
cleanupSubscription(subscription);
}
finally {
throw e;
}
}
cleanupSubscription(subscription);
};
return SubscriptionObserver;
}());
export { SubscriptionObserver };
var Observable = (function () {
function Observable(subscriber) {
if (typeof subscriber !== 'function') {
throw new TypeError('Observable initializer must be a function');
}
this._subscriber = subscriber;
}
Observable.from = function (observable) {
if (observable.subscribe) {
return new Observable(function (observer) {
return observable.subscribe(observer);
});
}
if (Array.isArray(observable)) {
return new Observable(function (observer) {
for (var i = 0; i < observable.length; ++i) {
observer.next(observable[i]);
if (observer.closed) {
return;
}
}
if (observer.complete) {
observer.complete();
}
});
}
throw new TypeError(observable + ' is not observable');
};
Observable.of = function () {
var items = [];
for (var _i = 0; _i < arguments.length; _i++) {
items[_i] = arguments[_i];
}
return new Observable(function (observer) {
for (var i = 0; i < items.length; ++i) {
observer.next(items[i]);
if (observer.closed) {
return;
}
}
if (observer.complete) {
observer.complete();
}
});
};
Observable.prototype.subscribe = function (observerOrNext, error, complete) {
if (typeof observerOrNext === 'function') {
return new Subscription({
next: observerOrNext,
error: error,
complete: complete,
}, this._subscriber);
}
return new Subscription(observerOrNext, this._subscriber);
};
Observable.prototype.forEach = function (fn) {
var _this = this;
return new Promise(function (resolve, reject) {
if (typeof fn !== 'function') {
return Promise.reject(new TypeError(fn + ' is not a function'));
}
_this.subscribe({
start: function (subscription) {
this._subscription = subscription;
},
next: function (value) {
var subscription = this._subscription;
if (subscription.closed) {
return;
}
try {
fn(value);
return;
}
catch (err) {
reject(err);
subscription.unsubscribe();
}
},
error: reject,
complete: resolve,
});
});
};
Observable.prototype.map = function (fn) {
var _this = this;
if (typeof fn !== 'function') {
throw new TypeError(fn + ' is not a function');
}
return new Observable(function (observer) {
return _this.subscribe({
next: function (value) {
if (observer.closed) {
return;
}
var _value;
try {
_value = fn(value);
}
catch (e) {
observer.error(e);
return;
}
observer.next(_value);
},
error: function (e) {
observer.error(e);
},
complete: function () {
observer.complete();
},
});
});
};
Observable.prototype.filter = function (fn) {
var _this = this;
if (typeof fn !== 'function') {
throw new TypeError(fn + ' is not a function');
}
return new Observable(function (observer) {
_this.subscribe({
next: function (value) {
if (observer.closed) {
return;
}
try {
if (!fn(value)) {
return;
}
}
catch (e) {
if (observer.error) {
observer.error(e);
}
return;
}
observer.next(value);
},
error: function (e) {
observer.error(e);
},
complete: function () {
observer.complete();
},
});
});
};
Observable.prototype.reduce = function (fn, initialValue) {
var _this = this;
if (typeof fn !== 'function') {
throw new TypeError(fn + ' is not a function');
}
var hasSeed = arguments.length > 1;
var hasValue = false;
var seed = arguments[1];
var acc = seed;
return new Observable(function (observer) {
_this.subscribe({
next: function (value) {
if (observer.closed) {
return;
}
var first = !hasValue;
hasValue = true;
if (!first || hasSeed) {
try {
acc = fn(acc, value);
}
catch (e) {
observer.error(e);
return;
}
}
else {
acc = value;
}
},
error: function (e) {
observer.error(e);
},
complete: function () {
if (!hasValue && !hasSeed) {
observer.error(new TypeError('Cannot reduce an empty sequence'));
return;
}
observer.next(acc);
observer.complete();
},
});
});
};
Observable.prototype.flatMap = function (fn) {
var _this = this;
if (typeof fn !== 'function') {
throw new TypeError(fn + ' is not a function');
}
return new Observable(function (observer) {
var completed = false;
var subscriptions = [];
var outer = _this.subscribe({
next: function (value) {
var _value;
if (fn) {
try {
_value = fn(value);
}
catch (x) {
observer.error(x);
return;
}
}
Observable.from(_value).subscribe({
start: function (s) {
subscriptions.push((this._subscription = s));
},
next: function (data) {
observer.next(data);
},
error: function (e) {
observer.error(e);
},
complete: function () {
var i = subscriptions.indexOf(this._subscription);
if (i >= 0) {
subscriptions.splice(i, 1);
}
closeIfDone();
},
});
},
error: function (e) {
observer.error(e);
},
complete: function () {
completed = true;
closeIfDone();
},
});
function closeIfDone() {
if (completed && subscriptions.length === 0) {
observer.complete();
}
}
return function () {
subscriptions.forEach(function (s) { return s.unsubscribe(); });
outer.unsubscribe();
};
});
};
return Observable;
}());
export default Observable;
export var Observable = require('zen-observable');
//# sourceMappingURL=zenObservable.js.map
{
"name": "zen-observable-ts",
"version": "0.5.0",
"version": "0.8.6",
"description": "An Implementation of ES Observables in Typescript",

@@ -9,5 +9,5 @@ "author": "Evans Hauser <evanshauser@gmail.com>",

"main": "./lib/bundle.umd.js",
"module": "./lib/zenObservable.js",
"jsnext:main": "./lib/zenObservable.js",
"typings": "./lib/zenObservable.d.ts",
"module": "./lib/index.js",
"jsnext:main": "./lib/index.js",
"typings": "./lib/index.d.ts",
"repository": {

@@ -22,4 +22,3 @@ "type": "git",

"scripts": {
"build:browser":
"browserify ./lib/bundle.umd.js -o=./lib/bundle.js && npm run minify:browser",
"build:browser": "browserify ./lib/bundle.umd.js -o=./lib/bundle.js --i zen-observable && npm run minify:browser",
"build": "tsc -p .",

@@ -29,6 +28,4 @@ "bundle": "rollup -c",

"filesize": "npm run build && npm run build:browser",
"lint":
"tslint --type-check -p tsconfig.json -c ../../tslint.json src/*.ts",
"minify:browser":
"uglifyjs -c -m -o ./lib/bundle.min.js -- ./lib/bundle.js",
"lint": "tslint --type-check -p tsconfig.json -c ../../tslint.json src/*.ts",
"minify:browser": "uglifyjs -c -m -o ./lib/bundle.min.js -- ./lib/bundle.js",
"postbuild": "npm run bundle",

@@ -41,11 +38,11 @@ "prebuild": "npm run clean",

"devDependencies": {
"@types/jest": "21.1.1",
"browserify": "14.4.0",
"jest": "21.2.1",
"rimraf": "2.6.1",
"rollup": "0.45.2",
"ts-jest": "21.0.1",
"tslint": "5.7.0",
"typescript": "2.5.1",
"uglify-js": "3.1.3"
"@types/jest": "^22.1.3",
"browserify": "^16.1.0",
"jest": "^22.4.0",
"rimraf": "^2.6.2",
"rollup": "^0.56.2",
"ts-jest": "^22.0.4",
"tslint": "^5.9.1",
"typescript": "^2.7.2",
"uglify-js": "^3.3.11"
},

@@ -57,4 +54,12 @@ "jest": {

"testRegex": "(/__tests__/.*|\\.(test|spec))\\.(ts|tsx|js)$",
"moduleFileExtensions": ["ts", "tsx", "js", "json"]
"moduleFileExtensions": [
"ts",
"tsx",
"js",
"json"
]
},
"dependencies": {
"zen-observable": "^0.8.6"
}
}

@@ -1,17 +0,3 @@

export default {
entry: 'lib/zenObservable.js',
dest: 'lib/bundle.umd.js',
format: 'umd',
sourceMap: true,
moduleName: 'zenObservable',
exports: 'named',
onwarn,
};
import build from '../../rollup.config';
function onwarn(message) {
const suppressed = ['UNRESOLVED_IMPORT', 'THIS_IS_UNDEFINED'];
if (!suppressed.find(code => message.code === code)) {
return console.warn(message.message);
}
}
export default build('zenObservable');
import Observable from '../zenObservable';
describe('flatMap', () => {
it('Observable.from', () => {
describe.skip('flatMap', () => {
it('Observable.from', done => {
let list: Array<number> = [];
return Observable.from([1, 2, 3])
.flatMap(x => {
return Observable.from([x * 1, x * 2, x * 3]);
})
.forEach(x => {
list.push(x);
})
.then(() => {
expect(list).toEqual([1, 2, 3, 2, 4, 6, 3, 6, 9]);
});
try {
Observable.from([1, 2, 3])
.flatMap(x => {
return Observable.from([x * 1, x * 2, x * 3]);
})
.forEach(x => {
list.push(x);
})
.then(() => {
expect(list).toEqual([1, 2, 3, 2, 4, 6, 3, 6, 9]);
done();
});
} catch (e) {
done.fail(e);
}
});

@@ -28,9 +33,14 @@

it('throws on not a function', () => {
return expect(
() =>
it('throws on not a function', done => {
try {
expect(() =>
Observable.from([1, 2, 3, 4])
.flatMap(<any>1)
.forEach(x => void 0).then,
).toThrow();
.forEach(x => void 0)
.then(() => done.fail()),
).toThrow();
done();
} catch (e) {
done.fail(e);
}
});

@@ -40,14 +50,18 @@

const error = new Error('thrown');
return expect(() =>
Observable.from([1, 2, 3, 4])
.flatMap(() => {
throw error;
})
.subscribe({
error: err => {
expect(err).toEqual(error);
done();
},
}),
).toThrow();
try {
return expect(() =>
Observable.from([1, 2, 3, 4])
.flatMap(() => {
throw error;
})
.subscribe({
error: err => {
expect(err).toEqual(error);
done();
},
}),
).toThrow();
} catch (e) {
done.fail(e);
}
});

@@ -54,0 +68,0 @@

import Observable from '../zenObservable';
describe('forEach ', () => {
it('throws on not a function', () => {
expect(Observable.from([1, 2, 3, 4]).forEach(<any>1).then).toThrow();
it('throws on not a function', done => {
try {
debugger;
Observable.from([1, 2, 3, 4])
.forEach(<any>1)
.then(() => done.fail())
.catch(e => {
try {
expect(e.message).toMatch(/not a function/);
done();
} catch (e) {
done.fail(e);
}
});
} catch (e) {
done.fail(e);
}
});

@@ -7,0 +22,0 @@

@@ -13,9 +13,12 @@ import Observable from '../zenObservable';

it('throws on not a function', () => {
return expect(
() =>
Observable.from([1, 2, 3, 4])
.map(<any>1)
.forEach(x => void 0).then,
).toThrow();
it('throws on not a function', done => {
try {
Observable.from([1, 2, 3, 4])
.map(<any>1)
.forEach(x => void 0)
.then(() => done.fail());
} catch (e) {
expect(e.message).toMatch(/not a function/);
done();
}
});

@@ -25,5 +28,7 @@

const error = new Error('thrown');
return expect(() =>
try {
Observable.from([1, 2, 3, 4])
.map(() => {
.map(num => {
expect(num).toEqual(1);
debugger;
throw error;

@@ -36,4 +41,6 @@ })

},
}),
).not.toThrow();
});
} catch (e) {
done.fail(e);
}
});

@@ -40,0 +47,0 @@

@@ -30,11 +30,2 @@ import Observable from '../zenObservable';

describe('observer', () => {
it('throws when cleanup is not a function', () => {
expect(() => {
const sub = new Observable<number>(observer => {
return <any>1;
}).subscribe({});
sub.unsubscribe();
}).toThrow();
});
it('recalling next, error, complete have no effect', () => {

@@ -67,2 +58,3 @@ const spy = jest.fn();

observer.complete();
return;
}).subscribe({

@@ -96,20 +88,2 @@ complete: () => {

});
it('throws error after complete', () => {
const spy = jest.fn();
const error = new Error('throws');
return new Promise((resolve, reject) => {
new Observable<number>(observer => {
observer.complete();
observer.error(error);
spy();
}).subscribe({
next: reject,
error: reject,
});
}).catch(err => {
expect(spy).not.toBeCalled();
expect(err).toEqual(error);
});
});
});

@@ -52,9 +52,12 @@ import Observable from '../zenObservable';

it('throws on not a function', () => {
return expect(
() =>
Observable.from([1, 2, 3, 4])
.reduce(<any>1)
.forEach(x => void 0).then,
).toThrow();
it('throws on not a function', done => {
try {
Observable.from([1, 2, 3, 4])
.reduce(<any>1)
.forEach(x => void 0)
.then(() => done.fail());
} catch (e) {
expect(e.message).toMatch(/not a function/);
done();
}
});

@@ -61,0 +64,0 @@

@@ -0,1 +1,7 @@

declare function require(name: string);
namespace Observable {
}
import { ZenObservable } from './types';

@@ -9,493 +15,33 @@

// === Abstract Operations ===
function cleanupSubscription(subscription: Subscription) {
// Assert: observer._observer is undefined
export const Observable: {
new <T>(subscriber: Subscriber<T>): Observable<T>;
from<R>(...args: Array<R>): Observable<R>;
of<R>(...args: Array<R>): Observable<R>;
} = require('zen-observable');
let cleanup = subscription._cleanup;
if (!cleanup) {
return;
}
// Drop the reference to the cleanup function so that we won't call it
// more than once
subscription._cleanup = undefined;
// Call the cleanup function
cleanup();
}
function subscriptionClosed(subscription: Subscription) {
return subscription._observer === undefined;
}
function closeSubscription(subscription: Subscription) {
if (subscriptionClosed(subscription)) {
return;
}
subscription._observer = undefined;
cleanupSubscription(subscription);
}
function cleanupFromSubscription(subscription: ZenObservable.Subscription) {
return () => {
subscription.unsubscribe();
};
}
export class Subscription implements ZenObservable.Subscription {
public _observer?: ZenObservable.Observer<any>;
public _cleanup: () => void;
constructor(
observer: ZenObservable.Observer<any>,
subscriber: ZenObservable.Subscriber<any>,
) {
// Assert: subscriber is callable
// The observer must be an object
if (Object(observer) !== observer) {
throw new TypeError('Observer must be an object');
}
this._cleanup = undefined;
this._observer = observer;
if (observer.start) {
observer.start(this);
}
if (subscriptionClosed(this)) {
return;
}
let _observer = new SubscriptionObserver(this);
try {
// Call the subscriber function
let cleanup = subscriber(_observer);
// The return value must be undefined, null, a subscription object, or a function
if (cleanup != null) {
if (
typeof (<ZenObservable.Subscription>cleanup).unsubscribe ===
'function'
) {
cleanup = cleanupFromSubscription(
cleanup as ZenObservable.Subscription,
);
} else if (typeof cleanup !== 'function') {
throw new TypeError(cleanup + ' is not a function');
}
this._cleanup = cleanup;
}
} catch (e) {
// If an error occurs during startup, then attempt to send the error
// to the observer
if (_observer.error) {
_observer.error(e);
}
return;
}
// If the stream is already finished, then perform cleanup
if (subscriptionClosed(this)) {
cleanupSubscription(this);
}
}
get closed() {
return subscriptionClosed(this);
}
public unsubscribe() {
closeSubscription(this);
}
}
export class SubscriptionObserver<T>
implements ZenObservable.SubscriptionObserver<T> {
private _subscription: Subscription;
constructor(subscription: Subscription) {
this._subscription = subscription;
}
get closed() {
return subscriptionClosed(this._subscription);
}
public next(value: T) {
let subscription = this._subscription;
// If the stream is closed, then return undefined
if (subscriptionClosed(subscription)) {
return;
}
let observer = subscription._observer;
// If the observer doesn't support "next", then return undefined
if (!observer.next) {
return;
}
// Send the next value to the sink
observer.next(value);
return;
}
public error(value: T) {
let subscription = this._subscription;
// If the stream is closed, throw the error to the caller
if (subscriptionClosed(subscription)) {
throw value;
}
let observer = subscription._observer;
subscription._observer = undefined;
try {
// If the sink does not support "error", then throw the error to the caller
if (!observer.error) {
throw value;
}
observer.error(value);
} catch (e) {
try {
cleanupSubscription(subscription);
} finally {
throw e;
}
}
cleanupSubscription(subscription);
}
public complete() {
let subscription = this._subscription;
// If the stream is closed, then return undefined
if (subscriptionClosed(subscription)) {
return;
}
let observer = subscription._observer;
subscription._observer = undefined;
try {
if (observer.complete) {
observer.complete();
}
} catch (e) {
try {
cleanupSubscription(subscription);
} finally {
throw e;
}
}
cleanupSubscription(subscription);
}
}
export default class Observable<T> {
private _subscriber: ZenObservable.Subscriber<T>;
public static from<R>(
observable: Observable<R> | ZenObservable.ObservableLike<R> | ArrayLike<R>,
): Observable<R> {
if ((<ZenObservable.ObservableLike<R>>observable).subscribe) {
return new Observable(observer =>
(<ZenObservable.ObservableLike<R>>observable).subscribe(observer),
);
}
if (Array.isArray(observable)) {
return new Observable(observer => {
for (let i = 0; i < observable.length; ++i) {
observer.next(observable[i]);
if (observer.closed) {
return;
}
}
if (observer.complete) {
observer.complete();
}
});
}
throw new TypeError(observable + ' is not observable');
}
public static of<R>(...items: R[]): Observable<R> {
return new Observable(observer => {
for (let i = 0; i < items.length; ++i) {
observer.next(items[i]);
if (observer.closed) {
return;
}
}
if (observer.complete) {
observer.complete();
}
});
}
constructor(subscriber: ZenObservable.Subscriber<T>) {
// The stream subscriber must be a function
if (typeof subscriber !== 'function') {
throw new TypeError('Observable initializer must be a function');
}
this._subscriber = subscriber;
}
public subscribe(
export interface Observable<T> {
subscribe(
observerOrNext: ((value: T) => void) | ZenObservable.Observer<T>,
error?: (error: any) => void,
complete?: () => void,
): ZenObservable.Subscription {
if (typeof observerOrNext === 'function') {
return new Subscription(
{
next: observerOrNext,
error,
complete,
},
this._subscriber,
);
}
): ZenObservable.Subscription;
return new Subscription(observerOrNext, this._subscriber);
}
forEach(fn: (value: T) => void): Promise<void>;
public forEach(fn: (value: T) => void): Promise<void> {
return new Promise<void>((resolve, reject) => {
if (typeof fn !== 'function') {
return Promise.reject(new TypeError(fn + ' is not a function'));
}
map<R>(fn: (value: T) => R): Observable<R>;
this.subscribe(<ZenObservable.Observer<T>>{
start(subscription: ZenObservable.Subscription) {
this._subscription = subscription;
},
filter(fn: (value: T) => boolean): Observable<T>;
next(value: T) {
let subscription = this._subscription;
if (subscription.closed) {
return;
}
try {
fn(value);
return;
} catch (err) {
reject(err);
subscription.unsubscribe();
}
},
error: reject,
complete: resolve,
});
});
}
public map<R>(fn: (value: T) => R): Observable<R> {
if (typeof fn !== 'function') {
throw new TypeError(fn + ' is not a function');
}
return new Observable(observer => {
return this.subscribe({
next(value: T) {
if (observer.closed) {
return;
}
let _value: R;
try {
_value = fn(value);
} catch (e) {
observer.error(e);
return;
}
observer.next(_value);
},
error(e: any) {
observer.error(e);
},
complete() {
observer.complete();
},
});
});
}
public filter(fn: (value: T) => boolean): Observable<T> {
if (typeof fn !== 'function') {
throw new TypeError(fn + ' is not a function');
}
return new Observable(observer => {
this.subscribe({
next(value: T) {
if (observer.closed) {
return;
}
try {
if (!fn(value)) {
return;
}
} catch (e) {
if (observer.error) {
observer.error(e);
}
return;
}
observer.next(value);
},
error(e: any) {
observer.error(e);
},
complete() {
observer.complete();
},
});
});
}
public reduce<R = T>(
reduce<R = T>(
fn: (previousValue: R | T, currentValue: T) => R | T,
initialValue?: R | T,
): Observable<R | T> {
if (typeof fn !== 'function') {
throw new TypeError(fn + ' is not a function');
}
): Observable<R | T>;
let hasSeed = arguments.length > 1;
let hasValue = false;
let seed = arguments[1];
let acc = seed;
concat(...sources: Array<Observable<T>>);
return new Observable<R | T>(observer => {
this.subscribe({
next(value: R | T) {
if (observer.closed) {
return;
}
flatMap<R>(fn: (value: T) => ZenObservable.ObservableLike<R>): Observable<R>;
let first = !hasValue;
hasValue = true;
from<R>(...args: Array<R>): Observable<R>;
if (!first || hasSeed) {
try {
acc = fn(acc, <T>value);
} catch (e) {
observer.error(e);
return;
}
} else {
acc = value;
}
},
error(e: any) {
observer.error(e);
},
complete() {
if (!hasValue && !hasSeed) {
observer.error(new TypeError('Cannot reduce an empty sequence'));
return;
}
observer.next(acc);
observer.complete();
},
});
});
}
public flatMap<R>(
fn: (value: T) => ZenObservable.ObservableLike<R>,
): Observable<R> {
if (typeof fn !== 'function') {
throw new TypeError(fn + ' is not a function');
}
return new Observable(observer => {
let completed = false;
let subscriptions: Array<ZenObservable.Subscription> = [];
// Subscribe to the outer Observable
let outer = this.subscribe({
next(value: T) {
let _value: ZenObservable.ObservableLike<R>;
if (fn) {
try {
_value = fn(value);
} catch (x) {
observer.error(x);
return;
}
}
// Subscribe to the inner Observable
Observable.from(_value).subscribe({
start(s: ZenObservable.Subscription) {
subscriptions.push((this._subscription = s));
},
next(data: R) {
observer.next(data);
},
error(e) {
observer.error(e);
},
complete() {
let i = subscriptions.indexOf(this._subscription);
if (i >= 0) {
subscriptions.splice(i, 1);
}
closeIfDone();
},
});
},
error(e) {
observer.error(e);
},
complete() {
completed = true;
closeIfDone();
},
});
function closeIfDone() {
if (completed && subscriptions.length === 0) {
observer.complete();
}
}
return () => {
subscriptions.forEach(s => s.unsubscribe());
outer.unsubscribe();
};
});
}
of<R>(...args: Array<R>): Observable<R>;
}

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc