MongoDB 连接池 C++ 封装详解(大数据)

Linux 平台下 MongoDB C++ 连接池封装,线程安全

// Pool of MongoDB connections (DBClientConnection) guarded by a mutex and a
// counting semaphore.
// Return convention for the int-returning members: 0 on success, >0 on error.
class cmongo{
    public:
        // Default constructor; per the original note the pool defaults to one
        // connection (its definition is not part of this excerpt).
        cmongo();
        // Creates a pool holding `size` connections.
        cmongo(int size);
        // Deletes the pooled connections.
        ~cmongo();
    public:
        // Stores the TCP read/write timeout.
        int set_wr_timeout(double t);
        // Connects the pooled connections to mhost:mport.
        int conn(string mhost="127.0.0.1",int mport=27017);
        // Selects the database and collection used by the operations below.
        int setdb(string mdb,string mcollection);

        // Ensures an ascending index on field `key`.
        int setindex(string key);
        // Single lookup: finds the document where key == key_val and copies
        // the fields named in `in` into `out`.
        int get(map<string,string>& out,vector<string> in,string key,string key_val);
        // Batch lookup: for every value in `in`, finds the document where
        // key == value; `fields` names the fields to fetch.
        int gets(map< string,map<string,string> >& rout,vector<string> fields,vector<string> in,string key);
        // Dumps the collection as one value per key:
        // rout[<key field>] = <val field>.
        int dumpkey(map< string,string >& rout,string key,string val);
        // Dumps the collection as a group of values per key:
        // rout[<key field>] = { field -> value } for the fields named in `in`.
        int dumpvals(map< string,map<string,string> >& rout,vector<string> in,string key);
        // Write: $set the fields of `in` on the document where key == key_val.
        int set(map<string,string> in,string key,string key_val);
        // Batch write, e.g. with key="id":
        //   "123456":<key,value>,<key,value>
        //   "123457":<key,value>,<key,value>
        int sets(map< string,map<string,string> > in,string key);
        // Removes the documents where key == key_val.
        int remove(string key,string key_val);
    private:
        // Not copyable: the destructor deletes the pooled connections, so a
        // copied object would delete them a second time, and POSIX mutexes and
        // semaphores must not be copied. Declared private and intentionally
        // left undefined (C++98 idiom).
        cmongo(const cmongo&);
        cmongo& operator=(const cmongo&);
    private:
        // Target namespace in the form "<db>.<collection>".
        string doc;
        // TCP read/write timeout.
        double wr_timeout;
        // Protects _joblst.
        pthread_mutex_t _jobmux;
        // Counts the connections currently available for checkout.
        sem_t _jobsem;
        // connection -> busy flag (true while checked out or unusable).
        map<DBClientConnection*,bool> _joblst;
        // Serialises the full-collection dumps (dumpkey / dumpvals).
        pthread_mutex_t _dbmux;

};
  
  
  
// Builds a pool of `size` DBClientConnection objects. The connections are only
// created here; conn() is what connects them and posts the semaphore.
// `size` is clamped to [1,200].
cmongo::cmongo(int size){
    // Default namespace "<db>.<collection>" until setdb() replaces it.
    doc=string(DB_DB)+"."+string(DB_COLLECTION);
    // Socket timeout handed to every connection created below
    // (presumably seconds, per the driver's so_timeout argument - confirm).
    wr_timeout=3;
    // An empty pool would make every operation block forever on the
    // semaphore, so anything below 1 falls back to a single connection.
    if(size<1){
        size=1;
    }
    if(size>200){
        size=200;
    }
    // Initialise both mutexes before anything can bail out: _dbmux is locked
    // by dumpkey()/dumpvals(), and locking an uninitialised mutex is undefined.
    pthread_mutex_init(&_jobmux,NULL);
    pthread_mutex_init(&_dbmux,NULL);
    // The semaphore starts at 0, so no operation can check out a connection
    // until conn() has run.
    if(sem_init(&_jobsem,0,0)<0){
        return;
    }
    const bool auto_conn=true;
    pthread_mutex_lock(&_jobmux);
    for(int i=0;i<size;++i){
        DBClientConnection* pconn = new DBClientConnection(auto_conn,0,wr_timeout);
        // false == idle (not checked out).
        _joblst[pconn]=false;
    }
    pthread_mutex_unlock(&_jobmux);
}
// Tears the pool down: empties the namespace string and deletes every pooled
// connection while holding the pool mutex.
// NOTE(review): the mutexes and the semaphore are not destroyed here - confirm
// whether that is intentional.
cmongo::~cmongo(){
    doc.clear();
    pthread_mutex_lock(&_jobmux);
    for(map<DBClientConnection*,bool>::iterator entry=_joblst.begin();
        entry!=_joblst.end();
        ++entry){
        delete entry->first;
    }
    pthread_mutex_unlock(&_jobmux);
}
int cmongo::set_wr_timeout(double t){ 
    wr_timeout=t; 
    return RET_OK; 
} 
// Connects every pooled connection to mhost:mport.
// A connection that connects successfully becomes available for checkout (one
// semaphore post each). A connection that fails is logged to cerr and marked
// permanently busy; it does NOT post the semaphore, so the semaphore count
// always matches the number of connections that can actually be handed out.
// Returns RET_OK if at least one connection was established, RET_ERR if none.
// NOTE(review): appears intended to be called once per pool; a second call
// would post the semaphore again for connections that are already available.
int cmongo::conn(string mhost,int mport){
    HostAndPort hp(mhost,mport);
    int connected=0;
    pthread_mutex_lock(&_jobmux);
    for(map<DBClientConnection*,bool>::iterator it=_joblst.begin();it!=_joblst.end();++it){
        string errmsg;
        if(it->first->connect(hp,errmsg)){
            sem_post(&_jobsem);
            ++connected;
        }else{
            cerr<<"connect mhost:"<<mhost<<" mport:"<<mport<<" msg:"<<errmsg<<endl;
            // Keep the unusable connection out of circulation.
            it->second=true;
        }
    }
    pthread_mutex_unlock(&_jobmux);
    return connected>0 ? RET_OK : RET_ERR;
}
// Points the pool at "<mdb>.<mcollection>".
// Returns RET_PARERR when either name is empty (the current namespace is then
// left untouched), RET_OK otherwise.
int cmongo::setdb(string mdb,string mcollection){
    const bool both_given = !mdb.empty() && !mcollection.empty();
    if(!both_given){
        return RET_PARERR;
    }
    string ns(mdb);
    ns.append(".");
    ns.append(mcollection);
    doc=ns;
    return RET_OK;
}
// Ensures an ascending index {key:1} on the current namespace.
// Returns RET_PARERR for an empty key, RET_ERR when no idle connection could
// be found, RET_OK otherwise. Exceptions raised by the driver or by fromjson
// still propagate to the caller, but the connection is returned to the pool
// first.
int cmongo::setindex(string key){
    if(key.empty()){
        return RET_PARERR;
    }
    // Hands the checked-out connection back on every exit path.
    struct lease{
        pthread_mutex_t* mux;
        sem_t* sem;
        bool* busy;
        ~lease(){
            pthread_mutex_lock(mux);
            *busy=false;
            pthread_mutex_unlock(mux);
            sem_post(sem);
        }
    };

    // Check out the first idle connection.
    sem_wait(&_jobsem);
    pthread_mutex_lock(&_jobmux);
    map<DBClientConnection*,bool>::iterator it=_joblst.begin();
    while(it!=_joblst.end() && it->second){
        ++it;
    }
    if(it==_joblst.end()){
        // The semaphore token did not correspond to a usable connection; drop
        // it instead of dereferencing end().
        pthread_mutex_unlock(&_jobmux);
        return RET_ERR;
    }
    it->second=true;
    pthread_mutex_unlock(&_jobmux);
    lease guard={&_jobmux,&_jobsem,&it->second};

    const string bindex="{"+key+":1}";
    it->first->ensureIndex(doc,fromjson(bindex));

    return RET_OK;
}
// Single-document lookup.
// out     : receives field -> value for every field named in `in` (replaces
//           any previous content).
// in      : names of the fields to fetch.
// key, key_val : the search condition (key == key_val); multi-condition
//           searches are not supported.
// Returns RET_PARERR when key, key_val or `in` is empty, RET_ERR when no idle
// connection could be found, RET_OK otherwise. Driver exceptions propagate,
// but the connection is returned to the pool first.
int cmongo::get(map<string,string>& out,vector<string> in,string key,string key_val){
    if(key.empty() || key_val.empty() || in.empty()){
        return RET_PARERR;
    }
    // Hands the checked-out connection back on every exit path.
    struct lease{
        pthread_mutex_t* mux;
        sem_t* sem;
        bool* busy;
        ~lease(){
            pthread_mutex_lock(mux);
            *busy=false;
            pthread_mutex_unlock(mux);
            sem_post(sem);
        }
    };

    // Projection: only the requested fields.
    BSONObjBuilder b;
    for(vector<string>::iterator iter=in.begin();iter!=in.end();++iter){
        b.append(*iter,1);
    }
    BSONObj ob=b.obj();

    // Check out the first idle connection.
    sem_wait(&_jobsem);
    pthread_mutex_lock(&_jobmux);
    map<DBClientConnection*,bool>::iterator it=_joblst.begin();
    while(it!=_joblst.end() && it->second){
        ++it;
    }
    if(it==_joblst.end()){
        // The semaphore token did not correspond to a usable connection; drop
        // it instead of dereferencing end().
        pthread_mutex_unlock(&_jobmux);
        return RET_ERR;
    }
    it->second=true;
    pthread_mutex_unlock(&_jobmux);
    lease guard={&_jobmux,&_jobsem,&it->second};

    BSONObj p=it->first->findOne(doc,QUERY(key<<key_val),&ob);

    map<string,string> temp;
    for(vector<string>::iterator iter=in.begin();iter!=in.end();++iter){
        temp[*iter]=p.getStringField(iter->c_str());
    }
    out.swap(temp);

    return RET_OK;
}
// Batch lookup of selected fields.
// rout   : receives <key field value> -> { field -> value } per document found.
// fields : names of the fields to fetch.
// in     : the key values to look up (e.g. key="id", in = a batch of ids).
// key    : name of the field the lookup is keyed on.
// Values in `in` that match no document are skipped, so they do not create an
// entry under the empty-string key.
// Returns RET_PARERR for an empty key, RET_ERR when no idle connection could be
// found, RET_OK otherwise. Driver exceptions propagate, but the connection is
// returned to the pool first.
int cmongo::gets(map< string,map<string,string> >& rout,vector<string> fields,vector<string> in,string key){
    if(key.empty()){
        return RET_PARERR;
    }
    // Hands the checked-out connection back on every exit path.
    struct lease{
        pthread_mutex_t* mux;
        sem_t* sem;
        bool* busy;
        ~lease(){
            pthread_mutex_lock(mux);
            *busy=false;
            pthread_mutex_unlock(mux);
            sem_post(sem);
        }
    };

    // Check out the first idle connection.
    sem_wait(&_jobsem);
    pthread_mutex_lock(&_jobmux);
    map<DBClientConnection*,bool>::iterator it=_joblst.begin();
    while(it!=_joblst.end() && it->second){
        ++it;
    }
    if(it==_joblst.end()){
        // The semaphore token did not correspond to a usable connection; drop
        // it instead of dereferencing end().
        pthread_mutex_unlock(&_jobmux);
        return RET_ERR;
    }
    it->second=true;
    pthread_mutex_unlock(&_jobmux);
    lease guard={&_jobmux,&_jobsem,&it->second};

    // Projection: the key field plus every requested field.
    BSONObjBuilder b;
    b.append(key,1);
    for(vector<string>::iterator iter=fields.begin();iter!=fields.end();++iter){
        b.append(*iter,1);
    }
    BSONObj p=b.obj();

    for(vector<string>::iterator iter2=in.begin();iter2!=in.end();++iter2){
        BSONObj ob=it->first->findOne(doc,QUERY(key<<*iter2),&p);
        if(ob.isEmpty()){
            // No document for this key value.
            continue;
        }
        map<string,string> temp;
        for(vector<string>::iterator iter=fields.begin();iter!=fields.end();++iter){
            temp[*iter]=ob.getStringField(iter->c_str());
        }
        rout[ob.getStringField(key.c_str())]=temp;
    }

    return RET_OK;
}
// Dumps the whole collection as key -> value pairs:
// rout[<value of field `key`>] = <value of field `val`>.
// Returns RET_PARERR for an empty key, RET_ERR when no idle connection could be
// found or the driver returned no cursor, RET_OK otherwise. Driver exceptions
// propagate, but the dump mutex and the connection are released first.
int cmongo::dumpkey(map< string,string >& rout,string key,string val){
    if(key.empty()){
        return RET_PARERR;
    }
    // Hands the checked-out connection back on every exit path.
    struct lease{
        pthread_mutex_t* mux;
        sem_t* sem;
        bool* busy;
        ~lease(){
            pthread_mutex_lock(mux);
            *busy=false;
            pthread_mutex_unlock(mux);
            sem_post(sem);
        }
    };
    // Unlocks the dump mutex on every exit path.
    struct unlocker{
        pthread_mutex_t* mux;
        ~unlocker(){
            pthread_mutex_unlock(mux);
        }
    };

    // Check out the first idle connection.
    sem_wait(&_jobsem);
    pthread_mutex_lock(&_jobmux);
    map<DBClientConnection*,bool>::iterator it=_joblst.begin();
    while(it!=_joblst.end() && it->second){
        ++it;
    }
    if(it==_joblst.end()){
        // The semaphore token did not correspond to a usable connection; drop
        // it instead of dereferencing end().
        pthread_mutex_unlock(&_jobmux);
        return RET_ERR;
    }
    it->second=true;
    pthread_mutex_unlock(&_jobmux);
    lease guard={&_jobmux,&_jobsem,&it->second};

    // Projection: the key field and, when given, the value field.
    BSONObjBuilder b;
    b.append(key,1);
    if(!val.empty()){
        b.append(val,1);
    }
    BSONObj p=b.obj();

    // Full-collection dumps are serialised through _dbmux.
    pthread_mutex_lock(&_dbmux);
    unlocker dbguard={&_dbmux};
    auto_ptr<DBClientCursor> cursor = it->first->query(doc,Query(),0,0,&p);
    if(cursor.get()==NULL){
        // The driver hands back a null cursor when the query could not be sent.
        return RET_ERR;
    }
    while(cursor->more()){
        BSONObj ob=cursor->next();
        rout[ob.getStringField(key.c_str())]=ob.getStringField(val.c_str());
    }

    return RET_OK;
}
// Dumps the whole collection with several values per key:
// rout[<value of field `key`>] = { field -> value } for the fields in `in`.
// dumpvals could fully replace dumpkey; the two are kept separate for
// convenience.
// rout : receives, per key, the map of requested fields.
// in   : names of the fields to return for every key.
// key  : name of the key field, e.g. "id".
// Returns RET_PARERR for an empty key, RET_ERR when no idle connection could be
// found or the driver returned no cursor, RET_OK otherwise. Driver exceptions
// propagate, but the dump mutex and the connection are released first.
int cmongo::dumpvals(map< string,map<string,string> >& rout,vector<string> in,string key){
    if(key.empty()){
        return RET_PARERR;
    }
    // Hands the checked-out connection back on every exit path.
    struct lease{
        pthread_mutex_t* mux;
        sem_t* sem;
        bool* busy;
        ~lease(){
            pthread_mutex_lock(mux);
            *busy=false;
            pthread_mutex_unlock(mux);
            sem_post(sem);
        }
    };
    // Unlocks the dump mutex on every exit path.
    struct unlocker{
        pthread_mutex_t* mux;
        ~unlocker(){
            pthread_mutex_unlock(mux);
        }
    };

    // Check out the first idle connection.
    sem_wait(&_jobsem);
    pthread_mutex_lock(&_jobmux);
    map<DBClientConnection*,bool>::iterator it=_joblst.begin();
    while(it!=_joblst.end() && it->second){
        ++it;
    }
    if(it==_joblst.end()){
        // The semaphore token did not correspond to a usable connection; drop
        // it instead of dereferencing end().
        pthread_mutex_unlock(&_jobmux);
        return RET_ERR;
    }
    it->second=true;
    pthread_mutex_unlock(&_jobmux);
    lease guard={&_jobmux,&_jobsem,&it->second};

    // Projection: the key field plus every requested field.
    BSONObjBuilder b;
    b.append(key,1);
    for(vector<string>::iterator iter=in.begin();iter!=in.end();++iter){
        b.append(*iter,1);
    }
    BSONObj p=b.obj();

    // Full-collection dumps are serialised through _dbmux.
    pthread_mutex_lock(&_dbmux);
    unlocker dbguard={&_dbmux};
    auto_ptr<DBClientCursor> cursor = it->first->query(doc,Query(),0,0,&p);
    if(cursor.get()==NULL){
        // The driver hands back a null cursor when the query could not be sent.
        return RET_ERR;
    }
    while(cursor->more()){
        BSONObj ob=cursor->next();
        map<string,string> temp;
        for(vector<string>::iterator iter=in.begin();iter!=in.end();++iter){
            temp[*iter]=ob.getStringField(iter->c_str());
        }
        rout[ob.getStringField(key.c_str())]=temp;
    }

    return RET_OK;
}
// Update interface: $set every field of `in` on the document where
// key == key_val (the update is issued with the upsert flag set). Updating
// several records that share one key is not supported.
// Returns RET_PARERR when `in`, key or key_val is empty, RET_ERR when no idle
// connection could be found or getLastError() reports a message, RET_OK
// otherwise. Driver exceptions propagate, but the connection is returned to the
// pool first.
int cmongo::set(map<string,string> in,string key,string key_val){
    // Nothing to write or no condition: parameter error.
    if(in.empty() || key.empty() || key_val.empty()){
        return RET_PARERR;
    }
    // Hands the checked-out connection back on every exit path.
    struct lease{
        pthread_mutex_t* mux;
        sem_t* sem;
        bool* busy;
        ~lease(){
            pthread_mutex_lock(mux);
            *busy=false;
            pthread_mutex_unlock(mux);
            sem_post(sem);
        }
    };

    BSONObjBuilder b;
    for(map<string,string>::iterator iter=in.begin();iter!=in.end();++iter){
        b.append(iter->first,iter->second);
    }
    BSONObj ob=b.obj();

    // Check out the first idle connection.
    sem_wait(&_jobsem);
    pthread_mutex_lock(&_jobmux);
    map<DBClientConnection*,bool>::iterator it=_joblst.begin();
    while(it!=_joblst.end() && it->second){
        ++it;
    }
    if(it==_joblst.end()){
        // The semaphore token did not correspond to a usable connection; drop
        // it instead of dereferencing end().
        pthread_mutex_unlock(&_jobmux);
        return RET_ERR;
    }
    it->second=true;
    pthread_mutex_unlock(&_jobmux);
    lease guard={&_jobmux,&_jobsem,&it->second};

    it->first->update(doc,QUERY(key<<key_val),BSON("$set"<<ob),true);

    // An empty getLastError() string means the write went through.
    const string errmsg=it->first->getLastError();
    return errmsg.empty() ? RET_OK : RET_ERR;
}
// Batch update interface, e.g. with key="id":
//   "123456":<key,value>,<key,value>
//   "123457":<key,value>,<key,value>
// For every entry of `in`, $sets the inner fields on the document where
// key == <outer map key> (each update is issued with the upsert flag set).
// Returns RET_PARERR when `in` or key is empty, RET_ERR when no idle connection
// could be found or getLastError() reported a message for any entry, RET_OK
// otherwise. Every entry is attempted even after an earlier one failed. Driver
// exceptions propagate, but the connection is returned to the pool first.
int cmongo::sets(map< string,map<string,string> > in,string key){
    // Nothing to write or no key field: parameter error.
    if(in.empty() || key.empty()){
        return RET_PARERR;
    }
    // Hands the checked-out connection back on every exit path.
    struct lease{
        pthread_mutex_t* mux;
        sem_t* sem;
        bool* busy;
        ~lease(){
            pthread_mutex_lock(mux);
            *busy=false;
            pthread_mutex_unlock(mux);
            sem_post(sem);
        }
    };

    // Check out the first idle connection.
    sem_wait(&_jobsem);
    pthread_mutex_lock(&_jobmux);
    map<DBClientConnection*,bool>::iterator it=_joblst.begin();
    while(it!=_joblst.end() && it->second){
        ++it;
    }
    if(it==_joblst.end()){
        // The semaphore token did not correspond to a usable connection; drop
        // it instead of dereferencing end().
        pthread_mutex_unlock(&_jobmux);
        return RET_ERR;
    }
    it->second=true;
    pthread_mutex_unlock(&_jobmux);
    lease guard={&_jobmux,&_jobsem,&it->second};

    int ret=RET_OK;
    for(map< string,map<string,string> >::iterator iter=in.begin();iter!=in.end();++iter){
        BSONObjBuilder b;
        for(map<string,string>::iterator iter2=iter->second.begin();iter2!=iter->second.end();++iter2){
            b.append(iter2->first,iter2->second);
        }
        BSONObj ob=b.obj();
        it->first->update(doc,QUERY(key<<iter->first),BSON("$set"<<ob),true);
        // An empty getLastError() string means this write went through.
        const string errmsg=it->first->getLastError();
        if(!errmsg.empty()){
            ret=RET_ERR;
        }
    }

    return ret;
}
// Delete interface: removes the records where key == key_val, e.g. key="id",
// key_val="587" removes the records whose id is "587".
// Returns RET_PARERR when key or key_val is empty, RET_ERR when no idle
// connection could be found, RET_OK otherwise. Driver exceptions propagate, but
// the connection is returned to the pool first.
int cmongo::remove(string key,string key_val){
    if(key.empty() || key_val.empty()){
        return RET_PARERR;
    }
    // Hands the checked-out connection back on every exit path.
    struct lease{
        pthread_mutex_t* mux;
        sem_t* sem;
        bool* busy;
        ~lease(){
            pthread_mutex_lock(mux);
            *busy=false;
            pthread_mutex_unlock(mux);
            sem_post(sem);
        }
    };

    // Check out the first idle connection.
    sem_wait(&_jobsem);
    pthread_mutex_lock(&_jobmux);
    map<DBClientConnection*,bool>::iterator it=_joblst.begin();
    while(it!=_joblst.end() && it->second){
        ++it;
    }
    if(it==_joblst.end()){
        // The semaphore token did not correspond to a usable connection; drop
        // it instead of dereferencing end().
        pthread_mutex_unlock(&_jobmux);
        return RET_ERR;
    }
    it->second=true;
    pthread_mutex_unlock(&_jobmux);
    lease guard={&_jobmux,&_jobsem,&it->second};

    it->first->remove(doc,BSON(key << key_val));

    return RET_OK;
}
 

原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/tech/bigdata/8964.html

(0)
上一篇 2021年7月19日 09:02
下一篇 2021年7月19日 09:02

相关推荐

发表回复

登录后才能评论