Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

zen-observable

Package Overview
Dependencies
Maintainers
1
Versions
37
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

zen-observable - npm Package Compare versions

Comparing version 0.8.3 to 0.8.5

155

lib/Observable.js

@@ -75,4 +75,2 @@ 'use strict';

function cleanupSubscription(subscription) {
// ASSERT: subscription._observer is undefined
var cleanup = subscription._cleanup;

@@ -87,33 +85,69 @@ if (cleanup === undefined) return;

if (typeof cleanup === 'function') {
cleanup();
} else {
var unsubscribe = getMethod(cleanup, 'unsubscribe');
if (unsubscribe) {
unsubscribe.call(cleanup);
try {
if (typeof cleanup === 'function') {
cleanup();
} else {
var unsubscribe = getMethod(cleanup, 'unsubscribe');
if (unsubscribe) {
unsubscribe.call(cleanup);
}
}
} catch (e) {
hostReportError(e);
}
}
function subscriptionClosed(subscription) {
return subscription._state === 'closed';
}
function closeSubscription(subscription) {
subscription._observer = undefined;
subscription._queue = undefined;
subscription._state = 'closed';
}
function validateSubscription(subscription) {
// ASSERT: subscription._state !== 'closed'
switch (subscription._state) {
case 'ready':
break;
case 'initializing':
throw new Error('Subscription is not initialized');
case 'running':
throw new Error('Subscription observer is already running');
function flushSubscription(subscription) {
var queue = subscription._queue;
subscription._queue = undefined;
subscription._state = 'ready';
for (var i = 0; i < queue.length; ++i) {
notifySubscription(subscription, queue[i].type, queue[i].value);
}
}
function notifySubscription(subscription, type, value) {
if (subscription._state === 'closed') return;
if (subscription._state !== 'ready') {
if (!subscription._queue) {
enqueue(function () {
return flushSubscription(subscription);
});
subscription._queue = [];
}
subscription._queue.push({ type: type, value: value });
return;
}
var observer = subscription._observer;
try {
var m = getMethod(observer, type);
switch (type) {
case 'next':
if (m) m.call(observer, value);
break;
case 'error':
closeSubscription(subscription);
if (m) m.call(observer, value);else throw value;
break;
case 'complete':
closeSubscription(subscription);
if (m) m.call(observer);
break;
}
} catch (e) {
hostReportError(e);
}
if (subscription._state === 'closed') cleanupSubscription(subscription);
}
var Subscription = function () {

@@ -128,2 +162,3 @@ function Subscription(observer, subscriber) {

this._observer = observer;
this._queue = undefined;
this._state = 'initializing';

@@ -136,8 +171,6 @@

} catch (e) {
enqueue(function () {
return subscriptionObserver.error(e);
});
subscriptionObserver.error(e);
}
this._state = 'ready';
if (!this._queue) this._state = 'ready';
}

@@ -148,9 +181,5 @@

value: function unsubscribe() {
if (!subscriptionClosed(this)) {
if (this._state !== 'closed') {
closeSubscription(this);
try {
cleanupSubscription(this);
} catch (e) {
hostReportError(e);
}
cleanupSubscription(this);
}

@@ -161,3 +190,3 @@ }

get: function () {
return subscriptionClosed(this);
return this._state === 'closed';
}

@@ -179,18 +208,3 @@ }]);

value: function next(value) {
var subscription = this._subscription;
if (subscriptionClosed(subscription)) return;
validateSubscription(subscription);
var observer = subscription._observer;
subscription._state = 'running';
try {
var m = getMethod(observer, 'next');
if (m) m.call(observer, value);
} catch (e) {
hostReportError(e);
}
if (!subscriptionClosed(subscription)) subscription._state = 'ready';
notifySubscription(this._subscription, 'next', value);
}

@@ -200,20 +214,3 @@ }, {

value: function error(value) {
var subscription = this._subscription;
if (subscriptionClosed(subscription)) {
return;
}
validateSubscription(subscription);
var observer = subscription._observer;
closeSubscription(subscription);
try {
var m = getMethod(observer, 'error');
if (m) m.call(observer, value);else throw value;
} catch (e) {
hostReportError(e);
}
cleanupSubscription(subscription);
notifySubscription(this._subscription, 'error', value);
}

@@ -223,18 +220,3 @@ }, {

value: function complete() {
var subscription = this._subscription;
if (subscriptionClosed(subscription)) return;
validateSubscription(subscription);
var observer = subscription._observer;
closeSubscription(subscription);
try {
var m = getMethod(observer, 'complete');
if (m) m.call(observer);
} catch (e) {
hostReportError(e);
}
cleanupSubscription(subscription);
notifySubscription(this._subscription, 'complete');
}

@@ -244,3 +226,3 @@ }, {

get: function () {
return subscriptionClosed(this._subscription);
return this._subscription._state === 'closed';
}

@@ -286,6 +268,11 @@ }]);

