首页 文章资讯内容详情

Golang 任务队列策略 -- 读《JOB QUEUES IN GO》

2026-06-01 3 花语

本文内容纲要:

-一种"非任务队列"的任务队列 -最简单的任务队列 -限流 -关闭工作者 -等待woker执行完成 -超时设置 -CancelWorker -总结

Golang在异步处理上有着上佳的表现。因为goroutines和channels是非常容易使用且有效的异步处理手段。下面我们一起来看一看Golang的简易任务队列

一种"非任务队列"的任务队列

有些时候,我们需要做异步处理但是并不需要一个任务对列,这类问题我们使用Golang可以非常简单的实现。如下:

goprocess(job)

这的确是很多场景下的绝佳选择,比如操作一个HTTP请求等待结果。然而,在一些相对复杂高并发的场景下,你就不能简单的使用该方法来实现异步处理。这时候,你需要一个队列来管理需要处理的任务,并且按照一定的顺序来处理这些任务。

最简单的任务队列

接下来看一个最简单的任务队列和工作者模型。

funcworker(jobChan<-chanJob){ forjob:=rangejobChan{ process(job) } } //makeachannelwithacapacityof100. jobChan:=make(chanJob,100) //starttheworker goworker(jobChan) //enqueueajob jobChan<-job

代码中创建了一个Job对象的channel,容量为100。然后开启一个工作者协程从channel中去除任务并执行。任务的入队操作就是将一个Job对象放入任务channel中。

虽然上面只有短短的几行代码,却完成了很多的工作。我们实现了一个简易的线程安全的、支持并发的、可靠的任务队列。

限流

上面的例子中,我们初始化了一个容量为100的任务channel。

//makeachannelwithacapacityof100. jobChan:=make(chanJob,100)

这意味着任务的入队操作十分简单,如下:

//enqueueajob jobChan<-job

这样一来,当jobchannel中已经放入100个任务的时候,入队操作将会阻塞,直至有任务被工作者处理完成。这通常不是一个好的现象,因为我们通常不希望程序出现阻塞等待。这时候,我们通常希望有一个超时机制来告诉服务调用方,当前服务忙,稍后重试。我之前的博文--我读《通过Go来处理每分钟达百万的数据请求》介绍过类似的限流策略。这里方法类似,就是当队列满的时候,返回503,告诉调用方服务忙。代码如下:

//TryEnqueuetriestoenqueueajobtothegivenjobchannel.Returnstrueif //theoperationwassuccessful,andfalseifenqueuingwouldnothavebeen //possiblewithoutblocking.Jobisnotenqueuedinthelattercase. funcTryEnqueue(jobJob,jobChan<-chanJob)bool{ select{ casejobChan<-job: returntrue default: returnfalse } }

这样一来,我们尝试入队的时候,如果入队失败,放回一个false,这样我们再对这个返回值处理如下:

if!TryEnqueue(job,chan){ http.Error(w,"maxcapacityreached",503) return }

这样就简单的实现了限流操作。当jobChan满的时候,程序会走到default返回false,从而告知调用方当前的服务器情况。

关闭工作者

到上面的步骤,限流已经可以解决,那么我们接下来考虑,怎么才能优雅的关闭工作者?假设我们决定不再向任务队列插入任务,我们希望让所有的已入队任务执行完成,我们可以非常简单的实现:

close(jobChan)

没错,就是这一行代码,我们就可以让任务队列不再接收新任务(仍然可以从channel读取job),如果我们想执行队列里的已经存在的任务,只需要:

forjob:=rangejobChan{...}

所有已经入队的job会正常被woker取走执行。但是,这样实际上还存在一个问题,就是主协成不会等待工作者执行完工作就会退出。它不知道工作者协成什么时候能够处理完以上的任务。可以运行的例子如下:

packagemain import( "fmt" ) varjobChanchanint funcworker(jobChan<-chanint){ forjob:=rangejobChan{ fmt.Printf("执行任务%d\n",job) } } funcmain(){ jobChan=make(chanint,100) //入队 fori:=1;i<=10;i++{ jobChan<-i } close(jobChan) goworker(jobChan) }

运行发现,woker无法保证执行完channel中的job就退出了。那我们怎么解决这个问题?

等待woker执行完成

使用sysc.WaitGroup:

packagemain import( "fmt" "sync" ) varjobChanchanint varwgsync.WaitGroup funcworker(jobChan<-chanint){ deferwg.Done() forjob:=rangejobChan{ fmt.Printf("执行任务%d\n",job) } } funcmain(){ jobChan=make(chanint,100) //入队 fori:=1;i<=10;i++{ jobChan<-i } wg.Add(1) close(jobChan) goworker(jobChan) wg.Wait() }

使用这种协程间同步的方法,协成会等待worker执行完job才会退出。运行结果:

执行任务1 执行任务2 执行任务3 执行任务4 执行任务5 执行任务6 执行任务7 执行任务8 执行任务9 执行任务10 Processfinishedwithexitcode0

这样是完美的么?在设计功能的时候,为了防止协程假死,我们应该给协程设置一个超时。

超时设置

上面的例子中wg.Wait()会一直等待,直到wg.Done()被调用。但是如果这个操作假死,无法调用,将永远等待。这是我们不希望看到的,因此,我们可以给他设置一个超时时间。方法如下:

packagemain import( "fmt" "sync" "time" ) varjobChanchanint varwgsync.WaitGroup funcworker(jobChan<-chanint){ deferwg.Done() forjob:=rangejobChan{ fmt.Printf("执行任务%d\n",job) time.Sleep(1*time.Second) } } funcmain(){ jobChan=make(chanint,100) //入队 fori:=1;i<=10;i++{ jobChan<-i } wg.Add(1) close(jobChan) goworker(jobChan) res:=WaitTimeout(&wg,5*time.Second) ifres{ fmt.Println("执行完成退出") }else{ fmt.Println("执行超时退出") } } //超时机制 funcWaitTimeout(wg*sync.WaitGroup,timeouttime.Duration)bool{ ch:=make(chanstruct{}) gofunc(){ wg.Wait() close(ch) }() select{ case<-ch: returntrue case<-time.After(timeout): returnfalse } }

执行结果如下:

执行任务1 执行任务2 执行任务3 执行任务4 执行任务5 执行超时退出 Processfinishedwithexitcode0

这样,5s超时生效,虽然不是所有的任务被执行,由于超时,也会退出。

有时候我们希望woker丢弃在执行的工作,也就是cancel操作,怎么处理?

CancelWorker

我们可以借助context.Context实现。如下:

packagemain import( "context" "fmt" "sync" "time" ) varjobChanchanint varctxcontext.Context varcancelcontext.CancelFunc funcworker(jobChan<-chanint,ctxcontext.Context){ for{ select{ case<-ctx.Done(): return casejob:=<-jobChan: fmt.Printf("执行任务%d\n",job) time.Sleep(1*time.Second) } } } funcmain(){ jobChan=make(chanint,100) //带有取消功能的contex ctx,cancel=context.WithCancel(context.Background()) //入队 fori:=1;i<=10;i++{ jobChan<-i } close(jobChan) goworker(jobChan,ctx) time.Sleep(2*time.Second) //調用cancel cancel() }

結果如下:

执行任务1 执行任务2 Processfinishedwithexitcode0

可以看出,我们等待2s后,我们主动调用了取消操作,woker协程主动退出。

这是借助context包实现了取消操作,实质上也是监听一个channel的操作,那我们有没有可能不借助context实现取消操作呢?

不使用context的超时机制实现取消:

packagemain import( "fmt" "time" ) varjobChanchanint funcworker(jobChan<-chanint,cancelChan<-chanstruct{}){ for{ select{ case<-cancelChan: return casejob:=<-jobChan: fmt.Printf("执行任务%d\n",job) time.Sleep(1*time.Second) } } } funcmain(){ jobChan=make(chanint,100) //通过chan取消操作 cancelChan:=make(chanstruct{}) //入队 fori:=1;i<=10;i++{ jobChan<-i } close(jobChan) goworker(jobChan,cancelChan) time.Sleep(2*time.Second) //关闭chan close(cancelChan) }

这样,我们使用一个关闭chan的信号实现了取消操作。原因是无缓冲chan读取会阻塞,当关闭后,可以读取到空,因此会执行select里的return.

总结

照例总结一波,本文介绍了golang协程间的同步和通信的一些方法,任务队列的最简单实现。关于工作者池的实现,我在其他博文也写到了,这里不多写。本文更多是工具性的代码,写功能时候可以借用,比如超时、取消、chan的操作等。

本文内容总结:一种"非任务队列"的任务队列,最简单的任务队列,限流,关闭工作者,等待woker执行完成,超时设置,CancelWorker,总结,

原文链接:https://www.cnblogs.com/artong0416/p/7883381.html