在结构数组上使用 PySpark UDF 进行数据转换:在结构数组中添加新字段
PySpark UDF on complex Data types
在处理系统日志或任何其他半结构化数据时,我们遇到了具有许多嵌套字段和嵌入式结构数组的数据。
我们要选择的第一个也是最简单的解决方案是展开字段,然后执行数据转换。如果您需要平面模式,这种方法并没有错,但为了保持模式完整,我们需要对嵌套字段应用转换。
一种方法是将 Dataframe 转换为 RDD 并使用低级 API 来转换 Dataframe。假设我们想使用 Spark SQL API 以方便使用
为了克服这个问题,我们可以使用 PySpark UDF,它可以将复杂字段作为参数并返回新字段。
让我们创建一个足够复杂的示例数据,以便为我们的用例处理。
从 pyspark.sql 导入 SparkSession
从 pyspark.sql.types 导入 *
从 pyspark.sql 导入行
从 pyspark.sql.functions 导入 udf, col data = [(["James","","Smith","36636","M",3000, [{'dept':'HR','allocation':0.4},{'dept':'FIN ','分配':0.6}]],
["Michael","Rose","","40288","M",4000,[{'dept':'HR','allocation':0.4},{'dept':'FIN','allocation ':0.6}]],
["罗伯特","","威廉姆斯","42114","M",4000,[{'dept':'HR','allocation':0.9},{'dept':'FIN','allocation ':0.1}]],
["Maria","Anne","Jones","39192","F",4000,[{'dept':'HR','allocation':0.75},{'dept':'FIN','分配':0.25}]],
["Jen","Mary","Brown","","F",-1,[{'dept':'HR','allocation':0.30},{'dept':'FIN','分配':0.70}]])
] 架构 =( ArrayType(StructType([
StructField("名字",StringType(),True),
StructField("中间名",StringType(),True),
StructField("姓氏",StringType(),True),
StructField("id", StringType(), True),
StructField("性别", StringType(), True),
StructField("salary", IntegerType(), True),
StructField("隶属关系", ArrayType(StructType([StructField('dept', StringType()),
StructField('allocation', FloatType())]
)
), 真的)
])
)) spark = SparkSession.builder.appName('test_udf').getOrCreate()
df = spark.createDataFrame(data= data , schema=schema)
这只是一行数据,其中包含存储在 Array of Struct 中的许多员工的详细信息。
问题陈述:我们要添加 经理 内部结构中的字段 隶属关系 并保持 Dataframe 的结构完整,因此不希望通过爆炸来改变粒度。
Dataframe 的当前架构:
根
|-- 值:数组(可为空=真)
| |-- 元素:结构(containsNull = true)
| | |-- 名字:字符串(可为空 = true)
| | |-- 中间名:字符串(可为空=真)
| | |-- 姓氏:字符串(可为空 = true)
| | |-- id: 字符串(可为空=真)
| | |-- 性别:字符串(可为空=真)
| | |-- 工资:整数(可为空 = true)
| | |-- 隶属关系:数组(可为空 = true)
| | | |-- 元素:结构(containsNull = true)
| | | | |-- 部门:字符串(可为空=真)
| | | | |-- 分配:浮动(可为空=真)
目标模式:在结构数组中添加新字段
根
|-- 值:数组(可为空=真)
| |-- 元素:结构(containsNull = true)
| | |-- 名字:字符串(可为空 = true)
| | |-- 中间名:字符串(可为空=真)
| | |-- 姓氏:字符串(可为空 = true)
| | |-- id: 字符串(可为空=真)
| | |-- 性别:字符串(可为空=真)
| | |-- 工资:整数(可为空 = true)
| | |-- 隶属关系:数组(可为空 = true)
| | | |-- 元素:结构(containsNull = true)
| | | | |-- 部门:字符串(可为空=真)
| | | | |-- 分配:浮动(可为空=真)
| | | | |-- 经理:字符串(可为空=真)
为了嵌入新字段,我们将编写一个 UDF,它将未分解的字段作为参数并返回一个新字段 经理 嵌入到我们想要的级别,即隶属关系数组。
定义 return_schema 在数组中有新字段 隶属关系:
return_schema = (ArrayType(StructType([
StructField("名字", StringType(), True),
StructField("中间名", StringType(), True),
StructField("姓氏", StringType(), True),
StructField("id", StringType(), True),
StructField("性别", StringType(), True),
StructField("salary", IntegerType(), True),
StructField("隶属关系", ArrayType(StructType([StructField('dept', StringType()),
StructField('分配', FloatType()),
StructField('manager', StringType())]
)
), 真的)
])
))
下一步是编写带有对象的 UDF 图式 并返回一个新对象 return_schema。
@udf(returnType=return_schema)
def add_manager(p):
行 = []
对于 p 中的 ele:
inner_rows = []
对于 ele.affiliations 中的 aff_ele:
inner_rows.append(Row(dept=aff_ele.dept, clubs=aff_ele.allocation,manager='Mr. X')) # 例如:可以通过 API 调用或映射 DS 来拉取新字段
行.追加(
行(名字=ele.firstname,中间名=ele.middlename,姓氏=ele.lastname,id=ele.id,性别=ele.gender,
薪水=ele.salary,隶属关系=inner_rows))
返回行
最后一件事是在 select 语句中调用 udf:
df.select(add_manager(col("value"))).show(truncate=False)
Dataframe 中的每条记录都作为 Row 对象传递到 UDF。我们要修改数组的模式 隶属关系 但 不可变 Row[] 的行为不允许修改。因此,我们为每条记录创建一个新的 Row 对象,并返回一个嵌套的 Row 对象,该对象与 return_schema .
版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明
本文链接:https://www.qanswer.top/1346/52362817
原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/282723.html