@agentick/kernel
Low-level execution primitives for Agentick. Provides procedures, async context management, schema validation, logging, telemetry, and event streaming.
Note: Most applications should use @agentick/core instead. This package is the foundation that core builds upon.
Installation
pnpm add @agentick/kernel
Core Primitives
Procedures
Procedures wrap any async function, generator, or async iterable with middleware, execution tracking, and streaming:
import { createProcedure } from "@agentick/kernel";
import { z } from "zod";
const fetchUser = createProcedure(
{
name: "fetchUser",
schema: z.object({ id: z.string() }),
},
async ({ id }) => {
return await db.users.find(id);
},
);
const user = await fetchUser({ id: "123" });
const withLogging = fetchUser.use(async (args, ctx, next) => {
console.log("Fetching user:", args.id);
const result = await next(args);
console.log("Found:", result);
return result;
});
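To make the `(args, ctx, next)` shape concrete, here is a minimal sketch of how a middleware chain of that form could be dispatched. It does not use `@agentick/kernel`; `Ctx`, `Middleware`, and `applyMiddleware` are hypothetical names invented for illustration, and the real implementation may differ.

```typescript
// Hypothetical types for illustration; not the library's actual definitions.
type Ctx = { log: string[] };
type Next<A, R> = (args: A) => Promise<R>;
type Middleware<A, R> = (args: A, ctx: Ctx, next: Next<A, R>) => Promise<R>;

// Wrap a handler so that each middleware runs in order, outermost first.
function applyMiddleware<A, R>(
  handler: (args: A, ctx: Ctx) => Promise<R>,
  middleware: Middleware<A, R>[],
): (args: A, ctx: Ctx) => Promise<R> {
  return (args, ctx) => {
    const dispatch = (i: number, current: A): Promise<R> => {
      const mw = middleware[i];
      // Past the last middleware: call the real handler.
      if (!mw) return handler(current, ctx);
      return mw(current, ctx, (nextArgs) => dispatch(i + 1, nextArgs));
    };
    return dispatch(0, args);
  };
}

// A logging middleware with the same shape as the example above.
const logging: Middleware<{ id: string }, string> = async (args, ctx, next) => {
  ctx.log.push(`before:${args.id}`);
  const result = await next(args);
  ctx.log.push(`after:${result}`);
  return result;
};

const run = applyMiddleware(
  async ({ id }: { id: string }) => `user-${id}`,
  [logging],
);
```

Because each middleware decides whether and when to call `next`, it can short-circuit, rewrite arguments, or post-process the result.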
Async Generators
Procedures wrapping generators get automatic context preservation, stream:chunk events, abort handling, and iterator cleanup:
const tokenStream = createProcedure(
{ name: "tokens", handleFactory: false },
async function* (prompt: string) {
const response = await fetchSSE(prompt);
for await (const chunk of response) {
yield chunk.text;
}
},
);
const iter = await tokenStream("tell me a joke");
for await (const token of iter) {
process.stdout.write(token);
}
Any function returning an AsyncIterable gets the same treatment — it doesn't have to be async function*.
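As an illustration of that last point, the sketch below builds an `AsyncIterable` by hand from a plain function. The names `countdown`, `looksAsyncIterable`, and `collect` are hypothetical helpers, not part of the package.

```typescript
// A plain function that returns an AsyncIterable without using async function*.
function countdown(from: number): AsyncIterable<number> {
  return {
    [Symbol.asyncIterator]() {
      let n = from;
      return {
        async next(): Promise<IteratorResult<number>> {
          if (n <= 0) return { done: true, value: undefined };
          return { done: false, value: n-- };
        },
      };
    },
  };
}

// A structural check for async iterables (hypothetical implementation).
function looksAsyncIterable(value: unknown): value is AsyncIterable<unknown> {
  return typeof value === "object" && value !== null && Symbol.asyncIterator in value;
}

// Drain an async iterable into an array.
async function collect<T>(source: AsyncIterable<T>): Promise<T[]> {
  const out: T[] = [];
  for await (const item of source) out.push(item);
  return out;
}
```

Anything that passes a check like `looksAsyncIterable` can be consumed with `for await`, which is presumably what lets a procedure treat both forms alike.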
ExecutionHandle
Procedures are callable directly. You can also use .exec():
const handle = await fetchUser({ id: "123" });
// Alternatively: const handle = await fetchUser.exec({ id: "123" });
console.log(handle.status);
handle.abort();
const user = await handle.result;
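The handle pattern (a status, an `abort()` method, and a `result` promise) can be sketched with a standard `AbortController`. This is an assumption-laden illustration: `HandleSketch`, `startSketch`, and the status names are hypothetical and may not match the real `ExecutionHandle`.

```typescript
// Hypothetical status values; the real ExecutionHandle may use different ones.
type Status = "running" | "completed" | "failed" | "aborted";

interface HandleSketch<T> {
  readonly status: Status;
  readonly result: Promise<T>;
  abort(): void;
}

// Start some work and return a handle that tracks and controls it.
function startSketch<T>(work: (signal: AbortSignal) => Promise<T>): HandleSketch<T> {
  const controller = new AbortController();
  let status: Status = "running";
  const result = work(controller.signal).then(
    (value) => {
      if (status === "running") status = "completed";
      return value;
    },
    (error) => {
      if (status === "running") status = "failed";
      throw error;
    },
  );
  return {
    get status() {
      return status;
    },
    result,
    abort() {
      // Only running work can be aborted; later calls are no-ops.
      if (status === "running") {
        status = "aborted";
        controller.abort();
      }
    },
  };
}
```

The work function has to observe the signal for `abort()` to have any effect; the handle itself only records the request and forwards it.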
Composition
import { pipe, compose } from "@agentick/kernel";
const pipeline = pipe(validate, transform, save);
const composed = compose(save, transform, validate);
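The two helpers differ only in direction: `pipe` chains left-to-right and `compose` chains right-to-left, so the two lines above describe the same pipeline. A minimal sketch over plain functions shows the ordering; `pipeSketch` and `composeSketch` are hypothetical stand-ins, not the library's implementation.

```typescript
type Step<T> = (input: T) => T | Promise<T>;

// Left-to-right: the first function listed runs first.
function pipeSketch<T>(...steps: Step<T>[]): (input: T) => Promise<T> {
  return async (input) => {
    let value = input;
    for (const step of steps) value = await step(value);
    return value;
  };
}

// Right-to-left: the last function listed runs first.
function composeSketch<T>(...steps: Step<T>[]): (input: T) => Promise<T> {
  return pipeSketch(...[...steps].reverse());
}

const addOne: Step<number> = (n) => n + 1;
const double: Step<number> = (n) => n * 2;

// pipeSketch(addOne, double)(3)    computes (3 + 1) * 2
// composeSketch(addOne, double)(3) computes (3 * 2) + 1
```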
Context (AsyncLocalStorage)
Request-scoped state that flows through async operations automatically:
import { Context } from "@agentick/kernel";
Context.run({ user: { id: "123" }, metadata: { traceId: "abc" } }, async () => {
const ctx = Context.get();
console.log(ctx.user?.id);
await someAsyncOperation();
});
await Context.fork({ metadata: { branch: "A" } }, async () => {
// Runs in a forked context with the given overrides applied
});
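For readers unfamiliar with `AsyncLocalStorage`, the following sketch shows the underlying Node.js mechanism on its own. `RequestContext`, `runWith`, `forkWith`, and `currentTrace` are hypothetical names, and the merge behavior in `forkWith` is an assumption about what forking might mean, not a description of `Context.fork`.

```typescript
import { AsyncLocalStorage } from "node:async_hooks";

// Hypothetical context shape, loosely modeled on the example above.
type RequestContext = { user?: { id: string }; metadata: Record<string, string> };

const storage = new AsyncLocalStorage<RequestContext>();

// Run fn with ctx as the ambient context.
function runWith<T>(ctx: RequestContext, fn: () => Promise<T>): Promise<T> {
  return storage.run(ctx, fn);
}

// Assumed fork semantics: the child sees the parent's values plus overrides,
// and the parent context object is left untouched.
function forkWith<T>(overrides: Partial<RequestContext>, fn: () => Promise<T>): Promise<T> {
  const parent = storage.getStore() ?? { metadata: {} };
  const child: RequestContext = {
    ...parent,
    ...overrides,
    metadata: { ...parent.metadata, ...overrides.metadata },
  };
  return storage.run(child, fn);
}

// The store is still reachable after an await, with no explicit parameter passing.
async function currentTrace(): Promise<string | undefined> {
  await Promise.resolve();
  return storage.getStore()?.metadata["traceId"];
}
```

Outside any `run` call, `storage.getStore()` returns `undefined`, which is presumably why the API offers both `Context.get()` and `Context.tryGet()`.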
Global Event Subscribers
const unsubscribe = Context.subscribeGlobal((event) => {
console.log(event.type, event.payload);
});
Context.emit("custom:event", { data: "value" });
Schema Validation
Unified handling for Zod 3, Zod 4, and Standard Schema:
import { detectSchemaType, toJSONSchema, validateSchema, parseSchema } from "@agentick/kernel";
const type = detectSchemaType(schema);
const jsonSchema = toJSONSchema(myZodSchema);
const result = validateSchema(schema, data);
if (result.success) {
console.log(result.data);
} else {
console.log(result.issues);
}
// parseSchema returns the parsed value or throws on invalid input
const parsed = parseSchema(schema, input);
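The difference between the result-object style and the throwing style can be sketched without any schema library. Everything here (`MiniSchema`, `validateSketch`, `parseSketch`, `userIdSchema`) is hypothetical and far simpler than real Zod or Standard Schema objects.

```typescript
// Hypothetical minimal schema shape for illustration only.
type Issue = { path: string; message: string };
type ValidationResult<T> =
  | { success: true; data: T }
  | { success: false; issues: Issue[] };

interface MiniSchema<T> {
  check(value: unknown): ValidationResult<T>;
}

const userIdSchema: MiniSchema<{ id: string }> = {
  check(value) {
    if (typeof value === "object" && value !== null && "id" in value) {
      const id = (value as { id: unknown }).id;
      if (typeof id === "string") return { success: true, data: { id } };
    }
    return { success: false, issues: [{ path: "id", message: "expected string" }] };
  },
};

// Result-object style: invalid input is reported, never thrown.
function validateSketch<T>(schema: MiniSchema<T>, value: unknown): ValidationResult<T> {
  return schema.check(value);
}

// Throwing style: returns the parsed value or raises an error.
function parseSketch<T>(schema: MiniSchema<T>, value: unknown): T {
  const result = schema.check(value);
  if (!result.success) {
    throw new Error(result.issues.map((i) => `${i.path}: ${i.message}`).join("; "));
  }
  return result.data;
}
```

The result-object form suits code paths where invalid input is expected and handled inline; the throwing form suits boundaries where invalid input should stop execution.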
Logging
Structured logging with automatic context injection:
import { Logger } from "@agentick/kernel";
Logger.configure({
level: "info",
transport: "pretty",
});
const log = Logger.get();
log.info("Processing request", { userId: "123" });
const dbLog = Logger.for("database");
dbLog.debug("Query executed", { query, duration });
Telemetry
Spans and metrics for observability:
import { Telemetry } from "@agentick/kernel";
const trace = Telemetry.startTrace("handle-request");
const span = Telemetry.startSpan("fetch-user");
span.setAttribute("userId", "123");
try {
// ... do the work being traced
} catch (error) {
span.recordError(error);
} finally {
span.end();
}
const requestCounter = Telemetry.getCounter("requests_total");
requestCounter.add(1, { method: "POST" });
const latencyHistogram = Telemetry.getHistogram("request_duration_ms");
latencyHistogram.record(42, { endpoint: "/api/users" });
Channels
Bidirectional communication for real-time updates:
import { Channel } from "@agentick/kernel";
const channel = new Channel();
channel.on("message", (payload) => {
console.log("Received:", payload);
});
channel.emit("message", { text: "Hello" });
const response = await channel.request("getUser", { id: "123" }, 5000);
channel.broadcast("notification", { message: "System update" });
EventBuffer
Type-safe event streaming with replay:
import { EventBuffer } from "@agentick/kernel";
type MyEvent =
| { type: "start"; id: string }
| { type: "progress"; percent: number }
| { type: "complete"; result: unknown };
const buffer = new EventBuffer<MyEvent>();
buffer.on("progress", (event) => {
console.log(`${event.percent}% complete`);
});
buffer.onReplay("start", (event) => {
console.log("Started:", event.id);
});
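// Consume as an async iterable in its own task; the loop ends when the buffer closes
const consumer = (async () => {
for await (const event of buffer) {
console.log(event.type);
}
})();
buffer.push({ type: "start", id: "123" });
buffer.push({ type: "progress", percent: 50 });
buffer.push({ type: "complete", result: { success: true } });
buffer.close();
await consumer;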
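The distinction between a live subscription and a replaying one can be sketched with a small history array. `ReplayBufferSketch` is a hypothetical class written for this illustration; unlike the real `EventBuffer`, it does not narrow the event type per listener and does not support async iteration.

```typescript
type Listener<E> = (event: E) => void;

class ReplayBufferSketch<E extends { type: string }> {
  private history: E[] = [];
  private listeners = new Map<string, Listener<E>[]>();

  // Live subscription: sees only events pushed after subscribing.
  on(type: E["type"], listener: Listener<E>): void {
    const list = this.listeners.get(type) ?? [];
    list.push(listener);
    this.listeners.set(type, list);
  }

  // Replay subscription: receives matching past events first, then live ones.
  onReplay(type: E["type"], listener: Listener<E>): void {
    for (const event of this.history) {
      if (event.type === type) listener(event);
    }
    this.on(type, listener);
  }

  // Record the event and notify current listeners for its type.
  push(event: E): void {
    this.history.push(event);
    for (const listener of this.listeners.get(event.type) ?? []) listener(event);
  }
}
```

Replay is what allows a late subscriber, for example a client that reconnects mid-stream, to catch up on events it missed.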
Guards
Gate procedure execution with access control checks:
import { createGuard, GuardError } from "@agentick/kernel";
const adminOnly = createGuard(
(envelope) => envelope.context.user?.roles?.includes("admin") ?? false,
);
const roleGuard = createGuard(
{
name: "role-guard",
guardType: "role",
reason: (envelope) => `User ${envelope.context.user?.id} lacks required role`,
},
(envelope) => envelope.context.user?.roles?.includes("admin") ?? false,
);
const customGuard = createGuard({ name: "acl-guard" }, (envelope) => {
if (!hasPermission(envelope.context.user)) {
throw GuardError.role(["admin", "moderator"]);
}
return true;
});
const secured = fetchUser.use(adminOnly);
Guards are middleware — they compose with .use() like any other middleware but are purpose-built for allow/deny decisions.
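To show why a guard fits the middleware shape, here is a minimal sketch that turns a predicate into an `(args, ctx, next)` function. `guardSketch`, `DeniedError`, `Envelope`, and `GuardCtx` are hypothetical names; the real `createGuard` and `GuardError` carry more configuration and detail.

```typescript
// Hypothetical shapes for illustration; not the library's actual types.
type GuardCtx = { user?: { id: string; roles?: string[] } };
type Envelope<A> = { args: A; context: GuardCtx };
type Mw<A, R> = (args: A, ctx: GuardCtx, next: (args: A) => Promise<R>) => Promise<R>;

class DeniedError extends Error {
  constructor(reason: string) {
    super(reason);
    this.name = "DeniedError";
  }
}

// Turn an allow/deny predicate into middleware.
function guardSketch<A, R>(
  predicate: (envelope: Envelope<A>) => boolean | Promise<boolean>,
  reason = "Access denied",
): Mw<A, R> {
  return async (args, ctx, next) => {
    const allowed = await predicate({ args, context: ctx });
    // Deny: throw before the wrapped handler is ever reached.
    if (!allowed) throw new DeniedError(reason);
    return next(args);
  };
}

const adminOnlySketch = guardSketch<{ id: string }, string>(
  (envelope) => envelope.context.user?.roles?.includes("admin") ?? false,
);
```

The key property is that a denied call never invokes `next`, so the guarded handler does no work for unauthorized callers.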
GuardError
import { GuardError, isGuardError } from "@agentick/kernel";
GuardError.role(["admin"]);
GuardError.denied("Custom reason");
if (isGuardError(error)) {
error.code;
error.guardType;
error.details;
}
Key Patterns
Middleware Pipelines
import { createPipeline } from "@agentick/kernel";
const authPipeline = createPipeline([
async (args, ctx, next) => {
if (!ctx.user) throw new Error("Unauthorized");
return next(args);
},
async (args, ctx, next) => {
ctx.metadata.startTime = Date.now();
return next(args);
},
]);
const securedFetch = fetchUser.use(authPipeline);
Immutable Composition
All procedure methods return new instances:
const base = createProcedure({ name: "base" }, handler);
const withTimeout = base.withTimeout(5000);
const withContext = withTimeout.withContext({ tenant: "acme" });
console.log(base === withTimeout); // false
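The immutable style can be sketched as a class whose configuration methods construct a new instance instead of mutating `this`. `ProcSketch` is a hypothetical stand-in, and its timeout logic is one plausible approach, not the library's.

```typescript
class ProcSketch<A, R> {
  constructor(
    private readonly handler: (args: A) => Promise<R>,
    readonly timeoutMs?: number,
  ) {}

  // Returns a new instance; the receiver is left unchanged.
  withTimeout(ms: number): ProcSketch<A, R> {
    return new ProcSketch(this.handler, ms);
  }

  call(args: A): Promise<R> {
    const ms = this.timeoutMs;
    if (ms === undefined) return this.handler(args);
    let timer: ReturnType<typeof setTimeout> | undefined;
    const timeout = new Promise<never>((_, reject) => {
      timer = setTimeout(() => reject(new Error(`Timed out after ${ms}ms`)), ms);
    });
    // Whichever settles first wins; the timer is always cleared afterwards.
    // Note: this sketch does not cancel the handler when the timeout wins.
    return Promise.race([this.handler(args), timeout]).finally(() => clearTimeout(timer));
  }
}

const baseSketch = new ProcSketch(async (n: number) => n * 2);
const timedSketch = baseSketch.withTimeout(5000);
```

Because `baseSketch` is never modified, it can be shared safely and specialized differently in different call sites.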
API Reference
Procedures
| Function | Description |
| --- | --- |
| createProcedure(options, handler) | Create a procedure (function or generator) |
| generatorProcedure(options, fn) | Create a generator procedure that preserves `this` |
| createHook(options, handler) | Create a hook procedure |
| pipe(...procedures) | Chain left-to-right |
| compose(...procedures) | Chain right-to-left |
| createPipeline(middleware) | Bundle middleware |
Guards
| Function | Description |
| --- | --- |
| createGuard(fn) | Create guard from predicate |
| createGuard(config, fn) | Create guard with config |
| GuardError | Access denied error class |
| GuardError.role(roles) | Role-based denial factory |
| GuardError.denied(reason) | Custom denial factory |
| isGuardError(error) | Type guard for GuardError |
Context
| Function | Description |
| --- | --- |
| Context.create(overrides?) | Create root context |
| Context.run(context, fn) | Run within context |
| Context.fork(overrides, fn) | Fork for parallel execution |
| Context.get() / Context.tryGet() | Access current context |
| Context.emit(type, payload) | Emit event |
| Context.subscribeGlobal(handler) | Subscribe to all events |
Schema
| Function | Description |
| --- | --- |
| detectSchemaType(schema) | Identify schema type |
| toJSONSchema(schema) | Convert to JSON Schema |
| validateSchema(schema, value) | Validate with result object |
| parseSchema(schema, value) | Parse or throw |
Logging & Telemetry
| Function | Description |
| --- | --- |
| Logger.configure(options) | Configure logging |
| Logger.get() | Get context-aware logger |
| Logger.for(name) | Get scoped logger |
| Telemetry.startSpan(name) | Create span |
| Telemetry.getCounter(name) | Create counter metric |
| Telemetry.getHistogram(name) | Create histogram metric |
Streaming
| Export | Description |
| --- | --- |
| Channel | Pub/sub with request/response |
| EventBuffer<T> | Type-safe event streaming with replay |
| mapStream(s, fn) | Transform items in an async stream |
| tapStream(s, fn) | Side effects without modifying stream |
| mergeStreams(s) | Merge multiple streams (race) |
| isAsyncIterable | Type guard for async iterables |
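The stream helpers listed above are not demonstrated elsewhere in this README. As a rough idea of what mapping and tapping an async stream involve, here is a sketch built on async generators; `mapSketch`, `tapSketch`, and `fromArray` are hypothetical and may differ from `mapStream` and `tapStream` in signature and behavior.

```typescript
// Transform each item of an async stream.
async function* mapSketch<T, U>(
  source: AsyncIterable<T>,
  fn: (item: T) => U | Promise<U>,
): AsyncGenerator<U> {
  for await (const item of source) yield await fn(item);
}

// Run a side effect per item and pass the item through unchanged.
async function* tapSketch<T>(
  source: AsyncIterable<T>,
  fn: (item: T) => void | Promise<void>,
): AsyncGenerator<T> {
  for await (const item of source) {
    await fn(item);
    yield item;
  }
}

// Helper to turn an array into an async stream for the example.
async function* fromArray<T>(items: T[]): AsyncGenerator<T> {
  for (const item of items) yield item;
}
```

Both helpers are lazy: nothing runs until the resulting stream is consumed, and items flow through one at a time.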
License
MIT