企业微信自建应用实现接收消息和发送消息功能(python)

企业微信自建应用实现接收消息和发送消息功能(python)

# 这一周我不断的琢磨企业微信自建应用并且实现了自建应用的消息接收和发送功能

1.笔记,记录

第一步:打开企业微信后台 https://work.weixin.qq.com

1.1 如果没有企业可以在这里申请,如果有可以直接扫码登录

1.2 打开后台-应用管理-自建应用-创建应用——填写自建应用的logo,应用名称,应用介绍等信息。

1.3 获取自建应用的AgentId,Secret,以及我的企业-企业ID信息

第二步:初步测试发送消息功能(注意:将刚刚保存好的信息正确填入到相应的代码段)

import json #用于处理json数据 import urllib.parse # 用于对URL进行解析和构建 import requests #用于发送HTTTP请求 # 企业ID agentid = # 应用ID # 应用Secret # 接收消息的用户 # 企业微信API的基础URL base = 'https://qyapi.weixin.qq.com' # 请求登录凭证(access-token) # 构造函数(获取access-token的API URL) access_token_api = urllib.parse.urljoin(base, '/cgi-bin/gettoken') # 使用urllib.parse.urljoin来构建获取access-token的完整url # 定义请求参数(包括企业ID,应用密钥) params = {'corpid': corpid, 'corpsecret': corpsecret} # 发送GET请求获取access-token,并且将json响应转化为python字典 response = requests.get(url=access_token_api, params=params).json() # 从响应中获取access-token access_token = response['access_token'] # 发送消息 # 构建发送消息的完整URL,包含access-token message_send_api = urllib.parse.urljoin(base, f'/cgi-bin/message/send?access_token={access_token}') # 定义要发送的消息数据(文本格式) data = {'touser': touser, 'msgtype': 'text', 'agentid': agentid, 'text': {'content': '测试数据:hello world!'}} # 发送POST请求以发送消息, 并将json响应转化为python字典 response = requests.post(url=message_send_api, data=json.dumps(data)).json() # 当请求返回值为0时(异常处理) if response['errcode'] == 0: print('发送成功') else: print(response) 

安装请求处理包 :

pip install requests

发送消息:

第三步:下载微信官方的加解密文件(注:下载对应的我这里下载的时python的加解密包)

下载地址:https://developer.work.weixin.qq.com/document/path/90468

3.1 下载加解密文件并且进行解压安装相关的依赖包

在这个加解密文件中主要使用(WXBizMsgCrypt3.py)文件,解压配置依赖包导入到项目文件中。

