最近工作上遇到一个需求:需要根据nginx日志去统计每个域名的qps(Query Per Second,每秒查询率)数据。
解决了日志读取等问题之后,为了写一个尽可能高效的统计模块,我决定用多线程去计数,然后将统计结果保存在Map中。用多线程去计数的需求还是比较常见的。
HashMap 线程不安全,操作时只能加synchronized,结果还是单线程的计数,效率太低。ConcurrentHashMap是线程安全的,就用它了。
先看第一版代码:
1 // 先定义一个全局的Map 2 private Map<String, Integer> counter = new ConcurrentHashMap<>(); 3 4 // 统计方法 5 private static final String separator = "|-|"; 6 public void countLog(NginxLog nginxLog) { 7 String key = nginxLog.getHost() + separator + nginxLog.getDate(); 8 // 先取一下之前的值,然后再加一插入进去 9 Integer oldValue = counter.putIfAbsent(key, 1); 10 if (oldValue != null) { 11 counter.put(key, oldValue.intValue() + 1); 12 } 13 }
这段统计代码显然是不行的,ConcurrentHashMap虽然是线程安全类,并且也能保证所提供的方法是线程安全的,但是这并不代表使用它你的程序就是线程安全的。
在这段代码中counter.putIfAbsent()操作是原子性操作,counter.put()也是原子操作。但两者组合起来这就产生问题了。
我们举个例子:比如说现在有两个线程先后运行到counter.putIfAbsent()方法,然后两个线程都取到了同样的oldValue值,假设此值为10,然后两个线程都将执行counter.put()方法,此时两个线程都是在执行counter.put(key, 11)。这显然是不合理的,计数次数理应为12的。
为了解决这个问题,我想到了两种思路:
1、给countLog方法加上synchronized同步,如此使用ConcurrentHashMap就没有多大必要了,改成HashMap好了,这就是最开始的思路,代码如下:
1 private Map<String, Integer> counter = new HashMap<>(); 2 3 public synchronized void countLog(NginxLog nginxLog) { 4 String key = nginxLog.getHost() + separator + nginxLog.getDate(); 5 // 先取一下之前的值,然后再加一插入进去 6 Integer oldValue = counter.putIfAbsent(key, 1); 7 if (oldValue != null) { 8 counter.put(key, oldValue.intValue() + 1); 9 } 10 }
执行测试运行结果为:
2017-11-28 14:43:17,292 INFO NginxLogCountTest - count: 100000, costTime: 23 ms
因为加了同步锁,相当于计数都是单线程在进行的,因此统计结果也是正确的,耗时23ms
2、第二种思路,使用AtomicInteger类计数。ConcurrentHashMap和AtomicInteger类组合。代码如下:
1 private Map<String, AtomicInteger> counter = new ConcurrentHashMap<>(); 2 3 public void countLog(NginxLog nginxLog) { 4 String key = nginxLog.getHost() + separator + nginxLog.getDate(); 5 AtomicInteger oldValue = counter.putIfAbsent(key, new AtomicInteger(1)); 6 if (oldValue != null) { 7 oldValue.incrementAndGet(); 8 } 9 }
执行测试运行结果为:
2017-11-28 14:53:14,655 INFO NginxLogCountTest - count: 100000, costTime: 11 ms
这种解决方案里面将AtomicInteger和ConcurrentHashMap组合到一起,counter.putIfAbsent()执行后可以获得当前值的AtomicInteger对象,这个时候使用AtomicInteger对象的incrementAndGet方法。这种组合相当于将两步操作分担给两个线程安全类来处理了。
从执行时间来看相对于单线程计数也还是有一定优势的。
最后附上测试用的代码:
1 @Resource 2 private NginxLogCount nginxLogCount; 3 4 @Test 5 public void testCountLog() { 6 NginxLog nginxLog = new NginxLog(); 7 nginxLog.setHost("test.com"); 8 nginxLog.setDate("2017-11-28T11:43:46+08:00"); 9 String key = nginxLog.getHost() + "|-|" + nginxLog.getDate(); 10 long startTime = System.currentTimeMillis(); 11 for (int j = 0; j < 10; j++) { 12 Thread thread = new Thread(new Runnable() { 13 @Override 14 public void run() { 15 for (int i = 0; i < 10000; i++) { 16 nginxLogCount.countLog(nginxLog); 17 } 18 } 19 }); 20 thread.setDaemon(false); 21 thread.start(); 22 } 23 long endTime = System.currentTimeMillis(); 24 try { 25 // 等待运行结束 26 TimeUnit.SECONDS.sleep(10); 27 } catch (InterruptedException e) { 28 e.printStackTrace(); 29 } 30 log.info("count: {}, costTime: {} ms", nginxLogCount.getValue(key), endTime - startTime); 31 } 32 33 // getValue方法 34 public int getValue(String key) { 35 AtomicInteger value = counter.get(key); 36 return value == null ? 0 : value.intValue(); 37 }
原创文章,作者:Maggie-Hunter,如若转载,请注明出处:https://blog.ytso.com/tech/pnotes/11352.html