🚀 Socket Launch Week Day 5:Introducing Repository Access Permissions and Custom Roles.Learn more
Sign In

ai

Package Overview
Dependencies
Maintainers
5
Versions
1283
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

ai - npm Package Compare versions

Comparing version
7.0.0-beta.177
to
7.0.0-beta.178
+220
src/telemetry/tracing-channel-publisher.ts
import type * as diagnosticsChannelModule from 'node:diagnostics_channel';
import type * as asyncHooksModule from 'node:async_hooks';
import {
AI_SDK_TELEMETRY_TRACING_CHANNEL,
type TelemetryTracingChannelMessage,
} from './tracing-channel';
import { isNodeRuntime } from '../util/is-node-runtime';
type DiagnosticsChannel = typeof diagnosticsChannelModule;
type AsyncHooks = typeof asyncHooksModule;
type AsyncResource = asyncHooksModule.AsyncResource;
export type TracingChannelContext = {
run<T>(execute: () => T): T;
};
let diagnosticsChannelPromise:
| Promise<DiagnosticsChannel | undefined>
| undefined;
/**
* Loads Node's diagnostics channel module only when the current runtime supports
* it. Unsupported runtimes and failed imports intentionally resolve to
* undefined so telemetry tracing never crashes user code.
*/
async function loadDiagnosticsChannel(): Promise<
DiagnosticsChannel | undefined
> {
if (!isNodeRuntime()) {
return undefined;
}
if (diagnosticsChannelPromise == null) {
diagnosticsChannelPromise = (
import(
/* webpackIgnore: true */
'node:diagnostics_channel'
) as Promise<DiagnosticsChannel>
).catch(() => undefined);
}
return diagnosticsChannelPromise;
}
function loadBuiltinModule<T>(id: string): T | undefined {
const processWithBuiltins = globalThis.process as
| {
getBuiltinModule?: (id: string) => unknown;
}
| undefined;
try {
return processWithBuiltins?.getBuiltinModule?.(id) as T | undefined;
} catch {
return undefined;
}
}
/**
* Runs an async operation inside the AI SDK telemetry tracing channel when
* tracing subscribers may exist. Without Node diagnostics-channel support,
* without tracingChannel support, or when the runtime reports no subscribers,
* this is a direct pass-through.
*
* The execution bookkeeping preserves the original model/tool result or error
* if tracing itself throws, and prevents falling back by calling `execute` a
* second time.
*/
export async function runWithTracingChannelSpan<T>(
message: TelemetryTracingChannelMessage,
execute: () => PromiseLike<T>,
): Promise<T> {
const diagnosticsChannel = await loadDiagnosticsChannel();
const tracingChannel = diagnosticsChannel?.tracingChannel?.(
AI_SDK_TELEMETRY_TRACING_CHANNEL,
);
if (tracingChannel == null || tracingChannel.hasSubscribers === false) {
return await execute();
}
let executePromise: Promise<T> | undefined;
let executionResult: T | undefined;
let executionError: unknown;
let hasExecutionResult = false;
let hasExecutionError = false;
const tracedExecute = () => {
try {
executePromise = Promise.resolve(execute());
} catch (error) {
executePromise = Promise.reject(error);
}
executePromise = executePromise.then(
result => {
executionResult = result;
hasExecutionResult = true;
return result;
},
error => {
executionError = error;
hasExecutionError = true;
throw error;
},
);
return executePromise;
};
try {
return await tracingChannel.tracePromise(tracedExecute, message);
} catch {
if (hasExecutionError) {
throw executionError;
}
if (hasExecutionResult) {
return executionResult as T;
}
if (executePromise != null) {
return await executePromise;
}
return await execute();
}
}
/**
* Opens a long-lived tracing-channel span context and returns a runner that can
* re-enter that context later without changing stream setup timing.
*/
export function openTelemetryChannelSpanContext({
message,
completion,
}: {
message: TelemetryTracingChannelMessage;
completion: PromiseLike<unknown>;
}): TracingChannelContext | undefined {
if (!isNodeRuntime()) {
return undefined;
}
const diagnosticsChannel = loadBuiltinModule<DiagnosticsChannel>(
'node:diagnostics_channel',
);
const asyncHooks = loadBuiltinModule<AsyncHooks>('node:async_hooks');
const tracingChannel = diagnosticsChannel?.tracingChannel?.(
AI_SDK_TELEMETRY_TRACING_CHANNEL,
);
if (
tracingChannel == null ||
tracingChannel.hasSubscribers === false ||
asyncHooks == null
) {
Promise.resolve(completion).catch(() => {});
return undefined;
}
const context = message as TelemetryTracingChannelMessage & {
result?: unknown;
error?: unknown;
};
let asyncResource: AsyncResource | undefined;
let asyncEndPublished = false;
const safePublish = (publish: () => void) => {
try {
publish();
} catch {
// Diagnostics subscribers should never affect SDK stream behavior.
}
};
const publishAsyncEnd = ({
result,
error,
}: {
result?: unknown;
error?: unknown;
}) => {
if (asyncEndPublished) {
return;
}
asyncEndPublished = true;
if (error !== undefined) {
context.error = error;
safePublish(() => tracingChannel.error.publish(context));
}
if (result !== undefined) {
context.result = result;
}
safePublish(() => tracingChannel.asyncEnd.publish(context));
};
safePublish(() => {
tracingChannel.start.runStores(context, () => {
asyncResource = new asyncHooks.AsyncResource('ai.telemetry');
});
});
safePublish(() => tracingChannel.end.publish(context));
void Promise.resolve(completion).then(
result => publishAsyncEnd({ result }),
error => publishAsyncEnd({ error }),
);
return {
run: execute =>
asyncResource == null
? execute()
: asyncResource.runInAsyncScope(execute),
};
}
export const AI_SDK_TELEMETRY_TRACING_CHANNEL = 'ai:telemetry';
export type TelemetryTracingEventType =
| 'generateText'
| 'streamText'
| 'step'
| 'languageModelCall'
| 'executeTool'
| 'embed'
| 'rerank';
export type TelemetryTracingChannelMessage<EVENT = unknown> = {
readonly type: TelemetryTracingEventType;
readonly event: EVENT;
};
+2
-2
{
"name": "ai",
"version": "7.0.0-beta.177",
"version": "7.0.0-beta.178",
"type": "module",

@@ -45,3 +45,3 @@ "description": "AI SDK by Vercel - build apps like ChatGPT, Claude, Gemini, and more with a single interface for any model using the Vercel AI Gateway or go direct to OpenAI, Anthropic, Google, or any other model provider.",

"dependencies": {
"@ai-sdk/gateway": "4.0.0-beta.108",
"@ai-sdk/gateway": "4.0.0-beta.109",
"@ai-sdk/provider": "4.0.0-beta.19",

@@ -48,0 +48,0 @@ "@ai-sdk/provider-utils": "5.0.0-beta.49"

@@ -158,96 +158,113 @@ import {

await notify({
event: {
callId,
operationId: 'ai.embed',
provider: model.provider,
modelId: model.modelId,
value,
maxRetries,
headers: headersWithUserAgent,
providerOptions,
},
callbacks: [resolvedOnStart, telemetryDispatcher.onStart],
});
const runInTracingChannelSpan =
telemetryDispatcher.runInTracingChannelSpan ??
(async <T>({ execute }: { execute: () => PromiseLike<T> }) =>
await execute());
try {
const { embedding, usage, warnings, response, providerMetadata } =
await retry(async () => {
const embedCallId = generateCallId();
const startEvent = {
callId,
operationId: 'ai.embed',
provider: model.provider,
modelId: model.modelId,
value,
maxRetries,
headers: headersWithUserAgent,
providerOptions,
};
await notify({
event: {
callId,
embedCallId,
operationId: 'ai.embed.doEmbed',
provider: model.provider,
modelId: model.modelId,
values: [value],
},
callbacks: [telemetryDispatcher.onEmbedStart],
});
return await runInTracingChannelSpan({
type: 'embed',
event: startEvent,
execute: async () => {
await notify({
event: startEvent,
callbacks: [resolvedOnStart, telemetryDispatcher.onStart],
});
const modelResponse = await model.doEmbed({
values: [value],
abortSignal,
headers: headersWithUserAgent,
providerOptions,
try {
const { embedding, usage, warnings, response, providerMetadata } =
await retry(async () => {
const embedCallId = generateCallId();
await notify({
event: {
callId,
embedCallId,
operationId: 'ai.embed.doEmbed',
provider: model.provider,
modelId: model.modelId,
values: [value],
},
callbacks: [telemetryDispatcher.onEmbedStart],
});
const modelResponse = await model.doEmbed({
values: [value],
abortSignal,
headers: headersWithUserAgent,
providerOptions,
});
const embedding = modelResponse.embeddings[0];
const usage = modelResponse.usage ?? { tokens: NaN };
await notify({
event: {
callId,
embedCallId,
operationId: 'ai.embed.doEmbed',
provider: model.provider,
modelId: model.modelId,
values: [value],
embeddings: modelResponse.embeddings,
usage,
},
callbacks: [telemetryDispatcher.onEmbedEnd],
});
return {
embedding,
usage,
warnings: modelResponse.warnings ?? [],
providerMetadata: modelResponse.providerMetadata,
response: modelResponse.response,
};
});
logWarnings({
warnings,
provider: model.provider,
model: model.modelId,
});
const embedding = modelResponse.embeddings[0];
const usage = modelResponse.usage ?? { tokens: NaN };
await notify({
event: {
callId,
embedCallId,
operationId: 'ai.embed.doEmbed',
operationId: 'ai.embed',
provider: model.provider,
modelId: model.modelId,
values: [value],
embeddings: modelResponse.embeddings,
value,
embedding,
usage,
warnings,
providerMetadata,
response,
},
callbacks: [telemetryDispatcher.onEmbedEnd],
callbacks: [resolvedOnEnd, telemetryDispatcher.onEnd],
});
return {
return new DefaultEmbedResult({
value,
embedding,
usage,
warnings: modelResponse.warnings ?? [],
providerMetadata: modelResponse.providerMetadata,
response: modelResponse.response,
};
});
logWarnings({ warnings, provider: model.provider, model: model.modelId });
await notify({
event: {
callId,
operationId: 'ai.embed',
provider: model.provider,
modelId: model.modelId,
value,
embedding,
usage,
warnings,
providerMetadata,
response,
},
callbacks: [resolvedOnEnd, telemetryDispatcher.onEnd],
});
return new DefaultEmbedResult({
value,
embedding,
usage,
warnings,
providerMetadata,
response,
});
} catch (error) {
await telemetryDispatcher.onError?.({ callId, error });
throw error;
}
warnings,
providerMetadata,
response,
});
} catch (error) {
await telemetryDispatcher.onError?.({ callId, error });
throw error;
}
},
});
}

@@ -254,0 +271,0 @@

@@ -15,2 +15,3 @@ import {

} from '../prompt/request-options';
import type { TelemetryDispatcher } from '../telemetry/telemetry';
import { mergeAbortSignals } from '../util/merge-abort-signals';

@@ -55,2 +56,3 @@ import { notify } from '../util/notify';

executeToolInTelemetryContext = async ({ execute }) => await execute(),
runInTracingChannelSpan = async ({ execute }) => await execute(),
}: {

@@ -68,7 +70,12 @@ toolCall: TypedToolCall<TOOLS>;

onToolExecutionEnd?: Arrayable<OnToolExecutionEndCallback<TOOLS>>;
executeToolInTelemetryContext?: <T>(params: {
callId: string;
toolCallId: string;
execute: () => PromiseLike<T>;
}) => PromiseLike<T>;
executeToolInTelemetryContext?: <T>(
params: Partial<ToolExecutionStartEvent<TOOLS>> & {
callId: string;
toolCallId: string;
execute: () => PromiseLike<T>;
},
) => PromiseLike<T>;
runInTracingChannelSpan?: NonNullable<
TelemetryDispatcher['runInTracingChannelSpan']
>;
}): Promise<

@@ -94,4 +101,3 @@ | {

const baseCallbackEvent = {
callId,
const toolExecutionContext = {
toolCall,

@@ -101,114 +107,121 @@ messages,

};
const baseCallbackEvent = {
callId,
...toolExecutionContext,
};
let output: unknown;
return await runInTracingChannelSpan({
type: 'executeTool',
event: baseCallbackEvent,
execute: async () => {
let output: unknown;
await notify({
event: baseCallbackEvent as ToolExecutionStartEvent<TOOLS>,
callbacks: onToolExecutionStart,
});
await notify({
event: baseCallbackEvent as ToolExecutionStartEvent<TOOLS>,
callbacks: onToolExecutionStart,
});
const toolTimeoutMs = getToolTimeoutMs<TOOLS>(timeout, toolName);
const toolAbortSignal = mergeAbortSignals(abortSignal, toolTimeoutMs);
const toolTimeoutMs = getToolTimeoutMs<TOOLS>(timeout, toolName);
const toolAbortSignal = mergeAbortSignals(abortSignal, toolTimeoutMs);
let toolExecutionMs = 0;
try {
// In order to correctly nest telemetry spans within tool calls spans, telemetry integrations need
// to be able to execute the tool call in a telemetry-integration-specific context.
//
// The call id and the tool call id are provided to the telemetry integration so that it can correctly
// identify the parent span.
await executeToolInTelemetryContext({
callId,
toolCallId,
execute: async () => {
const startTime = now();
try {
const stream = executeTool({
tool,
input: input as InferToolInput<typeof tool>,
options: {
toolCallId,
messages,
abortSignal: toolAbortSignal,
context,
experimental_sandbox: sandbox,
},
});
let toolExecutionMs = 0;
try {
// Integration wrappers keep nested AI SDK calls associated with this tool execution.
await executeToolInTelemetryContext({
callId,
toolCallId,
...(toolExecutionContext as Partial<ToolExecutionStartEvent<TOOLS>>),
execute: async () => {
const startTime = now();
try {
const stream = executeTool({
tool,
input: input as InferToolInput<typeof tool>,
options: {
toolCallId,
messages,
abortSignal: toolAbortSignal,
context,
experimental_sandbox: sandbox,
},
});
for await (const part of stream) {
if (part.type === 'preliminary') {
onPreliminaryToolResult?.({
...toolCall,
type: 'tool-result',
output: part.output,
preliminary: true,
});
} else {
output = part.output;
for await (const part of stream) {
if (part.type === 'preliminary') {
onPreliminaryToolResult?.({
...toolCall,
type: 'tool-result',
output: part.output,
preliminary: true,
});
} else {
output = part.output;
}
}
} finally {
toolExecutionMs = now() - startTime;
}
}
} finally {
toolExecutionMs = now() - startTime;
}
},
});
} catch (error) {
const toolError = {
type: 'tool-error',
toolCallId,
toolName,
input,
error,
dynamic: tool.type === 'dynamic',
...(toolCall.providerMetadata != null
? { providerMetadata: toolCall.providerMetadata }
: {}),
...(toolCall.toolMetadata != null
? { toolMetadata: toolCall.toolMetadata }
: {}),
} as TypedToolError<TOOLS>;
},
});
} catch (error) {
const toolError = {
type: 'tool-error',
toolCallId,
toolName,
input,
error,
dynamic: tool.type === 'dynamic',
...(toolCall.providerMetadata != null
? { providerMetadata: toolCall.providerMetadata }
: {}),
...(toolCall.toolMetadata != null
? { toolMetadata: toolCall.toolMetadata }
: {}),
} as TypedToolError<TOOLS>;
await notify({
event: {
...baseCallbackEvent,
toolOutput: toolError,
toolExecutionMs,
} as ToolExecutionEndEvent<TOOLS>,
callbacks: onToolExecutionEnd,
});
await notify({
event: {
...baseCallbackEvent,
toolOutput: toolError,
toolExecutionMs,
} as ToolExecutionEndEvent<TOOLS>,
callbacks: onToolExecutionEnd,
});
return {
output: toolError,
toolExecutionMs,
};
}
return {
output: toolError,
toolExecutionMs,
};
}
const toolResult = {
type: 'tool-result',
toolCallId,
toolName,
input,
output,
dynamic: tool.type === 'dynamic',
...(toolCall.providerMetadata != null
? { providerMetadata: toolCall.providerMetadata }
: {}),
...(toolCall.toolMetadata != null
? { toolMetadata: toolCall.toolMetadata }
: {}),
} as TypedToolResult<TOOLS>;
const toolResult = {
type: 'tool-result',
toolCallId,
toolName,
input,
output,
dynamic: tool.type === 'dynamic',
...(toolCall.providerMetadata != null
? { providerMetadata: toolCall.providerMetadata }
: {}),
...(toolCall.toolMetadata != null
? { toolMetadata: toolCall.toolMetadata }
: {}),
} as TypedToolResult<TOOLS>;
await notify({
event: {
...baseCallbackEvent,
toolOutput: toolResult,
toolExecutionMs,
} as ToolExecutionEndEvent<TOOLS>,
callbacks: onToolExecutionEnd,
await notify({
event: {
...baseCallbackEvent,
toolOutput: toolResult,
toolExecutionMs,
} as ToolExecutionEndEvent<TOOLS>,
callbacks: onToolExecutionEnd,
});
return {
output: toolResult,
toolExecutionMs,
};
},
});
return {
output: toolResult,
toolExecutionMs,
};
}

@@ -11,3 +11,3 @@ import type {

import type { TimeoutConfiguration } from '../prompt/request-options';
import type { Telemetry } from '../telemetry/telemetry';
import type { Telemetry, TelemetryDispatcher } from '../telemetry/telemetry';
import { executeToolCall } from './execute-tool-call';

@@ -53,2 +53,3 @@ import { resolveToolApproval } from './resolve-tool-approval';

executeToolInTelemetryContext,
runInTracingChannelSpan,
}: {

@@ -70,2 +71,5 @@ stream: ReadableStream<LanguageModelStreamPart<TOOLS>>;

executeToolInTelemetryContext?: Telemetry['executeTool'];
runInTracingChannelSpan?: NonNullable<
TelemetryDispatcher['runInTracingChannelSpan']
>;
}): ReadableStream<ExecuteToolsStreamPart<TOOLS>> {

@@ -216,2 +220,3 @@ const toolCallsToExecute: Array<TypedToolCall<TOOLS>> = [];

executeToolInTelemetryContext,
runInTracingChannelSpan,
onPreliminaryToolResult: result => {

@@ -218,0 +223,0 @@ controller.enqueue(result);

@@ -323,12 +323,17 @@ import {

const languageModelCallContext = {
provider: resolvedModel.provider,
modelId: resolvedModel.modelId,
instructions: standardizedPrompt.instructions,
messages: standardizedPrompt.messages,
tools: stepTools,
...callSettings,
};
const languageModelCallStartEvent = {
callId: effectiveCallId,
...languageModelCallContext,
};
await notify({
event: {
callId: effectiveCallId,
provider: resolvedModel.provider,
modelId: resolvedModel.modelId,
instructions: standardizedPrompt.instructions,
messages: standardizedPrompt.messages,
tools: stepTools,
...callSettings,
},
event: languageModelCallStartEvent,
callbacks: onLanguageModelCallStart,

@@ -344,3 +349,3 @@ });

} = await executeLanguageModelCallInTelemetryContext({
callId: effectiveCallId,
...languageModelCallStartEvent,
execute: async () =>

@@ -347,0 +352,0 @@ await resolvedModel.doStream({

@@ -156,2 +156,7 @@ import type { JSONObject, RerankingModelV4CallOptions } from '@ai-sdk/provider';

const runInTracingChannelSpan =
telemetryDispatcher.runInTracingChannelSpan ??
(async <T>({ execute }: { execute: () => PromiseLike<T> }) =>
await execute());
if (documents.length === 0) {

@@ -214,119 +219,127 @@ await notify({

await notify({
event: {
callId,
operationId: 'ai.rerank',
provider: model.provider,
modelId: model.modelId,
documents,
query,
topN,
maxRetries,
headers,
providerOptions,
},
callbacks: [resolvedOnStart, telemetryDispatcher.onStart],
});
const startEvent = {
callId,
operationId: 'ai.rerank',
provider: model.provider,
modelId: model.modelId,
documents,
query,
topN,
maxRetries,
headers,
providerOptions,
};
try {
const { ranking, response, providerMetadata, warnings } = await retry(
async () => {
return await runInTracingChannelSpan({
type: 'rerank',
event: startEvent,
execute: async () => {
await notify({
event: startEvent,
callbacks: [resolvedOnStart, telemetryDispatcher.onStart],
});
try {
const { ranking, response, providerMetadata, warnings } = await retry(
async () => {
await notify({
event: {
callId,
operationId: 'ai.rerank.doRerank',
provider: model.provider,
modelId: model.modelId,
documents,
documentsType: documentsToSend.type,
query,
topN,
},
callbacks: [telemetryDispatcher.onRerankStart],
});
const modelResponse = await model.doRerank({
documents: documentsToSend,
query,
topN,
providerOptions,
abortSignal,
headers,
});
const ranking = modelResponse.ranking;
await notify({
event: {
callId,
operationId: 'ai.rerank.doRerank',
provider: model.provider,
modelId: model.modelId,
documentsType: documentsToSend.type,
ranking,
},
callbacks: [telemetryDispatcher.onRerankEnd],
});
return {
ranking,
providerMetadata: modelResponse.providerMetadata,
response: modelResponse.response,
warnings: modelResponse.warnings,
};
},
);
logWarnings({
warnings: warnings ?? [],
provider: model.provider,
model: model.modelId,
});
await notify({
event: {
callId,
operationId: 'ai.rerank.doRerank',
operationId: 'ai.rerank',
provider: model.provider,
modelId: model.modelId,
documents,
documentsType: documentsToSend.type,
query,
topN,
ranking: ranking.map(r => ({
originalIndex: r.index,
score: r.relevanceScore,
document: documents[r.index],
})),
warnings: warnings ?? [],
providerMetadata,
response: {
id: response?.id,
timestamp: response?.timestamp ?? new Date(),
modelId: response?.modelId ?? model.modelId,
headers: response?.headers,
body: response?.body,
},
},
callbacks: [telemetryDispatcher.onRerankStart],
callbacks: [resolvedOnEnd, telemetryDispatcher.onEnd],
});
const modelResponse = await model.doRerank({
documents: documentsToSend,
query,
topN,
providerOptions,
abortSignal,
headers,
});
const ranking = modelResponse.ranking;
await notify({
event: {
callId,
operationId: 'ai.rerank.doRerank',
provider: model.provider,
modelId: model.modelId,
documentsType: documentsToSend.type,
ranking,
return new DefaultRerankResult({
originalDocuments: documents,
ranking: ranking.map(ranking => ({
originalIndex: ranking.index,
score: ranking.relevanceScore,
document: documents[ranking.index],
})),
providerMetadata,
response: {
id: response?.id,
timestamp: response?.timestamp ?? new Date(),
modelId: response?.modelId ?? model.modelId,
headers: response?.headers,
body: response?.body,
},
callbacks: [telemetryDispatcher.onRerankEnd],
});
return {
ranking,
providerMetadata: modelResponse.providerMetadata,
response: modelResponse.response,
warnings: modelResponse.warnings,
};
},
);
logWarnings({
warnings: warnings ?? [],
provider: model.provider,
model: model.modelId,
});
await notify({
event: {
callId,
operationId: 'ai.rerank',
provider: model.provider,
modelId: model.modelId,
documents,
query,
ranking: ranking.map(r => ({
originalIndex: r.index,
score: r.relevanceScore,
document: documents[r.index],
})),
warnings: warnings ?? [],
providerMetadata,
response: {
id: response?.id,
timestamp: response?.timestamp ?? new Date(),
modelId: response?.modelId ?? model.modelId,
headers: response?.headers,
body: response?.body,
},
},
callbacks: [resolvedOnEnd, telemetryDispatcher.onEnd],
});
return new DefaultRerankResult({
originalDocuments: documents,
ranking: ranking.map(ranking => ({
originalIndex: ranking.index,
score: ranking.relevanceScore,
document: documents[ranking.index],
})),
providerMetadata,
response: {
id: response?.id,
timestamp: response?.timestamp ?? new Date(),
modelId: response?.modelId ?? model.modelId,
headers: response?.headers,
body: response?.body,
},
});
} catch (error) {
await telemetryDispatcher.onError?.({ callId, error });
throw error;
}
} catch (error) {
await telemetryDispatcher.onError?.({ callId, error });
throw error;
}
},
});
}

@@ -333,0 +346,0 @@

@@ -9,4 +9,6 @@ import { asArray } from '@ai-sdk/provider-utils';

} from './telemetry';
import { type TelemetryDiagnosticEventType } from './diagnostic-channel';
import { publishTelemetryDiagnosticChannelMessage } from './diagnostic-channel-publisher';
import {
openTelemetryChannelSpanContext,
runWithTracingChannelSpan,
} from './tracing-channel-publisher';
import { getGlobalTelemetryIntegrations } from './telemetry-registry';

@@ -17,15 +19,18 @@ import type { TelemetryOptions } from './telemetry-options';

* The subset of `TelemetryDispatcher` keys whose values are Callback callbacks.
* This excludes non-Callback properties such as `executeLanguageModelCall` and
* `executeTool`.
* This excludes dispatcher-only helpers and non-Callback properties such as
* `executeLanguageModelCall` and `executeTool`.
*/
type TelemetryCallbackKey = keyof {
[K in keyof TelemetryDispatcher as TelemetryDispatcher[K] extends
| Callback<any>
| undefined
? K
: never]: true;
};
type TelemetryCallbackKey = Exclude<
keyof {
[K in keyof TelemetryDispatcher as TelemetryDispatcher[K] extends
| Callback<any>
| undefined
? K
: never]: true;
},
'runInTracingChannelSpan' | 'startTracingChannelContext'
>;
/**
* Resolves the public event type accepted by a telemetry callback key.
* Resolves the public event type accepted by a telemetry callback key.
*/

@@ -91,25 +96,21 @@ type TelemetryEvent<K extends TelemetryCallbackKey> =

): Callback<TelemetryEvent<KEY>> => {
// event data is now automatically published to the diagnostic channel
const publishDiagnosticChannelMessage = ((event: TelemetryEvent<KEY>) =>
publishTelemetryDiagnosticChannelMessage({
type: key as TelemetryDiagnosticEventType,
event: augmentEvent(event, telemetryMetadata),
})) as Callback<TelemetryEvent<KEY>>;
const integrationCallbacks = (
integrations
.map(integration => integration[key]?.bind(integration))
.filter(Boolean) as Array<
Callback<InferTelemetryEvent<TelemetryEvent<KEY>>>
>
).map(
callback =>
((event: TelemetryEvent<KEY>) =>
callback(augmentEvent(event, telemetryMetadata))) as Callback<
TelemetryEvent<KEY>
>,
);
return mergeCallbacks(
publishDiagnosticChannelMessage,
...(
integrations
.map(integration => integration[key]?.bind(integration))
.filter(Boolean) as Array<
Callback<InferTelemetryEvent<TelemetryEvent<KEY>>>
>
).map(
callback =>
((event: TelemetryEvent<KEY>) =>
callback(augmentEvent(event, telemetryMetadata))) as Callback<
TelemetryEvent<KEY>
>,
),
);
const mergedIntegrationCallback = mergeCallbacks(...integrationCallbacks);
return async (event: TelemetryEvent<KEY>) => {
await mergedIntegrationCallback(event);
};
};

@@ -128,2 +129,20 @@

return {
runInTracingChannelSpan: async ({ type, event, execute }) =>
await runWithTracingChannelSpan(
{
type,
event: augmentEvent(event, telemetryMetadata),
},
execute,
),
startTracingChannelContext: ({ type, event, completion }) =>
openTelemetryChannelSpanContext({
message: {
type,
event: augmentEvent(event, telemetryMetadata),
},
completion,
}),
onStart: mergeTelemetryCallback('onStart'),

@@ -155,15 +174,22 @@ onStepStart: mergeTelemetryCallback('onStepStart'),

executeLanguageModelCall:
executeLanguageModelCallWrappers.length > 0
? async args => {
let execute = args.execute;
for (const executeWrapper of executeLanguageModelCallWrappers) {
const innerExecute = execute;
execute = () =>
executeWrapper({ ...args, execute: innerExecute });
}
return await execute();
}
: undefined,
/**
* Runs provider calls inside integration-specific context so
* auto-instrumented provider requests can be associated with model work.
*/
executeLanguageModelCall: async ({ execute, ...event }) => {
const augmentedEvent = augmentEvent(event, telemetryMetadata);
let wrappedExecute = execute;
for (const executeWrapper of executeLanguageModelCallWrappers) {
const innerExecute = wrappedExecute;
wrappedExecute = () =>
executeWrapper({ ...augmentedEvent, execute: innerExecute });
}
return await runWithTracingChannelSpan(
{ type: 'languageModelCall', event: augmentedEvent },
wrappedExecute,
);
},
/**

@@ -175,15 +201,15 @@ * Composes all `executeTool` wrappers around the original tool execution.

*/
executeTool:
executeToolWrappers.length > 0
? async args => {
let execute = args.execute;
for (const executeWrapper of executeToolWrappers) {
const innerExecute = execute;
execute = () =>
executeWrapper({ ...args, execute: innerExecute });
}
return await execute();
}
: undefined,
executeTool: async ({ execute, ...event }) => {
const augmentedEvent = augmentEvent(event, telemetryMetadata);
let wrappedExecute = execute;
for (const executeWrapper of executeToolWrappers) {
const innerExecute = wrappedExecute;
wrappedExecute = () =>
executeWrapper({ ...augmentedEvent, execute: innerExecute });
}
return await wrappedExecute();
},
};
}

@@ -5,5 +5,5 @@ export type { TelemetryOptions } from './telemetry-options';

export {
AI_SDK_TELEMETRY_DIAGNOSTIC_CHANNEL,
type TelemetryDiagnosticChannelMessage,
type TelemetryDiagnosticEventType,
} from './diagnostic-channel';
AI_SDK_TELEMETRY_TRACING_CHANNEL,
type TelemetryTracingChannelMessage,
type TelemetryTracingEventType,
} from './tracing-channel';

@@ -39,2 +39,4 @@ import type { ToolSet } from '@ai-sdk/provider-utils';

import type { TelemetryOptions } from '../telemetry/telemetry-options';
import type { TelemetryTracingEventType } from './tracing-channel';
import type { TracingChannelContext } from './tracing-channel-publisher';

@@ -60,2 +62,21 @@ export type InferTelemetryEvent<EVENT> = EVENT &

export interface TelemetryDispatcher {
/**
* Runs awaited work inside a diagnostics-channel tracing span.
*/
runInTracingChannelSpan?: <T>(options: {
type: TelemetryTracingEventType;
event: unknown;
execute: () => PromiseLike<T>;
}) => Promise<T>;
/**
* Opens a tracing span context whose completion is observed separately.
* This is used by streamed operations that must preserve stream timing while
* still creating child spans with the correct parent.
*/
startTracingChannelContext?: (options: {
type: TelemetryTracingEventType;
event: unknown;
completion: PromiseLike<unknown>;
}) => TracingChannelContext | undefined;
onStart?: Callback<OperationStartEvent>;

@@ -227,9 +248,12 @@ onStepStart?: Callback<GenerateTextStepStartEvent>;

*
* @param options.callId - The call ID of the generation.
* @param options.execute - The function that performs the model call.
* The options carry the model-call start-event content as context (the event
* fields are optional), alongside the always-present `callId` and the
* `execute` function that performs the model call.
*/
executeLanguageModelCall?: <T>(options: {
callId: string;
execute: () => PromiseLike<T>;
}) => PromiseLike<T>;
executeLanguageModelCall?: <T>(
options: Partial<InferTelemetryEvent<LanguageModelCallStartEvent>> & {
callId: string;
execute: () => PromiseLike<T>;
},
) => PromiseLike<T>;

@@ -241,11 +265,13 @@ /**

*
* @param options.callId - The call ID of the tool call.
* @param options.toolCallId - The tool call ID.
* @param options.execute - The function to execute.
* The options carry the tool-execution start-event content as context (the
* event fields are optional), alongside the always-present `callId`,
* `toolCallId`, and the `execute` function to run.
*/
executeTool?: <T>(options: {
callId: string;
toolCallId: string;
execute: () => PromiseLike<T>;
}) => PromiseLike<T>;
executeTool?: <T>(
options: Partial<InferTelemetryEvent<ToolExecutionStartEvent>> & {
callId: string;
toolCallId: string;
execute: () => PromiseLike<T>;
},
) => PromiseLike<T>;
}
import type * as diagnosticsChannelModule from 'node:diagnostics_channel';
import {
AI_SDK_TELEMETRY_DIAGNOSTIC_CHANNEL,
type TelemetryDiagnosticChannelMessage,
} from './diagnostic-channel';
import { isNodeRuntime } from '../util/is-node-runtime';
type DiagnosticsChannel = typeof diagnosticsChannelModule;
let diagnosticsChannelPromise:
| Promise<DiagnosticsChannel | undefined>
| undefined;
async function loadDiagnosticsChannel(): Promise<
DiagnosticsChannel | undefined
> {
if (!isNodeRuntime()) {
return undefined;
}
if (diagnosticsChannelPromise == null) {
diagnosticsChannelPromise = (
import(
/* webpackIgnore: true */
'node:diagnostics_channel'
) as Promise<DiagnosticsChannel>
).catch(() => undefined);
}
return diagnosticsChannelPromise;
}
export async function publishTelemetryDiagnosticChannelMessage(
message: TelemetryDiagnosticChannelMessage,
): Promise<void> {
const diagnosticsChannel = await loadDiagnosticsChannel();
if (
diagnosticsChannel?.hasSubscribers(AI_SDK_TELEMETRY_DIAGNOSTIC_CHANNEL) !==
true
) {
return;
}
try {
diagnosticsChannel
.channel(AI_SDK_TELEMETRY_DIAGNOSTIC_CHANNEL)
.publish(message);
} catch {}
}
export const AI_SDK_TELEMETRY_DIAGNOSTIC_CHANNEL = 'aisdk:telemetry';
export type TelemetryDiagnosticEventType =
| 'onStart'
| 'onStepStart'
| 'onLanguageModelCallStart'
| 'onLanguageModelCallEnd'
| 'onToolExecutionStart'
| 'onToolExecutionEnd'
| 'onStepEnd'
| 'onStepFinish'
| 'onObjectStepStart'
| 'onObjectStepEnd'
| 'onEmbedStart'
| 'onEmbedEnd'
| 'onRerankStart'
| 'onRerankEnd'
| 'onEnd'
| 'onAbort'
| 'onError';
export type TelemetryDiagnosticChannelMessage<EVENT = unknown> = {
readonly type: TelemetryDiagnosticEventType;
readonly event: EVENT;
};

Sorry, the diff of this file is too big to display

Sorry, the diff of this file is too big to display

Sorry, the diff of this file is too big to display

Sorry, the diff of this file is too big to display

Sorry, the diff of this file is too big to display

Sorry, the diff of this file is too big to display

Sorry, the diff of this file is too big to display

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is too big to display

Sorry, the diff of this file is too big to display