Core Async JS ·

CoreAsyncJS is a JS library that implements the core aspects of the CSP (Communicating Sequential Processes) patterns. CoreAsyncJS uses Channels to synchronise concurrent processes. It is inspired from the Clojure core.async library and designed to be used with the npm package 'co' (shipped with this package). It also exposes composable APIs called transducers to help filter, map and, more generally speaking, reduce streams of data.
const { co, Channel } = require('core-async')
const inChan = new Channel()
const outChan = new Channel()
co(function *(){
while (true) {
yield inChan.put(Math.random())
const msg = outChan.stake()
if (msg)
console.log(msg)
}
})
co(function *(){
while (true) {
const val = yield inChan.take()
if (val >= 0.250 && val <= 0.252)
yield outChan.put(`We've found a winner: ${val}`)
}
})
console.log('Program has started.')
For more advanced examples, please refer to the Examples sections.
Table of Contents
Install
npm i core-async
APIs
Channel
The Channel object supports three different signatures:
new Channel(<Int> buffer, <Object> options)
new Channel(<Int> buffer, <Func> transducer, <Object> options)
new Channel(<Func> transducer, <Object> options)
Buffered vs Unbuffered Channels
const { co, Channel } = require('core-async')
const bufferedChan = new Channel(2)
const unbufferedChan = new Channel()
co(function *(){
yield bufferedChan.put(1)
console.log('1 has been added on the unbuffered channel.')
yield bufferedChan.put(2)
console.log('2 has been added on the unbuffered channel.')
yield bufferedChan.put(3)
console.log('3 has been added on the unbuffered channel.')
})
co(function *(){
yield unbufferedChan.put(1)
console.log('1 has been added on the buffered channel.')
})
Dropping and Sliding Channels
const { Channel } = require('core-async')
const droppingChan = new Channel(2, { mode: 'dropping' })
const slidingChan = new Channel(2, { mode: 'sliding' })
More detailed doc coming soon...
put - take - sput - stake
Example:
const co = require('co')
const { Channel } = require('core-async')
const chan = new Channel()
co(function *() {
yield chan.put(1)
console.log('PUT 1 DONE')
const sputSucceeded = chan.sput(2)
console.log(`SPUT 2 SUCCESSED? ${sputSucceeded}`)
})
co(function *() {
const val = yield chan.take()
console.log(`TAKE VALUE: ${val}`)
const stakeSucceeded = chan.stake()
console.log(`STAKE SUCCESSED? ${stakeSucceeded}`)
})
Channel.put( brick): <Promise>
Takes any object and yield a promise that returns a boolean. That boolean indicates whether or not the brick was successfully taken by another process. The most common case where this api yields false occurs when the channel uses a transducer that filtered the brick out. Example:
const chan = new Channel(filter(x < 10))
co(function *(){
const status = yield chan.put(1)
if (!status)
console.log('1 does not match the criteria to be added in this channel.')
else
console.log('1 has been taken from this channel.')
})
Channel.take(): <Promise>
Doc coming soon...
Channel.sput( brick): Boolean
Doc coming soon...
Channel.stake(): Object
Doc coming soon...
Composable transducers
A Transducer is a fancy name for a function that can transform data. Why not call this a transformer then? Well, this is indeed a transformer, but it has an extra key property thanks to the way it is implemented: Composition. You can compose multiple transducers together to make a new one, which really helps with code reusability and better encapsulation and separation of concerns. Under the hood, this is achieved by leveraging reducers, hence the name transducer (transform, reduce, tadaaa). The following two examples should help make those concepts clearer:
Creating a channel that only accepts numbers strictly greater than one:
const { co, Channel, transducer: { filter } } = require('./src')
const chan = new Channel(filter(x => x > 1))
co(function *() {
let status = yield chan.put(1)
if (!status) console.log('1 was not put on the channel.')
status = yield chan.put(2)
if (!status) console.log('2 was not put on the channel.')
status = yield chan.put(3)
if (!status) console.log('3 was not put on the channel.')
})
co(function *(){
while(true) {
const val = yield chan.take()
console.log(`${val} was taken from the channel.`)
}
})
Creating a channel that:
- Only accepts numbers between this interval: [0.25000, 0.25002]
- Multiply the filtered numbers by 100,000.
- Accumulate the numbers in an object.
const { co, Channel, transducer: { compose, filter, map, reduce } } = require('./src')
const doSomething = compose(
filter(x => x >= 0.25000 && x <= 0.25002),
map(x => x*100000),
reduce((acc,x,idx) => ({ total:acc.total+x, idx }), { total:0 })
)
const chan = new Channel(doSomething)
co(function *() {
let i = 0
while(true) {
const v = Math.random()
const status = yield chan.put(v)
if (status)
i = 0
else {
i++
process.stdout.clearLine()
process.stdout.cursorTo(0)
process.stdout.write(`Attempt ${i} invalid: ${v}`)
}
}
})
co(function *(){
while(true) {
const { total, idx } = yield chan.take()
console.log(`\n${idx}: ${total}`)
}
})
filter(<Func<Object,Int,Boolean>> predicate): <Func<Object,Object>> output
predicate
is a high-order binary operators function that returns a boolean. The first operator is the input object. The second operator is the object index in the stream. This output
function returns one the following:
- If the predicate returns true, the
output
function returns the original input object. - If the predicate returns false, the
output
function returns a specific NOMATCHKEY string(1) signaling that the input should not be added to the channel.
(1) NOMATCHKEY: no_match_7WmYhpJF33VG3X2dEqCQSwauKRb4zrPIRCh19zDF
map(<Func<Object,Int,Object>> transform): <Func<Object,Object>>
Doc coming soon...
reduce(<Func<Object,Object,Int,Object>> reduceFn): <Func<Object,Object>>
Doc coming soon...
compose(<Func<Object,Object>> transducer1 [,<Func<Object,Object>> transducer2, ...]): <Func<Object,Object>>
Doc coming soon...
Utils
alts
const co = require('co')
const { Channel, fn: { alts } } = require('core-async')
const chan1 = new Channel()
const chan2 = new Channel()
co(function *() {
const [v,chan] = yield alts([chan1, chan2])
if (chan == chan1)
console.log(`CHAN 1 WON WITH VALUE: ${v}`)
else
console.log(`CHAN 2 WON WITH VALUE: ${v}`)
})
chan1.put('hello')
chan2.put('world')
More detailed doc coming soon...
merge
const co = require('co')
const { Channel, fn: { merge } } = require('core-async')
const chan1 = new Channel()
const chan2 = new Channel()
co(function *() {
const mergedChan = yield merge([chan1, chan2])
while(true) {
const v = yield mergedChan.take()
console.log(v)
}
})
chan1.put(1)
chan1.put(2)
chan1.put(3)
chan2.put('Hello')
chan2.put('world!')
chan2.put('This rocks!')
More detailed doc coming soon...
timeout
timeout
returns an empty buffer channel that puts a brick onto it after a predetermined amount of milliseconds. This designs to deal with timeouts in a very idiomatic way as demontrated in the Dealing With Timeout section.
const co = require('co')
const { utils: { timeout } } = require('core-async')
co(function *() {
const t = timeout(5000)
console.log('Start waiting for 5 seconds...')
yield t.take()
console.log('Done waiting!')
})
subscribe
const co = require('co')
const { Channel, utils: { subscribe } } = require('core-async')
const source = new Channel()
const numberSusbcriber = new Channel()
const wordSusbcriber = new Channel()
subscribe(source,[{
chan: numberSusbcriber,
rule: data => typeof(data) == 'number'
}, {
chan: wordSusbcriber,
rule: data => typeof(data) == 'string'
}])
co(function *(){
while(true) {
const data = yield numberSusbcriber.take()
console.log(`NUMBER RECEIVED: ${data}`)
}
})
co(function *(){
while(true) {
const data = yield wordSusbcriber.take()
console.log(`WORD RECEIVED: ${data}`)
}
})
const a = [1,'one',2,'two',3,'three']
a.map(data => source.put(data))
More detailed doc coming soon...
throttle
const co = require('co')
const { utils: { throttle } } = require('core-async')
const delay = t => new Promise(resolve => setTimeout(resolve, t))
const seed = (size=0) => Array.apply(null, Array(size))
const lotsOfConcurrentTasks = seed(1000).map((_,i) => (() => delay(Math.round(Math.random()*10000)).then(() => `TASK ${i} DONE`)))
co(function *(){
const results = yield throttle(lotsOfConcurrentTasks, 20)
console.log(results)
})
More detailed doc coming soon...
Common Patterns & Idiomatic Style
Dealing With Timeout
With channels, the combination of the alts
and timeout
functions makes dealing with timeouts straightforward:
const co = require('co')
const { Channel, alts, timeout } = require('core-async')
const numberChan = new Channel()
co(function *(){
let counter = 0
while(true)
yield numberChan.put(++counter)
})
co(function *() {
const t = timeout(3000)
let carryOn = true
while(carryOn) {
const [v,chan] = yield alts([numberChan,t])
carryOn = chan != t
if (carryOn) console.log(`Number: ${v}`)
}
console.log(`We're done here.`)
})
Why you should always close your channel when you're done with it
In NodeJS, channels are just a nice pattern to syncronize the event loop so you can write code that leverages complex concurency models. When a channel is created, streaming bricks to it or requesting bricks from it result in adding new tasks on the event loop. There are scenarios where it is desirable that the event loop flushes those tasks, and that's why the close()
api exists. One such example is properly ending the execution of an AWS Lambda. In theory, an AWS Lambda stops its execution when its callback function is called. However, that's not exactly true. If there are still pending tasks in its event loop, the AWS Lambda will stay idle, potentially consuming bilaable resources for doing nothing.
Examples
Chat between two agents
The following demoes 2 lightweight threads thanks to the co library. The communication between those 2 threads is managed by the core-async Channel called chatBetween_t1_t2
. The 2 lightweight threads can be seen as 2 users chatting with each other.
const co = require('co')
const { Channel } = require('core-async')
const chatBetween_t1_t2 = new Channel()
co(function *() {
console.log('STARING LIGHTWEIGHT THREAD T1')
yield chatBetween_t1_t2.put('Hello')
const msg1 = yield chatBetween_t1_t2.take()
console.log(` T2 says: ${msg1}`)
yield chatBetween_t1_t2.put(`Going to the beach in
an hour. Want to come?`)
const msg2 = yield chatBetween_t1_t2.take()
console.log(` T2 says: ${msg2}`)
yield chatBetween_t1_t2.put(`No worries mate! Bring
some frothies. See you
there!`)
})
co(function *() {
console.log('STARING LIGHTWEIGHT THREAD T2')
const msg1 = yield chatBetween_t1_t2.take()
console.log(`T1 says: ${msg1}`)
yield chatBetween_t1_t2.put(`Hi T1. What's up?`)
const msg2 = yield chatBetween_t1_t2.take()
console.log(`T1 says: ${msg2}`)
yield chatBetween_t1_t2.put(`Sounds great. I'll meet
you there. Thanks for the invite.`)
const msg3 = yield chatBetween_t1_t2.take()
console.log(`T1 says: ${msg3}`)
chatBetween_t1_t2.close()
})
Monitoring Stock Prices
The following snippet monitors the FAANG and check which one moves by more than 5% (up or down) over a certain period of time. If it does,
an alert is sent.
const co = require('co')
const { Channel } = require('core-async')
const STOCKS = ['FB','AAPL', 'AMZN', 'NFLX', 'GOOGL']
const CHECK_INTERVAL = 100
const SLIDING_WINDOW = 60
const PRICE_CHANGE_THRESHOLD = 0.05
const delay = t => new Promise(resolve => setTimeout(resolve, t))
const getRandomNumber = ({ start, end }) => {
const endDoesNotExist = end === undefined
if (start == undefined && endDoesNotExist)
return Math.random()
const _start = start >= 0 ? Math.round(start) : 0
const _end = end >= 0 ? Math.round(end) : 0
const size = endDoesNotExist ? _start : (_end - _start)
const offset = endDoesNotExist ? 0 : _start
return offset + Math.floor(Math.random() * size)
}
const getStockPrice = ticker => Promise.resolve({ ticker, price:getRandomNumber({ start:100, end: 110 }) })
const getSignificantPriceChange = (priceHistory, percThreshold) => {
const snapshotT0 = priceHistory[0]
const snapshotT1 = priceHistory.slice(-1)[0]
const percChange = (snapshotT1.price-snapshotT0.price)/snapshotT0.price
if (Math.abs(percChange) >= percThreshold)
return { percChange: (percChange*100).toFixed(1), t0: snapshotT0, t1: snapshotT1 }
else
return null
}
const sendPriceAlert = ({ ticker, priceChange }) => Promise.resolve({ ticker, priceChange }).then(() => {
console.log(`Price of ${ticker} ${priceChange.percChange < 0 ? 'dropped' : 'increased' } by ${priceChange.percChange}% in the last hour.`)
})
const STOCK_DATA = STOCKS.map(ticker => ({ ticker, chan: new Channel() }))
const main = () =>
STOCK_DATA.forEach(({ ticker, chan }) =>
co(function *() {
while (true) {
const { price } = yield getStockPrice(ticker)
chan.put({ price, date: Date.now() })
yield delay(CHECK_INTERVAL)
}
})
co(function *() {
let priceHist = []
while (true) {
const priceSnapshot = yield chan.take()
priceHist.push(priceSnapshot)
if (priceHist.length == SLIDING_WINDOW) {
const priceChange = getSignificantPriceChange(priceHist, PRICE_CHANGE_THRESHOLD)
if (priceChange) {
priceHist = []
sendPriceAlert({ ticker, priceChange })
} else
priceHist.splice(0,1)
}
yield delay(CHECK_INTERVAL)
}
})
)
main()
This Is What We re Up To
We are Neap, an Australian Technology consultancy powering the startup ecosystem in Sydney. We simply love building Tech and also meeting new people, so don't hesitate to connect with us at https://neap.co.
Our other open-sourced projects:
GraphQL
- graphql-serverless: GraphQL (incl. a GraphiQL interface) middleware for webfunc.
- schemaglue: Naturally breaks down your monolithic graphql schema into bits and pieces and then glue them back together.
- graphql-s2s: Add GraphQL Schema support for type inheritance, generic typing, metadata decoration. Transpile the enriched GraphQL string schema into the standard string schema understood by graphql.js and the Apollo server client.
- graphql-authorize: Authorization middleware for graphql-serverless. Add inline authorization straight into your GraphQl schema to restrict access to certain fields based on your user's rights.
React & React Native
Tools
License
Copyright (c) 2017-2019, Neap Pty Ltd.
All rights reserved.
Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met:
- Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer.
- Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution.
- Neither the name of Neap Pty Ltd nor the names of its contributors may be used to endorse or promote products derived from this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL NEAP PTY LTD BE LIABLE FOR ANY
DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
