首页 文章资讯内容详情

golang 管道

2026-06-01 4 花语

本文内容纲要:

2.管道

简介

Golang的原子并发特性使得它很容易构造流数据管道,这使得Golang可有效的使用I/O和多CPU特性。本文提出一些关于管道的示

例,在这个过程中突出了操作失败的微妙之处和介绍处理失败的具体技术。

什么是管道

在Golang对于管道没有明确的定义;它只是许多种并发程序中的一种。管道是通道连接的一系列阶段,每个阶段是一组

goroutine运行相同的功能。在每个阶段,goroutine运行步骤为:

从上游经过入境通道接受值

对数据执行一些功能操作,通常会产生新的值

从下游经过出境通道发送值

除了开始和最后阶段只有一个入境通道或者一个出境通道外,其他每个阶段有任意数量的入境通道和出境通道,。开始阶段有时

又称为源或者生产者;最后一个阶段又称为sink或者消费者。

我们将开始一个简单的示例来解释管道的思想和技术。稍后,我们将展示更多相关的例子。

平方数

一个通道有三个阶段。

第一阶段:gen,以从列表读出整数的方式转换整数列表到一个通道。gen函数开始goroutine后,在通道上发送整数并且在在所

有的值被发送完后将通道关闭:

funcgen(nums...int)<-chanint{

out:=make(chanint)

gofunc(){

for_,n:=rangenums{

out<-n

}

close(out)

}()

returnout

}

第二阶段:sq,从通道接受整数,然后将接受到的每个整数值的平方后返回到一个通道。在入境通道关闭和发送所有下行值的阶

段结束后,关闭出口通道:

funcsq(in<-chanint)<-chanint{

out:=make(chanint)

gofunc(){

forn:=rangein{

out<-n*n

}

close(out)

}()

returnout

}

main函数建立了通道并运行最后一个阶段:它接受来自第二阶段的值并打印出每个值,直到通道关闭:

funcmain(){

//Setupthepipeline.

c:=gen(2,3)

out:=sq(c)

//Consumetheoutput.

fmt.Println(<-out)//4

fmt.Println(<-out)//9

}

由于sq有相同类型的入境和出境通道,我们可以写任意次。我们也可以重写main函数,像其他阶段一样做一系列循环:

funcmain(){

//Setupthepipelineandconsumetheoutput.

forn:=rangesq(sq(gen(2,3))){

fmt.Println(n)//16then81

}

}



扇出,扇入

扇出(fan-out):多个函数能从相同的通道中读数据,直到通道关闭;这提供了一种在一组“人员”中分发任务的方式,使得

CPU和I/O的并行处理.

扇入(fan-in):一个函数能从多个输入中读取并处理数据,而这多个输入通道映射到一个单通道,该单通道随着所有输入的结

束而关闭。

我们可以改变通道去运行两个sq实例,每个实例从相同的输入通道读取数据。我们引入了一个新函数merge去扇入结果:

funcmain(){

in:=gen(2,3)

//Distributethesqworkacrosstwogoroutinesthatbothreadfromin.

c1:=sq(in)

c2:=sq(in)

//Consumethemergedoutputfromc1andc2.

forn:=rangemerge(c1,c2){

fmt.Println(n)//4then9,or9then4

}

}

merge函数通过为每一个入境通道开启一个goroutine去复制数值到唯一的出境通道,从而实现了转换通道列表到一个单通道。一

旦所有的outputgoroutine启动,所有在通道上的发送完成后merge函数开启一个以上的goroutine用于关闭出境通道。

在一个关闭的通道上发送没有意义,所以在关闭之前确保所有的发送完成是重要的。sync.WaitGroup类型提供了一个简单的方法

去组织同步:

funcmerge(cs...<-chanint)<-chanint{

varwgsync.WaitGroup

out:=make(chanint)

//Startanoutputgoroutineforeachinputchannelincs.output

//copiesvaluesfromctooutuntilcisclosed,thencallswg.Done.

output:=func(c<-chanint){

forn:=rangec{

out<-n

}

wg.Done()

}

wg.Add(len(cs))

for_,c:=rangecs{

gooutput(c)

}

//Startagoroutinetocloseoutoncealltheoutputgoroutinesare

//done.Thismuststartafterthewg.Addcall.

gofunc(){

wg.Wait()

close(out)

}()

returnout

}

短暂停止

管道函数模型:

当所有的发送操作结束后,阶段关闭他们的出境通道。

阶段持续接收来自入境通道的值,直到那些通道关闭。

这个模型允许每一个接收阶段通过range循环的写数据,确保一旦所有向下游发送的值发送成功,所有的goroutine退出。

但在一个真实的管道上,阶段并不总是接收所有的入境值。有时设计是这样的:接收者可能只需要一个子集值就能取得进展。更

多时候是一个阶段早早的退出,因为一个入境值代表一个早期阶段的错误。在这两种情况下接收者不应该等待剩余的值到达,我

们想要早期阶段停止产生后续阶段不需要的值。

在我们的示例中,如果一个阶段不能处理所有的入境值,那么试图发送这些值得goroutine将无限期的阻塞:

//Consumethefirstvaluefromoutput.

out:=merge(c1,c2)

fmt.Println(<-out)//4or9

return

//Sincewedidntreceivethesecondvaluefromout,

//oneoftheoutputgoroutinesishungattemptingtosendit.

}

这是一个资源锁:goroutine消耗内存和运行资源,并且在goroutine栈中的堆引用防止数据被回收。Goroutine不能垃圾回收;它

们必须自己退出。

当下游阶段在接收所有的入境值失败后,我们需要安排管道的上游阶段退出。一种实现方法是将出境通道改为一个缓冲区。该缓

冲区能保存固定数量的值;如果缓冲区有空闲就立即发送操作完成信号:

c:=make(chanint,2)//buffersize2

c<-1//succeedsimmediately

c<-2//succeedsimmediately

c<-3//blocksuntilanothergoroutinedoes<-candreceives1



当在通道创建就预先知道待发送的数值个数时,通过使用缓冲区可以简化代码。例如,我们可以重写gen来将整数列表复制到

带有缓冲区的通道中,也可以避免创建一个新的goroutine:

funcgen(nums...int)<-chanint{

out:=make(chanint,len(nums))

for_,n:=rangenums{

out<-n

}

close(out)

returnout

}

回到管道中处于阻塞状态的goroutine,我们可以考虑为merge返回的出境通道增加一个缓冲区:

