1 异步编程:为什么它是 Python 高性能的关键
同步与异步是性能优化的分水岭。在处理需要调用多个外部 API 的任务时,同步版本往往耗时较长,而改用异步后可显著降低总耗时。
解析 Python asyncio 库的核心机制,包括事件循环、协程、Future 与 Task 的关系。通过同步与异步代码对比展示性能差异,详解 await 关键字及异步上下文管理器。涵盖 aiohttp 客户端构建、任务队列模式实现以及并发控制、超时处理和错误恢复等优化策略,帮助开发者掌握高并发程序设计。

想象你在餐厅点餐的场景:同步方式下,你点完餐必须站在柜台前干等,直到餐做好才能做下一件事;异步方式下,你点完餐拿到取餐号,等餐的同时可以先去找座位、处理别的事情,餐好了再去取。
这就是异步编程的核心优势:避免不必要的等待,充分利用等待时间执行其他任务。
import time
import asyncio
# Synchronous version: runs sequentially, total time = sum of all task times.
def sync_task(delay: float = 1.0, n: int = 3) -> None:
    """Run *n* simulated blocking I/O operations one after another.

    Generalized from the original hard-coded ``time.sleep(1)`` / ``range(3)``:
    *delay* is the per-task block time in seconds and *n* the task count.
    Defaults preserve the original behavior (3 tasks x 1 s).
    """
    start = time.time()
    for i in range(n):
        time.sleep(delay)  # simulate a blocking I/O operation
        print(f"同步任务{i}完成")
    print(f"同步总耗时:{time.time() - start:.2f}秒")
# Asynchronous version: runs concurrently, total time ≈ the slowest single task.
async def async_task(delay: float = 1.0) -> None:
    """Run three simulated I/O operations concurrently via asyncio.gather.

    Fixes: the original passed ``result="异步任务 N 完成"`` strings to
    asyncio.sleep but then discarded the gathered results; they are now
    printed, mirroring the per-task output of the sync version.
    *delay* generalizes the hard-coded 1-second sleep (default unchanged).
    """
    start = time.time()
    results = await asyncio.gather(
        asyncio.sleep(delay, result="异步任务 0 完成"),
        asyncio.sleep(delay, result="异步任务 1 完成"),
        asyncio.sleep(delay, result="异步任务 2 完成"),
    )
    for message in results:
        print(message)
    print(f"异步总耗时:{time.time() - start:.2f}秒")
# Run the comparison: the sync version takes ~3 s, the async version ~1 s.
sync_task()
asyncio.run(async_task())
事件循环是 asyncio 的调度中心,它像一个高效的交通警察,管理着所有协程的执行顺序。
事件循环的核心工作机制如下:
import asyncio
async def understanding_event_loop():
    """Inspect the event loop driving this coroutine and report its state."""
    # get_running_loop() only works from inside a coroutine / callback.
    loop = asyncio.get_running_loop()
    print(f"事件循环:{loop}")
    print(f"循环是否运行:{loop.is_running()}")
    print(f"循环是否关闭:{loop.is_closed()}")
# Different ways to obtain an event loop
def get_loop_demo():
    """Return an event loop using the modern, non-deprecated lookup order.

    Returns:
        The running loop when called from async context; otherwise a newly
        created loop, which is also installed as the current loop.

    Fixes vs. the original:
    - ``asyncio.get_event_loop()`` is deprecated when no loop is running
      (it raises in recent Python versions); use ``new_event_loop()`` +
      ``set_event_loop()`` instead.
    - The original additionally created a second loop (``new_loop``) that
      was never used and never closed — a resource leak. Removed.
    """
    try:
        # Method 1: the loop driving the current coroutine (recommended).
        loop = asyncio.get_running_loop()
    except RuntimeError:
        # Method 2: no loop is running in this thread — create one
        # explicitly and register it as the current loop.
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
    return loop
