本文将概述如何在scikit-learn中并行训练和调整某些模型,以优化时间。我还将概述如何并行这些模型的预测过程。我将介绍将数据划分为多个部分,并为每个部分生成预测,以及将整个数据集传递到多个模型中。
我以前写过一篇关于multiprocessing的文章,以及如何将其用于具有单个和多个参数的函数。你可以在这里阅读:
https://towardsdatascience.com/parallelization-w-multiprocessing-in-python-bd2fc234f516
我们将在本文的内容中使用multiprocessing模块。以下是本文的概要:
目录
- SciKit Learn的并行化
- 数据
- 要求
- 生成数据
- 训练模型
- 正常训练
- 并行训练
- 并行模型评估(交叉验证)
- 正常实现
- 并行实现
- 并行模型预测-单个模型
- 结构
- 正常实现
- 并行实现
- 并行模型预测-多个模型
- 结构
- 正常实现
- 并行实现
- 结束
- 资源
SciKit Learn的并行化
大多数机器学习项目都有4个主要组件,这需要大量的计算能力和时间。
1.模型训练
- 在各种训练测试分割上训练多个ML模型
2.模型的超参数调整
- 调整与模型相关的各种超参数,以最大化模型性能,而不会使其与原始数据过拟合
3.模型评估
- 通过交叉验证、准确性、分类报告等多种评估方法评估模型。
4.模型预测
- 为模型生成预测,以便推断时间较短。
- 与机器学习模型相关联的推断时间是指模型处理输入数据和生成预测所需的时间量。
- 你希望最大化模型性能,同时最小化推理时间。低推理时间有助于生产环境中的可扩展机器学习。
这4个组件对数据科学管道至关重要,每个组件都起着重要作用,每个都需要大量时间。
值得庆幸的是,当通过sci-kit-learn(一个用于模型开发的开放源码库)进行机器学习建模时,许多并行化已经内置到了你训练的许多常用函数和模型中。sklearn中函数的n_jobs参数指定了运行该函数时要使用的核心数。
njobs是一个整数,指定同时运行的工作线程的最大数量。如果给定1,则根本不使用joblib并行性,这对调试很有用。如果设置为-1,则使用所有CPU。对于低于-1的n_jobs,使用(n_cpus+1+n_job)。例如,当n_jobs=-2时,将使用除一个以外的所有CPU。
n_jobs默认为“无”,表示未设置;它通常被解释为n_jobs=1
- [1] https://scikit-learn.org/stable/glossary.html#term-n个作业
请注意,并非所有函数和模型都有此参数可供使用,这意味着sklearn模块的所有函数都不是并行的。例如,随机森林和K-Nearest Neighbours等模型具有n_jobs参数,而Elastic Net和Gradient Boosting等模型则没有。
要确定此参数是否适用于你正在使用的sklearn中的函数,你可以查看与该功能相关的文档,并检查n_jobs是否在该功能的参数部分中。
数据
我们将为下面的教程合成数据。我将重点介绍如何生成一些假数据,训练、调整、评估和预测机器学习模型,以常规方式并通过并行化。
本教程中我们将参考的主要模块将是sklearn、multiprocessing、panda和numpy。以下是本教程中需要遵循的版本和要求。如果你只是想参考与本教程相关的Jupyter笔记本,你可以在我的GitHub上找到它。
要求
Python>=3.8.8
pandas>=1.2.4
numpy>=1.20.1
sklearn=0.24.1
我们还将依赖于multiprocessing、random和itertools库。不用担心,因为这些库是与Python一起预装的。
生成数据
import pandas as pd
import numpy as np
import random
from random import randint
from multiprocessing import Pool
from multiprocessing import cpu_count
from functools import partial
from sklearn.ensemble import RandomForestClassifier
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.model_selection import train_test_split
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.metrics import classification_report
def generate_data(n_books = 3000, n_genres = 10, n_authors = 450, n_publishers = 50, n_readers = 30000, dataset_size = 100000):
'''
This function will generate a dataset with features associated to
book data set. The dataset will have the following columns :
- book_id (String) : Unique identified for the book
- book_rating (Integer) : A value between 0 and 10
- reader_id (String) : Unique identifier for the user
- book_genre (Integer) : An integer representing a genre for the book,
value is between 1 and 15, indicating that
there are 15 unique genres. Each book can only
have 1 genre
- author_id (String) : Unique identifier for the author of the book
- num_pages (Integer) : Random value between 70 and 500
- publisher_id (String) : A unique identifier for the publisher of the book
- publish_year (Integer) : The year of book publishing
- book_price (Integer) : The sale price of the book
- text_lang (Integer) : The language of the book - returns an integer which
is mapped to some language
params:
n_books (Integer) : The number of books you want the dataset to have
n_genres (Integer) : Number of genres to be chosen from
n_authors (Integer) : Number of authors to be generated
n_publishers (Integer) : Number of publishers for the dataset
n_readers (Integer) : Number of readers for the dataset
dataset_size (Integer) : The number of rows to be generated
example:
data = generate_data()
'''
d = pd.DataFrame(
{
'book_id' : [randint(1, n_books) for _ in range(dataset_size)],
'author_id' : [randint(1, n_authors) for _ in range(dataset_size)],
'book_genre' : [randint(1, n_genres) for _ in range(dataset_size)],
'reader_id' : [randint(1, n_readers) for _ in range(dataset_size)],
'num_pages' : [randint(75, 700) for _ in range(dataset_size)],
'book_rating' : [randint(1, 10) for _ in range(dataset_size)],
'publisher_id' : [randint(1, n_publishers) for _ in range(dataset_size)],
'publish_year' : [randint(2000, 2021) for _ in range(dataset_size)],
'book_price' : [randint(1, 200) for _ in range(dataset_size)],
'text_lang' : [randint(1,7) for _ in range(dataset_size)]
}
).drop_duplicates()
return d
d = generate_data(dataset_size = 100000)
# d.to_csv('data.csv', index = False)
d.head()
上述函数将随机生成与书籍相关数据相关的CSV。这些数据将用于生成两个模型,一个通常没有并行化,另一个通过并行化。
我们不会对数据进行任何特征工程或预处理,因为本文的范围是强调如何训练和预测sklearn模型。上面的函数应该生成一个示例DataFrame,如下所示:
训练模型
正常训练
genre_features = ['num_pages', 'book_rating', 'book_price', 'text_lang']
genre_target = 'book_genre'
x = d[genre_features].values
y = d[genre_target].values
X_train, X_test, y_train, y_test = train_test_split(
x,
y,
test_size = 0.3
)
print("Training Model")
# instantiate the model (using the default parameters)
gen_mdl = GradientBoostingClassifier()
# fit the model with data
gen_mdl.fit(X_train, y_train)
##perform classification and prediction on samples in tf_test
predicted_mdl = gen_mdl.predict(X_test)
print(classification_report(y_test, predicted_mdl))
report = classification_report(y_test, predicted_mdl, output_dict=True)
GB分类器没有n_jobs参数,因此你无法并行化此模型的模型训练过程。
并行训练
lang_features = ['num_pages', 'book_rating', 'book_price', 'book_genre']
lang_target = 'text_lang'
x = d[lang_features].values
y = d[lang_target].values
X_train, X_test, y_train, y_test = train_test_split(
x,
y,
test_size = 0.3
)
print("Training Model")
# instantiate the model (using the default parameters)
lang_mdl = RandomForestClassifier(n_jobs = 2)
# fit the model with data
lang_mdl.fit(X_train, y_train)
##perform classification and prediction on samples in tf_test
predicted_mdl = lang_mdl.predict(X_test)
print(classification_report(y_test, predicted_mdl))
report = classification_report(y_test, predicted_mdl, output_dict=True)
随机林分类器有n_jobs参数,因此你可以将其指定为要使用的核心数。
并行模型评估(交叉验证)
正常交叉验证
# evaluate the model
n_scores = cross_val_score(
lang_mdl,
X_train,
y_train,
scoring='accuracy',
cv=4,
n_jobs=1
)
并行交叉验证
# evaluate the model
n_scores = cross_val_score(
lang_mdl,
X_train,
y_train,
scoring='accuracy',
cv=4,
n_jobs=2
)
并行化模型预测-奇异模型
我们将把传递到模型中的输入数据拆分为多个批次。这些批处理的大小将与你要用于并行化预测的核数相等。
结构
正常实现
def predict(data, feature_cols, clf, pred_col):
'''
This function will generate predictions given a dataset, the associated features and a model.
params:
data (DataFrame) : The dataset which holds the features
feature_cols (List -> String) : List of column names in data corresponding to the model features
clf (Model) : The classification model which generates the predictions
pred_col (String) : The name of the column you want to store the predictions under in data
returns:
This function will add a column to the input dataset associated to the predictions generated
example:
>> predict(
data = df,
feature_col = lang_features,
pred_col = 'lang_prediction'
)
'''
ft = data[feature_cols].values
res = clf.predict(ft)
data[pred_col] = res
return data
# normal predictions
res = predict(
data = d,
feature_cols = lang_features,
clf = lang_mdl,
pred_col = 'lang_prediction'
)
并行实现
def parallel_pred(fn, data, feature_cols, clf, pred_col, n_cores):
'''
This function will parallelize the prediction process such that the data is split into
n components (n is defined based on n_cores) and passed onto the model.
params:
fn (Function) : The function you want to parallelize
data (DataFrame) : The dataset holding the features for the model
feature_cols (List -> String) : List of column names in data corresponding to the model features
clf (Model) : The model which generates the predictions
pred_col (String) : The name of the column you want to store the predictions under in data
n_cores (Integer) : The number of cores you want to use
returns:
This function will return the result of the input function
example:
parallel_pred(
fn = predict,
data = d,
feature_cols = lang_features,
clf = lang_mdl,
pred_col = 'parallel_lang_pred',
n_cores = 4
)
'''
if cpu_count() < n_cores:
raise ValueError("The number of CPU's specified exceed the amount available")
df_list = np.array_split(data, n_cores)
pool = Pool(n_cores)
res = pool.map(partial(
fn,
feature_cols = feature_cols,
clf = clf,
pred_col = pred_col
), df_list)
pool.close()
pool.join()
return pd.concat(res)
# parallel predictions
res = parallel_pred(
fn = predict,
data = d,
feature_cols = lang_features,
clf = lang_mdl,
pred_col = 'parallel_lang_pred',
n_cores = 2
)
对于并行实现,我们将使用在正常实现期间创建的函数,并进行优化,以便使用numpy提供的array_split函数将1个参数拆分为n个分区。
并行化模型预测-多个模型
现在我们的目标是将两个单独的模型传递到两个不同的核心,第一个模型是我们上面生成的类型模型,第二个是语言模型。我们将把相同的数据传递到这些模型中的每一个,并并行生成两个模型的预测。
结构
正常实现
model_data = [[gen_mdl, genre_features, 'genre'], [lang_mdl, lang_features, 'lang']]
def multi_predcit(data, models_data, pred_col = 'prediction_{}'):
'''
This function will outline how to generate predictions associated to multiple
models when passing in the same dataset.
params:
data (DataFrame) : The dataframe holding the feature data for the model
models_data (List -> List) : Nested list associated with the model,
feature columns and the name of the target
pred_col (String): The name of the column you want storing the results
returns:
This function will return the input dataframe with additional columns
corresponding to the predictions generated from all the models passed.
example:
multi_predcit(
data = d,
models_data = model_data,
pred_col = 'prediction_{}'
)
'''
for i in models_data:
mdl = i[0]
ft_cols = i[1]
target = i[2]
ft = data[ft_cols].values
res = mdl.predict(ft)
data[pred_col.format(target)] = res
return data
# predict normally
res = multi_predcit(
data = d,
models_data = model_data,
pred_col = 'prediction_{}'
)
我们将首先创建model_data,这是一个包含模型、特征列和目标名称的嵌套列表。我们可以迭代传递输入数据和相关特征以生成预测。
并行实现
def parallel_multi(fn, data, model_data, n_cores):
'''
This function will parallelize the input function so that a model is allocated to each core and
predictions are generated in parallel.
params:
fn (Function) : The function you want to parallelize
data (DataFrame) : The dataset holding the features
models_data (List -> List) : Nested list associated with the model,
feature columns and the name of the target
n_cores (Integer) : The number of cores you want to parallelize with
returns:
This function will return the input dataframe with additional columns, 1
corresponding to each of the models
example:
parallel_multi(
fn = multi_predcit,
data = d,
model_data = model_data,
n_cores = 2
)
'''
mdl_split = np.array_split(model_data, n_cores)
pool = Pool(n_cores)
res = pool.map(partial(fn, data), mdl_split)
pool.close()
pool.join()
return res
# predict parallel
res = parallel_multi(
fn = multi_predcit,
data = d,
model_data = model_data,
n_cores = 2
)
我们可以再次利用numpy中的array_split函数来分割上面创建的model_data,并将每个模型数据传递到一个核心中。每个核心基本上都将运行上面正常实现中概述的multi_prection函数。
结束
本文概述了如何通过并行处理将scikit-learn中用于训练、调优和预测模型的时间降到最低。并行训练模型的大部分繁重工作已经由 sci-kit learn 在内部完成。
我提供了一个深入的教程,展示了并行生成预测的不同方法。概述的一种方法是,当你有多个模型,并且你希望将数据传递给这些模型中的每一个以产生预测时。你试图预测的每个模型都将作为一个单独的进程并行运行。另一种方法是,当你有一个大数据集时,你希望更快地生成预测。在这种方法中,你可以将数据集划分为多个较小的段(你最多应该拥有的段数等于你想要使用的最大核数)。你可以在每个分段中传递一个模型,以并行生成预测,并将结果合并在一起。
请注意,当你有大量的核心和大型数据集时,这很有用。当在少量数据上运行此过程时,正常(非并行)实现将在与并行实现相似的时间内运行。与此存储库关联的笔记本运行在少量数据上以提高效率,因为我没有太多可用内存或内核。但是,你可以轻松地增加生成的数据量,从而增加为训练、评估和预测组件传递的数据量。
另一种优化模型预测所需时间的方法是通过稀疏矩阵。这个概念超出了本文的范围,但如果你感兴趣,可以在这里阅读。
https://towardsdatascience.com/optimize-training-predicting-sk-learn-models-in-python-with-sparse-matrices-1612072a025d
如果你想了解与本文相关的代码,可以查看我在这里发布笔记本的GitHub。
https://github.com/vatsal220/medium_articles/blob/main/parallel/sklearn_parallel.ipynb
资源
- [1] https://scikit-learn.org/stable/glossary.html#term-n-jobs
感谢阅读!