Socket
Socket
Sign inDemoInstall

@effection/subscription

Package Overview
Dependencies
Maintainers
1
Versions
151
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@effection/subscription - npm Package Compare versions

Comparing version 0.6.3-3763fbf to 0.6.3-ad0d7e2

dist/subscribeable.d.ts

1

dist/index.d.ts
export { Subscription, createSubscription } from './subscription';
export { SymbolSubscribeable, SubscriptionSource, forEach, Subscribeable } from './subscribeable';

@@ -26,9 +26,11 @@ 'use strict';

function* createSubscription(subscribe) {
var values = [];
var result;
var results = [];
var semaphore = new Semaphore();
var publish = function publish(value) {
values.push(value);
semaphore.signal(false);
results.push({
done: false,
value: value
});
semaphore.signal();
};

@@ -40,11 +42,8 @@

if (values.length > 0) {
semaphore.signal(false);
if (results.length > 0) {
semaphore.signal();
}
return Promise.resolve(wait.then(function (done) {
return {
done: done,
value: done ? result : values.shift()
};
return Promise.resolve(wait.then(function () {
return results.shift();
}));

@@ -60,6 +59,10 @@ } catch (e) {

try {
result = yield subscribe(function (value) {
var value = yield subscribe(function (value) {
return publish(value);
});
semaphore.signal(true);
results.push({
done: true,
value: value
});
semaphore.signal();
} finally {

@@ -80,3 +83,94 @@ publish = function publish(value) {

var SymbolSubscribeable = /*#__PURE__*/Symbol["for"]('Symbol.subscription');
function* _forEach(source, visit) {
var subscription = yield subscribe(source);
while (true) {
var result = yield subscription.next();
if (result.done) {
return result.value;
} else {
yield visit(result.value);
}
}
}
var Subscribeable = {
from: function from(source) {
return new Chain(source);
}
};
var Chain = /*#__PURE__*/function () {
function Chain(source) {
this.source = source;
}
var _proto = Chain.prototype;
_proto[SymbolSubscribeable] = function () {
return subscribe(this.source);
};
_proto.map = function map(fn) {
return this.chain(function (source) {
return function (publish) {
return _forEach(source, function* (item) {
publish(fn(item));
});
};
});
};
_proto.filter = function filter(predicate) {
return this.chain(function (source) {
return function (publish) {
return _forEach(source, function* (item) {
if (predicate(item)) {
publish(item);
}
});
};
});
};
_proto.chain = function chain(next) {
return new Chain(createSubscription(next(this.source)));
};
_proto.forEach = function forEach(visit) {
return _forEach(this.source, visit);
};
return Chain;
}();
function subscribe(source) {
if (isSubscribeable(source)) {
var subscriber = getSubscriber(source);
if (subscriber) {
return subscriber.call(source);
} else {
var error = new Error("cannot subscribe to " + source + " because it does not contain Symbol.subscription");
error.name = 'TypeError';
throw error;
}
} else {
return source;
}
}
function isSubscribeable(value) {
return !!getSubscriber(value);
}
function getSubscriber(source) {
return source[SymbolSubscribeable];
}
exports.Subscribeable = Subscribeable;
exports.SymbolSubscribeable = SymbolSubscribeable;
exports.createSubscription = createSubscription;
exports.forEach = _forEach;
//# sourceMappingURL=subscription.cjs.development.js.map

2

dist/subscription.cjs.production.min.js

@@ -1,2 +0,2 @@

"use strict";var n=require("effection"),r=function(){var n=this;this.waiters=[],this.signal=function(r){var t=n.waiters.pop();t&&t(r)},this.wait=function(){return new Promise((function(r){return n.waiters.push(r)}))}};exports.createSubscription=function*(t){var e,i=[],u=new r,o=function(n){i.push(n),u.signal(!1)};return yield n.resource({next:function(){try{var n=u.wait();return i.length>0&&u.signal(!1),Promise.resolve(n.then((function(n){return{done:n,value:n?e:i.shift()}})))}catch(n){return Promise.reject(n)}}},(function*(){try{e=yield t((function(n){return o(n)})),u.signal(!0)}finally{o=function(n){throw function(n){var r=new Error("tried to publish a value: "+n+" on an already finished subscription");return r.name="TypeError",r}(n)}}}))};
"use strict";var n=require("effection"),r=function(){var n=this;this.waiters=[],this.signal=function(r){var t=n.waiters.pop();t&&t(r)},this.wait=function(){return new Promise((function(r){return n.waiters.push(r)}))}};function*t(t){var e=[],i=new r,u=function(n){e.push({done:!1,value:n}),i.signal()};return yield n.resource({next:function(){try{var n=i.wait();return e.length>0&&i.signal(),Promise.resolve(n.then((function(){return e.shift()})))}catch(n){return Promise.reject(n)}}},(function*(){try{var n=yield t((function(n){return u(n)}));e.push({done:!0,value:n}),i.signal()}finally{u=function(n){throw function(n){var r=new Error("tried to publish a value: "+n+" on an already finished subscription");return r.name="TypeError",r}(n)}}}))}var e=Symbol.for("Symbol.subscription");function*i(n,r){for(var t=yield c(n);;){var e=yield t.next();if(e.done)return e.value;yield r(e.value)}}var u={from:function(n){return new o(n)}},o=function(){function n(n){this.source=n}var r=n.prototype;return r[e]=function(){return c(this.source)},r.map=function(n){return this.chain((function(r){return function(t){return i(r,(function*(r){t(n(r))}))}}))},r.filter=function(n){return this.chain((function(r){return function(t){return i(r,(function*(r){n(r)&&t(r)}))}}))},r.chain=function(r){return new n(t(r(this.source)))},r.forEach=function(n){return i(this.source,n)},n}();function c(n){if(s(n)){var r=s(n);if(r)return r.call(n);var t=new Error("cannot subscribe to "+n+" because it does not contain Symbol.subscription");throw t.name="TypeError",t}return n}function s(n){return n[e]}exports.Subscribeable=u,exports.SymbolSubscribeable=e,exports.createSubscription=t,exports.forEach=i;
//# sourceMappingURL=subscription.cjs.production.min.js.map

@@ -24,9 +24,11 @@ import { resource } from 'effection';

function* createSubscription(subscribe) {
var values = [];
var result;
var results = [];
var semaphore = new Semaphore();
var publish = function publish(value) {
values.push(value);
semaphore.signal(false);
results.push({
done: false,
value: value
});
semaphore.signal();
};

@@ -38,11 +40,8 @@

if (values.length > 0) {
semaphore.signal(false);
if (results.length > 0) {
semaphore.signal();
}
return Promise.resolve(wait.then(function (done) {
return {
done: done,
value: done ? result : values.shift()
};
return Promise.resolve(wait.then(function () {
return results.shift();
}));

@@ -58,6 +57,10 @@ } catch (e) {

try {
result = yield subscribe(function (value) {
var value = yield subscribe(function (value) {
return publish(value);
});
semaphore.signal(true);
results.push({
done: true,
value: value
});
semaphore.signal();
} finally {

@@ -78,3 +81,91 @@ publish = function publish(value) {

export { createSubscription };
var SymbolSubscribeable = /*#__PURE__*/Symbol["for"]('Symbol.subscription');
function* _forEach(source, visit) {
var subscription = yield subscribe(source);
while (true) {
var result = yield subscription.next();
if (result.done) {
return result.value;
} else {
yield visit(result.value);
}
}
}
var Subscribeable = {
from: function from(source) {
return new Chain(source);
}
};
var Chain = /*#__PURE__*/function () {
function Chain(source) {
this.source = source;
}
var _proto = Chain.prototype;
_proto[SymbolSubscribeable] = function () {
return subscribe(this.source);
};
_proto.map = function map(fn) {
return this.chain(function (source) {
return function (publish) {
return _forEach(source, function* (item) {
publish(fn(item));
});
};
});
};
_proto.filter = function filter(predicate) {
return this.chain(function (source) {
return function (publish) {
return _forEach(source, function* (item) {
if (predicate(item)) {
publish(item);
}
});
};
});
};
_proto.chain = function chain(next) {
return new Chain(createSubscription(next(this.source)));
};
_proto.forEach = function forEach(visit) {
return _forEach(this.source, visit);
};
return Chain;
}();
function subscribe(source) {
if (isSubscribeable(source)) {
var subscriber = getSubscriber(source);
if (subscriber) {
return subscriber.call(source);
} else {
var error = new Error("cannot subscribe to " + source + " because it does not contain Symbol.subscription");
error.name = 'TypeError';
throw error;
}
} else {
return source;
}
}
function isSubscribeable(value) {
return !!getSubscriber(value);
}
function getSubscriber(source) {
return source[SymbolSubscribeable];
}
export { Subscribeable, SymbolSubscribeable, createSubscription, _forEach as forEach };
//# sourceMappingURL=subscription.esm.js.map
{
"name": "@effection/subscription",
"version": "0.6.3-3763fbf",
"version": "0.6.3-ad0d7e2",
"description": "Effection Subscriptions",

@@ -5,0 +5,0 @@ "main": "dist/index.js",

@@ -58,1 +58,33 @@ # @effection/subscription

no need to call the `unsubscribe()` method ever.
### SymbolSubscribeable
In order to facilitate interoperation of subscription producers and
consumers, any object can implement the `[SymbolSubscribeable]()`
method in order to be turned into a subscription. This follows the
pattern of `Symbol.iterator`, and `Symbol.observable`. Any object that
implements this method can be consumed as a subscription.
### Subscribeable.from(source)
In order to lift functions into the context of a subscription, you can
use `Subscribeable.from` which will return an instance of
`Subscribeable` that allows you to transform a subscription produced
### Subscribeable#map(fn)
Returns a new subscribeable whose items are transformed by `fn`. For
example:
``` javascript
Subscribeable.from(websocket).map(message => JSON.parse(message));
```
### Subscribeable#filter(predicate)
Return a new `Subscribeable` that only produces items from its source
that match `predicate`.
``` javascript
Subscribeable.from(websocket).filter(message => message.type === 'command');
```
export { Subscription, createSubscription } from './subscription';
export { SymbolSubscribeable, SubscriptionSource, forEach, Subscribeable } from './subscribeable';

@@ -12,18 +12,17 @@ import { Operation, resource } from 'effection';

export function * createSubscription<T, TReturn>(subscribe: Subscriber<T,TReturn>): Operation<Subscription<T,TReturn>> {
let values: T[] = [];
let result: TReturn;
let results: IteratorResult<T,TReturn>[] = [];
let semaphore = new Semaphore<boolean>();
let semaphore = new Semaphore<void>();
let publish = (value: T) => {
values.push(value);
semaphore.signal(false);
results.push({ done: false, value });
semaphore.signal();
};
let next = async () => {
let next = async (): Promise<IteratorResult<T,TReturn>> => {
let wait = semaphore.wait();
if (values.length > 0) {
semaphore.signal(false);
if (results.length > 0) {
semaphore.signal();
}
return wait.then(done => ({ done, value: done ? result : values.shift()}));
return wait.then(() => results.shift() as IteratorResult<T,TReturn>);
};

@@ -33,4 +32,5 @@

try {
result = yield subscribe((value: T) => publish(value));
semaphore.signal(true);
let value = yield subscribe((value: T) => publish(value));
results.push({ done: true, value });
semaphore.signal();
} finally {

@@ -37,0 +37,0 @@ publish = value => { throw InvalidPublication(value); }

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc