PySpark 初学者教程:通过示例学习
在学习Py之前Spark,让我们来了解一下:
什么是阿帕奇 Spark?
Spark 是一种已被证明比 Hadoop MapReduce 更简单、更快速的大数据解决方案。 Spark 是加州大学伯克利分校 RAD 实验室于 2009 年开发的一款开源软件。自 2010 年向公众发布以来, Spark 越来越受欢迎,并以前所未有的规模在整个行业中得到应用。
在那个时代 大数据运用,从业者比以往任何时候都更需要快速可靠的工具来处理数据流。早期的工具如 MapReduce 很受欢迎,但速度很慢。为了克服这个问题, Spark 提供了一种既快速又通用的解决方案。两者之间的主要区别 Spark MapReduce 是 Spark 在内存中运行计算,然后在硬盘上进行计算。它允许高速访问和数据处理,将时间从几小时缩短到几分钟。
Py 是什么Spark?
PySpark 是 Apache 创建的工具 Spark 使用社区 Python - Spark. 它允许使用 RDD (弹性分布式数据集) Python. 它还提供 PySpark 外壳链接 Python API 具有 Spark 核心启动 Spark 上下文。 Spark 是实现集群计算的名称引擎,而PySpark is Python使用的库 Spark.
如何 Spark 工作?
Spark 基于计算引擎,这意味着它负责调度、分发和监控应用程序。每个任务都在称为计算集群的各种工作机器上完成。计算集群是指任务的划分。一台机器执行一项任务,而其他机器通过不同的任务为最终输出做出贡献。最后,所有任务被聚合以产生输出。 Spark 管理员对各种 Spark 就业机会。
Spark 设计用于
- Python
- Java
- 斯卡拉
- SQL
一个显著的特征是 Spark 有大量的内置库,包括用于机器学习的 MLlib. Spark 还设计用于与 Hadoop 集群协同工作,并可读取多种类型的文件,包括 Hive 数据、CSV、JSON、Casandra 数据等。
为什么使用 Spark?
作为未来的数据从业者,您应该熟悉 Python 的著名库:Pandas 和 scikit-learn。这两个库非常适合探索中等规模的数据集。常规机器学习项目围绕以下方法构建:
- 将数据加载到磁盘
- 将数据导入机器内存
- 处理/分析数据
- 构建机器学习模型
- 将预测结果存储回磁盘
如果数据科学家想要处理的数据对于一台计算机来说太大,就会出现问题。在数据科学的早期,从业者会对数据进行抽样,因为并不总是需要在庞大的数据集上进行训练。数据科学家会找到一个好的统计样本,进行额外的稳健性检查,然后得出一个优秀的模型。
然而,这样做存在一些问题:
- 数据集是否反映了现实世界?
- 数据中是否包含具体例子?
- 该模型是否适合抽样?
以用户推荐为例。推荐系统依靠将用户与其他用户进行比较来评估他们的偏好。如果数据从业者只采用数据的一个子集,那么就不会有一群彼此非常相似的用户。推荐系统要么在完整的数据集上运行,要么根本不运行。
解决办法是什么?
解决方案早已显而易见,那就是将问题拆分到多台计算机上。并行计算也带来了多个问题。开发人员经常难以编写并行代码,最终不得不解决围绕多处理本身的一系列复杂问题。
Pyspark 为数据科学家提供了一个 API,可用于解决并行数据处理问题。Pyspark 可处理多处理的复杂性,例如分发数据、分发代码以及从机器集群上的工作人员收集输出。
Spark 可以独立运行,但通常运行在 Hadoop 等集群计算框架之上。然而,在测试和开发中,数据科学家可以高效地运行 Spark 在没有集群的开发箱或笔记本电脑上
• 主要优势之一 Spark 是构建一个涵盖数据流管理、无缝数据查询、机器学习预测和实时访问各种分析的架构。
• Spark 与SQL语言紧密结合,即结构化数据,可以实时查询数据。
• 数据科学家的主要工作是分析和建立预测模型。简而言之,数据科学家需要知道如何使用 SQL,生成统计报告并利用机器学习进行预测。数据科学家花费大量时间来清理、转换和分析数据。一旦数据集或数据工作流程准备就绪,数据科学家就会使用各种技术来发现见解和隐藏的模式。数据操作应该是强大的,并且同样易于使用。 Spark 由于其速度快且 API 丰富,因此是正确的工具。
在这个 PySpark 教程中,你将学习如何使用 Py 构建分类器Spark 例子。
如何安装 PySpark 使用 AWS
这款 Jupyter 团队构建 Docker 镜像来运行 Spark 高效。下面是安装 Py 的步骤Spark AWS 中的实例。
请参阅我们的教程 AWS 和 TensorFlow
步骤 1:创建实例
首先,您需要创建一个实例。转到您的 AWS 帐户并启动实例。您可以将存储量增加到 15g,并使用与 TensorFlow 教程中相同的安全组。
第 2 步:打开连接
打开连接并安装docker容器。有关更多详细信息,请参阅TensorFlow的教程 Docker。请注意,您需要处于正确的工作目录中。
只需运行以下代码即可安装 Docker:
sudo yum update -y sudo yum install -y docker sudo service docker start sudo user-mod -a -G docker ec2-user exit
步骤 3:重新打开连接并安装 Spark
重新打开连接后,你可以安装包含 Py 的镜像Spark.
## Spark docker run -v ~/work:/home/jovyan/work -d -p 8888:8888 jupyter/pyspark-notebook ## Allow preserving Jupyter notebook sudo chown 1000 ~/work ## Install tree to see our working directory next sudo yum install -y tree
步骤4:打开 Jupyter
检查集装箱及其名称
docker ps
使用 docker logs 和 docker 名称启动 docker。例如,docker logs zealous_goldwasser
转到浏览器并启动 Jupyter。地址是http://localhost:8888/。粘贴终端给的密码。
备注:如果你想上传/下载文件到你的 AWS 机器,你可以使用 Cyberduck 软件, https://cyberduck.io/.
如何安装 PySpark on Windows/Mac 与 Conda
以下是安装Py的详细过程Spark on Windows/Mac 使用 Anaconda:
安装 Spark 在本地机器上,建议创建一个新的 conda 环境。此新环境将安装 Python Spark 以及所有依赖项。
苹果电脑用户
cd anaconda3 touch hello-spark.yml vi hello-spark.yml
Windows 用户
cd C:\Users\Admin\Anaconda3 echo.>hello-spark.yml notepad hello-spark.yml
您可以编辑 .yml 文件。注意缩进。在 – 之前需要两个空格
name: hello-spark dependencies: - python=3.6 - jupyter - ipython - numpy - numpy-base - pandas - py4j - pyspark - pytz
保存并创建环境。这需要一些时间
conda env create -f hello-spark.yml
有关位置的更多详细信息,请查看教程安装 TensorFlow
您可以检查机器上安装的所有环境
conda env list
Activate hello-spark
苹果电脑用户
source activate hello-spark
Windows 用户
activate hello-spark
请注意: 您已经创建了一个特定的 TensorFlow 环境来运行 TensorFlow 教程。创建一个不同于 hello-tf 的新环境更为方便。用以下代码重载 hello-tf 是没有意义的 Spark 或任何其他机器学习库。
假设你的大部分项目都涉及 TensorFlow,但你需要使用 Spark 针对某个特定项目。您可以为所有项目设置一个 TensorFlow 环境,并为 Spark。您可以添加任意数量的库 Spark 根据需要在不干扰 TensorFlow 环境的情况下创建环境。完成 Spark的项目,您可以将其擦除而不会影响 TensorFlow 环境。
Jupyter
可选 Jupyter 笔记本并尝试 PySpark 有效。在新的笔记本中粘贴以下 PSpark 示例代码:
import pyspark from pyspark import SparkContext sc =SparkContext()
如果显示错误,则可能是 Java 您的机器上没有安装。在 Mac 中,打开终端并输入 java -version,如果有 java 版本,请确保它是 1.8。在 Windows,转到应用程序并检查是否有 Java 文件夹。如果有 Java 文件夹,检查 Java 1.8 已安装。 截至撰写本文时,PySpark 与不兼容 Java9及以上。
如果需要安装 Java,你想想 链接 并下载 jdk-8u181-windows-x64.exe
对于Mac用户,建议使用“brew”。
brew tap caskroom/versions brew cask install java8
请参阅此分步教程 如何安装 Java
备注:使用 remove 彻底删除环境。
conda env remove -n hello-spark -y
Spark 语境
Spark上下文是允许与集群连接的内部引擎。如果你想运行一个操作,你需要一个 Spark上下文。
创建一个 Spark语境
首先,你需要发起 Spark上下文。
import pyspark from pyspark import SparkContext sc =SparkContext()
现在,该 Spark上下文已准备就绪,您可以创建一个名为 RDD(弹性分布式数据集)的数据集合。RDD 中的计算会自动在集群中并行化。
nums= sc.parallelize([1,2,3,4])
您可以使用 take 访问第一行
nums.take(1)
[1]
您可以使用 lambda 函数对数据进行转换。在 PySpark 下面的例子,你返回数字的平方。这是一个映射转换
squared = nums.map(lambda x: x*x).collect() for num in squared: print('%i ' % (num))
1 4 9 16
SQL上下文
更方便的方法是使用DataFrame。 Spark上下文已设置,您可以使用它来创建 dataFrame。您还需要声明 SQLContext
SQLContext 允许将引擎与不同的数据源连接起来。它用于启动 Spark SQL。
from pyspark.sql import Row from pyspark.sql import SQLContext sqlContext = SQLContext(sc)
现在在这个 Spark 教程 Python,让我们创建一个元组列表。每个元组将包含人员的姓名和年龄。需要四个步骤:
步骤1) 使用以下信息创建元组列表
[('John',19),('Smith',29),('Adam',35),('Henry',50)]
步骤2) 构建 RDD
rdd = sc.parallelize(list_p)
步骤3) 转换元组
rdd.map(lambda x: Row(name=x[0], age=int(x[1])))
步骤4) 创建 DataFrame 上下文
sqlContext.createDataFrame(ppl) list_p = [('John',19),('Smith',29),('Adam',35),('Henry',50)] rdd = sc.parallelize(list_p) ppl = rdd.map(lambda x: Row(name=x[0], age=int(x[1]))) DF_ppl = sqlContext.createDataFrame(ppl)
如果要访问每个功能的类型,可以使用 printSchema()
DF_ppl.printSchema() root |-- age: long (nullable = true) |-- name: string (nullable = true)
机器学习示例Spark
现在你已经对 Spark 和 SQLContext,您已准备好构建您的第一个机器学习程序。
以下是使用 Py 构建机器学习程序的步骤Spark:
- 步骤1) Py 的基本操作Spark
- 步骤2) 数据预处理
- 步骤3) 构建数据处理管道
- 步骤4) 构建分类器:逻辑
- 步骤5) 训练和评估模型
- 步骤6) 调整超参数
在这个 PySpark 机器学习教程,我们将使用成人数据集。本教程的目的是学习如何使用 Pyspark。有关数据集的更多信息,请参阅本教程。
请注意,数据集并不重要,您可能会认为计算需要很长时间。 Spark 旨在处理大量数据。 Spark当处理的数据集变大时,其性能相对于其他机器学习库会有所提高。
步骤 1)Py 的基本操作Spark
首先,您需要初始化尚未启动的SQLContext。
#from pyspark.sql import SQLContext url = "https://raw.githubusercontent.com/guru99-edu/R-Programming/master/adult_data.csv" from pyspark import SparkFiles sc.addFile(url) sqlContext = SQLContext(sc)
然后,您可以使用 sqlContext.read.csv 读取 cvs 文件。您可以使用设置为 True 的 inferSchema 来告诉 Spark 自动猜测数据类型。默认情况下,它为 False。
df = sqlContext.read.csv(SparkFiles.get("adult_data.csv"), header=True, inferSchema= True)
让我们看看数据类型
df.printSchema() root |-- age: integer (nullable = true) |-- workclass: string (nullable = true) |-- fnlwgt: integer (nullable = true) |-- education: string (nullable = true) |-- education_num: integer (nullable = true) |-- marital: string (nullable = true) |-- occupation: string (nullable = true) |-- relationship: string (nullable = true) |-- race: string (nullable = true) |-- sex: string (nullable = true) |-- capital_gain: integer (nullable = true) |-- capital_loss: integer (nullable = true) |-- hours_week: integer (nullable = true) |-- native_country: string (nullable = true) |-- label: string (nullable = true)
您可以使用 show 查看数据。
df.show(5, truncate = False)
+---+----------------+------+---------+-------------+------------------+-----------------+-------------+-----+------+------------+------------+----------+--------------+-----+ |age|workclass |fnlwgt|education|education_num|marital |occupation |relationship |race |sex |capital_gain|capital_loss|hours_week|native_country|label| +---+----------------+------+---------+-------------+------------------+-----------------+-------------+-----+------+------------+------------+----------+--------------+-----+ |39 |State-gov |77516 |Bachelors|13 |Never-married |Adm-clerical |Not-in-family|White|Male |2174 |0 |40 |United-States |<=50K| |50 |Self-emp-not-inc|83311 |Bachelors|13 |Married-civ-spouse|Exec-managerial |Husband |White|Male |0 |0 |13 |United-States |<=50K| |38 |Private |215646|HS-grad |9 |Divorced |Handlers-cleaners|Not-in-family|White|Male |0 |0 |40 |United-States |<=50K| |53 |Private |234721|11th |7 |Married-civ-spouse|Handlers-cleaners|Husband |Black|Male |0 |0 |40 |United-States |<=50K| |28 |Private |338409|Bachelors|13 |Married-civ-spouse|Prof-specialty |Wife |Black|Female|0 |0 |40 |Cuba |<=50K| +---+----------------+------+---------+-------------+------------------+-----------------+-------------+-----+------+------------+------------+----------+--------------+-----+ only showing top 5 rows
如果您没有将 inderShema 设置为 True,则类型会发生以下情况。它们全部以字符串形式存在。
df_string = sqlContext.read.csv(SparkFiles.get("adult.csv"), header=True, inferSchema= False) df_string.printSchema() root |-- age: string (nullable = true) |-- workclass: string (nullable = true) |-- fnlwgt: string (nullable = true) |-- education: string (nullable = true) |-- education_num: string (nullable = true) |-- marital: string (nullable = true) |-- occupation: string (nullable = true) |-- relationship: string (nullable = true) |-- race: string (nullable = true) |-- sex: string (nullable = true) |-- capital_gain: string (nullable = true) |-- capital_loss: string (nullable = true) |-- hours_week: string (nullable = true) |-- native_country: string (nullable = true) |-- label: string (nullable = true)
要将连续变量转换为正确的格式,可以使用重铸列。您可以使用 withColumn 来告诉 Spark 对哪一列进行操作转换。
# Import all from `sql.types` from pyspark.sql.types import * # Write a custom function to convert the data type of DataFrame columns def convertColumn(df, names, newType): for name in names: df = df.withColumn(name, df[name].cast(newType)) return df # List of continuous features CONTI_FEATURES = ['age', 'fnlwgt','capital_gain', 'education_num', 'capital_loss', 'hours_week'] # Convert the type df_string = convertColumn(df_string, CONTI_FEATURES, FloatType()) # Check the dataset df_string.printSchema() root |-- age: float (nullable = true) |-- workclass: string (nullable = true) |-- fnlwgt: float (nullable = true) |-- education: string (nullable = true) |-- education_num: float (nullable = true) |-- marital: string (nullable = true) |-- occupation: string (nullable = true) |-- relationship: string (nullable = true) |-- race: string (nullable = true) |-- sex: string (nullable = true) |-- capital_gain: float (nullable = true) |-- capital_loss: float (nullable = true) |-- hours_week: float (nullable = true) |-- native_country: string (nullable = true) |-- label: string (nullable = true) from pyspark.ml.feature import StringIndexer #stringIndexer = StringIndexer(inputCol="label", outputCol="newlabel") #model = stringIndexer.fit(df) #df = model.transform(df) df.printSchema()
选择列
您可以使用 select 来选择并显示行以及特征名称。下面选择了 age 和 fnlwgt。
df.select('age','fnlwgt').show(5)
+---+------+ |age|fnlwgt| +---+------+ | 39| 77516| | 50| 83311| | 38|215646| | 53|234721| | 28|338409| +---+------+ only showing top 5 rows
按组计数
如果要按组计算出现次数,可以链接:
- 通过...分组()
- 数()
一起。在 PySpark 下面的例子,您可以根据教育水平计算行数。
df.groupBy("education").count().sort("count",ascending=True).show()
+------------+-----+ | education|count| +------------+-----+ | Preschool| 51| | 1st-4th| 168| | 5th-6th| 333| | Doctorate| 413| | 12th| 433| | 9th| 514| | Prof-school| 576| | 7th-8th| 646| | 10th| 933| | Assoc-acdm| 1067| | 11th| 1175| | Assoc-voc| 1382| | Masters| 1723| | Bachelors| 5355| |Some-college| 7291| | HS-grad|10501| +------------+-----+
描述数据
要获取数据的汇总统计信息,可以使用 describe()。它将计算:
- 数
- 意味着
- 标准差
- 分钟
- 最大
df.describe().show()
+-------+------------------+-----------+------------------+------------+-----------------+--------+----------------+------------+------------------+------+------------------+----------------+------------------+--------------+-----+ |summary| age| workclass| fnlwgt| education| education_num| marital| occupation|relationship| race| sex| capital_gain| capital_loss| hours_week|native_country|label| +-------+------------------+-----------+------------------+------------+-----------------+--------+----------------+------------+------------------+------+------------------+----------------+------------------+--------------+-----+ | count| 32561| 32561| 32561| 32561| 32561| 32561| 32561| 32561| 32561| 32561| 32561| 32561| 32561| 32561|32561| | mean| 38.58164675532078| null|189778.36651208502| null| 10.0806793403151| null| null| null| null| null|1077.6488437087312| 87.303829734959|40.437455852092995| null| null| | stddev|13.640432553581356| null|105549.97769702227| null|2.572720332067397| null| null| null| null| null| 7385.292084840354|402.960218649002|12.347428681731838| null| null| | min| 17| ?| 12285| 10th| 1|Divorced| ?| Husband|Amer-Indian-Eskimo|Female| 0| 0| 1| ?|<=50K| | max| 90|Without-pay| 1484705|Some-college| 16| Widowed|Transport-moving| Wife| White| Male| 99999| 4356| 99| Yugoslavia| >50K| +-------+------------------+-----------+------------------+------------+-----------------+--------+----------------+------------+------------------+------+------------------+----------------+------------------+--------------+-----+
如果只想要一列的汇总统计信息,请在 describe() 中添加列的名称
df.describe('capital_gain').show()
+-------+------------------+ |summary| capital_gain| +-------+------------------+ | count| 32561| | mean|1077.6488437087312| | stddev| 7385.292084840354| | min| 0| | max| 99999| +-------+------------------+
交叉表计算
在某些情况下,查看两对列之间的描述性统计数据可能会很有趣。例如,您可以按教育水平统计收入低于或高于 50k 的人数。此操作称为交叉表。
df.crosstab('age', 'label').sort("age_label").show()
+---------+-----+----+ |age_label|<=50K|>50K| +---------+-----+----+ | 17| 395| 0| | 18| 550| 0| | 19| 710| 2| | 20| 753| 0| | 21| 717| 3| | 22| 752| 13| | 23| 865| 12| | 24| 767| 31| | 25| 788| 53| | 26| 722| 63| | 27| 754| 81| | 28| 748| 119| | 29| 679| 134| | 30| 690| 171| | 31| 705| 183| | 32| 639| 189| | 33| 684| 191| | 34| 643| 243| | 35| 659| 217| | 36| 635| 263| +---------+-----+----+ only showing top 20 rows
你会发现没有人年轻时收入超过50万。
下降柱
有两个直观的 API 可以删除列:
- drop():删除一列
- dropna():删除 NA
下面你把列 education_num 放进去
df.drop('education_num').columns ['age', 'workclass', 'fnlwgt', 'education', 'marital', 'occupation', 'relationship', 'race', 'sex', 'capital_gain', 'capital_loss', 'hours_week', 'native_country', 'label']
过滤数据
您可以使用 filter() 在数据子集中应用描述性统计。例如,您可以统计 40 岁以上的人数
df.filter(df.age > 40).count()
13443
Descript各组统计数字
最后,您可以按组对数据进行分组并计算平均值等统计运算。
df.groupby('marital').agg({'capital_gain': 'mean'}).show()
+--------------------+------------------+ | marital| avg(capital_gain)| +--------------------+------------------+ | Separated| 535.5687804878049| | Never-married|376.58831788823363| |Married-spouse-ab...| 653.9832535885167| | Divorced| 728.4148098131893| | Widowed| 571.0715005035247| | Married-AF-spouse| 432.6521739130435| | Married-civ-spouse|1764.8595085470085| +--------------------+------------------+
步骤2)数据预处理
数据处理是机器学习中的关键步骤。删除垃圾数据后,您会得到一些重要的见解。
例如,您知道年龄与收入不是线性函数。当人们年轻时,他们的收入通常低于中年。退休后,家庭会动用他们的储蓄,这意味着收入减少。为了捕捉这种模式,您可以在年龄特征中添加一个方块
添加年龄方块
要添加新功能,您需要:
- 选择列
- 应用转换并将其添加到 DataFrame
from pyspark.sql.functions import * # 1 Select the column age_square = df.select(col("age")**2) # 2 Apply the transformation and add it to the DataFrame df = df.withColumn("age_square", col("age")**2) df.printSchema() root |-- age: integer (nullable = true) |-- workclass: string (nullable = true) |-- fnlwgt: integer (nullable = true) |-- education: string (nullable = true) |-- education_num: integer (nullable = true) |-- marital: string (nullable = true) |-- occupation: string (nullable = true) |-- relationship: string (nullable = true) |-- race: string (nullable = true) |-- sex: string (nullable = true) |-- capital_gain: integer (nullable = true) |-- capital_loss: integer (nullable = true) |-- hours_week: integer (nullable = true) |-- native_country: string (nullable = true) |-- label: string (nullable = true) |-- age_square: double (nullable = true)
您可以看到 age_square 已成功添加到数据框中。您可以使用 select 更改变量的顺序。下面,您将 age_square 放在 age 后面。
COLUMNS = ['age', 'age_square', 'workclass', 'fnlwgt', 'education', 'education_num', 'marital', 'occupation', 'relationship', 'race', 'sex', 'capital_gain', 'capital_loss', 'hours_week', 'native_country', 'label'] df = df.select(COLUMNS) df.first()
Row(age=39, age_square=1521.0, workclass='State-gov', fnlwgt=77516, education='Bachelors', education_num=13, marital='Never-married', occupation='Adm-clerical', relationship='Not-in-family', race='White', sex='Male', capital_gain=2174, capital_loss=0, hours_week=40, native_country='United-States', label='<=50K')
排除荷兰
当特征中的一组只有一个观测值时,它不会给模型带来任何信息。相反,它可能会在交叉验证过程中导致错误。
让我们检查一下这个家庭的起源
df.filter(df.native_country == 'Holand-Netherlands').count() df.groupby('native_country').agg({'native_country': 'count'}).sort(asc("count(native_country)")).show()
+--------------------+---------------------+ | native_country|count(native_country)| +--------------------+---------------------+ | Holand-Netherlands| 1| | Scotland| 12| | Hungary| 13| | Honduras| 13| |Outlying-US(Guam-...| 14| | Yugoslavia| 16| | Thailand| 18| | Laos| 18| | Cambodia| 19| | Trinadad&Tobago| 19| | Hong| 20| | Ireland| 24| | Ecuador| 28| | Greece| 29| | France| 29| | Peru| 31| | Nicaragua| 34| | Portugal| 37| | Iran| 43| | Haiti| 44| +--------------------+---------------------+ only showing top 20 rows
特征 native_country 只有一个家庭来自荷兰。您将其排除。
df_remove = df.filter(df.native_country != 'Holand-Netherlands')
步骤3)构建数据处理管道
与 scikit-learn 类似,Pyspark 有一个管道 API。
管道非常方便维护数据的结构。你将数据推送到管道中。在管道内部,进行各种操作,输出用于输入算法。
例如,机器学习中的一个通用转换是将字符串转换为独热编码器,即每组一列。独热编码器通常是一个全零的矩阵。
转换数据的步骤与 scikit-learn 非常相似。您需要:
- 将字符串索引为数字
- 创建独热编码器
- 转换数据
两个 API 可以完成这个任务:StringIndexer、OneHotEncoder
- 首先,选择要索引的字符串列。inputCol 是数据集中列的名称。outputCol 是转换后的列的新名称。
StringIndexer(inputCol="workclass", outputCol="workclass_encoded")
- 拟合数据并进行转换
model = stringIndexer.fit(df) `indexed = model.transform(df)``
- 根据组创建新列。例如,如果特征中有 10 个组,则新矩阵将有 10 列,每个组一列。
OneHotEncoder(dropLast=False, inputCol="workclassencoded", outputCol="workclassvec")
### Example encoder from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler stringIndexer = StringIndexer(inputCol="workclass", outputCol="workclass_encoded") model = stringIndexer.fit(df) indexed = model.transform(df) encoder = OneHotEncoder(dropLast=False, inputCol="workclass_encoded", outputCol="workclass_vec") encoded = encoder.transform(indexed) encoded.show(2)
+---+----------+----------------+------+---------+-------------+------------------+---------------+-------------+-----+----+------------+------------+----------+--------------+-----+-----------------+-------------+ |age|age_square| workclass|fnlwgt|education|education_num| marital| occupation| relationship| race| sex|capital_gain|capital_loss|hours_week|native_country|label|workclass_encoded|workclass_vec| +---+----------+----------------+------+---------+-------------+------------------+---------------+-------------+-----+----+------------+------------+----------+--------------+-----+-----------------+-------------+ | 39| 1521.0| State-gov| 77516|Bachelors| 13| Never-married| Adm-clerical|Not-in-family|White|Male| 2174| 0| 40| United-States|<=50K| 4.0|(9,[4],[1.0])| | 50| 2500.0|Self-emp-not-inc| 83311|Bachelors| 13|Married-civ-spouse|Exec-managerial| Husband|White|Male| 0| 0| 13| United-States|<=50K| 1.0|(9,[1],[1.0])| +---+----------+----------------+------+---------+-------------+------------------+---------------+-------------+-----+----+------------+------------+----------+--------------+-----+-----------------+-------------+ only showing top 2 rows
构建管道
您将构建一个管道来转换所有精确特征并将其添加到最终数据集。该管道将有四个操作,但您可以随意添加任意数量的操作。
- 对分类数据进行编码
- 索引标签特征
- 添加连续变量
- 组装台阶。
每个步骤都存储在一个名为阶段的列表中。此列表将告诉 VectorAssembler 在管道内执行什么操作。
1. 对分类数据进行编码
此步骤与上面的例子完全相同,只是循环遍历所有分类特征。
from pyspark.ml import Pipeline from pyspark.ml.feature import OneHotEncoderEstimator CATE_FEATURES = ['workclass', 'education', 'marital', 'occupation', 'relationship', 'race', 'sex', 'native_country'] stages = [] # stages in our Pipeline for categoricalCol in CATE_FEATURES: stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol + "Index") encoder = OneHotEncoderEstimator(inputCols=[stringIndexer.getOutputCol()], outputCols=[categoricalCol + "classVec"]) stages += [stringIndexer, encoder]
2. 索引标签功能
Spark与许多其他库一样,不接受标签的字符串值。您可以使用 StringIndexer 转换标签功能并将其添加到列表阶段
# Convert label into label indices using the StringIndexer label_stringIdx = StringIndexer(inputCol="label", outputCol="newlabel") stages += [label_stringIdx]
3.添加连续变量
VectorAssembler 的 inputCols 是列的列表。您可以创建一个包含所有新列的新列表。以下代码使用编码的分类特征和连续特征填充列表。
assemblerInputs = [c + "classVec" for c in CATE_FEATURES] + CONTI_FEATURES
4. 组装台阶。
最后,你通过 VectorAssembler 中的所有步骤
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")stages += [assembler]
现在所有步骤都已准备就绪,您可以将数据推送到管道。
# Create a Pipeline. pipeline = Pipeline(stages=stages) pipelineModel = pipeline.fit(df_remove) model = pipelineModel.transform(df_remove)
如果您检查新数据集,您会发现它包含所有特征,包括转换后的特征和未转换的特征。您只对新标签和特征感兴趣。特征包括所有转换后的特征和连续变量。
model.take(1) [Row(age=39, age_square=1521.0, workclass='State-gov', fnlwgt=77516, education='Bachelors', education_num=13, marital='Never-married', occupation='Adm-clerical', relationship='Not-in-family', race='White', sex='Male', capital_gain=2174, capital_loss=0, hours_week=40, native_country='United-States', label='<=50K', workclassIndex=4.0, workclassclassVec=SparseVector(8, {4: 1.0}), educationIndex=2.0, educationclassVec=SparseVector(15, {2: 1.0}), maritalIndex=1.0, maritalclassVec=SparseVector(6, {1: 1.0}), occupationIndex=3.0, occupationclassVec=SparseVector(14, {3: 1.0}), relationshipIndex=1.0, relationshipclassVec=SparseVector(5, {1: 1.0}), raceIndex=0.0, raceclassVec=SparseVector(4, {0: 1.0}), sexIndex=0.0, sexclassVec=SparseVector(1, {0: 1.0}), native_countryIndex=0.0, native_countryclassVec=SparseVector(40, {0: 1.0}), newlabel=0.0, features=SparseVector(99, {4: 1.0, 10: 1.0, 24: 1.0, 32: 1.0, 44: 1.0, 48: 1.0, 52: 1.0, 53: 1.0, 93: 39.0, 94: 77516.0, 95: 2174.0, 96: 13.0, 98: 40.0}))]
步骤4)构建分类器:逻辑
为了加快计算速度,您可以将模型转换为 DataFrame。
您需要使用地图从模型中选择新的标签和特征。
from pyspark.ml.linalg import DenseVector input_data = model.rdd.map(lambda x: (x["newlabel"], DenseVector(x["features"])))
您已准备好将训练数据创建为 DataFrame。您可以使用 sqlContext
df_train = sqlContext.createDataFrame(input_data, ["label", "features"])
检查第二行
df_train.show(2)
+-----+--------------------+ |label| features| +-----+--------------------+ | 0.0|[0.0,0.0,0.0,0.0,...| | 0.0|[0.0,1.0,0.0,0.0,...| +-----+--------------------+ only showing top 2 rows
创建训练/测试集
您使用 randomSplit 将数据集按 80/20 进行拆分。
# Split the data into train and test sets train_data, test_data = df_train.randomSplit([.8,.2],seed=1234)
让我们计算一下训练集和测试集中收入低于/高于 50 万的人数
train_data.groupby('label').agg({'label': 'count'}).show()
+-----+------------+ |label|count(label)| +-----+------------+ | 0.0| 19698| | 1.0| 6263| +-----+------------+
test_data.groupby('label').agg({'label': 'count'}).show()
+-----+------------+ |label|count(label)| +-----+------------+ | 0.0| 5021| | 1.0| 1578| +-----+------------+
构建逻辑回归器
最后但同样重要的是,您可以构建分类器。Pyspark 有一个名为 LogisticRegression 的 API 来执行逻辑回归。
通过指示标签列和特征列来初始化 lr。设置最大迭代次数为 10 次,并添加值为 0.3 的正则化参数。请注意,在下一节中,您将使用带有参数网格的交叉验证来调整模型
# Import `LinearRegression` from pyspark.ml.classification import LogisticRegression # Initialize `lr` lr = LogisticRegression(labelCol="label", featuresCol="features", maxIter=10, regParam=0.3) # Fit the data to the model linearModel = lr.fit(train_data)
#你可以看到回归系数
# Print the coefficients and intercept for logistic regression print("Coefficients: " + str(linearModel.coefficients)) print("Intercept: " + str(linearModel.intercept))
Coefficients: [-0.0678914665262,-0.153425526813,-0.0706009536407,-0.164057586562,-0.120655298528,0.162922330862,0.149176870438,-0.626836362611,-0.193483661541,-0.0782269980838,0.222667203836,0.399571096381,-0.0222024341804,-0.311925857859,-0.0434497788688,-0.306007744328,-0.41318209688,0.547937504247,-0.395837350854,-0.23166535958,0.618743906733,-0.344088614546,-0.385266881369,0.317324463006,-0.350518889186,-0.201335923138,-0.232878560088,-0.13349278865,-0.119760542498,0.17500602491,-0.0480968101118,0.288484253943,-0.116314616745,0.0524163478063,-0.300952624551,-0.22046421474,-0.16557996579,-0.114676231939,-0.311966431453,-0.344226119233,0.105530129507,0.152243047814,-0.292774545497,0.263628334433,-0.199951374076,-0.30329422583,-0.231087515178,0.418918551,-0.0565930184279,-0.177818073048,-0.0733236680663,-0.267972912252,0.168491215697,-0.12181255723,-0.385648075442,-0.202101794517,0.0469791640782,-0.00842850210625,-0.00373211448629,-0.259296141281,-0.309896554133,-0.168434409756,-0.11048086026,0.0280647963877,-0.204187030092,-0.414392623536,-0.252806580669,0.143366465705,-0.516359222663,-0.435627370849,-0.301949286524,0.0878249035894,-0.210951740965,-0.621417928742,-0.099445190784,-0.232671473401,-0.1077745606,-0.360429419703,-0.420362959052,-0.379729467809,-0.395186242741,0.0826401853838,-0.280251589972,0.187313505214,-0.20295228799,-0.431177064626,0.149759018379,-0.107114299614,-0.319314858424,0.0028450133235,-0.651220387649,-0.327918792207,-0.143659581445,0.00691075160413,8.38517628783e-08,2.18856717378e-05,0.0266701216268,0.000231075966823,0.00893832698698] Intercept: -1.9884177974805692
步骤5)训练并评估模型
为了生成测试集的预测,
您可以在 test_data 上使用带有 transform() 的 linearModel
# Make predictions on test data using the transform() method. predictions = linearModel.transform(test_data)
您可以打印预测中的元素
predictions.printSchema() root |-- label: double (nullable = true) |-- features: vector (nullable = true) |-- rawPrediction: vector (nullable = true) |-- probability: vector (nullable = true) |-- prediction: double (nullable = false)
你对标签、预测和概率感兴趣
selected = predictions.select("label", "prediction", "probability") selected.show(20)
+-----+----------+--------------------+ |label|prediction| probability| +-----+----------+--------------------+ | 0.0| 0.0|[0.91560704124179...| | 0.0| 0.0|[0.92812140213994...| | 0.0| 0.0|[0.92161406774159...| | 0.0| 0.0|[0.96222760777142...| | 0.0| 0.0|[0.66363283056957...| | 0.0| 0.0|[0.65571324475477...| | 0.0| 0.0|[0.73053376932829...| | 0.0| 1.0|[0.31265053873570...| | 0.0| 0.0|[0.80005907577390...| | 0.0| 0.0|[0.76482251301640...| | 0.0| 0.0|[0.84447301189069...| | 0.0| 0.0|[0.75691912026619...| | 0.0| 0.0|[0.60902504096722...| | 0.0| 0.0|[0.80799228385509...| | 0.0| 0.0|[0.87704364852567...| | 0.0| 0.0|[0.83817652582377...| | 0.0| 0.0|[0.79655423248500...| | 0.0| 0.0|[0.82712311232246...| | 0.0| 0.0|[0.81372823882016...| | 0.0| 0.0|[0.59687710752201...| +-----+----------+--------------------+ only showing top 20 rows
评估模型
您需要查看准确率指标,以了解模型的表现好坏。目前,没有 API 可以计算准确率度量 Spark。默认值是 ROC,即受试者工作特征曲线。这是一个考虑了假阳性率的不同指标。
在查看 ROC 之前,让我们先构建准确度度量。你对这个指标比较熟悉。准确度度量是正确预测与总观测数之和。
您创建一个带有标签和“预测”的 DataFrame。
cm = predictions.select("label", "prediction")
您可以检查标签中的类别数量和预测
cm.groupby('label').agg({'label': 'count'}).show()
+-----+------------+ |label|count(label)| +-----+------------+ | 0.0| 5021| | 1.0| 1578| +-----+------------+
cm.groupby('prediction').agg({'prediction': 'count'}).show()
+----------+-----------------+ |prediction|count(prediction)| +----------+-----------------+ | 0.0| 5982| | 1.0| 617| +----------+-----------------+
例如,在测试集中,有 1578 个家庭的收入高于 50k,有 5021 个家庭的收入低于 617k。然而,分类器预测有 50 个家庭的收入高于 XNUMXk。
您可以通过计算标签在总行数中正确分类的计数来计算准确度。
cm.filter(cm.label == cm.prediction).count() / cm.count()
0.8237611759357478
您可以将所有内容放在一起并编写一个函数来计算准确性。
def accuracy_m(model): predictions = model.transform(test_data) cm = predictions.select("label", "prediction") acc = cm.filter(cm.label == cm.prediction).count() / cm.count() print("Model accuracy: %.3f%%" % (acc * 100)) accuracy_m(model = linearModel) Model accuracy: 82.376%
ROC 指标
BinaryClassificationEvaluator 模块包括 ROC 测量。接收器 Opera特征曲线是二元分类的另一种常用工具。它与精度/召回率曲线非常相似,但 ROC 曲线不是绘制精度与召回率,而是显示真阳性率(即召回率)与假阳性率。假阳性率是被错误分类为阳性的阴性实例的比例。它等于 1 减去真阴性率。真阴性率也称为特异性。因此,ROC 曲线绘制了敏感度(召回率)与 XNUMX – 特异性
### Use ROC from pyspark.ml.evaluation import BinaryClassificationEvaluator # Evaluate model evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction") print(evaluator.evaluate(predictions)) print(evaluator.getMetricName())
0.8940481662695192ROC 下面积
print(evaluator.evaluate(predictions))
0.8940481662695192
步骤6)调整超参数
最后但同样重要的是,你可以调整超参数。类似于 scikit学习 创建一个参数网格,然后添加您想要调整的参数。
为了减少计算时间,您只需使用两个值来调整正则化参数。
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator # Create ParamGrid for Cross Validation paramGrid = (ParamGridBuilder() .addGrid(lr.regParam, [0.01, 0.5]) .build())
最后,使用 5 倍交叉验证法评估模型。训练大约需要 16 分钟。
from time import * start_time = time() # Create 5-fold CrossValidator cv = CrossValidator(estimator=lr, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5) # Run cross validations cvModel = cv.fit(train_data) # likely take a fair amount of time end_time = time() elapsed_time = end_time - start_time print("Time to train model: %.3f seconds" % elapsed_time)
训练模型的时间:978.807 秒
最佳正则化超参数为0.01,准确率为85.316%。
accuracy_m(model = cvModel) Model accuracy: 85.316%
您可以通过将 cvModel.bestModel 与 extractParamMap() 链接来提取推荐参数
bestModel = cvModel.bestModel bestModel.extractParamMap()
{Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='aggregationDepth', doc='suggested depth for treeAggregate (>= 2)'): 2, Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty'): 0.0, Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='family', doc='The name of family which is a description of the label distribution to be used in the model. Supported options: auto, binomial, multinomial.'): 'auto', Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='featuresCol', doc='features column name'): 'features', Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='fitIntercept', doc='whether to fit an intercept term'): True, Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='labelCol', doc='label column name'): 'label', Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='maxIter', doc='maximum number of iterations (>= 0)'): 10, Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='predictionCol', doc='prediction column name'): 'prediction', Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='probabilityCol', doc='Column name for predicted class conditional probabilities. Note: Not all models output well-calibrated probability estimates! These probabilities should be treated as confidences, not precise probabilities'): 'probability', Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='rawPredictionCol', doc='raw prediction (a.k.a. confidence) column name'): 'rawPrediction', Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='regParam', doc='regularization parameter (>= 0)'): 0.01, Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='standardization', doc='whether to standardize the training features before fitting the model'): True, Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='threshold', doc='threshold in binary classification prediction, in range [0, 1]'): 0.5, Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='tol', doc='the convergence tolerance for iterative algorithms (>= 0)'): 1e-06}
总结
Spark 是数据科学家的基本工具。它允许从业者将应用程序连接到不同的数据源,无缝执行数据分析或添加预测模型。
开始 Spark,你需要发起一个 Spark 上下文:
“Spark语境()'
和和 SQL 连接到数据源的上下文:
'SQLContext()'
在本教程中,您将学习如何训练逻辑回归:
- 使用以下方法将数据集转换为 Dataframe:
rdd.map(lambda x: (x["newlabel"], DenseVector(x["features"]))) sqlContext.createDataFrame(input_data, ["label", "features"])
请注意,标签的列名为 newlabel,所有特征都集中在 features 中。如果数据集中的这些值不同,请更改这些值。
- 创建训练/测试集
randomSplit([.8,.2],seed=1234)
- 训练模型
LogisticRegression(labelCol="label",featuresCol="features",maxIter=10, regParam=0.3)
lr.fit()
- 做出预测
linearModel.transform()