跳到主要内容 深入理解 Python 异步编程:async、await 与同步函数详解 | 极客日志
Python
深入理解 Python 异步编程:async、await 与同步函数详解 详细解析了 Python 异步编程的核心概念,对比了同步与异步函数的区别及性能差异。重点讲解了 async 关键字定义协程函数、await 关键字暂停协程让出控制权以及 asyncio.create_task 实现并发任务调用的机制。通过实际场景如 Web API、文件处理、数据库操作展示了异步应用,并纠正了常见误区如 await 非阻塞、异步非自动并发等。最后提供了最佳实践建议,包括何时使用异步、错误处理、资源管理及性能优化技巧,帮助开发者编写高效可扩展的应用程序。
莫名其妙 发布于 2026/3/27 更新于 2026/4/18 6 浏览引言
在现代 Python 开发中,异步编程已经成为处理 I/O 密集型任务的标准方式。理解 async、await 和同步函数的区别,对于编写高效、可扩展的应用程序至关重要。
本文将通过理论解释、代码示例和实际场景,帮助你全面理解 Python 异步编程的核心概念。
同步函数 vs 异步函数
什么是同步函数?
同步函数 是传统的函数调用方式,代码按顺序执行,每一步都必须等待前一步完成。
time
():
( )
time.sleep( )
( )
result = sync_function()
(result)
import
def
sync_function
"""同步函数示例"""
print
"开始执行"
2
print
"执行完成"
return
"结果"
print
✅ 简单直观,易于理解
✅ 代码执行顺序清晰
❌ 阻塞执行,无法并发
❌ 处理大量 I/O 操作时效率低下
什么是异步函数? 异步函数 使用 async def 定义,可以在等待 I/O 操作时让出控制权,允许其他任务并发执行。
import asyncio
async def async_function ():
"""异步函数示例"""
print ("开始执行" )
await asyncio.sleep(2 )
print ("执行完成" )
return "结果"
async def main ():
result = await async_function()
print (result)
asyncio.run(main())
✅ 非阻塞执行,可以并发处理多个任务
✅ 适合 I/O 密集型操作(网络请求、文件读写等)
✅ 提高资源利用率
❌ 代码复杂度较高
❌ 需要理解事件循环机制
对比示例:处理 100 个网络请求 import time
import asyncio
import aiohttp
def sync_fetch (url ):
"""同步获取网页内容"""
time.sleep(1 )
return f"Response from {url} "
def sync_process_100_requests ():
"""同步处理 100 个请求"""
start = time.time()
results = []
for i in range (100 ):
result = sync_fetch(f"http://example.com/{i} " )
results.append(result)
end = time.time()
print (f"同步方式耗时:{end - start:.2 f} 秒" )
return results
async def async_fetch (session, url ):
"""异步获取网页内容"""
await asyncio.sleep(1 )
return f"Response from {url} "
async def async_process_100_requests ():
"""异步处理 100 个请求"""
start = time.time()
async with aiohttp.ClientSession() as session:
tasks = [async_fetch(session, f"http://example.com/{i} " ) for i in range (100 )]
results = await asyncio.gather(*tasks)
end = time.time()
print (f"异步方式耗时:{end - start:.2 f} 秒" )
return results
if __name__ == "__main__" :
sync_process_100_requests()
asyncio.run(async_process_100_requests())
同步方式:~100 秒(顺序执行)
异步方式:~1 秒(并发执行)
async 关键字详解
async 的作用 async 关键字用于定义一个协程函数(coroutine function) ,它返回一个协程对象(coroutine object) 。
async def my_async_function ():
return "Hello"
coro = my_async_function()
print (type (coro))
result = my_async_function()
result = await my_async_function()
result = asyncio.run(my_async_function())
async 函数的特征 async def fetch_data ():
data = await some_async_operation()
return data
async def main ():
result = await func()
print (result)
result = asyncio.run(func())
print (result)
async def func ():
return 42
coro = func()
常见错误示例
async def get_data ():
return "data"
def main ():
result = get_data()
print (result)
async def main ():
result = await get_data()
print (result)
def sync_func ():
result = await async_func()
async def async_func ():
result = await async_func()
def sync_func ():
result = asyncio.run(async_func())
await 关键字详解
await 的作用机制 await 关键字用于暂停当前协程的执行 ,等待异步操作完成,同时让出控制权给事件循环 ,允许其他协程运行。
async def task1 ():
print ("Task 1 开始" )
await asyncio.sleep(2 )
print ("Task 1 完成" )
async def task2 ():
print ("Task 2 开始" )
await asyncio.sleep(1 )
print ("Task 2 完成" )
async def main ():
await asyncio.gather(task1(), task2())
asyncio.run(main())
await 的执行流程
遇到 await 表达式
暂停当前协程
将控制权返回给事件循环
事件循环执行其他就绪的协程
await 的表达式完成后
恢复当前协程的执行
返回结果
await vs 同步阻塞 import time
import asyncio
def sync_blocking ():
print ("开始阻塞" )
time.sleep(2 )
print ("阻塞结束" )
async def async_waiting ():
print ("开始等待" )
await asyncio.sleep(2 )
print ("等待结束" )
async def demo ():
print ("=== 同步阻塞演示 ===" )
start = time.time()
sync_blocking()
sync_blocking()
print (f"耗时:{time.time()- start:.2 f} 秒\n" )
print ("=== 异步等待演示 ===" )
start = time.time()
await asyncio.gather(async_waiting(), async_waiting())
print (f"耗时:{time.time()- start:.2 f} 秒" )
asyncio.run(demo())
time.sleep():阻塞整个线程,其他代码无法执行
await asyncio.sleep():暂停当前协程,其他协程可以执行
await 可以等待什么?
async def func1 ():
return "result"
async def func2 ():
result = await func1()
return result
async def example ():
async with aiofiles.open ('file.txt' ) as f:
content = await f.read()
async def example ():
async for line in async_file_reader():
print (line)
asyncio.create_task 详解
什么是 create_task? asyncio.create_task() 是 Python 3.7+ 引入的函数,用于将协程包装成 Task 对象并立即调度执行 。它是并发执行多个异步任务的关键工具。
import asyncio
async def my_coroutine ():
await asyncio.sleep(1 )
return "完成"
task = asyncio.create_task(my_coroutine())
create_task 的核心作用
立即调度执行 :创建 Task 后,协程立即开始执行,不需要等待
返回 Task 对象 :可以跟踪任务状态、取消任务、获取结果
并发执行 :多个 Task 可以并发运行
import asyncio
import time
async def task (name: str , duration: int ):
print (f"[{time.time():.2 f} ] 任务 {name} 开始" )
await asyncio.sleep(duration)
print (f"[{time.time():.2 f} ] 任务 {name} 完成" )
return f"任务 {name} 的结果"
async def main ():
start = time.time()
task1 = asyncio.create_task(task("A" , 2 ))
task2 = asyncio.create_task(task("B" , 1 ))
task3 = asyncio.create_task(task("C" , 3 ))
results = await asyncio.gather(task1, task2, task3)
end = time.time()
print (f"\n所有任务完成,耗时:{end - start:.2 f} 秒" )
print (f"结果:{results} " )
asyncio.run(main())
create_task vs await import asyncio
async def slow_task (name: str ):
print (f"{name} 开始" )
await asyncio.sleep(2 )
print (f"{name} 完成" )
return f"{name} 的结果"
async def sequential ():
print ("=== 顺序执行 ===" )
result1 = await slow_task("任务 1" )
result2 = await slow_task("任务 2" )
result3 = await slow_task("任务 3" )
return [result1, result2, result3]
async def concurrent ():
print ("=== 并发执行 ===" )
task1 = asyncio.create_task(slow_task("任务 1" ))
task2 = asyncio.create_task(slow_task("任务 2" ))
task3 = asyncio.create_task(slow_task("任务 3" ))
result1 = await task1
result2 = await task2
result3 = await task3
return [result1, result2, result3]
async def using_gather ():
print ("=== 使用 gather ===" )
tasks = [
asyncio.create_task(slow_task("任务 1" )),
asyncio.create_task(slow_task("任务 2" )),
asyncio.create_task(slow_task("任务 3" ))
]
results = await asyncio.gather(*tasks)
return results
方式 执行方式 耗时 适用场景 await coro()顺序执行 累加 需要顺序执行 create_task() + await并发执行 最大值 需要并发,且需要单独处理每个任务 asyncio.gather()并发执行 最大值 需要并发,且统一处理结果(推荐)
create_task vs asyncio.gather import asyncio
async def fetch_data (url: str ):
await asyncio.sleep(1 )
return f"数据来自 {url} "
async def with_create_task ():
task1 = asyncio.create_task(fetch_data("url1" ))
task2 = asyncio.create_task(fetch_data("url2" ))
result1 = await task1
print (f"第一个结果:{result1} " )
task2.cancel()
try :
result2 = await task2
except asyncio.CancelledError:
print ("任务 2 被取消" )
print (f"任务 1 完成:{task1.done()} " )
print (f"任务 2 完成:{task2.done()} " )
async def with_gather ():
results = await asyncio.gather(
fetch_data("url1" ),
fetch_data("url2" ),
fetch_data("url3" )
)
return results
async def with_gather_error_handling ():
async def fetch_with_error (url: str ):
if url == "error" :
raise ValueError("模拟错误" )
await asyncio.sleep(1 )
return f"数据来自 {url} "
results = await asyncio.gather(
fetch_with_error("url1" ),
fetch_with_error("error" ),
fetch_with_error("url3" ),
return_exceptions=True
)
for i, result in enumerate (results):
if isinstance (result, Exception):
print (f"任务 {i} 失败:{result} " )
else :
print (f"任务 {i} 成功:{result} " )
使用 create_task :需要单独控制任务(取消、检查状态、单独等待)
使用 gather :统一处理多个任务,代码更简洁(推荐)
create_task 的实际应用场景
场景 1:后台任务(Fire-and-Forget) import asyncio
async def send_notification (user_id: int , message: str ):
"""发送通知(不需要等待完成)"""
await asyncio.sleep(1 )
print (f"通知已发送给用户 {user_id} : {message} " )
async def process_order (order_id: int ):
"""处理订单"""
print (f"开始处理订单 {order_id} " )
await asyncio.sleep(2 )
print (f"订单 {order_id} 处理完成" )
asyncio.create_task(send_notification(order_id, "订单处理完成" ))
return f"订单 {order_id} 已处理"
async def main ():
order = await process_order(123 )
print (f"主流程完成:{order} " )
await asyncio.sleep(2 )
asyncio.run(main())
场景 2:超时控制 import asyncio
async def fetch_with_timeout (url: str , timeout: float ):
"""带超时的请求"""
task = asyncio.create_task(fetch_data(url))
try :
result = await asyncio.wait_for(task, timeout=timeout)
return result
except asyncio.TimeoutError:
task.cancel()
return f"请求 {url} 超时"
async def fetch_data (url: str ):
await asyncio.sleep(5 )
return f"数据来自 {url} "
async def main ():
result = await fetch_with_timeout("http://example.com" , timeout=2.0 )
print (result)
asyncio.run(main())
场景 3:任务取消和状态检查 import asyncio
async def long_running_task (name: str , duration: int ):
try :
print (f"任务 {name} 开始" )
await asyncio.sleep(duration)
print (f"任务 {name} 完成" )
return f"任务 {name} 的结果"
except asyncio.CancelledError:
print (f"任务 {name} 被取消" )
raise
async def main ():
task1 = asyncio.create_task(long_running_task("A" , 5 ))
task2 = asyncio.create_task(long_running_task("B" , 3 ))
await asyncio.sleep(2 )
print (f"任务 A 完成:{task1.done()} " )
print (f"任务 B 完成:{task2.done()} " )
task1.cancel()
try :
result1 = await task1
except asyncio.CancelledError:
print ("任务 A 已被取消" )
result2 = await task2
print (f"任务 B 结果:{result2} " )
asyncio.run(main())
create_task 的常见错误
错误 1:忘记 await Task
async def main ():
task = asyncio.create_task(some_async_function())
return "完成"
async def main ():
task = asyncio.create_task(some_async_function())
result = await task
return result
async def main ():
await asyncio.gather(some_async_function())
return "完成"
错误 2:在事件循环外使用
def sync_function ():
task = asyncio.create_task(async_function())
async def async_function ():
task = asyncio.create_task(another_async_function())
await task
def sync_function ():
asyncio.run(async_function())
错误 3:混用 create_task 和直接 await
async def main ():
await slow_task("A" )
task = asyncio.create_task(slow_task("B" ))
await slow_task("C" )
await task
async def main ():
task1 = asyncio.create_task(slow_task("A" ))
task2 = asyncio.create_task(slow_task("B" ))
task3 = asyncio.create_task(slow_task("C" ))
await asyncio.gather(task1, task2, task3)
create_task 最佳实践
1. 后台任务管理 import asyncio
from typing import List
class BackgroundTaskManager :
"""后台任务管理器"""
def __init__ (self ):
self .tasks: List [asyncio.Task] = []
def add_task (self, coro ):
"""添加后台任务"""
task = asyncio.create_task(coro)
self .tasks.append(task)
return task
async def wait_all (self ):
"""等待所有任务完成"""
if self .tasks:
await asyncio.gather(*self .tasks, return_exceptions=True )
def cancel_all (self ):
"""取消所有任务"""
for task in self .tasks:
if not task.done():
task.cancel()
async def main ():
manager = BackgroundTaskManager()
manager.add_task(send_notification(1 , "消息 1" ))
manager.add_task(send_notification(2 , "消息 2" ))
print ("主流程执行中..." )
await asyncio.sleep(1 )
await manager.wait_all()
print ("所有任务完成" )
2. 任务包装器 import asyncio
import logging
async def safe_task (coro, task_name: str ):
"""安全的任务包装器,自动处理异常"""
try :
result = await coro
logging.info(f"任务 {task_name} 完成" )
return result
except Exception as e:
logging.error(f"任务 {task_name} 失败:{e} " )
return None
async def main ():
task1 = asyncio.create_task(safe_task(risky_operation(), "任务 1" ))
task2 = asyncio.create_task(safe_task(another_operation(), "任务 2" ))
results = await asyncio.gather(task1, task2)
return results
3. 限制并发数量 import asyncio
class TaskLimiter :
"""限制并发任务数量"""
def __init__ (self, max_concurrent: int ):
self .semaphore = asyncio.Semaphore(max_concurrent)
self .tasks = []
async def add_task (self, coro ):
"""添加任务(受并发限制)"""
async with self .semaphore:
return await coro
async def run_tasks (self, coros ):
"""运行多个任务"""
tasks = [asyncio.create_task(self .add_task(coro)) for coro in coros]
return await asyncio.gather(*tasks)
async def main ():
limiter = TaskLimiter(max_concurrent=3 )
coros = [fetch_data(f"url{i} " ) for i in range (10 )]
results = await limiter.run_tasks(coros)
return results
总结:create_task 的核心要点
立即调度 :create_task() 创建 Task 后立即开始执行
并发执行 :多个 Task 可以并发运行
任务控制 :可以取消、检查状态、获取结果
与 await 的区别 :await 会等待完成,create_task() 立即返回
与 gather 的关系 :gather() 内部使用 create_task(),但更简洁
适用场景 :后台任务、超时控制、任务管理
实际应用场景
场景 1:Web API 请求处理 from fastapi import FastAPI
import asyncio
app = FastAPI()
@app.get("/sync" )
def sync_endpoint ():
time.sleep(1 )
return {"status" : "ok" }
@app.get("/async" )
async def async_endpoint ():
await asyncio.sleep(1 )
return {"status" : "ok" }
同步方式:服务器每秒只能处理 1 个请求
异步方式:服务器可以并发处理多个请求
场景 2:批量文件处理 import aiofiles
import asyncio
from pathlib import Path
def sync_process_files (file_paths ):
results = []
for path in file_paths:
with open (path, 'r' ) as f:
content = f.read()
results.append(process(content))
return results
async def async_process_files (file_paths ):
async def process_file (path ):
async with aiofiles.open (path, 'r' ) as f:
content = await f.read()
return process(content)
tasks = [process_file(path) for path in file_paths]
results = await asyncio.gather(*tasks)
return results
场景 3:数据库操作 import asyncpg
import asyncio
async def fetch_users ():
conn = await asyncpg.connect('postgresql://...' )
try :
users = await conn.fetch('SELECT * FROM users' )
return users
finally :
await conn.close()
async def fetch_all_data ():
users, posts, comments = await asyncio.gather(
fetch_users(),
fetch_posts(),
fetch_comments()
)
return users, posts, comments
场景 4:任务调度与后台任务(使用 create_task) import asyncio
async def background_task (task_id: str ):
"""后台任务:不需要等待完成"""
print (f"任务 {task_id} 开始" )
await asyncio.sleep(5 )
print (f"任务 {task_id} 完成" )
async def critical_operation ():
"""关键操作:需要等待完成"""
print ("关键操作开始" )
await asyncio.sleep(1 )
print ("关键操作完成" )
return "结果"
async def main ():
result = await critical_operation()
print (f"得到结果:{result} " )
task1 = asyncio.create_task(background_task("task-1" ))
task2 = asyncio.create_task(background_task("task-2" ))
print ("主流程继续执行..." )
await asyncio.sleep(6 )
print ("所有任务完成" )
asyncio.run(main())
常见误区
误区 1:await 就是同步等待
await some_async_function()
async def task1 ():
await asyncio.sleep(2 )
async def task2 ():
await asyncio.sleep(1 )
await asyncio.gather(task1(), task2())
误区 2:异步函数会自动并发 async def main ():
await func1()
await func2()
await func3()
async def main ():
await func1()
await func2()
await func3()
async def main ():
task1 = asyncio.create_task(func1())
task2 = asyncio.create_task(func2())
task3 = asyncio.create_task(func3())
await asyncio.gather(task1, task2, task3)
async def main ():
await asyncio.gather(func1(), func2(), func3())
误区 3:所有函数都应该用 async
async def add (a, b ):
return a + b
def add (a, b ):
return a + b
async def fetch_url (url ):
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
return await response.text()
误区 4:忘记 await 异步函数 async def get_data ():
return "data"
def main ():
result = get_data()
print (result)
async def get_data ():
return "data"
async def main ():
result = await get_data()
print (result)
误区 5:create_task 后忘记等待 async def main ():
asyncio.create_task(long_running_task())
return "完成"
async def main ():
task = asyncio.create_task(long_running_task())
result = await task
return result
await asyncio.gather(long_running_task())
return "完成"
task = asyncio.create_task(background_task())
await asyncio.sleep(10 )
return "完成"
最佳实践
1. 何时使用异步?
网络请求(HTTP API、WebSocket)
文件 I/O(读写文件)
数据库操作
等待外部服务响应
处理大量并发连接
CPU 密集型计算(图像处理、数值计算)
简单的同步操作(不需要并发)
已经优化的同步代码(不要为了异步而异步)
2. 异步函数设计原则
async def fetch_user_data (user_id: int ):
"""清晰的异步函数"""
async with aiohttp.ClientSession() as session:
async with session.get(f'/api/users/{user_id} ' ) as response:
return await response.json()
async def bad_function ():
time.sleep(1 )
await asyncio.sleep(1 )
3. 错误处理 async def robust_async_function ():
try :
result = await some_async_operation()
return result
except Exception as e:
logger.error(f"操作失败:{e} " )
raise
async def batch_operations ():
tasks = [operation(i) for i in range (10 )]
results = await asyncio.gather(*tasks, return_exceptions=True )
for i, result in enumerate (results):
if isinstance (result, Exception):
print (f"任务 {i} 失败:{result} " )
else :
print (f"任务 {i} 成功:{result} " )
4. 资源管理
async def process_file ():
async with aiofiles.open ('file.txt' , 'r' ) as f:
content = await f.read()
async def query_database ():
async with asyncpg.create_pool('postgresql://...' ) as pool:
async with pool.acquire() as conn:
result = await conn.fetch('SELECT * FROM users' )
5. 性能优化技巧
async def fetch_multiple_urls (urls ):
async with aiohttp.ClientSession() as session:
tasks = [fetch_url(session, url) for url in urls]
results = await asyncio.gather(*tasks)
return results
async def fetch_with_control (urls ):
async with aiohttp.ClientSession() as session:
tasks = [asyncio.create_task(fetch_url(session, url)) for url in urls]
results = []
for task in tasks:
try :
result = await task
results.append(result)
except Exception as e:
print (f"任务失败:{e} " )
results.append(None )
return results
async def fetch_with_limit (urls, limit=10 ):
semaphore = asyncio.Semaphore(limit)
async def fetch_with_semaphore (url ):
async with semaphore:
return await fetch_url(url)
tasks = [asyncio.create_task(fetch_with_semaphore(url)) for url in urls]
return await asyncio.gather(*tasks)
总结
核心概念回顾
同步函数 :顺序执行,阻塞等待
简单直观
适合 CPU 密集型任务
不适合大量 I/O 操作
异步函数(async) :定义协程函数
返回协程对象
必须用 await 或事件循环执行
适合 I/O 密集型任务
await 关键字 :异步等待
暂停当前协程
让出控制权给事件循环
不阻塞整个线程
允许其他协程并发执行
asyncio.create_task() :创建并调度任务
将协程包装成 Task 对象
立即开始执行(不等待完成)
实现并发执行多个任务
可以取消、检查状态、获取结果
关键区别总结 特性 同步函数 异步函数 + await create_task 执行方式 顺序执行 顺序执行(单个协程) 并发执行(多个任务) 阻塞性 阻塞线程 暂停协程,不阻塞线程 不阻塞,立即返回 适用场景 CPU 密集型 I/O 密集型(单个) I/O 密集型(多个并发) 并发能力 无 无(单个) 高(多个) 代码复杂度 低 中 中高 返回类型 直接返回值 协程对象 Task 对象
记忆要点
async def = 定义异步函数,返回协程对象
await = 异步等待,暂停协程但不阻塞线程
异步函数必须用 await ,否则不会执行
await 不等于同步阻塞 ,它允许并发执行
I/O 操作用异步,CPU 计算用同步
create_task() = 创建任务并立即调度,实现并发执行
多个 await 顺序执行,多个 create_task 并发执行
gather() 内部使用 create_task(),但更简洁
实践建议
🎯 从简单的异步函数开始练习
🎯 理解事件循环的工作原理
🎯 在实际项目中逐步应用
🎯 注意错误处理和资源管理
🎯 不要过度使用异步(适合的场景才用)
延伸阅读 微信扫一扫,关注极客日志 微信公众号「极客日志」,在微信中扫描左侧二维码关注。展示文案:极客日志 zeeklog
相关免费在线工具 curl 转代码 解析常见 curl 参数并生成 fetch、axios、PHP curl 或 Python requests 示例代码。 在线工具,curl 转代码在线工具,online
Base64 字符串编码/解码 将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online
Base64 文件转换器 将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online
Markdown转HTML 将 Markdown(GFM)转为 HTML 片段,浏览器内 marked 解析;与 HTML转Markdown 互为补充。 在线工具,Markdown转HTML在线工具,online
HTML转Markdown 将 HTML 片段转为 GitHub Flavored Markdown,支持标题、列表、链接、代码块与表格等;浏览器内处理,可链接预填。 在线工具,HTML转Markdown在线工具,online
JSON 压缩 通过删除不必要的空白来缩小和压缩JSON。 在线工具,JSON 压缩在线工具,online