首页 文章资讯内容详情

[译]使用golang每分钟处理百万请求

2026-06-01 3 花语

[译]使用golang每分钟处理百万请求

在Malwarebytes,我们正在经历惊人的增长,自从我在1年前加入硅谷的这家公司以来,我的主要职责是为多个系统做架构和开发,为这家安全公司的快速发展以及百万日活产品所必需的基础设施提供支持。我曾在一些不同的公司从事反病毒和反恶意软件行业超过12年,我知道这些系统最终会因为我们每天处理的大量数据而变得十分复杂。

有趣的是,在过去9年左右的时间里,我所参与的所有Web后端的开发工作大部分都是在Rails框架的基础上用ruby实现的。不要误解我的意思,虽然我很喜欢Rails框架和Ruby,我相信这是一个会让你感到惊叹的环境,但一段时间之后,你就会用ruby的方式来进行思考和设计系统,却忘记了本来可以利用多线程,并行化,快速执行和小内存开销来使你的软件架构变得如此高效和简单。作为一个多年的C/C++,Delphi和C#开发人员,我也同样开始意识到了如何在工作中使用正确的工具来降低事情的复杂度。

作为首席架构师,我不太重视对互联网所进行的语言和框架之争。我相信软件的效率(efficiency),生产力(productivity)和代码可维护性主要取决于你构建解决方案的简单程度。

问题

在构建我们的匿名检测和分析系统时,我们的目标是能够处理来自数百万个端点的的大量POST请求。Web处理程序将接收一个JSON文档,该文档可能包含了需要写入AmazonS3(注:这个是亚马逊的云计算服务平台)的许多负载(payload)的集合,以便我们的map-reduce系统稍后对这些数据进行处理。

从传统上来说,我们会考虑利用以下工具(基本都是开源的)创建一个工作层架构:

Sidekiq Resque DelayedJob ElasticbeanstalkWorkerTier RabbitMQ andsoon…

然后创建2个不同的集群,一个用于Web前端,另一个用于后台工作的处理,以扩展可以处理的后台工作的数量。

但是从一开始,我们的团队就知道我们应该使用go语言进行开发,因为在讨论阶段我们就意识到了这可能是一个吞吐量非常大的系统。我使用go大概有2年左右的时间,我们也开发了一些系统,但是没有一个系统有如此大的吞吐量。

我们开始创建了一些结构体,用于定义通过POST调用获取的网络请求负载(payload),同时定义了一个方法将这些负载上传到S3。

typePayloadCollectionstruct{ WindowsVersionstring`json:"version"` Tokenstring`json:"token"` Payloads[]Payload`json:"data"` } typePayloadstruct{ //[redacted] } func(p*Payload)UploadToS3()error{ //thestorageFoldermethodensuresthattherearenonamecollisionin //casewegetsametimestampinthekeyname storage_path:=fmt.Sprintf("%v/%v",p.storageFolder,time.Now().UnixNano()) bucket:=S3Bucket b:=new(bytes.Buffer) encodeErr:=json.NewEncoder(b).Encode(payload) ifencodeErr!=nil{ returnencodeErr } //EverythingweposttotheS3bucketshouldbemarkedprivate varacl=s3.Private varcontentType="application/octet-stream" returnbucket.PutReader(storage_path,b,int64(b.Len()),contentType,acl,s3.Options{}) }

从原生方法到Go协程

一开始我们用非常原生的方法来实现POST句柄,尝试通过使用一个简单的协程来将作业的处理并行化:

funcpayloadHandler(whttp.ResponseWriter,r*http.Request){ ifr.Method!="POST"{ w.WriteHeader(http.StatusMethodNotAllowed) return } //Readthebodyintoastringforjsondecoding varcontent=&PayloadCollection{} err:=json.NewDecoder(io.LimitReader(r.Body,MaxLength)).Decode(&content) iferr!=nil{ w.Header().Set("Content-Type","application/json;charset=UTF-8") w.WriteHeader(http.StatusBadRequest) return } //GothrougheachpayloadandqueueitemsindividuallytobepostedtoS3 for_,payload:=rangecontent.Payloads{ gopayload.UploadToS3()//<-----DONTDOTHIS } w.WriteHeader(http.StatusOK) }

对于适当的载荷,上面的方法在大多数情况下能够工作,但是在大规模载荷的情况下,上面的方法很快被证明不能够发挥很好的作用。我们想像会有很多的请求,但是当部署第一个版本到生产环境时,没有想到会有如此的量级。我们完全低估了载荷的数量。

上述方法在几个方面都很糟糕。它没有办法控制产生的go协程数量。既然我们每分钟会收到百万请求,这段代码会很快崩溃。

再试一次

我们需要寻找别的方法。开始我们就讨论了我们需要保持请求句柄的生命周期的短暂性以及请求的处理要在后台进行。当然,这是在Rails的世界中用Ruby必须要做到的,否则会阻塞所有能用的web工作处理器,不论你是使用puma,unicorn或者passenger。然后我们需要利用常见的解决方案来做到这一点,例如Resque,Sidekiq,SQS等。当然还有很多其它方法也能做到这一点,

所以第二次迭代中我们会创建一个缓冲通道(bufferedchannel),作业可以插入到缓冲通道中并将作业的负载上传到S3,因为我们可以控制缓冲通道中元素的数量,并且我们有大量的内存将作业插入缓冲通道,我们认为这种方法是没有任何问题的。

varQueuechanPayload funcinit(){ Queue=make(chanPayload,MAX_QUEUE) } funcpayloadHandler(whttp.ResponseWriter,r*http.Request){ ... //GothrougheachpayloadandqueueitemsindividuallytobepostedtoS3 for_,payload:=rangecontent.Payloads{ Queue<-payload } ... }

然后从缓冲通道中取出作业进行处理,像下面这样:

funcStartProcessor(){ for{ select{ casejob:=<-Queue: job.payload.UploadToS3()//<--STILLNOTGOOD } } }

说实话,我不知道我们在想什么。这个夜晚注定是用红牛度过的。这种方法并没有给我们带来任何好处,我们用缓冲队列代替了有缺陷的并发,但这只是推迟了问题。我们的同步处理器一次只向S3上传一个有效载荷(payload),由于传入请求的速率远远大于单个处理器上传到S3的能力,我们的缓冲通道很快就达到了极限并阻止了请求句柄可以添加更多作业的能力。

我们只是避免了这个问题,系统的死期最终也进入了倒计时。在我们部署这个有缺陷的版本几分钟后,延迟率会以固定的速率增加。

更好的解决方案

为了创建一个2层的channel系统,我们决定使用一个通用模式,一个用来插入作业,一个用来控制作业队列上同时运行的工作协程。

我们的想法是将并行上传稳定在一个可持续的速率,这不会削弱机器的性能,也不会产生到S3的连接错误。所以我们选择创建了一个Job/Worker模式。对于熟悉Java,C#等的人来说,想像一下如何以golang的方式用channel来实现一个worker线程池。

var( MaxWorker=os.Getenv("MAX_WORKERS") MaxQueue=os.Getenv("MAX_QUEUE") ) //Jobrepresentsthejobtoberun typeJobstruct{ PayloadPayload } //Abufferedchannelthatwecansendworkrequestson. varJobQueuechanJob //Workerrepresentstheworkerthatexecutesthejob typeWorkerstruct{ WorkerPoolchanchanJob JobChannelchanJob quit chanbool } funcNewWorker(workerPoolchanchanJob)Worker{ returnWorker{ WorkerPool:workerPool, JobChannel:make(chanJob), quit:make(chanbool)} } //Startmethodstartstherunloopfortheworker,listeningforaquitchannelin //caseweneedtostopit func(wWorker)Start(){ gofunc(){ for{ //registerthecurrentworkerintotheworkerqueue. w.WorkerPool<-w.JobChannel select{ casejob:=<-w.JobChannel: //wehavereceivedaworkrequest. iferr:=job.Payload.UploadToS3();err!=nil{ log.Errorf("ErroruploadingtoS3:%s",err.Error()) } case<-w.quit: //wehavereceivedasignaltostop return } } }() } //Stopsignalstheworkertostoplisteningforworkrequests. func(wWorker)Stop(){ gofunc(){ w.quit<-true }() }

我们修改了Web请求句柄,创建一个带有负载的Job结构体实例,并将其发送到JobQueuechannel中以供workers获取。

funcpayloadHandler(whttp.ResponseWriter,r*http.Request){ ifr.Method!="POST"{ w.WriteHeader(http.StatusMethodNotAllowed) return } //Readthebodyintoastringforjsondecoding varcontent=&PayloadCollection{} err:=json.NewDecoder(io.LimitReader(r.Body,MaxLength)).Decode(&content) iferr!=nil{ w.Header().Set("Content-Type","application/json;charset=UTF-8") w.WriteHeader(http.StatusBadRequest) return } //GothrougheachpayloadandqueueitemsindividuallytobepostedtoS3 for_,payload:=rangecontent.Payloads{ //letscreateajobwiththepayload work:=Job{Payload:payload} //Pushtheworkontothequeue. JobQueue<-work } w.WriteHeader(http.StatusOK) }

在我们的Web服务器初始化期间,我们创建了一个Dispatcher并调用Run()来创建workspool并开始侦听即将出现在JobQueue中的作业。

dispatcher:=NewDispatcher(MaxWorker) dispatcher.Run()

下面是dispatcher的实现代码:

typeDispatcherstruct{ //Apoolofworkerschannelsthatareregisteredwiththedispatcher WorkerPoolchanchanJob } funcNewDispatcher(maxWorkersint)*Dispatcher{ pool:=make(chanchanJob,maxWorkers) return&Dispatcher{WorkerPool:pool} } func(d*Dispatcher)Run(){ //startingnnumberofworkers fori:=0;i<d.maxWorkers;i++{ worker:=NewWorker(d.pool) worker.Start() } god.dispatch() } func(d*Dispatcher)dispatch(){ for{ select{ casejob:=<-JobQueue: //ajobrequesthasbeenreceived gofunc(jobJob){ //trytoobtainaworkerjobchannelthatisavailable. //thiswillblockuntilaworkerisidle jobChannel:=<-d.WorkerPool //dispatchthejobtotheworkerjobchannel jobChannel<-job }(job) } } }

注意,我们提供了添加到workspool中的works最大数量。既然我们在工程中使用了带有容器化Go环境的AmazonElasticbeanstalk,我们就会尝试一直遵循12-factor方法论来在生产环境中配置我们的系统。因此我们会从环境变量中读取这些值。这样我们就可以控制JobQueue的work数量,调整这些值后可以快速生效而无需重新部署集群。

var( MaxWorker=os.Getenv("MAX_WORKERS") MaxQueue=os.Getenv("MAX_QUEUE") )

结论

我信奉简单致胜。我们原本设计了一个使用大量队列和后台wokers并且部署复杂的系统,但我们决定使用Elasticbeanstalk的自动扩展能力以及Golang为我们提供开箱即用的高效和简单的并发方法。

你总能为你的工作找到正确的工具。有时候在你的ruby系统中需要一个强大的web处理器,请考虑一下Ruby之外的生态系统,你可以获得更简单但更强大的替代解决方案。

原文链接: