小规模的流处理框架.Part 1: thread pools

原文链接 作者:Tomasz Nurkiewicz 译者:simonwang
(译者:强力推荐这篇文章,作者设计了一个用于小流量的流式数据处理框架,并详细给出了每一个需要注意的设计细节,对比了不同设计方案的优缺点,能够让你对流处理过程,某些设计模式和设计原则以及指标度量工具有一个更深刻的认识!)
GeeCON 2016上我为我的公司准备了一个编程竞赛,这次的任务是设计并实现一个能够满足以下要求的系统:

系统能够每秒处理1000个任务,每一个Event至少有2个属性:

  • clientId-我们希望每一秒有多个任务是在同一个客户端下处理的(译者:不同的clientId对应不同的ClientProjection,即对应不同的一系列操作)
  • UUID-全局唯一的

消费一个任务要花费10毫秒,为这样的流设计一个消费者:

有几个关于以上要求的重要细节:

在这篇文章中,我会引导你们使用一些成功的方案并做一些小小的突破,你将要学习如何使用精确地有针对性的度量器来解决问题。

Naive sequential processing

我们可以在迭代器中处理这个问题,首先我们可以对API做一些假设,想象一下它会是这个样子:

interface EventStream {

    void consume(EventConsumer consumer);

}

@FunctionalInterface
interface EventConsumer {
    Event consume(Event event);
}

@Value
class Event {

    private final Instant created = Instant.now();
    private final int clientId;
    private final UUID uuid;

}

一个典型的推送式API,和JMS很像。需要注意的是EventConsumer是阻塞的,这就意味着它不会返回新的Event,除非前一个已经被处理完毕了。这仅仅是我做出的一个假设,而且它没有太大的违反之前的要求,这也是JMS中消息侦听者的工作机制。下面是一个简单的实现,这个实现只是简单的添加了一个工作间隔为10ms的侦听器:

class ClientProjection implements EventConsumer {

    @Override
    public Event consume(Event event) {
        Sleeper.randSleep(10, 1);//译者:这里只是用睡眠来代替实际编程中一些耗时的操作
        return event;
    }

}

当然在现实生活中这个consumer可能会在数据库中做一些存储操作,或者进行远程调用等等。我在睡眠时间的分布上添加了一些随机性,目的是使得手动测试更加贴近实际情况(译者:实际情况中耗时操作的用时不尽相同,所以要随机化):

class Sleeper {

    private static final Random RANDOM = new Random();

