Python 内存泄漏追踪实战:tracemalloc 与 objgraph 深度解析
Python 内存泄漏的诊断与解决方案。内容涵盖内存泄漏原理、tracemalloc 和 objgraph 两大工具的基础使用与高级技巧。通过 Web 应用、Django 等实战案例,展示了如何定位循环引用、缓存无限增长等问题。此外,还提供了标准调试流程、生产环境低开销监控方案以及防御性编程建议,帮助开发者有效识别并修复内存问题,保障服务稳定性。

Python 内存泄漏的诊断与解决方案。内容涵盖内存泄漏原理、tracemalloc 和 objgraph 两大工具的基础使用与高级技巧。通过 Web 应用、Django 等实战案例,展示了如何定位循环引用、缓存无限增长等问题。此外,还提供了标准调试流程、生产环境低开销监控方案以及防御性编程建议,帮助开发者有效识别并修复内存问题,保障服务稳定性。

凌晨三点,我被运维的电话吵醒:'你们的数据处理服务又崩了!内存占用从 2GB 飙到 32GB,服务器直接 OOM 重启!'这已经是本月第三次了。
那是我职业生涯中最难熬的一周。白天正常运行的服务,到了晚上就像失控的野兽,疯狂吞噬内存。我尝试了所有能想到的方法:检查日志、审查代码、增加内存限制……问题依旧。直到我掌握了 tracemalloc 和 objgraph 这两大利器,才终于揪出了隐藏在缓存层中的内存泄漏元凶。
今天,我将通过真实案例,带你系统掌握 Python 内存泄漏的诊断与解决方案。无论你是刚遇到内存问题的新手,还是想深化调优技能的资深开发者,这篇文章都将成为你的实战手册。
在 Python 中,内存泄漏指的是:程序持续分配内存但无法释放已不再使用的对象,导致可用内存逐渐减少。
# Classic memory-leak example: a cache that only ever grows.
class DataCache:
    """In-memory cache with no eviction — the canonical leak pattern."""

    def __init__(self):
        # Entries are added but never removed.
        self._cache = {}

    def add_data(self, key, value):
        # Grows monotonically: nothing ever deletes from the dict.
        self._cache[key] = value

    def process_request(self, request_id, data):
        # Every request is cached forever — this is the leak.
        self.add_data(request_id, data)
        return f"Processed {request_id}"
# Usage example
# NOTE(review): indentation was lost during extraction — the call below
# belongs inside the for loop. Running this allocates ~1 GB on purpose.
cache = DataCache()
for i in range(1000000): # after a million requests, memory blows up!
cache.process_request(f"req_{i}", "x"*1000)
Python 使用**引用计数 + 垃圾回收(GC)**机制管理内存:
import sys

# Reference-counting demo: watch the count move as references come and go.
obj = [1, 2, 3]
# Subtract 1 because getrefcount's own argument temporarily adds a reference.
count = sys.getrefcount(obj) - 1
print(f"初始引用计数:{count}")
ref1 = obj
count = sys.getrefcount(obj) - 1
print(f"增加引用后:{count}")
del ref1
count = sys.getrefcount(obj) - 1
print(f"删除引用后:{count}")
# The circular-reference problem: two nodes that point at each other.
class Node:
    """Singly-linked node holding a value and a next pointer."""

    def __init__(self, value):
        self.value = value
        self.next = None


# Build a two-node cycle.
node1 = Node(1)
node2 = Node(2)
node1.next = node2
node2.next = node1  # the cycle!
# Dropping our names does NOT free the pair immediately: the refcounts never
# reach zero, so only the cyclic garbage collector can reclaim them — later.
del node1, node2
# Scenario 1: a global container that grows without bound.
global_logs = []


def log_event(event):
    """Record an event; entries are never purged (the leak)."""
    global_logs.append(event)
