Pandas UDF and Function Api in Spark
Apache Arrow in PySpark
Spark可以使用Apache Arrow
对python和jvm之间的数据进行传输, 这样会比默认传输方式更加高效。
为了能高效地利用特性和保障兼容性,使用的时候可能需要一点点修改或者配置。
为什么使用Arrow作为数据交换中介能够提升性能?
普通的python udf需要经过如下步骤来和jvm交互:
- jvm中一条数据序列化
- 序列化的数据发送到python进程
- 记录被python反序列化
- 记录被python处理
- 结果被python序列化
- 结果被发送到jvm
- jvm反序列化并存储结果到dataframe
所以python udf会比java和scala原生的udf慢。
但是使用pandas udf可以克服数据传输中需要的序列化问题,关键是使用了Arrow. spark使用arrow把JVM中的Dataframe转为可共享的buffer, 然后python也可以把这块共享buffer作为pandas的dataframe, 所以python可以直接在共享内存上操作。
以上,我们总结一下,使用arrow主要有两个好处:
- 因为直接使用了共享内存,不在需要python和jvm序列化和反序列化数据。
- pandas有很多使用c实现的方法, 可以直接使用。
Spark DataFrame和Pandas DataFrame的转化
首先需要配置spark, 设置spark.sql.execution.arrow.pyspark.enabled
, 默认这个选项是不打开的。
还可以开启spark.sql.execution.arrow.pyspark.fallback.enabled
来避免如果没有安装Arrow
或者其它相关错误。
Spark可以使用toPandas()
方法转化为Pandas DataFrame; 而使用createDataFrame(pandas_df)
把Pandas DataFrame转为Spark DataFrame.
1 | import numpy as np |
Pandas UDF(矢量UDF)
Pandas UDF
是用户定义的函数, Spark是用arrow传输数据并用pandas来运行pandas UDF
, pandas UDF
使用向量计算,相比于旧版本的row-at-a-time
python udf, 最多增加100倍的性能. 使用pandas_udf
修饰器装饰函数,就可以定义一个pandas UDF
.对spark来说,UDF就是一个普通的pyspark函数。
从spark3.0开始, 推荐使用python类型(type hint
)来定义pandas udf.
定义类型的时候,StructType
需要使用pandas.DataFrame
类型, 其他一律使用pandas.Series
类型。
1 | import pandas as pd |
Series to Series 类型的UDF
当类型提示可以被表达为pandas.Series -> pandas.Series
时,称为Series to Series
UDF
这种类型的pandas UDF
的输入和输出必须要有相同的长度, PySpark会把数据按列分成多个batch, 然后对每个batch运行pandas UDF
, 然后组合各自的结果。
1 | >>> import pandas as pd |
Series迭代器 -> Series迭代器 类型的UDF
当类型提示可以被表达为Iterator[pandas.Series] -> Iterator[pandas.Series]
时,称为Iterator[Series] to Iterator[Series]
UDF.
1 | from typing import Iterator |
多个Series迭代器 -> Series迭代器 类型的UDF
当类型提示可以被表达为Iterator[Tuple[pandas.Series,...]] -> Iterator[pandas.Series]
时,称为Iterator[Tuple[pandas.Series,...]] to Iterator[Series]
UDF.
1 | >>> from typing import Iterator, Tuple |
Series -> Scalar 类型的UDF
当类型提示可以被表达为pandas.Series -> Scalar
时,称为Series to Scalar
UDF.
Scalar具体的类型必须是原生python类型如int, float等等, 或者是numpy的数据类型如numpy.int64, numpy.float64
这种UDF可以被用于groupBy(), agg(), pyspark.sql.Window
.
1 | >>> from pyspark.sql import Window |
Spark的Pandas函数API
Spark有一些函数可以让python的函数通过pandas实例直接用在spark dataframe上。内部机制上类似pandas udf
, jvm把数据转成arrow的buffer, 然后pandas可以直接在buffer上操作。但是区别是,这些函数api使用起来就像普通pyspark api一样是作用在dataframe上的, 而不像udf那样作用于一个column
. 实际使用的时候,一般是DataFrame.groupby().applyInPandas()
或者DataFrame.groupby().mapInPandas()
Grouped Map api
Spark的dataframe在groupby
后使用普通的pandas函数, 如df.groupby().applyInPandas(func, schema))
, 普通的pandas函数需要输入是pandas dataframe, 返回普通的pandas dataframe. 上面这写法会把每个分组group映射到pandas dataframe.df.groupby().applyInPandas(func, schema))
过程其实分为三步, 典型的split-apply-combine
模式:
DataFrame.groupBy
分组数据- 分组的数据映射到pandas dataframe后,apply到传入的函数
- 组合结果成一个新的pyspark Dataframe
使用groupBy().applyInPandas(), 用户需要做两件事: - 写好pandas函数
- 定义好pyspark dataframe结果的schema
1
2
3
4
5
6
7
8
9
10
11
12
13
14>>> def subtract_mean(pdf):
... v = pdf.v
... return pdf.assign(v=v-v.mean())
...
>>> df.groupby('id').applyInPandas(subtract_mean,schema='id long, v double').show()
+---+------------------+
| id| v|
+---+------------------+
| 1| -0.5|
| 1| 0.5|
| 2|-2.666666666666667|
| 2|-1.666666666666667|
| 2| 4.333333333333333|
+---+------------------+
Map api
也可以对pyspark dataframe和pandas dataframe做map操作,DataFrame.mapInPandas()
是对当前的DataFrame的取一个迭代器映射普通到pandas函数。这个普通pandas函数必须是输入输出都是pdf.
1 | >>> def filter_func(iterator): |
Co-grouped Map api
这个api可以使两个pyspark dataframe组合后使用pandas函数
1 | import pandas as pd |
- Post title:Pandas UDF and Function Api in Spark
- Post author:Kopei
- Create time:2020-07-23 00:00:00
- Post link:https://kopei.github.io/2020/07/22/bigdata-2020-07-23-pyspark-for-pandas/
- Copyright Notice:All articles in this blog are licensed under BY-NC-SA unless stating additionally.