首页 文章资讯内容详情

详解golang net之transport

2026-06-01 4 花语

本文内容纲要:

关于golanghttptransport的讲解,网上有很多文章进行了解读,但都比较粗,很多代码实现并没有讲清楚。故给出更加详细的实现说明。整体看下来细节实现层面还是比较难懂的。

本次使用golang版本1.12.9

transport实现了RoundTripper接口,该接口只有一个方法RoundTrip(),故transport的入口函数就是RoundTrip()。transport的主要功能其实就是缓存了长连接,用于大量http请求场景下的连接复用,减少发送请求时TCP(TLS)连接建立的时间损耗,同时transport还能对连接做一些限制,如连接超时时间,每个host的最大连接数等。transport对长连接的缓存和控制仅限于TCP+(TLS)+HTTP1,不对HTTP2做缓存和限制。

tranport包含如下几个主要概念:

连接池:在idleConn中保存了不同类型(connectMethodKey)的请求连接(persistConn)。当发生请求时,首先会尝试从连接池中取一条符合其请求类型的连接使用 readLoop/writeLoop:连接之上的功能,循环处理该类型的请求(发送request,返回response) roundTrip:请求的真正入口,接收到一个请求后会交给writeLoop和readLoop处理。

一对readLoop/writeLoop只能处理一条连接,如果这条连接上没有更多的请求,则关闭连接,退出循环,释放系统资源

下述代码都来自golang源码的src/net/httptransport.go文件

typeRoundTripperinterface{ //RoundTripexecutesasingleHTTPtransaction,returning //aResponsefortheprovidedRequest. // //RoundTripshouldnotattempttointerprettheresponse.In //particular,RoundTripmustreturnerr==nilifitobtained //aresponse,regardlessoftheresponsesHTTPstatuscode. //Anon-nilerrshouldbereservedforfailuretoobtaina //response.Similarly,RoundTripshouldnotattemptto //handlehigher-levelprotocoldetailssuchasredirects, //authentication,orcookies. // //RoundTripshouldnotmodifytherequest,exceptfor //consumingandclosingtheRequestsBody.RoundTripmay //readfieldsoftherequestinaseparategoroutine.Callers //shouldnotmutateorreusetherequestuntiltheResponses //Bodyhasbeenclosed. // //RoundTripmustalwaysclosethebody,includingonerrors, //butdependingontheimplementationmaydosoinaseparate //goroutineevenafterRoundTripreturns.Thismeansthat //callerswantingtoreusethebodyforsubsequentrequests //mustarrangetowaitfortheClosecallbeforedoingso. // //TheRequestsURLandHeaderfieldsmustbeinitialized. RoundTrip(*Request)(*Response,error) }

Transport结构体中的主要成员如下(没有列出所有成员):

wantIdle要求关闭所有idle的persistConn reqCancelermap[*Request]func(error)用于取消request idleConnmap[connectMethodKey][]*persistConnidle状态的persistConn连接池,最大值受maxIdleConnsPerHost限制 idleConnChmap[connectMethodKey]chan*persistConn用于给调用者传递persistConn connPerHostCountmap[connectMethodKey]int表示一类连接上的host数目,最大值受MaxConnsPerHost限制 connPerHostAvailablemap[connectMethodKey]chanstruct{}与connPerHostCount配合使用,判断该类型的连接数目是否已经达到上限 idleLRUconnLRU长度受MaxIdleConns限制,队列方式保存所有idle的pconn altProtoatomic.Valuenilormap[string]RoundTripper,key为URIscheme,表示处理该scheme的RoundTripper实现。注意与TLSNextProto的不同,前者表示URI的scheme,后者表示tls之上的协议。如前者不会体现http2,后者会体现http2 Proxyfunc(*Request)(*url.URL,error)为request返回一个代理的url DisableKeepAlivesbool是否取消长连接 DisableCompressionbool是否取消HTTP压缩 MaxIdleConnsint所有host的idle状态的最大连接数目,即idleConn中所有连接数 MaxIdleConnsPerHostint每个host的idle状态的最大连接数目,即idleConn中的key对应的连接数 MaxConnsPerHost每个host上的最大连接数目,含dialing/active/idle状态的connections。http2时,每个host只允许有一条idle的conneciton DialContextfunc(ctxcontext.Context,network,addrstring)(net.Conn,error)创建未加密的tcp连接,比Dial函数增加了context控制 Dialfunc(network,addrstring)(net.Conn,error)创建未加密的tcp连接,废弃,使用DialContext DialTLSfunc(network,addrstring)(net.Conn,error)为非代理模式的https创建连接的函数,如果该函数非空,则不会使用Dial函数,且忽略TLSClientConfig和TLSHandshakeTimeout;反之使用Dila和TLSClientConfig。即有限使用DialTLS进行tls协商 TLSClientConfig*tls.Configtlsclient用于tls协商的配置 IdleConnTimeout连接保持idle状态的最大时间,超时关闭pconn TLSHandshakeTimeouttime.Durationtls协商的超时时间 ResponseHeaderTimeouttime.Duration发送完request后等待serveresponse的时间 TLSNextProtomap[string]func(authoritystring,c*tls.Conn)RoundTripper在tls协商带NPN/ALPN的扩展后,transport如何切换到其他协议。指tls之上的协议(next指的就是tls之上的意思) ProxyConnectHeaderHeader在CONNECT请求时,配置request的首部信息,可选 MaxResponseHeaderBytes指定server响应首部的最大字节数

Transport.roundTrip是主入口,它通过传入一个request参数,由此选择一个合适的长连接来发送该request并返回response。整个流程主要分为两步:

使用getConn函数来获得底层TCP(TLS)连接;调用roundTrip函数进行上层协议(HTTP)处理。

func(t*Transport)roundTrip(req*Request)(*Response,error){ t.nextProtoOnce.Do(t.onceSetNextProtoDefaults) ctx:=req.Context() trace:=httptrace.ContextClientTrace(ctx) ifreq.URL==nil{ req.closeBody() returnnil,errors.New("http:nilRequest.URL") } ifreq.Header==nil{ req.closeBody() returnnil,errors.New("http:nilRequest.Header") } scheme:=req.URL.Scheme isHTTP:=scheme=="http"||scheme=="https" //下面判断request首部的有效性 ifisHTTP{ fork,vv:=rangereq.Header{ if!httpguts.ValidHeaderFieldName(k){ returnnil,fmt.Errorf("net/http:invalidheaderfieldname%q",k) } for_,v:=rangevv{ if!httpguts.ValidHeaderFieldValue(v){ returnnil,fmt.Errorf("net/http:invalidheaderfieldvalue%qforkey%v",v,k) } } } } //判断是否使用注册的RoundTrip来处理对应的scheme。对于使用tcp+tls+http1(wss协议升级)的场景 //不能使用注册的roundTrip。后续代码对tcp+tls+http1或tcp+http1进行了roundTrip处理 ift.useRegisteredProtocol(req){ altProto,_:=t.altProto.Load().(map[string]RoundTripper) ifaltRT:=altProto[scheme];altRT!=nil{ ifresp,err:=altRT.RoundTrip(req);err!=ErrSkipAltProtocol{ returnresp,err } } } //后续仅处理URLscheme为http或https的连接 if!isHTTP{ req.closeBody() returnnil,&badStringError{"unsupportedprotocolscheme",scheme} } ifreq.Method!=""&&!validMethod(req.Method){ returnnil,fmt.Errorf("net/http:invalidmethod%q",req.Method) } ifreq.URL.Host==""{ req.closeBody() returnnil,errors.New("http:noHostinrequestURL") } //下面for循环用于在request出现错误的时候进行请求重试。但不是所有的请求失败都会被尝试,如请求被取消(errRequestCanceled) //的情况是不会进行重试的。具体参见shouldRetryRequest函数 for{ select{ case<-ctx.Done(): req.closeBody() returnnil,ctx.Err() default: } //treqgetsmodifiedbyroundTrip,soweneedtorecreateforeachretry. treq:=&transportRequest{Request:req,trace:trace} //connectMethodForRequest函数通过输入一个request返回一个connectMethod(简称cm),该类型通过 //{proxyURL,targetScheme,tartgetAddr,onlyH1},即{代理URL,server端的scheme,server的地址,是否HTTP1} //来表示一个请求。一个符合connectMethod描述的request将会在Transport.idleConn中匹配到一类长连接。 cm,err:=t.connectMethodForRequest(treq) iferr!=nil{ req.closeBody() returnnil,err } //获取一条长连接,如果连接池中有现成的连接则直接返回,否则返回一条新建的连接。该连接可能是HTTP2格式的,存放在persistCnn.alt中, //使用其自注册的RoundTrip处理。该函数描述参见下面内容。 //从getConn的实现中可以看到,一个请求只能在idle的连接上执行,反之一条连接只能同时处理一个请求。 pconn,err:=t.getConn(treq,cm)//如果获取底层连接失败,无法继续上层协议的请求,直接返回错误iferr!=nil{//每个request都会在getConn中设置reqCanceler,获取连接失败,清空设置t.setReqCanceler(req,nil) req.closeBody() returnnil,err } varresp*Response//pconn.alt就是从Transport.TLSNextProto中获取的,它表示TLS之上的协议,如HTTP2。从persistConn.alt的注释中可以看出//目前alt仅支持HTTP2协议,后续可能会支持更多协议。ifpconn.alt!=nil{ //HTTP2处理,使用HTTP2时,由于不缓存HTTP2连接,不对其做限制t.decHostConnCount(cm.key()) //清除getConn中设置的标记。具体参见getConn t.setReqCanceler(req,nil) resp,err=pconn.alt.RoundTrip(req) }else{//pconn.roundTrip中做了比较复杂的处理,该函数用于发送request并返回response。//通过writeLoop发送request,通过readLoop返回responseresp,err=pconn.roundTrip(treq) }//如果成功返回response,则整个处理结束.iferr==nil{ returnresp,nil }//判断该request是否满足重试条件,大部分场景是不支持重试的,仅有少部分情况支持,如errServerClosedIdle//err非nil时实际并没有在原来的连接上重试,且pconn没有关闭,提了issueif!pconn.shouldRetryRequest(req,err){ //Issue16465:returnunderlyingnet.Conn.Readerrorfrompeek,//aswevehistoricallydone.ife,ok:=err.(transportReadFromServerError);ok{ err=e.err } returnnil,err } testHookRoundTripRetried() //Rewindthebodyifwereableto.//用于重定向场景ifreq.GetBody!=nil{ newReq:=*req varerrerror newReq.Body,err=req.GetBody() iferr!=nil{ returnnil,err } req=&newReq }} }

getConn用于返回一条长连接。长连接的来源有2种路径:连接池中获取;当连接池中无法获取到时会新建一条连接

func(t*Transport)getConn(treq*transportRequest,cmconnectMethod)(*persistConn,error){ req:=treq.Request trace:=treq.trace ctx:=req.Context() iftrace!=nil&&trace.GetConn!=nil{ trace.GetConn(cm.addr()) } //从连接池中找一条合适的连接,如果找到则返回该连接,否则新建连接 ifpc,idleSince:=t.getIdleConn(cm);pc!=nil{ iftrace!=nil&&trace.GotConn!=nil{ trace.GotConn(pc.gotIdleConnTrace(idleSince)) } //此处设置transport.reqCanceler比较难理解,主要功能是做一个标记,用于判断当前到执行pconn.roundTrip //期间,request有没有被(如Request.Cancel,Request.Context().Done())取消,被取消的request将无需继续roundTrip处理 t.setReqCanceler(req,func(error){}) returnpc,nil } typedialResstruct{ pc*persistConn errerror } //该chan中用于存放通过dialConn函数新创建的长连接persistConn(后续简称pconn),表示一条TCP(TLS)的底层连接. dialc:=make(chandialRes) //cmKey实际就是把connectMethod中的元素全部字符串化。cmKey作为一类连接的标识,如Transport.idleConn[cmKey]就表示一类特定的连接 cmKey:=cm.key() //CopythesehookssowedontraceonthepostPendingDialin //thegoroutinewelaunch.Issue11136. testHookPrePendingDial:=testHookPrePendingDial testHookPostPendingDial:=testHookPostPendingDial //在尝试获取连接的时候,如果此时正在创建一条连接,但最后没有选择这条新建的连接(有其它调用者释放了一条连接), //此时,handlePendingDial负责将这条新创建的连接放到Transport.idleConn连接池中 handlePendingDial:=func(){ testHookPrePendingDial() gofunc(){ ifv:=<-dialc;v.err==nil{ //将一条连接放入连接池中,描述见下文--tryPutIdleConn t.putOrCloseIdleConn(v.pc) }else{ t.decHostConnCount(cmKey) } testHookPostPendingDial() }() } cancelc:=make(chanerror,1) //为request设置ReqCanceler。transport代码中不会主动调用该ReqCanceler函数(会在 //roundTrip中调用replaceReqCanceler将其覆盖),可能的原因是transport提供了一个对外APICancelRequest, //用户可以调用该函数取消连接,此时会调用该ReqCanceler。需要注意的是从CancelRequest的注释中可以看出,该API //已经被废弃,这段代码后面可能会被删除(如果有不同看法,请指出) t.setReqCanceler(req,func(errerror){cancelc<-err}) //如果对host上建立的连接有限制 ift.MaxConnsPerHost>0{ select{ //incHostConnCount会根据主机已经建立的连接是否达到t.MaxConnsPerHost来返回一个未关闭 //的chan(连接数达到MaxConnsPerHost)或关闭的chan(连接数未达到MaxConnsPerHost), //返回未关闭的chan时会阻塞等待其他请求释放连接,不能新创建pconn;反之可以使用新创建的pconn case<-t.incHostConnCount(cmKey): //等待获取某一类连接对应的chan。tryPutIdleConn函数中会尝试将新建或释放的连接放入到该chan中 casepc:=<-t.getIdleConnCh(cm): iftrace!=nil&&trace.GotConn!=nil{ trace.GotConn(httptrace.GotConnInfo{Conn:pc.conn,Reused:pc.isReused()}) } returnpc,nil //下面2个case都表示request被取消,其中Cancel被废弃,建议使用Context来取消request case<-req.Cancel: returnnil,errRequestCanceledConn case<-req.Context().Done(): returnnil,req.Context().Err() caseerr:=<-cancelc: iferr==errRequestCanceled{ err=errRequestCanceledConn } returnnil,err } } gofunc(){ //新建连接,创建好后将其放入dialcchan中 pc,err:=t.dialConn(ctx,cm) dialc<-dialRes{pc,err} }() //下面会通过两种途径来获得连接:从dialc中获得通过dialConn新建的连接;通过idleConnCh获得其他request释放的连接 //如果首先获取到的是dialConn新建的连接,直接返回该连接即可;如果首先获取到的是其他request释放的连接,在返回该连接前 //需要调用handlePendingDial来处理dialConn新建的连接。 idleConnCh:=t.getIdleConnCh(cm) select{ //获取dialConn新建的连接 casev:=<-dialc: //Ourdialfinished. ifv.pc!=nil{ iftrace!=nil&&trace.GotConn!=nil&&v.pc.alt==nil{ trace.GotConn(httptrace.GotConnInfo{Conn:v.pc.conn}) } returnv.pc,nil } //仅针对MaxConnsPerHost>0有效,对应上面的incHostConnCount() t.decHostConnCount(cmKey) //下面用于返回更易读的错误信息 select{ case<-req.Cancel: //Itwasanerrorduetocancelation,soprioritizethat //errorvalue.(Issue16049) returnnil,errRequestCanceledConn case<-req.Context().Done(): returnnil,req.Context().Err() caseerr:=<-cancelc: iferr==errRequestCanceled{ err=errRequestCanceledConn } returnnil,err default: //Itwasntanerrorduetocancelation,so //returntheoriginalerrormessage: returnnil,v.err } //获取其他request释放的连接 casepc:=<-idleConnCh: //Anotherrequestfinishedfirstanditsnet.Conn //becameavailablebeforeourdial.Orsomebody //elsesdialthattheydidntuse. //Butourdialisstillgoing,sogiveitaway //whenitfinishes: handlePendingDial() iftrace!=nil&&trace.GotConn!=nil{ trace.GotConn(httptrace.GotConnInfo{Conn:pc.conn,Reused:pc.isReused()}) } returnpc,nil //如果request取消,也需要调用handlePendingDial处理新建的连接 case<-req.Cancel: handlePendingDial() returnnil,errRequestCanceledConn case<-req.Context().Done(): handlePendingDial() returnnil,req.Context().Err() caseerr:=<-cancelc: handlePendingDial() iferr==errRequestCanceled{ err=errRequestCanceledConn } returnnil,err } }

tryPutIdleConn函数用来将一条新创建或回收的连接放回连接池中,以便后续使用。与getIdleConnCh配合使用,后者用于获取一类连接对应的chan。在如下场景会将一个连接放回idleConn中

在readLoop成功之后(当然还有其他判断,如底层链路没有返回EOF错误); 创建一个新连接且新连接没有被使用时; roundTrip一开始发现request被取消时 func(t*Transport)tryPutIdleConn(pconn*persistConn)error{ //当不使用长连接或该主机上的连接数小于0(即不允许缓存任何连接)时,返回错误并关闭创建的连接(此处没有做关闭处理, //但存在不适用的连接时必须关闭,如使用putOrCloseIdleConn)。 //可以看出当不使用长连接时,Transport不能缓存连接 ift.DisableKeepAlives||t.MaxIdleConnsPerHost<0{ returnerrKeepAlivesDisabled } ifpconn.isBroken(){ returnerrConnBroken } //如果是HTTP2连接,则直接返回,不缓存该连接 ifpconn.alt!=nil{ returnerrNotCachingH2Conn } //为新连接标记可重用状态,新创建的连接肯定是可以重用的,用于在Transport.roundTrip //中的shouldRetryRequest函数中判断连接是否可以重用 pconn.markReused() //该key对应Transport.idleConn中的key,标识特定的连接 key:=pconn.cacheKey t.idleMu.Lock() defert.idleMu.Unlock() //idleConnCh中的chan元素用于存放可用的连接pconn,每类连接都有一个chan waitingDialer:=t.idleConnCh[key] select{ //如果此时有调用者等待一个连接,则直接将该连接传递出去,不进行保存,这种做法有利于提高效率 casewaitingDialer<-pconn: //Weredonewiththispconnandsomebodyelseis //currentlywaitingforaconnofthistype(theyre //activelydialing,butthisconnisready //first).Chromecallsthissocketlatebinding.See //https://insouciant.org/tech/connection-management-in-chromium/ returnnil default: //如果没有调用者等待连接,则清除该chan。删除map中的chan直接会关闭该chan ifwaitingDialer!=nil{ //Theyhadpopulatedthis,buttheirdialwon //first,sowecancleanupthismapentry. delete(t.idleConnCh,key) } } //与DisableKeepAlives有点像,当用户需要关闭所有idle的连接时,不会再缓存连接 ift.wantIdle{ returnerrWantIdle } ift.idleConn==nil{ t.idleConn=make(map[connectMethodKey][]*persistConn) } idles:=t.idleConn[key] //当主机上该类连接数超过Transport.MaxIdleConnsPerHost时,不能再保存新的连接,返回错误并关闭连接 iflen(idles)>=t.maxIdleConnsPerHost(){ returnerrTooManyIdleHost } //需要缓存的连接与连接池中已有的重复,系统退出(这种情况下系统已经发生了混乱,直接退出) for_,exist:=rangeidles{ ifexist==pconn{ log.Fatalf("dupidlepconn%pinfreelist",pconn) } } //添加待缓存的连接 t.idleConn[key]=append(idles,pconn) t.idleLRU.add(pconn) //受MaxIdleConns的限制,添加策略变为:添加新的连接,删除最老的连接。 //MaxIdleConns限制了所有类型的idle状态的最大连接数目,而MaxIdleConnsPerHost限制了host上单一类型的最大连接数目 //idleLRU中保存了所有的连接,此处的作用为,找出最老的连接并移除 ift.MaxIdleConns!=0&&t.idleLRU.len()>t.MaxIdleConns{ oldest:=t.idleLRU.removeOldest() oldest.close(errTooManyIdle) t.removeIdleConnLocked(oldest) } //为新添加的连接设置超时时间 ift.IdleConnTimeout>0{ ifpconn.idleTimer!=nil{ //如果该连接是被释放的,则重置超时时间 pconn.idleTimer.Reset(t.IdleConnTimeout) }else{ //如果该连接时新建的,则设置超时时间并设置超时动作pconn.closeConnIfStillIdle //closeConnIfStillIdle用于释放连接,从Transport.idleLRU和Transport.idleConn中移除并关闭该连接 pconn.idleTimer=time.AfterFunc(t.IdleConnTimeout,pconn.closeConnIfStillIdle) } } pconn.idleAt=time.Now() returnnil }

dialConn用于新创建一条连接,并为该连接启动readLoop和writeLoop

func(t*Transport)dialConn(ctxcontext.Context,cmconnectMethod)(*persistConn,error){ pconn:=&persistConn{ t:t, cacheKey:cm.key(), reqch:make(chanrequestAndChan,1), writech:make(chanwriteRequest,1), closech:make(chanstruct{}), writeErrCh:make(chanerror,1), writeLoopDone:make(chanstruct{}), } trace:=httptrace.ContextClientTrace(ctx) wrapErr:=func(errerror)error{ ifcm.proxyURL!=nil{ //Returnatypederror,perIssue16997 return&net.OpError{Op:"proxyconnect",Net:"tcp",Err:err} } returnerr } //调用注册的DialTLS处理tls。使用自注册的TLS处理函数时,transport的TLSClientConfig和TLSHandshakeTimeout //参数会被忽略 ifcm.scheme()=="https"&&t.DialTLS!=nil{ varerrerror //调用注册的连接函数创建一条连接,注意cm.addr()的实现,如果该连接存在proxy,则此处是与proxy建立TLS连接;否则直接连server。 //存在proxy时,与server建立连接分为2步:与proxy建立TLP(TLS)连接;与server建立HTTP(HTTPS)连接 //func(cm*connectMethod)addr()string{ //ifcm.proxyURL!=nil{ //returncanonicalAddr(cm.proxyURL) //} //returncm.targetAddr //} pconn.conn,err=t.DialTLS("tcp",cm.addr()) iferr!=nil{ returnnil,wrapErr(err) } ifpconn.conn==nil{ returnnil,wrapErr(errors.New("net/http:Transport.DialTLSreturned(nil,nil)")) } //如果连接类型是TLS的,则需要处理TLS协商 iftc,ok:=pconn.conn.(*tls.Conn);ok{ //Handshakehere,incaseDialTLSdidnt.TLSNextProtobelow //dependsonitforknowingtheconnectionstate. iftrace!=nil&&trace.TLSHandshakeStart!=nil{ trace.TLSHandshakeStart() } //启动TLS协商,如果协商失败需要关闭连接 iferr:=tc.Handshake();err!=nil{ gopconn.conn.Close() iftrace!=nil&&trace.TLSHandshakeDone!=nil{ trace.TLSHandshakeDone(tls.ConnectionState{},err) } returnnil,err } cs:=tc.ConnectionState() iftrace!=nil&&trace.TLSHandshakeDone!=nil{ trace.TLSHandshakeDone(cs,nil) } //保存TLS协商结果 pconn.tlsState=&cs } }else{ //使用默认方式创建连接,此时会用到transport的TLSClientConfig和TLSHandshakeTimeout参数。同样注意cm.addr() conn,err:=t.dial(ctx,"tcp",cm.addr()) iferr!=nil{ returnnil,wrapErr(err) } pconn.conn=conn //如果scheme是需要TLS协商的,则处理TLS协商,否则为普通的HTTP连接 ifcm.scheme()=="https"{ varfirstTLSHoststring iffirstTLSHost,_,err=net.SplitHostPort(cm.addr());err!=nil{ returnnil,wrapErr(err) } //进行TLS协商,具体参见下文addTLS iferr=pconn.addTLS(firstTLSHost,trace);err!=nil{ returnnil,wrapErr(err) } } } //处理proxy的情况 switch{ //不存在proxy直接跳过 casecm.proxyURL==nil: casecm.proxyURL.Scheme=="socks5": conn:=pconn.conn d:=socksNewDialer("tcp",conn.RemoteAddr().String()) ifu:=cm.proxyURL.User;u!=nil{ auth:=&socksUsernamePassword{ Username:u.Username(), } auth.Password,_=u.Password() d.AuthMethods=[]socksAuthMethod{ socksAuthMethodNotRequired, socksAuthMethodUsernamePassword, } d.Authenticate=auth.Authenticate } if_,err:=d.DialWithConn(ctx,conn,"tcp",cm.targetAddr);err!=nil{ conn.Close() returnnil,err } //如果存在proxy,且server的scheme为"http",如果需要代理认证,则设置认证信息 casecm.targetScheme=="http": pconn.isProxy=true ifpa:=cm.proxyAuth();pa!=""{ pconn.mutateHeaderFunc=func(hHeader){ h.Set("Proxy-Authorization",pa) } } //如果存在proxy,且server的scheme为"https"。与"http"不同,在与server进行tls协商前,会给proxy //发送一个method为"CONNECT"的HTTP请求,如果请求通过(返回200),则可以继续与server进行TLS协商 casecm.targetScheme=="https": //该conn表示与proxy建立的连接 conn:=pconn.conn hdr:=t.ProxyConnectHeader ifhdr==nil{ hdr=make(Header) } connectReq:=&Request{ Method:"CONNECT", URL:&url.URL{Opaque:cm.targetAddr}, Host:cm.targetAddr, Header:hdr, } ifpa:=cm.proxyAuth();pa!=""{ connectReq.Header.Set("Proxy-Authorization",pa) } //发送"CONNECT"http请求 connectReq.Write(conn) //Readresponse. //Okaytouseanddiscardbufferedreaderhere,because //TLSserverwillnotspeakuntilspokento. br:=bufio.NewReader(conn) resp,err:=ReadResponse(br,connectReq) iferr!=nil{ conn.Close() returnnil,err } //proxy返回非200,表示无法建立连接,可能情况如proxy认证失败 ifresp.StatusCode!=200{ f:=strings.SplitN(resp.Status,"",2) conn.Close() iflen(f)<2{ returnnil,errors.New("unknownstatuscode") } returnnil,errors.New(f[1]) } } //与proxy建立连接后,再与server进行TLS协商 ifcm.proxyURL!=nil&&cm.targetScheme=="https"{ iferr:=pconn.addTLS(cm.tlsHost(),trace);err!=nil{ returnnil,err } } //后续进行TLS之上的协议处理,如果TLS之上的协议为注册协议,则使用注册的roundTrip进行处理 //TLS之上的协议为TLS协商过程中使用NPN/ALPN扩展协商出的协议,如HTTP2(参见golang.org/x/net/http2) ifs:=pconn.tlsState;s!=nil&&s.NegotiatedProtocolIsMutual&&s.NegotiatedProtocol!=""{ ifnext,ok:=t.TLSNextProto[s.NegotiatedProtocol];ok{ return&persistConn{alt:next(cm.targetAddr,pconn.conn.(*tls.Conn))},nil } } ift.MaxConnsPerHost>0{ pconn.conn=&connCloseListener{Conn:pconn.conn,t:t,cmKey:pconn.cacheKey} } //创建读写通道,writeLoop用于发送request,readLoop用于接收响应。roundTrip函数中会通过chan给writeLoop发送 //request,通过chan从readLoop接口response。每个连接都有一个readLoop和writeLoop,连接关闭后,这2个Loop也会退出。 //pconn.br给readLoop使用,pconn.bw给writeLoop使用,注意此时已经建立了tcp连接。 pconn.br=bufio.NewReader(pconn) pconn.bw=bufio.NewWriter(persistConnWriter{pconn}) gopconn.readLoop() gopconn.writeLoop() returnpconn,nil }

addTLS用于进行非注册协议下的TLS协商

func(pconn*persistConn)addTLS(namestring,trace*httptrace.ClientTrace)error{ //InitiateTLSandcheckremotehostnameagainstcertificate. cfg:=cloneTLSConfig(pconn.t.TLSClientConfig) ifcfg.ServerName==""{ cfg.ServerName=name } ifpconn.cacheKey.onlyH1{ cfg.NextProtos=nil } plainConn:=pconn.conn //配置TLSclient,包含一个TCP连接和TLC配置 tlsConn:=tls.Client(plainConn,cfg) errc:=make(chanerror,2) vartimer*time.Timer //设置TLS超时时间,并在超时后往errc中写入一个tlsHandshakeTimeoutError{} ifd:=pconn.t.TLSHandshakeTimeout;d!=0{ timer=time.AfterFunc(d,func(){ errc<-tlsHandshakeTimeoutError{} }) } gofunc(){ iftrace!=nil&&trace.TLSHandshakeStart!=nil{ trace.TLSHandshakeStart() } //执行TLS协商,如果协商没有超时,则将协商结果err放入errc中 err:=tlsConn.Handshake() iftimer!=nil{ timer.Stop() } errc<-err }() //阻塞等待TLS协商结果,如果协商失败或协商超时,关闭底层连接 iferr:=<-errc;err!=nil{ plainConn.Close() iftrace!=nil&&trace.TLSHandshakeDone!=nil{ trace.TLSHandshakeDone(tls.ConnectionState{},err) } returnerr } //获取协商结果并设置到pconn.tlsState cs:=tlsConn.ConnectionState() iftrace!=nil&&trace.TLSHandshakeDone!=nil{ trace.TLSHandshakeDone(cs,nil) } pconn.tlsState=&cs pconn.conn=tlsConn returnnil }

在获取到底层TCP(TLS)连接后在roundTrip中处理上层协议:即发送HTTPrequest,返回HTTPresponse。roundTrip给writeLoop提供request,从readLoop获取response。

一个roundTrip用于处理一类request。

func(pc*persistConn)roundTrip(req*transportRequest)(resp*Response,errerror){ testHookEnterRoundTrip() //此处与getConn中的"t.setReqCanceler(req,func(error){})"相对应,用于判断request是否被取消 //返回false表示request被取消,不必继续后续请求,关闭连接并返回错误 if!pc.t.replaceReqCanceler(req.Request,pc.cancelRequest){ pc.t.putOrCloseIdleConn(pc) returnnil,errRequestCanceled } pc.mu.Lock() //与readLoop配合使用,表示期望的响应的个数 pc.numExpectedResponses++ //dialConn中定义的函数,设置了proxy的认证信息 headerFn:=pc.mutateHeaderFunc pc.mu.Unlock() ifheaderFn!=nil{ headerFn(req.extraHeaders()) } //Askforacompressedversionifthecallerdidntsettheir //ownvalueforAccept-Encoding.Weonlyattemptto //uncompressthegzipstreamifwewerethelayerthat //requestedit. requestedGzip:=false //如果需要在request中设置可接受的解码方法,则在request中添加对应的首部。仅支持gzip方式且 //仅在调用者没有设置这些首部时设置 if!pc.t.DisableCompression&& req.Header.Get("Accept-Encoding")==""&& req.Header.Get("Range")==""&& req.Method!="HEAD"{ //Requestgziponly,notdeflate.Deflateisambiguousand //notasuniversallysupportedanyway. //See:https://zlib.net/zlib_faq.html#faq39 // //NotethatwedontrequestthisforHEADrequests, //duetoabuginnginx: //https://trac.nginx.org/nginx/ticket/358 //https://golang.org/issue/5522 // //Wedontrequestgzipiftherequestisforarange,since //auto-decodingaportionofagzippeddocumentwilljustfail //anyway.Seehttps://golang.org/issue/8923 requestedGzip=true req.extraHeaders().Set("Accept-Encoding","gzip") } //用于处理首部含"Expect:100-continue"的request,客户端使用该首部探测服务器是否能够 //处理request首部中的规格要求(如长度过大的request)。 varcontinueChchanstruct{} ifreq.ProtoAtLeast(1,1)&&req.Body!=nil&&req.expectsContinue(){ continueCh=make(chanstruct{},1) } //HTTP1.1默认使用长连接,当transport设置DisableKeepAlives时会导致处理每个request时都会 //新建一个连接。此处的处理逻辑是:如果transport设置了DisableKeepAlives,而request没有设置 //"Connection:close",则为request设置该首部。将底层表现与上层协议保持一致。 ifpc.t.DisableKeepAlives&&!req.wantsClose(){ req.extraHeaders().Set("Connection","close") } //用于在异常场景(如request取消)下通知readLoop,roundTrip是否已经退出,防止ReadLoop发送response阻塞 gone:=make(chanstruct{}) deferclose(gone) deferfunc(){ iferr!=nil{ pc.t.setReqCanceler(req.Request,nil) } }() constdebugRoundTrip=false //Writetherequestconcurrentlywithwaitingforaresponse, //incasetheserverdecidestoreplybeforereadingourfull //requestbody. //表示发送了多少个字节的request,debug使用 startBytesWritten:=pc.nwrite //给writeLoop封装并发送信息,注意此处的先后顺序。首先给writeLoop发送数据,阻塞等待writeLoop //接收,待writeLoop接收后才能发送数据给readLoop,因此发送request总会优先接收response writeErrCh:=make(chanerror,1) pc.writech<-writeRequest{req,writeErrCh,continueCh} //给readLoop封装并发送信息 resc:=make(chanresponseAndError) pc.reqch<-requestAndChan{ req:req.Request, ch:resc, addedGzip:requestedGzip, continueCh:continueCh, callerGone:gone, } varrespHeaderTimer<-chantime.Time cancelChan:=req.Request.Cancel ctxDoneChan:=req.Context().Done() //该循环主要用于处理获取response超时和request取消时的条件跳转。正常情况下收到reponse //退出roundtrip函数 for{ testHookWaitResLoop() select{ //writeLoop返回发送request后的结果 caseerr:=<-writeErrCh: ifdebugRoundTrip{ req.logf("writeErrChresv:%T/%#v",err,err) } iferr!=nil{ pc.close(fmt.Errorf("writeerror:%v",err)) returnnil,pc.mapRoundTripError(req,startBytesWritten,err) } //设置一个接收response的定时器,如果在这段时间内没有接收到response(即没有进入下面代码 //的"casere:=<-resc:"分支),超时后进入""case<-respHeaderTimer:分支,关闭连接, //防止readLoop一直等待读取response,导致处理阻塞;没有超时则关闭定时器 ifd:=pc.t.ResponseHeaderTimeout;d>0{ ifdebugRoundTrip{ req.logf("startingtimerfor%v",d) } timer:=time.NewTimer(d) defertimer.Stop()//preventleaks respHeaderTimer=timer.C } //处理底层连接关闭。"case<-cancelChan:"和”case<-ctxDoneChan:“为request关闭,request //关闭也会导致底层连接关闭,但必须处理非上层协议导致底层连接关闭的情况。 case<-pc.closech: ifdebugRoundTrip{ req.logf("closechrecv:%T%#v",pc.closed,pc.closed) } returnnil,pc.mapRoundTripError(req,startBytesWritten,pc.closed) //等待获取response超时,关闭连接 case<-respHeaderTimer: ifdebugRoundTrip{ req.logf("timeoutwaitingforresponseheaders.") } pc.close(errTimeout) returnnil,errTimeout //接收到readLoop返回的response结果 casere:=<-resc: //极异常情况,直接程序panic if(re.res==nil)==(re.err==nil){ panic(fmt.Sprintf("internalerror:exactlyoneofresorerrshouldbeset;nil=%v",re.res==nil)) } ifdebugRoundTrip{ req.logf("rescrecv:%p,%T/%#v",re.res,re.err,re.err) } ifre.err!=nil{ returnnil,pc.mapRoundTripError(req,startBytesWritten,re.err) } returnre.res,nil //request取消 case<-cancelChan: pc.t.CancelRequest(req.Request) //将关闭之后的chan置为nil,用来防止select一直进入该case(close的chan不会阻塞读,读取的数据为0) cancelChan=nil case<-ctxDoneChan: pc.t.cancelRequest(req.Request,req.Context().Err()) cancelChan=nil ctxDoneChan=nil } } }

writeLoop用于发送request请求

func(pc*persistConn)writeLoop(){ deferclose(pc.writeLoopDone) for{ //writeLoop会阻塞等待两个IOcase: //循环等待并处理roundTrip发来的writeRequest数据,此时需要发送request; //如果底层连接关闭,则退出writeLoop select{ casewr:=<-pc.writech: startBytesWritten:=pc.nwrite //构造request并发送request请求。waitForContinue用于处理首部含"Expect:100-continue"的request err:=wr.req.Request.write(pc.bw,pc.isProxy,wr.req.extra,pc.waitForContinue(wr.continueCh)) ifbre,ok:=err.(requestBodyReadError);ok{ err=bre.error //Errorsreadingfromtheusers //Request.Bodyarehighpriority. //Setitherebeforesendingonthe //channelsbeloworcalling //pc.close()whichtearstown //connectionsandcausesother //errors. wr.req.setError(err) } iferr==nil{ err=pc.bw.Flush() } //请求失败时,需要关闭request和底层连接 iferr!=nil{ wr.req.Request.closeBody() ifpc.nwrite==startBytesWritten{ err=nothingWrittenError{err} } } //将结果发送给readLoop的pc.wroteRequest()函数处理 pc.writeErrCh<-err//tothebodyreader,whichmightrecycleus //将结果返回给roundTrip处理,防止响应超时 wr.ch<-err //如果发送request失败,需要关闭连接。writeLoop退出时会关闭pc.conn和pc.closech, //同时会导致readLoop退出 iferr!=nil{ pc.close(err) return } case<-pc.closech: return } } }

readLoop循环接收response响应,成功获得response后会将连接返回连接池,便于后续复用。当readLoop正常处理完一个response之后,会将连接重新放入到连接池中;

当readloop退出后,该连接会被关闭移除。

func(pc*persistConn)readLoop(){ closeErr:=errReadLoopExiting//defaultvalue,ifnotchangedbelow //当writeLoop或readLoop(异常)跳出循环后,都需要关闭底层连接。即一条连接包含writeLoop和readLoop两个 //处理,任何一个loop退出(协议升级除外)则该连接不可用 //readLoo跳出循环的正常原因是连接上没有待处理的请求,此时关闭连接,释放资源 deferfunc(){ pc.close(closeErr) pc.t.removeIdleConn(pc) }() //尝试将连接放回连接池 tryPutIdleConn:=func(trace*httptrace.ClientTrace)bool{ iferr:=pc.t.tryPutIdleConn(pc);err!=nil{ closeErr=err iftrace!=nil&&trace.PutIdleConn!=nil&&err!=errKeepAlivesDisabled{ trace.PutIdleConn(err) } returnfalse } iftrace!=nil&&trace.PutIdleConn!=nil{ trace.PutIdleConn(nil) } returntrue } //eofcisusedtoblockcallergoroutinesreadingfromResponse.Body //atEOFuntilthisgoroutineshas(potentially)addedtheconnection //backtotheidlepool. //从上面注释可以看出该变量主要用于阻塞调用者协程读取EOF的resp.body, //直到该连接重新放入连接池中。处理逻辑与上面先尝试放入连接池,然后返回response一样, //便于连接快速重用 eofc:=make(chanstruct{}) //出现错误时也需要释放读取resp.Body的协程,防止调用者协程挂死 deferclose(eofc)//unblockreaderonerrors //Readthisonce,beforeloopstarts.(toavoidracesintests) testHookMu.Lock() testHookReadLoopBeforeNextRead:=testHookReadLoopBeforeNextRead testHookMu.Unlock() alive:=true foralive{ //获取允许的response首部的最大字节数 pc.readLimit=pc.maxHeaderResponseSize() //从接收buffer中peek一个字节来判断底层是否接收到response。roundTrip保证了request先于response发送。 //此处peek会阻塞等待response(这也是roundtrip中设置response超时定时器的原因)。goroutine中的read/write //操作都是阻塞模式。 _,err:=pc.br.Peek(1) pc.mu.Lock() //如果期望的response为0,则直接退出readLoop并关闭连接,此时连接上没有需要处理的数据, //关闭连接,释放系统资源。 ifpc.numExpectedResponses==0{ pc.readLoopPeekFailLocked(err) pc.mu.Unlock() return } pc.mu.Unlock() //阻塞等待roundTrip发来的数据 rc:=<-pc.reqch trace:=httptrace.ContextClientTrace(rc.req.Context()) varresp*Response //如果有response数据,则读取并解析为Response格式 iferr==nil{ resp,err=pc.readResponse(rc,trace) }else{ //可能的错误如server端关闭,发送EOF err=transportReadFromServerError{err} closeErr=err } //底层没有接收到server的任何数据,断开该连接,可能原因是在client发出request的同时,server关闭 //了连接。参见transportReadFromServerError的注释。 iferr!=nil{ ifpc.readLimit<=0{ err=fmt.Errorf("net/http:serverresponseheadersexceeded%dbytes;aborted",pc.maxHeaderResponseSize()) } //传递错误信息给roundTrip并退出loop select{ caserc.ch<-responseAndError{err:err}: case<-rc.callerGone: return } return } pc.readLimit=maxInt64//effictivelynolimitforresponsebodies pc.mu.Lock() pc.numExpectedResponses-- pc.mu.Unlock() //判断response是否可写,在使用101SwitchingProtocol进行协议升级时需要返回一个可写的resp.body //如果使用了101SwitchingProtocol,升级完成后就与transport没有关系了(后续使用http2或websocket等) bodyWritable:=resp.bodyIsWritable() //判断response的body是否为空,如果body为空,则不必读取body内容(HEAD的resp.body没有数据) hasBody:=rc.req.Method!="HEAD"&&resp.ContentLength!=0 //如果server关闭连接或client关闭连接或非预期的响应码或使用了协议升级,这几种情况下不能在该连接上继续 //接收响应,退出readLoop ifresp.Close||rc.req.Close||resp.StatusCode<=199||bodyWritable{ //Dontdokeep-aliveonerrorifeitherpartyrequestedaclose //orwegetanunexpectedinformational(1xx)response. //StatusCode100isalreadyhandledabove. alive=false } //此处用于处理body为空或协议升级场景,会尝试将连接放回连接池,对于后者,连接由调用者管理,退出readLoop if!hasBody||bodyWritable{ pc.t.setReqCanceler(rc.req,nil) //在返回response前将连接放回连接池,快速回收利用。回收连接需要按顺序满足: //1.alive为true //2.接收到EOF错误,此时底层连接关闭,该连接不可用 //3.成功发送request; //此处的执行顺序很重要,将连接返回连接池的操作放到最后,即在协议升级的场景,服务端不再 //发送数据的场景,以及request发送失败的场景下都不会将连接放回连接池,这些情况会导致 //alive为false,readLoop退出并关闭该连接(协议升级后的连接不能关闭) alive=alive&& !pc.sawEOF&& pc.wroteRequest()&& tryPutIdleConn(trace) ifbodyWritable{ //协议升级之后还是会使用同一条连接,设置closeErr为errCallerOwnsConn,这样在readLoop //return后不会被pc.close(closeErr)关闭连接 closeErr=errCallerOwnsConn } select{ //1:将response成功返回后继续等待下一个response; //2:如果roundTrip退出,(此时无法返回给roundTripresponse)则退出readLoop。 //即roundTrip接收完response后退出不会影响readLoop继续运行 caserc.ch<-responseAndError{res:resp}: case<-rc.callerGone: return } //Nowthattheyvereadfromtheunbufferedchannel,theyresafely //outoftheselectthatalsowaitsonthisgoroutinetodie,so //wereallowedtoexitnowifneeded(ifaliveisfalse) testHookReadLoopBeforeNextRead() continue } //下面处理responsebody存在数据的场景,逻辑与body不存在数据的场景类似 waitForBodyRead:=make(chanbool,2) //初始化body的处理函数,读取完response会返回EOF,这类连接是可重用的 body:=&bodyEOFSignal{ body:resp.Body, earlyCloseFn:func()error{ waitForBodyRead<-false <-eofc//willbeclosedbydeferredcallattheendofthefunction returnnil }, fn:func(errerror)error{ isEOF:=err==io.EOF waitForBodyRead<-isEOF ifisEOF{ <-eofc//seecommentaboveeofcdeclaration }elseiferr!=nil{ ifcerr:=pc.canceled();cerr!=nil{ returncerr } } returnerr }, } //返回的resp.Body类型变为了bodyEOFSignal,如果调用者在读取resp.Body后没有关闭,会导致 //readLoop阻塞在下面"casebodyEOF:=<-waitForBodyRead:"中 resp.Body=body ifrc.addedGzip&&strings.EqualFold(resp.Header.Get("Content-Encoding"),"gzip"){ resp.Body=&gzipReader{body:body} resp.Header.Del("Content-Encoding") resp.Header.Del("Content-Length") resp.ContentLength=-1 resp.Uncompressed=true } //此处与处理不带resp.body的场景相同 select{ caserc.ch<-responseAndError{res:resp}: case<-rc.callerGone: return } //Beforeloopingbacktothetopofthisfunctionandpeekingon //thebufio.Reader,waitforthecallergoroutinetofinish //readingtheresponsebody.(orforcancelationordeath) select{ casebodyEOF:=<-waitForBodyRead: pc.t.setReqCanceler(rc.req,nil)//beforepcmightreturntoidlepool alive=alive&& //如果读取完response的数据,则该连接可以被重用,否则直接释放。释放一个未读取完数据的连接会导致数据丢失。 //注意区分bodyEOF和pc.sawEOF的区别,一个是上层通道(httpresponse.Body)关闭,一个是底层通道(TCP)关闭。 bodyEOF&& !pc.sawEOF&& pc.wroteRequest()&& tryPutIdleConn(trace) //释放阻塞的读操作 ifbodyEOF{ eofc<-struct{}{} } case<-rc.req.Cancel: alive=false pc.t.CancelRequest(rc.req) case<-rc.req.Context().Done(): alive=false pc.t.cancelRequest(rc.req,rc.req.Context().Err()) case<-pc.closech: alive=false } testHookReadLoopBeforeNextRead() } }

本文内容总结:

原文链接:https://www.cnblogs.com/charlieroro/p/11409153.html