提升计算性能#

特征工程是一项计算密集型任务。虽然 Featuretools 为特征计算提供了合理的默认设置,但也有许多内置方法可以根据数据集和具体问题来提升计算性能。

减少唯一截止时间的数量#

Featuretools 创建的特征矩阵中的每一行都是在特定的截止时间计算的,该时间代表了实体集(entityset)中任何数据帧(dataframe)的数据可用于计算特征的最后一个时间点。因此,计算过程中需要查找每个不同时间点允许使用的数据子集,这会带来额外的开销。

注意

Featuretools 在处理时间方面非常精确。更多信息请参见处理时间

如果存在许多唯一的截止时间,找出如何减少它们通常是值得的。这可以通过手动确定预测问题需要哪些唯一的 সময়点来完成,或者使用近似方法自动完成。

并行特征计算#

通过并行化特征计算过程,通常可以提升计算性能。有几种不同的方法可以用于使用 Featuretools 进行并行特征计算。下面概述了最常用的方法。

简单的并行特征计算#

如果使用 pandas EntitySet,Featuretools 可以选择在多个核心上计算特征。控制并行程度最简单的方法是指定 n_jobs 参数

fm = ft.calculate_feature_matrix(features=features,
                                 entityset=entityset,
                                 cutoff_time=cutoff_time,
                                 n_jobs=2,
                                 verbose=True)

上面的命令将启动 2 个进程来并行计算特征矩阵的块。每个进程都会收到一份实体集的副本,因此内存使用量将与并行进程的数量成比例。由于实体集必须复制到每个进程,所以在计算开始之前执行此操作会产生开销。为了避免在后续调用 calculate_feature_matrix 时出现此开销,请阅读下面关于使用持久化集群的部分。

调整块大小#

默认情况下,Featuretools 会同时计算具有相同截止时间的行。chunk_size 参数限制了将被分组并一起计算的最大行数。如果使用并行处理进行计算,默认的块大小设置为 1 / n_jobs,以确保计算可以分散到可用的工作进程中。通常,这种行为效果良好,但如果只有少数唯一的截止时间,可能导致更高的峰值内存使用(由于存储在内存中的中间计算较多)或有限的并行性(如果块的数量少于 n_jobs)。

通过设置 chunk_size,我们可以在调用 ft.dfsft.calculate_feature_matrix 时,将每组中的最大行数限制为特定数值或总数据的百分比

# use maximum  100 rows per chunk
feature_matrix, features_list = ft.dfs(entityset=es,
                                       target_dataframe_name="customers",
                                       chunk_size=100)

我们还可以将块大小设置为总行数的百分比

# use maximum 5% of all rows per chunk
feature_matrix, features_list = ft.dfs(entityset=es,
                                       target_dataframe_name="customers",
                                       chunk_size=.05)

使用持久化集群#

在幕后,Featuretools 使用 Dask 的分布式调度器来实现多进程。当你只指定 n_jobs 参数时,Featuretools 将为该特定的特征矩阵计算创建一个集群,并在计算完成后销毁。这样做的一个缺点是,每次计算特征矩阵时,实体集都必须再次传输到工作进程。为了避免这种情况,我们希望在不同调用之间重用同一个集群。这样做的方法是先创建一个集群,然后通过 dask_kwargs 参数告诉 Featuretools 使用它

import featuretools as ft
from dask.distributed import LocalCluster

cluster = LocalCluster()
fm_1 = ft.calculate_feature_matrix(features=features_1,
                                   entityset=entityset,
                                   cutoff_time=cutoff_time,
                                   dask_kwargs={'cluster': cluster},
                                   verbose=True)

“cluster”值可以是实际的集群对象,也可以是集群调度器可访问的地址字符串。下面的调用同样有效。由于实体集数据已经保存在集群上,第二次特征矩阵计算将不需要重新发送数据到工作进程。

fm_2 = ft.calculate_feature_matrix(features=features_2,
                                   entityset=entityset,
                                   cutoff_time=cutoff_time,
                                   dask_kwargs={'cluster': cluster.scheduler.address},
                                   verbose=True)

注意

使用持久化集群时,Featuretools 在第一次计算特征矩阵时会将 EntitySet 的副本发布到集群。根据 EntitySet 的元数据,集群将在后续计算中重用它。这意味着如果两个 EntitySets 具有相同的元数据但行值不同(例如,向 EntitySet 添加了新数据),Featuretools 在后续调用中将不会重新复制第二个 EntitySet。避免这种情况的一个简单方法是使用唯一的 EntitySet ID。

使用分布式仪表盘#

Dask.distributed 提供了一个基于 Web 的诊断仪表盘,可用于分析工作进程和任务的状态。它对于跟踪内存使用或可视化任务运行时长也非常有用。关于 Web 界面的详细描述可以在此处找到。

Distributed dashboard image

该仪表盘需要额外的 Python 包 bokeh 才能工作。安装 bokeh 后,创建 LocalCluster 时将默认启动 Web 界面。Featuretools 在使用 n_jobs 时创建的集群不会自动启用 Web 界面。为此,必须在 dask_kwargs 中指定启动主 Web 界面的端口。

fm = ft.calculate_feature_matrix(features=features,
                                 entityset=entityset,
                                 cutoff_time=cutoff_time,
                                 n_jobs=2,
                                 dask_kwargs={'diagnostics_port': 8787}
                                 verbose=True)

通过数据分区进行并行计算#

作为 Featuretools 内部并行化的替代方案,可以将数据分区,并使用 Dask 或 Apache Spark (通过 PySpark) 在多个核心或集群上运行特征计算。对于大型 pandas EntitySet,这种方法可能是必要的,因为当前的并行实现将整个 EntitySet 发送到每个工作进程,这可能耗尽工作进程的内存。Dask 和 Spark 允许 Featuretools 扩展到单台机器上的多个核心或集群上的多台机器。

当计算给定实例集的特征不需要整个数据集时,我们可以将数据分割成独立的分区并在每个分区上进行计算。例如,假设我们正在为客户计算特征,特征是“该邮政编码中的其他客户数量”或“该邮政编码中的其他客户的平均年龄”。在这种情况下,我们可以按邮政编码分区加载数据。只要在计算时拥有某个邮政编码的所有数据,我们就可以计算出客户子集的所有特征。

这种方法的一个示例可以在 预测下一次购买演示笔记本 中看到。在此示例中,我们按客户分区数据,并在任何给定时间只将固定数量的客户加载到内存中。我们使用 Dask 轻松实现这一点,Dask 也可以用于将计算扩展到计算机集群。像 Spark 这样的框架也可以类似使用。

使用 Dask 将数据分区以分布到多个核心或集群上的另一个示例可以在 Dask 上的 Featuretools 笔记本 中看到。Feature Labs 工程博客上的 使用 Dask 并行化特征工程文章 中详细介绍了这种方法。Dask 允许简单地扩展到单台计算机上的多个核心或集群上的多台机器。

有关使用 Apache Spark (通过 PySpark) 进行类似的分区和分布式实现的示例,请参阅 Spark 上的特征工程笔记本。此实现展示了如何使用 Spark 作为分布式框架,在 EC2 实例集群上执行特征工程。