New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

kysely-replication

Package Overview
Dependencies
Maintainers
0
Versions
7
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

kysely-replication - npm Package Compare versions

Comparing version 0.0.2 to 0.1.0

2

dist/strategy/random.d.ts

@@ -9,3 +9,3 @@ import { R as ReplicaStrategy } from '../config-5S6ngVvi.js';

#private;
constructor(options: RandomReplicaStrategyOptions);
constructor(options?: RandomReplicaStrategyOptions);
next(replicaCount: number): Promise<number>;

@@ -12,0 +12,0 @@ get onTransaction(): 'error' | 'warn' | 'allow' | undefined;

@@ -5,3 +5,3 @@ // src/strategy/random.ts

constructor(options) {
this.#options = options;
this.#options = { ...options };
}

@@ -12,3 +12,3 @@ async next(replicaCount) {

get onTransaction() {
return this.#options.onTransaction;
return this.#options?.onTransaction;
}

@@ -15,0 +15,0 @@ };

@@ -9,3 +9,3 @@ import { R as ReplicaStrategy } from '../config-5S6ngVvi.js';

#private;
constructor(options: RoundRobinReplicaStrategyOptions);
constructor(options?: RoundRobinReplicaStrategyOptions);
next(replicaCount: number): Promise<number>;

@@ -12,0 +12,0 @@ get onTransaction(): 'error' | 'warn' | 'allow' | undefined;

@@ -13,3 +13,3 @@ // src/strategy/round-robin.ts

get onTransaction() {
return this.#options.onTransaction;
return this.#options?.onTransaction;
}

@@ -16,0 +16,0 @@ };

{
"name": "kysely-replication",
"version": "0.0.2",
"description": "",
"version": "0.1.0",
"description": "Replication-aware Kysely query execution",
"repository": {
"type": "git",
"url": "https://github.com/kysely-org/kysely-replication.git"
},
"type": "module",

@@ -6,0 +10,0 @@ "main": "./dist/index.cjs",

# kysely-replication
Production systems often use multiple database replicas to distribute read load
and increase availability. `kysely-replication` is a set of ideas and utilities
for building a replication-aware Kysely instance.
## Installation
```sh
npm install kysely kysely-replication
```
## Usage
This example demonstrates how to create a Kysely instance that uses a primary dialect
for writes and 2 replica dialects for reads. The `RoundRobinReplicaStrategy`
is used to distribute read queries evenly between the replicas. Alternatively,
the `RandomReplicaStrategy` can be imported from `kysely-replication/strategy/random`
and used to randomly select a replica for each query.
The strategy is (optionally) configured to throw an error if a transaction is started
on a replica connection - this can occur when using `db.connection()` and the first
query is a read query. The error is thrown to prevent the transaction from being
started on a replica connection, which could lead to inconsistencies in the database
state.
```ts

@@ -32,3 +56,47 @@ import { Kysely, PostgresDialect } from 'kysely'

await db.destroy()
await db.destroy() // destroys all sub-dialects
```
## Executes on Primary
### DDL Queries
DDL (Data Definition Language) queries execute on the primary dialect, as they
change the structure of the database.
Basically, anything on `db.schema` is included.
### Raw Queries
We don't currently parse raw queries to determine if they are read or write queries.
As a measure of caution, all raw queries execute on the primary dialect.
So anything that starts with `sql` template tag, as main query, or as subquery
in a `with` clause.
### Write Queries
Write queries execute on the primary dialect, as they change the state of the database.
Basically, `db.insertInto`, `db.updateTable`, `db.deleteFrom`, `db.mergeInto`, `db.replaceInto`
as the main query or as a subquery in a `with` clause.
### Transactions
Transactions more often than not contain write queries. They also more often than
not obtain locks on the database to ensure consistency. For these reasons, transactions
execute on the primary dialect.
As mentioned above, the `onTransaction` option in replica strategies can be used
to throw an error if a transaction is started on a replica connection - a rare
case that can occur when using `db.connection()` and the first query is a read
query.
## Executes on Replicas
### Read Queries
Read queries execute on replica dialects, as you'd expect.
Basically, `db.selectFrom` and `db.selectNoFrom` queries that do not contain write
queries in a `with` clause.

@@ -9,3 +9,3 @@ import { R as ReplicaStrategy } from '../config-5S6ngVvi.js';

#private;
constructor(options: RandomReplicaStrategyOptions);
constructor(options?: RandomReplicaStrategyOptions);
next(replicaCount: number): Promise<number>;

@@ -12,0 +12,0 @@ get onTransaction(): 'error' | 'warn' | 'allow' | undefined;

@@ -5,3 +5,3 @@ // src/strategy/random.ts

constructor(options) {
this.#options = options;
this.#options = { ...options };
}

@@ -12,3 +12,3 @@ async next(replicaCount) {

get onTransaction() {
return this.#options.onTransaction;
return this.#options?.onTransaction;
}

@@ -15,0 +15,0 @@ };

@@ -9,3 +9,3 @@ import { R as ReplicaStrategy } from '../config-5S6ngVvi.js';

#private;
constructor(options: RoundRobinReplicaStrategyOptions);
constructor(options?: RoundRobinReplicaStrategyOptions);
next(replicaCount: number): Promise<number>;

@@ -12,0 +12,0 @@ get onTransaction(): 'error' | 'warn' | 'allow' | undefined;

@@ -13,3 +13,3 @@ // src/strategy/round-robin.ts

get onTransaction() {
return this.#options.onTransaction;
return this.#options?.onTransaction;
}

@@ -16,0 +16,0 @@ };

