From 8d048d412f5eed7639de155643c3575d9392d504 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Thu, 19 Feb 2026 06:43:17 +0000 Subject: [PATCH] refactor(queue): share next-item drain helper across queue drains --- src/agents/subagent-announce-queue.ts | 26 ++++++++------------- src/auto-reply/reply/queue/drain.ts | 33 +++++++++++---------------- src/utils/queue-helpers.ts | 13 +++++++++++ 3 files changed, 36 insertions(+), 36 deletions(-) diff --git a/src/agents/subagent-announce-queue.ts b/src/agents/subagent-announce-queue.ts index 864f2cbe7..e0dc8fcbf 100644 --- a/src/agents/subagent-announce-queue.ts +++ b/src/agents/subagent-announce-queue.ts @@ -10,6 +10,7 @@ import { applyQueueDropPolicy, buildCollectPrompt, clearQueueSummaryState, + drainNextQueueItem, hasCrossChannelItems, previewQueueSummaryPrompt, waitForQueueDebounce, @@ -108,12 +109,9 @@ function scheduleAnnounceDrain(key: string) { await waitForQueueDebounce(queue); if (queue.mode === "collect") { if (forceIndividualCollect) { - const next = queue.items[0]; - if (!next) { + if (!(await drainNextQueueItem(queue.items, async (item) => await queue.send(item)))) { break; } - await queue.send(next); - queue.items.shift(); continue; } const isCrossChannel = hasCrossChannelItems(queue.items, (item) => { @@ -127,12 +125,9 @@ function scheduleAnnounceDrain(key: string) { }); if (isCrossChannel) { forceIndividualCollect = true; - const next = queue.items[0]; - if (!next) { + if (!(await drainNextQueueItem(queue.items, async (item) => await queue.send(item)))) { break; } - await queue.send(next); - queue.items.shift(); continue; } const items = queue.items.slice(); @@ -157,22 +152,21 @@ function scheduleAnnounceDrain(key: string) { const summaryPrompt = previewQueueSummaryPrompt({ state: queue, noun: "announce" }); if (summaryPrompt) { - const next = queue.items[0]; - if (!next) { + if ( + !(await drainNextQueueItem( + queue.items, + async (item) => await queue.send({ ...item, prompt: summaryPrompt }), + )) + ) { break; } - await queue.send({ ...next, prompt: summaryPrompt }); - queue.items.shift(); clearQueueSummaryState(queue); continue; } - const next = queue.items[0]; - if (!next) { + if (!(await drainNextQueueItem(queue.items, async (item) => await queue.send(item)))) { break; } - await queue.send(next); - queue.items.shift(); } } catch (err) { // Keep items in queue and retry after debounce; avoid hot-loop retries. diff --git a/src/auto-reply/reply/queue/drain.ts b/src/auto-reply/reply/queue/drain.ts index 3d739b3dc..be409b3c7 100644 --- a/src/auto-reply/reply/queue/drain.ts +++ b/src/auto-reply/reply/queue/drain.ts @@ -2,6 +2,7 @@ import { defaultRuntime } from "../../../runtime.js"; import { buildCollectPrompt, clearQueueSummaryState, + drainNextQueueItem, hasCrossChannelItems, previewQueueSummaryPrompt, waitForQueueDebounce, @@ -30,12 +31,9 @@ export function scheduleFollowupDrain( // // Debug: `pnpm test src/auto-reply/reply/queue.collect-routing.test.ts` if (forceIndividualCollect) { - const next = queue.items[0]; - if (!next) { + if (!(await drainNextQueueItem(queue.items, runFollowup))) { break; } - await runFollowup(next); - queue.items.shift(); continue; } @@ -60,12 +58,9 @@ export function scheduleFollowupDrain( if (isCrossChannel) { forceIndividualCollect = true; - const next = queue.items[0]; - if (!next) { + if (!(await drainNextQueueItem(queue.items, runFollowup))) { break; } - await runFollowup(next); - queue.items.shift(); continue; } @@ -114,26 +109,24 @@ export function scheduleFollowupDrain( if (!run) { break; } - const next = queue.items[0]; - if (!next) { + if ( + !(await drainNextQueueItem(queue.items, async () => { + await runFollowup({ + prompt: summaryPrompt, + run, + enqueuedAt: Date.now(), + }); + })) + ) { break; } - await runFollowup({ - prompt: summaryPrompt, - run, - enqueuedAt: Date.now(), - }); - queue.items.shift(); clearQueueSummaryState(queue); continue; } - const next = queue.items[0]; - if (!next) { + if (!(await drainNextQueueItem(queue.items, runFollowup))) { break; } - await runFollowup(next); - queue.items.shift(); } } catch (err) { queue.lastEnqueuedAt = Date.now(); diff --git a/src/utils/queue-helpers.ts b/src/utils/queue-helpers.ts index 1a5d310c0..4ebb627e8 100644 --- a/src/utils/queue-helpers.ts +++ b/src/utils/queue-helpers.ts @@ -129,6 +129,19 @@ export function waitForQueueDebounce(queue: { }); } +export async function drainNextQueueItem( + items: T[], + run: (item: T) => Promise, +): Promise { + const next = items[0]; + if (!next) { + return false; + } + await run(next); + items.shift(); + return true; +} + export function buildQueueSummaryPrompt(params: { state: QueueSummaryState; noun: string;