一.青龙面板配置环境变量
1.配置通知渠道,企业微信应用或者飞书

2.名称为xmqb,值为抓包的两个值,中间用#号隔开

二.添加脚本

import os import time import requests import urllib3 import json from datetime import datetime from typing import Optional, Dict, Any, Union urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) class Notifier: @staticmethod def send(title: str, content: str): """通过青龙面板通知设置发送消息""" notifier_type = os.environ.get("NOTIFY_TYPE", "").lower() if not notifier_type: print("未配置通知环境变量,无法发送通知") return try: if "feishu" in notifier_type: # 飞书机器人通知 Notifier.feishu_notify(title, content) elif "qywx" in notifier_type: # 企业微信应用通知 Notifier.qywx_notify(title, content) else: print(f"不支持的通知类型: {notifier_type}") except Exception as e: print(f"发送通知失败: {e}") @staticmethod def feishu_notify(title: str, content: str): """飞书机器人通知方式""" fskey = os.environ.get("FSKEY") if not fskey: print("未配置FSKEY环境变量") return # 构建飞书Webhook URL webhook_url = f"https://open.feishu.cn/open-apis/bot/v2/hook/{fskey}" # 构建消息体(支持富文本格式) payload = { "msg_type": "post", "content": { "post": { "zh_cn": { "title": title, "content": [ [{"tag": "text", "text": content}] ] } } } } try: response = requests.post( webhook_url, headers={"Content-Type": "application/json"}, data=json.dumps(payload), timeout=10, verify=False ) if response.status_code != 200: print(f"飞书通知发送失败: HTTP {response.status_code}") else: resp_data = response.json() if resp_data.get("code") != 0: print(f"飞书API返回错误: {resp_data.get('msg')}") else: print("✅ 飞书通知发送成功") except Exception as e: print(f"飞书通知异常: {str(e)}") @staticmethod def qywx_notify(title: str, content: str): """企业微信应用通知""" qywx_am = os.environ.get("QYWX_AM") if not qywx_am: print("未配置QYWX_AM环境变量") return # 解析QYWX_AM格式: corpid,corpsecret,touser,agentid,素材类型 parts = qywx_am.split(',') if len(parts) < 5: print("QYWX_AM格式错误,应为: corpid,corpsecret,touser,agentid,素材类型") return corpid, corpsecret, touser, agentid, msg_type = parts[:5] # 获取access_token token_url = f"https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid={corpid}&corpsecret={corpsecret}" try: token_resp = requests.get(token_url, verify=False).json() if token_resp.get('errcode') != 0: print(f"获取企业微信access_token失败: {token_resp.get('errmsg')}") return access_token = token_resp.get('access_token') except Exception as e: print(f"获取企业微信access_token异常: {str(e)}") return # 根据素材类型构建消息体 send_url = f"https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token={access_token}" payload = { "touser": touser, "agentid": agentid, "msgtype": "text" if msg_type == "1" else "mpnews", } if msg_type == "1": # 文本消息 payload["text"] = {"content": f"{title}\n\n{content}"} else: # 图文消息(默认) payload["mpnews"] = { "articles": [