import requests
import json
import time
from bs4 import BeautifulSoup
from fake_useragent import UserAgent
from urllib.parse import urlparse
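# Third-party dependencies (PyPI names differ slightly from the import names):
#   pip install requests beautifulsoup4 lxml fake-useragent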
class WeChatArticleSpider:
    def __init__(self):
        self.ua = UserAgent()
        self.headers = {
            "User-Agent": self.ua.random,
            "Referer": "https://mp.weixin.qq.com/",
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
            "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
            "Cache-Control": "max-age=0",
            "Connection": "keep-alive",
            # Paste the Cookie from a logged-in WeChat Official Account session here
            "Cookie": "your WeChat Official Account Cookie",
        }
        self.timeout = 10  # per-request timeout in seconds
        self.delay = 2     # polite delay between requests in seconds
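
    # Note: the User-Agent is randomized once, at construction time. For long
    # sessions you can re-randomize it per request by refreshing
    # self.headers["User-Agent"] = self.ua.random before each call.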
    def get_article_html(self, article_url):
        """
        Fetch the HTML source of an article page.
        :param article_url: Official Account article URL
        :return: page HTML text, or None on failure
        """
        try:
            time.sleep(self.delay)  # throttle requests to reduce the risk of being blocked
            response = requests.get(
                url=article_url,
                headers=self.headers,
                timeout=self.timeout,
                allow_redirects=True
            )
            response.raise_for_status()
            response.encoding = response.apparent_encoding
            return response.text
        except requests.exceptions.RequestException as e:
            print(f"Failed to fetch page: {e}")
            return None
    def parse_static_content(self, html):
        """
        Parse static content: title, author, publish time, body text.
        :param html: page HTML text
        :return: dict of static fields
        """
        soup = BeautifulSoup(html, "lxml")
        static_data = {}
        title_tag = soup.find("h1", class_="rich_media_title")
        static_data["title"] = title_tag.get_text(strip=True) if title_tag else "Unknown title"
        author_tag = soup.find("a", class_="rich_media_meta_link")
        static_data["author"] = author_tag.get_text(strip=True) if author_tag else "Unknown author"
        time_tag = soup.find("em", class_="rich_media_meta rich_media_meta_text")
        static_data["publish_time"] = time_tag.get_text(strip=True) if time_tag else "Unknown time"
        content_tag = soup.find("div", class_="rich_media_content")
        if content_tag:
            # Drop scripts, styles and embedded iframes before extracting text
            for useless_tag in content_tag.find_all(["script", "style", "iframe"]):
                useless_tag.decompose()
            static_data["content"] = content_tag.get_text(strip=True, separator="\n")
        else:
            static_data["content"] = "Failed to parse body text"
        return static_data
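
    # Caveat: the class names above match the mp.weixin.qq.com article markup
    # at the time of writing and may change. If fields start falling back to
    # the "Unknown ..." defaults, re-inspect the page; for instance, the
    # publish-time element currently also carries id="publish_time", so
    # soup.find("em", id="publish_time") can serve as a fallback selector
    # (an assumption about the current markup, not a guarantee).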
    def parse_dynamic_data(self, html):
        """
        Parse dynamic data: read count and "Wow" count, extracted from a JS
        variable embedded in the page (if present).
        :param html: page HTML text
        :return: dict of dynamic fields
        """
        dynamic_data = {"read_count": 0, "like_count": 0}
        try:
            js_start = html.find("var msgBizInfo = ")
            if js_start != -1:
                # Slice out the object literal up to and including the closing
                # brace; this assumes the embedded object is valid JSON
                js_end = html.find("};", js_start) + 1
                js_str = html[js_start:js_end].replace("var msgBizInfo = ", "")
                biz_info = json.loads(js_str)
                dynamic_data["read_count"] = biz_info.get("read_num", 0)
                dynamic_data["like_count"] = biz_info.get("like_num", 0)
        except Exception as e:
            print(f"Failed to parse dynamic data: {e}")
        return dynamic_data
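
    # Read/"Wow" counts are usually NOT embedded in the publicly served HTML;
    # the WeChat client fetches them through a private interface. The sketch
    # below follows the commonly documented (unofficial) getappmsgext endpoint.
    # The parameter names and the appmsg_token/key credentials (captured from a
    # logged-in WeChat client, e.g. via a debugging proxy) are assumptions and
    # may change at any time.
    def fetch_dynamic_via_api(self, biz, mid, idx, sn, appmsg_token, key):
        """Hedged sketch: query read/"Wow" counts via the unofficial stats API."""
        api_url = "https://mp.weixin.qq.com/mp/getappmsgext"
        params = {"__biz": biz, "mid": mid, "idx": idx, "sn": sn,
                  "appmsg_token": appmsg_token, "key": key}
        try:
            resp = requests.post(api_url, headers=self.headers, params=params,
                                 data={"is_only_read": 1}, timeout=self.timeout)
            stat = resp.json().get("appmsgstat", {})
            return {"read_count": stat.get("read_num", 0),
                    "like_count": stat.get("like_num", 0)}
        except Exception as e:
            print(f"Stats API request failed: {e}")
            return {"read_count": 0, "like_count": 0}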
    def run(self, article_url):
        """
        Main crawling flow.
        :param article_url: Official Account article URL
        :return: dict with the full article data, or None on failure
        """
        parsed_url = urlparse(article_url)
        if not parsed_url.scheme or not parsed_url.netloc:
            print("Malformed URL")
            return None
        html = self.get_article_html(article_url)
        if not html:
            return None
        static_data = self.parse_static_content(html)
        dynamic_data = self.parse_dynamic_data(html)
        # Merge static and dynamic fields into a single record
        article_data = {**static_data, **dynamic_data}
        return article_data
if __name__ == "__main__":
    spider = WeChatArticleSpider()
    target_url = "https://mp.weixin.qq.com/s/xxxxxx"
    result = spider.run(target_url)
    if result:
        print("===== WeChat Official Account article crawl result =====")
        print(f"Title: {result['title']}")
        print(f"Author: {result['author']}")
        print(f"Publish time: {result['publish_time']}")
        print(f"Read count: {result['read_count']}")
        print(f"'Wow' count: {result['like_count']}")
        print(f"Body (first 200 chars): {result['content'][:200]}...")
        # Save the result to a local JSON file (json is already imported above)
        with open("wechat_article.json", "w", encoding="utf-8") as f:
            json.dump(result, f, ensure_ascii=False, indent=4)
        print("Data saved to wechat_article.json")
    else:
        print("Crawl failed")
    def batch_crawl(spider, url_list):
        """Batch-crawl multiple Official Account articles with one spider instance."""
        all_results = []
        for url in url_list:
            data = spider.run(url)
            if data:
                all_results.append(data)
        return all_results

    url_list = [
        "https://mp.weixin.qq.com/s/xxxxxx1",
        "https://mp.weixin.qq.com/s/xxxxxx2",
    ]
    batch_result = batch_crawl(spider, url_list)
    print(f"Batch crawl finished; fetched {len(batch_result)} articles in total")