@@ -14,4 +14,5 @@ import {

} from 'kysely'
import { afterEach, beforeAll, describe, expect, it } from 'vitest'
import { afterEach, beforeAll, describe, expect, it, vi } from 'vitest'
import { KyselyReplicationDialect } from '../src/index.js'
import { RandomReplicaStrategy } from '../src/strategy/random.js'
import { RoundRobinReplicaStrategy } from '../src/strategy/round-robin.js'

@@ -27,142 +28,206 @@

describe('kysely-replication: round-robin', () => {
let db: Kysely<Database>
const executions: string[] = []
const randomSpy = vi.spyOn(global.Math, 'random')
const warnSpy = vi.spyOn(global.console, 'warn')
beforeAll(() => {
db = new Kysely({
dialect: new KyselyReplicationDialect({
primaryDialect: getDummyDialect('primary', executions),
replicaDialects: new Array(3)
.fill(null)
.map((_, i) =>
getDummyDialect(`replica-${i}`, executions),
) as unknown as readonly [Dialect, ...Dialect[]],
replicaStrategy: new RoundRobinReplicaStrategy({}),
}),
describe.each(
(['error', 'warn'] as const).flatMap((onTransaction) => [
{
onTransaction,
replicaAssertion: (executions: string[]) =>
expect(executions).toEqual([
'replica-0',
'replica-1',
'replica-2',
'replica-0',
'replica-1',
]),
Strategy: RoundRobinReplicaStrategy,
},
{
onTransaction,
replicaAssertion: (executions: string[]) =>
expect(randomSpy).toHaveBeenCalledTimes(executions.length),
Strategy: RandomReplicaStrategy,
},
]),
)(
'kysely-replication: $Strategy.name (onTransaction: $onTransaction)',
({ onTransaction, replicaAssertion, Strategy }) => {
let db: Kysely<Database>
const executions: string[] = []
let primaryDialect: Dialect
let replicaDialects: readonly [Dialect, ...Dialect[]]
beforeAll(() => {
primaryDialect = getDummyDialect('primary', executions)
replicaDialects = new Array(3)
.fill(null)
.map((_, i) =>
getDummyDialect(`replica-${i}`, executions),
) as unknown as readonly [Dialect, ...Dialect[]]
db = new Kysely({
dialect: new KyselyReplicationDialect({
primaryDialect,
replicaDialects,
replicaStrategy: new Strategy({ onTransaction }),
}),
})
})
})
afterEach(() => {
executions.length = 0 // clear executions
})
afterEach(() => {
executions.length = 0 // clear executions
})
it('should use primary dialect for DDL queries', async () => {
const queries = {
alterTable: db.schema
.alterTable('users')
.addColumn('nickname', 'varchar'),
createIndex: db.schema
.createIndex('users_index')
.on('users')
.column('email'),
createSchema: db.schema.createSchema('moshe'),
createTable: db.schema
.createTable('cakes')
.addColumn('id', 'serial', (cb) => cb.primaryKey()),
createType: db.schema
.createType('cake_type')
.asEnum(['chocolate', 'vanilla']),
createView: db.schema
.createView('users_view')
.as(db.selectFrom('users').selectAll()),
dropIndex: db.schema.dropIndex('users_index'),
dropSchema: db.schema.dropSchema('moshe'),
dropTable: db.schema.dropTable('cakes'),
dropType: db.schema.dropType('cake_type'),
dropView: db.schema.dropView('users_view'),
} satisfies {
[K in keyof Omit<SchemaModule, `with${string}`>]: {
execute(): Promise<unknown>
it('should use primary dialect for DDL queries', async () => {
const queries = {
alterTable: db.schema
.alterTable('users')
.addColumn('nickname', 'varchar'),
createIndex: db.schema
.createIndex('users_index')
.on('users')
.column('email'),
createSchema: db.schema.createSchema('moshe'),
createTable: db.schema
.createTable('cakes')
.addColumn('id', 'serial', (cb) => cb.primaryKey()),
createType: db.schema
.createType('cake_type')
.asEnum(['chocolate', 'vanilla']),
createView: db.schema
.createView('users_view')
.as(db.selectFrom('users').selectAll()),
dropIndex: db.schema.dropIndex('users_index'),
dropSchema: db.schema.dropSchema('moshe'),
dropTable: db.schema.dropTable('cakes'),
dropType: db.schema.dropType('cake_type'),
dropView: db.schema.dropView('users_view'),
} satisfies {
[K in keyof Omit<SchemaModule, `with${string}`>]: {
execute(): Promise<unknown>
}
}
}
await Promise.all(Object.values(queries).map((query) => query.execute()))
await Promise.all(Object.values(queries).map((query) => query.execute()))
expect(executions).toEqual(Object.values(queries).map(() => 'primary'))
})
expect(executions).toEqual(Object.values(queries).map(() => 'primary'))
})
it('should use primary dialect for raw queries', async () => {
await sql`select 1`.execute(db)
it('should use primary dialect for raw queries', async () => {
await sql`select 1`.execute(db)
expect(executions).toEqual(['primary'])
})
expect(executions).toEqual(['primary'])
})
it('should use primary dialect for DML queries that mutate data', async () => {
const queries = {
deleteFrom: db.deleteFrom('users').where('id', '=', 1),
insertInto: db
.insertInto('users')
.values({ email: 'info@example.com', is_verified: false }),
mergeInto: db
.mergeInto('users as u1')
.using('users as u2', 'u1.id', 'u2.id')
.whenMatched()
.thenDoNothing(),
replaceInto: db
.replaceInto('users')
.values({ email: 'info@example.com', is_verified: false }),
updateTable: db
.updateTable('users')
.set('is_verified', true)
.where('id', '=', 1),
with: db
.with('insert', (qb) =>
qb
.insertInto('users')
.values({ email: 'info@example.com', is_verified: false })
.returning('id'),
)
.selectFrom('users')
.innerJoin('insert', 'insert.id', 'users.id')
.selectAll(),
} satisfies {
[K in
| keyof Omit<
QueryCreator<Database>,
`select${string}` | `with${string}`
>
| 'with']: {
execute(): Promise<unknown>
it('should use primary dialect for DML queries that mutate data', async () => {
const queries = {
deleteFrom: db.deleteFrom('users').where('id', '=', 1),
insertInto: db
.insertInto('users')
.values({ email: 'info@example.com', is_verified: false }),
mergeInto: db
.mergeInto('users as u1')
.using('users as u2', 'u1.id', 'u2.id')
.whenMatched()
.thenDoNothing(),
replaceInto: db
.replaceInto('users')
.values({ email: 'info@example.com', is_verified: false }),
updateTable: db
.updateTable('users')
.set('is_verified', true)
.where('id', '=', 1),
with: db
.with('insert', (qb) =>
qb
.insertInto('users')
.values({ email: 'info@example.com', is_verified: false })
.returning('id'),
)
.selectFrom('users')
.innerJoin('insert', 'insert.id', 'users.id')
.selectAll(),
} satisfies {
[K in
| keyof Omit<
QueryCreator<Database>,
`select${string}` | `with${string}`
>
| 'with']: {
execute(): Promise<unknown>
}
}
}
await Promise.all(Object.values(queries).map((query) => query.execute()))
await Promise.all(Object.values(queries).map((query) => query.execute()))
expect(executions).toEqual(Object.values(queries).map(() => 'primary'))
})
expect(executions).toEqual(Object.values(queries).map(() => 'primary'))
})
it('should use replica dialects for DML queries that do not mutate data', async () => {
const queries = {
selectFrom: db.selectFrom('users').selectAll(),
selectNoFrom: db
.selectNoFrom((eb) => eb.selectFrom('users').selectAll().as('u'))
.selectAll(),
with: db
.with('u1', (qb) => qb.selectFrom('users').selectAll())
.selectFrom('u1')
.selectAll(),
} satisfies {
[K in keyof Pick<
QueryCreator<Database>,
'selectFrom' | 'selectNoFrom' | 'with'
>]: { execute(): Promise<unknown> }
}
it('should use primary dialect for transactions', async () => {
await db.transaction().execute(async (trx) => {
await trx.selectFrom('users').selectAll().execute()
})
await Promise.all([
Object.values(queries).map((query) => query.execute()),
db.selectFrom('users').selectAll().where('id', '=', 1).execute(),
db.selectFrom('users').selectAll().where('id', '=', 2).execute(),
])
expect(executions).toEqual(['primary'])
})
expect(executions).toEqual([
'replica-0',
'replica-1',
'replica-2',
'replica-0',
'replica-1',
])
})
})
it('should use replica dialects for DML queries that do not mutate data', async () => {
const queries = {
selectFrom: db.selectFrom('users').selectAll(),
selectNoFrom: db
.selectNoFrom((eb) => eb.selectFrom('users').selectAll().as('u'))
.selectAll(),
with: db
.with('u1', (qb) => qb.selectFrom('users').selectAll())
.selectFrom('u1')
.selectAll(),
} satisfies {
[K in keyof Pick<
QueryCreator<Database>,
'selectFrom' | 'selectNoFrom' | 'with'
>]: { execute(): Promise<unknown> }
}
// with extra queries to test round-robin
await Promise.all([
Object.values(queries).map((query) => query.execute()),
db.selectFrom('users').selectAll().execute(),
db.selectFrom('users').selectAll().execute(),
])
replicaAssertion(executions)
})
const message =
'KyselyReplication: transaction started with replica connection!'
if (onTransaction === 'error') {
it('should throw an error when a transaction is started with a replica connection', async () => {
await expect(
db.connection().execute(async (con) => {
await con.selectFrom('users').selectAll().execute()
await con.transaction().execute(async (trx) => {
await trx.selectFrom('users').selectAll().execute()
})
}),
).rejects.toThrow(message)
})
} else {
it('should warn when a transaction is started with a replica connection', async () => {
await db.connection().execute(async (con) => {
await con.selectFrom('users').selectAll().execute()
await con.transaction().execute(async (trx) => {
await trx.selectFrom('users').selectAll().execute()
})
})
expect(warnSpy).toHaveBeenCalledTimes(1)
expect(warnSpy).toHaveBeenCalledWith(message)
})
}
},
)
function getDummyDialect(name: string, executions: string[]): Dialect {

@@ -169,0 +234,0 @@ return {

@@ -6,2 +6,3 @@ import { defineConfig } from 'vitest/config'

allowOnly: false,
clearMocks: true,
globalSetup: ['./vitest.setup.ts'],

@@ -8,0 +9,0 @@ typecheck: {

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc