PySpark 初学者教程:通过示例学习

在学习Py之前Spark,让我们来了解一下:

什么是阿帕奇 Spark?

Spark 是一种已被证明比 Hadoop MapReduce 更简单、更快速的大数据解决方案。 Spark 是加州大学伯克利分校 RAD 实验室于 2009 年开发的一款开源软件。自 2010 年向公众发布以来, Spark 越来越受欢迎,并以前所未有的规模在整个行业中得到应用。

在那个时代 大数据运用,从业者比以往任何时候都更需要快速可靠的工具来处理数据流。早期的工具如 MapReduce 很受欢迎,但速度很慢。为了克服这个问题, Spark 提供了一种既快速又通用的解决方案。两者之间的主要区别 Spark MapReduce 是 Spark 在内存中运行计算,然后在硬盘上进行计算。它允许高速访问和数据处理,将时间从几小时缩短到几分钟。

Py 是什么Spark?

PySpark 是 Apache 创建的工具 Spark 使用社区 Python - Spark. 它允许使用 RDD (弹性分布式数据集) Python. 它还提供 PySpark 外壳链接 Python API 具有 Spark 核心启动 Spark 上下文。 Spark 是实现集群计算的名称引擎,而PySpark is Python使用的库 Spark.

如何 Spark 工作?

Spark 基于计算引擎,这意味着它负责调度、分发和监控应用程序。每个任务都在称为计算集群的各种工作机器上完成。计算集群是指任务的划分。一台机器执行一项任务,而其他机器通过不同的任务为最终输出做出贡献。最后,所有任务被聚合以产生输出。 Spark 管理员对各种 Spark 就业机会。

如何 Spark 工作
如何 Spark 工作

Spark 设计用于

  • Python
  • Java
  • 斯卡拉
  • SQL

一个显著的特征是 Spark 有大量的内置库,包括用于机器学习的 MLlib. Spark 还设计用于与 Hadoop 集群协同工作,并可读取多种类型的文件,包括 Hive 数据、CSV、JSON、Casandra 数据等。

为什么使用 Spark?

作为未来的数据从业者,您应该熟悉 Python 的著名库:Pandas 和 scikit-learn。这两个库非常适合探索中等规模的数据集。常规机器学习项目围绕以下方法构建:

  • 将数据加载到磁盘
  • 将数据导入机器内存
  • 处理/分析数据
  • 构建机器学习模型
  • 将预测结果存储回磁盘

如果数据科学家想要处理的数据对于一台计算机来说太大,就会出现问题。在数据科学的早期,从业者会对数据进行抽样,因为并不总是需要在庞大的数据集上进行训练。数据科学家会找到一个好的统计样本,进行额外的稳健性检查,然后得出一个优秀的模型。

然而,这样做存在一些问题:

  • 数据集是否反映了现实世界?
  • 数据中是否包含具体例子?
  • 该模型是否适合抽样?

以用户推荐为例。推荐系统依靠将用户与其他用户进行比较来评估他们的偏好。如果数据从业者只采用数据的一个子集,那么就不会有一群彼此非常相似的用户。推荐系统要么在完整的数据集上运行,要么根本不运行。

解决办法是什么?

解决方案早已显而易见,那就是将问题拆分到多台计算机上。并行计算也带来了多个问题。开发人员经常难以编写并行代码,最终不得不解决围绕多处理本身的一系列复杂问题。

Pyspark 为数据科学家提供了一个 API,可用于解决并行数据处理问题。Pyspark 可处理多处理的复杂性,例如分发数据、分发代码以及从机器集群上的工作人员收集输出。

Spark 可以独立运行,但通常运行在 Hadoop 等集群计算框架之上。然而,在测试和开发中,数据科学家可以高效地运行 Spark 在没有集群的开发箱或笔记本电脑上

• 主要优势之一 Spark 是构建一个涵盖数据流管理、无缝数据查询、机器学习预测和实时访问各种分析的架构。

• Spark 与SQL语言紧密结合,即结构化数据,可以实时查询数据。

• 数据科学家的主要工作是分析和建立预测模型。简而言之,数据科学家需要知道如何使用 SQL,生成统计报告并利用机器学习进行预测。数据科学家花费大量时间来清理、转换和分析数据。一旦数据集或数据工作流程准备就绪,数据科学家就会使用各种技术来发现见解和隐藏的模式。数据操作应该是强大的,并且同样易于使用。 Spark 由于其速度快且 API 丰富,因此是正确的工具。

在这个 PySpark 教程中,你将学习如何使用 Py 构建分类器Spark 例子。

如何安装 PySpark 使用 AWS

这款 Jupyter 团队构建 Docker 镜像来运行 Spark 高效。下面是安装 Py 的步骤Spark AWS 中的实例。

请参阅我们的教程 AWSTensorFlow

步骤 1:创建实例

首先,您需要创建一个实例。转到您的 AWS 帐户并启动实例。您可以将存储量增加到 15g,并使用与 TensorFlow 教程中相同的安全组。

第 2 步:打开连接

打开连接并安装docker容器。有关更多详细信息,请参阅TensorFlow的教程 Docker。请注意,您需要处于正确的工作目录中。

只需运行以下代码即可安装 Docker:

sudo yum update -y
sudo yum install -y docker
sudo service docker start
sudo user-mod -a -G docker ec2-user
exit

步骤 3:重新打开连接并安装 Spark

重新打开连接后,你可以安装包含 Py 的镜像Spark.

## Spark
docker run -v ~/work:/home/jovyan/work -d -p 8888:8888 jupyter/pyspark-notebook

## Allow preserving Jupyter notebook
sudo chown 1000 ~/work

## Install tree to see our working directory next
sudo yum install -y tree

步骤4:打开 Jupyter

检查集装箱及其名称

docker ps

使用 docker logs 和 docker 名称启动 docker。例如,docker logs zealous_goldwasser

转到浏览器并启动 Jupyter。地址是http://localhost:8888/。粘贴终端给的密码。

备注:如果你想上传/下载文件到你的 AWS 机器,你可以使用 Cyber​​duck 软件, https://cyberduck.io/.

如何安装 PySpark on Windows/Mac 与 Conda

以下是安装Py的详细过程Spark on Windows/Mac 使用 Anaconda:

安装 Spark 在本地机器上,建议创建一个新的 conda 环境。此新环境将安装 Python Spark 以及所有依赖项。

苹果电脑用户

cd anaconda3
touch hello-spark.yml
vi hello-spark.yml

Windows 用户

cd C:\Users\Admin\Anaconda3
echo.>hello-spark.yml
notepad hello-spark.yml

您可以编辑 .yml 文件。注意缩进。在 – 之前需要两个空格

name: hello-spark 
    dependencies:
    
    - python=3.6
    - jupyter
    - ipython
    - numpy
    - numpy-base
    - pandas
    - py4j
    - pyspark
    - pytz

保存并创建环境。这需要一些时间

conda env create -f hello-spark.yml

有关位置的更多详细信息,请查看教程安装 TensorFlow

您可以检查机器上安装的所有环境

conda env list
Activate hello-spark

苹果电脑用户

source activate hello-spark

Windows 用户

activate hello-spark

请注意: 您已经创建了一个特定的 TensorFlow 环境来运行 TensorFlow 教程。创建一个不同于 hello-tf 的新环境更为方便。用以下代码重载 hello-tf 是没有意义的 Spark 或任何其他机器学习库。

假设你的大部分项目都涉及 TensorFlow,但你需要使用 Spark 针对某个特定项目。您可以为所有项目设置一个 TensorFlow 环境,并为 Spark。您可以添加任意数量的库 Spark 根据需要在不干扰 TensorFlow 环境的情况下创建环境。完成 Spark的项目,您可以将其擦除而不会影响 TensorFlow 环境。

Jupyter

可选 Jupyter 笔记本并尝试 PySpark 有效。在新的笔记本中粘贴以下 PSpark 示例代码:

import pyspark
from pyspark import SparkContext
sc =SparkContext()

如果显示错误,则可能是 Java 您的机器上没有安装。在 Mac 中,打开终端并输入 java -version,如果有 java 版本,请确保它是 1.8。在 Windows,转到应用程序并检查是否有 Java 文件夹。如果有 Java 文件夹,检查 Java 1.8 已安装。 截至撰写本文时,PySpark 与不兼容 Java9及以上。

如果需要安装 Java,你想想 链接 并下载 jdk-8u181-windows-x64.exe

Jupyter

对于Mac用户,建议使用“brew”。

brew tap caskroom/versions
brew cask install java8

请参阅此分步教程 如何安装 Java

备注:使用 remove 彻底删除环境。

 conda env remove -n hello-spark -y

Spark 语境

Spark上下文是允许与集群连接的内部引擎。如果你想运行一个操作,你需要一个 Spark上下文。

创建一个 Spark语境

首先,你需要发起 Spark上下文。

import pyspark
from pyspark import SparkContext
sc =SparkContext()

现在,该 Spark上下文已准备就绪,您可以创建一个名为 RDD(弹性分布式数据集)的数据集合。RDD 中的计算会自动在集群中并行化。

nums= sc.parallelize([1,2,3,4])

您可以使用 take 访问第一行

nums.take(1)
[1]

您可以使用 lambda 函数对数据进行转换。在 PySpark 下面的例子,你返回数字的平方。这是一个映射转换

squared = nums.map(lambda x: x*x).collect()
for num in squared:
    print('%i ' % (num))
1 
4 
9 
16

SQL上下文

更方便的方法是使用DataFrame。 Spark上下文已设置,您可以使用它来创建 dataFrame。您还需要声明 SQLContext

SQLContext 允许将引擎与不同的数据源连接起来。它用于启动 Spark SQL。

from pyspark.sql import Row
from pyspark.sql import SQLContext

sqlContext = SQLContext(sc)

现在在这个 Spark 教程 Python,让我们创建一个元组列表。每个元组将包含人员的姓名和年龄。需要四个步骤:

步骤1) 使用以下信息创建元组列表

[('John',19),('Smith',29),('Adam',35),('Henry',50)]

步骤2) 构建 RDD

rdd = sc.parallelize(list_p)

步骤3) 转换元组

rdd.map(lambda x: Row(name=x[0], age=int(x[1])))

步骤4) 创建 DataFrame 上下文

sqlContext.createDataFrame(ppl)
list_p = [('John',19),('Smith',29),('Adam',35),('Henry',50)]
rdd = sc.parallelize(list_p)
ppl = rdd.map(lambda x: Row(name=x[0], age=int(x[1])))
DF_ppl = sqlContext.createDataFrame(ppl)

