Files
2026-01-30 03:04:10 +00:00

8.1 KiB

State Management Patterns

Strategies for managing state in Cloudflare Agents.

How State Works

State is automatically persisted to the cf_agents_state SQL table. The this.state getter lazily loads from storage, while this.setState() serializes and persists changes. State survives Durable Object evictions.

class MyAgent extends Agent<Env, { count: number }> {
  initialState = { count: 0 };

  increment() {
    this.setState({ count: this.state.count + 1 });
  }

  onStateUpdate(state: State, source: string) {
    console.log("State updated by:", source);
  }
}

State vs SQL: When to Use Which

Use this.state + setState() When:

  • Data is small (< 1MB recommended)
  • Needs real-time sync to all connected clients
  • Simple key-value or object structure
  • Frequently read, occasionally updated
interface State {
  currentUser: { id: string; name: string };
  preferences: Record<string, string>;
  recentMessages: Message[];  // Keep limited, e.g., last 50
  isTyping: boolean;
}

Use this.sql When:

  • Large datasets (many records)
  • Complex queries (JOINs, aggregations, filtering)
  • Historical data / audit logs
  • Data that doesn't need real-time sync
// Good for SQL
// - Full message history
// - User documents
// - Analytics events
// - Search indexes

Hybrid Pattern

Combine both for optimal performance:

interface State {
  recentMessages: Message[];
  onlineUsers: string[];
  currentDocument: Document | null;
}

export class HybridAgent extends Agent<Env, State> {
  initialState: State = {
    recentMessages: [],
    onlineUsers: [],
    currentDocument: null,
  };

  async onStart() {
    await this.sql`
      CREATE TABLE IF NOT EXISTS messages (
        id TEXT PRIMARY KEY,
        user_id TEXT NOT NULL,
        content TEXT NOT NULL,
        created_at DATETIME DEFAULT CURRENT_TIMESTAMP
      )
    `;

    const recent = await this.sql`
      SELECT * FROM messages
      ORDER BY created_at DESC
      LIMIT 50
    `;

    this.setState({
      ...this.state,
      recentMessages: recent.reverse(),
    });
  }

  async addMessage(message: Message) {
    await this.sql`
      INSERT INTO messages (id, user_id, content)
      VALUES (${message.id}, ${message.userId}, ${message.content})
    `;

    const recentMessages = [...this.state.recentMessages, message].slice(-50);
    this.setState({ ...this.state, recentMessages });
  }
}

Queue System

The SDK includes a built-in queue for background task processing. Tasks are stored in SQLite and processed in FIFO order.

Queue Methods

Method Purpose
queue(callback, payload) Add task, returns task ID
dequeue(id) Remove specific task
dequeueAll() Clear entire queue
dequeueAllByCallback(name) Remove tasks by callback name
getQueue(id) Get single task
getQueues(key, value) Find tasks by payload field

Queue Example

export class TaskAgent extends Agent<Env, State> {
  async onMessage(connection: Connection, message: string) {
    const data = JSON.parse(message);

    if (data.type === "process_later") {
      const taskId = await this.queue("processItem", {
        itemId: data.itemId,
        priority: data.priority,
      });

      connection.send(JSON.stringify({ queued: true, taskId }));
    }
  }

  // Callback receives payload and QueueItem metadata
  async processItem(payload: { itemId: string }, item: QueueItem) {
    console.log(`Processing ${payload.itemId}, queued at ${item.createdAt}`);
    // Successfully executed tasks are auto-removed
  }
}

Queue characteristics:

  • Sequential processing (no parallelization)
  • Persists across agent restarts
  • No built-in retry mechanism
  • Payloads must be JSON-serializable

Context Management

Custom methods automatically have full agent context. Use getCurrentAgent() to access context from external functions.

import { getCurrentAgent } from "agents";

// External utility function
async function logActivity(action: string) {
  const { agent } = getCurrentAgent<MyAgent>();
  await agent.sql`
    INSERT INTO activity_log (action, timestamp)
    VALUES (${action}, ${Date.now()})
  `;
}

export class MyAgent extends Agent<Env, State> {
  async performAction() {
    // Context automatically available
    await logActivity("action_performed");
  }
}

getCurrentAgent<T>() returns:

  • agent - The current agent instance
  • connection - Connection object (if applicable)
  • request - Request object (if applicable)

State Synchronization

Optimistic Updates

Update UI immediately, then persist:

async onMessage(connection: Connection, message: string) {
  const data = JSON.parse(message);

  if (data.type === "update_preference") {
    this.setState({
      ...this.state,
      preferences: {
        ...this.state.preferences,
        [data.key]: data.value,
      },
    });

    await this.sql`
      INSERT OR REPLACE INTO preferences (key, value)
      VALUES (${data.key}, ${data.value})
    `;
  }
}

Conflict Resolution

Handle concurrent updates with versioning:

interface State {
  document: {
    content: string;
    version: number;
    lastModifiedBy: string;
  };
}

async updateDocument(userId: string, newContent: string, expectedVersion: number) {
  if (this.state.document.version !== expectedVersion) {
    throw new Error("Conflict: document was modified by another user");
  }

  this.setState({
    ...this.state,
    document: {
      content: newContent,
      version: expectedVersion + 1,
      lastModifiedBy: userId,
    },
  });
}

Per-Connection State

Track ephemeral state for each connected client:

export class MultiUserAgent extends Agent<Env, State> {
  private connectionState = new Map<string, {
    userId: string;
    cursor: { x: number; y: number };
    lastActivity: number;
  }>();

  async onConnect(connection: Connection) {
    this.connectionState.set(connection.id, {
      userId: "",
      cursor: { x: 0, y: 0 },
      lastActivity: Date.now(),
    });
  }

  async onClose(connection: Connection) {
    this.connectionState.delete(connection.id);
  }
}

State Migration

When state schema changes:

interface StateV2 {
  messages: Array<{ id: string; content: string; timestamp: string }>;
  version: 2;
}

export class MigratingAgent extends Agent<Env, StateV2> {
  initialState: StateV2 = {
    messages: [],
    version: 2,
  };

  async onStart() {
    const rawState = this.state as any;

    if (!rawState.version || rawState.version < 2) {
      const migratedMessages = (rawState.messages || []).map(
        (content: string, i: number) => ({
          id: `migrated-${i}`,
          content,
          timestamp: new Date().toISOString(),
        })
      );

      this.setState({
        messages: migratedMessages,
        version: 2,
      });
    }
  }
}

State Size Management

Keep state lean for performance:

export class LeanStateAgent extends Agent<Env, State> {
  private readonly MAX_RECENT_MESSAGES = 100;

  async addMessage(message: Message) {
    await this.sql`INSERT INTO messages (id, content) VALUES (${message.id}, ${message.content})`;

    let recentMessages = [...this.state.recentMessages, message];
    if (recentMessages.length > this.MAX_RECENT_MESSAGES) {
      recentMessages = recentMessages.slice(-this.MAX_RECENT_MESSAGES);
    }

    this.setState({
      ...this.state,
      recentMessages,
      stats: {
        ...this.state.stats,
        totalMessages: this.state.stats.totalMessages + 1,
        lastActivity: new Date().toISOString(),
      },
    });
  }
}

Debugging State

async onMessage(connection: Connection, message: string) {
  const data = JSON.parse(message);

  if (data.type === "debug_state") {
    connection.send(JSON.stringify({
      type: "debug_response",
      state: this.state,
      stateSize: JSON.stringify(this.state).length,
      sqlTables: await this.sql`
        SELECT name FROM sqlite_master WHERE type='table'
      `,
    }));
  }
}