@effection/subscription
Advanced tools
Comparing version 0.6.3-ad0d7e2 to 0.6.3-c47cc6c
export { Subscription, createSubscription } from './subscription'; | ||
export { SymbolSubscribeable, SubscriptionSource, forEach, Subscribeable } from './subscribeable'; |
@@ -26,11 +26,9 @@ 'use strict'; | ||
function* createSubscription(subscribe) { | ||
var results = []; | ||
var values = []; | ||
var result; | ||
var semaphore = new Semaphore(); | ||
var publish = function publish(value) { | ||
results.push({ | ||
done: false, | ||
value: value | ||
}); | ||
semaphore.signal(); | ||
values.push(value); | ||
semaphore.signal(false); | ||
}; | ||
@@ -42,8 +40,11 @@ | ||
if (results.length > 0) { | ||
semaphore.signal(); | ||
if (values.length > 0) { | ||
semaphore.signal(false); | ||
} | ||
return Promise.resolve(wait.then(function () { | ||
return results.shift(); | ||
return Promise.resolve(wait.then(function (done) { | ||
return { | ||
done: done, | ||
value: done ? result : values.shift() | ||
}; | ||
})); | ||
@@ -59,10 +60,6 @@ } catch (e) { | ||
try { | ||
var value = yield subscribe(function (value) { | ||
result = yield subscribe(function (value) { | ||
return publish(value); | ||
}); | ||
results.push({ | ||
done: true, | ||
value: value | ||
}); | ||
semaphore.signal(); | ||
semaphore.signal(true); | ||
} finally { | ||
@@ -83,94 +80,3 @@ publish = function publish(value) { | ||
var SymbolSubscribeable = /*#__PURE__*/Symbol["for"]('Symbol.subscription'); | ||
function* _forEach(source, visit) { | ||
var subscription = yield subscribe(source); | ||
while (true) { | ||
var result = yield subscription.next(); | ||
if (result.done) { | ||
return result.value; | ||
} else { | ||
yield visit(result.value); | ||
} | ||
} | ||
} | ||
var Subscribeable = { | ||
from: function from(source) { | ||
return new Chain(source); | ||
} | ||
}; | ||
var Chain = /*#__PURE__*/function () { | ||
function Chain(source) { | ||
this.source = source; | ||
} | ||
var _proto = Chain.prototype; | ||
_proto[SymbolSubscribeable] = function () { | ||
return subscribe(this.source); | ||
}; | ||
_proto.map = function map(fn) { | ||
return this.chain(function (source) { | ||
return function (publish) { | ||
return _forEach(source, function* (item) { | ||
publish(fn(item)); | ||
}); | ||
}; | ||
}); | ||
}; | ||
_proto.filter = function filter(predicate) { | ||
return this.chain(function (source) { | ||
return function (publish) { | ||
return _forEach(source, function* (item) { | ||
if (predicate(item)) { | ||
publish(item); | ||
} | ||
}); | ||
}; | ||
}); | ||
}; | ||
_proto.chain = function chain(next) { | ||
return new Chain(createSubscription(next(this.source))); | ||
}; | ||
_proto.forEach = function forEach(visit) { | ||
return _forEach(this.source, visit); | ||
}; | ||
return Chain; | ||
}(); | ||
function subscribe(source) { | ||
if (isSubscribeable(source)) { | ||
var subscriber = getSubscriber(source); | ||
if (subscriber) { | ||
return subscriber.call(source); | ||
} else { | ||
var error = new Error("cannot subscribe to " + source + " because it does not contain Symbol.subscription"); | ||
error.name = 'TypeError'; | ||
throw error; | ||
} | ||
} else { | ||
return source; | ||
} | ||
} | ||
function isSubscribeable(value) { | ||
return !!getSubscriber(value); | ||
} | ||
function getSubscriber(source) { | ||
return source[SymbolSubscribeable]; | ||
} | ||
exports.Subscribeable = Subscribeable; | ||
exports.SymbolSubscribeable = SymbolSubscribeable; | ||
exports.createSubscription = createSubscription; | ||
exports.forEach = _forEach; | ||
//# sourceMappingURL=subscription.cjs.development.js.map |
@@ -1,2 +0,2 @@ | ||
"use strict";var n=require("effection"),r=function(){var n=this;this.waiters=[],this.signal=function(r){var t=n.waiters.pop();t&&t(r)},this.wait=function(){return new Promise((function(r){return n.waiters.push(r)}))}};function*t(t){var e=[],i=new r,u=function(n){e.push({done:!1,value:n}),i.signal()};return yield n.resource({next:function(){try{var n=i.wait();return e.length>0&&i.signal(),Promise.resolve(n.then((function(){return e.shift()})))}catch(n){return Promise.reject(n)}}},(function*(){try{var n=yield t((function(n){return u(n)}));e.push({done:!0,value:n}),i.signal()}finally{u=function(n){throw function(n){var r=new Error("tried to publish a value: "+n+" on an already finished subscription");return r.name="TypeError",r}(n)}}}))}var e=Symbol.for("Symbol.subscription");function*i(n,r){for(var t=yield c(n);;){var e=yield t.next();if(e.done)return e.value;yield r(e.value)}}var u={from:function(n){return new o(n)}},o=function(){function n(n){this.source=n}var r=n.prototype;return r[e]=function(){return c(this.source)},r.map=function(n){return this.chain((function(r){return function(t){return i(r,(function*(r){t(n(r))}))}}))},r.filter=function(n){return this.chain((function(r){return function(t){return i(r,(function*(r){n(r)&&t(r)}))}}))},r.chain=function(r){return new n(t(r(this.source)))},r.forEach=function(n){return i(this.source,n)},n}();function c(n){if(s(n)){var r=s(n);if(r)return r.call(n);var t=new Error("cannot subscribe to "+n+" because it does not contain Symbol.subscription");throw t.name="TypeError",t}return n}function s(n){return n[e]}exports.Subscribeable=u,exports.SymbolSubscribeable=e,exports.createSubscription=t,exports.forEach=i; | ||
"use strict";var n=require("effection"),r=function(){var n=this;this.waiters=[],this.signal=function(r){var t=n.waiters.pop();t&&t(r)},this.wait=function(){return new Promise((function(r){return n.waiters.push(r)}))}};exports.createSubscription=function*(t){var e,i=[],u=new r,o=function(n){i.push(n),u.signal(!1)};return yield n.resource({next:function(){try{var n=u.wait();return i.length>0&&u.signal(!1),Promise.resolve(n.then((function(n){return{done:n,value:n?e:i.shift()}})))}catch(n){return Promise.reject(n)}}},(function*(){try{e=yield t((function(n){return o(n)})),u.signal(!0)}finally{o=function(n){throw function(n){var r=new Error("tried to publish a value: "+n+" on an already finished subscription");return r.name="TypeError",r}(n)}}}))}; | ||
//# sourceMappingURL=subscription.cjs.production.min.js.map |
@@ -24,11 +24,9 @@ import { resource } from 'effection'; | ||
function* createSubscription(subscribe) { | ||
var results = []; | ||
var values = []; | ||
var result; | ||
var semaphore = new Semaphore(); | ||
var publish = function publish(value) { | ||
results.push({ | ||
done: false, | ||
value: value | ||
}); | ||
semaphore.signal(); | ||
values.push(value); | ||
semaphore.signal(false); | ||
}; | ||
@@ -40,8 +38,11 @@ | ||
if (results.length > 0) { | ||
semaphore.signal(); | ||
if (values.length > 0) { | ||
semaphore.signal(false); | ||
} | ||
return Promise.resolve(wait.then(function () { | ||
return results.shift(); | ||
return Promise.resolve(wait.then(function (done) { | ||
return { | ||
done: done, | ||
value: done ? result : values.shift() | ||
}; | ||
})); | ||
@@ -57,10 +58,6 @@ } catch (e) { | ||
try { | ||
var value = yield subscribe(function (value) { | ||
result = yield subscribe(function (value) { | ||
return publish(value); | ||
}); | ||
results.push({ | ||
done: true, | ||
value: value | ||
}); | ||
semaphore.signal(); | ||
semaphore.signal(true); | ||
} finally { | ||
@@ -81,91 +78,3 @@ publish = function publish(value) { | ||
var SymbolSubscribeable = /*#__PURE__*/Symbol["for"]('Symbol.subscription'); | ||
function* _forEach(source, visit) { | ||
var subscription = yield subscribe(source); | ||
while (true) { | ||
var result = yield subscription.next(); | ||
if (result.done) { | ||
return result.value; | ||
} else { | ||
yield visit(result.value); | ||
} | ||
} | ||
} | ||
var Subscribeable = { | ||
from: function from(source) { | ||
return new Chain(source); | ||
} | ||
}; | ||
var Chain = /*#__PURE__*/function () { | ||
function Chain(source) { | ||
this.source = source; | ||
} | ||
var _proto = Chain.prototype; | ||
_proto[SymbolSubscribeable] = function () { | ||
return subscribe(this.source); | ||
}; | ||
_proto.map = function map(fn) { | ||
return this.chain(function (source) { | ||
return function (publish) { | ||
return _forEach(source, function* (item) { | ||
publish(fn(item)); | ||
}); | ||
}; | ||
}); | ||
}; | ||
_proto.filter = function filter(predicate) { | ||
return this.chain(function (source) { | ||
return function (publish) { | ||
return _forEach(source, function* (item) { | ||
if (predicate(item)) { | ||
publish(item); | ||
} | ||
}); | ||
}; | ||
}); | ||
}; | ||
_proto.chain = function chain(next) { | ||
return new Chain(createSubscription(next(this.source))); | ||
}; | ||
_proto.forEach = function forEach(visit) { | ||
return _forEach(this.source, visit); | ||
}; | ||
return Chain; | ||
}(); | ||
function subscribe(source) { | ||
if (isSubscribeable(source)) { | ||
var subscriber = getSubscriber(source); | ||
if (subscriber) { | ||
return subscriber.call(source); | ||
} else { | ||
var error = new Error("cannot subscribe to " + source + " because it does not contain Symbol.subscription"); | ||
error.name = 'TypeError'; | ||
throw error; | ||
} | ||
} else { | ||
return source; | ||
} | ||
} | ||
function isSubscribeable(value) { | ||
return !!getSubscriber(value); | ||
} | ||
function getSubscriber(source) { | ||
return source[SymbolSubscribeable]; | ||
} | ||
export { Subscribeable, SymbolSubscribeable, createSubscription, _forEach as forEach }; | ||
export { createSubscription }; | ||
//# sourceMappingURL=subscription.esm.js.map |
{ | ||
"name": "@effection/subscription", | ||
"version": "0.6.3-ad0d7e2", | ||
"version": "0.6.3-c47cc6c", | ||
"description": "Effection Subscriptions", | ||
@@ -5,0 +5,0 @@ "main": "dist/index.js", |
@@ -58,33 +58,1 @@ # @effection/subscription | ||
no need to call the `unsubscribe()` method ever. | ||
### SymbolSubscribeable | ||
In order to facilitate interoperation of subscription producers and | ||
consumers, any object can implement the `[SymbolSubscribeable]()` | ||
method in order to be turned into a subscription. This follows the | ||
pattern of `Symbol.iterator`, and `Symbol.observable`. Any object that | ||
implements this method can be consumed as a subscription. | ||
### Subscribeable.from(source) | ||
In order to lift functions into the context of a subscription, you can | ||
use `Subscribeable.from` which will return an instance of | ||
`Subscribeable` that allows you to transform a subscription produced | ||
### Subscribeable#map(fn) | ||
Returns a new subscribeable whose items are transformed by `fn`. For | ||
example: | ||
``` javascript | ||
Subscribeable.from(websocket).map(message => JSON.parse(message)); | ||
``` | ||
### Subscribeable#filter(predicate) | ||
Return a new `Subscribeable` that only produces items from its source | ||
that match `predicate`. | ||
``` javascript | ||
Subscribeable.from(websocket).filter(message => message.type === 'command'); | ||
``` |
export { Subscription, createSubscription } from './subscription'; | ||
export { SymbolSubscribeable, SubscriptionSource, forEach, Subscribeable } from './subscribeable'; |
@@ -12,17 +12,18 @@ import { Operation, resource } from 'effection'; | ||
export function * createSubscription<T, TReturn>(subscribe: Subscriber<T,TReturn>): Operation<Subscription<T,TReturn>> { | ||
let results: IteratorResult<T,TReturn>[] = []; | ||
let values: T[] = []; | ||
let result: TReturn; | ||
let semaphore = new Semaphore<void>(); | ||
let semaphore = new Semaphore<boolean>(); | ||
let publish = (value: T) => { | ||
results.push({ done: false, value }); | ||
semaphore.signal(); | ||
values.push(value); | ||
semaphore.signal(false); | ||
}; | ||
let next = async (): Promise<IteratorResult<T,TReturn>> => { | ||
let next = async () => { | ||
let wait = semaphore.wait(); | ||
if (results.length > 0) { | ||
semaphore.signal(); | ||
if (values.length > 0) { | ||
semaphore.signal(false); | ||
} | ||
return wait.then(() => results.shift() as IteratorResult<T,TReturn>); | ||
return wait.then(done => ({ done, value: done ? result : values.shift()})); | ||
}; | ||
@@ -32,5 +33,4 @@ | ||
try { | ||
let value = yield subscribe((value: T) => publish(value)); | ||
results.push({ done: true, value }); | ||
semaphore.signal(); | ||
result = yield subscribe((value: T) => publish(value)); | ||
semaphore.signal(true); | ||
} finally { | ||
@@ -37,0 +37,0 @@ publish = value => { throw InvalidPublication(value); } |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Major refactor
Supply chain riskPackage has recently undergone a major refactor. It may be unstable or indicate significant internal changes. Use caution when updating to versions that include significant changes.
Found 1 instance in 1 package
Major refactor
Supply chain riskPackage has recently undergone a major refactor. It may be unstable or indicate significant internal changes. Use caution when updating to versions that include significant changes.
Found 1 instance in 1 package
18927
16
196
58