diff --git a/docs/channels/slack.md b/docs/channels/slack.md index 3d2b3a1e4..9fdd3fb89 100644 --- a/docs/channels/slack.md +++ b/docs/channels/slack.md @@ -461,6 +461,32 @@ openclaw pairing list slack +## Text streaming + +OpenClaw supports Slack native text streaming via the Agents and AI Apps API. + +By default, streaming is enabled. Disable it per account: + +```yaml +channels: + slack: + streaming: false +``` + +### Requirements + +1. Enable **Agents and AI Apps** in your Slack app settings. +2. Ensure the app has the `assistant:write` scope. +3. A reply thread must be available for that message. Thread selection still follows `replyToMode`. + +### Behavior + +- First text chunk starts a stream (`chat.startStream`). +- Later text chunks append to the same stream (`chat.appendStream`). +- End of reply finalizes stream (`chat.stopStream`). +- Media and non-text payloads fall back to normal delivery. +- If streaming fails mid-reply, OpenClaw falls back to normal delivery for remaining payloads. + ## Configuration reference pointers Primary reference: diff --git a/src/config/types.slack.ts b/src/config/types.slack.ts index ae5dee2e9..22210dcf7 100644 --- a/src/config/types.slack.ts +++ b/src/config/types.slack.ts @@ -125,6 +125,13 @@ export type SlackAccountConfig = { blockStreaming?: boolean; /** Merge streamed block replies before sending. */ blockStreamingCoalesce?: BlockStreamingCoalesceConfig; + /** + * Enable Slack native text streaming (Agents & AI Apps). Default: true. + * + * Set to `false` to disable native Slack text streaming and use normal reply + * delivery behavior only. + */ + streaming?: boolean; /** Slack stream preview mode (replace|status_final|append). Default: replace. */ streamMode?: SlackStreamMode; mediaMaxMb?: number; diff --git a/src/config/zod-schema.providers-core.ts b/src/config/zod-schema.providers-core.ts index 319c167b3..6a97c0eb9 100644 --- a/src/config/zod-schema.providers-core.ts +++ b/src/config/zod-schema.providers-core.ts @@ -546,6 +546,7 @@ export const SlackAccountSchema = z chunkMode: z.enum(["length", "newline"]).optional(), blockStreaming: z.boolean().optional(), blockStreamingCoalesce: BlockStreamingCoalesceSchema.optional(), + streaming: z.boolean().optional(), mediaMaxMb: z.number().positive().optional(), reactionNotifications: z.enum(["off", "own", "all", "allowlist"]).optional(), reactionAllowlist: z.array(z.union([z.string(), z.number()])).optional(), diff --git a/src/slack/monitor/message-handler/dispatch.ts b/src/slack/monitor/message-handler/dispatch.ts index 7964aea36..8397e5505 100644 --- a/src/slack/monitor/message-handler/dispatch.ts +++ b/src/slack/monitor/message-handler/dispatch.ts @@ -1,3 +1,6 @@ +import type { ReplyPayload } from "../../../auto-reply/types.js"; +import type { SlackStreamSession } from "../../streaming.js"; +import type { PreparedSlackMessage } from "./types.js"; import { resolveHumanDelayConfig } from "../../../agents/identity.js"; import { dispatchInboundMessage } from "../../../auto-reply/dispatch.js"; import { clearHistoryEntriesIfEnabled } from "../../../auto-reply/reply/history.js"; @@ -15,9 +18,44 @@ import { buildStatusFinalPreviewText, resolveSlackStreamMode, } from "../../stream-mode.js"; +import { appendSlackStream, startSlackStream, stopSlackStream } from "../../streaming.js"; import { resolveSlackThreadTargets } from "../../threading.js"; -import { createSlackReplyDeliveryPlan, deliverReplies } from "../replies.js"; -import type { PreparedSlackMessage } from "./types.js"; +import { createSlackReplyDeliveryPlan, deliverReplies, resolveSlackThreadTs } from "../replies.js"; + +function hasMedia(payload: ReplyPayload): boolean { + return Boolean(payload.mediaUrl) || (payload.mediaUrls?.length ?? 0) > 0; +} + +export function isSlackStreamingEnabled(streaming: boolean | undefined): boolean { + return streaming !== false; +} + +export function resolveSlackStreamingThreadHint(params: { + replyToMode: "off" | "first" | "all"; + incomingThreadTs: string | undefined; + messageTs: string | undefined; +}): string | undefined { + return resolveSlackThreadTs({ + replyToMode: params.replyToMode, + incomingThreadTs: params.incomingThreadTs, + messageTs: params.messageTs, + hasReplied: false, + }); +} + +function shouldUseStreaming(params: { + streamingEnabled: boolean; + threadTs: string | undefined; +}): boolean { + if (!params.streamingEnabled) { + return false; + } + if (!params.threadTs) { + logVerbose("slack-stream: streaming disabled — no reply thread target available"); + return false; + } + return true; +} export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessage) { const { ctx, account, message, route } = prepared; @@ -108,10 +146,84 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag accountId: route.accountId, }); + const streamingEnabled = isSlackStreamingEnabled(account.config.streaming); + const streamThreadHint = resolveSlackStreamingThreadHint({ + replyToMode: ctx.replyToMode, + incomingThreadTs, + messageTs, + }); + const useStreaming = shouldUseStreaming({ + streamingEnabled, + threadTs: streamThreadHint, + }); + let streamSession: SlackStreamSession | null = null; + let streamFailed = false; + + const deliverNormally = async (payload: ReplyPayload, forcedThreadTs?: string): Promise => { + const replyThreadTs = forcedThreadTs ?? replyPlan.nextThreadTs(); + await deliverReplies({ + replies: [payload], + target: prepared.replyTarget, + token: ctx.botToken, + accountId: account.accountId, + runtime, + textLimit: ctx.textLimit, + replyThreadTs, + }); + replyPlan.markSent(); + }; + + const deliverWithStreaming = async (payload: ReplyPayload): Promise => { + if (streamFailed || hasMedia(payload) || !payload.text?.trim()) { + await deliverNormally(payload, streamSession?.threadTs); + return; + } + + const text = payload.text.trim(); + try { + if (!streamSession) { + const streamThreadTs = replyPlan.nextThreadTs(); + if (!streamThreadTs) { + logVerbose( + "slack-stream: no reply thread target for stream start, falling back to normal delivery", + ); + streamFailed = true; + await deliverNormally(payload); + return; + } + + streamSession = await startSlackStream({ + client: ctx.app.client, + channel: message.channel, + threadTs: streamThreadTs, + text, + }); + replyPlan.markSent(); + return; + } + + await appendSlackStream({ + session: streamSession, + text: "\n" + text, + }); + } catch (err) { + runtime.error?.( + danger(`slack-stream: streaming API call failed: ${String(err)}, falling back`), + ); + streamFailed = true; + await deliverNormally(payload, streamSession?.threadTs); + } + }; + const { dispatcher, replyOptions, markDispatchIdle } = createReplyDispatcherWithTyping({ ...prefixOptions, humanDelay: resolveHumanDelayConfig(cfg, route.agentId), deliver: async (payload) => { + if (useStreaming) { + await deliverWithStreaming(payload); + return; + } + const mediaCount = payload.mediaUrls?.length ?? (payload.mediaUrl ? 1 : 0); const draftMessageId = draftStream?.messageId(); const draftChannelId = draftStream?.channelId(); @@ -239,38 +351,57 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag ...replyOptions, skillFilter: prepared.channelConfig?.skills, hasRepliedRef, - disableBlockStreaming: - typeof account.config.blockStreaming === "boolean" + disableBlockStreaming: useStreaming + ? false + : typeof account.config.blockStreaming === "boolean" ? !account.config.blockStreaming : undefined, onModelSelected, - onPartialReply: async (payload) => { - updateDraftFromPartial(payload.text); - }, - onAssistantMessageStart: async () => { - if (hasStreamedMessage) { - draftStream.forceNewMessage(); - hasStreamedMessage = false; - appendRenderedText = ""; - appendSourceText = ""; - statusUpdateCount = 0; - } - }, - onReasoningEnd: async () => { - if (hasStreamedMessage) { - draftStream.forceNewMessage(); - hasStreamedMessage = false; - appendRenderedText = ""; - appendSourceText = ""; - statusUpdateCount = 0; - } - }, + onPartialReply: useStreaming + ? undefined + : async (payload) => { + updateDraftFromPartial(payload.text); + }, + onAssistantMessageStart: useStreaming + ? undefined + : async () => { + if (hasStreamedMessage) { + draftStream.forceNewMessage(); + hasStreamedMessage = false; + appendRenderedText = ""; + appendSourceText = ""; + statusUpdateCount = 0; + } + }, + onReasoningEnd: useStreaming + ? undefined + : async () => { + if (hasStreamedMessage) { + draftStream.forceNewMessage(); + hasStreamedMessage = false; + appendRenderedText = ""; + appendSourceText = ""; + statusUpdateCount = 0; + } + }, }, }); await draftStream.flush(); draftStream.stop(); markDispatchIdle(); + // ----------------------------------------------------------------------- + // Finalize the stream if one was started + // ----------------------------------------------------------------------- + const finalStream = streamSession as SlackStreamSession | null; + if (finalStream && !finalStream.stopped) { + try { + await stopSlackStream({ session: finalStream }); + } catch (err) { + runtime.error?.(danger(`slack-stream: failed to stop stream: ${String(err)}`)); + } + } + const anyReplyDelivered = queuedFinal || (counts.block ?? 0) > 0 || (counts.final ?? 0) > 0; if (!anyReplyDelivered) { diff --git a/src/slack/streaming.ts b/src/slack/streaming.ts new file mode 100644 index 000000000..f9e1ab697 --- /dev/null +++ b/src/slack/streaming.ts @@ -0,0 +1,137 @@ +/** + * Slack native text streaming helpers. + * + * Uses the Slack SDK's `ChatStreamer` (via `client.chatStream()`) to stream + * text responses word-by-word in a single updating message, matching Slack's + * "Agents & AI Apps" streaming UX. + * + * @see https://docs.slack.dev/ai/developing-ai-apps#streaming + * @see https://docs.slack.dev/reference/methods/chat.startStream + * @see https://docs.slack.dev/reference/methods/chat.appendStream + * @see https://docs.slack.dev/reference/methods/chat.stopStream + */ + +import type { WebClient } from "@slack/web-api"; +import type { ChatStreamer } from "@slack/web-api/dist/chat-stream.js"; +import { logVerbose } from "../globals.js"; + +// --------------------------------------------------------------------------- +// Types +// --------------------------------------------------------------------------- + +export type SlackStreamSession = { + /** The SDK ChatStreamer instance managing this stream. */ + streamer: ChatStreamer; + /** Channel this stream lives in. */ + channel: string; + /** Thread timestamp (required for streaming). */ + threadTs: string; + /** True once stop() has been called. */ + stopped: boolean; +}; + +export type StartSlackStreamParams = { + client: WebClient; + channel: string; + threadTs: string; + /** Optional initial markdown text to include in the stream start. */ + text?: string; +}; + +export type AppendSlackStreamParams = { + session: SlackStreamSession; + text: string; +}; + +export type StopSlackStreamParams = { + session: SlackStreamSession; + /** Optional final markdown text to append before stopping. */ + text?: string; +}; + +// --------------------------------------------------------------------------- +// Stream lifecycle +// --------------------------------------------------------------------------- + +/** + * Start a new Slack text stream. + * + * Returns a {@link SlackStreamSession} that should be passed to + * {@link appendSlackStream} and {@link stopSlackStream}. + * + * The first chunk of text can optionally be included via `text`. + */ +export async function startSlackStream( + params: StartSlackStreamParams, +): Promise { + const { client, channel, threadTs, text } = params; + + logVerbose(`slack-stream: starting stream in ${channel} thread=${threadTs}`); + + const streamer = client.chatStream({ + channel, + thread_ts: threadTs, + }); + + const session: SlackStreamSession = { + streamer, + channel, + threadTs, + stopped: false, + }; + + // If initial text is provided, send it as the first append which will + // trigger the ChatStreamer to call chat.startStream under the hood. + if (text) { + await streamer.append({ markdown_text: text }); + logVerbose(`slack-stream: appended initial text (${text.length} chars)`); + } + + return session; +} + +/** + * Append markdown text to an active Slack stream. + */ +export async function appendSlackStream(params: AppendSlackStreamParams): Promise { + const { session, text } = params; + + if (session.stopped) { + logVerbose("slack-stream: attempted to append to a stopped stream, ignoring"); + return; + } + + if (!text) { + return; + } + + await session.streamer.append({ markdown_text: text }); + logVerbose(`slack-stream: appended ${text.length} chars`); +} + +/** + * Stop (finalize) a Slack stream. + * + * After calling this the stream message becomes a normal Slack message. + * Optionally include final text to append before stopping. + */ +export async function stopSlackStream(params: StopSlackStreamParams): Promise { + const { session, text } = params; + + if (session.stopped) { + logVerbose("slack-stream: stream already stopped, ignoring duplicate stop"); + return; + } + + session.stopped = true; + + logVerbose( + `slack-stream: stopping stream in ${session.channel} thread=${session.threadTs}${ + text ? ` (final text: ${text.length} chars)` : "" + }`, + ); + + await session.streamer.stop(text ? { markdown_text: text } : undefined); + + logVerbose("slack-stream: stream stopped"); +}