Clickhouse Optimize Table全面解析

最近笔者在使用Clickhouse的过程中,用到了Optimize Table命令,而在业务开发过程中,由于不了解Optimize Table命令的明确行为,中间出了很多岔子,在查问题的过程中,也发现网上关于Optimize Table命令的介绍资料很少,因此笔者决定结合源码,全面解析下Optimize Table命令。

Optimize Table命令的功能

Clickhouse作为一款OLAP数据库,对数据更新的支持比较弱,而且并不支持标准的SQL update/delete语法;它提供的alter table …… update/delete 语法也是异步的,即收到命令后先返回给客户端成功,至于什么时候数据真正更新成功是不确定的。

因此在业务需要数据更新的场景下(如Mysql同步到Clickhouse),通常会使用ReplacingMergeTree或CollapsingMergeTree的数据合并逻辑绕行实现异步更新,这样一方面可以保证数据的最终一致性,另一方面Clickhouse性能开销也会比alter table小。但这种方式有一个缺点是MergeTree引擎的数据合并过程(merge)是Clickhouse基于策略控制的,执行时间比较随机,因此数据一致性缺少时间保证,极端情况下数据过了一天也没有完全合并。

而Optimize Table这个命令可以强制触发MergeTree引擎的数据合并,可以用来解决数据合并时间不确定的问题。

Optimize Table执行过程源码解析

Clickhouse在收到一个SQL语句后,会通过如下的流程执行SQL:Parser(解析SQL语法,转化为AST)-> Interpreter(优化生成执行计划 RBO)-> Interpreter::executeImpl(通过Block Stream读取或写入数据)[1]。Optimize Table语句也不例外,只不过Optimize语句没有复杂的执行计划。

图一 Clickhouse SQL执行流程

Clickhouse收到Optimize Table命令后会调用到ParserOptimizeQuery::parseImpl()解析命令。

bool ParserOptimizeQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
{
    ParserKeyword s_optimize_table("OPTIMIZE TABLE");
    ParserKeyword s_partition("PARTITION");
    ParserKeyword s_final("FINAL");
    ParserKeyword s_deduplicate("DEDUPLICATE");
    ParserKeyword s_by("BY");
    ......
}

可以看到Optimize Table语句中主要解析了以下几个关键词:“OPTIMIZE TABLE”、“PARTITION”、“FINAL”、“DEDUPLICATE”、“BY”。官方文档这样介绍这些关键词的作用[2]:

1. “OPTIMIZE TABLE”:指定需要Optimize的表,只支持MergeTree引擎。

2. “PARTITION”:若指定了分区,则只会对指定的分区触发合并任务。

3. “FINAL”:即使只有一个文件块也执行合并,即使有并行的正在执行的合并,也会强制执行这一次合并。

4. “DEDUPLICATE”:去重,若没有后续的“BY”子句,则按照行完全相同去重(所有字段值相同)。

5. “BY”:配合“DEDUPLICATE”关键词使用,指定依据哪些列去重。

接下来对照源码,看看这些关键词如何控制合并执行。

进入InterpreterOptimizeQuery::execute(),先校验了“DEDUPLICATE BY”的列中是否包含了表的分区键、主键,若未包含则直接抛出异常。Clickhouse的数据存储依据分区键划分文件块,每个文件块中的数据按照主键排序,因此在去重时若包含了分区键、主键,Clickhouse可以只对相邻的行进行去重,而不需要另外构造哈希表,可以极大的提升执行效率。

BlockIO InterpreterOptimizeQuery::execute()
{
    ......
    // Empty list of names means we deduplicate by all columns, but user can explicitly state which columns to use.
    Names column_names;
    if (ast.deduplicate_by_columns)
    {
        ......
        metadata_snapshot->check(column_names, NamesAndTypesList{}, table_id);
        Names required_columns;
        {
            required_columns = metadata_snapshot->getColumnsRequiredForSortingKey();
            const auto partitioning_cols = metadata_snapshot->getColumnsRequiredForPartitionKey();
            required_columns.reserve(required_columns.size() + partitioning_cols.size());
            required_columns.insert(required_columns.end(), partitioning_cols.begin(), partitioning_cols.end());
        }
        for (const auto & required_col : required_columns)
        {
            // Deduplication is performed only for adjacent rows in a block,
            // and all rows in block are in the sorting key order within a single partition,
            // hence deduplication always implicitly takes sorting keys and partition keys in account.
            // So we just explicitly state that limitation in order to avoid confusion.
            if (std::find(column_names.begin(), column_names.end(), required_col) == column_names.end())
                throw Exception(ErrorCodes::THERE_IS_NO_COLUMN,
                        "DEDUPLICATE BY expression must include all columns used in table's"
                        " ORDER BY, PRIMARY KEY, or PARTITION BY but '{}' is missing."
                        " Expanded DEDUPLICATE BY columns expression: ['{}']",
                        required_col, fmt::join(column_names, "', '"));
        }
    }

    table->optimize(query_ptr, metadata_snapshot, ast.partition, ast.final, ast.deduplicate, column_names, getContext());

    return {};
}

校验了去重列之后,就调用了表的optimize()方法。事实上只有MergeTree和ReplicatedMergeTree实现了optimize()方法,其他的存储引擎调用optimize()方法都会直接抛出异常。

进入InterpreterOptimizeQuery::optimize(),在未指定“PARTITION”并且使用了“FINAL”时,会遍历表的所有分区,并对每一个分区执行合并逻辑;如果指定了分区,就不再关注“FINAL”关键词了,都是对该分区执行合并;如果即没有指定分区,也没有使用“FINAL”的情况下,代码中的partition_id就为空,在merge()方法中对这种情况做了特殊的处理。

bool StorageMergeTree::optimize(
    const ASTPtr & /*query*/,
    const StorageMetadataPtr & /*metadata_snapshot*/,
    const ASTPtr & partition,
    bool final,
    bool deduplicate,
    const Names & deduplicate_by_columns,
    ContextPtr local_context)
{
    ......
    String disable_reason;
    if (!partition && final)
    {
        DataPartsVector data_parts = getDataPartsVector();
        std::unordered_set<String> partition_ids;

        for (const DataPartPtr & part : data_parts)
            partition_ids.emplace(part->info.partition_id);

        for (const String & partition_id : partition_ids)
        {
            if (!merge(
                    true,
                    partition_id,
                    true,
                    deduplicate,
                    deduplicate_by_columns,
                    &disable_reason,
                    local_context->getSettingsRef().optimize_skip_merged_partitions))
            {......}
        }
    }
    else
    {
        String partition_id;
        if (partition)
            partition_id = getPartitionIDFromQuery(partition, local_context);

        if (!merge(
                true,
                partition_id,
                final,
                deduplicate,
                deduplicate_by_columns,
                &disable_reason,
                local_context->getSettingsRef().optimize_skip_merged_partitions))
        {......}
    }

    return true;
}

InterpreterOptimizeQuery::merge()方法逻辑很简单,选取要合并的文件块 -> 合并选择的文件块。

bool StorageMergeTree::merge(
    bool aggressive,
    const String & partition_id,
    bool final,
    bool deduplicate,
    const Names & deduplicate_by_columns,
    String * out_disable_reason,
    bool optimize_skip_merged_partitions)
{
    ......
    {
        merge_mutate_entry = selectPartsToMerge(
            metadata_snapshot,
            aggressive,
            partition_id,
            final,
            out_disable_reason,
            table_lock_holder,
            lock,
            optimize_skip_merged_partitions,
            &select_decision);
    }
    ......
    return mergeSelectedParts(metadata_snapshot, deduplicate, deduplicate_by_columns, *merge_mutate_entry, table_lock_holder);
}

进入StorageMergeTree::selectPartsToMerge(),在partition_id为空时(只有既不指定分区,又不使用“FINAL”时,才会为空),会执行selectPartsToMerge()依据策略选择一些文件块来执行合并,而在partition_id非空时,则是执行selectAllPartsToMergeWithinPartition()将分区下所有的文件块全部合并。所以,在既不指定分区,也不使用“FINAL”关键词的情况下,Optimize Table命令并不能保证数据最终会变为完全合并的状态

std::shared_ptr<StorageMergeTree::MergeMutateSelectedEntry> StorageMergeTree::selectPartsToMerge(
    const StorageMetadataPtr & metadata_snapshot,
    bool aggressive,
    const String & partition_id,
    bool final,
    String * out_disable_reason,
    TableLockHolder & /* table_lock_holder */,
    std::unique_lock<std::mutex> & lock,
    bool optimize_skip_merged_partitions,
    SelectPartsDecision * select_decision_out)
{
    ......
    if (partition_id.empty())
    {
        ......
        if (max_source_parts_size > 0)
        {
            select_decision = merger_mutator.selectPartsToMerge(
                future_part,
                aggressive,
                max_source_parts_size,
                can_merge,
                merge_with_ttl_allowed,
                out_disable_reason);
        }
        else if (out_disable_reason)
            *out_disable_reason = "Current value of max_source_parts_size is zero";
    }
    else
    {
        while (true)
        {
            UInt64 disk_space = getStoragePolicy()->getMaxUnreservedFreeSpace();
            select_decision = merger_mutator.selectAllPartsToMergeWithinPartition(
                future_part, disk_space, can_merge, partition_id, final, metadata_snapshot, out_disable_reason, optimize_skip_merged_partitions);
            auto timeout_ms = getSettings()->lock_acquire_timeout_for_background_operations.totalMilliseconds();
            auto timeout = std::chrono::milliseconds(timeout_ms);

            /// If final - we will wait for currently processing merges to finish and continue.
            if (final
                && select_decision != SelectPartsDecision::SELECTED
                && !currently_merging_mutating_parts.empty()
                && out_disable_reason
                && out_disable_reason->empty())
            {
                LOG_DEBUG(log, "Waiting for currently running merges ({} parts are merging right now) to perform OPTIMIZE FINAL",
                    currently_merging_mutating_parts.size());

                if (std::cv_status::timeout == currently_processing_in_background_condition.wait_for(lock, timeout))
                {
                    *out_disable_reason = fmt::format("Timeout ({} ms) while waiting for already running merges before running OPTIMIZE with FINAL", timeout_ms);
                    break;
                }
            }
            else
                break;
        }
    }
    ......
}

另外,在使用了“FINAL”关键词的情况下,Optimize Table命令会等待正在执行的合并任务结束,再执行该次合并,所以在指定分区的情况下,使用“FINAL”关键词的响应会慢一些。

InterpreterOptimizeQuery::mergeSelectedParts()的逻辑就比较复杂了,这里就不具体介绍了,但总体逻辑就是把选择的文件块都读出来,然后执行数据合并,形成一个新的文件块写入磁盘。因此在数据量比较大的情况下,这其实是一个很重的操作,因为不论是否真的有数据需要合并,都需要将全量数据读出来,重新写一份到磁盘上。在执行Optimize之后,生成了新的文件块,但是老的文件块并不会立刻消失,而是会异步删除,因此在执行大表的Optimize之后会看到数据存储容量有短暂的上升。

针对有些分区不需要合并的情况,Clickhouse 21.1版本做了一个优化,在系统变量(system.settings表)里增加了optimize_skip_merged_partitions参数,这个参数开启,在selectAllPartsToMergeWithinPartition()中会排除掉只有一个文件块且level>0的分区(这样的分区意味着该分区之前已经合并过)。

实验验证

为了验证上面代码逻辑,笔者在Clickhouse 20.3 版本(没有optimize_skip_merged_partitions参数)上进行了一些实验。

1. Optimize + Partition

图二是执行Optimize Table …… Partition 20210209的执行效果,可以看到执行后20210209这个分区中的2个文件块(Parts)被合并成了一个文件块,其level为3,而其他的分区并没有发生合并。当然图中展示的是Optimize最终的效果,在刚执行完该命令时,原有的20210209_84_94_2、20210209_95_95_0文件夹并不会立刻消失,而是过了几分钟才被删除掉。

图二 Optimize Partition执行效果

2. Optimize + Final

图三是Optimize Table …… Final的执行效果,可以看到经过执行Optimize Final命令之后,20211013这个分区的多个文件块合并成了一个文件块;同时,其他已经合并过的分区(如20210729)会被重新写一份,其level由5变为了7(因为中间执行了2次Optimize Final语句)。

图三 Optimize + Final执行效果

3. Optimize

最后再看下简单Optimize的效果,如图四所示。可以看到Clickhouse只是依据策略选取了某一个分区的某些文件块进行合并(20211013_0_231_28、20211013_232_410_30、20211013_411_432_10三个文件块合并为了20211013_0_432_31文件块),这样并不能保证最终数据被完全合并。

图四 Optimize执行效果

使用总结

在基于Clickhouse的数据仓库建设中,由于Clickhouse本身不支持完备的数据更新,数据的实时性和一致性存在trade-off,如果应用场景对数据一致性要求很高,在有数据更新的情况下,基本无法实时导入数据,只能周期性的离线导入以保证Clickhouse中的数据是某一时刻的完整切片。离线任务由于存在调度延时,一般来讲周期最小只能做到小时级,很难做到分钟级。如果应用场景更在意数据的实时性,就可以采用实时导入的方式,由于Clickhouse的Merge过程是基于策略调度的,因此在数据一致性上就会差一些(会查到本该被删除的数据)。

基于实时写入+定期Optimize的方式,可以通过改变Optimize周期,在性能、数据一致性之间做平衡。当数据一致性要求较高时,可以缩短Optimize周期,极端情况甚至可以每次写入都执行Optimize,这样可以将数据不一致的时间缩短到分钟级(当然这样对Clickhouse的性能要求比较严格);当数据量比较大时,可以半个小时左右执行一次Optimize,这样在保证Clickhouse集群性能的同时,也对数据不一致的时间有一个保障。在笔者的实际使用中,Clickhouse集群使用32核64G机器,单表原始数据量在1TB以内的情况下,Optimize执行周期在5min-10min都没什么压力。

参考文献

[1] ClickHouse 源码阅读 —— 详解查询SQL语句执行过程. https://nowjava.com/article/43828

[2] Clickhouse docs. https://clickhouse.com/docs/en/sql-reference/statements/optimize/

原创文章,作者:kirin,如若转载,请注明出处:https://blog.ytso.com/tech/pnotes/212021.html

(0)
上一篇 2021年12月14日 19:10
下一篇 2021年12月14日 19:10

相关推荐

发表回复

登录后才能评论