MongoDB 连接池的 C++ 封装

Linux 平台下的 MongoDB C++ 连接池封装,线程安全。

// MongoDB connection-pool wrapper.
// Every method returns 0 on success and a value > 0 on error.
class cmongo{
    public:
        // Default constructor; per the original author's note the pool then
        // holds a single connection.
        cmongo();
        // Creates a pool that holds `size` connections.
        cmongo(int size);
        // Destructor; deletes the pooled connections.
        ~cmongo();
    public:
        // Sets the TCP read/write timeout.
        int set_wr_timeout(double t);
        // Connects every pooled connection to mhost:mport.
        int conn(string mhost="127.0.0.1",int mport=27017);
        // Selects the database and collection used by subsequent calls.
        int setdb(string mdb,string mcollection);
        // Ensures an index on `key`.
        int setindex(string key);
        // Single-record query: looks up the record where key == key_val and
        // stores the fields named in `in` into `out` (field name -> value).
        int get(map<string,string>& out,vector<string> in,string key,string key_val);
        // Batch query: `in` lists the values of `key` to look up and `fields`
        // names the fields to fetch; `rout` maps each key value to its
        // field name -> value map.
        int gets(map< string,map<string,string> >& rout,vector<string> fields,vector<string> in,string key);
        // Dumps the collection as key -> value pairs: for every record,
        // rout[<value of field `key`>] = <value of field `val`>.
        int dumpkey(map< string,string >& rout,string key,string val);
        // Dumps the collection as key -> {field -> value}: for every record,
        // rout[<value of field `key`>] holds the fields named in `in`.
        int dumpvals(map< string,map<string,string> >& rout,vector<string> in,string key);
        // Writes the fields in `in` to the record where key == key_val.
        int set(map<string,string> in,string key,string key_val);
        // Batch write keyed by `key` (e.g. key="id"):
        //   "123456": <field,value>,<field,value>
        //   "123457": <field,value>,<field,value>
        int sets(map< string,map<string,string> > in,string key);
        // Deletes the record(s) where key == key_val.
        int remove(string key,string key_val);
    private:
        // Not copyable: the destructor deletes every pooled connection, so a
        // copy would delete the same pointers a second time (and would also
        // duplicate the mutexes and the semaphore). Declared private and
        // intentionally left undefined.
        cmongo(const cmongo&);
        cmongo& operator=(const cmongo&);
    private:
        // Target namespace in "<db>.<collection>" form.
        string doc;
        // TCP read/write timeout.
        double wr_timeout;
        // Guards _joblst.
        pthread_mutex_t _jobmux;
        // Counts the connections that are currently available.
        sem_t _jobsem;
        // Pooled connections; the flag is true while a connection is
        // unavailable (claimed by a caller, or marked after a failed connect).
        map<DBClientConnection*,bool> _joblst;
        // Held by the dump methods while they iterate a cursor.
        pthread_mutex_t _dbmux;
 
};
 
 
 
// Builds the pool: clamps `size` to 1..200, initialises the synchronisation
// primitives and allocates `size` DBClientConnection objects. The connections
// are only opened later, by conn().
cmongo::cmongo(int size){
    // Default namespace "<db>.<collection>"; DB_DB / DB_COLLECTION are defined
    // outside this block.
    doc=string(DB_DB)+"."+string(DB_COLLECTION);
    wr_timeout=3;
    // A pool with no connection can never serve a request (every operation
    // would block on the semaphore forever), so anything below 1 becomes 1.
    if(size<1){
        size=1;
    }
    if(size>200){
        size=200;
    }
    const bool auto_conn=true;
    pthread_mutex_init(&_jobmux,NULL);
    // _dbmux is locked by dumpkey()/dumpvals(); it must be initialised before
    // its first use.
    pthread_mutex_init(&_dbmux,NULL);
    // The semaphore starts at 0; conn() raises it as connections come up.
    if((sem_init(&_jobsem,0,0))<0){
        return;
    }
    pthread_mutex_lock(&_jobmux);
    for(int i=0;i<size;++i){
        // Plain `new` reports failure by throwing, so no NULL check is needed.
        DBClientConnection* pconn = new DBClientConnection(auto_conn,0,wr_timeout);
        _joblst[pconn]=false;
    }
    pthread_mutex_unlock(&_jobmux);
}
// Tears the pool down: deletes every pooled connection, empties the map and
// destroys the semaphore and the pool mutex.
// The object must no longer be in use by any other thread at this point.
cmongo::~cmongo(){
    doc.clear();
    pthread_mutex_lock(&_jobmux);
    for(map<DBClientConnection*,bool>::iterator it=_joblst.begin();it!=_joblst.end();++it){
        delete it->first;
    }
    // Drop the now-dangling pointers.
    _joblst.clear();
    pthread_mutex_unlock(&_jobmux);

    // Release the synchronisation primitives (the mutex must be unlocked
    // before it is destroyed).
    sem_destroy(&_jobsem);
    pthread_mutex_destroy(&_jobmux);
    // NOTE(review): _dbmux is not destroyed here because not every constructor
    // is visible from this block; add pthread_mutex_destroy(&_dbmux) once all
    // constructors are confirmed to initialise it.
}
int cmongo::set_wr_timeout(double t){
    wr_timeout=t;
    return RET_OK;
}
// Connects every pooled connection to mhost:mport.
// A connection that connects successfully becomes available (the semaphore is
// raised once for it). A connection that fails is logged to stderr and flagged
// as unavailable, and does NOT raise the semaphore, so the semaphore count
// always matches the number of usable connections.
// Returns RET_OK if at least one connection succeeded, RET_ERR otherwise.
// NOTE(review): this is written for a single call per object; calling it again
// would raise the semaphore a second time for already-connected entries.
int cmongo::conn(string mhost,int mport){
    HostAndPort hp(mhost,mport);
    int connected=0;

    pthread_mutex_lock(&_jobmux);
    for(map<DBClientConnection*,bool>::iterator it=_joblst.begin();it!=_joblst.end();++it){
        string errmsg;
        if(it->first->connect(hp,errmsg)){
            sem_post(&_jobsem);
            ++connected;
        }else{
            cerr<<"connect mhost:"<<mhost<<" mport:"<<mport<<" msg:"<<errmsg<<endl;
            // Keep the failed connection out of circulation.
            it->second=true;
        }
    }
    pthread_mutex_unlock(&_jobmux);

    return connected>0 ? RET_OK : RET_ERR;
}
// Points the pool at "<mdb>.<mcollection>".
// Returns RET_PARERR when either name is empty, RET_OK otherwise.
int cmongo::setdb(string mdb,string mcollection){
    const bool have_both = !mdb.empty() && !mcollection.empty();
    if(!have_both){
        return RET_PARERR;
    }

    string target(mdb);
    target+=".";
    target+=mcollection;
    doc=target;

    return RET_OK;
}
// Ensures an index {key: 1} on the current namespace.
// Returns RET_PARERR for an empty key, RET_ERR when no idle connection could
// be claimed, RET_OK otherwise. Driver exceptions propagate to the caller
// after the connection has been handed back to the pool.
int cmongo::setindex(string key){
    if(key.empty()){
        return RET_PARERR;
    }

    // Claim an idle connection: wait for a permit, then find the idle entry.
    sem_wait(&_jobsem);
    pthread_mutex_lock(&_jobmux);
    map<DBClientConnection*,bool>::iterator it=_joblst.begin();
    while(it!=_joblst.end() && it->second){
        ++it;
    }
    if(it==_joblst.end()){
        // A permit was granted but no idle connection exists; the permit was
        // surplus, so it is consumed rather than posted back.
        pthread_mutex_unlock(&_jobmux);
        return RET_ERR;
    }
    it->second=true;
    pthread_mutex_unlock(&_jobmux);

    try{
        // Build the index spec as BSON directly instead of concatenating JSON
        // text, so field names never have to survive a JSON parse.
        it->first->ensureIndex(doc,BSON(key<<1));
    }catch(...){
        // Hand the connection back before propagating the failure.
        pthread_mutex_lock(&_jobmux);
        it->second=false;
        pthread_mutex_unlock(&_jobmux);
        sem_post(&_jobsem);
        throw;
    }

    pthread_mutex_lock(&_jobmux);
    it->second=false;
    pthread_mutex_unlock(&_jobmux);
    sem_post(&_jobsem);

    return RET_OK;
}
// Single-record query.
//   out          - receives field name -> value for every field listed in `in`
//   in           - names of the fields to fetch
//   key, key_val - lookup condition (key == key_val); multi-condition lookups
//                  are not supported
// Returns RET_PARERR on empty arguments, RET_ERR when no idle connection could
// be claimed, RET_OK otherwise. Driver exceptions propagate to the caller
// after the connection has been handed back to the pool; `out` is untouched
// in that case.
int cmongo::get(map<string,string>& out,vector<string> in,string key,string key_val){
    if(key.empty() || key_val.empty() || in.empty()){
        return RET_PARERR;
    }
    // Projection: only the requested fields.
    BSONObjBuilder b;
    for(vector<string>::iterator iter=in.begin();iter!=in.end();++iter){
        b.append(*iter,1);
    }
    BSONObj ob=b.obj();

    // Claim an idle connection: wait for a permit, then find the idle entry.
    sem_wait(&_jobsem);
    pthread_mutex_lock(&_jobmux);
    map<DBClientConnection*,bool>::iterator it=_joblst.begin();
    while(it!=_joblst.end() && it->second){
        ++it;
    }
    if(it==_joblst.end()){
        // A permit was granted but no idle connection exists; the permit was
        // surplus, so it is consumed rather than posted back.
        pthread_mutex_unlock(&_jobmux);
        return RET_ERR;
    }
    it->second=true;
    pthread_mutex_unlock(&_jobmux);

    try{
        BSONObj p=it->first->findOne(doc,QUERY(key<<key_val),&ob);

        // Collect into a scratch map first so `out` is replaced in one step.
        map<string,string> temp;
        for(vector<string>::iterator iter=in.begin();iter!=in.end();++iter){
            temp[*iter]=p.getStringField(iter->c_str());
        }
        out.swap(temp);
    }catch(...){
        // Hand the connection back before propagating the failure.
        pthread_mutex_lock(&_jobmux);
        it->second=false;
        pthread_mutex_unlock(&_jobmux);
        sem_post(&_jobsem);
        throw;
    }

    pthread_mutex_lock(&_jobmux);
    it->second=false;
    pthread_mutex_unlock(&_jobmux);
    sem_post(&_jobsem);

    return RET_OK;
}
// Batch query: fetches selected fields for a list of key values.
//   rout   - receives <value of `key`> -> {field name -> value}
//   fields - names of the fields to fetch
//   in     - the values of `key` to look up, one findOne per value
//   key    - name of the lookup field, e.g. "id"
// Values for which findOne returns an empty object are skipped instead of
// being recorded under an empty-string key.
// Returns RET_PARERR for an empty key, RET_ERR when no idle connection could
// be claimed, RET_OK otherwise. Driver exceptions propagate to the caller
// after the connection has been handed back to the pool.
int cmongo::gets(map< string,map<string,string> >& rout,vector<string> fields,vector<string> in,string key){
    if(key.empty()){
        return RET_PARERR;
    }

    // Projection: the key itself plus the requested fields.
    BSONObjBuilder b;
    b.append(key,1);
    for(vector<string>::iterator iter=fields.begin();iter!=fields.end();++iter){
        b.append(*iter,1);
    }
    BSONObj p=b.obj();

    // Claim an idle connection: wait for a permit, then find the idle entry.
    sem_wait(&_jobsem);
    pthread_mutex_lock(&_jobmux);
    map<DBClientConnection*,bool>::iterator it=_joblst.begin();
    while(it!=_joblst.end() && it->second){
        ++it;
    }
    if(it==_joblst.end()){
        // A permit was granted but no idle connection exists; the permit was
        // surplus, so it is consumed rather than posted back.
        pthread_mutex_unlock(&_jobmux);
        return RET_ERR;
    }
    it->second=true;
    pthread_mutex_unlock(&_jobmux);

    try{
        for(vector<string>::iterator iter2=in.begin();iter2!=in.end();++iter2){
            BSONObj ob=it->first->findOne(doc,QUERY(key<<*iter2),&p);
            if(ob.isEmpty()){
                // Nothing matched this value.
                continue;
            }
            map<string,string> temp;
            for(vector<string>::iterator iter=fields.begin();iter!=fields.end();++iter){
                temp[*iter]=ob.getStringField(iter->c_str());
            }
            rout[ob.getStringField(key.c_str())]=temp;
        }
    }catch(...){
        // Hand the connection back before propagating the failure.
        pthread_mutex_lock(&_jobmux);
        it->second=false;
        pthread_mutex_unlock(&_jobmux);
        sem_post(&_jobsem);
        throw;
    }

    pthread_mutex_lock(&_jobmux);
    it->second=false;
    pthread_mutex_unlock(&_jobmux);
    sem_post(&_jobsem);

    return RET_OK;
}
// Dumps the collection as key -> value pairs.
//   rout - receives <value of field `key`> -> <value of field `val`>
//   key  - field whose value becomes the map key
//   val  - field whose value becomes the mapped value (added to the
//          projection only when non-empty)
// Returns RET_PARERR for an empty key, RET_ERR when no idle connection could
// be claimed or the query produced no cursor, RET_OK otherwise. Driver
// exceptions propagate to the caller after _dbmux has been released and the
// connection handed back to the pool.
int cmongo::dumpkey(map< string,string >& rout,string key,string val){
    if(key.empty()){
        return RET_PARERR;
    }

    // Projection: the key field and, if given, the value field.
    BSONObjBuilder b;
    b.append(key,1);
    if(!val.empty()){
        b.append(val,1);
    }
    BSONObj p=b.obj();

    // Claim an idle connection: wait for a permit, then find the idle entry.
    sem_wait(&_jobsem);
    pthread_mutex_lock(&_jobmux);
    map<DBClientConnection*,bool>::iterator it=_joblst.begin();
    while(it!=_joblst.end() && it->second){
        ++it;
    }
    if(it==_joblst.end()){
        // A permit was granted but no idle connection exists; the permit was
        // surplus, so it is consumed rather than posted back.
        pthread_mutex_unlock(&_jobmux);
        return RET_ERR;
    }
    it->second=true;
    pthread_mutex_unlock(&_jobmux);

    int ret=RET_OK;
    pthread_mutex_lock(&_dbmux);
    try{
        auto_ptr<DBClientCursor> cursor = it->first->query(doc,Query(),0,0,&p);
        if(cursor.get()==NULL){
            // query() hands back a null cursor when the request failed.
            ret=RET_ERR;
        }else{
            while(cursor->more()){
                BSONObj ob=cursor->next();
                rout[ob.getStringField(key.c_str())]=ob.getStringField(val.c_str());
            }
        }
    }catch(...){
        // Release everything held before propagating the failure.
        pthread_mutex_unlock(&_dbmux);
        pthread_mutex_lock(&_jobmux);
        it->second=false;
        pthread_mutex_unlock(&_jobmux);
        sem_post(&_jobsem);
        throw;
    }
    pthread_mutex_unlock(&_dbmux);

    pthread_mutex_lock(&_jobmux);
    it->second=false;
    pthread_mutex_unlock(&_jobmux);
    sem_post(&_jobsem);

    return ret;
}
// Dumps the collection as key -> {field -> value}.
// dumpvals() could fully replace dumpkey(); the two are kept separate for
// convenience.
//   rout - receives <value of field `key`> -> {field name -> value}
//   in   - names of the fields to return for each record
//   key  - field whose value becomes the outer map key, e.g. "id"
// Returns RET_PARERR for an empty key, RET_ERR when no idle connection could
// be claimed or the query produced no cursor, RET_OK otherwise. Driver
// exceptions propagate to the caller after _dbmux has been released and the
// connection handed back to the pool.
int cmongo::dumpvals(map< string,map<string,string> >& rout,vector<string> in,string key){
    if(key.empty()){
        return RET_PARERR;
    }

    // Projection: the key field plus the requested fields.
    BSONObjBuilder b;
    b.append(key,1);
    for(vector<string>::iterator iter=in.begin();iter!=in.end();++iter){
        b.append(*iter,1);
    }
    BSONObj p=b.obj();

    // Claim an idle connection: wait for a permit, then find the idle entry.
    sem_wait(&_jobsem);
    pthread_mutex_lock(&_jobmux);
    map<DBClientConnection*,bool>::iterator it=_joblst.begin();
    while(it!=_joblst.end() && it->second){
        ++it;
    }
    if(it==_joblst.end()){
        // A permit was granted but no idle connection exists; the permit was
        // surplus, so it is consumed rather than posted back.
        pthread_mutex_unlock(&_jobmux);
        return RET_ERR;
    }
    it->second=true;
    pthread_mutex_unlock(&_jobmux);

    int ret=RET_OK;
    pthread_mutex_lock(&_dbmux);
    try{
        auto_ptr<DBClientCursor> cursor = it->first->query(doc,Query(),0,0,&p);
        if(cursor.get()==NULL){
            // query() hands back a null cursor when the request failed.
            ret=RET_ERR;
        }else{
            while(cursor->more()){
                BSONObj ob=cursor->next();
                map<string,string> temp;
                for(vector<string>::iterator iter=in.begin();iter!=in.end();++iter){
                    temp[*iter]=ob.getStringField(iter->c_str());
                }
                rout[ob.getStringField(key.c_str())]=temp;
            }
        }
    }catch(...){
        // Release everything held before propagating the failure.
        pthread_mutex_unlock(&_dbmux);
        pthread_mutex_lock(&_jobmux);
        it->second=false;
        pthread_mutex_unlock(&_jobmux);
        sem_post(&_jobsem);
        throw;
    }
    pthread_mutex_unlock(&_dbmux);

    pthread_mutex_lock(&_jobmux);
    it->second=false;
    pthread_mutex_unlock(&_jobmux);
    sem_post(&_jobsem);

    return ret;
}
// Upserts the fields in `in` ($set) on the record where key == key_val.
// Updating several records that share the same key is not supported.
// Returns RET_PARERR on empty arguments, RET_ERR when no idle connection could
// be claimed or getLastError() reports a message, RET_OK otherwise. Driver
// exceptions propagate to the caller after the connection has been handed
// back to the pool.
int cmongo::set(map<string,string> in,string key,string key_val){
    // Nothing to write, or no way to address the record: parameter error.
    if(in.empty() || key.empty() || key_val.empty()){
        return RET_PARERR;
    }
    BSONObjBuilder b;
    for(map<string,string>::iterator iter=in.begin();iter!=in.end();++iter){
        b.append(iter->first,iter->second);
    }
    BSONObj ob=b.obj();

    // Claim an idle connection: wait for a permit, then find the idle entry.
    sem_wait(&_jobsem);
    pthread_mutex_lock(&_jobmux);
    map<DBClientConnection*,bool>::iterator it=_joblst.begin();
    while(it!=_joblst.end() && it->second){
        ++it;
    }
    if(it==_joblst.end()){
        // A permit was granted but no idle connection exists; the permit was
        // surplus, so it is consumed rather than posted back.
        pthread_mutex_unlock(&_jobmux);
        return RET_ERR;
    }
    it->second=true;
    pthread_mutex_unlock(&_jobmux);

    int ret=RET_OK;
    try{
        it->first->update(doc,QUERY(key<<key_val),BSON("$set"<<ob),true);
        string errmsg=it->first->getLastError();
        if(!errmsg.empty()){
            ret=RET_ERR;
        }
    }catch(...){
        // Hand the connection back before propagating the failure.
        pthread_mutex_lock(&_jobmux);
        it->second=false;
        pthread_mutex_unlock(&_jobmux);
        sem_post(&_jobsem);
        throw;
    }

    pthread_mutex_lock(&_jobmux);
    it->second=false;
    pthread_mutex_unlock(&_jobmux);
    sem_post(&_jobsem);

    return ret;
}
// Batch upsert keyed by `key` (e.g. key="id"): each entry of `in` maps a key
// value to the fields to $set on the record where key == that value.
//   "123456": <field,value>,<field,value>
//   "123457": <field,value>,<field,value>
// All entries are attempted; the result is RET_ERR if getLastError() reported
// a message for any of them.
// Returns RET_PARERR on empty arguments, RET_ERR when no idle connection could
// be claimed or any update reported an error, RET_OK otherwise. Driver
// exceptions propagate to the caller after the connection has been handed
// back to the pool.
int cmongo::sets(map< string,map<string,string> > in,string key){
    // Nothing to write, or no key field name: parameter error.
    if(in.empty() || key.empty()){
        return RET_PARERR;
    }

    // Claim an idle connection: wait for a permit, then find the idle entry.
    sem_wait(&_jobsem);
    pthread_mutex_lock(&_jobmux);
    map<DBClientConnection*,bool>::iterator it=_joblst.begin();
    while(it!=_joblst.end() && it->second){
        ++it;
    }
    if(it==_joblst.end()){
        // A permit was granted but no idle connection exists; the permit was
        // surplus, so it is consumed rather than posted back.
        pthread_mutex_unlock(&_jobmux);
        return RET_ERR;
    }
    it->second=true;
    pthread_mutex_unlock(&_jobmux);

    int ret=RET_OK;
    try{
        for(map< string,map<string,string> >::iterator iter=in.begin();iter!=in.end();++iter){
            BSONObjBuilder b;
            for(map<string,string>::iterator iter2=iter->second.begin();iter2!=iter->second.end();++iter2){
                b.append(iter2->first,iter2->second);
            }
            BSONObj ob=b.obj();
            it->first->update(doc,QUERY(key<<iter->first),BSON("$set"<<ob),true);
            string errmsg=it->first->getLastError();
            if(!errmsg.empty()){
                ret=RET_ERR;
            }
        }
    }catch(...){
        // Hand the connection back before propagating the failure.
        pthread_mutex_lock(&_jobmux);
        it->second=false;
        pthread_mutex_unlock(&_jobmux);
        sem_post(&_jobsem);
        throw;
    }

    pthread_mutex_lock(&_jobmux);
    it->second=false;
    pthread_mutex_unlock(&_jobmux);
    sem_post(&_jobsem);

    return ret;
}
// Deletes the records where key == key_val, e.g. key="id", key_val="587"
// removes the records whose id is "587".
// Returns RET_PARERR on empty arguments, RET_ERR when no idle connection could
// be claimed or getLastError() reports a message (same check as set()/sets()),
// RET_OK otherwise. Driver exceptions propagate to the caller after the
// connection has been handed back to the pool.
int cmongo::remove(string key,string key_val){
    if(key.empty() || key_val.empty()){
        return RET_PARERR;
    }

    // Claim an idle connection: wait for a permit, then find the idle entry.
    sem_wait(&_jobsem);
    pthread_mutex_lock(&_jobmux);
    map<DBClientConnection*,bool>::iterator it=_joblst.begin();
    while(it!=_joblst.end() && it->second){
        ++it;
    }
    if(it==_joblst.end()){
        // A permit was granted but no idle connection exists; the permit was
        // surplus, so it is consumed rather than posted back.
        pthread_mutex_unlock(&_jobmux);
        return RET_ERR;
    }
    it->second=true;
    pthread_mutex_unlock(&_jobmux);

    int ret=RET_OK;
    try{
        it->first->remove(doc,BSON(key << key_val));
        // Surface a server-side failure instead of always reporting success.
        string errmsg=it->first->getLastError();
        if(!errmsg.empty()){
            ret=RET_ERR;
        }
    }catch(...){
        // Hand the connection back before propagating the failure.
        pthread_mutex_lock(&_jobmux);
        it->second=false;
        pthread_mutex_unlock(&_jobmux);
        sem_post(&_jobsem);
        throw;
    }

    pthread_mutex_lock(&_jobmux);
    it->second=false;
    pthread_mutex_unlock(&_jobmux);
    sem_post(&_jobsem);

    return ret;
}
 

编程技巧