feat(slack): stream partial replies via draft message updates

This commit is contained in:
Colin
2026-02-16 15:43:29 -05:00
committed by Peter Steinberger
parent 78c34bcf33
commit bec974aba9
3 changed files with 346 additions and 0 deletions

View File

@@ -0,0 +1,106 @@
import { describe, expect, it, vi } from "vitest";
import { createSlackDraftStream } from "./draft-stream.js";
describe("createSlackDraftStream", () => {
it("sends the first update and edits subsequent updates", async () => {
const send = vi.fn(async () => ({
channelId: "C123",
messageId: "111.222",
}));
const edit = vi.fn(async () => {});
const stream = createSlackDraftStream({
target: "channel:C123",
token: "xoxb-test",
throttleMs: 250,
send,
edit,
});
stream.update("hello");
await stream.flush();
stream.update("hello world");
await stream.flush();
expect(send).toHaveBeenCalledTimes(1);
expect(edit).toHaveBeenCalledTimes(1);
expect(edit).toHaveBeenCalledWith("C123", "111.222", "hello world", {
token: "xoxb-test",
accountId: undefined,
});
});
it("does not send duplicate text", async () => {
const send = vi.fn(async () => ({
channelId: "C123",
messageId: "111.222",
}));
const edit = vi.fn(async () => {});
const stream = createSlackDraftStream({
target: "channel:C123",
token: "xoxb-test",
throttleMs: 250,
send,
edit,
});
stream.update("same");
await stream.flush();
stream.update("same");
await stream.flush();
expect(send).toHaveBeenCalledTimes(1);
expect(edit).toHaveBeenCalledTimes(0);
});
it("supports forceNewMessage for subsequent assistant messages", async () => {
const send = vi
.fn()
.mockResolvedValueOnce({ channelId: "C123", messageId: "111.222" })
.mockResolvedValueOnce({ channelId: "C123", messageId: "333.444" });
const edit = vi.fn(async () => {});
const stream = createSlackDraftStream({
target: "channel:C123",
token: "xoxb-test",
throttleMs: 250,
send,
edit,
});
stream.update("first");
await stream.flush();
stream.forceNewMessage();
stream.update("second");
await stream.flush();
expect(send).toHaveBeenCalledTimes(2);
expect(edit).toHaveBeenCalledTimes(0);
expect(stream.messageId()).toBe("333.444");
});
it("stops when text exceeds max chars", async () => {
const send = vi.fn(async () => ({
channelId: "C123",
messageId: "111.222",
}));
const edit = vi.fn(async () => {});
const warn = vi.fn();
const stream = createSlackDraftStream({
target: "channel:C123",
token: "xoxb-test",
maxChars: 5,
throttleMs: 250,
send,
edit,
warn,
});
stream.update("123456");
await stream.flush();
stream.update("ok");
await stream.flush();
expect(send).not.toHaveBeenCalled();
expect(edit).not.toHaveBeenCalled();
expect(warn).toHaveBeenCalledTimes(1);
});
});

172
src/slack/draft-stream.ts Normal file
View File

@@ -0,0 +1,172 @@
import { editSlackMessage } from "./actions.js";
import { sendMessageSlack } from "./send.js";
const SLACK_STREAM_MAX_CHARS = 4000;
const DEFAULT_THROTTLE_MS = 1000;
export type SlackDraftStream = {
update: (text: string) => void;
flush: () => Promise<void>;
stop: () => void;
forceNewMessage: () => void;
messageId: () => string | undefined;
channelId: () => string | undefined;
};
export function createSlackDraftStream(params: {
target: string;
token: string;
accountId?: string;
maxChars?: number;
throttleMs?: number;
resolveThreadTs?: () => string | undefined;
onMessageSent?: () => void;
log?: (message: string) => void;
warn?: (message: string) => void;
send?: typeof sendMessageSlack;
edit?: typeof editSlackMessage;
}): SlackDraftStream {
const maxChars = Math.min(params.maxChars ?? SLACK_STREAM_MAX_CHARS, SLACK_STREAM_MAX_CHARS);
const throttleMs = Math.max(250, params.throttleMs ?? DEFAULT_THROTTLE_MS);
const send = params.send ?? sendMessageSlack;
const edit = params.edit ?? editSlackMessage;
let streamMessageId: string | undefined;
let streamChannelId: string | undefined;
let lastSentText = "";
let lastSentAt = 0;
let pendingText = "";
let inFlightPromise: Promise<void> | undefined;
let timer: ReturnType<typeof setTimeout> | undefined;
let stopped = false;
const sendOrEditStreamMessage = async (text: string) => {
if (stopped) {
return;
}
const trimmed = text.trimEnd();
if (!trimmed) {
return;
}
if (trimmed.length > maxChars) {
stopped = true;
params.warn?.(`slack stream preview stopped (text length ${trimmed.length} > ${maxChars})`);
return;
}
if (trimmed === lastSentText) {
return;
}
lastSentText = trimmed;
lastSentAt = Date.now();
try {
if (streamChannelId && streamMessageId) {
await edit(streamChannelId, streamMessageId, trimmed, {
token: params.token,
accountId: params.accountId,
});
return;
}
const sent = await send(params.target, trimmed, {
token: params.token,
accountId: params.accountId,
threadTs: params.resolveThreadTs?.(),
});
streamChannelId = sent.channelId || streamChannelId;
streamMessageId = sent.messageId || streamMessageId;
if (!streamChannelId || !streamMessageId) {
stopped = true;
params.warn?.("slack stream preview stopped (missing identifiers from sendMessage)");
return;
}
params.onMessageSent?.();
} catch (err) {
stopped = true;
params.warn?.(
`slack stream preview failed: ${err instanceof Error ? err.message : String(err)}`,
);
}
};
const flush = async () => {
if (timer) {
clearTimeout(timer);
timer = undefined;
}
while (!stopped) {
if (inFlightPromise) {
await inFlightPromise;
continue;
}
const text = pendingText;
const trimmed = text.trim();
if (!trimmed) {
pendingText = "";
return;
}
pendingText = "";
const current = sendOrEditStreamMessage(text).finally(() => {
if (inFlightPromise === current) {
inFlightPromise = undefined;
}
});
inFlightPromise = current;
await current;
if (!pendingText) {
return;
}
}
};
const schedule = () => {
if (timer) {
return;
}
const delay = Math.max(0, throttleMs - (Date.now() - lastSentAt));
timer = setTimeout(() => {
void flush();
}, delay);
};
const update = (text: string) => {
if (stopped) {
return;
}
pendingText = text;
if (inFlightPromise) {
schedule();
return;
}
if (!timer && Date.now() - lastSentAt >= throttleMs) {
void flush();
return;
}
schedule();
};
const stop = () => {
stopped = true;
pendingText = "";
if (timer) {
clearTimeout(timer);
timer = undefined;
}
};
const forceNewMessage = () => {
streamMessageId = undefined;
streamChannelId = undefined;
lastSentText = "";
pendingText = "";
};
params.log?.(`slack stream preview ready (maxChars=${maxChars}, throttleMs=${throttleMs})`);
return {
update,
flush,
stop,
forceNewMessage,
messageId: () => streamMessageId,
channelId: () => streamChannelId,
};
}

View File

@@ -10,6 +10,7 @@ import { createTypingCallbacks } from "../../../channels/typing.js";
import { resolveStorePath, updateLastRoute } from "../../../config/sessions.js";
import { danger, logVerbose, shouldLogVerbose } from "../../../globals.js";
import { removeSlackReaction } from "../../actions.js";
import { createSlackDraftStream } from "../../draft-stream.js";
import { resolveSlackThreadTargets } from "../../threading.js";
import { createSlackReplyDeliveryPlan, deliverReplies } from "../replies.js";
@@ -106,6 +107,36 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag
...prefixOptions,
humanDelay: resolveHumanDelayConfig(cfg, route.agentId),
deliver: async (payload) => {
const mediaCount = payload.mediaUrls?.length ?? (payload.mediaUrl ? 1 : 0);
const draftMessageId = draftStream?.messageId();
const draftChannelId = draftStream?.channelId();
const finalText = payload.text;
const canFinalizeViaPreviewEdit =
mediaCount === 0 &&
!payload.isError &&
typeof finalText === "string" &&
finalText.trim().length > 0 &&
typeof draftMessageId === "string" &&
typeof draftChannelId === "string";
if (canFinalizeViaPreviewEdit) {
draftStream?.stop();
try {
await ctx.app.client.chat.update({
channel: draftChannelId,
ts: draftMessageId,
text: finalText.trim(),
});
return;
} catch (err) {
logVerbose(
`slack: preview final edit failed; falling back to standard send (${String(err)})`,
);
}
} else if (mediaCount > 0) {
draftStream?.stop();
}
const replyThreadTs = replyPlan.nextThreadTs();
await deliverReplies({
replies: [payload],
@@ -126,6 +157,26 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag
onIdle: typingCallbacks.onIdle,
});
const draftStream = createSlackDraftStream({
target: prepared.replyTarget,
token: ctx.botToken,
accountId: account.accountId,
maxChars: Math.min(ctx.textLimit, 4000),
resolveThreadTs: () => replyPlan.nextThreadTs(),
onMessageSent: () => replyPlan.markSent(),
log: logVerbose,
warn: logVerbose,
});
let hasStreamedMessage = false;
const updateDraftFromPartial = (text?: string) => {
const trimmed = text?.trimEnd();
if (!trimmed) {
return;
}
draftStream.update(trimmed);
hasStreamedMessage = true;
};
const { queuedFinal, counts } = await dispatchInboundMessage({
ctx: prepared.ctxPayload,
cfg,
@@ -139,8 +190,25 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag
? !account.config.blockStreaming
: undefined,
onModelSelected,
onPartialReply: async (payload) => {
updateDraftFromPartial(payload.text);
},
onAssistantMessageStart: async () => {
if (hasStreamedMessage) {
draftStream.forceNewMessage();
hasStreamedMessage = false;
}
},
onReasoningEnd: async () => {
if (hasStreamedMessage) {
draftStream.forceNewMessage();
hasStreamedMessage = false;
}
},
},
});
await draftStream.flush();
draftStream.stop();
markDispatchIdle();
const anyReplyDelivered = queuedFinal || (counts.block ?? 0) > 0 || (counts.final ?? 0) > 0;