Comparing version 0.0.0-next.27 to 0.0.0-next.28
// lib/index.ts | ||
var ActiveState = 0; | ||
var ChangingState = 1; | ||
var PendingState = 2; | ||
var EndedState = 3; | ||
var I = (x) => x; | ||
var PendingState = 0; | ||
var ActiveState = 1; | ||
var EndedState = 2; | ||
var State = { | ||
Ended: EndedState, | ||
Active: ActiveState, | ||
Pending: PendingState | ||
}; | ||
var EQ = (a, b) => a === b; | ||
@@ -12,5 +15,418 @@ var createdStack = []; | ||
var isPropagating = false; | ||
var isLazy = /* @__PURE__ */ new WeakSet(); | ||
var SKIP = Symbol("Stream.SKIP"); | ||
var stack = []; | ||
var FilterOp = class { | ||
parent; | ||
stream; | ||
predicate; | ||
constructor(predicate, parent) { | ||
this.parent = parent; | ||
this.predicate = predicate; | ||
const target = new InternalStream(); | ||
target.state = PendingState; | ||
target._readonly = true; | ||
target.parents.push(parent); | ||
target.op = this; | ||
parent.immediateDependents.push(this); | ||
this.stream = target; | ||
if (parent.state !== ActiveState) { | ||
this.next(); | ||
} | ||
} | ||
next() { | ||
const pv = this.parent.value; | ||
let res = this.predicate(pv); | ||
if (res !== SKIP && res) { | ||
this.stream.value = pv; | ||
this.stream.state = ActiveState; | ||
return true; | ||
} | ||
return false; | ||
} | ||
}; | ||
var MapOp = class { | ||
parent; | ||
stream; | ||
visitor; | ||
constructor(visitor, parent) { | ||
this.visitor = visitor; | ||
this.parent = parent; | ||
const target = new InternalStream(); | ||
target.state = PendingState; | ||
target._readonly = true; | ||
target.parents.push(parent); | ||
target.op = this; | ||
parent.immediateDependents.push(this); | ||
this.stream = target; | ||
if (parent.state === ActiveState) { | ||
this.next(); | ||
} | ||
} | ||
next() { | ||
let res = this.visitor(this.parent.value); | ||
if (res !== SKIP) { | ||
this.stream.value = res; | ||
this.stream.state = ActiveState; | ||
return true; | ||
} | ||
return false; | ||
} | ||
}; | ||
var ScanOp = class { | ||
parent; | ||
stream; | ||
visitor; | ||
acc; | ||
constructor(visitor, acc, parent) { | ||
this.visitor = visitor; | ||
this.acc = acc; | ||
this.parent = parent; | ||
const target = new InternalStream(); | ||
target.state = PendingState; | ||
target.op = this; | ||
target._readonly = true; | ||
target.parents.push(parent); | ||
parent.immediateDependents.push(this); | ||
this.stream = target; | ||
if (this.parent.state !== ActiveState) { | ||
this.next(); | ||
} | ||
} | ||
next() { | ||
let res = this.visitor(this.acc, this.parent.value); | ||
if (res !== SKIP) { | ||
this.acc = res; | ||
this.stream.value = res; | ||
this.stream.state = ActiveState; | ||
return true; | ||
} | ||
return false; | ||
} | ||
}; | ||
var MergeOp = class { | ||
parent; | ||
stream; | ||
inputs; | ||
constructor(inputs) { | ||
let parent = new InternalStream(); | ||
this.inputs = inputs; | ||
let readyMask = 0; | ||
inputs.forEach((x, i) => { | ||
if (x.state !== PendingState) { | ||
readyMask |= 1 << i; | ||
} | ||
}); | ||
parent.state = readyMask === (1 << inputs.length) - 1 ? ActiveState : PendingState; | ||
parent.op = this; | ||
parent._readonly = true; | ||
this.parent = parent; | ||
this.stream = parent; | ||
for (let pi = 0; pi < inputs.length; pi++) { | ||
let p = inputs[pi]; | ||
let listener = p.map((_) => { | ||
if (parent.state === ActiveState) { | ||
if (isPropagating) { | ||
let i = sessions.indexOf(this.stream); | ||
if (i !== -1) { | ||
sessions.splice(i, 1); | ||
} | ||
sessions.push(this.stream); | ||
} else { | ||
parent._set(null); | ||
} | ||
return null; | ||
} else { | ||
readyMask |= 1 << pi; | ||
parent.state = readyMask === (1 << inputs.length) - 1 ? ActiveState : PendingState; | ||
if (parent.state !== ActiveState) { | ||
return null; | ||
} else { | ||
sessions.push(this.stream); | ||
return null; | ||
} | ||
} | ||
}); | ||
listener.ends.push(() => { | ||
this.stream.end(); | ||
}); | ||
parent.ends.push(() => { | ||
listener.end(); | ||
}); | ||
} | ||
} | ||
next() { | ||
this.parent.value = this.inputs.map((x) => x.value); | ||
this.parent.state = ActiveState; | ||
return true; | ||
} | ||
}; | ||
var DefaultOp = class { | ||
parent; | ||
stream; | ||
defaultValue; | ||
constructor(defaultValue, parent) { | ||
this.parent = parent; | ||
this.defaultValue = defaultValue; | ||
const target = new InternalStream(); | ||
target.state = PendingState; | ||
target.op = this; | ||
target._readonly = true; | ||
target.parents.push(parent); | ||
parent.immediateDependents.push(this); | ||
target.value = defaultValue; | ||
this.stream = target; | ||
} | ||
next() { | ||
this.stream.value = this.parent.value; | ||
this.stream.state = ActiveState; | ||
return true; | ||
} | ||
}; | ||
var SkipOp = class { | ||
parent; | ||
stream; | ||
skipN; | ||
i = 0; | ||
constructor(skipN, parent) { | ||
this.skipN = skipN; | ||
this.parent = parent; | ||
const target = new InternalStream(); | ||
target.state = PendingState; | ||
target.op = this; | ||
target._readonly = true; | ||
target.parents.push(parent); | ||
parent.immediateDependents.push(this); | ||
this.stream = target; | ||
if (this.parent.state === ActiveState) { | ||
this.next(); | ||
} | ||
} | ||
next() { | ||
if (this.i++ >= this.skipN) { | ||
this.stream.value = this.parent.value; | ||
this.stream.state = ActiveState; | ||
return true; | ||
} | ||
return false; | ||
} | ||
}; | ||
var TakeOp = class { | ||
parent; | ||
stream; | ||
i = 0; | ||
n; | ||
constructor(n, parent) { | ||
this.n = n; | ||
this.parent = parent; | ||
const target = new InternalStream(); | ||
target.state = PendingState; | ||
target.op = this; | ||
target._readonly = true; | ||
target.parents.push(parent); | ||
parent.immediateDependents.push(this); | ||
this.stream = target; | ||
if (this.parent.state === ActiveState) { | ||
this.next(); | ||
} | ||
} | ||
next() { | ||
if (this.i++ <= this.n) { | ||
this.stream.value = this.parent.value; | ||
this.stream.state = ActiveState; | ||
return true; | ||
} else { | ||
this.stream.end(); | ||
return false; | ||
} | ||
} | ||
}; | ||
var DropRepeatsWithOp = class { | ||
parent; | ||
stream; | ||
equality; | ||
constructor(equality, parent) { | ||
this.parent = parent; | ||
this.equality = equality; | ||
const target = new InternalStream(); | ||
target.state = PendingState; | ||
target.op = this; | ||
target._readonly = true; | ||
target.parents.push(parent); | ||
parent.immediateDependents.push(this); | ||
this.stream = target; | ||
if (this.parent.state === ActiveState) { | ||
this.next(); | ||
} | ||
} | ||
next() { | ||
if (this.equality(this.stream.value, this.parent.value)) { | ||
return false; | ||
} | ||
this.stream.value = this.parent.value; | ||
this.stream.state = ActiveState; | ||
return true; | ||
} | ||
}; | ||
var AfterSilenceOp = class { | ||
parent; | ||
stream; | ||
ms; | ||
constructor(ms, parent) { | ||
this.ms = ms; | ||
this.parent = parent; | ||
let id; | ||
const target = new InternalStream(); | ||
target.state = this.parent.state; | ||
target.value = this.parent.value; | ||
target.op = this; | ||
target._readonly = true; | ||
this.stream = target; | ||
let visit = (x) => { | ||
clearTimeout(id); | ||
id = setTimeout(() => { | ||
this.stream._set(x); | ||
}, ms); | ||
}; | ||
let listener = this.parent.map(visit); | ||
target.ends.push(() => listener.end()); | ||
} | ||
next() { | ||
this.stream.value = this.parent.value; | ||
this.stream.state = ActiveState; | ||
return true; | ||
} | ||
}; | ||
var ThrottleOp = class { | ||
parent; | ||
stream; | ||
ms; | ||
last; | ||
id; | ||
constructor(ms, parent) { | ||
this.parent = parent; | ||
this.ms = ms; | ||
const target = new InternalStream(); | ||
target.state = parent.state; | ||
target.op = this; | ||
target._readonly = true; | ||
target.parents.push(parent); | ||
target.value = parent.value; | ||
this.stream = target; | ||
this.last = Date.now(); | ||
const process = (x) => { | ||
clearTimeout(this.id); | ||
let dt = Date.now() - this.last; | ||
if (dt >= this.ms) { | ||
this.last = Date.now(); | ||
this.stream._set(x); | ||
} else { | ||
this.id = setTimeout(() => process(x), Math.max(0, this.ms - dt), x); | ||
} | ||
}; | ||
let listener = parent.map((x) => process(x)); | ||
listener.ends.push(() => { | ||
target.end(); | ||
}); | ||
target.ends.push(() => { | ||
listener.end(); | ||
}); | ||
} | ||
next() { | ||
this.stream.value = this.parent.value; | ||
this.stream.state = ActiveState; | ||
return true; | ||
} | ||
}; | ||
var AwaitLatestOp = class { | ||
parent; | ||
stream; | ||
visitor; | ||
constructor(visitor, parent) { | ||
this.visitor = visitor; | ||
this.parent = parent; | ||
const target = new InternalStream(); | ||
target.state = PendingState; | ||
target.op = this; | ||
target._readonly = true; | ||
target.parents.push(parent); | ||
this.stream = target; | ||
let lastEmit = 0; | ||
let abortController = new AbortController(); | ||
parent.ends.push(() => target.end(), () => abortController.abort()); | ||
let listener = parent.map(async (x) => { | ||
let emit = ++lastEmit; | ||
lastEmit = emit; | ||
abortController.abort(); | ||
abortController = new AbortController(); | ||
try { | ||
const value = await visitor(x, { signal: abortController.signal }); | ||
if (emit === lastEmit) { | ||
target.nextValue; | ||
target.nextValue = { status: "fulfilled", value }; | ||
target._set(target.nextValue); | ||
lastEmit = 0; | ||
} | ||
} catch (e) { | ||
if (emit === lastEmit) { | ||
target._set({ status: "rejected", reason: e }); | ||
lastEmit = 0; | ||
} | ||
} | ||
}); | ||
target.ends.push(() => { | ||
listener.end(); | ||
}); | ||
} | ||
next() { | ||
this.stream.value = this.stream.nextValue; | ||
this.stream.state = ActiveState; | ||
return true; | ||
} | ||
}; | ||
var AwaitEveryOp = class { | ||
parent; | ||
stream; | ||
visitor; | ||
constructor(visitor, parent) { | ||
this.parent = parent; | ||
this.visitor = visitor; | ||
const target = new InternalStream(); | ||
target.state = PendingState; | ||
target.op = this; | ||
target._readonly = true; | ||
target.parents.push(parent); | ||
this.stream = target; | ||
const abortController = new AbortController(); | ||
parent.ends.push(() => target.end(), () => abortController.abort()); | ||
let listener = parent.map(async (x) => { | ||
const [result] = await Promise.allSettled([visitor(x, { signal: abortController.signal })]); | ||
target.nextValue = result; | ||
target._set(result); | ||
}); | ||
target.ends.push(() => { | ||
abortController.abort(); | ||
listener.end(); | ||
}); | ||
} | ||
next() { | ||
this.stream.value = this.stream.nextValue; | ||
this.stream.state = ActiveState; | ||
return true; | ||
} | ||
}; | ||
var SourceOp = class { | ||
parent; | ||
stream; | ||
constructor(parent) { | ||
this.parent = parent; | ||
this.stream = parent; | ||
} | ||
next() { | ||
this.stream.value = this.stream.nextValue; | ||
this.stream.state = ActiveState; | ||
return true; | ||
} | ||
}; | ||
var InternalStream = class _InternalStream { | ||
static SKIP = Symbol("Stream.SKIP"); | ||
static SKIP = SKIP; | ||
immediateDependents = []; | ||
@@ -23,2 +439,6 @@ parents = []; | ||
value; | ||
nextValue; | ||
stateUpdated = false; | ||
op; | ||
type = "source"; | ||
_end; | ||
@@ -28,8 +448,9 @@ _readonly = false; | ||
const [value] = args; | ||
this.state = args.length !== 0 && value !== _InternalStream.SKIP ? ActiveState : PendingState; | ||
if (value !== _InternalStream.SKIP) { | ||
this.state = args.length !== 0 && value !== SKIP ? ActiveState : PendingState; | ||
this.op = new SourceOp(this); | ||
if (value !== SKIP) { | ||
this.value = value; | ||
} | ||
let stack = createdStack.at(-1); | ||
stack?.add(this); | ||
let stack2 = createdStack.at(-1); | ||
stack2?.add(this); | ||
} | ||
@@ -40,55 +461,27 @@ of(...args) { | ||
} | ||
markAsChanging() { | ||
if (this.isOpen()) { | ||
this.state = ChangingState; | ||
} | ||
for (let s of this.immediateDependents) { | ||
s.stream.markAsChanging(); | ||
} | ||
} | ||
isOpen() { | ||
return this.state == ActiveState || this.state == ChangingState || this.state == PendingState; | ||
} | ||
get = () => { | ||
let stack = referencedStack.at(-1); | ||
stack?.add(this); | ||
let stack2 = referencedStack.at(-1); | ||
stack2?.add(this); | ||
return this.value; | ||
}; | ||
readonly() { | ||
const cloned = this.cloneWithoutDependendents(); | ||
cloned._readonly = true; | ||
return cloned; | ||
} | ||
doEnd() { | ||
end() { | ||
if (this.state === EndedState) { | ||
return; | ||
} | ||
for (let p of this.parents.slice()) { | ||
p._unregisterChild(this); | ||
} | ||
for (let c of this.immediateDependents.slice().map((x) => x.stream)) { | ||
if (c._end) { | ||
c.end._set(true); | ||
} else { | ||
c.doEnd(); | ||
for (let d of this.immediateDependents.slice()) { | ||
if (!d.stream) { | ||
continue; | ||
} | ||
let c = d.stream; | ||
c.end(); | ||
} | ||
this.state = EndedState; | ||
for (let f of this.ends.slice()) { | ||
f(); | ||
} | ||
this.state = EndedState; | ||
this.ends.length = this.parents.length = this.immediateDependents.length = 0; | ||
} | ||
get end() { | ||
if (!this._end) { | ||
if (this.state == EndedState) { | ||
return this._end = this.of(true); | ||
} | ||
this._end = this.of(); | ||
this._end.map((x) => { | ||
if (x === true) { | ||
this.doEnd(); | ||
} | ||
return x; | ||
}); | ||
} | ||
return this._end; | ||
} | ||
// delete in favour of deregister dependent | ||
@@ -101,94 +494,71 @@ _unregisterChild(child) { | ||
} | ||
_registerDependent(dependent) { | ||
this.immediateDependents.push(dependent); | ||
} | ||
_map(visitor, ignoreInitial) { | ||
const args = ignoreInitial ? [] : [visitor(this.value)]; | ||
const target = this.of(...args); | ||
target._readonly = true; | ||
target.parents.push(this); | ||
this._registerDependent({ stream: target, fn: visitor }); | ||
return target; | ||
} | ||
map(visitor) { | ||
return this._map(visitor, this.state !== ActiveState); | ||
const out = new MapOp(visitor, this); | ||
return out.stream; | ||
} | ||
// unlike traditional mithril stream (which has a recursive update) | ||
// we instead opt for detecting all the static dependencies up front | ||
// and updating them serially. If any writes happen while | ||
// we are updating, we start a new update session. We repeat this | ||
// until (hopefully) no more writes have happened during propagation. | ||
// If they do though, that's an infinite loop which is supported as it | ||
// was in mithril-stream | ||
// Need to cache getAllDependencies, from 5 ops/sec 0.29 ops/sec | ||
// Iterator: 0.44 ops/sec | ||
// Inlined: 0.48 ops/sec (1 test failing) | ||
// -getNextValue 2.30 ops/sec (1 test failing) | ||
// -skipped set 3.90 ops/sec (1 test failing) | ||
// -copy dependent objects when adding to stack 4.61 ops/sec | ||
// -fix tests, stack if / else 4.7 ops/sec | ||
// -assume SKIP is never the input value 5.4 ops/sec | ||
// just the result of the transform | ||
// Use integer flags for state instead of strings 5.5 ops/sec | ||
// Use a do while so first run doesn't require session creation 6.4 ops/sec | ||
// | ||
// we do this to make propagation more predictable and easier to follow | ||
// in a debugger. You can clearly see when we're enterning a new propagation | ||
// and which stream triggered it. This can help track down the root | ||
// cause of a complex loop. | ||
// (Experiment) Re-use previous evaluated dependents ~9 ops/sec | ||
// | ||
// The downside is we have to manually track things you'd normally get | ||
// for free in a recursive update. E.g. we need to record which streams | ||
// were skipped so their dependencies (which are already recorded) are also skipped | ||
// Re-wrote to use Op classes after researching xstream 21 ops/sec (tests failing) | ||
// | ||
_set = (newValue) => { | ||
const lazy = isLazy.has(newValue); | ||
if (!lazy) { | ||
this.value = newValue; | ||
} | ||
if (!this.isOpen()) { | ||
return; | ||
} | ||
if (this.state == EndedState) { | ||
return; | ||
} | ||
if (!lazy) { | ||
this.state = ActiveState; | ||
} else { | ||
this.state = ChangingState; | ||
} | ||
sessions.push({ | ||
stream: this, | ||
nextValue: newValue | ||
}); | ||
let target = this.op; | ||
this.nextValue = newValue; | ||
if (isPropagating) { | ||
this.nextValue = newValue; | ||
sessions.push(this); | ||
return; | ||
} | ||
try { | ||
isPropagating = true; | ||
while (sessions.length) { | ||
const session = sessions.shift(); | ||
let stack = []; | ||
if (isLazy.has(session.nextValue)) { | ||
stack.unshift({ | ||
fn: I, | ||
stream: session.stream, | ||
stateUpdated: true | ||
}); | ||
} else { | ||
stack = session.stream.immediateDependents.slice(); | ||
isPropagating = true; | ||
do { | ||
while (true) { | ||
let recurse = target.stream.state !== EndedState && target.next(); | ||
if (!recurse) { | ||
if (stack.length === 0) { | ||
break; | ||
} else { | ||
target = stack.shift(); | ||
break; | ||
} | ||
} | ||
while (stack.length) { | ||
let target = stack.shift(); | ||
if (!target.stateUpdated) { | ||
target.stream.state = ChangingState; | ||
} | ||
let { stream: s, fn: f } = target; | ||
let nextValue = target.stream.parents.length ? target.stream.parents.at(-1).value : session.nextValue; | ||
if (!s.isOpen()) { | ||
let ds = target.stream?.immediateDependents; | ||
let L = ds.length; | ||
if (stack.length === 0) { | ||
if (L === 1) { | ||
target = ds[0]; | ||
continue; | ||
} else if (L === 0) { | ||
break; | ||
} | ||
s.state = ActiveState; | ||
if (isLazy.has(nextValue)) { | ||
nextValue = nextValue(); | ||
} | ||
let newValue2 = f(nextValue); | ||
if (newValue2 === Stream.SKIP) { | ||
continue; | ||
} | ||
s.value = newValue2; | ||
stack.push(...target.stream.immediateDependents); | ||
} | ||
let xs = ds.slice(); | ||
for (let i = 0; i < xs.length; i++) { | ||
let d = xs[i]; | ||
stack.push(d); | ||
} | ||
if (stack.length === 0) { | ||
break; | ||
} else { | ||
target = stack.shift(); | ||
} | ||
} | ||
} finally { | ||
isPropagating = false; | ||
} | ||
if (sessions.length) { | ||
let session = sessions.shift(); | ||
target = session.op; | ||
} else { | ||
isPropagating = false; | ||
return; | ||
} | ||
} while (true); | ||
}; | ||
@@ -208,36 +578,15 @@ set = (newValue) => { | ||
default(x) { | ||
if (this.state === PendingState) { | ||
const out = this.cloneWithoutDependendents(); | ||
out.value = x; | ||
out.state = ActiveState; | ||
return out; | ||
} else { | ||
if (this.state !== PendingState) { | ||
return this; | ||
} | ||
const op = new DefaultOp(x, this); | ||
return op.stream; | ||
} | ||
skip(n) { | ||
let i = 0; | ||
let out = this.map((x) => { | ||
if (i >= n) { | ||
return x; | ||
} else { | ||
i++; | ||
return _InternalStream.SKIP; | ||
} | ||
}); | ||
return out; | ||
const op = new SkipOp(n, this); | ||
return op.stream; | ||
} | ||
take(n) { | ||
let i = 0; | ||
let out = this.map((x) => { | ||
if (i > n) { | ||
if (out.state !== EndedState) { | ||
out.doEnd(); | ||
} | ||
return _InternalStream.SKIP; | ||
} | ||
i++; | ||
return x; | ||
}); | ||
return out; | ||
const op = new TakeOp(n, this); | ||
return op.stream; | ||
} | ||
@@ -251,68 +600,20 @@ once() { | ||
dropRepeatsWith(equality) { | ||
let neverEmitted = Symbol("neverEmitted"); | ||
let prev = neverEmitted; | ||
const out = this.map( | ||
(x) => { | ||
try { | ||
if (prev !== neverEmitted && equality(prev, x)) { | ||
return _InternalStream.SKIP; | ||
} | ||
return x; | ||
} finally { | ||
prev = x; | ||
} | ||
} | ||
); | ||
return out; | ||
const op = new DropRepeatsWithOp(equality, this); | ||
return op.stream; | ||
} | ||
cloneWithoutDependendents(fn = I) { | ||
const out = this.of(); | ||
out.state = this.state === ChangingState ? ActiveState : this.state; | ||
out.value = this.value; | ||
out._readonly = this._readonly; | ||
out.parents.push(this); | ||
this._registerDependent({ stream: out, fn }); | ||
return out; | ||
} | ||
afterSilence(ms) { | ||
let id; | ||
const out = this.of(); | ||
out.state = this.state === ChangingState ? ActiveState : this.state; | ||
out.value = this.value; | ||
out._readonly = true; | ||
this.ends.push(() => out.doEnd()); | ||
this.map((x) => { | ||
clearTimeout(id); | ||
id = setTimeout(() => { | ||
out._set(x); | ||
}, ms); | ||
}); | ||
return out; | ||
const op = new AfterSilenceOp(ms, this); | ||
return op.stream; | ||
} | ||
throttle(ms) { | ||
let id; | ||
let last = Date.now(); | ||
const out = this.of(); | ||
out.state = this.state === ChangingState ? ActiveState : this.state; | ||
out.value = this.value; | ||
out._readonly = true; | ||
this.ends.push(() => out.doEnd()); | ||
function process(x) { | ||
clearTimeout(id); | ||
let dt = Date.now() - last; | ||
if (dt >= ms) { | ||
last = Date.now(); | ||
out._set(x); | ||
} else { | ||
id = setTimeout(process, Math.max(0, ms - dt), x); | ||
} | ||
} | ||
this.map(process); | ||
return out; | ||
const op = new ThrottleOp(ms, this); | ||
return op.stream; | ||
} | ||
filter(predicate) { | ||
return this.map((x) => predicate(x) ? x : _InternalStream.SKIP); | ||
let op = new FilterOp(predicate, this); | ||
return op.stream; | ||
} | ||
reject(predicate) { | ||
return this.map((x) => !predicate(x) ? x : _InternalStream.SKIP); | ||
let op = new FilterOp((x) => !predicate(x), this); | ||
return op.stream; | ||
} | ||
@@ -359,4 +660,4 @@ // tracking | ||
static untrack(stream) { | ||
for (let stack of referencedStack) { | ||
stack.delete(stream); | ||
for (let stack2 of referencedStack) { | ||
stack2.delete(stream); | ||
} | ||
@@ -368,100 +669,16 @@ } | ||
scan(fn, seed) { | ||
const sink = this.map((n) => { | ||
const out = fn(seed, n); | ||
if (out !== _InternalStream.SKIP) { | ||
seed = out; | ||
} | ||
return out; | ||
}); | ||
return sink; | ||
const op = new ScanOp(fn, seed, this); | ||
return op.stream; | ||
} | ||
static merge(streams) { | ||
return _InternalStream.combine(() => { | ||
return streams.map((s) => s.get()); | ||
}, streams); | ||
const op = new MergeOp(streams); | ||
return op.stream; | ||
} | ||
static combine(fn, streams) { | ||
let ready = true; | ||
for (let s of streams) { | ||
ready = s.state === ActiveState; | ||
if (!ready) { | ||
break; | ||
} | ||
} | ||
const out = ready ? streams[0].of(fn(streams)) : streams[0].of(); | ||
out._readonly = true; | ||
let changed = /* @__PURE__ */ new Set(); | ||
for (let dependency of streams) { | ||
const mapper = dependency._map((x) => { | ||
changed.add(dependency); | ||
if (!ready) { | ||
ready = true; | ||
for (let ss of streams) { | ||
if (ss.state === PendingState) { | ||
ready = false; | ||
break; | ||
} | ||
} | ||
} | ||
if (ready) { | ||
let sessionI = sessions.findIndex((x2) => x2.stream === out); | ||
if (sessionI > -1) { | ||
sessions.splice(sessionI, 1); | ||
} | ||
let value = () => { | ||
let ret = fn([...changed]); | ||
changed.clear(); | ||
return ret; | ||
}; | ||
isLazy.add(value); | ||
out._set(value); | ||
} | ||
}, true); | ||
out.ends.push(() => mapper.doEnd()); | ||
dependency.ends.push(() => out.doEnd()); | ||
} | ||
return out; | ||
} | ||
// experimental | ||
removeReadOnly() { | ||
this._readonly = false; | ||
return this; | ||
} | ||
// todo | ||
awaitLatest(visitor) { | ||
let lastEmit = 0; | ||
let abortController = new AbortController(); | ||
const out = new _InternalStream(); | ||
out._readonly = true; | ||
this.ends.push(() => out.doEnd(), () => abortController.abort()); | ||
this.map(async (x) => { | ||
let emit = ++lastEmit; | ||
lastEmit = emit; | ||
abortController.abort(); | ||
abortController = new AbortController(); | ||
try { | ||
const value = await visitor(x, { signal: abortController.signal }); | ||
if (emit === lastEmit) { | ||
out._set({ status: "fulfilled", value }); | ||
lastEmit = 0; | ||
} | ||
} catch (e) { | ||
if (emit === lastEmit) { | ||
out._set({ status: "rejected", reason: e }); | ||
lastEmit = 0; | ||
} | ||
} | ||
}); | ||
return out; | ||
const op = new AwaitLatestOp(visitor, this); | ||
return op.stream; | ||
} | ||
awaitEvery(visitor) { | ||
const out = new _InternalStream(); | ||
out._readonly = true; | ||
const abortController = new AbortController(); | ||
this.ends.push(() => out.doEnd(), () => abortController.abort()); | ||
this.map(async (x) => { | ||
const [result] = await Promise.allSettled([visitor(x, { signal: abortController.signal })]); | ||
out._set(result); | ||
}); | ||
return out; | ||
const op = new AwaitEveryOp(visitor, this); | ||
return op.stream; | ||
} | ||
@@ -472,3 +689,3 @@ // sin | ||
return () => { | ||
copy.doEnd(); | ||
copy.end(); | ||
}; | ||
@@ -480,3 +697,3 @@ } | ||
return () => { | ||
copy.doEnd(); | ||
copy.end(); | ||
}; | ||
@@ -492,3 +709,3 @@ } | ||
off: () => { | ||
mapper.end._set(true); | ||
mapper.end(); | ||
} | ||
@@ -516,10 +733,8 @@ }; | ||
export { | ||
ActiveState, | ||
ChangingState, | ||
EndedState, | ||
InternalStream, | ||
PendingState, | ||
State, | ||
Stream, | ||
StreamStatic | ||
StreamStatic, | ||
Stream as XStream | ||
}; | ||
//# sourceMappingURL=chifley.esm.js.map |
@@ -1,34 +0,36 @@ | ||
export declare const ActiveState: 0; | ||
export type ActiveState = typeof ActiveState; | ||
export declare const ChangingState: 1; | ||
export type ChangingState = typeof ChangingState; | ||
export declare const PendingState: 2; | ||
export type PendingState = typeof PendingState; | ||
export declare const EndedState: 3; | ||
export type EndedState = typeof EndedState; | ||
export type StreamState = ActiveState | ChangingState | PendingState | EndedState; | ||
type Dependent = { | ||
stream: InternalStream; | ||
fn: (x: any) => any; | ||
declare const PendingState: 0; | ||
type PendingState = typeof PendingState; | ||
declare const ActiveState: 1; | ||
type ActiveState = typeof ActiveState; | ||
declare const EndedState: 2; | ||
type EndedState = typeof EndedState; | ||
export declare const State: { | ||
Ended: 2; | ||
Active: 1; | ||
Pending: 0; | ||
}; | ||
export type StreamState = ActiveState | PendingState | EndedState; | ||
interface Op<T, U> { | ||
parent: InternalStream<T>; | ||
stream: InternalStream<U>; | ||
next(): boolean; | ||
} | ||
declare class InternalStream<T = any> { | ||
static SKIP: never; | ||
private immediateDependents; | ||
private parents; | ||
private ends; | ||
private state; | ||
private value; | ||
immediateDependents: Op<T, any>[]; | ||
parents: InternalStream[]; | ||
ends: (() => void)[]; | ||
state: StreamState; | ||
value: T | undefined; | ||
nextValue: T | undefined; | ||
stateUpdated: boolean; | ||
op: Op<any, T>; | ||
type: string; | ||
private _end; | ||
private _readonly; | ||
_readonly: boolean; | ||
constructor(...args: [T] | []); | ||
of<U>(...args: [U] | []): InternalStream<U>; | ||
private markAsChanging; | ||
private isOpen; | ||
get: () => T | undefined; | ||
readonly(): InternalStream<T>; | ||
private doEnd; | ||
get end(): InternalStream<true>; | ||
end(): void; | ||
_unregisterChild<U>(child: InternalStream<U>): void; | ||
_registerDependent(dependent: Dependent): void; | ||
_map<U>(visitor: (x: T) => U, ignoreInitial: boolean): InternalStream<U>; | ||
map<U>(visitor: (x: T) => U): InternalStream<U>; | ||
@@ -45,3 +47,2 @@ _set: (newValue: T | (() => T)) => void; | ||
dropRepeatsWith(equality: (a: T, b: T) => boolean): InternalStream<T>; | ||
cloneWithoutDependendents(fn?: (x: T) => T): InternalStream<T>; | ||
afterSilence(ms: number): InternalStream<T>; | ||
@@ -61,4 +62,2 @@ throttle(ms: number): InternalStream<T>; | ||
static merge(streams: InternalStream[]): InternalStream<any[]>; | ||
static combine<U>(fn: (changed: InternalStream[]) => U, streams: InternalStream[]): InternalStream<U>; | ||
removeReadOnly(): this; | ||
awaitLatest<U>(visitor: (x: T, context: { | ||
@@ -90,13 +89,10 @@ signal: AbortSignal; | ||
export type CoreStream<T> = { | ||
/** Creates a dependent stream whose value is set to the result of the callback function. */ | ||
state: StreamState; | ||
map<U>(f: (current: T) => U): Stream<U>; | ||
/** This method is functionally identical to stream. It exists to conform to Fantasy Land's Applicative specification. */ | ||
of(val: T): Stream<T>; | ||
/** A co-dependent stream that unregisters dependent streams when set to true. */ | ||
end: WritableStream<boolean>; | ||
/** When a stream is passed as the argument to JSON.stringify(), the value of the stream is serialized. */ | ||
toJSON(): string; | ||
/** Returns the value of the stream. */ | ||
valueOf(): T; | ||
get: ReadableStream<T>; | ||
end(): void; | ||
ends: (() => void)[]; | ||
filter(predicate: (value: T) => boolean): Stream<T>; | ||
@@ -114,3 +110,2 @@ reject(predicate: (value: T) => boolean): Stream<T>; | ||
cloneWithoutDependendents(): Stream<T>; | ||
removeReadOnly(): Stream<T>; | ||
awaitLatest<U>(visitor: (x: T, context: { | ||
@@ -125,9 +120,6 @@ signal: AbortSignal; | ||
export type ReadableStream<T, GetT = T> = CoreStream<T> & { | ||
/** Returns the value of the stream. */ | ||
get(): GetT; | ||
}; | ||
export type WritableStream<T, GetT = T> = ReadableStream<T, GetT> & { | ||
/** Sets the value of the stream. */ | ||
set(value: T): void; | ||
/** Updates the value of the stream with access to prior value. */ | ||
update(visitor: (x: T) => T): void; | ||
@@ -155,5 +147,6 @@ }; | ||
} | ||
export type Stream<T = any> = WritableStream<T> | ReadableStream<T>; | ||
export type Stream<T = any, GetT = any> = WritableStream<T, GetT> | ReadableStream<T>; | ||
declare function StreamStatic<T>(...args: [T] | []): InternalStream<T>; | ||
export { InternalStream, StreamStatic }; | ||
export declare const Stream: StreamStatic; | ||
export { Stream as XStream }; |
1084
lib/index.ts
@@ -1,13 +0,16 @@ | ||
export const ActiveState = 0 as const | ||
export type ActiveState = typeof ActiveState | ||
export const ChangingState = 1 as const | ||
export type ChangingState = typeof ChangingState | ||
export const PendingState = 2 as const | ||
export type PendingState = typeof PendingState | ||
export const EndedState = 3 as const | ||
export type EndedState = typeof EndedState | ||
const PendingState = 0 as const | ||
type PendingState = typeof PendingState | ||
const ActiveState = 1 as const | ||
type ActiveState = typeof ActiveState | ||
const EndedState = 2 as const | ||
type EndedState = typeof EndedState | ||
export type StreamState = ActiveState | ChangingState | PendingState | EndedState | ||
export const State = { | ||
Ended: EndedState, | ||
Active: ActiveState, | ||
Pending: PendingState, | ||
} | ||
const I = <T>(x: T) => x | ||
export type StreamState = ActiveState | PendingState | EndedState | ||
const EQ = <T>(a:T,b:T) => a === b | ||
@@ -18,3 +21,2 @@ | ||
type Dependent = { stream: InternalStream, fn: (x: any) => any } | ||
@@ -24,24 +26,541 @@ // if a write happens while propagating | ||
// before starting the next one | ||
const sessions : { stream: InternalStream, nextValue: any }[] = [] | ||
const sessions: InternalStream[] = [] | ||
let isPropagating = false; | ||
const isLazy = new WeakSet<any>() | ||
const SKIP = Symbol('Stream.SKIP') as never | ||
let stack : Op<any,any>[] = [] | ||
interface Op<T, U> { | ||
parent: InternalStream<T> | ||
stream: InternalStream<U> | ||
next(): boolean | ||
} | ||
class FilterOp<T> implements Op<T,T> { | ||
parent: InternalStream<T> | ||
stream: InternalStream<T> | ||
predicate: (x:T) => boolean | ||
constructor( predicate: (x:T) => boolean, parent: InternalStream<T> ) { | ||
this.parent = parent | ||
this.predicate = predicate | ||
const target = new InternalStream<T>() | ||
target.state = PendingState | ||
target._readonly = true | ||
target.parents.push(parent) | ||
target.op = this | ||
parent.immediateDependents.push(this) | ||
this.stream = target; | ||
if (parent.state !== ActiveState) { | ||
this.next() | ||
} | ||
} | ||
next(){ | ||
const pv = this.parent.value! | ||
let res = this.predicate(pv) | ||
if ( res !== SKIP && res ) { | ||
this.stream.value = pv | ||
this.stream.state = ActiveState | ||
return true | ||
} | ||
return false | ||
} | ||
} | ||
class MapOp<T, U> implements Op<T,U> { | ||
parent: InternalStream<T> | ||
stream: InternalStream<U> | ||
visitor: (x:T) => U | ||
constructor(visitor: (x:T) => U, parent: InternalStream<T>){ | ||
this.visitor = visitor | ||
this.parent = parent | ||
const target = new InternalStream<U>() | ||
target.state = PendingState | ||
target._readonly = true | ||
target.parents.push(parent) | ||
target.op = this | ||
parent.immediateDependents.push(this) | ||
this.stream = target; | ||
if (parent.state === ActiveState) { | ||
this.next() | ||
} | ||
} | ||
next(){ | ||
let res = this.visitor(this.parent.value!) | ||
if ( res !== SKIP ) { | ||
this.stream.value = res | ||
this.stream.state = ActiveState | ||
return true | ||
} | ||
return false | ||
} | ||
} | ||
class ScanOp<T,U> implements Op<T,U> { | ||
parent: InternalStream<T> | ||
stream: InternalStream<U> | ||
visitor: (p: U, n: T) => U | ||
acc: U | ||
constructor(visitor: (p: U, n: T) => U, acc: U, parent: InternalStream<T>){ | ||
this.visitor = visitor | ||
this.acc = acc | ||
this.parent = parent | ||
const target = new InternalStream() | ||
target.state = PendingState | ||
target.op = this | ||
target._readonly = true | ||
target.parents.push(parent) | ||
parent.immediateDependents.push(this) | ||
this.stream = target | ||
if (this.parent.state !== ActiveState) { | ||
this.next() | ||
} | ||
} | ||
next(){ | ||
let res = this.visitor(this.acc, this.parent.value!) | ||
if ( res !== SKIP ) { | ||
this.acc = res | ||
this.stream.value = res | ||
this.stream.state = ActiveState | ||
return true | ||
} | ||
return false | ||
} | ||
} | ||
class MergeOp<T> implements Op<T, T> { | ||
parent: InternalStream<T> | ||
stream: InternalStream<T> | ||
inputs: InternalStream<any>[] | ||
constructor(inputs: InternalStream<any>[]) { | ||
let parent = new InternalStream() | ||
this.inputs = inputs | ||
let readyMask = 0 | ||
inputs.forEach( (x, i) => { | ||
if (x.state !== PendingState) { | ||
readyMask |= 1 << i | ||
} | ||
}) | ||
parent.state = readyMask === (1 << inputs.length) - 1 ? ActiveState : PendingState | ||
parent.op = this | ||
parent._readonly = true | ||
this.parent = parent | ||
this.stream = parent | ||
for(let pi = 0; pi < inputs.length; pi++) { | ||
let p = inputs[pi] | ||
let listener = p.map((_) => { | ||
if (parent.state === ActiveState) { | ||
// if a dependency already emitted and scheduled this | ||
// to run, remove that schedule | ||
// so we can reschedule to the end of the propagation | ||
if (isPropagating) { | ||
let i = sessions.indexOf(this.stream) | ||
if (i !== -1) { | ||
sessions.splice(i, 1) | ||
} | ||
sessions.push(this.stream) | ||
} else { | ||
parent._set(null as any) | ||
} | ||
return null; | ||
} else { | ||
// if we weren't ready yet, then we can just schedule | ||
// if we're now ready | ||
readyMask |= 1 << pi | ||
parent.state = readyMask === (1 << inputs.length) - 1 ? ActiveState : PendingState | ||
if (parent.state !== ActiveState) { | ||
return null; | ||
} else { | ||
sessions.push(this.stream) | ||
return null | ||
} | ||
} | ||
}) | ||
listener.ends.push(() => { | ||
this.stream.end() | ||
}) | ||
parent.ends.push(() => { | ||
listener.end() | ||
}) | ||
} | ||
} | ||
next(){ | ||
this.parent.value = this.inputs.map( x => x.value ) as T | ||
this.parent.state = ActiveState | ||
return true | ||
} | ||
} | ||
class DefaultOp<T> implements Op<T,T> { | ||
parent: InternalStream<T> | ||
stream: InternalStream<T> | ||
defaultValue: T | ||
constructor(defaultValue: T, parent: InternalStream<T>) { | ||
this.parent = parent | ||
this.defaultValue = defaultValue | ||
const target = new InternalStream() | ||
target.state = PendingState | ||
target.op = this | ||
target._readonly = true | ||
target.parents.push(parent) | ||
parent.immediateDependents.push(this) | ||
target.value = defaultValue | ||
this.stream = target | ||
} | ||
next(){ | ||
this.stream.value = this.parent.value | ||
this.stream.state = ActiveState | ||
return true | ||
} | ||
} | ||
class SkipOp<T> implements Op<T,T> { | ||
parent: InternalStream<T> | ||
stream: InternalStream<T> | ||
skipN: number | ||
i: number = 0; | ||
constructor(skipN: number, parent: InternalStream<T>){ | ||
this.skipN = skipN | ||
this.parent = parent | ||
const target = new InternalStream() | ||
target.state = PendingState | ||
target.op = this | ||
target._readonly = true | ||
target.parents.push(parent) | ||
parent.immediateDependents.push(this) | ||
this.stream = target | ||
if (this.parent.state === ActiveState) { | ||
this.next() | ||
} | ||
} | ||
next(){ | ||
if ( this.i++ >= this.skipN ) { | ||
this.stream.value = this.parent.value | ||
this.stream.state = ActiveState | ||
return true | ||
} | ||
return false | ||
} | ||
} | ||
class TakeOp<T> implements Op<T,T> { | ||
parent: InternalStream<T> | ||
stream: InternalStream<T> | ||
i: number = 0 | ||
n: number | ||
constructor(n: number, parent: InternalStream<T>){ | ||
this.n = n | ||
this.parent = parent | ||
const target = new InternalStream() | ||
target.state = PendingState | ||
target.op = this | ||
target._readonly = true | ||
target.parents.push(parent) | ||
parent.immediateDependents.push(this) | ||
this.stream = target | ||
if (this.parent.state === ActiveState) { | ||
this.next() | ||
} | ||
} | ||
next(){ | ||
if ( this.i++ <= this.n ) { | ||
this.stream.value = this.parent.value | ||
this.stream.state = ActiveState | ||
return true | ||
} else { | ||
this.stream.end() | ||
return false | ||
} | ||
} | ||
} | ||
class DropRepeatsWithOp<T> implements Op<T,T> { | ||
parent: InternalStream<T> | ||
stream: InternalStream<T> | ||
equality: (a: T, b: T) => boolean | ||
constructor(equality: (a: T, b: T) => boolean, parent: InternalStream<T>) { | ||
this.parent = parent | ||
this.equality = equality | ||
const target = new InternalStream<T>() | ||
target.state = PendingState | ||
target.op = this | ||
target._readonly = true | ||
target.parents.push(parent) | ||
parent.immediateDependents.push(this) | ||
this.stream = target | ||
if (this.parent.state === ActiveState) { | ||
this.next() | ||
} | ||
} | ||
next(){ | ||
if ( this.equality(this.stream.value!, this.parent.value!) ) { | ||
return false; | ||
} | ||
this.stream.value = this.parent.value | ||
this.stream.state = ActiveState | ||
return true; | ||
} | ||
} | ||
class AfterSilenceOp<T> implements Op<T,T> { | ||
parent: InternalStream<T> | ||
stream: InternalStream<T> | ||
ms: number | ||
constructor(ms: number, parent:InternalStream<T>){ | ||
this.ms = ms | ||
this.parent = parent | ||
let id: ReturnType<typeof setTimeout>; | ||
const target = new InternalStream<T>() | ||
target.state = this.parent.state | ||
target.value = this.parent.value | ||
target.op = this | ||
target._readonly = true | ||
this.stream = target | ||
let visit = (x: T) => { | ||
clearTimeout(id) | ||
id = setTimeout(() => { | ||
this.stream._set(x) | ||
}, ms) | ||
} | ||
let listener = this.parent.map(visit) | ||
target.ends.push(() => listener.end()) | ||
} | ||
next() { | ||
this.stream.value = this.parent.value | ||
this.stream.state = ActiveState | ||
return true | ||
} | ||
} | ||
class ThrottleOp<T> implements Op<T,T> { | ||
parent: InternalStream<T> | ||
stream: InternalStream<T> | ||
ms: number | ||
last: number | ||
id!: number | ||
constructor(ms: number, parent: InternalStream<T>){ | ||
this.parent = parent | ||
this.ms = ms | ||
const target = new InternalStream<T>() | ||
target.state = parent.state | ||
target.op = this | ||
target._readonly = true | ||
target.parents.push(parent) | ||
target.value = parent.value | ||
this.stream = target | ||
this.last = Date.now() | ||
const process = (x: T) => { | ||
clearTimeout(this.id) | ||
let dt = Date.now() - this.last | ||
if (dt >= this.ms) { | ||
this.last = Date.now() | ||
this.stream._set(x) | ||
} else { | ||
this.id = setTimeout(() => process(x), Math.max(0, this.ms - dt), x) | ||
} | ||
} | ||
let listener = parent.map((x) => process(x)) | ||
listener.ends.push(() => { | ||
target.end() | ||
}) | ||
target.ends.push(() => { | ||
listener.end() | ||
}) | ||
} | ||
next(){ | ||
this.stream.value = this.parent.value | ||
this.stream.state = ActiveState | ||
return true | ||
} | ||
} | ||
class AwaitLatestOp<T, U> implements Op<T,PromiseSettledResult<U>> { | ||
parent: InternalStream<T> | ||
stream: InternalStream<PromiseSettledResult<U>> | ||
visitor: (x: T, context: { signal: AbortSignal }) => Promise<U> | ||
constructor(visitor: (x: T, context: { signal: AbortSignal }) => Promise<U>, parent: InternalStream<T>){ | ||
this.visitor = visitor | ||
this.parent = parent | ||
const target = new InternalStream<PromiseSettledResult<U>>() | ||
target.state = PendingState | ||
target.op = this | ||
target._readonly = true | ||
target.parents.push(parent) | ||
this.stream = target | ||
let lastEmit = 0 | ||
let abortController = new AbortController() | ||
parent.ends.push(() => target.end(), () => abortController.abort()) | ||
let listener = parent.map( async x => { | ||
let emit = ++lastEmit | ||
lastEmit = emit | ||
abortController.abort() | ||
abortController = new AbortController() | ||
try { | ||
const value = await visitor(x, { signal: abortController.signal }) | ||
if (emit === lastEmit) { | ||
target.nextValue | ||
target.nextValue = { status: 'fulfilled', value } | ||
target._set(target.nextValue) | ||
lastEmit = 0 | ||
} | ||
} catch (e) { | ||
if (emit === lastEmit) { | ||
target._set({ status: 'rejected', reason: e }) | ||
lastEmit = 0 | ||
} | ||
} | ||
}) | ||
target.ends.push(() => { | ||
listener.end() | ||
}) | ||
} | ||
next(){ | ||
this.stream.value = this.stream.nextValue | ||
this.stream.state = ActiveState | ||
return true | ||
} | ||
} | ||
class AwaitEveryOp<T, U> implements Op<T,PromiseSettledResult<U>> { | ||
parent: InternalStream<T> | ||
stream: InternalStream<PromiseSettledResult<U>> | ||
visitor: (x: T, context: { signal: AbortSignal }) => Promise<U> | ||
constructor( | ||
visitor: (x: T, context: { signal: AbortSignal }) => Promise<U> | ||
, parent: InternalStream<T> | ||
){ | ||
this.parent = parent | ||
this.visitor = visitor | ||
const target = new InternalStream<PromiseSettledResult<U>>() | ||
target.state = PendingState | ||
target.op = this | ||
target._readonly = true | ||
target.parents.push(parent) | ||
this.stream = target | ||
const abortController = new AbortController() | ||
parent.ends.push(() => target.end(), () => abortController.abort()) | ||
let listener = | ||
parent.map( async x => { | ||
const [result] = await Promise.allSettled([visitor(x, { signal: abortController.signal })]) | ||
target.nextValue = result | ||
target._set(result) | ||
}) | ||
target.ends.push(() => { | ||
abortController.abort() | ||
listener.end() | ||
}) | ||
} | ||
next(){ | ||
this.stream.value = this.stream.nextValue | ||
this.stream.state = ActiveState | ||
return true | ||
} | ||
} | ||
class SourceOp<T> implements Op<T,T> { | ||
parent: InternalStream<T> | ||
stream: InternalStream<T> | ||
constructor(parent: InternalStream<T>) { | ||
this.parent = parent | ||
this.stream = parent | ||
} | ||
next(){ | ||
this.stream.value = this.stream.nextValue | ||
this.stream.state = ActiveState | ||
return true | ||
} | ||
} | ||
class InternalStream<T=any> { | ||
static SKIP = Symbol('Stream.SKIP') as never | ||
static SKIP = SKIP | ||
immediateDependents: Op<T, any>[] = [] | ||
parents: InternalStream[] = [] | ||
private immediateDependents: Dependent[] = [] | ||
private parents: InternalStream[] = [] | ||
// streams that want to end when this stream ends | ||
// but don't want to update when this stream updates | ||
private ends: (() => void)[] = [] | ||
ends: (() => void)[] = [] | ||
private state: StreamState | ||
state: StreamState | ||
private value: T | undefined | ||
value: T | undefined | ||
nextValue: T | undefined | ||
stateUpdated: boolean = false | ||
op: Op<any, T> | ||
type: string = 'source' | ||
private _end!: InternalStream<true> | ||
private _readonly: boolean = false; | ||
_readonly: boolean = false; | ||
@@ -53,7 +572,9 @@ constructor(...args: [T] | []) { | ||
args.length !== 0 | ||
&& value !== InternalStream.SKIP | ||
&& value !== SKIP | ||
? ActiveState | ||
: PendingState | ||
if ( value !== InternalStream.SKIP ) { | ||
this.op = new SourceOp(this) | ||
if ( value !== SKIP ) { | ||
this.value = value | ||
@@ -71,19 +592,2 @@ } | ||
private markAsChanging(): void { | ||
if (this.isOpen()) { | ||
this.state = ChangingState | ||
} | ||
for (let s of this.immediateDependents) { | ||
s.stream.markAsChanging() | ||
} | ||
} | ||
private isOpen() { | ||
return ( | ||
this.state == ActiveState | ||
|| this.state == ChangingState | ||
|| this.state == PendingState | ||
) | ||
} | ||
get = () => { | ||
@@ -95,23 +599,22 @@ let stack = referencedStack.at(-1) | ||
readonly(): InternalStream<T> { | ||
const cloned = this.cloneWithoutDependendents() | ||
cloned._readonly = true | ||
return cloned | ||
} | ||
private doEnd(){ | ||
end(){ | ||
if (this.state === EndedState) { | ||
return; | ||
} | ||
for (let p of this.parents.slice()) { | ||
p._unregisterChild(this) | ||
} | ||
for (let c of this.immediateDependents.slice().map( x => x.stream )) { | ||
if (c._end) { | ||
c.end._set(true) | ||
} else { | ||
c.doEnd() | ||
for (let d of this.immediateDependents.slice()) { | ||
if (!d.stream) { | ||
continue; | ||
} | ||
let c = d.stream | ||
c.end() | ||
} | ||
this.state = EndedState | ||
for (let f of this.ends.slice()) { | ||
f() | ||
} | ||
this.state = EndedState | ||
this.ends.length = | ||
@@ -122,21 +625,2 @@ this.parents.length = | ||
get end(): InternalStream<true> { | ||
if (!this._end) { | ||
if (this.state == EndedState) { | ||
// inert already ended stream | ||
return this._end = this.of(true) | ||
} | ||
this._end = this.of<true>() | ||
this._end.map((x) => { | ||
if (x === true) { | ||
this.doEnd() | ||
} | ||
return x | ||
}) | ||
} | ||
return this._end | ||
} | ||
// delete in favour of deregister dependent | ||
@@ -152,133 +636,83 @@ _unregisterChild<U>(child: InternalStream<U>): void { | ||
_registerDependent(dependent: Dependent) { | ||
this.immediateDependents.push(dependent) | ||
} | ||
_map<U>(visitor: (x: T) => U, ignoreInitial: boolean): InternalStream<U> { | ||
const args : [U] | [] = ignoreInitial ? [] : [visitor(this.value!)] | ||
const target = this.of(...args) | ||
target._readonly = true | ||
target.parents.push(this) | ||
this._registerDependent({ stream: target, fn: visitor }) | ||
return target | ||
} | ||
map<U>(visitor: (x: T) => U): InternalStream<U> { | ||
return this._map(visitor, this.state !== ActiveState) | ||
const out = new MapOp(visitor, this) | ||
return out.stream | ||
} | ||
// unlike traditional mithril stream (which has a recursive update) | ||
// we instead opt for detecting all the static dependencies up front | ||
// and updating them serially. If any writes happen while | ||
// we are updating, we start a new update session. We repeat this | ||
// until (hopefully) no more writes have happened during propagation. | ||
// If they do though, that's an infinite loop which is supported as it | ||
// was in mithril-stream | ||
// Need to cache getAllDependencies, from 5 ops/sec 0.29 ops/sec | ||
// Iterator: 0.44 ops/sec | ||
// Inlined: 0.48 ops/sec (1 test failing) | ||
// -getNextValue 2.30 ops/sec (1 test failing) | ||
// -skipped set 3.90 ops/sec (1 test failing) | ||
// -copy dependent objects when adding to stack 4.61 ops/sec | ||
// -fix tests, stack if / else 4.7 ops/sec | ||
// -assume SKIP is never the input value 5.4 ops/sec | ||
// just the result of the transform | ||
// Use integer flags for state instead of strings 5.5 ops/sec | ||
// Use a do while so first run doesn't require session creation 6.4 ops/sec | ||
// | ||
// we do this to make propagation more predictable and easier to follow | ||
// in a debugger. You can clearly see when we're enterning a new propagation | ||
// and which stream triggered it. This can help track down the root | ||
// cause of a complex loop. | ||
// (Experiment) Re-use previous evaluated dependents ~9 ops/sec | ||
// | ||
// The downside is we have to manually track things you'd normally get | ||
// for free in a recursive update. E.g. we need to record which streams | ||
// were skipped so their dependencies (which are already recorded) are also skipped | ||
// Re-wrote to use Op classes after researching xstream 21 ops/sec (tests failing) | ||
// | ||
_set = (newValue: T | (() => T)) => { | ||
const lazy = isLazy.has(newValue) | ||
// because gating infinite loops often | ||
// checks the current value, but deps update | ||
// later | ||
if ( !lazy ) { | ||
this.value = newValue as T | ||
} | ||
if (!this.isOpen() ) { | ||
return; | ||
} | ||
if (this.state == EndedState) { | ||
return; | ||
} | ||
let target = this.op | ||
this.nextValue = newValue as any | ||
if ( !lazy ) { | ||
this.state = ActiveState | ||
} else { | ||
this.state = ChangingState | ||
} | ||
sessions.push({ | ||
stream: this, nextValue: newValue | ||
}) | ||
if (isPropagating) { | ||
this.nextValue = newValue as any | ||
sessions.push(this) | ||
return; | ||
} | ||
try { | ||
isPropagating = true | ||
while (sessions.length) { | ||
const session = sessions.shift()! | ||
// lazy streams aren't evaluated | ||
// on the call to `_set, so put it at the front | ||
// of the list | ||
// Need to cache getAllDependencies, from 5 ops/sec 0.29 ops/sec | ||
// Iterator: 0.44 ops/sec | ||
// Inlined: 0.48 ops/sec (1 test failing) | ||
// -getNextValue 2.30 ops/sec (1 test failing) | ||
// -skipped set 3.90 ops/sec (1 test failing) | ||
// -copy dependent objects when adding to stack 4.61 ops/sec | ||
// -fix tests, stack if / else 4.7 ops/sec | ||
// -assume SKIP is never the input value 5.4 ops/sec | ||
// just the result of the transform | ||
// Use integer flags for state instead of strings 5.5 ops/sec | ||
let stack : (Dependent & { stateUpdated?: boolean })[] = [] | ||
isPropagating = true | ||
if ( isLazy.has(session.nextValue) ) { | ||
stack.unshift({ | ||
fn: I, | ||
stream: session.stream, | ||
stateUpdated: true, | ||
}) | ||
} else { | ||
stack = session.stream.immediateDependents.slice() | ||
} | ||
do { | ||
while (true) { | ||
let recurse = | ||
target.stream.state !== EndedState | ||
&& target.next() | ||
while (stack.length) { | ||
let target = stack.shift()! | ||
if ( !target.stateUpdated ){ | ||
target.stream.state = ChangingState | ||
if (!recurse) { | ||
if (stack.length === 0) { | ||
break; | ||
} else { | ||
target = stack.shift()! | ||
break; | ||
} | ||
let { stream:s, fn: f } = target | ||
let nextValue = target.stream.parents.length ? target.stream.parents.at(-1)!.value : session.nextValue | ||
if (!s.isOpen()) { | ||
continue | ||
} | ||
s.state = ActiveState | ||
} | ||
let ds = target.stream?.immediateDependents! | ||
let L = ds.length | ||
if ( isLazy.has(nextValue) ) { | ||
nextValue = nextValue() | ||
} | ||
let newValue = f(nextValue) | ||
// also possible! | ||
if ( newValue === Stream.SKIP ) { | ||
if ( stack.length === 0 ) { | ||
if (L === 1) { | ||
target = ds[0] | ||
continue; | ||
} else if (L === 0) { | ||
break | ||
} | ||
s.value = newValue | ||
stack.push(...target.stream.immediateDependents) | ||
} | ||
let xs = ds.slice() | ||
for( let i = 0; i < xs.length; i++ ) { | ||
let d = xs[i] | ||
stack.push(d) | ||
} | ||
if ( stack.length === 0) { | ||
break | ||
} else { | ||
target = stack.shift()! | ||
} | ||
} | ||
} finally { | ||
isPropagating = false | ||
} | ||
if (sessions.length) { | ||
let session = sessions.shift()! | ||
target = session.op | ||
} else { | ||
isPropagating = false | ||
return; | ||
} | ||
} while (true) | ||
} | ||
@@ -304,40 +738,17 @@ | ||
default(x: T): InternalStream<T> { | ||
if (this.state === PendingState) { | ||
const out = this.cloneWithoutDependendents() | ||
out.value = x | ||
out.state = ActiveState | ||
return out | ||
} else { | ||
if (this.state !== PendingState) { | ||
return this | ||
} | ||
const op = new DefaultOp(x, this) | ||
return op.stream | ||
} | ||
skip(n: number): InternalStream<T> { | ||
let i = 0; | ||
let out = this.map( x => { | ||
if ( i >= n ) { | ||
return x | ||
} else { | ||
i++ | ||
return InternalStream.SKIP | ||
} | ||
}) | ||
return out | ||
const op = new SkipOp(n, this) | ||
return op.stream | ||
} | ||
take(n: number): InternalStream<T> { | ||
let i = 0; | ||
let out = this.map( x => { | ||
if ( i > n ) { | ||
if (out.state !== EndedState) { | ||
out.doEnd() | ||
} | ||
return InternalStream.SKIP | ||
} | ||
i++ | ||
return x | ||
}) | ||
return out | ||
const op = new TakeOp(n, this) | ||
return op.stream | ||
} | ||
@@ -354,83 +765,24 @@ | ||
dropRepeatsWith(equality: (a: T, b: T) => boolean): InternalStream<T> { | ||
let neverEmitted = Symbol('neverEmitted') | ||
let prev : T = neverEmitted as any; | ||
const out = this.map( | ||
x => { | ||
try { | ||
if ( prev !== neverEmitted && equality(prev!, x) ) { | ||
return InternalStream.SKIP | ||
} | ||
return x | ||
} finally { | ||
prev = x | ||
} | ||
} | ||
) | ||
return out | ||
const op = new DropRepeatsWithOp(equality, this) | ||
return op.stream | ||
} | ||
cloneWithoutDependendents(fn: (x:T) => T=I): InternalStream<T> { | ||
const out = this.of<T>() | ||
out.state = this.state === ChangingState ? ActiveState : this.state | ||
out.value = this.value | ||
out._readonly = this._readonly | ||
out.parents.push(this) | ||
this._registerDependent({ stream: out, fn }) | ||
return out | ||
} | ||
afterSilence(ms: number): InternalStream<T> { | ||
let id: ReturnType<typeof setTimeout>; | ||
const out = this.of() as any as InternalStream<T> | ||
out.state = this.state === ChangingState ? ActiveState : this.state | ||
out.value = this.value | ||
out._readonly = true | ||
this.ends.push(() => out.doEnd()) | ||
this.map( x => { | ||
clearTimeout(id) | ||
id = setTimeout(() => { | ||
out._set(x) | ||
}, ms) | ||
}) | ||
return out | ||
const op = new AfterSilenceOp(ms, this) | ||
return op.stream | ||
} | ||
throttle(ms: number): InternalStream<T> { | ||
let id: ReturnType<typeof setTimeout> | ||
let last = Date.now() | ||
const out = this.of() as any as InternalStream<T> | ||
out.state = this.state === ChangingState ? ActiveState : this.state | ||
out.value = this.value | ||
out._readonly = true | ||
this.ends.push(() => out.doEnd()) | ||
function process(x: T){ | ||
clearTimeout(id) | ||
let dt = Date.now() - last | ||
if (dt >= ms) { | ||
last = Date.now() | ||
out._set(x) | ||
} else { | ||
id = setTimeout(process, Math.max(0, ms - dt), x) | ||
} | ||
} | ||
this.map(process) | ||
return out | ||
const op = new ThrottleOp(ms, this) | ||
return op.stream | ||
} | ||
filter(predicate: (x: T) => boolean): InternalStream<T> { | ||
return this.map( x => predicate(x) ? x : InternalStream.SKIP ) | ||
let op = new FilterOp(predicate, this) | ||
return op.stream | ||
} | ||
reject(predicate: (x: T) => boolean): InternalStream<T> { | ||
return this.map( x => !predicate(x) ? x : InternalStream.SKIP ) | ||
let op = new FilterOp( x => !predicate(x), this) | ||
return op.stream | ||
} | ||
@@ -495,10 +847,4 @@ | ||
scan<U>(fn: (p: U, n: T) => U, seed: U ): InternalStream<U> { | ||
const sink = this.map( n => { | ||
const out = fn(seed, n) | ||
if ( out !== InternalStream.SKIP ) { | ||
seed = out | ||
} | ||
return out | ||
}) | ||
return sink | ||
const op = new ScanOp(fn, seed, this) | ||
return op.stream | ||
} | ||
@@ -508,126 +854,14 @@ | ||
static merge(streams: InternalStream[]): InternalStream<any[]> { | ||
return InternalStream.combine(() => { | ||
return streams.map( s => s.get()) | ||
}, streams) | ||
const op = new MergeOp<any[]>(streams) | ||
return op.stream | ||
} | ||
static combine<U>( | ||
// type with overloads externally | ||
fn: (changed: InternalStream[]) => U, | ||
streams: InternalStream[], | ||
): InternalStream<U> { | ||
let ready = true | ||
for (let s of streams) { | ||
ready = s.state === ActiveState | ||
if (!ready) { | ||
break | ||
} | ||
} | ||
const out = ready | ||
? streams[0].of<U>(fn(streams)) | ||
: streams[0].of<U>() | ||
out._readonly = true | ||
let changed = new Set<InternalStream>() | ||
for (let dependency of streams) { | ||
// when dependency emits, we add it to the changed array | ||
// and schedule an emit | ||
const mapper = dependency._map( x => { | ||
changed.add(dependency) | ||
if (!ready) { | ||
ready = true | ||
for (let ss of streams) { | ||
if (ss.state === PendingState) { | ||
ready = false | ||
break | ||
} | ||
} | ||
} | ||
if (ready) { | ||
// if another dependency scheduled a session we can exit early | ||
let sessionI = sessions.findIndex( x => x.stream === out ) | ||
if ( sessionI > -1 ) { | ||
// unschedule it, so it can be scheduled by the next dep | ||
sessions.splice(sessionI, 1) | ||
} | ||
// don't call fn() now, only want to call once per "update" | ||
let value = () => { | ||
let ret = fn([...changed]) | ||
changed.clear() | ||
return ret; | ||
} | ||
isLazy.add(value) | ||
out._set(value) | ||
} | ||
}, true) | ||
out.ends.push(() => mapper.doEnd()) | ||
dependency.ends.push(() => out.doEnd()) | ||
} | ||
return out | ||
} | ||
// experimental | ||
removeReadOnly(){ | ||
this._readonly = false | ||
return this | ||
} | ||
// todo | ||
awaitLatest<U>(visitor: (x: T, context: { signal: AbortSignal }) => Promise<U>): InternalStream<PromiseSettledResult<U>> { | ||
let lastEmit = 0 | ||
let abortController = new AbortController() | ||
const out = new InternalStream<PromiseSettledResult<U>>; | ||
out._readonly = true | ||
this.ends.push(() => out.doEnd(), () => abortController.abort()) | ||
this.map( async x => { | ||
let emit = ++lastEmit | ||
lastEmit = emit | ||
abortController.abort() | ||
abortController = new AbortController() | ||
try { | ||
const value = await visitor(x, { signal: abortController.signal }) | ||
if (emit === lastEmit) { | ||
out._set({ status: 'fulfilled', value }) | ||
lastEmit = 0 | ||
} | ||
} catch (e) { | ||
if (emit === lastEmit) { | ||
out._set({ status: 'rejected', reason: e }) | ||
lastEmit = 0 | ||
} | ||
} | ||
}) | ||
return out | ||
const op = new AwaitLatestOp(visitor, this) | ||
return op.stream | ||
} | ||
awaitEvery<U>(visitor: (x: T, context: { signal: AbortSignal }) => Promise<U>): InternalStream<PromiseSettledResult<U>> { | ||
const out = new InternalStream<PromiseSettledResult<U>>; | ||
out._readonly = true | ||
const abortController = new AbortController() | ||
this.ends.push(() => out.doEnd(), () => abortController.abort()) | ||
this.map( async x => { | ||
const [result] = await Promise.allSettled([visitor(x, { signal: abortController.signal })]) | ||
out._set(result) | ||
}) | ||
return out | ||
const op = new AwaitEveryOp(visitor, this) | ||
return op.stream | ||
} | ||
@@ -642,3 +876,3 @@ | ||
// them access to the raw stream, no point creating it | ||
copy.doEnd() | ||
copy.end() | ||
} | ||
@@ -654,3 +888,3 @@ } | ||
// them access to the raw stream, no point creating it | ||
copy.doEnd() | ||
copy.end() | ||
} | ||
@@ -669,3 +903,3 @@ } | ||
off: () => { | ||
mapper.end._set(true) | ||
mapper.end() | ||
} | ||
@@ -679,4 +913,2 @@ } | ||
type ZedEffect = { | ||
@@ -695,11 +927,10 @@ on: (rerun: (value: any) => void) => { | ||
export type CoreStream<T> = { | ||
/** Creates a dependent stream whose value is set to the result of the callback function. */ | ||
state: StreamState | ||
map<U>(f: (current: T) => U): Stream<U> | ||
/** This method is functionally identical to stream. It exists to conform to Fantasy Land's Applicative specification. */ | ||
of(val: T): Stream<T> | ||
/** A co-dependent stream that unregisters dependent streams when set to true. */ | ||
end: WritableStream<boolean> | ||
/** When a stream is passed as the argument to JSON.stringify(), the value of the stream is serialized. */ | ||
toJSON(): string | ||
/** Returns the value of the stream. */ | ||
valueOf(): T | ||
@@ -709,2 +940,5 @@ | ||
end(): void | ||
ends: (() => void)[] | ||
filter(predicate: (value: T) => boolean): Stream<T> | ||
@@ -728,3 +962,2 @@ reject(predicate: (value: T) => boolean): Stream<T> | ||
cloneWithoutDependendents(): Stream<T> | ||
removeReadOnly(): Stream<T>; | ||
@@ -738,12 +971,9 @@ awaitLatest<U>( visitor: (x: T, context: { signal: AbortSignal }) => Promise<U>): Stream<PromiseSettledResult<U>> | ||
export type ReadableStream<T, GetT=T> = CoreStream<T> & { | ||
/** Returns the value of the stream. */ | ||
get(): GetT | ||
} | ||
export type WritableStream<T, GetT=T> = ReadableStream<T, GetT> & { | ||
/** Sets the value of the stream. */ | ||
export type WritableStream<T, GetT=T> = ReadableStream<T, GetT> & { | ||
set(value: T): void | ||
/** Updates the value of the stream with access to prior value. */ | ||
update(visitor: (x: T) => T): void | ||
/** Gets the current value of the stream. */ | ||
} | ||
@@ -775,3 +1005,3 @@ | ||
export type Stream<T=any> = WritableStream<T> | ReadableStream<T> | ||
export type Stream<T=any, GetT=any> = WritableStream<T, GetT> | ReadableStream<T> | ||
@@ -803,1 +1033,3 @@ // the following is just hacks to make the Stream both a function | ||
export { Stream as XStream } |
{ | ||
"name": "chifley", | ||
"version": "0.0.0-next.27", | ||
"version": "0.0.0-next.28", | ||
"description": "", | ||
@@ -44,4 +44,6 @@ "type": "module", | ||
"tsx": "^4.7.0", | ||
"typescript": "^5.0.4" | ||
"typescript": "^5.0.4", | ||
"xstream": "^11.14.0", | ||
"zx": "^8.1.5" | ||
} | ||
} |
220
README.md
# π chifley | ||
``` | ||
π¨π¨ π¨π¨ | ||
π¨ Warning, this library is very new and should be considered pre-release. Do not use in production. π¨ | ||
π¨π¨ π¨π¨ | ||
π¨π¨ π¨π¨ | ||
π¨ Warning, this library is very new and should be considered pre-release. π¨ | ||
π¨ Do not use in production. π¨ | ||
π¨π¨ π¨π¨ | ||
``` | ||
A small, speedy reactive programming library with an API designed to be _app ready_. | ||
A small, speedy reactive programming library with a simple API designed to be _app ready_. | ||
@@ -15,3 +16,3 @@ - πΆ Simple reactivity model | ||
- πββοΈββ‘οΈ Fast | ||
- π€ Tiny but with just enough features β | ||
- π€ Tiny (8kb) but with just enough features β | ||
@@ -29,3 +30,3 @@ ## Quick Start | ||
isEmail.get() | ||
//=> true | ||
//=> false | ||
@@ -47,67 +48,6 @@ try { | ||
### πΆ Simple reactivity model | ||
What makes Chifley different from other stream libraries? Chifley is focused on one goal: the perfect stream libraries for building UIs. In order to meet that goal, Chifley also has to be fast, easy to use (and debug!) and it has to be small. | ||
`chifley` is heavily inspired by stream libraries like [flyd]() and [mithril-stream](). All `chifley` streams are immediately live. They do not need to be subscribed to activate. All downstream subscriptions are themselves active streams that share the same cached parent/input state. | ||
You'll find Chifley familiar if you've used libraries like [flyd] or [mithril-stream]. If you've never used a stream library before then Chifley is a great place to start, it is designed to be easy to learn. If you've coming from libraries like Rx.js you need to unlearn what you have learned as Chifley has a very different philosophy and semantics. | ||
When you end a parent stream all dependent streams will also end. When a dependent stream ends it does not impact the parent's end state. | ||
If you are coming from libraries like [Rx.js]() this may feel a little foreign but for UI programming specifically, this model is far more appropriate. | ||
### π€ Easy to debug | ||
Most stream libraries have a complex recursive propagation algorithm. If you are trying to debug your stream graph, traditional debuggers aren't very helpful as you will find yourself stepping through generic internal functions recursively. | ||
So, instead, we find ourselves using custom tools like Rx marbles (and more commonly, just logs π« ). | ||
`chifley` was deliberately designed to be sustainable at scale with large complex UI stream graphs. When you update a stream, `chifley` traverses all the possible dependencies in a single while loop. | ||
Additionally, any writes that occur during a propagation are deferred until the immediate impact of the previous write is complete. | ||
This doesn't prevent infinite loops but it makes it very clear when you are entering and exiting a propagation caused by a single write. It also helps identify how many dependent transactions are spawned by a single write. | ||
### π Low memory use | ||
`chifley` streams are instances of a simple class. Each instance has many useful methods available but thanks to prototypical inheritance, the memory for those methods are all shared. Each stream has a few instance specific properties but everything else is shared and stored once. | ||
### πββοΈββ‘οΈ Fast | ||
`chifley` isn't the fastest stream library, but it is faster than many of them and strikes a healthy balance between memory and CPU usage. | ||
- Roughly 10% slower than `most` | ||
- Roughly 45% faster than `flyd` | ||
- Roughly 430% faster than `mithril-stream` | ||
``` | ||
filter -> map -> reduce 1000000 integers | ||
------------------------------------------------------- | ||
most.js 8.62 op/s Β± 8.24% (45 samples) | ||
chifley 7.83 op/s Β± 1.00% (42 samples) | ||
flyd 5.39 op/s Β± 2.45% (31 samples) | ||
mithril-stream 1.79 op/s Β± 0.96% (13 samples) | ||
------------------------------------------------------- | ||
``` | ||
- [From Github Actions](https://github.com/JAForbes/chifley/actions/runs/10732783258/job/29765146646) | ||
- [Benchmarks Source](./benchmarks/index.ts) | ||
> π€ Note these bench marks are very early and incomplete. Contributions / extensions would be very welcome. | ||
### π€ Tiny but with just enough features β | ||
`chifley` originally started as a fork of [mithril-stream](), which in turn was a rewrite of [flyd](). The code has evolved greatly over many years, and this library is a formalization of a lot of things we have learnt over nearly a decade of working with these libraries. | ||
We observed what methods and capabilities weer actually used, and we have removed everything we didn't. We even removed things we really liked in principle. | ||
The truth is, when your stream library uses the [flyd]() model, you don't need as many operators or abstractions as you do in say [Rx.js]() because we aren't trying to prevent side effects by predefining operators for all possible scenarios. | ||
So here's a short list of things we have included in this tiny library: | ||
- πͺ 14 commonly use stream operations: (`map`, `filter`, `reject`, `merge`, `scan`, `dropRepeats`, `throttle`, `default`, `skip`, `take`, `once`, `afterSilence`, `awaitLatest` and `awaitEvery`) | ||
- π΅οΈ Automatic dependency tracking (like S.js / Signal libraries) | ||
- π Read-only streams (both at runtime and via Typescript) | ||
- πΎ `toJSON` serialization | ||
- π₯ `Sin.js` observable protocol | ||
- π°οΈ Predictable atomic clock update (_similar_ to S.js) | ||
## API | ||
@@ -184,12 +124,8 @@ | ||
You can end a stream and all its dependent streams by calling `.end.set(true)`. You can also check if a stream has ended via `.end.get()` | ||
You can end a stream and all its dependent streams by calling `.end()`. You can also check if a stream has ended via `.end.state === State.Ended` | ||
```typescript | ||
name.end.set(true) | ||
name.end() | ||
``` | ||
You may have noticed from this signature that `.end` is itself a stream. This is most useful if you'd like to be notified when a particular stream has been ended. | ||
We recursively create the `.end` stream as you access it. Technically you can go `name.end.end.end.end.end` (but don't do that). | ||
### Opting out of an update | ||
@@ -407,3 +343,3 @@ | ||
Unlike other stream/obserable libraries `chifley` doesn't coerce promises, or arrays (or anything else) into stream events. A `Promise` is as much a value as a `number` of a `string`. | ||
Unlike other stream/obserable libraries Chifley doesn't coerce promises, or arrays (or anything else) into stream events. A `Promise` is as much a value as a `number` of a `string`. | ||
@@ -504,3 +440,3 @@ But we do provide some basic operators for handling the two common cases of coercing promises into stream events. | ||
This feature is designed to be used primarily by framework authors and not directly by `chifley` users in normal application code. It will seem a little verbose at first but it is designed to give the framework author precise control over how tracking contexts cascade. | ||
This feature is designed to be used primarily by framework authors and not directly by Chifley users in normal application code. It will seem a little verbose at first but it is designed to give the framework author precise control over how tracking contexts cascade. | ||
@@ -561,12 +497,132 @@ To track stream references use the static `trackReferenced` function: | ||
## Prior Art | ||
## Comparisons | ||
### πΆ Simple reactivity model | ||
Chifley is heavily inspired by stream libraries like [flyd] and [mithril-stream]. All Chifley streams are immediately active. What does "active" mean? It means there is no extra `subscribe()` call you have to do to have your stream transforms start evaluating. If you put a `console.log(...)` in a call to `filter` or `map` or any other operator, you'll see they are immediately invoked. | ||
Additionally, every stream caches the last emitted value, and you can always access it via `.get()`. Source streams can be written to via `.set` and `.update` while dependent streams are readonly and will prevent you from writing to them both at runtime and via Typescript. | ||
When you end a parent stream all dependent streams will also end. | ||
If you are coming from libraries like [Rx.js] this may feel a little foreign as you don't typically manually end a source stream you simply unsubscribe from a sink stream and the source is automatically ended when there's no more subscribers. But for UI programming specifically explicitly ending source streams is both more efficient and easier to reason about. | ||
Unlike traditional FRP libraries you are encouraged to write to streams (even while changes are already propagating). You are expected to guard against potential infinite loops yourself, but Chifley is designed to make it easy to diagnose issues in your stream graph by using a transaction / atomic clock model: each new write is deferred until the immediate effects of the current write are complete. | ||
### π€ Easy to debug | ||
Most stream libraries have a complex recursive propagation algorithm. If you are trying to debug your stream graph, traditional debuggers aren't very helpful as you will find yourself stepping through generic internal functions recursively. | ||
So, instead, we find ourselves using custom tools like Rx marbles (and more commonly, just logs π« ). | ||
Chifley was deliberately designed to be sustainable at scale with large complex UI stream graphs. When you update a stream, Chifley traverses all the possible dependencies in a single while loop. | ||
Additionally, any writes that occur during a propagation are deferred until the immediate impact of the previous write is complete. | ||
This doesn't prevent infinite loops but it makes it very clear when you are entering and exiting a propagation caused by a single write. It also helps identify how many dependent transactions are spawned by a single write. | ||
### π Low memory use | ||
Chifley streams are instances of a simple class. Each instance has many useful methods available but thanks to prototypical inheritance, the memory for those methods are all shared. Each stream has a few instance specific properties but everything else is shared and stored once. | ||
### πββοΈββ‘οΈ Fast | ||
Chifley isn't the fastest stream library available but it is competitive and strikes a healthy balance between memory / CPU usage and features. | ||
- [From Github Actions](https://github.com/JAForbes/chifley/actions/runs/10732783258/job/29765146646) | ||
- [Benchmarks Source](./benchmarks/index.ts) | ||
> π€ Note these bench marks are very early and incomplete. Contributions / extensions would be very welcome. | ||
### π€ Tiny but with just enough features β | ||
Chifley originally started as a fork of [mithril-stream], which in turn was a rewrite of [flyd]. The code has evolved greatly over many years, and this library is a formalization of a lot of things we have learnt over nearly a decade of working with these libraries. | ||
We observed what methods and capabilities weer actually used, and we have removed everything we didn't. We even removed things we really liked in principle. | ||
The truth is, when your stream library uses the [flyd] model, you don't need as many operators or abstractions as you do in say [Rx.js] because we aren't trying to prevent side effects by predefining operators for all possible scenarios. | ||
So here's a short list of things we have included in this tiny library: | ||
- πͺ 14 commonly use stream operations: (`map`, `filter`, `reject`, `merge`, `scan`, `dropRepeats`, `throttle`, `default`, `skip`, `take`, `once`, `afterSilence`, `awaitLatest` and `awaitEvery`) | ||
- π΅οΈ Automatic dependency tracking (like S.js / Signal libraries) | ||
- π Read-only streams (both at runtime and via Typescript) | ||
- πΎ `toJSON` serialization | ||
- π₯ `Sin.js` observable protocol | ||
- π°οΈ Predictable atomic clock update (_similar_ to S.js) | ||
### Traditional FRP vs Reactive Store | ||
There are many stream libraries in JS, with varying philosophies, feature sets and performance profiles. | ||
Chifley is lazer focused on UI state management, nothing more. It is designed to act as a simple reactive store with a few core transforms/combinators. You are encouraged to write to a chifley stream the same way you write to `setState` in React, or a signal in Solid.js. | ||
But in traditional FRP this is considered problematic. You will often read advice like: | ||
> If you find yourself writing to a Subject you are probably doing things *the wrong way* | ||
And in some contexts this is true, you really can create a mess very quickly if you don't take the time to learn how to use these tools in the way they were designed to be used. | ||
But part of the benefit of Chifley is you don't need to explicitly think about hot vs cold, MemoryStreams, share/publish etc. It's effectively a state store with a few tricks up its sleeve. | ||
### Apples to Apples comparisons are complicated... | ||
Often with traditional FRP libraries you can really tweak performance if you know what you are doing but its also easy to get awful performance if you are not very careful. It is quite simple to find yourself with duplicated effects (e.g. network requests) because multiple components are referencing the same observable and each subscription duplicates all the transforms. | ||
This will never happen with libraries where all streams are "hot" (xstream, flyd, mithril-stream) but can be avoided with specific operators in "cold" by default libraries (Rx.js, most.js). | ||
[xstream] is a very interesting case, it has amazing performance and while being Rx inspired it makes some important design decisions to side step alot of the confusion. [xstream] is a great choice if you are writing mostly acyclic stream graphs and you are happy to avoid writing to subject/source streams. It is still very much in the spirit of what most would consider "real FRP". In my benchmarks (which are very tailored towards UI workloads) xstream was frequently the fastest stream library that I tested against. [xstream] is roughy 2x faster than Chifley. | ||
[flyd] and [mithril-stream] both could be considered direct ancestors of Chifley. The programming model is less focused on linear dataflow and raw performance and instead is designed to assist with UI programming. Chifley is roughly 4x faster than [flyd], and 18x faster than [mithril-stream]. | ||
Chifley approaches (and sometimes exceeds) the performance of [most.js] but only in the specific context of how we benchmark (multiple subscribers, no fromArray, no special caching operators). In a different context [most.js] is often the leader in performance, especially in server workloads where there is 1 subscriber per request. For those specific use cases I would recommend to look at [most.js] | ||
### Benchmarking considerations | ||
In our benchmarks we completely avoid `fromArray` and any kind of operator like `share()`, `remember()` or `publish()`. We avoid `fromArray` because this is easy to optimize with specialized code paths but its not a good representation of performance when processing asynchronous events. We avoid operators like `share()`, `remember()`, `publish()` (etc) because having to opt in to this specialized code path requires a level of expertise that isn't representative of most users of a stream library. We want to know how Chifley compares to the sort of code you would actually see in the wild, equally Chifley benchmarks are not allowed to use any special operator to improve performance, if we're filtering, mapping and reducing, then that's all the operators we can use in the benchmark. | ||
Additionally, its rare in a UI for a stream to have only a single subscriber, so we will often subscribe to the same pipeline n times to see how performance degrades (this heavily impacts benchmarking scores for cold observable libraries). | ||
``` | ||
filter -> map -> reduce 1000000 integers (10 consumers) | ||
------------------------------------------------------- | ||
xstream 34.83 op/s Β± 1.89% (82 samples) | ||
chifley 21.05 op/s Β± 7.83% (53 samples) | ||
flyd 4.70 op/s Β± 1.26% (27 samples) | ||
mithril-stream 1.76 op/s Β± 3.62% (13 samples) | ||
most.js 8.46 op/s Β± 8.69% (44 samples) | ||
------------------------------------------------------- | ||
``` | ||
[From Github Actions](https://github.com/JAForbes/chifley/actions/runs/10770097243/job/29862748651) | ||
## Prior Art and Thanks | ||
This library has a lot of influences and is greatly shaped by years of working with FP and FRP libraries in large complex applications. | ||
Many thanks first of all to [Simon Friis Vindum](), this library is greatly influenced by his library [flyd](). | ||
Many thanks first of all to [Simon Friis Vindum], this library is greatly influenced by his library [flyd]. | ||
Much appreciaton to [Adam Haille]() for creating [S.js]() and to [Ryan Carniato]() for popularising it through his work with [Solid.js]. This library has been inspired by [S.js]() in its atomic update algorithm and with the automatic dependency tracking that we also see in `S.computation`. | ||
Much appreciaton to [Adam Haile] for creating [S.js] and to [Ryan Carniato] for popularising it through his work with [Solid.js]. This library has been inspired by [S.js] in its atomic update algorithm and with the automatic dependency tracking that we also see in `S.computation`. | ||
This library originally started as a port of [mithril-stream]() originally a rewrite of [flyd]() by [Leo Horie]() and later rewritten by [Rasmus Porsager](). Thanks again goes to Rasmus for the sin observable protocol which this library has adopted. | ||
After benchmarking, we greatly revised the internals to mimic some techniques xstream uses to improve performance. A lot of credit goes to [AndrΓ© Staltz](https://staltz.com/), [Tyler Steinberger](https://github.com/tylors) and all the other contributors to [xstream] for making it so fast. | ||
Despite dropping support for `fantasyland` and many other functional programming operators such as `lift`, I wouldn't be as involved with FRP as I am if it weren't for the `fantasyland`, `ramda` and `sanctuary` communities. | ||
This library originally started as a port of [mithril-stream], and [mithril-stream] was originally a rewrite of [flyd]. [mithril-stream] was first implemented by [Leo Horie] (Original author of Mithril.js) and later rewritten by [Rasmus Porsager]. Thanks again goes to Rasmus for the [Sin.js] observable protocol which this library has adopted. | ||
Finally, special thanks to [Barney Carroll] for constantly offering helpful feedback and reading early drafts of anything and everything I ever work on. | ||
[flyd]: https://github.com/paldepind/flyd | ||
[Rx.js]: https://rxjs.dev/ | ||
[S.js]: https://github.com/adamhaile/s | ||
[mithril-stream]: https://mithril.js.org/stream.html | ||
[Solid.js]: https://www.solidjs.com/ | ||
[Sin.js]: https://sinjs.com | ||
[xstream]: https://github.com/staltz/xstream | ||
[Simon Friis Vindum]: https://github.com/paldepind | ||
[Adam Haile]: https://github.com/adamhaile | ||
[Rasmus Porsager]: https://github.com/porsager | ||
[Ryan Carniato]: https://x.com/ryancarniato | ||
[Barney Carroll]: https://github.com/barneycarroll | ||
[Leo Horie]: https://github.com/lhorie |
Sorry, the diff of this file is not supported yet
Major refactor
Supply chain riskPackage has recently undergone a major refactor. It may be unstable or indicate significant internal changes. Use caution when updating to versions that include significant changes.
Found 1 instance in 1 package
111791
1691
621
16
1