ArchStream

WIP
A pipeline stream for building software architectures based on Domain-Driven Design and Data-Oriented Design.
Install
$ npm i arch-stream
Example
// entity.ts
abstract class CommandQuery {
  request: {};
  response: {};
}
class Command extends CommandQuery {}
class Query extends CommandQuery {}
export class GetQuery extends Query {}
export class PostCommand extends Command {}
export default class Entity {
  constructor(public method: GetQuery|PostCommand) {
  }
}
// stream.ts
import A from 'arch-stream';
import Entity from './entity';
export default () =>
  A<Entity>()
    .proxy(A.Proxy.Responsibility());
// get.ts
import Stream from './stream';
import Entity, {GetQuery} from './entity';
export default Stream()
  .rule(GetQuery, entity => entity.method)
  .pipe(entity => ...)
  .pipe(entity => ...)
  .pipe(entity => ...)
  .export();
// post.ts
import Stream from './stream';
import Entity, {PostCommand} from './entity';
export default Stream()
  .rule(PostCommand, entity => entity.method)
  .pipe(entity => ...)
  .pipe(entity => ...)
  .pipe(entity => ...)
  .export();
// fetch.ts
import Stream from './stream';
import Entity, {GetQuery, PostCommand} from './entity';
import getStream from './get';
import postStream from './post';
export default Stream()
  .import(entity => entity.method)
  .import(getStream, GetQuery)
  .import(postStream, PostCommand)
  .export();
import fetchStream from './fetch';
import Entity, {GetQuery, PostCommand} from './entity';
const fetch = fetchStream.read().write;
export function get() {
  fetch(new Entity(new GetQuery()), e => console.log(e));
}
export function post() {
  fetch(new Entity(new PostCommand()), e => console.log(e));
}
API
arch-stream.d.ts
- ArchStream
- Message
- Proxy
- Supervisor
- Observable
- Map/Set
- MultiMap/MultiSet
- Tick
- Timer
- FINGERPRINT
- uuid
- sqid
- Maybe, Just, Nothing
- Either, Left, Right
Modular stream
import ArchStream from 'arch-stream';

const add1 =
  ArchStream<{val: number}>()
    .pipe(e => ++e.val)
    .export(Infinity);

const add2 =
  ArchStream<{val: number}>()
    .import(add1)
    .import(add1)
    .export(Infinity);

const add3 =
  ArchStream<{val: number}>()
    .import(add1)
    .import(add2)
    .export();

ArchStream<{val: number}>()
  .import(add1)
  .import(add2)
  .import(add3)
  .export()
  .read(e => console.log(e.val))
  .write({val: 0})
  .write({val: 9});

ArchStream<{val: number}>()
  .import(add1)
  .import(add2)
  .import(add3)
  .export()
  .read()
  .write({val: 0}, e => console.log(e.val))
  .write({val: 9}, e => console.log(e.val));
Thenable
import A from 'arch-stream';

A<{val: number}>()
  .pipe(e => new Promise(resolve => resolve(++e.val)))
  .pipe(e => A.Msg().send(++e.val, true))
  .pipe(e => A.Msg().send(++e.val))
  .export()
  .read(n => console.log(n))
  .write({val: 1})
  .write({val: 2});
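The idea that a pipe may return either a plain value or a thenable can be pictured without the library. The following is a minimal sketch under that assumption, not ArchStream's implementation; `NumberStage` and `runStages` are hypothetical names introduced here. It awaits each stage's result before calling the next one.

```typescript
// Hypothetical helper, not part of arch-stream.
// A stage may return a plain number or a thenable that resolves to one.
type NumberStage = (value: number) => number | PromiseLike<number>;

async function runStages(input: number, stages: NumberStage[]): Promise<number> {
  let value = input;
  for (const stage of stages) {
    // `await` unwraps thenables and passes plain values through unchanged.
    value = await stage(value);
  }
  return value;
}

const mixedStages: NumberStage[] = [
  n => n + 1,                                          // synchronous stage
  n => new Promise<number>(resolve => resolve(n + 1)), // promise-returning stage
];

const pendingResult = runStages(1, mixedStages);
pendingResult.then(n => console.log(n));
```

Because every stage result goes through `await`, callers do not need to know which stages are asynchronous.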
Proxy
Case
Interpret the context of the stream: the proxy maps each entity to one of the listed cases, and each pipe takes one function per case.
import A from 'arch-stream';

const enum State {
  pending,
  resolved,
  rejected
}
interface Entity {
  state: State;
}

A<Entity>()
  .proxy(A.Proxy.Case<Entity, State>([State.pending, State.resolved, State.rejected], entity => entity.state))
  .pipe(e => e.state = State.resolved, e => e.state = State.resolved, e => e.state = State.rejected)
  .pipe()
  .pipe(e => e.state = 10, e => e.state = 20, e => e.state = 30)
  .pipe(e => e.state = -10, e => e.state = -20, e => e.state = -30)
  .export()
  .read(e => console.log(e.state))
  .write({state: State.pending});
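One way to read the example above is that each pipe receives one function per case and the function at the matching position runs. The sketch below shows that reading in plain TypeScript. It is an assumption about the behavior, not the library's code; `dispatchByCase`, `Job`, and `JobState` are hypothetical names that mirror `Entity` and `State`.

```typescript
// Hypothetical sketch, not arch-stream's implementation.
enum JobState { pending, resolved, rejected }
interface Job { state: JobState; }

// Builds a dispatcher: handlers[i] runs for entities whose case equals cases[i].
function dispatchByCase<E, C>(cases: C[], caseOf: (entity: E) => C) {
  return (...handlers: Array<(entity: E) => void>) => (entity: E): E => {
    const handler = handlers[cases.indexOf(caseOf(entity))];
    if (handler) handler(entity); // unknown cases and missing handlers are skipped
    return entity;
  };
}

const byState = dispatchByCase<Job, JobState>(
  [JobState.pending, JobState.resolved, JobState.rejected],
  job => job.state);

const settle = byState(
  job => { job.state = JobState.resolved; },  // runs when pending
  job => { job.state = JobState.resolved; },  // runs when resolved
  job => { job.state = JobState.rejected; }); // runs when rejected

const settledJob = settle({ state: JobState.pending });
const rejectedJob = settle({ state: JobState.rejected });
```

The case is computed once per call, before the handler runs, so a handler that changes `state` only affects which function the next dispatcher picks.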
Hook
Add preprocess and postprocess hook points.
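The source gives no example for this proxy, so the following only illustrates the general idea of preprocess and postprocess hook points. It is a standalone sketch; `withHooks` and `Step` are hypothetical names and do not reflect the signature of the Hook proxy.

```typescript
// Hypothetical sketch, not arch-stream's Hook proxy.
type Step<T> = (entity: T) => void;

// Wraps a step so that `pre` runs before it and `post` runs after it.
function withHooks<T>(step: Step<T>, pre: Step<T>, post: Step<T>): Step<T> {
  return entity => {
    pre(entity);   // preprocess hook point
    step(entity);  // the wrapped step
    post(entity);  // postprocess hook point
  };
}

const callOrder: string[] = [];
const increment = withHooks<{ val: number }>(
  e => { e.val += 1; callOrder.push('step'); },
  () => callOrder.push('pre'),
  () => callOrder.push('post'));

const counter = { val: 0 };
increment(counter);
```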
Responsibility
Freeze the object that the rule selects inside an entity.
import A from 'arch-stream';

A<{val: number}>()
  .proxy(A.Proxy.Responsibility())
  .rule(e => e)
  .pipe(e => ++e.val)
  .export()
  .read(e => console.log(e.val))
  .write({val: 0});
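Freezing only the object a rule selects can be pictured with the standard `Object.freeze`. This is a minimal sketch of the concept, assuming a shallow freeze; `freezeRuled`, `Order`, and `OrderMethod` are hypothetical names, and the library may implement the proxy differently.

```typescript
// Hypothetical sketch built on the standard Object.freeze;
// not arch-stream's implementation.
interface OrderMethod { path: string; }
interface Order { method: OrderMethod; val: number; }

// Freezes the object selected by `rule` and leaves the rest of the entity as is.
function freezeRuled<E, R extends object>(entity: E, rule: (entity: E) => R): E {
  Object.freeze(rule(entity)); // shallow freeze of the selected object only
  return entity;
}

const order = freezeRuled<Order, OrderMethod>(
  { method: { path: '/items' }, val: 0 },
  o => o.method);

order.val += 1; // the entity itself stays mutable
const methodIsFrozen = Object.isFrozen(order.method);
const orderIsFrozen = Object.isFrozen(order);
```

In this sketch the selected `method` object can no longer be changed by later stages, while other fields of the entity remain writable.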
Call order constraints (Type constraint)
Possible
- pipe > pipe
- pipe > export
- pipe > proxy
- proxy > pipe
- proxy > proxy
- import > import
- import > export
- export > flow
- export > read
- export > write
- read > write
- flow > read
Impossible
- pipe > import
- pipe > read
- proxy > import
- import > pipe
- import > read
- export > pipe
- export > import
- read > read
- write > read
- read > flow
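Constraints like these can be enforced by giving each method a return type that exposes only the calls allowed next. The sketch below shows the technique for a small subset (pipe > pipe, pipe > export, export > read, read > write). The interfaces `Piping`, `Exported`, and `Reading` and the function `piping` are hypothetical and are not taken from arch-stream.d.ts.

```typescript
// Hypothetical sketch of call-order constraints expressed through return types.
interface Piping {
  pipe(fn: (n: number) => number): Piping;  // pipe > pipe
  export(): Exported;                       // pipe > export
}
interface Exported {
  read(cb: (n: number) => void): Reading;   // export > read
}
interface Reading {
  write(n: number): Reading;                // read > write
}

function piping(fns: Array<(n: number) => number>): Piping {
  return {
    pipe: fn => piping(fns.concat(fn)),
    export: () => ({
      read: cb => {
        const reading: Reading = {
          write: n => {
            // Run every piped function in order, then hand the result to the reader.
            cb(fns.reduce((acc, fn) => fn(acc), n));
            return reading;
          },
        };
        return reading;
      },
    }),
  };
}

const seen: number[] = [];
piping([])
  .pipe(n => n + 1)
  .pipe(n => n * 2)
  .export()
  .read(n => seen.push(n))
  .write(0)
  .write(9);

// piping([]).export().pipe(n => n); // rejected by the compiler: Exported has no pipe
```

An impossible order such as export > pipe is rejected at compile time because `Exported` declares no `pipe` method, so no runtime check is needed.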
Browser
Test
$ npm i && gulp install
$ gulp test
$ gulp bench