Socket
Socket
Sign inDemoInstall

@effection/subscription

Package Overview
Dependencies
Maintainers
1
Versions
151
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@effection/subscription - npm Package Compare versions

Comparing version 2.0.0-beta.16 to 2.0.0-beta.17

14

CHANGELOG.md
# @effection/subscription
## \[2.0.0-beta.17]
- The `buffer` method on `Stream` returns a resource and can receive an optional limit using a ring buffer for efficient bounded caching
- [4e9cb50](https://github.com/thefrontside/effection/commit/4e9cb5006249a208c62f7de30c1fd16a713725c7) Improve process streams on 2021-09-22
- - [0248d79](https://github.com/thefrontside/effection/commit/0248d79a33dcfc4200b0832aba975c9cad08981e) Add package readmes on 2021-09-28
- Remove operation resolutions entirely, use Future instead
- [5f67d61](https://github.com/thefrontside/effection/commit/5f67d610324af158eba67be5600d413fc1f2ace1) Add changeset on 2021-09-29
- Remove the `stringBuffer` method on `Stream`
- [4e9cb50](https://github.com/thefrontside/effection/commit/4e9cb5006249a208c62f7de30c1fd16a713725c7) Improve process streams on 2021-09-22
- Add `grep` method to streams to scan items for given substring or regexp
- [2b3de48](https://github.com/thefrontside/effection/commit/2b3de4822321ba7d0c464a1088aeb0fd8356fd1f) Add grep method to Stream on 2021-09-22
- Split off `Stream` from subscription package into its own `@effection/stream` package
- [248de1d](https://github.com/thefrontside/effection/commit/248de1dd31d172762d9601a2b5acd983dce61ab0) Split `Stream` into its own package on 2021-09-27
## \[2.0.0-beta.16]

@@ -4,0 +18,0 @@

4

dist-cjs/index.d.ts

@@ -1,8 +0,4 @@

export { SymbolOperationIterable } from './symbol-operation-iterable';
export { OperationIterable } from './operation-iterable';
export { OperationIterator } from './operation-iterator';
export { Subscription } from './subscription';
export { createStream, Stream, StringBufferStream } from './stream';
export { Writable, WritableStream } from './writable-stream';
export { Queue, createQueue } from './queue';
//# sourceMappingURL=index.d.ts.map
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
exports.createQueue = exports.createStream = exports.SymbolOperationIterable = void 0;
var symbol_operation_iterable_1 = require("./symbol-operation-iterable");
Object.defineProperty(exports, "SymbolOperationIterable", { enumerable: true, get: function () { return symbol_operation_iterable_1.SymbolOperationIterable; } });
var stream_1 = require("./stream");
Object.defineProperty(exports, "createStream", { enumerable: true, get: function () { return stream_1.createStream; } });
exports.createQueue = void 0;
var queue_1 = require("./queue");
Object.defineProperty(exports, "createQueue", { enumerable: true, get: function () { return queue_1.createQueue; } });
//# sourceMappingURL=index.js.map

@@ -34,19 +34,18 @@ "use strict";

let next = () => {
return {
name: `${name}.next()`,
perform(resolve) {
if (values.length) {
resolve(values.shift());
}
else {
waiters.push(resolve);
return () => {
let index = waiters.indexOf(resolve);
if (index > -1) {
waiters.splice(index, 1);
}
};
}
return core_1.withLabels((task) => {
let { future, resolve } = core_1.createFuture();
if (values.length) {
resolve(values.shift());
}
};
else {
waiters.push(resolve);
task.consume(() => {
let index = waiters.indexOf(resolve);
if (index > -1) {
waiters.splice(index, 1);
}
});
}
return future;
}, { name: `${name}.next()` });
};

@@ -53,0 +52,0 @@ function withName(operationName, operation) {

@@ -1,8 +0,4 @@

export { SymbolOperationIterable } from './symbol-operation-iterable';
export { OperationIterable } from './operation-iterable';
export { OperationIterator } from './operation-iterator';
export { Subscription } from './subscription';
export { createStream, Stream, StringBufferStream } from './stream';
export { Writable, WritableStream } from './writable-stream';
export { Queue, createQueue } from './queue';
//# sourceMappingURL=index.d.ts.map

@@ -1,4 +0,2 @@

export { SymbolOperationIterable } from './symbol-operation-iterable';
export { createStream } from './stream';
export { createQueue } from './queue';
//# sourceMappingURL=index.js.map

@@ -1,2 +0,2 @@

import { withLabels } from '@effection/core';
import { withLabels, createFuture } from '@effection/core';
export function createQueue(name = 'queue') {

@@ -31,19 +31,18 @@ let waiters = [];

let next = () => {
return {
name: `${name}.next()`,
perform(resolve) {
if (values.length) {
resolve(values.shift());
}
else {
waiters.push(resolve);
return () => {
let index = waiters.indexOf(resolve);
if (index > -1) {
waiters.splice(index, 1);
}
};
}
return withLabels((task) => {
let { future, resolve } = createFuture();
if (values.length) {
resolve(values.shift());
}
};
else {
waiters.push(resolve);
task.consume(() => {
let index = waiters.indexOf(resolve);
if (index > -1) {
waiters.splice(index, 1);
}
});
}
return future;
}, { name: `${name}.next()` });
};

@@ -50,0 +49,0 @@ function withName(operationName, operation) {

{
"name": "@effection/subscription",
"version": "2.0.0-beta.16",
"version": "2.0.0-beta.17",
"description": "Effection Subscriptions",

@@ -31,6 +31,5 @@ "main": "dist-cjs/index.js",

"dependencies": {
"@effection/core": "2.0.0-beta.14"
"@effection/core": "2.0.0-beta.15"
},
"devDependencies": {
"@effection/mocha": "2.0.0-beta.9",
"@frontside/tsconfig": "^1.2.0",

@@ -37,0 +36,0 @@ "@types/mocha": "^8.0.3",

# @effection/subscription
[![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT)
[![Created by Frontside](https://img.shields.io/badge/created%20by-frontside-26abe8.svg)](https://frontside.com)
[![Chat on Discord](https://img.shields.io/discord/700803887132704931?Label=Discord)](https://discord.gg/Ug5nWH8)
APIs for producing, consuming and transforming streams of data within
effection operations.
[Effection][] is the structured concurrency toolkit for JavaScript. You can find
detailed information about channels, streams and subscriptions at [https://frontside.com/effection/docs/guides/collections#stream](https://frontside.com/effection/docs/guides/collections#stream)
### createSubscription(publisher)
At it's lowest level, the subscription API does not actually require
any helpers to implement, only that the subscription object itself
conform to a certain API, and that the caller respect . However, to
manually implement this API every time would be unreasonably
cumbersome. This is where `createSubscription` comes in. It returns an
operation that produces a `Subscription` from a publisher. Where
`publisher` is a fuction that takes a `publish` function and returns
an Operation that produces the return value of the subscription.
``` typescript
type Publisher<T> = (publish: (value: T) => void) => Operation<T>;
createSubscription<T, TReturn>(publisher: Publisher<T,TReturn>): Operation<Subscription<T,TReturn>>
```
the publish function is called to "push" a value out to the
subscription so that it will be returned by a subsequent call to the
`next()` operation of the subscription. Publish can be called many
times in between subsequent calls to `next` and still not lose a
value.
For example, to implement the `on` subscription for event emitters:
``` javascript
export function on(emitter, eventName) {
return createSubscription(function* (publish) {
let listener = (...args) => publish(args);
try {
emitter.on(eventName, listener);
yield;
} finally {
emitter.off(eventName, listener);
}
});
}
```
Now, any event can be consumed as a subscription:
``` javascript
let subscription = yield on(socket, 'message');
while (true) {
let { value: [message] } = subscription.next();
yield handleMessage(message);
}
```
One of the greatest advantages of using `createSubscription` is that
the `Subscription` produced is an effection resource, and so will
automatically be shut down when no longer needed. That way, there is
no need to call the `unsubscribe()` method ever.
### SymbolSubscribable
In order to facilitate interoperation of subscription producers and
consumers, any object can implement the `[SymbolSubscribable]()`
method in order to be turned into a subscription. This follows the
pattern of `Symbol.iterator`, and `Symbol.observable`. Any object that
implements this method can be consumed as a subscription.
### subscribe(source)
In order to lift functions into the context of a subscription, you can use
`subscribe` which can be used to transform subscriptions via combinators.
### map(fn)
Returns a new subscribable whose items are transformed by `fn`. For
example:
``` javascript
subscribe(websocket).map(message => JSON.parse(message));
```
### filter(predicate)
Return a new `Subscribable` that only produces items from its source
that match `predicate`.
``` javascript
subscribe(websocket).filter(message => message.type === 'command');
```
### match(reference)
Return a new `Subscribable` that only produces items from its source that match
`reference` in the sense that the produced items have the same properties and
values as `reference`.
``` javascript
subscribe(websocket).match({ type: 'command' });
```
### first()
An operation that produces the first item in a subscription or
undefined if the subscription has no items.
``` javascript
let message = yield subscribe(websocket).first();
```
### expect()
An operation that produces the first item in a subscription or
throws an error if the subscription has no items.
``` javascript
let message = yield subscribe(websocket).expect();
```
### forEach()
Calls the given operation function with each item in the subscription. Returns
the return value of the subscriopion when done.
``` javascript
let exitCode = yield subscribe(websocket).forEach(function*(message) {
// ...
});
```
[Effection]: https://frontside.com/effection

@@ -1,7 +0,3 @@

export { SymbolOperationIterable } from './symbol-operation-iterable';
export { OperationIterable } from './operation-iterable';
export { OperationIterator } from './operation-iterator';
export { Subscription } from './subscription';
export { createStream, Stream, StringBufferStream } from './stream';
export { Writable, WritableStream } from './writable-stream';
export { Queue, createQueue } from './queue';

@@ -1,2 +0,2 @@

import { Operation, withLabels } from '@effection/core';
import { Operation, withLabels, createFuture } from '@effection/core';
import { Subscription } from './index';

@@ -47,18 +47,17 @@

let next = (): Operation<IteratorResult<T, TReturn>> => {
return {
name: `${name}.next()`,
perform(resolve) {
if(values.length) {
resolve(values.shift() as IteratorResult<T, TReturn>);
} else {
waiters.push(resolve);
return () => {
let index = waiters.indexOf(resolve);
if(index > -1) {
waiters.splice(index, 1);
}
};
}
return withLabels((task) => {
let { future, resolve } = createFuture<IteratorResult<T, TReturn>>();
if(values.length) {
resolve(values.shift() as IteratorResult<T, TReturn>);
} else {
waiters.push(resolve);
task.consume(() => {
let index = waiters.indexOf(resolve);
if(index > -1) {
waiters.splice(index, 1);
}
});
}
};
return future;
}, { name: `${name}.next()` });
};

@@ -65,0 +64,0 @@

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc