fix(telegram): harden persisted offset confirmation and stall recovery

Landed from #39111 by @MumuTW.

Co-authored-by: MumuTW <clothl47364@gmail.com>
This commit is contained in:
Peter Steinberger
2026-03-07 20:47:33 +00:00
parent 9b4a114eb6
commit 2015ab3194
5 changed files with 242 additions and 14 deletions

View File

@@ -274,6 +274,7 @@ Docs: https://docs.openclaw.ai
- Usage/token count formatting: round near-million token counts to millions (`1.0m`) instead of `1000k`, with explicit boundary coverage for `999_499` and `999_500`. (#39129) Thanks @CurryMessi.
- Gateway/session bootstrap cache invalidation ordering: clear bootstrap snapshots only after active embedded-run shutdown wait completes, preventing dying runs from repopulating stale cache between `/new`/`sessions.reset` turns. (#38873) Thanks @MumuTW.
- Browser/dispatcher error clarity: preserve dispatcher-side failure context in browser fetch errors while still appending operator guidance and explicit no-retry model hints, preventing misleading `"Can't reach service"` wrapping and avoiding LLM retry loops. (#39090) Thanks @NewdlDewdl.
- Telegram/polling offset safety: confirm persisted offsets before polling startup while validating stored `lastUpdateId` values as non-negative safe integers (with overflow guards) so malformed offset state cannot cause update skipping/dropping. (#39111) Thanks @MumuTW.
## 2026.3.2

View File

@@ -22,6 +22,10 @@ const api = {
sendDocument: vi.fn(),
setWebhook: vi.fn(),
deleteWebhook: vi.fn(),
getUpdates: vi.fn(async () => []),
config: {
use: vi.fn(),
},
};
const { initSpy, runSpy, loadConfig } = vi.hoisted(() => ({
initSpy: vi.fn(async () => undefined),
@@ -67,6 +71,9 @@ const { computeBackoff, sleepWithAbort } = vi.hoisted(() => ({
computeBackoff: vi.fn(() => 0),
sleepWithAbort: vi.fn(async () => undefined),
}));
// Spy standing in for readTelegramUpdateOffset. It is created through
// vi.hoisted so it already exists when the vi.mock factory for
// "./update-offset-store.js" references it. Resolves to null unless a test
// overrides it.
const { readTelegramUpdateOffsetSpy } = vi.hoisted(() => ({
readTelegramUpdateOffsetSpy: vi.fn(async () => null as number | null),
}));
// Spy standing in for startTelegramWebhook (wired up in the "./webhook.js"
// mock). Resolves to an object carrying a server with a close() stub and a
// stop() stub.
const { startTelegramWebhookSpy } = vi.hoisted(() => ({
startTelegramWebhookSpy: vi.fn(async () => ({ server: { close: vi.fn() }, stop: vi.fn() })),
}));
@@ -183,6 +190,11 @@ vi.mock("./webhook.js", () => ({
startTelegramWebhook: startTelegramWebhookSpy,
}));
// Stub out the update-offset store: reads go through
// readTelegramUpdateOffsetSpy so each test decides which offset is
// "persisted", and writes resolve to undefined without doing any work.
vi.mock("./update-offset-store.js", () => ({
readTelegramUpdateOffset: readTelegramUpdateOffsetSpy,
writeTelegramUpdateOffset: vi.fn(async () => undefined),
}));
vi.mock("../auto-reply/reply.js", () => ({
getReplyFromConfig: async (ctx: { Body?: string }) => ({
text: `echo:${ctx.Body}`,
@@ -198,6 +210,8 @@ describe("monitorTelegramProvider (grammY)", () => {
channels: { telegram: {} },
});
initSpy.mockClear();
readTelegramUpdateOffsetSpy.mockReset().mockResolvedValue(null);
api.getUpdates.mockReset().mockResolvedValue([]);
runSpy.mockReset().mockImplementation(() =>
makeRunnerStub({
task: () => Promise.reject(new Error("runSpy called without explicit test stub")),
@@ -218,9 +232,11 @@ describe("monitorTelegramProvider (grammY)", () => {
});
it("processes a DM and sends reply", async () => {
Object.values(api).forEach((fn) => {
fn?.mockReset?.();
});
for (const v of Object.values(api)) {
if (typeof v === "function" && "mockReset" in v) {
(v as ReturnType<typeof vi.fn>).mockReset();
}
}
await monitorWithAutoAbort();
expect(handlers.message).toBeDefined();
await handlers.message?.({
@@ -260,9 +276,11 @@ describe("monitorTelegramProvider (grammY)", () => {
});
it("requires mention in groups by default", async () => {
Object.values(api).forEach((fn) => {
fn?.mockReset?.();
});
for (const v of Object.values(api)) {
if (typeof v === "function" && "mockReset" in v) {
(v as ReturnType<typeof vi.fn>).mockReset();
}
}
await monitorWithAutoAbort();
await handlers.message?.({
message: {
@@ -467,6 +485,112 @@ describe("monitorTelegramProvider (grammY)", () => {
expect(settled).toHaveBeenCalledTimes(1);
});
it("force-restarts polling when getUpdates stalls (watchdog)", async () => {
  vi.useFakeTimers({ shouldAdvanceTime: true });
  // Restore real timers even when an assertion or await below throws, so a
  // failure here cannot leak fake timers into the tests that run afterwards.
  try {
    const abort = new AbortController();
    let running = true;
    let releaseTask: (() => void) | undefined;
    // Stopping the first runner marks it as not running and lets its pending
    // task() promise settle, which is what allows the monitor to restart.
    const stop = vi.fn(async () => {
      running = false;
      releaseTask?.();
    });

    runSpy
      // First runner: task() never settles on its own, so no further
      // getUpdates calls are observed and the watchdog has to step in.
      .mockImplementationOnce(() =>
        makeRunnerStub({
          task: () =>
            new Promise<void>((resolve) => {
              releaseTask = resolve;
            }),
          stop,
          isRunning: () => running,
        }),
      )
      // Second runner: aborts immediately so the monitor loop exits.
      .mockImplementationOnce(() =>
        makeRunnerStub({
          task: async () => {
            abort.abort();
          },
        }),
      );

    const monitor = monitorTelegramProvider({ token: "tok", abortSignal: abort.signal });
    await vi.waitFor(() => expect(runSpy).toHaveBeenCalledTimes(1));

    // Advance time past the stall threshold (90s) + watchdog interval (30s)
    vi.advanceTimersByTime(120_000);
    await monitor;

    expect(stop.mock.calls.length).toBeGreaterThanOrEqual(1);
    expect(computeBackoff).toHaveBeenCalled();
    expect(runSpy).toHaveBeenCalledTimes(2);
  } finally {
    vi.useRealTimers();
  }
});
it("confirms persisted offset with Telegram before starting runner", async () => {
  const persistedUpdateId = 549076203;
  // Records the order in which the three startup steps are reached.
  const callOrder: string[] = [];
  const abort = new AbortController();

  readTelegramUpdateOffsetSpy.mockResolvedValueOnce(persistedUpdateId);

  api.getUpdates.mockReset();
  api.getUpdates.mockImplementationOnce(async () => {
    callOrder.push("getUpdates");
    return [];
  });

  api.deleteWebhook.mockReset();
  api.deleteWebhook.mockImplementationOnce(async () => {
    callOrder.push("deleteWebhook");
    return true;
  });

  runSpy.mockImplementationOnce(() => {
    callOrder.push("run");
    return makeAbortRunner(abort);
  });

  await monitorTelegramProvider({ token: "tok", abortSignal: abort.signal });

  // The confirmation request asks for the update right after the persisted one.
  expect(api.getUpdates).toHaveBeenCalledWith({
    offset: persistedUpdateId + 1,
    limit: 1,
    timeout: 0,
  });
  expect(callOrder).toEqual(["deleteWebhook", "getUpdates", "run"]);
});
// Shared driver for the cases below: seeds the offset store spy with
// `persistedOffset`, runs one monitor cycle that aborts as soon as the runner
// starts, and asserts that no confirmation getUpdates call was made.
const expectOffsetConfirmationSkipped = async (persistedOffset: number | null) => {
  readTelegramUpdateOffsetSpy.mockResolvedValueOnce(persistedOffset);
  const abort = new AbortController();
  api.getUpdates.mockReset();
  api.deleteWebhook.mockReset();
  api.deleteWebhook.mockResolvedValueOnce(true);
  mockRunOnceAndAbort(abort);
  await monitorTelegramProvider({ token: "tok", abortSignal: abort.signal });
  expect(api.getUpdates).not.toHaveBeenCalled();
};
it("skips offset confirmation when no persisted offset exists", async () => {
  await expectOffsetConfirmationSkipped(null);
});
it("skips offset confirmation when persisted offset is invalid", async () => {
  await expectOffsetConfirmationSkipped(-1);
});
it("skips offset confirmation when persisted offset cannot be safely incremented", async () => {
  // MAX_SAFE_INTEGER + 1 is not a safe integer, so there is no valid offset to send.
  await expectOffsetConfirmationSkipped(Number.MAX_SAFE_INTEGER);
});
it("falls back to configured webhookSecret when not passed explicitly", async () => {
await monitorTelegramProvider({
token: "tok",

View File

@@ -61,8 +61,21 @@ const TELEGRAM_POLL_RESTART_POLICY = {
jitter: 0.25,
};
// Polling stall watchdog tuning.
// The runner is assumed stuck, and is force-restarted, once this much time has
// passed without a getUpdates call. The default fetch timeout is 30s, so three
// times that leaves ample margin for slow responses.
const POLL_STALL_THRESHOLD_MS = 3 * 30_000;
// How often the watchdog compares the time since the last getUpdates call
// against the stall threshold.
const POLL_WATCHDOG_INTERVAL_MS = 30 * 1_000;
type TelegramBot = ReturnType<typeof createTelegramBot>;
/**
 * Filters an update id down to values that are usable as a polling offset.
 *
 * @param value - Candidate update id, or null when none is available.
 * @returns `value` unchanged when it is a non-negative safe integer;
 *   otherwise null (including when `value` itself is null).
 */
function normalizePersistedUpdateId(value: number | null): number | null {
  if (value === null) {
    return null;
  }
  const usable = Number.isSafeInteger(value) && value >= 0;
  return usable ? value : null;
}
const isGetUpdatesConflict = (err: unknown) => {
if (!err || typeof err !== "object") {
return false;
@@ -137,19 +150,30 @@ export async function monitorTelegramProvider(opts: MonitorTelegramOpts = {}) {
const proxyFetch =
opts.proxyFetch ?? (account.config.proxy ? makeProxyFetch(account.config.proxy) : undefined);
let lastUpdateId = await readTelegramUpdateOffset({
const persistedOffsetRaw = await readTelegramUpdateOffset({
accountId: account.accountId,
botToken: token,
});
let lastUpdateId = normalizePersistedUpdateId(persistedOffsetRaw);
if (persistedOffsetRaw !== null && lastUpdateId === null) {
log(
`[telegram] Ignoring invalid persisted update offset (${String(persistedOffsetRaw)}); starting without offset confirmation.`,
);
}
const persistUpdateId = async (updateId: number) => {
if (lastUpdateId !== null && updateId <= lastUpdateId) {
const normalizedUpdateId = normalizePersistedUpdateId(updateId);
if (normalizedUpdateId === null) {
log(`[telegram] Ignoring invalid update_id value: ${String(updateId)}`);
return;
}
lastUpdateId = updateId;
if (lastUpdateId !== null && normalizedUpdateId <= lastUpdateId) {
return;
}
lastUpdateId = normalizedUpdateId;
try {
await writeTelegramUpdateOffset({
accountId: account.accountId,
updateId,
updateId: normalizedUpdateId,
botToken: token,
});
} catch (err) {
@@ -258,10 +282,35 @@ export async function monitorTelegramProvider(opts: MonitorTelegramOpts = {}) {
}
};
// Best-effort confirmation of the persisted offset: issues a single
// non-blocking getUpdates (timeout 0, limit 1) at lastUpdateId + 1 before the
// runner starts. Does nothing when there is no persisted offset or when
// incrementing it would leave the safe-integer range. Never throws; a failed
// request is logged and polling startup continues.
const confirmPersistedOffset = async (bot: TelegramBot): Promise<void> => {
  if (lastUpdateId === null || lastUpdateId >= Number.MAX_SAFE_INTEGER) {
    return;
  }
  const confirmOffset = lastUpdateId + 1;
  try {
    await bot.api.getUpdates({ offset: confirmOffset, limit: 1, timeout: 0 });
  } catch (err) {
    // Non-fatal: runner middleware still skips duplicates via shouldSkipUpdate.
    // Log instead of swallowing silently so a failing confirmation is visible.
    const detail = err instanceof Error ? err.message : String(err);
    log(
      `[telegram] Failed to confirm persisted update offset ${confirmOffset}: ${detail}; continuing with polling startup.`,
    );
  }
};
const runPollingCycle = async (bot: TelegramBot): Promise<"continue" | "exit"> => {
// Confirm the persisted offset with Telegram so the runner (which starts
// at offset 0) does not re-fetch already-processed updates on restart.
await confirmPersistedOffset(bot);
// Track getUpdates calls to detect polling stalls.
let lastGetUpdatesAt = Date.now();
bot.api.config.use((prev, method, payload, signal) => {
if (method === "getUpdates") {
lastGetUpdatesAt = Date.now();
}
return prev(method, payload, signal);
});
const runner = run(bot, runnerOptions);
activeRunner = runner;
let stopPromise: Promise<void> | undefined;
let stalledRestart = false;
const stopRunner = () => {
stopPromise ??= Promise.resolve(runner.stop())
.then(() => undefined)
@@ -282,6 +331,22 @@ export async function monitorTelegramProvider(opts: MonitorTelegramOpts = {}) {
void stopRunner();
}
};
// Watchdog: detect when getUpdates calls have stalled and force-restart.
const watchdog = setInterval(() => {
if (opts.abortSignal?.aborted) {
return;
}
const elapsed = Date.now() - lastGetUpdatesAt;
if (elapsed > POLL_STALL_THRESHOLD_MS && runner.isRunning()) {
stalledRestart = true;
log(
`[telegram] Polling stall detected (no getUpdates for ${formatDurationPrecise(elapsed)}); forcing restart.`,
);
void stopRunner();
}
}, POLL_WATCHDOG_INTERVAL_MS);
opts.abortSignal?.addEventListener("abort", stopOnAbort, { once: true });
try {
// runner.task() returns a promise that resolves when the runner stops
@@ -289,9 +354,11 @@ export async function monitorTelegramProvider(opts: MonitorTelegramOpts = {}) {
if (opts.abortSignal?.aborted) {
return "exit";
}
const reason = forceRestarted
? "unhandled network error"
: "runner stopped (maxRetryTime exceeded or graceful stop)";
const reason = stalledRestart
? "polling stall detected"
: forceRestarted
? "unhandled network error"
: "runner stopped (maxRetryTime exceeded or graceful stop)";
forceRestarted = false;
const shouldRestart = await waitBeforeRestart(
(delay) => `Telegram polling runner stopped (${reason}); restarting in ${delay}.`,
@@ -314,6 +381,7 @@ export async function monitorTelegramProvider(opts: MonitorTelegramOpts = {}) {
);
return shouldRestart ? "continue" : "exit";
} finally {
clearInterval(watchdog);
opts.abortSignal?.removeEventListener("abort", stopOnAbort);
await stopRunner();
await stopBot();

View File

@@ -78,4 +78,32 @@ describe("deleteTelegramUpdateOffset", () => {
).toBeNull();
});
});
it("ignores invalid persisted update IDs from disk", async () => {
  await withStateDirEnv("openclaw-tg-offset-", async ({ stateDir }) => {
    const offsetPath = path.join(stateDir, "telegram", "update-offset-default.json");
    await fs.mkdir(path.dirname(offsetPath), { recursive: true });

    // The numerals are written as raw JSON text rather than via JSON.stringify:
    // JSON.stringify turns Infinity/NaN into null, which is a *valid* stored
    // value and would make the assertion pass without exercising validation.
    const invalidLastUpdateIds = [
      "-1", // negative
      "1.5", // not an integer
      "9007199254740992", // Number.MAX_SAFE_INTEGER + 1: integer, but not safe
      "1e400", // overflows to Infinity when parsed
    ];

    for (const rawLastUpdateId of invalidLastUpdateIds) {
      await fs.writeFile(
        offsetPath,
        `{\n  "version": 2,\n  "lastUpdateId": ${rawLastUpdateId},\n  "botId": "111111"\n}\n`,
        "utf-8",
      );
      expect(await readTelegramUpdateOffset({ accountId: "default" })).toBeNull();
    }
  });
});
it("rejects writing invalid update IDs", async () => {
  await withStateDirEnv("openclaw-tg-offset-", async () => {
    // A negative id must be refused by the store rather than persisted.
    const pendingWrite = writeTelegramUpdateOffset({ accountId: "default", updateId: -1 });
    await expect(pendingWrite).rejects.toThrow(/non-negative safe integer/i);
  });
});
});

View File

@@ -12,6 +12,10 @@ type TelegramUpdateOffsetState = {
botId: string | null;
};
/**
 * Type guard for values that may be stored as a Telegram update offset.
 *
 * @param value - Arbitrary value, e.g. a field read from parsed JSON.
 * @returns true only for numbers that are non-negative safe integers.
 */
function isValidUpdateId(value: unknown): value is number {
  if (typeof value !== "number") {
    return false;
  }
  return value >= 0 && Number.isSafeInteger(value);
}
function normalizeAccountId(accountId?: string) {
const trimmed = accountId?.trim();
if (!trimmed) {
@@ -51,7 +55,7 @@ function safeParseState(raw: string): TelegramUpdateOffsetState | null {
if (parsed?.version !== STORE_VERSION && parsed?.version !== 1) {
return null;
}
if (parsed.lastUpdateId !== null && typeof parsed.lastUpdateId !== "number") {
if (parsed.lastUpdateId !== null && !isValidUpdateId(parsed.lastUpdateId)) {
return null;
}
if (
@@ -103,6 +107,9 @@ export async function writeTelegramUpdateOffset(params: {
botToken?: string;
env?: NodeJS.ProcessEnv;
}): Promise<void> {
if (!isValidUpdateId(params.updateId)) {
throw new Error("Telegram update offset must be a non-negative safe integer.");
}
const filePath = resolveTelegramUpdateOffsetPath(params.accountId, params.env);
const payload: TelegramUpdateOffsetState = {
version: STORE_VERSION,