@multiplayer-app/ai-agent-node
A Node.js library for building AI agent backends with support for multi-provider AI models, real-time communication, distributed processing, and chat management.
Features
- Multi-Provider AI Support: OpenAI, Anthropic, Google, and OpenRouter
- Streaming Chat Processing: Real-time message streaming with tool calling support
- Distributed Architecture: Kafka and Redis integration for scalable agent processing
- Real-time Communication: Socket.io integration for live updates
- Agent Process Management: Lifecycle management with event-driven architecture
- Artifact Storage: S3 integration for storing generated artifacts
- Context Management: Intelligent context limiting and attachment handling
- Built-in Tools: Form value proposal
Installation
npm install @multiplayer-app/ai-agent-node ai
Prerequisites
Infrastructure Setup
The library requires several infrastructure services to function properly. These services can be started using Docker Compose or configured separately.
Starting Services
Use the startServices() function to initialize all required services:
import { startServices } from '@multiplayer-app/ai-agent-node';
import { MongoAgentChatRepository, MongoAgentMessageRepository } from '@multiplayer-app/ai-agent-mongo';
const chatRepository = new MongoAgentChatRepository();
const messageRepository = new MongoAgentMessageRepository();
await startServices(chatRepository, messageRepository);
This function initializes:
- Kafka Service: Connects to Kafka brokers and subscribes to topics for chat title generation and background processing
- Agent Store: Initializes the in-memory agent process store for managing active agent conversations
- Model Store: Fetches and caches available AI models from configured providers
- S3 Bucket: Ensures the configured S3 bucket exists for artifact storage
Stopping Services
Use the stopServices() function to gracefully shut down connections:
import { stopServices } from '@multiplayer-app/ai-agent-node';
await stopServices();
This disconnects:
- Redis connections (including pub/sub clients)
- Kafka consumer and producer connections
Infrastructure Services
Kafka
Purpose: Kafka is used for asynchronous, distributed processing of agent tasks:
- Chat Title Generation: Generates chat titles asynchronously after the first message to avoid blocking the main request
- Background Processing: Handles long-running agent tasks that don't need immediate response
- Scalability: Allows multiple service instances to process tasks in parallel
Topics:
chat-title-generation (default): Processes chat title generation requests
background-chat-processing (default): Handles background agent processing tasks
Redis
Purpose: Redis provides:
- Pub/Sub for Socket.IO: Enables real-time message broadcasting across multiple service instances
- Agent State Management: Stores temporary agent process state and event listeners
- Caching: Can be used for caching model information and other frequently accessed data
Connection: Redis is used for both direct key-value operations and pub/sub channels for Socket.IO adapter.
Docker Compose Setup
The project includes a docker-compose.yml file that sets up all required services:
docker-compose up -d
docker-compose down
This starts:
- MongoDB (port 27017): Database for chat and message storage
- Redis (port 6379): Pub/sub and caching
- Zookeeper (port 2181): Required for Kafka
- Kafka (port 9092): Message broker
- MinIO (ports 9000, 9001): S3-compatible object storage for artifacts
Environment Variables
Configure the library using the following environment variables:
Redis Configuration
REDIS_URL=redis://localhost:6379
REDIS_HOST=localhost
REDIS_PORT=6379
REDIS_PASSWORD=your-password
REDIS_DATABASE=0
AI Provider API Keys
At least one provider API key is required:
OPENAI_API_KEY=sk-...
ANTHROPIC_API_KEY=sk-ant-...
GOOGLE_GENERATIVE_AI_API_KEY=...
OPENROUTER_API_KEY=sk-or-...
AI Configuration
MAX_CONTEXT_MESSAGES=10
DEFAULT_MODEL=openai/gpt-4o
S3 Configuration
S3_ATTACHMENTS_BUCKET=ai-agent-attachments
Kafka Configuration
KAFKA_GROUP_ID=ai-agent-node
KAFKA_CHAT_TITLE_GENERATION_TOPIC=chat-title-generation
KAFKA_BACKGROUND_CHAT_PROCESSING_TOPIC=background-chat-processing
File Processing (Optional)
ENABLE_TEXT_EXTRACTION=true
MAX_EXTRACTED_TEXT_SIZE=51200
Usage Examples
Basic Chat Processing
The ChatProcessor class handles all chat operations. The most common use case is creating a streaming message:
import { ChatProcessor } from '@multiplayer-app/ai-agent-node';
import { MongoAgentChatRepository, MongoAgentMessageRepository } from '@multiplayer-app/ai-agent-mongo';
import { ArtifactStore } from '@multiplayer-app/ai-agent-node';
const chatRepository = new MongoAgentChatRepository();
const messageRepository = new MongoAgentMessageRepository();
const artifactStore = new ArtifactStore();
const chatProcessor = new ChatProcessor(
chatRepository,
messageRepository,
artifactStore
);
const stream = await chatProcessor.createMessageStream({
content: 'Hello, how can you help me?',
contextKey: 'support',
userId: 'user-123'
});
stream.on('data', (chunk) => {
const data = chunk.toString();
});
Stream Chunk Format
The stream emits Server-Sent Events (SSE) with the following chunk types:
{
type: 'chat',
chatId: 'chat-123',
chat: { }
}
{
type: 'message',
message: { }
}
{
type: 'error',
error: 'Error message'
}
[DONE]
Listing and Getting Chats
const chats = await chatProcessor.listChats({
contextKey: 'support',
userId: 'user-123',
limit: 20,
sortField: 'updatedAt',
sortOrder: SortOrder.Desc
});
const chat = await chatProcessor.getChat('chat-id');
await chatProcessor.deleteChat('chat-id');
Agent Setup
Agents are configured using a JSON configuration file that defines agents, their tools, and which contexts they're available in.
Config JSON Format
{
"models": ["gpt-4o", "claude-3-5-sonnet", "openai/gpt-4"],
"agents": [
{
"name": "support-agent",
"description": "Customer support agent",
"systemPrompt": "You are a helpful customer support agent...",
"defaultModel": "gpt-4o",
"temperature": 0.7,
"maxTokens": 2000,
"contextKeys": ["support", "general"],
"tools": [
{
"type": "api-tool",
"data": {
"title": "Get Order Status",
"description": "Retrieves the status of a customer order",
"method": "GET",
"url": "https://api.example.com/orders/{orderId}",
"headersToPass": ["Authorization"],
"needsApproval": false
}
},
{
"type": "web-search",
"data": {
"title": "web-search"
}
}
]
}
]
}
Loading Config
import { ConfigStore } from '@multiplayer-app/ai-agent-node';
import fs from 'fs';
const configStore = ConfigStore.getInstance();
const rawConfig = JSON.parse(fs.readFileSync('agent-config.json', 'utf-8'));
configStore.loadConfig(rawConfig);
API Tool Configuration
API tools allow agents to make HTTP requests to external services:
{
"type": "api-tool",
"data": {
"title": "Tool Name",
"description": "What this tool does",
"method": "GET|POST|PUT|DELETE|PATCH",
"url": "https://api.example.com/endpoint",
"headersToPass": ["Authorization", "X-Custom-Header"],
"body": {
"type": "object",
"properties": {
"param1": { "type": "string" },
"param2": { "type": "number" }
},
"required": ["param1"]
},
"queryParams": {
"type": "object",
"properties": {
"filter": { "type": "string" }
}
},
"needsApproval": true
}
}
Key Fields:
needsApproval: If true, the tool execution requires user approval before running
headersToPass: List of request headers to forward from the original request
body/queryParams: JSON Schema definitions for request parameters (converted to Zod schemas)
Web Search Tool
Web search tools enable agents to search the internet:
{
"type": "web-search",
"data": {
"title": "web-search"
}
}
In-Code Agent Support
You can also add agents programmatically:
import { ConfigStore } from '@multiplayer-app/ai-agent-node';
import { AgentToolType } from '@multiplayer-app/ai-agent-types';
const configStore = ConfigStore.getInstance();
configStore.addAgent(['support', 'sales'], {
name: 'custom-agent',
description: 'Custom agent added in code',
systemPrompt: 'You are a helpful assistant...',
defaultModel: 'gpt-4o',
temperature: 0.7,
tools: [
{
type: AgentToolType.WEB_SEARCH,
data: { title: 'web-search' }
}
]
});
configStore.addToolToAllAgents({
type: AgentToolType.LOCAL_FUNCTION,
data: {
title: 'custom-tool',
needsApproval: false,
}
});
Tool Approval Flow
When a tool has needsApproval: true, the agent will request approval before execution:
- Agent generates a tool call with
requiresConfirmation: true and an approvalId
- Chat status changes to
AgentStatus.WaitingForUserAction
- User can approve or deny the tool call
- If approved, the tool executes; if denied, execution is skipped
const stream = await chatProcessor.createMessageStream({
chatId: 'chat-id',
messageId: 'message-id',
approvalId: 'approval-id',
approved: true,
userResponse: 'User approved'
});
const chat = await chatProcessor.getChat('chat-id');
await chatProcessor.streamMessage(chat, {
chatId: 'chat-id',
messageId: 'message-id',
approvalId: 'approval-id',
approved: true,
userResponse: 'User approved'
});
User Attachments
Messages can include attachments for context:
import { AgentAttachmentType } from '@multiplayer-app/ai-agent-types';
const stream = await chatProcessor.createMessageStream({
content: 'Analyze this document',
contextKey: 'support',
attachments: [
{
id: 'att-1',
type: AgentAttachmentType.File,
name: 'document.pdf',
url: 'https://s3.../document.pdf',
mimeType: 'application/pdf',
size: 1024000
},
{
id: 'att-2',
type: AgentAttachmentType.Context,
name: 'Page Context',
metadata: {
kind: 'webSnippet',
source: { url: 'https://example.com' },
selectedText: 'Selected text from page'
}
}
]
});
Database Access
The library uses repository interfaces from @multiplayer-app/ai-agent-db for database operations. You can use any implementation that conforms to these interfaces.
MongoDB Example
import mongo from '@multiplayer-app/ai-agent-mongo';
import { MongoAgentChatRepository, MongoAgentMessageRepository } from '@multiplayer-app/ai-agent-mongo';
await mongo.connect();
const chatRepository = new MongoAgentChatRepository();
const messageRepository = new MongoAgentMessageRepository();
const chatProcessor = new ChatProcessor(
chatRepository,
messageRepository,
artifactStore
);
Generic Repositories
The library works with any repository implementation that matches the interfaces:
import type {
AgentChatRepository,
AgentMessageRepository
} from '@multiplayer-app/ai-agent-db';
class CustomChatRepository implements AgentChatRepository {
}
class CustomMessageRepository implements AgentMessageRepository {
}
const chatProcessor = new ChatProcessor(
new CustomChatRepository(),
new CustomMessageRepository(),
artifactStore
);
Repository Methods
AgentChatRepository:
create(data): Create a new chat
findById(id): Find chat by ID
update(id, data): Update chat
delete(id): Delete chat
findWithMessages(filter, options): Find chats with aggregated messages
AgentMessageRepository:
create(data): Create a new message
findById(id): Find message by ID
findByChatId(chatId): Find all messages for a chat
update(id, data): Update message
delete(id): Delete message
Advanced Usage
Agent Process Management
Monitor and control agent processes:
import { agentStore, AgentProcessEventType } from '@multiplayer-app/ai-agent-node';
agentStore.addListener('chat-id', (event) => {
switch (event.type) {
case AgentProcessEventType.Update:
console.log('Agent updated:', event.data);
break;
case AgentProcessEventType.Finished:
console.log('Agent finished:', event.data);
break;
case AgentProcessEventType.Error:
console.error('Agent error:', event.data);
break;
}
});
agentStore.stopAgentProcess('chat-id');
Built-in Tools
The library includes built-in tools:
import {
createProposeFormValuesTool,
createGenerateChartTool
} from '@multiplayer-app/ai-agent-node';
configStore.addToolToAllAgents(createProposeFormValuesTool());
configStore.addToolToAllAgents(createGenerateChartTool());
Context Limiting
The library automatically limits context to prevent token limit issues:
import { helpers } from '@multiplayer-app/ai-agent-node';
const limitedMessages = helpers.ContextLimiter.limitContext(messages, {
maxMessages: 10,
keepFirstUserMessage: true,
keepSystemMessages: true
});
Artifact Management
Store and retrieve artifacts generated by agents:
const artifacts = chatProcessor.listArtifacts('chat-id');
import { ArtifactStore } from '@multiplayer-app/ai-agent-node';
const artifactStore = new ArtifactStore();
const artifacts = artifactStore.listArtifacts('chat-id');