
Product
Introducing Repository Access Permissions and Custom Roles
Socket now supports Custom Roles and Repository Access Permissions so organizations can control who can access specific repositories and actions.
@kuindji/reactive
Advanced tools
A JavaScript/TypeScript utility library for building reactive applications with events, actions, stores, and React hooks
A JavaScript/TypeScript utility library for building reactive applications with events, actions, stores, and React hooks.
npm install @kuindji/reactive
# or
yarn add @kuindji/reactive
# or
bun add @kuindji/reactive
Event emitter with three distinct modes:
import { createEvent } from "@kuindji/reactive";
// Create a typed event
// When creating event, provide a listener signature generic to make full event api typed.
const userLoginEvent = createEvent<(userId: string, timestamp: Date) => void>();
// Add listeners
userLoginEvent.addListener((userId, timestamp) => {
console.log(`User ${userId} logged in at ${timestamp}`);
});
// Trigger the event
userLoginEvent.trigger("user123", new Date());
// all settings are optional
const event = createEvent({
async: boolean, // Call listeners asynchronously; default false
limit: number, // Event can be triggered 10 times; default 0 (unlimited)
autoTrigger: boolean, // Auto-trigger new listeners with last args; default false
maxListeners: number, // Maximum number of listeners; default: 1000
// default: undefined
filter: (args: TriggerArgs[], listener: ListenerOptions): boolean => {
// Custom filter logic
// args: arguments passed to trigger()
// listener: an object with listener options and handler itself
return true;
},
});
// all settings are optional
event.addListener(handler, {
limit: number, // Call this listener 5 times; default 0 (unlimited)
first: boolean, // Add to beginning of listener list; default false
alwaysFirst: boolean, // Always call before other listeners; default false
alwaysLast: boolean, // Always call after other listeners; default false
start: number, // Start calling after 3rd trigger; default 0
context: object, // Listener context (this); default undefined
tags: string[], // Listener tags for filtering; default undefined
async: booleantrue, // Call this listener asynchronously; default false
extraData: object, // Custom data will be passed to filter()
});
Collector allows you to gather data from listeners.
type ApplicationData = {
user: {
username?: string;
role?: string;
loggedIn: boolean;
};
notifications: {
type: string;
message: string;
}[];
};
const event = createEvent<() => Partial<ApplicationData>>();
event.addListener(() => {
return {
user: {
username: "john",
role: "admin",
loggedIn: true,
},
};
});
event.addListener(() => {
return {
notifications: [
{
type: "chat",
message: "You've got a new message!",
},
],
};
});
const applicationData = event.merge();
Data flows through listeners in a pipeline, each transforming the data
const event = createEvent((value: number) => number);
event.addListener(value => value + value);
event.addListener(value => value * value);
const value = event.pipe(1); // value = 4
addListener(listener, options?) - Add event listener
on(), listen(), subscribe()removeListener(listener, context?, tag?) - Remove specific listener
un(), off(), remove(), unsubscribe()updateListenerOptions(listener, context?, nextOptions?) - Update a registered listener's soft options (limit, start, async, tags, extraData, alwaysFirst/alwaysLast) in place, preserving its called/count counters. Matches the listener by listener + context. Returns true if a listener was found. context is an identity field and is not updated here (resubscribe to change it); first is insertion-time only and ignored. Lowering limit to at/below the current called removes the listener immediately.hasListener(listener?, context?, tag?) - Check if listener exists
has()removeAllListeners(tag?) - Remove all listeners (optionally by tag)trigger(...args) - Trigger the event
emit(), dispatch()setOptions(options) - Update event options in place. Accepts any EventOptions field (async, limit, autoTrigger, filter, filterContext, maxListeners). Does not reset the internal triggered count.first(...args) - Get first listener's resultlast(...args) - Get last listener's resultall(...args) - Get all listener resultsmerge(...args) - Merge all results (for arrays/objects)concat(...args) - Concatenate all resultsfirstNonEmpty(...args) - Get first non-empty resultuntilTrue(...args) - Stop when listener returns trueuntilFalse(...args) - Stop when listener returns falsepipe(...args) - Pipe data through listenersresolveFirst(...args) - Async version of first()resolveLast(...args) - Async version of last()resolveAll(...args) - Async version of all()resolveMerge(...args) - Async version of merge()resolveConcat(...args) - Async version of concat()resolveFirstNonEmpty(...args) - Async version of firstNonEmpty()resolvePipe(...args) - Async version of pipe()promise(options?: ListenerOptions) - Get a promise that resolves on next triggersuspend(withQueue?: boolean) - Suspend event triggering; When withQueue=true, all trigger calls will be queued and replayed after resume()resume() - Resume event triggeringreset() - Reset event statewithTags(tags: string[], callback: () => CallbackResponse) => CallbackResponse - Execute callback with specific tagsEventBus provides centralized event management for applications. It allows you to define multiple named events and manage them together with features like event source integration, proxying, and interception.
import { createEventBus } from "@kuindji/reactive";
// Define event signatures
type AppEvents = {
userLogin: (userId: string) => void;
userLogout: (userId: string) => void;
dataUpdate: (data: any) => void;
};
// Create event bus
const eventBus = createEventBus<AppEvents>();
// Add listeners
eventBus.on("userLogin", (userId) => {
console.log(`User ${userId} logged in`);
});
// Trigger events
eventBus.trigger("userLogin", "user123");
Relay allows you to forward events from one EventBus to another EventBus.
import { createEventBus, ProxyType } from "@kuindji/reactive";
// Create event buses
const mainBus = createEventBus<{
userLogin: (userId: string) => void;
userLogout: (userId: string) => void;
dataUpdate: (data: any) => void;
}>();
const externalBus = createEventBus<{
login: (userId: string) => void;
logout: (userId: string) => void;
update: (data: any) => void;
}>();
// Relay events from external bus to main bus
mainBus.relay({
eventSource: externalBus,
remoteEventName: "login",
localEventName: "userLogin",
});
mainBus.relay({
eventSource: externalBus,
remoteEventName: "logout",
localEventName: "userLogout",
});
mainBus.relay({
eventSource: externalBus,
remoteEventName: "update",
localEventName: "dataUpdate",
});
// Listen to events on main bus
mainBus.on("userLogin", (userId) => {
console.log(`User ${userId} logged in via relay`);
});
// Trigger on external bus - will be relayed to main bus
externalBus.trigger("login", "user123");
You can use prefixes to organize relayed events:
// Relay all events with a prefix
mainBus.relay({
eventSource: externalBus,
remoteEventName: "*", // Relay all events
localEventNamePrefix: "external-",
});
// Now external events will be available as:
// external-login, external-logout, external-update
mainBus.on("external-login", (userId) => {
console.log(`External login: ${userId}`);
});
You can control how relayed events handle return values:
// Relay with pipe proxy type - data flows through listeners
mainBus.relay({
eventSource: externalBus,
remoteEventName: "processData",
localEventName: "transformData",
proxyType: ProxyType.PIPE,
});
// now when you call remote "processData" event
// it will passed through mainBus's "transformData" pipeline and returned to externalBus.
const transformedData = externalBus.first("processData", { some: data });
// Relay with merge proxy type - merge results from all listeners
mainBus.relay({
eventSource: externalBus,
remoteEventName: "collectData",
localEventName: "aggregateData",
proxyType: ProxyType.MERGE,
});
// Relay with async resolve proxy type
mainBus.relay({
eventSource: externalBus,
remoteEventName: "asyncOperation",
localEventName: "handleAsync",
proxyType: ProxyType.RESOLVE_ALL,
});
Stop relaying events:
// Stop relaying specific event
mainBus.unrelay({
eventSource: externalBus,
remoteEventName: "login",
localEventName: "userLogin",
});
// Stop relaying all events
mainBus.unrelay({
eventSource: externalBus,
remoteEventName: "*",
localEventNamePrefix: "external-",
});
Event sources allow you to integrate with external event systems that follow the EventSource interface. This is useful for WebSocket connections, Node.js EventEmitter, or custom event systems.
import { createEventBus } from "@kuindji/reactive";
import { EventEmitter } from "events";
// Create a Node.js EventEmitter as an event source
const nodeEmitter = new EventEmitter();
// Define the event source interface
const eventSource = {
name: "node-emitter",
on: (name: string, fn: (...args: any[]) => void) => {
nodeEmitter.on(name, fn);
},
un: (name: string, fn: (...args: any[]) => void) => {
nodeEmitter.off(name, fn);
},
accepts: (name: string) => true, // Accept all events
proxyType: ProxyType.TRIGGER,
};
// Create event bus
const eventBus = createEventBus<{
userAction: (action: string, userId: string) => void;
systemEvent: (event: string, data: any) => void;
}>();
// Add event source
eventBus.addEventSource(eventSource);
// Listen to events
eventBus.on("userAction", (action, userId) => {
console.log(`User ${userId} performed action: ${action}`);
});
// Emit on the external source - will be relayed to event bus
nodeEmitter.emit("userAction", "login", "user123");
// Create WebSocket event source
const createWebSocketEventSource = (ws: WebSocket) => ({
name: "websocket",
on: (name: string, fn: (...args: any[]) => void) => {
ws.addEventListener("message", (event) => {
const data = JSON.parse(event.data);
if (data.type === name) {
fn(data.payload);
}
});
},
un: (name: string, fn: (...args: any[]) => void) => {
ws.removeEventListener("message", fn);
},
accepts: (name: string) => true,
proxyType: ProxyType.TRIGGER,
});
// Usage
const ws = new WebSocket("ws://localhost:8080");
const wsEventSource = createWebSocketEventSource(ws);
const eventBus = createEventBus<{
chatMessage: (message: string, userId: string) => void;
userJoined: (userId: string) => void;
userLeft: (userId: string) => void;
}>();
eventBus.addEventSource(wsEventSource);
// Listen to WebSocket events
eventBus.on("chatMessage", (message, userId) => {
console.log(`${userId}: ${message}`);
});
// Create custom event source with filtering
const createCustomEventSource = () => {
const listeners = new Map<string, Set<(...args: any[]) => void>>();
return {
name: "custom-source",
on: (name: string, fn: (...args: any[]) => void) => {
if (!listeners.has(name)) {
listeners.set(name, new Set());
}
listeners.get(name)!.add(fn);
},
un: (name: string, fn: (...args: any[]) => void) => {
listeners.get(name)?.delete(fn);
},
accepts: (name: string) => name.startsWith("app-"), // Only accept app-* events
proxyType: ProxyType.TRIGGER,
// Custom method to trigger events
trigger: (name: string, ...args: any[]) => {
listeners.get(name)?.forEach(fn => fn(...args));
},
};
};
const customSource = createCustomEventSource();
const eventBus = createEventBus<{
appStart: () => void;
appStop: () => void;
}>();
eventBus.addEventSource(customSource);
// Listen to custom events
eventBus.on("appStart", () => {
console.log("Application started");
});
// Trigger on custom source
customSource.trigger("appStart");
addListener(name, handler, options?) - Add listener to specific event
on(), listen(), subscribe()once(name, handler, options?) - Add one-time listenerremoveListener(name, handler, context?, tag?) - Remove listener
un(), off(), remove(), unsubscribe()updateListenerOptions(name, handler, context?, nextOptions?) - Update a registered listener's soft options in place (see Event's updateListenerOptions). Returns false if the event does not exist.trigger(name, ...args) - Trigger specific event
emit(), dispatch()get(name) - Get event instance by nameadd(name, options?) - Add new event to bussetOptions(options?) - Update bus options. Present per-event entries in eventOptions are applied to already-created events via event.setOptions, and future events use the latest stored options. A removed event-name entry leaves the existing event unchanged.first(name, ...args) - Get first listener resultlast(name, ...args) - Get last listener resultall(name, ...args) - Get all listener resultsmerge(name, ...args) - Merge all resultsconcat(name, ...args) - Concatenate all resultsfirstNonEmpty(name, ...args) - Get first non-empty resultuntilTrue(name, ...args) - Stop when listener returns trueuntilFalse(name, ...args) - Stop when listener returns falsepipe(name, ...args) - Pipe data through listenersresolveFirst(name, ...args) - Async version of first()resolveLast(name, ...args) - Async version of last()resolveAll(name, ...args) - Async version of all()resolveMerge(name, ...args) - Async version of merge()resolveConcat(name, ...args) - Async version of concat()resolveFirstNonEmpty(name, ...args) - Async version of firstNonEmpty()resolvePipe(name, ...args) - Async version of pipe()intercept(fn) - Intercept all event triggersstopIntercepting() - Stop interceptionrelay(options) - Relay events from external sourcesunrelay(options) - Stop relaying eventsaddEventSource(source) - Add external event sourceremoveEventSource(source) - Remove event sourcesuspendAll(withQueue?) - Suspend all eventsresumeAll() - Resume all eventsreset() - Reset all eventswithTags(tags, callback) - Execute callback with specific tagsActions are async operations with built-in error handling and response tracking. They provide a structured way to handle async operations and their results.
import { createAction } from "@kuindji/reactive";
// Define an async action
const fetchUserAction = createAction(async (userId: string) => {
const response = await fetch(`/api/users/${userId}`);
if (!response.ok) {
throw new Error("User not found");
}
return response.json();
});
// Add listeners for success/error
fetchUserAction.addListener(({ response, error, args }) => {
if (error) {
console.error("Action failed:", error);
}
else {
console.log("User data:", response);
}
});
// Invoke the action
const result = await fetchUserAction.invoke("user123");
invoke(...args) - Execute the actionsetAction(fn) - Replace the action function in place. Subsequent invoke() calls use the new function; all response, before-action and error listeners are preserved (they live in separate events). The replacement must keep a compatible signature.addListener(handler, options?) - Add response listener
on(), listen(), subscribe()removeListener(handler, context?, tag?) - Remove listener
un(), off(), remove(), unsubscribe()updateListenerOptions(handler, context?, nextOptions?) - Update a response listener's soft options in place (see Event's updateListenerOptions)removeAllListeners(tag?) - Remove all listenersaddErrorListener(handler, context?) - Add error listenerremoveErrorListener(handler, context?) - Remove error listenerremoveAllErrorListeners(tag?) - Remove all error listenerspromise(options?) - Get promise for next invocationbeforeActionPromise(options?) - Get promise for the next before-action callerrorPromise(options?) - Get promise for the next action erroraddBeforeActionListener(handler, options?) - Add listener that runs before invocationremoveBeforeActionListener(handler, context?, tag?) - Remove before-action listenerremoveAllBeforeActionListeners(tag?) - Remove all before-action listenersActionMap provides a way to organize multiple actions with centralized error handling. It's useful for managing related actions in a structured way.
import { createActionMap } from "@kuindji/reactive";
// Define actions
const actions = {
fetchUser: async (userId: string) => {
const response = await fetch(`/api/users/${userId}`);
return response.json();
},
updateUser: async (userId: string, data: any) => {
const response = await fetch(`/api/users/${userId}`, {
method: "PUT",
body: JSON.stringify(data),
});
return response.json();
},
deleteUser: async (userId: string) => {
await fetch(`/api/users/${userId}`, { method: "DELETE" });
},
};
// type ErrorResponse = {
// args: TriggerArgs[],
// error: Error,
// name?: ActionName,
// type?: "action"
// }
// Create action map with error handling
const actionMap = createActionMap(actions, (errorResponse: ErrorResponse) => {
console.error("Action failed:", errorResponse);
});
// Use actions
const user = await actionMap.fetchUser.invoke("user123");
await actionMap.updateUser.invoke("user123", { name: "John" });
The ActionMap returns an object where each key corresponds to an action with the same API as individual actions created with createAction().
ActionBus provides centralized action management similar to EventBus but for actions. It allows you to dynamically add and manage actions with built-in error handling.
import { createActionBus } from "@kuindji/reactive";
type Actions = {
fetchUser: (userId: string) => Promise<UserData>;
updateUser: (userId: string, data: UserData) => Promise<UserData>;
};
// Create action bus
const actionBus = createActionBus<Actions>();
// Add actions
actionBus.add("fetchUser", async (userId) => {
const response = await fetch(`/api/users/${userId}`);
return response.json();
});
actionBus.add("updateUser", async (userId, data) => {
const response = await fetch(`/api/users/${userId}`, {
method: "PUT",
body: JSON.stringify(data),
});
return response.json();
});
// Add listeners
actionBus.on("fetchUser", ({ response, error }) => {
if (error) {
console.error("Failed to fetch user:", error);
}
else {
console.log("User fetched:", response);
}
});
// Invoke actions
const user = await actionBus.invoke("fetchUser", "user123");
add(name, action) - Add action to bus (no-op if it already exists)replace(name, action) - Replace an existing action's function in place via setAction (preserving its listeners and the bus error-forwarding listener); adds it if the name is newremoveAction(name) - Remove an action from the bus (named removeAction because remove is an alias for removeListener). Afterwards invoke/on/un for that name throw Action <name> not found.has(name) - Check if action existsget(name) - Get action by nameinvoke(name, ...args) - Invoke action by nameaddListener(name, handler, options?) - Add listener to action
on(), listen(), subscribe()once(name, handler, options?) - Add one-time listenerremoveListener(name, handler, context?, tag?) - Remove listener
un(), off(), remove(), unsubscribe()updateListenerOptions(name, handler, context?, nextOptions?) - Update a response listener's soft options in place (see Event's updateListenerOptions)addErrorListener(handler) - Add global error listenerremoveErrorListener(handler) - Remove global error listenerhasErrorListeners() - Check if error listeners existStore provides reactive state management with change tracking, validation, and event-driven updates. It's designed for managing application state with full TypeScript support.
import { createStore } from "@kuindji/reactive";
// Define store schema
type UserStore = {
id: string;
name: string;
email: string;
isLoggedIn: boolean;
};
// Create store with initial data
const userStore = createStore<UserStore>({
id: "",
name: "",
email: "",
isLoggedIn: false,
});
// Listen to changes
userStore.onChange("name", (newName, oldName) => {
console.log(`Name changed from ${oldName} to ${newName}`);
});
// Update store
userStore.set("name", "John Doe");
userStore.set("isLoggedIn", true);
// Get values
const name = userStore.get("name");
const userData = userStore.get([ "name", "email" ]); // { name: string, email: string }
set(key, value) - Set single propertyset(data) - Set multiple propertiesasyncSet(key, value) - Async set single propertyasyncSet(data) - Async set multiple propertiesget(key) - Get single propertyget(keys) - Get multiple propertiesisEmpty() - Check if store is emptygetData() - Get all store datareset() - Clear store dataonChange(key, handler, options?) - Listen to property changesremoveOnChange(key, handler, context?, tag?) - Remove a change listenerupdateOnChangeOptions(key, handler, context?, nextOptions?) - Update a change listener's soft options in place (see Event's updateListenerOptions)pipe(key, handler) - Add data transformation pipelinecontrol(event, handler) - Control store eventsbeforeChange - Fired before property changes (can prevent change)change - Fired after properties changereset - Fired when store is reseterror - Fired when errors occurbatch(fn) - Batch multiple changesThe library provides comprehensive React hooks for integrating reactive functionality into React components with automatic cleanup and error handling.
import { useListenToEvent } from "@kuindji/reactive/react";
const event = createEvent(() => void);
function FirstComponent() {
// Use in component
const handleClick = () => {
event.trigger();
};
return (
<div>
<button onClick={handleClick}>Trigger event</button>
</div>
);
}
function AnotherComponent() {
const handler = useCallback(
() => {
console.log("something happened in first component")
},
[]
);
useListenToEvent(event, handler);
}
Create and use event
const event = useEvent(
listenerOptions?: ListenerOptions,
listener?: Function,
errorListener?: (errorResponse) => void
);
Listen to event
useListenToEvent(
event: Event,
listener?: Function,
errorListener?: (errorResponse) => void
)
Create and use event bus
type EventBus = {
eventName: Function
}
type EventBusOptions = {
eventName: EventOptions
}
const eventBus = useEventBus<EventBus>(
options?: EventBusOptions,
allEventsListener?: Function,
errorListener?: (errorResponse) => void
);
Listen to bus events
useListenToEventBus(
eventBus: EventBus,
eventName: string,
listener: Function,
listenerOptions?: ListenerOptions,
errorListener?: (errorResponse) => void
)
Create and use action
const action = useAction(
action: Function,
listener?: Function,
errorListener?: (errorResponse) => void
);
Listen to action events
useListenToAction(
action: Action,
listener?: Function,
errorListener?: (errorResponse) => void
)
Create and use action map
const actionMap = useActionMap(
actions: {
actionName: Function
},
errorListener?: (errorResponse) => void
)
Create and use action bus
type ActionsMap = {
actionName: FunctionSignature
}
const actionBus = useActionBus<ActionsMap>(
initialActions: Partial<ActionsMap>,
errorListener?: (errorResponse) => void
)
Create and use data store
type PropTypes = {
propName: number
}
const store = useStore<PropTypes>(
initialData: Partial<PropTypes>,
config: {
onChange: {
[K in keyof PropTypes]: (
value: PropTypes[K],
prevValue: PropTypes[K] | undefined
) => void
};
pipes: {
[K in keyof PropTypes]: (
value: PropTypes[K]
) => PropTypes[K]
};
control: {
beforeChange: (name, value) => boolean;
change: (names) => void;
error: (errorResponse) => void;
reset: () => void;
}
}
);
Use store value as state
const [ value: TypeOfValue, setValue: (value: TypeOfValue) => void ] = useStoreState(
store: Store,
key: KeyInStore
)
Listen to value changes
useListenToStoreChanges(
store: Store,
key: KeyInStore,
listener: (value: TypeOfValue, previousValue?: TypeOfValue) => void
listenerOptions?: ListenerOptions
)
Hook inputs are reconciled on every render using semantic comparison, so you
can safely pass inline objects (e.g. { tags: [tag] }) without object identity
forcing a resubscribe:
useListenToEvent/useListenToEventBus/useListenToActionBus/useListenToStoreChanges) are compared field by field (tags is an order-insensitive set). A semantically equal object is a no-op. A changed soft option updates the live listener in place, preserving its called/count counters. Changing context (an identity field) resubscribes using the old context.useEvent event options and useEventBus options are reconciled via setOptions instead of being ignored (and useEventBus no longer throws when options change).useAction/useActionBus/useActionMap action functions are replaced in place via setAction (compared by reference), preserving all listeners; useActionBus also adds/removes actions as its map changes. The useActionMap key set is fixed by its type contract — a runtime key-set change throws.useStore config (onChange/pipes/control) is reconciled by category + key (functions compared by reference); only handlers added by the hook are touched. initialData is seed-only — it initializes the store once and later changes are intentionally ignored (live data is owned by set/useStoreState).ErrorBoundary provides catch-all error listener for actions and events. Without ErrorBoundary (or with empty "listener") and without error listener passed directly to them they will re-throw errors from listeners.
import { ErrorBoundary } from "@kuindji/reactive/react";
function App() {
return (
<ErrorBoundary
listener={(errorResponse) => {
console.error("Reactive error:", errorResponse);
// Send to error reporting service
}}>
<UserComponent />
</ErrorBoundary>
);
}
children - React children to renderlistener - Error listener function (optional)ISC License
Contributions are welcome! Please feel free to submit a Pull Request.
If you encounter any issues or have questions, please open an issue on GitHub.
FAQs
A JavaScript/TypeScript utility library for building reactive applications with events, actions, stores, and React hooks
We found that @kuindji/reactive demonstrated a healthy version release cadence and project activity because the last version was released less than a year ago. It has 1 open source maintainer collaborating on the project.
Did you know?

Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.

Product
Socket now supports Custom Roles and Repository Access Permissions so organizations can control who can access specific repositories and actions.

Product
Socket MCP now lets AI assistants review org alerts, investigate threats using the Socket threat feed, and inspect package files in addition to dependency scoring.

Product
Socket Firewall blocks malicious VS Code and Open VSX extensions before install, protecting developers from compromised editor marketplaces.