import requests
import json
import csv
import time
import re
import random
from fake_useragent import UserAgent
from requests.exceptions import RequestException
class KugouMusicCrawler:
    """Crawler for the Kugou Music hot-songs ranking (the 8888 chart).

    Fetches the chart page by page, extracts the embedded ``rankData``
    JSON blob from each page's HTML, and writes the collected songs to
    a CSV file and a human-readable TXT file.
    """

    def __init__(self, max_page=5):
        """Initialize the crawler.

        :param max_page: maximum number of pages to crawl
                         (22 songs per page; defaults to the first 5 pages)
        """
        self.ua = UserAgent()
        # Browser-like headers with a randomized User-Agent to reduce the
        # chance of the request being rejected as a bot.
        self.headers = {
            'User-Agent': self.ua.random,
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
            'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
            'Referer': 'https://www.kugou.com/',
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1'
        }
        self.max_page = max_page
        self.base_url = 'https://www.kugou.com/yy/rank/home/{page}-8888.html'
        self.song_data = []

    def extract_json(self, html):
        """Extract the chart JSON payload embedded in the page HTML.

        :param html: raw HTML text of a ranking page
        :return: parsed JSON object, or None when no data block is found
                 or parsing fails
        """
        try:
            # The page embeds the chart data as a JS assignment:
            #   var rankData = {...};</script>
            pattern = re.compile(r'var rankData = (.*?);\s*</script>')
            match = pattern.search(html)
            if match:
                json_str = match.group(1)
                # Strip literal control characters that would otherwise
                # break json.loads on raw (unescaped) payloads.
                json_str = json_str.replace('\n', '').replace('\r', '').replace('\t', '')
                return json.loads(json_str)
            return None
        except Exception as e:
            print(f"提取 JSON 数据失败:{e}")
            return None

    def get_rank_page(self, page):
        """Download one ranking page.

        :param page: 1-based page number
        :return: page HTML as text, or None on any request failure
        """
        url = self.base_url.format(page=page)
        params = {
            'rnd': int(time.time() * 1000),  # cache-busting timestamp
            'json': 'true'
        }
        try:
            # Randomized delay between requests to be polite to the server.
            time.sleep(random.uniform(1, 3))
            response = requests.get(
                url=url,
                headers=self.headers,
                params=params,
                timeout=15
            )
            response.raise_for_status()
            # Force UTF-8 before .text is accessed below via the caller.
            response.encoding = 'utf-8'
            return response.text
        except RequestException as e:
            print(f"第{page}页请求失败:{e}")
            return None

    def parse_rank_data(self, json_data):
        """Extract per-song fields from the chart JSON into self.song_data.

        :param json_data: object returned by extract_json; entries live
                          under its 'data' key
        """
        if not json_data or 'data' not in json_data:
            print("无有效榜单数据")
            return
        for song in json_data['data']:
            try:
                song_info = {
                    '排名': song.get('rank', 0),
                    '歌曲名称': song.get('songname', '未知歌曲'),
                    '歌手': song.get('singername', '未知歌手'),
                    '播放量': song.get('play_count', '0'),
                    '评分': song.get('score', 0),
                    '时长': song.get('duration', '00:00'),
                    '歌曲 Hash': song.get('hash', ''),
                    '播放链接': f"https://www.kugou.com/song/#hash={song.get('hash', '')}"
                }
                self.song_data.append(song_info)
                print(f"已爬取:第{song_info['排名']}名 - {song_info['歌曲名称']} - {song_info['歌手']}")
            except Exception as e:
                # Skip a malformed entry rather than aborting the page.
                print(f"解析单首歌曲失败:{e}")
                continue

    def save_data(self):
        """Write the collected songs to kugou_hot_songs.csv and .txt.

        Does nothing (with a message) when no songs were collected.
        """
        if not self.song_data:
            print("无数据可保存")
            return
        # Sort once, up front, so both output files share the same order.
        sorted_data = sorted(self.song_data, key=lambda x: x['排名'])
        csv_headers = ['排名', '歌曲名称', '歌手', '播放量', '评分', '时长', '播放链接']
        # newline='' is required by the csv module to avoid blank rows on
        # Windows. extrasaction='ignore' silently drops the '歌曲 Hash' key,
        # which is collected but not exported — without it DictWriter raises
        # ValueError for the field missing from csv_headers.
        with open('kugou_hot_songs.csv', 'w', newline='', encoding='utf-8-sig') as f:
            writer = csv.DictWriter(f, fieldnames=csv_headers, extrasaction='ignore')
            writer.writeheader()
            writer.writerows(sorted_data)
        with open('kugou_hot_songs.txt', 'w', encoding='utf-8') as f:
            f.write('酷狗音乐热门歌曲榜单\n')
            f.write('=' * 50 + '\n\n')
            for song in sorted_data:
                f.write(f"【第{song['排名']}名】{song['歌曲名称']} - {song['歌手']}\n")
                f.write(f"播放量:{song['播放量']} | 评分:{song['评分']} | 时长:{song['时长']}\n")
                f.write(f"播放链接:{song['播放链接']}\n")
                f.write('-' * 30 + '\n')
        print(f"数据保存完成!共爬取{len(self.song_data)}首热门歌曲")
        print(f"CSV 文件:kugou_hot_songs.csv")
        print(f"TXT 文件:kugou_hot_songs.txt")

    def run(self):
        """Main crawl loop: fetch, parse, and finally save every page."""
        print("开始爬取酷狗音乐热门歌曲榜单...")
        for page in range(1, self.max_page + 1):
            print(f"\n正在爬取第{page}页榜单...")
            html = self.get_rank_page(page)
            if not html:
                # Request failed; move on to the next page.
                continue
            json_data = self.extract_json(html)
            self.parse_rank_data(json_data)
        self.save_data()
        print("\n爬虫执行完毕!")
if __name__ == '__main__':
    # Crawl the first five pages of the hot-songs chart.
    KugouMusicCrawler(max_page=5).run()