From 6a16e7bb31e7ea5ddb8221b702c4ec710d646330 Mon Sep 17 00:00:00 2001 From: Ayaan Zaidi Date: Sat, 28 Feb 2026 11:13:54 +0530 Subject: [PATCH] fix(gateway): skip heartbeat wake on deduped notifications --- src/gateway/server-node-events.test.ts | 26 ++++++++++++++++++++++++++ src/gateway/server-node-events.ts | 9 +++++++-- src/infra/system-events.test.ts | 8 ++++++++ src/infra/system-events.ts | 5 +++-- 4 files changed, 44 insertions(+), 4 deletions(-) diff --git a/src/gateway/server-node-events.test.ts b/src/gateway/server-node-events.test.ts index 12fdbcf57..6cb7a79d7 100644 --- a/src/gateway/server-node-events.test.ts +++ b/src/gateway/server-node-events.test.ts @@ -357,6 +357,7 @@ describe("notifications changed events", () => { requestHeartbeatNowMock.mockClear(); loadSessionEntryMock.mockClear(); loadSessionEntryMock.mockImplementation((sessionKey: string) => buildSessionLookup(sessionKey)); + enqueueSystemEventMock.mockReturnValue(true); }); it("enqueues notifications.changed posted events", async () => { @@ -457,6 +458,31 @@ describe("notifications changed events", () => { expect(enqueueSystemEventMock).not.toHaveBeenCalled(); expect(requestHeartbeatNowMock).not.toHaveBeenCalled(); }); + + it("does not wake heartbeat when notifications.changed event is deduped", async () => { + enqueueSystemEventMock.mockReset(); + enqueueSystemEventMock.mockReturnValueOnce(true).mockReturnValueOnce(false); + const ctx = buildCtx(); + const payload = JSON.stringify({ + change: "posted", + key: "notif-dupe", + packageName: "com.example.chat", + title: "Message", + text: "Ping from Alex", + }); + + await handleNodeEvent(ctx, "node-n6", { + event: "notifications.changed", + payloadJSON: payload, + }); + await handleNodeEvent(ctx, "node-n6", { + event: "notifications.changed", + payloadJSON: payload, + }); + + expect(enqueueSystemEventMock).toHaveBeenCalledTimes(2); + expect(requestHeartbeatNowMock).toHaveBeenCalledTimes(1); + }); }); describe("agent request events", () => { diff --git a/src/gateway/server-node-events.ts b/src/gateway/server-node-events.ts index 85b233a48..b196e26cc 100644 --- a/src/gateway/server-node-events.ts +++ b/src/gateway/server-node-events.ts @@ -485,8 +485,13 @@ export const handleNodeEvent = async (ctx: NodeEventContext, nodeId: string, evt } } - enqueueSystemEvent(summary, { sessionKey, contextKey: `notification:${key}` }); - requestHeartbeatNow({ reason: "notifications-event", sessionKey }); + const queued = enqueueSystemEvent(summary, { + sessionKey, + contextKey: `notification:${key}`, + }); + if (queued) { + requestHeartbeatNow({ reason: "notifications-event", sessionKey }); + } return; } case "chat.subscribe": { diff --git a/src/infra/system-events.test.ts b/src/infra/system-events.test.ts index 2667a5718..482289659 100644 --- a/src/infra/system-events.test.ts +++ b/src/infra/system-events.test.ts @@ -46,6 +46,14 @@ describe("system events (session routing)", () => { it("requires an explicit session key", () => { expect(() => enqueueSystemEvent("Node: Mac Studio", { sessionKey: " " })).toThrow("sessionKey"); }); + + it("returns false for consecutive duplicate events", () => { + const first = enqueueSystemEvent("Node connected", { sessionKey: "agent:main:main" }); + const second = enqueueSystemEvent("Node connected", { sessionKey: "agent:main:main" }); + + expect(first).toBe(true); + expect(second).toBe(false); + }); }); describe("isCronSystemEvent", () => { diff --git a/src/infra/system-events.ts b/src/infra/system-events.ts index c20237291..771890bcd 100644 --- a/src/infra/system-events.ts +++ b/src/infra/system-events.ts @@ -63,12 +63,12 @@ export function enqueueSystemEvent(text: string, options: SystemEventOptions) { })(); const cleaned = text.trim(); if (!cleaned) { - return; + return false; } const normalizedContextKey = normalizeContextKey(options?.contextKey); entry.lastContextKey = normalizedContextKey; if (entry.lastText === cleaned) { - return; + return false; } // skip consecutive duplicates entry.lastText = cleaned; entry.queue.push({ @@ -79,6 +79,7 @@ export function enqueueSystemEvent(text: string, options: SystemEventOptions) { if (entry.queue.length > MAX_EVENTS) { entry.queue.shift(); } + return true; } export function drainSystemEventEntries(sessionKey: string): SystemEvent[] {