MongoDB 连接池 C++ 封装详解(大数据)

Linux 平台下的 MongoDB C++ 连接池封装,线程安全。

// MongoDB connection pool wrapper.
// Every int-returning member returns 0 on success and a value > 0 on error.
class cmongo{
public:
    // Default constructor; the number of connections defaults to 1.
    cmongo();
    // Constructor taking the number of pooled connections.
    cmongo(int size);
    // Destructor.
    ~cmongo();
public:
    // Sets the TCP read/write timeout.
    int set_wr_timeout(double t);
    // Connects the pooled connections to mhost:mport.
    int conn(string mhost="127.0.0.1",int mport=27017);
    // Selects the database and collection used by the operations below.
    int setdb(string mdb,string mcollection);
    // Creates an index on `key`.
    int setindex(string key);
    // Query: fetches the fields listed in `in` for the record whose `key`
    // equals `key_val`; the result is stored in `out`.
    int get(map<string,string>& out,vector<string> in,string key,string key_val);
    // Batch query: `in` is a batch of values of `key`, `fields` names the
    // fields to fetch for each of them.
    int gets(map< string,map<string,string> >& rout,vector<string> fields,vector<string> in,string key);
    // Dumps key -> value: every key maps to a single value.
    int dumpkey(map< string,string >& rout,string key,string val);
    // Dumps key -> map<field,value>: every key maps to a group of values.
    int dumpvals(map< string,map<string,string> >& rout,vector<string> in,string key);
    // Write.
    int set(map<string,string> in,string key,string key_val);
    // Batch write / batch update, keyed by e.g. key="id":
    //   "123456":<key,value>,<key,value>
    //   "123457":<key,value>,<key,value>
    int sets(map< string,map<string,string> > in,string key);
    // Delete.
    int remove(string key,string key_val);
private:
    // Non-copyable: the object owns raw connection pointers, mutexes and a
    // semaphore, so a member-wise copy would delete every connection twice.
    // Declared but intentionally never defined (pre-C++11 idiom).
    cmongo(const cmongo&);
    cmongo& operator=(const cmongo&);

