
Product
Introducing Repository Access Permissions and Custom Roles
Socket now supports Custom Roles and Repository Access Permissions so organizations can control who can access specific repositories and actions.
@mastra/pg
Advanced tools
Affected versions:
Postgres provider for Mastra - includes both vector and db storage capabilities
PostgreSQL implementation for Mastra, providing both vector similarity search (using pgvector) and general storage capabilities with connection pooling and transaction support.
npm install @mastra/pg
PgVector supports multiple connection methods:
1. Connection String (Recommended)
import { PgVector } from '@mastra/pg';
const vectorStore = new PgVector({
connectionString: 'postgresql://user:pass@localhost:5432/db',
});
2. Host/Port/Database Configuration
const vectorStore = new PgVector({
host: 'localhost',
port: 5432,
database: 'mydb',
user: 'postgres',
password: 'password',
});
Note: PgVector also supports advanced configurations like Google Cloud SQL Connector via
pg.ClientConfig.
const vectorStore = new PgVector({
connectionString: 'postgresql://user:pass@localhost:5432/db',
schemaName: 'custom_schema', // Use custom schema (default: public)
max: 30, // Max pool connections (default: 20)
idleTimeoutMillis: 60000, // Idle timeout (default: 30000)
pgPoolOptions: {
// Additional pg pool options
connectionTimeoutMillis: 5000,
allowExitOnIdle: true,
},
});
// Create a new table with vector support
await vectorStore.createIndex({
indexName: 'my_vectors',
dimension: 1536,
metric: 'cosine',
// Optional: Configure index type and parameters
indexConfig: {
type: 'hnsw', // 'ivfflat' (default), 'hnsw', or 'flat'
hnsw: {
m: 16, // Number of connections per layer (default: 8)
efConstruction: 64 // Size of dynamic list (default: 32)
}
}
});
// Add vectors
const ids = await vectorStore.upsert({
indexName: 'my_vectors',
vectors: [[0.1, 0.2, ...], [0.3, 0.4, ...]],
metadata: [{ text: 'doc1' }, { text: 'doc2' }],
});
// Query vectors
const results = await vectorStore.query({
indexName: 'my_vectors',
queryVector: [0.1, 0.2, ...],
topK: 10, // topK
filter: { text: 'doc1' }, // filter
includeVector: false, // includeVector
minScore: 0.5, // minScore
});
// Clean up
await vectorStore.disconnect();
import { PostgresStore } from '@mastra/pg';
const store = new PostgresStore({
host: 'localhost',
port: 5432,
database: 'mastra',
user: 'postgres',
password: 'postgres',
});
// Create a thread
await store.saveThread({
thread: {
id: 'thread-123',
resourceId: 'resource-456',
title: 'My Thread',
metadata: { key: 'value' },
createdAt: new Date(),
},
});
// Add messages to thread
await store.saveMessages({
messages: [
{
id: 'msg-789',
threadId: 'thread-123',
role: 'user',
content: { content: 'Hello' },
resourceId: 'resource-456',
createdAt: new Date(),
},
],
});
// Query threads and messages
const savedThread = await store.getThreadById({ threadId: 'thread-123' });
const messages = await store.listMessages({ threadId: 'thread-123' });
Both PgVector and PostgresStore support multiple connection methods:
Connection String
{
connectionString: 'postgresql://user:pass@localhost:5432/db';
}
Host/Port/Database
{
host: 'localhost',
port: 5432,
database: 'mydb',
user: 'postgres',
password: 'password'
}
Advanced: Also supports
pg.ClientConfigfor use cases like Google Cloud SQL Connector with IAM authentication.
schemaName: Custom PostgreSQL schema (default: public)ssl: Enable SSL or provide custom SSL options (true | false | ConnectionOptions)max: Maximum pool connections (default: 20)idleTimeoutMillis: Idle connection timeout (default: 30000)pgPoolOptions: Additional pg pool options (PgVector only)The following filter operators are supported for metadata queries:
$eq, $ne, $gt, $gte, $lt, $lte$and, $or$in, $nin$regex, $likeExample filter:
{
$and: [{ age: { $gt: 25 } }, { tags: { $in: ['tag1', 'tag2'] } }];
}
pgvector supports three index types, each with different performance characteristics:
IVFFlat groups vectors into clusters for efficient searching:
await vectorStore.createIndex({
indexName: 'my_vectors',
dimension: 1536,
metric: 'cosine',
indexConfig: {
type: 'ivfflat',
ivf: {
lists: 1000, // Number of clusters (default: auto-calculated as sqrt(rows) * 2)
},
},
});
HNSW builds a graph structure for extremely fast searches:
await vectorStore.createIndex({
indexName: 'my_vectors',
dimension: 1536,
metric: 'dotproduct', // Recommended for normalized embeddings (OpenAI, etc.)
indexConfig: {
type: 'hnsw',
hnsw: {
m: 16, // Connections per layer (default: 8, range: 2-100)
efConstruction: 64, // Dynamic list size (default: 32, range: 4-1000)
},
},
});
Tuning HNSW:
m: Better accuracy, more memory (16-32 for high accuracy)efConstruction: Better index quality, slower builds (64-200 for quality)Uses sequential scan for 100% accuracy:
await vectorStore.createIndex({
indexName: 'my_vectors',
dimension: 1536,
metric: 'cosine',
indexConfig: {
type: 'flat',
},
});
Choose the appropriate metric for your embeddings:
cosine (default): Angular similarity, good for text embeddingseuclidean: L2 distance, for unnormalized embeddingsdotproduct: Dot product, optimal for normalized embeddings (OpenAI, Cohere)The system automatically detects configuration changes and only rebuilds indexes when necessary, preventing the performance issues from unnecessary recreations.
Important behaviors:
indexConfig is provided, existing indexes are preserved as-isindexConfig is provided, indexes are only rebuilt if the configuration differscreateIndex({indexName, dimension, metric?, indexConfig?, buildIndex?}): Create a new table with vector supportbuildIndex({indexName, metric?, indexConfig?}): Build or rebuild vector indexupsert({indexName, vectors, metadata?, ids?}): Add or update vectorsquery({indexName, queryVector, topK?, filter?, includeVector?, minScore?}): Search for similar vectorsupdateVector({ indexName, id?, filter?, update }): Update a single vector by ID or metadata filterdeleteVector({ indexName, id }): Delete a single vector by IDdeleteVectors({ indexName, ids?, filter? }): Delete multiple vectors by IDs or metadata filterlistIndexes(): List all vector-enabled tablesdescribeIndex(indexName): Get table statistics and index configurationdeleteIndex(indexName): Delete a tabletruncateIndex(indexName): Remove all data from a tabledisconnect(): Close all database connectionssaveThread({ thread }): Create or update a threadgetThreadById({ threadId }): Get a thread by IDupdateThread({ id, title, metadata }): Update thread title and/or metadatadeleteThread({ threadId }): Delete a thread and its messageslistThreadsByResourceId({ resourceId, offset, limit, orderBy? }): List paginated threads for a resourcesaveMessages({ messages }): Save multiple messages in a transactionlistMessages({ threadId, resourceId?, perPage?, page?, orderBy?, filter? }): Get messages for a thread with paginationlistMessagesById({ messageIds }): Get specific messages by their IDsupdateMessages({ messages }): Update existing messagesdeleteMessages(messageIds): Delete specific messagesgetResourceById({ resourceId }): Get a resource by IDsaveResource({ resource }): Create or save a resourceupdateResource({ resourceId, workingMemory }): Update resource working memorypersistWorkflowSnapshot({ workflowName, runId, snapshot }): Save workflow stateloadWorkflowSnapshot({ workflowName, runId }): Load workflow statelistWorkflowRuns({ workflowName, pagination }): List workflow runs with paginationgetWorkflowRunById({ workflowName, runId }): Get a specific workflow runupdateWorkflowState({ workflowName, runId, state }): Update workflow stateupdateWorkflowResults({ workflowName, runId, results }): Update workflow resultscreateSpan(span): Create a single AI spanbatchCreateSpans({ records }): Create multiple AI spansupdateSpan({ traceId, spanId, updates }): Update an AI spanbatchUpdateSpans({ updates }): Update multiple AI spansgetTrace(traceId): Get an trace by IDgetTracesPaginated({ ...filters, pagination }): Get paginated traces with filteringbatchDeleteTraces({ traceIds }): Delete multiple tracesgetScoreById({ id }): Get a score by IDsaveScore(score): Save an evaluation scorelistScoresByScorerId({ scorerId, pagination }): List scores by scorer with paginationlistScoresByRunId({ runId, pagination }): List scores by run with paginationlistScoresByEntityId({ entityId, entityType, pagination }): List scores by entity with paginationlistScoresBySpan({ traceId, spanId, pagination }): List scores by span with paginationThe PostgreSQL store provides comprehensive index management capabilities to optimize query performance.
PostgreSQL storage automatically creates composite indexes during initialization for common query patterns:
mastra_threads_resourceid_createdat_idx: (resourceId, createdAt DESC)mastra_messages_thread_id_createdat_idx: (thread_id, createdAt DESC)mastra_traces_name_starttime_idx: (name, startTime DESC)mastra_evals_agent_name_created_at_idx: (agent_name, created_at DESC)These indexes significantly improve performance for filtered queries with sorting.
Create additional indexes to optimize specific query patterns:
// Basic index for common queries
await store.createIndex({
name: 'idx_threads_resource',
table: 'mastra_threads',
columns: ['resourceId'],
});
// Composite index with sort order for filtering + sorting
await store.createIndex({
name: 'idx_messages_composite',
table: 'mastra_messages',
columns: ['thread_id', 'createdAt DESC'],
});
// GIN index for JSONB columns (fast JSON queries)
await store.createIndex({
name: 'idx_traces_attributes',
table: 'mastra_traces',
columns: ['attributes'],
method: 'gin',
});
For more advanced use cases, you can also use:
unique: true for unique constraintswhere: 'condition' for partial indexesmethod: 'brin' for time-series datastorage: { fillfactor: 90 } for update-heavy tablesconcurrent: true for non-blocking creation (default)// List all indexes
const allIndexes = await store.listIndexes();
// List indexes for specific table
const threadIndexes = await store.listIndexes('mastra_threads');
// Get detailed statistics for an index
const stats = await store.describeIndex('idx_threads_resource');
console.log(stats);
// {
// name: 'idx_threads_resource',
// table: 'mastra_threads',
// columns: ['resourceId', 'createdAt'],
// unique: false,
// size: '128 KB',
// definition: 'CREATE INDEX idx_threads_resource...',
// method: 'btree',
// scans: 1542, // Number of index scans
// tuples_read: 45230, // Tuples read via index
// tuples_fetched: 12050 // Tuples fetched via index
// }
// Drop an index
await store.dropIndex('idx_threads_status');
| Index Type | Best For | Storage | Speed |
|---|---|---|---|
| btree (default) | Range queries, sorting, general purpose | Moderate | Fast |
| hash | Equality comparisons only | Small | Very fast for = |
| gin | JSONB, arrays, full-text search | Large | Fast for contains |
| gist | Geometric data, full-text search | Moderate | Fast for nearest-neighbor |
| spgist | Non-balanced data, text patterns | Small | Fast for specific patterns |
| brin | Large tables with natural ordering | Very small | Fast for ranges |
name (required): Index nametable (required): Table namecolumns (required): Array of column names (can include DESC/ASC)unique: Create unique index (default: false)concurrent: Non-blocking index creation (default: true)where: Partial index conditionmethod: Index type ('btree' | 'hash' | 'gin' | 'gist' | 'spgist' | 'brin')opclass: Operator class for GIN/GIST indexesstorage: Storage parameters (e.g., { fillfactor: 90 })tablespace: Tablespace name for index placement// Check index usage statistics
const stats = await store.describeIndex('idx_threads_resource');
// Identify unused indexes
if (stats.scans === 0) {
console.log(`Index ${stats.name} is unused - consider removing`);
await store.dropIndex(stats.name);
}
// Monitor index efficiency
const efficiency = stats.tuples_fetched / stats.tuples_read;
if (efficiency < 0.5) {
console.log(`Index ${stats.name} has low efficiency: ${efficiency}`);
}
FAQs
Postgres provider for Mastra - includes both vector and db storage capabilities
The npm package @mastra/pg receives a total of 318,926 weekly downloads. As such, @mastra/pg popularity was classified as popular.
We found that @mastra/pg demonstrated a healthy version release cadence and project activity because the last version was released less than a year ago. It has 6 open source maintainers collaborating on the project.
Did you know?

Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.

Product
Socket now supports Custom Roles and Repository Access Permissions so organizations can control who can access specific repositories and actions.

Product
Socket MCP now lets AI assistants review org alerts, investigate threats using the Socket threat feed, and inspect package files in addition to dependency scoring.

Product
Socket Firewall blocks malicious VS Code and Open VSX extensions before install, protecting developers from compromised editor marketplaces.