Socket
Socket
Sign inDemoInstall

zen-observable

Package Overview
Dependencies
Maintainers
1
Versions
37
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

zen-observable - npm Package Compare versions

Comparing version 0.1.3 to 0.1.5

LICENSE

7

package.json
{
"name": "zen-observable",
"version": "0.1.3",
"version": "0.1.5",
"description": "An Implementation of ES Observables",
"homepage": "https://github.com/zenparsing/zen-observable"
"homepage": "https://github.com/zenparsing/zen-observable",
"dependencies": {
"es-observable-tests": "^0.1.7"
}
}

@@ -5,2 +5,6 @@ ## zen-observable

Requires ES6 Promises or a Promise polyfill.
### Install
```

@@ -10,2 +14,23 @@ npm install zen-observable

Requires an ES6 Promise polyfill.
### Download
- [zen-observable.js](https://raw.githubusercontent.com/zenparsing/zen-observable/master/zen-observable.js)
### Usage
Node:
```js
var Observable = require("zen-observable");
Observable.of(1, 2, 3).forEach(x => console.log(x));
```
Browser:
```html
<script src="zen-observable.js"></script>
<script>
Observable.of(1, 2, 3).forEach(x => console.log(x));
</script>
```

@@ -127,4 +127,19 @@ // === Non-Promise Job Queueing ===

function SubscriptionObserver(observer, subscriber) {
function closeSubscription(observer) {
if (subscriptionClosed(observer))
return;
observer._observer = undefined;
cleanupSubscription(observer);
}
function cleanupFromSubscription(subscription) {
// TODO: Should we get the method out and apply it here, instead of
// looking up the method at call time?
return _=> { subscription.unsubscribe() };
}
function createSubscription(observer, subscriber) {
// Assert: subscriber is callable

@@ -136,16 +151,30 @@

this._observer = observer;
this._cleanup = undefined;
// TODO: Should we check for a "next" method here?
let subscriptionObserver = new SubscriptionObserver(observer),
subscription = new Subscription(subscriptionObserver),
start = getMethod(observer, "start");
if (start)
start.call(observer, subscription);
if (subscriptionClosed(subscriptionObserver))
return subscription;
try {
// Call the subscriber function
let cleanup = subscriber.call(undefined, this);
let cleanup = subscriber.call(undefined, subscriptionObserver);
// The return value must be undefined, null, or a function
if (cleanup != null && typeof cleanup !== "function")
throw new TypeError(cleanup + " is not a function");
// The return value must be undefined, null, a subscription object, or a function
if (cleanup != null) {
this._cleanup = cleanup;
if (typeof cleanup.unsubscribe === "function")
cleanup = cleanupFromSubscription(cleanup);
else if (typeof cleanup !== "function")
throw new TypeError(cleanup + " is not a function");
subscriptionObserver._cleanup = cleanup;
}
} catch (e) {

@@ -155,22 +184,21 @@

// to the observer
this.error(e);
return;
subscriptionObserver.error(e);
return subscription;
}
// If the stream is already finished, then perform cleanup
if (subscriptionClosed(this))
cleanupSubscription(this);
if (subscriptionClosed(subscriptionObserver))
cleanupSubscription(subscriptionObserver);
return subscription;
}
addMethods(SubscriptionObserver.prototype = {}, {
function SubscriptionObserver(observer) {
cancel() {
this._observer = observer;
this._cleanup = undefined;
}
if (subscriptionClosed(this))
return;
addMethods(SubscriptionObserver.prototype = {}, {
this._observer = undefined;
cleanupSubscription(this);
},
get closed() { return subscriptionClosed(this) },

@@ -200,3 +228,3 @@

// If the observer throws, then close the stream and rethrow the error
try { this.cancel() }
try { closeSubscription(this) }
finally { throw e }

@@ -265,2 +293,10 @@ }

function Subscription(observer) {
this._observer = observer;
}
addMethods(Subscription.prototype = {}, {
unsubscribe() { closeSubscription(this._observer) }
});
export function Observable(subscriber) {

@@ -279,5 +315,3 @@

// Wrap the observer in order to maintain observation invariants
observer = new SubscriptionObserver(observer, this._subscriber);
return _=> { observer.cancel() };
return createSubscription(observer, this._subscriber);
},

@@ -284,0 +318,0 @@

@@ -1,4 +0,4 @@

var runTests = require("es-observable-tests.js").runTests;
var runTests = require("es-observable-tests").runTests;
var Observable = require("../zen-observable.js").Observable;
runTests(Observable);

@@ -1,2 +0,2 @@

/*=esdown=*/(function(fn, deps, name) { function obj() { return {} } if (typeof exports !== 'undefined') fn(require, exports, module); else if (typeof define === 'function' && define.amd) define(['require', 'exports', 'module'].concat(deps), fn); else if (typeof self !== 'undefined' && name) fn(obj, name === '*' ? self : (self[name] = {}), {}); else fn(obj, {}, {}); })(function(require, exports, module) { 'use strict'; function __load(p, l) { module.__es6 = !l; var e = require(p); if (e && e.constructor !== Object) e.default = e; return e; } // === Non-Promise Job Queueing ===
/*=esdown=*/(function(fn, name) { if (typeof exports !== 'undefined') fn(require, exports, module); else if (typeof self !== 'undefined') fn(void 0, name === '*' ? self : (name ? self[name] = {} : {})); })(function(require, exports, module) { 'use strict'; // === Non-Promise Job Queueing ===

@@ -127,4 +127,19 @@ var enqueueJob = (function() {

function SubscriptionObserver(observer, subscriber) {
function closeSubscription(observer) {
if (subscriptionClosed(observer))
return;
observer._observer = undefined;
cleanupSubscription(observer);
}
function cleanupFromSubscription(subscription) {
// TODO: Should we get the method out and apply it here, instead of
// looking up the method at call time?
return function(_) { subscription.unsubscribe() };
}
function createSubscription(observer, subscriber) {
// Assert: subscriber is callable

@@ -136,16 +151,30 @@

this._observer = observer;
this._cleanup = undefined;
// TODO: Should we check for a "next" method here?
var subscriptionObserver = new SubscriptionObserver(observer),
subscription = new Subscription(subscriptionObserver),
start = getMethod(observer, "start");
if (start)
start.call(observer, subscription);
if (subscriptionClosed(subscriptionObserver))
return subscription;
try {
// Call the subscriber function
var cleanup$0 = subscriber.call(undefined, this);
var cleanup$0 = subscriber.call(undefined, subscriptionObserver);
// The return value must be undefined, null, or a function
if (cleanup$0 != null && typeof cleanup$0 !== "function")
throw new TypeError(cleanup$0 + " is not a function");
// The return value must be undefined, null, a subscription object, or a function
if (cleanup$0 != null) {
this._cleanup = cleanup$0;
if (typeof cleanup$0.unsubscribe === "function")
cleanup$0 = cleanupFromSubscription(cleanup$0);
else if (typeof cleanup$0 !== "function")
throw new TypeError(cleanup$0 + " is not a function");
subscriptionObserver._cleanup = cleanup$0;
}
} catch (e) {

@@ -155,22 +184,21 @@

// to the observer
this.error(e);
return;
subscriptionObserver.error(e);
return subscription;
}
// If the stream is already finished, then perform cleanup
if (subscriptionClosed(this))
cleanupSubscription(this);
if (subscriptionClosed(subscriptionObserver))
cleanupSubscription(subscriptionObserver);
return subscription;
}
addMethods(SubscriptionObserver.prototype = {}, {
function SubscriptionObserver(observer) {
cancel: function() {
this._observer = observer;
this._cleanup = undefined;
}
if (subscriptionClosed(this))
return;
addMethods(SubscriptionObserver.prototype = {}, {
this._observer = undefined;
cleanupSubscription(this);
},
get closed() { return subscriptionClosed(this) },

@@ -200,3 +228,3 @@

// If the observer throws, then close the stream and rethrow the error
try { this.cancel() }
try { closeSubscription(this) }
finally { throw e }

@@ -265,2 +293,10 @@ }

function Subscription(observer) {
this._observer = observer;
}
addMethods(Subscription.prototype = {}, {
unsubscribe: function() { closeSubscription(this._observer) }
});
function Observable(subscriber) {

@@ -279,5 +315,3 @@

// Wrap the observer in order to maintain observation invariants
observer = new SubscriptionObserver(observer, this._subscriber);
return function(_) { observer.cancel() };
return createSubscription(observer, this._subscriber);
},

@@ -464,2 +498,2 @@

}, [], "*");
}, "*");
SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc