首页 文章资讯内容详情

Mongodb源码分析--日志及持久化

2026-05-31 4 花语

本文内容纲要:

在本系列的第一篇文章(主函数入口)中,介绍了mongodb会在系统启动同时,初始化了日志持久化服务,该功能貌似是1.7版本后引入到系统中的,主要用于解决因系统宕机时,内存中的数据未写入磁盘而造成的数据丢失。其机制主要是通过log方式定时将操作日志(如cud操作等)记录到db的journal文件夹下,这样当系统再次重启时从该文件夹下恢复丢失的(内存)数据。也就是在_initAndListen()函数体(db.cpp文件第511行)中下面这一行代码:

dur::startup();

今天就以这个函数为起点,看一下mongodb的日志持久化的流程,及实现方式。

在Mongodb中,提供持久化的类一般都以dur开头,比如下面几个:

dur.cpp:封装持久化主要方法和实现,以便外部使用

dur_commitjob.cpp:持久化任务工作(单元),封装延时队列TaskQueue,操作集合vector

dur_journal.cpp:提供日志文件/路径,创建,遍历等操作

dur_journalformat.h:日志文件格式定义

dur_preplogbuffer.cpp:构造用于输出的日志buffer

dur_recover.h:日志恢复类(后台任务方式BackgroupJob)

dur_stats.h:统计类,包括提交/同步数据次数等

dur_writetodatafiles.cpp:封装写入数据文件mongofile方法

durop.h:持久化操作类,提供序列化,创建操作(FileCreatedOp),DROP操作(DropDbOp)

首先我们看一下dur::startup()方法实现(dur.cpp),如下:

/**atstartup,recover,andthenstartthejournalthreads*/

voidstartup(){

if(!cmdLine.dur)/*判断命令行启动参数是否为持久化*/

return;

DurableInterface::enableDurability();//对持久化变量_impl设置为DurableImpl方式

journalMakeDir();/*构造日志文件所要存储的路径:dur_journal.cpp*/

try{

recover();/*从上一次系统crash中恢复数据日志信息:dur_recover.cpp*/

}

catch(...){

log()<<"exceptionduringrecovery"<

throw;

}

preallocateFiles();

boost::threadt(durThread);

}

注意:上面的DurableInterface,因为mongodb使用类似接口方式,从而约定不同的持久化方式实现,如下:

classDurableInterface:boost::noncopyable{

virtualvoid*writingPtr(void*x,unsignedlen)=0;

virtualvoidcreatedFile(stringfilename,unsignedlonglonglen)=0;

virtualvoiddeclareWriteIntent(void*x,unsignedlen)=0;

virtualvoid*writingAtOffset(void*buf,unsignedofs,unsignedlen)=0;

....

}

接口定义了写文件的方式及方法等等。

并且mongodb包括了两种实现方式,即:

classNonDurableImpl:publicDurableInterface{/*非持久化,基于内存临时存储*/

}

classDurableImpl:publicDurableInterface{/*持久化,支持磁盘存储*/

}

再回到startup函数最后一行:boost::threadt(durThread);

该行代码会创建一个线程来运行durThread方法,该方法就是持久化线程,如下:

voiddurThread(){

Client::initThread("dur");

constintHowOftenToGroupCommitMs=90;/*多少时间提交一组信息,单位:毫秒*/

//注:commitJob对象用于封装并执行提交一组操作

while(!inShutdown()){

sleepmillis(10);

CodeBlock::Withinw(durThreadMain);/*定义代码块锁,该设计很讨巧,接下来会介绍*/

try{

intmillis=HowOftenToGroupCommitMs;

{

stats.rotate();//统计最新的_lastRotate信息

{

Timert;/*声明定时器*/

/*遍历日志文件夹下的文件并更新文件的“最新更新时间”标志位并移除无效或关闭之前使用的日志文件:dur_journal.cpp*/

journalRotate();

millis-=t.millis();/*线程睡眠时间为90减去遍历时间*/

assert(millis<=HowOftenToGroupCommitMs);

if(millis<5)

millis=5;

}

//wedothisinacoupleblocks,whichmakesitatinybitfaster(onlyalittle)onthroughput,

//butislikelyalsolessspikyonourcpuusage,whichisgood:

sleepmillis(millis/2);

//从commitJob的defer任务队列中获取任务并执行,详情参见:taskqueue.h的invoke()和dur_commitjob.cpp的

//Writes::D::go(constWrites::D&d)方法(用于非延迟写入信息操作)

commitJob.wi()._deferred.invoke();

sleepmillis(millis/2);

//按mongodb开发者的理解,通过将休眠时间减少一半(millis/2)并紧跟着继续从队列中取任务,

//以此小幅提升读取队列系统的吞吐量

commitJob.wi()._deferred.invoke();

}

go();//执行提交一组信息操作

}

catch(std::exception&e){/*服务如果突然crash*/

log()<<"exceptionindurThreadcausingimmediateshutdown:"<

abort();//basedonmyTerminate()

}

}

cc().shutdown();//关闭当前线程,Client::initThread("dur")

}

下面是go()的实现代码:

staticvoidgo(){

if(!commitJob.hasWritten()){/*hasWritten一般在CUD操作时会变为true,后面会加以介绍*/

commitJob.notifyCommitted();/*发送信息已存储到磁盘的通知*/

return;

}

{

readlocktrylk("",1000);/*声明读锁*/

if(lk.got()){

groupCommit();/*提交一组操作*/

return;

}

}

//当未取到读锁时,可能获取读锁比较慢,则直接使用写锁,不过写锁会用更多的RAM

writelocklk;

groupCommit();

}

/**locking:inreadlockwhencalled.*/

staticvoid_groupCommit(){

stats.curr->_commits++;/*提交次数加1*/

......

//预定义页对齐的日志缓存对象,该对象对会commitJob.ops()的返回值(该返回值类型vector)进行对象序列化

//并保存到commitJob._ab中,供下面方法调用,位于dur_preplogbuffer.cpp-->_PREPLOGBUFFER()方法

PREPLOGBUFFER();

//todo:writetothejournaloutsidelocks,asthiswritecanbeslow.

//however,becarefulthenaboutremapprivateviewasthatcannotbedone

//ifnewwritesarethenpendingintheprivatemaps.

WRITETOJOURNAL(commitJob._ab);/*写入journal信息,最终操作位于dur_journal.cpp的Journal::journal(constAlignedBuilder&b)方法*/

//dataisnowinthejournal,whichissufficientforacknowledginggetLastError.

//(oktocrashafterthat)

commitJob.notifyCommitted();

WRITETODATAFILES();/*写信息到mongofile文件中*/

commitJob.reset();/*重置当前任务操作*/

//REMAPPRIVATEVIEW

//remapping私有视图必须在WRITETODATAFILES方法之后调用,否则无法读出新写入的数据

DEVassert(!commitJob.hasWritten());

if(!dbMutex.isWriteLocked()){

//thisneedsdoneinawritelock(asthereisashortwindowduringremappingwheneachview

//mightnotexist)thuswedoitonthenextacquisitionofthatinsteadofhere(thereisno

//rushifyouarentwritinganyway--butitmusthappen,ifitisdone,beforeanyuncommitted

//writesoccur).Ifdesired,perhpasthiscanbeeliminatedonposixasitmaybethattheremap

//israce-freethere.

//

dbMutex._remapPrivateViewRequested=true;

}

else{

stats.curr->_commitsInWriteLock++;

//however,ifwearealreadywritelocked,wemustdoitnow--upthecalltreesomeone

//maydoawritewithoutanewlockacquisition.thiscanhappenwhenMongoMMF::close()calls

//thismethodwhenafile(anditsviews)isabouttogoaway.

//

REMAPPRIVATEVIEW();

}

}

到这里只是知道mongodb会定时从任务队列中获取相应任务并统一写入,写入journal和mongofile文件后再重置任务队列及递增相应统计计数信息(如privateMapBytes用于REMAPPRIVATEVIEW)。

但任务队列中的操作信息又是如何生成的呢?这个比较简单,我们只要看一下相应的cud数据操作时的代码即可,这里以插入(insert)数据为例:

我们找到pdfile.cpp文件的插入记录方法,如下(1467行):

DiskLocDataFileMgr::insert(constchar*ns,constvoid*obuf,intlen,boolgod,constBSONElement&writeId,boolmayAddIndex){

......

r=(Record*)getDur().writingPtr(r,lenWHdr);//位于1588行

该方法用于将客户端提交的数据(信息)写入到持久化队列(defer)中去,如下(按函数调用顺序):

void*DurableImpl::writingPtr(void*x,unsignedlen){

void*p=x;

declareWriteIntent(p,len);

returnp;

}

voidDurableImpl::declareWriteIntent(void*p,unsignedlen){

commitJob.note(p,len);

}

voidCommitJob::note(void*p,intlen){

DEVdbMutex.assertWriteLocked();

dassert(cmdLine.dur);

if(!_wi._alreadyNoted.checkAndSet(p,len)){

MemoryMappedFile::makeWritable(p,len);/*设置可写入mmap文件的信息*/

if(!_hasWritten){

assert(!dbMutex._remapPrivateViewRequested);

//设置写信息标志位,用于进行_groupCommit(上面提到)时进行判断

_hasWritten=true;

}

......

//向defer任务队列中加入操作信息

_wi.insertWriteIntent(p,len);

wassert(_wi._writes.size()<2000000);

assert(_wi._writes.size()<20000000);

......

}

其中insertWriteIntent方法定义如下:

voidinsertWriteIntent(void*p,intlen){

Dd;

d.p=p;/*操作记录record类型*/

d.len=len;/*记录长度*/

_deferred.defer(d);/*延期任务队列:TaskQueue类型*/

}

到这里总结一下,mongodb在启动时,专门初始化一个线程不断循环(除非应用crash掉),用于在一定时间周期内来从defer队列中获取要持久化的数据并写入到磁盘的journal(日志)和mongofile(数据)处,当然因为它不是在用户添加记录时就写到磁盘上,所以按mongodb开发者说,它不会造成性能上的损耗,因为看过代码发现,当进行CUD操作时,记录(Record类型)都被放入到defer队列中以供延时批量(groupcommit)提交写入,但相信其中时间周期参数是个要认真考量的参数,系统为90毫秒,如果该值更低的话,可能会造成频繁磁盘操作,过高又会造成系统宕机时数据丢失过多。

最后对文中那个mongodb设置很计巧的代码做一下简要分析,代码如下:

CodeBlock::Withinw(durThreadMain);

它的作为就是一个对多线程访问指定代码块加锁的功能,其类定义如下(位于race.h):

classCodeBlock{

volatileintn;

unsignedtid;

voidfail(){

log()<<"synchronization(racecondition)failure"<

printStackTrace();

abort();/**/

}

voidenter(){

if(++n!=1)fail();/*当已有线程执行该代码块时,则执行fail*/

#ifdefined(_WIN32)

tid=GetCurrentThreadId();

#endif

}

voidleave(){/*只有调用leave操作,才会--n,即在线程执行完该代码块时调用*/

if(--n!=0)fail();

}

public:

CodeBlock():n(0){}

classWithin{

CodeBlock&_s;

public:

Within(CodeBlock&s):_s(s){_s.enter();}

~Within(){_s.leave();}

};

voidassertWithin(){

assert(n==1);

#ifdefined(_WIN32)

assert(GetCurrentThreadId()==tid);

#endif

}

};

#else

通过其内部类Within的构造函数和析构函数,分别调用了_s.enter,_s.leave()方法,这样只要在一个代码块之前定义一个该类实例,则从下一行开始到codeblock结束之后,该进程内只允许一个线程执行该代码块,呵呵。

参考链接:http://www.infoq.com/cn/news/2011/03/MongoDB-1.8

原文链接:http://www.cnblogs.com/daizhj/archive/2011/03/21/1990344.html

作者:daizhj,代震军

微博:http://t.sina.com.cn/daizhj

Tags:mongodb,c++,sourcecode

本文内容总结:

原文链接:https://www.cnblogs.com/daizhj/archive/2011/03/21/1990344.html