Python26_并发协程
Python26_并发协程文章目录Python26_并发协程[toc]第一章基础概念1.1 什么是协程Coroutine1.2 协程、线程、进程的区别1.3 核心关键字第二章asyncio 核心机制2.1 事件循环Event Loop2.2 创建和运行任务2.3 并发执行多个任务第三章异步 I/O 与常用操作3.1 模拟 I/O 操作3.2 异步上下文管理器3.3 异步迭代器第四章异常处理与取消4.1 协程中的异常4.2 任务取消Cancellation第五章高级主题5.1 在协程中运行同步代码5.2 有界信号量Semaphore控制并发数5.3 超时控制第六章常见错误与调试6.1 常见错误清单6.2 调试技巧第七章实战模式7.1 生产者-消费者模式使用 Queue7.2 Web 爬虫并发模板附录速查表第一章基础概念1.1 什么是协程CoroutineQ协程是什么与普通函数有什么区别A协程是一种可以暂停执行并在稍后恢复的函数。与普通函数不同协程使用async def定义调用时不会立即执行而是返回一个协程对象。importasyncio# 普通函数defnormal_func():returnHello# 协程函数asyncdefcoro_func():returnHello Async# 区别演示print(normal_func())# 立即执行输出: Helloprint(coro_func())# 不执行输出: coroutine object ...print(asyncio.run(coro_func()))# 正确执行输出: Hello Async1.2 协程、线程、进程的区别特性进程 (Process)线程 (Thread)协程 (Coroutine)资源占用高独立内存空间中共享进程内存低单线程内切换开销高需要切换页表中需要切换栈低用户态切换数据共享需要 IPC共享内存需锁单线程安全适用场景CPU 密集型I/O 密集型高并发 I/OPython实现multiprocessingthreadingasyncioQ为什么协程适合高并发 I/OA协程在单线程内通过事件循环调度遇到 I/O 操作时主动让出控制权不阻塞其他协程。相比线程协程切换无需内核介入内存占用极小约 1KB可轻松支持数万并发连接。1.3 核心关键字关键字作用async def定义协程函数await暂停协程等待另一个协程/可等待对象完成async for异步迭代async with异步上下文管理器第二章asyncio 核心机制2.1 事件循环Event LoopQ什么是事件循环A事件循环是asyncio的核心负责调度和执行协程。它维护一个任务队列不断检查哪些协程可以继续执行。importasyncioasyncdefsay_hello():print(Hello)awaitasyncio.sleep(1)# 模拟 I/O挂起协程print(World)# 方式1使用 asyncio.run()推荐Python 3.7asyncio.run(say_hello())# 方式2手动管理事件循环旧方式loopasyncio.get_event_loop()loop.run_until_complete(say_hello())loop.close()2.2 创建和运行任务Qasyncio.create_task()和await直接调用有什么区别Aawait coro()顺序执行等待完成才继续create_task()并发执行立即创建后台任务importasyncioimporttimeasyncdeftask(name,delay):print(f任务{name}开始)awaitasyncio.sleep(delay)print(f任务{name}完成)returnf结果{name}asyncdefmain():starttime.time()# 方式1顺序执行总耗时 3秒# result1 await task(A, 1)# result2 await task(B, 2)# 方式2并发执行总耗时 2秒task1asyncio.create_task(task(A,1))task2asyncio.create_task(task(B,2))result1awaittask1 result2awaittask2print(f耗时:{time.time()-start:.2f}秒)print(f结果:{result1},{result2})asyncio.run(main())2.3 并发执行多个任务Q如何并发执行多个协程并等待全部完成A使用asyncio.gather()或asyncio.wait()importasyncioasyncdeffetch_data(url):awaitasyncio.sleep(1)# 模拟网络请求returnf数据来自{url}asyncdefmain():urls[url1,url2,url3]# 方式1gather - 等待全部完成返回结果列表resultsawaitasyncio.gather(*[fetch_data(url)forurlinurls],return_exceptionsTrue# 捕获异常而不中断)print(results)# 方式2wait - 更灵活的控制tasks[asyncio.create_task(fetch_data(url))forurlinurls]done,pendingawaitasyncio.wait(tasks,return_whenasyncio.ALL_COMPLETED)fortaskindone:print(task.result())asyncio.run(main())gathervswait对比特性gatherwait返回值结果列表保持顺序完成/未完成的任务集异常处理return_exceptionsTrue需手动检查使用场景简单并发需要精细控制如超时、部分完成第三章异步 I/O 与常用操作3.1 模拟 I/O 操作Q为什么time.sleep()不能在协程中使用Atime.sleep()会阻塞整个线程导致事件循环无法调度其他协程。必须使用asyncio.sleep()。importasyncioimporttimeasyncdefbad_example():print(开始)time.sleep(2)# ❌ 错误阻塞整个事件循环print(结束)asyncdefgood_example():print(开始)awaitasyncio.sleep(2)# ✅ 正确挂起当前协程让出控制权print(结束)3.2 异步上下文管理器Q如何实现异步资源管理如数据库连接A使用async with和__aenter__/__aexit__importasyncioclassAsyncDatabase:asyncdefconnect(self):awaitasyncio.sleep(0.1)print(数据库已连接)returnselfasyncdefquery(self,sql):awaitasyncio.sleep(0.5)returnf查询结果:{sql}asyncdefclose(self):awaitasyncio.sleep(0.1)print(数据库已关闭)asyncdef__aenter__(self):awaitself.connect()returnselfasyncdef__aexit__(self,exc_type,exc_val,exc_tb):awaitself.close()asyncdefmain():asyncwithAsyncDatabase()asdb:resultawaitdb.query(SELECT * FROM users)print(result)asyncio.run(main())3.3 异步迭代器Q如何处理异步数据流A使用async for和__aiter__/__anext__importasyncioimportrandomclassAsyncDataStream:def__init__(self,count):self.countcount self.current0def__aiter__(self):returnselfasyncdef__anext__(self):ifself.currentself.count:raiseStopAsyncIterationawaitasyncio.sleep(0.1)# 模拟异步获取数据self.current1returnf数据块-{self.current}asyncdefmain():streamAsyncDataStream(3)asyncfordatainstream:print(f接收到:{data})asyncio.run(main())第四章异常处理与取消4.1 协程中的异常Q协程中的异常如何捕获A使用常规try/except或在gather中设置return_exceptionsTrueimportasyncioasyncdefrisky_task():awaitasyncio.sleep(1)raiseValueError(出错了)asyncdefmain():# 方式1直接捕获try:awaitrisky_task()exceptValueErrorase:print(f捕获异常:{e})# 方式2gather 中处理resultsawaitasyncio.gather(risky_task(),risky_task(),return_exceptionsTrue# 异常会作为结果返回不抛出)forrinresults:ifisinstance(r,Exception):print(f任务异常:{r})else:print(f任务结果:{r})asyncio.run(main())4.2 任务取消CancellationQ如何取消正在运行的协程A使用task.cancel()协程内部需捕获CancelledError进行清理importasyncioasyncdeflong_running_task():try:whileTrue:print(工作中...)awaitasyncio.sleep(1)exceptasyncio.CancelledError:print(收到取消信号正在清理...)# 执行清理操作关闭连接、保存状态等raise# 必须重新抛出让取消传播asyncdefmain():taskasyncio.create_task(long_running_task())awaitasyncio.sleep(3)task.cancel()# 发送取消请求try:awaittaskexceptasyncio.CancelledError:print(任务已取消)asyncio.run(main())第五章高级主题5.1 在协程中运行同步代码Q如何在协程中调用阻塞的同步函数A使用loop.run_in_executor()将同步代码放到线程池中执行importasyncioimporttimeimportrequests# 同步 HTTP 库defsync_http_request(url):同步阻塞函数time.sleep(2)# 模拟耗时操作returnf同步请求结果:{url}asyncdefmain():loopasyncio.get_event_loop()# 在线程池中执行同步函数不阻塞事件循环resultawaitloop.run_in_executor(None,# 默认线程池sync_http_request,https://example.com)print(result)# 批量处理多个同步任务urls[url1,url2,url3]tasks[loop.run_in_executor(None,sync_http_request,url)forurlinurls]resultsawaitasyncio.gather(*tasks)print(results)asyncio.run(main())5.2 有界信号量Semaphore控制并发数Q如何限制同时运行的协程数量A使用asyncio.Semaphoreimportasyncioasyncdeflimited_task(semaphore,task_id):asyncwithsemaphore:# 获取信号量超过限制会等待print(f任务{task_id}开始)awaitasyncio.sleep(1)print(f任务{task_id}完成)asyncdefmain():# 最多同时运行3个协程semaphoreasyncio.Semaphore(3)tasks[limited_task(semaphore,i)foriinrange(10)]awaitasyncio.gather(*tasks)asyncio.run(main())5.3 超时控制Q如何为协程设置超时A使用asyncio.wait_for()或asyncio.timeout()Python 3.11importasyncioasyncdefslow_task():awaitasyncio.sleep(10)return完成asyncdefmain():# 方式1wait_for所有版本try:resultawaitasyncio.wait_for(slow_task(),timeout2.0)exceptasyncio.TimeoutError:print(任务超时)# 方式2timeout 上下文管理器Python 3.11try:asyncwithasyncio.timeout(2.0):resultawaitslow_task()exceptTimeoutError:print(任务超时)asyncio.run(main())第六章常见错误与调试6.1 常见错误清单错误原因解决方案RuntimeError: no running event loop直接调用协程函数使用asyncio.run()或获取事件循环RuntimeError: cannot be called from a running event loop嵌套调用asyncio.run()使用await或create_taskSyntaxError: await outside async function在非 async 函数中使用 await将函数改为async defRuntimeWarning: coroutine was never awaited创建了协程但未 await确保所有协程都被 await 或创建为任务程序卡住无响应混用阻塞 I/O 和协程使用asyncio专用库或run_in_executor6.2 调试技巧importasyncioimportlogging# 开启调试模式logging.basicConfig(levellogging.DEBUG)asyncio.run(main(),debugTrue)# 查看当前所有任务asyncdefdebug_tasks():tasksasyncio.all_tasks()fortaskintasks:print(f任务:{task.get_name()}, 状态:{完成iftask.done()else运行中})第七章实战模式7.1 生产者-消费者模式使用 Queueimportasyncioasyncdefproducer(queue,n):foriinrange(n):awaitasyncio.sleep(0.5)awaitqueue.put(f产品-{i})print(f生产: 产品-{i})awaitqueue.put(None)# 结束信号asyncdefconsumer(queue,name):whileTrue:itemawaitqueue.get()ifitemisNone:breakawaitasyncio.sleep(1)# 模拟处理print(f消费者{name}处理:{item})asyncdefmain():queueasyncio.Queue(maxsize5)# 有界队列防止内存爆炸producers[asyncio.create_task(producer(queue,5))]consumers[asyncio.create_task(consumer(queue,i))foriinrange(2)]awaitasyncio.gather(*producers)awaitqueue.join()# 等待队列处理完成forcinconsumers:c.cancel()asyncio.run(main())7.2 Web 爬虫并发模板importasyncioimportaiohttp# 需要: pip install aiohttpasyncdeffetch(session,url,semaphore):asyncwithsemaphore:try:asyncwithsession.get(url,timeout10)asresponse:returnawaitresponse.text()exceptExceptionase:returnfError:{e}asyncdefmain():urls[https://example.com]*100semaphoreasyncio.Semaphore(10)# 限制并发10asyncwithaiohttp.ClientSession()assession:tasks[fetch(session,url,semaphore)forurlinurls]resultsawaitasyncio.gather(*tasks,return_exceptionsTrue)print(f成功获取{len(results)}个页面)asyncio.run(main())附录速查表# 1. 定义协程asyncdefmy_coro():awaitasyncio.sleep(1)returndone# 2. 运行协程asyncio.run(my_coro())# 入口点只调用一次# 3. 并发执行tasks[asyncio.create_task(my_coro())for_inrange(10)]resultsawaitasyncio.gather(*tasks)# 4. 等待第一个完成done,pendingawaitasyncio.wait(tasks,return_whenasyncio.FIRST_COMPLETED)# 5. 超时控制resultawaitasyncio.wait_for(my_coro(),timeout5.0)# 6. 限制并发semasyncio.Semaphore(5)asyncwithsem:awaitmy_coro()# 7. 后台任务taskasyncio.create_task(my_coro())# ... 其他代码 ...resultawaittask# 获取结果# 8. 取消任务task.cancel()try:awaittaskexceptasyncio.CancelledError:pass