langsmith
Advanced tools
Comparing version 0.0.70 to 0.0.71-rc.0
@@ -69,2 +69,8 @@ import { AsyncCallerParams } from "./utils/async_caller.js"; | ||
}; | ||
export declare class Queue<T> { | ||
items: [T, () => void][]; | ||
get size(): number; | ||
push(item: T): Promise<void>; | ||
pop(upToN: number): [T[], () => void]; | ||
} | ||
export declare class Client { | ||
@@ -83,3 +89,3 @@ private apiKey?; | ||
private batchEndpointSupported?; | ||
private pendingAutoBatchedRuns; | ||
private autoBatchQueue; | ||
private pendingAutoBatchedRunLimit; | ||
@@ -108,4 +114,4 @@ private autoBatchTimeout; | ||
private _filterForSampling; | ||
private triggerAutoBatchSend; | ||
private appendRunCreateToAutoBatchQueue; | ||
private drainAutoBatchQueue; | ||
private processRunOperation; | ||
protected batchEndpointIsSupported(): Promise<boolean>; | ||
@@ -112,0 +118,0 @@ createRun(run: CreateRunParams): Promise<void>; |
@@ -75,2 +75,38 @@ import * as uuid from "uuid"; | ||
} | ||
export class Queue { | ||
constructor() { | ||
Object.defineProperty(this, "items", { | ||
enumerable: true, | ||
configurable: true, | ||
writable: true, | ||
value: [] | ||
}); | ||
} | ||
get size() { | ||
return this.items.length; | ||
} | ||
push(item) { | ||
// this.items.push is synchronous with promise creation: | ||
// https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Promise/Promise | ||
return new Promise((resolve) => { | ||
this.items.push([item, resolve]); | ||
}); | ||
} | ||
pop(upToN) { | ||
if (upToN < 1) { | ||
throw new Error("Number of items to pop off may not be less than 1."); | ||
} | ||
const popped = []; | ||
while (popped.length < upToN && this.items.length) { | ||
const item = this.items.shift(); | ||
if (item) { | ||
popped.push(item); | ||
} | ||
else { | ||
break; | ||
} | ||
} | ||
return [popped.map((it) => it[0]), () => popped.forEach((it) => it[1]())]; | ||
} | ||
} | ||
export class Client { | ||
@@ -150,7 +186,7 @@ constructor(config = {}) { | ||
}); | ||
Object.defineProperty(this, "pendingAutoBatchedRuns", { | ||
Object.defineProperty(this, "autoBatchQueue", { | ||
enumerable: true, | ||
configurable: true, | ||
writable: true, | ||
value: [] | ||
value: new Queue() | ||
}); | ||
@@ -363,41 +399,42 @@ Object.defineProperty(this, "pendingAutoBatchedRunLimit", { | ||
} | ||
async triggerAutoBatchSend(runs) { | ||
let batch = runs; | ||
if (batch === undefined) { | ||
batch = this.pendingAutoBatchedRuns.slice(0, this.pendingAutoBatchedRunLimit); | ||
this.pendingAutoBatchedRuns = this.pendingAutoBatchedRuns.slice(this.pendingAutoBatchedRunLimit); | ||
async drainAutoBatchQueue() { | ||
while (this.autoBatchQueue.size >= 0) { | ||
const [batch, done] = this.autoBatchQueue.pop(this.pendingAutoBatchedRunLimit); | ||
if (!batch.length) { | ||
done(); | ||
return; | ||
} | ||
try { | ||
await this.batchIngestRuns({ | ||
runCreates: batch | ||
.filter((item) => item.action === "create") | ||
.map((item) => item.item), | ||
runUpdates: batch | ||
.filter((item) => item.action === "update") | ||
.map((item) => item.item), | ||
}); | ||
} | ||
finally { | ||
done(); | ||
} | ||
} | ||
await this.batchIngestRuns({ | ||
runCreates: batch | ||
.filter((item) => item.action === "create") | ||
.map((item) => item.item), | ||
runUpdates: batch | ||
.filter((item) => item.action === "update") | ||
.map((item) => item.item), | ||
}); | ||
} | ||
appendRunCreateToAutoBatchQueue(item) { | ||
async processRunOperation(item, immediatelyTriggerBatch) { | ||
const oldTimeout = this.autoBatchTimeout; | ||
clearTimeout(this.autoBatchTimeout); | ||
this.autoBatchTimeout = undefined; | ||
this.pendingAutoBatchedRuns.push(item); | ||
while (this.pendingAutoBatchedRuns.length >= this.pendingAutoBatchedRunLimit) { | ||
const batch = this.pendingAutoBatchedRuns.slice(0, this.pendingAutoBatchedRunLimit); | ||
this.pendingAutoBatchedRuns = this.pendingAutoBatchedRuns.slice(this.pendingAutoBatchedRunLimit); | ||
void this.triggerAutoBatchSend(batch); | ||
const itemPromise = this.autoBatchQueue.push(item); | ||
if (immediatelyTriggerBatch || | ||
this.autoBatchQueue.size > this.pendingAutoBatchedRunLimit) { | ||
await this.drainAutoBatchQueue(); | ||
} | ||
if (this.pendingAutoBatchedRuns.length > 0) { | ||
if (!oldTimeout) { | ||
this.autoBatchTimeout = setTimeout(() => { | ||
this.autoBatchTimeout = undefined; | ||
void this.triggerAutoBatchSend(); | ||
}, this.autoBatchInitialDelayMs); | ||
} | ||
else { | ||
this.autoBatchTimeout = setTimeout(() => { | ||
this.autoBatchTimeout = undefined; | ||
void this.triggerAutoBatchSend(); | ||
}, this.autoBatchAggregationDelayMs); | ||
} | ||
if (this.autoBatchQueue.size > 0) { | ||
this.autoBatchTimeout = setTimeout(() => { | ||
this.autoBatchTimeout = undefined; | ||
void this.drainAutoBatchQueue(); | ||
}, oldTimeout | ||
? this.autoBatchAggregationDelayMs | ||
: this.autoBatchInitialDelayMs); | ||
} | ||
return itemPromise; | ||
} | ||
@@ -433,3 +470,3 @@ async batchEndpointIsSupported() { | ||
runCreate.dotted_order !== undefined) { | ||
this.appendRunCreateToAutoBatchQueue({ | ||
void this.processRunOperation({ | ||
action: "create", | ||
@@ -536,3 +573,11 @@ item: runCreate, | ||
data.dotted_order !== undefined) { | ||
this.appendRunCreateToAutoBatchQueue({ action: "update", item: data }); | ||
if (run.end_time !== undefined && data.parent_run_id === undefined) { | ||
// Trigger a batch as soon as a root trace ends and block to ensure trace finishes | ||
// in serverless environments. | ||
await this.processRunOperation({ action: "update", item: data }, true); | ||
return; | ||
} | ||
else { | ||
void this.processRunOperation({ action: "update", item: data }); | ||
} | ||
return; | ||
@@ -539,0 +584,0 @@ } |
{ | ||
"name": "langsmith", | ||
"version": "0.0.70", | ||
"version": "0.0.71-rc.0", | ||
"description": "Client library to connect to the LangSmith LLM Tracing and Evaluation Platform.", | ||
@@ -5,0 +5,0 @@ "packageManager": "yarn@1.22.19", |
Sorry, the diff of this file is not supported yet
228296
5675