From bec974aba9ac95612c8df41dd7b2cf43a981b961 Mon Sep 17 00:00:00 2001 From: Colin Date: Mon, 16 Feb 2026 15:43:29 -0500 Subject: [PATCH] feat(slack): stream partial replies via draft message updates --- src/slack/draft-stream.test.ts | 106 +++++++++++ src/slack/draft-stream.ts | 172 ++++++++++++++++++ src/slack/monitor/message-handler/dispatch.ts | 68 +++++++ 3 files changed, 346 insertions(+) create mode 100644 src/slack/draft-stream.test.ts create mode 100644 src/slack/draft-stream.ts diff --git a/src/slack/draft-stream.test.ts b/src/slack/draft-stream.test.ts new file mode 100644 index 000000000..bcb1488ec --- /dev/null +++ b/src/slack/draft-stream.test.ts @@ -0,0 +1,106 @@ +import { describe, expect, it, vi } from "vitest"; +import { createSlackDraftStream } from "./draft-stream.js"; + +describe("createSlackDraftStream", () => { + it("sends the first update and edits subsequent updates", async () => { + const send = vi.fn(async () => ({ + channelId: "C123", + messageId: "111.222", + })); + const edit = vi.fn(async () => {}); + const stream = createSlackDraftStream({ + target: "channel:C123", + token: "xoxb-test", + throttleMs: 250, + send, + edit, + }); + + stream.update("hello"); + await stream.flush(); + stream.update("hello world"); + await stream.flush(); + + expect(send).toHaveBeenCalledTimes(1); + expect(edit).toHaveBeenCalledTimes(1); + expect(edit).toHaveBeenCalledWith("C123", "111.222", "hello world", { + token: "xoxb-test", + accountId: undefined, + }); + }); + + it("does not send duplicate text", async () => { + const send = vi.fn(async () => ({ + channelId: "C123", + messageId: "111.222", + })); + const edit = vi.fn(async () => {}); + const stream = createSlackDraftStream({ + target: "channel:C123", + token: "xoxb-test", + throttleMs: 250, + send, + edit, + }); + + stream.update("same"); + await stream.flush(); + stream.update("same"); + await stream.flush(); + + expect(send).toHaveBeenCalledTimes(1); + expect(edit).toHaveBeenCalledTimes(0); + }); + + it("supports forceNewMessage for subsequent assistant messages", async () => { + const send = vi + .fn() + .mockResolvedValueOnce({ channelId: "C123", messageId: "111.222" }) + .mockResolvedValueOnce({ channelId: "C123", messageId: "333.444" }); + const edit = vi.fn(async () => {}); + const stream = createSlackDraftStream({ + target: "channel:C123", + token: "xoxb-test", + throttleMs: 250, + send, + edit, + }); + + stream.update("first"); + await stream.flush(); + stream.forceNewMessage(); + stream.update("second"); + await stream.flush(); + + expect(send).toHaveBeenCalledTimes(2); + expect(edit).toHaveBeenCalledTimes(0); + expect(stream.messageId()).toBe("333.444"); + }); + + it("stops when text exceeds max chars", async () => { + const send = vi.fn(async () => ({ + channelId: "C123", + messageId: "111.222", + })); + const edit = vi.fn(async () => {}); + const warn = vi.fn(); + const stream = createSlackDraftStream({ + target: "channel:C123", + token: "xoxb-test", + maxChars: 5, + throttleMs: 250, + send, + edit, + warn, + }); + + stream.update("123456"); + await stream.flush(); + stream.update("ok"); + await stream.flush(); + + expect(send).not.toHaveBeenCalled(); + expect(edit).not.toHaveBeenCalled(); + expect(warn).toHaveBeenCalledTimes(1); + }); +}); diff --git a/src/slack/draft-stream.ts b/src/slack/draft-stream.ts new file mode 100644 index 000000000..3e918c2e6 --- /dev/null +++ b/src/slack/draft-stream.ts @@ -0,0 +1,172 @@ +import { editSlackMessage } from "./actions.js"; +import { sendMessageSlack } from "./send.js"; + +const SLACK_STREAM_MAX_CHARS = 4000; +const DEFAULT_THROTTLE_MS = 1000; + +export type SlackDraftStream = { + update: (text: string) => void; + flush: () => Promise; + stop: () => void; + forceNewMessage: () => void; + messageId: () => string | undefined; + channelId: () => string | undefined; +}; + +export function createSlackDraftStream(params: { + target: string; + token: string; + accountId?: string; + maxChars?: number; + throttleMs?: number; + resolveThreadTs?: () => string | undefined; + onMessageSent?: () => void; + log?: (message: string) => void; + warn?: (message: string) => void; + send?: typeof sendMessageSlack; + edit?: typeof editSlackMessage; +}): SlackDraftStream { + const maxChars = Math.min(params.maxChars ?? SLACK_STREAM_MAX_CHARS, SLACK_STREAM_MAX_CHARS); + const throttleMs = Math.max(250, params.throttleMs ?? DEFAULT_THROTTLE_MS); + const send = params.send ?? sendMessageSlack; + const edit = params.edit ?? editSlackMessage; + + let streamMessageId: string | undefined; + let streamChannelId: string | undefined; + let lastSentText = ""; + let lastSentAt = 0; + let pendingText = ""; + let inFlightPromise: Promise | undefined; + let timer: ReturnType | undefined; + let stopped = false; + + const sendOrEditStreamMessage = async (text: string) => { + if (stopped) { + return; + } + const trimmed = text.trimEnd(); + if (!trimmed) { + return; + } + if (trimmed.length > maxChars) { + stopped = true; + params.warn?.(`slack stream preview stopped (text length ${trimmed.length} > ${maxChars})`); + return; + } + if (trimmed === lastSentText) { + return; + } + lastSentText = trimmed; + lastSentAt = Date.now(); + try { + if (streamChannelId && streamMessageId) { + await edit(streamChannelId, streamMessageId, trimmed, { + token: params.token, + accountId: params.accountId, + }); + return; + } + const sent = await send(params.target, trimmed, { + token: params.token, + accountId: params.accountId, + threadTs: params.resolveThreadTs?.(), + }); + streamChannelId = sent.channelId || streamChannelId; + streamMessageId = sent.messageId || streamMessageId; + if (!streamChannelId || !streamMessageId) { + stopped = true; + params.warn?.("slack stream preview stopped (missing identifiers from sendMessage)"); + return; + } + params.onMessageSent?.(); + } catch (err) { + stopped = true; + params.warn?.( + `slack stream preview failed: ${err instanceof Error ? err.message : String(err)}`, + ); + } + }; + + const flush = async () => { + if (timer) { + clearTimeout(timer); + timer = undefined; + } + while (!stopped) { + if (inFlightPromise) { + await inFlightPromise; + continue; + } + const text = pendingText; + const trimmed = text.trim(); + if (!trimmed) { + pendingText = ""; + return; + } + pendingText = ""; + const current = sendOrEditStreamMessage(text).finally(() => { + if (inFlightPromise === current) { + inFlightPromise = undefined; + } + }); + inFlightPromise = current; + await current; + if (!pendingText) { + return; + } + } + }; + + const schedule = () => { + if (timer) { + return; + } + const delay = Math.max(0, throttleMs - (Date.now() - lastSentAt)); + timer = setTimeout(() => { + void flush(); + }, delay); + }; + + const update = (text: string) => { + if (stopped) { + return; + } + pendingText = text; + if (inFlightPromise) { + schedule(); + return; + } + if (!timer && Date.now() - lastSentAt >= throttleMs) { + void flush(); + return; + } + schedule(); + }; + + const stop = () => { + stopped = true; + pendingText = ""; + if (timer) { + clearTimeout(timer); + timer = undefined; + } + }; + + const forceNewMessage = () => { + streamMessageId = undefined; + streamChannelId = undefined; + lastSentText = ""; + pendingText = ""; + }; + + params.log?.(`slack stream preview ready (maxChars=${maxChars}, throttleMs=${throttleMs})`); + + return { + update, + flush, + stop, + forceNewMessage, + messageId: () => streamMessageId, + channelId: () => streamChannelId, + }; +} diff --git a/src/slack/monitor/message-handler/dispatch.ts b/src/slack/monitor/message-handler/dispatch.ts index 8a988ca35..c6fe0643e 100644 --- a/src/slack/monitor/message-handler/dispatch.ts +++ b/src/slack/monitor/message-handler/dispatch.ts @@ -10,6 +10,7 @@ import { createTypingCallbacks } from "../../../channels/typing.js"; import { resolveStorePath, updateLastRoute } from "../../../config/sessions.js"; import { danger, logVerbose, shouldLogVerbose } from "../../../globals.js"; import { removeSlackReaction } from "../../actions.js"; +import { createSlackDraftStream } from "../../draft-stream.js"; import { resolveSlackThreadTargets } from "../../threading.js"; import { createSlackReplyDeliveryPlan, deliverReplies } from "../replies.js"; @@ -106,6 +107,36 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag ...prefixOptions, humanDelay: resolveHumanDelayConfig(cfg, route.agentId), deliver: async (payload) => { + const mediaCount = payload.mediaUrls?.length ?? (payload.mediaUrl ? 1 : 0); + const draftMessageId = draftStream?.messageId(); + const draftChannelId = draftStream?.channelId(); + const finalText = payload.text; + const canFinalizeViaPreviewEdit = + mediaCount === 0 && + !payload.isError && + typeof finalText === "string" && + finalText.trim().length > 0 && + typeof draftMessageId === "string" && + typeof draftChannelId === "string"; + + if (canFinalizeViaPreviewEdit) { + draftStream?.stop(); + try { + await ctx.app.client.chat.update({ + channel: draftChannelId, + ts: draftMessageId, + text: finalText.trim(), + }); + return; + } catch (err) { + logVerbose( + `slack: preview final edit failed; falling back to standard send (${String(err)})`, + ); + } + } else if (mediaCount > 0) { + draftStream?.stop(); + } + const replyThreadTs = replyPlan.nextThreadTs(); await deliverReplies({ replies: [payload], @@ -126,6 +157,26 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag onIdle: typingCallbacks.onIdle, }); + const draftStream = createSlackDraftStream({ + target: prepared.replyTarget, + token: ctx.botToken, + accountId: account.accountId, + maxChars: Math.min(ctx.textLimit, 4000), + resolveThreadTs: () => replyPlan.nextThreadTs(), + onMessageSent: () => replyPlan.markSent(), + log: logVerbose, + warn: logVerbose, + }); + let hasStreamedMessage = false; + const updateDraftFromPartial = (text?: string) => { + const trimmed = text?.trimEnd(); + if (!trimmed) { + return; + } + draftStream.update(trimmed); + hasStreamedMessage = true; + }; + const { queuedFinal, counts } = await dispatchInboundMessage({ ctx: prepared.ctxPayload, cfg, @@ -139,8 +190,25 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag ? !account.config.blockStreaming : undefined, onModelSelected, + onPartialReply: async (payload) => { + updateDraftFromPartial(payload.text); + }, + onAssistantMessageStart: async () => { + if (hasStreamedMessage) { + draftStream.forceNewMessage(); + hasStreamedMessage = false; + } + }, + onReasoningEnd: async () => { + if (hasStreamedMessage) { + draftStream.forceNewMessage(); + hasStreamedMessage = false; + } + }, }, }); + await draftStream.flush(); + draftStream.stop(); markDispatchIdle(); const anyReplyDelivered = queuedFinal || (counts.block ?? 0) > 0 || (counts.final ?? 0) > 0;