# asyncio.run() creates a fresh loop, drives the coroutine, then closes it.
asyncio.run(understanding_event_loop())
关键洞察:事件循环采用单线程模型,通过任务切换而非并行执行来实现并发,这避免了多线程的锁竞争和上下文切换开销。
协程是异步编程的基本执行单元,通过 async/await 语法实现执行暂停和恢复。
import asyncio
from types import coroutine
class CoroutineInsight:
    """A closer look at coroutine objects and their lifecycle."""

    @staticmethod
    async def simple_coroutine():
        """Minimal coroutine: print, suspend for 1 s, print, return."""
        print("开始执行协程")
        await asyncio.sleep(1)
        print("协程执行完成")
        return "结果"

    @staticmethod
    def coroutine_state_analysis():
        """Create a coroutine object, inspect it, then run it to completion."""
        async def stateful_coroutine():
            print("阶段 1 执行")
            await asyncio.sleep(0.5)
            print("阶段 2 执行")
            return "完成"

        # Calling the async function only builds the coroutine object —
        # nothing executes until an event loop drives it.
        pending = stateful_coroutine()
        print(f"协程类型:{type(pending)}")
        print(f"协程对象:{pending}")
        # Hand it to a fresh event loop and return its result.
        return asyncio.run(pending)

    @staticmethod
    async def coroutine_lifecycle():
        """Walk through create → schedule → await → result."""
        print("1. 创建协程对象")
        pending = CoroutineInsight.simple_coroutine()
        print("2. 通过事件循环执行")
        outcome = await pending
        print(f"3. 执行完成,结果:{outcome}")
# asyncio.run(CoroutineInsight.coroutine_lifecycle())
Future 是底层的结果容器,而 Task 是 Future 的子类,专门用于包装协程。
import asyncio
from asyncio import Future, Task
async def future_vs_task_demo():
    """Contrast a manually resolved Future with an auto-scheduled Task.

    Fix: create the Future via ``loop.create_future()`` as the asyncio
    documentation recommends, instead of instantiating ``Future()``
    directly (direct instantiation bypasses loop-specific Future
    implementations such as uvloop's).
    """
    loop = asyncio.get_running_loop()

    # 1. Future: a low-level result container whose result is set by hand.
    future = loop.create_future()
    print(f"Future 初始状态:{future.done()}")

    def set_result():
        future.set_result("手动设置的结果")

    # Schedule the resolution on the next loop iteration, then await it.
    loop.call_soon(set_result)
    result = await future
    print(f"Future 结果:{result}, 状态:{future.done()}")

    # 2. Task: wraps a coroutine and is scheduled to run automatically.
    async def task_function():
        await asyncio.sleep(0.5)
        return "任务执行结果"

    task = asyncio.create_task(task_function())
    print(f"Task 初始状态:{task.done()}")
    task_result = await task
    print(f"Task 结果:{task_result}, 状态:{task.done()}")
# asyncio.run(future_vs_task_demo())
await 不仅仅是'等待',更是执行权转让的指令。
import asyncio
import time
class AwaitMechanism:
    """What `await` really does: yield control, not just wait."""

    @staticmethod
    async def mock_io_operation(name, duration):
        """Pretend to do I/O for *duration* seconds, logging start and end."""
        print(f"[{time.time():.3f}] {name}: 开始 I/O 操作")
        await asyncio.sleep(duration)
        print(f"[{time.time():.3f}] {name}: I/O 操作完成")
        return f"{name}-结果"

    @staticmethod
    async def await_breakdown():
        """Compare sequential awaits against task-based concurrent awaits."""
        print("=== await 执行过程分析 ===")

        # Sequential: the second operation starts only after the first ends.
        t0 = time.time()
        result1 = await AwaitMechanism.mock_io_operation("任务 1", 1)
        result2 = await AwaitMechanism.mock_io_operation("任务 2", 1)
        print(f"顺序执行耗时:{time.time() - t0:.2f}秒")

        # Concurrent: create_task() schedules both before either is awaited,
        # so their sleeps overlap on the event loop.
        t0 = time.time()
        first = asyncio.create_task(AwaitMechanism.mock_io_operation("并发任务 1", 1))
        second = asyncio.create_task(AwaitMechanism.mock_io_operation("并发任务 2", 1))
        results = await asyncio.gather(first, second)
        print(f"并发执行耗时:{time.time() - t0:.2f}秒")
        return results
# asyncio.run(AwaitMechanism.await_breakdown())
异步上下文管理器通过 __aenter__ 和 __aexit__ 方法管理异步资源。
import asyncio
class AsyncDatabaseConnection:
    """Fake async database connection used by the context-manager demo."""

    async def connect(self):
        # Simulate connection setup latency (0.5 s).
        await asyncio.sleep(0.5)
        print("数据库连接已建立")
        return self

    async def execute(self, query):
        # Simulate query latency (0.2 s) and echo a canned result.
        await asyncio.sleep(0.2)
        print(f"执行查询:{query}")
        return f"结果-{query}"

    async def close(self):
        # Simulate teardown latency (0.1 s).
        await asyncio.sleep(0.1)
        print("数据库连接已关闭")
class AsyncResourceManager:
    """Async context manager that opens a DB connection on entry and
    always closes it on exit.

    Ordinary ``Exception``s raised inside the ``async with`` body are
    reported and suppressed (the demo's original intent). Fix: the
    original returned ``True`` for *any* exception type, which also
    swallowed ``asyncio.CancelledError`` and ``KeyboardInterrupt``
    (both BaseException since Python 3.8) — silently eating task
    cancellation. Those now propagate.
    """

    async def __aenter__(self):
        self.db = AsyncDatabaseConnection()
        await self.db.connect()
        return self.db

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        # Release the connection unconditionally, even on error.
        await self.db.close()
        if exc_type:
            print(f"发生异常:{exc_type}")
            # Suppress only ordinary errors; let cancellation/interrupts
            # propagate to the caller.
            return issubclass(exc_type, Exception)
async def async_context_demo():
    """Exercise AsyncResourceManager: connect, query, auto-close."""
    async with AsyncResourceManager() as db:
        result = await db.execute("SELECT * FROM users")
        print(f"查询结果:{result}")
# asyncio.run(async_context_demo())
使用 aiohttp 构建高性能 HTTP 客户端。
import aiohttp
import asyncio
import time
from typing import List, Dict
class AsyncHttpClient:
    """Concurrency-limited async HTTP client built on aiohttp."""

    def __init__(self, max_connections: int = 10):
        # The semaphore caps how many requests are in flight at once.
        self.semaphore = asyncio.Semaphore(max_connections)

    async def fetch_url(self, session: aiohttp.ClientSession, url: str) -> Dict:
        """Fetch one URL; never raises — failures come back in the dict."""
        async with self.semaphore:
            started = time.time()
            try:
                timeout = aiohttp.ClientTimeout(total=10)
                async with session.get(url, timeout=timeout) as response:
                    body = await response.text()
                    return {
                        'url': url,
                        'status': response.status,
                        'content_length': len(body),
                        'elapsed_time': time.time() - started,
                        'success': True,
                    }
            except Exception as exc:
                return {
                    'url': url,
                    'status': None,
                    'error': str(exc),
                    'success': False,
                }

    async def batch_fetch(self, urls: List[str]) -> List[Dict]:
        """Fetch every URL concurrently over one shared session."""
        async with aiohttp.ClientSession() as session:
            fetches = [self.fetch_url(session, u) for u in urls]
            return await asyncio.gather(*fetches, return_exceptions=True)
# Performance comparison test
async def performance_comparison():
    """Issue a batch of delayed HTTP requests and report async throughput.

    Network-bound demo: it hits httpbin.org, so timings vary with
    connectivity. Fixes vs. the original:
    - the success line hard-coded ``/12``; it now uses ``len(urls)`` so
      editing the URL list keeps the report correct;
    - results are filtered with ``isinstance(r, dict)`` because
      ``batch_fetch`` gathers with ``return_exceptions=True`` and calling
      ``.get`` on an Exception would raise AttributeError.
    """
    urls = [
        "https://httpbin.org/delay/1",
        "https://httpbin.org/delay/2",
        "https://httpbin.org/delay/1",
        "https://httpbin.org/delay/3",
    ] * 3  # 12 requests total

    client = AsyncHttpClient(max_connections=5)

    start = time.time()
    results = await client.batch_fetch(urls)
    async_time = time.time() - start

    successful = sum(1 for r in results if isinstance(r, dict) and r.get('success'))
    print(f"异步版本:耗时{async_time:.2f}秒,成功{successful}/{len(urls)} 个请求")
    print(f"平均响应时间:{async_time/len(urls):.2f}秒/请求")
# asyncio.run(performance_comparison())  # 注:performance_comparison 是模块级函数,不是 AsyncHttpClient 的方法
实现生产者和消费者模式的异步任务队列。
import asyncio
import random
from typing import Any, Callable
class AsyncTaskQueue:
"""异步任务队列"""
def __init__(self, max_size: int = 100, num_workers: int = 3):
self.queue = asyncio.Queue(maxsize=max_size)
self.workers = []
self.num_workers = num_workers
self.is_running = False
async def producer(self, data_generator: Callable):
"""生产者协程"""
for item in data_generator():
await self.queue.put(item)
print(f"生产任务:{item}")
# 发送结束信号
for _ in range(self.num_workers):
await self.queue.put(None)
async def worker(self, worker_id: int, processor: Callable):
"""工作者协程"""
print(f"工作者{worker_id}启动")
while self.is_running:
task = await self.queue.get()
if task is None: # 结束信号
self.queue.task_done()
break
try:
result = await processor(task, worker_id)
print(f"工作者{worker_id}处理完成:{task} -> {result}")
except Exception as e:
print(f"工作者{worker_id}处理失败:{task}, 错误:{e}")
finally:
self.queue.task_done()
async def process_batch(self, data_generator: Callable, processor: Callable):
"""批量处理任务"""
self.is_running = True
# 启动工作者
self.workers = [
asyncio.create_task(self.worker(i, processor)) for i in range(self.num_workers)
]
# 启动生产者
producer_task = asyncio.create_task(self.producer(data_generator))
# 等待所有任务完成
await producer_task
await self.queue.join()
# 等待工作者完成
for worker in self.workers:
worker.cancel()
self.is_running = False
# Usage example
async def task_queue_demo():
    """Push ten string tasks through an AsyncTaskQueue with two workers."""
    def data_generator():
        # Lazily produce the task names.
        for i in range(10):
            yield f"task_{i}"

    async def task_processor(task: str, worker_id: int) -> str:
        # Simulate a variable amount of work (0.5–2.0 s).
        process_time = random.uniform(0.5, 2.0)
        await asyncio.sleep(process_time)
        return f"processed_by_{worker_id}"

    queue = AsyncTaskQueue(num_workers=2)
    await queue.process_batch(data_generator, task_processor)
# asyncio.run(task_queue_demo())  # 注:task_queue_demo 是模块级函数,不是 AsyncTaskQueue 的方法
基于实际项目经验,总结以下性能优化策略。
import asyncio
import time
from functools import wraps
def async_timing_decorator(func):
    """Decorator: log how long the wrapped coroutine ran, on success or failure.

    Re-raises any exception after logging; returns the coroutine's result
    otherwise.
    """
    @wraps(func)
    async def wrapper(*args, **kwargs):
        started = time.time()
        try:
            result = await func(*args, **kwargs)
        except Exception as e:
            elapsed = time.time() - started
            print(f"{func.__name__} 执行失败,耗时:{elapsed:.3f}秒,错误:{e}")
            raise
        else:
            elapsed = time.time() - started
            print(f"{func.__name__} 执行耗时:{elapsed:.3f}秒")
            return result
    return wrapper
class AsyncOptimization:
    """Toolbox of asyncio performance-optimization helpers."""

    @staticmethod
    async def optimized_gather(tasks, max_concurrent: int = None):
        """gather() with an optional concurrency cap.

        With *max_concurrent* set, a semaphore limits how many awaitables
        run at once; otherwise behaves exactly like asyncio.gather.
        Results keep the input order either way.
        """
        if max_concurrent is None:
            return await asyncio.gather(*tasks)
        semaphore = asyncio.Semaphore(max_concurrent)

        async def sem_task(task):
            async with semaphore:
                return await task

        return await asyncio.gather(*(sem_task(task) for task in tasks))

    @staticmethod
    async def with_timeout(coro, timeout: float, default=None):
        """Run *coro* with a deadline; return *default* if it times out."""
        try:
            return await asyncio.wait_for(coro, timeout=timeout)
        except asyncio.TimeoutError:
            print(f"操作超时,返回默认值:{default}")
            return default

    @staticmethod
    def create_uvloop_policy():
        """Install uvloop's event-loop policy when available.

        Returns True if uvloop was enabled, False when it isn't installed.
        """
        try:
            import uvloop
            asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
            print("已启用 uvloop 加速")
            return True
        except ImportError:
            print("未安装 uvloop,使用默认事件循环")
            return False

    # Performance-optimization demo
    @staticmethod
    async def optimization_demo():
        """Demonstrate plain gather, capped gather, and timeout control."""
        @async_timing_decorator
        async def simulated_io_task(task_id, duration=1):
            await asyncio.sleep(duration)
            return f"任务{task_id}完成"

        def make_tasks():
            # Fix: the original built ONE list of coroutine objects and
            # awaited it in both gather calls below — a coroutine cannot be
            # awaited twice (RuntimeError: cannot reuse already awaited
            # coroutine). Build a fresh batch for each demonstration.
            return [simulated_io_task(i, i * 0.5) for i in range(5)]

        print("=== 普通 gather ===")
        await AsyncOptimization.optimized_gather(make_tasks())
        print("\n=== 并发限制 gather ===")
        await AsyncOptimization.optimized_gather(make_tasks(), max_concurrent=2)
        print("\n=== 超时控制 ===")
        await AsyncOptimization.with_timeout(simulated_io_task("timeout", 2), 1, "默认结果")
# asyncio.run(AsyncOptimization.optimization_demo())
健壮的异步应用需要完善的错误处理机制。
import asyncio
from typing import Any, List, Tuple
class AsyncErrorHandler:
    """Error-handling utilities for asyncio code."""

    @staticmethod
    async def safe_gather(*coros, return_exceptions=True):
        """gather() that, by default, returns exceptions as results instead
        of raising, so one failed task cannot sink the whole batch."""
        return await asyncio.gather(*coros, return_exceptions=return_exceptions)

    @staticmethod
    async def with_retry(coro, max_retries: int = 3, delay: float = 1.0):
        """Retry an operation with exponential backoff.

        *coro* may be a zero-argument callable returning a fresh awaitable
        (required for real retries, e.g. ``lambda: fetch()``) or a bare
        coroutine object. Fix: the original awaited the SAME coroutine
        object on every attempt, raising ``RuntimeError: cannot reuse
        already awaited coroutine`` on the second try. A bare coroutine now
        gets exactly one attempt and its own exception is re-raised.
        """
        factory = coro if callable(coro) else None
        last_exception = None
        for attempt in range(max_retries):
            awaitable = factory() if factory is not None else coro
            try:
                return await awaitable
            except Exception as e:
                last_exception = e
                print(f"第{attempt + 1}次尝试失败:{e}")
                if factory is None:
                    break  # a bare coroutine cannot be awaited again
                if attempt < max_retries - 1:
                    await asyncio.sleep(delay * (2 ** attempt))  # exponential backoff
        raise last_exception or Exception("未知错误")

    @staticmethod
    async def execute_with_shield(coro):
        """Shield *coro* from outer cancellation.

        Fix: the original re-awaited the same coroutine object after a
        CancelledError, which is illegal. Wrapping it in a Task lets the
        inner work keep running through shield() and be awaited again.
        """
        task = asyncio.ensure_future(coro)
        try:
            return await asyncio.shield(task)
        except asyncio.CancelledError:
            print("任务被取消保护,继续执行")
            return await task

    # Error-handling demo
    @staticmethod
    async def error_handling_demo():
        """Demonstrate safe_gather and the retry helper."""
        async def unreliable_task(task_id):
            # Every third task fails deterministically.
            if task_id % 3 == 0:
                raise ValueError(f"任务{task_id}故意失败")
            await asyncio.sleep(0.5)
            return f"任务{task_id}成功"

        tasks = [unreliable_task(i) for i in range(6)]
        print("=== 安全 gather 演示 ===")
        results = await AsyncErrorHandler.safe_gather(*tasks)
        for i, result in enumerate(results):
            if isinstance(result, Exception):
                print(f"任务{i}失败:{result}")
            else:
                print(f"任务{i}成功:{result}")

        print("\n=== 重试机制演示 ===")
        try:
            # Pass a factory so each retry attempt gets a fresh coroutine.
            result = await AsyncErrorHandler.with_retry(
                lambda: unreliable_task(0), max_retries=2
            )
            print(f"重试结果:{result}")
        except Exception as e:
            print(f"最终失败:{e}")
# asyncio.run(AsyncErrorHandler.error_handling_demo())

微信公众号「极客日志」,在微信中扫描左侧二维码关注。展示文案:极客日志 zeeklog
解析常见 curl 参数并生成 fetch、axios、PHP curl 或 Python requests 示例代码。 在线工具,curl 转代码在线工具,online
将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online
将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online
将 Markdown(GFM)转为 HTML 片段,浏览器内 marked 解析;与 HTML转Markdown 互为补充。 在线工具,Markdown转HTML在线工具,online
将 HTML 片段转为 GitHub Flavored Markdown,支持标题、列表、链接、代码块与表格等;浏览器内处理,可链接预填。 在线工具,HTML转Markdown在线工具,online
通过删除不必要的空白来缩小和压缩JSON。 在线工具,JSON 压缩在线工具,online