@effection/subscription
Advanced tools
Comparing version 2.0.0-beta.16 to 2.0.0-beta.17
# @effection/subscription | ||
## \[2.0.0-beta.17] | ||
- The `buffer` method on `Stream` returns a resource and can receive an optional limit using a ring buffer for efficient bounded caching | ||
- [4e9cb50](https://github.com/thefrontside/effection/commit/4e9cb5006249a208c62f7de30c1fd16a713725c7) Improve process streams on 2021-09-22 | ||
- - [0248d79](https://github.com/thefrontside/effection/commit/0248d79a33dcfc4200b0832aba975c9cad08981e) Add package readmes on 2021-09-28 | ||
- Remove operation resolutions entirely, use Future instead | ||
- [5f67d61](https://github.com/thefrontside/effection/commit/5f67d610324af158eba67be5600d413fc1f2ace1) Add changeset on 2021-09-29 | ||
- Remove the `stringBuffer` method on `Stream` | ||
- [4e9cb50](https://github.com/thefrontside/effection/commit/4e9cb5006249a208c62f7de30c1fd16a713725c7) Improve process streams on 2021-09-22 | ||
- Add `grep` method to streams to scan items for given substring or regexp | ||
- [2b3de48](https://github.com/thefrontside/effection/commit/2b3de4822321ba7d0c464a1088aeb0fd8356fd1f) Add grep method to Stream on 2021-09-22 | ||
- Split off `Stream` from subscription package into its own `@effection/stream` package | ||
- [248de1d](https://github.com/thefrontside/effection/commit/248de1dd31d172762d9601a2b5acd983dce61ab0) Split `Stream` into its own package on 2021-09-27 | ||
## \[2.0.0-beta.16] | ||
@@ -4,0 +18,0 @@ |
@@ -1,8 +0,4 @@ | ||
export { SymbolOperationIterable } from './symbol-operation-iterable'; | ||
export { OperationIterable } from './operation-iterable'; | ||
export { OperationIterator } from './operation-iterator'; | ||
export { Subscription } from './subscription'; | ||
export { createStream, Stream, StringBufferStream } from './stream'; | ||
export { Writable, WritableStream } from './writable-stream'; | ||
export { Queue, createQueue } from './queue'; | ||
//# sourceMappingURL=index.d.ts.map |
"use strict"; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
exports.createQueue = exports.createStream = exports.SymbolOperationIterable = void 0; | ||
var symbol_operation_iterable_1 = require("./symbol-operation-iterable"); | ||
Object.defineProperty(exports, "SymbolOperationIterable", { enumerable: true, get: function () { return symbol_operation_iterable_1.SymbolOperationIterable; } }); | ||
var stream_1 = require("./stream"); | ||
Object.defineProperty(exports, "createStream", { enumerable: true, get: function () { return stream_1.createStream; } }); | ||
exports.createQueue = void 0; | ||
var queue_1 = require("./queue"); | ||
Object.defineProperty(exports, "createQueue", { enumerable: true, get: function () { return queue_1.createQueue; } }); | ||
//# sourceMappingURL=index.js.map |
@@ -34,19 +34,18 @@ "use strict"; | ||
let next = () => { | ||
return { | ||
name: `${name}.next()`, | ||
perform(resolve) { | ||
if (values.length) { | ||
resolve(values.shift()); | ||
} | ||
else { | ||
waiters.push(resolve); | ||
return () => { | ||
let index = waiters.indexOf(resolve); | ||
if (index > -1) { | ||
waiters.splice(index, 1); | ||
} | ||
}; | ||
} | ||
return core_1.withLabels((task) => { | ||
let { future, resolve } = core_1.createFuture(); | ||
if (values.length) { | ||
resolve(values.shift()); | ||
} | ||
}; | ||
else { | ||
waiters.push(resolve); | ||
task.consume(() => { | ||
let index = waiters.indexOf(resolve); | ||
if (index > -1) { | ||
waiters.splice(index, 1); | ||
} | ||
}); | ||
} | ||
return future; | ||
}, { name: `${name}.next()` }); | ||
}; | ||
@@ -53,0 +52,0 @@ function withName(operationName, operation) { |
@@ -1,8 +0,4 @@ | ||
export { SymbolOperationIterable } from './symbol-operation-iterable'; | ||
export { OperationIterable } from './operation-iterable'; | ||
export { OperationIterator } from './operation-iterator'; | ||
export { Subscription } from './subscription'; | ||
export { createStream, Stream, StringBufferStream } from './stream'; | ||
export { Writable, WritableStream } from './writable-stream'; | ||
export { Queue, createQueue } from './queue'; | ||
//# sourceMappingURL=index.d.ts.map |
@@ -1,4 +0,2 @@ | ||
export { SymbolOperationIterable } from './symbol-operation-iterable'; | ||
export { createStream } from './stream'; | ||
export { createQueue } from './queue'; | ||
//# sourceMappingURL=index.js.map |
@@ -1,2 +0,2 @@ | ||
import { withLabels } from '@effection/core'; | ||
import { withLabels, createFuture } from '@effection/core'; | ||
export function createQueue(name = 'queue') { | ||
@@ -31,19 +31,18 @@ let waiters = []; | ||
let next = () => { | ||
return { | ||
name: `${name}.next()`, | ||
perform(resolve) { | ||
if (values.length) { | ||
resolve(values.shift()); | ||
} | ||
else { | ||
waiters.push(resolve); | ||
return () => { | ||
let index = waiters.indexOf(resolve); | ||
if (index > -1) { | ||
waiters.splice(index, 1); | ||
} | ||
}; | ||
} | ||
return withLabels((task) => { | ||
let { future, resolve } = createFuture(); | ||
if (values.length) { | ||
resolve(values.shift()); | ||
} | ||
}; | ||
else { | ||
waiters.push(resolve); | ||
task.consume(() => { | ||
let index = waiters.indexOf(resolve); | ||
if (index > -1) { | ||
waiters.splice(index, 1); | ||
} | ||
}); | ||
} | ||
return future; | ||
}, { name: `${name}.next()` }); | ||
}; | ||
@@ -50,0 +49,0 @@ function withName(operationName, operation) { |
{ | ||
"name": "@effection/subscription", | ||
"version": "2.0.0-beta.16", | ||
"version": "2.0.0-beta.17", | ||
"description": "Effection Subscriptions", | ||
@@ -31,6 +31,5 @@ "main": "dist-cjs/index.js", | ||
"dependencies": { | ||
"@effection/core": "2.0.0-beta.14" | ||
"@effection/core": "2.0.0-beta.15" | ||
}, | ||
"devDependencies": { | ||
"@effection/mocha": "2.0.0-beta.9", | ||
"@frontside/tsconfig": "^1.2.0", | ||
@@ -37,0 +36,0 @@ "@types/mocha": "^8.0.3", |
130
README.md
# @effection/subscription | ||
[![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT) | ||
[![Created by Frontside](https://img.shields.io/badge/created%20by-frontside-26abe8.svg)](https://frontside.com) | ||
[![Chat on Discord](https://img.shields.io/discord/700803887132704931?Label=Discord)](https://discord.gg/Ug5nWH8) | ||
APIs for producing, consuming and transforming streams of data within | ||
effection operations. | ||
[Effection][] is the structured concurrency toolkit for JavaScript. You can find | ||
detailed information about channels, streams and subscriptions at [https://frontside.com/effection/docs/guides/collections#stream](https://frontside.com/effection/docs/guides/collections#stream) | ||
### createSubscription(publisher) | ||
At it's lowest level, the subscription API does not actually require | ||
any helpers to implement, only that the subscription object itself | ||
conform to a certain API, and that the caller respect . However, to | ||
manually implement this API every time would be unreasonably | ||
cumbersome. This is where `createSubscription` comes in. It returns an | ||
operation that produces a `Subscription` from a publisher. Where | ||
`publisher` is a fuction that takes a `publish` function and returns | ||
an Operation that produces the return value of the subscription. | ||
``` typescript | ||
type Publisher<T> = (publish: (value: T) => void) => Operation<T>; | ||
createSubscription<T, TReturn>(publisher: Publisher<T,TReturn>): Operation<Subscription<T,TReturn>> | ||
``` | ||
the publish function is called to "push" a value out to the | ||
subscription so that it will be returned by a subsequent call to the | ||
`next()` operation of the subscription. Publish can be called many | ||
times in between subsequent calls to `next` and still not lose a | ||
value. | ||
For example, to implement the `on` subscription for event emitters: | ||
``` javascript | ||
export function on(emitter, eventName) { | ||
return createSubscription(function* (publish) { | ||
let listener = (...args) => publish(args); | ||
try { | ||
emitter.on(eventName, listener); | ||
yield; | ||
} finally { | ||
emitter.off(eventName, listener); | ||
} | ||
}); | ||
} | ||
``` | ||
Now, any event can be consumed as a subscription: | ||
``` javascript | ||
let subscription = yield on(socket, 'message'); | ||
while (true) { | ||
let { value: [message] } = subscription.next(); | ||
yield handleMessage(message); | ||
} | ||
``` | ||
One of the greatest advantages of using `createSubscription` is that | ||
the `Subscription` produced is an effection resource, and so will | ||
automatically be shut down when no longer needed. That way, there is | ||
no need to call the `unsubscribe()` method ever. | ||
### SymbolSubscribable | ||
In order to facilitate interoperation of subscription producers and | ||
consumers, any object can implement the `[SymbolSubscribable]()` | ||
method in order to be turned into a subscription. This follows the | ||
pattern of `Symbol.iterator`, and `Symbol.observable`. Any object that | ||
implements this method can be consumed as a subscription. | ||
### subscribe(source) | ||
In order to lift functions into the context of a subscription, you can use | ||
`subscribe` which can be used to transform subscriptions via combinators. | ||
### map(fn) | ||
Returns a new subscribable whose items are transformed by `fn`. For | ||
example: | ||
``` javascript | ||
subscribe(websocket).map(message => JSON.parse(message)); | ||
``` | ||
### filter(predicate) | ||
Return a new `Subscribable` that only produces items from its source | ||
that match `predicate`. | ||
``` javascript | ||
subscribe(websocket).filter(message => message.type === 'command'); | ||
``` | ||
### match(reference) | ||
Return a new `Subscribable` that only produces items from its source that match | ||
`reference` in the sense that the produced items have the same properties and | ||
values as `reference`. | ||
``` javascript | ||
subscribe(websocket).match({ type: 'command' }); | ||
``` | ||
### first() | ||
An operation that produces the first item in a subscription or | ||
undefined if the subscription has no items. | ||
``` javascript | ||
let message = yield subscribe(websocket).first(); | ||
``` | ||
### expect() | ||
An operation that produces the first item in a subscription or | ||
throws an error if the subscription has no items. | ||
``` javascript | ||
let message = yield subscribe(websocket).expect(); | ||
``` | ||
### forEach() | ||
Calls the given operation function with each item in the subscription. Returns | ||
the return value of the subscriopion when done. | ||
``` javascript | ||
let exitCode = yield subscribe(websocket).forEach(function*(message) { | ||
// ... | ||
}); | ||
``` | ||
[Effection]: https://frontside.com/effection |
@@ -1,7 +0,3 @@ | ||
export { SymbolOperationIterable } from './symbol-operation-iterable'; | ||
export { OperationIterable } from './operation-iterable'; | ||
export { OperationIterator } from './operation-iterator'; | ||
export { Subscription } from './subscription'; | ||
export { createStream, Stream, StringBufferStream } from './stream'; | ||
export { Writable, WritableStream } from './writable-stream'; | ||
export { Queue, createQueue } from './queue'; |
@@ -1,2 +0,2 @@ | ||
import { Operation, withLabels } from '@effection/core'; | ||
import { Operation, withLabels, createFuture } from '@effection/core'; | ||
import { Subscription } from './index'; | ||
@@ -47,18 +47,17 @@ | ||
let next = (): Operation<IteratorResult<T, TReturn>> => { | ||
return { | ||
name: `${name}.next()`, | ||
perform(resolve) { | ||
if(values.length) { | ||
resolve(values.shift() as IteratorResult<T, TReturn>); | ||
} else { | ||
waiters.push(resolve); | ||
return () => { | ||
let index = waiters.indexOf(resolve); | ||
if(index > -1) { | ||
waiters.splice(index, 1); | ||
} | ||
}; | ||
} | ||
return withLabels((task) => { | ||
let { future, resolve } = createFuture<IteratorResult<T, TReturn>>(); | ||
if(values.length) { | ||
resolve(values.shift() as IteratorResult<T, TReturn>); | ||
} else { | ||
waiters.push(resolve); | ||
task.consume(() => { | ||
let index = waiters.indexOf(resolve); | ||
if(index > -1) { | ||
waiters.splice(index, 1); | ||
} | ||
}); | ||
} | ||
}; | ||
return future; | ||
}, { name: `${name}.next()` }); | ||
}; | ||
@@ -65,0 +64,0 @@ |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
6
40739
39
441
10
+ Added@effection/core@2.0.0-beta.15(transitive)
- Removed@effection/core@2.0.0-beta.14(transitive)