12 minute read

当用户在whatsapp, discord 等消息软件中发送了消息后,网关是如何获得的,再到回复,整个流程是如何运转的?

OpenClaw 消息处理完整流程

1. 消息接入2. 路由决策3. AI 处理4. 回复发送


核心文件位置

模块 文件位置 功能
渠道实现 extensions/*/src/channel.ts WhatsApp/Discord/Telegram 等渠道插件
渠道监听 extensions/discord/src/monitor/listeners.ts 监听渠道消息事件
消息分发 src/auto-reply/dispatch.ts 协调预处理、路由、回复
路由解析 src/routing/resolve-route.ts 根据 bindings 配置决定由哪个 agent 处理
网关服务 src/gateway/server.impl.ts 网关服务器主实现
Agent 执行 src/agents/pi-embedded-runner/ 运行 AI agent
消息发送 src/infra/outbound/deliver.ts 统一发送逻辑

详细流程示例(以 Discord 为例)

Discord 用户发送消息
       ↓
[extensions/discord/src/monitor/listeners.ts
   DiscordMessageListener 监听消息事件
       ↓
   预处理(权限检查、白名单验证
       ↓
   解析消息内容、附件、用户信息
       ↓
src/auto-reply/dispatch.ts
   dispatchInboundMessage()
       ↓
src/routing/resolve-route.ts
   解析路由绑定(bindings)
   ├─ 匹配规则优先级:
   ├─ peer (用户)
   ├─ guild (服务器)
   ├─ channel (渠道)
   └─ default (默认)
       ↓
   生成 sessionKey,决定由哪个 agent 处理
       ↓
src/agents/pi-embedded-runner/
   执行 AI agent,处理工具调用
       ↓
src/infra/outbound/deliver.ts
   规范化回复内容
       ↓
   根据渠道特性格式化消息
       ↓
extensions/discord/src/send.ts
   发送回 Discord

关键设计点

  1. 渠道插件化 - 每个渠道在 extensions/*/ 独立实现
  2. 路由绑定 - `bindings 配置按优先级匹配
  3. 会话管理 - sessionKey 追踪会话状态
  4. 统一发送 - 各渠道格式统一处理

展示一下 resolveAgentRoute 函数的具体代码

resolveAgentRoute 函数代码详解

这是路由解析的核心函数(位于 src/routing/resolve-route.ts:295):

核心流程

1. 规范化输入 → 2. 获取 bindings → 3. 按优先级匹配 → 4. 返回路由结果

匹配优先级(tiers 数组)

优先级 匹配方式 条件
1 binding.peer 直接匹配用户
2 binding.peer.parent 继承父线程(帖子)的绑定
3 binding.guild+roles Discord 服务器 + 角色
4 binding.guild Discord 服务器
5 binding.team Slack 团队
6 binding.account 账户
7 binding.channel 渠道
8 default 默认 agent

关键返回值

{
  agentId: string;           // 目标 agent ID
  channel: string;           // 渠道
  accountId: string;         // 账户
  sessionKey: string;        // 会话密钥
  mainSessionKey: string;    // 主会话密钥(用于 DM 合并)
  matchedBy: \"...\";          // 匹配方式(用于调试)
}

优化点

  • 使用 WeakMap 缓存 evaluatedBindingsCacheByCfg,避免重复解析 bindings
  • 缓存最大 2000 个 channel+account 组合”

从 Web UI 进来的消息流程是?

Web UI 消息流程

完整链路图

┌─────────────────────────────────────────────────────────────────────────┐
│                         Web UI (前端)                                     │
│  ui/src/ui/controllers/chat.ts                                            │
│  └─ sendChatMessage()                                                     │
│     └─ client.request(\"chat.send\", ...)                                  │
└────────────────────────────────────┬────────────────────────────────────┘
                                     │ WebSocket
                                     ▼
┌─────────────────────────────────────────────────────────────────────────┐
│                      GatewayBrowserClient                                 │
│  ui/src/ui/gateway.ts                                                    │
│  └─ request() 发送 JSON-RPC 消息                                         │
└────────────────────────────────────┬────────────────────────────────────┘
                                     │ WebSocket
                                     ▼
┌─────────────────────────────────────────────────────────────────────────┐
│                   Gateway Server (网关服务端)                              │
│  src/gateway/server-methods/chat.ts                                       │
│  └─ chatHandlers[\"chat.send\"]                                             │
│     ├─ 验证参数                                                           │
│     ├─ 解析附件                                                           │
│     ├─ 加载 session                                                       │
│     ├─ 检查停止命令                                                       │
│     └─ 调用 dispatchInboundMessage()                                     │
└────────────────────────────────────┬────────────────────────────────────┘
                                     │
                                     ▼
┌─────────────────────────────────────────────────────────────────────────┐
│                   Auto-Reply (自动回复系统)                                │
│  src/auto-reply/dispatch.ts                                              │
│  └─ dispatchInboundMessage()                                             │
│     └─ dispatchReplyFromConfig()                                         │
└────────────────────────────────────┬────────────────────────────────────┘
                                     │
                                     ▼
┌─────────────────────────────────────────────────────────────────────────┐
│                    Routing (路由解析)                                      │
│  src/routing/resolve-route.ts                                            │
│  └─ resolveAgentRoute() → 决定由哪个 agent 处理                          │
└────────────────────────────────────┬────────────────────────────────────┘
                                     │
                                     ▼
┌─────────────────────────────────────────────────────────────────────────┐
│                    Agent Execution (Agent 执行)                           │
│  src/agents/pi-embedded-runner/                                          │
│  └─ 运行 AI agent,处理工具调用                                           │
└────────────────────────────────────┬────────────────────────────────────┘
                                     │
                                     ▼
┌─────────────────────────────────────────────────────────────────────────┐
│                   回复通过 chat 事件流返回                                  │
│  src/gateway/server-methods/chat.ts                                       │
│  └─ broadcastChatFinal() / broadcastChatEvent()                          │
│     └─ 通过 WebSocket 发送 \"chat\" 事件                                   │
└────────────────────────────────────┬────────────────────────────────────┘
                                     │
                                     ▼
┌─────────────────────────────────────────────────────────────────────────┐
│                      Web UI (前端)                                         │
│  ui/src/ui/controllers/chat.ts                                            │
│  └─ handleChatEvent()                                                     │
│     ├─ \"delta\" 状态: 更新流式文本                                         │
│     └─ \"final\" 状态: 加载完整历史,显示最终消息                           │
└─────────────────────────────────────────────────────────────────────────┘

关键文件位置

环节 文件 说明
前端发送 ui/src/ui/controllers/chat.ts:75 sendChatMessage() 函数
WebSocket 客户端 ui/src/ui/gateway.ts:289 GatewayBrowserClient.request()
网关处理 src/gateway/server-methods/chat.ts:704 chatHandlers[\"chat.send\"]
消息分发 src/auto-reply/dispatch.ts dispatchInboundMessage()
路由解析 src/routing/resolve-route.ts:295 resolveAgentRoute()
事件接收 ui/src/ui/controllers/chat.ts:185 handleChatEvent()

与外部渠道(WhatsApp/Discord)的区别

特性 Web UI 外部渠道
通道 WebSocket (JSON-RPC) 渠道 API/SDK
入口方法 chat.send 渠道监听器
回复方式 WebSocket 事件流 渠道发送 API
Session Key 直接使用 通过 bindings 解析
内部 channel \"internal\" / \"webchat\" 实际渠道名

Web UI 和 外部渠道 都要经过网关控制,然后进行路由?

路由机制的一致性

相同的路由解析函数

无论是 Web UI 还是外部渠道,都会调用同一个路由解析函数:

// 外部渠道(如 WhatsApp)在 src/web/auto-reply/monitor/on-message.ts 中调用
const route = resolveAgentRoute({
  cfg: loadConfig(),
  channel: \"whatsapp\",
  accountId: msg.accountId,
  peer: {
    kind: msg.chatType === \"group\" ? \"group\" : \"direct\",
    id: peerId,
  },
});

// Web UI 在 src/gateway/server-methods/chat.ts 中调用
const agentId = resolveSessionAgentId({
  sessionKey,
  config: cfg,
});
// 而 resolveSessionAgentId 最终会使用与 resolveAgentRoute 相同的逻辑

共享的路由规则

路由规则在配置文件中定义,对所有渠道都是统一的:

// 路由解析的核心逻辑(src/routing/resolve-route.ts)
const tiers: Array<{
  matchedBy: Exclude<ResolvedAgentRoute[\"matchedBy\"], \"default\">;
  enabled: boolean;
  scopePeer: RoutePeer | null;
  predicate: (candidate: EvaluatedBinding) => boolean;
}> = [
  { matchedBy: \"binding.peer\", enabled: Boolean(peer), scopePeer: peer, predicate: (candidate) => candidate.match.peer.state === \"valid\" },
  { matchedBy: \"binding.peer.parent\", enabled: Boolean(parentPeer && parentPeer.id), scopePeer: parentPeer && parentPeer.id ? parentPeer : null, predicate: (candidate) => candidate.match.peer.state === \"valid\" },
  { matchedBy: \"binding.guild+roles\", enabled: Boolean(guildId && memberRoleIds.length > 0), scopePeer: peer, predicate: (candidate) => hasGuildConstraint(candidate.match) && hasRolesConstraint(candidate.match) },
  { matchedBy: \"binding.guild\", enabled: Boolean(guildId), scopePeer: peer, predicate: (candidate) => hasGuildConstraint(candidate.match) && !hasRolesConstraint(candidate.match) },
  { matchedBy: \"binding.team\", enabled: Boolean(teamId), scopePeer: peer, predicate: (candidate) => hasTeamConstraint(candidate.match) },
  { matchedBy: \"binding.account\", enabled: true, scopePeer: peer, predicate: (candidate) => candidate.match.accountPattern !== \"*\" },
  { matchedBy: \"binding.channel\", enabled: true, scopePeer: peer, predicate: (candidate) => candidate.match.accountPattern === \"*\" },
];

统一的回复处理

无论是 Web UI 还是外部渠道,最终都会使用相同的回复处理逻辑:

// Web UI 在 src/gateway/server-methods/chat.ts 中调用
const replyResult = await (params.replyResolver ?? getReplyFromConfig)(
  ctx,
  resolvedOpts,
  cfg,
);

// 外部渠道在 src/web/auto-reply/monitor/process-message.ts 中调用
const replyResult = await params.replyResolver(
  ctx,
  replyOptions,
  params.cfg,
);

处理流程的区别

虽然路由机制相同,但两者的入口点和处理流程略有不同:

外部渠道(如 WhatsApp)的流程

  1. 监听渠道消息(monitorWebChannel
  2. 消息预处理(去重、验证等)
  3. 解析路由(resolveAgentRoute
  4. 处理消息和生成回复(processMessage
  5. 发送回复到渠道

Web UI 的流程

  1. 用户在浏览器中发送消息
  2. 前端通过 WebSocket 发送到网关(chat.send 方法)
  3. 网关验证参数并解析会话
  4. 解析路由(resolveSessionAgentId
  5. 处理消息和生成回复(dispatchInboundMessage
  6. 通过 WebSocket 发送回复到前端

总结

Web UI 和外部渠道都要经过网关的控制和路由机制,使用相同的路由规则和回复处理逻辑,确保了系统行为的一致性。这种设计使得开发者可以统一管理所有渠道的消息处理,同时也为扩展新渠道提供了便利。

Pi Agent 如何调用 LLM 和本地执行命令

Pi Agent 调用 LLM 和本地执行命令的机制

1. LLM 调用机制

核心流程

Pi Agent 通过以下步骤调用 LLM:

// 1. 初始化 Agent 会话
const activeSession = createAgentSession({
  session: sessionManager,
  settings: settingsManager,
  // ...其他配置
});

// 2. 配置模型和工具
if (params.model.api === \"ollama\") {
  activeSession.agent.streamFn = createOllamaStreamFn(ollamaBaseUrl);
} else {
  activeSession.agent.streamFn = streamSimple; // 使用 @mariozechner/pi-ai 库
}

// 3. 发送提示给 LLM
await activeSession.prompt(effectivePrompt, { images: imageResult.images });

关键实现位置

  • LLM 配置src/agents/pi-embedded-runner/run/attempt.ts:612-634
  • 模型解析src/agents/pi-embedded-runner/model.ts
  • 会话管理:使用 @mariozechner/pi-coding-agent 库的 SessionManager
  • Ollama 调用src/agents/ollama-stream.ts
  • 通用 LLM 调用@mariozechner/pi-ai 库的 streamSimple 函数

支持的模型提供者

  • OpenAI/GPT-4o
  • Anthropic Claude 3
  • Google Gemini
  • Ollama(本地模型)
  • GitHub Copilot
  • 其他通过配置文件定义的模型

2. 本地命令执行机制

核心工具创建

// 在 src/agents/pi-tools.ts 中创建工具
const tools = createOpenClawCodingTools({
  exec: {
    ...params.execOverrides,
    elevated: params.bashElevated,
  },
  sandbox,
  // ...其他配置
});

工具类型

  • 编码工具:读取、写入、编辑文件
  • 执行工具:执行 shell 命令(bash-tools.exec.ts
  • 进程工具:管理长期运行的进程(bash-tools.process.ts
  • OpenClaw 特定工具:浏览器操作、发送消息等(openclaw-tools.ts

命令执行流程

// 在 src/agents/bash-tools.exec-runtime.ts 中
export async function runExecProcess(opts: {
  command: string;
  workdir: string;
  env: Record<string, string>;
  sandbox?: BashSandboxConfig;
  // ...其他配置
}): Promise<ExecProcessHandle> {
  // 1. 安全检查和环境准备
  validateHostEnv(env);
  
  // 2. 执行命令
  if (opts.sandbox) {
    // 沙箱执行(Docker)
    const argv = [\"docker\", ...buildDockerExecArgs({...})];
    // ...
  } else {
    // 直接执行
    const child = childProcess.spawn(command, options);
    // ...
  }
  
  // 3. 输出处理和更新
  child.stdout.on('data', handleStdout);
  child.stderr.on('data', handleStderr);
  
  // 4. 结果返回
  return processHandle;
}

安全机制

  • 白名单检查evaluateShellAllowlist() 检查命令是否在安全白名单中
  • 沙箱执行:支持 Docker 沙箱
  • 权限分级:根据配置决定安全级别(deny allowlist full)
  • 审计日志:记录所有执行的命令

关键实现位置

  • 工具创建src/agents/pi-tools.ts
  • 执行工具src/agents/bash-tools.exec.ts
  • 执行运行时src/agents/bash-tools.exec-runtime.ts
  • 进程管理src/agents/bash-tools.process.ts
  • 执行审批src/infra/exec-approvals.ts

3. 工具调用流程

Agent 与工具的交互

// 在 src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.ts 中
const subscription = subscribeEmbeddedPiSession({
  session: activeSession,
  runId: params.runId,
  onToolResult: (toolResult) => {
    // 处理工具结果
  },
  // ...其他回调
});

工具调用事件处理

  • 工具执行开始handleToolExecutionStart()
  • 工具执行结束handleToolExecutionEnd()
  • 工具输出处理:通过回调函数实时更新 UI
  • 错误处理:捕获和显示工具错误

4. 配置和策略

工具策略配置

# config.yaml 中的工具策略配置
tools:
  exec:
    security: allowlist  # 安全模式
    ask: on-miss         # 审批模式
    safeBins:            # 安全命令白名单
      - ls
      - cat
      - git

沙箱配置

sandbox:
  enabled: true
  containerName: openclaw-sandbox
  containerWorkdir: /workspace

总结

Pi Agent 提供了一个完整的 AI 代理系统,支持通过配置文件灵活定义工具和模型行为。LLM 调用和本地命令执行通过统一的工具接口实现,同时提供了严格的安全机制(如沙箱、白名单)来确保系统安全。这种架构使得 OpenClaw 能够处理各种复杂的任务,从代码编写到系统管理。

定时任务 执行的流程

OpenClaw 定时任务执行流程

1. 任务定义与配置

定时任务支持三种调度类型:

// src/cron/types.ts
type CronSchedule =
  | { kind: \"at\"; at: string }           // 一次性任务(ISO时间或+duration)
  | { kind: \"every\"; everyMs: number; anchorMs?: number }  // 周期性任务(毫秒间隔)
  | { kind: \"cron\"; expr: string; tz?: string }; // Cron表达式任务

任务存储在 ~/.openclaw/cron/ 目录的 JSON 文件中,通过 CronService 管理。

2. 调度器原理

核心调度器组件

  • CronService:定时任务服务主类(src/cron/service.ts
  • 定时器管理:管理定时任务的唤醒和执行(src/cron/service/timer.ts
  • 作业管理:任务创建、更新、删除(src/cron/service/jobs.ts

调度策略

// src/cron/service/timer.ts
const MAX_TIMER_DELAY_MS = 60_000; // 最大定时器延迟(1分钟)
const MIN_REFIRE_GAP_MS = 2000; // 最小触发间隔(防止连续触发)

export function armTimer(state: CronServiceState) {
  const nextAt = nextWakeAtMs(state);  // 计算下一个任务的执行时间
  const delay = Math.max(nextAt - now, 0);
  const clampedDelay = Math.min(delay, MAX_TIMER_DELAY_MS);
  
  state.timer = setTimeout(() => {
    void onTimer(state).catch((err) => {
      state.deps.log.error({ err: String(err) }, \"cron: timer tick failed\");
    });
  }, clampedDelay);
}

3. 任务执行流程

定时器触发(onTimer)

// src/cron/service/timer.ts
export async function onTimer(state: CronServiceState) {
  if (state.running) {  // 防止重入
    state.timer = setTimeout(() => void onTimer(state), MAX_TIMER_DELAY_MS);
    return;
  }
  
  state.running = true;
  try {
    // 1. 查找到期任务
    const dueJobs = await locked(state, async () => {
      await ensureLoaded(state, { forceReload: true, skipRecompute: true });
      const due = findDueJobs(state);
      
      for (const job of due) {
        job.state.runningAtMs = state.deps.nowMs();
      }
      await persist(state);
      return due;
    });
    
    // 2. 执行任务
    const results = [];
    for (const { id, job } of dueJobs) {
      const startedAt = state.deps.nowMs();
      const result = await executeJobCore(state, job);
      results.push({ jobId: id, ...result, startedAt, endedAt: state.deps.nowMs() });
    }
    
    // 3. 处理执行结果
    await locked(state, async () => {
      for (const result of results) {
        const job = state.store?.jobs.find((j) => j.id === result.jobId);
        if (job) {
          const shouldDelete = applyJobResult(state, job, result);
          if (shouldDelete) {
            state.store.jobs = state.store.jobs.filter((j) => j.id !== job.id);
          }
        }
      }
      await persist(state);
    });
    
    // 4. 会话清理
    await sweepCronRunSessions({...});
  } finally {
    state.running = false;
    armTimer(state);  // 重新武装定时器
  }
}

任务执行(executeJobCore)

// src/cron/service/timer.ts
async function executeJobCore(state: CronServiceState, job: CronJob): Promise<CronRunOutcome & CronRunTelemetry> {
  if (job.sessionTarget === \"main\") {
    // 主会话任务:发送系统事件
    const text = resolveJobPayloadTextForMain(job);
    state.deps.enqueueSystemEvent(text, {
      agentId: job.agentId,
      sessionKey: job.sessionKey,
      contextKey: `cron:${job.id}`,
    });
    
    if (job.wakeMode === \"now\") {
      await state.deps.runHeartbeatOnce({...});
    } else {
      state.deps.requestHeartbeatNow({...});
    }
  } else if (job.sessionTarget === \"isolated\") {
    // 隔离会话任务:执行AI代理对话
    const res = await state.deps.runIsolatedAgentJob({
      job,
      message: job.payload.message,
    });
  }
}

4. 执行入口

CLI 命令

# 查看任务状态
pnpm openclaw cron status

# 列出所有任务
pnpm openclaw cron list

# 添加任务
pnpm openclaw cron add --at \"2025-12-31T23:59:00\" --message \"Happy New Year!\"

# 立即运行任务
pnpm openclaw cron run <job-id>

网关 API

// src/gateway/server-methods/cron.ts
export const cronHandlers: GatewayRequestHandlers = {
  \"cron.list\": async ({ params, respond, context }) => { ... },
  \"cron.add\": async ({ params, respond, context }) => { ... },
  \"cron.run\": async ({ params, respond, context }) => { ... },
  \"cron.status\": async ({ params, respond, context }) => { ... },
};

5. 与其他系统组件的交互

系统事件

任务可以发送系统事件,在主会话中显示。

心跳系统

任务可以请求立即执行心跳,触发 AI 处理。

隔离代理执行

支持在隔离会话中执行 AI 任务,不会影响主会话的上下文。

6. 错误处理与稳定性

  • 任务超时:默认 10 分钟超时
  • 错误退避:指数退避策略(30s → 1min → 5min → 15min → 60min)
  • 状态恢复:启动时清除过期的运行标记
  • 存储锁定:确保操作原子性

总结

OpenClaw 的定时任务系统是一个设计完善的调度系统,支持多种任务类型和执行方式,具有高可靠性和良好的扩展性。

定时任务的整个生命周期

定时任务的完整生命周期

1. 任务创建阶段

创建入口

  • CLI命令pnpm openclaw cron add
  • API调用cron.add 网关方法
  • UI操作:通过 Web UI 或应用程序界面

创建流程

// src/cron/service/jobs.ts
export function createJob(state: CronServiceState, input: CronJobCreate): CronJob {
  const job: CronJob = {
    id: input.id || generateUUID(),
    schedule: input.schedule,
    payload: input.payload,
    agentId: input.agentId,
    sessionKey: input.sessionKey,
    sessionTarget: input.sessionTarget,
    wakeMode: input.wakeMode,
    deleteAfterRun: input.deleteAfterRun,
    webhookTargets: input.webhookTargets,
    tags: input.tags,
    state: {
      runningAtMs: undefined,
      lastRunAtMs: undefined,
      lastStatus: undefined,
      lastDurationMs: undefined,
      consecutiveErrors: 0,
      nextRunAtMs: computeJobNextRunAtMs(input, state.deps.nowMs()),
      lastErrorTimeMs: undefined,
    },
  };
  
  state.store.jobs.push(job);
  return job;
}

任务验证

// src/cron/normalize.ts
export function normalizeCronJobCreate(input: unknown): CronJobCreate | null;

// src/cron/validate-timestamp.ts
export function validateScheduleTimestamp(schedule: CronSchedule): { ok: boolean; message?: string };

2. 任务存储阶段

存储位置

  • 默认路径~/.openclaw/cron/
  • 文件格式:JSON
  • 存储接口src/cron/store.ts

持久化流程

// src/cron/store.ts
async function saveCronStore(path: string, store: CronStore): Promise<void> {
  const dir = path.dirname(path);
  await fs.mkdir(dir, { recursive: true });
  await fs.writeFile(path, JSON.stringify(store, null, 2), \"utf-8\");
}

3. 任务调度阶段

调度器初始化

// src/cron/service/ops.ts
export async function start(state: CronServiceState): Promise<void> {
  await locked(state, async () => {
    await ensureLoaded(state, { skipRecompute: true });
    const jobs = state.store?.jobs ?? [];
    
    // 清除过期的运行标记
    for (const job of jobs) {
      if (typeof job.state.runningAtMs === \"number\") {
        job.state.runningAtMs = undefined;
      }
    }
    
    await runMissedJobs(state);  // 运行错过的任务
    recomputeNextRuns(state);   // 重新计算所有任务的下次运行时间
    await persist(state);       // 持久化状态
    armTimer(state);            // 启动定时器
  });
}

定时器管理

// src/cron/service/timer.ts
export function armTimer(state: CronServiceState): void {
  const nextAt = nextWakeAtMs(state);  // 计算下一个任务的执行时间
  const delay = Math.max(nextAt - now, 0);
  const clampedDelay = Math.min(delay, MAX_TIMER_DELAY_MS); // 限制最大延迟
  
  state.timer = setTimeout(() => {
    void onTimer(state).catch((err) => {
      state.deps.log.error({ err: String(err) }, \"cron: timer tick failed\");
    });
  }, clampedDelay);
}

4. 任务执行阶段

任务触发

// src/cron/service/timer.ts (onTimer)
export async function onTimer(state: CronServiceState) {
  if (state.running) {  // 防止重入
    state.timer = setTimeout(() => void onTimer(state), MAX_TIMER_DELAY_MS);
    return;
  }
  
  state.running = true;
  try {
    const dueJobs = await locked(state, async () => {
      await ensureLoaded(state, { forceReload: true, skipRecompute: true });
      const due = findDueJobs(state);  // 查找到期任务
      
      for (const job of due) {
        job.state.runningAtMs = state.deps.nowMs();
      }
      await persist(state);
      return due;
    });
    
    // 执行任务
    const results = [];
    for (const { id, job } of dueJobs) {
      const startedAt = state.deps.nowMs();
      const result = await executeJobCore(state, job);
      results.push({ jobId: id, ...result, startedAt, endedAt: state.deps.nowMs() });
    }
    
    // 处理执行结果
    await locked(state, async () => {
      for (const result of results) {
        const job = state.store?.jobs.find((j) => j.id === result.jobId);
        if (job) {
          const shouldDelete = applyJobResult(state, job, result);
          if (shouldDelete) {
            state.store.jobs = state.store.jobs.filter((j) => j.id !== job.id);
          }
        }
      }
      await persist(state);
    });
    
    await sweepCronRunSessions({...});
  } finally {
    state.running = false;
    armTimer(state);  // 重新武装定时器
  }
}

任务执行(executeJobCore)

// src/cron/service/timer.ts
async function executeJobCore(state: CronServiceState, job: CronJob): Promise<CronRunOutcome & CronRunTelemetry> {
  if (job.sessionTarget === \"main\") {
    // 主会话任务:发送系统事件
    const text = resolveJobPayloadTextForMain(job);
    state.deps.enqueueSystemEvent(text, {
      agentId: job.agentId,
      sessionKey: job.sessionKey,
      contextKey: `cron:${job.id}`,
    });
    
    if (job.wakeMode === \"now\") {
      await state.deps.runHeartbeatOnce({...});
    } else {
      state.deps.requestHeartbeatNow({...});
    }
  } else if (job.sessionTarget === \"isolated\") {
    // 隔离会话任务:执行AI代理对话
    const res = await state.deps.runIsolatedAgentJob({
      job,
      message: job.payload.message,
    });
  }
}

5. 结果处理阶段

结果应用

// src/cron/service/timer.ts (applyJobResult)
function applyJobResult(
  state: CronServiceState,
  job: CronJob,
  result: { status: CronRunStatus; error?: string; ... }
): boolean {
  // 更新任务状态
  job.state.runningAtMs = undefined;
  job.state.lastRunAtMs = result.startedAt;
  job.state.lastStatus = result.status;
  job.state.lastDurationMs = result.endedAt - result.startedAt;
  
  // 处理连续错误和退避
  if (result.status === \"error\") {
    job.state.consecutiveErrors = (job.state.consecutiveErrors ?? 0) + 1;
    const backoff = errorBackoffMs(job.state.consecutiveErrors);
    job.state.nextRunAtMs = Math.max(normalNext, result.endedAt + backoff);
  }
  
  // 处理一次性任务删除
  const shouldDelete = job.schedule.kind === \"at\" && 
                      job.deleteAfterRun === true && 
                      result.status === \"ok\";
  
  return shouldDelete;
}

结果记录

// src/cron/run-log.ts
export async function appendCronRunLog(logPath: string, entry: CronRunLogEntry): Promise<void> {
  const dir = path.dirname(logPath);
  await fs.mkdir(dir, { recursive: true });
  const fd = await fs.open(logPath, \"a\");
  try {
    await fd.writeFile(`${JSON.stringify(entry)}\
