在结构数组上使用 PySpark UDF 进行数据转换:在结构数组中添加新字段


在结构数组上使用 PySpark UDF 进行数据转换:在结构数组中添加新字段

在结构数组上使用 PySpark UDF 进行数据转换:在结构数组中添加新字段

PySpark UDF on complex Data types

在处理系统日志或任何其他半结构化数据时,我们遇到了具有许多嵌套字段和嵌入式结构数组的数据。

我们要选择的第一个也是最简单的解决方案是展开字段,然后执行数据转换。如果您需要平面模式,这种方法并没有错,但为了保持模式完整,我们需要对嵌套字段应用转换。

一种方法是将 Dataframe 转换为 RDD 并使用低级 API 来转换 Dataframe。假设我们想使用 Spark SQL API 以方便使用

为了克服这个问题,我们可以使用 PySpark UDF,它可以将复杂字段作为参数并返回新字段。

让我们创建一个足够复杂的示例数据,以便为我们的用例处理。

 从 pyspark.sql 导入 SparkSession  
 从 pyspark.sql.types 导入 *  
 从 pyspark.sql 导入行  
 从 pyspark.sql.functions 导入 udf, col data = [(["James","","Smith","36636","M",3000, [{'dept':'HR','allocation':0.4},{'dept':'FIN ','分配':0.6}]],  
 ["Michael","Rose","","40288","M",4000,[{'dept':'HR','allocation':0.4},{'dept':'FIN','allocation ':0.6}]],  
 ["罗伯特","","威廉姆斯","42114","M",4000,[{'dept':'HR','allocation':0.9},{'dept':'FIN','allocation ':0.1}]],  
 ["Maria","Anne","Jones","39192","F",4000,[{'dept':'HR','allocation':0.75},{'dept':'FIN','分配':0.25}]],  
 ["Jen","Mary","Brown","","F",-1,[{'dept':'HR','allocation':0.30},{'dept':'FIN','分配':0.70}]])  
 ] 架构 =( ArrayType(StructType([  
 StructField("名字",StringType(),True),  
 StructField("中间名",StringType(),True),  
 StructField("姓氏",StringType(),True),  
 StructField("id", StringType(), True),  
 StructField("性别", StringType(), True),  
 StructField("salary", IntegerType(), True),  
 StructField("隶属关系", ArrayType(StructType([StructField('dept', StringType()),  
 StructField('allocation', FloatType())]  
 )  
 ), 真的)  
 ])  
 )) spark = SparkSession.builder.appName('test_udf').getOrCreate()  
 df = spark.createDataFrame(data= data , schema=schema)

这只是一行数据,其中包含存储在 Array of Struct 中的许多员工的详细信息。

问题陈述:我们要添加 经理 内部结构中的字段 隶属关系 并保持 Dataframe 的结构完整,因此不希望通过爆炸来改变粒度。

Dataframe 的当前架构:

 根  
 |-- 值:数组(可为空=真)  
 | |-- 元素:结构(containsNull = true)  
 | | |-- 名字:字符串(可为空 = true)  
 | | |-- 中间名:字符串(可为空=真)  
 | | |-- 姓氏:字符串(可为空 = true)  
 | | |-- id: 字符串(可为空=真)  
 | | |-- 性别:字符串(可为空=真)  
 | | |-- 工资:整数(可为空 = true)  
 | | |-- 隶属关系:数组(可为空 = true)  
 | | | |-- 元素:结构(containsNull = true)  
 | | | | |-- 部门:字符串(可为空=真)  
 | | | | |-- 分配:浮动(可为空=真)

目标模式:在结构数组中添加新字段

 根  
 |-- 值:数组(可为空=真)  
 | |-- 元素:结构(containsNull = true)  
 | | |-- 名字:字符串(可为空 = true)  
 | | |-- 中间名:字符串(可为空=真)  
 | | |-- 姓氏:字符串(可为空 = true)  
 | | |-- id: 字符串(可为空=真)  
 | | |-- 性别:字符串(可为空=真)  
 | | |-- 工资:整数(可为空 = true)  
 | | |-- 隶属关系:数组(可为空 = true)  
 | | | |-- 元素:结构(containsNull = true)  
 | | | | |-- 部门:字符串(可为空=真)  
 | | | | |-- 分配:浮动(可为空=真)  
 | | | | |-- 经理:字符串(可为空=真)

为了嵌入新字段,我们将编写一个 UDF,它将未分解的字段作为参数并返回一个新字段 经理 嵌入到我们想要的级别,即隶属关系数组。

定义 return_schema 在数组中有新字段 隶属关系:

 return_schema = (ArrayType(StructType([  
 StructField("名字", StringType(), True),  
 StructField("中间名", StringType(), True),  
 StructField("姓氏", StringType(), True),  
 StructField("id", StringType(), True),  
 StructField("性别", StringType(), True),  
 StructField("salary", IntegerType(), True),  
 StructField("隶属关系", ArrayType(StructType([StructField('dept', StringType()),  
 StructField('分配', FloatType()),  
 StructField('manager', StringType())]  
 )  
 ), 真的)  
 ])  
 ))

下一步是编写带有对象的 UDF 图式 并返回一个新对象 return_schema。

 @udf(returnType=return_schema)  
 def add_manager(p):  
 行 = []  
 对于 p 中的 ele:  
 inner_rows = []  
 对于 ele.affiliations 中的 aff_ele:  
 inner_rows.append(Row(dept=aff_ele.dept, clubs=aff_ele.allocation,manager='Mr. X')) # 例如:可以通过 API 调用或映射 DS 来拉取新字段  
 行.追加(  
 行(名字=ele.firstname,中间名=ele.middlename,姓氏=ele.lastname,id=ele.id,性别=ele.gender,  
 薪水=ele.salary,隶属关系=inner_rows))  
 返回行

最后一件事是在 select 语句中调用 udf:

 df.select(add_manager(col("value"))).show(truncate=False)

Dataframe 中的每条记录都作为 Row 对象传递到 UDF。我们要修改数组的模式 隶属关系不可变 Row[] 的行为不允许修改。因此,我们为每条记录创建一个新的 Row 对象,并返回一个嵌套的 Row 对象,该对象与 return_schema .

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明

本文链接:https://www.qanswer.top/1346/52362817

原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/282723.html

(0)
上一篇 2022年8月28日
下一篇 2022年8月28日

相关推荐

发表回复

登录后才能评论