New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

unbounded-async-channel

Package Overview
Dependencies
Maintainers
1
Versions
6
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

unbounded-async-channel

A simple unbounded async channel implementation for Node.js and the browser.

  • 0.1.5
  • latest
  • Source
  • npm
  • Socket score

Version published
Weekly downloads
1
decreased by-50%
Maintainers
1
Weekly downloads
 
Created
Source

Coverage Status npm NPM

Documentation

The unbounded-async-channel library provides a simple unbounded channel that allows for concurrent reading and writing of values.

Writing to the channel is synchronous and non-blocking, while reading from the channel blocks until a value is written or the channel is closed.

If there are no readers when a value is written, the value is buffered and available for future readers.

Installation

npm i unbounded-async-channel

Basic Usage

import { createUnboundedAsyncChannel } from "unbounded-async-channel";

// create an UnboundedAsyncChannel with the provided type
const channel = createUnboundedAsyncChannel<number>();

// writing to channel is a non blocking operation
channel.write(1);

// read() blocks until a value is available or the channel is closed. read() doesn't throw any error
const res = await channel.read();

if (res.closed) {
  if (res.error) {
    console.log("Channel closed with error", res.error);
  } else {
    console.log("Channel closed without any error");
  }
} else {
  console.log(res.value);
}

Reading values using for await loop

UnboundedAsyncChannel can be iterated using for await loop.

import { createUnboundedAsyncChannel } from "unbounded-async-channel";

const channel = createUnboundedAsyncChannel<number>();

for (let i = 0; i < 10; i++) {
  channel.write(i);
}

channel.close();

try {
  for await (const value of channel) {
    console.log(value);
  }
  console.log("channel closed without error");
} catch (err) {
  console.log("channel closed with error");
}

Closing a channel with error

If a channel is closed with an error and there are no readers, a future reader can receive that error after consuming all the buffered values in the channel.

A for await loop will also first consume all buffered values before throwing an error. If the channel was closed without any error, a for await loop will cleanly end.

Clearing a channel after closing

A closed channel may contain buffered values. To clear all buffered values consume all the messages

channel.close();
// after closing the channel, this will not block
for await (const value of channel) {
}

Example: Concurrent processing of tasks

import { createUnboundedAsyncChannel } from "unbounded-async-channel";

// channel to write tasks to
const tasks = createUnboundedAsyncChannel<string>();

// channel to write task results to
const taskResults = createUnboundedAsyncChannel<string>();

// queue all the tasks to be processed later
for (let i = 0; i < 1000; i++) {
  tasks.write(`some_url_${i}`);
}

// close the channel without passing an error
tasks.close();

// array to store worker promises
const workerPromises = [];

// create 10 workers to process the tasks
for (let i = 0; i < 10; i++) {
  workerPromises.push(
    new Promise<void>(async (resolve) => {
      try {
        /**
         * - Error is thrown only if the channel is closed with error parameter.
         * - Also error is not thrown immediately after closing the channel but
         *   instead after all buffered values are consumed
         */
        for await (const url of tasks) {
          // process the task
          const result = `processed ${url}`;
          taskResults.write(result);
        }
      } catch (e) {
        console.log("channel closed with error", e);
      }

      resolve();
    }),
  );
}

// wait for all workers to finish and then close the taskResults channel
Promise.all(workerPromises).then(() => {
  taskResults.close();
});

// consume the task results
for await (const result of taskResults) {
  console.log(result);
}

Keywords

FAQs

Package last updated on 11 Sep 2023

Did you know?

Socket

Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.

Install

Related posts

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc