Gateway: fix node invoke receive loop

This commit is contained in:
Mariano Belinky
2026-01-31 17:46:57 +01:00
committed by Mariano Belinky
parent 78f7e5147b
commit 84e115834f
6 changed files with 440 additions and 12 deletions

View File

@@ -284,6 +284,9 @@ final class GatewayConnectionController {
private func makeConnectOptions() -> GatewayConnectOptions { private func makeConnectOptions() -> GatewayConnectOptions {
let defaults = UserDefaults.standard let defaults = UserDefaults.standard
let displayName = self.resolvedDisplayName(defaults: defaults) let displayName = self.resolvedDisplayName(defaults: defaults)
let manualClientId = defaults.string(forKey: "gateway.manual.clientId")?
.trimmingCharacters(in: .whitespacesAndNewlines)
let resolvedClientId = manualClientId?.isEmpty == false ? manualClientId! : "openclaw-ios"
return GatewayConnectOptions( return GatewayConnectOptions(
role: "node", role: "node",
@@ -291,7 +294,7 @@ final class GatewayConnectionController {
caps: self.currentCaps(), caps: self.currentCaps(),
commands: self.currentCommands(), commands: self.currentCommands(),
permissions: self.currentPermissions(), permissions: self.currentPermissions(),
clientId: "openclaw-ios", clientId: resolvedClientId,
clientMode: "node", clientMode: "node",
clientDisplayName: displayName) clientDisplayName: displayName)
} }

View File

@@ -11,6 +11,7 @@ enum GatewaySettingsStore {
private static let manualHostDefaultsKey = "gateway.manual.host" private static let manualHostDefaultsKey = "gateway.manual.host"
private static let manualPortDefaultsKey = "gateway.manual.port" private static let manualPortDefaultsKey = "gateway.manual.port"
private static let manualTlsDefaultsKey = "gateway.manual.tls" private static let manualTlsDefaultsKey = "gateway.manual.tls"
private static let manualPasswordDefaultsKey = "gateway.manual.password"
private static let discoveryDebugLogsDefaultsKey = "gateway.discovery.debugLogs" private static let discoveryDebugLogsDefaultsKey = "gateway.discovery.debugLogs"
private static let instanceIdAccount = "instanceId" private static let instanceIdAccount = "instanceId"
@@ -21,6 +22,7 @@ enum GatewaySettingsStore {
self.ensureStableInstanceID() self.ensureStableInstanceID()
self.ensurePreferredGatewayStableID() self.ensurePreferredGatewayStableID()
self.ensureLastDiscoveredGatewayStableID() self.ensureLastDiscoveredGatewayStableID()
self.ensureManualGatewayPassword()
} }
static func loadStableInstanceID() -> String? { static func loadStableInstanceID() -> String? {
@@ -174,4 +176,23 @@ enum GatewaySettingsStore {
} }
} }
private static func ensureManualGatewayPassword() {
let defaults = UserDefaults.standard
let instanceId = defaults.string(forKey: self.instanceIdDefaultsKey)?
.trimmingCharacters(in: .whitespacesAndNewlines) ?? ""
guard !instanceId.isEmpty else { return }
let manualPassword = defaults.string(forKey: self.manualPasswordDefaultsKey)?
.trimmingCharacters(in: .whitespacesAndNewlines) ?? ""
guard !manualPassword.isEmpty else { return }
if self.loadGatewayPassword(instanceId: instanceId) == nil {
self.saveGatewayPassword(manualPassword, instanceId: instanceId)
}
if self.loadGatewayPassword(instanceId: instanceId) == manualPassword {
defaults.removeObject(forKey: self.manualPasswordDefaultsKey)
}
}
} }

View File

@@ -12,6 +12,9 @@ private let nodeService = "ai.openclaw.node"
private let instanceIdEntry = KeychainEntry(service: nodeService, account: "instanceId") private let instanceIdEntry = KeychainEntry(service: nodeService, account: "instanceId")
private let preferredGatewayEntry = KeychainEntry(service: gatewayService, account: "preferredStableID") private let preferredGatewayEntry = KeychainEntry(service: gatewayService, account: "preferredStableID")
private let lastGatewayEntry = KeychainEntry(service: gatewayService, account: "lastDiscoveredStableID") private let lastGatewayEntry = KeychainEntry(service: gatewayService, account: "lastDiscoveredStableID")
private func gatewayPasswordEntry(instanceId: String) -> KeychainEntry {
KeychainEntry(service: gatewayService, account: "gateway-password.\(instanceId)")
}
private func snapshotDefaults(_ keys: [String]) -> [String: Any?] { private func snapshotDefaults(_ keys: [String]) -> [String: Any?] {
let defaults = UserDefaults.standard let defaults = UserDefaults.standard
@@ -124,4 +127,33 @@ private func restoreKeychain(_ snapshot: [KeychainEntry: String?]) {
#expect(defaults.string(forKey: "gateway.preferredStableID") == "preferred-from-keychain") #expect(defaults.string(forKey: "gateway.preferredStableID") == "preferred-from-keychain")
#expect(defaults.string(forKey: "gateway.lastDiscoveredStableID") == "last-from-keychain") #expect(defaults.string(forKey: "gateway.lastDiscoveredStableID") == "last-from-keychain")
} }
@Test func bootstrapCopiesManualPasswordToKeychainWhenMissing() {
let instanceId = "node-test"
let defaultsKeys = [
"node.instanceId",
"gateway.manual.password",
]
let passwordEntry = gatewayPasswordEntry(instanceId: instanceId)
let defaultsSnapshot = snapshotDefaults(defaultsKeys)
let keychainSnapshot = snapshotKeychain([passwordEntry, instanceIdEntry])
defer {
restoreDefaults(defaultsSnapshot)
restoreKeychain(keychainSnapshot)
}
applyDefaults([
"node.instanceId": instanceId,
"gateway.manual.password": "manual-secret",
])
applyKeychain([
passwordEntry: nil,
instanceIdEntry: nil,
])
GatewaySettingsStore.bootstrapPersistence()
#expect(KeychainStore.loadString(service: gatewayService, account: passwordEntry.account) == "manual-secret")
#expect(UserDefaults.standard.string(forKey: "gateway.manual.password") == nil)
}
} }

View File

