ava-postgres
Advanced tools
Comparing version 7.1.0 to 7.2.0
@@ -36,2 +36,27 @@ "use strict"; | ||
var import_pg = require("pg"); | ||
var import_birpc = require("birpc"); | ||
var import_isPlainObject = __toESM(require("lodash/isPlainObject")); | ||
var isSerializable = (obj) => { | ||
var isNestedSerializable; | ||
function isPlain(val) { | ||
return typeof val === "undefined" || typeof val === "string" || typeof val === "boolean" || typeof val === "number" || Array.isArray(val) || (0, import_isPlainObject.default)(val); | ||
} | ||
if (!isPlain(obj)) { | ||
return false; | ||
} | ||
for (var property in obj) { | ||
if (obj.hasOwnProperty(property)) { | ||
if (!isPlain(obj[property])) { | ||
return false; | ||
} | ||
if (typeof obj[property] == "object") { | ||
isNestedSerializable = isSerializable(obj[property]); | ||
if (!isNestedSerializable) { | ||
return false; | ||
} | ||
} | ||
} | ||
} | ||
return true; | ||
}; | ||
var getWorker = async (initialData, options) => { | ||
@@ -59,2 +84,16 @@ const key = (0, import_object_hash.default)({ | ||
}; | ||
var teardownConnection = async ({ | ||
pool, | ||
pgbouncerPool | ||
}) => { | ||
try { | ||
await pool.end(); | ||
await (pgbouncerPool == null ? void 0 : pgbouncerPool.end()); | ||
} catch (error) { | ||
if (error.message.includes("Called end on pool more than once")) { | ||
return; | ||
} | ||
throw error; | ||
} | ||
}; | ||
var getTestPostgresDatabaseFactory = (options) => { | ||
@@ -67,46 +106,24 @@ const initialData = { | ||
const workerPromise = getWorker(initialData, options); | ||
const getTestPostgresDatabase = async (t, params, getTestDatabaseOptions) => { | ||
const mapWorkerConnectionDetailsToConnectionDetails = (connectionDetailsFromWorker) => { | ||
const pool = new import_pg.Pool({ | ||
connectionString: connectionDetailsFromWorker.connectionString | ||
const mapWorkerConnectionDetailsToConnectionDetails = (connectionDetailsFromWorker) => { | ||
const pool = new import_pg.Pool({ | ||
connectionString: connectionDetailsFromWorker.connectionString | ||
}); | ||
let pgbouncerPool; | ||
if (connectionDetailsFromWorker.pgbouncerConnectionString) { | ||
pgbouncerPool = new import_pg.Pool({ | ||
connectionString: connectionDetailsFromWorker.pgbouncerConnectionString | ||
}); | ||
let pgbouncerPool; | ||
if (connectionDetailsFromWorker.pgbouncerConnectionString) { | ||
pgbouncerPool = new import_pg.Pool({ | ||
connectionString: connectionDetailsFromWorker.pgbouncerConnectionString | ||
}); | ||
} | ||
t.teardown(async () => { | ||
try { | ||
await pool.end(); | ||
await (pgbouncerPool == null ? void 0 : pgbouncerPool.end()); | ||
} catch (error) { | ||
if (error.message.includes( | ||
"Called end on pool more than once" | ||
)) { | ||
return; | ||
} | ||
throw error; | ||
} | ||
}); | ||
return { | ||
...connectionDetailsFromWorker, | ||
pool, | ||
pgbouncerPool | ||
}; | ||
} | ||
return { | ||
...connectionDetailsFromWorker, | ||
pool, | ||
pgbouncerPool | ||
}; | ||
const worker = await workerPromise; | ||
await worker.available; | ||
const waitForAndHandleReply = async (message) => { | ||
let reply = await message.replies().next(); | ||
const replyData = reply.value.data; | ||
if (replyData.type === "RUN_HOOK_BEFORE_TEMPLATE_IS_BAKED") { | ||
let result = { | ||
status: "success", | ||
result: void 0 | ||
}; | ||
}; | ||
let rpcCallback; | ||
const rpc = (0, import_birpc.createBirpc)( | ||
{ | ||
runBeforeTemplateIsBakedHook: async (connection, params) => { | ||
if (options == null ? void 0 : options.beforeTemplateIsBaked) { | ||
const connectionDetails = mapWorkerConnectionDetailsToConnectionDetails( | ||
replyData.connectionDetails | ||
); | ||
const connectionDetails = mapWorkerConnectionDetailsToConnectionDetails(connection); | ||
connectionDetails.pool.on("error", (error) => { | ||
@@ -120,44 +137,9 @@ if (error.message.includes( | ||
}); | ||
try { | ||
const hookResult = await options.beforeTemplateIsBaked({ | ||
params, | ||
connection: connectionDetails, | ||
containerExec: async (command) => { | ||
const request = reply.value.reply({ | ||
type: "EXEC_COMMAND_IN_CONTAINER", | ||
command | ||
}); | ||
reply = await request.replies().next(); | ||
if (reply.value.data.type !== "EXEC_COMMAND_IN_CONTAINER_RESULT") { | ||
throw new Error( | ||
"Expected EXEC_COMMAND_IN_CONTAINER_RESULT message" | ||
); | ||
} | ||
return reply.value.data.result; | ||
} | ||
}); | ||
result = { | ||
status: "success", | ||
result: hookResult | ||
}; | ||
} catch (error) { | ||
result = { | ||
status: "error", | ||
error: error instanceof Error ? error.stack ?? error.message : new Error( | ||
"Unknown error type thrown in beforeTemplateIsBaked hook" | ||
) | ||
}; | ||
} finally { | ||
await connectionDetails.pool.end(); | ||
} | ||
} | ||
try { | ||
return waitForAndHandleReply( | ||
reply.value.reply({ | ||
type: "FINISHED_RUNNING_HOOK_BEFORE_TEMPLATE_IS_BAKED", | ||
result | ||
}) | ||
); | ||
} catch (error) { | ||
if (error instanceof Error && error.name === "DataCloneError") { | ||
const hookResult = await options.beforeTemplateIsBaked({ | ||
params, | ||
connection: connectionDetails, | ||
containerExec: async (command) => rpc.execCommandInContainer(command) | ||
}); | ||
await teardownConnection(connectionDetails); | ||
if (hookResult && !isSerializable(hookResult)) { | ||
throw new TypeError( | ||
@@ -167,27 +149,39 @@ "Return value of beforeTemplateIsBaked() hook could not be serialized. Make sure it returns only JSON-serializable values." | ||
} | ||
throw error; | ||
return hookResult; | ||
} | ||
} else if (replyData.type === "GOT_DATABASE") { | ||
if (replyData.beforeTemplateIsBakedResult.status === "error") { | ||
if (typeof replyData.beforeTemplateIsBakedResult.error === "string") { | ||
throw new Error(replyData.beforeTemplateIsBakedResult.error); | ||
} | ||
throw replyData.beforeTemplateIsBakedResult.error; | ||
} | ||
return { | ||
...mapWorkerConnectionDetailsToConnectionDetails( | ||
replyData.connectionDetails | ||
), | ||
beforeTemplateIsBakedResult: replyData.beforeTemplateIsBakedResult.result | ||
}; | ||
} | ||
throw new Error(`Unexpected message type: ${replyData.type}`); | ||
}, | ||
{ | ||
post: async (data) => { | ||
const worker = await workerPromise; | ||
await worker.available; | ||
worker.publish(data); | ||
}, | ||
on: (data) => { | ||
rpcCallback = data; | ||
} | ||
} | ||
); | ||
const _messageHandlerPromise = (async () => { | ||
const worker = await workerPromise; | ||
await worker.available; | ||
for await (const msg of worker.subscribe()) { | ||
rpcCallback(msg.data); | ||
} | ||
})(); | ||
const getTestPostgresDatabase = async (t, params, getTestDatabaseOptions) => { | ||
const testDatabaseConnection = await rpc.getTestDatabase({ | ||
databaseDedupeKey: getTestDatabaseOptions == null ? void 0 : getTestDatabaseOptions.databaseDedupeKey, | ||
params | ||
}); | ||
const connectionDetails = mapWorkerConnectionDetailsToConnectionDetails( | ||
testDatabaseConnection.connectionDetails | ||
); | ||
t.teardown(async () => { | ||
await teardownConnection(connectionDetails); | ||
}); | ||
return { | ||
...connectionDetails, | ||
beforeTemplateIsBakedResult: testDatabaseConnection.beforeTemplateIsBakedResult | ||
}; | ||
return waitForAndHandleReply( | ||
worker.publish({ | ||
type: "GET_TEST_DATABASE", | ||
params, | ||
key: getTestDatabaseOptions == null ? void 0 : getTestDatabaseOptions.databaseDedupeKey | ||
}) | ||
); | ||
}; | ||
@@ -194,0 +188,0 @@ return getTestPostgresDatabase; |
@@ -46,7 +46,4 @@ "use strict"; | ||
// src/worker.ts | ||
var TestWorkerShutdownError = class extends Error { | ||
constructor() { | ||
super("Test worker unexpectedly shut down"); | ||
} | ||
}; | ||
var import_birpc = require("birpc"); | ||
var import_node_events = require("events"); | ||
var Worker = class { | ||
@@ -59,3 +56,2 @@ constructor(initialData) { | ||
this.getOrCreateKeyToCreationMutex = new import_async_mutex.Mutex(); | ||
this.createdDatabasesByTestWorkerId = /* @__PURE__ */ new Map(); | ||
this.getOrCreateTemplateNameMutex = new import_async_mutex.Mutex(); | ||
@@ -65,120 +61,100 @@ this.startContainerPromise = this.startContainer(); | ||
async handleTestWorker(testWorker) { | ||
let workerRpcCallback; | ||
const rpcChannel = { | ||
post: (data) => testWorker.publish(data), | ||
on: (data) => { | ||
workerRpcCallback = data; | ||
} | ||
}; | ||
const messageHandlerAbortController = new AbortController(); | ||
const messageHandlerPromise = Promise.race([ | ||
(0, import_node_events.once)(messageHandlerAbortController.signal, "abort"), | ||
(async () => { | ||
for await (const msg of testWorker.subscribe()) { | ||
workerRpcCallback(msg.data); | ||
if (messageHandlerAbortController.signal.aborted) { | ||
break; | ||
} | ||
} | ||
})() | ||
]); | ||
testWorker.teardown(async () => { | ||
await this.handleTestWorkerTeardown(testWorker); | ||
messageHandlerAbortController.abort(); | ||
await messageHandlerPromise; | ||
}); | ||
for await (const message of testWorker.subscribe()) { | ||
await this.handleMessage(message); | ||
} | ||
} | ||
async handleMessage(message) { | ||
if (message.data.type === "GET_TEST_DATABASE") { | ||
const paramsHash = (0, import_object_hash.default)(message.data.params ?? null); | ||
let neededToCreateTemplate = false; | ||
await this.getOrCreateTemplateNameMutex.runExclusive(() => { | ||
if (!this.paramsHashToTemplateCreationPromise.has(paramsHash)) { | ||
neededToCreateTemplate = true; | ||
this.paramsHashToTemplateCreationPromise.set( | ||
paramsHash, | ||
this.createTemplate(message) | ||
); | ||
} | ||
}); | ||
let templateCreationResult; | ||
try { | ||
templateCreationResult = await this.paramsHashToTemplateCreationPromise.get(paramsHash); | ||
} catch (error) { | ||
if (error instanceof TestWorkerShutdownError) { | ||
return; | ||
} | ||
throw error; | ||
} | ||
const { | ||
templateName, | ||
beforeTemplateIsBakedResult, | ||
lastMessage: lastMessageFromTemplateCreation | ||
} = templateCreationResult; | ||
const { postgresClient } = await this.startContainerPromise; | ||
const fullDatabaseKey = `${paramsHash}-${message.data.key}`; | ||
let databaseName = message.data.key ? this.keyToDatabaseName.get(fullDatabaseKey) : void 0; | ||
if (!databaseName) { | ||
const createDatabase = async () => { | ||
databaseName = get_random_database_name_default(); | ||
await postgresClient.query( | ||
`CREATE DATABASE ${databaseName} WITH TEMPLATE ${templateName};` | ||
); | ||
this.createdDatabasesByTestWorkerId.set( | ||
message.testWorker.id, | ||
(this.createdDatabasesByTestWorkerId.get(message.testWorker.id) ?? []).concat(databaseName) | ||
); | ||
}; | ||
if (message.data.key) { | ||
await this.getOrCreateKeyToCreationMutex.runExclusive(() => { | ||
if (!this.keyToCreationMutex.has(fullDatabaseKey)) { | ||
this.keyToCreationMutex.set(fullDatabaseKey, new import_async_mutex.Mutex()); | ||
} | ||
const rpc = (0, import_birpc.createBirpc)( | ||
{ | ||
getTestDatabase: async (options) => { | ||
return this.getTestDatabase(options, rpc, (teardown) => { | ||
testWorker.teardown(teardown); | ||
}); | ||
const mutex = this.keyToCreationMutex.get(fullDatabaseKey); | ||
await mutex.runExclusive(async () => { | ||
if (!this.keyToDatabaseName.has(fullDatabaseKey)) { | ||
await createDatabase(); | ||
this.keyToDatabaseName.set(fullDatabaseKey, databaseName); | ||
} | ||
databaseName = this.keyToDatabaseName.get(fullDatabaseKey); | ||
}); | ||
} else { | ||
await createDatabase(); | ||
}, | ||
execCommandInContainer: async (command) => { | ||
const container = (await this.startContainerPromise).container; | ||
return container.exec(command); | ||
} | ||
}, | ||
rpcChannel | ||
); | ||
} | ||
async getTestDatabase(options, rpc, registerTeardown) { | ||
const paramsHash = (0, import_object_hash.default)(options.params ?? null); | ||
await this.getOrCreateTemplateNameMutex.runExclusive(() => { | ||
if (!this.paramsHashToTemplateCreationPromise.has(paramsHash)) { | ||
this.paramsHashToTemplateCreationPromise.set( | ||
paramsHash, | ||
this.createTemplate(rpc, options.params) | ||
); | ||
} | ||
const gotDatabaseMessage = { | ||
type: "GOT_DATABASE", | ||
connectionDetails: await this.getConnectionDetails(databaseName), | ||
beforeTemplateIsBakedResult | ||
}); | ||
const templateCreationResult = await this.paramsHashToTemplateCreationPromise.get(paramsHash); | ||
const { templateName, beforeTemplateIsBakedResult } = templateCreationResult; | ||
const { postgresClient } = await this.startContainerPromise; | ||
const fullDatabaseKey = `${paramsHash}-${options.databaseDedupeKey}`; | ||
let databaseName = options.databaseDedupeKey ? this.keyToDatabaseName.get(fullDatabaseKey) : void 0; | ||
if (!databaseName) { | ||
const createDatabase = async () => { | ||
databaseName = get_random_database_name_default(); | ||
await postgresClient.query( | ||
`CREATE DATABASE ${databaseName} WITH TEMPLATE ${templateName};` | ||
); | ||
}; | ||
if (neededToCreateTemplate) { | ||
lastMessageFromTemplateCreation.value.reply(gotDatabaseMessage); | ||
if (options.databaseDedupeKey) { | ||
await this.getOrCreateKeyToCreationMutex.runExclusive(() => { | ||
if (!this.keyToCreationMutex.has(fullDatabaseKey)) { | ||
this.keyToCreationMutex.set(fullDatabaseKey, new import_async_mutex.Mutex()); | ||
} | ||
}); | ||
const mutex = this.keyToCreationMutex.get(fullDatabaseKey); | ||
await mutex.runExclusive(async () => { | ||
if (!this.keyToDatabaseName.has(fullDatabaseKey)) { | ||
await createDatabase(); | ||
this.keyToDatabaseName.set(fullDatabaseKey, databaseName); | ||
} | ||
databaseName = this.keyToDatabaseName.get(fullDatabaseKey); | ||
}); | ||
} else { | ||
message.reply(gotDatabaseMessage); | ||
await createDatabase(); | ||
} | ||
return; | ||
} | ||
throw new Error(`Unknown message: ${JSON.stringify(message.data)}`); | ||
registerTeardown(async () => { | ||
if (options.databaseDedupeKey && this.keyToDatabaseName.has(fullDatabaseKey)) { | ||
return; | ||
} | ||
await this.forceDisconnectClientsFrom(databaseName); | ||
await postgresClient.query(`DROP DATABASE ${databaseName}`); | ||
}); | ||
return { | ||
connectionDetails: await this.getConnectionDetails(databaseName), | ||
beforeTemplateIsBakedResult | ||
}; | ||
} | ||
async handleTestWorkerTeardown(testWorker) { | ||
const databases = this.createdDatabasesByTestWorkerId.get(testWorker.id); | ||
if (databases) { | ||
const { postgresClient } = await this.startContainerPromise; | ||
const databasesAssociatedWithKeys = new Set( | ||
this.keyToDatabaseName.values() | ||
); | ||
await Promise.all( | ||
databases.filter((d) => !databasesAssociatedWithKeys.has(d)).map(async (database) => { | ||
await this.forceDisconnectClientsFrom(database); | ||
await postgresClient.query(`DROP DATABASE ${database}`); | ||
}) | ||
); | ||
} | ||
} | ||
async createTemplate(message) { | ||
async createTemplate(rpc, params) { | ||
const databaseName = get_random_database_name_default(); | ||
const { postgresClient, container, pgbouncerContainer } = await this.startContainerPromise; | ||
const { postgresClient } = await this.startContainerPromise; | ||
await postgresClient.query(`CREATE DATABASE ${databaseName};`); | ||
const msg = message.reply({ | ||
type: "RUN_HOOK_BEFORE_TEMPLATE_IS_BAKED", | ||
connectionDetails: await this.getConnectionDetails(databaseName) | ||
}); | ||
let reply = await msg.replies().next(); | ||
if (reply.done) { | ||
throw new TestWorkerShutdownError(); | ||
} | ||
while (reply.value.data.type !== "FINISHED_RUNNING_HOOK_BEFORE_TEMPLATE_IS_BAKED") { | ||
const replyValue = reply.value.data; | ||
if (replyValue.type === "EXEC_COMMAND_IN_CONTAINER") { | ||
const result = await container.exec(replyValue.command); | ||
const message2 = reply.value.reply({ | ||
type: "EXEC_COMMAND_IN_CONTAINER_RESULT", | ||
result | ||
}); | ||
reply = await message2.replies().next(); | ||
} | ||
} | ||
const beforeTemplateIsBakedResult = await rpc.runBeforeTemplateIsBakedHook( | ||
await this.getConnectionDetails(databaseName), | ||
params | ||
); | ||
await this.forceDisconnectClientsFrom(databaseName); | ||
@@ -190,4 +166,3 @@ await postgresClient.query( | ||
templateName: databaseName, | ||
beforeTemplateIsBakedResult: reply.value.data.result, | ||
lastMessage: reply | ||
beforeTemplateIsBakedResult | ||
}; | ||
@@ -194,0 +169,0 @@ } |
@@ -19,3 +19,3 @@ { | ||
], | ||
"version": "7.1.0", | ||
"version": "7.2.0", | ||
"main": "dist/index.js", | ||
@@ -32,2 +32,3 @@ "module": "./dist/index.mjs", | ||
"@semantic-release/release-notes-generator": "10.0.3", | ||
"@types/lodash": "4.17.0", | ||
"@types/node": "18.7.14", | ||
@@ -50,2 +51,4 @@ "@types/object-hash": "2.2.1", | ||
"async-mutex": "0.4.0", | ||
"birpc": "0.2.17", | ||
"lodash": "4.17.21", | ||
"nanoid": "4.0.0", | ||
@@ -65,3 +68,6 @@ "object-hash": "3.0.0", | ||
"format:check": "prettier --check ." | ||
}, | ||
"engines": { | ||
"node": ">=18" | ||
} | ||
} |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
92363
8
19
915
+ Addedbirpc@0.2.17
+ Addedlodash@4.17.21
+ Addedbirpc@0.2.17(transitive)