《OpenClaw架构与源码解读》· 第 12 章 Cron、Webhooks 与事件驱动自动化
第 12 章 Cron、Webhooks 与事件驱动自动化
前面第 8–10 章介绍的消息处理链路,都是被动响应式的:用户先说话,OpenClaw 才行动。但 OpenClaw 更有价值的地方之一,恰恰是它可以主动出击——在你没有发消息的时候,悄悄把事情做了,再来汇报。
本章介绍三种让 OpenClaw「自己动起来」的机制:Cron 定时任务、Webhooks 外部触发、以及类 Gmail Pub/Sub 的长链路事件源。
12.1 Cron Jobs:让 OpenClaw「记住」该做什么
12.1.1 什么是 Cron Jobs
Cron Jobs 就是定时任务:在指定时间或时间间隔触发一段操作。在 OpenClaw 里,你可以用 Cron Jobs 让它每天早上 8 点给你发今日简报(天气、日历、收件箱摘要),每 2 小时检查一次 CI/CD 状态有失败时主动告警,每周一整理一次你的 GitHub Issue 积压,或者每晚 11 点发一条「今天的未完成 Todo」。
12.1.2 Cron 配置
在 ~/.openclaw/openclaw.json 中,Cron Jobs 的配置格式大致如下:
// ~/.openclaw/openclaw.json(宽松 JSON){cron:[{id:"morning-briefing",name:"早晨简报",schedule:"0 8 * * *",// 每天 8:00message:"给我一个今天的早晨简报:天气、日历安排、未读邮件摘要。",enabled:true},{id:"ci-check",name:"CI 状态检查",schedule:"0 */2 * * *",// 每 2 小时message:"检查最近 2 小时的 CI/CD 状态,有失败的通知我。",enabled:true}]}schedule 字段使用标准的 cron 表达式(分 时 日 月 星期):
| 表达式 | 含义 |
|---|---|
0 8 * * * | 每天 8:00 |
0 */2 * * * | 每 2 小时整点 |
*/15 * * * * | 每 15 分钟 |
0 9 * * 1 | 每周一 9:00 |
12.1.3 Cron 任务的触发流程
// src/cron/cron-engine.ts(伪代码)classCronEngine{private jobs: Map<string, CronJob>=newMap();asyncinit(configs: CronConfig[], agentPool: AgentPool){for(const config of configs){if(!config.enabled)continue;const job = schedule.createJob(config.schedule,async()=>{awaitthis.triggerJob(config, agentPool);});this.jobs.set(config.id, job);}}privateasynctriggerJob(config: CronConfig, agentPool: AgentPool){const syntheticMsg: InboundMessage ={ id:`cron:${config.id}:${Date.now()}`, channel:"internal:cron", peerId:"system", chatId: config.sessionId, text: config.message, timestamp:newDate(), raw:{ triggerType:"cron", jobId: config.id },};await gateway.dispatchInbound(syntheticMsg);}}关键设计点在于:Cron 触发的「消息」走的是和用户消息完全一样的分发链路。这意味着 Cron 任务也会经过 Session 解析、Agent 路由、Agent Runtime、Skill 调用、回复生成,最终把结果发回你的 Slack/iMessage。这种「统一入口」设计极大地简化了代码,避免了 Cron 路径和用户消息路径之间的逻辑重复。
12.1.4 管理 Cron Jobs
# 列出所有 Cron Jobs openclaw cron list # 启用/禁用某个 Job openclaw cronenable morning-briefing openclaw cron disable ci-check # 立刻手动触发一次(不等到下次调度时间) openclaw cron trigger morning-briefing # 新增一个 Cron Job openclaw cronadd--id nightly-todo --schedule"0 23 * * *"\--message"给我一个今天的未完成 Todo 总结"--agent personal-assistant 12.2 Webhooks:让外部系统「推」给 OpenClaw
12.2.1 Webhook 的使用场景
Cron 是「定时触发」,Webhook 是「事件触发」。GitHub 合并了一个 PR 可以触发 OpenClaw 发一条通知,Sentry 检测到线上报错可以触发 OpenClaw 通知你并自动尝试诊断,Stripe 收到一笔付款可以触发收款通知,某个爬虫任务完成后可以触发 OpenClaw 处理数据并发送摘要。Webhook 的核心优势是近实时——事件发生后几秒内就能触发,而不是像 Cron 那样要等到下一个轮询周期才知道。
12.2.2 Webhook 端点的注册与配置
Gateway 会暴露一个统一的 Webhook 端点:
POST http://localhost:18789/webhook/{webhookId} 在 ~/.openclaw/openclaw.json 里注册一个 Webhook:
{webhooks:[{id:"github-pr-merged",name:"GitHub PR 合并通知",secret:"my-secret-token",messageTemplate:"GitHub 上有一个 PR 被合并了:{payload.pull_request.title}(仓库:{payload.repository.full_name})",enabled:true}]}然后在 GitHub 的 Webhook 设置里填入对应的 Payload URL、Secret 和 Content type。
12.2.3 Webhook 处理流程
// src/gateway/server-webhook.ts(伪代码) app.post("/webhook/:webhookId",async(req, res)=>{const config = webhookRegistry.get(req.params.webhookId);if(!config ||!config.enabled){ res.status(404).end();return;}// 1. 验证签名(防止伪造请求)const signature = req.headers["x-hub-signature-256"]asstring;const isValid =verifySignature(req.rawBody, config.secret, signature);if(!isValid){ res.status(401).end();return;}// 2. 立即返回 200(让发起方尽快确认接收,避免超时重发) res.status(200).end();// 3. 异步处理 Webhook 内容setImmediate(async()=>{const payload = req.body;const messageText =renderTemplate(config.messageTemplate,{ payload });const syntheticMsg: InboundMessage ={ id:`webhook:${config.id}:${Date.now()}`, channel:"internal:webhook", peerId:"system", chatId: config.sessionId, text: messageText, timestamp:newDate(), raw:{ triggerType:"webhook", webhookId: config.id, payload },};await gateway.dispatchInbound(syntheticMsg);});});注意第 2 步:先返回 200,再异步处理。这是 Webhook 处理的最佳实践——大多数发起方(GitHub、Stripe 等)要求在 5 到 10 秒内收到响应,如果处理逻辑太慢就会触发重发。先回 200 确认收到,再慢慢处理,是标准做法。
12.2.4 幂等性:重复触发的防护
由于网络或发送方重试,同一个 Webhook 事件可能被发送多次。为了避免重复触发 Agent,Gateway 需要做幂等去重:
const deliveryId = req.headers["x-github-delivery"]asstring;if(deliveryId){const isDuplicate =await idempotencyStore.check(deliveryId);if(isDuplicate){ res.status(200).end();return;}await idempotencyStore.mark(deliveryId,TTL_24H);}12.3 Gmail Pub/Sub:长链路事件源处理
对于邮件这类特殊场景,既不适合 Cron(延迟较高),也没有可靠的 Webhook 推送,Gmail 提供了一套基于 Google Cloud Pub/Sub 的实时通知机制。你授权 Gmail 把邮件变更事件推送到某个 Google Cloud Pub/Sub 主题,OpenClaw 订阅该主题,收到通知后拉取最新邮件变更,然后根据规则决定是否触发 Agent(例如「有新的 GitHub 通知邮件时」)。
// src/gateway/gmail-pubsub.ts(伪代码)asyncfunctionstartGmailWatch(config: GmailPubSubConfig){const client =awaitgetGmailClient(config.userId);await client.users.watch({ userId:"me", requestBody:{ topicName: config.pubsubTopicName, labelIds:["INBOX"],},});const pubsub =newPubSub({ projectId: config.gcpProjectId });const subscription = pubsub.subscription(config.subscriptionName); subscription.on("message",async(message)=>{ message.ack();const notification =JSON.parse( Buffer.from(message.data asstring,"base64").toString());awaitprocessGmailHistory(notification.historyId, config);});}asyncfunctionprocessGmailHistory(historyId:string, config: GmailPubSubConfig){const client =awaitgetGmailClient(config.userId);const history =await client.users.history.list({ userId:"me", startHistoryId: historyId, historyTypes:["messageAdded"],});const newMessages = history.data.history?.flatMap(h => h.messagesAdded ??[])??[];for(const{ message }of newMessages){if(matchesRule(message, config.rules)){const syntheticMsg: InboundMessage ={ id:`gmail-pubsub:${message.id}`, channel:"internal:gmail-pubsub", peerId:"system", chatId: config.sessionId, text:`收到一封新邮件,ID: ${message.id}。请检查并根据规则处理。`, timestamp:newDate(), raw:{ messageId: message.id, historyId },};await gateway.dispatchInbound(syntheticMsg);}}}12.4 事件总线:统一的异步事件分发
除了上面三种机制,Gateway 内部还有一个轻量级的事件总线(EventBus),用于在各模块之间发布和订阅事件,避免直接耦合:
// 发布事件 eventBus.emit("skill:gmail:archive_completed",{ sessionId:"main", count:17, timestamp:newDate(),});// 订阅事件 eventBus.on("skill:gmail:archive_completed",async(data)=>{await dashboard.updateStats({ type:"archive", count: data.count });});EventBus 让不同模块(Skills、Nodes、Automation Engine、Web UI)可以松散地相互感知,而不需要直接调用对方的接口。
12.5 源码走读导向
在阅读自动化相关代码时,可以沿以下路径。Cron Engine 在 src/cron/ 中,关注如何把 cron 表达式转化为定时器以及如何构造合成消息触发 Gateway。Webhook Handler 在 src/gateway/ 中与 webhook 相关的文件(如 server-webhook.ts),以及 src/plugin-sdk/webhook-request-guards.ts(签名验证)和 src/plugin-sdk/persistent-dedupe.ts(幂等去重)。Gmail Pub/Sub 相关代码可能分布在 Skills 或 src/gateway/ 的特定模块中,关注 Watch 注册和 Pub/Sub 消息解析。EventBus 在 src/gateway/ 中事件相关模块。
12.6 小结
本章介绍了 OpenClaw 的三种主动触发机制。Cron 定时触发,构造合成消息走标准分发链路,配置简单直观。Webhook 是外部事件推送,先 200 再异步处理,注意签名验证和幂等去重。Gmail Pub/Sub 专为邮件类实时通知设计,依赖 GCP 基础设施。
三种机制的共同点是:都最终收敛到 Gateway 的 dispatchInbound 这同一个入口,这让自动化任务和用户主动触发的任务在处理逻辑上保持完全一致。
下一章,我们进入本书的「动手实战」章节:从零开始,一步步构建你自己的 Skill。