如果要访问每个功能的类型,可以使用 printSchema()

DF_ppl.printSchema()
root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)

机器学习示例Spark

现在你已经对 Spark 和 SQLContext,您已准备好构建您的第一个机器学习程序。

以下是使用 Py 构建机器学习程序的步骤Spark:

  • 步骤1) Py 的基本操作Spark
  • 步骤2) 数据预处理
  • 步骤3) 构建数据处理管道
  • 步骤4) 构建分类器:逻辑
  • 步骤5) 训练和评估模型
  • 步骤6) 调整超参数

在这个 PySpark 机器学习教程,我们将使用成人数据集。本教程的目的是学习如何使用 Pyspark。有关数据集的更多信息,请参阅本教程。

请注意,数据集并不重要,您可能会认为计算需要很长时间。 Spark 旨在处理大量数据。 Spark当处理的数据集变大时,其性能相对于其他机器学习库会有所提高。

步骤 1)Py 的基本操作Spark

首先,您需要初始化尚未启动的SQLContext。

#from pyspark.sql import SQLContext
url = "https://raw.githubusercontent.com/guru99-edu/R-Programming/master/adult_data.csv"
from pyspark import SparkFiles
sc.addFile(url)
sqlContext = SQLContext(sc)

然后,您可以使用 sqlContext.read.csv 读取 cvs 文件。您可以使用设置为 True 的 inferSchema 来告诉 Spark 自动猜测数据类型。默认情况下,它为 False。

df = sqlContext.read.csv(SparkFiles.get("adult_data.csv"), header=True, inferSchema= True)

让我们看看数据类型

df.printSchema()
root
 |-- age: integer (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: integer (nullable = true)
 |-- education: string (nullable = true)
 |-- education_num: integer (nullable = true)
 |-- marital: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital_gain: integer (nullable = true)
 |-- capital_loss: integer (nullable = true)
 |-- hours_week: integer (nullable = true)
 |-- native_country: string (nullable = true)
 |-- label: string (nullable = true)

您可以使用 show 查看数据。

df.show(5, truncate = False)
+---+----------------+------+---------+-------------+------------------+-----------------+-------------+-----+------+------------+------------+----------+--------------+-----+
|age|workclass       |fnlwgt|education|education_num|marital           |occupation       |relationship |race |sex   |capital_gain|capital_loss|hours_week|native_country|label|
+---+----------------+------+---------+-------------+------------------+-----------------+-------------+-----+------+------------+------------+----------+--------------+-----+
|39 |State-gov       |77516 |Bachelors|13           |Never-married     |Adm-clerical     |Not-in-family|White|Male  |2174        |0           |40        |United-States |<=50K|
|50 |Self-emp-not-inc|83311 |Bachelors|13           |Married-civ-spouse|Exec-managerial  |Husband      |White|Male  |0           |0           |13        |United-States |<=50K|
|38 |Private         |215646|HS-grad  |9            |Divorced          |Handlers-cleaners|Not-in-family|White|Male  |0           |0           |40        |United-States |<=50K|
|53 |Private         |234721|11th     |7            |Married-civ-spouse|Handlers-cleaners|Husband      |Black|Male  |0           |0           |40        |United-States |<=50K|
|28 |Private         |338409|Bachelors|13           |Married-civ-spouse|Prof-specialty   |Wife         |Black|Female|0           |0           |40        |Cuba          |<=50K|
+---+----------------+------+---------+-------------+------------------+-----------------+-------------+-----+------+------------+------------+----------+--------------+-----+
only showing top 5 rows

如果您没有将 inderShema 设置为 True,则类型会发生以下情况。它们全部以字符串形式存在。

df_string = sqlContext.read.csv(SparkFiles.get("adult.csv"), header=True, inferSchema=  False)
df_string.printSchema()
root
 |-- age: string (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: string (nullable = true)
 |-- education: string (nullable = true)
 |-- education_num: string (nullable = true)
 |-- marital: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital_gain: string (nullable = true)
 |-- capital_loss: string (nullable = true)
 |-- hours_week: string (nullable = true)
 |-- native_country: string (nullable = true)
 |-- label: string (nullable = true)

要将连续变量转换为正确的格式,可以使用重铸列。您可以使用 withColumn 来告诉 Spark 对哪一列进行操作转换。

# Import all from `sql.types`
from pyspark.sql.types import *

# Write a custom function to convert the data type of DataFrame columns
def convertColumn(df, names, newType):
    for name in names: 
        df = df.withColumn(name, df[name].cast(newType))
    return df 
# List of continuous features
CONTI_FEATURES  = ['age', 'fnlwgt','capital_gain', 'education_num', 'capital_loss', 'hours_week']
# Convert the type
df_string = convertColumn(df_string, CONTI_FEATURES, FloatType())
# Check the dataset
df_string.printSchema()
root
 |-- age: float (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: float (nullable = true)
 |-- education: string (nullable = true)
 |-- education_num: float (nullable = true)
 |-- marital: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital_gain: float (nullable = true)
 |-- capital_loss: float (nullable = true)
 |-- hours_week: float (nullable = true)
 |-- native_country: string (nullable = true)
 |-- label: string (nullable = true)

from pyspark.ml.feature import StringIndexer
#stringIndexer = StringIndexer(inputCol="label", outputCol="newlabel")
#model = stringIndexer.fit(df)
#df = model.transform(df)
df.printSchema()

选择列

您可以使用 select 来选择并显示行以及特征名称。下面选择了 age 和 fnlwgt。

df.select('age','fnlwgt').show(5)
+---+------+
|age|fnlwgt|
+---+------+
| 39| 77516|
| 50| 83311|
| 38|215646|
| 53|234721|
| 28|338409|
+---+------+
only showing top 5 rows

按组计数

如果要按组计算出现次数,可以链接:

  • 通过...分组()
  • 数()

一起。在 PySpark 下面的例子,您可以根据教育水平计算行数。

df.groupBy("education").count().sort("count",ascending=True).show()
+------------+-----+
|   education|count|
+------------+-----+
|   Preschool|   51|
|     1st-4th|  168|
|     5th-6th|  333|
|   Doctorate|  413|
|        12th|  433|
|         9th|  514|
| Prof-school|  576|
|     7th-8th|  646|
|        10th|  933|
|  Assoc-acdm| 1067|
|        11th| 1175|
|   Assoc-voc| 1382|
|     Masters| 1723|
|   Bachelors| 5355|
|Some-college| 7291|
|     HS-grad|10501|
+------------+-----+

描述数据

要获取数据的汇总统计信息,可以使用 describe()。它将计算:

  • 意味着
  • 标准差
  • 分钟
  • 最大
df.describe().show()
+-------+------------------+-----------+------------------+------------+-----------------+--------+----------------+------------+------------------+------+------------------+----------------+------------------+--------------+-----+
|summary|               age|  workclass|            fnlwgt|   education|    education_num| marital|      occupation|relationship|              race|   sex|      capital_gain|    capital_loss|        hours_week|native_country|label|
+-------+------------------+-----------+------------------+------------+-----------------+--------+----------------+------------+------------------+------+------------------+----------------+------------------+--------------+-----+
|  count|             32561|      32561|             32561|       32561|            32561|   32561|           32561|       32561|             32561| 32561|             32561|           32561|             32561|         32561|32561|
|   mean| 38.58164675532078|       null|189778.36651208502|        null| 10.0806793403151|    null|            null|        null|              null|  null|1077.6488437087312| 87.303829734959|40.437455852092995|          null| null|
| stddev|13.640432553581356|       null|105549.97769702227|        null|2.572720332067397|    null|            null|        null|              null|  null| 7385.292084840354|402.960218649002|12.347428681731838|          null| null|
|    min|                17|          ?|             12285|        10th|                1|Divorced|               ?|     Husband|Amer-Indian-Eskimo|Female|                 0|               0|                 1|             ?|<=50K|
|    max|                90|Without-pay|           1484705|Some-college|               16| Widowed|Transport-moving|        Wife|             White|  Male|             99999|            4356|                99|    Yugoslavia| >50K|
+-------+------------------+-----------+------------------+------------+-----------------+--------+----------------+------------+------------------+------+------------------+----------------+------------------+--------------+-----+

如果只想要一列的汇总统计信息,请在 describe() 中添加列的名称

df.describe('capital_gain').show()
+-------+------------------+
|summary|      capital_gain|
+-------+------------------+
|  count|             32561|
|   mean|1077.6488437087312|
| stddev| 7385.292084840354|
|    min|                 0|
|    max|             99999|
+-------+------------------+

交叉表计算

在某些情况下,查看两对列之间的描述性统计数据可能会很有趣。例如,您可以按教育水平统计收入低于或高于 50k 的人数。此操作称为交叉表。

df.crosstab('age', 'label').sort("age_label").show()
+---------+-----+----+
|age_label|<=50K|>50K|
+---------+-----+----+
|       17|  395|   0|
|       18|  550|   0|
|       19|  710|   2|
|       20|  753|   0|
|       21|  717|   3|
|       22|  752|  13|
|       23|  865|  12|
|       24|  767|  31|
|       25|  788|  53|
|       26|  722|  63|
|       27|  754|  81|
|       28|  748| 119|
|       29|  679| 134|
|       30|  690| 171|
|       31|  705| 183|
|       32|  639| 189|
|       33|  684| 191|
|       34|  643| 243|
|       35|  659| 217|
|       36|  635| 263|
+---------+-----+----+
only showing top 20 rows

你会发现没有人年轻时收入超过50万。

下降柱

有两个直观的 API 可以删除列:

  • drop():删除一列
  • dropna():删除 NA

下面你把列 education_num 放进去

df.drop('education_num').columns

['age',
 'workclass',
 'fnlwgt',
 'education',
 'marital',
 'occupation',
 'relationship',
 'race',
 'sex',
 'capital_gain',
 'capital_loss',
 'hours_week',
 'native_country',
 'label']

过滤数据

您可以使用 filter() 在数据子集中应用描述性统计。例如,您可以统计 40 岁以上的人数

df.filter(df.age > 40).count()

13443

Descript各组统计数字

最后,您可以按组对数据进行分组并计算平均值等统计运算。

df.groupby('marital').agg({'capital_gain': 'mean'}).show()
+--------------------+------------------+
|             marital| avg(capital_gain)|
+--------------------+------------------+
|           Separated| 535.5687804878049|
|       Never-married|376.58831788823363|
|Married-spouse-ab...| 653.9832535885167|
|            Divorced| 728.4148098131893|
|             Widowed| 571.0715005035247|
|   Married-AF-spouse| 432.6521739130435|
|  Married-civ-spouse|1764.8595085470085|
+--------------------+------------------+

步骤2)数据预处理

数据处理是机器学习中的关键步骤。删除垃圾数据后,您会得到一些重要的见解。

例如,您知道年龄与收入不是线性函数。当人们年轻时,他们的收入通常低于中年。退休后,家庭会动用他们的储蓄,这意味着收入减少。为了捕捉这种模式,您可以在年龄特征中添加一个方块

添加年龄方块

要添加新功能,您需要:

  1. 选择列
  2. 应用转换并将其添加到 DataFrame
from pyspark.sql.functions import *

# 1 Select the column
age_square = df.select(col("age")**2)

# 2 Apply the transformation and add it to the DataFrame
df = df.withColumn("age_square", col("age")**2)

df.printSchema()
root
 |-- age: integer (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: integer (nullable = true)
 |-- education: string (nullable = true)
 |-- education_num: integer (nullable = true)
 |-- marital: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital_gain: integer (nullable = true)
 |-- capital_loss: integer (nullable = true)
 |-- hours_week: integer (nullable = true)
 |-- native_country: string (nullable = true)
 |-- label: string (nullable = true)
 |-- age_square: double (nullable = true)

您可以看到 age_square 已成功添加到数据框中。您可以使用 select 更改变量的顺序。下面,您将 age_square 放在 age 后面。

COLUMNS = ['age', 'age_square', 'workclass', 'fnlwgt', 'education', 'education_num', 'marital',
           'occupation', 'relationship', 'race', 'sex', 'capital_gain', 'capital_loss',
           'hours_week', 'native_country', 'label']
df = df.select(COLUMNS)
df.first()
Row(age=39, age_square=1521.0, workclass='State-gov', fnlwgt=77516, education='Bachelors', education_num=13, marital='Never-married', occupation='Adm-clerical', relationship='Not-in-family', race='White', sex='Male', capital_gain=2174, capital_loss=0, hours_week=40, native_country='United-States', label='<=50K')

排除荷兰

当特征中的一组只有一个观测值时,它不会给模型带来任何信息。相反,它可能会在交叉验证过程中导致错误。

让我们检查一下这个家庭的起源

df.filter(df.native_country == 'Holand-Netherlands').count()
df.groupby('native_country').agg({'native_country': 'count'}).sort(asc("count(native_country)")).show()
+--------------------+---------------------+
|      native_country|count(native_country)|
+--------------------+---------------------+
|  Holand-Netherlands|                    1|
|            Scotland|                   12|
|             Hungary|                   13|
|            Honduras|                   13|
|Outlying-US(Guam-...|                   14|
|          Yugoslavia|                   16|
|            Thailand|                   18|
|                Laos|                   18|
|            Cambodia|                   19|
|     Trinadad&Tobago|                   19|
|                Hong|                   20|
|             Ireland|                   24|
|             Ecuador|                   28|
|              Greece|                   29|
|              France|                   29|
|                Peru|                   31|
|           Nicaragua|                   34|
|            Portugal|                   37|
|                Iran|                   43|
|               Haiti|                   44|
+--------------------+---------------------+
only showing top 20 rows

特征 native_country 只有一个家庭来自荷兰。您将其排除。

df_remove = df.filter(df.native_country !=	'Holand-Netherlands')

步骤3)构建数据处理管道

与 scikit-learn 类似,Pyspark 有一个管道 API。

管道非常方便维护数据的结构。你将数据推送到管道中。在管道内部,进行各种操作,输出用于输入算法。

例如,机器学习中的一个通用转换是将字符串转换为独热编码器,即每组一列。独热编码器通常是一个全零的矩阵。

转换数据的步骤与 scikit-learn 非常相似。您需要:

  • 将字符串索引为数字
  • 创建独热编码器
  • 转换数据

两个 API 可以完成这个任务:StringIndexer、OneHotEncoder

  1. 首先,选择要索引的字符串列。inputCol 是数据集中列的名称。outputCol 是转换后的列的新名称。
StringIndexer(inputCol="workclass", outputCol="workclass_encoded")
  1. 拟合数据并进行转换
model = stringIndexer.fit(df)		
`indexed = model.transform(df)``
  1. 根据组创建新列。例如,如果特征中有 10 个组,则新矩阵将有 10 列,每个组一列。
OneHotEncoder(dropLast=False, inputCol="workclassencoded", outputCol="workclassvec")
### Example encoder
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler

stringIndexer = StringIndexer(inputCol="workclass", outputCol="workclass_encoded")
model = stringIndexer.fit(df)
indexed = model.transform(df)
encoder = OneHotEncoder(dropLast=False, inputCol="workclass_encoded", outputCol="workclass_vec")
encoded = encoder.transform(indexed)
encoded.show(2)
+---+----------+----------------+------+---------+-------------+------------------+---------------+-------------+-----+----+------------+------------+----------+--------------+-----+-----------------+-------------+
|age|age_square|       workclass|fnlwgt|education|education_num|           marital|     occupation| relationship| race| sex|capital_gain|capital_loss|hours_week|native_country|label|workclass_encoded|workclass_vec|
+---+----------+----------------+------+---------+-------------+------------------+---------------+-------------+-----+----+------------+------------+----------+--------------+-----+-----------------+-------------+
| 39|    1521.0|       State-gov| 77516|Bachelors|           13|     Never-married|   Adm-clerical|Not-in-family|White|Male|        2174|           0|        40| United-States|<=50K|              4.0|(9,[4],[1.0])|
| 50|    2500.0|Self-emp-not-inc| 83311|Bachelors|           13|Married-civ-spouse|Exec-managerial|      Husband|White|Male|           0|           0|        13| United-States|<=50K|              1.0|(9,[1],[1.0])|
+---+----------+----------------+------+---------+-------------+------------------+---------------+-------------+-----+----+------------+------------+----------+--------------+-----+-----------------+-------------+
only showing top 2 rows

构建管道

您将构建一个管道来转换所有精确特征并将其添加到最终数据集。该管道将有四个操作,但您可以随意添加任意数量的操作。

  1. 对分类数据进行编码
  2. 索引标签特征
  3. 添加连续变量
  4. 组装台阶。

每个步骤都存储在一个名为阶段的列表中。此列表将告诉 VectorAssembler 在管道内执行什么操作。

1. 对分类数据进行编码

此步骤与上面的例子完全相同,只是循环遍历所有分类特征。

from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoderEstimator
CATE_FEATURES = ['workclass', 'education', 'marital', 'occupation', 'relationship', 'race', 'sex', 'native_country']
stages = [] # stages in our Pipeline
for categoricalCol in CATE_FEATURES:
    stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol + "Index")
    encoder = OneHotEncoderEstimator(inputCols=[stringIndexer.getOutputCol()],
                                     outputCols=[categoricalCol + "classVec"])
    stages += [stringIndexer, encoder]

2. 索引标签功能

Spark与许多其他库一样,不接受标签的字符串值。您可以使用 StringIndexer 转换标签功能并将其添加到列表阶段

# Convert label into label indices using the StringIndexer
label_stringIdx =  StringIndexer(inputCol="label", outputCol="newlabel")
stages += [label_stringIdx]

3.添加连续变量

VectorAssembler 的 inputCols 是列的列表。您可以创建一个包含所有新列的新列表。以下代码使用编码的分类特征和连续特征填充列表。

assemblerInputs = [c + "classVec" for c in CATE_FEATURES] + CONTI_FEATURES

4. 组装台阶。

最后,你通过 VectorAssembler 中的所有步骤

assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")stages += [assembler]

现在所有步骤都已准备就绪,您可以将数据推送到管道。

# Create a Pipeline.
pipeline = Pipeline(stages=stages)
pipelineModel = pipeline.fit(df_remove)
model = pipelineModel.transform(df_remove)

如果您检查新数据集,您会发现它包含所有特征,包括转换后的特征和未转换的特征。您只对新标签和特征感兴趣。特征包括所有转换后的特征和连续变量。

model.take(1)

[Row(age=39, age_square=1521.0, workclass='State-gov', fnlwgt=77516, education='Bachelors', education_num=13, marital='Never-married', occupation='Adm-clerical', relationship='Not-in-family', race='White', sex='Male', capital_gain=2174, capital_loss=0, hours_week=40, native_country='United-States', label='<=50K', workclassIndex=4.0, workclassclassVec=SparseVector(8, {4: 1.0}), educationIndex=2.0, educationclassVec=SparseVector(15, {2: 1.0}), maritalIndex=1.0, maritalclassVec=SparseVector(6, {1: 1.0}), occupationIndex=3.0, occupationclassVec=SparseVector(14, {3: 1.0}), relationshipIndex=1.0, relationshipclassVec=SparseVector(5, {1: 1.0}), raceIndex=0.0, raceclassVec=SparseVector(4, {0: 1.0}), sexIndex=0.0, sexclassVec=SparseVector(1, {0: 1.0}), native_countryIndex=0.0, native_countryclassVec=SparseVector(40, {0: 1.0}), newlabel=0.0, features=SparseVector(99, {4: 1.0, 10: 1.0, 24: 1.0, 32: 1.0, 44: 1.0, 48: 1.0, 52: 1.0, 53: 1.0, 93: 39.0, 94: 77516.0, 95: 2174.0, 96: 13.0, 98: 40.0}))]

步骤4)构建分类器:逻辑

为了加快计算速度,您可以将模型转换为 DataFrame。

您需要使用地图从模型中选择新的标签和特征。

from pyspark.ml.linalg import DenseVector
input_data = model.rdd.map(lambda x: (x["newlabel"], DenseVector(x["features"])))

您已准备好将训练数据创建为 DataFrame。您可以使用 sqlContext

df_train = sqlContext.createDataFrame(input_data, ["label", "features"])

检查第二行

df_train.show(2)
+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|[0.0,0.0,0.0,0.0,...|
|  0.0|[0.0,1.0,0.0,0.0,...|
+-----+--------------------+
only showing top 2 rows

创建训练/测试集

您使用 randomSplit 将数据集按 80/20 进行拆分。

# Split the data into train and test sets
train_data, test_data = df_train.randomSplit([.8,.2],seed=1234)

让我们计算一下训练集和测试集中收入低于/高于 50 万的人数

train_data.groupby('label').agg({'label': 'count'}).show()
+-----+------------+
|label|count(label)|
+-----+------------+
|  0.0|       19698|
|  1.0|        6263|
+-----+------------+
test_data.groupby('label').agg({'label': 'count'}).show()
+-----+------------+
|label|count(label)|
+-----+------------+
|  0.0|        5021|
|  1.0|        1578|
+-----+------------+

构建逻辑回归器

最后但同样重要的是,您可以构建分类器。Pyspark 有一个名为 LogisticRegression 的 API 来执行逻辑回归。

通过指示标签列和特征列来初始化 lr。设置最大迭代次数为 10 次,并添加值为 0.3 的正则化参数。请注意,在下一节中,您将使用带有参数网格的交叉验证来调整模型

# Import `LinearRegression`
from pyspark.ml.classification import LogisticRegression

# Initialize `lr`
lr = LogisticRegression(labelCol="label",
                        featuresCol="features",
                        maxIter=10,
                        regParam=0.3)

# Fit the data to the model
linearModel = lr.fit(train_data)

#你可以看到回归系数

# Print the coefficients and intercept for logistic regression
print("Coefficients: " + str(linearModel.coefficients))
print("Intercept: " + str(linearModel.intercept))
Coefficients: [-0.0678914665262,-0.153425526813,-0.0706009536407,-0.164057586562,-0.120655298528,0.162922330862,0.149176870438,-0.626836362611,-0.193483661541,-0.0782269980838,0.222667203836,0.399571096381,-0.0222024341804,-0.311925857859,-0.0434497788688,-0.306007744328,-0.41318209688,0.547937504247,-0.395837350854,-0.23166535958,0.618743906733,-0.344088614546,-0.385266881369,0.317324463006,-0.350518889186,-0.201335923138,-0.232878560088,-0.13349278865,-0.119760542498,0.17500602491,-0.0480968101118,0.288484253943,-0.116314616745,0.0524163478063,-0.300952624551,-0.22046421474,-0.16557996579,-0.114676231939,-0.311966431453,-0.344226119233,0.105530129507,0.152243047814,-0.292774545497,0.263628334433,-0.199951374076,-0.30329422583,-0.231087515178,0.418918551,-0.0565930184279,-0.177818073048,-0.0733236680663,-0.267972912252,0.168491215697,-0.12181255723,-0.385648075442,-0.202101794517,0.0469791640782,-0.00842850210625,-0.00373211448629,-0.259296141281,-0.309896554133,-0.168434409756,-0.11048086026,0.0280647963877,-0.204187030092,-0.414392623536,-0.252806580669,0.143366465705,-0.516359222663,-0.435627370849,-0.301949286524,0.0878249035894,-0.210951740965,-0.621417928742,-0.099445190784,-0.232671473401,-0.1077745606,-0.360429419703,-0.420362959052,-0.379729467809,-0.395186242741,0.0826401853838,-0.280251589972,0.187313505214,-0.20295228799,-0.431177064626,0.149759018379,-0.107114299614,-0.319314858424,0.0028450133235,-0.651220387649,-0.327918792207,-0.143659581445,0.00691075160413,8.38517628783e-08,2.18856717378e-05,0.0266701216268,0.000231075966823,0.00893832698698]
Intercept: -1.9884177974805692

步骤5)训练并评估模型

为了生成测试集的预测,

您可以在 test_data 上使用带有 transform() 的 linearModel

# Make predictions on test data using the transform() method.
predictions = linearModel.transform(test_data)

您可以打印预测中的元素

predictions.printSchema()
root
 |-- label: double (nullable = true)
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)

你对标签、预测和概率感兴趣

selected = predictions.select("label", "prediction", "probability")
selected.show(20)
+-----+----------+--------------------+
|label|prediction|         probability|
+-----+----------+--------------------+
|  0.0|       0.0|[0.91560704124179...|
|  0.0|       0.0|[0.92812140213994...|
|  0.0|       0.0|[0.92161406774159...|
|  0.0|       0.0|[0.96222760777142...|
|  0.0|       0.0|[0.66363283056957...|
|  0.0|       0.0|[0.65571324475477...|
|  0.0|       0.0|[0.73053376932829...|
|  0.0|       1.0|[0.31265053873570...|
|  0.0|       0.0|[0.80005907577390...|
|  0.0|       0.0|[0.76482251301640...|
|  0.0|       0.0|[0.84447301189069...|
|  0.0|       0.0|[0.75691912026619...|
|  0.0|       0.0|[0.60902504096722...|
|  0.0|       0.0|[0.80799228385509...|
|  0.0|       0.0|[0.87704364852567...|
|  0.0|       0.0|[0.83817652582377...|
|  0.0|       0.0|[0.79655423248500...|
|  0.0|       0.0|[0.82712311232246...|
|  0.0|       0.0|[0.81372823882016...|
|  0.0|       0.0|[0.59687710752201...|
+-----+----------+--------------------+
only showing top 20 rows

评估模型

您需要查看准确率指标,以了解模型的表现好坏。目前,没有 API 可以计算准确率度量 Spark。默认值是 ROC,即受试者工作特征曲线。这是一个考虑了假阳性率的不同指标。

在查看 ROC 之前,让我们先构建准确度度量。你对这个指标比较熟悉。准确度度量是正确预测与总观测数之和。

您创建一个带有标签和“预测”的 DataFrame。

cm = predictions.select("label", "prediction")

您可以检查标签中的类别数量和预测

cm.groupby('label').agg({'label': 'count'}).show()
+-----+------------+
|label|count(label)|
+-----+------------+
|  0.0|        5021|
|  1.0|        1578|
+-----+------------+
cm.groupby('prediction').agg({'prediction': 'count'}).show()
+----------+-----------------+
|prediction|count(prediction)|
+----------+-----------------+
|       0.0|             5982|
|       1.0|              617|
+----------+-----------------+

例如,在测试集中,有 1578 个家庭的收入高于 50k,有 5021 个家庭的收入低于 617k。然而,分类器预测有 50 个家庭的收入高于 XNUMXk。

您可以通过计算标签在总行数中正确分类的计数来计算准确度。

cm.filter(cm.label == cm.prediction).count() / cm.count()

0.8237611759357478

您可以将所有内容放在一起并编写一个函数来计算准确性。

def accuracy_m(model): 
    predictions = model.transform(test_data)
    cm = predictions.select("label", "prediction")
    acc = cm.filter(cm.label == cm.prediction).count() / cm.count()
    print("Model accuracy: %.3f%%" % (acc * 100)) 
accuracy_m(model = linearModel)
Model accuracy: 82.376%

ROC 指标

BinaryClassificationEvaluator 模块包括 ROC 测量。接收器 Opera特征曲线是二元分类的另一种常用工具。它与精度/召回率曲线非常相似,但 ROC 曲线不是绘制精度与召回率,而是显示真阳性率(即召回率)与假阳性率。假阳性率是被错误分类为阳性的阴性实例的比例。它等于 1 减去真阴性率。真阴性率也称为特异性。因此,ROC 曲线绘制了敏感度(召回率)与 XNUMX – 特异性

### Use ROC 
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Evaluate model
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
print(evaluator.evaluate(predictions))
print(evaluator.getMetricName())

0.8940481662695192ROC 下面积

print(evaluator.evaluate(predictions))

0.8940481662695192

步骤6)调整超参数

最后但同样重要的是,你可以调整超参数。类似于 scikit学习 创建一个参数网格,然后添加您想要调整的参数。

为了减少计算时间,您只需使用两个值来调整正则化参数。

from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Create ParamGrid for Cross Validation
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.01, 0.5])
             .build())

最后,使用 5 倍交叉验证法评估模型。训练大约需要 16 分钟。

from time import *
start_time = time()

# Create 5-fold CrossValidator
cv = CrossValidator(estimator=lr,
                    estimatorParamMaps=paramGrid,
                    evaluator=evaluator, numFolds=5)

# Run cross validations
cvModel = cv.fit(train_data)
# likely take a fair amount of time
end_time = time()
elapsed_time = end_time - start_time
print("Time to train model: %.3f seconds" % elapsed_time)

训练模型的时间:978.807 秒

最佳正则化超参数为0.01,准确率为85.316%。

accuracy_m(model = cvModel)
Model accuracy: 85.316%

您可以通过将 cvModel.bestModel 与 extractParamMap() 链接来提取推荐参数

bestModel = cvModel.bestModel
bestModel.extractParamMap()
{Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='aggregationDepth', doc='suggested depth for treeAggregate (>= 2)'): 2,
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty'): 0.0,
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='family', doc='The name of family which is a description of the label distribution to be used in the model. Supported options: auto, binomial, multinomial.'): 'auto',
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='featuresCol', doc='features column name'): 'features',
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='fitIntercept', doc='whether to fit an intercept term'): True,
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='labelCol', doc='label column name'): 'label',
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='maxIter', doc='maximum number of iterations (>= 0)'): 10,
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='predictionCol', doc='prediction column name'): 'prediction',
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='probabilityCol', doc='Column name for predicted class conditional probabilities. Note: Not all models output well-calibrated probability estimates! These probabilities should be treated as confidences, not precise probabilities'): 'probability',
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='rawPredictionCol', doc='raw prediction (a.k.a. confidence) column name'): 'rawPrediction',
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='regParam', doc='regularization parameter (>= 0)'): 0.01,
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='standardization', doc='whether to standardize the training features before fitting the model'): True,
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='threshold', doc='threshold in binary classification prediction, in range [0, 1]'): 0.5,
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='tol', doc='the convergence tolerance for iterative algorithms (>= 0)'): 1e-06}

总结

Spark 是数据科学家的基本工具。它允许从业者将应用程序连接到不同的数据源,无缝执行数据分析或添加预测模型。

开始 Spark,你需要发起一个 Spark 上下文:

“Spark语境()'

和和 SQL 连接到数据源的上下文:

'SQLContext()'

在本教程中,您将学习如何训练逻辑回归:

  1. 使用以下方法将数据集转换为 Dataframe:
rdd.map(lambda x: (x["newlabel"], DenseVector(x["features"])))
sqlContext.createDataFrame(input_data, ["label", "features"])

请注意,标签的列名为 newlabel,所有特征都集中在 features 中。如果数据集中的这些值不同,请更改这些值。

  1. 创建训练/测试集
randomSplit([.8,.2],seed=1234)
  1. 训练模型
LogisticRegression(labelCol="label",featuresCol="features",maxIter=10, regParam=0.3)
lr.fit()
  1. 做出预测
linearModel.transform()