Channels.js
Library for asynchronous in-process communication
based on Clojure's
excellent core.async
.
Read through introductory blog post
or watch Strange Loop presentation
for more details and inspiration.
It's queues, essentially, except there are Promise
s on top because JavaScript.
npm install @fine-js/channels
Supported Platforms
Library's source code is plain JavaScript with no runtime dependencies that works in Node.js
and modern browsers.
Node.js
After installing a dependency, require
the module as usual:
const {chan} = require('@fine-js/channels')
console.dir(chan())
Browsers
A build for browsers is available as a single-file download
and via UNPKG (docs).
Here is an example of embedding version 0.0.2
in particular:
<script src="https://unpkg.com/@fine-js/channels@0.0.2/browser.js"
integrity="sha384-aLUgfMcOf6P0qxZ4k0e084VdlxfruOqU0zXhYBSZS28Y07u7Zoo1NBbYnNpNynck"
crossorigin="anonymous"></script>
<script>
const {chan} = window.finejs.channels
console.dir(chan())
</script>
Alternatively, you can build your own version with Browserify
or any other bundler, as well as serve require.resolve('@fine-js/channels/browser.js')
from your server directly.
Overview
TODO…
Terminology
Channel is a plain object with two operations:
put(ch, val)
puts a value onto the channeltake(ch)
takes a value from the channel
Port is a description of either operation:
ch
is a port representing taking a value from channel ch
[ch, val]
is a port representing putting val
onto the channel ch
Each channel can be backed by a buffer of given capacity.
Parking is creating and returning a promise that will possibly never resolve,
or resolve at indefinite time in the future. This is in contrast
to immediately which is used in this document to roughly mean guaranteed resolution
in the near future (often, after a couple of microtask ticks).
API
Basics
chan()
creates a channelput()
puts a value onto the channeltake()
takes a value from the channelclose()
closes a channel
Core
alt()
runs at most one of the given operations returning result of handleralts()
runs at most one of the given portspoll()
takes a value from channel, when immediately possibleoffer()
puts a value onto channel, when immediately possibletimeout()
creates a self-closing channel
Buffers
chan()
⬏
Creates a channel with an optional buffer:
chan()
unbuffered channelchan(capacity)
blocking buffer of given capacitychan(buffer)
passed buffer
chan()
chan(10)
chan(sliding(1))
put()
⬏
Puts a value onto the channel. null
or undefined
are not allowed.
Will park if no buffer space is available.
Resolves to true
unless channel is already closed.
put(ch, 42)
ch.put(42)
take()
⬏
Takes a value from the channel. Will park if nothing is available.
Resolves to a value taken or null
if channel is already closed.
take(ch)
ch.take()
close()
⬏
Closes given channel. The channel will no longer accept any puts (they will be ignored).
Data in the channel remains available for taking, until exhausted,
after which takes will return null
.
If there are any pending takes, they will be dispatched with null
.
Any parked puts will remain so until a taker releases them.
Closing a closed channel is a no-op. Returns null
.
close(ch)
ch.close()
timeout()
⬏
Returns a channel that will close after given number of millisecods.
timeout(100)
alt()
⬏
Executes one of several channel operations via alts()
and passess its result to a handler.
Accepts a list of operations follwed by corresponding handler and options to pass to alts()
, where:
- each operation is either
- single channel to take from
- array of ports
- each handler is either
- a function accepting
(val, channel)
- anything else
Each channel may only appear once.
Resolves to handler's result if it's a function, handler itself otherwise.
alt([
a,
(val) => console.log('read %o from a', val),
[b, c],
(val, ch) => console.log('read %o from %o', val, ch),
[[ch, 42]],
'wrote 42 to ch',
])
alt([
[[worker1, task], [worker2, task], [worker3, task]],
`task ${task.id} queued`,
], {default: () => `try again later in ${waittime()} seconds`})
alt([
results,
(val) => ({status: 200, text: val}),
timeout(150),
{status: 504},
], {priority: true})
alts()
⬏
Completes at most one of several ports.
Unless the priority
option is true
, if more than one port is ready,
a non-deterministic choice will be made. If no operation is
ready and a default
value is supplied, [default-val, alts.default]
will
be returned. Otherwise alts
will park until the first operation to
become ready completes.
Resolves to [val port]
of the completed operation,
where val
is the value taken for takes,
and a boolean (true
unless already closed, as per ch.put
) for puts.
alts([ch1, ch2, ch3])
alts([
[worker1, task],
[worker2, task],
[worker3, task],
], {default: 'try again later'})
alts([
results,
timeout(150),
], {priority: true})
poll()
⬏
Takes a value from the channel, when immediately possible.
Resolves a value taken if successful, null
otherwise.
poll(ch)
offer()
⬏
Puts a value into the channel, when immediately possible.
Resolves to true
if offer succeeds.
offer(ch, 42)
unbuffered()
⬏
Giving this buffer to a channel will make it act as a synchronisation point
between readers and writers: puts are parked until the value is taken by consumer.
This is a buffer channel will get when you call chan()
or chan(0)
.
buffer()
⬏
Creates a blocking buffer:
buffer()
with capacity 1buffer(capacity)
with given capacity.
When full, puts will be parked.
ch(16)
ch(buffer(16))
dropping()
⬏
Creates a dropping non-blocking buffer:
dropping()
with capacity 1dropping(capacity)
with given capacity
When full, puts will complete puts, but values discarded.
ch(dropping(3000))
sliding()
⬏
Creates a sliding non-blocking buffer:
sliding()
with capacity 1sliding(capacity)
with given capacity
When full, puts will complete puts, thier values buffered, but oldest
elements in buffer will be dropped.
ch(sliding(1))
ch[Symbol.asyncIterator]()
⬏
Returns Async Iterator over values being put onto the channel.
Not meant to be used directly, instead:
for await (const message of ch)
console.log(message)
while (!closed(ch))
console.log(await ch.take())
for await (const byte of ch)
if (byte === 0)
break;
Read more over at MDN: