fix: prevent Telegram preview stream cross-edit race (#23202)

Merged via /review-pr -> /prepare-pr -> /merge-pr.

Prepared head SHA: 529abf209d56d9f991a7d308f4ecce78ac992e94
Co-authored-by: obviyus <22031114+obviyus@users.noreply.github.com>
Co-authored-by: obviyus <22031114+obviyus@users.noreply.github.com>
Reviewed-by: @obviyus
This commit is contained in:
Ayaan Zaidi
2026-02-22 10:04:33 +05:30
committed by GitHub
parent 413f81b856
commit 63b4c500d9
5 changed files with 346 additions and 60 deletions

View File

@@ -17,6 +17,7 @@ Docs: https://docs.openclaw.ai
### Fixes
- Telegram/Streaming: preserve archived draft preview mapping after flush and clean superseded reasoning preview bubbles so multi-message preview finals no longer cross-edit or orphan stale messages under send/rotation races. (#23202) Thanks @obviyus.
- Slack/Slash commands: preserve the Bolt app receiver when registering external select options handlers so monitor startup does not crash on runtimes that require bound `app.options` calls. (#23209) Thanks @0xgaia.
- Agents/Ollama: preserve unsafe integer tool-call arguments as exact strings during NDJSON parsing, preventing large numeric IDs from being rounded before tool execution. (#23170) Thanks @BestJoester.
- Cron/Gateway: keep `cron.list` and `cron.status` responsive during startup catch-up by avoiding a long-held cron lock while missed jobs execute. (#23106) Thanks @jayleekr.

View File

@@ -137,7 +137,13 @@ describe("dispatchTelegramMessage draft streaming", () => {
}
function createBot(): Bot {
return { api: { sendMessage: vi.fn(), editMessageText: vi.fn() } } as unknown as Bot;
return {
api: {
sendMessage: vi.fn(),
editMessageText: vi.fn(),
deleteMessage: vi.fn().mockResolvedValue(true),
},
} as unknown as Bot;
}
function createRuntime(): Parameters<typeof dispatchTelegramMessage>[0]["runtime"] {
@@ -154,10 +160,12 @@ describe("dispatchTelegramMessage draft streaming", () => {
context: TelegramMessageContext;
telegramCfg?: Parameters<typeof dispatchTelegramMessage>[0]["telegramCfg"];
streamMode?: Parameters<typeof dispatchTelegramMessage>[0]["streamMode"];
bot?: Bot;
}) {
const bot = params.bot ?? createBot();
await dispatchTelegramMessage({
context: params.context,
bot: createBot(),
bot,
cfg: {},
runtime: createRuntime(),
replyToMode: "first",
@@ -577,6 +585,141 @@ describe("dispatchTelegramMessage draft streaming", () => {
expect(deliverReplies).not.toHaveBeenCalled();
});
it("maps finals correctly when first preview id resolves after message boundary", async () => {
let answerMessageId: number | undefined;
let answerDraftParams:
| {
onSupersededPreview?: (preview: { messageId: number; textSnapshot: string }) => void;
}
| undefined;
const answerDraftStream = {
update: vi.fn().mockImplementation((text: string) => {
if (text.includes("Message B")) {
answerMessageId = 1002;
}
}),
flush: vi.fn().mockResolvedValue(undefined),
messageId: vi.fn().mockImplementation(() => answerMessageId),
clear: vi.fn().mockResolvedValue(undefined),
stop: vi.fn().mockResolvedValue(undefined),
forceNewMessage: vi.fn().mockImplementation(() => {
answerMessageId = undefined;
}),
};
const reasoningDraftStream = createDraftStream();
createTelegramDraftStream
.mockImplementationOnce((params) => {
answerDraftParams = params as typeof answerDraftParams;
return answerDraftStream;
})
.mockImplementationOnce(() => reasoningDraftStream);
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(
async ({ dispatcherOptions, replyOptions }) => {
await replyOptions?.onPartialReply?.({ text: "Message A partial" });
await replyOptions?.onAssistantMessageStart?.();
await replyOptions?.onPartialReply?.({ text: "Message B partial" });
// Simulate late resolution of message A preview ID after boundary rotation.
answerDraftParams?.onSupersededPreview?.({
messageId: 1001,
textSnapshot: "Message A partial",
});
await dispatcherOptions.deliver({ text: "Message A final" }, { kind: "final" });
await dispatcherOptions.deliver({ text: "Message B final" }, { kind: "final" });
return { queuedFinal: true };
},
);
deliverReplies.mockResolvedValue({ delivered: true });
editMessageTelegram.mockResolvedValue({ ok: true, chatId: "123", messageId: "1001" });
await dispatchWithContext({ context: createContext(), streamMode: "partial" });
expect(editMessageTelegram).toHaveBeenNthCalledWith(
1,
123,
1001,
"Message A final",
expect.any(Object),
);
expect(editMessageTelegram).toHaveBeenNthCalledWith(
2,
123,
1002,
"Message B final",
expect.any(Object),
);
expect(deliverReplies).not.toHaveBeenCalled();
});
it("maps finals correctly when archived preview id arrives during final flush", async () => {
let answerMessageId: number | undefined;
let answerDraftParams:
| {
onSupersededPreview?: (preview: { messageId: number; textSnapshot: string }) => void;
}
| undefined;
let emittedSupersededPreview = false;
const answerDraftStream = {
update: vi.fn().mockImplementation((text: string) => {
if (text.includes("Message B")) {
answerMessageId = 1002;
}
}),
flush: vi.fn().mockImplementation(async () => {
if (!emittedSupersededPreview) {
emittedSupersededPreview = true;
answerDraftParams?.onSupersededPreview?.({
messageId: 1001,
textSnapshot: "Message A partial",
});
}
}),
messageId: vi.fn().mockImplementation(() => answerMessageId),
clear: vi.fn().mockResolvedValue(undefined),
stop: vi.fn().mockResolvedValue(undefined),
forceNewMessage: vi.fn().mockImplementation(() => {
answerMessageId = undefined;
}),
};
const reasoningDraftStream = createDraftStream();
createTelegramDraftStream
.mockImplementationOnce((params) => {
answerDraftParams = params as typeof answerDraftParams;
return answerDraftStream;
})
.mockImplementationOnce(() => reasoningDraftStream);
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(
async ({ dispatcherOptions, replyOptions }) => {
await replyOptions?.onPartialReply?.({ text: "Message A partial" });
await replyOptions?.onAssistantMessageStart?.();
await replyOptions?.onPartialReply?.({ text: "Message B partial" });
await dispatcherOptions.deliver({ text: "Message A final" }, { kind: "final" });
await dispatcherOptions.deliver({ text: "Message B final" }, { kind: "final" });
return { queuedFinal: true };
},
);
deliverReplies.mockResolvedValue({ delivered: true });
editMessageTelegram.mockResolvedValue({ ok: true, chatId: "123", messageId: "1001" });
await dispatchWithContext({ context: createContext(), streamMode: "partial" });
expect(editMessageTelegram).toHaveBeenNthCalledWith(
1,
123,
1001,
"Message A final",
expect.any(Object),
);
expect(editMessageTelegram).toHaveBeenNthCalledWith(
2,
123,
1002,
"Message B final",
expect.any(Object),
);
expect(deliverReplies).not.toHaveBeenCalled();
});
it.each(["block", "partial"] as const)(
"splits reasoning lane only when a later reasoning block starts (%s mode)",
async (streamMode) => {
@@ -604,6 +747,46 @@ describe("dispatchTelegramMessage draft streaming", () => {
},
);
it("cleans superseded reasoning previews after lane rotation", async () => {
let reasoningDraftParams:
| {
onSupersededPreview?: (preview: { messageId: number; textSnapshot: string }) => void;
}
| undefined;
const answerDraftStream = createDraftStream(999);
const reasoningDraftStream = createDraftStream(111);
createTelegramDraftStream
.mockImplementationOnce(() => answerDraftStream)
.mockImplementationOnce((params) => {
reasoningDraftParams = params as typeof reasoningDraftParams;
return reasoningDraftStream;
});
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(
async ({ dispatcherOptions, replyOptions }) => {
await replyOptions?.onReasoningStream?.({ text: "Reasoning:\n_first block_" });
await replyOptions?.onReasoningEnd?.();
await replyOptions?.onReasoningStream?.({ text: "Reasoning:\n_second block_" });
reasoningDraftParams?.onSupersededPreview?.({
messageId: 4444,
textSnapshot: "Reasoning:\n_first block_",
});
await dispatcherOptions.deliver({ text: "Done" }, { kind: "final" });
return { queuedFinal: true };
},
);
deliverReplies.mockResolvedValue({ delivered: true });
editMessageTelegram.mockResolvedValue({ ok: true, chatId: "123", messageId: "999" });
const bot = createBot();
await dispatchWithContext({ context: createContext(), streamMode: "partial", bot });
expect(reasoningDraftParams?.onSupersededPreview).toBeTypeOf("function");
const deleteMessageCalls = (
bot.api as unknown as { deleteMessage: { mock: { calls: unknown[][] } } }
).deleteMessage.mock.calls;
expect(deleteMessageCalls).toContainEqual([123, 4444]);
});
it.each(["block", "partial"] as const)(
"does not split reasoning lane on reasoning end without a later reasoning block (%s mode)",
async (streamMode) => {

View File

@@ -155,7 +155,10 @@ export const dispatchTelegramMessage = async ({
lastPartialText: string;
hasStreamedMessage: boolean;
};
const createDraftLane = (enabled: boolean): DraftLaneState => {
type ArchivedPreview = { messageId: number; textSnapshot: string };
const archivedAnswerPreviews: ArchivedPreview[] = [];
const archivedReasoningPreviewIds: number[] = [];
const createDraftLane = (laneName: LaneName, enabled: boolean): DraftLaneState => {
const stream = enabled
? createTelegramDraftStream({
api: bot.api,
@@ -165,6 +168,21 @@ export const dispatchTelegramMessage = async ({
replyToMessageId: draftReplyToMessageId,
minInitialChars: draftMinInitialChars,
renderText: renderDraftPreview,
onSupersededPreview:
laneName === "answer" || laneName === "reasoning"
? (preview) => {
if (laneName === "reasoning") {
if (!archivedReasoningPreviewIds.includes(preview.messageId)) {
archivedReasoningPreviewIds.push(preview.messageId);
}
return;
}
archivedAnswerPreviews.push({
messageId: preview.messageId,
textSnapshot: preview.textSnapshot,
});
}
: undefined,
log: logVerbose,
warn: logVerbose,
})
@@ -176,15 +194,13 @@ export const dispatchTelegramMessage = async ({
};
};
const lanes: Record<LaneName, DraftLaneState> = {
answer: createDraftLane(canStreamAnswerDraft),
reasoning: createDraftLane(canStreamReasoningDraft),
answer: createDraftLane("answer", canStreamAnswerDraft),
reasoning: createDraftLane("reasoning", canStreamReasoningDraft),
};
const answerLane = lanes.answer;
const reasoningLane = lanes.reasoning;
let splitReasoningOnNextStream = false;
const reasoningStepState = createTelegramReasoningStepState();
type ArchivedPreview = { messageId: number; textSnapshot: string };
const archivedAnswerPreviews: ArchivedPreview[] = [];
type SplitLaneSegment = { lane: LaneName; text: string };
const splitTextIntoLaneSegments = (text?: string): SplitLaneSegment[] => {
const split = splitTelegramReasoningText(text);
@@ -434,6 +450,43 @@ export const dispatchTelegramMessage = async ({
return result.delivered;
};
type LaneDeliveryResult = "preview-finalized" | "preview-updated" | "sent" | "skipped";
const consumeArchivedAnswerPreviewForFinal = async (params: {
lane: DraftLaneState;
text: string;
payload: ReplyPayload;
previewButtons?: TelegramInlineButtons;
canEditViaPreview: boolean;
}): Promise<LaneDeliveryResult | undefined> => {
const archivedPreview = archivedAnswerPreviews.shift();
if (!archivedPreview) {
return undefined;
}
if (params.canEditViaPreview) {
const finalized = await tryUpdatePreviewForLane({
lane: params.lane,
laneName: "answer",
text: params.text,
previewButtons: params.previewButtons,
stopBeforeEdit: false,
skipRegressive: "existingOnly",
context: "final",
previewMessageId: archivedPreview.messageId,
previewTextSnapshot: archivedPreview.textSnapshot,
});
if (finalized) {
return "preview-finalized";
}
}
try {
await bot.api.deleteMessage(chatId, archivedPreview.messageId);
} catch (err) {
logVerbose(
`telegram: archived answer preview cleanup failed (${archivedPreview.messageId}): ${String(err)}`,
);
}
const delivered = await sendPayload(applyTextToPayload(params.payload, params.text));
return delivered ? "sent" : "skipped";
};
const deliverLaneText = async (params: {
laneName: LaneName;
text: string;
@@ -456,38 +509,32 @@ export const dispatchTelegramMessage = async ({
!hasMedia && text.length > 0 && text.length <= draftMaxChars && !payload.isError;
if (infoKind === "final") {
if (laneName === "answer" && archivedAnswerPreviews.length > 0) {
const archivedPreview = archivedAnswerPreviews.shift();
if (archivedPreview) {
if (canEditViaPreview) {
const finalized = await tryUpdatePreviewForLane({
lane,
laneName,
text,
previewButtons,
stopBeforeEdit: false,
skipRegressive: "existingOnly",
context: "final",
previewMessageId: archivedPreview.messageId,
previewTextSnapshot: archivedPreview.textSnapshot,
});
if (finalized) {
return "preview-finalized";
}
}
try {
await bot.api.deleteMessage(chatId, archivedPreview.messageId);
} catch (err) {
logVerbose(
`telegram: archived answer preview cleanup failed (${archivedPreview.messageId}): ${String(err)}`,
);
}
const delivered = await sendPayload(applyTextToPayload(payload, text));
return delivered ? "sent" : "skipped";
if (laneName === "answer") {
const archivedResult = await consumeArchivedAnswerPreviewForFinal({
lane,
text,
payload,
previewButtons,
canEditViaPreview,
});
if (archivedResult) {
return archivedResult;
}
}
if (canEditViaPreview && !finalizedPreviewByLane[laneName]) {
await flushDraftLane(lane);
if (laneName === "answer") {
const archivedResultAfterFlush = await consumeArchivedAnswerPreviewForFinal({
lane,
text,
payload,
previewButtons,
canEditViaPreview,
});
if (archivedResultAfterFlush) {
return archivedResultAfterFlush;
}
}
const finalized = await tryUpdatePreviewForLane({
lane,
laneName,
@@ -735,6 +782,15 @@ export const dispatchTelegramMessage = async ({
);
}
}
for (const messageId of archivedReasoningPreviewIds) {
try {
await bot.api.deleteMessage(chatId, messageId);
} catch (err) {
logVerbose(
`telegram: archived reasoning preview cleanup failed (${messageId}): ${String(err)}`,
);
}
}
}
let sentFallback = false;
if (

View File

@@ -1,3 +1,4 @@
import type { Bot } from "grammy";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import { createTelegramDraftStream } from "./draft-stream.js";
@@ -18,8 +19,7 @@ function createThreadedDraftStream(
thread: { id: number; scope: "forum" | "dm" },
) {
return createTelegramDraftStream({
// oxlint-disable-next-line typescript/no-explicit-any
api: api as any,
api: api as unknown as Bot["api"],
chatId: 123,
thread,
});
@@ -109,8 +109,7 @@ describe("createTelegramDraftStream", () => {
deleteMessage: vi.fn().mockResolvedValue(true),
};
const stream = createTelegramDraftStream({
// oxlint-disable-next-line typescript/no-explicit-any
api: api as any,
api: api as unknown as Bot["api"],
chatId: 123,
});
@@ -146,8 +145,7 @@ describe("createTelegramDraftStream", () => {
deleteMessage: vi.fn().mockResolvedValue(true),
};
const stream = createTelegramDraftStream({
// oxlint-disable-next-line typescript/no-explicit-any
api: api as any,
api: api as unknown as Bot["api"],
chatId: 123,
throttleMs: 1000,
});
@@ -167,11 +165,47 @@ describe("createTelegramDraftStream", () => {
}
});
it("does not rebind to an old message when forceNewMessage races an in-flight send", async () => {
let resolveFirstSend: ((value: { message_id: number }) => void) | undefined;
const firstSend = new Promise<{ message_id: number }>((resolve) => {
resolveFirstSend = resolve;
});
const api = {
sendMessage: vi.fn().mockReturnValueOnce(firstSend).mockResolvedValueOnce({ message_id: 42 }),
editMessageText: vi.fn().mockResolvedValue(true),
deleteMessage: vi.fn().mockResolvedValue(true),
};
const onSupersededPreview = vi.fn();
const stream = createTelegramDraftStream({
api: api as unknown as Bot["api"],
chatId: 123,
onSupersededPreview,
});
stream.update("Message A partial");
await vi.waitFor(() => expect(api.sendMessage).toHaveBeenCalledTimes(1));
// Rotate to message B before message A send resolves.
stream.forceNewMessage();
stream.update("Message B partial");
resolveFirstSend?.({ message_id: 17 });
await stream.flush();
expect(onSupersededPreview).toHaveBeenCalledWith({
messageId: 17,
textSnapshot: "Message A partial",
parseMode: undefined,
});
expect(api.sendMessage).toHaveBeenCalledTimes(2);
expect(api.sendMessage).toHaveBeenNthCalledWith(2, 123, "Message B partial", undefined);
expect(api.editMessageText).not.toHaveBeenCalledWith(123, 17, "Message B partial");
});
it("supports rendered previews with parse_mode", async () => {
const api = createMockDraftApi();
const stream = createTelegramDraftStream({
// oxlint-disable-next-line typescript/no-explicit-any
api: api as any,
api: api as unknown as Bot["api"],
chatId: 123,
renderText: (text) => ({ text: `<i>${text}</i>`, parseMode: "HTML" }),
});
@@ -191,8 +225,7 @@ describe("createTelegramDraftStream", () => {
const api = createMockDraftApi();
const warn = vi.fn();
const stream = createTelegramDraftStream({
// oxlint-disable-next-line typescript/no-explicit-any
api: api as any,
api: api as unknown as Bot["api"],
chatId: 123,
maxChars: 100,
renderText: () => ({ text: `<b>${"<".repeat(120)}</b>`, parseMode: "HTML" }),
@@ -229,8 +262,7 @@ describe("draft stream initial message debounce", () => {
it("sends immediately on stop() even with 1 character", async () => {
const api = createMockApi();
const stream = createTelegramDraftStream({
// oxlint-disable-next-line typescript/no-explicit-any
api: api as any,
api: api as unknown as Bot["api"],
chatId: 123,
minInitialChars: 30,
});
@@ -245,8 +277,7 @@ describe("draft stream initial message debounce", () => {
it("sends immediately on stop() with short sentence", async () => {
const api = createMockApi();
const stream = createTelegramDraftStream({
// oxlint-disable-next-line typescript/no-explicit-any
api: api as any,
api: api as unknown as Bot["api"],
chatId: 123,
minInitialChars: 30,
});
@@ -263,8 +294,7 @@ describe("draft stream initial message debounce", () => {
it("does not send first message below threshold", async () => {
const api = createMockApi();
const stream = createTelegramDraftStream({
// oxlint-disable-next-line typescript/no-explicit-any
api: api as any,
api: api as unknown as Bot["api"],
chatId: 123,
minInitialChars: 30,
});
@@ -278,8 +308,7 @@ describe("draft stream initial message debounce", () => {
it("sends first message when reaching threshold", async () => {
const api = createMockApi();
const stream = createTelegramDraftStream({
// oxlint-disable-next-line typescript/no-explicit-any
api: api as any,
api: api as unknown as Bot["api"],
chatId: 123,
minInitialChars: 30,
});
@@ -294,8 +323,7 @@ describe("draft stream initial message debounce", () => {
it("works with longer text above threshold", async () => {
const api = createMockApi();
const stream = createTelegramDraftStream({
// oxlint-disable-next-line typescript/no-explicit-any
api: api as any,
api: api as unknown as Bot["api"],
chatId: 123,
minInitialChars: 30,
});
@@ -311,8 +339,7 @@ describe("draft stream initial message debounce", () => {
it("edits normally after first message is sent", async () => {
const api = createMockApi();
const stream = createTelegramDraftStream({
// oxlint-disable-next-line typescript/no-explicit-any
api: api as any,
api: api as unknown as Bot["api"],
chatId: 123,
minInitialChars: 30,
});
@@ -335,8 +362,7 @@ describe("draft stream initial message debounce", () => {
it("sends immediately without minInitialChars set (backward compatible)", async () => {
const api = createMockApi();
const stream = createTelegramDraftStream({
// oxlint-disable-next-line typescript/no-explicit-any
api: api as any,
api: api as unknown as Bot["api"],
chatId: 123,
// no minInitialChars (backward-compatible behavior)
});

View File

@@ -20,6 +20,12 @@ type TelegramDraftPreview = {
parseMode?: "HTML";
};
type SupersededTelegramPreview = {
messageId: number;
textSnapshot: string;
parseMode?: "HTML";
};
export function createTelegramDraftStream(params: {
api: Bot["api"];
chatId: number;
@@ -31,6 +37,8 @@ export function createTelegramDraftStream(params: {
minInitialChars?: number;
/** Optional preview renderer (e.g. markdown -> HTML + parse mode). */
renderText?: (text: string) => TelegramDraftPreview;
/** Called when a late send resolves after forceNewMessage() switched generations. */
onSupersededPreview?: (preview: SupersededTelegramPreview) => void;
log?: (message: string) => void;
warn?: (message: string) => void;
}): TelegramDraftStream {
@@ -52,6 +60,7 @@ export function createTelegramDraftStream(params: {
let lastSentParseMode: "HTML" | undefined;
let stopped = false;
let isFinal = false;
let generation = 0;
const sendOrEditStreamMessage = async (text: string): Promise<boolean> => {
// Allow final flush even if stopped (e.g., after clear()).
@@ -80,6 +89,7 @@ export function createTelegramDraftStream(params: {
if (renderedText === lastSentText && renderedParseMode === lastSentParseMode) {
return true;
}
const sendGeneration = generation;
// Debounce first preview send for better push notification quality.
if (typeof streamMessageId !== "number" && minInitialChars != null && !isFinal) {
@@ -114,7 +124,16 @@ export function createTelegramDraftStream(params: {
params.warn?.("telegram stream preview stopped (missing message id from sendMessage)");
return false;
}
streamMessageId = Math.trunc(sentMessageId);
const normalizedMessageId = Math.trunc(sentMessageId);
if (sendGeneration !== generation) {
params.onSupersededPreview?.({
messageId: normalizedMessageId,
textSnapshot: renderedText,
parseMode: renderedParseMode,
});
return true;
}
streamMessageId = normalizedMessageId;
return true;
} catch (err) {
stopped = true;
@@ -163,6 +182,7 @@ export function createTelegramDraftStream(params: {
};
const forceNewMessage = () => {
generation += 1;
streamMessageId = undefined;
lastSentText = "";
lastSentParseMode = undefined;