mongodb 通过MapReduce统计用户Pv Uv详解大数据

    通过spring data 操作mongodb,利用map reduce 来统计用户访问的Pv Uv。

    详细代码见 https://github.com/WangErXiao/spring-data

    具体的spring-data 操作mongodb这里不做介绍。这里只介绍mongo map reduce。

    

@Component 
public class UserDaoImpl extends MongoBaseDao implements UserDao { 
    public void insertRecord(UserVisitRecord record) { 
        getMongoTemplate().insert(record); 
    } 
    public void statisUserPvUv(String date) { 
 
        String map = "function() {                                        " 
                + "   if(this.date=='"+date+"'){                          " 
                + "      emit(this.date ,{uv:1,pv:1,userIds:this.userId}) " 
                + "     }" 
                + "   } "; 
 
        String reduce = "function(key, values) {            " 
                + "   var temp = new Array();               " 
                + "   var userIds= new Array();             " 
                + "   for(i = 0; i < values.length; i++) {  " 
                + "      userIds=userIds.concat(values[i].userIds);" 
                + "   }        " 
                + "   userIds.sort();         " 
                + "   for(i = 0; i < userIds.length; i++) {" 
                + "         if( userIds[i] == userIds[i+1]) { continue;}" 
                + "         temp[temp.length]=userIds[i];" 
                + "   }        " 
                + "   return {uv:temp.length,pv:userIds.length,userIds:userIds};" 
                + " }"; 
 
        MapReduceOutput mapReduceOutput = getMongoTemplate().getCollection("userVisitRecord").mapReduce(map,reduce,"tmp",null); 
        DBCollection resultColl = mapReduceOutput.getOutputCollection(); 
        try { 
            DBCursor cursor = resultColl.find(); 
            while (cursor.hasNext()) { 
                DBObject dbObject = cursor.next(); 
                if (dbObject.get("value") != null) { 
                    UserStaticModel userStaticModel=new UserStaticModel(); 
                    userStaticModel.setUv(Math.round((double)((DBObject) dbObject.get("value")).get("uv"))); 
                    userStaticModel.setPv(Math.round((double) ((DBObject) dbObject.get("value")).get("pv"))); 
                    List<String>userIds=(List) ((DBObject) dbObject.get("value")).get("userIds"); 
                    Set<String> idSet=new HashSet<>(userIds); 
                    userStaticModel.setUserIds(new ArrayList(idSet)); 
                    userStaticModel.setDate(date); 
                    getMongoTemplate().insert(userStaticModel); 
                } 
            } 
        }catch (Exception e){ 
            e.printStackTrace(); 
        }finally { 
            resultColl.drop(); 
        } 
    } 
 
    public UserStaticModel findStatic(String date) { 
        Query query=new Query(); 
        query.addCriteria(Criteria.where("date").is(date)); 
        return getMongoTemplate().findOne(query,UserStaticModel.class); 
    } 
}

    这段代码中staticUserPvUv方法统计某天用户访问的Pv Uv。

  map  reduce方法如下:

  String map = "function() {                                              " 
                + "   if(this.date=='"+date+"'){                          " 
                + "      emit(this.date ,{uv:1,pv:1,userIds:this.userId}) " 
                + "     }" 
                + "   } "; 
  String reduce = "function(key, values) {            " 
                + "   var temp = new Array();               " 
                + "   var userIds= new Array();             " 
                + "   for(i = 0; i < values.length; i++) {  " 
                + "      userIds=userIds.concat(values[i].userIds);" 
                + "   }        " 
                + "   userIds.sort();         " 
                + "   for(i = 0; i < userIds.length; i++) {" 
                + "         if( userIds[i] == userIds[i+1]) { continue;}" 
                + "         temp[temp.length]=userIds[i];" 
                + "   }        " 
                + "   return {uv:temp.length,pv:userIds.length,userIds:userIds};" 
                + " }";

看到这里很多人会疑惑:map方法为啥emit为

emit(this.date ,{uv:1,pv:1,userIds:this.userId})

而不直接

emit(this.date ,{userId:this.userId})

刚刚开始我也是这么写的,这么写会产生以下结果:

  1. 当某天只有一条记录:该记录就不走reduce ,直接出来,你得到的value就只有一个userId字符串,其他啥也没有。pv,uv 自然也没有。所以 你在emit 应该初始化{pv:1,uv:1,userIds:this.userId}

  2. 当某天记录特别多,超过100条的emit,mongo比较缺德的是,它会把这100的reduce的结果重新自动emit,所以这里把map中emit的对象结构和reduce的return返回的对象结构写成一致的原因。同一个key ,当每超过100个emit,结果就会从新emit,所以这个结果的pv uv 是无效的,这里只会用到重新emit的userIds,然后在继续在reduce进行统计。

这两个点是mongo mapreduce 比较坑爹的地方。注意这两点其他都OK了

转发标柱来源:http://my.oschina.net/robinyao/blog/467591

原创文章,作者:奋斗,如若转载,请注明出处:https://blog.ytso.com/8965.html

(0)
上一篇 2021年7月19日
下一篇 2021年7月19日

相关推荐

发表回复

登录后才能评论