    // Namespace "<db>.<collection>" passed to every driver call.
    string doc;
    // TCP read/write timeout.
    double wr_timeout;
    // Guards _joblst.
    pthread_mutex_t _jobmux;
    // Posted by conn() and by every operation when it hands its connection
    // back; waited on before an operation picks a connection.
    sem_t _jobsem;
    // Pooled connections; the mapped bool is true while the connection is
    // taken (or was marked unusable by conn()).
    map<DBClientConnection*,bool> _joblst;
    // Held by dumpkey()/dumpvals() while they walk their cursor.
    pthread_mutex_t _dbmux;
};
// Builds the pool: stores the default namespace DB_DB.DB_COLLECTION, sets the
// default read/write timeout to 3 and allocates `size` connection objects.
// `size` is clamped to 1..200. The connections are only connect()ed later,
// by conn().
cmongo::cmongo(int size){
    doc=string(DB_DB)+"."+string(DB_COLLECTION);
    wr_timeout=3;
    // A pool without connections would make every operation block forever in
    // sem_wait(), so anything below 1 falls back to a single connection.
    if(size<1){
        size=1;
    }
    if(size>200){
        size=200;
    }
    pthread_mutex_init(&_jobmux,NULL);
    // dumpkey()/dumpvals() lock _dbmux, so it has to be initialised as well.
    pthread_mutex_init(&_dbmux,NULL);
    // The semaphore starts at 0; conn() posts it for usable connections.
    if(sem_init(&_jobsem,0,0)<0){
        return;
    }
    const bool auto_conn=true;
    pthread_mutex_lock(&_jobmux);
    for(int i=0;i<size;++i){
        // Plain new throws on failure, so no null check is needed.
        DBClientConnection* pconn=new DBClientConnection(auto_conn,0,wr_timeout);
        _joblst[pconn]=false;
    }
    pthread_mutex_unlock(&_jobmux);
}
// Deletes every pooled connection and releases the pool's semaphore and
// pool mutex. No other thread may still be using the object.
cmongo::~cmongo(){
    doc.clear();
    pthread_mutex_lock(&_jobmux);
    for(map<DBClientConnection*,bool>::iterator it=_joblst.begin();it!=_joblst.end();++it){
        delete it->first;
    }
    // Do not keep dangling pointers around.
    _joblst.clear();
    pthread_mutex_unlock(&_jobmux);
    sem_destroy(&_jobsem);
    pthread_mutex_destroy(&_jobmux);
    // NOTE(review): _dbmux is deliberately left alone; destroy it here only
    // once it is certain that every constructor initialises it.
}
int cmongo::set_wr_timeout(double t){ 
wr_timeout=t; 
return RET_OK; 
} 
// Connects every pooled connection to mhost:mport.
// A connection that fails to connect is logged to cerr and marked busy so it
// is never handed out. The semaphore is posted only for connections that did
// connect, which keeps the permit count equal to the number of usable slots.
// Returns RET_OK if at least one connection succeeded, RET_ERR otherwise.
// Intended to be called once, before any other operation.
int cmongo::conn(string mhost,int mport){
    HostAndPort hp(mhost,mport);
    int connected=0;
    pthread_mutex_lock(&_jobmux);
    for(map<DBClientConnection*,bool>::iterator it=_joblst.begin();it!=_joblst.end();++it){
        string errmsg;
        if(it->first->connect(hp,errmsg)){
            sem_post(&_jobsem);
            ++connected;
        }else{
            cerr<<"connect mhost:"<<mhost<<" mport:"<<mport<<" msg:"<<errmsg<<endl;
            it->second=true;
        }
    }
    pthread_mutex_unlock(&_jobmux);
    return connected>0 ? RET_OK : RET_ERR;
}
// Points later operations at "<mdb>.<mcollection>".
// Returns RET_PARERR when either name is empty, RET_OK otherwise.
int cmongo::setdb(string mdb,string mcollection){
    const bool both_given=!mdb.empty() && !mcollection.empty();
    if(!both_given){
        return RET_PARERR;
    }
    doc=mdb+"."+mcollection;
    return RET_OK;
}
// Calls ensureIndex() on the current namespace with the spec "{<key>:1}".
// Returns RET_PARERR for an empty key, RET_ERR when no pooled connection
// could be obtained and RET_OK otherwise. Exceptions thrown by the driver
// propagate to the caller after the connection has been handed back.
int cmongo::setindex(string key){
    if(key.empty()){
        return RET_PARERR;
    }
    // Parse the spec before taking a connection: a malformed key then cannot
    // leave a pool slot marked busy.
    BSONObj spec=fromjson("{"+key+":1}");
    // A failed wait did not take a permit, so nothing has to be given back.
    if(sem_wait(&_jobsem)!=0){
        return RET_ERR;
    }
    pthread_mutex_lock(&_jobmux);
    map<DBClientConnection*,bool>::iterator it=_joblst.begin();
    while(it!=_joblst.end() && it->second){
        ++it;
    }
    const bool found=(it!=_joblst.end());
    if(found){
        it->second=true;
    }
    pthread_mutex_unlock(&_jobmux);
    if(!found){
        // A permit without an idle slot means the permit count was too high;
        // drop the permit rather than dereference end().
        return RET_ERR;
    }
    try{
        it->first->ensureIndex(doc,spec);
    }catch(...){
        // Hand the slot back before letting the exception escape.
        pthread_mutex_lock(&_jobmux);
        it->second=false;
        pthread_mutex_unlock(&_jobmux);
        sem_post(&_jobsem);
        throw;
    }
    pthread_mutex_lock(&_jobmux);
    it->second=false;
    pthread_mutex_unlock(&_jobmux);
    sem_post(&_jobsem);
    return RET_OK;
}
// Single-record query.
//   out     - replaced by a field -> value map for the record that was found
//   in      - names of the fields to fetch
//   key     - field used as the search condition
//   key_val - value `key` must equal (multiple conditions are not supported)
// Returns RET_PARERR when key, key_val or in is empty, RET_ERR when no pooled
// connection could be obtained and RET_OK otherwise. Driver exceptions
// propagate after the connection has been handed back.
// NOTE(review): values are read with getStringField(); a missing record is
// not reported as an error - presumably all values are then empty; confirm.
int cmongo::get(map<string,string>& out,vector<string> in,string key,string key_val){
    if(key.empty() || key_val.empty() || in.empty()){
        return RET_PARERR;
    }
    // Projection: one entry per requested field.
    BSONObjBuilder b;
    for(vector<string>::iterator iter=in.begin();iter!=in.end();++iter){
        b.append(*iter,1);
    }
    BSONObj ob=b.obj();
    // A failed wait did not take a permit, so nothing has to be given back.
    if(sem_wait(&_jobsem)!=0){
        return RET_ERR;
    }
    pthread_mutex_lock(&_jobmux);
    map<DBClientConnection*,bool>::iterator it=_joblst.begin();
    while(it!=_joblst.end() && it->second){
        ++it;
    }
    const bool found=(it!=_joblst.end());
    if(found){
        it->second=true;
    }
    pthread_mutex_unlock(&_jobmux);
    if(!found){
        // A permit without an idle slot means the permit count was too high;
        // drop the permit rather than dereference end().
        return RET_ERR;
    }
    try{
        BSONObj p=it->first->findOne(doc,QUERY(key<<key_val),&ob);
        map<string,string> temp;
        for(vector<string>::iterator iter=in.begin();iter!=in.end();++iter){
            temp[*iter]=p.getStringField(iter->c_str());
        }
        out=temp;
    }catch(...){
        // Hand the slot back before letting the exception escape.
        pthread_mutex_lock(&_jobmux);
        it->second=false;
        pthread_mutex_unlock(&_jobmux);
        sem_post(&_jobsem);
        throw;
    }
    pthread_mutex_lock(&_jobmux);
    it->second=false;
    pthread_mutex_unlock(&_jobmux);
    sem_post(&_jobsem);
    return RET_OK;
}
// Batch query: fetches some fields for a batch of records.
//   rout   - receives key value -> (field -> value) for every record found
//   fields - names of the fields to fetch
//   in     - batch of values of `key` to look up, one findOne() per value
//   key    - name of the key field, e.g. "id"
// Values that match no record are skipped instead of adding an entry under
// the empty string. Returns RET_PARERR for an empty key, RET_ERR when no
// pooled connection could be obtained and RET_OK otherwise. Driver exceptions
// propagate after the connection has been handed back.
int cmongo::gets(map< string,map<string,string> >& rout,vector<string> fields,vector<string> in,string key){
    if(key.empty()){
        return RET_PARERR;
    }
    // Projection: the key itself plus every requested field.
    BSONObjBuilder b;
    b.append(key,1);
    for(vector<string>::iterator iter=fields.begin();iter!=fields.end();++iter){
        b.append(*iter,1);
    }
    BSONObj p=b.obj();
    // A failed wait did not take a permit, so nothing has to be given back.
    if(sem_wait(&_jobsem)!=0){
        return RET_ERR;
    }
    pthread_mutex_lock(&_jobmux);
    map<DBClientConnection*,bool>::iterator it=_joblst.begin();
    while(it!=_joblst.end() && it->second){
        ++it;
    }
    const bool found=(it!=_joblst.end());
    if(found){
        it->second=true;
    }
    pthread_mutex_unlock(&_jobmux);
    if(!found){
        // A permit without an idle slot means the permit count was too high;
        // drop the permit rather than dereference end().
        return RET_ERR;
    }
    try{
        for(vector<string>::iterator iter2=in.begin();iter2!=in.end();++iter2){
            BSONObj ob=it->first->findOne(doc,QUERY(key<<*iter2),&p);
            if(ob.isEmpty()){
                // Nothing matched this value.
                continue;
            }
            map<string,string> temp;
            for(vector<string>::iterator iter=fields.begin();iter!=fields.end();++iter){
                temp[*iter]=ob.getStringField(iter->c_str());
            }
            rout[ob.getStringField(key.c_str())]=temp;
        }
    }catch(...){
        // Hand the slot back before letting the exception escape.
        pthread_mutex_lock(&_jobmux);
        it->second=false;
        pthread_mutex_unlock(&_jobmux);
        sem_post(&_jobsem);
        throw;
    }
    pthread_mutex_lock(&_jobmux);
    it->second=false;
    pthread_mutex_unlock(&_jobmux);
    sem_post(&_jobsem);
    return RET_OK;
}
// Dumps the whole collection as key -> value pairs.
//   rout - receives, per record, the string value of field `key` mapped to
//          the string value of field `val`
//   key  - name of the key field
//   val  - name of the value field; only added to the projection when
//          non-empty
// Returns RET_PARERR for an empty key, RET_ERR when no pooled connection
// could be obtained or the query returned no cursor, RET_OK otherwise.
// Driver exceptions propagate after _dbmux and the connection are released.
int cmongo::dumpkey(map< string,string >& rout,string key,string val){
    if(key.empty()){
        return RET_PARERR;
    }
    BSONObjBuilder b;
    b.append(key,1);
    if(!val.empty()){
        b.append(val,1);
    }
    BSONObj p=b.obj();
    // A failed wait did not take a permit, so nothing has to be given back.
    if(sem_wait(&_jobsem)!=0){
        return RET_ERR;
    }
    pthread_mutex_lock(&_jobmux);
    map<DBClientConnection*,bool>::iterator it=_joblst.begin();
    while(it!=_joblst.end() && it->second){
        ++it;
    }
    const bool found=(it!=_joblst.end());
    if(found){
        it->second=true;
    }
    pthread_mutex_unlock(&_jobmux);
    if(!found){
        // A permit without an idle slot means the permit count was too high;
        // drop the permit rather than dereference end().
        return RET_ERR;
    }
    int ret=RET_OK;
    pthread_mutex_lock(&_dbmux);
    try{
        auto_ptr<DBClientCursor> cursor=it->first->query(doc,Query(),0,0,&p);
        if(cursor.get()==NULL){
            // query() hands back a null cursor when it fails.
            ret=RET_ERR;
        }else{
            while(cursor->more()){
                BSONObj ob=cursor->next();
                rout[ob.getStringField(key.c_str())]=ob.getStringField(val.c_str());
            }
        }
    }catch(...){
        // Release the dump lock and the slot before the exception escapes.
        pthread_mutex_unlock(&_dbmux);
        pthread_mutex_lock(&_jobmux);
        it->second=false;
        pthread_mutex_unlock(&_jobmux);
        sem_post(&_jobsem);
        throw;
    }
    pthread_mutex_unlock(&_dbmux);
    pthread_mutex_lock(&_jobmux);
    it->second=false;
    pthread_mutex_unlock(&_jobmux);
    sem_post(&_jobsem);
    return ret;
}
// Dumps the whole collection as key -> map<field,value>, i.e. several values
// per key. dumpvals() could fully replace dumpkey(); the two are kept apart
// for convenience.
//   rout - receives, per record, the string value of field `key` mapped to a
//          field -> value map
//   in   - names of the fields to return for every key
//   key  - name of the key field, e.g. "id"
// Returns RET_PARERR for an empty key, RET_ERR when no pooled connection
// could be obtained or the query returned no cursor, RET_OK otherwise.
// Driver exceptions propagate after _dbmux and the connection are released.
int cmongo::dumpvals(map< string,map<string,string> >& rout,vector<string> in,string key){
    if(key.empty()){
        return RET_PARERR;
    }
    BSONObjBuilder b;
    b.append(key,1);
    for(vector<string>::iterator iter=in.begin();iter!=in.end();++iter){
        b.append(*iter,1);
    }
    BSONObj p=b.obj();
    // A failed wait did not take a permit, so nothing has to be given back.
    if(sem_wait(&_jobsem)!=0){
        return RET_ERR;
    }
    pthread_mutex_lock(&_jobmux);
    map<DBClientConnection*,bool>::iterator it=_joblst.begin();
    while(it!=_joblst.end() && it->second){
        ++it;
    }
    const bool found=(it!=_joblst.end());
    if(found){
        it->second=true;
    }
    pthread_mutex_unlock(&_jobmux);
    if(!found){
        // A permit without an idle slot means the permit count was too high;
        // drop the permit rather than dereference end().
        return RET_ERR;
    }
    int ret=RET_OK;
    pthread_mutex_lock(&_dbmux);
    try{
        auto_ptr<DBClientCursor> cursor=it->first->query(doc,Query(),0,0,&p);
        if(cursor.get()==NULL){
            // query() hands back a null cursor when it fails.
            ret=RET_ERR;
        }else{
            while(cursor->more()){
                BSONObj ob=cursor->next();
                map<string,string> temp;
                for(vector<string>::iterator iter=in.begin();iter!=in.end();++iter){
                    temp[*iter]=ob.getStringField(iter->c_str());
                }
                rout[ob.getStringField(key.c_str())]=temp;
            }
        }
    }catch(...){
        // Release the dump lock and the slot before the exception escapes.
        pthread_mutex_unlock(&_dbmux);
        pthread_mutex_lock(&_jobmux);
        it->second=false;
        pthread_mutex_unlock(&_jobmux);
        sem_post(&_jobsem);
        throw;
    }
    pthread_mutex_unlock(&_dbmux);
    pthread_mutex_lock(&_jobmux);
    it->second=false;
    pthread_mutex_unlock(&_jobmux);
    sem_post(&_jobsem);
    return ret;
}
// Update interface: $set-updates the record whose `key` equals `key_val`
// with the field -> value pairs in `in` (the update() call passes true as its
// upsert argument). Updating several records that share one key is not
// supported.
// Returns RET_PARERR when in, key or key_val is empty, RET_ERR when no pooled
// connection could be obtained or getLastError() reports an error, RET_OK
// otherwise. Driver exceptions propagate after the connection has been
// handed back.
int cmongo::set(map<string,string> in,string key,string key_val){
    if(in.empty() || key.empty() || key_val.empty()){
        return RET_PARERR;
    }
    BSONObjBuilder b;
    for(map<string,string>::iterator iter=in.begin();iter!=in.end();++iter){
        b.append(iter->first,iter->second);
    }
    BSONObj ob=b.obj();
    // A failed wait did not take a permit, so nothing has to be given back.
    if(sem_wait(&_jobsem)!=0){
        return RET_ERR;
    }
    pthread_mutex_lock(&_jobmux);
    map<DBClientConnection*,bool>::iterator it=_joblst.begin();
    while(it!=_joblst.end() && it->second){
        ++it;
    }
    const bool found=(it!=_joblst.end());
    if(found){
        it->second=true;
    }
    pthread_mutex_unlock(&_jobmux);
    if(!found){
        // A permit without an idle slot means the permit count was too high;
        // drop the permit rather than dereference end().
        return RET_ERR;
    }
    int ret=RET_OK;
    try{
        it->first->update(doc,QUERY(key<<key_val),BSON("$set"<<ob),true);
        string errmsg=it->first->getLastError();
        if(!errmsg.empty()){
            ret=RET_ERR;
        }
    }catch(...){
        // Hand the slot back before letting the exception escape.
        pthread_mutex_lock(&_jobmux);
        it->second=false;
        pthread_mutex_unlock(&_jobmux);
        sem_post(&_jobsem);
        throw;
    }
    pthread_mutex_lock(&_jobmux);
    it->second=false;
    pthread_mutex_unlock(&_jobmux);
    sem_post(&_jobsem);
    return ret;
}
// Batch update interface, keyed by e.g. key="id":
//   "123456":<key,value>,<key,value>
//   "123457":<key,value>,<key,value>
// Every entry of `in` is applied as one $set update on the record whose `key`
// equals the entry's map key. All entries are attempted even after a failure.
// Returns RET_PARERR when in or key is empty, RET_ERR when no pooled
// connection could be obtained or getLastError() reported an error for any
// entry, RET_OK otherwise. Driver exceptions propagate after the connection
// has been handed back.
int cmongo::sets(map< string,map<string,string> > in,string key){
    if(in.empty() || key.empty()){
        return RET_PARERR;
    }
    // A failed wait did not take a permit, so nothing has to be given back.
    if(sem_wait(&_jobsem)!=0){
        return RET_ERR;
    }
    pthread_mutex_lock(&_jobmux);
    map<DBClientConnection*,bool>::iterator it=_joblst.begin();
    while(it!=_joblst.end() && it->second){
        ++it;
    }
    const bool found=(it!=_joblst.end());
    if(found){
        it->second=true;
    }
    pthread_mutex_unlock(&_jobmux);
    if(!found){
        // A permit without an idle slot means the permit count was too high;
        // drop the permit rather than dereference end().
        return RET_ERR;
    }
    int ret=RET_OK;
    try{
        for(map< string,map<string,string> >::iterator iter=in.begin();iter!=in.end();++iter){
            BSONObjBuilder b;
            for(map<string,string>::iterator iter2=iter->second.begin();iter2!=iter->second.end();++iter2){
                b.append(iter2->first,iter2->second);
            }
            BSONObj ob=b.obj();
            it->first->update(doc,QUERY(key<<iter->first),BSON("$set"<<ob),true);
            string errmsg=it->first->getLastError();
            if(!errmsg.empty()){
                ret=RET_ERR;
            }
        }
    }catch(...){
        // Hand the slot back before letting the exception escape.
        pthread_mutex_lock(&_jobmux);
        it->second=false;
        pthread_mutex_unlock(&_jobmux);
        sem_post(&_jobsem);
        throw;
    }
    pthread_mutex_lock(&_jobmux);
    it->second=false;
    pthread_mutex_unlock(&_jobmux);
    sem_post(&_jobsem);
    return ret;
}
// Delete interface: removes the records whose `key` equals `key_val`, e.g.
// key="id", key_val="587" deletes the record with id="587".
// Returns RET_PARERR when key or key_val is empty, RET_ERR when no pooled
// connection could be obtained and RET_OK otherwise. Driver exceptions
// propagate after the connection has been handed back.
int cmongo::remove(string key,string key_val){
    if(key.empty() || key_val.empty()){
        return RET_PARERR;
    }
    // A failed wait did not take a permit, so nothing has to be given back.
    if(sem_wait(&_jobsem)!=0){
        return RET_ERR;
    }
    pthread_mutex_lock(&_jobmux);
    map<DBClientConnection*,bool>::iterator it=_joblst.begin();
    while(it!=_joblst.end() && it->second){
        ++it;
    }
    const bool found=(it!=_joblst.end());
    if(found){
        it->second=true;
    }
    pthread_mutex_unlock(&_jobmux);
    if(!found){
        // A permit without an idle slot means the permit count was too high;
        // drop the permit rather than dereference end().
        return RET_ERR;
    }
    try{
        it->first->remove(doc,BSON(key << key_val));
    }catch(...){
        // Hand the slot back before letting the exception escape.
        pthread_mutex_lock(&_jobmux);
        it->second=false;
        pthread_mutex_unlock(&_jobmux);
        sem_post(&_jobsem);
        throw;
    }
    pthread_mutex_lock(&_jobmux);
    it->second=false;
    pthread_mutex_unlock(&_jobmux);
    sem_post(&_jobsem);
    return RET_OK;
}

原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/8964.html

(0)
上一篇 2021年7月19日
下一篇 2021年7月19日

相关推荐

发表回复

登录后才能评论