概述
Guava包里的Service接口用于封装一个服务对象的运行状态、包括start和stop等方法。例如web服务器,RPC服务器、计时器等可以实现这个接口。对此类服务的状态管理并不轻松、需要对服务的开启/关闭进行妥善管理、特别是在多线程环境下尤为复杂。Guava包提供了一些基础类帮助你管理复杂的状态转换逻辑和同步细节。
使用一个服务
一个服务正常生命周期有:
- Service.State.NEW
- Service.State.STARTING
- Service.State.RUNNING
- Service.State.STOPPING
- Service.State.TERMINATED
服务一旦被停止就无法再重新启动了。如果服务在starting、running、stopping状态出现问题、会进入Service.State.FAILED.状态。调用 startAsync()方法可以异步开启一个服务,同时返回this对象形成方法调用链。注意:只有在当前服务的状态是NEW时才能调用startAsync()方法,因此最好在应用中有一个统一的地方初始化相关服务。停止一个服务也是类似的、使用异步方法stopAsync() 。但是不像startAsync(),多次调用这个方法是安全的。这是为了方便处理关闭服务时候的锁竞争问题。
Service也提供了一些方法用于等待服务状态转换的完成:
通过 addListener()方法异步添加监听器。此方法允许你添加一个 Service.Listener 、它会在每次服务状态转换的时候被调用。注意:最好在服务启动之前添加Listener(这时的状态是NEW)、否则之前已发生的状态转换事件是无法在新添加的Listener上被重新触发的。
同步使用awaitRunning()。这个方法不能被打断、不强制捕获异常、一旦服务启动就会返回。如果服务没有成功启动,会抛出IllegalStateException异常。同样的, awaitTerminated() 方法会等待服务达到终止状态(TERMINATED 或者 FAILED)。两个方法都有重载方法允许传入超时时间。
Service 接口本身实现起来会比较复杂、且容易碰到一些捉摸不透的问题。因此我们不推荐直接实现这个接口。而是请继承Guava包里已经封装好的基础抽象类。每个基础类支持一种特定的线程模型。
基础实现类
AbstractIdleService
AbstractIdleService 类简单实现了Service接口、其在running状态时不会执行任何动作–因此在running时也不需要启动线程–但需要处理开启/关闭动作。要实现一个此类的服务,只需继承AbstractIdleService类,然后自己实现startUp() 和shutDown()方法就可以了。
protected void startUp() { servlets.add(new GcStatsServlet()); } protected void shutDown() {}
如上面的例子、由于任何请求到GcStatsServlet时已经会有现成线程处理了,所以在服务运行时就不需要做什么额外动作了。
AbstractExecutionThreadService
AbstractExecutionThreadService 通过单线程处理启动、运行、和关闭等操作。你必须重载run()方法,同时需要能响应停止服务的请求。具体的实现可以在一个循环内做处理:
public void run() { while (isRunning()) { // perform a unit of work } }
另外,你还可以重载triggerShutdown()方法让run()方法结束返回。
重载startUp()和shutDown()方法是可选的,不影响服务本身状态的管理
protected void startUp() { dispatcher.listenForConnections(port, queue); } protected void run() { Connection connection; while ((connection = queue.take() != POISON)) { process(connection); } } protected void triggerShutdown() { dispatcher.stopListeningForConnections(queue); queue.put(POISON); }
start()内部会调用startUp()方法,创建一个线程、然后在线程内调用run()方法。stop()会调用 triggerShutdown()方法并且等待线程终止。
AbstractScheduledService
AbstractScheduledService类用于在运行时处理一些周期性的任务。子类可以实现 runOneIteration()方法定义一个周期执行的任务,以及相应的startUp()和shutDown()方法。为了能够描述执行周期,你需要实现scheduler()方法。通常情况下,你可以使用AbstractScheduledService.Scheduler类提供的两种调度器:newFixedRateSchedule(initialDelay, delay, TimeUnit) 和newFixedDelaySchedule(initialDelay, delay, TimeUnit),类似于JDK并发包中ScheduledExecutorService类提供的两种调度方式。如要自定义schedules则可以使用 CustomScheduler类来辅助实现;具体用法见javadoc。
AbstractService
如需要自定义的线程管理、可以通过扩展 AbstractService类来实现。一般情况下、使用上面的几个实现类就已经满足需求了,但如果在服务执行过程中有一些特定的线程处理需求、则建议继承AbstractService类。
继承AbstractService方法必须实现两个方法.
- doStart(): 首次调用startAsync()时会同时调用doStart(),doStart()内部需要处理所有的初始化工作、如果启动成功则调用notifyStarted()方法;启动失败则调用notifyFailed()
- doStop(): 首次调用stopAsync()会同时调用doStop(),doStop()要做的事情就是停止服务,如果停止成功则调用 notifyStopped()方法;停止失败则调用 notifyFailed()方法。
doStart和doStop方法的实现需要考虑下性能,尽可能的低延迟。如果初始化的开销较大,如读文件,打开网络连接,或者其他任何可能引起阻塞的操作,建议移到另外一个单独的线程去处理。
使用ServiceManager
除了对Service接口提供基础的实现类,Guava还提供了 ServiceManager类使得涉及到多个Service集合的操作更加容易。通过实例化ServiceManager类来创建一个Service集合,你可以通过以下方法来管理它们:
- startAsync() : 将启动所有被管理的服务。如果当前服务的状态都是NEW的话、那么你只能调用该方法一次、这跟 Service#startAsync()是一样的。
- stopAsync() :将停止所有被管理的服务。
- addListener :会添加一个ServiceManager.Listener,在服务状态转换中会调用该Listener
- awaitHealthy() :会等待所有的服务达到Running状态
- awaitStopped():会等待所有服务达到终止状态
检测类的方法有:
- isHealthy() :如果所有的服务处于Running状态、会返回True
- servicesByState():以状态为索引返回当前所有服务的快照
- startupTimes() :返回一个Map对象,记录被管理的服务启动的耗时、以毫秒为单位,同时Map默认按启动时间排序。
我们建议整个服务的生命周期都能通过ServiceManager来管理,不过即使状态转换是通过其他机制触发的、也不影响ServiceManager方法的正确执行。例如:当一个服务不是通过startAsync()、而是其他机制启动时,listeners 仍然可以被正常调用、awaitHealthy()也能够正常工作。ServiceManager 唯一强制的要求是当其被创建时所有的服务必须处于New状态。
附:TestCase、也可以作为练习Demo
ServiceTest
</pre> /* * Copyright (C) 2013 The Guava Authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package com.google.common.util.concurrent; import static com.google.common.util.concurrent.Service.State.FAILED; import static com.google.common.util.concurrent.Service.State.NEW; import static com.google.common.util.concurrent.Service.State.RUNNING; import static com.google.common.util.concurrent.Service.State.STARTING; import static com.google.common.util.concurrent.Service.State.STOPPING; import static com.google.common.util.concurrent.Service.State.TERMINATED; import junit.framework.TestCase; /** * Unit tests for {@link Service} */ public class ServiceTest extends TestCase { /** Assert on the comparison ordering of the State enum since we guarantee it. */ public void testStateOrdering() { // List every valid (direct) state transition. assertLessThan(NEW, STARTING); assertLessThan(NEW, TERMINATED); assertLessThan(STARTING, RUNNING); assertLessThan(STARTING, STOPPING); assertLessThan(STARTING, FAILED); assertLessThan(RUNNING, STOPPING); assertLessThan(RUNNING, FAILED); assertLessThan(STOPPING, FAILED); assertLessThan(STOPPING, TERMINATED); } private static <T extends Comparable<? super T>> void assertLessThan(T a, T b) { if (a.compareTo(b) >= 0) { fail(String.format("Expected %s to be less than %s", a, b)); } } } <pre>
AbstractIdleServiceTest
/* * Copyright (C) 2009 The Guava Authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package com.google.common.util.concurrent; import static org.truth0.Truth.ASSERT; import com.google.common.collect.Lists; import junit.framework.TestCase; import java.util.List; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; /** * Tests for {@link AbstractIdleService}. * * @author Chris Nokleberg * @author Ben Yu */ public class AbstractIdleServiceTest extends TestCase { // Functional tests using real thread. We only verify publicly visible state. // Interaction assertions are done by the single-threaded unit tests. public static class FunctionalTest extends TestCase { private static class DefaultService extends AbstractIdleService { @Override protected void startUp() throws Exception {} @Override protected void shutDown() throws Exception {} } public void testServiceStartStop() throws Exception { AbstractIdleService service = new DefaultService(); service.startAsync().awaitRunning(); assertEquals(Service.State.RUNNING, service.state()); service.stopAsync().awaitTerminated(); assertEquals(Service.State.TERMINATED, service.state()); } public void testStart_failed() throws Exception { final Exception exception = new Exception("deliberate"); AbstractIdleService service = new DefaultService() { @Override protected void startUp() throws Exception { throw exception; } }; try { service.startAsync().awaitRunning(); fail(); } catch (RuntimeException e) { assertSame(exception, e.getCause()); } assertEquals(Service.State.FAILED, service.state()); } public void testStop_failed() throws Exception { final Exception exception = new Exception("deliberate"); AbstractIdleService service = new DefaultService() { @Override protected void shutDown() throws Exception { throw exception; } }; service.startAsync().awaitRunning(); try { service.stopAsync().awaitTerminated(); fail(); } catch (RuntimeException e) { assertSame(exception, e.getCause()); } assertEquals(Service.State.FAILED, service.state()); } } public void testStart() { TestService service = new TestService(); assertEquals(0, service.startUpCalled); service.startAsync().awaitRunning(); assertEquals(1, service.startUpCalled); assertEquals(Service.State.RUNNING, service.state()); ASSERT.that(service.transitionStates).has().exactly(Service.State.STARTING).inOrder(); } public void testStart_failed() { final Exception exception = new Exception("deliberate"); TestService service = new TestService() { @Override protected void startUp() throws Exception { super.startUp(); throw exception; } }; assertEquals(0, service.startUpCalled); try { service.startAsync().awaitRunning(); fail(); } catch (RuntimeException e) { assertSame(exception, e.getCause()); } assertEquals(1, service.startUpCalled); assertEquals(Service.State.FAILED, service.state()); ASSERT.that(service.transitionStates).has().exactly(Service.State.STARTING).inOrder(); } public void testStop_withoutStart() { TestService service = new TestService(); service.stopAsync().awaitTerminated(); assertEquals(0, service.startUpCalled); assertEquals(0, service.shutDownCalled); assertEquals(Service.State.TERMINATED, service.state()); ASSERT.that(service.transitionStates).isEmpty(); } public void testStop_afterStart() { TestService service = new TestService(); service.startAsync().awaitRunning(); assertEquals(1, service.startUpCalled); assertEquals(0, service.shutDownCalled); service.stopAsync().awaitTerminated(); assertEquals(1, service.startUpCalled); assertEquals(1, service.shutDownCalled); assertEquals(Service.State.TERMINATED, service.state()); ASSERT.that(service.transitionStates) .has().exactly(Service.State.STARTING, Service.State.STOPPING).inOrder(); } public void testStop_failed() { final Exception exception = new Exception("deliberate"); TestService service = new TestService() { @Override protected void shutDown() throws Exception { super.shutDown(); throw exception; } }; service.startAsync().awaitRunning(); assertEquals(1, service.startUpCalled); assertEquals(0, service.shutDownCalled); try { service.stopAsync().awaitTerminated(); fail(); } catch (RuntimeException e) { assertSame(exception, e.getCause()); } assertEquals(1, service.startUpCalled); assertEquals(1, service.shutDownCalled); assertEquals(Service.State.FAILED, service.state()); ASSERT.that(service.transitionStates) .has().exactly(Service.State.STARTING, Service.State.STOPPING).inOrder(); } public void testServiceToString() { AbstractIdleService service = new TestService(); assertEquals("TestService [NEW]", service.toString()); service.startAsync().awaitRunning(); assertEquals("TestService [RUNNING]", service.toString()); service.stopAsync().awaitTerminated(); assertEquals("TestService [TERMINATED]", service.toString()); } public void testTimeout() throws Exception { // Create a service whose executor will never run its commands Service service = new TestService() { @Override protected Executor executor() { return new Executor() { @Override public void execute(Runnable command) {} }; } }; try { service.startAsync().awaitRunning(1, TimeUnit.MILLISECONDS); fail("Expected timeout"); } catch (TimeoutException e) { ASSERT.that(e.getMessage()).contains(Service.State.STARTING.toString()); } } private static class TestService extends AbstractIdleService { int startUpCalled = 0; int shutDownCalled = 0; final List<State> transitionStates = Lists.newArrayList(); @Override protected void startUp() throws Exception { assertEquals(0, startUpCalled); assertEquals(0, shutDownCalled); startUpCalled++; assertEquals(State.STARTING, state()); } @Override protected void shutDown() throws Exception { assertEquals(1, startUpCalled); assertEquals(0, shutDownCalled); shutDownCalled++; assertEquals(State.STOPPING, state()); } @Override protected Executor executor() { transitionStates.add(state()); return MoreExecutors.sameThreadExecutor(); } } } <pre>
AbstractScheduledServiceTest
</pre> /* * Copyright (C) 2011 The Guava Authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package com.google.common.util.concurrent; import com.google.common.util.concurrent.AbstractScheduledService.Scheduler; import com.google.common.util.concurrent.Service.State; import junit.framework.TestCase; import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; /** * Unit test for {@link AbstractScheduledService}. * * @author Luke Sandberg */ public class AbstractScheduledServiceTest extends TestCase { volatile Scheduler configuration = Scheduler.newFixedDelaySchedule(0, 10, TimeUnit.MILLISECONDS); volatile ScheduledFuture<?> future = null; volatile boolean atFixedRateCalled = false; volatile boolean withFixedDelayCalled = false; volatile boolean scheduleCalled = false; final ScheduledExecutorService executor = new ScheduledThreadPoolExecutor(10) { @Override public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { return future = super.scheduleWithFixedDelay(command, initialDelay, delay, unit); } }; public void testServiceStartStop() throws Exception { NullService service = new NullService(); service.startAsync().awaitRunning(); assertFalse(future.isDone()); service.stopAsync().awaitTerminated(); assertTrue(future.isCancelled()); } private class NullService extends AbstractScheduledService { @Override protected void runOneIteration() throws Exception {} @Override protected Scheduler scheduler() { return configuration; } @Override protected ScheduledExecutorService executor() { return executor; } } public void testFailOnExceptionFromRun() throws Exception { TestService service = new TestService(); service.runException = new Exception(); service.startAsync().awaitRunning(); service.runFirstBarrier.await(); service.runSecondBarrier.await(); try { future.get(); fail(); } catch (ExecutionException e) { // An execution exception holds a runtime exception (from throwables.propogate) that holds our // original exception. assertEquals(service.runException, e.getCause().getCause()); } assertEquals(service.state(), Service.State.FAILED); } public void testFailOnExceptionFromStartUp() { TestService service = new TestService(); service.startUpException = new Exception(); try { service.startAsync().awaitRunning(); fail(); } catch (IllegalStateException e) { assertEquals(service.startUpException, e.getCause()); } assertEquals(0, service.numberOfTimesRunCalled.get()); assertEquals(Service.State.FAILED, service.state()); } public void testFailOnExceptionFromShutDown() throws Exception { TestService service = new TestService(); service.shutDownException = new Exception(); service.startAsync().awaitRunning(); service.runFirstBarrier.await(); service.stopAsync(); service.runSecondBarrier.await(); try { service.awaitTerminated(); fail(); } catch (IllegalStateException e) { assertEquals(service.shutDownException, e.getCause()); } assertEquals(Service.State.FAILED, service.state()); } public void testRunOneIterationCalledMultipleTimes() throws Exception { TestService service = new TestService(); service.startAsync().awaitRunning(); for (int i = 1; i < 10; i++) { service.runFirstBarrier.await(); assertEquals(i, service.numberOfTimesRunCalled.get()); service.runSecondBarrier.await(); } service.runFirstBarrier.await(); service.stopAsync(); service.runSecondBarrier.await(); service.stopAsync().awaitTerminated(); } public void testExecutorOnlyCalledOnce() throws Exception { TestService service = new TestService(); service.startAsync().awaitRunning(); // It should be called once during startup. assertEquals(1, service.numberOfTimesExecutorCalled.get()); for (int i = 1; i < 10; i++) { service.runFirstBarrier.await(); assertEquals(i, service.numberOfTimesRunCalled.get()); service.runSecondBarrier.await(); } service.runFirstBarrier.await(); service.stopAsync(); service.runSecondBarrier.await(); service.stopAsync().awaitTerminated(); // Only called once overall. assertEquals(1, service.numberOfTimesExecutorCalled.get()); } public void testDefaultExecutorIsShutdownWhenServiceIsStopped() throws Exception { final CountDownLatch terminationLatch = new CountDownLatch(1); AbstractScheduledService service = new AbstractScheduledService() { volatile ScheduledExecutorService executorService; @Override protected void runOneIteration() throws Exception {} @Override protected ScheduledExecutorService executor() { if (executorService == null) { executorService = super.executor(); // Add a listener that will be executed after the listener that shuts down the executor. addListener(new Listener() { @Override public void terminated(State from) { terminationLatch.countDown(); } }, MoreExecutors.sameThreadExecutor()); } return executorService; } @Override protected Scheduler scheduler() { return Scheduler.newFixedDelaySchedule(0, 1, TimeUnit.MILLISECONDS); }}; service.startAsync(); assertFalse(service.executor().isShutdown()); service.awaitRunning(); service.stopAsync(); terminationLatch.await(); assertTrue(service.executor().isShutdown()); assertTrue(service.executor().awaitTermination(100, TimeUnit.MILLISECONDS)); } public void testDefaultExecutorIsShutdownWhenServiceFails() throws Exception { final CountDownLatch failureLatch = new CountDownLatch(1); AbstractScheduledService service = new AbstractScheduledService() { volatile ScheduledExecutorService executorService; @Override protected void runOneIteration() throws Exception {} @Override protected void startUp() throws Exception { throw new Exception("Failed"); } @Override protected ScheduledExecutorService executor() { if (executorService == null) { executorService = super.executor(); // Add a listener that will be executed after the listener that shuts down the executor. addListener(new Listener() { @Override public void failed(State from, Throwable failure) { failureLatch.countDown(); } }, MoreExecutors.sameThreadExecutor()); } return executorService; } @Override protected Scheduler scheduler() { return Scheduler.newFixedDelaySchedule(0, 1, TimeUnit.MILLISECONDS); }}; try { service.startAsync().awaitRunning(); fail("Expected service to fail during startup"); } catch (IllegalStateException expected) {} failureLatch.await(); assertTrue(service.executor().isShutdown()); assertTrue(service.executor().awaitTermination(100, TimeUnit.MILLISECONDS)); } public void testSchedulerOnlyCalledOnce() throws Exception { TestService service = new TestService(); service.startAsync().awaitRunning(); // It should be called once during startup. assertEquals(1, service.numberOfTimesSchedulerCalled.get()); for (int i = 1; i < 10; i++) { service.runFirstBarrier.await(); assertEquals(i, service.numberOfTimesRunCalled.get()); service.runSecondBarrier.await(); } service.runFirstBarrier.await(); service.stopAsync(); service.runSecondBarrier.await(); service.awaitTerminated(); // Only called once overall. assertEquals(1, service.numberOfTimesSchedulerCalled.get()); } private class TestService extends AbstractScheduledService { CyclicBarrier runFirstBarrier = new CyclicBarrier(2); CyclicBarrier runSecondBarrier = new CyclicBarrier(2); volatile boolean startUpCalled = false; volatile boolean shutDownCalled = false; AtomicInteger numberOfTimesRunCalled = new AtomicInteger(0); AtomicInteger numberOfTimesExecutorCalled = new AtomicInteger(0); AtomicInteger numberOfTimesSchedulerCalled = new AtomicInteger(0); volatile Exception runException = null; volatile Exception startUpException = null; volatile Exception shutDownException = null; @Override protected void runOneIteration() throws Exception { assertTrue(startUpCalled); assertFalse(shutDownCalled); numberOfTimesRunCalled.incrementAndGet(); assertEquals(State.RUNNING, state()); runFirstBarrier.await(); runSecondBarrier.await(); if (runException != null) { throw runException; } } @Override protected void startUp() throws Exception { assertFalse(startUpCalled); assertFalse(shutDownCalled); startUpCalled = true; assertEquals(State.STARTING, state()); if (startUpException != null) { throw startUpException; } } @Override protected void shutDown() throws Exception { assertTrue(startUpCalled); assertFalse(shutDownCalled); shutDownCalled = true; if (shutDownException != null) { throw shutDownException; } } @Override protected ScheduledExecutorService executor() { numberOfTimesExecutorCalled.incrementAndGet(); return executor; } @Override protected Scheduler scheduler() { numberOfTimesSchedulerCalled.incrementAndGet(); return configuration; } } public static class SchedulerTest extends TestCase { // These constants are arbitrary and just used to make sure that the correct method is called // with the correct parameters. private static final int initialDelay = 10; private static final int delay = 20; private static final TimeUnit unit = TimeUnit.MILLISECONDS; // Unique runnable object used for comparison. final Runnable testRunnable = new Runnable() {@Override public void run() {}}; boolean called = false; private void assertSingleCallWithCorrectParameters(Runnable command, long initialDelay, long delay, TimeUnit unit) { assertFalse(called); // only called once. called = true; assertEquals(SchedulerTest.initialDelay, initialDelay); assertEquals(SchedulerTest.delay, delay); assertEquals(SchedulerTest.unit, unit); assertEquals(testRunnable, command); } public void testFixedRateSchedule() { Scheduler schedule = Scheduler.newFixedRateSchedule(initialDelay, delay, unit); schedule.schedule(null, new ScheduledThreadPoolExecutor(1) { @Override public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { assertSingleCallWithCorrectParameters(command, initialDelay, delay, unit); return null; } }, testRunnable); assertTrue(called); } public void testFixedDelaySchedule() { Scheduler schedule = Scheduler.newFixedDelaySchedule(initialDelay, delay, unit); schedule.schedule(null, new ScheduledThreadPoolExecutor(10) { @Override public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { assertSingleCallWithCorrectParameters(command, initialDelay, delay, unit); return null; } }, testRunnable); assertTrue(called); } private class TestCustomScheduler extends AbstractScheduledService.CustomScheduler { public AtomicInteger scheduleCounter = new AtomicInteger(0); @Override protected Schedule getNextSchedule() throws Exception { scheduleCounter.incrementAndGet(); return new Schedule(0, TimeUnit.SECONDS); } } public void testCustomSchedule_startStop() throws Exception { final CyclicBarrier firstBarrier = new CyclicBarrier(2); final CyclicBarrier secondBarrier = new CyclicBarrier(2); final AtomicBoolean shouldWait = new AtomicBoolean(true); Runnable task = new Runnable() { @Override public void run() { try { if (shouldWait.get()) { firstBarrier.await(); secondBarrier.await(); } } catch (Exception e) { throw new RuntimeException(e); } } }; TestCustomScheduler scheduler = new TestCustomScheduler(); Future<?> future = scheduler.schedule(null, Executors.newScheduledThreadPool(10), task); firstBarrier.await(); assertEquals(1, scheduler.scheduleCounter.get()); secondBarrier.await(); firstBarrier.await(); assertEquals(2, scheduler.scheduleCounter.get()); shouldWait.set(false); secondBarrier.await(); future.cancel(false); } public void testCustomSchedulerServiceStop() throws Exception { TestAbstractScheduledCustomService service = new TestAbstractScheduledCustomService(); service.startAsync().awaitRunning(); service.firstBarrier.await(); assertEquals(1, service.numIterations.get()); service.stopAsync(); service.secondBarrier.await(); service.awaitTerminated(); // Sleep for a while just to ensure that our task wasn't called again. Thread.sleep(unit.toMillis(3 * delay)); assertEquals(1, service.numIterations.get()); } public void testBig() throws Exception { TestAbstractScheduledCustomService service = new TestAbstractScheduledCustomService() { @Override protected Scheduler scheduler() { return new AbstractScheduledService.CustomScheduler() { @Override protected Schedule getNextSchedule() throws Exception { // Explicitly yield to increase the probability of a pathological scheduling. Thread.yield(); return new Schedule(0, TimeUnit.SECONDS); } }; } }; service.useBarriers = false; service.startAsync().awaitRunning(); Thread.sleep(50); service.useBarriers = true; service.firstBarrier.await(); int numIterations = service.numIterations.get(); service.stopAsync(); service.secondBarrier.await(); service.awaitTerminated(); assertEquals(numIterations, service.numIterations.get()); } private static class TestAbstractScheduledCustomService extends AbstractScheduledService { final AtomicInteger numIterations = new AtomicInteger(0); volatile boolean useBarriers = true; final CyclicBarrier firstBarrier = new CyclicBarrier(2); final CyclicBarrier secondBarrier = new CyclicBarrier(2); @Override protected void runOneIteration() throws Exception { numIterations.incrementAndGet(); if (useBarriers) { firstBarrier.await(); secondBarrier.await(); } } @Override protected ScheduledExecutorService executor() { // use a bunch of threads so that weird overlapping schedules are more likely to happen. return Executors.newScheduledThreadPool(10); } @Override protected void startUp() throws Exception {} @Override protected void shutDown() throws Exception {} @Override protected Scheduler scheduler() { return new CustomScheduler() { @Override protected Schedule getNextSchedule() throws Exception { return new Schedule(delay, unit); }}; } } public void testCustomSchedulerFailure() throws Exception { TestFailingCustomScheduledService service = new TestFailingCustomScheduledService(); service.startAsync().awaitRunning(); for (int i = 1; i < 4; i++) { service.firstBarrier.await(); assertEquals(i, service.numIterations.get()); service.secondBarrier.await(); } Thread.sleep(1000); try { service.stopAsync().awaitTerminated(100, TimeUnit.SECONDS); fail(); } catch (IllegalStateException e) { assertEquals(State.FAILED, service.state()); } } private static class TestFailingCustomScheduledService extends AbstractScheduledService { final AtomicInteger numIterations = new AtomicInteger(0); final CyclicBarrier firstBarrier = new CyclicBarrier(2); final CyclicBarrier secondBarrier = new CyclicBarrier(2); @Override protected void runOneIteration() throws Exception { numIterations.incrementAndGet(); firstBarrier.await(); secondBarrier.await(); } @Override protected ScheduledExecutorService executor() { // use a bunch of threads so that weird overlapping schedules are more likely to happen. return Executors.newScheduledThreadPool(10); } @Override protected Scheduler scheduler() { return new CustomScheduler() { @Override protected Schedule getNextSchedule() throws Exception { if (numIterations.get() > 2) { throw new IllegalStateException("Failed"); } return new Schedule(delay, unit); }}; } } } } <pre>
AbstractServiceTest
</pre> /* * Copyright (C) 2009 The Guava Authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package com.google.common.util.concurrent; import static java.lang.Thread.currentThread; import static java.util.concurrent.TimeUnit.SECONDS; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.util.concurrent.Service.Listener; import com.google.common.util.concurrent.Service.State; import junit.framework.TestCase; import java.lang.Thread.UncaughtExceptionHandler; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import javax.annotation.concurrent.GuardedBy; /** * Unit test for {@link AbstractService}. * * @author Jesse Wilson */ public class AbstractServiceTest extends TestCase { private Thread executionThread; private Throwable thrownByExecutionThread; public void testNoOpServiceStartStop() throws Exception { NoOpService service = new NoOpService(); RecordingListener listener = RecordingListener.record(service); assertEquals(State.NEW, service.state()); assertFalse(service.isRunning()); assertFalse(service.running); service.startAsync(); assertEquals(State.RUNNING, service.state()); assertTrue(service.isRunning()); assertTrue(service.running); service.stopAsync(); assertEquals(State.TERMINATED, service.state()); assertFalse(service.isRunning()); assertFalse(service.running); assertEquals( ImmutableList.of( State.STARTING, State.RUNNING, State.STOPPING, State.TERMINATED), listener.getStateHistory()); } public void testNoOpServiceStartAndWaitStopAndWait() throws Exception { NoOpService service = new NoOpService(); service.startAsync().awaitRunning(); assertEquals(State.RUNNING, service.state()); service.stopAsync().awaitTerminated(); assertEquals(State.TERMINATED, service.state()); } public void testNoOpServiceStartAsyncAndAwaitStopAsyncAndAwait() throws Exception { NoOpService service = new NoOpService(); service.startAsync().awaitRunning(); assertEquals(State.RUNNING, service.state()); service.stopAsync().awaitTerminated(); assertEquals(State.TERMINATED, service.state()); } public void testNoOpServiceStopIdempotence() throws Exception { NoOpService service = new NoOpService(); RecordingListener listener = RecordingListener.record(service); service.startAsync().awaitRunning(); assertEquals(State.RUNNING, service.state()); service.stopAsync(); service.stopAsync(); assertEquals(State.TERMINATED, service.state()); assertEquals( ImmutableList.of( State.STARTING, State.RUNNING, State.STOPPING, State.TERMINATED), listener.getStateHistory()); } public void testNoOpServiceStopIdempotenceAfterWait() throws Exception { NoOpService service = new NoOpService(); service.startAsync().awaitRunning(); service.stopAsync().awaitTerminated(); service.stopAsync(); assertEquals(State.TERMINATED, service.state()); } public void testNoOpServiceStopIdempotenceDoubleWait() throws Exception { NoOpService service = new NoOpService(); service.startAsync().awaitRunning(); assertEquals(State.RUNNING, service.state()); service.stopAsync().awaitTerminated(); service.stopAsync().awaitTerminated(); assertEquals(State.TERMINATED, service.state()); } public void testNoOpServiceStartStopAndWaitUninterruptible() throws Exception { NoOpService service = new NoOpService(); currentThread().interrupt(); try { service.startAsync().awaitRunning(); assertEquals(State.RUNNING, service.state()); service.stopAsync().awaitTerminated(); assertEquals(State.TERMINATED, service.state()); assertTrue(currentThread().isInterrupted()); } finally { Thread.interrupted(); // clear interrupt for future tests } } private static class NoOpService extends AbstractService { boolean running = false; @Override protected void doStart() { assertFalse(running); running = true; notifyStarted(); } @Override protected void doStop() { assertTrue(running); running = false; notifyStopped(); } } public void testManualServiceStartStop() throws Exception { ManualSwitchedService service = new ManualSwitchedService(); RecordingListener listener = RecordingListener.record(service); service.startAsync(); assertEquals(State.STARTING, service.state()); assertFalse(service.isRunning()); assertTrue(service.doStartCalled); service.notifyStarted(); // usually this would be invoked by another thread assertEquals(State.RUNNING, service.state()); assertTrue(service.isRunning()); service.stopAsync(); assertEquals(State.STOPPING, service.state()); assertFalse(service.isRunning()); assertTrue(service.doStopCalled); service.notifyStopped(); // usually this would be invoked by another thread assertEquals(State.TERMINATED, service.state()); assertFalse(service.isRunning()); assertEquals( ImmutableList.of( State.STARTING, State.RUNNING, State.STOPPING, State.TERMINATED), listener.getStateHistory()); } public void testManualServiceNotifyStoppedWhileRunning() throws Exception { ManualSwitchedService service = new ManualSwitchedService(); RecordingListener listener = RecordingListener.record(service); service.startAsync(); service.notifyStarted(); service.notifyStopped(); assertEquals(State.TERMINATED, service.state()); assertFalse(service.isRunning()); assertFalse(service.doStopCalled); assertEquals( ImmutableList.of( State.STARTING, State.RUNNING, State.TERMINATED), listener.getStateHistory()); } public void testManualServiceStopWhileStarting() throws Exception { ManualSwitchedService service = new ManualSwitchedService(); RecordingListener listener = RecordingListener.record(service); service.startAsync(); assertEquals(State.STARTING, service.state()); assertFalse(service.isRunning()); assertTrue(service.doStartCalled); service.stopAsync(); assertEquals(State.STOPPING, service.state()); assertFalse(service.isRunning()); assertFalse(service.doStopCalled); service.notifyStarted(); assertEquals(State.STOPPING, service.state()); assertFalse(service.isRunning()); assertTrue(service.doStopCalled); service.notifyStopped(); assertEquals(State.TERMINATED, service.state()); assertFalse(service.isRunning()); assertEquals( ImmutableList.of( State.STARTING, State.STOPPING, State.TERMINATED), listener.getStateHistory()); } /** * This tests for a bug where if {@link Service#stopAsync()} was called while the service was * {@link State#STARTING} more than once, the {@link Listener#stopping(State)} callback would get * called multiple times. */ public void testManualServiceStopMultipleTimesWhileStarting() throws Exception { ManualSwitchedService service = new ManualSwitchedService(); final AtomicInteger stopppingCount = new AtomicInteger(); service.addListener(new Listener() { @Override public void stopping(State from) { stopppingCount.incrementAndGet(); } }, MoreExecutors.sameThreadExecutor()); service.startAsync(); service.stopAsync(); assertEquals(1, stopppingCount.get()); service.stopAsync(); assertEquals(1, stopppingCount.get()); } public void testManualServiceStopWhileNew() throws Exception { ManualSwitchedService service = new ManualSwitchedService(); RecordingListener listener = RecordingListener.record(service); service.stopAsync(); assertEquals(State.TERMINATED, service.state()); assertFalse(service.isRunning()); assertFalse(service.doStartCalled); assertFalse(service.doStopCalled); assertEquals(ImmutableList.of(State.TERMINATED), listener.getStateHistory()); } public void testManualServiceFailWhileStarting() throws Exception { ManualSwitchedService service = new ManualSwitchedService(); RecordingListener listener = RecordingListener.record(service); service.startAsync(); service.notifyFailed(EXCEPTION); assertEquals(ImmutableList.of(State.STARTING, State.FAILED), listener.getStateHistory()); } public void testManualServiceFailWhileRunning() throws Exception { ManualSwitchedService service = new ManualSwitchedService(); RecordingListener listener = RecordingListener.record(service); service.startAsync(); service.notifyStarted(); service.notifyFailed(EXCEPTION); assertEquals(ImmutableList.of(State.STARTING, State.RUNNING, State.FAILED), listener.getStateHistory()); } public void testManualServiceFailWhileStopping() throws Exception { ManualSwitchedService service = new ManualSwitchedService(); RecordingListener listener = RecordingListener.record(service); service.startAsync(); service.notifyStarted(); service.stopAsync(); service.notifyFailed(EXCEPTION); assertEquals(ImmutableList.of(State.STARTING, State.RUNNING, State.STOPPING, State.FAILED), listener.getStateHistory()); } public void testManualServiceUnrequestedStop() { ManualSwitchedService service = new ManualSwitchedService(); service.startAsync(); service.notifyStarted(); assertEquals(State.RUNNING, service.state()); assertTrue(service.isRunning()); assertFalse(service.doStopCalled); service.notifyStopped(); assertEquals(State.TERMINATED, service.state()); assertFalse(service.isRunning()); assertFalse(service.doStopCalled); } /** * The user of this service should call {@link #notifyStarted} and {@link * #notifyStopped} after calling {@link #startAsync} and {@link #stopAsync}. */ private static class ManualSwitchedService extends AbstractService { boolean doStartCalled = false; boolean doStopCalled = false; @Override protected void doStart() { assertFalse(doStartCalled); doStartCalled = true; } @Override protected void doStop() { assertFalse(doStopCalled); doStopCalled = true; } } public void testAwaitTerminated() throws Exception { final NoOpService service = new NoOpService(); Thread waiter = new Thread() { @Override public void run() { service.awaitTerminated(); } }; waiter.start(); service.startAsync().awaitRunning(); assertEquals(State.RUNNING, service.state()); service.stopAsync(); waiter.join(100); // ensure that the await in the other thread is triggered assertFalse(waiter.isAlive()); } public void testAwaitTerminated_FailedService() throws Exception { final ManualSwitchedService service = new ManualSwitchedService(); final AtomicReference<Throwable> exception = Atomics.newReference(); Thread waiter = new Thread() { @Override public void run() { try { service.awaitTerminated(); fail("Expected an IllegalStateException"); } catch (Throwable t) { exception.set(t); } } }; waiter.start(); service.startAsync(); service.notifyStarted(); assertEquals(State.RUNNING, service.state()); service.notifyFailed(EXCEPTION); assertEquals(State.FAILED, service.state()); waiter.join(100); assertFalse(waiter.isAlive()); assertTrue(exception.get() instanceof IllegalStateException); assertEquals(EXCEPTION, exception.get().getCause()); } public void testThreadedServiceStartAndWaitStopAndWait() throws Throwable { ThreadedService service = new ThreadedService(); RecordingListener listener = RecordingListener.record(service); service.startAsync().awaitRunning(); assertEquals(State.RUNNING, service.state()); service.awaitRunChecks(); service.stopAsync().awaitTerminated(); assertEquals(State.TERMINATED, service.state()); throwIfSet(thrownByExecutionThread); assertEquals( ImmutableList.of( State.STARTING, State.RUNNING, State.STOPPING, State.TERMINATED), listener.getStateHistory()); } public void testThreadedServiceStopIdempotence() throws Throwable { ThreadedService service = new ThreadedService(); service.startAsync().awaitRunning(); assertEquals(State.RUNNING, service.state()); service.awaitRunChecks(); service.stopAsync(); service.stopAsync().awaitTerminated(); assertEquals(State.TERMINATED, service.state()); throwIfSet(thrownByExecutionThread); } public void testThreadedServiceStopIdempotenceAfterWait() throws Throwable { ThreadedService service = new ThreadedService(); service.startAsync().awaitRunning(); assertEquals(State.RUNNING, service.state()); service.awaitRunChecks(); service.stopAsync().awaitTerminated(); service.stopAsync(); assertEquals(State.TERMINATED, service.state()); executionThread.join(); throwIfSet(thrownByExecutionThread); } public void testThreadedServiceStopIdempotenceDoubleWait() throws Throwable { ThreadedService service = new ThreadedService(); service.startAsync().awaitRunning(); assertEquals(State.RUNNING, service.state()); service.awaitRunChecks(); service.stopAsync().awaitTerminated(); service.stopAsync().awaitTerminated(); assertEquals(State.TERMINATED, service.state()); throwIfSet(thrownByExecutionThread); } public void testManualServiceFailureIdempotence() { ManualSwitchedService service = new ManualSwitchedService(); RecordingListener.record(service); service.startAsync(); service.notifyFailed(new Exception("1")); service.notifyFailed(new Exception("2")); assertEquals("1", service.failureCause().getMessage()); try { service.awaitRunning(); fail(); } catch (IllegalStateException e) { assertEquals("1", e.getCause().getMessage()); } } private class ThreadedService extends AbstractService { final CountDownLatch hasConfirmedIsRunning = new CountDownLatch(1); /* * The main test thread tries to stop() the service shortly after * confirming that it is running. Meanwhile, the service itself is trying * to confirm that it is running. If the main thread's stop() call happens * before it has the chance, the test will fail. To avoid this, the main * thread calls this method, which waits until the service has performed * its own "running" check. */ void awaitRunChecks() throws InterruptedException { assertTrue("Service thread hasn't finished its checks. " + "Exception status (possibly stale): " + thrownByExecutionThread, hasConfirmedIsRunning.await(10, SECONDS)); } @Override protected void doStart() { assertEquals(State.STARTING, state()); invokeOnExecutionThreadForTest(new Runnable() { @Override public void run() { assertEquals(State.STARTING, state()); notifyStarted(); assertEquals(State.RUNNING, state()); hasConfirmedIsRunning.countDown(); } }); } @Override protected void doStop() { assertEquals(State.STOPPING, state()); invokeOnExecutionThreadForTest(new Runnable() { @Override public void run() { assertEquals(State.STOPPING, state()); notifyStopped(); assertEquals(State.TERMINATED, state()); } }); } } private void invokeOnExecutionThreadForTest(Runnable runnable) { executionThread = new Thread(runnable); executionThread.setUncaughtExceptionHandler(new UncaughtExceptionHandler() { @Override public void uncaughtException(Thread thread, Throwable e) { thrownByExecutionThread = e; } }); executionThread.start(); } private static void throwIfSet(Throwable t) throws Throwable { if (t != null) { throw t; } } public void testStopUnstartedService() throws Exception { NoOpService service = new NoOpService(); RecordingListener listener = RecordingListener.record(service); service.stopAsync(); assertEquals(State.TERMINATED, service.state()); try { service.startAsync(); fail(); } catch (IllegalStateException expected) {} assertEquals(State.TERMINATED, Iterables.getOnlyElement(listener.getStateHistory())); } public void testFailingServiceStartAndWait() throws Exception { StartFailingService service = new StartFailingService(); RecordingListener listener = RecordingListener.record(service); try { service.startAsync().awaitRunning(); fail(); } catch (IllegalStateException e) { assertEquals(EXCEPTION, service.failureCause()); assertEquals(EXCEPTION, e.getCause()); } assertEquals( ImmutableList.of( State.STARTING, State.FAILED), listener.getStateHistory()); } public void testFailingServiceStopAndWait_stopFailing() throws Exception { StopFailingService service = new StopFailingService(); RecordingListener listener = RecordingListener.record(service); service.startAsync().awaitRunning(); try { service.stopAsync().awaitTerminated(); fail(); } catch (IllegalStateException e) { assertEquals(EXCEPTION, service.failureCause()); assertEquals(EXCEPTION, e.getCause()); } assertEquals( ImmutableList.of( State.STARTING, State.RUNNING, State.STOPPING, State.FAILED), listener.getStateHistory()); } public void testFailingServiceStopAndWait_runFailing() throws Exception { RunFailingService service = new RunFailingService(); RecordingListener listener = RecordingListener.record(service); service.startAsync(); try { service.awaitRunning(); fail(); } catch (IllegalStateException e) { assertEquals(EXCEPTION, service.failureCause()); assertEquals(EXCEPTION, e.getCause()); } assertEquals( ImmutableList.of( State.STARTING, State.RUNNING, State.FAILED), listener.getStateHistory()); } public void testThrowingServiceStartAndWait() throws Exception { StartThrowingService service = new StartThrowingService(); RecordingListener listener = RecordingListener.record(service); try { service.startAsync().awaitRunning(); fail(); } catch (IllegalStateException e) { assertEquals(service.exception, service.failureCause()); assertEquals(service.exception, e.getCause()); } assertEquals( ImmutableList.of( State.STARTING, State.FAILED), listener.getStateHistory()); } public void testThrowingServiceStopAndWait_stopThrowing() throws Exception { StopThrowingService service = new StopThrowingService(); RecordingListener listener = RecordingListener.record(service); service.startAsync().awaitRunning(); try { service.stopAsync().awaitTerminated(); fail(); } catch (IllegalStateException e) { assertEquals(service.exception, service.failureCause()); assertEquals(service.exception, e.getCause()); } assertEquals( ImmutableList.of( State.STARTING, State.RUNNING, State.STOPPING, State.FAILED), listener.getStateHistory()); } public void testThrowingServiceStopAndWait_runThrowing() throws Exception { RunThrowingService service = new RunThrowingService(); RecordingListener listener = RecordingListener.record(service); service.startAsync(); try { service.awaitTerminated(); fail(); } catch (IllegalStateException e) { assertEquals(service.exception, service.failureCause()); assertEquals(service.exception, e.getCause()); } assertEquals( ImmutableList.of( State.STARTING, State.RUNNING, State.FAILED), listener.getStateHistory()); } public void testFailureCause_throwsIfNotFailed() { StopFailingService service = new StopFailingService(); try { service.failureCause(); fail(); } catch (IllegalStateException e) { // expected } service.startAsync().awaitRunning(); try { service.failureCause(); fail(); } catch (IllegalStateException e) { // expected } try { service.stopAsync().awaitTerminated(); fail(); } catch (IllegalStateException e) { assertEquals(EXCEPTION, service.failureCause()); assertEquals(EXCEPTION, e.getCause()); } } public void testAddListenerAfterFailureDoesntCauseDeadlock() throws InterruptedException { final StartFailingService service = new StartFailingService(); service.startAsync(); assertEquals(State.FAILED, service.state()); service.addListener(new RecordingListener(service), MoreExecutors.sameThreadExecutor()); Thread thread = new Thread() { @Override public void run() { // Internally stopAsync() grabs a lock, this could be any such method on AbstractService. service.stopAsync(); } }; thread.start(); thread.join(100); assertFalse(thread + " is deadlocked", thread.isAlive()); } public void testListenerDoesntDeadlockOnStartAndWaitFromRunning() throws Exception { final NoOpThreadedService service = new NoOpThreadedService(); service.addListener(new Listener() { @Override public void running() { service.awaitRunning(); } }, MoreExecutors.sameThreadExecutor()); service.startAsync().awaitRunning(10, TimeUnit.MILLISECONDS); service.stopAsync(); } public void testListenerDoesntDeadlockOnStopAndWaitFromTerminated() throws Exception { final NoOpThreadedService service = new NoOpThreadedService(); service.addListener(new Listener() { @Override public void terminated(State from) { service.stopAsync().awaitTerminated(); } }, MoreExecutors.sameThreadExecutor()); service.startAsync().awaitRunning(); Thread thread = new Thread() { @Override public void run() { service.stopAsync().awaitTerminated(); } }; thread.start(); thread.join(100); assertFalse(thread + " is deadlocked", thread.isAlive()); } private static class NoOpThreadedService extends AbstractExecutionThreadService { final CountDownLatch latch = new CountDownLatch(1); @Override protected void run() throws Exception { latch.await(); } @Override protected void triggerShutdown() { latch.countDown(); } } private static class StartFailingService extends AbstractService { @Override protected void doStart() { notifyFailed(EXCEPTION); } @Override protected void doStop() { fail(); } } private static class RunFailingService extends AbstractService { @Override protected void doStart() { notifyStarted(); notifyFailed(EXCEPTION); } @Override protected void doStop() { fail(); } } private static class StopFailingService extends AbstractService { @Override protected void doStart() { notifyStarted(); } @Override protected void doStop() { notifyFailed(EXCEPTION); } } private static class StartThrowingService extends AbstractService { final RuntimeException exception = new RuntimeException("deliberate"); @Override protected void doStart() { throw exception; } @Override protected void doStop() { fail(); } } private static class RunThrowingService extends AbstractService { final RuntimeException exception = new RuntimeException("deliberate"); @Override protected void doStart() { notifyStarted(); throw exception; } @Override protected void doStop() { fail(); } } private static class StopThrowingService extends AbstractService { final RuntimeException exception = new RuntimeException("deliberate"); @Override protected void doStart() { notifyStarted(); } @Override protected void doStop() { throw exception; } } private static class RecordingListener extends Listener { static RecordingListener record(Service service) { RecordingListener listener = new RecordingListener(service); service.addListener(listener, MoreExecutors.sameThreadExecutor()); return listener; } final Service service; RecordingListener(Service service) { this.service = service; } @GuardedBy("this") final List<State> stateHistory = Lists.newArrayList(); final CountDownLatch completionLatch = new CountDownLatch(1); ImmutableList<State> getStateHistory() throws Exception { completionLatch.await(); synchronized (this) { return ImmutableList.copyOf(stateHistory); } } @Override public synchronized void starting() { assertTrue(stateHistory.isEmpty()); assertNotSame(State.NEW, service.state()); stateHistory.add(State.STARTING); } @Override public synchronized void running() { assertEquals(State.STARTING, Iterables.getOnlyElement(stateHistory)); stateHistory.add(State.RUNNING); service.awaitRunning(); assertNotSame(State.STARTING, service.state()); } @Override public synchronized void stopping(State from) { assertEquals(from, Iterables.getLast(stateHistory)); stateHistory.add(State.STOPPING); if (from == State.STARTING) { try { service.awaitRunning(); fail(); } catch (IllegalStateException expected) { assertNull(expected.getCause()); assertTrue(expected.getMessage().equals( "Expected the service to be RUNNING, but was STOPPING")); } } assertNotSame(from, service.state()); } @Override public synchronized void terminated(State from) { assertEquals(from, Iterables.getLast(stateHistory, State.NEW)); stateHistory.add(State.TERMINATED); assertEquals(State.TERMINATED, service.state()); if (from == State.NEW) { try { service.awaitRunning(); fail(); } catch (IllegalStateException expected) { assertNull(expected.getCause()); assertTrue(expected.getMessage().equals( "Expected the service to be RUNNING, but was TERMINATED")); } } completionLatch.countDown(); } @Override public synchronized void failed(State from, Throwable failure) { assertEquals(from, Iterables.getLast(stateHistory)); stateHistory.add(State.FAILED); assertEquals(State.FAILED, service.state()); assertEquals(failure, service.failureCause()); if (from == State.STARTING) { try { service.awaitRunning(); fail(); } catch (IllegalStateException e) { assertEquals(failure, e.getCause()); } } try { service.awaitTerminated(); fail(); } catch (IllegalStateException e) { assertEquals(failure, e.getCause()); } completionLatch.countDown(); } } public void testNotifyStartedWhenNotStarting() { AbstractService service = new DefaultService(); try { service.notifyStarted(); fail(); } catch (IllegalStateException expected) {} } public void testNotifyStoppedWhenNotRunning() { AbstractService service = new DefaultService(); try { service.notifyStopped(); fail(); } catch (IllegalStateException expected) {} } public void testNotifyFailedWhenNotStarted() { AbstractService service = new DefaultService(); try { service.notifyFailed(new Exception()); fail(); } catch (IllegalStateException expected) {} } public void testNotifyFailedWhenTerminated() { NoOpService service = new NoOpService(); service.startAsync().awaitRunning(); service.stopAsync().awaitTerminated(); try { service.notifyFailed(new Exception()); fail(); } catch (IllegalStateException expected) {} } private static class DefaultService extends AbstractService { @Override protected void doStart() {} @Override protected void doStop() {} } private static final Exception EXCEPTION = new Exception(); } <pre>
ServiceManagerTest
</pre> /* * Copyright (C) 2012 The Guava Authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package com.google.common.util.concurrent; import static java.util.Arrays.asList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; import com.google.common.collect.Sets; import com.google.common.testing.NullPointerTester; import com.google.common.testing.TestLogHandler; import com.google.common.util.concurrent.ServiceManager.Listener; import junit.framework.TestCase; import java.util.Arrays; import java.util.Collection; import java.util.List; import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.logging.Formatter; import java.util.logging.Level; import java.util.logging.LogRecord; import java.util.logging.Logger; /** * Tests for {@link ServiceManager}. * * @author Luke Sandberg * @author Chris Nokleberg */ public class ServiceManagerTest extends TestCase { private static class NoOpService extends AbstractService { @Override protected void doStart() { notifyStarted(); } @Override protected void doStop() { notifyStopped(); } } /* * A NoOp service that will delay the startup and shutdown notification for a configurable amount * of time. */ private static class NoOpDelayedSerivce extends NoOpService { private long delay; public NoOpDelayedSerivce(long delay) { this.delay = delay; } @Override protected void doStart() { new Thread() { @Override public void run() { Uninterruptibles.sleepUninterruptibly(delay, TimeUnit.MILLISECONDS); notifyStarted(); } }.start(); } @Override protected void doStop() { new Thread() { @Override public void run() { Uninterruptibles.sleepUninterruptibly(delay, TimeUnit.MILLISECONDS); notifyStopped(); } }.start(); } } private static class FailStartService extends NoOpService { @Override protected void doStart() { notifyFailed(new IllegalStateException("failed")); } } private static class FailRunService extends NoOpService { @Override protected void doStart() { super.doStart(); notifyFailed(new IllegalStateException("failed")); } } private static class FailStopService extends NoOpService { @Override protected void doStop() { notifyFailed(new IllegalStateException("failed")); } } public void testServiceStartupTimes() { Service a = new NoOpDelayedSerivce(150); Service b = new NoOpDelayedSerivce(353); ServiceManager serviceManager = new ServiceManager(asList(a, b)); serviceManager.startAsync().awaitHealthy(); ImmutableMap<Service, Long> startupTimes = serviceManager.startupTimes(); assertEquals(2, startupTimes.size()); assertTrue(startupTimes.get(a) >= 150); assertTrue(startupTimes.get(b) >= 353); } public void testServiceStartStop() { Service a = new NoOpService(); Service b = new NoOpService(); ServiceManager manager = new ServiceManager(asList(a, b)); RecordingListener listener = new RecordingListener(); manager.addListener(listener); assertState(manager, Service.State.NEW, a, b); assertFalse(manager.isHealthy()); manager.startAsync().awaitHealthy(); assertState(manager, Service.State.RUNNING, a, b); assertTrue(manager.isHealthy()); assertTrue(listener.healthyCalled); assertFalse(listener.stoppedCalled); assertTrue(listener.failedServices.isEmpty()); manager.stopAsync().awaitStopped(); assertState(manager, Service.State.TERMINATED, a, b); assertFalse(manager.isHealthy()); assertTrue(listener.stoppedCalled); assertTrue(listener.failedServices.isEmpty()); } public void testFailStart() throws Exception { Service a = new NoOpService(); Service b = new FailStartService(); Service c = new NoOpService(); Service d = new FailStartService(); Service e = new NoOpService(); ServiceManager manager = new ServiceManager(asList(a, b, c, d, e)); RecordingListener listener = new RecordingListener(); manager.addListener(listener); assertState(manager, Service.State.NEW, a, b, c, d, e); try { manager.startAsync().awaitHealthy(); fail(); } catch (IllegalStateException expected) { } assertFalse(listener.healthyCalled); assertState(manager, Service.State.RUNNING, a, c, e); assertEquals(ImmutableSet.of(b, d), listener.failedServices); assertState(manager, Service.State.FAILED, b, d); assertFalse(manager.isHealthy()); manager.stopAsync().awaitStopped(); assertFalse(manager.isHealthy()); assertFalse(listener.healthyCalled); assertTrue(listener.stoppedCalled); } public void testFailRun() throws Exception { Service a = new NoOpService(); Service b = new FailRunService(); ServiceManager manager = new ServiceManager(asList(a, b)); RecordingListener listener = new RecordingListener(); manager.addListener(listener); assertState(manager, Service.State.NEW, a, b); try { manager.startAsync().awaitHealthy(); fail(); } catch (IllegalStateException expected) { } assertTrue(listener.healthyCalled); assertEquals(ImmutableSet.of(b), listener.failedServices); manager.stopAsync().awaitStopped(); assertState(manager, Service.State.FAILED, b); assertState(manager, Service.State.TERMINATED, a); assertTrue(listener.stoppedCalled); } public void testFailStop() throws Exception { Service a = new NoOpService(); Service b = new FailStopService(); Service c = new NoOpService(); ServiceManager manager = new ServiceManager(asList(a, b, c)); RecordingListener listener = new RecordingListener(); manager.addListener(listener); manager.startAsync().awaitHealthy(); assertTrue(listener.healthyCalled); assertFalse(listener.stoppedCalled); manager.stopAsync().awaitStopped(); assertTrue(listener.stoppedCalled); assertEquals(ImmutableSet.of(b), listener.failedServices); assertState(manager, Service.State.FAILED, b); assertState(manager, Service.State.TERMINATED, a, c); } public void testToString() throws Exception { Service a = new NoOpService(); Service b = new FailStartService(); ServiceManager manager = new ServiceManager(asList(a, b)); String toString = manager.toString(); assertTrue(toString.contains("NoOpService")); assertTrue(toString.contains("FailStartService")); } public void testTimeouts() throws Exception { Service a = new NoOpDelayedSerivce(50); ServiceManager manager = new ServiceManager(asList(a)); manager.startAsync(); try { manager.awaitHealthy(1, TimeUnit.MILLISECONDS); fail(); } catch (TimeoutException expected) { } manager.awaitHealthy(100, TimeUnit.MILLISECONDS); // no exception thrown manager.stopAsync(); try { manager.awaitStopped(1, TimeUnit.MILLISECONDS); fail(); } catch (TimeoutException expected) { } manager.awaitStopped(100, TimeUnit.MILLISECONDS); // no exception thrown } /** * This covers a case where if the last service to stop failed then the stopped callback would * never be called. */ public void testSingleFailedServiceCallsStopped() { Service a = new FailStartService(); ServiceManager manager = new ServiceManager(asList(a)); RecordingListener listener = new RecordingListener(); manager.addListener(listener); try { manager.startAsync().awaitHealthy(); fail(); } catch (IllegalStateException expected) { } assertTrue(listener.stoppedCalled); } /** * This covers a bug where listener.healthy would get called when a single service failed during * startup (it occurred in more complicated cases also). */ public void testFailStart_singleServiceCallsHealthy() { Service a = new FailStartService(); ServiceManager manager = new ServiceManager(asList(a)); RecordingListener listener = new RecordingListener(); manager.addListener(listener); try { manager.startAsync().awaitHealthy(); fail(); } catch (IllegalStateException expected) { } assertFalse(listener.healthyCalled); } /** * This covers a bug where if a listener was installed that would stop the manager if any service * fails and something failed during startup before service.start was called on all the services, * then awaitStopped would deadlock due to an IllegalStateException that was thrown when trying to * stop the timer(!). */ public void testFailStart_stopOthers() throws TimeoutException { Service a = new FailStartService(); Service b = new NoOpService(); final ServiceManager manager = new ServiceManager(asList(a, b)); manager.addListener(new Listener() { @Override public void failure(Service service) { manager.stopAsync(); }}); manager.startAsync(); manager.awaitStopped(10, TimeUnit.MILLISECONDS); } private static void assertState( ServiceManager manager, Service.State state, Service... services) { Collection<Service> managerServices = manager.servicesByState().get(state); for (Service service : services) { assertEquals(service.toString(), state, service.state()); assertEquals(service.toString(), service.isRunning(), state == Service.State.RUNNING); assertTrue(managerServices + " should contain " + service, managerServices.contains(service)); } } /** * This is for covering a case where the ServiceManager would behave strangely if constructed * with no service under management. Listeners would never fire because the ServiceManager was * healthy and stopped at the same time. This test ensures that listeners fire and isHealthy * makes sense. */ public void testEmptyServiceManager() { Logger logger = Logger.getLogger(ServiceManager.class.getName()); logger.setLevel(Level.FINEST); TestLogHandler logHandler = new TestLogHandler(); logger.addHandler(logHandler); ServiceManager manager = new ServiceManager(Arrays.<Service>asList()); RecordingListener listener = new RecordingListener(); manager.addListener(listener, MoreExecutors.sameThreadExecutor()); manager.startAsync().awaitHealthy(); assertTrue(manager.isHealthy()); assertTrue(listener.healthyCalled); assertFalse(listener.stoppedCalled); assertTrue(listener.failedServices.isEmpty()); manager.stopAsync().awaitStopped(); assertFalse(manager.isHealthy()); assertTrue(listener.stoppedCalled); assertTrue(listener.failedServices.isEmpty()); // check that our NoOpService is not directly observable via any of the inspection methods or // via logging. assertEquals("ServiceManager{services=[]}", manager.toString()); assertTrue(manager.servicesByState().isEmpty()); assertTrue(manager.startupTimes().isEmpty()); Formatter logFormatter = new Formatter() { @Override public String format(LogRecord record) { return formatMessage(record); } }; for (LogRecord record : logHandler.getStoredLogRecords()) { assertFalse(logFormatter.format(record).contains("NoOpService")); } } /** * This is for a case where a long running Listener using the sameThreadListener could deadlock * another thread calling stopAsync(). */ public void testListenerDeadlock() throws InterruptedException { final CountDownLatch failEnter = new CountDownLatch(1); Service failRunService = new AbstractService() { @Override protected void doStart() { new Thread() { @Override public void run() { notifyStarted(); notifyFailed(new Exception("boom")); } }.start(); } @Override protected void doStop() { notifyStopped(); } }; final ServiceManager manager = new ServiceManager( Arrays.asList(failRunService, new NoOpService())); manager.addListener(new ServiceManager.Listener() { @Override public void failure(Service service) { failEnter.countDown(); // block forever! Uninterruptibles.awaitUninterruptibly(new CountDownLatch(1)); } }, MoreExecutors.sameThreadExecutor()); // We do not call awaitHealthy because, due to races, that method may throw an exception. But // we really just want to wait for the thread to be in the failure callback so we wait for that // explicitly instead. manager.startAsync(); failEnter.await(); assertFalse("State should be updated before calling listeners", manager.isHealthy()); // now we want to stop the services. Thread stoppingThread = new Thread() { @Override public void run() { manager.stopAsync().awaitStopped(); } }; stoppingThread.start(); // this should be super fast since the only non stopped service is a NoOpService stoppingThread.join(1000); assertFalse("stopAsync has deadlocked!.", stoppingThread.isAlive()); } /** * Catches a bug where when constructing a service manager failed, later interactions with the * service could cause IllegalStateExceptions inside the partially constructed ServiceManager. * This ISE wouldn't actually bubble up but would get logged by ExecutionQueue. This obfuscated * the original error (which was not constructing ServiceManager correctly). */ public void testPartiallyConstructedManager() { Logger logger = Logger.getLogger("global"); logger.setLevel(Level.FINEST); TestLogHandler logHandler = new TestLogHandler(); logger.addHandler(logHandler); NoOpService service = new NoOpService(); service.startAsync(); try { new ServiceManager(Arrays.asList(service)); fail(); } catch (IllegalArgumentException expected) {} service.stopAsync(); // Nothing was logged! assertEquals(0, logHandler.getStoredLogRecords().size()); } public void testPartiallyConstructedManager_transitionAfterAddListenerBeforeStateIsReady() { // The implementation of this test is pretty sensitive to the implementation :( but we want to // ensure that if weird things happen during construction then we get exceptions. final NoOpService service1 = new NoOpService(); // This service will start service1 when addListener is called. This simulates service1 being // started asynchronously. Service service2 = new Service() { final NoOpService delegate = new NoOpService(); @Override public final void addListener(Listener listener, Executor executor) { service1.startAsync(); delegate.addListener(listener, executor); } // Delegates from here on down @Override public final Service startAsync() { return delegate.startAsync(); } @Override public final Service stopAsync() { return delegate.stopAsync(); } @Override public final ListenableFuture<State> start() { return delegate.start(); } @Override public final ListenableFuture<State> stop() { return delegate.stop(); } @Override public State startAndWait() { return delegate.startAndWait(); } @Override public State stopAndWait() { return delegate.stopAndWait(); } @Override public final void awaitRunning() { delegate.awaitRunning(); } @Override public final void awaitRunning(long timeout, TimeUnit unit) throws TimeoutException { delegate.awaitRunning(timeout, unit); } @Override public final void awaitTerminated() { delegate.awaitTerminated(); } @Override public final void awaitTerminated(long timeout, TimeUnit unit) throws TimeoutException { delegate.awaitTerminated(timeout, unit); } @Override public final boolean isRunning() { return delegate.isRunning(); } @Override public final State state() { return delegate.state(); } @Override public final Throwable failureCause() { return delegate.failureCause(); } }; try { new ServiceManager(Arrays.asList(service1, service2)); fail(); } catch (IllegalArgumentException expected) { assertTrue(expected.getMessage().contains("started transitioning asynchronously")); } } /** * This test is for a case where two Service.Listener callbacks for the same service would call * transitionService in the wrong order due to a race. Due to the fact that it is a race this * test isn't guaranteed to expose the issue, but it is at least likely to become flaky if the * race sneaks back in, and in this case flaky means something is definitely wrong. * * <p>Before the bug was fixed this test would fail at least 30% of the time. */ public void testTransitionRace() throws TimeoutException { for (int k = 0; k < 1000; k++) { List<Service> services = Lists.newArrayList(); for (int i = 0; i < 5; i++) { services.add(new SnappyShutdownService(i)); } ServiceManager manager = new ServiceManager(services); manager.startAsync().awaitHealthy(); manager.stopAsync().awaitStopped(1, TimeUnit.SECONDS); } } /** * This service will shutdown very quickly after stopAsync is called and uses a background thread * so that we know that the stopping() listeners will execute on a different thread than the * terminated() listeners. */ private static class SnappyShutdownService extends AbstractExecutionThreadService { final int index; final CountDownLatch latch = new CountDownLatch(1); SnappyShutdownService(int index) { this.index = index; } @Override protected void run() throws Exception { latch.await(); } @Override protected void triggerShutdown() { latch.countDown(); } @Override protected String serviceName() { return this.getClass().getSimpleName() + "[" + index + "]"; } } public void testNulls() { ServiceManager manager = new ServiceManager(Arrays.<Service>asList()); new NullPointerTester() .setDefault(ServiceManager.Listener.class, new RecordingListener()) .testAllPublicInstanceMethods(manager); } private static final class RecordingListener extends ServiceManager.Listener { volatile boolean healthyCalled; volatile boolean stoppedCalled; final Set<Service> failedServices = Sets.newConcurrentHashSet(); @Override public void healthy() { healthyCalled = true; } @Override public void stopped() { stoppedCalled = true; } @Override public void failure(Service service) { failedServices.add(service); } } } <pre>
原创文章,作者:3628473679,如若转载,请注明出处:https://blog.ytso.com/140810.html