Communicating sequential flows
It is runner, which create asynchronous flow with the javascript es6 generators, native functions and promises.
Docs
Concept
The concept of the library is very close to co, but unlike co and others similar tools, scf can be considered as greedy - it executes each entity, that can be executed.
For example, the function returns another function:
new Csf(() => () => {
return 'ok'
})
.then(console.log)
We got "ok"
, because nested function has executed too.
The same structure with co returns a function, instead of final result:
co(() => () => {
return 'ok'
})
.then(console.log)
Otherwise, each passed to the runner function will be invoked with the parent context.
Example:
function getEnv() {
return this.env;
}
new Csf(function () {
this.env = 'node'
return () => () => getEnv;
})
.then(console.log)
Payload
When each entity is executed, the question arises how to predict the final result.
And to understand this, there is such a thing as a payload.
Each the returned value will be considered as payload and will be returned as final result if it is not of one the next types:
- Promises
- Functions (also async functions)
- Generators
- Generator functions
const Csf = require('scf')
Csf.call(function *() {
const a = yield Promise.resolve(4);
const b = yield 8;
const c = yield function() {
return 15;
}
const d = yield function *() {
yield 16;
}
const e = yield () => () => 23;
const f = (function *() {
yield 42
})();
return Math.sum(a, b, c, d, e, f)
})
.then(console.log)
Context inheritance
Any native function inherit parent context. This allows you to implicitly transfer data between different flow depth levels.
Even if middling function is a arrow-function, but it returns classic function, context will be proxied to that classic function.
Csf.apply(() => () => {
return function() {
return typeof this;
}
}, {})
.then(console.log)
Flow control helpers
The csf library provides flow control API (so called fx). You can get it from the path csf/fx
.
const { each } = require('csf/fx')
There are some of them:
spawn
Run sub-flow with the context inheritancefork
Spawn non-blocking sub-flowpayload
Wrap function to return it as payloadcancel
Cancel async operationreduce
Async reducemap
Async mapeach
Async each
etc. Read them all in the API reference.
Compatability
All you need is a node grater than 6.4.0 or regenerator (also babel tranform-regenerator) transpiler for older versions of node and compatibility with old browsers.
Special behavior of generators
Any returned es6 generator (or a generator result) will be executed until it returns final value.
import Csf from 'csf'
const deferredCalc = function* (a, b) {
yield new Promise(resolve => setTimeout(resolve, 3330))
yield a + b;
}
new Csf(function* () {
const a = yield 11;
const b = yield 22;
return deferredCalc(a, b);
})
.then(console.log)
Keep in the mind, and there is an important part of the behavior - only last generator result will be returned.
And, if the generator contains no return operator, the value returned by last operator yield will be considered as the final result.
new Sequence(function *() {
yield 1;
yield 2;
yield 3;
})
.then(console.log)
Executable values and payload
Functions, promises, and generators are considered to be executable values.
But all other types of values (as numbers, strings, booleans, objects, symbols, null and undefined) will be considered as, so-called, payload and will be returned to the parent function (or next generator step) as the result.
Creating standalone sequences
Csf allows you to wrap code with csf runner, thus obtaining a standalone asynchronous function.
import Csf from 'csf'
export default Csf.wrap(function* (url) {
const serverResponse = yield fetch(url);
return serverResponse.json();
});
Which will allow you to execute sequences without any additional access to the csf API. Also, such functions can take arguments.
import fetchUrl from './sequences/fetchUrl'
fetchUrl('/entity/33')
.then(console.log)
Or use from an another sequence.
import Csf from 'csf'
import fetchUrl from './sequences/fetchUrl'
Csf.call(function* () {
const data = yield fetchUrl('/entity/33')
console.log(data);
});
Context in the ditails
Every sub-flow can access context of the initial flow. This means that you can manage the state or use API of your application from the child flow.
Even if your nested flow includes arrow-functions (which by nature can not access dynamic context), you can access context by returning classic functions.
Here is example how to get redux store state from nested arrow-function.
import { createStore } from 'redux'
import { apply } from 'csf'
function getState() {
return this.getState();
}
const store = createStore(state => state, {
sequence: 'X'
});
apply(
store,
function* () {
return () => {
return () => {
return getState;
}
}
}
)
.then(console.log)
Channels
Foundation stone of CSP pattern is a channels. Channel can accumulate and radiate values. csf channel is not an object, but function, which returns promise with next value each time it called.
If channel is empty, then returned promise will be unresolved, until next value will be pushed to channel.
Sequence channels are not a in-flow API and can be used separately.
const {
createChannel
} = require("../lib");
const messages = createChannel();
messages.push("Hello");
messages.push("Channel");
messages().then(console.log);
messages().then(console.log);
Here is few examples of practicle usage of the channels.
Imagine situation when you should make request to the server each time user click on document. And you can not make more than 6 requests at a time.
Here is approximate solution on pure javascript:
const requestsQueue = [];
let activeRequestsCount = 0;
function sendRequest(event) {
if (activeRequests >= 6) {
requestsQueue.push(event);
} else {
activeRequestsCount++;
fetch('api/click')
.then(() => {
activeRequestsCount--;
if (requestsQueue.length) {
requestsQueue
.slice(0, 6 - activeRequestsCount)
.map(sendRequest)
}
})
}
}
document.addEventListener('click', sendRequest);
I'm not sure this example real works, but it shows how cumbersome it looks
And here how it can be solved with channels:
import Csf, { createChannel } from 'csf'
new Csf(function *() {
const clicks = createChannel();
const requests = createChannel();
document.addEventListener('click', clicks.push)
while (yield clicks) {
while (requests.count() >= 6) {
yield requests;
}
requests.push(fetch('api/click'))
}
})
Cancellation
Most of fork processes in csf can be cancelled. To cancel any parallel process you should use special method Csf.cancel
.
const Csf = require('csf')
new Csf(function *() {
const a = [1, 2, 3, 4, 5]
const parallelFlow = yield Csf.fx.fork(Csf.fx.each(a, function *(val) {
yield Csf.fx.pause(50);
console.log(`Val: ${val}`)
}));
yield Csf.fx.pause(120)
Csf.cancel(parallelFlow);
})
In this example we canceled process, created by async operator each
.
Author
Vladimir Kalmykov vladimirmorulus@gmail.com
License
MIT