首页 文章资讯内容详情

Python Async/Await入门指南

2026-06-01 4 花语

本文内容纲要:

转自:https://zhuanlan.zhihu.com/p/27258289

本文将会讲述Python3.5之后出现的async/await的使用方法,以及它们的一些使用目的,如果错误,欢迎指正。

昨天看到DavidBeazley在16年的一个演讲:FearandAwaitinginAsync,给了我不少的感悟和启发,于是想梳理下自己的思路,所以有了以下这篇文章。

Python在3.5版本中引入了关于协程的语法糖async和await,关于协程的概念可以先看我在上一篇文章提到的内容。

看下Python中常见的几种函数形式:

普通函数

deffunction(): return1

生成器函数

defgenerator(): yield1

在3.5过后,我们可以使用async修饰将普通函数和生成器函数包装成异步函数和异步生成器。

异步函数(协程)

asyncdefasync_function(): return1

异步生成器

asyncdefasync_generator(): yield1

通过类型判断可以验证函数的类型

importtypes print(type(function)istypes.FunctionType)print(type(generator())istypes.GeneratorType)print(type(async_function())istypes.CoroutineType)print(type(async_generator())istypes.AsyncGeneratorType)

直接调用异步函数不会返回结果,而是返回一个coroutine对象:

print(async_function()) #<coroutineobjectasync_functionat0x102ff67d8>

协程需要通过其他方式来驱动,因此可以使用这个协程对象的send方法给协程发送一个值:

print(async_function().send(None))

不幸的是,如果通过上面的调用会抛出一个异常:

StopIteration:1

因为生成器/协程在正常返回退出时会抛出一个StopIteration异常,而原来的返回值会存放在StopIteration对象的value属性中,通过以下捕获可以获取协程真正的返回值:

try: async_function().send(None)exceptStopIterationase:print(e.value)#1

通过上面的方式来新建一个run函数来驱动协程函数:

defrun(coroutine):try:coroutine.send(None)exceptStopIterationase:returne.value

在协程函数中,可以通过await语法来挂起自身的协程,并等待另一个协程完成直到返回结果:

asyncdefasync_function(): return1asyncdefawait_coroutine():result=awaitasync_function()print(result)run(await_coroutine())#1

要注意的是,await语法只能出现在通过async修饰的函数中,否则会报SyntaxError错误。

而且await后面的对象需要是一个Awaitable,或者实现了相关的协议。

查看Awaitable抽象类的代码,表明了只要一个类实现了__await__方法,那么通过它构造出来的实例就是一个Awaitable:

classAwaitable(metaclass=ABCMeta):__slots__=()@abstractmethoddef__await__(self):yield@classmethoddef__subclasshook__(cls,C):ifclsisAwaitable:return_check_methods(C,"__await__")returnNotImplemented

而且可以看到,Coroutine类也继承了Awaitable,而且实现了send,throw和close方法。所以await一个调用异步函数返回的协程对象是合法的。

classCoroutine(Awaitable):__slots__=()@abstractmethoddefsend(self,value):...@abstractmethoddefthrow(self,typ,val=None,tb=None):...defclose(self):...@classmethoddef__subclasshook__(cls,C):ifclsisCoroutine:return_check_methods(C,__await__,send,throw,close)returnNotImplemented

接下来是异步生成器,来看一个例子:

假如我要到一家超市去购买土豆,而超市货架上的土豆数量是有限的:

classPotato: @classmethod defmake(cls,num,*args,**kws):potatos=[]foriinrange(num):potatos.append(cls.__new__(cls,*args,**kws))returnpotatosall_potatos=Potato.make(5)

现在我想要买50个土豆,每次从货架上拿走一个土豆放到篮子:

deftake_potatos(num):count=0whileTrue:iflen(all_potatos)==0:sleep(.1)else:potato=all_potatos.pop()yieldpotatocount+=1ifcount==num:breakdefbuy_potatos():bucket=[]forpintake_potatos(50):bucket.append(p)

对应到代码中,就是迭代一个生成器的模型,显然,当货架上的土豆不够的时候,这时只能够死等,而且在上面例子中等多长时间都不会有结果(因为一切都是同步的),也许可以用多进程和多线程解决,而在现实生活中,更应该像是这样的:

asyncdeftake_potatos(num):count=0whileTrue:iflen(all_potatos)==0:awaitask_for_potato()potato=all_potatos.pop()yieldpotatocount+=1ifcount==num:break

当货架上的土豆没有了之后,我可以询问超市请求需要更多的土豆,这时候需要等待一段时间直到生产者完成生产的过程:

asyncdefask_for_potato(): awaitasyncio.sleep(random.random())all_potatos.extend(Potato.make(random.randint(1,10)))

当生产者完成和返回之后,这是便能从await挂起的地方继续往下跑,完成消费的过程。而这整一个过程,就是一个异步生成器迭代的流程:

asyncdefbuy_potatos(): bucket=[]asyncforpintake_potatos(50):bucket.append(p)print(fGotpotato{id(p)}...)

asyncfor语法表示我们要后面迭代的是一个异步生成器。

defmain(): importasyncioloop=asyncio.get_event_loop()res=loop.run_until_complete(buy_potatos())loop.close()

用asyncio运行这段代码,结果是这样的:

Gotpotato4338641384... Gotpotato4338641160...Gotpotato4338614736...Gotpotato4338614680...Gotpotato4338614568...Gotpotato4344861864...Gotpotato4344843456...Gotpotato4344843400...Gotpotato4338641384...Gotpotato4338641160......

既然是异步的,在请求之后不一定要死等,而是可以做其他事情。比如除了土豆,我还想买番茄,这时只需要在事件循环中再添加一个过程:

defmain(): importasyncioloop=asyncio.get_event_loop()res=loop.run_until_complete(asyncio.wait([buy_potatos(),buy_tomatos()]))loop.close()

再来运行这段代码:

Gotpotato4423119312... Gottomato4423119368...Gotpotato4429291024...Gotpotato4421640768...Gottomato4429331704...Gottomato4429331760...Gottomato4423119368...Gotpotato4429331760...Gotpotato4429331704...Gotpotato4429346688...Gotpotato4429346072...Gottomato4429347360......

看下AsyncGenerator的定义,它需要实现__aiter__和__anext__两个核心方法,以及asend,athrow,aclose方法。

classAsyncGenerator(AsyncIterator):__slots__=()asyncdef__anext__(self):...@abstractmethodasyncdefasend(self,value):...@abstractmethodasyncdefathrow(self,typ,val=None,tb=None):...asyncdefaclose(self):...@classmethoddef__subclasshook__(cls,C):ifclsisAsyncGenerator:return_check_methods(C,__aiter__,__anext__,asend,athrow,aclose)returnNotImplemented

异步生成器是在3.6之后才有的特性,同样的还有异步推导表达式,因此在上面的例子中,也可以写成这样:

bucket=[pasyncforpintake_potatos(50)]

类似的,还有await表达式:

result=[awaitfun()forfuninfuncsifawaitcondition()]

除了函数之外,类实例的普通方法也能用async语法修饰:

classThreeTwoOne: asyncdefbegin(self):print(3)awaitasyncio.sleep(1)print(2)awaitasyncio.sleep(1)print(1)awaitasyncio.sleep(1)returnasyncdefgame():t=ThreeTwoOne()awaitt.begin()print(start)

实例方法的调用同样是返回一个coroutine:

function=ThreeTwoOne.beginmethod=function.__get__(ThreeTwoOne,ThreeTwoOne())importinspectassertinspect.isfunction(function)assertinspect.ismethod(method)assertinspect.iscoroutine(method())

同理还有类方法:

classThreeTwoOne: @classmethod asyncdefbegin(cls):print(3)awaitasyncio.sleep(1)print(2)awaitasyncio.sleep(1)print(1)awaitasyncio.sleep(1)returnasyncdefgame():awaitThreeTwoOne.begin()print(start)

根据PEP492中,async也可以应用到上下文管理器中,__aenter__和__aexit__需要返回一个Awaitable:

classGameContext: asyncdef__aenter__(self):print(gameloading...)awaitasyncio.sleep(1)asyncdef__aexit__(self,exc_type,exc,tb):print(gameexit...)awaitasyncio.sleep(1)asyncdefgame():asyncwithGameContext():print(gamestart...)awaitasyncio.sleep(2)

在3.7版本,contextlib中会新增一个asynccontextmanager装饰器来包装一个实现异步协议的上下文管理器:

fromcontextlibimportasynccontextmanager @asynccontextmanagerasyncdefget_connection():conn=awaitacquire_db_connection()try:yieldfinally:awaitrelease_db_connection(conn)

async修饰符也能用在__call__方法上:

classGameContext: asyncdef__aenter__(self):self._started=time()print(gameloading...)awaitasyncio.sleep(1)returnselfasyncdef__aexit__(self,exc_type,exc,tb):print(gameexit...)awaitasyncio.sleep(1)asyncdef__call__(self,*args,**kws):ifargs[0]==time:returntime()-self._startedasyncdefgame():asyncwithGameContext()asctx:print(gamestart...)awaitasyncio.sleep(2)print(gametime:,awaitctx(time))

await和yieldfrom

Python3.3的yieldfrom语法可以把生成器的操作委托给另一个生成器,生成器的调用方可以直接与子生成器进行通信:

defsub_gen(): yield1yield2yield3defgen():return(yieldfromsub_gen())defmain():forvalingen():print(val)#1#2#3

利用这一特性,使用yieldfrom能够编写出类似协程效果的函数调用,在3.5之前,asyncio正是使用@asyncio.coroutine和yieldfrom语法来创建协程:

#https://docs.python.org/3.4/library/asyncio-task.html importasyncio @asyncio.coroutine defcompute(x,y):print("Compute%s+%s..."%(x,y))yieldfromasyncio.sleep(1.0)returnx+y@asyncio.coroutinedefprint_sum(x,y):result=yieldfromcompute(x,y)print("%s+%s=%s"%(x,y,result))loop=asyncio.get_event_loop()loop.run_until_complete(print_sum(1,2))loop.close()

然而,用yieldfrom容易在表示协程和生成器中混淆,没有良好的语义性,所以在Python3.5推出了更新的async/await表达式来作为协程的语法。

因此类似以下的调用是等价的:

asyncwithlock: ...with(yieldfromlock):...######################defmain():return(yieldfromcoro())defmain():return(awaitcoro())

那么,怎么把生成器包装为一个协程对象呢?这时候可以用到types包中的coroutine装饰器(如果使用asyncio做驱动的话,那么也可以使用asyncio的coroutine装饰器),@types.coroutine装饰器会将一个生成器函数包装为协程对象:

importasyncio importtypes @types.coroutinedefcompute(x,y):print("Compute%s+%s..."%(x,y))yieldfromasyncio.sleep(1.0)returnx+yasyncdefprint_sum(x,y):result=awaitcompute(x,y)print("%s+%s=%s"%(x,y,result))loop=asyncio.get_event_loop()loop.run_until_complete(print_sum(1,2))loop.close()

尽管两个函数分别使用了新旧语法,但他们都是协程对象,也分别称作nativecoroutine以及generator-basedcoroutine,因此不用担心语法问题。

下面观察一个asyncio中Future的例子:

importasyncio future=asyncio.Future()asyncdefcoro1():awaitasyncio.sleep(1)future.set_result(data)asyncdefcoro2():print(awaitfuture)loop=asyncio.get_event_loop()loop.run_until_complete(asyncio.wait([coro1(),coro2()]))loop.close()

两个协程在在事件循环中,协程coro1在执行第一句后挂起自身切到asyncio.sleep,而协程coro2一直等待future的结果,让出事件循环,计时器结束后coro1执行了第二句设置了future的值,被挂起的coro2恢复执行,打印出future的结果data。

future可以被await证明了future对象是一个Awaitable,进入Future类的源码可以看到有一段代码显示了future实现了__await__协议:

classFuture: ... def__iter__(self):ifnotself.done():self._asyncio_future_blocking=Trueyieldself#ThistellsTasktowaitforcompletion.assertself.done(),"yieldfromwasntusedwithfuture"returnself.result()#Mayraisetoo.ifcompat.PY35:__await__=__iter__#makecompatiblewithawaitexpression

当执行awaitfuture这行代码时,future中的这段代码就会被执行,首先future检查它自身是否已经完成,如果没有完成,挂起自身,告知当前的Task(任务)等待future完成。

当future执行set_result方法时,会触发以下的代码,设置结果,标记future已经完成:

defset_result(self,result):...ifself._state!=_PENDING:raiseInvalidStateError({}:{!r}.format(self._state,self))self._result=resultself._state=_FINISHEDself._schedule_callbacks()

最后future会调度自身的回调函数,触发Task._step()告知Task驱动future从之前挂起的点恢复执行,不难看出,future会执行下面的代码:

classFuture: ... def__iter__(self):...assertself.done(),"yieldfromwasntusedwithfuture"returnself.result()#Mayraisetoo.

最终返回结果给调用方。

前面讲了那么多关于asyncio的例子,那么除了asyncio,就没有其他协程库了吗?asyncio作为python的标准库,自然受到很多青睐,但它有时候还是显得太重量了,尤其是提供了许多复杂的轮子和协议,不便于使用。

你可以理解为,asyncio是使用async/await语法开发的协程库,而不是有asyncio才能用async/await,除了asyncio之外,curio和trio是更加轻量级的替代物,而且也更容易使用。

curio的作者是DavidBeazley,下面是使用curio创建tcpserver的例子,据说这是dabeaz理想中的一个异步服务器的样子:

fromcurioimportrun,spawnfromcurio.socketimport*asyncdefecho_server(address):sock=socket(AF_INET,SOCK_STREAM)sock.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)sock.bind(address)sock.listen(5)print(Serverlisteningat,address)asyncwithsock:whileTrue:client,addr=awaitsock.accept()awaitspawn(echo_client,client,addr)asyncdefecho_client(client,addr):print(Connectionfrom,addr)asyncwithclient:whileTrue:data=awaitclient.recv(100000)ifnotdata:breakawaitclient.sendall(data)print(Connectionclosed)if__name__==__main__:run(echo_server,(,25000))

无论是asyncio还是curio,或者是其他异步协程库,在背后往往都会借助于IO的事件循环来实现异步,下面用几十行代码来展示一个简陋的基于事件驱动的echo服务器:

fromsocketimportsocket,AF_INET,SOCK_STREAM,SOL_SOCKET,SO_REUSEADDRfromselectorsimportDefaultSelector,EVENT_READselector=DefaultSelector()pool={}defrequest(client_socket,addr):client_socket,addr=client_socket,addrdefhandle_request(key,mask):data=client_socket.recv(100000)ifnotdata:client_socket.close()selector.unregister(client_socket)delpool[addr]else:client_socket.sendall(data)returnhandle_requestdefrecv_client(key,mask):sock=key.fileobjclient_socket,addr=sock.accept()req=request(client_socket,addr)pool[addr]=reqselector.register(client_socket,EVENT_READ,req)defecho_server(address):sock=socket(AF_INET,SOCK_STREAM)sock.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)sock.bind(address)sock.listen(5)selector.register(sock,EVENT_READ,recv_client)try:whileTrue:events=selector.select()forkey,maskinevents:callback=key.datacallback(key,mask)exceptKeyboardInterrupt:sock.close()if__name__==__main__:echo_server((,25000))

验证一下:

#terminal1 $nclocalhost25000 helloworld helloworld #terminal2 $nclocalhost25000 helloworld helloworld

现在知道,完成异步的代码不一定要用async/await,使用了async/await的代码也不一定能做到异步,async/await是协程的语法糖,使协程之间的调用变得更加清晰,使用async修饰的函数调用时会返回一个协程对象,await只能放在async修饰的函数里面使用,await后面必须要跟着一个协程对象或Awaitable,await的目的是等待协程控制流的返回,而实现暂停并挂起函数的操作是yield。

个人认为,async/await以及协程是Python未来实现异步编程的趋势,我们将会在更多的地方看到他们的身影,例如协程库的curio和trio,web框架的sanic,数据库驱动的asyncpg等等...在Python3主导的今天,作为开发者,应该及时拥抱和适应新的变化,而基于async/await的协程凭借良好的可读性和易用性日渐登上舞台,看到这里,你还不赶紧上车?

参考:

PEP492PEP525

本文内容总结:

原文链接:https://www.cnblogs.com/dhcn/p/9032461.html