🚀 Big News: Socket Acquires Coana to Bring Reachability Analysis to Every Appsec Team.Learn more
Socket
Sign inDemoInstall
Socket

corsa

Package Overview
Dependencies
Maintainers
1
Versions
3
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

corsa

Async iteration channels in JavaScript.

1.0.2
latest
Source
npm
Version published
Weekly downloads
8
-46.67%
Maintainers
1
Weekly downloads
 
Created
Source

Corsa

Async iteration channels in JavaScript.

NPM package Build Status

$ npm install corsa --save
import { channel } from 'corsa'

const { readable, writable } = channel()
writable.write(3)
writable.write(2)
writable.write(1)
writable.end()

for await (const value of readable) {
  console.log(value)
}
console.log('done')
output:
> 3
> 2
> 1
> done

Overview

Corsa is a library for creating buffered readable / writable channels in JavaScript. This library was specifically written to help solve backpressure issues that can occur when dealing with high frequency messaging using traditional event listeners in JavaScript.

Corsa approaches this problem by making the channels sender await and suspend at buffer capacity. This helps to ensure the senders send rate is locked to the throughput allowed by a receiver.

Requires async/await and AsyncIteration support. Tested natively on Node v10.

channel<T>

A channel is a uni-directional pipe for which data can flow. The following code creates an unbounded channel which allows for near infinite buffering of messages between writable and readable. The call to channel returns a channel object, which we destructure into the readable and writable pairs.

const { readable, writable } = channel()

The following creates a bounded channel which allows for sending 5 values before suspending (see bounded vs unbounded)

const { readable, writable }  = channel(5)

Writer<T>

The following code creates an unbounded channel and sends the values 1, 2, 3 followed by a call to end() to signal EOF to a receiver.

const { readable, writable } = channel<number>()

writable.write(1)
writable.write(2)
writable.write(3)
writable.end()

Reader<T>

The Reader<T> is the receiving side of a channel and supports for-await-of for general iteration.

const { readable, writable } = channel()
writable.write(1)
writable.write(2)
writable.write(3)
writable.end()

for await (const value of readable) {
  console.log(value)
}

bounded vs unbounded

By default all channels are unbounded but it is possible to set a fixed buffering size when creating a channel(). When setting a channel size, this will cause a writable to pause at await when sending values. The await at the writable will only occur once the channels buffer has filled with values. The writable will remained suspended until such time a receiver starts pulling values from the channel.

The following code demostrates this behavior with channel bound to a buffer of 5.

const { readable, writable } = channel(5)
await writable.write(1) 
await writable.write(2) 
await writable.write(3) 
await writable.write(4) 
await writable.write(5) // - at capacity, the readable will need to read something.

await writable.write(6) // suspend  <-----+
                        //                | - readable.read() dequeues one element from the
...                     //                |   stream which will cause the writable to resume.
                        //                |  
await readable.read()   // resume   ------+

select

This library provides a simple channel select function similar to multi channel select found in the Go programming language. It allows multiple Reader<T> types to be combined into a singular stream.

import { channel, select } from 'corsa'

function strings() {
  const { readable, writable } = channel<string>()
  setInterval(() => writable.write('hello world'), 100)
  return readable
}

function numbers() {
  const { readable, writable } = channel<number>()
  setInterval(() => writable.write(Math.random()), 200)
  return readable
}

async function start() {
  const readable = select(strings(), numbers())
  for await (const value of readable) {
    console.log(value)
  }
}

Keywords

async-iterators

FAQs

Package last updated on 19 Feb 2019

Did you know?

Socket

Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.

Install

Related posts