分布式计算的pandas式框架
在过去的一年半中,我们与希望将Pandas代码移动到Dask或Spark以利用分布式计算资源的数据从业者进行了交谈。他们的工作量很快变得过于繁重,或者他们的数据集不再适合Pandas,因为Pandas只在一台机器上运行。
我们对话中反复出现的主题之一是Koalas(更名为PySpark Pandas)和Modin等工具,它们的目标是使用相同的Pandas接口,通过更改导入语句(大部分情况下)将工作带到Dask、Ray或Spark。
例如,PySpark Pandas替代品将是:
# import pandas as pd
import pyspark.pandas as pd
像pandas一样的框架很受欢迎,因为很多数据科学家都抗拒改变(我自己也经历过)。但是,仅仅更改import语句就可以让用户避免理解分布式系统中真正发生的事情,而缺乏理解会导致无效使用。
我们将看到,与Pandas API实现1:1对等的尝试将需要在性能和功能上做出妥协。
基准测试数据
我们创建了具有以下结构的DataFrame。列a和b是字符串列。列c和d是数值。此DataFrame将有100万行(但在某些情况下,我们也会更改它)。
我们将在Pandas、Modin(on Ray)、PySpark Pandas和Dask中创建此DataFrame。对于每个后端,我们将对不同案例的操作进行计时。
问题1:pandas假设数据在物理上是一致的
最常用的Pandas方法之一是iloc。这依赖于数据的隐式全局排序。这就是为什么Pandas可以快速检索给定索引值集中的行。它知道在哪里访问它需要检索的行的内存。
以下面代码段中的以下5个案例为例,我们将计算每个操作相对于案例1的速度。
我们希望看到每个框架的不同性能概要。下面的案例3-5是根据位置访问行和列。具体来说,案例5是DataFrame的中间部分。我们将在Pandas、Modin、PySpark Pandas和Dask上运行这五个案例。
# case 1
df.head(10)[["c","d"]]
# case 2
df.tail(10)[["c","d"]]
# case 3
df.iloc[:10, [2,3]]
# case 4
df.iloc[-10:, [2,3]]
# case 5
df.iloc[499995:500005, [2,3]]
在下面的基准测试中,Pandas在使用整数位置值访问数据时会加快速度。这是因为在一台机器上访问内存中的数据相对便宜。Modin在所有情况下都能提供一致的性能,但在访问DataFrame的中间部分时,速度会降低2倍(情况5)。
PySpark Pandas(在表中标记为Spark)和Dask给出了有趣的结果。Spark在所有情况下都有显著的减速。获得头部是相对优化的,但其他方面性能较差。事实上,获取数据帧的尾部或中部会导致获取头部的持续时间增加15倍(情况1)。
Dask实际上不允许在行上使用iloc。为了让iloc的行为与pandas一样,必须在性能上做出妥协,以维持全局有序。这是一个有意的设计决策,旨在偏离Pandas语义以保持性能。
PySpark Pandas以性能为代价,优先维护Pandas的对等性。与此同时,Dask对防止不良行为更加敏感。对比这些框架可以看出设计理念的不同。这也是第一次表明统一接口并不意味着一致的性能。
问题2:pandas认为数据随机成本低
在分布式环境中,数据存在于多台机器上。有时,需要在机器之间重新排列数据,以便每个worker都拥有属于一个逻辑组的所有数据。这种数据移动被称为随机(shuffle),这是使用分布式计算不可避免的,但代价高昂的一部分。
采取两个等效的操作。目标是为每个d值保留c值最高的行。请注意,groupby max不会保留整行。情况1执行全局排序,然后删除重复项以保留最后一行。
另一方面,情况2使用groupbyidxmax操作来保持最大行。然后将较小的DataFrame合并回原始DataFrame。这个基准使用了10万行,而不是100万行。
# case 1: more shuffle
df.sort_values(["c","d"]).drop_duplicates(subset=["d"], keep="last")
# case 2: less shuffle
idx = df.groupby("d")["c"].idxmax()
df.merge(idx, left_index=True, right_on="c")
对于pandas来说,情况2实际上比情况1慢,如下表所示。在案例2中,所有分布式计算框架的速度都要快得多,因为它们避免了全局排序。
相反,这groupby-idxmax是一个优化操作,首先发生在每台工作机器上,连接将发生在较小的 DataFrame 上。可以优化小型和大型 DataFrame 之间的连接(例如,广播连接)。
这是一个非常常见的Pandas代码片段的例子,它不能很好地转换为分布式设置。与第1期中的全局排序讨论类似,进行全局排序是一项非常昂贵的操作。
类似Pandas的框架的问题是,用户最终会以相同的本地计算思维方式处理大数据问题。如果用户在迁移到分布式设置时不更改代码,很容易会遇到次优操作,而这些操作所需的时间比他们应该的要长。
问题3:Pandas认为索引很有用
pandas思维中根深蒂固的核心概念之一是索引(index)。如果用户来自Pandas背景,他们认为索引很有用。让我们看看这如何转化为其他后端。
请看下面的代码片段。我们过滤给定的组,然后计算这些记录的总和。案例1没有索引,案例2使用索引。
# case 1: without index
df[df["a"]=="red"]["c"].sum()
# case 2: with "a" as index
idf = df.set_index("a")
idf.loc["red"]["c"].sum()
具体来说,基准测试中没有包含set_index。这是因为set_index有自己的开销。结果如下:
对于Pandas,当DataFrame被索引时,速度会加快。对于Modin或Spark,没有任何改进。Dask有了显著的改进。
同样,统一的接口并不意味着一致的性能配置文件。通常,某些操作无法满足用户的期望。对此也没有好的直觉。我们已经知道,为了支持Pandas API的分布式版本,必须做出妥协,但很难知道这些设计决策到底是什么。每个类似Pandas的框架都需要在不同的方向上进行特定的优化。
还请注意,对于上面提到的所有类似Pandas的框架,MultiIndex并不完全受支持。
问题4:惰性计算(第一部分)
惰性计算是分布式计算框架的一个关键特征。在DataFrame上调用操作时,将构造计算图。这些操作仅在执行需要数据的操作时发生。
在下面的代码片段中,案例1读取文件并计算所有列的最小值。案例2读取文件并计算两列的最小值。对于这个问题,我们将使用不同的数据集。这个新的有40列和200万行随机数。在这个单行表达式中有两到三个步骤:加载文件,过滤列,然后获取最小值。
# case 1: read file and min of all columns
backend.read_parquet(path).min()
# case 2: read file and min of two columns
backend.read_parquet(path)[["c0","c1"]].min()
结果如下所示。因为Pandas和Modin急切地计算事物,所以案例2与案例1相比只是略有减少。这是因为对最小值的计算更少(两列而不是全部)。但速度并没有那么快,因为在过滤所需列之前,首先读取整个数据。
另一方面,PySpark Pandas和Dask为这项操作提供了极大的加速。这是因为他们知道最后只需要两列,所以他们只从parquet中加载这两列(parquet优于csv文件的一个优点)。对于这三个操作(load、filter、min),PySpark Pandas和Dask能够通过最小化磁盘I/O来优化计算,因为它们进行惰性计算。
PySpark Pandas选择了与Spark相同的惰性计算。即使它们都是“分布式pandas”的一种形式,但它们的性能特征也非常不同。
问题4:惰性计算(第二部分)
当从业者不理解惰性计算时,很容易遇到重复的工作。
请看下面的例子,案例1只获得两列中的最小值,而案例2获得最小值、最大值和平均值。
# case 1: min of 2 columns
sub = backend.read_parquet(path)[["c0","c1"]]
sub.min()
# case 2: min, max, and mean of 2 columns
sub = backend.read_parquet(path)[["c0","c1"]]
sub.min()
sub.max()
sub.mean()
在下面的结果中,Pandas和Modin似乎没有发生任何重新计算。sub在被读取后已经保存在内存中。这是意料之中的。另一方面,PySpark Pandas和Dask显示sub被多次计算,因为我们没有显式地持久化sub。
在第4个案例中,我们看到了惰性计算的两面。我们看到了一个场景,它导致了急剧的加速,在最后一个场景中,我们看到它在使用不当时会导致减速。这并不意味着惰性计算一定好,更重要的是,我们在处理大数据时需要注意框架正在做什么,以获得最佳结果。
这是一个常见的陷阱,因为Pandas没有语法让用户注意到分布式计算的复杂性。
结论
Pandas非常适合本地计算(除了有太多的操作方法)。但我们需要认识到该接口的固有局限性,并理解它不是为在多台机器上扩展而构建的。Pandas不是设计用于分布式计算的接口。
如果你想尝试另一个不同于Pandas的语义层,Fugue会采取不同的方法。Fugue是一个用于分布式计算的开源抽象层。虽然它可以将Pandas代码引入Spark和Dask,但它有意与Pandas接口分离,以避免面临类似Pandas框架的妥协。
感谢阅读!