# Scenario 2: a closure that captures a large object.
def create_handler(large_data):
    """Return a callable whose closure pins large_data in memory."""

    def _handler():
        # The closure keeps large_data alive for as long as the handler lives.
        return len(large_data)

    return _handler
# Scenario 3: a resource that is never closed.
class FileProcessor:
    """Opens a file on construction and never closes it (no __del__/__exit__)."""

    def __init__(self, filename):
        # Leaked handle: nothing ever calls close() on this file object.
        self.file = open(filename)

    def process(self):
        """Read and return the whole file contents."""
        return self.file.read()
# Scenario 4: a cache with no expiry policy.
cache = {}


def get_or_compute(key):
    """Memoize expensive_computation results forever (unbounded cache)."""
    try:
        return cache[key]
    except KeyError:
        result = expensive_computation(key)
        cache[key] = result
        return result


def expensive_computation(key):
    # Simulate producing a large result object.
    return [0] * 1000000
import tracemalloc
import linecache
def display_top_memory(snapshot, key_type='lineno', limit=10):
    """Print the top-``limit`` memory allocations from a tracemalloc snapshot.

    Args:
        snapshot: a ``tracemalloc.Snapshot`` to analyze.
        key_type: grouping key passed to ``Snapshot.statistics``
            ('lineno', 'filename' or 'traceback').
        limit: how many entries to print.

    Returns:
        None — output goes to stdout.
    """
    # Filter out interpreter-bootstrap noise so only application code shows.
    snapshot = snapshot.filter_traces((
        tracemalloc.Filter(False, "<frozen importlib._bootstrap>"),
        tracemalloc.Filter(False, "<unknown>"),
    ))
    top_stats = snapshot.statistics(key_type)
    print(f"\n{'='*70}")
    print(f"Top {limit} 内存占用(按 {key_type} 排序)")
    print(f"{'='*70}")
    for index, stat in enumerate(top_stats[:limit], 1):
        frame = stat.traceback[0]
        filename = frame.filename
        lineno = frame.lineno
        # Fetch the offending source line for context.
        line = linecache.getline(filename, lineno).strip()
        # Bug fix: the original printed a literal "(unknown)" placeholder
        # instead of the filename it had just computed above.
        print(f"\n#{index}: {filename}:{lineno}")
        print(f" {line}")
        print(f" 大小:{stat.size / 1024/1024:.1f} MB")
        print(f" 数量:{stat.count} 个对象")