    static void randSleep(double mean, double stdDev) {
        final double micros = 1_000 * (mean + RANDOM.nextGaussian() * stdDev);
        try {
            TimeUnit.MICROSECONDS.sleep((long) micros);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

}

//...

EventStream es = new EventStream();  //some real implementation here
es.consume(new ClientProjection());

以上的代码能够编译并运行,但为了满足设计要求我们必须要插入一些度量器。最重要的度量器就是有关于信息消费的潜伏期,这个潜伏期指的是从信息的产生到开始处理的这段时间。我们使用 Dropwizard Metrics来实现这个潜伏期的度量:

class ClientProjection implements EventConsumer {

    private final ProjectionMetrics metrics;

    ClientProjection(ProjectionMetrics metrics) {
        this.metrics = metrics;
    }

    @Override
    public Event consume(Event event) {
        metrics.latency(Duration.between(event.getCreated(), Instant.now()));
        Sleeper.randSleep(10, 1);
        return event;
    }

}

ProjectionMetrics类的功能如下(主要就是将event的潜伏期用柱状图的形式表现出来):

import com.codahale.metrics.Histogram;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Slf4jReporter;
import lombok.extern.slf4j.Slf4j;

import java.time.Duration;
import java.util.concurrent.TimeUnit;

@Slf4j
class ProjectionMetrics {

    private final Histogram latencyHist;

    ProjectionMetrics(MetricRegistry metricRegistry) {
        final Slf4jReporter reporter = Slf4jReporter.forRegistry(metricRegistry)
                .outputTo(log)
                .convertRatesTo(TimeUnit.SECONDS)
                .convertDurationsTo(TimeUnit.MILLISECONDS)
                .build();
        reporter.start(1, TimeUnit.SECONDS);
        latencyHist = metricRegistry.histogram(MetricRegistry.name(ProjectionMetrics.class, "latency"));
    }

    void latency(Duration duration) {
        latencyHist.update(duration.toMillis());
    }
}

现在当你运行这个解决方案时,你很快就会发现潜伏期的中值和第99.9%的值(分别指的是第count/2个值和第99.9%*count个值)都在无限增长:

type=HISTOGRAM, [...] count=84,   min=0,  max=795,   mean=404.88540608274104, [...]
    median=414.0,   p75=602.0,   p95=753.0,   p98=783.0,   p99=795.0,   p999=795.0
type=HISTOGRAM, [...] count=182,  min=0,  max=1688,  mean=861.1706371990878,  [...]
    median=869.0,   p75=1285.0,  p95=1614.0,  p98=1659.0,  p99=1678.0,  p999=1688.0

[...30 seconds later...]

type=HISTOGRAM, [...] count=2947, min=14, max=26945, mean=15308.138585757424, [...]
    median=16150.0, p75=21915.0, p95=25978.0, p98=26556.0, p99=26670.0, p999=26945.0

在运行了30s之后我们的应用程序处理event会出现平均15s的延迟,因此它并不具备完整的实时性,显然缺少并发才是原因所在。我们的ClientProjection事件消费者会花费10ms去完成事件处理,所以它每秒最多可以处理100个event,然而我们需要更多的处理量。我们必须要增强ClientProjection同时不违反其他的设计要求!

Naive thread pool

最显而易见的解决方法是对EventConsumer使用多线程技术,最简单的实现途径就是利用ExecutorService:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class NaivePool implements EventConsumer, Closeable {

    private final EventConsumer downstream;
    private final ExecutorService executorService;

    NaivePool(int size, EventConsumer downstream) {
        this.executorService = Executors.newFixedThreadPool(size);
        this.downstream = downstream;
    }

    @Override
    public Event consume(Event event) {
        executorService.submit(() -> downstream.consume(event));
        return event;
    }

    @Override
    public void close() throws IOException {
        executorService.shutdown();
    }
}

这里我们使用了装饰者模式。最初的ClientProjection实现EventConsumer是可行的,但我们利用加入了并发的另一个EventConsumer实现对ClientProjection进行包装。这就允许我们能够将更复杂的行为组合起来而不用更改ClientProjection本身,这种设计可以:

  • 解耦:不同的EventConsumer互不影响,但它们却可以自由地组合在一起,在同一个线程池中工作
  • 单一职责:每个EventConsumer只做一项工作,并将自己委托给下一个组件即线程池
  • 开放/关闭原则:我们可以改变系统的行为却不用修改现有实现

开放/关闭原则通常可以通过注入策略模式和模板方法模式来实现,这很简单。整体的代码如下:

MetricRegistry metricRegistry =
        new MetricRegistry();
ProjectionMetrics metrics =
        new ProjectionMetrics(metricRegistry);
ClientProjection clientProjection =
        new ClientProjection(metrics);
NaivePool naivePool =
        new NaivePool(10, clientProjection);
EventStream es = new EventStream();
es.consume(naivePool);

我们写的度量器显示这种改良的方案确实表现的更好:

type=HISToOGRAM, count=838, min=1, max=422, mean=38.80768197277468, [...]
    median=37.0, p75=45.0, p95=51.0, p98=52.0, p99=52.0, p999=422.0
type=HISTOGRAM, count=1814, min=1, max=281, mean=47.82642776789085, [...]
    median=51.0, p75=57.0, p95=61.0, p98=62.0, p99=63.0, p999=65.0

[...30 seconds later...]

type=HISTOGRAM, count=30564, min=5, max=3838, mean=364.2904915942238, [...]
    median=352.0, p75=496.0, p95=568.0, p98=574.0, p99=1251.0, p999=3531.0

我们可以看到延迟虽然也在增长但规模却小得多,30s后潜伏期达到了364ms。这种潜伏期增长是系统问题,我们需要更多的度量器。注意到NaivePool(你会明白为什么这里是naive-初级的)会开启10条线程,这应该足以处理1000个event,每个要花费10ms。在实际情况下,我们需要一点额外的处理容量来避免因垃圾回收或小规模峰值负荷所带来的问题。为了证明线程池才是我们的瓶颈,我们要监控它内部的队列,这需要一点小小的工作量:

class NaivePool implements EventConsumer, Closeable {

    private final EventConsumer downstream;
    private final ExecutorService executorService;

    NaivePool(int size, EventConsumer downstream, MetricRegistry metricRegistry) {
        LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
        String name = MetricRegistry.name(ProjectionMetrics.class, "queue");
        Gauge<Integer> gauge = queue::size;
        metricRegistry.register(name, gauge);
        this.executorService =
                new ThreadPoolExecutor(
                        size, size, 0L, TimeUnit.MILLISECONDS, queue);
        this.downstream = downstream;
    }

    @Override
    public Event consume(Event event) {
        executorService.submit(() -> downstream.consume(event));
        return event;
    }

    @Override
    public void close() throws IOException {
        executorService.shutdown();
    }
}

这里使用ThreadPoolExecutor的目的是为了能够提供自定义的LinkedBlockingQueue实例,接下来就可以监控队列的长度(see:ExecutorService – 10 tips and tricks)。Gauge会周期性地调用queue::size,你需要的时候就会提供队列的长度。度量器显示线程池的大小确实是一个问题:

type=GAUGE, name=[...].queue, value=35
type=GAUGE, name=[...].queue, value=52

[...30 seconds later...]

type=GAUGE, name=[...].queue, value=601

不断增长的队列长度进一步加剧了队列内正在等待着的task的潜伏期,将线程池的大小增加到10到20之间,最终队列的长度显示合理并且没有失控。然而我们仍然没有解决重复ID问题,并且也没有解决同一个clientId可能会对它的events进行并发处理的问题。

Obscure locking

让我们从避免对拥有相同clientId的events使用并行处理开始。如果两个有相同clientId的event一个接一个地来,相继进入线程池队列,那么NaivePool会几乎同时将它们取出队列实现并行处理。开始的时候我们可能会想到对每一个clientId加一个Lock:

@Slf4j
class FailOnConcurrentModification implements EventConsumer {

    private final ConcurrentMap<Integer, Lock> clientLocks = new ConcurrentHashMap<>();
    private final EventConsumer downstream;

    FailOnConcurrentModification(EventConsumer downstream) {
        this.downstream = downstream;
    }

    @Override
    public Event consume(Event event) {
        Lock lock = findClientLock(event);
        if (lock.tryLock()) {
            try {
                downstream.consume(event);
            } finally {
                lock.unlock();
            }
        } else {
            log.error("Client {} already being modified by another thread", event.getClientId());
        }
        return event;
    }

    private Lock findClientLock(Event event) {
        return clientLocks.computeIfAbsent(
                event.getClientId(),
                clientId -> new ReentrantLock());
    }

}

以上的代码完全搞错方向了,这种设计太过于复杂,但运行代码至少会发现一个问题。events的处理过程就像下面这样,由一个装饰者包裹着另一个:

ClientProjection clientProjection =
        new ClientProjection(new ProjectionMetrics(metricRegistry));
FailOnConcurrentModification failOnConcurrentModification =
        new FailOnConcurrentModification(clientProjection);
NaivePool naivePool =
        new NaivePool(10, failOnConcurrentModification, metricRegistry);
EventStream es = new EventStream();

es.consume(naivePool);

一旦运行过一会儿错误信息就会弹出来,告诉我们在其他线程中已经在处理拥有相同clientId的event。我们为每一个clientId都绑定了一个Lock,这样做的目的是为了弄清楚如果其他的线程没有处理的时候client的状态。这种丑陋的方法让我们的方案变得惨不忍睹,与其因获取不到Lock而抛出错误信息,还不如等待一下,等待Lock被释放:

@Slf4j
class WaitOnConcurrentModification implements EventConsumer {

    private final ConcurrentMap<Integer, Lock> clientLocks = new ConcurrentHashMap<>();
    private final EventConsumer downstream;
    private final Timer lockWait;

    WaitOnConcurrentModification(EventConsumer downstream, MetricRegistry metricRegistry) {
        this.downstream = downstream;
        lockWait = metricRegistry.timer(MetricRegistry.name(WaitOnConcurrentModification.class, "lockWait"));
    }

    @Override
    public Event consume(Event event) {
        try {
            final Lock lock = findClientLock(event);
            final Timer.Context time = lockWait.time();
            try {
                final boolean locked = lock.tryLock(1, TimeUnit.SECONDS);
                time.stop();
                if(locked) {
                    downstream.consume(event);
                }
            } finally {
                lock.unlock();
            }
        } catch (InterruptedException e) {
            log.warn("Interrupted", e);
        }
        return event;
    }

    private Lock findClientLock(Event event) {
        return clientLocks.computeIfAbsent(
                event.getClientId(),
                clientId -> new ReentrantLock());
    }

}

这次的设计和之前的很像,但不同的是tryLock()会持续1s的时间以等待指定client的Lock被释放。如果两个有相同clientId的event相继出现,其中一个会获取到Lock进行处理,而另一个会一直阻塞直到unlock()被调用。
这段代码不仅复杂,而且在某些微妙的情况下可能会发生不可预知的错误。例如,如果两个有相同clientId的event几乎在同一时刻出现,那么谁将会是第一个?两个event会在同一时刻请求Lock,这时我们并不能保证哪一个event会第一个得到非公平锁,处理event的顺序可能就会发生混乱。肯定会有更好的方法…

Dedicated threads

让我们退一步,深吸一口气。你会怎样确保事情不会并行发生?仅仅使用一个线程就行了!事实上这是我们最开始的做法,但它的处理流量并不理想。我们不用关心不同clientIds的并发情况,我们只需要确保有相同clientId的events由一个专有线程处理就行。
你可能会想到使用一个map将clientId映射到Thread,当然这太简单了。我们可能会创造上千个线程,而它们大多数的时候可能都处于空闲状态(对于给定的clientId每秒可能只处理少数几个event)。一个很好的折中是使用固定大小的线程池,每个线程负责指定的一些clientId。在这种方法中,两个不同的clientId可能会在同一个线程中完成处理,但相同的clientId总是在同一个线程中处理。如果两个有相同clientId的event出现了,它们都会被送去同一个线程,因此为了避免并发处理,以下实现相当简单:

class SmartPool implements EventConsumer, Closeable {

    private final List<ExecutorService> threadPools;
    private final EventConsumer downstream;

    SmartPool(int size, EventConsumer downstream, MetricRegistry metricRegistry) {
        this.downstream = downstream;
        List<ExecutorService> list = IntStream
                .range(0, size)
                .mapToObj(i -> Executors.newSingleThreadExecutor())
                .collect(Collectors.toList());
        //译者:这里使用CopyOnWriteArrayList是为了保证访问threadPools里面元素时是线程安全的
        this.threadPools = new CopyOnWriteArrayList<>(list);
    }

    @Override
    public void close() throws IOException {
        threadPools.forEach(ExecutorService::shutdown);
    }

    @Override
    public Event consume(Event event) {
        final int threadIdx = event.getClientId() % threadPools.size();
        final ExecutorService executor = threadPools.get(threadIdx);
        executor.submit(() -> downstream.consume(event));
        return event;
    }
}

关键点是最后的那部分:

int threadIdx = event.getClientId() % threadPools.size();
ExecutorService executor = threadPools.get(threadIdx);

这个简单的算法总是为相同的clientId使用同一个ExecutorService单线程,不同的ID可能会在同一个线程内处理,例如当threadPools的大小为20时,Id为7, 27, 47的client都会在索引为7的线程内处理。虽然一个线程会对应多个clientId,但只要一个clientId在同一个线程内处理就行了。基于这点,锁就不需要了,顺序调用也就得到了保障。边注:一个clientId对应一个线程可能产生无法预估的后果,但一个actor对应一个clientId(例如在Akka里面就是如此)就简单许多。
顺便为了保证安全,我为每一个线程池都插入了度量器以监控它们的队列长度,实现如下:

class SmartPool implements EventConsumer, Closeable {

    private final List<LinkedBlockingQueue<Runnable>> queues;
    private final List<ExecutorService> threadPools;
    private final EventConsumer downstream;

    SmartPool(int size, EventConsumer downstream, MetricRegistry metricRegistry) {
        this.downstream = downstream;
        this.queues = IntStream
                .range(0, size)
                .mapToObj(i -> new LinkedBlockingQueue<Runnable>())
                .collect(Collectors.toList());
        List<ThreadPoolExecutor> list = queues
                .stream()
                .map(q -> new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, q))
                .collect(Collectors.toList());
        this.threadPools = new CopyOnWriteArrayList<>(list);
        metricRegistry.register(MetricRegistry.name(ProjectionMetrics.class, "queue"), (Gauge<Double>) this::averageQueueLength);
    }

    private double averageQueueLength() {
        double totalLength =
            queues
                .stream()
                .mapToDouble(LinkedBlockingQueue::size)
                .sum();
        return totalLength / queues.size();
    }

    //...

}

如果你是偏执狂的话,可以为每一个LinkedBlockingQueue都加入一个度量器(metric)。

Deduplication and idempotency

在分布式环境中,当你的生产者有至少一次保证时(指的是将event发送到系统的这个动作保证会发生一次),接收重复事件的情况就会经常发生。产生这种现象的原因已经超出了本文的范畴,但我们必须要学会如何处理这种问题。一种方法是为每一个信息添加一个全局唯一的ID(UUID),并且确保在消费者那端对同一个UUID的信息不会处理两次。每一个Event都有一个UUID,在满足我们要求的情况下最直接的办法就是简单地将处理过的UUID存储起来,并且对每一个新来的UUID都进行验证。随着时间的推移,ID会越积越多,如果使用像ConcurrentHashMap(在JDK中没有ConcurrentHashSet)这样的数据结构将会导致内存泄漏,这就是为什么我们专门只针对10s内的重复事件进行处理。当遭遇冲突时,从技术上你可以利用ConcurrentHashMap将UUID映射到时间戳,然后使用后台线程将超过10s的元素移除。当然如果你是一个Guava拥护者,Cache会使用声明收回政策做一些小把戏:

import com.codahale.metrics.Gauge;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;

import java.util.UUID;
import java.util.concurrent.TimeUnit;

class IgnoreDuplicates implements EventConsumer {

    private final EventConsumer downstream;

    private Cache<UUID, UUID> seenUuids = CacheBuilder.newBuilder()
            .expireAfterWrite(10, TimeUnit.SECONDS)
            .build();

    IgnoreDuplicates(EventConsumer downstream) {
        this.downstream = downstream;
    }

    @Override
    public Event consume(Event event) {
        final UUID uuid = event.getUuid();
        if (seenUuids.asMap().putIfAbsent(uuid, uuid) == null) {
            return downstream.consume(event);
        } else {
            return event;
        }
    }
}

再一次为了保险起见,我认为应该加入两个有用的度量器:cache的大小以及发现的重复事件数量,让我们插入这些度量器:

class IgnoreDuplicates implements EventConsumer {

    private final EventConsumer downstream;
    private final Meter duplicates;

    private Cache<UUID, UUID> seenUuids = CacheBuilder.newBuilder()
            .expireAfterWrite(10, TimeUnit.SECONDS)
            .build();

    IgnoreDuplicates(EventConsumer downstream, MetricRegistry metricRegistry) {
        this.downstream = downstream;
        duplicates = metricRegistry.meter(MetricRegistry.name(IgnoreDuplicates.class, "duplicates"));
        metricRegistry.register(MetricRegistry.name(IgnoreDuplicates.class, "cacheSize"), (Gauge<Long>) seenUuids::size);
    }

    @Override
    public Event consume(Event event) {
        final UUID uuid = event.getUuid();
        if (seenUuids.asMap().putIfAbsent(uuid, uuid) == null) {
            return downstream.consume(event);
        } else {
            duplicates.mark();
            return event;
        }
    }
}

终于,我们拥有了所有的碎片来构建我们的框架,其核心思想就是利用EventConsumer的实例相互包装以组成管道:

你可以选择(当然也可以不这么做)将FailOnConcurrentModification放到SmartPool与ClientProjection之间,这样做是为了保证额外安全(设计中并发修改不应该发生)(译者:加锁可能会影响性能,这纯粹是为了保险起见,在SmartPool中并发修改的情况几乎不可能发生):

ClientProjection clientProjection =
        new ClientProjection(new ProjectionMetrics(metricRegistry));
FailOnConcurrentModification concurrentModification =
        new FailOnConcurrentModification(clientProjection);
SmartPool smartPool =
        new SmartPool(12, concurrentModification, metricRegistry);
IgnoreDuplicates withoutDuplicates =
        new IgnoreDuplicates(smartPool, metricRegistry);
EventStream es = new EventStream();
es.consume(withoutDuplicates);

我们花了很多的精力提出相对简单但却有良好结构(我希望你同意这点)的解决方案。最后处理并发问题的最好途径就是…避免并发,并且将可能会产生资源竞争的代码放到一个线程中运行。这也是Akka actors(一个信息由一个actor处理)和RxJava(一个信息由Subscriber处理)所使用的思想。在下一部分我们将会了解RxJava中的声明式解决方案。

See also:

原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/113875.html

(0)
上一篇 2021年8月27日
下一篇 2021年8月27日

相关推荐

发表回复

登录后才能评论