#!/usr/bin/env python # -*- encoding:utf-8 -*- """ 对企业微信发送给企业后台的消息加解密示例代码. @copyright: Copyright (c) 1998-2014 Tencent Inc. """ # ------------------------------------------------------------------------ import logging import base64 import random import hashlib import time import struct from Crypto.Cipher import AES import xml.etree.cElementTree as ET import socket import ierror """ Crypto.Cipher包已不再维护,开发者可以通过以下命令下载安装最新版的加解密工具包 pip install pycryptodome """ class FormatException(Exception): pass def throw_exception(message, exception_class=FormatException): """my define raise exception function""" raise exception_class(message) class SHA1: """计算企业微信的消息签名接口""" def getSHA1(self, token, timestamp, nonce, encrypt): """用SHA1算法生成安全签名 @param token: 票据 @param timestamp: 时间戳 @param encrypt: 密文 @param nonce: 随机字符串 @return: 安全签名 """ try: sortlist = [token, timestamp, nonce, encrypt] sortlist.sort() sha = hashlib.sha1() sha.update("".join(sortlist).encode()) return ierror.WXBizMsgCrypt_OK, sha.hexdigest() except Exception as e: logger = logging.getLogger() logger.error(e) return ierror.WXBizMsgCrypt_ComputeSignature_Error, None class XMLParse: """提供提取消息格式中的密文及生成回复消息格式的接口""" # xml消息模板"<xml> <Encrypt><![CDATA[%(msg_encrypt)s]]></Encrypt> <MsgSignature><![CDATA[%(msg_signaturet)s]]></MsgSignature> <TimeStamp>%(timestamp)s</TimeStamp> <Nonce><![CDATA[%(nonce)s]]></Nonce> </xml>""" def extract(self, xmltext): """提取出xml数据包中的加密消息 @param xmltext: 待提取的xml字符串 @return: 提取出的加密消息字符串 """ try: xml_tree = ET.fromstring(xmltext) encrypt = xml_tree.find("Encrypt") return ierror.WXBizMsgCrypt_OK, encrypt.text except Exception as e: logger = logging.getLogger() logger.error(e) return ierror.WXBizMsgCrypt_ParseXml_Error, None def generate(self, encrypt, signature, timestamp, nonce): """生成xml消息 @param encrypt: 加密后的消息密文 @param signature: 安全签名 @param timestamp: 时间戳 @param nonce: 随机字符串 @return: 生成的xml字符串 """ resp_dict = { 'msg_encrypt': encrypt, 'msg_signaturet': signature, 'timestamp': timestamp, 'nonce': nonce, } resp_xml = self.AES_TEXT_RESPONSE_TEMPLATE % resp_dict return resp_xml class PKCS7Encoder(): """提供基于PKCS7算法的加解密接口""" block_size = 32 def encode(self, text): """ 对需要加密的明文进行填充补位 @param text: 需要进行填充补位操作的明文 @return: 补齐明文字符串 """ text_length = len(text) # 计算需要填充的位数 amount_to_pad = self.block_size - (text_length % self.block_size) if amount_to_pad == 0: amount_to_pad = self.block_size # 获得补位所用的字符 pad = chr(amount_to_pad) return text + (pad * amount_to_pad).encode() def decode(self, decrypted): """删除解密后明文的补位字符 @param decrypted: 解密后的明文 @return: 删除补位字符后的明文 """ pad = ord(decrypted[-1]) if pad < 1 or pad > 32: pad = 0 return decrypted[:-pad] class Prpcrypt(object): """提供接收和推送给企业微信消息的加解密接口""" def __init__(self, key): # self.key = base64.b64decode(key+"=") self.key = key # 设置加解密模式为AES的CBC模式 self.mode = AES.MODE_CBC def encrypt(self, text, receiveid): """对明文进行加密 @param text: 需要加密的明文 @return: 加密得到的字符串 """ # 16位随机字符串添加到明文开头 text = text.encode() text = self.get_random_str() + struct.pack("I", socket.htonl(len(text))) + text + receiveid.encode() # 使用自定义的填充方式对明文进行补位填充 pkcs7 = PKCS7Encoder() text = pkcs7.encode(text) # 加密 cryptor = AES.new(self.key, self.mode, self.key[:16]) try: ciphertext = cryptor.encrypt(text) # 使用BASE64对加密后的字符串进行编码 return ierror.WXBizMsgCrypt_OK, base64.b64encode(ciphertext) except Exception as e: logger = logging.getLogger() logger.error(e) return ierror.WXBizMsgCrypt_EncryptAES_Error, None def decrypt(self, text, receiveid): """对解密后的明文进行补位删除 @param text: 密文 @return: 删除填充补位后的明文 """ try: cryptor = AES.new(self.key, self.mode, self.key[:16]) # 使用BASE64对密文进行解码,然后AES-CBC解密 plain_text = cryptor.decrypt(base64.b64decode(text)) except Exception as e: logger = logging.getLogger() logger.error(e) return ierror.WXBizMsgCrypt_DecryptAES_Error, None try: pad = plain_text[-1] # 去掉补位字符串 # pkcs7 = PKCS7Encoder() # plain_text = pkcs7.encode(plain_text) # 去除16位随机字符串 content = plain_text[16:-pad] xml_len = socket.ntohl(struct.unpack("I", content[: 4])[0]) xml_content = content[4: xml_len + 4] from_receiveid = content[xml_len + 4:] except Exception as e: logger = logging.getLogger() logger.error(e) return ierror.WXBizMsgCrypt_IllegalBuffer, None if from_receiveid.decode('utf8') != receiveid: return ierror.WXBizMsgCrypt_ValidateCorpid_Error, None return 0, xml_content def get_random_str(self): """ 随机生成16位字符串 @return: 16位字符串 """ return str(random.randint(1000000000000000, 9999999999999999)).encode() class WXBizMsgCrypt(object): # 构造函数 def __init__(self, sToken, sEncodingAESKey, sReceiveId): try: self.key = base64.b64decode(sEncodingAESKey + "=") assert len(self.key) == 32 except: throw_exception("[error]: EncodingAESKey unvalid !", FormatException) # return ierror.WXBizMsgCrypt_IllegalAesKey,None self.m_sToken = sToken self.m_sReceiveId = sReceiveId # 验证URL # @param sMsgSignature: 签名串,对应URL参数的msg_signature # @param sTimeStamp: 时间戳,对应URL参数的timestamp # @param sNonce: 随机串,对应URL参数的nonce # @param sEchoStr: 随机串,对应URL参数的echostr # @param sReplyEchoStr: 解密之后的echostr,当return返回0时有效 # @return:成功0,失败返回对应的错误码 def VerifyURL(self, sMsgSignature, sTimeStamp, sNonce, sEchoStr): sha1 = SHA1() ret, signature = sha1.getSHA1(self.m_sToken, sTimeStamp, sNonce, sEchoStr) if ret != 0: return ret, None if not signature == sMsgSignature: return ierror.WXBizMsgCrypt_ValidateSignature_Error, None pc = Prpcrypt(self.key) ret, sReplyEchoStr = pc.decrypt(sEchoStr, self.m_sReceiveId) return ret, sReplyEchoStr def EncryptMsg(self, sReplyMsg, sNonce, timestamp=None): # 将企业回复用户的消息加密打包 # @param sReplyMsg: 企业号待回复用户的消息,xml格式的字符串 # @param sTimeStamp: 时间戳,可以自己生成,也可以用URL参数的timestamp,如为None则自动用当前时间 # @param sNonce: 随机串,可以自己生成,也可以用URL参数的nonce # sEncryptMsg: 加密后的可以直接回复用户的密文,包括msg_signature, timestamp, nonce, encrypt的xml格式的字符串, # return:成功0,sEncryptMsg,失败返回对应的错误码None pc = Prpcrypt(self.key) ret, encrypt = pc.encrypt(sReplyMsg, self.m_sReceiveId) encrypt = encrypt.decode('utf8') if ret != 0: return ret, None if timestamp is None: timestamp = str(int(time.time())) # 生成安全签名 sha1 = SHA1() ret, signature = sha1.getSHA1(self.m_sToken, timestamp, sNonce, encrypt) if ret != 0: return ret, None xmlParse = XMLParse() return ret, xmlParse.generate(encrypt, signature, timestamp, sNonce) def DecryptMsg(self, sPostData, sMsgSignature, sTimeStamp, sNonce): # 检验消息的真实性,并且获取解密后的明文 # @param sMsgSignature: 签名串,对应URL参数的msg_signature # @param sTimeStamp: 时间戳,对应URL参数的timestamp # @param sNonce: 随机串,对应URL参数的nonce # @param sPostData: 密文,对应POST请求的数据 # xml_content: 解密后的原文,当return返回0时有效 # @return: 成功0,失败返回对应的错误码 # 验证安全签名 xmlParse = XMLParse() ret, encrypt = xmlParse.extract(sPostData) if ret != 0: return ret, None sha1 = SHA1() ret, signature = sha1.getSHA1(self.m_sToken, sTimeStamp, sNonce, encrypt) if ret != 0: return ret, None if not signature == sMsgSignature: return ierror.WXBizMsgCrypt_ValidateSignature_Error, None pc = Prpcrypt(self.key) ret, xml_content = pc.decrypt(encrypt, self.m_sReceiveId) return ret, xml_content 