# Worked example: detecting a memory leak
# NOTE(review): the snippet below was mangled during text extraction — the
# function header (presumably `def memory_leak_example():`), loop/conditional
# keywords, and all string/numeric literals were stripped. Preserved verbatim;
# restore from the original article before executing.
():
tracemalloc.start()
snapshot1 = tracemalloc.take_snapshot()
leaked_objects = []
i ():
leaked_objects.append([]*)
snapshot2 = tracemalloc.take_snapshot()
()
display_top_memory(snapshot1, limit=)
()
display_top_memory(snapshot2, limit=)
top_stats = snapshot2.compare_to(snapshot1, )
()
()
()
stat top_stats[:]:
()
stat.count_diff > :
()
()
tracemalloc.stop()
memory_leak_example()
# NOTE(review): Flask-based leak-monitoring example. Indentation was lost in
# extraction, and everything from `increase_mb = ...` onward (the reporting
# code, the before/after request hooks, and the routes) is corrupted —
# keywords and literals were stripped. Preserved verbatim; restore from the
# original article before executing.
import tracemalloc
from flask import Flask, request
import time
app = Flask(__name__)
# Global cache (potential leak point)
request_cache = {}
class MemoryMonitor:
"""Memory-monitoring helper (described as a decorator in the article)."""
def __init__(self):
self.snapshots = []
tracemalloc.start()
def capture_snapshot(self, label):
"""Capture a labelled memory snapshot."""
snapshot = tracemalloc.take_snapshot()
self.snapshots.append((label, snapshot, time.time()))
def analyze_leak(self, threshold_mb=10):
"""Analyze consecutive snapshots for memory growth."""
if len(self.snapshots) < 2:
print("需要至少两个快照进行对比")
return
for i in range(1, len(self.snapshots)):
label1, snapshot1, time1 = self.snapshots[i-1]
label2, snapshot2, time2 = self.snapshots[i]
# Compute the memory delta between the two snapshots
top_stats = snapshot2.compare_to(snapshot1, 'lineno')
total_increase = sum(stat.size_diff for stat in top_stats if stat.size_diff > 0)
increase_mb = total_increase / 1024/
()
()
()
()
()
increase_mb > threshold_mb:
()
()
stat top_stats[:]:
stat.size_diff > :
()
()
()
monitor = MemoryMonitor()
():
request.start_time = time.time()
():
(request, ):
elapsed = time.time() - request.start_time
elapsed > :
monitor.capture_snapshot()
response
():
request_id = request.args.get(, )
large_data = []*
request_cache[request_id] = large_data
{: , : (request_cache)}
():
monitor.analyze_leak(threshold_mb=)
{: }
__name__ == :
app.test_client() client:
monitor.capture_snapshot()
i ():
client.get()
monitor.capture_snapshot()
i (, ):
client.get()
monitor.capture_snapshot()
client.get()
import tracemalloc
import gc
# NOTE(review): indentation was lost in extraction, and everything from the
# `print(f"{*}")` line onward (referrer iteration, LeakyCache demo) is
# corrupted — keywords and literals were stripped. Preserved verbatim;
# restore from the original article before executing.
class ObjectTracker:
"""Track memory allocations for objects of a specific type."""
@staticmethod
def track_allocations(target_type, duration_seconds=10):
"""Track object allocations over a fixed time window."""
tracemalloc.start()
initial_snapshot = tracemalloc.take_snapshot()
print(f"开始追踪 {target_type.__name__} 对象,持续 {duration_seconds} 秒...")
time.sleep(duration_seconds)
final_snapshot = tracemalloc.take_snapshot()
tracemalloc.stop()
# Analyze the allocation delta between the two snapshots
top_stats = final_snapshot.compare_to(initial_snapshot, 'lineno')
print(f"\n{target_type.__name__} 对象内存分配分析:")
for stat in top_stats[:10]:
if target_type.__name__ in str(stat):
print(f"\n{stat}")
@staticmethod
def find_object_sources(obj):
"""Find where the references to obj come from (via gc.get_referrers)."""
print(f"\n{'='*70}")
print(f"分析对象:{type(obj).__name__} at {hex(id(obj))}")
print(f"{*}")
referrers = gc.get_referrers(obj)
()
i, ref (referrers[:], ):
ref_type = (ref).__name__
()
(ref, ):
key, value ref.items():
value obj:
()
(ref, (, )):
()
second_level = gc.get_referrers(ref)
second_level:
()
:
():
.data = {}
():
.data[key] = value
cache = LeakyCache()
i ():
cache.add(, []*)
ObjectTracker.find_object_sources(cache.data)
# Install objgraph
pip install objgraph
# Generating reference graphs additionally requires Graphviz
# Ubuntu/Debian
sudo apt-get install graphviz
# macOS
brew install graphviz
# Windows
# Download the installer from https://graphviz.org/download/
import objgraph
import gc
# Basic statistics
def analyze_object_types():
    """Dump the 20 most common object types currently alive in memory."""
    print("\n内存中最多的对象类型(Top 20):")
    top = 20
    objgraph.show_most_common_types(limit=top)
# Growth analysis
def track_object_growth():
    """Show object-count growth before and after allocating test objects."""
    # Baseline pass: collect garbage first so the counts are stable.
    gc.collect()
    objgraph.show_growth(limit=10)
    # Allocate a batch of dicts to simulate leaked objects.
    retained = [{'data': [0] * 100} for _ in range(10000)]
    # Second pass: the delta versus the baseline is printed.
    print("\n执行操作后的对象增长:")
    objgraph.show_growth(limit=10)