`);
  } finally {
    await fd.close();
  }
}

6. 任务更新与删除阶段

任务更新

// src/cron/service/jobs.ts
export function patchJob(state: CronServiceState, id: string, patch: CronJobPatch): CronJob | undefined {
  const job = state.store.jobs.find((j) => j.id === id);
  if (!job) {
    return undefined;
  }
  
  // 合并更新
  const next = mergeCronJobPatch(job, patch);
  
  // 重新计算下次运行时间
  next.state.nextRunAtMs = computeJobNextRunAtMs(next, state.deps.nowMs());
  return next;
}

任务删除

// src/cron/service/ops.ts
export async function remove(state: CronServiceState, id: string): Promise<boolean> {
  return await locked(state, async () => {
    const index = state.store.jobs.findIndex((j) => j.id === id);
    if (index === -1) {
      return false;
    }
    
    state.store.jobs.splice(index, 1);
    await persist(state);
    
    // 可能需要重新武装定时器
    if (!state.timer) {
      armTimer(state);
    }
    
    return true;
  });
}

7. 任务清理阶段

会话清理

// src/cron/session-reaper.ts
export async function sweepCronRunSessions(params: {
  cronConfig?: CronConfig;
  sessionStorePath: string;
  nowMs: number;
  log: Logger;
}) {
  // 清理过期的会话(根据cronConfig.sessionRetention配置)
}

日志清理

// src/cron/run-log.ts
export async function pruneCronRunLog(logPath: string, opts: { maxAgeMs?: number; maxEntries?: number }): Promise<void> {
  // 清理过时的日志条目
}

8. 错误处理与稳定性保障

错误类型

// src/cron/types.ts
type CronRunStatus = \"ok\" | \"error\" | \"skipped\";

type CronRunOutcome = {
  status: CronRunStatus;
  error?: string;
  summary?: string;
  sessionId?: string;
  sessionKey?: string;
};

稳定性保障

  1. 任务超时:每个任务有执行超时限制(默认 10 分钟)
  2. 错误退避:指数退避策略防止重试风暴
  3. 状态恢复:启动时清除过期的运行标记
  4. 存储锁定:使用 locked() 函数确保操作原子性
  5. 定时器保护:防止定时器泄漏和重入
  6. 任务重跑:支持手动重跑任务和运行错过的任务

完整流程图

任务创建 → 验证 → 存储 → 调度 → 执行 → 结果处理 → 更新/删除 → 清理

         ↓
任务创建 → 验证 → 存储 → 调度 → 执行 → 结果处理 → 更新/删除 → 清理
    ↑            ↑            ↑            ↑
    │            │            │            │
    └────────────┴────────────┴────────────┘

总结

OpenClaw 的定时任务系统实现了完整的任务生命周期管理,从创建到执行完成的所有阶段都有详细的处理逻辑。系统具有高可靠性和良好的扩展性,支持多种任务类型和执行方式,同时提供了完善的错误处理和稳定性保障机制。

定时任务开始会直接进入消息路由?

定时任务的执行路径

1. 会话管理

定时任务使用专门的会话管理机制:

// src/cron/isolated-agent/run.ts
const agentSessionKey = buildAgentMainSessionKey({
  agentId,
  mainKey: baseSessionKey,
});

const cronSession = resolveCronSession({
  cfg: params.cfg,
  sessionKey: agentSessionKey,
  agentId,
  nowMs: now,
});
  • 使用 buildAgentMainSessionKey 直接构建会话密钥
  • 不通过 resolveAgentRoute 解析路由
  • 有专门的会话存储和恢复机制(resolveCronSession

2. 执行流程

任务执行时直接调用代理运行器:

// src/cron/isolated-agent/run.ts
const fallbackResult = await runWithModelFallback({
  cfg: cfgWithAgentDefaults,
  provider,
  model,
  agentDir,
  fallbacksOverride: resolveAgentModelFallbacksOverride(params.cfg, agentId),
  run: (providerOverride, modelOverride) => {
    if (isCliProvider(providerOverride, cfgWithAgentDefaults)) {
      return runCliAgent({...});
    }
    return runEmbeddedPiAgent({...});
  },
});
  • 直接调用 runEmbeddedPiAgentrunCliAgent
  • 不经过 dispatchInboundMessage 入口点
  • 有自己的模型选择和 fallback 机制

3. 交付机制

任务结果交付也有专门的流程:

// src/cron/isolated-agent/run.ts
if (deliveryRequested && !skipHeartbeatDelivery && !skipMessagingToolDelivery) {
  if (resolvedDelivery.error) {
    // 处理错误
  }
  const deliveryResults = await deliverOutboundPayloads({
    cfg: cfgWithAgentDefaults,
    channel: resolvedDelivery.channel,
    to: resolvedDelivery.to,
    accountId: resolvedDelivery.accountId,
    threadId: resolvedDelivery.threadId,
    payloads: deliveryPayloads,
    agentId,
    identity,
    bestEffort: deliveryBestEffort,
    deps: createOutboundSendDeps(params.deps),
  });
  delivered = deliveryResults.length > 0;
}

与常规消息路由的区别

特性 定时任务 常规消息路由
会话解析 buildAgentMainSessionKey resolveAgentRoute
入口点 runCronIsolatedAgentTurn dispatchInboundMessage
执行路径 直接调用代理运行器 经过路由解析和分发
会话管理 专门的 cron 会话机制 通用会话管理
交付方式 专门的交付计划 通道发送逻辑

总结

定时任务有自己独立的执行流程,从会话管理到任务执行再到结果交付,都不依赖常规的消息路由机制。这种设计使得定时任务能够在隔离的环境中运行,避免与常规对话任务相互干扰。

Updated: