Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

@breadboard-ai/data-store

Package Overview
Dependencies
Maintainers
0
Versions
9
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@breadboard-ai/data-store - npm Package Compare versions

Comparing version 0.2.0 to 0.2.1

18

dist/src/run/idb-run-store.d.ts

@@ -7,5 +7,6 @@ /**

import { HarnessRunResult } from "@google-labs/breadboard/harness";
import { RunStore } from "@google-labs/breadboard";
import { RunStore, RunTimestamp, RunURL } from "@google-labs/breadboard";
export declare class IDBRunStore implements RunStore {
#private;
constructor();
/**

@@ -15,12 +16,13 @@ * Starts tracking a run.

* @param storeId The ID of the store to create.
* @param limit The maximum number of old runs to keep around.
* @param releaseGroupIds The IDs of any old stores to be released.
* @returns The store ID used.
*/
start(storeId: string, limit?: number): Promise<string>;
write(result: HarnessRunResult): Promise<void>;
stop(): Promise<void>;
abort(): Promise<void>;
drop(): Promise<void>;
getNewestRuns(limit?: number): Promise<HarnessRunResult[][]>;
start(url: RunURL): Promise<RunTimestamp>;
write(url: RunURL, timestamp: RunTimestamp, result: HarnessRunResult): Promise<void>;
stop(url: RunURL, timestamp: RunTimestamp): Promise<void>;
abort(url: RunURL, timestamp: RunTimestamp): Promise<void>;
drop(url?: RunURL): Promise<void>;
truncate(url: RunURL, limit: number): Promise<void>;
getStoredRuns(url: RunURL): Promise<Map<RunTimestamp, HarnessRunResult[]>>;
}
//# sourceMappingURL=idb-run-store.d.ts.map

@@ -7,11 +7,19 @@ /**

import * as idb from "idb";
import { isLLMContent, isStoredData, toInlineDataPart, } from "@google-labs/breadboard";
const RUN_DB = "runs";
import { isLLMContent, isLLMContentArray, isMetadataEntry, isStoredData, toInlineDataPart, } from "@google-labs/breadboard";
const RUN_LISTING_DB = "run-listing";
const RUN_LISTING_VERSION = 1;
export class IDBRunStore {
#writer = null;
#version = idb.openDB(RUN_DB).then((db) => {
const { version } = db;
db.close();
return version;
});
#writers = new Map();
#urlToDbName(url) {
return `run-${url}`;
}
constructor() {
// Remove the deprecated 'runs' database if it exists.
try {
idb.deleteDB("runs");
}
catch (err) {
// Best effort - don't throw if there are any issues.
}
}
/**

@@ -21,43 +29,38 @@ * Starts tracking a run.

* @param storeId The ID of the store to create.
* @param limit The maximum number of old runs to keep around.
* @param releaseGroupIds The IDs of any old stores to be released.
* @returns The store ID used.
*/
async start(storeId, limit = 2) {
if (this.#writer) {
throw new Error("Already writing a stream - please stop it first");
}
// Get the current version, bump it and set it for future starts.
const newVersion = (await this.#version) + 1;
this.#version = Promise.resolve(newVersion);
// Now figure out the new version.
const dbNewVersion = await idb.openDB(RUN_DB, newVersion, {
blocked(currentVersion, blockedVersion, event) {
console.warn(`IDB Store blocked version ${blockedVersion} by version ${currentVersion}`, event);
},
async start(url) {
// 1. Store the URLs that we've seen (necessary to support the truncation
// and drop calls).
const runListing = await idb.openDB(RUN_LISTING_DB, RUN_LISTING_VERSION, {
upgrade(db) {
db.createObjectStore(storeId, {
keyPath: "id",
autoIncrement: true,
});
[...db.objectStoreNames]
.sort((a, b) => {
if (a > b)
return -1;
if (a < b)
return 1;
return 0;
})
.slice(limit)
.map((storeName) => {
// Delete the entries.
db.deleteObjectStore(storeName);
});
db.createObjectStore("urls", { keyPath: "url" });
},
});
dbNewVersion.close();
// Now set up a stream to write to the new version.
await runListing.put("urls", { url });
runListing.close();
// 2. Create a database and object store for this particular run.
const dbName = this.#urlToDbName(url);
const timestamp = Date.now();
const timestampKey = timestamp.toString();
const dbVersion = await idb.openDB(dbName);
const nextVersion = dbVersion.version + 1;
dbVersion.close();
// 3. Set up a stream to write to the new database.
let db;
const stream = new WritableStream({
async start() {
db = await idb.openDB(RUN_DB);
db = await idb.openDB(dbName, nextVersion, {
blocked(currentVersion, blockedVersion, event) {
console.warn(`IDB Store blocked version ${blockedVersion} by version ${currentVersion}`, event);
},
upgrade(db) {
db.createObjectStore(timestampKey, {
keyPath: "id",
autoIncrement: true,
});
},
});
db.close();
},

@@ -71,19 +74,28 @@ async write(chunk) {

for (const output of Object.values(result.data.outputs)) {
if (!isLLMContent(output)) {
if (!isLLMContent(output) && !isLLMContentArray(output)) {
continue;
}
for (let i = 0; i < output.parts.length; i++) {
const part = output.parts[i];
if (!isStoredData(part)) {
const outputs = isLLMContent(output)
? [output]
: output;
for (const output of outputs) {
if (isMetadataEntry(output)) {
continue;
}
output.parts[i] = await toInlineDataPart(part);
for (let i = 0; i < output.parts.length; i++) {
const part = output.parts[i];
if (!isStoredData(part)) {
continue;
}
output.parts[i] = await toInlineDataPart(part);
}
}
}
}
const tx = db.transaction(storeId, "readwrite");
db = await idb.openDB(dbName);
const tx = db.transaction(timestampKey, "readwrite");
await Promise.all([tx.store.add(result), tx.done]);
}
catch (err) {
console.warn("Unable to write to storage", chunk);
console.warn(`Unable to write to storage (URL: ${url}, Timestamp: ${timestampKey})`, chunk);
console.warn(err);

@@ -94,2 +106,5 @@ if (this.abort) {

}
finally {
db.close();
}
},

@@ -103,48 +118,99 @@ abort() {

});
this.#writer = stream.getWriter();
return storeId;
// 4. Store the writer and return the timestamp.
let store = this.#writers.get(url);
if (!store) {
store = new Map();
this.#writers.set(url, store);
}
if (store.has(timestamp)) {
throw new Error("Already writing a stream - please stop it first");
}
store.set(timestamp, stream.getWriter());
return timestamp;
}
async write(result) {
if (!this.#writer) {
async write(url, timestamp, result) {
const store = this.#writers.get(url);
if (!store) {
throw new Error("No active stream - please start one before writing");
}
await this.#writer.ready;
this.#writer.write(result);
const writer = store.get(timestamp);
if (!writer) {
throw new Error("No active stream - please start one before writing");
}
await writer.ready;
writer.write(result);
}
async stop() {
if (!this.#writer) {
async stop(url, timestamp) {
const store = this.#writers.get(url);
if (!store) {
throw new Error("No active stream - please start one before writing");
}
await this.#writer.ready;
this.#writer.close();
this.#writer = null;
const writer = store.get(timestamp);
if (!writer) {
throw new Error("No active stream - please start one before writing");
}
await writer.ready;
writer.close();
store.delete(timestamp);
}
async abort() {
if (!this.#writer) {
async abort(url, timestamp) {
const store = this.#writers.get(url);
if (!store) {
throw new Error("No active stream - please start one before writing");
}
await this.#writer.ready;
this.#writer.abort();
this.#writer = null;
const writer = store.get(timestamp);
if (!writer) {
throw new Error("No active stream - please start one before writing");
}
await writer.ready;
writer.abort();
store.delete(timestamp);
}
async drop() {
await idb.deleteDB(RUN_DB);
async drop(url) {
if (url) {
await idb.deleteDB(this.#urlToDbName(url));
return;
}
const runListing = await idb.openDB(RUN_LISTING_DB);
if (runListing.objectStoreNames.contains("urls")) {
const urls = await runListing.getAll("urls");
for (const item of urls) {
await idb.deleteDB(this.#urlToDbName(item.url));
}
}
runListing.close();
await idb.deleteDB(RUN_LISTING_DB);
}
async getNewestRuns(limit = Number.POSITIVE_INFINITY) {
await this.#version;
const db = await idb.openDB(RUN_DB);
const storeNames = [...db.objectStoreNames]
async truncate(url, limit) {
const dbName = this.#urlToDbName(url);
const db = await idb.openDB(dbName);
const nextVersion = db.version + 1;
const storesToRemove = [...db.objectStoreNames]
.sort((a, b) => {
if (a > b)
return -1;
if (a < b)
return 1;
return 0;
return Number.parseInt(b) - Number.parseInt(a);
})
.slice(0, limit);
const runs = await Promise.all(storeNames.map((storeName) => db.getAll(storeName)));
.slice(limit);
db.close();
return runs;
// Now re-open the database with a new version and use that operation to
// delete the stores that are no longer needed.
const truncateDb = await idb.openDB(this.#urlToDbName(url), nextVersion, {
upgrade(db) {
for (const store of storesToRemove) {
db.deleteObjectStore(store);
}
},
});
truncateDb.close();
}
async getStoredRuns(url) {
const dbName = this.#urlToDbName(url);
const db = await idb.openDB(dbName);
const runs = await Promise.all([...db.objectStoreNames].map(async (timestamp) => {
const events = (await db.getAll(timestamp));
return [Number.parseInt(timestamp), events];
}));
db.close();
return new Map(runs);
}
}
//# sourceMappingURL=idb-run-store.js.map
{
"name": "@breadboard-ai/data-store",
"version": "0.2.0",
"version": "0.2.1",
"description": "A data store implementation to support Breadboard",

@@ -145,5 +145,5 @@ "main": "./dist/src/index.js",

"@web/dev-server-esbuild": "^1.0.2",
"@web/test-runner": "^0.18.2",
"@web/test-runner": "^0.18.3",
"ava": "^5.2.0",
"esbuild": "^0.23.0",
"esbuild": "^0.23.1",
"typescript": "^5.5.4"

@@ -155,5 +155,5 @@ },

"dependencies": {
"@google-labs/breadboard": "^0.24.0",
"@google-labs/breadboard": "^0.25.0",
"idb": "^8.0.0"
}
}

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc