fix: improve discord chunk delivery (#33226) (thanks @thewilloftheshadow) (#33226)

This commit is contained in:
Shadow
2026-03-03 10:17:33 -06:00
committed by GitHub
parent ec0eb9f8c3
commit 6593a57607
3 changed files with 243 additions and 13 deletions

View File

@@ -16,6 +16,7 @@ Docs: https://docs.openclaw.ai
- Telegram/DM draft finalization reliability: require verified final-text draft emission before treating preview finalization as delivered, and fall back to normal payload send when final draft delivery is not confirmed (preventing missing final responses and preserving media/button delivery). (#32118) Thanks @OpenCils.
- Discord/audit wildcard warnings: ignore "\*" wildcard keys when counting unresolved guild channels so doctor/status no longer warns on allow-all configs. (#33125) Thanks @thewilloftheshadow.
- Discord/channel resolution: default bare numeric recipients to channels, harden allowlist numeric ID handling with safe fallbacks, and avoid inbound WS heartbeat stalls. (#33142) Thanks @thewilloftheshadow.
- Discord/chunk delivery reliability: preserve chunk ordering when using a REST client and retry chunk sends on 429/5xx using account retry settings. (#33226) Thanks @thewilloftheshadow.
- Exec heartbeat routing: scope exec-triggered heartbeat wakes to agent session keys so unrelated agents are no longer awakened by exec events, while preserving legacy unscoped behavior for non-canonical session keys. (#32724) thanks @altaywtf
- macOS/Tailscale remote gateway discovery: add a Tailscale Serve fallback peer probe path (`wss://<peer>.ts.net`) when Bonjour and wide-area DNS-SD discovery return no gateways, and refresh both discovery paths from macOS onboarding. (#32860) Thanks @ngutman.
- iOS/Gateway keychain hardening: move gateway metadata and TLS fingerprints to device keychain storage with safer migration behavior and rollback-safe writes to reduce credential loss risk during upgrades. (#33029) thanks @mbelinky.

View File

@@ -9,6 +9,7 @@ import {
const sendMessageDiscordMock = vi.hoisted(() => vi.fn());
const sendVoiceMessageDiscordMock = vi.hoisted(() => vi.fn());
const sendWebhookMessageDiscordMock = vi.hoisted(() => vi.fn());
const sendDiscordTextMock = vi.hoisted(() => vi.fn());
vi.mock("../send.js", () => ({
sendMessageDiscord: (...args: unknown[]) => sendMessageDiscordMock(...args),
@@ -16,6 +17,10 @@ vi.mock("../send.js", () => ({
sendWebhookMessageDiscord: (...args: unknown[]) => sendWebhookMessageDiscordMock(...args),
}));
vi.mock("../send.shared.js", () => ({
sendDiscordText: (...args: unknown[]) => sendDiscordTextMock(...args),
}));
describe("deliverDiscordReply", () => {
const runtime = {} as RuntimeEnv;
const createBoundThreadBindings = async (
@@ -62,6 +67,10 @@ describe("deliverDiscordReply", () => {
messageId: "webhook-1",
channelId: "thread-1",
});
sendDiscordTextMock.mockClear().mockResolvedValue({
id: "msg-direct-1",
channel_id: "channel-1",
});
threadBindingTesting.resetThreadBindingsForTests();
});
@@ -182,6 +191,131 @@ describe("deliverDiscordReply", () => {
);
});
it("sends text chunks in order via sendDiscordText when rest is provided", async () => {
const fakeRest = {} as import("@buape/carbon").RequestClient;
const callOrder: string[] = [];
sendDiscordTextMock.mockImplementation(
async (_rest: unknown, _channelId: unknown, text: string) => {
callOrder.push(text);
return { id: `msg-${callOrder.length}`, channel_id: "789" };
},
);
await deliverDiscordReply({
replies: [{ text: "1234567890" }],
target: "channel:789",
token: "token",
rest: fakeRest,
runtime,
textLimit: 5,
});
expect(sendMessageDiscordMock).not.toHaveBeenCalled();
expect(sendDiscordTextMock).toHaveBeenCalledTimes(2);
expect(callOrder).toEqual(["12345", "67890"]);
expect(sendDiscordTextMock.mock.calls[0]?.[1]).toBe("789");
expect(sendDiscordTextMock.mock.calls[1]?.[1]).toBe("789");
});
it("falls back to sendMessageDiscord when rest is not provided", async () => {
await deliverDiscordReply({
replies: [{ text: "single chunk" }],
target: "channel:789",
token: "token",
runtime,
textLimit: 2000,
});
expect(sendMessageDiscordMock).toHaveBeenCalledTimes(1);
expect(sendDiscordTextMock).not.toHaveBeenCalled();
});
it("retries bot send on 429 rate limit then succeeds", async () => {
const rateLimitErr = Object.assign(new Error("rate limited"), { status: 429 });
sendMessageDiscordMock
.mockRejectedValueOnce(rateLimitErr)
.mockResolvedValueOnce({ messageId: "msg-1", channelId: "channel-1" });
await deliverDiscordReply({
replies: [{ text: "retry me" }],
target: "channel:123",
token: "token",
runtime,
textLimit: 2000,
});
expect(sendMessageDiscordMock).toHaveBeenCalledTimes(2);
});
it("retries bot send on 500 server error then succeeds", async () => {
const serverErr = Object.assign(new Error("internal"), { status: 500 });
sendMessageDiscordMock
.mockRejectedValueOnce(serverErr)
.mockResolvedValueOnce({ messageId: "msg-1", channelId: "channel-1" });
await deliverDiscordReply({
replies: [{ text: "retry me" }],
target: "channel:123",
token: "token",
runtime,
textLimit: 2000,
});
expect(sendMessageDiscordMock).toHaveBeenCalledTimes(2);
});
it("does not retry on 4xx client errors", async () => {
const clientErr = Object.assign(new Error("bad request"), { status: 400 });
sendMessageDiscordMock.mockRejectedValueOnce(clientErr);
await expect(
deliverDiscordReply({
replies: [{ text: "fail" }],
target: "channel:123",
token: "token",
runtime,
textLimit: 2000,
}),
).rejects.toThrow("bad request");
expect(sendMessageDiscordMock).toHaveBeenCalledTimes(1);
});
it("throws after exhausting retry attempts", async () => {
const rateLimitErr = Object.assign(new Error("rate limited"), { status: 429 });
sendMessageDiscordMock.mockRejectedValue(rateLimitErr);
await expect(
deliverDiscordReply({
replies: [{ text: "persistent failure" }],
target: "channel:123",
token: "token",
runtime,
textLimit: 2000,
}),
).rejects.toThrow("rate limited");
expect(sendMessageDiscordMock).toHaveBeenCalledTimes(3);
});
it("delivers remaining chunks after a mid-sequence retry", async () => {
sendMessageDiscordMock
.mockResolvedValueOnce({ messageId: "c1" })
.mockRejectedValueOnce(Object.assign(new Error("rate limited"), { status: 429 }))
.mockResolvedValueOnce({ messageId: "c2-retry" })
.mockResolvedValueOnce({ messageId: "c3" });
await deliverDiscordReply({
replies: [{ text: "A".repeat(6) }],
target: "channel:123",
token: "token",
runtime,
textLimit: 2,
});
expect(sendMessageDiscordMock).toHaveBeenCalledTimes(4);
});
it("sends bound-session text replies through webhook delivery", async () => {
const threadBindings = await createBoundThreadBindings({ label: "codex-refactor" });

View File

@@ -4,10 +4,14 @@ import type { ChunkMode } from "../../auto-reply/chunk.js";
import type { ReplyPayload } from "../../auto-reply/types.js";
import { loadConfig } from "../../config/config.js";
import type { MarkdownTableMode, ReplyToMode } from "../../config/types.base.js";
import { createDiscordRetryRunner, type RetryRunner } from "../../infra/retry-policy.js";
import { resolveRetryConfig, retryAsync, type RetryConfig } from "../../infra/retry.js";
import { convertMarkdownTables } from "../../markdown/tables.js";
import type { RuntimeEnv } from "../../runtime.js";
import { resolveDiscordAccount } from "../accounts.js";
import { chunkDiscordTextWithMode } from "../chunk.js";
import { sendMessageDiscord, sendVoiceMessageDiscord, sendWebhookMessageDiscord } from "../send.js";
import { sendDiscordText } from "../send.shared.js";
export type DiscordThreadBindingLookupRecord = {
accountId: string;
@@ -23,6 +27,54 @@ export type DiscordThreadBindingLookup = {
touchThread?: (params: { threadId: string; at?: number; persist?: boolean }) => unknown;
};
type ResolvedRetryConfig = Required<RetryConfig>;
const DISCORD_DELIVERY_RETRY_DEFAULTS: ResolvedRetryConfig = {
attempts: 3,
minDelayMs: 1000,
maxDelayMs: 30_000,
jitter: 0,
};
function isRetryableDiscordError(err: unknown): boolean {
const status = (err as { status?: number }).status ?? (err as { statusCode?: number }).statusCode;
return status === 429 || (status !== undefined && status >= 500);
}
function getDiscordRetryAfterMs(err: unknown): number | undefined {
if (!err || typeof err !== "object") {
return undefined;
}
if (
"retryAfter" in err &&
typeof err.retryAfter === "number" &&
Number.isFinite(err.retryAfter)
) {
return err.retryAfter * 1000;
}
const retryAfterRaw = (err as { headers?: Record<string, string> }).headers?.["retry-after"];
if (!retryAfterRaw) {
return undefined;
}
const retryAfterMs = Number(retryAfterRaw) * 1000;
return Number.isFinite(retryAfterMs) ? retryAfterMs : undefined;
}
function resolveDeliveryRetryConfig(retry?: RetryConfig): ResolvedRetryConfig {
return resolveRetryConfig(DISCORD_DELIVERY_RETRY_DEFAULTS, retry);
}
async function sendWithRetry(
fn: () => Promise<unknown>,
retryConfig: ResolvedRetryConfig,
): Promise<void> {
await retryAsync(fn, {
...retryConfig,
shouldRetry: (err) => isRetryableDiscordError(err),
retryAfterMs: getDiscordRetryAfterMs,
});
}
function resolveTargetChannelId(target: string): string | undefined {
if (!target.startsWith("channel:")) {
return undefined;
@@ -83,6 +135,12 @@ async function sendDiscordChunkWithFallback(params: {
binding?: DiscordThreadBindingLookupRecord;
username?: string;
avatarUrl?: string;
/** Pre-resolved channel ID to bypass redundant resolution per chunk. */
channelId?: string;
/** Pre-created retry runner to avoid creating one per chunk. */
request?: RetryRunner;
/** Pre-resolved retry config (account-level). */
retryConfig: ResolvedRetryConfig;
}) {
if (!params.text.trim()) {
return;
@@ -105,12 +163,27 @@ async function sendDiscordChunkWithFallback(params: {
// Fall through to the standard bot sender path.
}
}
await sendMessageDiscord(params.target, text, {
token: params.token,
rest: params.rest,
accountId: params.accountId,
replyTo: params.replyTo,
});
// When channelId and request are pre-resolved, send directly via sendDiscordText
// to avoid per-chunk overhead (channel-type GET, re-chunking, client creation)
// that can cause ordering issues under queue contention or rate limiting.
if (params.channelId && params.request && params.rest) {
const { channelId, request, rest } = params;
await sendWithRetry(
() => sendDiscordText(rest, channelId, text, params.replyTo, request),
params.retryConfig,
);
return;
}
await sendWithRetry(
() =>
sendMessageDiscord(params.target, text, {
token: params.token,
rest: params.rest,
accountId: params.accountId,
replyTo: params.replyTo,
}),
params.retryConfig,
);
}
async function sendAdditionalDiscordMedia(params: {
@@ -120,16 +193,21 @@ async function sendAdditionalDiscordMedia(params: {
accountId?: string;
mediaUrls: string[];
resolveReplyTo: () => string | undefined;
retryConfig: ResolvedRetryConfig;
}) {
for (const mediaUrl of params.mediaUrls) {
const replyTo = params.resolveReplyTo();
await sendMessageDiscord(params.target, "", {
token: params.token,
rest: params.rest,
mediaUrl,
accountId: params.accountId,
replyTo,
});
await sendWithRetry(
() =>
sendMessageDiscord(params.target, "", {
token: params.token,
rest: params.rest,
mediaUrl,
accountId: params.accountId,
replyTo,
}),
params.retryConfig,
);
}
}
@@ -174,6 +252,15 @@ export async function deliverDiscordReply(params: {
target: params.target,
});
const persona = resolveBindingPersona(binding);
// Pre-resolve channel ID and retry runner once to avoid per-chunk overhead.
// This eliminates redundant channel-type GET requests and client creation that
// can cause ordering issues when multiple chunks share the RequestClient queue.
const channelId = resolveTargetChannelId(params.target);
const account = resolveDiscordAccount({ cfg: loadConfig(), accountId: params.accountId });
const retryConfig = resolveDeliveryRetryConfig(account.config.retry);
const request: RetryRunner | undefined = channelId
? createDiscordRetryRunner({ configRetry: account.config.retry })
: undefined;
let deliveredAny = false;
for (const payload of params.replies) {
const mediaList = payload.mediaUrls ?? (payload.mediaUrl ? [payload.mediaUrl] : []);
@@ -208,6 +295,9 @@ export async function deliverDiscordReply(params: {
binding,
username: persona.username,
avatarUrl: persona.avatarUrl,
channelId,
request,
retryConfig,
});
deliveredAny = true;
}
@@ -240,6 +330,9 @@ export async function deliverDiscordReply(params: {
binding,
username: persona.username,
avatarUrl: persona.avatarUrl,
channelId,
request,
retryConfig,
});
// Additional media items are sent as regular attachments (voice is single-file only).
await sendAdditionalDiscordMedia({
@@ -249,6 +342,7 @@ export async function deliverDiscordReply(params: {
accountId: params.accountId,
mediaUrls: mediaList.slice(1),
resolveReplyTo,
retryConfig,
});
continue;
}
@@ -269,6 +363,7 @@ export async function deliverDiscordReply(params: {
accountId: params.accountId,
mediaUrls: mediaList.slice(1),
resolveReplyTo,
retryConfig,
});
}