funcmerge(cs...<-chanint)<-chanint{

varwgsync.WaitGroup

out:=make(chanint,1)//enoughspacefortheunreadinputs

//...therestisunchanged...

尽管它修正了程序中goroutine的阻塞问题,但却不能称为好代码。在这里,缓冲区大小选取为1,取决于预知merge将会接

收的数值个数及下游各阶段将会消费的数值个数。这很脆弱:如果我们给gen多传了一个数值,或者下游阶段少读了一些数值,

goroute的阻塞问题会再次出现。

作为代替,我们需要为下游各阶段提供一种手段,来向发送方表明指明它们将停止接收数据的输入。

显式取消

当main没有接受完out所有的值就决定退出时,它必须告知上游状态(upstreamstage)的goroutines,让它丢弃正在发送中的数据

。通过在一个叫做done的channel上发送数据,即可实现。例子里有两个受阻的发送方,所以发送的值有两组:

funcmain(){

in:=gen(2,3)

//Distributethesqworkacrosstwogoroutinesthatbothreadfromin.

c1:=sq(in)

c2:=sq(in)

//Consumethefirstvaluefromoutput.

done:=make(chanstruct{},2)

out:=merge(done,c1,c2)

fmt.Println(<-out)//4or9

//Telltheremainingsenderswereleaving.

done<-struct{}{}

done<-struct{}{}

}

使用select语句,让发送中的goroutines取代了发送操作。这条语句既可以处理在发送out的情形,也可以处理从done中接受一个

值的情况。done的值类型是空结构,因为它的数值并不重要:它是一个接受事件,表明out的发送应该被丢弃。outputgoroutines

继续在channelc内循环运行,而不会阻塞上游状态(upstreamstage):

funcmerge(done<-chanstruct{},cs...<-chanint)<-chanint{

varwgsync.WaitGroup

out:=make(chanint)

//Startanoutputgoroutineforeachinputchannelincs.output

//copiesvaluesfromctooutuntilcisclosedoritreceivesavalue

//fromdone,thenoutputcallswg.Done.

output:=func(c<-chanint){

forn:=rangec{

select{

caseout<-n:

case<-done:

}

}

wg.Done()

}

//...therestisunchanged...

但是这种方法有个问题:下游的接收者需要知道潜在会被阻塞的上游发送者的数量。追踪这些数量不仅枯燥,还容易出错。

我们要有一个方法告知一个未知的、无限数量的go程序向下游发送它们的值。在GO里面我们通过关闭一个通道来实现,因为一个

在已关闭通道上的接收操作总能立即执行,并返回该元素类型的零值。

这意味着main函数只需关闭“done”通道就能开启所有发送者。close实际上是传给发送者的一个广播信号。我们扩展每一个管道

函数接收“done”参数并通过一个“defer”语句触发“close”,这样所有来自main的返回路径都会以信号通知管道退出。

funcmain(){

//Setupadonechannelthatssharedbythewholepipeline,

//andclosethatchannelwhenthispipelineexits,asasignal

//forallthegoroutineswestartedtoexit.

done:=make(chanstruct{})

deferclose(done)

in:=gen(done,2,3)

//Distributethesqworkacrosstwogoroutinesthatbothreadfromin.

c1:=sq(done,in)

c2:=sq(done,in)

//Consumethefirstvaluefromoutput.

out:=merge(done,c1,c2)

fmt.Println(<-out)//4or9

//donewillbeclosedbythedeferredcall.

}

管道里的每个状态现在都可以随意的提早退出了:sq可以在它的循环中退出,因为我们知道如果done已经被关闭了,也会关闭上

游的gen状态。sq通过defer语句,保证不管从哪个返回路径,它的outchannel都会被关闭。

funcsq(done<-chanstruct{},in<-chanint)<-chanint{

out:=make(chanint)

gofunc(){

deferclose(out)

forn:=rangein{

select{

caseout<-n*n:

case<-done:

return

}

}

}()

returnout

}

下面列出了构建管道的指南:

状态会在所有发送操作做完后,关闭它们的流出channel

状态会持续接收从流入channel输入的数值,直到channel关闭或者其发送者被释放。

管道要么保证足够能存下所有发送数据的缓冲区,要么接收来自接收者明确的要放弃channel的信号,来保证释放发送者。



让我们考虑一个更真实的管道情况。

MD5摘要生成算法在校验文件时非常有用。命令行工具md5sum会生成文件的摘要的列表。

%md5sum*.go

d47c2bbc28298ca9befdfbc5d3aa4e65bounded.go

ee869afd31f83cbb2d10ee81b2b831dcparallel.go

b88175e65fdcbc01ac08aaf1fd9b5e96serial.go

我们的样例程序就像md5sum差不多,不一样的是它会以文件夹作为参数,并打印每个文件夹内文件的索引值,按路径名排序。

%gorunserial.go.

d47c2bbc28298ca9befdfbc5d3aa4e65bounded.go

ee869afd31f83cbb2d10ee81b2b831dcparallel.go

b88175e65fdcbc01ac08aaf1fd9b5e96serial.go

程序的主函数调用辅助函数MD5ALL,它会返回一个map,将路径名对应到索引值,之后排序并打印结果。

funcmain(){

//CalculatetheMD5sumofallfilesunderthespecifieddirectory,

//thenprinttheresultssortedbypathname.

m,err:=MD5All(os.Args[1])

iferr!=nil{

fmt.Println(err)

return

}

varpaths[]string

forpath:=rangem{

paths=append(paths,path)

}

sort.Strings(paths)

for_,path:=rangepaths{

fmt.Printf("%x%s\n",m[path],path)

}

}

MD5ALL函数是我们要讨论的。在serial.go中,它的实现没有使用并发,只是简单地遍历文件树,读取文件并生成摘要。

//MD5Allreadsallthefilesinthefiletreerootedatrootandreturnsamap

//fromfilepathtotheMD5sumofthefilescontents.Ifthedirectorywalk

//failsoranyreadoperationfails,MD5Allreturnsanerror.

funcMD5All(rootstring)(map[string][md5.Size]byte,error){

m:=make(map[string][md5.Size]byte)

err:=filepath.Walk(root,func(pathstring,infoos.FileInfo,errerror)error{

iferr!=nil{

returnerr

}

ifinfo.IsDir(){

returnnil

}

data,err:=ioutil.ReadFile(path)

iferr!=nil{

returnerr

}

m[path]=md5.Sum(data)

returnnil

})

iferr!=nil{

returnnil,err

}

returnm,nil

}



I在parallel.go里,我们把MD5All分解为两个状态的管道。第一个状态,sumFiles,遍历目录,在一个新的Goroutine里对每个

文件做摘要,并把结果发送到类型为result的channel:

typeresultstruct{

pathstring

sum[md5.Size]byte

errerror

}

sumFiles返回两个channel:一个用来传递result,另一个用来返回filepath.Walk的错误。遍历函数启动一个新的Goroutine

来处理每个常规文件,之后检查done。如果done已经被关闭了,遍历就立刻停止:

funcsumFiles(done<-chanstruct{},rootstring)(<-chanresult,<-chanerror){

//Foreachregularfile,startagoroutinethatsumsthefileandsends

//theresultonc.Sendtheresultofthewalkonerrc.

c:=make(chanresult)

errc:=make(chanerror,1)

gofunc(){

varwgsync.WaitGroup

err:=filepath.Walk(root,func(pathstring,infoos.FileInfo,errerror)error{

iferr!=nil{

returnerr

}

ifinfo.IsDir(){

returnnil

}

wg.Add(1)

gofunc(){

data,err:=ioutil.ReadFile(path)

select{

casec<-result{path,md5.Sum(data),err}:

case<-done:

}

wg.Done()

}()

//Abortthewalkifdoneisclosed.

select{

case<-done:

returnerrors.New("walkcanceled")

default:

returnnil

}

})

//Walkhasreturned,soallcallstowg.Addaredone.Starta

//goroutinetocloseconceallthesendsaredone.

gofunc(){

wg.Wait()

close(c)

}()

//Noselectneededhere,sinceerrcisbuffered.

errc<-err

}()

returnc,errc

}

MD5All从c接收所有的摘要值。MD5All返回早先的错误,通过defer关闭done:

funcMD5All(rootstring)(map[string][md5.Size]byte,error){

//MD5Allclosesthedonechannelwhenitreturns;itmaydosobefore

//receivingallthevaluesfromcanderrc.

done:=make(chanstruct{})

deferclose(done)

c,errc:=sumFiles(done,root)

m:=make(map[string][md5.Size]byte)

forr:=rangec{

ifr.err!=nil{

returnnil,r.err

}

m[r.path]=r.sum

}

iferr:=<-errc;err!=nil{

returnnil,err

}

returnm,nil

}



受限的并发

在parallel.go里实现的MD5All对每个文件启动一个新的Goroutine。如果目录里含有很多大文件,这可能会导致申请大量内存,

超出机器上的可用内存。

我们可以通过控制并行读取的文件数量来限制内存的申请。在bounded.go,我们创建固定数量的用于读取文件的Goroutine,来

限制内存使用。现在整个管道有三个状态:遍历树,读取并对文件做摘要,收集摘要值。

第一个状态,walkFiles,发送树里的每个常规文件的路径:

funcwalkFiles(done<-chanstruct{},rootstring)(<-chanstring,<-chanerror){

paths:=make(chanstring)

errc:=make(chanerror,1)

gofunc(){

//ClosethepathschannelafterWalkreturns.

deferclose(paths)

//Noselectneededforthissend,sinceerrcisbuffered.

errc<-filepath.Walk(root,func(pathstring,infoos.FileInfo,errerror)error{

iferr!=nil{

returnerr

}

ifinfo.IsDir(){

returnnil

}

select{

casepaths<-path:

case<-done:

returnerrors.New("walkcanceled")

}

returnnil

})

}()

returnpaths,errc

}

中间的状态启动固定数量的digesterGoroutine,从paths接收文件名,并将结果result发送到channelc:

funcdigester(done<-chanstruct{},paths<-chanstring,cchan<-result){

forpath:=rangepaths{

data,err:=ioutil.ReadFile(path)

select{

casec<-result{path,md5.Sum(data),err}:

case<-done:

return

}

}

}



不像之前的例子,digester并不关闭输出channel,因为多个Goroutine会发送到共享的channel。另一边,MD5All中的代码会

在所有digester完成后关闭channel:

//Startafixednumberofgoroutinestoreadanddigestfiles.

c:=make(chanresult)

varwgsync.WaitGroup

constnumDigesters=20

wg.Add(numDigesters)

fori:=0;i

gofunc(){

digester(done,paths,c)

wg.Done()

}()

}

gofunc(){

wg.Wait()

close(c)

}()

我们也可以让每个digester创建并返回自己的输出channel,但是这就需要一个单独的Goroutine来扇入所有结果。

最终从c收集到所有结果result,并检查从errc传入的错误。这个错误的检查不能提早,因为在这个时间点之前,walkFiles可能

会因为正在发送消息给下游而阻塞:

m:=make(map[string][md5.Size]byte)

forr:=rangec{

ifr.err!=nil{

returnnil,r.err

}

m[r.path]=r.sum

}

//CheckwhethertheWalkfailed.

iferr:=<-errc;err!=nil{

returnnil,err

}

returnm,nil

}

这篇文章展示了使用Go构建流数据管道的技术。要慎重处理这种管道产生的错误,因为管道里的每个状态都可能因为向下游发送

数值而阻塞,而下游的状态却不再关心输入的数据。我们展示了如何将关闭channel作为“完成”信号广播给所有由管道启动的

Goroutine,并且定义了正确构建管道的指南。

进一步阅读:

Go并发模式(视频)展示了Go的并发特性的基础知识,并演示了应用这些知识的方法。

高级Go并发模式(视频)覆盖了关于Go特性更复杂的使用场景,尤其是select。

DouglasMcIlroy的论文《一窥级数数列》展示了Go使用的这类并发技术是如何优雅地支持复杂计算。

本文内容总结:

原文链接:https://www.cnblogs.com/zhangym/p/6249603.html