
Security News
Socket Security Analysis Is Now One Click Away on npm
npm now links to Socket's security analysis on every package page. Here's what you'll find when you click through.
@ggoodman/channels
Advanced tools
An implementation of a `Channel` primitive that can be used to decouple producers and consumers in concurrent code.
An implementation of a Channel primitive that can be used to decouple producers and consumers in concurrent code.
Features:
npm install --save @ggoodman/channels
or
yarn add @ggoodman/channels
import { channel, Channel } from './';
const timeout = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms));
const createBall = () => ({ hits: 0, status: '' });
type Ball = ReturnType<typeof createBall>;
const wiff = channel<Ball>();
const waff = channel<Ball>();
const createBat = async (inbound: Channel<Ball>, outbound: Channel<Ball>) => {
for await (const ball of inbound) {
ball.hits++;
ball.status = ball.status === 'wiff!' ? 'waff!' : 'wiff!';
console.log(`🎾 Ball hit ${ball.hits} time(s), ${ball.status}`);
await outbound.put(ball); // smash the ball back
await timeout(500); // assume it's going to take a bit to hit the ball
}
console.log(`🛑 OK, fun's over`);
};
process.on('SIGINT', () => {
// Important to close BOTH sides because if the timing is unlucky, you read from
// the open side and attempt to write to the closed side.
wiff.close();
waff.close();
});
createBat(waff, wiff); // create a bat that will wiff waffs
createBat(wiff, waff); // create a bat that will waff wiffs
waff.put(createBall());
Writing to a channel is a blocking operation. That is, it returns a Promise<boolean>. Code that is writing to a channel should typically block on the write using something like:
await channel.put(message);
The returned Promise will only resolve at the earlier of:
false.true.However, sometimes you may want to be able to write a certain number of messages to a Channel before any readers start consuming messages. In a single-threaded environment, like JavaScript, this could easily lead to a deadlocked program; execution will never proceed to the logic consuming from the channel since it is blocked on writing to it.
To get around this, a Channel may be created with a buffer size. As many messages as the buffer size can be written to the Channel before calls to put will block on a read.
A Channel can be closed, signalling that further calls to put should be treaded as exceptions. When a channel is closed, all outstanding Promises for pending writes will resolve with the value false.
Note: This means that buffered messages that had been accepted but not consumed will be lost.
All outstanding, and new iterations consuming the channel will also immediately complete. This also means that iterations racing multiple channels will also immediately stop iteration.
Blocking reads from a channel (such as those waiting for a value to be written to the channel) will resolve to the special closed signal.
Channel interface/**
* A CSP-style channel.
*
* Represents an optionally-buffered unidirectional channel for typed messages.
*/
export interface Channel<T> {
/**
* Indication of whether this channel is closed.
*/
readonly closed: boolean;
/**
* The number of queued messages in the channel.
*/
readonly size: number;
/**
* Closes the channel.
*
* This immediately causes all pending `Promise`s to resolve
* with the `closed` symbol. It will immediately cause all `AsyncIterator`s to
* return, skipping further iteration.
*
* Attempting to call `put` on a closed channel will throw a {@link ChannelClosedError}.
* Subsequent calls to {@link Channel.take | take} or {@link select} that include this channel will
* immediately resolve / complete iteration.
*/
close(): void;
/**
* Write a message to the channel.
*
* If the channel is closed, a {@link ChannelClosedError} exception will be thrown. If
* the channel has a fixed buffer size and this message would cause that buffer to
* be exceeded, the message will be discarded and this function will return `false`.
*
* @param msg The message to write to the channel
* @returns A promise for whether the message was accepted or not. This will always be true for
* unbuffered channels.
*/
put(msg: T): Promise<boolean>;
/**
* Waits for a message (or `AsyncIterable` therefor).
*
* This is the method that a consumer will be calling to obtain a value (or stream
* thereof) from the {@link Channel}.
*
* When the channel is _closed_ and this method's return value is used as a `Promise`,
* the returned promise will resolve to the {@link closed} symbol. When the channel
* is _closed_ and this method's return value is used as an `AsyncIterable`, the
* iterable will not produce any values and will immediately 'return'.
*
* @returns An {@link IterablePromise}, so the return value can either be used as a
* `Promise` or as an `AsyncIterable`.
*/
take(): IterablePromise<TakeValue<T>, T>;
}
IterablePromise type/**
* A `Promise` that is also an `AsyncIterable`.
*
* This is a utility type to represent objects that can be used both as a `Promise`
* or as an `AsyncIterable`.
*
* @template T The message type that will resolve when used as a `Promise`.
* @template U The message type that will be produced when used as an `AsyncIterable`.
*/
export type IterablePromise<T, U = T> = Promise<T> & AsyncIterable<U>;
import { channel, closed } from '@ggoodman/channels';
const ch = channel();
const msg = await ch.take(); // Use the value as a promise
if (msg === closed) {
// Ignore this?
}
// Do something with msg
import { channel } from '@ggoodman/channels';
const ch = channel();
for await (const msg of ch.take()) {
// Do something with msg
}
channel(bufferSize?: number): ChannelCreate a new Channel with an optional buffer size.
closed: unique symbolA Symbol to which calls to Channel.read() might resolve if the target Channel is closed.
Important: When using blocking reads, it is important to test resolved values against
closed.
select({ [key: string]: Channel }): IterablePromise<{ key: string, value: unknown }>Race multiple channels against each other. This api can be used both as a Promise or an AsyncIterable. In either case, the resolved value will be an object with the shape { key, value }. The key is the key in the object passed to select to identify from which channel the value was produced.
ChannelClosedError constructorThe constructor function for the ChannelClosedError errors that will be thrown when attempting to write to a closed channel.
select to be fully typed.FAQs
An implementation of a closable, tailable `Channel` primitive in idiomatic JavaScript that can be used to decouple producers and consumers in concurrent code.
We found that @ggoodman/channels demonstrated a not healthy version release cadence and project activity because the last version was released a year ago. It has 1 open source maintainer collaborating on the project.
Did you know?

Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.

Security News
npm now links to Socket's security analysis on every package page. Here's what you'll find when you click through.

Security News
A compromised npm publish token was used to push a malicious postinstall script in cline@2.3.0, affecting the popular AI coding agent CLI with 90k weekly downloads.

Product
Socket is now scanning AI agent skills across multiple languages and ecosystems, detecting malicious behavior before developers install, starting with skills.sh's 60,000+ skills.