From dacb3d1aa2c6e4f30f553857efafd9c8a98ec3aa Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Sun, 22 Feb 2026 17:54:19 +0000 Subject: [PATCH] refactor(queue): share drain helpers across announce and reply --- src/agents/subagent-announce-queue.ts | 45 ++++++++++++++------------- src/auto-reply/reply/queue/drain.ts | 17 +++++----- src/utils/queue-helpers.ts | 29 +++++++++++++++++ 3 files changed, 60 insertions(+), 31 deletions(-) diff --git a/src/agents/subagent-announce-queue.ts b/src/agents/subagent-announce-queue.ts index 9c18bffa0..c81dd94b1 100644 --- a/src/agents/subagent-announce-queue.ts +++ b/src/agents/subagent-announce-queue.ts @@ -8,9 +8,10 @@ import { import { applyQueueRuntimeSettings, applyQueueDropPolicy, + beginQueueDrain, buildCollectPrompt, clearQueueSummaryState, - drainCollectItemIfNeeded, + drainCollectQueueStep, drainNextQueueItem, hasCrossChannelItems, previewQueueSummaryPrompt, @@ -97,33 +98,35 @@ function getAnnounceQueue( return created; } +function hasAnnounceCrossChannelItems(items: AnnounceQueueItem[]): boolean { + return hasCrossChannelItems(items, (item) => { + if (!item.origin) { + return {}; + } + if (!item.originKey) { + return { cross: true }; + } + return { key: item.originKey }; + }); +} + function scheduleAnnounceDrain(key: string) { - const queue = ANNOUNCE_QUEUES.get(key); - if (!queue || queue.draining) { + const queue = beginQueueDrain(ANNOUNCE_QUEUES, key); + if (!queue) { return; } - queue.draining = true; void (async () => { try { - let forceIndividualCollect = false; - while (queue.items.length > 0 || queue.droppedCount > 0) { + const collectState = { forceIndividualCollect: false }; + for (;;) { + if (queue.items.length === 0 && queue.droppedCount === 0) { + break; + } await waitForQueueDebounce(queue); if (queue.mode === "collect") { - const isCrossChannel = hasCrossChannelItems(queue.items, (item) => { - if (!item.origin) { - return {}; - } - if (!item.originKey) { - return { cross: true }; - } - return { key: item.originKey }; - }); - const collectDrainResult = await drainCollectItemIfNeeded({ - forceIndividualCollect, - isCrossChannel, - setForceIndividualCollect: (next) => { - forceIndividualCollect = next; - }, + const collectDrainResult = await drainCollectQueueStep({ + collectState, + isCrossChannel: hasAnnounceCrossChannelItems(queue.items), items: queue.items, run: async (item) => await queue.send(item), }); diff --git a/src/auto-reply/reply/queue/drain.ts b/src/auto-reply/reply/queue/drain.ts index 35cb8de68..c0bcc2c20 100644 --- a/src/auto-reply/reply/queue/drain.ts +++ b/src/auto-reply/reply/queue/drain.ts @@ -1,8 +1,9 @@ import { defaultRuntime } from "../../../runtime.js"; import { buildCollectPrompt, + beginQueueDrain, clearQueueSummaryState, - drainCollectItemIfNeeded, + drainCollectQueueStep, drainNextQueueItem, hasCrossChannelItems, previewQueueSummaryPrompt, @@ -16,14 +17,13 @@ export function scheduleFollowupDrain( key: string, runFollowup: (run: FollowupRun) => Promise, ): void { - const queue = FOLLOWUP_QUEUES.get(key); - if (!queue || queue.draining) { + const queue = beginQueueDrain(FOLLOWUP_QUEUES, key); + if (!queue) { return; } - queue.draining = true; void (async () => { try { - let forceIndividualCollect = false; + const collectState = { forceIndividualCollect: false }; while (queue.items.length > 0 || queue.droppedCount > 0) { await waitForQueueDebounce(queue); if (queue.mode === "collect") { @@ -50,12 +50,9 @@ export function scheduleFollowupDrain( }; }); - const collectDrainResult = await drainCollectItemIfNeeded({ - forceIndividualCollect, + const collectDrainResult = await drainCollectQueueStep({ + collectState, isCrossChannel, - setForceIndividualCollect: (next) => { - forceIndividualCollect = next; - }, items: queue.items, run: runFollowup, }); diff --git a/src/utils/queue-helpers.ts b/src/utils/queue-helpers.ts index 5a487f9bb..8c1b7f307 100644 --- a/src/utils/queue-helpers.ts +++ b/src/utils/queue-helpers.ts @@ -132,6 +132,18 @@ export function waitForQueueDebounce(queue: { }); } +export function beginQueueDrain( + map: Map, + key: string, +): T | undefined { + const queue = map.get(key); + if (!queue || queue.draining) { + return undefined; + } + queue.draining = true; + return queue; +} + export async function drainNextQueueItem( items: T[], run: (item: T) => Promise, @@ -162,6 +174,23 @@ export async function drainCollectItemIfNeeded(params: { return drained ? "drained" : "empty"; } +export async function drainCollectQueueStep(params: { + collectState: { forceIndividualCollect: boolean }; + isCrossChannel: boolean; + items: T[]; + run: (item: T) => Promise; +}): Promise<"skipped" | "drained" | "empty"> { + return await drainCollectItemIfNeeded({ + forceIndividualCollect: params.collectState.forceIndividualCollect, + isCrossChannel: params.isCrossChannel, + setForceIndividualCollect: (next) => { + params.collectState.forceIndividualCollect = next; + }, + items: params.items, + run: params.run, + }); +} + export function buildQueueSummaryPrompt(params: { state: QueueSummaryState; noun: string;