@libsql/client
Advanced tools
Comparing version 0.6.2 to 0.7.0
@@ -28,2 +28,5 @@ "use strict"; | ||
}; | ||
var __importDefault = (this && this.__importDefault) || function (mod) { | ||
return (mod && mod.__esModule) ? mod : { "default": mod }; | ||
}; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
@@ -39,2 +42,3 @@ exports.HttpTransaction = exports.HttpClient = exports._createClient = exports.createClient = void 0; | ||
const migrations_js_1 = require("./migrations.js"); | ||
const promise_limit_1 = __importDefault(require("promise-limit")); | ||
__exportStar(require("@libsql/core/api"), exports); | ||
@@ -61,3 +65,3 @@ function createClient(config) { | ||
const url = (0, uri_1.encodeBaseUrl)(config.scheme, config.authority, config.path); | ||
return new HttpClient(url, config.authToken, config.intMode, config.fetch); | ||
return new HttpClient(url, config.authToken, config.intMode, config.fetch, config.concurrency); | ||
} | ||
@@ -72,4 +76,5 @@ exports._createClient = _createClient; | ||
#isSchemaDatabase; | ||
#promiseLimitFunction; | ||
/** @private */ | ||
constructor(url, authToken, intMode, customFetch) { | ||
constructor(url, authToken, intMode, customFetch, concurrency) { | ||
this.#client = hrana.openHttp(url, authToken, customFetch); | ||
@@ -80,6 +85,7 @@ this.#client.intMode = intMode; | ||
this.#authToken = authToken; | ||
this.#promiseLimitFunction = (0, promise_limit_1.default)(concurrency); | ||
} | ||
async getIsSchemaDatabase() { | ||
getIsSchemaDatabase() { | ||
if (this.#isSchemaDatabase === undefined) { | ||
this.#isSchemaDatabase = await (0, migrations_js_1.getIsSchemaDatabase)({ | ||
this.#isSchemaDatabase = (0, migrations_js_1.getIsSchemaDatabase)({ | ||
authToken: this.#authToken, | ||
@@ -91,94 +97,105 @@ baseUrl: this.#url.origin, | ||
} | ||
async limit(fn) { | ||
return this.#promiseLimitFunction(fn); | ||
} | ||
async execute(stmt) { | ||
try { | ||
const isSchemaDatabasePromise = this.getIsSchemaDatabase(); | ||
const hranaStmt = (0, hrana_js_1.stmtToHrana)(stmt); | ||
// Pipeline all operations, so `hrana.HttpClient` can open the stream, execute the statement and | ||
// close the stream in a single HTTP request. | ||
let rowsPromise; | ||
const stream = this.#client.openStream(); | ||
return this.limit(async () => { | ||
try { | ||
rowsPromise = stream.query(hranaStmt); | ||
const isSchemaDatabasePromise = this.getIsSchemaDatabase(); | ||
const hranaStmt = (0, hrana_js_1.stmtToHrana)(stmt); | ||
// Pipeline all operations, so `hrana.HttpClient` can open the stream, execute the statement and | ||
// close the stream in a single HTTP request. | ||
let rowsPromise; | ||
const stream = this.#client.openStream(); | ||
try { | ||
rowsPromise = stream.query(hranaStmt); | ||
} | ||
finally { | ||
stream.closeGracefully(); | ||
} | ||
const rowsResult = await rowsPromise; | ||
const isSchemaDatabase = await isSchemaDatabasePromise; | ||
if (isSchemaDatabase) { | ||
await (0, migrations_js_1.waitForLastMigrationJobToFinish)({ | ||
authToken: this.#authToken, | ||
baseUrl: this.#url.origin, | ||
}); | ||
} | ||
return (0, hrana_js_1.resultSetFromHrana)(rowsResult); | ||
} | ||
finally { | ||
stream.closeGracefully(); | ||
catch (e) { | ||
throw (0, hrana_js_1.mapHranaError)(e); | ||
} | ||
const rowsResult = await rowsPromise; | ||
const isSchemaDatabase = await isSchemaDatabasePromise; | ||
if (isSchemaDatabase) { | ||
await (0, migrations_js_1.waitForLastMigrationJobToFinish)({ | ||
authToken: this.#authToken, | ||
baseUrl: this.#url.origin, | ||
}); | ||
} | ||
return (0, hrana_js_1.resultSetFromHrana)(rowsResult); | ||
} | ||
catch (e) { | ||
throw (0, hrana_js_1.mapHranaError)(e); | ||
} | ||
}); | ||
} | ||
async batch(stmts, mode = "deferred") { | ||
try { | ||
const isSchemaDatabasePromise = this.getIsSchemaDatabase(); | ||
const hranaStmts = stmts.map(hrana_js_1.stmtToHrana); | ||
const version = await this.#client.getVersion(); | ||
// Pipeline all operations, so `hrana.HttpClient` can open the stream, execute the batch and | ||
// close the stream in a single HTTP request. | ||
let resultsPromise; | ||
const stream = this.#client.openStream(); | ||
return this.limit(async () => { | ||
try { | ||
// It makes sense to use a SQL cache even for a single batch, because it may contain the same | ||
// statement repeated multiple times. | ||
const sqlCache = new sql_cache_js_1.SqlCache(stream, sqlCacheCapacity); | ||
sqlCache.apply(hranaStmts); | ||
// TODO: we do not use a cursor here, because it would cause three roundtrips: | ||
// 1. pipeline request to store SQL texts | ||
// 2. cursor request | ||
// 3. pipeline request to close the stream | ||
const batch = stream.batch(false); | ||
resultsPromise = (0, hrana_js_1.executeHranaBatch)(mode, version, batch, hranaStmts); | ||
const isSchemaDatabasePromise = this.getIsSchemaDatabase(); | ||
const hranaStmts = stmts.map(hrana_js_1.stmtToHrana); | ||
const version = await this.#client.getVersion(); | ||
// Pipeline all operations, so `hrana.HttpClient` can open the stream, execute the batch and | ||
// close the stream in a single HTTP request. | ||
let resultsPromise; | ||
const stream = this.#client.openStream(); | ||
try { | ||
// It makes sense to use a SQL cache even for a single batch, because it may contain the same | ||
// statement repeated multiple times. | ||
const sqlCache = new sql_cache_js_1.SqlCache(stream, sqlCacheCapacity); | ||
sqlCache.apply(hranaStmts); | ||
// TODO: we do not use a cursor here, because it would cause three roundtrips: | ||
// 1. pipeline request to store SQL texts | ||
// 2. cursor request | ||
// 3. pipeline request to close the stream | ||
const batch = stream.batch(false); | ||
resultsPromise = (0, hrana_js_1.executeHranaBatch)(mode, version, batch, hranaStmts); | ||
} | ||
finally { | ||
stream.closeGracefully(); | ||
} | ||
const results = await resultsPromise; | ||
const isSchemaDatabase = await isSchemaDatabasePromise; | ||
if (isSchemaDatabase) { | ||
await (0, migrations_js_1.waitForLastMigrationJobToFinish)({ | ||
authToken: this.#authToken, | ||
baseUrl: this.#url.origin, | ||
}); | ||
} | ||
return results; | ||
} | ||
finally { | ||
stream.closeGracefully(); | ||
catch (e) { | ||
throw (0, hrana_js_1.mapHranaError)(e); | ||
} | ||
const results = await resultsPromise; | ||
const isSchemaDatabase = await isSchemaDatabasePromise; | ||
if (isSchemaDatabase) { | ||
await (0, migrations_js_1.waitForLastMigrationJobToFinish)({ | ||
authToken: this.#authToken, | ||
baseUrl: this.#url.origin, | ||
}); | ||
} | ||
return results; | ||
} | ||
catch (e) { | ||
throw (0, hrana_js_1.mapHranaError)(e); | ||
} | ||
}); | ||
} | ||
async transaction(mode = "write") { | ||
try { | ||
const version = await this.#client.getVersion(); | ||
return new HttpTransaction(this.#client.openStream(), mode, version); | ||
} | ||
catch (e) { | ||
throw (0, hrana_js_1.mapHranaError)(e); | ||
} | ||
return this.limit(async () => { | ||
try { | ||
const version = await this.#client.getVersion(); | ||
return new HttpTransaction(this.#client.openStream(), mode, version); | ||
} | ||
catch (e) { | ||
throw (0, hrana_js_1.mapHranaError)(e); | ||
} | ||
}); | ||
} | ||
async executeMultiple(sql) { | ||
try { | ||
// Pipeline all operations, so `hrana.HttpClient` can open the stream, execute the sequence and | ||
// close the stream in a single HTTP request. | ||
let promise; | ||
const stream = this.#client.openStream(); | ||
return this.limit(async () => { | ||
try { | ||
promise = stream.sequence(sql); | ||
// Pipeline all operations, so `hrana.HttpClient` can open the stream, execute the sequence and | ||
// close the stream in a single HTTP request. | ||
let promise; | ||
const stream = this.#client.openStream(); | ||
try { | ||
promise = stream.sequence(sql); | ||
} | ||
finally { | ||
stream.closeGracefully(); | ||
} | ||
await promise; | ||
} | ||
finally { | ||
stream.closeGracefully(); | ||
catch (e) { | ||
throw (0, hrana_js_1.mapHranaError)(e); | ||
} | ||
await promise; | ||
} | ||
catch (e) { | ||
throw (0, hrana_js_1.mapHranaError)(e); | ||
} | ||
}); | ||
} | ||
@@ -185,0 +202,0 @@ sync() { |
@@ -30,2 +30,7 @@ "use strict"; | ||
const sqlText = hranaStmt.sql; | ||
// Stored SQL cannot exceed 5kb. | ||
// https://github.com/tursodatabase/libsql/blob/e9d637e051685f92b0da43849507b5ef4232fbeb/libsql-server/src/hrana/http/request.rs#L10 | ||
if (sqlText.length >= 5000) { | ||
continue; | ||
} | ||
let sqlObj = this.#sqls.get(sqlText); | ||
@@ -32,0 +37,0 @@ if (sqlObj === undefined) { |
@@ -28,2 +28,5 @@ "use strict"; | ||
}; | ||
var __importDefault = (this && this.__importDefault) || function (mod) { | ||
return (mod && mod.__esModule) ? mod : { "default": mod }; | ||
}; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
@@ -39,2 +42,3 @@ exports.WsTransaction = exports.WsClient = exports._createClient = exports.createClient = void 0; | ||
const migrations_js_1 = require("./migrations.js"); | ||
const promise_limit_1 = __importDefault(require("promise-limit")); | ||
__exportStar(require("@libsql/core/api"), exports); | ||
@@ -75,3 +79,3 @@ function createClient(config) { | ||
} | ||
return new WsClient(client, url, config.authToken, config.intMode); | ||
return new WsClient(client, url, config.authToken, config.intMode, config.concurrency); | ||
} | ||
@@ -93,4 +97,5 @@ exports._createClient = _createClient; | ||
#isSchemaDatabase; | ||
#promiseLimitFunction; | ||
/** @private */ | ||
constructor(client, url, authToken, intMode) { | ||
constructor(client, url, authToken, intMode, concurrency) { | ||
this.#url = url; | ||
@@ -103,6 +108,7 @@ this.#authToken = authToken; | ||
this.protocol = "ws"; | ||
this.#promiseLimitFunction = (0, promise_limit_1.default)(concurrency); | ||
} | ||
async getIsSchemaDatabase() { | ||
getIsSchemaDatabase() { | ||
if (this.#isSchemaDatabase === undefined) { | ||
this.#isSchemaDatabase = await (0, migrations_js_1.getIsSchemaDatabase)({ | ||
this.#isSchemaDatabase = (0, migrations_js_1.getIsSchemaDatabase)({ | ||
authToken: this.#authToken, | ||
@@ -114,85 +120,96 @@ baseUrl: this.#url.origin, | ||
} | ||
async limit(fn) { | ||
return this.#promiseLimitFunction(fn); | ||
} | ||
async execute(stmt) { | ||
const streamState = await this.#openStream(); | ||
try { | ||
const isSchemaDatabasePromise = this.getIsSchemaDatabase(); | ||
const hranaStmt = (0, hrana_js_1.stmtToHrana)(stmt); | ||
// Schedule all operations synchronously, so they will be pipelined and executed in a single | ||
// network roundtrip. | ||
streamState.conn.sqlCache.apply([hranaStmt]); | ||
const hranaRowsPromise = streamState.stream.query(hranaStmt); | ||
streamState.stream.closeGracefully(); | ||
const hranaRowsResult = await hranaRowsPromise; | ||
const isSchemaDatabase = await isSchemaDatabasePromise; | ||
if (isSchemaDatabase) { | ||
await (0, migrations_js_1.waitForLastMigrationJobToFinish)({ | ||
authToken: this.#authToken, | ||
baseUrl: this.#url.origin, | ||
}); | ||
return this.limit(async () => { | ||
const streamState = await this.#openStream(); | ||
try { | ||
const isSchemaDatabasePromise = this.getIsSchemaDatabase(); | ||
const hranaStmt = (0, hrana_js_1.stmtToHrana)(stmt); | ||
// Schedule all operations synchronously, so they will be pipelined and executed in a single | ||
// network roundtrip. | ||
streamState.conn.sqlCache.apply([hranaStmt]); | ||
const hranaRowsPromise = streamState.stream.query(hranaStmt); | ||
streamState.stream.closeGracefully(); | ||
const hranaRowsResult = await hranaRowsPromise; | ||
const isSchemaDatabase = await isSchemaDatabasePromise; | ||
if (isSchemaDatabase) { | ||
await (0, migrations_js_1.waitForLastMigrationJobToFinish)({ | ||
authToken: this.#authToken, | ||
baseUrl: this.#url.origin, | ||
}); | ||
} | ||
return (0, hrana_js_1.resultSetFromHrana)(hranaRowsResult); | ||
} | ||
return (0, hrana_js_1.resultSetFromHrana)(hranaRowsResult); | ||
} | ||
catch (e) { | ||
throw (0, hrana_js_1.mapHranaError)(e); | ||
} | ||
finally { | ||
this._closeStream(streamState); | ||
} | ||
catch (e) { | ||
throw (0, hrana_js_1.mapHranaError)(e); | ||
} | ||
finally { | ||
this._closeStream(streamState); | ||
} | ||
}); | ||
} | ||
async batch(stmts, mode = "deferred") { | ||
const streamState = await this.#openStream(); | ||
try { | ||
const isSchemaDatabasePromise = this.getIsSchemaDatabase(); | ||
const hranaStmts = stmts.map(hrana_js_1.stmtToHrana); | ||
const version = await streamState.conn.client.getVersion(); | ||
// Schedule all operations synchronously, so they will be pipelined and executed in a single | ||
// network roundtrip. | ||
streamState.conn.sqlCache.apply(hranaStmts); | ||
const batch = streamState.stream.batch(version >= 3); | ||
const resultsPromise = (0, hrana_js_1.executeHranaBatch)(mode, version, batch, hranaStmts); | ||
const results = await resultsPromise; | ||
const isSchemaDatabase = await isSchemaDatabasePromise; | ||
if (isSchemaDatabase) { | ||
await (0, migrations_js_1.waitForLastMigrationJobToFinish)({ | ||
authToken: this.#authToken, | ||
baseUrl: this.#url.origin, | ||
}); | ||
return this.limit(async () => { | ||
const streamState = await this.#openStream(); | ||
try { | ||
const isSchemaDatabasePromise = this.getIsSchemaDatabase(); | ||
const hranaStmts = stmts.map(hrana_js_1.stmtToHrana); | ||
const version = await streamState.conn.client.getVersion(); | ||
// Schedule all operations synchronously, so they will be pipelined and executed in a single | ||
// network roundtrip. | ||
streamState.conn.sqlCache.apply(hranaStmts); | ||
const batch = streamState.stream.batch(version >= 3); | ||
const resultsPromise = (0, hrana_js_1.executeHranaBatch)(mode, version, batch, hranaStmts); | ||
const results = await resultsPromise; | ||
const isSchemaDatabase = await isSchemaDatabasePromise; | ||
if (isSchemaDatabase) { | ||
await (0, migrations_js_1.waitForLastMigrationJobToFinish)({ | ||
authToken: this.#authToken, | ||
baseUrl: this.#url.origin, | ||
}); | ||
} | ||
return results; | ||
} | ||
return results; | ||
} | ||
catch (e) { | ||
throw (0, hrana_js_1.mapHranaError)(e); | ||
} | ||
finally { | ||
this._closeStream(streamState); | ||
} | ||
catch (e) { | ||
throw (0, hrana_js_1.mapHranaError)(e); | ||
} | ||
finally { | ||
this._closeStream(streamState); | ||
} | ||
}); | ||
} | ||
async transaction(mode = "write") { | ||
const streamState = await this.#openStream(); | ||
try { | ||
const version = await streamState.conn.client.getVersion(); | ||
// the BEGIN statement will be batched with the first statement on the transaction to save a | ||
// network roundtrip | ||
return new WsTransaction(this, streamState, mode, version); | ||
} | ||
catch (e) { | ||
this._closeStream(streamState); | ||
throw (0, hrana_js_1.mapHranaError)(e); | ||
} | ||
return this.limit(async () => { | ||
const streamState = await this.#openStream(); | ||
try { | ||
const version = await streamState.conn.client.getVersion(); | ||
// the BEGIN statement will be batched with the first statement on the transaction to save a | ||
// network roundtrip | ||
return new WsTransaction(this, streamState, mode, version); | ||
} | ||
catch (e) { | ||
this._closeStream(streamState); | ||
throw (0, hrana_js_1.mapHranaError)(e); | ||
} | ||
}); | ||
} | ||
async executeMultiple(sql) { | ||
const streamState = await this.#openStream(); | ||
try { | ||
// Schedule all operations synchronously, so they will be pipelined and executed in a single | ||
// network roundtrip. | ||
const promise = streamState.stream.sequence(sql); | ||
streamState.stream.closeGracefully(); | ||
await promise; | ||
} | ||
catch (e) { | ||
throw (0, hrana_js_1.mapHranaError)(e); | ||
} | ||
finally { | ||
this._closeStream(streamState); | ||
} | ||
return this.limit(async () => { | ||
const streamState = await this.#openStream(); | ||
try { | ||
// Schedule all operations synchronously, so they will be pipelined and executed in a single | ||
// network roundtrip. | ||
const promise = streamState.stream.sequence(sql); | ||
streamState.stream.closeGracefully(); | ||
await promise; | ||
} | ||
catch (e) { | ||
throw (0, hrana_js_1.mapHranaError)(e); | ||
} | ||
finally { | ||
this._closeStream(streamState); | ||
} | ||
}); | ||
} | ||
@@ -199,0 +216,0 @@ sync() { |
@@ -17,4 +17,5 @@ /// <reference types="node" /> | ||
/** @private */ | ||
constructor(url: URL, authToken: string | undefined, intMode: IntMode, customFetch: Function | undefined); | ||
constructor(url: URL, authToken: string | undefined, intMode: IntMode, customFetch: Function | undefined, concurrency: number); | ||
getIsSchemaDatabase(): Promise<boolean>; | ||
private limit; | ||
execute(stmt: InStatement): Promise<ResultSet>; | ||
@@ -21,0 +22,0 @@ batch(stmts: Array<InStatement>, mode?: TransactionMode): Promise<Array<ResultSet>>; |
@@ -9,2 +9,3 @@ import * as hrana from "@libsql/hrana-client"; | ||
import { getIsSchemaDatabase, waitForLastMigrationJobToFinish, } from "./migrations.js"; | ||
import promiseLimit from "promise-limit"; | ||
export * from "@libsql/core/api"; | ||
@@ -30,3 +31,3 @@ export function createClient(config) { | ||
const url = encodeBaseUrl(config.scheme, config.authority, config.path); | ||
return new HttpClient(url, config.authToken, config.intMode, config.fetch); | ||
return new HttpClient(url, config.authToken, config.intMode, config.fetch, config.concurrency); | ||
} | ||
@@ -40,4 +41,5 @@ const sqlCacheCapacity = 30; | ||
#isSchemaDatabase; | ||
#promiseLimitFunction; | ||
/** @private */ | ||
constructor(url, authToken, intMode, customFetch) { | ||
constructor(url, authToken, intMode, customFetch, concurrency) { | ||
this.#client = hrana.openHttp(url, authToken, customFetch); | ||
@@ -48,6 +50,7 @@ this.#client.intMode = intMode; | ||
this.#authToken = authToken; | ||
this.#promiseLimitFunction = promiseLimit(concurrency); | ||
} | ||
async getIsSchemaDatabase() { | ||
getIsSchemaDatabase() { | ||
if (this.#isSchemaDatabase === undefined) { | ||
this.#isSchemaDatabase = await getIsSchemaDatabase({ | ||
this.#isSchemaDatabase = getIsSchemaDatabase({ | ||
authToken: this.#authToken, | ||
@@ -59,94 +62,105 @@ baseUrl: this.#url.origin, | ||
} | ||
async limit(fn) { | ||
return this.#promiseLimitFunction(fn); | ||
} | ||
async execute(stmt) { | ||
try { | ||
const isSchemaDatabasePromise = this.getIsSchemaDatabase(); | ||
const hranaStmt = stmtToHrana(stmt); | ||
// Pipeline all operations, so `hrana.HttpClient` can open the stream, execute the statement and | ||
// close the stream in a single HTTP request. | ||
let rowsPromise; | ||
const stream = this.#client.openStream(); | ||
return this.limit(async () => { | ||
try { | ||
rowsPromise = stream.query(hranaStmt); | ||
const isSchemaDatabasePromise = this.getIsSchemaDatabase(); | ||
const hranaStmt = stmtToHrana(stmt); | ||
// Pipeline all operations, so `hrana.HttpClient` can open the stream, execute the statement and | ||
// close the stream in a single HTTP request. | ||
let rowsPromise; | ||
const stream = this.#client.openStream(); | ||
try { | ||
rowsPromise = stream.query(hranaStmt); | ||
} | ||
finally { | ||
stream.closeGracefully(); | ||
} | ||
const rowsResult = await rowsPromise; | ||
const isSchemaDatabase = await isSchemaDatabasePromise; | ||
if (isSchemaDatabase) { | ||
await waitForLastMigrationJobToFinish({ | ||
authToken: this.#authToken, | ||
baseUrl: this.#url.origin, | ||
}); | ||
} | ||
return resultSetFromHrana(rowsResult); | ||
} | ||
finally { | ||
stream.closeGracefully(); | ||
catch (e) { | ||
throw mapHranaError(e); | ||
} | ||
const rowsResult = await rowsPromise; | ||
const isSchemaDatabase = await isSchemaDatabasePromise; | ||
if (isSchemaDatabase) { | ||
await waitForLastMigrationJobToFinish({ | ||
authToken: this.#authToken, | ||
baseUrl: this.#url.origin, | ||
}); | ||
} | ||
return resultSetFromHrana(rowsResult); | ||
} | ||
catch (e) { | ||
throw mapHranaError(e); | ||
} | ||
}); | ||
} | ||
async batch(stmts, mode = "deferred") { | ||
try { | ||
const isSchemaDatabasePromise = this.getIsSchemaDatabase(); | ||
const hranaStmts = stmts.map(stmtToHrana); | ||
const version = await this.#client.getVersion(); | ||
// Pipeline all operations, so `hrana.HttpClient` can open the stream, execute the batch and | ||
// close the stream in a single HTTP request. | ||
let resultsPromise; | ||
const stream = this.#client.openStream(); | ||
return this.limit(async () => { | ||
try { | ||
// It makes sense to use a SQL cache even for a single batch, because it may contain the same | ||
// statement repeated multiple times. | ||
const sqlCache = new SqlCache(stream, sqlCacheCapacity); | ||
sqlCache.apply(hranaStmts); | ||
// TODO: we do not use a cursor here, because it would cause three roundtrips: | ||
// 1. pipeline request to store SQL texts | ||
// 2. cursor request | ||
// 3. pipeline request to close the stream | ||
const batch = stream.batch(false); | ||
resultsPromise = executeHranaBatch(mode, version, batch, hranaStmts); | ||
const isSchemaDatabasePromise = this.getIsSchemaDatabase(); | ||
const hranaStmts = stmts.map(stmtToHrana); | ||
const version = await this.#client.getVersion(); | ||
// Pipeline all operations, so `hrana.HttpClient` can open the stream, execute the batch and | ||
// close the stream in a single HTTP request. | ||
let resultsPromise; | ||
const stream = this.#client.openStream(); | ||
try { | ||
// It makes sense to use a SQL cache even for a single batch, because it may contain the same | ||
// statement repeated multiple times. | ||
const sqlCache = new SqlCache(stream, sqlCacheCapacity); | ||
sqlCache.apply(hranaStmts); | ||
// TODO: we do not use a cursor here, because it would cause three roundtrips: | ||
// 1. pipeline request to store SQL texts | ||
// 2. cursor request | ||
// 3. pipeline request to close the stream | ||
const batch = stream.batch(false); | ||
resultsPromise = executeHranaBatch(mode, version, batch, hranaStmts); | ||
} | ||
finally { | ||
stream.closeGracefully(); | ||
} | ||
const results = await resultsPromise; | ||
const isSchemaDatabase = await isSchemaDatabasePromise; | ||
if (isSchemaDatabase) { | ||
await waitForLastMigrationJobToFinish({ | ||
authToken: this.#authToken, | ||
baseUrl: this.#url.origin, | ||
}); | ||
} | ||
return results; | ||
} | ||
finally { | ||
stream.closeGracefully(); | ||
catch (e) { | ||
throw mapHranaError(e); | ||
} | ||
const results = await resultsPromise; | ||
const isSchemaDatabase = await isSchemaDatabasePromise; | ||
if (isSchemaDatabase) { | ||
await waitForLastMigrationJobToFinish({ | ||
authToken: this.#authToken, | ||
baseUrl: this.#url.origin, | ||
}); | ||
} | ||
return results; | ||
} | ||
catch (e) { | ||
throw mapHranaError(e); | ||
} | ||
}); | ||
} | ||
async transaction(mode = "write") { | ||
try { | ||
const version = await this.#client.getVersion(); | ||
return new HttpTransaction(this.#client.openStream(), mode, version); | ||
} | ||
catch (e) { | ||
throw mapHranaError(e); | ||
} | ||
return this.limit(async () => { | ||
try { | ||
const version = await this.#client.getVersion(); | ||
return new HttpTransaction(this.#client.openStream(), mode, version); | ||
} | ||
catch (e) { | ||
throw mapHranaError(e); | ||
} | ||
}); | ||
} | ||
async executeMultiple(sql) { | ||
try { | ||
// Pipeline all operations, so `hrana.HttpClient` can open the stream, execute the sequence and | ||
// close the stream in a single HTTP request. | ||
let promise; | ||
const stream = this.#client.openStream(); | ||
return this.limit(async () => { | ||
try { | ||
promise = stream.sequence(sql); | ||
// Pipeline all operations, so `hrana.HttpClient` can open the stream, execute the sequence and | ||
// close the stream in a single HTTP request. | ||
let promise; | ||
const stream = this.#client.openStream(); | ||
try { | ||
promise = stream.sequence(sql); | ||
} | ||
finally { | ||
stream.closeGracefully(); | ||
} | ||
await promise; | ||
} | ||
finally { | ||
stream.closeGracefully(); | ||
catch (e) { | ||
throw mapHranaError(e); | ||
} | ||
await promise; | ||
} | ||
catch (e) { | ||
throw mapHranaError(e); | ||
} | ||
}); | ||
} | ||
@@ -153,0 +167,0 @@ sync() { |
@@ -27,2 +27,7 @@ export class SqlCache { | ||
const sqlText = hranaStmt.sql; | ||
// Stored SQL cannot exceed 5kb. | ||
// https://github.com/tursodatabase/libsql/blob/e9d637e051685f92b0da43849507b5ef4232fbeb/libsql-server/src/hrana/http/request.rs#L10 | ||
if (sqlText.length >= 5000) { | ||
continue; | ||
} | ||
let sqlObj = this.#sqls.get(sqlText); | ||
@@ -29,0 +34,0 @@ if (sqlObj === undefined) { |
@@ -28,4 +28,5 @@ /// <reference types="node" /> | ||
/** @private */ | ||
constructor(client: hrana.WsClient, url: URL, authToken: string | undefined, intMode: IntMode); | ||
constructor(client: hrana.WsClient, url: URL, authToken: string | undefined, intMode: IntMode, concurrency: number | undefined); | ||
getIsSchemaDatabase(): Promise<boolean>; | ||
private limit; | ||
execute(stmt: InStatement): Promise<ResultSet>; | ||
@@ -32,0 +33,0 @@ batch(stmts: Array<InStatement>, mode?: TransactionMode): Promise<Array<ResultSet>>; |
@@ -9,2 +9,3 @@ import * as hrana from "@libsql/hrana-client"; | ||
import { getIsSchemaDatabase, waitForLastMigrationJobToFinish, } from "./migrations.js"; | ||
import promiseLimit from "promise-limit"; | ||
export * from "@libsql/core/api"; | ||
@@ -44,3 +45,3 @@ export function createClient(config) { | ||
} | ||
return new WsClient(client, url, config.authToken, config.intMode); | ||
return new WsClient(client, url, config.authToken, config.intMode, config.concurrency); | ||
} | ||
@@ -61,4 +62,5 @@ const maxConnAgeMillis = 60 * 1000; | ||
#isSchemaDatabase; | ||
#promiseLimitFunction; | ||
/** @private */ | ||
constructor(client, url, authToken, intMode) { | ||
constructor(client, url, authToken, intMode, concurrency) { | ||
this.#url = url; | ||
@@ -71,6 +73,7 @@ this.#authToken = authToken; | ||
this.protocol = "ws"; | ||
this.#promiseLimitFunction = promiseLimit(concurrency); | ||
} | ||
async getIsSchemaDatabase() { | ||
getIsSchemaDatabase() { | ||
if (this.#isSchemaDatabase === undefined) { | ||
this.#isSchemaDatabase = await getIsSchemaDatabase({ | ||
this.#isSchemaDatabase = getIsSchemaDatabase({ | ||
authToken: this.#authToken, | ||
@@ -82,85 +85,96 @@ baseUrl: this.#url.origin, | ||
} | ||
async limit(fn) { | ||
return this.#promiseLimitFunction(fn); | ||
} | ||
async execute(stmt) { | ||
const streamState = await this.#openStream(); | ||
try { | ||
const isSchemaDatabasePromise = this.getIsSchemaDatabase(); | ||
const hranaStmt = stmtToHrana(stmt); | ||
// Schedule all operations synchronously, so they will be pipelined and executed in a single | ||
// network roundtrip. | ||
streamState.conn.sqlCache.apply([hranaStmt]); | ||
const hranaRowsPromise = streamState.stream.query(hranaStmt); | ||
streamState.stream.closeGracefully(); | ||
const hranaRowsResult = await hranaRowsPromise; | ||
const isSchemaDatabase = await isSchemaDatabasePromise; | ||
if (isSchemaDatabase) { | ||
await waitForLastMigrationJobToFinish({ | ||
authToken: this.#authToken, | ||
baseUrl: this.#url.origin, | ||
}); | ||
return this.limit(async () => { | ||
const streamState = await this.#openStream(); | ||
try { | ||
const isSchemaDatabasePromise = this.getIsSchemaDatabase(); | ||
const hranaStmt = stmtToHrana(stmt); | ||
// Schedule all operations synchronously, so they will be pipelined and executed in a single | ||
// network roundtrip. | ||
streamState.conn.sqlCache.apply([hranaStmt]); | ||
const hranaRowsPromise = streamState.stream.query(hranaStmt); | ||
streamState.stream.closeGracefully(); | ||
const hranaRowsResult = await hranaRowsPromise; | ||
const isSchemaDatabase = await isSchemaDatabasePromise; | ||
if (isSchemaDatabase) { | ||
await waitForLastMigrationJobToFinish({ | ||
authToken: this.#authToken, | ||
baseUrl: this.#url.origin, | ||
}); | ||
} | ||
return resultSetFromHrana(hranaRowsResult); | ||
} | ||
return resultSetFromHrana(hranaRowsResult); | ||
} | ||
catch (e) { | ||
throw mapHranaError(e); | ||
} | ||
finally { | ||
this._closeStream(streamState); | ||
} | ||
catch (e) { | ||
throw mapHranaError(e); | ||
} | ||
finally { | ||
this._closeStream(streamState); | ||
} | ||
}); | ||
} | ||
async batch(stmts, mode = "deferred") { | ||
const streamState = await this.#openStream(); | ||
try { | ||
const isSchemaDatabasePromise = this.getIsSchemaDatabase(); | ||
const hranaStmts = stmts.map(stmtToHrana); | ||
const version = await streamState.conn.client.getVersion(); | ||
// Schedule all operations synchronously, so they will be pipelined and executed in a single | ||
// network roundtrip. | ||
streamState.conn.sqlCache.apply(hranaStmts); | ||
const batch = streamState.stream.batch(version >= 3); | ||
const resultsPromise = executeHranaBatch(mode, version, batch, hranaStmts); | ||
const results = await resultsPromise; | ||
const isSchemaDatabase = await isSchemaDatabasePromise; | ||
if (isSchemaDatabase) { | ||
await waitForLastMigrationJobToFinish({ | ||
authToken: this.#authToken, | ||
baseUrl: this.#url.origin, | ||
}); | ||
return this.limit(async () => { | ||
const streamState = await this.#openStream(); | ||
try { | ||
const isSchemaDatabasePromise = this.getIsSchemaDatabase(); | ||
const hranaStmts = stmts.map(stmtToHrana); | ||
const version = await streamState.conn.client.getVersion(); | ||
// Schedule all operations synchronously, so they will be pipelined and executed in a single | ||
// network roundtrip. | ||
streamState.conn.sqlCache.apply(hranaStmts); | ||
const batch = streamState.stream.batch(version >= 3); | ||
const resultsPromise = executeHranaBatch(mode, version, batch, hranaStmts); | ||
const results = await resultsPromise; | ||
const isSchemaDatabase = await isSchemaDatabasePromise; | ||
if (isSchemaDatabase) { | ||
await waitForLastMigrationJobToFinish({ | ||
authToken: this.#authToken, | ||
baseUrl: this.#url.origin, | ||
}); | ||
} | ||
return results; | ||
} | ||
return results; | ||
} | ||
catch (e) { | ||
throw mapHranaError(e); | ||
} | ||
finally { | ||
this._closeStream(streamState); | ||
} | ||
catch (e) { | ||
throw mapHranaError(e); | ||
} | ||
finally { | ||
this._closeStream(streamState); | ||
} | ||
}); | ||
} | ||
async transaction(mode = "write") { | ||
const streamState = await this.#openStream(); | ||
try { | ||
const version = await streamState.conn.client.getVersion(); | ||
// the BEGIN statement will be batched with the first statement on the transaction to save a | ||
// network roundtrip | ||
return new WsTransaction(this, streamState, mode, version); | ||
} | ||
catch (e) { | ||
this._closeStream(streamState); | ||
throw mapHranaError(e); | ||
} | ||
return this.limit(async () => { | ||
const streamState = await this.#openStream(); | ||
try { | ||
const version = await streamState.conn.client.getVersion(); | ||
// the BEGIN statement will be batched with the first statement on the transaction to save a | ||
// network roundtrip | ||
return new WsTransaction(this, streamState, mode, version); | ||
} | ||
catch (e) { | ||
this._closeStream(streamState); | ||
throw mapHranaError(e); | ||
} | ||
}); | ||
} | ||
async executeMultiple(sql) { | ||
const streamState = await this.#openStream(); | ||
try { | ||
// Schedule all operations synchronously, so they will be pipelined and executed in a single | ||
// network roundtrip. | ||
const promise = streamState.stream.sequence(sql); | ||
streamState.stream.closeGracefully(); | ||
await promise; | ||
} | ||
catch (e) { | ||
throw mapHranaError(e); | ||
} | ||
finally { | ||
this._closeStream(streamState); | ||
} | ||
return this.limit(async () => { | ||
const streamState = await this.#openStream(); | ||
try { | ||
// Schedule all operations synchronously, so they will be pipelined and executed in a single | ||
// network roundtrip. | ||
const promise = streamState.stream.sequence(sql); | ||
streamState.stream.closeGracefully(); | ||
await promise; | ||
} | ||
catch (e) { | ||
throw mapHranaError(e); | ||
} | ||
finally { | ||
this._closeStream(streamState); | ||
} | ||
}); | ||
} | ||
@@ -167,0 +181,0 @@ sync() { |
{ | ||
"name": "@libsql/client", | ||
"version": "0.6.2", | ||
"version": "0.7.0", | ||
"keywords": [ | ||
@@ -105,6 +105,7 @@ "libsql", | ||
"dependencies": { | ||
"@libsql/core": "^0.6.2", | ||
"@libsql/core": "^0.7.0", | ||
"@libsql/hrana-client": "^0.6.0", | ||
"js-base64": "^3.7.5", | ||
"libsql": "^0.3.10" | ||
"libsql": "^0.3.10", | ||
"promise-limit": "^2.7.0" | ||
}, | ||
@@ -111,0 +112,0 @@ "devDependencies": { |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
128008
3079
5
+ Addedpromise-limit@^2.7.0
+ Added@libsql/core@0.7.0(transitive)
+ Addedpromise-limit@2.7.0(transitive)
- Removed@libsql/core@0.6.2(transitive)
Updated@libsql/core@^0.7.0