首页 文章资讯内容详情

Mongodb源码分析--Replication之OpLog

2026-05-31 3 花语

本文内容纲要:

在之前的文章中,介绍了关于master-slave模式下的主从端代码的执行流程,因为当时篇幅所限,未对oplog的数据结构以及mongodb的local数据库作过多阐述,而这可能会让不知道其内容的朋友看代码时云里雾里找不到头绪,今天我专门用一篇文章来大致解释一下(这些内容可能会在后面章节中有所涉及)。

首先了解一个local数据库:

在mongod中,出于特殊目的(复制机制),保留性使用了local数据库。当使用认证机制时,对local数据库等同于认证admin数据库。

当使用复制集(Replicasets)模式时,其会使用下面的local数据库:

local.system.replset用于复制集配置对象存储(通过shell下的rs.conf()或直接查询)

local.oplog.rs一个cappedcollection集合.可在命令行下使用--oplogSize选项设置该集合大小尺寸.

local.replset.minvalid通常在复制集内使用,用于跟踪同步状态(syncstatus)

主从复制模式(Master/Slave):

*Master

olocal.oplog.$main存储"oplog"信息

olocal.slaves存储在master结点上相应slave结点的同步情况(比如syncTo时间戳等)

*Slave

olocal.sources从结点所要链接的master结点信息(可通过--source配置参数指定)

*Other

olocal.me未知待查:)

olocal.pair.*(replicapairs选项,目前已不推荐使用)

除了了解local之外,还有oplog的数据结构(存储在local.oplog.$main)要解释一下:

{ts:...,op:...,ns:...,o:...o2:...}

上面就是一条oplog信息,复制机制就是通过这些信息来进行节点间的数据同步并维护数据一致性的,其中:

ts:8字节的时间戳,由4字节unixtimestamp+4字节自增计数表示。

这个值很重要,在选举(如master宕机时)新primary时,会选择ts最大的那个secondary作为新primary。

op:1字节的操作类型,例如i表示insert,d表示delete。

ns:操作所在的namespace。

o:操作所对应的document,即当前操作的内容(比如更新操作时要更新的的字段和值)

o2:在执行更新操作时的where条件,仅限于update时才有该属性

其中op,可以是如下几种情形之一:

"i":insert

"u":update

"d":delete

"c":dbcmd

"db":声明当前数据库(其中ns被设置成为=>数据库名称+.)

"n":noop,即空操作,其会定期执行以确保时效性

了解了这些内容之后,大家通过下面方式来验证一下上面的这些内容:

1.本地构造复制集

D:\mongodb\bin>mongod--replSetmyoplogs--dbpathd:\mongodb\db

2.使用mongo连接到上面结点并初始化

D:\mongodb\bin>mongo

MongoDBshellversion:...

connectingto:test

rs.initiate()

3.查看oplog复制集信息

uselocal

switchedtodblocal

db.oplog.rs.find()

{"ts":{"t":1306207268000,"i":1},"h":NumberLong(0),"op":"n","ns":"","o":{"msg":"initiatingset"}}

4.开始添加记录

usetest

switchedtodbtest

db.foo.insert({x:1})

db.foo.update({x:1},{$set:{y:1}})

db.foo.update({x:2},{$set:{y:1}},true)

db.foo.remove({x:1})

5.查看上面操作生成的oplog信息:

uselocal

switchedtodblocal

db.oplog.rs.find()

{"ts":{"t":1306207268000,"i":1},"h":NumberLong(0),"op":"n","ns":"","o":{"msg":"initiatingset"}}

{"ts":{"t":1306207310000,"i":1},"h":NumberLong("3138280161636515857"),"op":"i","ns":"test.foo","o":{"_id":ObjectId("4ddb244d2d0d00000000551a"),"x":1}}

{"ts":{"t":1306207314000,"i":1},"h":NumberLong("772196482295043060"),"op":"u","ns":"test.foo","o2":{"_id":ObjectId("4ddb244d2d0d00000000551a")},"o":{"$set":{"y":1}}}

{"ts":{"t":1306207317000,"i":1},"h":NumberLong("7272647888810218413"),"op":"i","ns":"test.foo","o":{"_id":ObjectId("4ddb2455be10000000002f6a"),"x":2,"y":1}}

{"ts":{"t":1306207325000,"i":1},"h":NumberLong("3083832920223263240"),"op":"d","ns":"test.foo","b":true,"o":{"_id":ObjectId("4ddb244d2d0d00000000551a")}}

注:其它常用命令:db.oplog.$main.help()db.printReplicationInfo();

知道上面这些内容之后,我们来大致看一下mongod在源码层面上是如何存储oplog的,先请看下面文件:

oplog.cpp:顾名思义,它的作用就是存储和读取oplog,其主要包括方法:

createOplog():初始化本地local.oplog.$main集合信息,包括大小尺寸等

_logOpOld():用于在master/slave模式下向local.oplog.$main添加oplog信息

_logOpObjRS():用于在replset模式下向local.oplog.rs模式下添加oplog信息

append_O_Obj():向已存在的obj对象后追加新的对象元素信息(主要用于更新类型的oplog)

pretouchN()及pretouchOperation():某些情况下MongoDB会锁住数据库。如果此时正有数百个请求,则它们会堆积起来,造成许多问题。这里使用下面的优化方式来避免锁定:每次更新前,先查询记录。查询操作会将对象放入内存,于是更新则会尽可能的迅速。在"主/从"部署方案中,从节点可以使用“-pretouch”参数运行,这也可以得到相同的效果。

下面就借助源码来进一步深入了解,首先回顾一下之前这篇文章中的所描述的这段代码:

//repl.cpp

voidstartReplication(){

/*ifwearegoingtobeareplicaset,wearentdoingotherformsofreplication.*/

if(!cmdLine._replSet.empty()){

......

//绑定函数,oplog.cpp->_logOpRS(),用于处理replset类型的oplog操作

newRepl();

return;

}

//绑定函数,oplog.cpp->_logOpOld(),用于处理master-slave类型的oplog操作

oldRepl();

上面的newRepl()及oldRepl()主要是实现函数绑定,它会将oplog.cpp文件中的_logOpRS和_logOpOld方法加以绑定(本人猜测可能因为1.6版之后引入了replset,造成方法名称的重新命名和分配,导致这里用这种方式对老的方法进行“过渡”)。

接着上面方面会调用oplog.cpp文件的createOplog()来构造master的“local.oplog.$main”集合,如下:

//如果是master,则构造并启动线程方法replMasterThread

if(replSettings.master||replPair){

if(replSettings.master)

log(1)<<"master=true"<

replSettings.master=true;

//构造oplog集合"local.oplog.$main"

createOplog();

boost::threadt(replMasterThread);

}

在上面初始化及函数指针绑定任务完成了,就可以在数据同步时,根据oplog集合信息来进行同步了,如下:

//repl.cpp

voidReplSource::sync_pullOpLog_applyOperation(BSONObj&op,OpTime*localLogTail,boolalreadyLocked){

.....

if(cmdLine.pretouch&&!alreadyLocked/*在非写锁下,因为写锁下进行pretouch操作没意义*/){

if(cmdLine.pretouch>1){

/*note:thisisbad-shouldbeputinReplSource.butthisisfirsttest...*/

staticintcountdown;

assert(countdown>=0);

if(countdown>0){

countdown--;//waspretouchedonaprevpass

}

else{

constintm=4;

if(tp.get()==0){

intnthr=min(8,cmdLine.pretouch);

nthr=max(nthr,1);

tp.reset(newThreadPool(nthr));

}

vectorv;

oplogReader.peek(v,cmdLine.pretouch);

unsigneda=0;

while(1){

if(a>=v.size())break;

unsignedb=a+m-1;//v[a..b]

if(b>=v.size())b=v.size()-1;

tp->schedule(pretouchN,v,a,b);

DEVcout<<"pretouchtask:"<

a+=m;

}

//执行oplog.cpp中的预取操作

pretouchOperation(op);

tp->join();

countdown=v.size();

}

}

else{

//执行oplog.cpp中的预取操作

pretouchOperation(op);

}

}

......

//使用获取到的oplog操作信息进行同步操作,有关该方法本人已在这个链接中有关介绍

//Mongodb源码分析--Replication之主从模式--Slave

applyOperation(op);

......

}

上面方法中出现了预取操作,该技术主要是出现运行效率的考虑,下面是pretouchOperation方法的具体实现(pretouch方法实现与其大同小异,感兴趣的朋友可自行阅读):

//oplog.cpp中的相应方法

//某些情况下MongoDB会锁住数据库。如果此时正有数百个请求,则它们会堆积起来,造成许多问题。

//这里使用下面的优化方式来避免锁定:

//每次更新前,先查询记录。查询操作会将对象放入内存,于是更新则会尽可能的迅速。

//在主/从部署方案中,从节点可以使用“-pretouch”参数运行,这也可以得到相同的效果。

voidpretouchOperation(constBSONObj&op){

//nopointpretouchingifwritelocked.

//notsureifthiswilleverfire,butjustincase.

if(dbMutex.isWriteLocked())//写锁下则不执行pretouch操作

return;

constchar*which="o";//数据的对象

constchar*opType=op.getStringField("op");

if(*opType==i)

;

elseif(*opType==u)//如是更新操作时,则执行touch

which="o2";//更新的条件pattern

else

return;

/*todo:otheroperations*/

try{

BSONObjo=op.getObjectField(which);

BSONElement_id;

//获取_id信息,用于下面find查找

if(o.getObjectID(_id)){

constchar*ns=op.getStringField("ns");

BSONObjBuilderb;

b.append(_id);

BSONObjresult;

readlocklk(ns);

Client::Contextctx(ns);

//获取要更新的obj

if(Helpers::findById(cc(),ns,b.done(),result))

_dummy_z+=result.objsize();//touch

}

}

catch(DBException&){

log()<<"ignoringassertioninpretouchOperation()"<

}

}

说到这里只是获取oplog的操作,那mongod进行cud操作时,log是如何记录到日志集合中的呢?还记得上面提到的函数绑定吗?我们来看一下oplog.cpp中的下面方法:

/*@@paramopstr:

cuserCreateNS

iinsert

nno-op/keepalive

ddelete/remove

uupdate

*/

voidlogOp(constchar*opstr,constchar*ns,constBSONObj&obj,BSONObj*patt,bool*b){

if(replSettings.master){

_logOp(opstr,ns,0,obj,patt,b);//这里将会执行之前函数绑定的方法,比如:_logOpOld

}

//用于在shard模式下oplog操作

logOpForSharding(opstr,ns,obj,patt);

}

上面的logOp方法几乎出现在了所有cud操作场景中,比如在update操作时:

//update.cpp

staticUpdateResult_updateById(boolisOperatorUpdate,intidIdxNo,ModSet*mods,intprofile,NamespaceDetails*d,

NamespaceDetailsTransient*nsdt,

boolgod,constchar*ns,

constBSONObj&updateobj,BSONObjpatternOrig,boollogop,OpDebug&debug){

......

if(logop){

......

if(mss->needOpLogRewrite()){

DEBUGUPDATE("\trewriteupdate:"<getOpLogRewrite());

logOp("u",ns,mss->getOpLogRewrite(),&pattern);//记录日志

}

else{

logOp("u",ns,updateobj,&pattern);//记录日志

}

}

.....

}

这里以_logOpOld()方法为例,来看一下mongod是如何向local.oplog.$main集合中添加oplog日志的:

/*wewritetolocal.oplog.$main:

{ts:...,op:...,ns:...,o:...o2:...}

ts:anOpTimetimestamp

op:

"i"insert

"u"update

"d"delete

"c"dbcmd

"db"declarespresenceofadatabase(nsissettothedbname+.)

"n"noop

logNS-wheretologit.0/nullmeans"local.oplog.$main".

bb:

ifnotnull,specifiesabooleantopassalongtotheothersideasb:param.

usedfor"justOne"or"upsert"flagsond,u

notethisisusedforsinglecollectionloggingevenwhen--replSetisenabled.

*/

staticvoid_logOpOld(constchar*opstr,constchar*ns,constchar*logNS,constBSONObj&obj,BSONObj*o2,bool*bb){

DEVassertInWriteLock();

staticBufBuilderbufbuilder(8*1024);

//如是master上的local.slaves,则更新该数据库缓存信息

if(strncmp(ns,"local.",6)==0){

if(strncmp(ns,"local.slaves",12)==0){

resetSlaveCache();

}

return;

}

//获取当前系统时间作为操作时间

constOpTimets=OpTime::now();

Client::Contextcontext;

/*wejumpthroughabunchofhoopsheretoavoidcopyingtheobjbuffertwice--

insteadwedoasinglecopytothedestinationpositioninthememorymappedfile.

*/

//生成bson格式的oplog的对象

bufbuilder.reset();

BSONObjBuilderb(bufbuilder);

b.appendTimestamp("ts",ts.asDate());

b.append("op",opstr);

b.append("ns",ns);

if(bb)

b.appendBool("b",*bb);

if(o2)//如果不为空,表示执行操作的条件(where)

b.append("o2",*o2);

BSONObjpartial=b.done();//partialiseverythingexcepttheo:...part.

intpo_sz=partial.objsize();

intlen=po_sz+obj.objsize()+1+2/*o:*/;

Record*r;

//为0/null,表示"local.oplog.$main"

if(logNS==0){

logNS="local.oplog.$main";

if(localOplogMainDetails==0){

Client::Contextctx(logNS,dbpath,0,false);

localDB=ctx.db();

assert(localDB);

localOplogMainDetails=nsdetails(logNS);

assert(localOplogMainDetails);

}

Client::Contextctx(logNS,localDB,false);

//在指定Namespace下的Extent中分配一个record空间,用于下面添加log信息

r=theDataFileMgr.fast_oplog_insert(localOplogMainDetails,logNS,len);

}

else{

//在指定的logNS中分配一个record空间

Client::Contextctx(logNS,dbpath,0,false);

assert(nsdetails(logNS));

//如第一次分配firstweallocatethespace,thenwefillitbelow.

r=theDataFileMgr.fast_oplog_insert(nsdetails(logNS),logNS,len);

}

//将操作条件(where)追加到已存在对象

append_O_Obj(r->data,partial,obj);

//设置最近操作时间

context.getClient()->setLastOp(ts.asDate());

if(logLevel>=6){

BSONObjtemp(r);

log(6)<<"loggingop:"<

}

}

/**givenaBSONobject,createanewoneatdstwhichistheexisting(partial)object

withanewobjectelementappendedattheendwithfieldname"o".

@parampartialalreadybuildobjectwitheverythingexcepttheomember.e.g.somethinglike:

{ts:...,ns:...,o2:...}

@paramoabsonobjecttobeaddedwithfieldname"o"

@dstwheretoputthenewlybuiltcombinedobject.e.g.endsupassomethinglike:

{ts:...,ns:...,o2:...,o:...}

*/

voidappend_O_Obj(char*dst,constBSONObj&partial,constBSONObj&o){

constintsize1=partial.objsize()-1;//lesstheEOOchar

constintoOfs=size1+3;//3=byteBSONOBJTYPE+byteo+byte\0

void*p=getDur().writingPtr(dst,oOfs+o.objsize()+1);

memcpy(p,partial.objdata(),size1);

//adjustoverallbsonobjectsizefortheo:field

*(static_cast<unsigned*>(p))+=o.objsize()+1/*fieldtypebyte*/+2/*"o"fieldname*/;

char*b=static_cast(p);

b+=size1;

*b++=(char)Object;

*b++=o;//{o:...}

*b++=0;//nullterminate"o"fieldname

memcpy(b,o.objdata(),o.objsize());

b+=o.objsize();

*b=EOO;

}

好了,关于oplog的主要内容介绍的差不多了,感兴趣的朋友可以下载相关源码做进一步的分析。

参考链接:

http://www.snailinaturtleneck.com/blog/2010/10/14/getting-to-know-your-oplog/

http://www.snailinaturtleneck.com/blog/2010/08/02/replica-sets-part-2-what-are-replica-sets/

原文链接:http://www.cnblogs.com/daizhj/archive/2011/06/27/mongodb_sourcecode_oplog.html

作者:daizhj,代震军

微博:http://t.sina.com.cn/daizhj

Tags:mongodb,c++,Replica,master-slave,oplog,localdatabase

本文内容总结:

原文链接:https://www.cnblogs.com/daizhj/archive/2011/06/27/mongodb_sourcecode_oplog.html