
@gftdcojp/ksqldb-orm
ksqldb-orm - Server-Side TypeScript ORM for ksqlDB with enterprise security extensions
Enterprise-Grade TypeScript ksqlDB ORM - Server-Side Only
@gftdcojp/ksqldb-orm is the most mature package in the @gftdcojp ecosystem. It provides the core ksqlDB ORM functionality and integrates with the other packages in that ecosystem:
- @gftdcojp/ksqldb-orm - Core ksqlDB ORM features (including enterprise security)
- @gftdcojp/cli - Command-line interface tool (separate package)
- @gftdcojp/ksqldb-orm-confluent - Confluent Platform integration (future implementation)

packages/
├── @gftdcojp:ksqldb-orm/   # Core ORM features (security)
├── @gftdcojp:cli/          # CLI tool for type generation (separate repository)
└── confluent/              # Confluent Platform integration (planned)
# Future installation method (Q1 2025)
pnpm add @gftdcojp/ksqldb-orm
pnpm install
pnpm build
# Install separately
pnpm add @gftdcojp/cli
This package provides a built-in resilience mechanism to solve a common production issue: "Cannot determine which host contains the required partitions to serve the pull query"
import { initializeResilientKsqlDbClient, executePullQueryResilient } from '@gftdcojp/ksqldb-orm';
// Drop-in replacement with automatic resilience
await initializeResilientKsqlDbClient({
url: process.env.GFTD_KSQLDB_URL,
auth: {
key: process.env.GFTD_KSQLDB_API_KEY,
secret: process.env.GFTD_KSQLDB_API_SECRET
}
});
// Same API, with automatic handling of partition rebalancing!
const data = await executePullQueryResilient('SELECT * FROM USERS_TABLE;');
console.log('Query successful:', data);
await initializeResilientKsqlDbClient({
url: process.env.GFTD_KSQLDB_URL,
auth: {
key: process.env.GFTD_KSQLDB_API_KEY,
secret: process.env.GFTD_KSQLDB_API_SECRET
},
resilience: {
// Retry settings
retries: {
maxRetries: 5, // Maximum number of retries
baseDelay: 1500, // Base delay between retries (ms)
backoffStrategy: 'exponential', // 'exponential' | 'linear' | 'fixed'
jitter: 0.1 // Adds randomness to prevent thundering herd
},
// Circuit breaker to prevent cascading failures
circuitBreaker: {
enabled: true, // Enable circuit breaker
failureThreshold: 5, // Open circuit after 5 failures
openTimeout: 30000, // Wait 30 seconds before retrying
successThreshold: 2 // Close circuit after 2 successes
},
// Partition-aware error detection
partitionAwareness: {
enabled: true, // Detect partition rebalancing errors
rebalanceTimeout: 10000 // Timeout for rebalancing operations
},
// HTTP/WebSocket fallback strategy
fallback: {
fallbackToHttp: true, // Fallback to HTTP on WebSocket failure
alternativeEndpoints: [ // Alternative endpoints to try
'https://backup-cluster.amazonaws.com:8088'
],
fallbackTimeout: 5000 // Timeout for fallback attempts
},
// Metrics collection for observability
metrics: {
enabled: true, // Enable metrics collection
interval: 60000, // Collection interval (ms)
collector: (metrics) => { // Custom metrics handler
console.log('Resilience Metrics:', {
totalRequests: metrics.totalRequests,
failedRequests: metrics.failedRequests,
retriedRequests: metrics.retriedRequests,
averageResponseTime: metrics.averageResponseTime,
partitionRebalanceEvents: metrics.partitionRebalanceEvents
});
}
}
}
});
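To make the retry settings above more concrete, here is a minimal sketch of how a retry delay could be derived from `baseDelay`, `backoffStrategy`, and `jitter`. The helper name `computeRetryDelay` and the exact formulas are assumptions for illustration only; the package's internal calculation may differ.

```typescript
// Hypothetical helper for illustration; not part of @gftdcojp/ksqldb-orm.
type BackoffStrategy = 'exponential' | 'linear' | 'fixed';

interface RetryDelayOptions {
  baseDelay: number;                 // base delay in ms
  backoffStrategy: BackoffStrategy;
  jitter: number;                    // fraction of the delay, e.g. 0.1 = +/-10%
}

// `attempt` is 1-based: 1 for the first retry, 2 for the second, and so on.
// `random` is injectable so the result can be made deterministic in tests.
function computeRetryDelay(
  attempt: number,
  options: RetryDelayOptions,
  random: () => number = Math.random
): number {
  let nominal: number;
  switch (options.backoffStrategy) {
    case 'exponential':
      nominal = options.baseDelay * Math.pow(2, attempt - 1); // 1x, 2x, 4x, ...
      break;
    case 'linear':
      nominal = options.baseDelay * attempt;                  // 1x, 2x, 3x, ...
      break;
    default:
      nominal = options.baseDelay;                            // fixed
  }
  // Spread the delay uniformly within +/- jitter of the nominal value
  const spread = nominal * options.jitter;
  return Math.round(nominal - spread + random() * 2 * spread);
}

const exampleOptions: RetryDelayOptions = {
  baseDelay: 1500,
  backoffStrategy: 'exponential',
  jitter: 0.1
};
for (let attempt = 1; attempt <= 5; attempt++) {
  console.log(`retry ${attempt}: ~${computeRetryDelay(attempt, exampleOptions)} ms`);
}
```

Randomizing each delay keeps clients that failed at the same moment from retrying in lockstep, which is the thundering-herd problem the `jitter` comment refers to.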
import { ResilientKsqlDbClient } from '@gftdcojp/ksqldb-orm';
const client = new ResilientKsqlDbClient({
url: process.env.GFTD_KSQLDB_URL,
auth: {
key: process.env.GFTD_KSQLDB_API_KEY,
secret: process.env.GFTD_KSQLDB_API_SECRET
}
});
await client.initialize();
// Execute a query and get detailed resilience information
const result = await client.executePullQuery('SELECT * FROM USERS_TABLE;');
console.log('Query Data:', result.data);
console.log('Resilience Info:', {
retryCount: result.resilience.retryCount,
fallbackUsed: result.resilience.fallbackUsed,
executionTime: result.resilience.executionTime,
circuitBreakerState: result.resilience.circuitBreakerState
});
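The `circuitBreakerState` field above reflects the circuit breaker configured through `failureThreshold`, `openTimeout`, and `successThreshold`. The following is a minimal sketch of the general circuit breaker pattern under those three settings. The class name `SimpleCircuitBreaker` and the state names `CLOSED`, `OPEN`, and `HALF_OPEN` are assumptions; the values the package actually reports are not documented in this README.

```typescript
// Hypothetical sketch of the circuit breaker pattern; not the package's implementation.
type CircuitState = 'CLOSED' | 'OPEN' | 'HALF_OPEN';

interface CircuitBreakerOptions {
  failureThreshold: number; // consecutive failures before the circuit opens
  openTimeout: number;      // ms to wait before allowing a trial request
  successThreshold: number; // trial successes needed to close the circuit
}

class SimpleCircuitBreaker {
  private state: CircuitState = 'CLOSED';
  private failures = 0;
  private successes = 0;
  private openedAt = 0;
  private readonly options: CircuitBreakerOptions;
  private readonly now: () => number;

  constructor(options: CircuitBreakerOptions, now: () => number = Date.now) {
    this.options = options;
    this.now = now;
  }

  // Moves OPEN -> HALF_OPEN once the open timeout has elapsed
  getState(): CircuitState {
    if (this.state === 'OPEN' && this.now() - this.openedAt >= this.options.openTimeout) {
      this.state = 'HALF_OPEN';
      this.successes = 0;
    }
    return this.state;
  }

  canRequest(): boolean {
    return this.getState() !== 'OPEN';
  }

  recordSuccess(): void {
    if (this.getState() === 'HALF_OPEN') {
      this.successes += 1;
      if (this.successes >= this.options.successThreshold) {
        this.state = 'CLOSED';
        this.failures = 0;
      }
    } else {
      this.failures = 0; // a success resets the consecutive failure count
    }
  }

  recordFailure(): void {
    const current = this.getState();
    if (current === 'OPEN') return; // already open, nothing to do
    this.failures += 1;
    // Any failure during a trial period reopens the circuit immediately
    if (current === 'HALF_OPEN' || this.failures >= this.options.failureThreshold) {
      this.state = 'OPEN';
      this.openedAt = this.now();
    }
  }
}
```

A caller would check `canRequest()` before each query and report the outcome with `recordSuccess()` or `recordFailure()`, so that a struggling cluster stops receiving traffic until it has had time to recover.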
import { getKsqlDbHealth, getKsqlDbMetrics } from '@gftdcojp/ksqldb-orm';
// Check system health
const health = await getKsqlDbHealth();
console.log('Health Status:', health.status); // 'healthy' | 'degraded' | 'unhealthy'
console.log('Circuit Breaker State:', health.circuitBreaker.state);
// Get comprehensive metrics
const metrics = await getKsqlDbMetrics();
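console.log('Performance Metrics:', {
totalRequests: metrics.totalRequests,
// Error rate as a percentage; guard against division by zero before any request is made
errorRate: metrics.totalRequests > 0 ? (metrics.failedRequests / metrics.totalRequests) * 100 : 0,
averageLatency: metrics.averageResponseTime,
partitionEvents: metrics.partitionRebalanceEvents
});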
Resilience features are also available in the high-level database client:
import { createResilientDatabaseClient } from '@gftdcojp/ksqldb-orm';
const db = createResilientDatabaseClient({
ksql: {
url: process.env.GFTD_KSQLDB_URL,
auth: {
key: process.env.GFTD_KSQLDB_API_KEY,
secret: process.env.GFTD_KSQLDB_API_SECRET
},
resilience: {
retries: { maxRetries: 5 },
circuitBreaker: { enabled: true }
}
}
});
// Enhanced query builder with resilience
const users = await db.from('users')
.select('id, name, email')
.eq('status', 'active')
.executeEnhanced({ // Enhanced execution with resilience metadata
retries: 3,
fallbackToHttp: true,
timeout: 10000
});
console.log('Users:', users.data);
console.log('Query Resilience:', users.resilience);
The resilience system automatically detects and handles various error scenarios:
"Cannot determine which host contains the required partitions"
For more details, see docs/resilience.md.
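As an illustration of what partition-aware error handling can look like, the sketch below matches the error message quoted above and retries only when it appears. The names `isPartitionRebalanceError` and `retryOnPartitionError` are hypothetical, and the matching rule is an assumption; the package's own detection logic is described in docs/resilience.md.

```typescript
// Hypothetical sketch; not the package's detection logic.
const PARTITION_ERROR_FRAGMENT =
  'Cannot determine which host contains the required partitions';

// Treats any error whose message contains the fragment as a rebalance error
function isPartitionRebalanceError(error: unknown): boolean {
  const message = error instanceof Error ? error.message : String(error);
  return message.includes(PARTITION_ERROR_FRAGMENT);
}

// Retries `operation` up to `maxRetries` times, but only for rebalance errors.
// `sleep` is injectable so tests do not have to wait for real timers.
async function retryOnPartitionError<T>(
  operation: () => Promise<T>,
  maxRetries: number,
  delayMs: number,
  sleep: (ms: number) => Promise<void> = (ms) =>
    new Promise<void>((resolve) => setTimeout(resolve, ms))
): Promise<T> {
  let attempt = 0;
  for (;;) {
    try {
      return await operation();
    } catch (error) {
      // Give up on unrelated errors or once the retry budget is spent
      if (!isPartitionRebalanceError(error) || attempt >= maxRetries) {
        throw error;
      }
      attempt += 1;
      await sleep(delayMs);
    }
  }
}
```

Retrying only on this specific message avoids masking genuine failures such as syntax errors or authentication problems, which would not succeed on a second attempt.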
This package implements enterprise-grade security features:
Roles: anon, authenticated, and service_role
# JWT Authentication (required in production)
GFTD_JWT_SECRET=your-cryptographically-secure-64-char-secret-key
# ksqlDB Connection (required)
GFTD_KSQLDB_URL=https://your-ksqldb-cluster.amazonaws.com:8088
GFTD_KSQLDB_API_KEY=your-confluent-api-key
GFTD_KSQLDB_API_SECRET=your-confluent-api-secret
# Refresh Token Storage (required in production)
GFTD_REDIS_URL=redis://localhost:6379
# or
GFTD_POSTGRES_URL=postgresql://user:pass@localhost:5432/dbname
# CORS Settings
GFTD_CORS_ORIGINS=https://app.example.com,https://admin.example.com
# JWT Settings
GFTD_JWT_EXPIRES_IN=15m
GFTD_JWT_REFRESH_EXPIRES_IN=7d
# Logging
GFTD_LOG_LEVEL=info
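Since several of these variables are marked as required, it can help to check them once at startup. The sketch below is a hypothetical pre-flight check that mirrors the comments in the listing above; `validateSecurityEnv` and `parseCorsOrigins` are illustrative names and are not exports of the package.

```typescript
// Hypothetical helpers for illustration; not part of @gftdcojp/ksqldb-orm.
type Env = Record<string, string | undefined>;

// Returns a list of human-readable problems; an empty list means the check passed
function validateSecurityEnv(env: Env, production: boolean): string[] {
  const problems: string[] = [];

  // ksqlDB connection settings are marked "required" above
  for (const name of ['GFTD_KSQLDB_URL', 'GFTD_KSQLDB_API_KEY', 'GFTD_KSQLDB_API_SECRET']) {
    if (!env[name]) problems.push(`${name} is required`);
  }

  if (production) {
    // JWT secret and refresh token storage are marked "required in production"
    if (!env['GFTD_JWT_SECRET']) {
      problems.push('GFTD_JWT_SECRET is required in production');
    }
    if (!env['GFTD_REDIS_URL'] && !env['GFTD_POSTGRES_URL']) {
      problems.push('Set GFTD_REDIS_URL or GFTD_POSTGRES_URL for refresh token storage');
    }
  }
  return problems;
}

// Splits the comma-separated GFTD_CORS_ORIGINS value into a clean list
function parseCorsOrigins(value: string | undefined): string[] {
  return (value ?? '')
    .split(',')
    .map((origin) => origin.trim())
    .filter((origin) => origin.length > 0);
}
```

In a Node.js service this could be called as `validateSecurityEnv(process.env, process.env.NODE_ENV === 'production')`, refusing to start if the returned list is not empty.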
import { initializeSecurity } from '@gftdcojp/ksqldb-orm/security';
// Initialize security settings on application startup
await initializeSecurity();
import express from 'express';
import { createCorsMiddleware } from '@gftdcojp/ksqldb-orm/utils/cors';
const app = express();
app.use(createCorsMiddleware());
import { JwtAuthManager } from '@gftdcojp/ksqldb-orm/jwt-auth';
const authManager = JwtAuthManager.getInstance();
// Authenticate user and get tokens
const authResult = await authManager.authenticate(userPayload);
// Verify access token
const user = authManager.verifyAccessToken(accessToken);
// Refresh tokens
const newTokens = await authManager.refresh(refreshToken, currentUser);
Run security validation to check your configuration:
import { displaySecurityStatus, securityHealthCheck } from '@gftdcojp/ksqldb-orm/security';
// Display security status
displaySecurityStatus();
// Programmatic health check
const health = await securityHealthCheck();
console.log(health.status); // 'healthy' | 'warning' | 'critical'
Create the table for PostgreSQL refresh token storage:
// The table definition is available as the REFRESH_TOKEN_TABLE_SQL export
import { REFRESH_TOKEN_TABLE_SQL } from '@gftdcojp/ksqldb-orm/security';
// Print the SQL, then execute it manually against your PostgreSQL database
console.log(REFRESH_TOKEN_TABLE_SQL);
This package achieves a security score of 90/100 with the following protections:
For more details, see SECURITY-FIX-REPORT.md.
This package is server-side (Node.js) only.
// All features, including file operations and CLI tools
import { KsqlDbClient, TypeGenerator, AuditLogManager } from '@gftdcojp/ksqldb-orm';
The package includes comprehensive tests for the server environment:
# Run all tests
pnpm test
# Run with coverage
pnpm run test:coverage
# Run in watch mode
pnpm run test:watch
# Run integration tests
pnpm run test:integration
The current test suite achieves 44.47% code coverage.
import { createDatabaseClient } from '@gftdcojp/ksqldb-orm';
// Create a database client
const dbClient = createDatabaseClient({
ksql: {
url: process.env.GFTD_KSQLDB_URL,
auth: {
key: process.env.GFTD_KSQLDB_API_KEY,
secret: process.env.GFTD_KSQLDB_API_SECRET
}
}
});
await dbClient.initialize();
// Supabase-like query (simple and intuitive)
const { data, error } = await dbClient
.from('users')
.eq('status', 'active')
.limit(10)
.execute();
if (error) {
console.error('Query failed:', error);
} else {
console.log('Users:', data);
}
// Get a single record
const { data: user } = await dbClient
.from('users')
.eq('id', 1)
.single();
// Insert data
const { data: newUser } = await dbClient
.from('users')
.insert({
name: 'John Doe',
email: 'john@example.com',
status: 'active'
});
import {
initializeResilientKsqlDbClient,
executePullQueryResilient,
ResilientKsqlDbClient,
createResilientDatabaseClient,
getKsqlDbHealth,
getKsqlDbMetrics
} from '@gftdcojp/ksqldb-orm';
// Global resilience client functions
await initializeResilientKsqlDbClient(config);
const data = await executePullQueryResilient('SELECT * FROM USERS;');
// Class-based resilience client
const client = new ResilientKsqlDbClient(config);
await client.initialize();
const result = await client.executePullQuery('SELECT * FROM USERS;');
// Resilient database client
const db = createResilientDatabaseClient(config);
const users = await db.from('users').executeEnhanced();
// Health and metrics
const health = await getKsqlDbHealth();
const metrics = await getKsqlDbMetrics();
import { createDatabaseClient, DatabaseClient } from '@gftdcojp/ksqldb-orm';
// Create client
const dbClient = createDatabaseClient({
ksql: {
url: process.env.GFTD_KSQLDB_URL,
auth: {
key: process.env.GFTD_KSQLDB_API_KEY,
secret: process.env.GFTD_KSQLDB_API_SECRET
}
}
});
// Get data
const { data: allUsers } = await dbClient.from('users').execute();
// Conditional search (all operators)
const { data: filteredUsers } = await dbClient
.from('users')
.eq('status', 'active') // Equal
.neq('type', 'test') // Not equal
.gt('age', 18) // Greater than
.between('score', 80, 100) // Range
.like('name', '%john%') // Pattern match
.in('department', ['eng', 'dev']) // Multiple values
.isNotNull('email') // Not null
.order('created_at', false)
.limit(25)
.execute();
// Data manipulation
// Single insert
await dbClient.from('users').insert({
name: 'John',
email: 'john@example.com'
});
// Batch insert
await dbClient.from('users').insert([
{ name: 'Alice', email: 'alice@example.com' },
{ name: 'Bob', email: 'bob@example.com' },
{ name: 'Charlie', email: 'charlie@example.com' }
]);
// Update and delete with complex conditions
await dbClient.from('users').eq('id', 1).update({ name: 'Jane' });
await dbClient.from('users').lt('last_login', '2024-01-01').delete();
import {
generateTypesForTables,
listAllTables,
getTableSchema
} from '@gftdcojp/ksqldb-orm/type-generator';
import {
initializeSchemaRegistryClient,
registerSchema,
getLatestSchema
} from '@gftdcojp/ksqldb-orm/schema-registry';
// Planned API (not yet implemented)
import {
RLSManager,
PolicyType
} from '@gftdcojp/ksqldb-orm/row-level-security';
// 🚧 Feature in progress
// - Advanced RLS policies for data access control
// - Design complete, implementation planned for Q1 2025
Full Documentation - Detailed guides and learning paths
High-Level Query Builder - Full API reference
// Get all data
const { data: allUsers } = await dbClient.from('users').execute();
// Search with various conditions
const { data: filteredUsers } = await dbClient
.from('users')
.eq('status', 'active') // Equal
.neq('type', 'test') // Not equal
.gt('age', 18) // Greater than
.between('score', 80, 100) // Range
.like('name', '%john%') // Pattern match
.in('department', ['eng', 'dev']) // Multiple values
.isNotNull('email') // Not null
.order('created_at', false)
.limit(10)
.execute();
// Get a single record
const { data: user } = await dbClient
.from('users')
.eq('id', 123)
.single();
// Search for null values
const { data: usersWithoutEmail } = await dbClient
.from('users')
.isNull('email')
.execute();
// NOT IN condition
const { data: nonTestUsers } = await dbClient
.from('users')
.notIn('status', ['test', 'deleted'])
.execute();
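To show how chained conditions like the ones above relate to ksqlDB SQL, here is a minimal sketch that renders a list of conditions into a `WHERE` clause. Everything in it (`Condition`, `renderWhere`, the quoting rule) is an assumption made for illustration; the SQL the package actually generates is not shown in this README and may differ.

```typescript
// Hypothetical sketch; not the package's query builder.
type Value = string | number;

type Condition =
  | { op: 'eq' | 'neq' | 'gt' | 'lt' | 'like'; column: string; value: Value }
  | { op: 'between'; column: string; low: Value; high: Value }
  | { op: 'in' | 'notIn'; column: string; values: Value[] }
  | { op: 'isNull' | 'isNotNull'; column: string };

const SIMPLE_OPERATORS = { eq: '=', neq: '!=', gt: '>', lt: '<', like: 'LIKE' } as const;

// Numbers are emitted as-is; strings are single-quoted with quotes doubled
function literal(value: Value): string {
  return typeof value === 'number' ? String(value) : `'${value.replace(/'/g, "''")}'`;
}

function renderCondition(c: Condition): string {
  switch (c.op) {
    case 'between':
      return `${c.column} BETWEEN ${literal(c.low)} AND ${literal(c.high)}`;
    case 'in':
      return `${c.column} IN (${c.values.map(literal).join(', ')})`;
    case 'notIn':
      return `${c.column} NOT IN (${c.values.map(literal).join(', ')})`;
    case 'isNull':
      return `${c.column} IS NULL`;
    case 'isNotNull':
      return `${c.column} IS NOT NULL`;
    default:
      return `${c.column} ${SIMPLE_OPERATORS[c.op]} ${literal(c.value)}`;
  }
}

// Conditions are combined with AND, matching how the chained calls read
function renderWhere(conditions: Condition[]): string {
  return conditions.map(renderCondition).join(' AND ');
}

console.log(
  renderWhere([
    { op: 'eq', column: 'status', value: 'active' },
    { op: 'gt', column: 'age', value: 18 },
    { op: 'in', column: 'department', values: ['eng', 'dev'] }
  ])
);
```

Note that this sketch only escapes string values. Column names are interpolated as-is, so in real code they would need to come from a trusted source such as generated types.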
// Single data insert
const { data } = await dbClient
.from('users')
.insert({
name: 'John Doe',
email: 'john@example.com',
status: 'active'
});
// Batch data insert (multiple records)
const { data } = await dbClient
.from('users')
.insert([
{ name: 'Alice', email: 'alice@example.com', status: 'active' },
{ name: 'Bob', email: 'bob@example.com', status: 'pending' },
{ name: 'Charlie', email: 'charlie@example.com', status: 'active' }
]);
// Update data with complex conditions
const { data } = await dbClient
.from('users')
.between('created_at', '2024-01-01', '2024-01-31')
.eq('status', 'pending')
.update({
status: 'verified',
updated_at: new Date().toISOString()
});
// Delete data with conditions
const { data } = await dbClient
.from('users')
.lt('last_login', '2023-01-01')
.eq('status', 'inactive')
.delete();
// Planned API (not yet implemented)
import { rls } from '@gftdcojp/ksqldb-orm/row-level-security';
// Create security policy (in progress)
rls.createPolicy({
tableName: 'users_table',
condition: 'user_id = auth.user_id()',
roles: ['authenticated']
});
import { generateTypesForTables } from '@gftdcojp/ksqldb-orm/type-generator';
// Automatically generate type definitions for all tables
const typeDefinitions = await generateTypesForTables();
import { registerSchema } from '@gftdcojp/ksqldb-orm/schema-registry';
// Register an Avro schema
await registerSchema('users-value', userSchema, 'AVRO');
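The `userSchema` value is not defined in the snippet above. As a minimal sketch, an Avro record schema for a user could look like the following. The record name, namespace, and fields are illustrative assumptions, and whether `registerSchema` expects an object or a JSON string is not stated in this README.

```typescript
// Hypothetical Avro schema; the field names are illustrative only.
const userSchema = {
  type: 'record',
  name: 'User',
  namespace: 'com.example',
  fields: [
    { name: 'id', type: 'int' },
    { name: 'name', type: 'string' },
    // Optional field: a union with "null" first, so the default can be null
    { name: 'email', type: ['null', 'string'], default: null },
    { name: 'status', type: 'string' }
  ]
};

// Schema Registry's REST API carries schemas as JSON strings
const userSchemaJson = JSON.stringify(userSchema);
console.log(userSchemaJson);
```

In Avro, the default value of a union field must match the first type in the union, which is why `null` is listed before `string` for the optional `email` field.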
Note: The CLI tool is provided in a separate package, @gftdcojp/cli.
# Install the CLI tool separately
pnpm add @gftdcojp/cli
# Generate types for all tables
npx @gftdcojp/cli generate-all --output ./types
# Generate types for a specific table
npx @gftdcojp/cli generate-types --table users_table --output ./types
# List all tables and streams
npx @gftdcojp/cli list
# For more details on CLI commands, see the @gftdcojp/cli package documentation
# ksqlDB Connection Settings
GFTD_KSQLDB_URL=https://your-cluster.aws.confluent.cloud:443
GFTD_KSQLDB_API_KEY=your-api-key
GFTD_KSQLDB_API_SECRET=your-api-secret
# Schema Registry Connection Settings (optional)
CONFLUENT_SCHEMA_REGISTRY_URL=https://your-schema-registry.aws.confluent.cloud
# Logging Settings (optional)
GFTD_LOG_LEVEL=info # Log level (debug, info, warn, error)
GFTD_LOG_DIR=/absolute/path/to/logs # Custom log directory (absolute path)
LOG_LEVEL=info # Alternative log level setting
LOG_DIR=/path/to/logs # Alternative log directory setting
import { KsqlDbConfig } from '@gftdcojp/ksqldb-orm';
const config: KsqlDbConfig = {
url: process.env.GFTD_KSQLDB_URL,
auth: {
key: process.env.GFTD_KSQLDB_API_KEY,
secret: process.env.GFTD_KSQLDB_API_SECRET
},
headers: {
'Custom-Header': 'value'
}
};
Feature Category | Completion | Status | Description |
---|---|---|---|
ksqlDB Client | 95% | High-Quality | Fully-featured REST API client |
Resilience Features | 90% | Production-Grade | Partition rebalancing support |
Security | 90% | Enterprise-Grade | 90/100 score achieved |
Type Generation | 85% | Practical | Automatic TypeScript type generation |
Schema Registry | 85% | Practical | Confluent integration |
Q1 2025 Goals:
Feature | Implementation | Quality Score | Production Use |
---|---|---|---|
ksqlDB Client | 95% | A+ | Recommended |
Resilience & Error Handling | 90% | A+ | Recommended |
Security Features | 90% | A+ | Recommended |
Type Generation | 85% | A | Usable |
Database Client | 85% | A | Usable |
Feature | Design | Implementation | Target |
---|---|---|---|
Row-Level Security | Complete | 🚧 In Progress | Q1 2025 |
Performance Optimization | Complete | Planned | Q1 2025 |
MIT License - see the LICENSE file for details.
Please read the contributing guidelines before submitting a pull request.
import { defineSchema, createStreamFromSchema } from '@gftdcojp/ksqldb-orm';
import { string, int, timestamp } from '@gftdcojp/ksqldb-orm/field-types';
// Define a schema - the topic will automatically be set to infinite retention
const userEventSchema = defineSchema('UserEvent', {
userId: int().notNull(),
eventType: string().notNull(),
data: string(),
timestamp: timestamp().notNull()
});
// Create a stream with automatic infinite retention
await createStreamFromSchema('UserEvent', 'STREAM');
// Or use direct DDL (retention.ms=-1 will be added automatically)
await executeDDL(`
CREATE STREAM user_events (
user_id INT,
event_type STRING,
data STRING,
timestamp STRING
) WITH (
kafka_topic='user_events',
value_format='JSON'
);
`);
The package provides an easy-to-use database interface with a comprehensive, intuitive query builder.
Available Now (Production-Ready):
🚧 Coming Soon (Q1 2025):
Recommendation: A high-quality package that is ready for enterprise production use today!
@gftdcojp/ksqldb-orm - The most mature ksqlDB integration solution ✨