# Run the analyses (requires objgraph to be installed)
analyze_object_types()
track_object_growth()
import objgraph
import os
class Node:
    """Doubly-linked list node (can participate in reference cycles)."""

    def __init__(self, value):
        self.value = value
        self.next = None  # successor in the ring
        self.prev = None  # predecessor in the ring


class CircularList:
    """Circular doubly-linked list (demonstrates leak-prone cycles)."""

    def __init__(self):
        self.head = None  # first node, or None while empty
        self.size = 0     # number of nodes in the ring

    def add(self, value):
        """Append a node holding value to the end of the ring."""
        node = Node(value)
        if self.head is None:
            # First element: it points at itself in both directions.
            self.head = node
            node.next = node
            node.prev = node
        else:
            # Splice the new node in between the current tail and head.
            last = self.head.prev
            last.next = node
            node.prev = last
            node.next = self.head
            self.head.prev = node
        self.size += 1
# Create circular references
# NOTE(review): the snippet below was mangled during text extraction — loop
# keywords, return statements, the visualize_references() header, and all
# string/numeric literals (graph filenames, depths) were stripped. Preserved
# verbatim; restore from the original article before executing.
def create_circular_references():
"""Create objects that contain circular references."""
lists = []
for i in range(10):
circular_list = CircularList()
for j ():
circular_list.add()
lists.append(circular_list)
lists
():
leaked_lists = create_circular_references()
target = leaked_lists[]
()
output_file =
objgraph.show_backrefs([target], max_depth=, filename=output_file, refcounts=)
()
output_file =
objgraph.show_refs([target.head], max_depth=, filename=output_file, refcounts=)
()
leaked_lists
leaked = visualize_references()
()
objgraph.show_chain(
objgraph.find_backref_chain(
leaked[], objgraph.is_proper_module
),
filename=
)
import objgraph
import tracemalloc
import gc
from functools import wraps
# NOTE(review): indentation was lost in extraction, and everything from the
# `print(f"\n")` line onward (reporting, _analyze_leak, generate_report, the
# memory_profile decorator, and the usage demo) is corrupted — keywords and
# literals were stripped. Preserved verbatim; restore from the original
# article before executing.
class MemoryLeakDetector:
"""Memory-leak detector (production friendly)."""
def __init__(self, threshold_mb=50):
self.threshold_mb = threshold_mb
self.baseline = None
self.snapshots = []
def start_monitoring(self):
"""Start monitoring: collect garbage and take a baseline snapshot."""
gc.collect()
tracemalloc.start()
self.baseline = tracemalloc.take_snapshot()
print("✅ 内存监控已启动")
def check_memory(self, label="checkpoint"):
"""Check current memory state against the baseline."""
if not self.baseline:
print("⚠️ 请先调用 start_monitoring()")
return
gc.collect()
current = tracemalloc.take_snapshot()
self.snapshots.append((label, current))
# Compute the memory delta versus the baseline
stats = current.compare_to(self.baseline, 'lineno')
total_increase = sum(s.size_diff for s in stats if s.size_diff > 0)
increase_mb = total_increase / 1024/1024
print(f"\n")
()
()
increase_mb > .threshold_mb:
()
._analyze_leak(stats)
:
()
()
():
()
i, stat (stats[:], ):
stat.size_diff > :
()
()
()
()
objgraph.show_growth(limit=)
():
()
()
objgraph.show_most_common_types(limit=)
()
suspicious_types = [, , , ]
obj_type suspicious_types:
objects = objgraph.by_type(obj_type)
(objects) > :
()
sample = objects[] objects
sample:
output_file = os.path.join(output_dir, )
objgraph.show_refs([sample], filename=output_file, max_depth=)
()
.snapshots:
latest_label, latest_snapshot = .snapshots[-]
()
top_stats = latest_snapshot.statistics()
()
i, stat (top_stats[:], ):
frame = stat.traceback[]
()
()
()
():
():
():
gc.collect()
before = tracemalloc.take_snapshot()
result = func(*args, **kwargs)
gc.collect()
after = tracemalloc.take_snapshot()
stats = after.compare_to(before, )
total_increase = (s.size_diff s stats s.size_diff > )
increase_mb = total_increase / /
increase_mb > :
()
()
stat stats[:]:
stat.size_diff > :
()
result
wrapper
decorator
detector = MemoryLeakDetector(threshold_mb=)
detector.start_monitoring()
():
cache = {}
i ():
cache[] = []*
(cache)
result = process_large_dataset()
detector.check_memory()
detector.generate_report()
import tracemalloc
import objgraph
import gc
import psutil
import os
# NOTE(review): indentation was lost in extraction, and everything after
# step1_confirm_leak (steps 2-4: locating the source with tracemalloc,
# analyzing objects with objgraph, verifying the LRU-cache fix, plus the
# __main__ driver) is corrupted — function headers, keywords, and literals
# were stripped. Preserved verbatim; restore from the original article
# before executing.
class MemoryDebugger:
"""Complete memory-debugging workflow."""
@staticmethod
def step1_confirm_leak():
"""Step 1: confirm whether a memory leak really exists."""
print("="*70)
print("步骤 1: 确认内存泄漏")
print("="*70)
process = psutil.Process(os.getpid())
baseline = process.memory_info().rss / 1024/1024
print(f"基线内存:{baseline:.2f} MB")
# Simulate a workload
for iteration in range(5):
# Run the business logic
_ = [0]*1000000
gc.collect()
current = process.memory_info().rss / 1024/1024
increase = current - baseline
print(f"迭代 {iteration +1}: {current:.2f} MB (+{increase:.2f} MB)")
if increase > 100:
print("⚠️ 确认内存持续增长,可能存在泄漏!")
return True
()
():
(+*)
()
(*)
tracemalloc.start()
snapshot1 = tracemalloc.take_snapshot()
leaked_data = []
i ():
leaked_data.append([]*)
snapshot2 = tracemalloc.take_snapshot()
top_stats = snapshot2.compare_to(snapshot1, )
()
stat top_stats[:]:
stat.size_diff > :
()
()
tracemalloc.stop()
():
(+*)
()
(*)
gc.collect()
()
objgraph.show_growth(limit=)
leaked_cache
leaked_cache = {}
i ():
leaked_cache[i] = []*
()
objgraph.show_growth(limit=)
leaked_cache:
sample_obj = (leaked_cache.values())[]
objgraph.show_backrefs([sample_obj], filename=, max_depth=)
()
():
(+*)
()
(*)
tracemalloc.start()
before = tracemalloc.take_snapshot()
collections OrderedDict
:
():
.cache = OrderedDict()
.max_size = max_size
():
key .cache:
.cache.move_to_end(key)
.cache[key] = value
(.cache) > .max_size:
.cache.popitem(last=)
cache = LRUCache(max_size=)
i ():
cache.(i, []*)
after = tracemalloc.take_snapshot()
stats = after.compare_to(before, )
total_increase = (s.size_diff s stats s.size_diff > )
()
total_increase / / < :
()
:
()
tracemalloc.stop()
__name__ == :
debugger = MemoryDebugger()
debugger.step1_confirm_leak():
debugger.step2_locate_source()
debugger.step3_analyze_objects()
debugger.step4_verify_fix()
import tracemalloc
import threading
import time
from datetime import datetime
# NOTE(review): indentation was lost in extraction, and everything from
# _monitor_loop onward (the monitoring loop, _check_memory, and the usage
# demo) is corrupted — `self` references, keywords, and literals were
# stripped. Preserved verbatim; restore from the original article before
# executing.
class ProductionMemoryMonitor:
"""Production-environment memory monitor (low overhead)."""
def __init__(self, check_interval=300, alert_threshold_mb=500):
self.check_interval = check_interval
self.alert_threshold_mb = alert_threshold_mb
self.running = False
self.thread = None
def start(self):
"""Start the background monitoring thread."""
if self.running:
return
self.running = True
tracemalloc.start()
self.thread = threading.Thread(target=self._monitor_loop, daemon=True)
self.thread.start()
print(f"✅ 内存监控已启动(每 {self.check_interval} 秒检查一次)")
def stop(self):
"""Stop monitoring and join the worker thread."""
self.running = False
if self.thread:
self.thread.join()
tracemalloc.stop()
print("⏹ 内存监控已停止")
def _monitor_loop():
baseline =
.running:
:
snapshot = tracemalloc.take_snapshot()
baseline :
baseline = snapshot
:
._check_memory(baseline, snapshot)
time.sleep(.check_interval)
Exception e:
()
():
stats = current.compare_to(baseline, )
total_increase = (s.size_diff s stats s.size_diff > )
increase_mb = total_increase / /
timestamp = datetime.now().strftime()
increase_mb > .alert_threshold_mb:
()
()
()
i, stat (stats[:], ):
stat.size_diff > :
()
()
:
()
monitor = ProductionMemoryMonitor(check_interval=, alert_threshold_mb=)
monitor.start()
:
leaked = []
i ():
leaked.append([]*)
time.sleep()
KeyboardInterrupt:
:
monitor.stop()
发现内存持续增长
  ↓ 使用 psutil 确认物理内存增长
  ↓ tracemalloc 定位代码位置
  ├─ 找到明确位置 → 修复代码
  └─ 位置不明确
      ↓ objgraph 分析对象关系
      ├─ 发现循环引用 → 使用弱引用或手动打破
      ├─ 发现缓存无限增长 → 添加 LRU 或 TTL
      └─ 发现资源未关闭 → 使用上下文管理器
