kysely-replication
Advanced tools
Comparing version 0.1.0 to 0.2.0
@@ -15,3 +15,3 @@ import { K as KyselyReplicationDialectConfig, R as ReplicaStrategy } from './config-5S6ngVvi.js'; | ||
#private; | ||
constructor(primary: Driver, getReplica: () => Promise<Driver>, onReplicaTransaction: 'error' | 'warn' | 'allow'); | ||
constructor(primary: Driver, getReplica: (compiledQuery: CompiledQuery) => Promise<Driver>, onReplicaTransaction: 'error' | 'warn' | 'allow'); | ||
executeQuery<R>(compiledQuery: CompiledQuery): Promise<QueryResult<R>>; | ||
@@ -18,0 +18,0 @@ streamQuery<R>(compiledQuery: CompiledQuery, chunkSize: number): AsyncIterableIterator<QueryResult<R>>; |
@@ -82,3 +82,3 @@ // src/connection.ts | ||
} | ||
this.#driver = compiledQueryOrContext === "transaction" || this.#isQueryForPrimary(compiledQueryOrContext) ? this.#primaryDriver : await this.#getReplicaDriver(); | ||
this.#driver = compiledQueryOrContext === "transaction" || this.#isQueryForPrimary(compiledQueryOrContext) ? this.#primaryDriver : await this.#getReplicaDriver(compiledQueryOrContext); | ||
this.#connection = await this.#driver.acquireConnection(); | ||
@@ -89,2 +89,5 @@ return { connection: this.#connection, driver: this.#driver }; | ||
const { query } = compiledQuery; | ||
if ("__dialect__" in query) { | ||
return query.__dialect__ === "primary"; | ||
} | ||
return this.#isOperationNodeForPrimary(query) || SelectQueryNode.is(query) && Boolean( | ||
@@ -114,6 +117,4 @@ query.with?.expressions.some( | ||
this.#primaryDriver, | ||
async () => { | ||
const replicaIndex = await this.#replicaStrategy.next( | ||
this.#replicaDrivers.length | ||
); | ||
async (compiledQuery) => { | ||
const replicaIndex = "__replicaIndex__" in compiledQuery.query ? compiledQuery.query.__replicaIndex__ : await this.#replicaStrategy.next(this.#replicaDrivers.length); | ||
const replicaDriver = this.#replicaDrivers[replicaIndex]; | ||
@@ -120,0 +121,0 @@ if (!replicaDriver) { |
{ | ||
"name": "kysely-replication", | ||
"version": "0.1.0", | ||
"version": "0.2.0", | ||
"description": "Replication-aware Kysely query execution", | ||
@@ -16,2 +16,25 @@ "repository": { | ||
}, | ||
"keywords": [ | ||
"kysely", | ||
"replication", | ||
"dialect", | ||
"sql", | ||
"database", | ||
"query", | ||
"builder" | ||
], | ||
"author": "Igal Klebanov <igalklebanov@gmail.com>", | ||
"license": "MIT", | ||
"devDependencies": { | ||
"@arethetypeswrong/cli": "^0.17.2", | ||
"@biomejs/biome": "^1.9.4", | ||
"@tsconfig/node22": "^22.0.0", | ||
"@types/node": "^22.10.2", | ||
"kysely": "^0.27.5", | ||
"tsup": "^8.3.5", | ||
"tsx": "^4.19.2", | ||
"typescript": "^5.7.2", | ||
"vitest": "^2.1.8" | ||
}, | ||
"sideEffects": false, | ||
"exports": { | ||
@@ -29,2 +52,12 @@ "./package.json": "./package.json", | ||
}, | ||
"./force": { | ||
"import": { | ||
"types": "./dist/force/index.d.ts", | ||
"default": "./dist/force/index.js" | ||
}, | ||
"require": { | ||
"types": "./dist/force/index.d.cts", | ||
"default": "./dist/force/index.cjs" | ||
} | ||
}, | ||
"./strategy/random": { | ||
@@ -51,25 +84,2 @@ "import": { | ||
}, | ||
"keywords": [ | ||
"kysely", | ||
"replication", | ||
"dialect", | ||
"sql", | ||
"database", | ||
"query", | ||
"builder" | ||
], | ||
"author": "Igal Klebanov <igalklebanov@gmail.com>", | ||
"license": "MIT", | ||
"devDependencies": { | ||
"@arethetypeswrong/cli": "^0.17.2", | ||
"@biomejs/biome": "^1.9.4", | ||
"@tsconfig/node22": "^22.0.0", | ||
"@types/node": "^22.10.2", | ||
"kysely": "^0.27.5", | ||
"tsup": "^8.3.5", | ||
"tsx": "^4.19.2", | ||
"typescript": "^5.7.2", | ||
"vitest": "^2.1.8" | ||
}, | ||
"sideEffects": false, | ||
"scripts": { | ||
@@ -76,0 +86,0 @@ "build": "tsup && tsx ./scripts/fix-build.mts", |
@@ -102,1 +102,45 @@ # kysely-replication | ||
queries in a `with` clause. | ||
## Force Execute on a Specific Dialect | ||
Sometimes you want to force a query to execute on a specific dialect. This can | ||
be done using the `withPrimary` and `withReplica` methods. | ||
First, import the `force` module where you define your db instance: | ||
```ts | ||
import 'kysely-replication/force' | ||
``` | ||
It will add the `withPrimary` and `withReplica` methods in various places in the | ||
Kysely API. | ||
### Force Execute on Primary | ||
```ts | ||
const users = await db | ||
.withPrimary() | ||
.selectFrom('users') | ||
.selectAll() | ||
.execute() // executes on primary instead of replica | ||
``` | ||
## Force Execute on Replica | ||
```ts | ||
const users = await db | ||
.withReplica() | ||
.insertInto('users') | ||
.values({ email: 'info@example.com', is_verified: false }) | ||
.execute() // executes on a replica instead of primary | ||
``` | ||
You can also provide a specific replica index to override the replica strategy: | ||
```ts | ||
const users = await db | ||
.withReplica(2) | ||
.insertInto('users') | ||
.values({ email: 'info@example.com', is_verified: false }) | ||
.execute() // executes on replica 2 instead of primary | ||
``` |
@@ -1,27 +0,13 @@ | ||
import { | ||
type Dialect, | ||
type Generated, | ||
Kysely, | ||
PostgresAdapter, | ||
PostgresIntrospector, | ||
PostgresQueryCompiler, | ||
type QueryCreator, | ||
type QueryResult, | ||
type SchemaModule, | ||
type SqlBool, | ||
sql, | ||
} from 'kysely' | ||
import { type Kysely, sql } from 'kysely' | ||
import { afterEach, beforeAll, describe, expect, it, vi } from 'vitest' | ||
import { KyselyReplicationDialect } from '../src/index.js' | ||
import { RandomReplicaStrategy } from '../src/strategy/random.js' | ||
import { RoundRobinReplicaStrategy } from '../src/strategy/round-robin.js' | ||
import { | ||
type Database, | ||
getDDLQueries, | ||
getKysely, | ||
getMutationQueries, | ||
getReadQueries, | ||
} from './test-setup.js' | ||
interface Database { | ||
users: { | ||
id: Generated<number> | ||
email: string | ||
is_verified: SqlBool | ||
} | ||
} | ||
const randomSpy = vi.spyOn(global.Math, 'random') | ||
@@ -56,19 +42,5 @@ const warnSpy = vi.spyOn(global.console, 'warn') | ||
const executions: string[] = [] | ||
let primaryDialect: Dialect | ||
let replicaDialects: readonly [Dialect, ...Dialect[]] | ||
beforeAll(() => { | ||
primaryDialect = getDummyDialect('primary', executions) | ||
replicaDialects = new Array(3) | ||
.fill(null) | ||
.map((_, i) => | ||
getDummyDialect(`replica-${i}`, executions), | ||
) as unknown as readonly [Dialect, ...Dialect[]] | ||
db = new Kysely({ | ||
dialect: new KyselyReplicationDialect({ | ||
primaryDialect, | ||
replicaDialects, | ||
replicaStrategy: new Strategy({ onTransaction }), | ||
}), | ||
}) | ||
db = getKysely(new Strategy({ onTransaction }), executions) | ||
}) | ||
@@ -81,30 +53,3 @@ | ||
it('should use primary dialect for DDL queries', async () => { | ||
const queries = { | ||
alterTable: db.schema | ||
.alterTable('users') | ||
.addColumn('nickname', 'varchar'), | ||
createIndex: db.schema | ||
.createIndex('users_index') | ||
.on('users') | ||
.column('email'), | ||
createSchema: db.schema.createSchema('moshe'), | ||
createTable: db.schema | ||
.createTable('cakes') | ||
.addColumn('id', 'serial', (cb) => cb.primaryKey()), | ||
createType: db.schema | ||
.createType('cake_type') | ||
.asEnum(['chocolate', 'vanilla']), | ||
createView: db.schema | ||
.createView('users_view') | ||
.as(db.selectFrom('users').selectAll()), | ||
dropIndex: db.schema.dropIndex('users_index'), | ||
dropSchema: db.schema.dropSchema('moshe'), | ||
dropTable: db.schema.dropTable('cakes'), | ||
dropType: db.schema.dropType('cake_type'), | ||
dropView: db.schema.dropView('users_view'), | ||
} satisfies { | ||
[K in keyof Omit<SchemaModule, `with${string}`>]: { | ||
execute(): Promise<unknown> | ||
} | ||
} | ||
const queries = getDDLQueries(() => db.schema) | ||
@@ -123,39 +68,3 @@ await Promise.all(Object.values(queries).map((query) => query.execute())) | ||
it('should use primary dialect for DML queries that mutate data', async () => { | ||
const queries = { | ||
deleteFrom: db.deleteFrom('users').where('id', '=', 1), | ||
insertInto: db | ||
.insertInto('users') | ||
.values({ email: 'info@example.com', is_verified: false }), | ||
mergeInto: db | ||
.mergeInto('users as u1') | ||
.using('users as u2', 'u1.id', 'u2.id') | ||
.whenMatched() | ||
.thenDoNothing(), | ||
replaceInto: db | ||
.replaceInto('users') | ||
.values({ email: 'info@example.com', is_verified: false }), | ||
updateTable: db | ||
.updateTable('users') | ||
.set('is_verified', true) | ||
.where('id', '=', 1), | ||
with: db | ||
.with('insert', (qb) => | ||
qb | ||
.insertInto('users') | ||
.values({ email: 'info@example.com', is_verified: false }) | ||
.returning('id'), | ||
) | ||
.selectFrom('users') | ||
.innerJoin('insert', 'insert.id', 'users.id') | ||
.selectAll(), | ||
} satisfies { | ||
[K in | ||
| keyof Omit< | ||
QueryCreator<Database>, | ||
`select${string}` | `with${string}` | ||
> | ||
| 'with']: { | ||
execute(): Promise<unknown> | ||
} | ||
} | ||
const queries = getMutationQueries(() => db) | ||
@@ -176,17 +85,3 @@ await Promise.all(Object.values(queries).map((query) => query.execute())) | ||
it('should use replica dialects for DML queries that do not mutate data', async () => { | ||
const queries = { | ||
selectFrom: db.selectFrom('users').selectAll(), | ||
selectNoFrom: db | ||
.selectNoFrom((eb) => eb.selectFrom('users').selectAll().as('u')) | ||
.selectAll(), | ||
with: db | ||
.with('u1', (qb) => qb.selectFrom('users').selectAll()) | ||
.selectFrom('u1') | ||
.selectAll(), | ||
} satisfies { | ||
[K in keyof Pick< | ||
QueryCreator<Database>, | ||
'selectFrom' | 'selectNoFrom' | 'with' | ||
>]: { execute(): Promise<unknown> } | ||
} | ||
const queries = getReadQueries(() => db) | ||
@@ -234,27 +129,1 @@ // with extra queries to test round-robin | ||
) | ||
function getDummyDialect(name: string, executions: string[]): Dialect { | ||
return { | ||
createAdapter: () => new PostgresAdapter(), | ||
createDriver: () => ({ | ||
acquireConnection: () => { | ||
executions.push(name) | ||
return Promise.resolve({ | ||
executeQuery: () => | ||
Promise.resolve({ rows: [] } satisfies QueryResult<unknown>), | ||
streamQuery: () => { | ||
throw new Error('Not implemented') | ||
}, | ||
}) | ||
}, | ||
beginTransaction: () => Promise.resolve(), | ||
commitTransaction: () => Promise.resolve(), | ||
destroy: () => Promise.resolve(), | ||
init: () => Promise.resolve(), | ||
releaseConnection: () => Promise.resolve(), | ||
rollbackTransaction: () => Promise.resolve(), | ||
}), | ||
createIntrospector: (db) => new PostgresIntrospector(db), | ||
createQueryCompiler: () => new PostgresQueryCompiler(), | ||
} | ||
} |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
98236
55
1301
146