Security News
Bun 1.2 Released with 90% Node.js Compatibility and Built-in S3 Object Support
Bun 1.2 enhances its JavaScript runtime with 90% Node.js compatibility, built-in S3 and Postgres support, HTML Imports, and faster, cloud-first performance.
@fine-js/channels
Advanced tools
Library for asynchronous in-process communication
based on Clojure's
excellent core.async
.
Read through introductory blog post
or watch Strange Loop presentation
for more details and inspiration.
It's queues, essentially, except there are Promise
s on top because JavaScript.
npm install @fine-js/channels
Library's source code is plain JavaScript with no runtime dependencies that works in Node.js and modern browsers.
After installing a dependency, require
the module as usual:
const {chan} = require('@fine-js/channels')
console.dir(chan())
A build for browsers is available as a single-file download
and via UNPKG (docs).
Here is an example of embedding version 0.0.2
in particular:
<script src="https://unpkg.com/@fine-js/channels@0.0.2/browser.js"
integrity="sha384-aLUgfMcOf6P0qxZ4k0e084VdlxfruOqU0zXhYBSZS28Y07u7Zoo1NBbYnNpNynck"
crossorigin="anonymous"></script>
<script>
const {chan} = window.finejs.channels
console.dir(chan())
</script>
Alternatively, you can build your own version with Browserify
or any other bundler, as well as serve require.resolve('@fine-js/channels/browser.js')
from your server directly.
TODO…
Channel is a plain object with two operations:
put(ch, val)
puts a value onto the channeltake(ch)
takes a value from the channelPort is a description of either operation:
ch
is a port representing taking a value from channel ch
[ch, val]
is a port representing putting val
onto the channel ch
Each channel can be backed by a buffer of given capacity.
Parking is creating and returning a promise that will possibly never resolve, or resolve at indefinite time in the future. This is in contrast to immediately which is used in this document to roughly mean guaranteed resolution in the near future (often, after a couple of microtask ticks).
chan()
creates a channelput()
puts a value onto the channeltake()
takes a value from the channelclose()
closes a channelalt()
runs at most one of the given operations returning result of handleralts()
runs at most one of the given portspoll()
takes a value from channel, when immediately possibleoffer()
puts a value onto channel, when immediately possibletimeout()
creates a self-closing channelunbuffered()
for directly connecting writers to readersbuffer()
up to given capacitydropping()
ignores writes over capacitysliding()
writes over capacity remove oldest written valuech[Symbol.asyncIterator]
makes channel iterable with for await…of
chan()
⬏Creates a channel with an optional buffer:
chan()
unbuffered channelchan(capacity)
blocking buffer of given capacitychan(buffer)
passed bufferchan()
chan(10)
chan(sliding(1))
put()
⬏Puts a value onto the channel. null
or undefined
are not allowed.
Will park if no buffer space is available.
Resolves to true
unless channel is already closed.
put(ch, 42)
ch.put(42)
take()
⬏Takes a value from the channel. Will park if nothing is available.
Resolves to a value taken or null
if channel is already closed.
take(ch)
ch.take()
close()
⬏Closes given channel. The channel will no longer accept any puts (they will be ignored).
Data in the channel remains available for taking, until exhausted,
after which takes will return null
.
If there are any pending takes, they will be dispatched with null
.
Any parked puts will remain so until a taker releases them.
Closing a closed channel is a no-op. Returns null
.
close(ch)
ch.close()
timeout()
⬏Returns a channel that will close after given number of millisecods.
timeout(100)
alt()
⬏Executes one of several channel operations via alts()
and passess its result to a handler.
Accepts a list of operations follwed by corresponding handler and options to pass to alts()
, where:
(val, channel)
Each channel may only appear once.
Resolves to handler's result if it's a function, handler itself otherwise.
alt([
a,
(val) => console.log('read %o from a', val),
[b, c],
(val, ch) => console.log('read %o from %o', val, ch),
[[ch, 42]],
'wrote 42 to ch',
])
alt([
[[worker1, task], [worker2, task], [worker3, task]],
`task ${task.id} queued`,
], {default: () => `try again later in ${waittime()} seconds`})
alt([
results,
(val) => ({status: 200, text: val}),
timeout(150),
{status: 504},
], {priority: true})
alts()
⬏Completes at most one of several ports.
Unless the priority
option is true
, if more than one port is ready,
a non-deterministic choice will be made. If no operation is
ready and a default
value is supplied, [default-val, alts.default]
will
be returned. Otherwise alts
will park until the first operation to
become ready completes.
Resolves to [val port]
of the completed operation,
where val
is the value taken for takes,
and a boolean (true
unless already closed, as per ch.put
) for puts.
alts([ch1, ch2, ch3])
alts([
[worker1, task],
[worker2, task],
[worker3, task],
], {default: 'try again later'})
alts([
results,
timeout(150),
], {priority: true})
poll()
⬏Takes a value from the channel, when immediately possible.
Resolves a value taken if successful, null
otherwise.
poll(ch)
offer()
⬏Puts a value into the channel, when immediately possible.
Resolves to true
if offer succeeds.
offer(ch, 42)
unbuffered()
⬏Giving this buffer to a channel will make it act as a synchronisation point between readers and writers: puts are parked until the value is taken by consumer.
This is a buffer channel will get when you call chan()
or chan(0)
.
buffer()
⬏Creates a blocking buffer:
buffer()
with capacity 1buffer(capacity)
with given capacity.When full, puts will be parked.
ch(16)
ch(buffer(16))
dropping()
⬏Creates a dropping non-blocking buffer:
dropping()
with capacity 1dropping(capacity)
with given capacityWhen full, puts will complete puts, but values discarded.
ch(dropping(3000))
sliding()
⬏Creates a sliding non-blocking buffer:
sliding()
with capacity 1sliding(capacity)
with given capacityWhen full, puts will complete puts, thier values buffered, but oldest elements in buffer will be dropped.
ch(sliding(1))
ch[Symbol.asyncIterator]()
⬏Returns Async Iterator over values being put onto the channel. Not meant to be used directly, instead:
// Simply consuming everything until the channel closes.
for await (const message of ch)
console.log(message)
// Note that the loop's body will NOT see every single message being put onto the channel,
// since, effectively, this calls `take()` on each iteration and somewhat equivalent to:
while (!closed(ch))
console.log(await ch.take())
// Reading until we see a sentinel value
for await (const byte of ch)
if (byte === 0)
break;
Read more over at MDN:
FAQs
Bits of Clojure's `core.async` ported to JS
We found that @fine-js/channels demonstrated a not healthy version release cadence and project activity because the last version was released a year ago. It has 1 open source maintainer collaborating on the project.
Did you know?
Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.
Security News
Bun 1.2 enhances its JavaScript runtime with 90% Node.js compatibility, built-in S3 and Postgres support, HTML Imports, and faster, cloud-first performance.
Security News
Biden's executive order pushes for AI-driven cybersecurity, software supply chain transparency, and stronger protections for federal and open source systems.
Security News
Fluent Assertions is facing backlash after dropping the Apache license for a commercial model, leaving users blindsided and questioning contributor rights.