@oh-my-pi/pi-agent-core
General-purpose agent with transport abstraction, state management, and attachment support
Stateful agent with tool execution and event streaming. Built on @oh-my-pi/pi-ai.
npm install @oh-my-pi/pi-agent
import { Agent } from "@oh-my-pi/pi-agent";
import { getModel } from "@oh-my-pi/pi-ai";
const agent = new Agent({
  initialState: {
    systemPrompt: "You are a helpful assistant.",
    model: getModel("anthropic", "claude-sonnet-4-20250514"),
  },
});
agent.subscribe((event) => {
  if (event.type === "message_update" && event.assistantMessageEvent.type === "text_delta") {
    // Stream just the new text chunk
    process.stdout.write(event.assistantMessageEvent.delta);
  }
});
await agent.prompt("Hello!");
The agent works with AgentMessage, a flexible type that can include the standard LLM roles (user, assistant, toolResult) as well as custom application-defined message types (added via declaration merging, see below). LLMs only understand user, assistant, and toolResult. The convertToLlm function bridges this gap by filtering and transforming messages before each LLM call.
AgentMessage[] → transformContext() → AgentMessage[] → convertToLlm() → Message[] → LLM
                 (optional)                             (required)
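For example, the two stages might be wired together like this (a sketch: the 50-message cutoff and the role filter are illustrative choices, not library defaults):

const agent = new Agent({
  initialState: {
    systemPrompt: "You are a helpful assistant.",
    model: getModel("anthropic", "claude-sonnet-4-20250514"),
  },
  // Runs first (optional): prune context to the most recent 50 messages
  transformContext: async (messages) => messages.slice(-50),
  // Runs second: keep only the roles the LLM understands
  convertToLlm: (messages) =>
    messages.filter((m) => ["user", "assistant", "toolResult"].includes(m.role)),
});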
The agent emits events for UI updates. Understanding the event sequence helps build responsive interfaces.
When you call prompt("Hello"):
prompt("Hello")
├─ agent_start
├─ turn_start
├─ message_start { message: userMessage } // Your prompt
├─ message_end { message: userMessage }
├─ message_start { message: assistantMessage } // LLM starts responding
├─ message_update { message: partial... } // Streaming chunks
├─ message_update { message: partial... }
├─ message_end { message: assistantMessage } // Complete response
├─ turn_end { message, toolResults: [] }
└─ agent_end { messages: [...] }
If the assistant calls tools, the loop continues:
prompt("Read config.json")
├─ agent_start
├─ turn_start
├─ message_start/end { userMessage }
├─ message_start { assistantMessage with toolCall }
├─ message_update...
├─ message_end { assistantMessage }
├─ tool_execution_start { toolCallId, toolName, args }
├─ tool_execution_update { partialResult } // If tool streams
├─ tool_execution_end { toolCallId, result }
├─ message_start/end { toolResultMessage }
├─ turn_end { message, toolResults: [toolResult] }
│
├─ turn_start // Next turn
├─ message_start { assistantMessage } // LLM responds to tool result
├─ message_update...
├─ message_end
├─ turn_end
└─ agent_end
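A subscriber can map this lifecycle onto UI updates. A minimal sketch, using only the event fields documented in the table below:

agent.subscribe((event) => {
  switch (event.type) {
    case "tool_execution_start":
      console.log(`running ${event.toolName} (${event.toolCallId})`);
      break;
    case "tool_execution_end":
      console.log(`finished ${event.toolCallId}`);
      break;
    case "turn_end":
      console.log(`turn complete: ${event.toolResults.length} tool result(s)`);
      break;
  }
});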
continue() resumes from existing context without adding a new message. Use it for retries after errors.
// After an error, retry from current state
await agent.continue();
The last message in context must be user or toolResult (not assistant).
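A sketch of a retry built on that rule, assuming a failed run leaves the context ending in a user or toolResult message and surfaces the failure on agent.state.error:

await agent.prompt("Read config.json");
if (agent.state.error) {
  // The run failed mid-turn (e.g. a network error); resume from the
  // existing context instead of re-sending the prompt
  await agent.continue();
}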
| Event | Description |
|---|---|
| agent_start | Agent begins processing |
| agent_end | Agent completes with all new messages |
| turn_start | New turn begins (one LLM call + tool executions) |
| turn_end | Turn completes with assistant message and tool results |
| message_start | Any message begins (user, assistant, toolResult) |
| message_update | Assistant only. Includes assistantMessageEvent with delta |
| message_end | Message completes |
| tool_execution_start | Tool begins |
| tool_execution_update | Tool streams progress |
| tool_execution_end | Tool completes |
const agent = new Agent({
  // Initial state
  initialState: {
    systemPrompt: string,
    model: Model,
    thinkingLevel: "off" | "minimal" | "low" | "medium" | "high" | "xhigh",
    tools: AgentTool<any>[],
    messages: AgentMessage[],
  },
  // Convert AgentMessage[] to LLM Message[] (required for custom message types)
  convertToLlm: (messages) => messages.filter(...),
  // Transform context before convertToLlm (for pruning, compaction)
  transformContext: async (messages, signal) => pruneOldMessages(messages),
  // How to handle queued messages: "one-at-a-time" (default) or "all"
  queueMode: "one-at-a-time",
  // Custom stream function (for proxy backends)
  streamFn: streamProxy,
  // Dynamic API key resolution (for expiring OAuth tokens)
  getApiKey: async (provider) => refreshToken(),
  // Tool execution context (late-bound UI/session access)
  getToolContext: () => ({ /* app-defined */ }),
});
interface AgentState {
  systemPrompt: string;
  model: Model;
  thinkingLevel: ThinkingLevel;
  tools: AgentTool<any>[];
  messages: AgentMessage[];
  isStreaming: boolean;
  streamMessage: AgentMessage | null; // Current partial during streaming
  pendingToolCalls: Set<string>;
  error?: string;
}
Access via agent.state. During streaming, streamMessage contains the partial assistant message.
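A sketch of state-driven rendering; renderPartial is a hypothetical UI hook, not part of the package:

agent.subscribe(() => {
  const { isStreaming, streamMessage, pendingToolCalls } = agent.state;
  if (isStreaming && streamMessage) {
    renderPartial(streamMessage); // Redraw with the partial assistant message
  }
  console.log(`${pendingToolCalls.size} pending tool call(s)`);
});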
// Text prompt
await agent.prompt("Hello");
// With images
await agent.prompt("What's in this image?", [{ type: "image", data: base64Data, mimeType: "image/jpeg" }]);
// AgentMessage directly
await agent.prompt({ role: "user", content: "Hello", timestamp: Date.now() });
// Continue from current context (last message must be user or toolResult)
await agent.continue();
agent.setSystemPrompt("New prompt");
agent.setModel(getModel("openai", "gpt-4o"));
agent.setThinkingLevel("medium");
agent.setTools([myTool]);
agent.replaceMessages(newMessages);
agent.appendMessage(message);
agent.clearMessages();
agent.reset(); // Clear everything
agent.abort(); // Cancel current operation
await agent.waitForIdle(); // Wait for completion
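These compose into a simple timeout guard. A sketch (whether prompt() rejects after abort() is integration-dependent, hence the finally):

const timer = setTimeout(() => agent.abort(), 30_000);
try {
  await agent.prompt("Summarize this repository.");
} finally {
  clearTimeout(timer);
  await agent.waitForIdle(); // Let any in-flight work settle before reuse
}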
const unsubscribe = agent.subscribe((event) => {
  console.log(event.type);
});
unsubscribe();
Queue messages to inject during tool execution (steering) or after the agent would otherwise stop (follow-up):
agent.setSteeringMode("one-at-a-time");
agent.setInterruptMode("immediate");
// While agent is running tools
agent.steer({
  role: "user",
  content: "Stop! Do this instead.",
  timestamp: Date.now(),
});
// Queue a follow-up to run after the current turn completes
agent.followUp({
  role: "user",
  content: "After that, summarize the changes.",
  timestamp: Date.now(),
});
Steering messages are checked after each tool call by default. Set interruptMode to "wait" to defer steering until the current turn completes.
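A sketch of routing user input based on whether the agent is busy; the isBusy check is an assumption derived from AgentState above, not a library helper:

function send(text: string) {
  const message = { role: "user" as const, content: text, timestamp: Date.now() };
  const isBusy = agent.state.isStreaming || agent.state.pendingToolCalls.size > 0;
  if (isBusy) {
    agent.steer(message); // Redirect the in-flight run
  } else {
    void agent.prompt(message);
  }
}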
Extend AgentMessage via declaration merging:
declare module "@oh-my-pi/pi-agent" {
  interface CustomAgentMessages {
    notification: { role: "notification"; text: string; timestamp: number };
  }
}
// Now valid
const msg: AgentMessage = { role: "notification", text: "Info", timestamp: Date.now() };
Handle custom types in convertToLlm:
const agent = new Agent({
  convertToLlm: (messages) =>
    messages.flatMap((m) => {
      if (m.role === "notification") return []; // Filter out
      return [m];
    }),
});
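With the merged declaration in place, application code can record notifications in the transcript while convertToLlm keeps them away from the LLM. For example:

agent.appendMessage({
  role: "notification",
  text: "Model switched to gpt-4o",
  timestamp: Date.now(),
});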
Define tools using AgentTool:
import fs from "node:fs/promises";
import { Type } from "@sinclair/typebox";
import type { AgentTool } from "@oh-my-pi/pi-agent";

const readFileTool: AgentTool = {
  name: "read_file",
  label: "Read File", // For UI display
  description: "Read a file's contents",
  parameters: Type.Object({
    path: Type.String({ description: "File path" }),
  }),
  execute: async (toolCallId, params, signal, onUpdate, context) => {
    // Optional: stream progress before the final result
    onUpdate?.({ content: [{ type: "text", text: "Reading..." }], details: {} });
    const content = await fs.readFile(params.path, "utf-8");
    return {
      content: [{ type: "text", text: content }],
      details: { path: params.path, size: content.length },
    };
  },
};
agent.setTools([readFileTool]);
Throw an error when a tool fails. Do not return error messages as content.
execute: async (toolCallId, params, signal, onUpdate) => {
  if (!fs.existsSync(params.path)) {
    throw new Error(`File not found: ${params.path}`);
  }
  // Return content only on success
  return { content: [{ type: "text", text: "..." }] };
};
Thrown errors are caught by the agent and reported to the LLM as tool errors with isError: true.
For browser apps that proxy through a backend:
import { Agent, streamProxy } from "@oh-my-pi/pi-agent";
const agent = new Agent({
  streamFn: (model, context, options) =>
    streamProxy(model, context, {
      ...options,
      authToken: "...",
      proxyUrl: "https://your-server.com",
    }),
});
For direct control without the Agent class:
import { agentLoop, agentLoopContinue } from "@oh-my-pi/pi-agent";
import type { AgentContext, AgentLoopConfig } from "@oh-my-pi/pi-agent";
import { getModel } from "@oh-my-pi/pi-ai";

const context: AgentContext = {
  systemPrompt: "You are helpful.",
  messages: [],
  tools: [],
};
const config: AgentLoopConfig = {
  model: getModel("openai", "gpt-4o"),
  convertToLlm: (msgs) => msgs.filter((m) => ["user", "assistant", "toolResult"].includes(m.role)),
};
const userMessage = { role: "user" as const, content: "Hello", timestamp: Date.now() };

for await (const event of agentLoop([userMessage], context, config)) {
  console.log(event.type);
}

// Continue from existing context
for await (const event of agentLoopContinue(context, config)) {
  console.log(event.type);
}
MIT