@effection/subscription
Advanced tools
Comparing version 0.6.3-3763fbf to 0.6.3-6e7f062
export { Subscription, createSubscription } from './subscription'; | ||
export { SymbolSubscribable, SubscriptionSource, forEach, Subscribable } from './subscribable'; |
@@ -26,9 +26,11 @@ 'use strict'; | ||
function* createSubscription(subscribe) { | ||
var values = []; | ||
var result; | ||
var results = []; | ||
var semaphore = new Semaphore(); | ||
var publish = function publish(value) { | ||
values.push(value); | ||
semaphore.signal(false); | ||
results.push({ | ||
done: false, | ||
value: value | ||
}); | ||
semaphore.signal(); | ||
}; | ||
@@ -40,11 +42,8 @@ | ||
if (values.length > 0) { | ||
semaphore.signal(false); | ||
if (results.length > 0) { | ||
semaphore.signal(); | ||
} | ||
return Promise.resolve(wait.then(function (done) { | ||
return { | ||
done: done, | ||
value: done ? result : values.shift() | ||
}; | ||
return Promise.resolve(wait.then(function () { | ||
return results.shift(); | ||
})); | ||
@@ -60,6 +59,10 @@ } catch (e) { | ||
try { | ||
result = yield subscribe(function (value) { | ||
var value = yield subscribe(function (value) { | ||
return publish(value); | ||
}); | ||
semaphore.signal(true); | ||
results.push({ | ||
done: true, | ||
value: value | ||
}); | ||
semaphore.signal(); | ||
} finally { | ||
@@ -80,3 +83,108 @@ publish = function publish(value) { | ||
var SymbolSubscribable = /*#__PURE__*/Symbol["for"]('Symbol.subscription'); | ||
function* _forEach(source, visit) { | ||
var subscription = yield subscribe(source); | ||
while (true) { | ||
var result = yield subscription.next(); | ||
if (result.done) { | ||
return result.value; | ||
} else { | ||
yield visit(result.value); | ||
} | ||
} | ||
} | ||
var Subscribable = { | ||
from: function from(source) { | ||
return new Chain(source); | ||
} | ||
}; | ||
var Chain = /*#__PURE__*/function () { | ||
function Chain(source) { | ||
this.source = source; | ||
} | ||
var _proto = Chain.prototype; | ||
_proto[SymbolSubscribable] = function () { | ||
return subscribe(this.source); | ||
}; | ||
_proto.map = function map(fn) { | ||
return this.chain(function (source) { | ||
return function (publish) { | ||
return _forEach(source, function* (item) { | ||
publish(fn(item)); | ||
}); | ||
}; | ||
}); | ||
}; | ||
_proto.filter = function filter(predicate) { | ||
return this.chain(function (source) { | ||
return function (publish) { | ||
return _forEach(source, function* (item) { | ||
if (predicate(item)) { | ||
publish(item); | ||
} | ||
}); | ||
}; | ||
}); | ||
}; | ||
_proto.chain = function chain(next) { | ||
return new Chain(createSubscription(next(this.source))); | ||
}; | ||
_proto.forEach = function forEach(visit) { | ||
return _forEach(this.source, visit); | ||
}; | ||
_proto.first = function* first() { | ||
var subscription = yield subscribe(this.source); | ||
var _yield$subscription$n = yield subscription.next(), | ||
done = _yield$subscription$n.done, | ||
value = _yield$subscription$n.value; | ||
if (done) { | ||
return undefined; | ||
} else { | ||
return value; | ||
} | ||
}; | ||
return Chain; | ||
}(); | ||
function subscribe(source) { | ||
if (isSubscribable(source)) { | ||
var subscriber = getSubscriber(source); | ||
if (subscriber) { | ||
return subscriber.call(source); | ||
} else { | ||
var error = new Error("cannot subscribe to " + source + " because it does not contain Symbol.subscription"); | ||
error.name = 'TypeError'; | ||
throw error; | ||
} | ||
} else { | ||
return source; | ||
} | ||
} | ||
function isSubscribable(value) { | ||
return !!getSubscriber(value); | ||
} | ||
function getSubscriber(source) { | ||
return source[SymbolSubscribable]; | ||
} | ||
exports.Subscribable = Subscribable; | ||
exports.SymbolSubscribable = SymbolSubscribable; | ||
exports.createSubscription = createSubscription; | ||
exports.forEach = _forEach; | ||
//# sourceMappingURL=subscription.cjs.development.js.map |
@@ -1,2 +0,2 @@ | ||
"use strict";var n=require("effection"),r=function(){var n=this;this.waiters=[],this.signal=function(r){var t=n.waiters.pop();t&&t(r)},this.wait=function(){return new Promise((function(r){return n.waiters.push(r)}))}};exports.createSubscription=function*(t){var e,i=[],u=new r,o=function(n){i.push(n),u.signal(!1)};return yield n.resource({next:function(){try{var n=u.wait();return i.length>0&&u.signal(!1),Promise.resolve(n.then((function(n){return{done:n,value:n?e:i.shift()}})))}catch(n){return Promise.reject(n)}}},(function*(){try{e=yield t((function(n){return o(n)})),u.signal(!0)}finally{o=function(n){throw function(n){var r=new Error("tried to publish a value: "+n+" on an already finished subscription");return r.name="TypeError",r}(n)}}}))}; | ||
"use strict";var n=require("effection"),r=function(){var n=this;this.waiters=[],this.signal=function(r){var t=n.waiters.pop();t&&t(r)},this.wait=function(){return new Promise((function(r){return n.waiters.push(r)}))}};function*t(t){var e=[],i=new r,u=function(n){e.push({done:!1,value:n}),i.signal()};return yield n.resource({next:function(){try{var n=i.wait();return e.length>0&&i.signal(),Promise.resolve(n.then((function(){return e.shift()})))}catch(n){return Promise.reject(n)}}},(function*(){try{var n=yield t((function(n){return u(n)}));e.push({done:!0,value:n}),i.signal()}finally{u=function(n){throw function(n){var r=new Error("tried to publish a value: "+n+" on an already finished subscription");return r.name="TypeError",r}(n)}}}))}var e=Symbol.for("Symbol.subscription");function*i(n,r){for(var t=yield c(n);;){var e=yield t.next();if(e.done)return e.value;yield r(e.value)}}var u={from:function(n){return new o(n)}},o=function(){function n(n){this.source=n}var r=n.prototype;return r[e]=function(){return c(this.source)},r.map=function(n){return this.chain((function(r){return function(t){return i(r,(function*(r){t(n(r))}))}}))},r.filter=function(n){return this.chain((function(r){return function(t){return i(r,(function*(r){n(r)&&t(r)}))}}))},r.chain=function(r){return new n(t(r(this.source)))},r.forEach=function(n){return i(this.source,n)},r.first=function*(){var n=yield c(this.source),r=yield n.next();return r.done?void 0:r.value},n}();function c(n){if(s(n)){var r=s(n);if(r)return r.call(n);var t=new Error("cannot subscribe to "+n+" because it does not contain Symbol.subscription");throw t.name="TypeError",t}return n}function s(n){return n[e]}exports.Subscribable=u,exports.SymbolSubscribable=e,exports.createSubscription=t,exports.forEach=i; | ||
//# sourceMappingURL=subscription.cjs.production.min.js.map |
@@ -24,9 +24,11 @@ import { resource } from 'effection'; | ||
function* createSubscription(subscribe) { | ||
var values = []; | ||
var result; | ||
var results = []; | ||
var semaphore = new Semaphore(); | ||
var publish = function publish(value) { | ||
values.push(value); | ||
semaphore.signal(false); | ||
results.push({ | ||
done: false, | ||
value: value | ||
}); | ||
semaphore.signal(); | ||
}; | ||
@@ -38,11 +40,8 @@ | ||
if (values.length > 0) { | ||
semaphore.signal(false); | ||
if (results.length > 0) { | ||
semaphore.signal(); | ||
} | ||
return Promise.resolve(wait.then(function (done) { | ||
return { | ||
done: done, | ||
value: done ? result : values.shift() | ||
}; | ||
return Promise.resolve(wait.then(function () { | ||
return results.shift(); | ||
})); | ||
@@ -58,6 +57,10 @@ } catch (e) { | ||
try { | ||
result = yield subscribe(function (value) { | ||
var value = yield subscribe(function (value) { | ||
return publish(value); | ||
}); | ||
semaphore.signal(true); | ||
results.push({ | ||
done: true, | ||
value: value | ||
}); | ||
semaphore.signal(); | ||
} finally { | ||
@@ -78,3 +81,105 @@ publish = function publish(value) { | ||
export { createSubscription }; | ||
var SymbolSubscribable = /*#__PURE__*/Symbol["for"]('Symbol.subscription'); | ||
function* _forEach(source, visit) { | ||
var subscription = yield subscribe(source); | ||
while (true) { | ||
var result = yield subscription.next(); | ||
if (result.done) { | ||
return result.value; | ||
} else { | ||
yield visit(result.value); | ||
} | ||
} | ||
} | ||
var Subscribable = { | ||
from: function from(source) { | ||
return new Chain(source); | ||
} | ||
}; | ||
var Chain = /*#__PURE__*/function () { | ||
function Chain(source) { | ||
this.source = source; | ||
} | ||
var _proto = Chain.prototype; | ||
_proto[SymbolSubscribable] = function () { | ||
return subscribe(this.source); | ||
}; | ||
_proto.map = function map(fn) { | ||
return this.chain(function (source) { | ||
return function (publish) { | ||
return _forEach(source, function* (item) { | ||
publish(fn(item)); | ||
}); | ||
}; | ||
}); | ||
}; | ||
_proto.filter = function filter(predicate) { | ||
return this.chain(function (source) { | ||
return function (publish) { | ||
return _forEach(source, function* (item) { | ||
if (predicate(item)) { | ||
publish(item); | ||
} | ||
}); | ||
}; | ||
}); | ||
}; | ||
_proto.chain = function chain(next) { | ||
return new Chain(createSubscription(next(this.source))); | ||
}; | ||
_proto.forEach = function forEach(visit) { | ||
return _forEach(this.source, visit); | ||
}; | ||
_proto.first = function* first() { | ||
var subscription = yield subscribe(this.source); | ||
var _yield$subscription$n = yield subscription.next(), | ||
done = _yield$subscription$n.done, | ||
value = _yield$subscription$n.value; | ||
if (done) { | ||
return undefined; | ||
} else { | ||
return value; | ||
} | ||
}; | ||
return Chain; | ||
}(); | ||
function subscribe(source) { | ||
if (isSubscribable(source)) { | ||
var subscriber = getSubscriber(source); | ||
if (subscriber) { | ||
return subscriber.call(source); | ||
} else { | ||
var error = new Error("cannot subscribe to " + source + " because it does not contain Symbol.subscription"); | ||
error.name = 'TypeError'; | ||
throw error; | ||
} | ||
} else { | ||
return source; | ||
} | ||
} | ||
function isSubscribable(value) { | ||
return !!getSubscriber(value); | ||
} | ||
function getSubscriber(source) { | ||
return source[SymbolSubscribable]; | ||
} | ||
export { Subscribable, SymbolSubscribable, createSubscription, _forEach as forEach }; | ||
//# sourceMappingURL=subscription.esm.js.map |
{ | ||
"name": "@effection/subscription", | ||
"version": "0.6.3-3763fbf", | ||
"version": "0.6.3-6e7f062", | ||
"description": "Effection Subscriptions", | ||
@@ -5,0 +5,0 @@ "main": "dist/index.js", |
@@ -58,1 +58,33 @@ # @effection/subscription | ||
no need to call the `unsubscribe()` method ever. | ||
### SymbolSubscribable | ||
In order to facilitate interoperation of subscription producers and | ||
consumers, any object can implement the `[SymbolSubscribable]()` | ||
method in order to be turned into a subscription. This follows the | ||
pattern of `Symbol.iterator`, and `Symbol.observable`. Any object that | ||
implements this method can be consumed as a subscription. | ||
### Subscribable.from(source) | ||
In order to lift functions into the context of a subscription, you can | ||
use `Subscribable.from` which will return an instance of | ||
`Subscribable` that allows you to transform a subscription produced | ||
### Subscribable#map(fn) | ||
Returns a new subscribable whose items are transformed by `fn`. For | ||
example: | ||
``` javascript | ||
Subscribable.from(websocket).map(message => JSON.parse(message)); | ||
``` | ||
### Subscribable#filter(predicate) | ||
Return a new `Subscribable` that only produces items from its source | ||
that match `predicate`. | ||
``` javascript | ||
Subscribable.from(websocket).filter(message => message.type === 'command'); | ||
``` |
export { Subscription, createSubscription } from './subscription'; | ||
export { SymbolSubscribable, SubscriptionSource, forEach, Subscribable } from './subscribable'; |
@@ -12,18 +12,17 @@ import { Operation, resource } from 'effection'; | ||
export function * createSubscription<T, TReturn>(subscribe: Subscriber<T,TReturn>): Operation<Subscription<T,TReturn>> { | ||
let values: T[] = []; | ||
let result: TReturn; | ||
let results: IteratorResult<T,TReturn>[] = []; | ||
let semaphore = new Semaphore<boolean>(); | ||
let semaphore = new Semaphore<void>(); | ||
let publish = (value: T) => { | ||
values.push(value); | ||
semaphore.signal(false); | ||
results.push({ done: false, value }); | ||
semaphore.signal(); | ||
}; | ||
let next = async () => { | ||
let next = async (): Promise<IteratorResult<T,TReturn>> => { | ||
let wait = semaphore.wait(); | ||
if (values.length > 0) { | ||
semaphore.signal(false); | ||
if (results.length > 0) { | ||
semaphore.signal(); | ||
} | ||
return wait.then(done => ({ done, value: done ? result : values.shift()})); | ||
return wait.then(() => results.shift() as IteratorResult<T,TReturn>); | ||
}; | ||
@@ -33,4 +32,5 @@ | ||
try { | ||
result = yield subscribe((value: T) => publish(value)); | ||
semaphore.signal(true); | ||
let value = yield subscribe((value: T) => publish(value)); | ||
results.push({ done: true, value }); | ||
semaphore.signal(); | ||
} finally { | ||
@@ -37,0 +37,0 @@ publish = value => { throw InvalidPublication(value); } |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Major refactor
Supply chain riskPackage has recently undergone a major refactor. It may be unstable or indicate significant internal changes. Use caution when updating to versions that include significant changes.
Found 1 instance in 1 package
44927
18
474
90
1