# NOTE(review): indentation was lost in extraction — bodies below belong
# inside their respective with/def blocks. `process_line` is referenced but
# not defined in this snippet.
# 1. Use context managers
with open('file.txt') as f:
data = f.read()
# 2. Bound the cache size
from functools import lru_cache
@lru_cache(maxsize=1000)
def expensive_function(arg):
return arg ** 2
# 3. Use weak references
import weakref
class Cache:
def __init__(self):
self._cache = weakref.WeakValueDictionary()
# 4. Clean up periodically
def cleanup_old_data(cache, max_age_seconds=3600):
now = time.time()
to_delete = [
k for k, v in cache.items()
if now - v['timestamp'] > max_age_seconds
]
for k in to_delete:
del cache[k]
# 5. Use generators to process large data
def process_large_file(filename):
with open(filename) as f:
for line in f:
# Process line by line instead of loading the whole file
yield process_line(line)
本文介绍了 Python 内存泄漏的基础概念及两种主流追踪工具的使用。通过 tracemalloc 可精确定位代码行级的内存增量,配合 objgraph 则能可视化对象间的引用关系,尤其适用于解决循环引用问题。文章提供了从开发环境诊断到生产环境监控的完整方案,并给出了防御性编程的最佳实践,帮助开发者构建更稳定的 Python 应用。

微信公众号「极客日志」,在微信中扫描左侧二维码关注。展示文案:极客日志 zeeklog
使用加密算法(如AES、TripleDES、Rabbit或RC4)加密和解密文本明文。 在线工具,加密/解密文本在线工具,online
解析常见 curl 参数并生成 fetch、axios、PHP curl 或 Python requests 示例代码。 在线工具,curl 转代码在线工具,online
将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online
将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online
将 Markdown(GFM)转为 HTML 片段,浏览器内 marked 解析;与 HTML转Markdown 互为补充。 在线工具,Markdown转HTML在线工具,online
将 HTML 片段转为 GitHub Flavored Markdown,支持标题、列表、链接、代码块与表格等;浏览器内处理,可链接预填。 在线工具,HTML转Markdown在线工具,online