@breadboard-ai/data-store
Advanced tools
Comparing version 0.2.0 to 0.2.1
@@ -7,5 +7,6 @@ /** | ||
import { HarnessRunResult } from "@google-labs/breadboard/harness"; | ||
import { RunStore } from "@google-labs/breadboard"; | ||
import { RunStore, RunTimestamp, RunURL } from "@google-labs/breadboard"; | ||
export declare class IDBRunStore implements RunStore { | ||
#private; | ||
constructor(); | ||
/** | ||
@@ -15,12 +16,13 @@ * Starts tracking a run. | ||
* @param storeId The ID of the store to create. | ||
* @param limit The maximum number of old runs to keep around. | ||
* @param releaseGroupIds The IDs of any old stores to be released. | ||
* @returns The store ID used. | ||
*/ | ||
start(storeId: string, limit?: number): Promise<string>; | ||
write(result: HarnessRunResult): Promise<void>; | ||
stop(): Promise<void>; | ||
abort(): Promise<void>; | ||
drop(): Promise<void>; | ||
getNewestRuns(limit?: number): Promise<HarnessRunResult[][]>; | ||
start(url: RunURL): Promise<RunTimestamp>; | ||
write(url: RunURL, timestamp: RunTimestamp, result: HarnessRunResult): Promise<void>; | ||
stop(url: RunURL, timestamp: RunTimestamp): Promise<void>; | ||
abort(url: RunURL, timestamp: RunTimestamp): Promise<void>; | ||
drop(url?: RunURL): Promise<void>; | ||
truncate(url: RunURL, limit: number): Promise<void>; | ||
getStoredRuns(url: RunURL): Promise<Map<RunTimestamp, HarnessRunResult[]>>; | ||
} | ||
//# sourceMappingURL=idb-run-store.d.ts.map |
@@ -7,11 +7,19 @@ /** | ||
import * as idb from "idb"; | ||
import { isLLMContent, isStoredData, toInlineDataPart, } from "@google-labs/breadboard"; | ||
const RUN_DB = "runs"; | ||
import { isLLMContent, isLLMContentArray, isMetadataEntry, isStoredData, toInlineDataPart, } from "@google-labs/breadboard"; | ||
const RUN_LISTING_DB = "run-listing"; | ||
const RUN_LISTING_VERSION = 1; | ||
export class IDBRunStore { | ||
#writer = null; | ||
#version = idb.openDB(RUN_DB).then((db) => { | ||
const { version } = db; | ||
db.close(); | ||
return version; | ||
}); | ||
#writers = new Map(); | ||
#urlToDbName(url) { | ||
return `run-${url}`; | ||
} | ||
constructor() { | ||
// Remove the deprecated 'runs' database if it exists. | ||
try { | ||
idb.deleteDB("runs"); | ||
} | ||
catch (err) { | ||
// Best effort - don't throw if there are any issues. | ||
} | ||
} | ||
/** | ||
@@ -21,43 +29,38 @@ * Starts tracking a run. | ||
* @param storeId The ID of the store to create. | ||
* @param limit The maximum number of old runs to keep around. | ||
* @param releaseGroupIds The IDs of any old stores to be released. | ||
* @returns The store ID used. | ||
*/ | ||
async start(storeId, limit = 2) { | ||
if (this.#writer) { | ||
throw new Error("Already writing a stream - please stop it first"); | ||
} | ||
// Get the current version, bump it and set it for future starts. | ||
const newVersion = (await this.#version) + 1; | ||
this.#version = Promise.resolve(newVersion); | ||
// Now figure out the new version. | ||
const dbNewVersion = await idb.openDB(RUN_DB, newVersion, { | ||
blocked(currentVersion, blockedVersion, event) { | ||
console.warn(`IDB Store blocked version ${blockedVersion} by version ${currentVersion}`, event); | ||
}, | ||
async start(url) { | ||
// 1. Store the URLs that we've seen (necessary to support the truncation | ||
// and drop calls). | ||
const runListing = await idb.openDB(RUN_LISTING_DB, RUN_LISTING_VERSION, { | ||
upgrade(db) { | ||
db.createObjectStore(storeId, { | ||
keyPath: "id", | ||
autoIncrement: true, | ||
}); | ||
[...db.objectStoreNames] | ||
.sort((a, b) => { | ||
if (a > b) | ||
return -1; | ||
if (a < b) | ||
return 1; | ||
return 0; | ||
}) | ||
.slice(limit) | ||
.map((storeName) => { | ||
// Delete the entries. | ||
db.deleteObjectStore(storeName); | ||
}); | ||
db.createObjectStore("urls", { keyPath: "url" }); | ||
}, | ||
}); | ||
dbNewVersion.close(); | ||
// Now set up a stream to write to the new version. | ||
await runListing.put("urls", { url }); | ||
runListing.close(); | ||
// 2. Create a database and object store for this particular run. | ||
const dbName = this.#urlToDbName(url); | ||
const timestamp = Date.now(); | ||
const timestampKey = timestamp.toString(); | ||
const dbVersion = await idb.openDB(dbName); | ||
const nextVersion = dbVersion.version + 1; | ||
dbVersion.close(); | ||
// 3. Set up a stream to write to the new database. | ||
let db; | ||
const stream = new WritableStream({ | ||
async start() { | ||
db = await idb.openDB(RUN_DB); | ||
db = await idb.openDB(dbName, nextVersion, { | ||
blocked(currentVersion, blockedVersion, event) { | ||
console.warn(`IDB Store blocked version ${blockedVersion} by version ${currentVersion}`, event); | ||
}, | ||
upgrade(db) { | ||
db.createObjectStore(timestampKey, { | ||
keyPath: "id", | ||
autoIncrement: true, | ||
}); | ||
}, | ||
}); | ||
db.close(); | ||
}, | ||
@@ -71,19 +74,28 @@ async write(chunk) { | ||
for (const output of Object.values(result.data.outputs)) { | ||
if (!isLLMContent(output)) { | ||
if (!isLLMContent(output) && !isLLMContentArray(output)) { | ||
continue; | ||
} | ||
for (let i = 0; i < output.parts.length; i++) { | ||
const part = output.parts[i]; | ||
if (!isStoredData(part)) { | ||
const outputs = isLLMContent(output) | ||
? [output] | ||
: output; | ||
for (const output of outputs) { | ||
if (isMetadataEntry(output)) { | ||
continue; | ||
} | ||
output.parts[i] = await toInlineDataPart(part); | ||
for (let i = 0; i < output.parts.length; i++) { | ||
const part = output.parts[i]; | ||
if (!isStoredData(part)) { | ||
continue; | ||
} | ||
output.parts[i] = await toInlineDataPart(part); | ||
} | ||
} | ||
} | ||
} | ||
const tx = db.transaction(storeId, "readwrite"); | ||
db = await idb.openDB(dbName); | ||
const tx = db.transaction(timestampKey, "readwrite"); | ||
await Promise.all([tx.store.add(result), tx.done]); | ||
} | ||
catch (err) { | ||
console.warn("Unable to write to storage", chunk); | ||
console.warn(`Unable to write to storage (URL: ${url}, Timestamp: ${timestampKey})`, chunk); | ||
console.warn(err); | ||
@@ -94,2 +106,5 @@ if (this.abort) { | ||
} | ||
finally { | ||
db.close(); | ||
} | ||
}, | ||
@@ -103,48 +118,99 @@ abort() { | ||
}); | ||
this.#writer = stream.getWriter(); | ||
return storeId; | ||
// 4. Store the writer and return the timestamp. | ||
let store = this.#writers.get(url); | ||
if (!store) { | ||
store = new Map(); | ||
this.#writers.set(url, store); | ||
} | ||
if (store.has(timestamp)) { | ||
throw new Error("Already writing a stream - please stop it first"); | ||
} | ||
store.set(timestamp, stream.getWriter()); | ||
return timestamp; | ||
} | ||
async write(result) { | ||
if (!this.#writer) { | ||
async write(url, timestamp, result) { | ||
const store = this.#writers.get(url); | ||
if (!store) { | ||
throw new Error("No active stream - please start one before writing"); | ||
} | ||
await this.#writer.ready; | ||
this.#writer.write(result); | ||
const writer = store.get(timestamp); | ||
if (!writer) { | ||
throw new Error("No active stream - please start one before writing"); | ||
} | ||
await writer.ready; | ||
writer.write(result); | ||
} | ||
async stop() { | ||
if (!this.#writer) { | ||
async stop(url, timestamp) { | ||
const store = this.#writers.get(url); | ||
if (!store) { | ||
throw new Error("No active stream - please start one before writing"); | ||
} | ||
await this.#writer.ready; | ||
this.#writer.close(); | ||
this.#writer = null; | ||
const writer = store.get(timestamp); | ||
if (!writer) { | ||
throw new Error("No active stream - please start one before writing"); | ||
} | ||
await writer.ready; | ||
writer.close(); | ||
store.delete(timestamp); | ||
} | ||
async abort() { | ||
if (!this.#writer) { | ||
async abort(url, timestamp) { | ||
const store = this.#writers.get(url); | ||
if (!store) { | ||
throw new Error("No active stream - please start one before writing"); | ||
} | ||
await this.#writer.ready; | ||
this.#writer.abort(); | ||
this.#writer = null; | ||
const writer = store.get(timestamp); | ||
if (!writer) { | ||
throw new Error("No active stream - please start one before writing"); | ||
} | ||
await writer.ready; | ||
writer.abort(); | ||
store.delete(timestamp); | ||
} | ||
async drop() { | ||
await idb.deleteDB(RUN_DB); | ||
async drop(url) { | ||
if (url) { | ||
await idb.deleteDB(this.#urlToDbName(url)); | ||
return; | ||
} | ||
const runListing = await idb.openDB(RUN_LISTING_DB); | ||
if (runListing.objectStoreNames.contains("urls")) { | ||
const urls = await runListing.getAll("urls"); | ||
for (const item of urls) { | ||
await idb.deleteDB(this.#urlToDbName(item.url)); | ||
} | ||
} | ||
runListing.close(); | ||
await idb.deleteDB(RUN_LISTING_DB); | ||
} | ||
async getNewestRuns(limit = Number.POSITIVE_INFINITY) { | ||
await this.#version; | ||
const db = await idb.openDB(RUN_DB); | ||
const storeNames = [...db.objectStoreNames] | ||
async truncate(url, limit) { | ||
const dbName = this.#urlToDbName(url); | ||
const db = await idb.openDB(dbName); | ||
const nextVersion = db.version + 1; | ||
const storesToRemove = [...db.objectStoreNames] | ||
.sort((a, b) => { | ||
if (a > b) | ||
return -1; | ||
if (a < b) | ||
return 1; | ||
return 0; | ||
return Number.parseInt(b) - Number.parseInt(a); | ||
}) | ||
.slice(0, limit); | ||
const runs = await Promise.all(storeNames.map((storeName) => db.getAll(storeName))); | ||
.slice(limit); | ||
db.close(); | ||
return runs; | ||
// Now re-open the database with a new version and use that operation to | ||
// delete the stores that are no longer needed. | ||
const truncateDb = await idb.openDB(this.#urlToDbName(url), nextVersion, { | ||
upgrade(db) { | ||
for (const store of storesToRemove) { | ||
db.deleteObjectStore(store); | ||
} | ||
}, | ||
}); | ||
truncateDb.close(); | ||
} | ||
async getStoredRuns(url) { | ||
const dbName = this.#urlToDbName(url); | ||
const db = await idb.openDB(dbName); | ||
const runs = await Promise.all([...db.objectStoreNames].map(async (timestamp) => { | ||
const events = (await db.getAll(timestamp)); | ||
return [Number.parseInt(timestamp), events]; | ||
})); | ||
db.close(); | ||
return new Map(runs); | ||
} | ||
} | ||
//# sourceMappingURL=idb-run-store.js.map |
{ | ||
"name": "@breadboard-ai/data-store", | ||
"version": "0.2.0", | ||
"version": "0.2.1", | ||
"description": "A data store implementation to support Breadboard", | ||
@@ -145,5 +145,5 @@ "main": "./dist/src/index.js", | ||
"@web/dev-server-esbuild": "^1.0.2", | ||
"@web/test-runner": "^0.18.2", | ||
"@web/test-runner": "^0.18.3", | ||
"ava": "^5.2.0", | ||
"esbuild": "^0.23.0", | ||
"esbuild": "^0.23.1", | ||
"typescript": "^5.5.4" | ||
@@ -155,5 +155,5 @@ }, | ||
"dependencies": { | ||
"@google-labs/breadboard": "^0.24.0", | ||
"@google-labs/breadboard": "^0.25.0", | ||
"idb": "^8.0.0" | ||
} | ||
} |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Major refactor
Supply chain risk: Package has recently undergone a major refactor. It may be unstable or indicate significant internal changes. Use caution when updating to versions that include significant changes.
Found 1 instance in 1 package
New author
Supply chain risk: A new npm collaborator published a version of the package for the first time. New collaborators are usually benign additions to a project, but they do indicate a change to the security surface area of a package.
Found 1 instance in 1 package
22942
280
0
+ Added @google-labs/breadboard@0.25.0 (transitive)
- Removed @google-labs/breadboard@0.24.0 (transitive)