From 34fef3ae60909c8fee4c9233efe8b17f8d1bc87b Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Sun, 22 Feb 2026 21:31:49 +0100 Subject: [PATCH] fix(delivery): quarantine permanent recovery failures Co-authored-by: Aldo <17973757+aldoeliacim@users.noreply.github.com> --- CHANGELOG.md | 1 + src/infra/outbound/delivery-queue.ts | 36 +++++++++++++++++----- src/infra/outbound/outbound.test.ts | 45 ++++++++++++++++++++++++++++ 3 files changed, 74 insertions(+), 8 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index bfcc35552..766bc80fe 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -61,6 +61,7 @@ Docs: https://docs.openclaw.ai - Cron/Service: execute manual `cron.run` jobs outside the cron lock (while still persisting started/finished state atomically) so `cron.list` and `cron.status` remain responsive during long forced runs. (#23628) Thanks @dsgraves. - Cron/Timer: keep a watchdog recheck timer armed while `onTimer` is actively executing so the scheduler continues polling even if a due-run tick stalls for an extended period. (#23628) Thanks @dsgraves. - Cron/Run: enforce the same per-job timeout guard for manual `cron.run` executions as timer-driven runs, including abort propagation for isolated agent jobs, so forced runs cannot wedge indefinitely. (#23704) Thanks @tkuehnl. +- Delivery/Queue: quarantine queue entries immediately on known permanent delivery errors (for example invalid recipients or missing conversation references) by moving them to `failed/` instead of retrying on every restart. (#23794) Thanks @aldoeliacim. - Cron/Status: split execution outcome (`lastRunStatus`) from delivery outcome (`lastDeliveryStatus`) in persisted cron state, finished events, and run history so failed/unknown announcement delivery is visible without conflating it with run errors. - Cron/Schedule: for `every` jobs, prefer `lastRunAtMs + everyMs` when still in the future after restarts, then fall back to anchor scheduling for catch-up windows, so NEXT timing matches the last successful cadence. (#22895) Thanks @SidQin-cyber. - Agents/Compaction: restore embedded compaction safeguard/context-pruning extension loading in production by wiring bundled extension factories into the resource loader instead of runtime file-path resolution. (#22349) Thanks @Glucksberg. diff --git a/src/infra/outbound/delivery-queue.ts b/src/infra/outbound/delivery-queue.ts index 04f1baddd..699ba6f74 100644 --- a/src/infra/outbound/delivery-queue.ts +++ b/src/infra/outbound/delivery-queue.ts @@ -282,19 +282,24 @@ export async function recoverPendingDeliveries(opts: { recovered += 1; opts.log.info(`Recovered delivery ${entry.id} to ${entry.channel}:${entry.to}`); } catch (err) { + const errMsg = err instanceof Error ? err.message : String(err); + if (isPermanentDeliveryError(errMsg)) { + opts.log.warn(`Delivery ${entry.id} hit permanent error — moving to failed/: ${errMsg}`); + try { + await moveToFailed(entry.id, opts.stateDir); + } catch (moveErr) { + opts.log.error(`Failed to move entry ${entry.id} to failed/: ${String(moveErr)}`); + } + failed += 1; + continue; + } try { - await failDelivery( - entry.id, - err instanceof Error ? err.message : String(err), - opts.stateDir, - ); + await failDelivery(entry.id, errMsg, opts.stateDir); } catch { // Best-effort update. } failed += 1; - opts.log.warn( - `Retry failed for delivery ${entry.id}: ${err instanceof Error ? err.message : String(err)}`, - ); + opts.log.warn(`Retry failed for delivery ${entry.id}: ${errMsg}`); } } @@ -305,3 +310,18 @@ export async function recoverPendingDeliveries(opts: { } export { MAX_RETRIES }; + +const PERMANENT_ERROR_PATTERNS: readonly RegExp[] = [ + /no conversation reference found/i, + /chat not found/i, + /user not found/i, + /bot was blocked by the user/i, + /forbidden: bot was kicked/i, + /chat_id is empty/i, + /recipient is not a valid/i, + /outbound not configured for channel/i, +]; + +export function isPermanentDeliveryError(error: string): boolean { + return PERMANENT_ERROR_PATTERNS.some((re) => re.test(error)); +} diff --git a/src/infra/outbound/outbound.test.ts b/src/infra/outbound/outbound.test.ts index 897a4a3f0..8b57bb7a3 100644 --- a/src/infra/outbound/outbound.test.ts +++ b/src/infra/outbound/outbound.test.ts @@ -11,6 +11,7 @@ import { type DeliverFn, enqueueDelivery, failDelivery, + isPermanentDeliveryError, loadPendingDeliveries, MAX_RETRIES, moveToFailed, @@ -142,6 +143,30 @@ describe("delivery-queue", () => { }); }); + describe("isPermanentDeliveryError", () => { + it.each([ + "No conversation reference found for user:abc", + "Telegram send failed: chat not found (chat_id=user:123)", + "user not found", + "Bot was blocked by the user", + "Forbidden: bot was kicked from the group chat", + "chat_id is empty", + "Outbound not configured for channel: msteams", + ])("returns true for permanent error: %s", (msg) => { + expect(isPermanentDeliveryError(msg)).toBe(true); + }); + + it.each([ + "network down", + "ETIMEDOUT", + "socket hang up", + "rate limited", + "500 Internal Server Error", + ])("returns false for transient error: %s", (msg) => { + expect(isPermanentDeliveryError(msg)).toBe(false); + }); + }); + describe("loadPendingDeliveries", () => { it("returns empty array when queue directory does not exist", async () => { const nonexistent = path.join(tmpDir, "no-such-dir"); @@ -265,6 +290,26 @@ describe("delivery-queue", () => { expect(entries[0].lastError).toBe("network down"); }); + it("moves entries to failed/ immediately on permanent delivery errors", async () => { + const id = await enqueueDelivery( + { channel: "msteams", to: "user:abc", payloads: [{ text: "hi" }] }, + tmpDir, + ); + const deliver = vi + .fn() + .mockRejectedValue(new Error("No conversation reference found for user:abc")); + const log = createLog(); + const { result } = await runRecovery({ deliver, log }); + + expect(result.failed).toBe(1); + expect(result.recovered).toBe(0); + const remaining = await loadPendingDeliveries(tmpDir); + expect(remaining).toHaveLength(0); + const failedDir = path.join(tmpDir, "delivery-queue", "failed"); + expect(fs.existsSync(path.join(failedDir, `${id}.json`))).toBe(true); + expect(log.warn).toHaveBeenCalledWith(expect.stringContaining("permanent error")); + }); + it("passes skipQueue: true to prevent re-enqueueing during recovery", async () => { await enqueueDelivery({ channel: "whatsapp", to: "+1", payloads: [{ text: "a" }] }, tmpDir);