@kysera/dal
Functional Data Access Layer for Kysera - Query functions with automatic plugin support.
Overview
@kysera/dal provides a functional approach to database access as an alternative to traditional repository patterns. Write query functions that are composable, type-safe, and easy to test.
The DAL works through @kysera/executor to provide automatic plugin support. When you pass a KyseraExecutor (instead of a raw Kysely instance) to your query functions, all plugins (soft-delete, RLS, audit, etc.) are automatically applied while maintaining a clean functional API.
Features
- Query Functions - Pure functions instead of repository methods
- Type Inference - Return types automatically inferred from queries
- Context Passing - Explicit database context (no dependency injection)
- Plugin Support - Automatic plugin interception via @kysera/executor
- Transaction Support - First-class transactions with automatic plugin propagation
- Composition Utilities - Combine queries using compose, chain, parallel, etc.
- Minimal Dependencies - Depends only on @kysera/executor, with Kysely as a peer dependency
- Fully Typed - Complete TypeScript support with strict mode
Installation
# npm
npm install @kysera/dal @kysera/executor kysely
# Optional: plugins used in the examples below
npm install @kysera/soft-delete @kysera/rls @kysera/audit
# pnpm
pnpm add @kysera/dal @kysera/executor kysely
# yarn
yarn add @kysera/dal @kysera/executor kysely
# bun
bun add @kysera/dal @kysera/executor kysely
Quick Start
import { Kysely } from 'kysely'
import { createExecutor } from '@kysera/executor'
import { softDeletePlugin } from '@kysera/soft-delete'
import { createQuery, withTransaction } from '@kysera/dal'
interface Database {
users: {
id: number
email: string
name: string
deleted_at: Date | null
}
}
const db = new Kysely<Database>({
// Supply your dialect configuration here (see the complete example under Examples)
})
const executor = await createExecutor(db, [softDeletePlugin()])
const getUserById = createQuery((ctx, id: number) =>
ctx.db.selectFrom('users').select(['id', 'email', 'name']).where('id', '=', id).executeTakeFirst()
)
const createUser = createQuery((ctx, data: { email: string; name: string }) =>
ctx.db.insertInto('users').values(data).returningAll().executeTakeFirstOrThrow()
)
const user = await getUserById(executor, 1)
const result = await withTransaction(executor, async ctx => {
const newUser = await createUser(ctx, {
email: 'test@example.com',
name: 'Test User'
})
return newUser
})
Core Concepts
Query Functions
Query functions are the building blocks of the Functional DAL. They accept a database context and arguments, returning a Promise.
import { createQuery } from '@kysera/dal'
const findUserByEmail = createQuery((ctx, email: string) =>
ctx.db.selectFrom('users').selectAll().where('email', '=', email).executeTakeFirst()
)
const insertPost = createQuery((ctx, data: { title: string; body: string; user_id: number }) =>
ctx.db.insertInto('posts').values(data).returningAll().executeTakeFirstOrThrow()
)
const updateUserName = createQuery((ctx, id: number, name: string) =>
ctx.db.updateTable('users').set({ name }).where('id', '=', id).returningAll().executeTakeFirst()
)
const deletePost = createQuery((ctx, id: number) =>
ctx.db.deleteFrom('posts').where('id', '=', id).executeTakeFirst()
)
Plugin Integration
The DAL works through @kysera/executor for plugin support. Instead of implementing its own plugin system, the DAL leverages the executor's plugin interception mechanism. When you pass a KyseraExecutor (instead of a raw Kysely instance) to your query functions, all plugins are automatically applied at the query builder level.
Basic Plugin Integration
import { createExecutor } from '@kysera/executor'
import { softDeletePlugin } from '@kysera/soft-delete'
import { rlsPlugin } from '@kysera/rls'
import { createQuery } from '@kysera/dal'
const executor = await createExecutor(db, [
softDeletePlugin(),
rlsPlugin({
schema: {
users: { tenantIdColumn: 'tenant_id' },
posts: { tenantIdColumn: 'tenant_id' }
},
getCurrentTenantId: () => currentTenantId
})
])
const getUsers = createQuery(ctx => ctx.db.selectFrom('users').selectAll().execute())
const users = await getUsers(executor)
How Plugin Propagation Works
- Query Creation: When you pass a KyseraExecutor to a query function, the context preserves the executor with all its plugins
- Transaction Wrapping: withTransaction() automatically wraps transaction instances with the same plugins as the parent executor
- Automatic Interception: All query builders (selectFrom, insertInto, etc.) are intercepted by plugins before execution
- Type Safety: Full TypeScript support - the database schema type is preserved through all transformations
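The propagation steps above can be pictured with a toy model. This sketch does not use the real @kysera/executor API: `ToyPlugin`, `ToyExecutor`, and `createToyExecutor` are hypothetical names, and rows live in memory rather than in a database. It only illustrates the idea that a transaction handle is wrapped with the same plugins as its parent.

```typescript
// Toy model only: none of these names come from @kysera/executor.
type Row = Record<string, unknown>

// A "plugin" here is just a function that rewrites the rows a query would return.
type ToyPlugin = (table: string, rows: Row[]) => Row[]

interface ToyExecutor {
  selectFrom(table: string): Row[]
  transaction<T>(fn: (trx: ToyExecutor) => T): T
}

function createToyExecutor(data: Record<string, Row[]>, plugins: ToyPlugin[]): ToyExecutor {
  return {
    // Every read passes through each plugin in registration order.
    selectFrom(table) {
      const rows = data[table] ?? []
      return plugins.reduce((current, plugin) => plugin(table, current), rows)
    },
    // The transaction handle is built with the same plugin list as its parent.
    transaction(fn) {
      return fn(createToyExecutor(data, plugins))
    }
  }
}

// Stand-in for a soft-delete filter: hide rows that carry a deletion timestamp.
const hideSoftDeleted: ToyPlugin = (_table, rows) => rows.filter(row => row.deleted_at == null)

const toyExecutor = createToyExecutor(
  { users: [{ id: 1, deleted_at: null }, { id: 2, deleted_at: '2024-01-01' }] },
  [hideSoftDeleted]
)

const visibleOutside = toyExecutor.selectFrom('users').length
const visibleInside = toyExecutor.transaction(trx => trx.selectFrom('users').length)
```

Both counts come out the same because the filter is attached to the transaction handle as well as to the parent.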
Multiple Plugins
import { createExecutor } from '@kysera/executor'
import { softDeletePlugin } from '@kysera/soft-delete'
import { rlsPlugin } from '@kysera/rls'
import { auditPlugin } from '@kysera/audit'
const executor = await createExecutor(db, [
softDeletePlugin(),
rlsPlugin({
}),
auditPlugin({
})
])
const getUsers = createQuery(ctx => ctx.db.selectFrom('users').selectAll().execute())
const users = await getUsers(executor)
Transactions
Execute multiple queries atomically within a transaction. Plugins automatically propagate to the transaction context.
import { withTransaction, createTransactionalQuery } from '@kysera/dal'
const result = await withTransaction(executor, async ctx => {
const user = await createUser(ctx, userData)
const profile = await createProfile(ctx, { userId: user.id, ...profileData })
return { user, profile }
})
const transferFunds = createTransactionalQuery(
async (ctx, fromId: number, toId: number, amount: number) => {
await ctx.db
.updateTable('accounts')
.set(eb => ({ balance: eb('balance', '-', amount) }))
.where('id', '=', fromId)
.execute()
await ctx.db
.updateTable('accounts')
.set(eb => ({ balance: eb('balance', '+', amount) }))
.where('id', '=', toId)
.execute()
return { success: true }
}
)
// Works: called with a transaction context
await withTransaction(executor, ctx => transferFunds(ctx, 1, 2, 100))
// Throws: called outside a transaction
await transferFunds(executor, 1, 2, 100)
Composition
compose
Compose two query functions sequentially, passing the result of the first to the second:
import { createQuery, compose } from '@kysera/dal'
const getUserById = createQuery((ctx, id: number) =>
ctx.db.selectFrom('users').selectAll().where('id', '=', id).executeTakeFirstOrThrow()
)
const getPostsByUserId = createQuery((ctx, userId: number) =>
ctx.db.selectFrom('posts').selectAll().where('user_id', '=', userId).execute()
)
const getUserWithPosts = compose(getUserById, async (ctx, user) => ({
...user,
posts: await getPostsByUserId(ctx, user.id)
}))
const result = await getUserWithPosts(executor, 1)
chain
Chain multiple transformations on a query result:
import { createQuery, chain } from '@kysera/dal'
const getUser = createQuery((ctx, id: number) =>
ctx.db.selectFrom('users').selectAll().where('id', '=', id).executeTakeFirstOrThrow()
)
const getUserComplete = chain(
getUser,
async (ctx, user) => ({ ...user, posts: await getPosts(ctx, user.id) }),
async (ctx, data) => ({ ...data, followers: await getFollowers(ctx, data.id) }),
async (ctx, data) => ({ ...data, stats: await getStats(ctx, data.id) })
)
const fullUser = await getUserComplete(executor, 1)
parallel
Execute multiple queries concurrently and combine their results:
import { createQuery, parallel } from '@kysera/dal'
const getUserById = createQuery((ctx, id: number) =>
ctx.db.selectFrom('users').selectAll().where('id', '=', id).executeTakeFirst()
)
const getUserStats = createQuery((ctx, id: number) =>
ctx.db.selectFrom('user_stats').selectAll().where('user_id', '=', id).executeTakeFirst()
)
const getNotifications = createQuery((ctx, id: number) =>
ctx.db.selectFrom('notifications').selectAll().where('user_id', '=', id).execute()
)
const getDashboardData = parallel({
user: getUserById,
stats: getUserStats,
notifications: getNotifications
})
const dashboard = await getDashboardData(executor, userId)
conditional
Execute a query conditionally based on runtime logic:
import { createQuery, conditional } from '@kysera/dal'
const getPremiumFeatures = createQuery((ctx, userId: number) =>
ctx.db.selectFrom('premium_features').selectAll().where('user_id', '=', userId).execute()
)
const getFeatures = conditional(
(ctx, userId: number, isPremium: boolean) => isPremium,
getPremiumFeatures,
[]
)
const features = await getFeatures(executor, userId, true)
const emptyFeatures = await getFeatures(executor, userId, false)
mapResult
Transform array results with a mapper function:
import { createQuery, mapResult } from '@kysera/dal'
const getAllUsers = createQuery(ctx => ctx.db.selectFrom('users').selectAll().execute())
const getUserNames = mapResult(getAllUsers, user => user.name)
const names = await getUserNames(executor)
API Reference
Query Creation
createQuery<DB, TArgs, TResult>(queryFn)
Create a typed query function.
Parameters:
queryFn: (ctx: DbContext<DB>, ...args: TArgs) => Promise<TResult> - Query implementation
Returns: QueryFunction<DB, TArgs, TResult>
Example:
const getUserById = createQuery((ctx, id: number) =>
ctx.db.selectFrom('users').select(['id', 'email', 'name']).where('id', '=', id).executeTakeFirst()
)
const user = await getUserById(executor, 1)
await withTransaction(executor, async ctx => {
const user = await getUserById(ctx, 1)
return user
})
createTransactionalQuery<DB, TArgs, TResult>(queryFn)
Create a query function that requires a transaction context.
Parameters:
queryFn: (ctx: DbContext<DB>, ...args: TArgs) => Promise<TResult> - Query implementation
Returns: QueryFunction<DB, TArgs, TResult>
Throws: Error if called outside a transaction
Example:
const transferFunds = createTransactionalQuery(
async (ctx, fromId: number, toId: number, amount: number) => {
await ctx.db
.updateTable('accounts')
.set(eb => ({ balance: eb('balance', '-', amount) }))
.where('id', '=', fromId)
.execute()
await ctx.db
.updateTable('accounts')
.set(eb => ({ balance: eb('balance', '+', amount) }))
.where('id', '=', toId)
.execute()
return { success: true }
}
)
// Works: called with a transaction context
await withTransaction(executor, ctx => transferFunds(ctx, 1, 2, 100))
// Throws: called outside a transaction
await transferFunds(executor, 1, 2, 100)
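One way such a guard could work is to check the context's `isTransaction` flag before running the query. The sketch below is an assumption about the mechanism, not the library's implementation: `Ctx`, `requireTransaction`, and the error message are hypothetical.

```typescript
// Hypothetical stand-in for DbContext; only the flag matters for this sketch.
interface Ctx {
  readonly db: unknown
  readonly isTransaction: boolean
}

// Wrap a query so it refuses to run unless the context is transactional.
function requireTransaction<TArgs extends readonly unknown[], TResult>(
  queryFn: (ctx: Ctx, ...args: TArgs) => Promise<TResult>
): (ctx: Ctx, ...args: TArgs) => Promise<TResult> {
  return async (ctx, ...args) => {
    if (!ctx.isTransaction) {
      // The message text is made up for this sketch.
      throw new Error('Query requires a transaction context')
    }
    return queryFn(ctx, ...args)
  }
}

const transfer = requireTransaction(
  async (_ctx: Ctx, fromId: number, toId: number, amount: number) => ({
    fromId,
    toId,
    moved: amount
  })
)

// Resolves with a transactional context, rejects without one.
const insideTx = transfer({ db: null, isTransaction: true }, 1, 2, 100)
const outsideTx = transfer({ db: null, isTransaction: false }, 1, 2, 100).then(
  () => 'resolved',
  () => 'rejected'
)
```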
Context Management
createContext<DB>(db)
Create a database context from any database instance.
Parameters:
db: Kysely<DB> | Transaction<DB> | KyseraExecutor<DB> | KyseraTransaction<DB> - Database instance
Returns: DbContext<DB>
Important: To enable plugin support in the DAL, pass a KyseraExecutor (not a raw Kysely instance) to createContext. The executor wraps the database with plugin interception.
Example:
import { createContext } from '@kysera/dal'
import { createExecutor } from '@kysera/executor'
import { softDeletePlugin } from '@kysera/soft-delete'
const executor = await createExecutor(db, [softDeletePlugin()])
const ctx = createContext(executor)
const user = await findUserById(ctx, 1)
Without plugins (raw Kysely):
const ctx = createContext(db)
const user = await findUserById(ctx, 1)
withTransaction<DB, T>(db, fn, options?)
Execute a function within a transaction.
Parameters:
db: Kysely<DB> | KyseraExecutor<DB> - Database instance
fn: (ctx: DbContext<DB>) => Promise<T> - Function to execute
options?: TransactionOptions - Transaction options (optional)
Returns: Promise<T>
Example:
const result = await withTransaction(executor, async ctx => {
const user = await createUser(ctx, userData)
const profile = await createProfile(ctx, { userId: user.id, ...profileData })
return { user, profile }
})
const result = await withTransaction(executor, async ctx => {
const users = await getUsers(ctx)
return users
})
withContext<DB, T>(db, fn)
Execute a function with a database context (no transaction).
Parameters:
db: Kysely<DB> | KyseraExecutor<DB> - Database instance
fn: (ctx: DbContext<DB>) => Promise<T> - Function to execute
Returns: Promise<T>
Example:
const users = await withContext(executor, async ctx => {
return getAllUsers(ctx)
})
isInTransaction<DB>(ctx)
Check if context is within a transaction.
Parameters:
ctx: DbContext<DB> - Database context
Returns: boolean
Example:
const myQuery = createQuery((ctx, id: number) => {
if (isInTransaction(ctx)) {
console.log('Running inside transaction')
}
return ctx.db.selectFrom('users').selectAll().where('id', '=', id).executeTakeFirst()
})
Composition
compose<DB, TArgs, TFirst, TResult>(first, second)
Compose two query functions sequentially.
Parameters:
first: QueryFunction<DB, TArgs, TFirst> - First query
second: (ctx: DbContext<DB>, result: TFirst) => Promise<TResult> - Second query
Returns: QueryFunction<DB, TArgs, TResult>
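To make the semantics concrete, here is a minimal sketch of sequential composition. `Ctx`, `Query`, and `composeSketch` are hypothetical stand-ins written for this example; the queries are in-memory functions so the block runs without a database.

```typescript
// Hypothetical stand-ins for DbContext and QueryFunction; not the library's types.
interface Ctx {
  readonly db: unknown
  readonly isTransaction: boolean
}
type Query<TArgs extends readonly unknown[], TResult> = (
  ctx: Ctx,
  ...args: TArgs
) => Promise<TResult>

// Run `first`, then hand its result to `second` using the same context.
function composeSketch<TArgs extends readonly unknown[], TFirst, TResult>(
  first: Query<TArgs, TFirst>,
  second: (ctx: Ctx, result: TFirst) => Promise<TResult>
): Query<TArgs, TResult> {
  return async (ctx, ...args) => second(ctx, await first(ctx, ...args))
}

// In-memory "query" used in place of a real database call.
const findUser: Query<[id: number], { id: number; name: string }> = async (_ctx, id) => ({
  id,
  name: `user-${id}`
})

const findUserWithPosts = composeSketch(findUser, async (_ctx: Ctx, user) => ({
  ...user,
  posts: [`post-by-${user.id}`]
}))

const composeDemo = findUserWithPosts({ db: null, isTransaction: false }, 7)
```

Because both steps receive the same context, a composed query called inside a transaction keeps every step in that transaction.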
chain<DB, TArgs, T1, T2, ...>(query, ...transforms)
Chain multiple transformations on a query result.
Parameters:
query: QueryFunction<DB, TArgs, T1> - Initial query
...transforms: Array<(ctx: DbContext<DB>, result: TPrev) => Promise<TNext>> - Transform functions; each receives the previous step's result and returns the next one
Returns: QueryFunction<DB, TArgs, TN> (where N is the last transform result type)
Overloads: Supports 1-3 transform functions with full type inference
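A chain can be thought of as a loop that threads one value through each transform in order. The sketch below assumes that behavior; `Ctx`, `Query`, `Transform`, and `chainSketch` are hypothetical, and only two overloads are written out to keep it short.

```typescript
// Hypothetical stand-ins; not the library's types.
interface Ctx {
  readonly db: unknown
  readonly isTransaction: boolean
}
type Query<TArgs extends readonly unknown[], TResult> = (
  ctx: Ctx,
  ...args: TArgs
) => Promise<TResult>
type Transform<TIn, TOut> = (ctx: Ctx, value: TIn) => Promise<TOut>

// Overloads carry the types from step to step; the implementation is untyped.
function chainSketch<A extends unknown[], T1, T2>(
  query: Query<A, T1>,
  t1: Transform<T1, T2>
): Query<A, T2>
function chainSketch<A extends unknown[], T1, T2, T3>(
  query: Query<A, T1>,
  t1: Transform<T1, T2>,
  t2: Transform<T2, T3>
): Query<A, T3>
function chainSketch(
  query: Query<any[], any>,
  ...transforms: Transform<any, any>[]
): Query<any[], any> {
  return async (ctx, ...args) => {
    let value = await query(ctx, ...args)
    // Each transform sees the output of the previous step.
    for (const transform of transforms) {
      value = await transform(ctx, value)
    }
    return value
  }
}

const getUser: Query<[id: number], { id: number }> = async (_ctx, id) => ({ id })

const getUserComplete = chainSketch(
  getUser,
  async (_ctx: Ctx, user) => ({ ...user, posts: ['p1', 'p2'] }),
  async (_ctx: Ctx, data) => ({ ...data, postCount: data.posts.length })
)

const chainDemo = getUserComplete({ db: null, isTransaction: false }, 3)
```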
parallel<DB, TArgs, T>(queries)
Execute multiple queries in parallel.
Parameters:
queries: Record<string, QueryFunction<DB, TArgs, unknown>> - Object of query functions
Returns: QueryFunction<DB, TArgs, ParallelResult<T>>
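A plausible way to implement this is `Promise.all` over the object's values, re-keyed afterwards. The sketch below assumes that approach; `Ctx`, `AnyQuery`, `ResultsOf`, and `parallelSketch` are hypothetical, and argument typing is deliberately loosened to keep the example short.

```typescript
// Hypothetical stand-ins; not the library's types.
interface Ctx {
  readonly db: unknown
  readonly isTransaction: boolean
}
type AnyQuery = (ctx: Ctx, ...args: any[]) => Promise<unknown>

// Map each key to the resolved value of its query.
type ResultsOf<T extends Record<string, AnyQuery>> = {
  [K in keyof T]: Awaited<ReturnType<T[K]>>
}

function parallelSketch<T extends Record<string, AnyQuery>>(queries: T) {
  return async (ctx: Ctx, ...args: unknown[]): Promise<ResultsOf<T>> => {
    const keys = Object.keys(queries) as Array<keyof T & string>
    // Start every query before awaiting any of them.
    const values = await Promise.all(keys.map(key => queries[key](ctx, ...args)))
    const result: Record<string, unknown> = {}
    keys.forEach((key, index) => {
      result[key] = values[index]
    })
    return result as unknown as ResultsOf<T>
  }
}

const getUser = async (_ctx: Ctx, id: number) => ({ id, name: `user-${id}` })
const getStats = async (_ctx: Ctx, id: number) => ({ userId: id, logins: 2 })

const getDashboard = parallelSketch({ user: getUser, stats: getStats })
const parallelDemo = getDashboard({ db: null, isTransaction: false }, 5)
```

Every query receives the same context and the same arguments, which is why the queries in a parallel group need compatible parameter lists.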
conditional<DB, TArgs, TResult, TFallback>(condition, query, fallback?)
Execute a query conditionally.
Parameters:
condition: (ctx: DbContext<DB>, ...args: TArgs) => boolean | Promise<boolean> - Condition function
query: QueryFunction<DB, TArgs, TResult> - Query to execute if true
fallback?: TFallback - Value to return if false
Returns: QueryFunction<DB, TArgs, TResult | TFallback>
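The behavior can be sketched as "await the condition, then either run the query or return the fallback". `Ctx`, `Query`, and `conditionalSketch` are hypothetical stand-ins, and unlike the signature above, this sketch makes the fallback a required argument for simplicity.

```typescript
// Hypothetical stand-ins; not the library's types.
interface Ctx {
  readonly db: unknown
  readonly isTransaction: boolean
}
type Query<TArgs extends readonly unknown[], TResult> = (
  ctx: Ctx,
  ...args: TArgs
) => Promise<TResult>

function conditionalSketch<TArgs extends readonly unknown[], TResult, TFallback>(
  condition: (ctx: Ctx, ...args: TArgs) => boolean | Promise<boolean>,
  query: Query<TArgs, TResult>,
  fallback: TFallback
): Query<TArgs, TResult | TFallback> {
  return async (ctx, ...args) => {
    // `await` handles both synchronous and asynchronous conditions.
    if (await condition(ctx, ...args)) {
      return query(ctx, ...args)
    }
    return fallback
  }
}

const getPremiumFeatures: Query<[userId: number, isPremium: boolean], string[]> = async (
  _ctx,
  userId
) => [`premium-feature-for-${userId}`]

const getFeatures = conditionalSketch(
  (_ctx: Ctx, _userId: number, isPremium: boolean) => isPremium,
  getPremiumFeatures,
  [] as string[]
)

const ctx: Ctx = { db: null, isTransaction: false }
const premiumDemo = getFeatures(ctx, 9, true)
const basicDemo = getFeatures(ctx, 9, false)
```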
mapResult<DB, TArgs, TItem, TResult>(query, mapper)
Map over array results.
Parameters:
query: QueryFunction<DB, TArgs, TItem[]> - Query returning array
mapper: (item: TItem, index: number) => TResult - Mapper function
Returns: QueryFunction<DB, TArgs, TResult[]>
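In effect this is `Array.prototype.map` applied to the resolved rows. A minimal sketch, with `Ctx`, `Query`, and `mapResultSketch` as hypothetical stand-ins and an in-memory query in place of a database call:

```typescript
// Hypothetical stand-ins; not the library's types.
interface Ctx {
  readonly db: unknown
  readonly isTransaction: boolean
}
type Query<TArgs extends readonly unknown[], TResult> = (
  ctx: Ctx,
  ...args: TArgs
) => Promise<TResult>

// Await the rows, then map each one; the mapper never sees the context.
function mapResultSketch<TArgs extends readonly unknown[], TItem, TResult>(
  query: Query<TArgs, TItem[]>,
  mapper: (item: TItem, index: number) => TResult
): Query<TArgs, TResult[]> {
  return async (ctx, ...args) => {
    const rows = await query(ctx, ...args)
    return rows.map((item, index) => mapper(item, index))
  }
}

const getAllUsers: Query<[], Array<{ id: number; name: string }>> = async () => [
  { id: 1, name: 'Ada' },
  { id: 2, name: 'Linus' }
]

const getUserNames = mapResultSketch(getAllUsers, user => user.name)
const mapDemo = getUserNames({ db: null, isTransaction: false })
```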
TypeScript Types
Re-exported Executor Types
The following types are re-exported from @kysera/executor for convenience:
import type {
ExecutorConfig,
KyseraExecutorMarker,
PluginValidationDetails
} from '@kysera/dal'
ExecutorConfig - Configuration options for executor creation
KyseraExecutorMarker - Type marker interface for KyseraExecutor
PluginValidationDetails - Validation error details from plugin system
See @kysera/executor documentation for detailed type information.
DbContext<DB>
Database context interface.
interface DbContext<DB = Record<string, unknown>> {
readonly db: Kysely<DB> | Transaction<DB> | KyseraExecutor<DB> | KyseraTransaction<DB>
readonly isTransaction: boolean
}
QueryFunction<DB, TArgs, TResult>
Query function signature.
type QueryFunction<DB, TArgs extends readonly unknown[], TResult> = (
ctxOrDb: DbContext<DB> | Kysely<DB> | KyseraExecutor<DB>,
...args: TArgs
) => Promise<TResult>
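The `ctxOrDb` parameter implies that a query function normalizes its first argument into a context before running. The source does not describe how the library tells the two apart, so the sketch below is only one possible approach: `FakeDb`, `Ctx`, `isCtx`, and `createQuerySketch` are hypothetical, and the property check is an assumption.

```typescript
// Hypothetical stand-ins: FakeDb plays the role of Kysely or KyseraExecutor.
interface FakeDb {
  readonly label: string
}
interface Ctx<D> {
  readonly db: D
  readonly isTransaction: boolean
}

// Assumed detection rule: a context has both `db` and `isTransaction`.
function isCtx<D extends object>(value: Ctx<D> | D): value is Ctx<D> {
  return 'db' in value && 'isTransaction' in value
}

function createQuerySketch<D extends object, TArgs extends readonly unknown[], TResult>(
  queryFn: (ctx: Ctx<D>, ...args: TArgs) => Promise<TResult>
): (ctxOrDb: Ctx<D> | D, ...args: TArgs) => Promise<TResult> {
  return (ctxOrDb, ...args) => {
    // A bare database handle is wrapped in a non-transactional context.
    const ctx: Ctx<D> = isCtx<D>(ctxOrDb) ? ctxOrDb : { db: ctxOrDb, isTransaction: false }
    return queryFn(ctx, ...args)
  }
}

const describeUser = createQuerySketch(
  async (ctx: Ctx<FakeDb>, id: number) => `${ctx.db.label}:${id}:${ctx.isTransaction}`
)

const fromRawDb = describeUser({ label: 'raw' }, 1)
const fromContext = describeUser({ db: { label: 'tx' }, isTransaction: true }, 1)
```

The same query function therefore works at the top level with a database handle and inside a transaction callback with the context it is given.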
TransactionOptions
Transaction execution options.
interface TransactionOptions {
isolationLevel?: 'read uncommitted' | 'read committed' | 'repeatable read' | 'serializable'
}
Note: Isolation level configuration is dialect-specific and should typically be set at the connection pool level.
Type Inference Utilities
type InferResult<T> = T extends QueryFunction<any, any, infer R> ? R : never
type InferArgs<T> = T extends QueryFunction<any, infer A, any> ? A : never
type InferDB<T> = T extends QueryFunction<infer DB, any, any> ? DB : never
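These utilities let wrapper code reuse a query's types instead of restating them. The sketch below redefines simplified local versions of `QueryFunction`, `InferResult`, and `InferArgs` so it compiles on its own; `Ctx`, `MemoryDb`, `getUserName`, and `lookupName` are hypothetical.

```typescript
// Local stand-ins so the sketch compiles on its own; the real types come from @kysera/dal.
interface Ctx<DB> {
  readonly db: DB
  readonly isTransaction: boolean
}
type QueryFunction<DB, TArgs extends readonly unknown[], TResult> = (
  ctx: Ctx<DB>,
  ...args: TArgs
) => Promise<TResult>

type InferResult<T> = T extends QueryFunction<any, any, infer R> ? R : never
type InferArgs<T> = T extends QueryFunction<any, infer A, any> ? A : never

// Hypothetical in-memory "database" used in place of a Kysely instance.
interface MemoryDb {
  users: Array<{ id: number; name: string }>
}

const getUserName: QueryFunction<MemoryDb, [id: number], string | undefined> = async (ctx, id) =>
  ctx.db.users.find(user => user.id === id)?.name

// Derive types from the query instead of repeating them by hand.
type UserName = InferResult<typeof getUserName> // string | undefined
type UserNameArgs = InferArgs<typeof getUserName> // [id: number]

// A wrapper whose signature follows the query automatically.
async function lookupName(ctx: Ctx<MemoryDb>, ...args: UserNameArgs): Promise<UserName> {
  return getUserName(ctx, ...args)
}

const lookedUp = lookupName({ db: { users: [{ id: 1, name: 'Ada' }] }, isTransaction: false }, 1)
```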
ParallelResult<T>
Result type for parallel query execution.
type ParallelResult<T extends Record<string, QueryFunction<any, any, any>>> = {
[K in keyof T]: T[K] extends QueryFunction<any, any, infer R> ? R : never
}
Transaction Features
Savepoint Management
The DAL provides savepoint support for nested transaction rollback points:
import { withTransaction, withSavepoint } from '@kysera/dal'
await withTransaction(executor, async ctx => {
const user = await createUser(ctx, { email: 'test@example.com', name: 'Test' })
try {
await withSavepoint(ctx, 'before_post', async spCtx => {
const post = await createPost(spCtx, { userId: user.id, title: 'Test Post' })
await someRiskyOperation(spCtx, post.id)
})
} catch (error) {
console.log('Post creation failed, but user was still created')
}
return user
})
Savepoint Validation:
Savepoint names must be positive integers (1, 2, 3, etc.) for PostgreSQL compatibility:
// Valid: positive integers
await withSavepoint(ctx, '1', async spCtx => { })
await withSavepoint(ctx, '2', async spCtx => { })
await withSavepoint(ctx, '999', async spCtx => { })
// Invalid: not positive integers
await withSavepoint(ctx, 'my-savepoint', async spCtx => { })
await withSavepoint(ctx, '0', async spCtx => { })
await withSavepoint(ctx, '-1', async spCtx => { })
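The stated rule can be expressed as a small predicate. This is a hypothetical validator written for illustration, not the library's implementation; it also rejects leading zeros such as '01', which the rule above does not address.

```typescript
// Hypothetical validator for the rule stated above; not the library's implementation.
function isValidSavepointName(name: string): boolean {
  // One non-zero digit followed by any number of digits: "1", "42", "999".
  return /^[1-9][0-9]*$/.test(name)
}

const savepointChecks = ['1', '2', '999', 'my-savepoint', '0', '-1'].map(name => ({
  name,
  valid: isValidSavepointName(name)
}))

// Names that pass the check, in input order.
const acceptedNames = savepointChecks.filter(check => check.valid).map(check => check.name)
```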
Rollback Error Handling
When a transaction or savepoint is rolled back due to an error, the DAL logs the rollback operation but preserves the original error:
import { withTransaction } from '@kysera/dal'
try {
await withTransaction(executor, async ctx => {
await createUser(ctx, { email: 'test@example.com', name: 'Test' })
throw new Error('Something went wrong')
})
} catch (error) {
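// `error` is `unknown` in strict mode, so narrow it before reading `.message`
// Logs "Something went wrong": the original error reaches the caller
console.error(error instanceof Error ? error.message : error)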
}
Internal logging:
When a rollback occurs, the DAL logs it using console.error:
Transaction/savepoint rolled back due to error: [error message]
This helps with debugging while ensuring the original error is always re-thrown to the caller.
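The log-then-rethrow pattern described here can be sketched as follows. `runWithRollbackLog` is a hypothetical helper, and the logger is passed in as a parameter so the example can be checked without touching `console.error`; the actual rollback call is omitted.

```typescript
// Hypothetical helper illustrating log-then-rethrow; the real rollback step is omitted.
async function runWithRollbackLog<T>(
  work: () => Promise<T>,
  log: (message: string) => void
): Promise<T> {
  try {
    return await work()
  } catch (error) {
    const message = error instanceof Error ? error.message : String(error)
    log(`Transaction/savepoint rolled back due to error: ${message}`)
    // Rethrow the same object so callers see the original error, not a wrapper.
    throw error
  }
}

const rollbackLogs: string[] = []
const originalError = new Error('Something went wrong')

// Capture the rejection so it can be compared with the original error.
const rollbackOutcome = runWithRollbackLog(
  async () => {
    throw originalError
  },
  message => {
    rollbackLogs.push(message)
  }
).then(
  () => null,
  (caught: unknown) => caught
)
```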
Plugin Propagation in Transactions
All plugins registered on the executor automatically propagate through transactions:
import { createExecutor } from '@kysera/executor'
import { softDeletePlugin } from '@kysera/soft-delete'
import { rlsPlugin } from '@kysera/rls'
import { withTransaction } from '@kysera/dal'
const executor = await createExecutor(db, [
softDeletePlugin(),
rlsPlugin({ schema: rlsSchema, getCurrentTenantId: () => tenantId })
])
await withTransaction(executor, async ctx => {
const users = await ctx.db.selectFrom('users').selectAll().execute()
const newUser = await ctx.db
.insertInto('users')
.values({ name: 'Alice', tenant_id: tenantId })
.returningAll()
.executeTakeFirst()
return newUser
})
See the Executor documentation for more details on how plugins work in transactions.
Examples
Complete Example with Plugins
import { Kysely, PostgresDialect } from 'kysely'
import { Pool } from 'pg'
import { createExecutor } from '@kysera/executor'
import { softDeletePlugin } from '@kysera/soft-delete'
import { rlsPlugin } from '@kysera/rls'
import { createQuery, withTransaction, parallel } from '@kysera/dal'
interface Database {
users: {
id: number
email: string
name: string
tenant_id: number
deleted_at: Date | null
}
posts: {
id: number
user_id: number
title: string
body: string
tenant_id: number
deleted_at: Date | null
}
}
const db = new Kysely<Database>({
dialect: new PostgresDialect({
pool: new Pool({ connectionString: process.env.DATABASE_URL })
})
})
const executor = await createExecutor(db, [
softDeletePlugin(),
rlsPlugin({
schema: {
users: { tenantIdColumn: 'tenant_id' },
posts: { tenantIdColumn: 'tenant_id' }
},
getCurrentTenantId: () => currentTenantId
})
])
const getUserById = createQuery((ctx, id: number) =>
ctx.db.selectFrom('users').selectAll().where('id', '=', id).executeTakeFirst()
)
const getPostsByUserId = createQuery((ctx, userId: number) =>
ctx.db.selectFrom('posts').selectAll().where('user_id', '=', userId).execute()
)
const createUser = createQuery((ctx, data: { email: string; name: string; tenant_id: number }) =>
ctx.db.insertInto('users').values(data).returningAll().executeTakeFirstOrThrow()
)
const createPost = createQuery(
(ctx, data: { user_id: number; title: string; body: string; tenant_id: number }) =>
ctx.db.insertInto('posts').values(data).returningAll().executeTakeFirstOrThrow()
)
const user = await getUserById(executor, 1)
const result = await withTransaction(executor, async ctx => {
const newUser = await createUser(ctx, {
email: 'test@example.com',
name: 'Test User',
tenant_id: currentTenantId
})
const newPost = await createPost(ctx, {
user_id: newUser.id,
title: 'First Post',
body: 'Hello World',
tenant_id: currentTenantId
})
return { user: newUser, post: newPost }
})
const getUserData = parallel({
user: getUserById,
posts: getPostsByUserId
})
const userData = await getUserData(executor, userId)
User Management Service
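import { createQuery, withTransaction, parallel } from '@kysera/dal'
// Assumes the KyseraExecutor type is exported by @kysera/executor
import type { KyseraExecutor } from '@kysera/executor'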
const createUser = createQuery((ctx, data: { email: string; name: string }) =>
ctx.db.insertInto('users').values(data).returningAll().executeTakeFirstOrThrow()
)
const createUserProfile = createQuery((ctx, data: { user_id: number; bio: string }) =>
ctx.db.insertInto('profiles').values(data).returningAll().executeTakeFirstOrThrow()
)
const getUserById = createQuery((ctx, id: number) =>
ctx.db.selectFrom('users').selectAll().where('id', '=', id).executeTakeFirst()
)
const getProfileByUserId = createQuery((ctx, userId: number) =>
ctx.db.selectFrom('profiles').selectAll().where('user_id', '=', userId).executeTakeFirst()
)
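// Input shape inferred from the fields used below
interface RegisterData {
email: string
name: string
bio: string
}
async function registerUser(executor: KyseraExecutor<Database>, data: RegisterData) {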
return withTransaction(executor, async ctx => {
const user = await createUser(ctx, {
email: data.email,
name: data.name
})
const profile = await createUserProfile(ctx, {
user_id: user.id,
bio: data.bio
})
return { user, profile }
})
}
const getUserData = parallel({
user: getUserById,
profile: getProfileByUserId
})
const userData = await getUserData(executor, userId)
Blog Post with Author
import { createQuery, compose } from '@kysera/dal'
const getPostById = createQuery((ctx, id: number) =>
ctx.db.selectFrom('posts').selectAll().where('id', '=', id).executeTakeFirstOrThrow()
)
const getUserById = createQuery((ctx, id: number) =>
ctx.db.selectFrom('users').selectAll().where('id', '=', id).executeTakeFirstOrThrow()
)
const getPostWithAuthor = compose(getPostById, async (ctx, post) => ({
...post,
author: await getUserById(ctx, post.user_id)
}))
const post = await getPostWithAuthor(executor, postId)
Requirements
- Node.js: >=20.0.0
- Bun: >=1.0.0
- Kysely: >=0.28.8 (peer dependency)
- @kysera/executor: >=0.7.0 (dependency)
License
MIT