Socket
Socket
Sign inDemoInstall

@effection/subscription

Package Overview
Dependencies
Maintainers
1
Versions
151
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@effection/subscription - npm Package Compare versions

Comparing version 0.9.0 to 0.10.0

dist/chainable-subscribable.d.ts

6

CHANGELOG.md
# @effection/subscription
## 0.10.0
### Minor Changes
- 5d118ee: Chain via `subscribe` instead of `Subscribable.from` which is now deprecated.
## 0.9.0

@@ -4,0 +10,0 @@

3

dist/chainable-subscription.d.ts
import { Operation } from 'effection';
import { Subscription } from './subscription';
import { DeepPartial } from './match';
import { SubscriptionSource } from './subscribable';
import { SubscriptionSource } from './subscription-source';
export declare class ChainableSubscription<T, TReturn> implements Subscription<T, TReturn> {

@@ -9,2 +9,3 @@ private subscription;

static of<T, TReturn>(source: SubscriptionSource<T, TReturn>): Operation<ChainableSubscription<T, TReturn>>;
static wrap<T, TReturn, R = T>(source: Operation<ChainableSubscription<T, TReturn>>, fn: (inner: ChainableSubscription<T, TReturn>) => ChainableSubscription<R, TReturn>): Operation<ChainableSubscription<R, TReturn>>;
filter(predicate: (value: T) => boolean): ChainableSubscription<T, TReturn>;

@@ -11,0 +12,0 @@ match(reference: DeepPartial<T>): ChainableSubscription<T, TReturn>;

export { Subscription, createSubscription } from './subscription';
export { SymbolSubscribable } from './symbol-subscribable';
export { SubscriptionSource, forEach, Subscribable } from './subscribable';
export { Subscribable } from './subscribable';
export { SubscriptionSource } from './subscription-source';
export { ChainableSubscription } from './chainable-subscription';
export { ChainableSubscribable } from './chainable-subscribable';
export { subscribe } from './subscribe';
import { Operation } from 'effection';
import { ChainableSubscription } from './chainable-subscription';
import { SubscriptionSource } from './subscribable';
export declare function subscribe<T, TReturn>(source: SubscriptionSource<T, TReturn>): Operation<ChainableSubscription<T, TReturn>>;
import { SubscriptionSource } from './subscription-source';
export declare function forEach<T, TReturn>(source: SubscriptionSource<T, TReturn>, visit: (value: T) => Operation<void>): Operation<TReturn>;
import { Operation } from 'effection';
import { Subscription, Subscriber } from './subscription';
import { DeepPartial } from './match';
import { Subscription } from './subscription';
import { SymbolSubscribable } from './symbol-subscribable';
import { SubscriptionSource } from './subscription-source';
export interface Subscribable<T, TReturn> {
[SymbolSubscribable](): Operation<Subscription<T, TReturn>>;
}
export declare type SubscriptionSource<T, TReturn> = Subscribable<T, TReturn> | Operation<Subscription<T, TReturn>>;
export declare function forEach<T, TReturn>(source: SubscriptionSource<T, TReturn>, visit: (value: T) => Operation<void>): Operation<TReturn>;
export declare const Subscribable: {
from: <T, TReturn>(source: SubscriptionSource<T, TReturn>) => Chain<T, TReturn>;
from<T, TReturn>(source: SubscriptionSource<T, TReturn>): (import("effection").OperationFn<import("./chainable-subscription").ChainableSubscription<T, TReturn>> & import("./chainable-subscribable").ChainableSubscribableMethods<T, TReturn>) | (import("effection").Sequence<import("./chainable-subscription").ChainableSubscription<T, TReturn>> & import("./chainable-subscribable").ChainableSubscribableMethods<T, TReturn>) | (PromiseLike<import("./chainable-subscription").ChainableSubscription<T, TReturn>> & import("./chainable-subscribable").ChainableSubscribableMethods<T, TReturn>) | (import("effection").Controller<import("./chainable-subscription").ChainableSubscription<T, TReturn>> & import("./chainable-subscribable").ChainableSubscribableMethods<T, TReturn>) | (import("effection").OperationFn<import("./chainable-subscription").ChainableSubscription<T, TReturn>> & import("effection").Sequence<import("./chainable-subscription").ChainableSubscription<T, TReturn>> & import("./chainable-subscribable").ChainableSubscribableMethods<T, TReturn>) | (import("effection").OperationFn<import("./chainable-subscription").ChainableSubscription<T, TReturn>> & PromiseLike<import("./chainable-subscription").ChainableSubscription<T, TReturn>> & import("./chainable-subscribable").ChainableSubscribableMethods<T, TReturn>) | (import("effection").OperationFn<import("./chainable-subscription").ChainableSubscription<T, TReturn>> & import("effection").Controller<import("./chainable-subscription").ChainableSubscription<T, TReturn>> & import("./chainable-subscribable").ChainableSubscribableMethods<T, TReturn>) | (import("effection").Sequence<import("./chainable-subscription").ChainableSubscription<T, TReturn>> & import("effection").OperationFn<import("./chainable-subscription").ChainableSubscription<T, TReturn>> & import("./chainable-subscribable").ChainableSubscribableMethods<T, TReturn>) | (import("effection").Sequence<import("./chainable-subscription").ChainableSubscription<T, TReturn>> & PromiseLike<import("./chainable-subscription").ChainableSubscription<T, TReturn>> & import("./chainable-subscribable").ChainableSubscribableMethods<T, TReturn>) | (import("effection").Sequence<import("./chainable-subscription").ChainableSubscription<T, TReturn>> & import("effection").Controller<import("./chainable-subscription").ChainableSubscription<T, TReturn>> & import("./chainable-subscribable").ChainableSubscribableMethods<T, TReturn>) | (PromiseLike<import("./chainable-subscription").ChainableSubscription<T, TReturn>> & import("effection").OperationFn<import("./chainable-subscription").ChainableSubscription<T, TReturn>> & import("./chainable-subscribable").ChainableSubscribableMethods<T, TReturn>) | (PromiseLike<import("./chainable-subscription").ChainableSubscription<T, TReturn>> & import("effection").Sequence<import("./chainable-subscription").ChainableSubscription<T, TReturn>> & import("./chainable-subscribable").ChainableSubscribableMethods<T, TReturn>) | (PromiseLike<import("./chainable-subscription").ChainableSubscription<T, TReturn>> & import("effection").Controller<import("./chainable-subscription").ChainableSubscription<T, TReturn>> & import("./chainable-subscribable").ChainableSubscribableMethods<T, TReturn>) | (import("effection").Controller<import("./chainable-subscription").ChainableSubscription<T, TReturn>> & import("effection").OperationFn<import("./chainable-subscription").ChainableSubscription<T, TReturn>> & import("./chainable-subscribable").ChainableSubscribableMethods<T, TReturn>) | (import("effection").Controller<import("./chainable-subscription").ChainableSubscription<T, TReturn>> & import("effection").Sequence<import("./chainable-subscription").ChainableSubscription<T, TReturn>> & import("./chainable-subscribable").ChainableSubscribableMethods<T, TReturn>) | (import("effection").Controller<import("./chainable-subscription").ChainableSubscription<T, TReturn>> & PromiseLike<import("./chainable-subscription").ChainableSubscription<T, TReturn>> & import("./chainable-subscribable").ChainableSubscribableMethods<T, TReturn>);
};
export declare class Chain<T, TReturn> implements Subscribable<T, TReturn> {
private source;
constructor(source: SubscriptionSource<T, TReturn>);
[SymbolSubscribable](): Operation<Subscription<T, TReturn>>;
map<X>(fn: (value: T) => X): Chain<X, TReturn>;
filter(predicate: (value: T) => boolean): Chain<T, TReturn>;
match(reference: DeepPartial<T>): Chain<T, TReturn>;
chain<X = T, XReturn = TReturn>(next: (source: SubscriptionSource<T, TReturn>) => Subscriber<X, XReturn>): Chain<X, XReturn>;
forEach(visit: (value: T) => Operation<void>): Operation<TReturn>;
first(): Operation<T | undefined>;
}
export declare function subscribe<T, TReturn>(source: SubscriptionSource<T, TReturn>): Operation<Subscription<T, TReturn>>;

@@ -43,82 +43,6 @@ 'use strict';

function* _forEach(source, visit) {
var subscription = yield subscribe(source);
while (true) {
var result = yield subscription.next();
if (result.done) {
return result.value;
} else {
yield visit(result.value);
}
}
function isSubscribable(value) {
return !!value[SymbolSubscribable];
}
var Subscribable = {
from: function from(source) {
return new Chain(source);
}
};
var Chain = /*#__PURE__*/function () {
function Chain(source) {
this.source = source;
}
var _proto = Chain.prototype;
_proto[SymbolSubscribable] = function () {
return subscribe(this.source);
};
_proto.map = function map(fn) {
return this.chain(function (source) {
return function (publish) {
return _forEach(source, function* (item) {
publish(fn(item));
});
};
});
};
_proto.filter = function filter(predicate) {
return this.chain(function (source) {
return function (publish) {
return _forEach(source, function* (item) {
if (predicate(item)) {
publish(item);
}
});
};
});
};
_proto.match = function match(reference) {
return this.filter(matcher(reference));
};
_proto.chain = function chain(next) {
return new Chain(createSubscription(next(this.source)));
};
_proto.forEach = function forEach(visit) {
return _forEach(this.source, visit);
};
_proto.first = function* first() {
var subscription = yield subscribe(this.source);
var _yield$subscription$n = yield subscription.next(),
done = _yield$subscription$n.done,
value = _yield$subscription$n.value;
if (done) {
return undefined;
} else {
return value;
}
};
return Chain;
}();
function subscribe(source) {
function rawSubscribe(source) {
if (isSubscribable(source)) {

@@ -129,6 +53,2 @@ return source[SymbolSubscribable]();

}
} // eslint-disable-next-line @typescript-eslint/no-explicit-any
function isSubscribable(value) {
return !!value[SymbolSubscribable];
}

@@ -149,3 +69,3 @@

return yield effection.resource(chain, function* () {
chain.subscription = yield subscribe(source);
chain.subscription = yield rawSubscribe(source);
yield;

@@ -155,2 +75,11 @@ });

ChainableSubscription.wrap = function* wrap(source, fn) {
var chain = new ChainableSubscription(DUMMY);
return yield effection.resource(chain, function* () {
var subscription = yield source;
chain.subscription = fn(subscription);
yield;
});
};
var _proto = ChainableSubscription.prototype;

@@ -315,6 +244,54 @@

function* subscribe$1(source) {
return yield ChainableSubscription.of(source);
function makeChainable(operation) {
var _Object$assign;
return Object.assign(operation, (_Object$assign = {}, _Object$assign[SymbolSubscribable] = function* () {
return yield operation;
}, _Object$assign.filter = function filter(predicate) {
return makeChainable(function* () {
return yield ChainableSubscription.wrap(operation, function (inner) {
return inner.filter(predicate);
});
});
}, _Object$assign.match = function match(reference) {
return makeChainable(function* () {
return yield ChainableSubscription.wrap(operation, function (inner) {
return inner.match(reference);
});
});
}, _Object$assign.map = function map(mapper) {
return makeChainable(function* () {
return yield ChainableSubscription.wrap(operation, function (inner) {
return inner.map(mapper);
});
});
}, _Object$assign.first = function* first() {
return yield (yield operation).first();
}, _Object$assign.expect = function* expect() {
return yield (yield operation).expect();
}, _Object$assign.forEach = function* forEach(visit) {
return yield (yield operation).forEach(visit);
}, _Object$assign.next = function* next() {
return yield (yield operation).next();
}, _Object$assign));
}
function subscribe(source) {
return makeChainable(function* () {
return yield ChainableSubscription.of(source);
});
}
var Subscribable = {
from: function from(source) {
console.warn('`Subscribable.from(source)` is deprecated, use `subscribe(source).map(...)` instead');
return subscribe(source);
}
};
function* forEach(source, visit) {
console.warn('`forEach(source, ...)` is deprecated, use `subscribe(source).forEach(...)` instead');
return yield subscribe(source).forEach(visit);
}
exports.ChainableSubscription = ChainableSubscription;

@@ -324,4 +301,4 @@ exports.Subscribable = Subscribable;

exports.createSubscription = createSubscription;
exports.forEach = _forEach;
exports.subscribe = subscribe$1;
exports.forEach = forEach;
exports.subscribe = subscribe;
//# sourceMappingURL=subscription.cjs.development.js.map

@@ -1,2 +0,2 @@

"use strict";var n=require("effection");function r(n){return function(t){if("object"==typeof t&&"object"==typeof n){var e=t;return Object.entries(n).every((function(n){var t=n[0];return r(n[1])(e[t])}))}return t===n}}var t=Symbol.for("Symbol.subscription");function*e(n,r){for(var t=yield o(n);;){var e=yield t.next();if(e.done)return e.value;yield r(e.value)}}var i={from:function(n){return new u(n)}},u=function(){function n(n){this.source=n}var i=n.prototype;return i[t]=function(){return o(this.source)},i.map=function(n){return this.chain((function(r){return function(t){return e(r,(function*(r){t(n(r))}))}}))},i.filter=function(n){return this.chain((function(r){return function(t){return e(r,(function*(r){n(r)&&t(r)}))}}))},i.match=function(n){return this.filter(r(n))},i.chain=function(r){return new n(a(r(this.source)))},i.forEach=function(n){return e(this.source,n)},i.first=function*(){var n=yield o(this.source),r=yield n.next();return r.done?void 0:r.value},n}();function o(n){return n[t]?n[t]():n}var c={next:function(){throw new Error("dummy")}},s=function(){function t(n){this.subscription=n}t.of=function*(r){var e=new t(c);return yield n.resource(e,(function*(){e.subscription=yield o(r),yield}))};var e=t.prototype;return e.filter=function(n){var r=this.subscription;return new t({next:function*(){for(;;){var t=yield r.next();if(t.done)return t;if(n(t.value))return t}}})},e.match=function(n){return this.filter(r(n))},e.map=function(n){var r=this.subscription;return new t({next:function*(){for(;;){var t=yield r.next();return t.done?t:{done:!1,value:n(t.value)}}}})},e.first=function*(){var n=yield this.subscription.next();return n.done?void 0:n.value},e.expect=function*(){var n=yield this.subscription.next();if(n.done)throw new Error("expected subscription to contain a value");return n.value},e.forEach=function*(n){for(;;){var r=yield this.subscription.next();if(r.done)return r.value;yield n(r.value)}},e.next=function(){return this.subscription.next()},t}(),f=function(){var n=this;this.waiters=[],this.signal=function(r){var t=n.waiters.pop();t&&t(r)},this.wait=function(){return new Promise((function(r){return n.waiters.push(r)}))}};function a(r){return function*(){var t=[],e=new f,i=function(n){t.push({done:!1,value:n}),e.signal()};return yield n.resource(new s({next:function(){try{var n=e.wait();return t.length>0&&e.signal(),Promise.resolve(n.then((function(){return t.shift()})))}catch(n){return Promise.reject(n)}}}),(function*(){try{var n=yield r((function(n){return i(n)}));t.push({done:!0,value:n}),e.signal()}finally{i=function(n){throw function(n){var r=new Error("tried to publish a value: "+n+" on an already finished subscription");return r.name="TypeError",r}(n)}}}))}}exports.ChainableSubscription=s,exports.Subscribable=i,exports.SymbolSubscribable=t,exports.createSubscription=a,exports.forEach=e,exports.subscribe=function*(n){return yield s.of(n)};
"use strict";var n=require("effection"),r=Symbol.for("Symbol.subscription"),e={next:function(){throw new Error("dummy")}},t=function(){function t(n){this.subscription=n}t.of=function*(i){var u=new t(e);return yield n.resource(u,(function*(){u.subscription=yield function(n){return n[r]?n[r]():n}(i),yield}))},t.wrap=function*(r,i){var u=new t(e);return yield n.resource(u,(function*(){var n=yield r;u.subscription=i(n),yield}))};var i=t.prototype;return i.filter=function(n){var r=this.subscription;return new t({next:function*(){for(;;){var e=yield r.next();if(e.done)return e;if(n(e.value))return e}}})},i.match=function(n){return this.filter(function n(r){return function(e){if("object"==typeof e&&"object"==typeof r){var t=e;return Object.entries(r).every((function(r){var e=r[0];return n(r[1])(t[e])}))}return e===r}}(n))},i.map=function(n){var r=this.subscription;return new t({next:function*(){for(;;){var e=yield r.next();return e.done?e:{done:!1,value:n(e.value)}}}})},i.first=function*(){var n=yield this.subscription.next();return n.done?void 0:n.value},i.expect=function*(){var n=yield this.subscription.next();if(n.done)throw new Error("expected subscription to contain a value");return n.value},i.forEach=function*(n){for(;;){var r=yield this.subscription.next();if(r.done)return r.value;yield n(r.value)}},i.next=function(){return this.subscription.next()},t}(),i=function(){var n=this;this.waiters=[],this.signal=function(r){var e=n.waiters.pop();e&&e(r)},this.wait=function(){return new Promise((function(r){return n.waiters.push(r)}))}};function u(n){return function n(e){var i;return Object.assign(e,((i={})[r]=function*(){return yield e},i.filter=function(r){return n((function*(){return yield t.wrap(e,(function(n){return n.filter(r)}))}))},i.match=function(r){return n((function*(){return yield t.wrap(e,(function(n){return n.match(r)}))}))},i.map=function(r){return n((function*(){return yield t.wrap(e,(function(n){return n.map(r)}))}))},i.first=function*(){return yield(yield e).first()},i.expect=function*(){return yield(yield e).expect()},i.forEach=function*(n){return yield(yield e).forEach(n)},i.next=function*(){return yield(yield e).next()},i))}((function*(){return yield t.of(n)}))}var o={from:function(n){return console.warn("`Subscribable.from(source)` is deprecated, use `subscribe(source).map(...)` instead"),u(n)}};exports.ChainableSubscription=t,exports.Subscribable=o,exports.SymbolSubscribable=r,exports.createSubscription=function(r){return function*(){var e=[],u=new i,o=function(n){e.push({done:!1,value:n}),u.signal()};return yield n.resource(new t({next:function(){try{var n=u.wait();return e.length>0&&u.signal(),Promise.resolve(n.then((function(){return e.shift()})))}catch(n){return Promise.reject(n)}}}),(function*(){try{var n=yield r((function(n){return o(n)}));e.push({done:!0,value:n}),u.signal()}finally{o=function(n){throw function(n){var r=new Error("tried to publish a value: "+n+" on an already finished subscription");return r.name="TypeError",r}(n)}}}))}},exports.forEach=function*(n,r){return console.warn("`forEach(source, ...)` is deprecated, use `subscribe(source).forEach(...)` instead"),yield u(n).forEach(r)},exports.subscribe=u;
//# sourceMappingURL=subscription.cjs.production.min.js.map

@@ -41,82 +41,6 @@ import { resource } from 'effection';

function* _forEach(source, visit) {
var subscription = yield subscribe(source);
while (true) {
var result = yield subscription.next();
if (result.done) {
return result.value;
} else {
yield visit(result.value);
}
}
function isSubscribable(value) {
return !!value[SymbolSubscribable];
}
var Subscribable = {
from: function from(source) {
return new Chain(source);
}
};
var Chain = /*#__PURE__*/function () {
function Chain(source) {
this.source = source;
}
var _proto = Chain.prototype;
_proto[SymbolSubscribable] = function () {
return subscribe(this.source);
};
_proto.map = function map(fn) {
return this.chain(function (source) {
return function (publish) {
return _forEach(source, function* (item) {
publish(fn(item));
});
};
});
};
_proto.filter = function filter(predicate) {
return this.chain(function (source) {
return function (publish) {
return _forEach(source, function* (item) {
if (predicate(item)) {
publish(item);
}
});
};
});
};
_proto.match = function match(reference) {
return this.filter(matcher(reference));
};
_proto.chain = function chain(next) {
return new Chain(createSubscription(next(this.source)));
};
_proto.forEach = function forEach(visit) {
return _forEach(this.source, visit);
};
_proto.first = function* first() {
var subscription = yield subscribe(this.source);
var _yield$subscription$n = yield subscription.next(),
done = _yield$subscription$n.done,
value = _yield$subscription$n.value;
if (done) {
return undefined;
} else {
return value;
}
};
return Chain;
}();
function subscribe(source) {
function rawSubscribe(source) {
if (isSubscribable(source)) {

@@ -127,6 +51,2 @@ return source[SymbolSubscribable]();

}
} // eslint-disable-next-line @typescript-eslint/no-explicit-any
function isSubscribable(value) {
return !!value[SymbolSubscribable];
}

@@ -147,3 +67,3 @@

return yield resource(chain, function* () {
chain.subscription = yield subscribe(source);
chain.subscription = yield rawSubscribe(source);
yield;

@@ -153,2 +73,11 @@ });

ChainableSubscription.wrap = function* wrap(source, fn) {
var chain = new ChainableSubscription(DUMMY);
return yield resource(chain, function* () {
var subscription = yield source;
chain.subscription = fn(subscription);
yield;
});
};
var _proto = ChainableSubscription.prototype;

@@ -313,7 +242,55 @@

function* subscribe$1(source) {
return yield ChainableSubscription.of(source);
function makeChainable(operation) {
var _Object$assign;
return Object.assign(operation, (_Object$assign = {}, _Object$assign[SymbolSubscribable] = function* () {
return yield operation;
}, _Object$assign.filter = function filter(predicate) {
return makeChainable(function* () {
return yield ChainableSubscription.wrap(operation, function (inner) {
return inner.filter(predicate);
});
});
}, _Object$assign.match = function match(reference) {
return makeChainable(function* () {
return yield ChainableSubscription.wrap(operation, function (inner) {
return inner.match(reference);
});
});
}, _Object$assign.map = function map(mapper) {
return makeChainable(function* () {
return yield ChainableSubscription.wrap(operation, function (inner) {
return inner.map(mapper);
});
});
}, _Object$assign.first = function* first() {
return yield (yield operation).first();
}, _Object$assign.expect = function* expect() {
return yield (yield operation).expect();
}, _Object$assign.forEach = function* forEach(visit) {
return yield (yield operation).forEach(visit);
}, _Object$assign.next = function* next() {
return yield (yield operation).next();
}, _Object$assign));
}
export { ChainableSubscription, Subscribable, SymbolSubscribable, createSubscription, _forEach as forEach, subscribe$1 as subscribe };
function subscribe(source) {
return makeChainable(function* () {
return yield ChainableSubscription.of(source);
});
}
var Subscribable = {
from: function from(source) {
console.warn('`Subscribable.from(source)` is deprecated, use `subscribe(source).map(...)` instead');
return subscribe(source);
}
};
function* forEach(source, visit) {
console.warn('`forEach(source, ...)` is deprecated, use `subscribe(source).forEach(...)` instead');
return yield subscribe(source).forEach(visit);
}
export { ChainableSubscription, Subscribable, SymbolSubscribable, createSubscription, forEach, subscribe };
//# sourceMappingURL=subscription.esm.js.map
{
"name": "@effection/subscription",
"version": "0.9.0",
"version": "0.10.0",
"description": "Effection Subscriptions",

@@ -5,0 +5,0 @@ "main": "dist/index.js",

@@ -67,9 +67,8 @@ # @effection/subscription

### Subscribable.from(source)
### subscribe(source)
In order to lift functions into the context of a subscription, you can
use `Subscribable.from` which will return an instance of
`Subscribable` that allows you to transform a subscription produced
In order to lift functions into the context of a subscription, you can use
`subscribe` which can be used to transform subscriptions via combinators.
### Subscribable#map(fn)
### map(fn)

@@ -80,6 +79,6 @@ Returns a new subscribable whose items are transformed by `fn`. For

``` javascript
Subscribable.from(websocket).map(message => JSON.parse(message));
subscribe(websocket).map(message => JSON.parse(message));
```
### Subscribable#filter(predicate)
### filter(predicate)

@@ -90,6 +89,6 @@ Return a new `Subscribable` that only produces items from its source

``` javascript
Subscribable.from(websocket).filter(message => message.type === 'command');
subscribe(websocket).filter(message => message.type === 'command');
```
### Subscribable#match(reference)
### match(reference)

@@ -101,6 +100,6 @@ Return a new `Subscribable` that only produces items from its source that match

``` javascript
Subscribable.from(websocket).match({ type: 'command' });
subscribe(websocket).match({ type: 'command' });
```
### Subscribable#first()
### first()

@@ -111,3 +110,23 @@ An operation that produces the first item in a subscription or

``` javascript
let message = yield Subscribable.from(websocket).first();
let message = yield subscribe(websocket).first();
```
### expect()
An operation that produces the first item in a subscription or
throws an error if the subscription has no items.
``` javascript
let message = yield subscribe(websocket).expect();
```
### forEach()
Calls the given operation function with each item in the subscription. Returns
the return value of the subscriopion when done.
``` javascript
let exitCode = yield subscribe(websocket).forEach(function*(message) {
// ...
});
```
import { Operation, resource } from 'effection';
import { Subscription } from './subscription';
import { DeepPartial, matcher } from './match';
import { SubscriptionSource, subscribe } from './subscribable';
import { SubscriptionSource, rawSubscribe } from './subscription-source';

@@ -15,3 +15,3 @@ const DUMMY = { next() { throw new Error('dummy') } };

return yield resource(chain, function*() {
chain.subscription = yield subscribe(source);
chain.subscription = yield rawSubscribe(source);
yield;

@@ -21,2 +21,15 @@ });

static *wrap<T, TReturn, R = T>(
source: Operation<ChainableSubscription<T, TReturn>>,
fn: (inner: ChainableSubscription<T, TReturn>) => ChainableSubscription<R, TReturn>,
): Operation<ChainableSubscription<R, TReturn>> {
let chain = new ChainableSubscription<R, TReturn>(DUMMY);
return yield resource(chain, function*() {
let subscription = yield source;
chain.subscription = fn(subscription);
yield;
});
}
filter(predicate: (value: T) => boolean): ChainableSubscription<T, TReturn> {

@@ -23,0 +36,0 @@ let { subscription } = this;

export { Subscription, createSubscription } from './subscription';
export { SymbolSubscribable } from './symbol-subscribable';
export { SubscriptionSource, forEach, Subscribable } from './subscribable';
export { Subscribable } from './subscribable';
export { SubscriptionSource } from './subscription-source';
export { ChainableSubscription } from './chainable-subscription';
export { ChainableSubscribable } from './chainable-subscribable';
export { subscribe } from './subscribe';
import { Operation } from 'effection';
import { ChainableSubscription } from './chainable-subscription';
import { SubscriptionSource } from './subscribable';
import { SubscriptionSource } from './subscription-source';
import { subscribe } from './subscribe';
export function* subscribe<T, TReturn>(source: SubscriptionSource<T,TReturn>): Operation<ChainableSubscription<T,TReturn>> {
return yield ChainableSubscription.of(source);
export function* forEach<T,TReturn>(source: SubscriptionSource<T,TReturn>, visit: (value: T) => Operation<void>): Operation<TReturn> {
console.warn('`forEach(source, ...)` is deprecated, use `subscribe(source).forEach(...)` instead');
return yield subscribe<T, TReturn>(source).forEach(visit);
}
import { Operation } from 'effection';
import { Subscription, createSubscription, Subscriber } from './subscription';
import { DeepPartial, matcher } from './match';
import { Subscription } from './subscription';
import { SymbolSubscribable } from './symbol-subscribable';
import { subscribe } from './subscribe';
import { SubscriptionSource } from './subscription-source';

@@ -10,75 +11,7 @@ export interface Subscribable<T,TReturn> {

export type SubscriptionSource<T,TReturn> = Subscribable<T,TReturn> | Operation<Subscription<T,TReturn>>;
export function* forEach<T,TReturn>(source: SubscriptionSource<T,TReturn>, visit: (value: T) => Operation<void>): Operation<TReturn> {
let subscription: Subscription<T,TReturn> = yield subscribe(source);
while (true) {
let result: IteratorResult<T,TReturn> = yield subscription.next();
if (result.done) {
return result.value;
} else {
yield visit(result.value);
}
}
}
export const Subscribable = {
from: <T,TReturn>(source: SubscriptionSource<T,TReturn>) => new Chain(source)
}
export class Chain<T, TReturn> implements Subscribable<T,TReturn> {
constructor(private source: SubscriptionSource<T,TReturn>) {}
[SymbolSubscribable](): Operation<Subscription<T,TReturn>> {
return subscribe(this.source)
from<T,TReturn>(source: SubscriptionSource<T,TReturn>) {
console.warn('`Subscribable.from(source)` is deprecated, use `subscribe(source).map(...)` instead');
return subscribe(source)
}
map<X>(fn: (value: T) => X): Chain<X,TReturn> {
return this.chain(source => publish => forEach(source, function*(item) {
publish(fn(item));
}));
}
filter(predicate: (value: T) => boolean): Chain<T,TReturn> {
return this.chain(source => publish => forEach(source, function*(item) {
if (predicate(item)) {
publish(item);
}
}))
}
match(reference: DeepPartial<T>): Chain<T,TReturn> {
return this.filter(matcher(reference));
}
chain<X = T,XReturn = TReturn>(next: (source: SubscriptionSource<T,TReturn>) => Subscriber<X,XReturn>): Chain<X,XReturn> {
return new Chain(createSubscription(next(this.source)));
}
forEach(visit: (value: T) => Operation<void>): Operation<TReturn> {
return forEach(this.source, visit);
}
*first(): Operation<T | undefined> {
let subscription: Subscription<T, TReturn> = yield subscribe(this.source);
let { done, value } = yield subscription.next();
if (done) {
return undefined;
} else {
return value;
}
}
}
export function subscribe<T, TReturn>(source: SubscriptionSource<T,TReturn>): Operation<Subscription<T,TReturn>> {
if (isSubscribable<T,TReturn>(source)) {
return source[SymbolSubscribable]()
} else {
return source;
}
}
// eslint-disable-next-line @typescript-eslint/no-explicit-any
function isSubscribable<T,TReturn>(value: any): value is Subscribable<T,TReturn> {
return !!value[SymbolSubscribable];
}

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc