kysely-replication
Advanced tools
Comparing version 0.0.2 to 0.1.0
@@ -9,3 +9,3 @@ import { R as ReplicaStrategy } from '../config-5S6ngVvi.js'; | ||
#private; | ||
constructor(options: RandomReplicaStrategyOptions); | ||
constructor(options?: RandomReplicaStrategyOptions); | ||
next(replicaCount: number): Promise<number>; | ||
@@ -12,0 +12,0 @@ get onTransaction(): 'error' | 'warn' | 'allow' | undefined; |
@@ -5,3 +5,3 @@ // src/strategy/random.ts | ||
constructor(options) { | ||
this.#options = options; | ||
this.#options = { ...options }; | ||
} | ||
@@ -12,3 +12,3 @@ async next(replicaCount) { | ||
get onTransaction() { | ||
return this.#options.onTransaction; | ||
return this.#options?.onTransaction; | ||
} | ||
@@ -15,0 +15,0 @@ }; |
@@ -9,3 +9,3 @@ import { R as ReplicaStrategy } from '../config-5S6ngVvi.js'; | ||
#private; | ||
constructor(options: RoundRobinReplicaStrategyOptions); | ||
constructor(options?: RoundRobinReplicaStrategyOptions); | ||
next(replicaCount: number): Promise<number>; | ||
@@ -12,0 +12,0 @@ get onTransaction(): 'error' | 'warn' | 'allow' | undefined; |
@@ -13,3 +13,3 @@ // src/strategy/round-robin.ts | ||
get onTransaction() { | ||
return this.#options.onTransaction; | ||
return this.#options?.onTransaction; | ||
} | ||
@@ -16,0 +16,0 @@ }; |
{ | ||
"name": "kysely-replication", | ||
"version": "0.0.2", | ||
"description": "", | ||
"version": "0.1.0", | ||
"description": "Replication-aware Kysely query execution", | ||
"repository": { | ||
"type": "git", | ||
"url": "https://github.com/kysely-org/kysely-replication.git" | ||
}, | ||
"type": "module", | ||
@@ -6,0 +10,0 @@ "main": "./dist/index.cjs", |
# kysely-replication | ||
Production systems often use multiple database replicas to distribute read load | ||
and increase availability. `kysely-replication` is a set of ideas and utilities | ||
for building a replication-aware Kysely instance. | ||
## Installation | ||
```sh | ||
npm install kysely kysely-replication | ||
``` | ||
## Usage | ||
This example demonstrates how to create a Kysely instance that uses a primary dialect | ||
for writes and 2 replica dialects for reads. The `RoundRobinReplicaStrategy` | ||
is used to distribute read queries evenly between the replicas. Alternatively, | ||
the `RandomReplicaStrategy` can be imported from `kysely-replication/strategy/random` | ||
and used to randomly select a replica for each query. | ||
The strategy is (optionally) configured to throw an error if a transaction is started | ||
on a replica connection - this can occur when using `db.connection()` and the first | ||
query is a read query. The error is thrown to prevent the transaction from being | ||
started on a replica connection, which could lead to inconsistencies in the database | ||
state. | ||
```ts | ||
@@ -32,3 +56,47 @@ import { Kysely, PostgresDialect } from 'kysely' | ||
await db.destroy() | ||
await db.destroy() // destroys all sub-dialects | ||
``` | ||
## Executes on Primary | ||
### DDL Queries | ||
DDL (Data Definition Language) queries execute on the primary dialect, as they | ||
change the structure of the database. | ||
Basically, anything on `db.schema` is included. | ||
### Raw Queries | ||
We don't currently parse raw queries to determine if they are read or write queries. | ||
As a measure of caution, all raw queries execute on the primary dialect. | ||
So anything that starts with `sql` template tag, as main query, or as subquery | ||
in a `with` clause. | ||
### Write Queries | ||
Write queries execute on the primary dialect, as they change the state of the database. | ||
Basically, `db.insertInto`, `db.updateTable`, `db.deleteFrom`, `db.mergeInto`, `db.replaceInto` | ||
as the main query or as a subquery in a `with` clause. | ||
### Transactions | ||
Transactions more often than not contain write queries. They also more often than | ||
not obtain locks on the database to ensure consistency. For these reasons, transactions | ||
execute on the primary dialect. | ||
As mentioned above, the `onTransaction` option in replica strategies can be used | ||
to throw an error if a transaction is started on a replica connection - a rare | ||
case that can occur when using `db.connection()` and the first query is a read | ||
query. | ||
## Executes on Replicas | ||
### Read Queries | ||
Read queries execute on replica dialects, as you'd expect. | ||
Basically, `db.selectFrom` and `db.selectNoFrom` queries that do not contain write | ||
queries in a `with` clause. |
@@ -9,3 +9,3 @@ import { R as ReplicaStrategy } from '../config-5S6ngVvi.js'; | ||
#private; | ||
constructor(options: RandomReplicaStrategyOptions); | ||
constructor(options?: RandomReplicaStrategyOptions); | ||
next(replicaCount: number): Promise<number>; | ||
@@ -12,0 +12,0 @@ get onTransaction(): 'error' | 'warn' | 'allow' | undefined; |
@@ -5,3 +5,3 @@ // src/strategy/random.ts | ||
constructor(options) { | ||
this.#options = options; | ||
this.#options = { ...options }; | ||
} | ||
@@ -12,3 +12,3 @@ async next(replicaCount) { | ||
get onTransaction() { | ||
return this.#options.onTransaction; | ||
return this.#options?.onTransaction; | ||
} | ||
@@ -15,0 +15,0 @@ }; |
@@ -9,3 +9,3 @@ import { R as ReplicaStrategy } from '../config-5S6ngVvi.js'; | ||
#private; | ||
constructor(options: RoundRobinReplicaStrategyOptions); | ||
constructor(options?: RoundRobinReplicaStrategyOptions); | ||
next(replicaCount: number): Promise<number>; | ||
@@ -12,0 +12,0 @@ get onTransaction(): 'error' | 'warn' | 'allow' | undefined; |
@@ -13,3 +13,3 @@ // src/strategy/round-robin.ts | ||
get onTransaction() { | ||
return this.#options.onTransaction; | ||
return this.#options?.onTransaction; | ||
} | ||
@@ -16,0 +16,0 @@ }; |
@@ -14,4 +14,5 @@ import { | ||
} from 'kysely' | ||
import { afterEach, beforeAll, describe, expect, it } from 'vitest' | ||
import { afterEach, beforeAll, describe, expect, it, vi } from 'vitest' | ||
import { KyselyReplicationDialect } from '../src/index.js' | ||
import { RandomReplicaStrategy } from '../src/strategy/random.js' | ||
import { RoundRobinReplicaStrategy } from '../src/strategy/round-robin.js' | ||
@@ -27,142 +28,206 @@ | ||
describe('kysely-replication: round-robin', () => { | ||
let db: Kysely<Database> | ||
const executions: string[] = [] | ||
const randomSpy = vi.spyOn(global.Math, 'random') | ||
const warnSpy = vi.spyOn(global.console, 'warn') | ||
beforeAll(() => { | ||
db = new Kysely({ | ||
dialect: new KyselyReplicationDialect({ | ||
primaryDialect: getDummyDialect('primary', executions), | ||
replicaDialects: new Array(3) | ||
.fill(null) | ||
.map((_, i) => | ||
getDummyDialect(`replica-${i}`, executions), | ||
) as unknown as readonly [Dialect, ...Dialect[]], | ||
replicaStrategy: new RoundRobinReplicaStrategy({}), | ||
}), | ||
describe.each( | ||
(['error', 'warn'] as const).flatMap((onTransaction) => [ | ||
{ | ||
onTransaction, | ||
replicaAssertion: (executions: string[]) => | ||
expect(executions).toEqual([ | ||
'replica-0', | ||
'replica-1', | ||
'replica-2', | ||
'replica-0', | ||
'replica-1', | ||
]), | ||
Strategy: RoundRobinReplicaStrategy, | ||
}, | ||
{ | ||
onTransaction, | ||
replicaAssertion: (executions: string[]) => | ||
expect(randomSpy).toHaveBeenCalledTimes(executions.length), | ||
Strategy: RandomReplicaStrategy, | ||
}, | ||
]), | ||
)( | ||
'kysely-replication: $Strategy.name (onTransaction: $onTransaction)', | ||
({ onTransaction, replicaAssertion, Strategy }) => { | ||
let db: Kysely<Database> | ||
const executions: string[] = [] | ||
let primaryDialect: Dialect | ||
let replicaDialects: readonly [Dialect, ...Dialect[]] | ||
beforeAll(() => { | ||
primaryDialect = getDummyDialect('primary', executions) | ||
replicaDialects = new Array(3) | ||
.fill(null) | ||
.map((_, i) => | ||
getDummyDialect(`replica-${i}`, executions), | ||
) as unknown as readonly [Dialect, ...Dialect[]] | ||
db = new Kysely({ | ||
dialect: new KyselyReplicationDialect({ | ||
primaryDialect, | ||
replicaDialects, | ||
replicaStrategy: new Strategy({ onTransaction }), | ||
}), | ||
}) | ||
}) | ||
}) | ||
afterEach(() => { | ||
executions.length = 0 // clear executions | ||
}) | ||
afterEach(() => { | ||
executions.length = 0 // clear executions | ||
}) | ||
it('should use primary dialect for DDL queries', async () => { | ||
const queries = { | ||
alterTable: db.schema | ||
.alterTable('users') | ||
.addColumn('nickname', 'varchar'), | ||
createIndex: db.schema | ||
.createIndex('users_index') | ||
.on('users') | ||
.column('email'), | ||
createSchema: db.schema.createSchema('moshe'), | ||
createTable: db.schema | ||
.createTable('cakes') | ||
.addColumn('id', 'serial', (cb) => cb.primaryKey()), | ||
createType: db.schema | ||
.createType('cake_type') | ||
.asEnum(['chocolate', 'vanilla']), | ||
createView: db.schema | ||
.createView('users_view') | ||
.as(db.selectFrom('users').selectAll()), | ||
dropIndex: db.schema.dropIndex('users_index'), | ||
dropSchema: db.schema.dropSchema('moshe'), | ||
dropTable: db.schema.dropTable('cakes'), | ||
dropType: db.schema.dropType('cake_type'), | ||
dropView: db.schema.dropView('users_view'), | ||
} satisfies { | ||
[K in keyof Omit<SchemaModule, `with${string}`>]: { | ||
execute(): Promise<unknown> | ||
it('should use primary dialect for DDL queries', async () => { | ||
const queries = { | ||
alterTable: db.schema | ||
.alterTable('users') | ||
.addColumn('nickname', 'varchar'), | ||
createIndex: db.schema | ||
.createIndex('users_index') | ||
.on('users') | ||
.column('email'), | ||
createSchema: db.schema.createSchema('moshe'), | ||
createTable: db.schema | ||
.createTable('cakes') | ||
.addColumn('id', 'serial', (cb) => cb.primaryKey()), | ||
createType: db.schema | ||
.createType('cake_type') | ||
.asEnum(['chocolate', 'vanilla']), | ||
createView: db.schema | ||
.createView('users_view') | ||
.as(db.selectFrom('users').selectAll()), | ||
dropIndex: db.schema.dropIndex('users_index'), | ||
dropSchema: db.schema.dropSchema('moshe'), | ||
dropTable: db.schema.dropTable('cakes'), | ||
dropType: db.schema.dropType('cake_type'), | ||
dropView: db.schema.dropView('users_view'), | ||
} satisfies { | ||
[K in keyof Omit<SchemaModule, `with${string}`>]: { | ||
execute(): Promise<unknown> | ||
} | ||
} | ||
} | ||
await Promise.all(Object.values(queries).map((query) => query.execute())) | ||
await Promise.all(Object.values(queries).map((query) => query.execute())) | ||
expect(executions).toEqual(Object.values(queries).map(() => 'primary')) | ||
}) | ||
expect(executions).toEqual(Object.values(queries).map(() => 'primary')) | ||
}) | ||
it('should use primary dialect for raw queries', async () => { | ||
await sql`select 1`.execute(db) | ||
it('should use primary dialect for raw queries', async () => { | ||
await sql`select 1`.execute(db) | ||
expect(executions).toEqual(['primary']) | ||
}) | ||
expect(executions).toEqual(['primary']) | ||
}) | ||
it('should use primary dialect for DML queries that mutate data', async () => { | ||
const queries = { | ||
deleteFrom: db.deleteFrom('users').where('id', '=', 1), | ||
insertInto: db | ||
.insertInto('users') | ||
.values({ email: 'info@example.com', is_verified: false }), | ||
mergeInto: db | ||
.mergeInto('users as u1') | ||
.using('users as u2', 'u1.id', 'u2.id') | ||
.whenMatched() | ||
.thenDoNothing(), | ||
replaceInto: db | ||
.replaceInto('users') | ||
.values({ email: 'info@example.com', is_verified: false }), | ||
updateTable: db | ||
.updateTable('users') | ||
.set('is_verified', true) | ||
.where('id', '=', 1), | ||
with: db | ||
.with('insert', (qb) => | ||
qb | ||
.insertInto('users') | ||
.values({ email: 'info@example.com', is_verified: false }) | ||
.returning('id'), | ||
) | ||
.selectFrom('users') | ||
.innerJoin('insert', 'insert.id', 'users.id') | ||
.selectAll(), | ||
} satisfies { | ||
[K in | ||
| keyof Omit< | ||
QueryCreator<Database>, | ||
`select${string}` | `with${string}` | ||
> | ||
| 'with']: { | ||
execute(): Promise<unknown> | ||
it('should use primary dialect for DML queries that mutate data', async () => { | ||
const queries = { | ||
deleteFrom: db.deleteFrom('users').where('id', '=', 1), | ||
insertInto: db | ||
.insertInto('users') | ||
.values({ email: 'info@example.com', is_verified: false }), | ||
mergeInto: db | ||
.mergeInto('users as u1') | ||
.using('users as u2', 'u1.id', 'u2.id') | ||
.whenMatched() | ||
.thenDoNothing(), | ||
replaceInto: db | ||
.replaceInto('users') | ||
.values({ email: 'info@example.com', is_verified: false }), | ||
updateTable: db | ||
.updateTable('users') | ||
.set('is_verified', true) | ||
.where('id', '=', 1), | ||
with: db | ||
.with('insert', (qb) => | ||
qb | ||
.insertInto('users') | ||
.values({ email: 'info@example.com', is_verified: false }) | ||
.returning('id'), | ||
) | ||
.selectFrom('users') | ||
.innerJoin('insert', 'insert.id', 'users.id') | ||
.selectAll(), | ||
} satisfies { | ||
[K in | ||
| keyof Omit< | ||
QueryCreator<Database>, | ||
`select${string}` | `with${string}` | ||
> | ||
| 'with']: { | ||
execute(): Promise<unknown> | ||
} | ||
} | ||
} | ||
await Promise.all(Object.values(queries).map((query) => query.execute())) | ||
await Promise.all(Object.values(queries).map((query) => query.execute())) | ||
expect(executions).toEqual(Object.values(queries).map(() => 'primary')) | ||
}) | ||
expect(executions).toEqual(Object.values(queries).map(() => 'primary')) | ||
}) | ||
it('should use replica dialects for DML queries that do not mutate data', async () => { | ||
const queries = { | ||
selectFrom: db.selectFrom('users').selectAll(), | ||
selectNoFrom: db | ||
.selectNoFrom((eb) => eb.selectFrom('users').selectAll().as('u')) | ||
.selectAll(), | ||
with: db | ||
.with('u1', (qb) => qb.selectFrom('users').selectAll()) | ||
.selectFrom('u1') | ||
.selectAll(), | ||
} satisfies { | ||
[K in keyof Pick< | ||
QueryCreator<Database>, | ||
'selectFrom' | 'selectNoFrom' | 'with' | ||
>]: { execute(): Promise<unknown> } | ||
} | ||
it('should use primary dialect for transactions', async () => { | ||
await db.transaction().execute(async (trx) => { | ||
await trx.selectFrom('users').selectAll().execute() | ||
}) | ||
await Promise.all([ | ||
Object.values(queries).map((query) => query.execute()), | ||
db.selectFrom('users').selectAll().where('id', '=', 1).execute(), | ||
db.selectFrom('users').selectAll().where('id', '=', 2).execute(), | ||
]) | ||
expect(executions).toEqual(['primary']) | ||
}) | ||
expect(executions).toEqual([ | ||
'replica-0', | ||
'replica-1', | ||
'replica-2', | ||
'replica-0', | ||
'replica-1', | ||
]) | ||
}) | ||
}) | ||
it('should use replica dialects for DML queries that do not mutate data', async () => { | ||
const queries = { | ||
selectFrom: db.selectFrom('users').selectAll(), | ||
selectNoFrom: db | ||
.selectNoFrom((eb) => eb.selectFrom('users').selectAll().as('u')) | ||
.selectAll(), | ||
with: db | ||
.with('u1', (qb) => qb.selectFrom('users').selectAll()) | ||
.selectFrom('u1') | ||
.selectAll(), | ||
} satisfies { | ||
[K in keyof Pick< | ||
QueryCreator<Database>, | ||
'selectFrom' | 'selectNoFrom' | 'with' | ||
>]: { execute(): Promise<unknown> } | ||
} | ||
// with extra queries to test round-robin | ||
await Promise.all([ | ||
Object.values(queries).map((query) => query.execute()), | ||
db.selectFrom('users').selectAll().execute(), | ||
db.selectFrom('users').selectAll().execute(), | ||
]) | ||
replicaAssertion(executions) | ||
}) | ||
const message = | ||
'KyselyReplication: transaction started with replica connection!' | ||
if (onTransaction === 'error') { | ||
it('should throw an error when a transaction is started with a replica connection', async () => { | ||
await expect( | ||
db.connection().execute(async (con) => { | ||
await con.selectFrom('users').selectAll().execute() | ||
await con.transaction().execute(async (trx) => { | ||
await trx.selectFrom('users').selectAll().execute() | ||
}) | ||
}), | ||
).rejects.toThrow(message) | ||
}) | ||
} else { | ||
it('should warn when a transaction is started with a replica connection', async () => { | ||
await db.connection().execute(async (con) => { | ||
await con.selectFrom('users').selectAll().execute() | ||
await con.transaction().execute(async (trx) => { | ||
await trx.selectFrom('users').selectAll().execute() | ||
}) | ||
}) | ||
expect(warnSpy).toHaveBeenCalledTimes(1) | ||
expect(warnSpy).toHaveBeenCalledWith(message) | ||
}) | ||
} | ||
}, | ||
) | ||
function getDummyDialect(name: string, executions: string[]): Dialect { | ||
@@ -169,0 +234,0 @@ return { |
@@ -6,2 +6,3 @@ import { defineConfig } from 'vitest/config' | ||
allowOnly: false, | ||
clearMocks: true, | ||
globalSetup: ['./vitest.setup.ts'], | ||
@@ -8,0 +9,0 @@ typecheck: { |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
No repository
Supply chain riskPackage does not have a linked source code repository. Without this field, a package will have no reference to the location of the source code use to generate the package.
Found 1 instance in 1 package
78200
1012
102
0