3.2 导入下载安装包中的(ierror.py)错误码提示文件

#!/usr/bin/env python # -*- coding: utf-8 -*- ######################################################################### # Author: jonyqin # Created Time: Thu 11 Sep 2014 01:53:58 PM CST # File Name: ierror.py # Description:定义错误码含义 ######################################################################### # "======================================================提供了错误码===================================================" WXBizMsgCrypt_OK = 0 WXBizMsgCrypt_ValidateSignature_Error = -40001 WXBizMsgCrypt_ParseXml_Error = -40002 WXBizMsgCrypt_ComputeSignature_Error = -40003 WXBizMsgCrypt_IllegalAesKey = -40004 WXBizMsgCrypt_ValidateCorpid_Error = -40005 WXBizMsgCrypt_EncryptAES_Error = -40006 WXBizMsgCrypt_DecryptAES_Error = -40007 WXBizMsgCrypt_IllegalBuffer = -40008 WXBizMsgCrypt_EncodeBase64_Error = -40009 WXBizMsgCrypt_DecodeBase64_Error = -40010 WXBizMsgCrypt_GenReturnXml_Error = -40011 

第四步:编写发送和接收消息逻辑(fastapi)

在这里我使用fastapi实现了接口的开发,详细如下,逻辑代码中需要配置(接收消息设置里边的TOKEN,ENCODING_AES_KEY,CORP_ID等信息,这么后面讲怎么获取。

from fastapi import FastAPI, Request, HTTPException from fastapi.responses import PlainTextResponse, Response from WXBizMsgCrypt3 import WXBizMsgCrypt import xmltodict import logging import time import xml.etree.ElementTree as ET # 配置日志 logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) app = FastAPI() # 企业微信配置 # 设置的Token #设置密钥 # 企业ID try: wxcpt = WXBizMsgCrypt(TOKEN, ENCODING_AES_KEY, CORP_ID) except Exception as e: logger.error(f"初始化WXBizMsgCrypt失败: {str(e)}") raise @app.get("/callback") async def verify_url(msg_signature: str, timestamp: str, nonce: str, echostr: str): """验证URL有效性""" try: logger.info(f"收到验证请求: msg_signature={msg_signature}, timestamp={timestamp}, nonce={nonce}") ret, sEchoStr = wxcpt.VerifyURL(msg_signature, timestamp, nonce, echostr) if ret == 0: logger.info("URL验证成功") return PlainTextResponse(content=sEchoStr) else: logger.error(f"URL验证失败,错误码: {ret}") raise HTTPException(status_code=400, detail="验证失败") except Exception as e: logger.error(f"验证过程发生错误: {str(e)}") raise HTTPException(status_code=500, detail="服务器内部错误") @app.post("/callback") async def receive_message(request: Request): """接收并处理企业微信消息""" try: # 获取请求参数 body = await request.body() msg_signature = request.query_params.get("msg_signature") timestamp = request.query_params.get("timestamp") nonce = request.query_params.get("nonce") if not all([msg_signature, timestamp, nonce]): raise HTTPException(status_code=400, detail="缺少必要的参数") logger.info(f"收到消息推送: msg_signature={msg_signature}, timestamp={timestamp}, nonce={nonce}") # 解密消息 ret, sMsg = wxcpt.DecryptMsg(body, msg_signature, timestamp, nonce) if ret != 0: logger.error(f"消息解密失败,错误码: {ret}") raise HTTPException(status_code=400, detail="消息解密失败") # 解析XML消息 xml_dict = xmltodict.parse(sMsg) logger.info(f"解密后的消息内容: {xml_dict}") # 提取消息内容 xml_content = xml_dict['xml'] to_user_name = xml_content.get('ToUserName') from_user_name = xml_content.get('FromUserName') create_time = xml_content.get('CreateTime') msg_type = xml_content.get('MsgType') content = xml_content.get('Content') msg_id = xml_content.get('MsgId') agent_id = xml_content.get('AgentID') logger.info(f"收到消息: {content}") # 构造回复消息 reply_content = content.replace('吗', '').replace('?', '!').replace('?', '!') current_time = str(int(time.time())) reply_msg = f""" <xml> <ToUserName><![CDATA[{from_user_name}]]></ToUserName> <FromUserName><![CDATA[{to_user_name}]]></FromUserName> <CreateTime>{current_time}</CreateTime> <MsgType><![CDATA[text]]></MsgType> <Content><![CDATA[{reply_content}]]></Content> <MsgId>{msg_id}</MsgId> <AgentID>{agent_id}</AgentID> </xml> """ # 加密回复消息 ret, encrypted_msg = wxcpt.EncryptMsg(reply_msg, nonce, current_time) if ret != 0: logger.error(f"消息加密失败,错误码: {ret}") raise HTTPException(status_code=500, detail="消息加密失败") logger.info("成功构造并加密回复消息") return Response(content=encrypted_msg, media_type="application/xml") except Exception as e: logger.error(f"处理消息时发生错误: {str(e)}") raise HTTPException(status_code=500, detail="服务器内部错误") if __name__ == "__main__": import uvicorn uvicorn.run(app, host="127.0.0.1", port=5000)

第五步:回调配置

5.1 进入企业微信后台配置-接收消息

这里需要我们提供一个处理请求回调的URL,随机生成Token,EncodingAESKey,并且需要把这写数据配置到上方代码中

5.2 配置URL和项目部署

首先需要创建一个域名

添加好的域名中指向项目文件并且配置反向代理(设置-配置文件)

保存起来,然后创建一个文件,把配置好的代码上传到创建的文件中,并且验证回调服务,让程序跑起来。

5.3 配置成功以后保存企业微信的URL以及其他配置信息

结束了。。。

接下来看效果:

日志文件

效果:

Read more

《并查集:算法中的高效集合操作利器》:一文带你掌握并查集数据结构

《并查集:算法中的高效集合操作利器》:一文带你掌握并查集数据结构

系列文章目录 文章目录 * 系列文章目录 * 一、认识并查集 * 1.并查集的定义 * 2.基本概念 * 2.1.集合的表示 * 2.2.合并操作 * 2.3.查询操作 * 3.基本操作 * 3.1初始化 * 3.2.查找 * 3.3.合并 * 4.优化技巧 * 4.1.路径压缩 * 4.2.按秩合并 * 5.代码完整实例 * 6.应用场景 * 6.1.图的连通性 * 6.2.社交网络分析 * 6.3.动态连通性问题 * 7.

By Ne0inhk
AI的提示词专栏:Prompt 与 Python Pandas 的结合使用指南

AI的提示词专栏:Prompt 与 Python Pandas 的结合使用指南

AI的提示词专栏:Prompt 与 Python Pandas 的结合使用指南 该指南聚焦 Prompt 与 Pandas 结合的实践应用,先阐述二者结合的价值 —— 降低 Pandas 学习门槛、提升数据处理效率,接着梳理代码生成、解释、优化等 6 大核心应用场景及对应 Prompt 目标。随后详解高质量 Prompt 设计的五大原则,强调需精准描述数据结构、明确操作目标等要点。通过 5 个实战案例,从基础数据清洗到批量生成报表,展示 Prompt 设计、模型输出与结果验证全流程,并给出 8 个高频问题的解决方案。最后总结核心价值,提供扩展学习建议,助力读者掌握 “自然语言驱动数据处理” 能力,形成高效工作流。 人工智能专栏介绍     人工智能学习合集专栏是 AI 学习者的实用工具。它像一个全面的

By Ne0inhk
Wi-Fi 破解原理与防御:用 Python + Scapy 抓取“握手包”并跑字典,硬核演示 WPA2 弱点

Wi-Fi 破解原理与防御:用 Python + Scapy 抓取“握手包”并跑字典,硬核演示 WPA2 弱点

🚨 前言:你家的 Wi-Fi 为什么不安全? WPA2 协议看似铜墙铁壁,但它有一个致命的设计逻辑缺陷: 设备(手机/电脑)连接路由器时,必须进行**“四次握手” (4-Way Handshake)** 来验证密码是否正确。 这个过程是公开广播的。 黑客只要在旁边静静地“听”到这四次握手的数据包,把它抓回家,就可以关起门来用字典疯狂尝试密码,直到计算出的哈希值与抓到的包匹配。 攻防逻辑示意图 (Mermaid): 1. 请求连接2. 发送随机数 Anonce3. 发送随机数 Snonce + MIC (关键)4. 确认 抓取握手包 跑字典/暴力破解 用户手机 路由器 AP 黑客 (监听模式) 本地电脑 得出明文密码 🛠️ 准备工作 1. 硬件:你需要一个支持监听模式 (Monitor

By Ne0inhk

Python 真实世界的数据科学(十二)

原文:Python: Real-World Data Science 协议:CC BY-NC-SA 4.0 四十二、使用回归分析预测连续目标变量 在前几章中,您了解了监督学习背后的主要概念,并为分类任务训练了许多不同的模型以预测组成员或分类变量。 在本章中,我们将深入研究监督学习的另一个子类别:回归分析。 回归模型用于在连续规模上预测目标变量,这使它们对于解决科学和工业应用中的许多问题具有吸引力,例如理解变量之间的关系,评估趋势或进行预测。 一个例子是预测未来几个月公司的销售额。 在本章中,我们将讨论回归模型的主要概念,并涉及以下主题: * 探索和可视化数据集 * 研究实现线性回归模型的不同方法 * 训练对异常值具有鲁棒性的回归模型 * 评估回归模型并诊断常见问题 * 将回归模型拟合到非线性数据 介绍一个简单的线性回归模型 简单(单变量)线性回归的目标是为单个特征(解释变量x)与连续值响应之间的关系建模的模型( 目标变量和)。 具有一个解释变量的线性模型方程定义如下: https://github.com/OpenDocCN/freelearn-ds-z

By Ne0inhk