New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

chifley

Package Overview
Dependencies
Maintainers
0
Versions
9
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

chifley - npm Package Compare versions

Comparing version 0.0.0-next.26 to 0.0.0-next.27

197

dist/chifley.esm.js
// lib/index.ts
var ActiveState = 0;
var ChangingState = 1;
var PendingState = 2;
var EndedState = 3;
var I = (x) => x;

@@ -7,3 +11,2 @@ var EQ = (a, b) => a === b;

var sessions = [];
var skipped = /* @__PURE__ */ new Set();
var isPropagating = false;

@@ -13,3 +16,3 @@ var isLazy = /* @__PURE__ */ new WeakSet();

static SKIP = Symbol("Stream.SKIP");
dependents = [];
immediateDependents = [];
parents = [];

@@ -25,3 +28,3 @@ // streams that want to end when this stream ends

const [value] = args;
this.state = args.length !== 0 && value !== _InternalStream.SKIP ? "active" : "pending";
this.state = args.length !== 0 && value !== _InternalStream.SKIP ? ActiveState : PendingState;
if (value !== _InternalStream.SKIP) {

@@ -33,5 +36,2 @@ this.value = value;

}
// like calling new Stream() but adds the getter setter
// interface mithril-stream had. An instance method
// so subclassing works.
of(...args) {

@@ -43,5 +43,5 @@ let instance = new this.constructor(...args);

if (this.isOpen()) {
this.state = "changing";
this.state = ChangingState;
}
for (let s of this.dependents) {
for (let s of this.immediateDependents) {
s.stream.markAsChanging();

@@ -51,3 +51,3 @@ }

isOpen() {
return this.state === "pending" || this.state === "active" || this.state === "changing";
return this.state == ActiveState || this.state == ChangingState || this.state == PendingState;
}

@@ -68,5 +68,5 @@ get = () => {

}
for (let c of this.dependents.slice().map((x) => x.stream).concat(this.ends.slice())) {
for (let c of this.immediateDependents.slice().map((x) => x.stream)) {
if (c._end) {
c.end.set(true);
c.end._set(true);
} else {

@@ -76,9 +76,12 @@ c.doEnd();

}
this.state = "ended";
this.parents.length = this.dependents.length = 0;
for (let f of this.ends.slice()) {
f();
}
this.state = EndedState;
this.ends.length = this.parents.length = this.immediateDependents.length = 0;
}
get end() {
if (!this._end) {
if (this.state == "ended") {
return this.of(true);
if (this.state == EndedState) {
return this._end = this.of(true);
}

@@ -95,8 +98,12 @@ this._end = this.of();

}
// delete in favour of deregister dependent
_unregisterChild(child) {
let i = this.dependents.findIndex((x) => x.stream === child);
let i = this.immediateDependents.findIndex((x) => x.stream === child);
if (i !== -1) {
this.dependents.splice(i, 1);
this.immediateDependents.splice(i, 1);
}
}
_registerDependent(dependent) {
this.immediateDependents.push(dependent);
}
_map(visitor, ignoreInitial) {

@@ -107,30 +114,8 @@ const args = ignoreInitial ? [] : [visitor(this.value)];

target.parents.push(this);
this.dependents.push({ stream: target, fn: visitor });
this._registerDependent({ stream: target, fn: visitor });
return target;
}
map(visitor) {
return this._map(visitor, this.state !== "active");
return this._map(visitor, this.state !== ActiveState);
}
getAllDependencies(session) {
const toUpdate = [];
const stack = session.stream.dependents.slice().map((x) => ({ ...x, nextValue: session.nextValue }));
while (stack.length) {
let target = stack.shift();
target.stream.state = "changing";
toUpdate.push(target);
stack.push(...target.stream.dependents.map((x) => ({
fn: x.fn,
stream: x.stream,
// todo-james instead of getter just have reference to stream parent
// and grab value lazily
get nextValue() {
if (skipped.has(target.stream)) {
return Stream.SKIP;
}
return target.stream.value;
}
})));
}
return toUpdate;
}
// unlike traditional mithril stream (which has a recursive update)

@@ -153,8 +138,2 @@ // we instead opt for detecting all the static dependencies up front

_set = (newValue) => {
if (newValue == _InternalStream.SKIP) {
if (isPropagating) {
skipped.add(this);
}
return;
}
const lazy = isLazy.has(newValue);

@@ -165,11 +144,11 @@ if (!lazy) {

if (!this.isOpen()) {
if (isPropagating) {
skipped.add(this);
}
return;
}
if (this.state == EndedState) {
return;
}
if (!lazy) {
this.state = "active";
this.state = ActiveState;
} else {
this.state = "changing";
this.state = ChangingState;
}

@@ -187,36 +166,32 @@ sessions.push({

const session = sessions.shift();
const toUpdate = this.getAllDependencies(session);
let stack = [];
if (isLazy.has(session.nextValue)) {
toUpdate.unshift({
nextValue: session.nextValue(),
stack.unshift({
fn: I,
stream: session.stream
stream: session.stream,
stateUpdated: true
});
} else {
stack = session.stream.immediateDependents.slice();
}
for (let i = 0; i < toUpdate.length; i++) {
let { stream: s, fn: f, nextValue } = toUpdate[i];
if (s.parents.some((x) => skipped.has(x))) {
s.state = "active";
skipped.add(s);
continue;
while (stack.length) {
let target = stack.shift();
if (!target.stateUpdated) {
target.stream.state = ChangingState;
}
let { stream: s, fn: f } = target;
let nextValue = target.stream.parents.length ? target.stream.parents.at(-1).value : session.nextValue;
if (!s.isOpen()) {
skipped.add(s);
continue;
}
s.state = "active";
s.state = ActiveState;
if (isLazy.has(nextValue)) {
nextValue = nextValue();
}
if (nextValue === Stream.SKIP) {
s.state = "active";
skipped.add(s);
continue;
}
let newValue2 = f(nextValue);
if (newValue2 === Stream.SKIP) {
skipped.add(s);
continue;
}
s.value = newValue2;
stack.push(...target.stream.immediateDependents);
}

@@ -226,3 +201,2 @@ }

isPropagating = false;
skipped.clear();
}

@@ -243,6 +217,6 @@ };

default(x) {
if (this.state === "pending") {
if (this.state === PendingState) {
const out = this.cloneWithoutDependendents();
out.value = x;
out.state = "active";
out.state = ActiveState;
return out;

@@ -269,4 +243,4 @@ } else {

if (i > n) {
if (out.state !== "ended") {
out.end.set(true);
if (out.state !== EndedState) {
out.doEnd();
}

@@ -305,7 +279,7 @@ return _InternalStream.SKIP;

const out = this.of();
out.state = this.state === "changing" ? "active" : this.state;
out.state = this.state === ChangingState ? ActiveState : this.state;
out.value = this.value;
out._readonly = this._readonly;
out.parents.push(this);
this.dependents.push({ stream: out, fn });
this._registerDependent({ stream: out, fn });
return out;

@@ -316,7 +290,7 @@ }

const out = this.of();
out.state = this.state === "changing" ? "active" : this.state;
out.state = this.state === ChangingState ? ActiveState : this.state;
out.value = this.value;
out._readonly = true;
let last;
const mapper = this.map((x) => {
this.ends.push(() => out.doEnd());
this.map((x) => {
clearTimeout(id);

@@ -333,5 +307,6 @@ id = setTimeout(() => {

const out = this.of();
out.state = this.state === "changing" ? "active" : this.state;
out.state = this.state === ChangingState ? ActiveState : this.state;
out.value = this.value;
out._readonly = true;
this.ends.push(() => out.doEnd());
function process(x) {

@@ -347,3 +322,3 @@ clearTimeout(id);

}
const mapper = this.map(process);
this.map(process);
return out;

@@ -404,4 +379,4 @@ }

}
static scan(fn, seed, source) {
const sink = source.map((n) => {
scan(fn, seed) {
const sink = this.map((n) => {
const out = fn(seed, n);

@@ -423,3 +398,3 @@ if (out !== _InternalStream.SKIP) {

for (let s of streams) {
ready = s.state === "active";
ready = s.state === ActiveState;
if (!ready) {

@@ -438,3 +413,3 @@ break;

for (let ss of streams) {
if (ss.state === "pending") {
if (ss.state === PendingState) {
ready = false;

@@ -447,3 +422,3 @@ break;

let sessionI = sessions.findIndex((x2) => x2.stream === out);
if (sessionI !== -1) {
if (sessionI > -1) {
sessions.splice(sessionI, 1);

@@ -460,4 +435,4 @@ }

}, true);
out.ends.push(mapper);
dependency.ends.push(out);
out.ends.push(() => mapper.doEnd());
dependency.ends.push(() => out.doEnd());
}

@@ -471,2 +446,40 @@ return out;

}
// todo
awaitLatest(visitor) {
let lastEmit = 0;
let abortController = new AbortController();
const out = new _InternalStream();
out._readonly = true;
this.ends.push(() => out.doEnd(), () => abortController.abort());
this.map(async (x) => {
let emit = ++lastEmit;
lastEmit = emit;
abortController.abort();
abortController = new AbortController();
try {
const value = await visitor(x, { signal: abortController.signal });
if (emit === lastEmit) {
out._set({ status: "fulfilled", value });
lastEmit = 0;
}
} catch (e) {
if (emit === lastEmit) {
out._set({ status: "rejected", reason: e });
lastEmit = 0;
}
}
});
return out;
}
awaitEvery(visitor) {
const out = new _InternalStream();
out._readonly = true;
const abortController = new AbortController();
this.ends.push(() => out.doEnd(), () => abortController.abort());
this.map(async (x) => {
const [result] = await Promise.allSettled([visitor(x, { signal: abortController.signal })]);
out._set(result);
});
return out;
}
// sin

@@ -494,3 +507,3 @@ observe(fn) {

off: () => {
mapper.end.set(true);
mapper.end._set(true);
}

@@ -518,3 +531,7 @@ };

export {
ActiveState,
ChangingState,
EndedState,
InternalStream,
PendingState,
Stream,

@@ -521,0 +538,0 @@ StreamStatic

@@ -0,1 +1,10 @@

export declare const ActiveState: 0;
export type ActiveState = typeof ActiveState;
export declare const ChangingState: 1;
export type ChangingState = typeof ChangingState;
export declare const PendingState: 2;
export type PendingState = typeof PendingState;
export declare const EndedState: 3;
export type EndedState = typeof EndedState;
export type StreamState = ActiveState | ChangingState | PendingState | EndedState;
type Dependent = {

@@ -7,3 +16,3 @@ stream: InternalStream;

static SKIP: never;
private dependents;
private immediateDependents;
private parents;

@@ -24,10 +33,5 @@ private ends;

_unregisterChild<U>(child: InternalStream<U>): void;
_registerDependent(dependent: Dependent): void;
_map<U>(visitor: (x: T) => U, ignoreInitial: boolean): InternalStream<U>;
map<U>(visitor: (x: T) => U): InternalStream<U>;
getAllDependencies(session: {
stream: InternalStream;
nextValue: any;
}): (Dependent & {
nextValue: any;
})[];
_set: (newValue: T | (() => T)) => void;

@@ -56,6 +60,12 @@ set: (newValue: T) => void;

static isStream(s: any): s is InternalStream;
static scan<T, U>(fn: (p: U, n: T) => U, seed: U, source: InternalStream<T>): InternalStream<U>;
scan<U>(fn: (p: U, n: T) => U, seed: U): InternalStream<U>;
static merge(streams: InternalStream[]): InternalStream<any[]>;
static combine<U>(fn: (changed: InternalStream[]) => U, streams: InternalStream[]): InternalStream<U>;
removeReadOnly(): this;
awaitLatest<U>(visitor: (x: T, context: {
signal: AbortSignal;
}) => Promise<U>): InternalStream<PromiseSettledResult<U>>;
awaitEvery<U>(visitor: (x: T, context: {
signal: AbortSignal;
}) => Promise<U>): InternalStream<PromiseSettledResult<U>>;
observe(fn: (x: T) => any): () => void;

@@ -102,11 +112,18 @@ ['sin/observe'](fn: (x: T) => any): () => void;

once(): Stream<T>;
scan<U>(fn: (acc: U, value: T) => U, acc: U): Stream<U>;
cloneWithoutDependendents(): Stream<T>;
removeReadOnly(): Stream<T>;
awaitLatest<U>(visitor: (x: T, context: {
signal: AbortSignal;
}) => Promise<U>): Stream<PromiseSettledResult<U>>;
awaitEvery<U>(visitor: (x: T, context: {
signal: AbortSignal;
}) => Promise<U>): Stream<PromiseSettledResult<U>>;
['zed/effect']: () => ZedEffect;
} & SinObservable;
export type ReadableStream<T> = CoreStream<T> & {
export type ReadableStream<T, GetT = T> = CoreStream<T> & {
/** Returns the value of the stream. */
get(): T;
get(): GetT;
};
export type WritableStream<T> = ReadableStream<T> & {
export type WritableStream<T, GetT = T> = ReadableStream<T, GetT> & {
/** Sets the value of the stream. */

@@ -119,3 +136,3 @@ set(value: T): void;

<T>(x: T): WritableStream<T>;
<T>(): WritableStream<T | undefined>;
<T>(): WritableStream<T, T | undefined>;
/** Creates a computed stream that reactively updates if any of its upstreams are updated. */

@@ -127,4 +144,2 @@ merge<S extends readonly any[]>(streams: {

}>;
/** Creates a new streamCoStreamConstructor with the results of calling the function on every incoming stream with and accumulator and the incoming value. */
scan<T, U>(fn: (acc: U, value: T) => U, acc: U, stream: Stream<T>): Stream<U>;
/** A special value that can be returned to stream callbacks to skip execution of downstreams. */

@@ -131,0 +146,0 @@ SKIP: never;

@@ -1,3 +0,12 @@

type StreamState = 'active' | 'changing' | 'pending' | 'ended'
export const ActiveState = 0 as const
export type ActiveState = typeof ActiveState
export const ChangingState = 1 as const
export type ChangingState = typeof ChangingState
export const PendingState = 2 as const
export type PendingState = typeof PendingState
export const EndedState = 3 as const
export type EndedState = typeof EndedState
export type StreamState = ActiveState | ChangingState | PendingState | EndedState
const I = <T>(x: T) => x

@@ -15,3 +24,2 @@ const EQ = <T>(a:T,b:T) => a === b

const sessions : { stream: InternalStream, nextValue: any }[] = []
const skipped = new Set<InternalStream>();
let isPropagating = false;

@@ -24,3 +32,3 @@

private dependents: Dependent[] = []
private immediateDependents: Dependent[] = []
private parents: InternalStream[] = []

@@ -30,3 +38,3 @@

// but don't want to update when this stream updates
private ends: InternalStream[] = []
private ends: (() => void)[] = []

@@ -47,4 +55,4 @@ private state: StreamState

&& value !== InternalStream.SKIP
? 'active'
: 'pending'
? ActiveState
: PendingState

@@ -59,5 +67,2 @@ if ( value !== InternalStream.SKIP ) {

// like calling new Stream() but adds the getter setter
// interface mithril-stream had. An instance method
// so subclassing works.
of<U>(...args: [U] | []): InternalStream<U> {

@@ -70,5 +75,5 @@ let instance = new (this.constructor as any)(...args)

if (this.isOpen()) {
this.state = 'changing'
this.state = ChangingState
}
for (let s of this.dependents) {
for (let s of this.immediateDependents) {
s.stream.markAsChanging()

@@ -80,5 +85,5 @@ }

return (
this.state === 'pending' ||
this.state === 'active' ||
this.state === 'changing'
this.state == ActiveState
|| this.state == ChangingState
|| this.state == PendingState
)

@@ -103,5 +108,5 @@ }

}
for (let c of this.dependents.slice().map( x => x.stream ).concat(this.ends.slice())) {
for (let c of this.immediateDependents.slice().map( x => x.stream )) {
if (c._end) {
c.end.set(true)
c.end._set(true)
} else {

@@ -111,6 +116,9 @@ c.doEnd()

}
this.state = 'ended'
this.parents.length =
this.dependents.length =
0
for (let f of this.ends.slice()) {
f()
}
this.state = EndedState
this.ends.length =
this.parents.length =
this.immediateDependents.length = 0
}

@@ -120,5 +128,5 @@

if (!this._end) {
if (this.state == 'ended') {
if (this.state == EndedState) {
// inert already ended stream
return this.of(true)
return this._end = this.of(true)
}

@@ -138,9 +146,16 @@ this._end = this.of<true>()

// delete in favour of deregister dependent
_unregisterChild<U>(child: InternalStream<U>): void {
let i = this.dependents.findIndex( x => x.stream === child)
let i = this.immediateDependents.findIndex( x => x.stream === child)
if (i !== -1) {
this.dependents.splice(i, 1)
this.immediateDependents.splice(i, 1)
}
}
_registerDependent(dependent: Dependent) {
this.immediateDependents.push(dependent)
}
_map<U>(visitor: (x: T) => U, ignoreInitial: boolean): InternalStream<U> {

@@ -152,3 +167,3 @@ const args : [U] | [] = ignoreInitial ? [] : [visitor(this.value!)]

target.parents.push(this)
this.dependents.push({ stream: target, fn: visitor })
this._registerDependent({ stream: target, fn: visitor })
return target

@@ -158,30 +173,5 @@ }

map<U>(visitor: (x: T) => U): InternalStream<U> {
return this._map(visitor, this.state !== 'active')
return this._map(visitor, this.state !== ActiveState)
}
getAllDependencies(session: { stream: InternalStream, nextValue: any }){
const toUpdate : (Dependent & { nextValue: any })[] = []
const stack = session.stream.dependents.slice().map( x => ({ ...x, nextValue: session.nextValue }));
while( stack.length ) {
let target = stack.shift()!
target.stream.state = 'changing'
toUpdate.push(target)
stack.push(...target.stream.dependents.map( x => ({
fn: x.fn,
stream: x.stream,
// todo-james instead of getter just have reference to stream parent
// and grab value lazily
get nextValue(){
if ( skipped.has(target.stream) ) {
return Stream.SKIP
}
return target.stream.value
}
})))
}
return toUpdate
}
// unlike traditional mithril stream (which has a recursive update)

@@ -204,10 +194,2 @@ // we instead opt for detecting all the static dependencies up front

_set = (newValue: T | (() => T)) => {
if (newValue == InternalStream.SKIP) {
if (isPropagating) {
skipped.add(this)
}
return;
}
const lazy = isLazy.has(newValue)

@@ -223,11 +205,12 @@

if (!this.isOpen() ) {
if( isPropagating ) {
skipped.add(this)
}
return;
}
if (this.state == EndedState) {
return;
}
if ( !lazy ) {
this.state = 'active'
this.state = ActiveState
} else {
this.state = 'changing'
this.state = ChangingState
}

@@ -247,28 +230,43 @@

const session = sessions.shift()!
const toUpdate = this.getAllDependencies(session)
// lazy streams aren't evaluated
// on the call to `_set, so put it at the front
// of the list
// Need to cache getAllDependencies, from 5 ops/sec 0.29 ops/sec
// Iterator: 0.44 ops/sec
// Inlined: 0.48 ops/sec (1 test failing)
// -getNextValue 2.30 ops/sec (1 test failing)
// -skipped set 3.90 ops/sec (1 test failing)
// -copy dependent objects when adding to stack 4.61 ops/sec
// -fix tests, stack if / else 4.7 ops/sec
// -assume SKIP is never the input value 5.4 ops/sec
// just the result of the transform
// Use integer flags for state instead of strings 5.5 ops/sec
let stack : (Dependent & { stateUpdated?: boolean })[] = []
if ( isLazy.has(session.nextValue) ) {
toUpdate.unshift({
nextValue: session.nextValue(),
stack.unshift({
fn: I,
stream: session.stream
stream: session.stream,
stateUpdated: true,
})
} else {
stack = session.stream.immediateDependents.slice()
}
for( let i = 0; i < toUpdate.length; i++) {
let { stream:s, fn: f, nextValue } = toUpdate[i]
if ( s.parents.some( x => skipped.has(x) ) ) {
s.state = 'active'
skipped.add(s)
continue;
while (stack.length) {
let target = stack.shift()!
if ( !target.stateUpdated ){
target.stream.state = ChangingState
}
let { stream:s, fn: f } = target
let nextValue = target.stream.parents.length ? target.stream.parents.at(-1)!.value : session.nextValue
if (!s.isOpen()) {
skipped.add(s)
continue
}
s.state = 'active'
s.state = ActiveState

@@ -279,8 +277,2 @@ if ( isLazy.has(nextValue) ) {

// can happen here because of lazy {
if ( nextValue === Stream.SKIP ) {
s.state = 'active'
skipped.add(s)
continue;
}

@@ -290,6 +282,8 @@ let newValue = f(nextValue)

if ( newValue === Stream.SKIP ) {
skipped.add(s)
continue;
}
s.value = newValue
stack.push(...target.stream.immediateDependents)
}

@@ -299,3 +293,2 @@ }

isPropagating = false
skipped.clear()
}

@@ -323,6 +316,6 @@

default(x: T): InternalStream<T> {
if (this.state === 'pending' ) {
if (this.state === PendingState) {
const out = this.cloneWithoutDependendents()
out.value = x
out.state = 'active'
out.state = ActiveState
return out

@@ -352,4 +345,4 @@ } else {

if ( i > n ) {
if ( out.state !== 'ended' ) {
out.end.set(true)
if (out.state !== EndedState) {
out.doEnd()
}

@@ -394,3 +387,3 @@ return InternalStream.SKIP

const out = this.of<T>()
out.state = this.state === 'changing' ? 'active' : this.state
out.state = this.state === ChangingState ? ActiveState : this.state
out.value = this.value

@@ -400,3 +393,3 @@ out._readonly = this._readonly

out.parents.push(this)
this.dependents.push({ stream: out, fn })
this._registerDependent({ stream: out, fn })

@@ -410,8 +403,8 @@ return out

const out = this.of() as any as InternalStream<T>
out.state = this.state === 'changing' ? 'active' : this.state
out.state = this.state === ChangingState ? ActiveState : this.state
out.value = this.value
out._readonly = true
this.ends.push(() => out.doEnd())
let last;
const mapper = this.map( x => {
this.map( x => {
clearTimeout(id)

@@ -431,5 +424,6 @@ id = setTimeout(() => {

const out = this.of() as any as InternalStream<T>
out.state = this.state === 'changing' ? 'active' : this.state
out.state = this.state === ChangingState ? ActiveState : this.state
out.value = this.value
out._readonly = true
this.ends.push(() => out.doEnd())

@@ -447,3 +441,3 @@ function process(x: T){

const mapper = this.map(process)
this.map(process)

@@ -517,4 +511,4 @@ return out

static scan<T, U>(fn: (p: U, n: T) => U, seed: U, source: InternalStream<T> ): InternalStream<U> {
const sink = source.map( n => {
scan<U>(fn: (p: U, n: T) => U, seed: U ): InternalStream<U> {
const sink = this.map( n => {
const out = fn(seed, n)

@@ -528,2 +522,3 @@ if ( out !== InternalStream.SKIP ) {

}

@@ -543,3 +538,3 @@ static merge(streams: InternalStream[]): InternalStream<any[]> {

for (let s of streams) {
ready = s.state === 'active'
ready = s.state === ActiveState
if (!ready) {

@@ -568,3 +563,3 @@ break

for (let ss of streams) {
if (ss.state === 'pending') {
if (ss.state === PendingState) {
ready = false

@@ -576,4 +571,5 @@ break

if (ready) {
// if another dependency scheduled a session we can exit early
let sessionI = sessions.findIndex( x => x.stream === out )
if (sessionI !== -1) {
if ( sessionI > -1 ) {
// unschedule it, so it can be scheduled by the next dep

@@ -595,4 +591,4 @@ sessions.splice(sessionI, 1)

out.ends.push(mapper)
dependency.ends.push(out)
out.ends.push(() => mapper.doEnd())
dependency.ends.push(() => out.doEnd())
}

@@ -609,2 +605,53 @@

// todo
awaitLatest<U>(visitor: (x: T, context: { signal: AbortSignal }) => Promise<U>): InternalStream<PromiseSettledResult<U>> {
let lastEmit = 0
let abortController = new AbortController()
const out = new InternalStream<PromiseSettledResult<U>>;
out._readonly = true
this.ends.push(() => out.doEnd(), () => abortController.abort())
this.map( async x => {
let emit = ++lastEmit
lastEmit = emit
abortController.abort()
abortController = new AbortController()
try {
const value = await visitor(x, { signal: abortController.signal })
if (emit === lastEmit) {
out._set({ status: 'fulfilled', value })
lastEmit = 0
}
} catch (e) {
if (emit === lastEmit) {
out._set({ status: 'rejected', reason: e })
lastEmit = 0
}
}
})
return out
}
awaitEvery<U>(visitor: (x: T, context: { signal: AbortSignal }) => Promise<U>): InternalStream<PromiseSettledResult<U>> {
const out = new InternalStream<PromiseSettledResult<U>>;
out._readonly = true
const abortController = new AbortController()
this.ends.push(() => out.doEnd(), () => abortController.abort())
this.map( async x => {
const [result] = await Promise.allSettled([visitor(x, { signal: abortController.signal })])
out._set(result)
})
return out
}
// sin

@@ -642,3 +689,3 @@ observe( fn: (x:T) => any ){

off: () => {
mapper.end.set(true)
mapper.end._set(true)
}

@@ -695,14 +742,19 @@ }

scan<U>(fn: (acc: U, value: T) => U, acc: U): Stream<U>
cloneWithoutDependendents(): Stream<T>
removeReadOnly(): Stream<T>;
awaitLatest<U>( visitor: (x: T, context: { signal: AbortSignal }) => Promise<U>): Stream<PromiseSettledResult<U>>
awaitEvery<U>( visitor: (x: T, context: { signal: AbortSignal }) => Promise<U>): Stream<PromiseSettledResult<U>>
['zed/effect']: () => ZedEffect
} & SinObservable
export type ReadableStream<T> = CoreStream<T> & {
export type ReadableStream<T, GetT=T> = CoreStream<T> & {
/** Returns the value of the stream. */
get(): T
get(): GetT
}
export type WritableStream<T> = ReadableStream<T> & {
export type WritableStream<T, GetT=T> = ReadableStream<T, GetT> & {
/** Sets the value of the stream. */

@@ -717,3 +769,3 @@ set(value: T): void

<T>(x: T): WritableStream<T>
<T>(): WritableStream<T | undefined>
<T>(): WritableStream<T, T | undefined>
/** Creates a computed stream that reactively updates if any of its upstreams are updated. */

@@ -724,4 +776,2 @@

}): Stream<{ [I in keyof S]: S[I] }>
/** Creates a new streamCoStreamConstructor with the results of calling the function on every incoming stream with and accumulator and the incoming value. */
scan<T, U>(fn: (acc: U, value: T) => U, acc: U, stream: Stream<T>): Stream<U>

@@ -746,3 +796,4 @@ /** A special value that can be returned to stream callbacks to skip execution of downstreams. */

// the following is just hacks to make the Stream both a function
// and a class.
// and a class. I've benchmarked the difference and it is so minor (0.1 op/s)
// that we should keep it so we don't have to say `new Stream()` all the time
function StreamStatic<T>(...args: [T] | []){

@@ -749,0 +800,0 @@ return new InternalStream(...args)

{
"name": "chifley",
"version": "0.0.0-next.26",
"version": "0.0.0-next.27",
"description": "",

@@ -12,2 +12,3 @@ "type": "module",

"test": "node --import tsx --test test/*.ts",
"bench": "node --import tsx benchmarks/index.ts",
"dev": "node --watch --import tsx --test test/*.ts",

@@ -31,12 +32,17 @@ "build:bundle": "esbuild lib/index.ts --bundle --format=esm --sourcemap --allow-overwrite --outfile=./dist/chifley.esm.js",

"devDependencies": {
"@types/benchmark": "^2.1.5",
"@types/mithril": "^2.2.7",
"@types/node": "^20.11.17",
"@typescript-eslint/eslint-plugin": "^6.21.0",
"@typescript-eslint/parser": "^6.21.0",
"benchmark": "^2.1.4",
"esbuild": "^0.20.0",
"eslint": "^8.56.0",
"flyd": "^0.2.8",
"mithril": "^2.2.2",
"most": "^1.9.0",
"prettier": "3.2.5",
"tsx": "^4.7.0",
"typescript": "^5.0.4",
"@types/node": "^20.11.17"
},
"dependencies": {}
"typescript": "^5.0.4"
}
}

@@ -1,7 +0,565 @@

# chifley
# 🚂 chifley
> ⚠️ Warning, this library is very new, and while it is does have a lot of tests, we plan to iterate a bit on the internals near term so do not use this in production yet.
> If you still do decide to use it, make sure you pin to a specific version.
```
🚨🚨 🚨🚨
🚨 Warning, this library is very new and should be considered pre-release. Do not use in production. 🚨
🚨🚨 🚨🚨
```
A small, speedy reactive programming library with a focused API.
A small, speedy reactive programming library with an API designed to be _app ready_.
- 👶 Simple reactivity model
- 🤖 Easy to debug
- 📉 Low memory use
- 🏃‍♂️‍➡️ Fast
- 🤏 Tiny but with just enough features ⭐
## Quick Start
```typescript
import { Stream as c } from 'chifley'
const text = c('wow')
// perfectly valid email validation 🫠
const isEmail = text.map( text => text.includes('@') )
isEmail.get()
//=> true
try {
isEmail.set(true)
} catch (e) {
// Cannot write to a readonly stream!
console.error(e)
}
text.set('batman@example.com')
isEmail.get()
//=> true
```
## Why Chifley?
### 👶 Simple reactivity model
`chifley` is heavily inspired by stream libraries like [flyd]() and [mithril-stream](). All `chifley` streams are immediately live. They do not need to be subscribed to activate. All downstream subscriptions are themselves active streams that share the same cached parent/input state.
When you end a parent stream all dependent streams will also end. When a dependent stream ends it does not impact the parent's end state.
If you are coming from libraries like [Rx.js]() this may feel a little foreign but for UI programming specifically, this model is far more appropriate.
### 🤖 Easy to debug
Most stream libraries have a complex recursive propagation algorithm. If you are trying to debug your stream graph, traditional debuggers aren't very helpful as you will find yourself stepping through generic internal functions recursively.
So, instead, we find ourselves using custom tools like Rx marbles (and more commonly, just logs 🫠).
`chifley` was deliberately designed to be sustainable at scale with large complex UI stream graphs. When you update a stream, `chifley` traverses all the possible dependencies in a single while loop.
Additionally, any writes that occur during a propagation are deferred until the immediate impact of the previous write is complete.
This doesn't prevent infinite loops but it makes it very clear when you are entering and exiting a propagation caused by a single write. It also helps identify how many dependent transactions are spawned by a single write.
### 📉 Low memory use
`chifley` streams are instances of a simple class. Each instance has many useful methods available but thanks to prototypical inheritance, the memory for those methods are all shared. Each stream has a few instance specific properties but everything else is shared and stored once.
### 🏃‍♂️‍➡️ Fast
`chifley` isn't the fastest stream library, but it is faster than many of them and strikes a healthy balance between memory and CPU usage.
- Roughly 10% slower than `most`
- Roughly 45% faster than `flyd`
- Roughly 430% faster than `mithril-stream`
```
filter -> map -> reduce 1000000 integers
-------------------------------------------------------
most.js 8.62 op/s ± 8.24% (45 samples)
chifley 7.83 op/s ± 1.00% (42 samples)
flyd 5.39 op/s ± 2.45% (31 samples)
mithril-stream 1.79 op/s ± 0.96% (13 samples)
-------------------------------------------------------
```
- [From Github Actions](https://github.com/JAForbes/chifley/actions/runs/10732783258/job/29765146646)
- [Benchmarks Source](./benchmarks/index.ts)
> 🤓 Note these bench marks are very early and incomplete. Contributions / extensions would be very welcome.
### 🤏 Tiny but with just enough features ⭐
`chifley` originally started as a fork of [mithril-stream](), which in turn was a rewrite of [flyd](). The code has evolved greatly over many years, and this library is a formalization of a lot of things we have learnt over nearly a decade of working with these libraries.
We observed what methods and capabilities weer actually used, and we have removed everything we didn't. We even removed things we really liked in principle.
The truth is, when your stream library uses the [flyd]() model, you don't need as many operators or abstractions as you do in say [Rx.js]() because we aren't trying to prevent side effects by predefining operators for all possible scenarios.
So here's a short list of things we have included in this tiny library:
- 💪 14 commonly use stream operations: (`map`, `filter`, `reject`, `merge`, `scan`, `dropRepeats`, `throttle`, `default`, `skip`, `take`, `once`, `afterSilence`, `awaitLatest` and `awaitEvery`)
- 🕵️ Automatic dependency tracking (like S.js / Signal libraries)
- 🔒 Read-only streams (both at runtime and via Typescript)
- 💾 `toJSON` serialization
- 🔥 `Sin.js` observable protocol
- 🕰️ Predictable atomic clock update (_similar_ to S.js)
## API
### Creating a stream
You can initialize a stream with an initial value or without. If you opt of an initial value the type of the stream will be `T | undefined` otherwise it will be `T`.
```typescript
import { Stream as c } from 'chifley'
const name = c('Ringo')
const songs = c<{ track: string, duration: number }>()
```
### Reading stream values
The last emitted value of a stream is always cached and can be retrieved via `.get()`
```typescript
const name = c('Ringo')
c.get()
//=> 'Ringo'
```
If the stream had no initial value, the type will be `T | undefined`:
```typescript
const name = c<string>();
c.get().toUpperCase()
// 🚫 Not OK: Typescript Error
c.get()?.toUpperCase()
// 🟢 OK
```
### Writing stream values
You can change a stream value via `.set`
```typescript
name.set('George')
```
This will trigger dependent streams to also update.
If you need to access the previous value as part of the update you can use `.update`
```typescript
name.update( prev => prev == 'John' ? 'Paul' : 'George' )
```
### Subscribing to changes
You can subscribe to changes via `.map`. It will only emit when it has received a value. If you don't initialize a stream, don't worry, it won't emit with `undefined`.
```typescript
const name = c<string>();
name.map( x => console.log('The name changed: ', x))
// At this point nothing has logged.
name.set('Paul')
// Logs: 'The name changed: Paul'
```
### Ending a stream
You can end a stream and all its dependent streams by calling `.end.set(true)`. You can also check if a stream has ended via `.end.get()`
```typescript
name.end.set(true)
```
You may have noticed from this signature that `.end` is itself a stream. This is most useful if you'd like to be notified when a particular stream has been ended.
We recursively create the `.end` stream as you access it. Technically you can go `name.end.end.end.end.end` (but don't do that).
### Opting out of an update
Within a visitor function you can return `c.SKIP` which will be detected and skip propagation of that stream and its dependencies. The existing value of the stream will be retained and no dependencies will be updated.
```typescript
const year = c(2000)
const olympicYear = age.map( x => x % 4 === 0 ? x : c.SKIP )
// immediately logs: 'olympics! 2000'
olympicYear.map( x => {
console.log('olympics!', x)
})
year(1999)
// no log occurs
year(2024)
// logs: 'olympics! 2024'
```
This allows `.map` (and most other operators) to act like `.filter` or `.reject` which can be incredibly powerful.
> 🤓 Note unlike `mithril-stream` `SKIP` only works as a return value within a stream transform. You can write `SKIP` to a writable stream but it will be treated just like any other value. This removes two logical branches in the update path and speeds things up enough to warrant the sacrifice.
## Static methods and properties
### `merge`
If you have two or more streams and you would like to combine them in some way then `merge` is the way to go.
You provide a list of streams and it will provide a stream of a list of values.
```typescript
const organization_id = c<string>()
const project_id = c<string>()
const ids = c.merge([organization_id, project_id] as const)
const path =
ids.map(([organization_id, project_id]) => `/org/${organizaton_id}/project/${project_id}`)
path.map(
path => console.log('URL path changed: ', path)
)
```
`merge` will wait for all streams to have at least 1 value/emit before it emits its first update.
```typescript
// So far nothing has logged
organization_id.set('1')
// still nothing has logged
project_id.set('2')
// now this logs:
// 'URL path changed: /org/1/project/2
```
- `isStream`
Not very exciting but if you want to know if some arbitrary object is an instance from this library use this:
```typescript
isStream(null)
//=> false
isStream( c('hello') )
//=> true
```
## Instance operators
### `map`
When you have one input stream and you want to transform it and produce a new output stream use `.map`
```typescript
const name = c('George')
const isCoolBeatle = name.map( name => name === 'Ringo' )
```
If you just want to subscribe to a stream and not produce a new stream, just use `.map`, it is okay, it is no more or less expensive and the FP wizard cabal will let you off, just this once.
### `dropRepeats` / `dropRepeatsWith`
Do you have a stream that emits all the time, like say when a user does stuff? Do you want to only be notified when we have a new value? Then use `dropRepeats`
```typescript
const food = c<string>()
const foodOrder = food.dropRepeats()
foodOrder.map( x => console.log('Hurry up! I want my ', x + '!'))
food.set('pretzel') // Hurry up! I want my pretzel!
food.set('pretzel')
food.set('pretzel')
food.set('pretzel')
food.set('doughnut') // Hurry up! I want my doughnut!
```
`dropRepeats` using reference equality, if you'd like to change the definition of equality, use `dropRepeatsWith( ... )` with a custom equality function e.g. for deep equality you could use Ramda's `R.equal`
```typescript
const stateChanges = state.dropRepeatsWith(R.equal)
```
### `filter` / `reject`
For opting in and out of changes we can use `c.SKIP` but sometimes defining control flow with simple predicates is a lot clearer.
For these cases`filter` and `reject` can help clean up things a bit.
E.g. only eit a terminal if the line feed exactly equals 'exit':
```typescript
const linefeed = c<string>()
const exit = linefeed.filter( x => x == 'exit' )
exit.map( () => killTerminalSubProcess() )
```
Or only execute statements that don't start with a comment:
```typescript
const queryInput = c<string>()
const notAComment = linefeed.reject( x => x.startsWith('--') )
const completedStatement = notAComment.filter( x => x.includes(';') )
completedStatement.map( query => {
execSqlQuery(query)
})
```
### `default`
If you want to guarantee a stream has a value you can use `default`. Note this isn't used for coalescing undefined or null values, this is for handling streams that may not have emitted a value yet.
```typescript
const projects = c<Project[]>()
{
// 🚫 Type error and will result in an `undefined` `.length `value
const length = projects.map( x => x.length )
}
{
// 🟢 OK
const length = projects.default([]).map( x => x.length )
}
```
### `skip`
Sometimes we have a stream with an initial value but we don't want to be notified until we receive a new value. This is when we use `skip`:
```typescript
const form = c<InitialState>(init())
const changed = form.skip(1)
changed.map( x => console.log('Form changed', x))
```
### `take` / `once`
If we want to only get the first emit we can use `.once()`.
```typescript
const firstClick = click.once()
```
If you want the first `n` emits you can use `.take(n)`
```typescript
const firstThreeClicks = click.take(3)
```
In both cases the stream will end once it reaches the maximum number of events.
### `scan`
`scan` makes it convenient to produce a new value based on the combination of the latest and previous emit. `scan` is often used as a lightweight alternative to redux or for building reactive state machines.
Here is a simple counter example:
```typescript
type Action = 'inc' | 'dec'
const action = c<Action>()
const count = action.scan(0, (prev, action) => {
if (action === 'inc' ) {
return prev + 1
} else if (action == 'dec') {
return prev - 1
}
return prev
})
```
### `awaitLatest`
Unlike other stream/obserable libraries `chifley` doesn't coerce promises, or arrays (or anything else) into stream events. A `Promise` is as much a value as a `number` of a `string`.
But we do provide some basic operators for handling the two common cases of coercing promises into stream events.
`awaitLatest` allows you to preference the promise resolution of the latest emit even if a prior promise resolves first. This is most useful for network requests:
```typescript
const results =
searchInput
.awaitLatest((search) = fetch(`${endpoint}?q=${search}`).then( x => x.json()))
search('hello')
results.map( x => {
console.log(...x.status === 'fulfilled' ? ['🟢', x.value] : ['🚫', x.reason])
})
```
In the above example, the user may constantly be updating the value of `searchInput` but we are guaranteed to only get the network response corresponding to the latest searchInput.
Because `Promise`'s can reject we coerce the result to the `allSettled` API
```typescript
type Settled =
| { status: 'fulfilled', value: T }
| { status: 'rejected', reason: unknown }
```
Note this will help us with our control flow but it won't actually cancel the network request, we'll simply ignore the response of older requests.
For convenience though, `awaitLatest` creates an `AbortController` and passes the signal in as a second argument so you can optionally abort requests (or anything else that adheres to the `AbortController` API) when a new request is starting.
```typescript
const results =
searchInput
.awaitLatest((search, { signal }) = fetch(`${endpoint}?q=${search}`, { signal }).then( x => x.json()))
```
### `awaitEvery`
Sometimes though we want a corresponding emit for every single promise resolution
```typescript
const results =
searchInput
.awaitEvery((search, { signal }) = fetch(`${endpoint}?q=${search}`, { signal }).then( x => x.json()))
```
With `awaitEvery` for every input we will get a corresponding emit (providing the promise settles). We still provide the ability to cancel on abort signal but this will only fire if the stream is ended before a request can resolve.
Like `awaitLatest`, the resulting emit is of the `Settled<T>` type.
### `throttle`
Usually we don't want to fire a network request for every single corresponding user keystroke.
One strategy is to `throttle` the input, sending at most 1 request every `n` milliseconds.
```typescript
const searchInput = c('')
searchInput
.throttle(100)
.awaitLatest((search, { signal }) = fetch(`${endpoint}?q=${search}`, { signal }))
```
### `afterSilence`
Other times we only want to fire off a request when the user stops typing for a bit.
```typescript
const searchInput = c('')
searchInput
.afterSilence(100)
.awaitLatest((search, { signal }) = fetch(`${endpoint}?q=${search}`, { signal }))
```
## Advanced
### Error / Promise Rejection propagation
This library deliberately has no opinions on synchronous error handling. If you throw an error within a stream update it will simply bubble up the stack uninterrupted. However, for `awaitLatest` and `awaitEvery` we return the same type as the standard `Promise.allSettled` API.
### Stream End Behaviour
When you end a stream the last emitted value will still be cached so `.get()` will continue to work forever. But any upstream changes will not propagate to the ended stream, the value will stay the same, no operators will fire.
All children streams will also be ended (recursively).
If you end a writable stream and change the value via `.set` or `.update` no exception will be thrown and the value will change. If you subscribe to an ended stream via an operator no exceptions will be thrown but as the upstream dependency will never again emit, neither will the new dependent stream.
When using `.merge`, if any dependency ends the merged stream is ended. If another (unended) dependency emits, the merged stream won't emit.
### Tracking references and creations
This feature is designed to be used primarily by framework authors and not directly by `chifley` users in normal application code. It will seem a little verbose at first but it is designed to give the framework author precise control over how tracking contexts cascade.
To track stream references use the static `trackReferenced` function:
```typescript
const returnValue = c.trackReferenced(() => {
return someFunctionThatMightReferenceStreams()
}, new Set<c>)
```
Any synchronous calls to `.get()` while that function runs will result in that stream being added to the provided set `Set<c>`. You can then inspect the set after executing the function and use it to inject dependencies for some framework process, e.g. automatic rendering when a stream emits.
To track stream creation, you can do the same thing via `trackCreated`:
```typescript
const returnValue = c.trackCreated(() => {
return someFunctionThatMightCreateStreams()
}, new Set<c>)
```
This is useful if you'd like to automatically destroy streams created within a particular scope, such as during a render, within a component initialization etc.
To track both creations and references use `c.track`
```typescript
const returnValue = c.track(() => {
return someFunctionThatMightCreateStreams()
}, new Set<c>)
```
You can opt out of a tracking context using the `sample` function. There are `sampleCreated` and `sampleReferenced` variants (which do what you would expect).
```typescript
const returnValue = c.track(() => {
someFunctionThatMightCreateStreams()
return sample(() => {
return weWillNotTrackThisFunctionCall()
})
}, new Set<c>)
```
You can also `untrack` a stream that has already been tracked, this just removes it from whatever tracking context is currently active.
```typescript
const returnValue = c.track(() => {
someFunctionThatMightCreateStreams()
// even if `someFunctionThatMightCreateStreams` referenced
// `someStream` it won't appear in the set.
untrack(someStream);
}, new Set<c>)
```
From this feature set you can build S.js like computations, and any kind of conceivable reactive framework feature you'd like to build. If you'd like tracking contexts to cascade, just reuse the same set for multiple contexts. If you'd like each tracking context to have sole responsibility then use separate sets. If you'd like to use generator functions then you can start and end a tracking set for each invocation of `iterator.next(...)` - the sky is the limit.
## Prior Art
This library has a lot of influences and is greatly shaped by years of working with FP and FRP libraries in large complex applications.
Many thanks first of all to [Simon Friis Vindum](), this library is greatly influenced by his library [flyd]().
Much appreciaton to [Adam Haille]() for creating [S.js]() and to [Ryan Carniato]() for popularising it through his work with [Solid.js]. This library has been inspired by [S.js]() in its atomic update algorithm and with the automatic dependency tracking that we also see in `S.computation`.
This library originally started as a port of [mithril-stream]() originally a rewrite of [flyd]() by [Leo Horie]() and later rewritten by [Rasmus Porsager](). Thanks again goes to Rasmus for the sin observable protocol which this library has adopted.
Despite dropping support for `fantasyland` and many other functional programming operators such as `lift`, I wouldn't be as involved with FRP as I am if it weren't for the `fantasyland`, `ramda` and `sanctuary` communities.

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc