硬核!Apache Hudi Schema演变深度分析与应用


1.场景需求

在医疗场景下,涉及到的业务库有几十个,可能有上万张表要做实时入湖,其中还有某些库的表结构修改操作是通过业务人员在网页手工实现,自由度较高,导致整体上存在非常多的新增列,删除列,改列名的情况。由于Apache Hudi 0.9.0 版本到 0.11.0 版本之间只支持有限的schema变更,即新增列到尾部的情况,且用户对数据质量要求较高,导致了非常高的维护成本。每次删除列和改列名都需要重新导入,这种情况极不利于长期发展,所以需要一种能够以较低成本支持完整schema演变的方案。

2.社区现状

schema演化允许用户轻松更改 Apache Hudi 表的当前 Schema,以适应随时间变化的数据。从 0.11.0 版本开始,已添加 Spark SQL(Spark 3.1.x、3.2.1 及更高版本)对 Schema 演化的 DDL 支持并处于试验阶段。

  1. 可以添加、删除、修改和移动列(包括嵌套列)
  2. 分区列不能进化
  3. 不能对 Array 类型的嵌套列进行添加、删除或操作

为此我们针对该功能进行了相关测试和调研工作。

2.1 Schema演变的版本迭代

回顾Apache Hudi 对schema演变的支持随着版本迭代的变化如下:

版本 Schema演变支持 多引擎查询
*<0.9
0.9<* 在最后的根级别添加一个新的可为空列 是(全)
  向内部结构添加一个新的可为空列(最后) 是(全)
  添加具有默认值的新复杂类型字段(地图和数组) 是(全)
  添加自定义可为空的 Hudi 元列,例如_hoodie_meta_col 是(全)
  为根级别的字段改变数据类型从 intlong 是(全)
  将嵌套字段数据类型从intlong 是(全)
  将复杂类型(映射或数组的值)数据类型从intlong 是(全)
0.11<* 相比之前版本新增:改列名 spark以外的引擎不支持
  相比之前版本新增:删除列 spark以外的引擎不支持
  相比之前版本新增:移动列 spark以外的引擎不支持

Apache Hudi 0.11.0版本完整Schema演变支持的类型修改如下:

Source/Target long float double string decimal date int
int Y Y Y Y Y N Y
long Y N Y Y Y N N
float N Y Y Y Y N N
double N N Y Y Y N N
decimal N N N Y Y N N
string N N N Y Y Y N
date N N N Y N Y N

2.2 官网提供的方式

实践中0.9.0版本的新增列未发现问题,已在正式环境使用。每次写入前捕获是否存在新增列删除列的情况,新增列的情况及时补空数据和struct,新增列的数据及时写入Hudi中;删除列则数据补空,struct不变,删除列仍写入Hudi中;每天需要重导数据处理删除列和修改列的情况,有变化的表在Hive中的元数据也以天为单位重新注册。

0.11开始的方式,按照官网的步骤:

进入spark-sql

# Spark SQL for spark 3.1.x
spark-sql --packages org.apache.hudi:hudi-spark3.1.2-bundle_2.12:0.11.1 /
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' /
--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'

# Spark SQL for spark 3.2.1 and above
spark-sql --packages org.apache.hudi:hudi-spark3-bundle_2.12:0.11.1 /
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' /
--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' /
--conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog'

设置参数,删列:

set hoodie.schema.on.read.enable=true;
---创建表---
create table test_schema_change (
  id string,
  f1 string,
  f2 string,
  ts bigint
) using hudi
tblproperties (
  type = 'mor',
  primaryKey = 'id',
  preCombineField = 'ts'
);
---1.新增列---
alter table test_schema_change add columns (f3 string);
---2.删除列---
alter table test_schema_change drop column f2;
---3.改列名---
alter table test_schema_change rename column f1 to f1_new;

2.3 其他方式

由于spark-sql的支持只在spark3.1之后支持,寻找并尝试了 BaseHoodieWriteClient.java 中存在名为 addColumn renameColumn deleteColumns 的几个方法,通过主动调用这些方法,也能达到schema完整变更的目的。使用这种方式需要将DDL的sql解析为对应的方法和参数,另外由于该方式测试和使用的例子还比较少,存在一些细节问题需要解决。

val hsec = new HoodieSparkEngineContext(spark.sparkContext);
val hoodieCfg = HoodieWriteConfig.newBuilder().forTable(tableName).withEmbeddedTimelineServerEnabled(true).withPath(basePath).build()
val client = new SparkRDDWriteClient(hsec, hoodieCfg)
//增加列
client.addColumn("f3",Schema.create(Schema.Type.STRING))
//删除列
client.deleteColumns("f1")
//改列名
client.renameColumn("f2","f2_c1")

本站声明:
1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;

2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;

3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;

4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;

5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/293327.html

(0)
上一篇 2022年11月20日
下一篇 2022年11月20日

相关推荐

发表回复

登录后才能评论