@libsql/client
Advanced tools
Comparing version 0.1.5 to 0.1.6
@@ -65,3 +65,3 @@ "use strict"; | ||
} | ||
throw e; | ||
throw mapHranaError(e); | ||
} | ||
@@ -71,6 +71,11 @@ return new HranaClient(client, url, config.authToken); | ||
exports._createClient = _createClient; | ||
const maxConnAgeMillis = 60 * 1000; | ||
class HranaClient { | ||
#url; | ||
#authToken; | ||
// State of the current connection. The `hrana.Client` inside may be closed at any moment due to an | ||
// asynchronous error. | ||
#connState; | ||
// If defined, this is a connection that will be used in the future, once it is ready. | ||
#futureConnState; | ||
closed; | ||
@@ -81,15 +86,12 @@ /** @private */ | ||
this.#authToken = authToken; | ||
this.#connState = { | ||
client, | ||
useSqlCache: undefined, | ||
sqlCache: new lru_js_1.Lru(), | ||
}; | ||
this.#connState = this.#openConn(client); | ||
this.#futureConnState = undefined; | ||
this.closed = false; | ||
} | ||
async execute(stmt) { | ||
const state = await this.#openStream(); | ||
const streamState = await this.#openStream(); | ||
try { | ||
const hranaStmt = applySqlCache(state, stmtToHrana(stmt)); | ||
const hranaRows = await state.stream.query(hranaStmt); | ||
evictSqlCache(state); | ||
const hranaStmt = applySqlCache(streamState.conn, stmtToHrana(stmt)); | ||
const hranaRows = await streamState.stream.query(hranaStmt); | ||
evictSqlCache(streamState.conn); | ||
return resultSetFromHrana(hranaRows); | ||
@@ -101,9 +103,9 @@ } | ||
finally { | ||
state.stream.close(); | ||
this._closeStream(streamState); | ||
} | ||
} | ||
async batch(stmts) { | ||
const state = await this.#openStream(); | ||
const streamState = await this.#openStream(); | ||
try { | ||
const batch = state.stream.batch(); | ||
const batch = streamState.stream.batch(); | ||
const beginStep = batch.step(); | ||
@@ -113,3 +115,3 @@ const beginPromise = beginStep.run("BEGIN").catch(_ => undefined); | ||
const stmtPromises = stmts.map((stmt) => { | ||
const hranaStmt = applySqlCache(state, stmtToHrana(stmt)); | ||
const hranaStmt = applySqlCache(streamState.conn, stmtToHrana(stmt)); | ||
const stmtStep = batch.step() | ||
@@ -128,3 +130,3 @@ .condition(hrana.BatchCond.ok(lastStep)); | ||
await batch.execute(); | ||
evictSqlCache(state); | ||
evictSqlCache(streamState.conn); | ||
const resultSets = []; | ||
@@ -145,13 +147,13 @@ for (const stmtPromise of stmtPromises) { | ||
finally { | ||
state.stream.close(); | ||
this._closeStream(streamState); | ||
} | ||
} | ||
async transaction() { | ||
const state = await this.#openStream(); | ||
const streamState = await this.#openStream(); | ||
try { | ||
await state.stream.run("BEGIN"); | ||
return new HranaTransaction(state); | ||
await streamState.stream.run("BEGIN"); | ||
return new HranaTransaction(this, streamState); | ||
} | ||
catch (e) { | ||
state.stream.close(); | ||
this._closeStream(streamState); | ||
throw mapHranaError(e); | ||
@@ -164,9 +166,47 @@ } | ||
} | ||
const now = new Date(); | ||
const ageMillis = now.valueOf() - this.#connState.openTime.valueOf(); | ||
if (ageMillis > maxConnAgeMillis && this.#futureConnState === undefined) { | ||
// The existing connection is too old, let's open a new one. | ||
const futureConnState = this.#openConn(); | ||
this.#futureConnState = futureConnState; | ||
// However, if we used `futureConnState` immediately, we would introduce additional latency, | ||
// because we would have to wait for the WebSocket handshake to complete, even though we may a | ||
// have perfectly good existing connection in `this.#connState`! | ||
// | ||
// So we wait until the `hrana.Client.getVersion()` operation completes (which happens when the | ||
// WebSocket hanshake completes), and only then we replace `this.#connState` with | ||
// `futureConnState`, which is stored in `this.#futureConnState` in the meantime. | ||
futureConnState.client.getVersion().then((_version) => { | ||
if (this.#connState !== futureConnState) { | ||
// We need to close `this.#connState` before we replace it. However, it is possible | ||
// that `this.#connState` has already been replaced: see the code below. | ||
if (this.#connState.streamStates.size === 0) { | ||
this.#connState.client.close(); | ||
} | ||
else { | ||
// If there are existing streams on the connection, we must not close it, because | ||
// these streams would be broken. The last stream to be closed will also close the | ||
// connection in `_closeStream()`. | ||
} | ||
} | ||
this.#connState = futureConnState; | ||
this.#futureConnState = undefined; | ||
}, (_e) => { | ||
// If the new connection could not be established, let's just ignore the error and keep | ||
// using the existing connection. | ||
this.#futureConnState = undefined; | ||
}); | ||
} | ||
if (this.#connState.client.closed) { | ||
// An error happened on this connection and it has been closed. Let's try to seamlessly reconnect. | ||
try { | ||
this.#connState = { | ||
client: hrana.open(this.#url, this.#authToken), | ||
useSqlCache: undefined, | ||
sqlCache: new lru_js_1.Lru(), | ||
}; | ||
if (this.#futureConnState !== undefined) { | ||
// We are already in the process of opening a new connection, so let's just use it | ||
// immediately. | ||
this.#connState = this.#futureConnState; | ||
} | ||
else { | ||
this.#connState = this.#openConn(); | ||
} | ||
} | ||
@@ -179,2 +219,5 @@ catch (e) { | ||
try { | ||
// Now we wait for the WebSocket handshake to complete (if it hasn't completed yet). Note that | ||
// this does not increase latency, because any messages that we would send on the WebSocket before | ||
// the handshake would be queued until the handshake is completed anyway. | ||
if (connState.useSqlCache === undefined) { | ||
@@ -184,3 +227,5 @@ connState.useSqlCache = await connState.client.getVersion() >= 2; | ||
const stream = connState.client.openStream(); | ||
return { stream, ...connState }; | ||
const streamState = { conn: connState, stream }; | ||
connState.streamStates.add(streamState); | ||
return streamState; | ||
} | ||
@@ -191,2 +236,26 @@ catch (e) { | ||
} | ||
#openConn(client) { | ||
try { | ||
return { | ||
client: client ?? hrana.open(this.#url, this.#authToken), | ||
useSqlCache: undefined, | ||
sqlCache: new lru_js_1.Lru(), | ||
openTime: new Date(), | ||
streamStates: new Set(), | ||
}; | ||
} | ||
catch (e) { | ||
throw mapHranaError(e); | ||
} | ||
} | ||
_closeStream(streamState) { | ||
streamState.stream.close(); | ||
const connState = streamState.conn; | ||
connState.streamStates.delete(streamState); | ||
if (connState.streamStates.size === 0 && connState !== this.#connState) { | ||
// We are not using this connection anymore and this is the last stream that was using it, so we | ||
// must close it now. | ||
connState.client.close(); | ||
} | ||
} | ||
close() { | ||
@@ -199,15 +268,17 @@ this.#connState.client.close(); | ||
class HranaTransaction { | ||
#state; | ||
#client; | ||
#streamState; | ||
/** @private */ | ||
constructor(state) { | ||
this.#state = state; | ||
constructor(client, state) { | ||
this.#client = client; | ||
this.#streamState = state; | ||
} | ||
async execute(stmt) { | ||
if (this.#state.stream.closed) { | ||
if (this.#streamState.stream.closed) { | ||
throw new api_js_1.LibsqlError("Cannot execute a statement because the transaction is closed", "TRANSACTION_CLOSED"); | ||
} | ||
try { | ||
const hranaStmt = applySqlCache(this.#state, stmtToHrana(stmt)); | ||
const hranaRows = await this.#state.stream.query(hranaStmt); | ||
evictSqlCache(this.#state); | ||
const hranaStmt = applySqlCache(this.#streamState.conn, stmtToHrana(stmt)); | ||
const hranaRows = await this.#streamState.stream.query(hranaStmt); | ||
evictSqlCache(this.#streamState.conn); | ||
return resultSetFromHrana(hranaRows); | ||
@@ -220,24 +291,24 @@ } | ||
async rollback() { | ||
if (this.#state.stream.closed) { | ||
if (this.#streamState.stream.closed) { | ||
return; | ||
} | ||
const promise = this.#state.stream.run("ROLLBACK") | ||
const promise = this.#streamState.stream.run("ROLLBACK") | ||
.catch(e => { throw mapHranaError(e); }); | ||
this.#state.stream.close(); | ||
this.#streamState.stream.close(); | ||
await promise; | ||
} | ||
async commit() { | ||
if (this.#state.stream.closed) { | ||
if (this.#streamState.stream.closed) { | ||
throw new api_js_1.LibsqlError("Cannot commit the transaction because it is already closed", "TRANSACTION_CLOSED"); | ||
} | ||
const promise = this.#state.stream.run("COMMIT") | ||
const promise = this.#streamState.stream.run("COMMIT") | ||
.catch(e => { throw mapHranaError(e); }); | ||
this.#state.stream.close(); | ||
this.#streamState.stream.close(); | ||
await promise; | ||
} | ||
close() { | ||
this.#state.stream.close(); | ||
this.#client._closeStream(this.#streamState); | ||
} | ||
get closed() { | ||
return this.#state.stream.closed; | ||
return this.#streamState.stream.closed; | ||
} | ||
@@ -247,9 +318,9 @@ } | ||
const sqlCacheCapacity = 100; | ||
function applySqlCache(state, hranaStmt) { | ||
if (state.useSqlCache && typeof hranaStmt.sql === "string") { | ||
function applySqlCache(connState, hranaStmt) { | ||
if (connState.useSqlCache && typeof hranaStmt.sql === "string") { | ||
const sqlText = hranaStmt.sql; | ||
let sqlObj = state.sqlCache.get(sqlText); | ||
let sqlObj = connState.sqlCache.get(sqlText); | ||
if (sqlObj === undefined) { | ||
sqlObj = state.client.storeSql(sqlText); | ||
state.sqlCache.set(sqlText, sqlObj); | ||
sqlObj = connState.client.storeSql(sqlText); | ||
connState.sqlCache.set(sqlText, sqlObj); | ||
} | ||
@@ -262,5 +333,5 @@ if (sqlObj !== undefined) { | ||
} | ||
function evictSqlCache(state) { | ||
while (state.sqlCache.size > sqlCacheCapacity) { | ||
const sqlObj = state.sqlCache.deleteLru(); | ||
function evictSqlCache(connState) { | ||
while (connState.sqlCache.size > sqlCacheCapacity) { | ||
const sqlObj = connState.sqlCache.deleteLru(); | ||
sqlObj.close(); | ||
@@ -267,0 +338,0 @@ } |
@@ -14,4 +14,7 @@ /// <reference types="node" /> | ||
sqlCache: Lru<string, hrana.Sql>; | ||
openTime: Date; | ||
streamStates: Set<StreamState>; | ||
} | ||
interface StreamState extends ConnState { | ||
interface StreamState { | ||
conn: ConnState; | ||
stream: hrana.Stream; | ||
@@ -27,2 +30,3 @@ } | ||
transaction(): Promise<HranaTransaction>; | ||
_closeStream(streamState: StreamState): void; | ||
close(): void; | ||
@@ -33,3 +37,3 @@ } | ||
/** @private */ | ||
constructor(state: StreamState); | ||
constructor(client: HranaClient, state: StreamState); | ||
execute(stmt: InStatement): Promise<ResultSet>; | ||
@@ -36,0 +40,0 @@ rollback(): Promise<void>; |
@@ -35,10 +35,15 @@ import * as hrana from "@libsql/hrana-client"; | ||
} | ||
throw e; | ||
throw mapHranaError(e); | ||
} | ||
return new HranaClient(client, url, config.authToken); | ||
} | ||
const maxConnAgeMillis = 60 * 1000; | ||
export class HranaClient { | ||
#url; | ||
#authToken; | ||
// State of the current connection. The `hrana.Client` inside may be closed at any moment due to an | ||
// asynchronous error. | ||
#connState; | ||
// If defined, this is a connection that will be used in the future, once it is ready. | ||
#futureConnState; | ||
closed; | ||
@@ -49,15 +54,12 @@ /** @private */ | ||
this.#authToken = authToken; | ||
this.#connState = { | ||
client, | ||
useSqlCache: undefined, | ||
sqlCache: new Lru(), | ||
}; | ||
this.#connState = this.#openConn(client); | ||
this.#futureConnState = undefined; | ||
this.closed = false; | ||
} | ||
async execute(stmt) { | ||
const state = await this.#openStream(); | ||
const streamState = await this.#openStream(); | ||
try { | ||
const hranaStmt = applySqlCache(state, stmtToHrana(stmt)); | ||
const hranaRows = await state.stream.query(hranaStmt); | ||
evictSqlCache(state); | ||
const hranaStmt = applySqlCache(streamState.conn, stmtToHrana(stmt)); | ||
const hranaRows = await streamState.stream.query(hranaStmt); | ||
evictSqlCache(streamState.conn); | ||
return resultSetFromHrana(hranaRows); | ||
@@ -69,9 +71,9 @@ } | ||
finally { | ||
state.stream.close(); | ||
this._closeStream(streamState); | ||
} | ||
} | ||
async batch(stmts) { | ||
const state = await this.#openStream(); | ||
const streamState = await this.#openStream(); | ||
try { | ||
const batch = state.stream.batch(); | ||
const batch = streamState.stream.batch(); | ||
const beginStep = batch.step(); | ||
@@ -81,3 +83,3 @@ const beginPromise = beginStep.run("BEGIN").catch(_ => undefined); | ||
const stmtPromises = stmts.map((stmt) => { | ||
const hranaStmt = applySqlCache(state, stmtToHrana(stmt)); | ||
const hranaStmt = applySqlCache(streamState.conn, stmtToHrana(stmt)); | ||
const stmtStep = batch.step() | ||
@@ -96,3 +98,3 @@ .condition(hrana.BatchCond.ok(lastStep)); | ||
await batch.execute(); | ||
evictSqlCache(state); | ||
evictSqlCache(streamState.conn); | ||
const resultSets = []; | ||
@@ -113,13 +115,13 @@ for (const stmtPromise of stmtPromises) { | ||
finally { | ||
state.stream.close(); | ||
this._closeStream(streamState); | ||
} | ||
} | ||
async transaction() { | ||
const state = await this.#openStream(); | ||
const streamState = await this.#openStream(); | ||
try { | ||
await state.stream.run("BEGIN"); | ||
return new HranaTransaction(state); | ||
await streamState.stream.run("BEGIN"); | ||
return new HranaTransaction(this, streamState); | ||
} | ||
catch (e) { | ||
state.stream.close(); | ||
this._closeStream(streamState); | ||
throw mapHranaError(e); | ||
@@ -132,9 +134,47 @@ } | ||
} | ||
const now = new Date(); | ||
const ageMillis = now.valueOf() - this.#connState.openTime.valueOf(); | ||
if (ageMillis > maxConnAgeMillis && this.#futureConnState === undefined) { | ||
// The existing connection is too old, let's open a new one. | ||
const futureConnState = this.#openConn(); | ||
this.#futureConnState = futureConnState; | ||
// However, if we used `futureConnState` immediately, we would introduce additional latency, | ||
// because we would have to wait for the WebSocket handshake to complete, even though we may a | ||
// have perfectly good existing connection in `this.#connState`! | ||
// | ||
// So we wait until the `hrana.Client.getVersion()` operation completes (which happens when the | ||
// WebSocket hanshake completes), and only then we replace `this.#connState` with | ||
// `futureConnState`, which is stored in `this.#futureConnState` in the meantime. | ||
futureConnState.client.getVersion().then((_version) => { | ||
if (this.#connState !== futureConnState) { | ||
// We need to close `this.#connState` before we replace it. However, it is possible | ||
// that `this.#connState` has already been replaced: see the code below. | ||
if (this.#connState.streamStates.size === 0) { | ||
this.#connState.client.close(); | ||
} | ||
else { | ||
// If there are existing streams on the connection, we must not close it, because | ||
// these streams would be broken. The last stream to be closed will also close the | ||
// connection in `_closeStream()`. | ||
} | ||
} | ||
this.#connState = futureConnState; | ||
this.#futureConnState = undefined; | ||
}, (_e) => { | ||
// If the new connection could not be established, let's just ignore the error and keep | ||
// using the existing connection. | ||
this.#futureConnState = undefined; | ||
}); | ||
} | ||
if (this.#connState.client.closed) { | ||
// An error happened on this connection and it has been closed. Let's try to seamlessly reconnect. | ||
try { | ||
this.#connState = { | ||
client: hrana.open(this.#url, this.#authToken), | ||
useSqlCache: undefined, | ||
sqlCache: new Lru(), | ||
}; | ||
if (this.#futureConnState !== undefined) { | ||
// We are already in the process of opening a new connection, so let's just use it | ||
// immediately. | ||
this.#connState = this.#futureConnState; | ||
} | ||
else { | ||
this.#connState = this.#openConn(); | ||
} | ||
} | ||
@@ -147,2 +187,5 @@ catch (e) { | ||
try { | ||
// Now we wait for the WebSocket handshake to complete (if it hasn't completed yet). Note that | ||
// this does not increase latency, because any messages that we would send on the WebSocket before | ||
// the handshake would be queued until the handshake is completed anyway. | ||
if (connState.useSqlCache === undefined) { | ||
@@ -152,3 +195,5 @@ connState.useSqlCache = await connState.client.getVersion() >= 2; | ||
const stream = connState.client.openStream(); | ||
return { stream, ...connState }; | ||
const streamState = { conn: connState, stream }; | ||
connState.streamStates.add(streamState); | ||
return streamState; | ||
} | ||
@@ -159,2 +204,26 @@ catch (e) { | ||
} | ||
#openConn(client) { | ||
try { | ||
return { | ||
client: client ?? hrana.open(this.#url, this.#authToken), | ||
useSqlCache: undefined, | ||
sqlCache: new Lru(), | ||
openTime: new Date(), | ||
streamStates: new Set(), | ||
}; | ||
} | ||
catch (e) { | ||
throw mapHranaError(e); | ||
} | ||
} | ||
_closeStream(streamState) { | ||
streamState.stream.close(); | ||
const connState = streamState.conn; | ||
connState.streamStates.delete(streamState); | ||
if (connState.streamStates.size === 0 && connState !== this.#connState) { | ||
// We are not using this connection anymore and this is the last stream that was using it, so we | ||
// must close it now. | ||
connState.client.close(); | ||
} | ||
} | ||
close() { | ||
@@ -166,15 +235,17 @@ this.#connState.client.close(); | ||
export class HranaTransaction { | ||
#state; | ||
#client; | ||
#streamState; | ||
/** @private */ | ||
constructor(state) { | ||
this.#state = state; | ||
constructor(client, state) { | ||
this.#client = client; | ||
this.#streamState = state; | ||
} | ||
async execute(stmt) { | ||
if (this.#state.stream.closed) { | ||
if (this.#streamState.stream.closed) { | ||
throw new LibsqlError("Cannot execute a statement because the transaction is closed", "TRANSACTION_CLOSED"); | ||
} | ||
try { | ||
const hranaStmt = applySqlCache(this.#state, stmtToHrana(stmt)); | ||
const hranaRows = await this.#state.stream.query(hranaStmt); | ||
evictSqlCache(this.#state); | ||
const hranaStmt = applySqlCache(this.#streamState.conn, stmtToHrana(stmt)); | ||
const hranaRows = await this.#streamState.stream.query(hranaStmt); | ||
evictSqlCache(this.#streamState.conn); | ||
return resultSetFromHrana(hranaRows); | ||
@@ -187,34 +258,34 @@ } | ||
async rollback() { | ||
if (this.#state.stream.closed) { | ||
if (this.#streamState.stream.closed) { | ||
return; | ||
} | ||
const promise = this.#state.stream.run("ROLLBACK") | ||
const promise = this.#streamState.stream.run("ROLLBACK") | ||
.catch(e => { throw mapHranaError(e); }); | ||
this.#state.stream.close(); | ||
this.#streamState.stream.close(); | ||
await promise; | ||
} | ||
async commit() { | ||
if (this.#state.stream.closed) { | ||
if (this.#streamState.stream.closed) { | ||
throw new LibsqlError("Cannot commit the transaction because it is already closed", "TRANSACTION_CLOSED"); | ||
} | ||
const promise = this.#state.stream.run("COMMIT") | ||
const promise = this.#streamState.stream.run("COMMIT") | ||
.catch(e => { throw mapHranaError(e); }); | ||
this.#state.stream.close(); | ||
this.#streamState.stream.close(); | ||
await promise; | ||
} | ||
close() { | ||
this.#state.stream.close(); | ||
this.#client._closeStream(this.#streamState); | ||
} | ||
get closed() { | ||
return this.#state.stream.closed; | ||
return this.#streamState.stream.closed; | ||
} | ||
} | ||
const sqlCacheCapacity = 100; | ||
function applySqlCache(state, hranaStmt) { | ||
if (state.useSqlCache && typeof hranaStmt.sql === "string") { | ||
function applySqlCache(connState, hranaStmt) { | ||
if (connState.useSqlCache && typeof hranaStmt.sql === "string") { | ||
const sqlText = hranaStmt.sql; | ||
let sqlObj = state.sqlCache.get(sqlText); | ||
let sqlObj = connState.sqlCache.get(sqlText); | ||
if (sqlObj === undefined) { | ||
sqlObj = state.client.storeSql(sqlText); | ||
state.sqlCache.set(sqlText, sqlObj); | ||
sqlObj = connState.client.storeSql(sqlText); | ||
connState.sqlCache.set(sqlText, sqlObj); | ||
} | ||
@@ -227,5 +298,5 @@ if (sqlObj !== undefined) { | ||
} | ||
function evictSqlCache(state) { | ||
while (state.sqlCache.size > sqlCacheCapacity) { | ||
const sqlObj = state.sqlCache.deleteLru(); | ||
function evictSqlCache(connState) { | ||
while (connState.sqlCache.size > sqlCacheCapacity) { | ||
const sqlObj = connState.sqlCache.deleteLru(); | ||
sqlObj.close(); | ||
@@ -232,0 +303,0 @@ } |
{ | ||
"name": "@libsql/client", | ||
"version": "0.1.5", | ||
"version": "0.1.6", | ||
"keywords": [ | ||
@@ -5,0 +5,0 @@ "libsql", |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
91874
2219