function done() {
subscription.unsubscribe();
resolve();
}
var subscription = _this.subscribe({
next: function (value) {
try {
fn(value);
fn(value, done);
} catch (e) {

@@ -292,0 +279,0 @@ reject(e);

{
"name": "zen-observable",
"version": "0.8.3",
"version": "0.8.5",
"repository": "zenparsing/zen-observable",

@@ -5,0 +5,0 @@ "description": "An Implementation of ES Observables",

@@ -56,4 +56,2 @@ // === Symbol Support ===

function cleanupSubscription(subscription) {
// ASSERT: subscription._observer is undefined
let cleanup = subscription._cleanup;

@@ -69,30 +67,70 @@ if (cleanup === undefined)

if (typeof cleanup === 'function') {
cleanup();
} else {
let unsubscribe = getMethod(cleanup, 'unsubscribe');
if (unsubscribe) {
unsubscribe.call(cleanup);
try {
if (typeof cleanup === 'function') {
cleanup();
} else {
let unsubscribe = getMethod(cleanup, 'unsubscribe');
if (unsubscribe) {
unsubscribe.call(cleanup);
}
}
} catch (e) {
hostReportError(e);
}
}
function subscriptionClosed(subscription) {
return subscription._state === 'closed';
}
function closeSubscription(subscription) {
subscription._observer = undefined;
subscription._queue = undefined;
subscription._state = 'closed';
}
function validateSubscription(subscription) {
// ASSERT: subscription._state !== 'closed'
switch (subscription._state) {
case 'ready': break;
case 'initializing': throw new Error('Subscription is not initialized');
case 'running': throw new Error('Subscription observer is already running');
function flushSubscription(subscription) {
let queue = subscription._queue;
subscription._queue = undefined;
subscription._state = 'ready';
for (let i = 0; i < queue.length; ++i) {
notifySubscription(subscription, queue[i].type, queue[i].value);
}
}
function notifySubscription(subscription, type, value) {
if (subscription._state === 'closed')
return;
if (subscription._state !== 'ready') {
if (!subscription._queue) {
enqueue(() => flushSubscription(subscription));
subscription._queue = [];
}
subscription._queue.push({ type, value });
return;
}
let observer = subscription._observer;
try {
let m = getMethod(observer, type);
switch (type) {
case 'next':
if (m) m.call(observer, value);
break;
case 'error':
closeSubscription(subscription);
if (m) m.call(observer, value);
else throw value;
break;
case 'complete':
closeSubscription(subscription);
if (m) m.call(observer);
break;
}
} catch (e) {
hostReportError(e);
}
if (subscription._state === 'closed')
cleanupSubscription(subscription);
}
class Subscription {

@@ -106,2 +144,3 @@

this._observer = observer;
this._queue = undefined;
this._state = 'initializing';

@@ -114,17 +153,17 @@

} catch (e) {
enqueue(() => subscriptionObserver.error(e));
subscriptionObserver.error(e);
}
this._state = 'ready';
if (!this._queue)
this._state = 'ready';
}
get closed() {
return subscriptionClosed(this);
return this._state === 'closed';
}
unsubscribe() {
if (!subscriptionClosed(this)) {
if (this._state !== 'closed') {
closeSubscription(this);
try { cleanupSubscription(this) }
catch (e) { hostReportError(e) }
cleanupSubscription(this);
}

@@ -135,74 +174,7 @@ }

class SubscriptionObserver {
constructor(subscription) {
this._subscription = subscription;
}
get closed() {
return subscriptionClosed(this._subscription);
}
next(value) {
let subscription = this._subscription;
if (subscriptionClosed(subscription))
return;
validateSubscription(subscription);
let observer = subscription._observer;
subscription._state = 'running';
try {
let m = getMethod(observer, 'next');
if (m) m.call(observer, value);
} catch (e) {
hostReportError(e);
}
if (!subscriptionClosed(subscription))
subscription._state = 'ready';
}
error(value) {
let subscription = this._subscription;
if (subscriptionClosed(subscription)) {
return;
}
validateSubscription(subscription);
let observer = subscription._observer;
closeSubscription(subscription);
try {
let m = getMethod(observer, 'error');
if (m) m.call(observer, value);
else throw value;
} catch (e) {
hostReportError(e);
}
cleanupSubscription(subscription);
}
complete() {
let subscription = this._subscription;
if (subscriptionClosed(subscription))
return;
validateSubscription(subscription);
let observer = subscription._observer;
closeSubscription(subscription);
try {
let m = getMethod(observer, 'complete');
if (m) m.call(observer);
} catch (e) {
hostReportError(e);
}
cleanupSubscription(subscription);
}
constructor(subscription) { this._subscription = subscription }
get closed() { return this._subscription._state === 'closed' }
next(value) { notifySubscription(this._subscription, 'next', value) }
error(value) { notifySubscription(this._subscription, 'error', value) }
complete() { notifySubscription(this._subscription, 'complete') }
}

@@ -240,6 +212,11 @@

function done() {
subscription.unsubscribe();
resolve();
}
let subscription = this.subscribe({
next(value) {
try {
fn(value);
fn(value, done);
} catch (e) {

@@ -246,0 +223,0 @@ reject(e);

export function parse(string) {
return new Observable(observer => {
(async () => {
return new Observable(async observer => {
await null;
for (let char of string) {
if (observer.closed) return;
else if (char !== '-') observer.next(char);
await null;
for (let char of string) {
if (observer.closed) return;
else if (char !== '-') observer.next(char);
await null;
}
observer.complete();
})();
}
observer.complete();
});
}

@@ -58,2 +58,14 @@ import assert from 'assert';

it('provides a cancellation function as the second argument', async () => {
let observer;
let results = [];
await Observable.of(1, 2, 3).forEach((value, cancel) => {
results.push(value);
if (value > 1) {
return cancel();
}
});
assert.deepEqual(results, [1, 2]);
});
});

@@ -52,21 +52,21 @@ import assert from 'assert';

it('throws if the subscription is not initialized', async () => {
new Observable(x => {
try {
x.complete();
assert.ok(false);
} catch (e) {
assert.ok(true);
}
}).subscribe();
it('queues if the subscription is not initialized', async () => {
let completed = false;
new Observable(x => { x.complete() }).subscribe({
complete() { completed = true },
});
assert.equal(completed, false);
await null;
assert.equal(completed, true);
});
it('throws if the observer is running', () => {
it('does not queue if the observer is running', () => {
let observer;
let completed = false
new Observable(x => { observer = x }).subscribe({
next() {
assert.throws(() => observer.complete());
},
next() { observer.complete() },
complete() { completed = true },
});
observer.next();
assert.equal(completed, true);
});

@@ -132,3 +132,3 @@

it('throws error if the cleanup function throws', () => {
it('reports error if the cleanup function throws', () => {
let error = {};

@@ -140,8 +140,5 @@ let observer;

}).subscribe();
try {
observer.complete();
} catch (err) {
assert.equal(err, error);
}
observer.complete();
assert.equal(hostError, error);
});
});

@@ -46,27 +46,24 @@ import assert from 'assert';

observer.error(1);
assert.ok(!hostError);
});
it('throws if the subscription is not initialized', async () => {
it('queues if the subscription is not initialized', async () => {
let error;
new Observable(x => { x.error() }).subscribe({
new Observable(x => { x.error({}) }).subscribe({
error(err) { error = err },
});
assert.equal(error, undefined);
await null;
assert.ok(error instanceof Error);
assert.ok(error);
});
it('throws if the observer is running', () => {
it('does not queue if the observer is running', () => {
let observer;
let error;
new Observable(x => { observer = x }).subscribe({
next() {
try {
observer.error();
assert.ok(false);
} catch (e) {
assert.ok(true);
}
},
error() {},
next() { observer.error({}) },
error(e) { error = e },
});
observer.next();
assert.ok(error);
});

@@ -136,3 +133,3 @@

it('throws if the cleanup function throws', () => {
it('reports error if the cleanup function throws', () => {
let error = {};

@@ -143,13 +140,7 @@ let observer;

return () => { throw error };
}).subscribe({
error() {},
});
try {
observer.error(1);
assert.ok(false);
} catch (err) {
assert.equal(err, error);
}
}).subscribe();
observer.error(1);
assert.equal(hostError, error);
});
});

@@ -61,25 +61,26 @@ import assert from 'assert';

it('throws if the subscription is not initialized', async () => {
let error;
new Observable(x => { x.next() }).subscribe({
error(err) { error = err },
it('queues if the subscription is not initialized', async () => {
let values = [];
let observer;
new Observable(x => { observer = x, x.next(1) }).subscribe({
next(val) { values.push(val) },
});
assert.deepEqual(values, []);
observer.next(2);
assert.deepEqual(values, []);
await null;
assert.ok(error instanceof Error);
assert.deepEqual(values, [1, 2]);
});
it('throws if the observer is running', () => {
it('does not queue if the observer is running', () => {
let observer;
let values = [];
new Observable(x => { observer = x }).subscribe({
next() {
try {
observer.next();
assert.ok(false);
} catch (e) {
assert.ok(true);
}
next(val) {
values.push(val);
if (val === 1) observer.next(2);
},
error() {},
});
observer.next();
observer.next(1);
assert.deepEqual(values, [1, 2]);
});

@@ -86,0 +87,0 @@

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc