diff --git a/CHANGELOG.md b/CHANGELOG.md index 24c0f6187..ed1d616a9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,7 @@ Docs: https://docs.openclaw.ai - Telegram/DM draft finalization reliability: require verified final-text draft emission before treating preview finalization as delivered, and fall back to normal payload send when final draft delivery is not confirmed (preventing missing final responses and preserving media/button delivery). (#32118) Thanks @OpenCils. - Discord/audit wildcard warnings: ignore "\*" wildcard keys when counting unresolved guild channels so doctor/status no longer warns on allow-all configs. (#33125) Thanks @thewilloftheshadow. - Discord/channel resolution: default bare numeric recipients to channels, harden allowlist numeric ID handling with safe fallbacks, and avoid inbound WS heartbeat stalls. (#33142) Thanks @thewilloftheshadow. +- Discord/chunk delivery reliability: preserve chunk ordering when using a REST client and retry chunk sends on 429/5xx using account retry settings. (#33226) Thanks @thewilloftheshadow. - Exec heartbeat routing: scope exec-triggered heartbeat wakes to agent session keys so unrelated agents are no longer awakened by exec events, while preserving legacy unscoped behavior for non-canonical session keys. (#32724) thanks @altaywtf - macOS/Tailscale remote gateway discovery: add a Tailscale Serve fallback peer probe path (`wss://.ts.net`) when Bonjour and wide-area DNS-SD discovery return no gateways, and refresh both discovery paths from macOS onboarding. (#32860) Thanks @ngutman. - iOS/Gateway keychain hardening: move gateway metadata and TLS fingerprints to device keychain storage with safer migration behavior and rollback-safe writes to reduce credential loss risk during upgrades. (#33029) thanks @mbelinky. diff --git a/src/discord/monitor/reply-delivery.test.ts b/src/discord/monitor/reply-delivery.test.ts index d15a9e01c..ff8e94db2 100644 --- a/src/discord/monitor/reply-delivery.test.ts +++ b/src/discord/monitor/reply-delivery.test.ts @@ -9,6 +9,7 @@ import { const sendMessageDiscordMock = vi.hoisted(() => vi.fn()); const sendVoiceMessageDiscordMock = vi.hoisted(() => vi.fn()); const sendWebhookMessageDiscordMock = vi.hoisted(() => vi.fn()); +const sendDiscordTextMock = vi.hoisted(() => vi.fn()); vi.mock("../send.js", () => ({ sendMessageDiscord: (...args: unknown[]) => sendMessageDiscordMock(...args), @@ -16,6 +17,10 @@ vi.mock("../send.js", () => ({ sendWebhookMessageDiscord: (...args: unknown[]) => sendWebhookMessageDiscordMock(...args), })); +vi.mock("../send.shared.js", () => ({ + sendDiscordText: (...args: unknown[]) => sendDiscordTextMock(...args), +})); + describe("deliverDiscordReply", () => { const runtime = {} as RuntimeEnv; const createBoundThreadBindings = async ( @@ -62,6 +67,10 @@ describe("deliverDiscordReply", () => { messageId: "webhook-1", channelId: "thread-1", }); + sendDiscordTextMock.mockClear().mockResolvedValue({ + id: "msg-direct-1", + channel_id: "channel-1", + }); threadBindingTesting.resetThreadBindingsForTests(); }); @@ -182,6 +191,131 @@ describe("deliverDiscordReply", () => { ); }); + it("sends text chunks in order via sendDiscordText when rest is provided", async () => { + const fakeRest = {} as import("@buape/carbon").RequestClient; + const callOrder: string[] = []; + sendDiscordTextMock.mockImplementation( + async (_rest: unknown, _channelId: unknown, text: string) => { + callOrder.push(text); + return { id: `msg-${callOrder.length}`, channel_id: "789" }; + }, + ); + + await deliverDiscordReply({ + replies: [{ text: "1234567890" }], + target: "channel:789", + token: "token", + rest: fakeRest, + runtime, + textLimit: 5, + }); + + expect(sendMessageDiscordMock).not.toHaveBeenCalled(); + expect(sendDiscordTextMock).toHaveBeenCalledTimes(2); + expect(callOrder).toEqual(["12345", "67890"]); + expect(sendDiscordTextMock.mock.calls[0]?.[1]).toBe("789"); + expect(sendDiscordTextMock.mock.calls[1]?.[1]).toBe("789"); + }); + + it("falls back to sendMessageDiscord when rest is not provided", async () => { + await deliverDiscordReply({ + replies: [{ text: "single chunk" }], + target: "channel:789", + token: "token", + runtime, + textLimit: 2000, + }); + + expect(sendMessageDiscordMock).toHaveBeenCalledTimes(1); + expect(sendDiscordTextMock).not.toHaveBeenCalled(); + }); + + it("retries bot send on 429 rate limit then succeeds", async () => { + const rateLimitErr = Object.assign(new Error("rate limited"), { status: 429 }); + sendMessageDiscordMock + .mockRejectedValueOnce(rateLimitErr) + .mockResolvedValueOnce({ messageId: "msg-1", channelId: "channel-1" }); + + await deliverDiscordReply({ + replies: [{ text: "retry me" }], + target: "channel:123", + token: "token", + runtime, + textLimit: 2000, + }); + + expect(sendMessageDiscordMock).toHaveBeenCalledTimes(2); + }); + + it("retries bot send on 500 server error then succeeds", async () => { + const serverErr = Object.assign(new Error("internal"), { status: 500 }); + sendMessageDiscordMock + .mockRejectedValueOnce(serverErr) + .mockResolvedValueOnce({ messageId: "msg-1", channelId: "channel-1" }); + + await deliverDiscordReply({ + replies: [{ text: "retry me" }], + target: "channel:123", + token: "token", + runtime, + textLimit: 2000, + }); + + expect(sendMessageDiscordMock).toHaveBeenCalledTimes(2); + }); + + it("does not retry on 4xx client errors", async () => { + const clientErr = Object.assign(new Error("bad request"), { status: 400 }); + sendMessageDiscordMock.mockRejectedValueOnce(clientErr); + + await expect( + deliverDiscordReply({ + replies: [{ text: "fail" }], + target: "channel:123", + token: "token", + runtime, + textLimit: 2000, + }), + ).rejects.toThrow("bad request"); + + expect(sendMessageDiscordMock).toHaveBeenCalledTimes(1); + }); + + it("throws after exhausting retry attempts", async () => { + const rateLimitErr = Object.assign(new Error("rate limited"), { status: 429 }); + sendMessageDiscordMock.mockRejectedValue(rateLimitErr); + + await expect( + deliverDiscordReply({ + replies: [{ text: "persistent failure" }], + target: "channel:123", + token: "token", + runtime, + textLimit: 2000, + }), + ).rejects.toThrow("rate limited"); + + expect(sendMessageDiscordMock).toHaveBeenCalledTimes(3); + }); + + it("delivers remaining chunks after a mid-sequence retry", async () => { + sendMessageDiscordMock + .mockResolvedValueOnce({ messageId: "c1" }) + .mockRejectedValueOnce(Object.assign(new Error("rate limited"), { status: 429 })) + .mockResolvedValueOnce({ messageId: "c2-retry" }) + .mockResolvedValueOnce({ messageId: "c3" }); + + await deliverDiscordReply({ + replies: [{ text: "A".repeat(6) }], + target: "channel:123", + token: "token", + runtime, + textLimit: 2, + }); + + expect(sendMessageDiscordMock).toHaveBeenCalledTimes(4); + }); + it("sends bound-session text replies through webhook delivery", async () => { const threadBindings = await createBoundThreadBindings({ label: "codex-refactor" }); diff --git a/src/discord/monitor/reply-delivery.ts b/src/discord/monitor/reply-delivery.ts index 1c79e2165..71444fdb6 100644 --- a/src/discord/monitor/reply-delivery.ts +++ b/src/discord/monitor/reply-delivery.ts @@ -4,10 +4,14 @@ import type { ChunkMode } from "../../auto-reply/chunk.js"; import type { ReplyPayload } from "../../auto-reply/types.js"; import { loadConfig } from "../../config/config.js"; import type { MarkdownTableMode, ReplyToMode } from "../../config/types.base.js"; +import { createDiscordRetryRunner, type RetryRunner } from "../../infra/retry-policy.js"; +import { resolveRetryConfig, retryAsync, type RetryConfig } from "../../infra/retry.js"; import { convertMarkdownTables } from "../../markdown/tables.js"; import type { RuntimeEnv } from "../../runtime.js"; +import { resolveDiscordAccount } from "../accounts.js"; import { chunkDiscordTextWithMode } from "../chunk.js"; import { sendMessageDiscord, sendVoiceMessageDiscord, sendWebhookMessageDiscord } from "../send.js"; +import { sendDiscordText } from "../send.shared.js"; export type DiscordThreadBindingLookupRecord = { accountId: string; @@ -23,6 +27,54 @@ export type DiscordThreadBindingLookup = { touchThread?: (params: { threadId: string; at?: number; persist?: boolean }) => unknown; }; +type ResolvedRetryConfig = Required; + +const DISCORD_DELIVERY_RETRY_DEFAULTS: ResolvedRetryConfig = { + attempts: 3, + minDelayMs: 1000, + maxDelayMs: 30_000, + jitter: 0, +}; + +function isRetryableDiscordError(err: unknown): boolean { + const status = (err as { status?: number }).status ?? (err as { statusCode?: number }).statusCode; + return status === 429 || (status !== undefined && status >= 500); +} + +function getDiscordRetryAfterMs(err: unknown): number | undefined { + if (!err || typeof err !== "object") { + return undefined; + } + if ( + "retryAfter" in err && + typeof err.retryAfter === "number" && + Number.isFinite(err.retryAfter) + ) { + return err.retryAfter * 1000; + } + const retryAfterRaw = (err as { headers?: Record }).headers?.["retry-after"]; + if (!retryAfterRaw) { + return undefined; + } + const retryAfterMs = Number(retryAfterRaw) * 1000; + return Number.isFinite(retryAfterMs) ? retryAfterMs : undefined; +} + +function resolveDeliveryRetryConfig(retry?: RetryConfig): ResolvedRetryConfig { + return resolveRetryConfig(DISCORD_DELIVERY_RETRY_DEFAULTS, retry); +} + +async function sendWithRetry( + fn: () => Promise, + retryConfig: ResolvedRetryConfig, +): Promise { + await retryAsync(fn, { + ...retryConfig, + shouldRetry: (err) => isRetryableDiscordError(err), + retryAfterMs: getDiscordRetryAfterMs, + }); +} + function resolveTargetChannelId(target: string): string | undefined { if (!target.startsWith("channel:")) { return undefined; @@ -83,6 +135,12 @@ async function sendDiscordChunkWithFallback(params: { binding?: DiscordThreadBindingLookupRecord; username?: string; avatarUrl?: string; + /** Pre-resolved channel ID to bypass redundant resolution per chunk. */ + channelId?: string; + /** Pre-created retry runner to avoid creating one per chunk. */ + request?: RetryRunner; + /** Pre-resolved retry config (account-level). */ + retryConfig: ResolvedRetryConfig; }) { if (!params.text.trim()) { return; @@ -105,12 +163,27 @@ async function sendDiscordChunkWithFallback(params: { // Fall through to the standard bot sender path. } } - await sendMessageDiscord(params.target, text, { - token: params.token, - rest: params.rest, - accountId: params.accountId, - replyTo: params.replyTo, - }); + // When channelId and request are pre-resolved, send directly via sendDiscordText + // to avoid per-chunk overhead (channel-type GET, re-chunking, client creation) + // that can cause ordering issues under queue contention or rate limiting. + if (params.channelId && params.request && params.rest) { + const { channelId, request, rest } = params; + await sendWithRetry( + () => sendDiscordText(rest, channelId, text, params.replyTo, request), + params.retryConfig, + ); + return; + } + await sendWithRetry( + () => + sendMessageDiscord(params.target, text, { + token: params.token, + rest: params.rest, + accountId: params.accountId, + replyTo: params.replyTo, + }), + params.retryConfig, + ); } async function sendAdditionalDiscordMedia(params: { @@ -120,16 +193,21 @@ async function sendAdditionalDiscordMedia(params: { accountId?: string; mediaUrls: string[]; resolveReplyTo: () => string | undefined; + retryConfig: ResolvedRetryConfig; }) { for (const mediaUrl of params.mediaUrls) { const replyTo = params.resolveReplyTo(); - await sendMessageDiscord(params.target, "", { - token: params.token, - rest: params.rest, - mediaUrl, - accountId: params.accountId, - replyTo, - }); + await sendWithRetry( + () => + sendMessageDiscord(params.target, "", { + token: params.token, + rest: params.rest, + mediaUrl, + accountId: params.accountId, + replyTo, + }), + params.retryConfig, + ); } } @@ -174,6 +252,15 @@ export async function deliverDiscordReply(params: { target: params.target, }); const persona = resolveBindingPersona(binding); + // Pre-resolve channel ID and retry runner once to avoid per-chunk overhead. + // This eliminates redundant channel-type GET requests and client creation that + // can cause ordering issues when multiple chunks share the RequestClient queue. + const channelId = resolveTargetChannelId(params.target); + const account = resolveDiscordAccount({ cfg: loadConfig(), accountId: params.accountId }); + const retryConfig = resolveDeliveryRetryConfig(account.config.retry); + const request: RetryRunner | undefined = channelId + ? createDiscordRetryRunner({ configRetry: account.config.retry }) + : undefined; let deliveredAny = false; for (const payload of params.replies) { const mediaList = payload.mediaUrls ?? (payload.mediaUrl ? [payload.mediaUrl] : []); @@ -208,6 +295,9 @@ export async function deliverDiscordReply(params: { binding, username: persona.username, avatarUrl: persona.avatarUrl, + channelId, + request, + retryConfig, }); deliveredAny = true; } @@ -240,6 +330,9 @@ export async function deliverDiscordReply(params: { binding, username: persona.username, avatarUrl: persona.avatarUrl, + channelId, + request, + retryConfig, }); // Additional media items are sent as regular attachments (voice is single-file only). await sendAdditionalDiscordMedia({ @@ -249,6 +342,7 @@ export async function deliverDiscordReply(params: { accountId: params.accountId, mediaUrls: mediaList.slice(1), resolveReplyTo, + retryConfig, }); continue; } @@ -269,6 +363,7 @@ export async function deliverDiscordReply(params: { accountId: params.accountId, mediaUrls: mediaList.slice(1), resolveReplyTo, + retryConfig, }); }