首页 文章资讯内容详情

Mongodb源码分析--主程序入口main()

2026-06-01 4 花语

本文内容纲要:

作为这个系列的开篇,本人特此声明,因为本人技术功力有限,且对mongodb源码目前也在研究探索中,可能会对mongodb内部某些实现机制及原作者的意图领会不够精确,因此错误再所难免,希望大家批评指正。另外本文所使用的mongodb源码为1.8rc1,同时如果有条件的话,大家可以安装vs2010,用C++来编译调试mongodb源码,以便通过运行过程中的数据和流程来验证自己的判断。

VS2010C++下编译调试MongoDB源码

http://www.cnblogs.com/daizhj/archive/2011/03/07/1973764.html

好了,开始今天的正文吧。

为了理解mongodb整体的运行机制,首先我们需要对其主要运行流程有一个大概的理解,而主入口函数main无疑是最佳突破口。首先我们在VS2010中打开db.sln文件,并打开db.cpp文件,找到主入口函数(位于文件613行),如下:

intmain(intargc,char*argv[]){

staticStaticObserverstaticObserver;

getcurns=ourgetns;

po::options_descriptiongeneral_options("Generaloptions");//常规选项

#ifdefined(_WIN32)

po::options_descriptionwindows_scm_options("WindowsServiceControlManageroptions");//windows服务控制管理选项仅限windows平台

#endif

po::options_descriptionreplication_options("Replicationoptions");//Replication选项

po::options_descriptionms_options("Master/slaveoptions");//主从选项

po::options_descriptionrs_options("Replicasetoptions");//Replica设置选项

po::options_descriptionsharding_options("Shardingoptions");//数据分片选项

po::options_descriptionvisible_options("Allowedoptions");//可见选项

po::options_descriptionhidden_options("Hiddenoptions");//隐藏选项

po::positional_options_descriptionpositional_options;

general_options.add_options()

("auth","runwithsecurity")

("cpu","periodicallyshowcpuandiowaitutilization")

("dbpath",po::value(),"directoryfordatafiles")

("diaglog",po::value(),"0=off1=W2=R3=both7=W+somereads")

("directoryperdb","eachdatabasewillbestoredinaseparatedirectory")

.....

该方法的开头代码(上面)主要是绑定一个配置操作选项的说明,包括命令行模式下的参数说明,因为内容较长,这里就不做过多描述了,需要说明options_description的是这些内容被放到了boost库(一个C++开源库)的options_description对象中,其类型结构可以理解为key/value模式,主要用于记录一系列的选项描述(符)信息,以便于通过名称查询相应选项信息。同时mongodb将选项大致归为8类,如上所述。

接下说看一下其初始化时命令行参数的操作,如下:

if(argc==1)

cout<

{

po::variables_mapparams;

stringerror_message=arg_error_check(argc,argv);

if(error_message!=""){

cout<

show_help_text(visible_options);

return0;

}

if(!CmdLine::store(argc,argv,visible_options,hidden_options,positional_options,params))

return0;

上面方法对main主函数参数argc,argv及上面的那些选项实例进行存储并以此绑定到params实例上,因为接下来会通过params来设置cmdLine对象(CmdLine类型),并最终以该对象做为最终在mongodb内部标记相应启动命令参数信息的对象。形如:

if(params.count("version")){

cout<

printGitVersion();

return0;

}

if(params.count("dbpath")){

dbpath=params["dbpath"].as();

if(params.count("fork")&&dbpath[0]!=/){

//weneedtochangedbpathifweforksincewechange

//cwdto"/"

//forkonlyexistson*nix

//so/issafe

dbpath=cmdLine.cwd+"/"+dbpath;

}

}

else{

dbpath="d:/data/db/";//我在此处改了源码

}

if(params.count("directoryperdb")){

directoryperdb=true;

}

if(params.count("cpu")){

cmdLine.cpu=true;

}

......

当搜集到足够的启动信息(参数)后,mongodb开启执行下面两行代码:

Module::configAll(params);

上面用params来配置加载的模块信息,而就目前而言,mongodb中的模块有两个:其类模式和MMS模块,后者是当mongodb监视服务有效情况下,以后台线程方式(BackgroundJob)运行的程序,类定义如下:

/**MongoMonitoringService

ifenabled,thisrunsinthebackgroundandspingsmss

*/

classMMS:publicBackgroundJob,Module{

....

}

因为相关代码比较简单,这里就不多作说明了,如果大家感兴趣的话,以后会专门写一篇介绍Module,BackgroundJob的文章。

回到正文,模块实始化完成了,就会运行如下代码:

dataFileSync.go();

这里要说明的是dataFileSync类也派生自BackgroundJob类,而BackgroundJob的功能就是生成一个后台线程并执行相应任务。而当前dataFileSync的任务就是在一段时间后(cmdLine.syncdelay)将内存中的数据flash到磁盘上(因为mongodb使用mmap方式将数据先放入内存中),代码如下:

classDataFileSync:publicBackgroundJob{

......

voidrun(){

if(cmdLine.syncdelay==0)

log()<<"warning:--syncdelay0isnotrecommendedandcanhavestrangeperformance"<

elseif(cmdLine.syncdelay==1)

log()<<"--syncdelay1"<

elseif(cmdLine.syncdelay!=60)

log(1)<<"--syncdelay"<

inttime_flushing=0;

while(!inShutdown()){

flushDiagLog();

if(cmdLine.syncdelay==0){

//incaseatsomepointweaddanoptiontochangeatruntime

sleepsecs(5);

continue;

}

sleepmillis((longlong)std::max(0.0,(cmdLine.syncdelay*1000)-time_flushing));

if(inShutdown()){

//occasionalissuetryingtoflushduringshutdownwhensleepinterrupted

break;

}

Date_tstart=jsTime();

intnumFiles=MemoryMappedFile::flushAll(true);//使用系统提供的内存映射文件方法

time_flushing=(int)(jsTime()-start);

globalFlushCounters.flushed(time_flushing);

log(1)<<"flushingmmaptook"<

}

}

......

main主函数完成上面方法后,就会启动侦听方法,开始侦听客户端的链接请求,如下:

initAndListen(cmdLine.port,appsrvPath);

该侦听方法会最终调用db.cpp(467行)的如下方法,我们来看一下该方法做了些什么:

void_initAndListen(intlistenPort,constchar*appserverLoc=NULL){

首先是初始化一个名称“initandlisten”线程用于侦听客户端传来的操作信息(可能有误):

Client::initThread("initandlisten");

接着判断当前系统是32或64位系统?并获取当前进程ID并输出进程ID及数据库路径,端口信息以及当前mongodb及系统信息(这些信息也就是我们在命令行下经常看到的启动mongodb信息)

boolis32bit=sizeof(int*)==4;

{

#if!defined(_WIN32)

pid_tpid=getpid();

#else

DWORDpid=GetCurrentProcessId();

#endif

Nullstream&l=log();

l<<"MongoDBstarting:pid="<

if(replSettings.master)l<<"master="<

if(replSettings.slave)l<<"slave="<<(int)replSettings.slave;

l<<(is32bit?"32":"64")<<"-bit"<

}

DEVlog()<<"_DEBUGbuild(whichisslower)"<

show_warnings();

log()<

printGitVersion();

printSysInfo();

完成这一步之后,接下来mongodb就会对相应路径下的数据文件进行检查,如出现文件错误(文件不存在等):

stringstreamss;

ss<<"dbpath("<

uassert(10296,ss.str().c_str(),boost::filesystem::exists(dbpath));

stringstreamss;

ss<<"repairpath("<

uassert(12590,ss.str().c_str(),boost::filesystem::exists(repairpath));

同时使用"路径锁"方式来移除指定路径下的临时文件夹信息,如下:

acquirePathLock();

remove_all(dbpath+"/_tmp/");

接着,mongodb还会启动持久化功能,该功能貌似是1.7版本后引入到系统中的,主要用于解决因系统宕机时,内存中的数据未写入磁盘而造成的数据丢失。其机制主要是通过log方式定时将操作日志(如cud操作等)记录到db的journal文件夹下,这样当系统再次重启时从该文件夹下恢复丢失的(内存)数据。有关这部分内容我会专门写文章加以介绍。

dur::startup();

if(cmdLine.durOptions&CmdLine::DurRecoverOnly)

return;

注:其命令行枚举定义如下

enum{//bitstobeORed

DurDumpJournal=1,//dumpdiagnosticsonthejournalduringrecovery

DurScanOnly=2,//dontdoanyrealwork,justscananddumpifdumpspecified

DurRecoverOnly=4,//terminateafterrecoverystep

DurParanoid=8,//paranoidmodeenablesextrachecks

DurAlwaysCommit=16//doagroupcommiteverytimethewritelockisreleased

};

intdurOptions;//--durOptionsfordebugging

完成这一步之后,系统还会初始化脚本引擎,因为mongodb支持脚本语法做为其操作数据库的语言,如下:

if(scriptingEnabled){

ScriptEngine::setup();

globalScriptEngine->setCheckInterruptCallback(jsInterruptCallback);

globalScriptEngine->setGetInterruptSpecCallback(jsGetInterruptSpecCallback);

}

当这些主要工作做完之后,最后系统会调用下面方法正式启动侦听操作:

voidlisten(intport){

log()<<"waitingforconnectionsonport"<

OurListenerl(cmdLine.bind_ip,port);

l.setAsTimeTracker();

startReplication();

if(!noHttpInterface)

boost::threadweb(boost::bind(&webServerThread,newRestAdminAccess()/*takesownership*/));

#if(TESTEXHAUST)

boost::threadthr(testExhaust);

#endif

l.initAndListen();

}

注意上面的OurListener类其initAndListen()方法位于message.cpp中,因为mongodb采用message相关类来封装c/s双在的数据和操作:

voidListener::initAndListen(){

checkTicketNumbers();

vectormine=ipToAddrs(_ip.c_str(),_port);

vectorsocks;

SOCKETmaxfd=0;//neededforselect()

for(vector::iteratorit=mine.begin(),end=mine.end();it!=end;++it){

SockAddr&me=*it;

SOCKETsock=::socket(me.getType(),SOCK_STREAM,0);

if(sock==INVALID_SOCKET){

log()<<"ERROR:listen():invalidsocket?"<

}

if(me.getType()==AF_UNIX){

#if!defined(_WIN32)

if(unlink(me.getAddr().c_str())==-1){

intx=errno;

if(x!=ENOENT){

log()<<"couldntunlinksocketfile"<

continue;

}

}

#endif

}

elseif(me.getType()==AF_INET6){

//IPv6canalsoacceptIPv4connectionsasmappedaddresses(::ffff:127.0.0.1)

//ThatcausesaconflictifwedontdosetittoIPV6_ONLY

constintone=1;

setsockopt(sock,IPPROTO_IPV6,IPV6_V6ONLY,(constchar*)&one,sizeof(one));

}

prebindOptions(sock);

if(::bind(sock,me.raw(),me.addressSize)!=0){

intx=errno;

log()<<"listen():bind()failed"<

if(x==EADDRINUSE)

log()<<"addralreadyinuse"<

closesocket(sock);

return;

}

#if!defined(_WIN32)

if(me.getType()==AF_UNIX){

if(chmod(me.getAddr().c_str(),0777)==-1){

log()<<"couldntchmodsocketfile"<

}

ListeningSockets::get()->addPath(me.getAddr());

}

#endif

if(::listen(sock,128)!=0){

log()<<"listen():listen()failed"<

closesocket(sock);

return;

}

ListeningSockets::get()->add(sock);

socks.push_back(sock);

if(sock>maxfd)

maxfd=sock;

}

staticlongconnNumber=0;

structtimevalmaxSelectTime;

while(!inShutdown()){

fd_setfds[1];

FD_ZERO(fds);

for(vector::iteratorit=socks.begin(),end=socks.end();it!=end;++it){

FD_SET(*it,fds);

}

maxSelectTime.tv_sec=0;

maxSelectTime.tv_usec=10000;

constintret=select(maxfd+1,fds,NULL,NULL,&maxSelectTime);

if(ret==0){

#ifdefined(__linux__)

_elapsedTime+=(10000-maxSelectTime.tv_usec)/1000;

#else

_elapsedTime+=10;

#endif

continue;

}

_elapsedTime+=ret;//assume1mstograbconnection.veryrough

if(ret<0){

intx=errno;

#ifdefEINTR

if(x==EINTR){

log()<<"select()signalcaught,continuing"<

continue;

}

#endif

if(!inShutdown())

log()<<"select()failure:ret="<

return;

}

for(vector::iteratorit=socks.begin(),end=socks.end();it!=end;++it){

if(!(FD_ISSET(*it,fds)))

continue;

SockAddrfrom;

ints=accept(*it,from.raw(),&from.addressSize);

if(s<0){

intx=errno;//sonoglobalissues

if(x==ECONNABORTED||x==EBADF){

log()<<"Listeneronport"<<_port<<"aborted"<

return;

}

if(x==0&&inShutdown()){

return;//socketclosed

}

if(!inShutdown())

log()<<"Listener:accept()returns"<

continue;

}

if(from.getType()!=AF_UNIX)

disableNagle(s);

if(_logConnect&&!cmdLine.quiet)

log()<<"connectionacceptedfrom"<

accepted(s,from);

}

}

}

上面方法基本上就是一个无限循环(while(!inShutdown()))的侦听服务端,它调用操作系统的底层socketapi接口,并将侦听到的结果使用accepted()方法进行接收。这里要注意的是因为最终我们使用的是OurListener进行的侦听,所以最终系统会调用OurListener所实现的虚(virtual)方法,如下:

classOurListener:publicListener{

public:

OurListener(conststring&ip,intp):Listener(ip,p){}

virtualvoidaccepted(MessagingPort*mp){

if(!connTicketHolder.tryAcquire()){

log()<<"connectionrefusedbecausetoomanyopenconnections:"<

//TODO:wouldbeniceifwenotifiedthem...

mp->shutdown();

deletemp;

return;

}

try{

boost::threadthr(boost::bind(&connThread,mp));

}

catch(boost::thread_resource_error&){

log()<<"cantcreatenewthread,closingconnection"<

mp->shutdown();

deletemp;

}

catch(...){

log()<<"unkonwnexceptionstartingconnThread"<

mp->shutdown();

deletemp;

}

}

};

上面方法中的try{}语句中包含的是boost库中的thread方法,其主要提供了跨操作系统的线程创建方式及相关并行操作(相关信息参数boost官方网站),我们这里只要知道,通过该语句,我们最终用一个线程来运行connThread方法及其所需参数mp即可。下面看一下connThread方法的代码:

voidconnThread(MessagingPort*inPort){

TicketHolderReleaserconnTicketReleaser(&connTicketHolder);

/*todo:movetoClientobject*/

LastError*le=newLastError();

lastError.reset(le);

inPort->_logLevel=1;

auto_ptrdbMsgPort(inPort);

Client&c=Client::initThread("conn",inPort);

try{

c.getAuthenticationInfo()->isLocalHost=dbMsgPort->farEnd.isLocalHost();

Messagem;

while(1){

inPort->clearCounters();

if(!dbMsgPort->recv(m)){

if(!cmdLine.quiet)

log()<<"endconnection"<farEnd.toString()<

dbMsgPort->shutdown();

break;

}

sendmore:

if(inShutdown()){

log()<<"gotrequestaftershutdown()"<

break;

}

lastError.startRequest(m,le);

DbResponsedbresponse;

assembleResponse(m,dbresponse,dbMsgPort->farEnd);

if(dbresponse.response){

dbMsgPort->reply(m,*dbresponse.response,dbresponse.responseTo);

if(dbresponse.exhaust){

...出现问题时

}

}

networkCounter.hit(inPort->getBytesIn(),inPort->getBytesOut());

m.reset();

}

}

......

//threadending...

{

Client*c=currentClient.get();

if(c)c->shutdown();

}

globalScriptEngine->threadDone();

}

上面代码主要工作就是不断循环[while(1)]获取当前客户端发来的信息(上面已封装成了message)并将其信息进行分析,并根据相应操作标志位确定当前操作是CRUD或构建索引等[assembleResponse()],如果一些正常,则向客户端发送应答信息:

voidconnThread(MessagingPort*inPort){

TicketHolderReleaserconnTicketReleaser(&connTicketHolder);

/*todo:movetoClientobject*/

LastError*le=newLastError();

lastError.reset(le);

inPort->_logLevel=1;

auto_ptrdbMsgPort(inPort);

Client&c=Client::initThread("conn",inPort);

try{

c.getAuthenticationInfo()->isLocalHost=dbMsgPort->farEnd.isLocalHost();

Messagem;

while(1){

inPort->clearCounters();

if(!dbMsgPort->recv(m)){

if(!cmdLine.quiet)

log()<<"endconnection"<farEnd.toString()<

dbMsgPort->shutdown();

break;

}

sendmore:

if(inShutdown()){

log()<<"gotrequestaftershutdown()"<

break;

}

lastError.startRequest(m,le);

DbResponsedbresponse;

assembleResponse(m,dbresponse,dbMsgPort->farEnd);

if(dbresponse.response){

dbMsgPort->reply(m,*dbresponse.response,dbresponse.responseTo);

if(dbresponse.exhaust){

...出现问题时

}

}

networkCounter.hit(inPort->getBytesIn(),inPort->getBytesOut());

m.reset();

}

}

......

//threadending...

{

Client*c=currentClient.get();

if(c)c->shutdown();

}

globalScriptEngine->threadDone();

}

运行到这里,main函数的使命就完成了,本来想用一张时序图来大致回顾一下,只有等有时间再补充了。

好了,今天的内容到这里就告一段落了,在接下来的文章中,将会介绍客户端发起查询操作时,Mongodb的执行流程和运行机制。

原文链接:http://www.cnblogs.com/daizhj/archive/2011/03/17/1987311.html

作者:daizhj,代震军

微博:http://t.sina.com.cn/daizhj

Tags:mongodb,c++,sourcecode

Tags:mongodb,c++,sourcecode

本文内容总结:

原文链接:https://www.cnblogs.com/daizhj/archive/2011/03/17/1987311.html