首页 文章资讯内容详情

MongoDB使用小结:一些常用操作分享

2026-06-01 4 花语

本文内容纲要:

本文整理了一年多以来我常用的MongoDB操作,涉及mongo-shell、pymongo,既有运维层面也有应用层面,内容有浅有深,这也就是我从零到熟练的历程。

MongoDB的使用之前也分享过一篇,稍微高阶点:见这里:《MongoDB使用小结》

1、shell登陆和显示

假设在本机上有一个端口为17380的MongoDB服务,假设已经把mongobin文件加入到系统PATH下。

登陆:mongo--port17380

显示DB:showdbs

进入某DB:usetest_cswuyg

显示集合:showtables

2、简单查找

查找文档:db.test_mac_id.find({a:b})

删除文档:db.test_mac_id.remove({a:b})

查找找到某一天的数据:

db.a.find({D:ISODate(2014-04-21T00:00:00Z)})或者db.a.find({D:ISODate(2014-04-21)})

删除某一天的数据:

db.region_mac_id_result.remove({"D":ISODate(2014-04-17)})

小于2014.6.5的数据:

db.xxx.find({E:{$lt:ISODate(2014-06-05)}})

大于等于2014.6.1的数据:

db.xxx.find({E:{$gte:ISODate("2014-05-29")}}).count()

两个条件:

db.xxx.find({E:{$gte:ISODate("2014-05-29"),$lte:ISODate("2014-06-04")}}).count()

json中的嵌套对象查询,采用“点”的方式:

mongos>db.wyg.find({"a.b":{$exists:true}})

{"_id":"c","a":{"b":10}}

某个字段存在,且小于1000有多少:

db.stat.find({_:ISODate("2014-06-17"),"123":{$exists:1,$lte:1000}},{"123":1}).count()

3、存在和遍历统计

存在i:1,且存在old_id字段:

mongos>varit=db.test.find({i:1,"old_id":{$exists:1}})

遍历计数1:mongos>varcount=0;while(it.hasNext()){if(it.next()["X"].length==32)++count}print(count)

遍历计数2:mongos>varcount=0;while(it.hasNext()){varitem=it.next();if(item[X].length==32&&item[_id]!=item[X])++count;if(!item[X])++count;}print(count)

4、插入和更新

db.test.findOne({_id:cswuyg})

null

db.test.insert({_id:cswuyg,super_admin:true})

db.test.findOne({_id:cswuyg})

{

"_id":"cswuyg",

"super_admin":true

}

db.test.update({_id:cswuyg},{$set:{super_admin:true}})

5、repair操作

对某个DB执行repair:进入要repair的db,执行db.repairDatabase()

对mongodb整个实例执行repair:numactl--interleave=all/mongod--repair--dbpath=/home/disk1/mongodata/shard/

6、mongodb任务操作

停止某个操作:

[xxx]$mongo--port17380 MongoDBshellversion:2.4.5 connectingto:127.0.0.1:17380/test mongos>db.currentOp() {"inprog":[...]} mongos>db.killOp("shard0001:163415563")

批量停止:

db.currentOp().inprog.forEach(function(item){db.killOp(item.opid)})

当查询超过1000秒的,停止:

db.currentOp().inprog.forEach(function(item){if(item.secs_running>1000)db.killOp(item.opid)})

停止某个数据源的查询:

db.currentOp().inprog.forEach(function(item){if(item.ns=="cswuyg.cswuyg")db.killOp(item.opid)})

把所有在等待锁的操作显示出来:

db.currentOp().inprog.forEach(function(item){if(item.waitingForLock)print(JSON.stringify(item))})

把处于等待中的分片显示出来:

db.currentOp().inprog.forEach(function(item){if(item.waitingForLock){print(item.opid.substr(0,9));print(item.op);}})

把非等待的分片显示出来:

db.currentOp().inprog.forEach(function(item){if(!item.waitingForLock){varlock_info=item["opid"];print(lock_info.substr(0,9));print(item["op"]);}})

查找所有的查询任务:

db.currentOp().inprog.forEach(function(item){if(item.op=="query"){print(item.opid);}})

查找所有的非查询任务:

db.currentOp().inprog.forEach(function(item){if(item.op!="query"){print(item.opid);}})

查找所有的操作:

db.currentOp().inprog.forEach(function(item){print(item.op,item.opid);});

常用js脚本,可直接复制到mongo-shell下使用:

显示当前所有的任务状态:

print("##########");db.currentOp().inprog.forEach(function(item){if(item.waitingForLock){varlock_info=item["opid"];print("waiting:",lock_info,item.op,item.ns);}});print("----");db.currentOp().inprog.forEach(function(item){if(!item.waitingForLock){varlock_info=item["opid"];print("doing",lock_info,item.op,item.ns);}});print("##########");

杀掉某些特定任务:

(1)

db.currentOp().inprog.forEach(function(item){if(item.waitingForLock){varlock_info=item["opid"];if(item.op=="query"&&item.secs_running>60&&item.ns=="cswuyg.cswuyg"){db.killOp(item.opid)}}})

(2)

db.currentOp().inprog.forEach(function(item){ varlock_info=item["opid"]; if(item.op=="query"&&item.secs_running>1000){ print("kill",item.opid); db.killOp(item.opid) } })

7、删除并返回数据

old_item=db.swuyg.findAndModify({query:{"_id":"aabbccdd"},fields:{"D":1,E:1,F:1},remove:true})

fields里面为1的是要返回的数据。

8、分布式集群部署情况

(1)细致到collection的显示:sh.status()

(2)仅显示分片:

useconfig;db.shards.find()

{"_id":"shard0000","host":"xxhost:10001"}

{"_id":"shard0001","host":"yyhost:10002"}

....

(3)

useadmin

db.runCommand({listshards:1})

列出所有的shardserver

9、正则表达式查找

正则表达式查询:

mongos>db.a.find({"tt":/t*/i})

{"_id":ObjectId("54b21e0f570cb10de814f86b"),"aa":"1","tt":"tt"}

其中i表明是否是case-insensitive,有i则表示忽略大小写

db.testing.find({"name":/[7-9]/})

当name的值为789这几个数字组成的字符串时,查询命中。

10、查询性能

db.testing.find({name:123}).explain()

输出结果:

{ "cursor":"BasicCursor", "isMultiKey":false, "n":1, "nscannedObjects":10, "nscanned":10, "nscannedObjectsAllPlans":10, "nscannedAllPlans":10, "scanAndOrder":false, "indexOnly":false, "nYields":0, "nChunkSkips":0, "millis":0, "indexBounds":{ }, "server":"xxx:10001" }

11、更新或插入

当该key不存在的时候执行插入操作,当存在的时候则不管,可以使用setOnInsert

db.wyg.update({_id:id},{$setOnInsert:{a:a},$set:{b:b}},true)

当id存在的时候,忽略setOnInsert。

当id存在的时候,如果要插入,则插入{a:a}

最后的参数true,则是指明,当update不存在的_id时,执行插入操作。默认是false,只更新,不插入。

push、setOnInsert:db.cswuyg.update({"_id":"abc"},{$push:{"name":"c"},$setOnInsert:{"cc":"xx"}},true)

12、计算DB中collection的数量

db.system.namespaces.count()

13、增加数字,采用$inc

db.cswuyg.update({"a.b":{$exists:true}},{$inc:{a.b:2}})

也就是对象a.b的值,增加了2

注意$inc只能用于数值。

14、删除某个key

db.cswuyg.update({_id:c},{$unset:{b:{$exists:true}}})

{"_id":"c","a":{"b":12},"b":7}

转变为:

{"_id":"c","a":{"b":12}}

15、增加key:value

db.cswuyg.update({_id:z},{$set:{hello:z}})

{"_id":"z","b":1}

转变为:

{"_id":"z","b":1,"hello":"z"}

16、删除数据库、删除表

删除数据库:db.dropDatabase();

删除表:db.mytable.drop();

17、查找到数据只看某列

只显示key名为D的数据:db.test.find({},{D:1})

18、查看分片存储情况

(1)所有DB的分片存储信息,包括chunks数、shardkey信息:db.printShardingStatus()

(2)db.collection.getShardDistribution()获取collection各个分片的数据存储情况

(3)sh.status()显示本mongos集群所有DB的信息,包含了ShardKey信息

19、查看collection的索引

db.cswuyg.getIndexes()

20、开启某collection的分片功能

./bin/mongo–port20000

mongos>useadmin

switchedtodbadmin

mongos>db.runCommand({enablesharding"test})

{"ok":1}

开启usercollection分片功能:

mongos>db.runCommand({shardcollection:test.user,key:{_id:1}})

{"collectionsharded":"test.user","ok":1}

21、判断当前是否是shard集群

isdbgrid:用来确认当前是否是ShardingCluster

>db.runCommand({isdbgrid:1}); 是: {"isdbgrid":1,"hostname":"xxxhost","ok":1} 不是: { "ok":0, "errmsg":"nosuchcmd:isdbgrid", "code":59, "badcmd":{ "isdbgrid":1 } }

22、$addToSet、$each插入数组数据

mongos>db.cswuyg.find()

{"_id":"wyg","a":"c","add":["a","b"]}

mongos>db.cswuyg.update({"_id":"wyg"},{"$set":{"a":"c"},"$addToSet":{"add":{"$each":["a","c"]}}},true)

mongos>db.cswuyg.find()

{"_id":"wyg","a":"c","add":["a","b","c"]}

$each是为了实现list中的每个元素都插入,如果没有$each,则会把整个list作为一个元素插入,变成了2维数组。

$addToSet会判断集合是否需要排重,保证集合不重。$push可以对数组添加元素,但它只是直接插入数据,不做排重。

eg:db.test.update({"a":1},{$push:{"name":{$each:["a","c"]}}})

eg:db.test.update({"a":1},{$addToSet:{"name":{$each:["a","d"]}}})

不去重插入pushAll

db.push.insert({"a":"b","c":["c","d"]})

db.push.find()

{"_id":ObjectId("53e4be775fdf37629312b96c"),"a":"b","c":["c","d"]

}

db.push.update({"a":"b"},{$pushAll:{"c":["z","d"]}})

db.push.find()

{"_id":ObjectId("53e4be775fdf37629312b96c"),"a":"b","c":["c","d",

"z","d"]}

pushAll跟push类似,不同的是pushAll可以一次插入多个value,而不需要使用$each。

23、刷新配置信息

db.runCommand("flushRouterConfig");

24、批量更新

db.xxx.update({"_id":{$exists:1}},{$set:{"_":ISODate("2014-03-21T00:00:00Z")}},true,true)

最后一个参数表示是否要批量更新,如果不指定,则一次只更新一个document。

25、dumpDB

mongodump支持从DB磁盘文件、运行中的MongoD服务中dump出bson数据文件。

(1)关闭MongoD之后,从它的DB磁盘文件中dump出数据(注:仅限单实例mongod):

mongodump--dbpath=/home/disk1/mongodata/shard/-dcswuyg-o/home/disk2/mongodata/shard

参考:

http://docs.mongodb.org/manual/reference/program/mongodump/

http://stackoverflow.com/questions/5191186/how-do-i-dump-data-for-a-given-date

(2)从运行的MongoD中导出指定日期数据,采用-q查询参数:

mongodump-hxxxhost--port17380--dbcswuyg--collectiontest-q"{D:{\$gte:{\$date:`date-d"20140410"+%s`000},\$lt:{\$date:`date+%s`000}}}"

mongodump-dbpath=/home/disk3/mongodb/data/shard1/-dcswuyg-ctest-o/home/disk9/mongodata/shard1_2/-q"{_:{\$gte:{\$date:`date-d"20140916"+%s`000},\$lt:{\$date:`date-d"20140918"+%s`000}}}"

dump出来的bson文件去掉了索引、碎片空间,所有相比DB磁盘文件要小很多。

26、restoreDB

restore的时候,不能先建索引,必须是restore完数据之后再建索引,否则restore的时候会非常慢。而且一般不需要自己手动建索引,在数据bson文件的同目录下有一个索引bson文件(system.indexes.bson),restore完数据之后会mongorestore自动根据该文件创建索引。

(1)从某个文件restore

mongorestore--hostxxxhost--port17380--dbcswuyg--collectioncswuyg./cswuyg_1406330331/cswuyg/cswuyg_bak.bson

(2)从目录restore

./mongorestore--port27018/home/disk2/mongodata/shard/

(3)dump和restoreconfigureserver:

mongodump--hostxxxhost--port19913--dbconfig-o/home/work/cswuyg/test/config

mongorestore--hostxxxhost--port19914/home/work/cswuyg/test/config

27、创建索引

看看当前的testcollection上有啥索引: mongos>db.test.getIndexes() [ { "v":1, "key":{ "_id":1 }, "ns":"cswuyg.test", "name":"_id_" } ] 当前只有_id这个默认索引,我要在testcollection上为index字段创建索引: mongos>db.test.ensureIndex({"index":1}) 创建完了之后,再看看testcollection上的索引有哪些: mongos>db.test.getIndexes() [ { "v":1, "key":{ "_id":1 }, "ns":"cswuyg.test", "name":"_id_" }, { "v":1, "key":{ "index":1 }, "ns":"cswuyg.test", "name":"index_1" } ]

创建索引,并指定过期时间:db.a.ensureIndex({_:-1},{expireAfterSeconds:1000})1000Second.

修改过期时间:db.runCommand({"collMod":"a",index:{keyPattern:{"_":-1},expireAfterSeconds:60}})

28、删除索引

db.post.dropIndexes()删除post上所有索引

db.post.dropIndex({name:1})删除指定的单个索引

29、唯一索引问题

如果集群在_id上进行了分片,则无法再在其他字段上建立唯一索引:

mongos>db.a.ensureIndex({b:1},{unique:true}) { "raw":{ "set_a/xxxhost:20001,yyyhost:20002":{ "createdCollectionAutomatically":false, "numIndexesBefore":1, "ok":0, "errmsg":"cannotcreateuniqueindexover{b:1.0}withshardkeypattern{_id:1.0}", "code":67 }, "set_b/xxxhost:30001,yyyhost:30002":{ "createdCollectionAutomatically":false, "numIndexesBefore":1, "ok":0, "errmsg":"cannotcreateuniqueindexover{b:1.0}withshardkeypattern{_id:1.0}", "code":67 } }, "code":67, "ok":0, "errmsg":"{set_a/xxxhost:20001,yyyhos:20002:\"cannotcreateuniqueindexover{b:1.0}withshardkeypattern{_id:1.0}\",set_b/xxxhost:30001,yyyhost:30002:\"cannotcreateuniqueindexover{b:1.0}withshardkeypattern{_id:1.0}\"}" }

之所以出现这个错误是因为MongoDB无法保证集群中除了片键以外其他字段的唯一性,能保证片键的唯一性是因为文档根据片键进行切分,一个特定的文档只属于一个分片,MongoDB只要保证它在那个分片上唯一就在整个集群中唯一,实现分片集群上的文档唯一性一种方法是在创建片键的时候指定它的唯一性。

30、因迁移导致出现count得到的数字是真实数字的两倍

cswuyg>db.test.find({D:ISODate(2015-08-31),B:active}).count()

52118

cswuyg>db.test.find({D:ISODate(2015-08-31),B:active}).forEach(function(item){db.test_count.insert(item)})

cswuyg>db.test_count.count()

26445

解决方法:http://docs.mongodb.org/manual/reference/command/count/#behavior

“Onashardedcluster,countcanresultinaninaccuratecountiforphaneddocumentsexistorifachunkmigrationisinprogress.

Toavoidthesesituations,onashardedcluster,usethe$groupstageofthedb.collection.aggregate()methodto$sumthedocuments.”

31、自定义MongoDB操作函数

可以把自己写的js代码保存在某个地方,让MongoDB加载它,然后就可以在MongoDB的命令行里操作它们。

mongodbshell默认会加载~/.mongorc.js文件

例如以下修改了启动提示文字、左侧提示文字,增加了my_show_shardsshell函数用于显示当前shardedcollection的chunks在各分片的负载情况:

//~/.mongorc.js //showatbegin varcompliment=["attractive","intelligent","likebatman"]; varindex=Math.floor(Math.random()*3); print("Hello,yourelookingparticularly"+compliment[index]+"today!"); //changetheprompt prompt=function(){ if(typeofdb=="undefined"){ return"(nodb)>"; } //Checkthelastdboperation try{ db.runCommand({getLastError:1}); } catch(e){ print(e); } returndb+">"; } //showallshardschunks functionmy_show_shards(){ varconfig_db=db.getSiblingDB("config"); varcollections={}; varshards={}; varshard_it=config_db.chunks.find().snapshot(); while(shard_it.hasNext()){ next_item=shard_it.next(); collections[JSON.stringify(next_item["ns"]).replace(/\"/g,"")]=1; shards[JSON.stringify(next_item["shard"]).replace(/\"/g,"")]=1; } varlist_collections=[]; varlist_shards=[]; for(itemincollections){ list_collections.push(item); } for(iteminshards){ list_shards.push(item); } list_collections.forEach(function(collec){ list_shards.forEach(function(item){ obj={}; obj["shard"]=item; obj["ns"]=collec; it=config_db.chunks.find(obj); print(collec,item,it.count()); }) }) }

32、关闭mongod

mongoadmin--port17380--eval"db.shutdownServer()"

33、查看chunks信息

eg:进入到configdb下,执行

db.chunks.find()

34、预分片参考

http://blog.zawodny.com/2011/03/06/mongodb-pre-splitting-for-faster-data-loading-and-importing/

35、DB重命名

db.copyDatabase("xx_config_20141113","xx_config")

usexx_config_20141113

db.dropDatabase();

远程拷贝DB:

db.copyDatabase(fromdb,todb,fromhost,username,password)

db.copyDatabase("t_config","t_config_v1","xxxhost:17380")

这个拷贝过程很慢。

注意,sharded的DB是无法拷贝的,所以sharded的DB也无法采用上面的方式重命名。

参考:http://docs.objectrocket.com/features.html"Yourremotedatabaseisshardedthroughmongos."

拷贝collection:

db.collection.copyTo("newcollection")

同样,sharded的collection也无法拷贝。

36、聚合运算

包括:

1、pipeline;

2、map-reduce

管道:

http://docs.mongodb.org/manual/tutorial/aggregation-with-user-preference-data/

http://docs.mongodb.org/manual/reference/operator/aggregation/#aggregation-expression-operators

http://api.mongodb.org/python/current/api/pymongo/collection.html#pymongo.collection.Collection.aggregate

2.6之前的MongoDB,管道不支持超过16MB的返回集合。

可以使用$out操作符,把结果写入到collection中。如果aggregation成功,$out会替换已有的colleciton,但不会修改索引信息,如果失败,则什么都不做。

http://docs.mongodb.org/manual/reference/operator/aggregation/out/#pipe._S_out

37、aggregatepipelinedemo

demo1,基础:

cswuyg_test>db.cswuyg_test.aggregate({"$match":{"_":ISODate("2015-02-16"),"$or":[{"13098765":{"$exists":true}},{"13123456":{"$exists":true}}]}},{"$group":{"_id":"$_","13098765":{"$sum":"$13098765"},"13123456":{"$sum":"$13123456"}}})

demo2,使用磁盘:

db.test.aggregate([{$match:{D:{$nin:[a,b,c]},_:{$lte:ISODate("2015-02-27"),$gte:ISODate("2015-02-26")}}},{$group:{_id:{a:$a,b:$b,D:$D},value:{$sum:1}}}],{allowDiskUse:true})

demo3,指定输出文档:

db.cswuyg.aggregate([{$match:{D:{$nin:[a,b,c]},_:{$lte:ISODate("2015-02-10"),$gte:ISODate("2015-02-09")}}},{$group:{_id:{C:$C,D:$D},value:{$sum:1}}},{"$out":"cswuyg_out"}],{allowDiskUse:true})

db.b.aggregate([{$match:{a:{$size:6}}},{$group:{_id:{a:$a}}},{$out:hh_col}],{allowDiskUse:true})

注:指定输出文档,只能输出到本DB下。

aggregate练习:

pymongo代码:

[{$match:{uI:{u$in:[XXX]},uH:uid,u12345678:{u$exists:True},_:{$lte:datetime.datetime(2015,6,1,23,59,59),$gte:datetime.datetime(2015,6,1,0,0)}}},{$group:{_id:{u12345678:u$12345678,uG:u$G},value:{$sum:1}}}]

shell下代码:

db.test.aggregate([{$match:{_:ISODate("2015-06-01"),"H":"id","12345678":{"$exists":true},"I":"XXX"}},{$group:{_id:{"12345678":"$12345678","G":"$G"},"value":{"$sum":1}}}],{allowDiskUse:true})

38、修改Key:Value中的Value

给字段B的值加上大括号{:

db.test.find({_:ISODate("2014-11-02")}).forEach(function(item){if(/{.+}/.test(item["B"])){}else{print(item["B"]);db.test.update({"_id":item["_id"]},{"$set":{"B":"{"+item["B"]+"}"}})}})

39、修改primaryshard

db.runCommand({"movePrimary":"test","to":"shard0000"})

这样子testDB里的非shardedcocllection数据就会被存放到shard0000中,非空DB的话,可能会有一个migrate的过程。

40、mongodb默认开启autobalancer

balancer是sharded集群的负载均衡工具,新建集群的时候默认开启,除非你在config里把它关闭掉:

config>db.settings.find()

{"_id":"chunksize","value":64}

{"_id":"balancer","activeWindow":{"start":"14:00","stop":"19:30"},"stopped":false}

activeWindow指定autobalancer执行均衡的时间窗口。

stopped说明是否使用autobalancer。

手动启动balancer:sh.startBalancer()

判断当前balancer是否在跑:sh.isBalancerRunning()

41、MongoDB插入性能优化

插入性能:200W的数据,在之前没有排序就直接插入,耗时4小时多,现在,做了排序,插入只需要5分钟。排序对于单机版本的MongoDB性能更佳,避免了随机插入引发的频繁随机IO。

排序:在做分文件排序的时候,文件分得越小,排序越快,当然也不能小到1,否则频繁打开文件也耗费时间。

42、MongoDB数组操作

1、更新/插入数据,不考虑重复值:

mongos>db.test.update({"helo":"he2"},{"$push":{"name":"b"}})

多次插入后结果:

{"_id":ObjectId("54a7aa2be53662aebc28585f"),"helo":"he2","name":["a","b","b"]}

2、更新/插入数据,保证不重复:

mongos>db.test.update({"helo":"she"},{"$addToSet":{"name":"b"}})

WriteResult({"nMatched":1,"nUpserted":0,"nModified":0})

多次插入后结果:

{"_id":ObjectId("54dd6e1b570cb10de814f86d"),"helo":"she","name":["b"]}

保证只有一个值。

3、数组元素个数:

$size用来指定数组的元素个数,显示fruit数组长度为3的document:

mongos>db.a.find({"fruit":{$size:3}})

{"_id":ObjectId("54b334798220cd3ad74db314"),"fruit":["apple","orange","cherry"]}

43、更换Key:Value中的Key

verRelease换为TEST

db.test.find().forEach(function(item){db.test.update({_id:item["_id"]},{"$set":{"TEST":item["verRelease"]}},{"$unset":{"verRelease":{"exists":true}}})})

或者更新部分修改为:

db.test.update({D:ISODate("2014-11-02")},{$rename:{"AcT":"AM"}},false,true)

44、手动在shell下moveChunk

config>sh.moveChunk("xx.yy",{"_id":{"D":ISODate("2015-02-24T00:00:00Z"),"id":"3f"}},"shard0003")

如果出现错误,参考这里:可能需要重启

http://stackoverflow.com/questions/26640861/movechunk-failed-to-engage-to-shard-in-the-data-transfer-cant-accept-new-chunk

45、MongoDB升级后的兼容问题

MongoDB2.4切换到2.6之后,出现数据没有插入,pymongo代码:

update_obj={"$set":{"a":1}}

update_obj["$inc"]=inc_objs

update_obj["$push"]=list_objs

db.test.update({"_id":id},update_obj,True)

2.6里,inc如果是空的,则会导致整条日志没有插入,2.4则inc为空也可以正常运行。

46、格式化json显示

db.collection.find().pretty()

47、更新replica集群的域名信息

cfg=rs.conf() cfg.members[0].host="xxxhost:20000" cfg.members[1].host="yyyhost:20001" cfg.members[2].host="zzzhost:20002" rs.reconfig(cfg)

48、不要直接修改local.system.replset

不要直接修改local.system.replset,因为他只能修改本机器的本地信息。

但是如果出现了:

{ "errmsg":"exception:cantuselocalhostinreplsetmembernamesexceptwhenusingitforallmembers", "code":13393, "ok":0 }

这样的错误,那就要修改本机的local.system.replset,然后重启。

49、排重统计

(1)aggregate

result=db.flu_test.aggregate([{$match:{_:ISODate("2015-05-01")}},{$group:{_id:"$F",value:{$sum:1}}}],{allowDiskUse:true})

result.itcount()

(2)distinct

flu_test>db.flu_test.distinct(F,{_:ISODate("2015-06-22")})

但是,由于distinct将结果保存在list中,所以很容易触发文档超过16MB的错误:

2015-06-23T15:31:34.479+0800distinctfailed:{

"errmsg":"exception:distincttoobig,16mbcap",

"code":17217,

"ok":0

}atsrc/mongo/shell/collection.js:1108

非排重文档量统计:

mongos>count=db.flu_test.aggregate([{$match:{_:ISODate("2015-05-21")}},{$group:{_id:null,value:{$sum:1}}}],{allowDiskUse:true})

{"_id":null,"value":3338987}

50、pymongo优先读取副本集Secondary节点

优先读取副本集Secondary节点,可以减少primary节点负担,在primary节点跟secondary节点同步延迟较短、业务对数据不要求实时一致时可以利用副本集做读写分离和负载均衡。

副本集集群的读取有这几种使用方式:

primary:默认参数,只从主节点读取;

primaryPreferred:大部分从主节点上读取,主节点不可用时从Secondary节点读取;

secondary:只从Secondary节点上进行读取操作;

secondaryPreferred

:优先从Secondary节点读取,Secondary节点不可用时从主节点读取;

nearest:从网络延迟最低的节点上读取。

(1)测试1优先从secondary读取数据:

importpymongo client=pymongo.MongoReplicaSetClient(xxxhost:yyyport,replicaSet=my_set,readPreference=secondaryPreferred) printclient.read_preference#显示当前的读取设定 foriinxrange(1,10000):#循环10000次,用mongostat观察查询负载 a=client[history][20140409].find_one({"ver_code":"128"}) printa

(2)测试2直接连接某Secondary节点读取数据:

importpymongo client=pymongo.MongoClient(xxxhost,yyyport,slaveOk=True) a=client[msgdc_ip][query_ip].find().count() printa

参考:

http://www.lanceyan.com/category/tech/mongodb

http://emptysqua.re/blog/reading-from-mongodb-replica-sets-with-pymongo/

http://api.mongodb.org/python/current/api/pymongo/read_preferences.html#module-pymongo.read_preferences

http://api.mongodb.org/python/current/api/pymongo/mongo_client.html#pymongo.mongo_client.MongoClient

注意:3.0之后MongoReplicaSetClient函数是要被放弃的。

但是测试时发现:在较低版本中,需要使用MongoReplicaSetClient,MongoClient无法实现pymongo.ReadPreference.SECONDARY_PREFERRED功能。

2015.12.28补充:

51、为副本集设置标签

可以为副本集中的每个成员设置tag(标签),设置标签的好处是后面读数据时,应用可以指定从某类标签的副本上读数据。

为replica设置tag 在副本shell下执行: varconf=rs.conf() conf.members[0].tags={"location":"beijing"} conf.members[1].tags={"location":"hangzhou"} conf.members[2].tags={"location":"guangzhou"} rs.reconfig(conf)

参考:https://docs.mongodb.org/v3.0/tutorial/configure-replica-set-tag-sets/

52、副本集碎片整理的一种方法

使用MMAPv1存储引擎时,对于频繁大数据量的写入删除操作,碎片问题会变得很严重。在数据同步耗时不严重的情况下,我们不需要对每个副本做repair,而是轮流“卸下副本,删除对应的磁盘文件,重新挂上副本”。每个重新挂上的副本都会自动去重新同步一遍数据,碎片问题就解决了。

53、存储引擎升级为wiredTiger

我们当前的版本是MongoDB3.0.6,没有开启wiredTiger引擎,现在打算升级到wiredTiger引擎。

我们是SharedCluster,shard是ReplicaSet。升级比较简单,只需要逐步对每一个副本都执行存储引擎升级即可,不影响线上服务。

升级时,只在启动命令中添加:--storageEnginewiredTiger。

步骤:首先,下掉一个副本;然后,把副本的磁盘文件删除掉;接着,在该副本的启动命令中添加--storageEnginewiredTiger后启动。这就升级完一个副本,等副本数据同步完成之后,其它副本也照样操作(或者处理完一个副本之后,拷贝该副本磁盘文件替换掉另一个副本的磁盘文件)。风险:如果数据量巨大,且有建索引的需求,容易出现内存用尽。

分片集群的configureserver可以不升级。

升级之后磁盘存储优化效果极度明显,22GB的数据会被精简到1.3GB。

升级后的磁盘文件完全变了,所以不同存储引擎下的磁盘文件不能混用。

升级参考:https://docs.mongodb.org/manual/tutorial/change-sharded-cluster-wiredtiger/

54、oplogSizeMB不要设置得太大

启动配置中的这个字段是为了设置oplogcollection的大小,oplog是操作记录,它是一个cappedcollection,在副本集群中,设置得太小可能导致secondary无法及时从primary同步数据。默认情况下是磁盘大小的5%。但是,如果这个字段设置得太大,可能导致暴内存,oplog的数据几乎是完全加载在内存中,一旦太大,必然暴内存,导致OOM。而且因为这个collection是capped,MongoDB启动之后无法修改其大小。

本文所在:http://www.cnblogs.com/cswuyg/p/4595799.html

本文内容总结:

原文链接:https://www.cnblogs.com/cswuyg/p/4595799.html