// singularity-forge/packages/pi-ai/src/providers/openai-responses.ts

// Lazy-loaded: OpenAI SDK is imported on first use, not at startup.
// This avoids penalizing users who don't use OpenAI models.
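// (The SDK is referenced below via `import type` only; the runtime import is assumed
// to happen inside createOpenAIClient, roughly `await import("openai")` on first use.)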
import type { ResponseCreateParamsStreaming } from "openai/resources/responses/responses.js";
import { getEnvApiKey } from "../env-api-keys.js";
import { supportsXhigh } from "../models.js";
import type {
CacheRetention,
Context,
Model,
SimpleStreamOptions,
StreamFunction,
StreamOptions,
Usage,
} from "../types.js";
import { AssistantMessageEventStream } from "../utils/event-stream.js";
import {
convertResponsesMessages,
convertResponsesTools,
processResponsesStream,
} from "./openai-responses-shared.js";
import {
assertStreamSuccess,
buildInitialOutput,
clampReasoningForModel,
createOpenAIClient,
finalizeStream,
handleStreamError,
} from "./openai-shared.js";
import {
buildBaseOptions,
clampReasoning,
isAutoReasoning,
resolveReasoningLevel,
} from "./simple-options.js";
const OPENAI_TOOL_CALL_PROVIDERS = new Set([
"openai",
"openai-codex",
"opencode",
]);
/**
* Resolve cache retention preference.
* An explicit value wins; otherwise the PI_CACHE_RETENTION env var ("long") is honored
* for backward compatibility, and the default is "short".
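*
* @example
* // Illustrative: explicit argument first, then the PI_CACHE_RETENTION env var, then "short".
* resolveCacheRetention("none"); // "none"
* resolveCacheRetention();       // "long" when PI_CACHE_RETENTION === "long", otherwise "short"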
*/
function resolveCacheRetention(
cacheRetention?: CacheRetention,
): CacheRetention {
if (cacheRetention) {
return cacheRetention;
}
if (
typeof process !== "undefined" &&
process.env.PI_CACHE_RETENTION === "long"
) {
return "long";
}
return "short";
}
/**
* Get prompt cache retention based on cacheRetention and base URL.
* Only applies to direct OpenAI API calls (api.openai.com).
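*
* @example
* // Illustrative URLs; only the hosted api.openai.com endpoint opts into 24h retention.
* getPromptCacheRetention("https://api.openai.com/v1", "long");      // "24h"
* getPromptCacheRetention("https://my-proxy.example.com/v1", "long"); // undefined
* getPromptCacheRetention("https://api.openai.com/v1", "short");     // undefined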
*/
function getPromptCacheRetention(
baseUrl: string,
cacheRetention: CacheRetention,
): "24h" | undefined {
if (cacheRetention !== "long") {
return undefined;
}
if (baseUrl.includes("api.openai.com")) {
return "24h";
}
return undefined;
}
// OpenAI Responses-specific options
export interface OpenAIResponsesOptions extends StreamOptions {
/** "auto" means no effort constraint — model decides its own reasoning depth (GPT-5+). */
reasoningEffort?: "auto" | "minimal" | "low" | "medium" | "high" | "xhigh";
reasoningSummary?: "auto" | "detailed" | "concise" | null;
serviceTier?: ResponseCreateParamsStreaming["service_tier"];
}
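// Illustrative OpenAIResponsesOptions shape (field values are examples, not defaults):
//   { reasoningEffort: "high", reasoningSummary: "concise", serviceTier: "flex" }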
/**
* Generate function for OpenAI Responses API
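*
* @example
* // Sketch only: `model` and `context` come from the caller; option values are examples.
* const stream = streamOpenAIResponses(model, context, {
*   reasoningEffort: "auto",
*   serviceTier: "flex",
* });
* // Starting with a "start" event, assistant message events are pushed onto the
* // returned AssistantMessageEventStream as the response streams in.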
*/
export const streamOpenAIResponses: StreamFunction<
"openai-responses",
OpenAIResponsesOptions
> = (
model: Model<"openai-responses">,
context: Context,
options?: OpenAIResponsesOptions,
): AssistantMessageEventStream => {
const stream = new AssistantMessageEventStream();
// Start async processing
(async () => {
const output = buildInitialOutput(model);
try {
// Create OpenAI client
const apiKey = options?.apiKey || getEnvApiKey(model.provider) || "";
const client = await createOpenAIClient(model, context, apiKey, {
optionsHeaders: options?.headers,
});
let params = buildParams(model, context, options);
const nextParams = await options?.onPayload?.(params, model);
if (nextParams !== undefined) {
params = nextParams as ResponseCreateParamsStreaming;
}
const openaiStream = await client.responses.create(
params,
options?.signal ? { signal: options.signal } : undefined,
);
stream.push({ type: "start", partial: output });
await processResponsesStream(openaiStream, output, stream, model, {
serviceTier: options?.serviceTier,
applyServiceTierPricing,
});
assertStreamSuccess(output, options?.signal);
finalizeStream(stream, output);
} catch (error) {
handleStreamError(stream, output, error, options?.signal);
}
})();
return stream;
};
export const streamSimpleOpenAIResponses: StreamFunction<
"openai-responses",
SimpleStreamOptions
> = (
model: Model<"openai-responses">,
context: Context,
options?: SimpleStreamOptions,
): AssistantMessageEventStream => {
const apiKey = options?.apiKey || getEnvApiKey(model.provider);
if (!apiKey) {
throw new Error(`No API key for provider: ${model.provider}`);
}
const base = buildBaseOptions(model, options, apiKey);
const reasoningEffort: OpenAIResponsesOptions["reasoningEffort"] =
isAutoReasoning(options?.reasoning)
? "auto"
: supportsXhigh(model)
? resolveReasoningLevel(model, options?.reasoning)
: clampReasoning(resolveReasoningLevel(model, options?.reasoning));
return streamOpenAIResponses(model, context, {
...base,
reasoningEffort,
} satisfies OpenAIResponsesOptions);
};
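// Note on streamSimpleOpenAIResponses: when supportsXhigh(model) is false, the resolved
// level goes through clampReasoning() first, so an "xhigh" request is expected to be
// lowered (for example to "high") before it reaches the Responses API.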
function buildParams(
model: Model<"openai-responses">,
context: Context,
options?: OpenAIResponsesOptions,
) {
const messages = convertResponsesMessages(
model,
context,
OPENAI_TOOL_CALL_PROVIDERS,
);
const cacheRetention = resolveCacheRetention(options?.cacheRetention);
const params: ResponseCreateParamsStreaming = {
model: model.id,
input: messages,
stream: true,
prompt_cache_key:
cacheRetention === "none" ? undefined : options?.sessionId,
prompt_cache_retention: getPromptCacheRetention(
model.baseUrl,
cacheRetention,
),
store: false,
};
if (options?.maxTokens) {
params.max_output_tokens = options?.maxTokens;
}
if (options?.temperature !== undefined) {
params.temperature = options?.temperature;
}
if (options?.serviceTier !== undefined) {
params.service_tier = options.serviceTier;
}
if (context.tools && context.tools.length > 0) {
params.tools = convertResponsesTools(context.tools);
}
if (model.reasoning) {
params.include = ["reasoning.encrypted_content"];
if (options?.reasoningEffort === "auto") {
// Let the model decide its own reasoning depth — no effort constraint.
// GPT-5+ will reason as much as it judges necessary, same as
// THINKING_LEVEL_UNSPECIFIED for Gemini 2.5.
params.reasoning = { summary: options?.reasoningSummary || "auto" };
} else if (options?.reasoningEffort || options?.reasoningSummary) {
const effort = clampReasoningForModel(
model.name,
options?.reasoningEffort || "medium",
) as typeof options.reasoningEffort;
params.reasoning = {
effort: effort || "medium",
summary: options?.reasoningSummary || "auto",
};
} else {
if (model.name.startsWith("gpt-5")) {
// GPT-5 has no supported way to turn reasoning off, so inject the "Juice: 0"
// developer-message workaround described in
// https://community.openai.com/t/need-reasoning-false-option-for-gpt-5/1351588/7
messages.push({
role: "developer",
content: [
{
type: "input_text",
text: "# Juice: 0 !important",
},
],
});
}
}
}
return params;
}
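// For reference, buildParams() above typically yields a payload roughly like:
// {
//   model: "gpt-5",                           // example model id
//   input: [/* converted messages */],
//   stream: true,
//   prompt_cache_key: "<sessionId>",          // omitted when cacheRetention === "none"
//   prompt_cache_retention: "24h",            // only for api.openai.com with "long" retention
//   store: false,
//   tools: [/* converted tools */],           // when context.tools is non-empty
//   include: ["reasoning.encrypted_content"], // reasoning models only
//   reasoning: { effort: "medium", summary: "auto" },
// }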
function getServiceTierCostMultiplier(
serviceTier: ResponseCreateParamsStreaming["service_tier"] | undefined,
): number {
switch (serviceTier) {
case "flex":
return 0.5;
case "priority":
return 2;
default:
return 1;
}
}
function applyServiceTierPricing(
usage: Usage,
serviceTier: ResponseCreateParamsStreaming["service_tier"] | undefined,
) {
const multiplier = getServiceTierCostMultiplier(serviceTier);
if (multiplier === 1) return;
usage.cost.input *= multiplier;
usage.cost.output *= multiplier;
usage.cost.cacheRead *= multiplier;
usage.cost.cacheWrite *= multiplier;
usage.cost.total =
usage.cost.input +
usage.cost.output +
usage.cost.cacheRead +
usage.cost.cacheWrite;
}