From 92dbb59b793a589744b835bb1d9eb2e8db1e21bc Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Fri, 13 Mar 2026 19:26:29 +0000 Subject: [PATCH] refactor: share stream payload patch helper --- .../anthropic-stream-wrappers.ts | 17 +++++----------- .../openai-stream-wrappers.ts | 17 +++++----------- .../stream-payload-utils.ts | 20 +++++++++++++++++++ 3 files changed, 30 insertions(+), 24 deletions(-) create mode 100644 src/agents/pi-embedded-runner/stream-payload-utils.ts diff --git a/src/agents/pi-embedded-runner/anthropic-stream-wrappers.ts b/src/agents/pi-embedded-runner/anthropic-stream-wrappers.ts index efed94176..19b5701ea 100644 --- a/src/agents/pi-embedded-runner/anthropic-stream-wrappers.ts +++ b/src/agents/pi-embedded-runner/anthropic-stream-wrappers.ts @@ -7,6 +7,7 @@ import { usesOpenAiStringModeAnthropicToolChoice, } from "../provider-capabilities.js"; import { log } from "./logger.js"; +import { streamWithPayloadPatch } from "./stream-payload-utils.js"; const ANTHROPIC_CONTEXT_1M_BETA = "context-1m-2025-08-07"; const ANTHROPIC_1M_MODEL_PREFIXES = ["claude-opus-4", "claude-sonnet-4"] as const; @@ -341,18 +342,10 @@ export function createAnthropicFastModeWrapper( return underlying(model, context, options); } - const originalOnPayload = options?.onPayload; - return underlying(model, context, { - ...options, - onPayload: (payload) => { - if (payload && typeof payload === "object") { - const payloadObj = payload as Record; - if (payloadObj.service_tier === undefined) { - payloadObj.service_tier = serviceTier; - } - } - return originalOnPayload?.(payload, model); - }, + return streamWithPayloadPatch(underlying, model, context, options, (payloadObj) => { + if (payloadObj.service_tier === undefined) { + payloadObj.service_tier = serviceTier; + } }); }; } diff --git a/src/agents/pi-embedded-runner/openai-stream-wrappers.ts b/src/agents/pi-embedded-runner/openai-stream-wrappers.ts index d0b483e83..8542f329c 100644 --- a/src/agents/pi-embedded-runner/openai-stream-wrappers.ts +++ b/src/agents/pi-embedded-runner/openai-stream-wrappers.ts @@ -2,6 +2,7 @@ import type { StreamFn } from "@mariozechner/pi-agent-core"; import type { SimpleStreamOptions } from "@mariozechner/pi-ai"; import { streamSimple } from "@mariozechner/pi-ai"; import { log } from "./logger.js"; +import { streamWithPayloadPatch } from "./stream-payload-utils.js"; type OpenAIServiceTier = "auto" | "default" | "flex" | "priority"; type OpenAIReasoningEffort = "low" | "medium" | "high"; @@ -325,18 +326,10 @@ export function createOpenAIServiceTierWrapper( ) { return underlying(model, context, options); } - const originalOnPayload = options?.onPayload; - return underlying(model, context, { - ...options, - onPayload: (payload) => { - if (payload && typeof payload === "object") { - const payloadObj = payload as Record; - if (payloadObj.service_tier === undefined) { - payloadObj.service_tier = serviceTier; - } - } - return originalOnPayload?.(payload, model); - }, + return streamWithPayloadPatch(underlying, model, context, options, (payloadObj) => { + if (payloadObj.service_tier === undefined) { + payloadObj.service_tier = serviceTier; + } }); }; } diff --git a/src/agents/pi-embedded-runner/stream-payload-utils.ts b/src/agents/pi-embedded-runner/stream-payload-utils.ts new file mode 100644 index 000000000..580bf5b13 --- /dev/null +++ b/src/agents/pi-embedded-runner/stream-payload-utils.ts @@ -0,0 +1,20 @@ +import type { StreamFn } from "@mariozechner/pi-agent-core"; + +export function streamWithPayloadPatch( + underlying: StreamFn, + model: Parameters[0], + context: Parameters[1], + options: Parameters[2], + patchPayload: (payload: Record) => void, +) { + const originalOnPayload = options?.onPayload; + return underlying(model, context, { + ...options, + onPayload: (payload) => { + if (payload && typeof payload === "object") { + patchPayload(payload as Record); + } + return originalOnPayload?.(payload, model); + }, + }); +}