@@ -110,7 +110,13 @@ private enum ConnectChallengeError: Error {
public actor GatewayChannelActor { public actor GatewayChannelActor {
private let logger = Logger(subsystem: "ai.openclaw", category: "gateway") private let logger = Logger(subsystem: "ai.openclaw", category: "gateway")
#if DEBUG
private var debugEventLogCount = 0
private var debugMessageLogCount = 0
private var debugListenLogCount = 0
#endif
private var task: WebSocketTaskBox? private var task: WebSocketTaskBox?
private var listenTask: Task<Void, Never>?
private var pending: [String: CheckedContinuation<GatewayFrame, Error>] = [:] private var pending: [String: CheckedContinuation<GatewayFrame, Error>] = [:]
private var connected = false private var connected = false
private var isConnecting = false private var isConnecting = false
@@ -169,6 +175,9 @@ public actor GatewayChannelActor {
self.tickTask?.cancel() self.tickTask?.cancel()
self.tickTask = nil self.tickTask = nil
self.listenTask?.cancel()
self.listenTask = nil
self.task?.cancel(with: .goingAway, reason: nil) self.task?.cancel(with: .goingAway, reason: nil)
self.task = nil self.task = nil
@@ -221,6 +230,8 @@ public actor GatewayChannelActor {
self.isConnecting = true self.isConnecting = true
defer { self.isConnecting = false } defer { self.isConnecting = false }
self.listenTask?.cancel()
self.listenTask = nil
self.task?.cancel(with: .goingAway, reason: nil) self.task?.cancel(with: .goingAway, reason: nil)
self.task = self.session.makeWebSocketTask(url: self.url) self.task = self.session.makeWebSocketTask(url: self.url)
self.task?.resume() self.task?.resume()
@@ -248,6 +259,7 @@ public actor GatewayChannelActor {
throw wrapped throw wrapped
} }
self.listen() self.listen()
self.logger.info("gateway ws listen registered")
self.connected = true self.connected = true
self.backoffMs = 500 self.backoffMs = 500
self.lastSeq = nil self.lastSeq = nil
@@ -416,28 +428,50 @@ public actor GatewayChannelActor {
guard let self else { return } guard let self else { return }
await self.watchTicks() await self.watchTicks()
} }
await self.pushHandler?(.snapshot(ok)) if let pushHandler = self.pushHandler {
Task { await pushHandler(.snapshot(ok)) }
}
} }
private func listen() { private func listen() {
self.task?.receive { [weak self] result in #if DEBUG
if self.debugListenLogCount < 3 {
self.debugListenLogCount += 1
self.logger.info("gateway ws listen start")
}
#endif
self.listenTask?.cancel()
self.listenTask = Task { [weak self] in
guard let self else { return } guard let self else { return }
switch result { defer { Task { await self.clearListenTask() } }
case let .failure(err): while !Task.isCancelled {
Task { await self.handleReceiveFailure(err) } guard let task = await self.currentTask() else { return }
case let .success(msg): do {
Task { let msg = try await task.receive()
await self.handle(msg) await self.handle(msg)
await self.listen() } catch {
if Task.isCancelled { return }
await self.handleReceiveFailure(error)
return
} }
} }
} }
} }
private func clearListenTask() {
self.listenTask = nil
}
private func currentTask() -> WebSocketTaskBox? {
self.task
}
private func handleReceiveFailure(_ err: Error) async { private func handleReceiveFailure(_ err: Error) async {
let wrapped = self.wrap(err, context: "gateway receive") let wrapped = self.wrap(err, context: "gateway receive")
self.logger.error("gateway ws receive failed \(wrapped.localizedDescription, privacy: .public)") self.logger.error("gateway ws receive failed \(wrapped.localizedDescription, privacy: .public)")
self.connected = false self.connected = false
self.listenTask?.cancel()
self.listenTask = nil
await self.disconnectHandler?("receive failed: \(wrapped.localizedDescription)") await self.disconnectHandler?("receive failed: \(wrapped.localizedDescription)")
await self.failPending(wrapped) await self.failPending(wrapped)
await self.scheduleReconnect() await self.scheduleReconnect()
@@ -449,6 +483,13 @@ public actor GatewayChannelActor {
case let .string(s): s.data(using: .utf8) case let .string(s): s.data(using: .utf8)
@unknown default: nil @unknown default: nil
} }
#if DEBUG
if self.debugMessageLogCount < 8 {
self.debugMessageLogCount += 1
let size = data?.count ?? 0
self.logger.info("gateway ws message received size=\(size, privacy: .public)")
}
#endif
guard let data else { return } guard let data else { return }
guard let frame = try? self.decoder.decode(GatewayFrame.self, from: data) else { guard let frame = try? self.decoder.decode(GatewayFrame.self, from: data) else {
self.logger.error("gateway decode failed") self.logger.error("gateway decode failed")
@@ -462,6 +503,13 @@ public actor GatewayChannelActor {
} }
case let .event(evt): case let .event(evt):
if evt.event == "connect.challenge" { return } if evt.event == "connect.challenge" { return }
#if DEBUG
if self.debugEventLogCount < 12 {
self.debugEventLogCount += 1
self.logger.info(
"gateway event received event=\(evt.event, privacy: .public) payload=\(evt.payload != nil, privacy: .public)")
}
#endif
if let seq = evt.seq { if let seq = evt.seq {
if let last = lastSeq, seq > last + 1 { if let last = lastSeq, seq > last + 1 {
await self.pushHandler?(.seqGap(expected: last + 1, received: seq)) await self.pushHandler?(.seqGap(expected: last + 1, received: seq))

View File

@@ -190,10 +190,11 @@ public actor GatewayNodeSession {
private func handleEvent(_ evt: EventFrame) async { private func handleEvent(_ evt: EventFrame) async {
self.broadcastServerEvent(evt) self.broadcastServerEvent(evt)
guard evt.event == "node.invoke.request" else { return } guard evt.event == "node.invoke.request" else { return }
self.logger.info("node invoke request received")
guard let payload = evt.payload else { return } guard let payload = evt.payload else { return }
do { do {
let data = try self.encoder.encode(payload) let request = try self.decodeInvokeRequest(from: payload)
let request = try self.decoder.decode(NodeInvokeRequestPayload.self, from: data) self.logger.info("node invoke request decoded id=\(request.id, privacy: .public) command=\(request.command, privacy: .public)")
guard let onInvoke else { return } guard let onInvoke else { return }
let req = BridgeInvokeRequest(id: request.id, command: request.command, paramsJSON: request.paramsJSON) let req = BridgeInvokeRequest(id: request.id, command: request.command, paramsJSON: request.paramsJSON)
let response = await Self.invokeWithTimeout( let response = await Self.invokeWithTimeout(
@@ -207,8 +208,21 @@ public actor GatewayNodeSession {
} }
} }
private func decodeInvokeRequest(from payload: OpenClawProtocol.AnyCodable) throws -> NodeInvokeRequestPayload {
do {
let data = try self.encoder.encode(payload)
return try self.decoder.decode(NodeInvokeRequestPayload.self, from: data)
} catch {
if let raw = payload.value as? String, let data = raw.data(using: .utf8) {
return try self.decoder.decode(NodeInvokeRequestPayload.self, from: data)
}
throw error
}
}
private func sendInvokeResult(request: NodeInvokeRequestPayload, response: BridgeInvokeResponse) async { private func sendInvokeResult(request: NodeInvokeRequestPayload, response: BridgeInvokeResponse) async {
guard let channel = self.channel else { return } guard let channel = self.channel else { return }
self.logger.info("node invoke result sending id=\(request.id, privacy: .public) ok=\(response.ok, privacy: .public)")
var params: [String: AnyCodable] = [ var params: [String: AnyCodable] = [
"id": AnyCodable(request.id), "id": AnyCodable(request.id),
"nodeId": AnyCodable(request.nodeId), "nodeId": AnyCodable(request.nodeId),
@@ -226,7 +240,7 @@ public actor GatewayNodeSession {
do { do {
try await channel.send(method: "node.invoke.result", params: params) try await channel.send(method: "node.invoke.result", params: params)
} catch { } catch {
self.logger.error("node invoke result failed: \(error.localizedDescription, privacy: .public)") self.logger.error("node invoke result failed id=\(request.id, privacy: .public) error=\(error.localizedDescription, privacy: .public)")
} }
} }

View File

@@ -0,0 +1,310 @@
import Foundation
import Testing
@testable import OpenClawKit
import OpenClawProtocol
@Suite struct GatewayNodeInvokeTests {
@Test
func nodeInvokeRequestSendsInvokeResult() async throws {
let task = TestWebSocketTask()
let session = TestWebSocketSession(task: task)
task.enqueue(Self.makeEventMessage(
event: "connect.challenge",
payload: ["nonce": "test-nonce"]))
let tracker = InvokeTracker()
let gateway = GatewayNodeSession()
try await gateway.connect(
url: URL(string: "ws://127.0.0.1:18789")!,
token: nil,
password: "test-password",
connectOptions: GatewayConnectOptions(
role: "node",
scopes: [],
caps: [],
commands: ["device.info"],
permissions: [:],
clientId: "openclaw-ios",
clientMode: "node",
clientDisplayName: "Test iOS Node"),
sessionBox: WebSocketSessionBox(session: session),
onConnected: {},
onDisconnected: { _ in },
onInvoke: { req in
await tracker.set(req)
return BridgeInvokeResponse(id: req.id, ok: true, payloadJSON: "{\"ok\":true}")
})
task.enqueue(Self.makeEventMessage(
event: "node.invoke.request",
payload: [
"id": "invoke-1",
"nodeId": "node-1",
"command": "device.info",
"timeoutMs": 15000,
"idempotencyKey": "abc123",
]))
let resultFrame = try await waitForSentMethod(
task,
method: "node.invoke.result",
timeoutSeconds: 1.0)
let sentParams = resultFrame.params?.value as? [String: OpenClawProtocol.AnyCodable]
#expect(sentParams?["id"]?.value as? String == "invoke-1")
#expect(sentParams?["nodeId"]?.value as? String == "node-1")
#expect(sentParams?["ok"]?.value as? Bool == true)
let captured = await tracker.get()
#expect(captured?.command == "device.info")
#expect(captured?.id == "invoke-1")
}
@Test
func nodeInvokeRequestHandlesStringPayload() async throws {
let task = TestWebSocketTask()
let session = TestWebSocketSession(task: task)
task.enqueue(Self.makeEventMessage(
event: "connect.challenge",
payload: ["nonce": "test-nonce"]))
let tracker = InvokeTracker()
let gateway = GatewayNodeSession()
try await gateway.connect(
url: URL(string: "ws://127.0.0.1:18789")!,
token: nil,
password: "test-password",
connectOptions: GatewayConnectOptions(
role: "node",
scopes: [],
caps: [],
commands: ["device.info"],
permissions: [:],
clientId: "openclaw-ios",
clientMode: "node",
clientDisplayName: "Test iOS Node"),
sessionBox: WebSocketSessionBox(session: session),
onConnected: {},
onDisconnected: { _ in },
onInvoke: { req in
await tracker.set(req)
return BridgeInvokeResponse(id: req.id, ok: true)
})
let payload = """
{"id":"invoke-2","nodeId":"node-1","command":"device.info"}
"""
task.enqueue(Self.makeEventMessage(
event: "node.invoke.request",
payload: payload))
let resultFrame = try await waitForSentMethod(
task,
method: "node.invoke.result",
timeoutSeconds: 1.0)
let sentParams = resultFrame.params?.value as? [String: OpenClawProtocol.AnyCodable]
#expect(sentParams?["id"]?.value as? String == "invoke-2")
#expect(sentParams?["nodeId"]?.value as? String == "node-1")
#expect(sentParams?["ok"]?.value as? Bool == true)
let captured = await tracker.get()
#expect(captured?.command == "device.info")
#expect(captured?.id == "invoke-2")
}
}
private enum TestError: Error {
case timeout
}
private func waitForSentMethod(
_ task: TestWebSocketTask,
method: String,
timeoutSeconds: Double
) async throws -> RequestFrame {
try await AsyncTimeout.withTimeout(
seconds: timeoutSeconds,
onTimeout: { TestError.timeout },
operation: {
while true {
let frames = task.sentRequests()
if let match = frames.first(where: { $0.method == method }) {
return match
}
try? await Task.sleep(nanoseconds: 50_000_000)
}
})
}
private actor InvokeTracker {
private var request: BridgeInvokeRequest?
func set(_ req: BridgeInvokeRequest) {
self.request = req
}
func get() -> BridgeInvokeRequest? {
self.request
}
}
private final class TestWebSocketSession: WebSocketSessioning {
private let task: TestWebSocketTask
init(task: TestWebSocketTask) {
self.task = task
}
func makeWebSocketTask(url: URL) -> WebSocketTaskBox {
WebSocketTaskBox(task: self.task)
}
}
private final class TestWebSocketTask: WebSocketTasking, @unchecked Sendable {
private let lock = NSLock()
private var _state: URLSessionTask.State = .suspended
private var receiveQueue: [URLSessionWebSocketTask.Message] = []
private var receiveContinuations: [CheckedContinuation<URLSessionWebSocketTask.Message, Error>] = []
private var receiveHandlers: [@Sendable (Result<URLSessionWebSocketTask.Message, Error>) -> Void] = []
private var sent: [URLSessionWebSocketTask.Message] = []
var state: URLSessionTask.State {
self.lock.withLock { self._state }
}
func resume() {
self.lock.withLock { self._state = .running }
}
func cancel(with closeCode: URLSessionWebSocketTask.CloseCode, reason: Data?) {
self.lock.withLock { self._state = .canceling }
}
func send(_ message: URLSessionWebSocketTask.Message) async throws {
self.lock.withLock { self.sent.append(message) }
guard let frame = Self.decodeRequestFrame(message) else { return }
guard frame.method == "connect" else { return }
let id = frame.id
let response = Self.connectResponse(for: id)
self.enqueue(.data(response))
}
func receive() async throws -> URLSessionWebSocketTask.Message {
try await withCheckedThrowingContinuation { cont in
var next: URLSessionWebSocketTask.Message?
self.lock.withLock {
if !self.receiveQueue.isEmpty {
next = self.receiveQueue.removeFirst()
} else {
self.receiveContinuations.append(cont)
}
}
if let next { cont.resume(returning: next) }
}
}
func receive(completionHandler: @escaping @Sendable (Result<URLSessionWebSocketTask.Message, Error>) -> Void) {
var next: URLSessionWebSocketTask.Message?
self.lock.withLock {
if !self.receiveQueue.isEmpty {
next = self.receiveQueue.removeFirst()
} else {
self.receiveHandlers.append(completionHandler)
}
}
if let next {
completionHandler(.success(next))
}
}
func enqueue(_ message: URLSessionWebSocketTask.Message) {
var handler: (@Sendable (Result<URLSessionWebSocketTask.Message, Error>) -> Void)?
var continuation: CheckedContinuation<URLSessionWebSocketTask.Message, Error>?
self.lock.withLock {
if !self.receiveHandlers.isEmpty {
handler = self.receiveHandlers.removeFirst()
} else if !self.receiveContinuations.isEmpty {
continuation = self.receiveContinuations.removeFirst()
} else {
self.receiveQueue.append(message)
}
}
if let handler {
handler(.success(message))
} else if let continuation {
continuation.resume(returning: message)
}
}
func sentRequests() -> [RequestFrame] {
let messages = self.lock.withLock { self.sent }
return messages.compactMap(Self.decodeRequestFrame)
}
private static func decodeRequestFrame(_ message: URLSessionWebSocketTask.Message) -> RequestFrame? {
let data: Data?
switch message {
case let .data(raw): data = raw
case let .string(text): data = text.data(using: .utf8)
@unknown default: data = nil
}
guard let data else { return nil }
return try? JSONDecoder().decode(RequestFrame.self, from: data)
}
private static func connectResponse(for id: String) -> Data {
let payload: [String: Any] = [
"type": "hello-ok",
"protocol": 3,
"server": [
"version": "dev",
"connId": "test-conn",
],
"features": [
"methods": [],
"events": [],
],
"snapshot": [
"presence": [],
"health": ["ok": true],
"stateVersion": ["presence": 0, "health": 0],
"uptimeMs": 0,
],
"policy": [
"maxPayload": 1,
"maxBufferedBytes": 1,
"tickIntervalMs": 1000,
],
]
let frame: [String: Any] = [
"type": "res",
"id": id,
"ok": true,
"payload": payload,
]
return (try? JSONSerialization.data(withJSONObject: frame)) ?? Data()
}
}
private extension GatewayNodeInvokeTests {
static func makeEventMessage(event: String, payload: Any) -> URLSessionWebSocketTask.Message {
let frame: [String: Any] = [
"type": "event",
"event": event,
"payload": payload,
]
let data = try? JSONSerialization.data(withJSONObject: frame)
return .data(data ?? Data())
}
}
private extension NSLock {
func withLock<T>(_ body: () -> T) -> T {
self.lock()
defer { self.unlock() }
return body()
}
}