@effection/subscription
Advanced tools
Comparing version 0.8.1 to 0.9.0-25b68eb
# @effection/subscription | ||
## 0.9.0 | ||
### Minor Changes | ||
- 786b20e: make the type of SymbolSubscribable global, not just the value | ||
## 0.8.1 | ||
@@ -4,0 +10,0 @@ |
import { Operation } from 'effection'; | ||
import { Subscription } from './subscription'; | ||
import { DeepPartial } from './match'; | ||
import { SubscriptionSource } from './subscribable'; | ||
import { SubscriptionSource } from './subscription-source'; | ||
export declare class ChainableSubscription<T, TReturn> implements Subscription<T, TReturn> { | ||
@@ -9,2 +9,3 @@ private subscription; | ||
static of<T, TReturn>(source: SubscriptionSource<T, TReturn>): Operation<ChainableSubscription<T, TReturn>>; | ||
static wrap<T, TReturn, R = T>(source: Operation<ChainableSubscription<T, TReturn>>, fn: (inner: ChainableSubscription<T, TReturn>) => ChainableSubscription<R, TReturn>): Operation<ChainableSubscription<R, TReturn>>; | ||
filter(predicate: (value: T) => boolean): ChainableSubscription<T, TReturn>; | ||
@@ -11,0 +12,0 @@ match(reference: DeepPartial<T>): ChainableSubscription<T, TReturn>; |
@@ -1,7 +0,11 @@ | ||
export { Subscription, createSubscription } from './subscription'; | ||
export { SymbolSubscribable, SubscriptionSource, forEach, Subscribable } from './subscribable'; | ||
export { Subscription } from './subscription'; | ||
export { createSubscription } from './create-subscription'; | ||
export { SymbolSubscribable } from './symbol-subscribable'; | ||
export { Subscribable } from './subscribable'; | ||
export { SubscriptionSource } from './subscription-source'; | ||
export { ChainableSubscription } from './chainable-subscription'; | ||
export { ChainableSubscribable } from './chainable-subscribable'; | ||
export { subscribe } from './subscribe'; | ||
import { Operation } from 'effection'; | ||
import { ChainableSubscription } from './chainable-subscription'; | ||
import { SubscriptionSource } from './subscribable'; | ||
export declare function subscribe<T, TReturn>(source: SubscriptionSource<T, TReturn>): Operation<ChainableSubscription<T, TReturn>>; | ||
import { SubscriptionSource } from './subscription-source'; | ||
export declare function forEach<T, TReturn>(source: SubscriptionSource<T, TReturn>, visit: (value: T) => Operation<void>): Operation<TReturn>; |
import { Operation } from 'effection'; | ||
import { Subscription, Subscriber } from './subscription'; | ||
import { DeepPartial } from './match'; | ||
export declare const SymbolSubscribable: unique symbol; | ||
import { Subscription } from './subscription'; | ||
import { SymbolSubscribable } from './symbol-subscribable'; | ||
import { SubscriptionSource } from './subscription-source'; | ||
export interface Subscribable<T, TReturn> { | ||
[SymbolSubscribable](): Operation<Subscription<T, TReturn>>; | ||
} | ||
export declare type SubscriptionSource<T, TReturn> = Subscribable<T, TReturn> | Operation<Subscription<T, TReturn>>; | ||
export declare function forEach<T, TReturn>(source: SubscriptionSource<T, TReturn>, visit: (value: T) => Operation<void>): Operation<TReturn>; | ||
export declare const Subscribable: { | ||
from: <T, TReturn>(source: SubscriptionSource<T, TReturn>) => Chain<T, TReturn>; | ||
from<T, TReturn>(source: SubscriptionSource<T, TReturn>): (import("effection").OperationFn<import("./chainable-subscription").ChainableSubscription<T, TReturn>> & import("./chainable-subscribable").ChainableSubscribableMethods<T, TReturn>) | (import("effection").Sequence<import("./chainable-subscription").ChainableSubscription<T, TReturn>> & import("./chainable-subscribable").ChainableSubscribableMethods<T, TReturn>) | (PromiseLike<import("./chainable-subscription").ChainableSubscription<T, TReturn>> & import("./chainable-subscribable").ChainableSubscribableMethods<T, TReturn>) | (import("effection").Controller<import("./chainable-subscription").ChainableSubscription<T, TReturn>> & import("./chainable-subscribable").ChainableSubscribableMethods<T, TReturn>) | (import("effection").OperationFn<import("./chainable-subscription").ChainableSubscription<T, TReturn>> & import("effection").Sequence<import("./chainable-subscription").ChainableSubscription<T, TReturn>> & import("./chainable-subscribable").ChainableSubscribableMethods<T, TReturn>) | (import("effection").OperationFn<import("./chainable-subscription").ChainableSubscription<T, TReturn>> & PromiseLike<import("./chainable-subscription").ChainableSubscription<T, TReturn>> & import("./chainable-subscribable").ChainableSubscribableMethods<T, TReturn>) | (import("effection").OperationFn<import("./chainable-subscription").ChainableSubscription<T, TReturn>> & import("effection").Controller<import("./chainable-subscription").ChainableSubscription<T, TReturn>> & import("./chainable-subscribable").ChainableSubscribableMethods<T, TReturn>) | (import("effection").Sequence<import("./chainable-subscription").ChainableSubscription<T, TReturn>> & import("effection").OperationFn<import("./chainable-subscription").ChainableSubscription<T, TReturn>> & import("./chainable-subscribable").ChainableSubscribableMethods<T, TReturn>) | (import("effection").Sequence<import("./chainable-subscription").ChainableSubscription<T, TReturn>> & PromiseLike<import("./chainable-subscription").ChainableSubscription<T, TReturn>> & import("./chainable-subscribable").ChainableSubscribableMethods<T, TReturn>) | (import("effection").Sequence<import("./chainable-subscription").ChainableSubscription<T, TReturn>> & import("effection").Controller<import("./chainable-subscription").ChainableSubscription<T, TReturn>> & import("./chainable-subscribable").ChainableSubscribableMethods<T, TReturn>) | (PromiseLike<import("./chainable-subscription").ChainableSubscription<T, TReturn>> & import("effection").OperationFn<import("./chainable-subscription").ChainableSubscription<T, TReturn>> & import("./chainable-subscribable").ChainableSubscribableMethods<T, TReturn>) | (PromiseLike<import("./chainable-subscription").ChainableSubscription<T, TReturn>> & import("effection").Sequence<import("./chainable-subscription").ChainableSubscription<T, TReturn>> & import("./chainable-subscribable").ChainableSubscribableMethods<T, TReturn>) | (PromiseLike<import("./chainable-subscription").ChainableSubscription<T, TReturn>> & import("effection").Controller<import("./chainable-subscription").ChainableSubscription<T, TReturn>> & import("./chainable-subscribable").ChainableSubscribableMethods<T, TReturn>) | (import("effection").Controller<import("./chainable-subscription").ChainableSubscription<T, TReturn>> & import("effection").OperationFn<import("./chainable-subscription").ChainableSubscription<T, TReturn>> & import("./chainable-subscribable").ChainableSubscribableMethods<T, TReturn>) | (import("effection").Controller<import("./chainable-subscription").ChainableSubscription<T, TReturn>> & import("effection").Sequence<import("./chainable-subscription").ChainableSubscription<T, TReturn>> & import("./chainable-subscribable").ChainableSubscribableMethods<T, TReturn>) | (import("effection").Controller<import("./chainable-subscription").ChainableSubscription<T, TReturn>> & PromiseLike<import("./chainable-subscription").ChainableSubscription<T, TReturn>> & import("./chainable-subscribable").ChainableSubscribableMethods<T, TReturn>); | ||
}; | ||
export declare class Chain<T, TReturn> implements Subscribable<T, TReturn> { | ||
private source; | ||
constructor(source: SubscriptionSource<T, TReturn>); | ||
[SymbolSubscribable](): Operation<Subscription<T, TReturn>>; | ||
map<X>(fn: (value: T) => X): Chain<X, TReturn>; | ||
filter(predicate: (value: T) => boolean): Chain<T, TReturn>; | ||
match(reference: DeepPartial<T>): Chain<T, TReturn>; | ||
chain<X = T, XReturn = TReturn>(next: (source: SubscriptionSource<T, TReturn>) => Subscriber<X, XReturn>): Chain<X, XReturn>; | ||
forEach(visit: (value: T) => Operation<void>): Operation<TReturn>; | ||
first(): Operation<T | undefined>; | ||
} | ||
export declare function subscribe<T, TReturn>(source: SubscriptionSource<T, TReturn>): Operation<Subscription<T, TReturn>>; |
@@ -23,84 +23,26 @@ 'use strict'; | ||
/** | ||
* Hack Zone! | ||
* | ||
* tl;dr not only the symbol value, but also the symbol type must be | ||
* global. | ||
* | ||
* In order to satisfy TypeScript that the SymbolSubscribable | ||
* represents the same value across different versions of the same | ||
* package, we need to declare a "fake" global variable that does not | ||
* actually exist, but has a theoretical type. Since it is global, | ||
* TypeScript will see it as the same type everywhere, and so we can | ||
* use the `typeof` for the value, as the type of of our local symbol | ||
* subscribable, and TypeScript will think that all of the types are | ||
* the same. | ||
* | ||
* See https://github.com/microsoft/TypeScript/issues/8099#issuecomment-210134773 | ||
* for details. | ||
*/ | ||
var SymbolSubscribable = /*#__PURE__*/Symbol["for"]('Symbol.subscription'); | ||
function* _forEach(source, visit) { | ||
var subscription = yield subscribe(source); | ||
while (true) { | ||
var result = yield subscription.next(); | ||
if (result.done) { | ||
return result.value; | ||
} else { | ||
yield visit(result.value); | ||
} | ||
} | ||
function isSubscribable(value) { | ||
return !!value[SymbolSubscribable]; | ||
} | ||
var Subscribable = { | ||
from: function from(source) { | ||
return new Chain(source); | ||
} | ||
}; | ||
var Chain = /*#__PURE__*/function () { | ||
function Chain(source) { | ||
this.source = source; | ||
} | ||
var _proto = Chain.prototype; | ||
_proto[SymbolSubscribable] = function () { | ||
return subscribe(this.source); | ||
}; | ||
_proto.map = function map(fn) { | ||
return this.chain(function (source) { | ||
return function (publish) { | ||
return _forEach(source, function* (item) { | ||
publish(fn(item)); | ||
}); | ||
}; | ||
}); | ||
}; | ||
_proto.filter = function filter(predicate) { | ||
return this.chain(function (source) { | ||
return function (publish) { | ||
return _forEach(source, function* (item) { | ||
if (predicate(item)) { | ||
publish(item); | ||
} | ||
}); | ||
}; | ||
}); | ||
}; | ||
_proto.match = function match(reference) { | ||
return this.filter(matcher(reference)); | ||
}; | ||
_proto.chain = function chain(next) { | ||
return new Chain(createSubscription(next(this.source))); | ||
}; | ||
_proto.forEach = function forEach(visit) { | ||
return _forEach(this.source, visit); | ||
}; | ||
_proto.first = function* first() { | ||
var subscription = yield subscribe(this.source); | ||
var _yield$subscription$n = yield subscription.next(), | ||
done = _yield$subscription$n.done, | ||
value = _yield$subscription$n.value; | ||
if (done) { | ||
return undefined; | ||
} else { | ||
return value; | ||
} | ||
}; | ||
return Chain; | ||
}(); | ||
function subscribe(source) { | ||
function rawSubscribe(source) { | ||
if (isSubscribable(source)) { | ||
@@ -111,6 +53,2 @@ return source[SymbolSubscribable](); | ||
} | ||
} // eslint-disable-next-line @typescript-eslint/no-explicit-any | ||
function isSubscribable(value) { | ||
return !!value[SymbolSubscribable]; | ||
} | ||
@@ -131,3 +69,3 @@ | ||
return yield effection.resource(chain, function* () { | ||
chain.subscription = yield subscribe(source); | ||
chain.subscription = yield rawSubscribe(source); | ||
yield; | ||
@@ -137,2 +75,11 @@ }); | ||
ChainableSubscription.wrap = function* wrap(source, fn) { | ||
var chain = new ChainableSubscription(DUMMY); | ||
return yield effection.resource(chain, function* () { | ||
var subscription = yield source; | ||
chain.subscription = fn(subscription); | ||
yield; | ||
}); | ||
}; | ||
var _proto = ChainableSubscription.prototype; | ||
@@ -220,2 +167,36 @@ | ||
function makeChainable(operation) { | ||
var _Object$assign; | ||
return Object.assign(operation, (_Object$assign = {}, _Object$assign[SymbolSubscribable] = function* () { | ||
return yield operation; | ||
}, _Object$assign.filter = function filter(predicate) { | ||
return makeChainable(function* () { | ||
return yield ChainableSubscription.wrap(operation, function (inner) { | ||
return inner.filter(predicate); | ||
}); | ||
}); | ||
}, _Object$assign.match = function match(reference) { | ||
return makeChainable(function* () { | ||
return yield ChainableSubscription.wrap(operation, function (inner) { | ||
return inner.match(reference); | ||
}); | ||
}); | ||
}, _Object$assign.map = function map(mapper) { | ||
return makeChainable(function* () { | ||
return yield ChainableSubscription.wrap(operation, function (inner) { | ||
return inner.map(mapper); | ||
}); | ||
}); | ||
}, _Object$assign.first = function* first() { | ||
return yield (yield operation).first(); | ||
}, _Object$assign.expect = function* expect() { | ||
return yield (yield operation).expect(); | ||
}, _Object$assign.forEach = function* forEach(visit) { | ||
return yield (yield operation).forEach(visit); | ||
}, _Object$assign.next = function* next() { | ||
return yield (yield operation).next(); | ||
}, _Object$assign)); | ||
} | ||
var Semaphore = function Semaphore() { | ||
@@ -242,3 +223,3 @@ var _this = this; | ||
function createSubscription(subscribe) { | ||
return function* () { | ||
return makeChainable(function* () { | ||
var results = []; | ||
@@ -290,3 +271,3 @@ var semaphore = new Semaphore(); | ||
return subscription; | ||
}; | ||
}); | ||
} | ||
@@ -300,6 +281,20 @@ | ||
function* subscribe$1(source) { | ||
return yield ChainableSubscription.of(source); | ||
function subscribe(source) { | ||
return makeChainable(function* () { | ||
return yield ChainableSubscription.of(source); | ||
}); | ||
} | ||
var Subscribable = { | ||
from: function from(source) { | ||
console.warn('`Subscribable.from(source)` is deprecated, use `subscribe(source).map(...)` instead'); | ||
return subscribe(source); | ||
} | ||
}; | ||
function* forEach(source, visit) { | ||
console.warn('`forEach(source, ...)` is deprecated, use `subscribe(source).forEach(...)` instead'); | ||
return yield subscribe(source).forEach(visit); | ||
} | ||
exports.ChainableSubscription = ChainableSubscription; | ||
@@ -309,4 +304,4 @@ exports.Subscribable = Subscribable; | ||
exports.createSubscription = createSubscription; | ||
exports.forEach = _forEach; | ||
exports.subscribe = subscribe$1; | ||
exports.forEach = forEach; | ||
exports.subscribe = subscribe; | ||
//# sourceMappingURL=subscription.cjs.development.js.map |
@@ -1,2 +0,2 @@ | ||
"use strict";var n=require("effection");function r(n){return function(t){if("object"==typeof t&&"object"==typeof n){var e=t;return Object.entries(n).every((function(n){var t=n[0];return r(n[1])(e[t])}))}return t===n}}var t=Symbol.for("Symbol.subscription");function*e(n,r){for(var t=yield o(n);;){var e=yield t.next();if(e.done)return e.value;yield r(e.value)}}var i={from:function(n){return new u(n)}},u=function(){function n(n){this.source=n}var i=n.prototype;return i[t]=function(){return o(this.source)},i.map=function(n){return this.chain((function(r){return function(t){return e(r,(function*(r){t(n(r))}))}}))},i.filter=function(n){return this.chain((function(r){return function(t){return e(r,(function*(r){n(r)&&t(r)}))}}))},i.match=function(n){return this.filter(r(n))},i.chain=function(r){return new n(a(r(this.source)))},i.forEach=function(n){return e(this.source,n)},i.first=function*(){var n=yield o(this.source),r=yield n.next();return r.done?void 0:r.value},n}();function o(n){return n[t]?n[t]():n}var c={next:function(){throw new Error("dummy")}},s=function(){function t(n){this.subscription=n}t.of=function*(r){var e=new t(c);return yield n.resource(e,(function*(){e.subscription=yield o(r),yield}))};var e=t.prototype;return e.filter=function(n){var r=this.subscription;return new t({next:function*(){for(;;){var t=yield r.next();if(t.done)return t;if(n(t.value))return t}}})},e.match=function(n){return this.filter(r(n))},e.map=function(n){var r=this.subscription;return new t({next:function*(){for(;;){var t=yield r.next();return t.done?t:{done:!1,value:n(t.value)}}}})},e.first=function*(){var n=yield this.subscription.next();return n.done?void 0:n.value},e.expect=function*(){var n=yield this.subscription.next();if(n.done)throw new Error("expected subscription to contain a value");return n.value},e.forEach=function*(n){for(;;){var r=yield this.subscription.next();if(r.done)return r.value;yield n(r.value)}},e.next=function(){return this.subscription.next()},t}(),f=function(){var n=this;this.waiters=[],this.signal=function(r){var t=n.waiters.pop();t&&t(r)},this.wait=function(){return new Promise((function(r){return n.waiters.push(r)}))}};function a(r){return function*(){var t=[],e=new f,i=function(n){t.push({done:!1,value:n}),e.signal()};return yield n.resource(new s({next:function(){try{var n=e.wait();return t.length>0&&e.signal(),Promise.resolve(n.then((function(){return t.shift()})))}catch(n){return Promise.reject(n)}}}),(function*(){try{var n=yield r((function(n){return i(n)}));t.push({done:!0,value:n}),e.signal()}finally{i=function(n){throw function(n){var r=new Error("tried to publish a value: "+n+" on an already finished subscription");return r.name="TypeError",r}(n)}}}))}}exports.ChainableSubscription=s,exports.Subscribable=i,exports.SymbolSubscribable=t,exports.createSubscription=a,exports.forEach=e,exports.subscribe=function*(n){return yield s.of(n)}; | ||
"use strict";var n=require("effection"),r=Symbol.for("Symbol.subscription"),e={next:function(){throw new Error("dummy")}},t=function(){function t(n){this.subscription=n}t.of=function*(i){var u=new t(e);return yield n.resource(u,(function*(){u.subscription=yield function(n){return n[r]?n[r]():n}(i),yield}))},t.wrap=function*(r,i){var u=new t(e);return yield n.resource(u,(function*(){var n=yield r;u.subscription=i(n),yield}))};var i=t.prototype;return i.filter=function(n){var r=this.subscription;return new t({next:function*(){for(;;){var e=yield r.next();if(e.done)return e;if(n(e.value))return e}}})},i.match=function(n){return this.filter(function n(r){return function(e){if("object"==typeof e&&"object"==typeof r){var t=e;return Object.entries(r).every((function(r){var e=r[0];return n(r[1])(t[e])}))}return e===r}}(n))},i.map=function(n){var r=this.subscription;return new t({next:function*(){for(;;){var e=yield r.next();return e.done?e:{done:!1,value:n(e.value)}}}})},i.first=function*(){var n=yield this.subscription.next();return n.done?void 0:n.value},i.expect=function*(){var n=yield this.subscription.next();if(n.done)throw new Error("expected subscription to contain a value");return n.value},i.forEach=function*(n){for(;;){var r=yield this.subscription.next();if(r.done)return r.value;yield n(r.value)}},i.next=function(){return this.subscription.next()},t}();function i(n){var e;return Object.assign(n,((e={})[r]=function*(){return yield n},e.filter=function(r){return i((function*(){return yield t.wrap(n,(function(n){return n.filter(r)}))}))},e.match=function(r){return i((function*(){return yield t.wrap(n,(function(n){return n.match(r)}))}))},e.map=function(r){return i((function*(){return yield t.wrap(n,(function(n){return n.map(r)}))}))},e.first=function*(){return yield(yield n).first()},e.expect=function*(){return yield(yield n).expect()},e.forEach=function*(r){return yield(yield n).forEach(r)},e.next=function*(){return yield(yield n).next()},e))}var u=function(){var n=this;this.waiters=[],this.signal=function(r){var e=n.waiters.pop();e&&e(r)},this.wait=function(){return new Promise((function(r){return n.waiters.push(r)}))}};function o(n){return i((function*(){return yield t.of(n)}))}var c={from:function(n){return console.warn("`Subscribable.from(source)` is deprecated, use `subscribe(source).map(...)` instead"),o(n)}};exports.ChainableSubscription=t,exports.Subscribable=c,exports.SymbolSubscribable=r,exports.createSubscription=function(r){return i((function*(){var e=[],i=new u,o=function(n){e.push({done:!1,value:n}),i.signal()};return yield n.resource(new t({next:function(){try{var n=i.wait();return e.length>0&&i.signal(),Promise.resolve(n.then((function(){return e.shift()})))}catch(n){return Promise.reject(n)}}}),(function*(){try{var n=yield r((function(n){return o(n)}));e.push({done:!0,value:n}),i.signal()}finally{o=function(n){throw function(n){var r=new Error("tried to publish a value: "+n+" on an already finished subscription");return r.name="TypeError",r}(n)}}}))}))},exports.forEach=function*(n,r){return console.warn("`forEach(source, ...)` is deprecated, use `subscribe(source).forEach(...)` instead"),yield o(n).forEach(r)},exports.subscribe=o; | ||
//# sourceMappingURL=subscription.cjs.production.min.js.map |
@@ -6,2 +6,1 @@ import { Operation } from 'effection'; | ||
} | ||
export declare function createSubscription<T, TReturn>(subscribe: Subscriber<T, TReturn>): Operation<Subscription<T, TReturn>>; |
@@ -21,84 +21,26 @@ import { resource } from 'effection'; | ||
/** | ||
* Hack Zone! | ||
* | ||
* tl;dr not only the symbol value, but also the symbol type must be | ||
* global. | ||
* | ||
* In order to satisfy TypeScript that the SymbolSubscribable | ||
* represents the same value across different versions of the same | ||
* package, we need to declare a "fake" global variable that does not | ||
* actually exist, but has a theoretical type. Since it is global, | ||
* TypeScript will see it as the same type everywhere, and so we can | ||
* use the `typeof` for the value, as the type of of our local symbol | ||
* subscribable, and TypeScript will think that all of the types are | ||
* the same. | ||
* | ||
* See https://github.com/microsoft/TypeScript/issues/8099#issuecomment-210134773 | ||
* for details. | ||
*/ | ||
var SymbolSubscribable = /*#__PURE__*/Symbol["for"]('Symbol.subscription'); | ||
function* _forEach(source, visit) { | ||
var subscription = yield subscribe(source); | ||
while (true) { | ||
var result = yield subscription.next(); | ||
if (result.done) { | ||
return result.value; | ||
} else { | ||
yield visit(result.value); | ||
} | ||
} | ||
function isSubscribable(value) { | ||
return !!value[SymbolSubscribable]; | ||
} | ||
var Subscribable = { | ||
from: function from(source) { | ||
return new Chain(source); | ||
} | ||
}; | ||
var Chain = /*#__PURE__*/function () { | ||
function Chain(source) { | ||
this.source = source; | ||
} | ||
var _proto = Chain.prototype; | ||
_proto[SymbolSubscribable] = function () { | ||
return subscribe(this.source); | ||
}; | ||
_proto.map = function map(fn) { | ||
return this.chain(function (source) { | ||
return function (publish) { | ||
return _forEach(source, function* (item) { | ||
publish(fn(item)); | ||
}); | ||
}; | ||
}); | ||
}; | ||
_proto.filter = function filter(predicate) { | ||
return this.chain(function (source) { | ||
return function (publish) { | ||
return _forEach(source, function* (item) { | ||
if (predicate(item)) { | ||
publish(item); | ||
} | ||
}); | ||
}; | ||
}); | ||
}; | ||
_proto.match = function match(reference) { | ||
return this.filter(matcher(reference)); | ||
}; | ||
_proto.chain = function chain(next) { | ||
return new Chain(createSubscription(next(this.source))); | ||
}; | ||
_proto.forEach = function forEach(visit) { | ||
return _forEach(this.source, visit); | ||
}; | ||
_proto.first = function* first() { | ||
var subscription = yield subscribe(this.source); | ||
var _yield$subscription$n = yield subscription.next(), | ||
done = _yield$subscription$n.done, | ||
value = _yield$subscription$n.value; | ||
if (done) { | ||
return undefined; | ||
} else { | ||
return value; | ||
} | ||
}; | ||
return Chain; | ||
}(); | ||
function subscribe(source) { | ||
function rawSubscribe(source) { | ||
if (isSubscribable(source)) { | ||
@@ -109,6 +51,2 @@ return source[SymbolSubscribable](); | ||
} | ||
} // eslint-disable-next-line @typescript-eslint/no-explicit-any | ||
function isSubscribable(value) { | ||
return !!value[SymbolSubscribable]; | ||
} | ||
@@ -129,3 +67,3 @@ | ||
return yield resource(chain, function* () { | ||
chain.subscription = yield subscribe(source); | ||
chain.subscription = yield rawSubscribe(source); | ||
yield; | ||
@@ -135,2 +73,11 @@ }); | ||
ChainableSubscription.wrap = function* wrap(source, fn) { | ||
var chain = new ChainableSubscription(DUMMY); | ||
return yield resource(chain, function* () { | ||
var subscription = yield source; | ||
chain.subscription = fn(subscription); | ||
yield; | ||
}); | ||
}; | ||
var _proto = ChainableSubscription.prototype; | ||
@@ -218,2 +165,36 @@ | ||
function makeChainable(operation) { | ||
var _Object$assign; | ||
return Object.assign(operation, (_Object$assign = {}, _Object$assign[SymbolSubscribable] = function* () { | ||
return yield operation; | ||
}, _Object$assign.filter = function filter(predicate) { | ||
return makeChainable(function* () { | ||
return yield ChainableSubscription.wrap(operation, function (inner) { | ||
return inner.filter(predicate); | ||
}); | ||
}); | ||
}, _Object$assign.match = function match(reference) { | ||
return makeChainable(function* () { | ||
return yield ChainableSubscription.wrap(operation, function (inner) { | ||
return inner.match(reference); | ||
}); | ||
}); | ||
}, _Object$assign.map = function map(mapper) { | ||
return makeChainable(function* () { | ||
return yield ChainableSubscription.wrap(operation, function (inner) { | ||
return inner.map(mapper); | ||
}); | ||
}); | ||
}, _Object$assign.first = function* first() { | ||
return yield (yield operation).first(); | ||
}, _Object$assign.expect = function* expect() { | ||
return yield (yield operation).expect(); | ||
}, _Object$assign.forEach = function* forEach(visit) { | ||
return yield (yield operation).forEach(visit); | ||
}, _Object$assign.next = function* next() { | ||
return yield (yield operation).next(); | ||
}, _Object$assign)); | ||
} | ||
var Semaphore = function Semaphore() { | ||
@@ -240,3 +221,3 @@ var _this = this; | ||
function createSubscription(subscribe) { | ||
return function* () { | ||
return makeChainable(function* () { | ||
var results = []; | ||
@@ -288,3 +269,3 @@ var semaphore = new Semaphore(); | ||
return subscription; | ||
}; | ||
}); | ||
} | ||
@@ -298,7 +279,21 @@ | ||
function* subscribe$1(source) { | ||
return yield ChainableSubscription.of(source); | ||
function subscribe(source) { | ||
return makeChainable(function* () { | ||
return yield ChainableSubscription.of(source); | ||
}); | ||
} | ||
export { ChainableSubscription, Subscribable, SymbolSubscribable, createSubscription, _forEach as forEach, subscribe$1 as subscribe }; | ||
var Subscribable = { | ||
from: function from(source) { | ||
console.warn('`Subscribable.from(source)` is deprecated, use `subscribe(source).map(...)` instead'); | ||
return subscribe(source); | ||
} | ||
}; | ||
function* forEach(source, visit) { | ||
console.warn('`forEach(source, ...)` is deprecated, use `subscribe(source).forEach(...)` instead'); | ||
return yield subscribe(source).forEach(visit); | ||
} | ||
export { ChainableSubscription, Subscribable, SymbolSubscribable, createSubscription, forEach, subscribe }; | ||
//# sourceMappingURL=subscription.esm.js.map |
{ | ||
"name": "@effection/subscription", | ||
"version": "0.8.1", | ||
"version": "0.9.0-25b68eb", | ||
"description": "Effection Subscriptions", | ||
@@ -5,0 +5,0 @@ "main": "dist/index.js", |
@@ -67,9 +67,8 @@ # @effection/subscription | ||
### Subscribable.from(source) | ||
### subscribe(source) | ||
In order to lift functions into the context of a subscription, you can | ||
use `Subscribable.from` which will return an instance of | ||
`Subscribable` that allows you to transform a subscription produced | ||
In order to lift functions into the context of a subscription, you can use | ||
`subscribe` which can be used to transform subscriptions via combinators. | ||
### Subscribable#map(fn) | ||
### map(fn) | ||
@@ -80,6 +79,6 @@ Returns a new subscribable whose items are transformed by `fn`. For | ||
``` javascript | ||
Subscribable.from(websocket).map(message => JSON.parse(message)); | ||
subscribe(websocket).map(message => JSON.parse(message)); | ||
``` | ||
### Subscribable#filter(predicate) | ||
### filter(predicate) | ||
@@ -90,6 +89,6 @@ Return a new `Subscribable` that only produces items from its source | ||
``` javascript | ||
Subscribable.from(websocket).filter(message => message.type === 'command'); | ||
subscribe(websocket).filter(message => message.type === 'command'); | ||
``` | ||
### Subscribable#match(reference) | ||
### match(reference) | ||
@@ -101,6 +100,6 @@ Return a new `Subscribable` that only produces items from its source that match | ||
``` javascript | ||
Subscribable.from(websocket).match({ type: 'command' }); | ||
subscribe(websocket).match({ type: 'command' }); | ||
``` | ||
### Subscribable#first() | ||
### first() | ||
@@ -111,3 +110,23 @@ An operation that produces the first item in a subscription or | ||
``` javascript | ||
let message = yield Subscribable.from(websocket).first(); | ||
let message = yield subscribe(websocket).first(); | ||
``` | ||
### expect() | ||
An operation that produces the first item in a subscription or | ||
throws an error if the subscription has no items. | ||
``` javascript | ||
let message = yield subscribe(websocket).expect(); | ||
``` | ||
### forEach() | ||
Calls the given operation function with each item in the subscription. Returns | ||
the return value of the subscriopion when done. | ||
``` javascript | ||
let exitCode = yield subscribe(websocket).forEach(function*(message) { | ||
// ... | ||
}); | ||
``` |
import { Operation, resource } from 'effection'; | ||
import { Subscription } from './subscription'; | ||
import { DeepPartial, matcher } from './match'; | ||
import { SubscriptionSource, subscribe } from './subscribable'; | ||
import { SubscriptionSource, rawSubscribe } from './subscription-source'; | ||
@@ -15,3 +15,3 @@ const DUMMY = { next() { throw new Error('dummy') } }; | ||
return yield resource(chain, function*() { | ||
chain.subscription = yield subscribe(source); | ||
chain.subscription = yield rawSubscribe(source); | ||
yield; | ||
@@ -21,2 +21,15 @@ }); | ||
static *wrap<T, TReturn, R = T>( | ||
source: Operation<ChainableSubscription<T, TReturn>>, | ||
fn: (inner: ChainableSubscription<T, TReturn>) => ChainableSubscription<R, TReturn>, | ||
): Operation<ChainableSubscription<R, TReturn>> { | ||
let chain = new ChainableSubscription<R, TReturn>(DUMMY); | ||
return yield resource(chain, function*() { | ||
let subscription = yield source; | ||
chain.subscription = fn(subscription); | ||
yield; | ||
}); | ||
} | ||
filter(predicate: (value: T) => boolean): ChainableSubscription<T, TReturn> { | ||
@@ -23,0 +36,0 @@ let { subscription } = this; |
@@ -1,11 +0,18 @@ | ||
export { Subscription, createSubscription } from './subscription'; | ||
export { SymbolSubscribable, SubscriptionSource, forEach, Subscribable } from './subscribable'; | ||
export { Subscription } from './subscription'; | ||
export { createSubscription } from './create-subscription'; | ||
export { SymbolSubscribable } from './symbol-subscribable'; | ||
export { Subscribable } from './subscribable'; | ||
export { SubscriptionSource } from './subscription-source'; | ||
export { ChainableSubscription } from './chainable-subscription'; | ||
export { ChainableSubscribable } from './chainable-subscribable'; | ||
export { subscribe } from './subscribe'; | ||
import { Operation } from 'effection'; | ||
import { ChainableSubscription } from './chainable-subscription'; | ||
import { SubscriptionSource } from './subscribable'; | ||
import { SubscriptionSource } from './subscription-source'; | ||
import { subscribe } from './subscribe'; | ||
export function* subscribe<T, TReturn>(source: SubscriptionSource<T,TReturn>): Operation<ChainableSubscription<T,TReturn>> { | ||
return yield ChainableSubscription.of(source); | ||
export function* forEach<T,TReturn>(source: SubscriptionSource<T,TReturn>, visit: (value: T) => Operation<void>): Operation<TReturn> { | ||
console.warn('`forEach(source, ...)` is deprecated, use `subscribe(source).forEach(...)` instead'); | ||
return yield subscribe<T, TReturn>(source).forEach(visit); | ||
} | ||
import { Operation } from 'effection'; | ||
import { Subscription, createSubscription, Subscriber } from './subscription'; | ||
import { DeepPartial, matcher } from './match'; | ||
import { Subscription } from './subscription'; | ||
import { SymbolSubscribable } from './symbol-subscribable'; | ||
import { subscribe } from './subscribe'; | ||
import { SubscriptionSource } from './subscription-source'; | ||
export const SymbolSubscribable: unique symbol = Symbol.for('Symbol.subscription'); | ||
export interface Subscribable<T,TReturn> { | ||
@@ -11,75 +11,7 @@ [SymbolSubscribable](): Operation<Subscription<T,TReturn>>; | ||
export type SubscriptionSource<T,TReturn> = Subscribable<T,TReturn> | Operation<Subscription<T,TReturn>>; | ||
export function* forEach<T,TReturn>(source: SubscriptionSource<T,TReturn>, visit: (value: T) => Operation<void>): Operation<TReturn> { | ||
let subscription: Subscription<T,TReturn> = yield subscribe(source); | ||
while (true) { | ||
let result: IteratorResult<T,TReturn> = yield subscription.next(); | ||
if (result.done) { | ||
return result.value; | ||
} else { | ||
yield visit(result.value); | ||
} | ||
} | ||
} | ||
export const Subscribable = { | ||
from: <T,TReturn>(source: SubscriptionSource<T,TReturn>) => new Chain(source) | ||
} | ||
export class Chain<T, TReturn> implements Subscribable<T,TReturn> { | ||
constructor(private source: SubscriptionSource<T,TReturn>) {} | ||
[SymbolSubscribable](): Operation<Subscription<T,TReturn>> { | ||
return subscribe(this.source) | ||
from<T,TReturn>(source: SubscriptionSource<T,TReturn>) { | ||
console.warn('`Subscribable.from(source)` is deprecated, use `subscribe(source).map(...)` instead'); | ||
return subscribe(source) | ||
} | ||
map<X>(fn: (value: T) => X): Chain<X,TReturn> { | ||
return this.chain(source => publish => forEach(source, function*(item) { | ||
publish(fn(item)); | ||
})); | ||
} | ||
filter(predicate: (value: T) => boolean): Chain<T,TReturn> { | ||
return this.chain(source => publish => forEach(source, function*(item) { | ||
if (predicate(item)) { | ||
publish(item); | ||
} | ||
})) | ||
} | ||
match(reference: DeepPartial<T>): Chain<T,TReturn> { | ||
return this.filter(matcher(reference)); | ||
} | ||
chain<X = T,XReturn = TReturn>(next: (source: SubscriptionSource<T,TReturn>) => Subscriber<X,XReturn>): Chain<X,XReturn> { | ||
return new Chain(createSubscription(next(this.source))); | ||
} | ||
forEach(visit: (value: T) => Operation<void>): Operation<TReturn> { | ||
return forEach(this.source, visit); | ||
} | ||
*first(): Operation<T | undefined> { | ||
let subscription: Subscription<T, TReturn> = yield subscribe(this.source); | ||
let { done, value } = yield subscription.next(); | ||
if (done) { | ||
return undefined; | ||
} else { | ||
return value; | ||
} | ||
} | ||
} | ||
export function subscribe<T, TReturn>(source: SubscriptionSource<T,TReturn>): Operation<Subscription<T,TReturn>> { | ||
if (isSubscribable<T,TReturn>(source)) { | ||
return source[SymbolSubscribable]() | ||
} else { | ||
return source; | ||
} | ||
} | ||
// eslint-disable-next-line @typescript-eslint/no-explicit-any | ||
function isSubscribable<T,TReturn>(value: any): value is Subscribable<T,TReturn> { | ||
return !!value[SymbolSubscribable]; | ||
} |
@@ -1,6 +0,3 @@ | ||
import { Operation, resource } from 'effection'; | ||
import { Operation } from 'effection'; | ||
import { ChainableSubscription } from './chainable-subscription'; | ||
import { Semaphore } from './semaphore'; | ||
export type Subscriber<T,TReturn> = (publish: (value: T) => void) => Operation<TReturn>; | ||
@@ -11,40 +8,1 @@ | ||
} | ||
export function createSubscription<T, TReturn>(subscribe: Subscriber<T,TReturn>): Operation<Subscription<T,TReturn>> { | ||
return function*() { | ||
let results: IteratorResult<T,TReturn>[] = []; | ||
let semaphore = new Semaphore<void>(); | ||
let publish = (value: T) => { | ||
results.push({ done: false, value }); | ||
semaphore.signal(); | ||
}; | ||
let next = async (): Promise<IteratorResult<T,TReturn>> => { | ||
let wait = semaphore.wait(); | ||
if (results.length > 0) { | ||
semaphore.signal(); | ||
} | ||
return wait.then(() => results.shift() as IteratorResult<T,TReturn>); | ||
}; | ||
let subscription = yield resource(new ChainableSubscription({ next }), function*() { | ||
try { | ||
let value = yield subscribe((value: T) => publish(value)); | ||
results.push({ done: true, value }); | ||
semaphore.signal(); | ||
} finally { | ||
publish = value => { throw InvalidPublication(value); } | ||
} | ||
}); | ||
return subscription; | ||
} | ||
} | ||
function InvalidPublication(value: unknown) { | ||
let error = new Error(`tried to publish a value: ${value} on an already finished subscription`); | ||
error.name = 'TypeError'; | ||
return error; | ||
} |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
100391
32
936
128