距离上个月的用户体验升级帖已经过去一个月了,在这一个月的时间里,DataPipeline的小伙伴都在忙些什么呢?
为能更好地服务用户,DataPipeline最新版本支持:
1. 一个数据源数据同时分发(实时或定时)到多个目的地;
2. 提升Hive的使用场景:
-
写入Hive目的地时,支持选择任意目标表字段作为分区字段;
-
可将Hive作为数据源定时分发到多个目的地。
3. 定时同步关系型数据库数据时,可自定义读取策略来满足各个表的同步增量需求。
本篇将首先介绍一下一对多数据分发及批量读取模式2.0的功能,后续功能会陆续发布。
推出背景
推出「一对多数据分发」的背景
在历史版本中,DataPipeline每个任务只允许有一个数据源和目的地,从数据源读取的数据只允许写入到一张目标表。这会导致无法完美地支持客户的两个需求场景:
需求场景一:
客户从一个API数据源或者从KafkaTopic获取JSON数据后,通过高级清洗解析写入到目的地多个表或者多个数据库中,但历史版本无法同时写入到多个目的地,只能创建多个任务。这会导致数据源端会重复获取同一批数据(而且无法完全保证数据一致性),浪费资源,并且无法统一管理。
需求场景二:
客户希望创建一个数据任务,并从一个关系型数据库表实时(或定时)分发到多个数据目的地。在历史版本中,用户需要创建多个任务来解决,但创建多个任务执行该需求时会重复读取数据源同一张表的数据,比较浪费资源。客户更希望只读取一次便可直接解析为多个表,完成该需求场景。
新功能解决的问题:
1. 用户在一个数据任务中选择一个数据源后,允许选择多个目的地或者多个表作为写入对象,而不需要创建多个任务来实现该需求。
2. 用户在单个任务中针对每个目的地的类型和特性,可以单独设置各个目的地表结构和写入策略,大大减少了数据源读取次数和管理成本。
扩展Hive相关使用场景
历史版本中,DataPipeline支持各类型数据源数据同步到Hive目的地的需求场景。但由于每个客户的Hive使用方式、数据存储方式不同,两个需求场景在历史版本中并没有得到支持。
需求场景一:
动态分区字段。历史版本中,用户只允许选择时间类型字段作为分区字段。在真实的客户场景中除了按照时间做分区策略外,客户希望指定Hive表任意字段作为分区字段。
需求场景二:
客户希望除了以Hive作为目的地,定时写入数据到Hive外,客户还希望使用DataPipeline可以定时分发Hive表数据到各个应用系统,解决业务需求。
新功能解决的问题:
1. 允许用户指定目的地Hive表中任何字段作为分区字段,并支持选择多个分区字段。
2. 新增Hive数据源,可作为数据任务读取对象。
推出「批量读取模式2.0功能」的背景
需求场景:
关系型数据库(以MySQL为例)的表没有权限读取BINLOG,但在业务上客户需要定期同步增量数据,在权限只有SELECT情况下,需要做到增量数据的同步任务。
新版本出现之前,DataPipeline在用户选择批量读取模式时提供了增量识别字段的功能,可以选择自增字段或者更新时间字段作为条件完成增量数据的同步,但部分表可能没有这种类型的字段,或者增量同步的逻辑不通(比如:只同步过去1小时的数据,或只同步到5分钟前的数据等)。
新功能解决的问题:
1. 在关系型数据库作为数据源的情况下,允许用户针对每一个表设置WHERE读取条件,并提供lastmax方法。
2. 使用该函数DataPipeline会取该任务下已同步数据中某一个字段的最大值,用户可以使用该值作为WHERE语句读取条件。
3. 用户使用last_max()函数,在首次执行该语句或对应字段暂无数值时,则会忽略该函数相关的读取条件。
4. 允许用户结合其他数据库提供的方法编辑读取条件:
例:以时间字段作为读取条件,每次只同步一小时前的数据,且只同步未曾读取的数据。
SELECT * FROMtable1 WHEREupdate_time > ‘last_max(update_time)’ ANDupdate_time<= DATE_SUB(NOW(), INTERVAL 24 HOUR)
相较于之前的一对一设置,新版本上线后用户可以通过批量设置增量识别字段、批量移除、批量修改表名称等,批量地操作一些表,批量地做一些动态修改,减少用户配置成本。例如,现在需要在200张表的名称后面都加一个data_warehouse,不同于以往的逐一添加,现在可以批量添加这些前缀后缀。
DataPipeline每一次版本的迭代都凝聚了团队对企业数据使用需求的深入思索,其它新功能还在路上,很快就会跟大家见面了,希望能够切实帮助大家更敏捷高效地获取数据。
原创文章,作者:奋斗,如若转载,请注明出处:https://blog.ytso.com/190870.html