PySpark Урок за начинаещи: Учете с ПРИМЕРИ

Преди да научите PySpark, да разберем:

Какво е Apache Spark?

Spark е решение за големи данни, което е доказано по-лесно и по-бързо от Hadoop MapReduce. Spark е софтуер с отворен код, разработен от UC Berkeley RAD lab през 2009 г. Откакто беше пуснат за обществеността през 2010 г., Spark нарасна в популярност и се използва в индустрията с безпрецедентен мащаб.

В ерата на Голямо количество от данни, практиците се нуждаят повече от всякога от бързи и надеждни инструменти за обработка на поточно предаване на данни. По-ранните инструменти като MapReduce бяха любими, но бяха бавни. За да преодолеете този проблем, Spark предлага решение, което е едновременно бързо и с общо предназначение. Основната разлика между Spark и MapReduce е това Spark изпълнява изчисления в паметта по-късно на твърдия диск. Позволява високоскоростен достъп и обработка на данни, намалявайки времето от часове до минути.

Какво е PySpark?

PySpark е инструмент, създаден от Apache Spark Общността за използване Python с Spark. Позволява работа с RDD (Resilient Distributed Dataset) в Python. Той също така предлага PySpark Shell за връзка Python API с Spark ядро за започване Spark Контекст. Spark е името двигател за реализиране на клъстерни изчисления, докато PySpark is Pythonбиблиотека за използване Spark.

Как Spark работа?

Spark е базиран на изчислителна машина, което означава, че се грижи за приложението за планиране, разпространение и наблюдение. Всяка задача се изпълнява на различни работни машини, наречени изчислителен клъстер. Компютърният клъстер се отнася до разделянето на задачите. Една машина изпълнява една задача, докато другите допринасят за крайния резултат чрез различна задача. В крайна сметка всички задачи се агрегират, за да се получи резултат. The Spark admin дава 360 преглед на различни Spark Работни места.

Как действа Spark Работа
Как действа Spark Работа

Spark е предназначен за работа с

  • Python
  • Java
  • Scala
  • SQL

Съществена характеристика на Spark е огромното количество вградена библиотека, включително MLlib за машинно обучение. Spark също така е проектиран да работи с Hadoop клъстери и може да чете широк тип файлове, включително данни от Hive, CSV, JSON, данни от Casandra между другото.

Защо ползване Spark?

Като бъдещ специалист по данни, трябва да сте запознати с известните библиотеки на Python: Pandas и scikit-learn. Тези две библиотеки са фантастични за изследване на набор от данни до среден размер. Редовните проекти за машинно обучение са изградени около следната методология:

  • Заредете данните на диска
  • Импортирайте данните в паметта на машината
  • Обработвайте/анализирайте данните
  • Изградете модела за машинно обучение
  • Запазете прогнозата обратно на диска

Проблемът възниква, ако специалистът по данни иска да обработва данни, които са твърде големи за един компютър. По време на по-ранните дни на науката за данни, практикуващите ще вземат проби, тъй като обучението върху огромни масиви от данни не винаги е било необходимо. Специалистът по данни ще намери добра статистическа извадка, ще извърши допълнителна проверка за устойчивост и ще излезе с отличен модел.

Има обаче някои проблеми с това:

  • Наборът от данни отразява ли реалния свят?
  • Данните включват ли конкретен пример?
  • Моделът годен ли е за вземане на проби?

Вземете например препоръките на потребителите. Препоръчителите разчитат на сравняване на потребителите с други потребители при оценката на техните предпочитания. Ако специалистът по данни вземе само подмножество от данните, няма да има кохорта от потребители, които са много сходни един с друг. Препоръчителите трябва да работят върху пълния набор от данни или изобщо да не работят.

Какво е решението?

Решението е очевидно от дълго време, разделете проблема на няколко компютъра. Паралелното изчисление идва и с множество проблеми. Разработчиците често имат проблеми с писането на паралелен код и в крайна сметка трябва да решават куп сложни проблеми около самата мултипроцесорност.

Pyspark дава на специалиста по данни API, който може да се използва за решаване на проблеми с обработката на паралелни данни. Pyspark се справя със сложността на мултипроцесирането, като разпространение на данни, разпространение на код и събиране на изход от работниците на клъстер от машини.

Spark може да работи самостоятелно, но най-често работи върху клъстерна изчислителна рамка като Hadoop. При тестване и разработка обаче специалистът по данни може да работи ефективно Spark на техните развойни кутии или лаптопи без клъстер

• Едно от основните предимства на Spark е да се изгради архитектура, която обхваща управление на поточно предаване на данни, безпроблемни заявки за данни, прогнозиране на машинно обучение и достъп в реално време до различни анализи.

• Spark работи в тясно сътрудничество с езика SQL, т.е. структурирани данни. Позволява търсене на данни в реално време.

• Основната работа на специалиста по данни е да анализира и изгражда прогнозни модели. Накратко, специалистът по данни трябва да знае как да прави заявки за данни, използвайки SQL, изготвяне на статистически отчет и използване на машинно обучение за създаване на прогнози. Учените по данни прекарват значителна част от времето си за почистване, трансформиране и анализиране на данните. След като наборът от данни или работният поток от данни е готов, специалистът по данни използва различни техники, за да открие прозрения и скрити модели. Манипулирането на данни трябва да бъде стабилно и също толкова лесно за използване. Spark е правилният инструмент благодарение на своята скорост и богат API.

В този PySpark урок, ще научите как да създадете класификатор с PySpark примери.

Как да инсталирате PySpark с AWS

- Jupyter екип изгради Docker изображение за стартиране Spark ефективно. По-долу са стъпките, които можете да следвате, за да инсталирате PySpark екземпляр в AWS.

Вижте нашия урок AWS намлява TensorFlow

Стъпка 1: Създайте екземпляр

На първо място, трябва да създадете екземпляр. Отидете във вашия AWS акаунт и стартирайте екземпляра. Можете да увеличите паметта до 15 g и да използвате същата група за сигурност като в урока за TensorFlow.

Стъпка 2: Отворете връзката

Отворете връзката и инсталирайте докер контейнер. За повече подробности вижте урока с TensorFlow с докер. Имайте предвид, че трябва да сте в правилната работна директория.

Просто стартирайте тези кодове, за да инсталирате Docker:

sudo yum update -y
sudo yum install -y docker
sudo service docker start
sudo user-mod -a -G docker ec2-user
exit

Стъпка 3: Отворете отново връзката и инсталирайте Spark

След като отворите отново връзката, можете да инсталирате изображението, съдържащо PySpark.

## Spark
docker run -v ~/work:/home/jovyan/work -d -p 8888:8888 jupyter/pyspark-notebook

## Allow preserving Jupyter notebook
sudo chown 1000 ~/work

## Install tree to see our working directory next
sudo yum install -y tree

Стъпка 4: Отворете Jupyter

Проверете контейнера и името му

docker ps

Стартирайте докера с докер регистрационни файлове, последвани от името на докера. Например, докер регистрира zealous_goldwasser

Отидете до вашия браузър и стартирайте Jupyter. Адресът е http://localhost:8888/. Поставете паролата, дадена от терминала.

Забележка: ако искате да качите/изтеглите файл на вашата AWS машина, можете да използвате софтуера Cyberduck, https://cyberduck.io/.

Как да инсталирате PySpark on Windows/Mac с Conda

Следва подробен процес за инсталиране на PySpark on Windows/Mac използва Anaconda:

За да инсталирате Spark на вашата локална машина, препоръчителната практика е да създадете нова conda среда. Тази нова среда ще се инсталира Python 3.6, Spark и всички зависимости.

Потребител на Mac

cd anaconda3
touch hello-spark.yml
vi hello-spark.yml

Windows Потребител

cd C:\Users\Admin\Anaconda3
echo.>hello-spark.yml
notepad hello-spark.yml

Можете да редактирате .yml файла. Бъдете внимателни с отстъпа. Необходими са две интервали преди –

name: hello-spark 
    dependencies:
    
    - python=3.6
    - jupyter
    - ipython
    - numpy
    - numpy-base
    - pandas
    - py4j
    - pyspark
    - pytz

Запазете го и създайте среда. Отнема известно време

conda env create -f hello-spark.yml

За повече подробности относно местоположението, моля, проверете урока Инсталиране на TensorFlow

Можете да проверите цялата среда, инсталирана на вашето устройство

conda env list
Activate hello-spark

Потребител на Mac

source activate hello-spark

Windows Потребител

activate hello-spark

Забележка: Вече сте създали конкретна среда TensorFlow, за да изпълнявате уроците на TensorFlow. По-удобно е да създадете нова среда, различна от hello-tf. Няма смисъл да претоварвате hello-tf с Spark или всякакви други библиотеки за машинно обучение.

Представете си, че по-голямата част от вашия проект включва TensorFlow, но трябва да използвате Spark за един конкретен проект. Можете да зададете среда TensorFlow за целия си проект и да създадете отделна среда за Spark. Можете да добавите колкото се може повече библиотеки Spark среда, както искате, без да се намесвате в средата на TensorFlow. След като приключите с Sparkпроект на , можете да го изтриете, без да засягате средата на TensorFlow.

Jupyter

отворено Jupyter Бележник и опитайте дали PySpark работи. В нов бележник поставете следния PySpark примерен код:

import pyspark
from pyspark import SparkContext
sc =SparkContext()

Ако се покаже грешка, вероятно това е така Java не е инсталиран на вашата машина. В mac отворете терминала и напишете java -version, ако има версия на java, уверете се, че е 1.8. в Windows, отидете на Приложение и проверете дали има a Java папка. Ако има a Java папка, проверете това Java 1.8 е инсталирана. Към момента на писане, PySpark не е съвместим с Java9 и по-горе.

Ако трябва да инсталирате Java, ти да мислиш връзка и изтеглете jdk-8u181-windows-x64.exe

Jupyter

За потребители на Mac се препоръчва използването на `brew.`

brew tap caskroom/versions
brew cask install java8

Вижте този урок стъпка по стъпка как да инсталирате Java

Забележка: Използвайте премахване, за да изтриете напълно среда.

 conda env remove -n hello-spark -y

Spark Контекст

SparkКонтекстът е вътрешният двигател, който позволява връзките с клъстерите. Ако искате да извършите операция, имате нужда от SparkКонтекст.

Създаване на SparkКонтекст

На първо място, трябва да инициирате a SparkКонтекст.

import pyspark
from pyspark import SparkContext
sc =SparkContext()

Сега, че SparkКонтекстът е готов, можете да създадете колекция от данни, наречена RDD, Resilient Distributed Dataset. Изчисленията в RDD автоматично се паралелизират в клъстера.

nums= sc.parallelize([1,2,3,4])

Можете да получите достъп до първия ред с take

nums.take(1)
[1]

Можете да приложите трансформация към данните с ламбда функция. В PySpark в примера по-долу връщате квадрата на числата. Това е трансформация на карта

squared = nums.map(lambda x: x*x).collect()
for num in squared:
    print('%i ' % (num))
1 
4 
9 
16

SQLContext

По-удобен начин е да използвате DataFrame. SparkКонтекстът вече е зададен, можете да го използвате, за да създадете dataFrame. Трябва също така да декларирате SQLContext

SQLContext позволява свързване на двигателя с различни източници на данни. Използва се за стартиране на функционалностите на Spark sql.

from pyspark.sql import Row
from pyspark.sql import SQLContext

sqlContext = SQLContext(sc)

Сега в това Spark настойнически Python, нека създадем списък с кортежи. Всеки кортеж ще съдържа имената на хората и тяхната възраст. Необходими са четири стъпки:

Стъпка 1) Създайте списък от кортежи с информацията

[('John',19),('Smith',29),('Adam',35),('Henry',50)]

Стъпка 2) Изградете RDD

rdd = sc.parallelize(list_p)

Стъпка 3) Преобразувайте кортежите

rdd.map(lambda x: Row(name=x[0], age=int(x[1])))

Стъпка 4) Създайте контекст на DataFrame

sqlContext.createDataFrame(ppl)
list_p = [('John',19),('Smith',29),('Adam',35),('Henry',50)]
rdd = sc.parallelize(list_p)
ppl = rdd.map(lambda x: Row(name=x[0], age=int(x[1])))
DF_ppl = sqlContext.createDataFrame(ppl)

Ако искате да получите достъп до типа на всяка функция, можете да използвате printSchema()

DF_ppl.printSchema()
root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)

Пример за машинно обучение с PySpark

Сега, когато имате кратка представа за Spark и SQLContext, вие сте готови да създадете първата си програма за машинно обучение.

Следват стъпките за изграждане на програма за машинно обучение с PySpark:

  • Стъпка 1) Основна работа с PySpark
  • Стъпка 2) Предварителна обработка на данни
  • Стъпка 3) Изградете тръбопровод за обработка на данни
  • Стъпка 4) Изградете класификатора: логистика
  • Стъпка 5) Обучете и оценете модела
  • Стъпка 6) Настройте хиперпараметъра

В този PySpark Урок за машинно обучение, ще използваме набора от данни за възрастни. Целта на този урок е да научите как да използвате Pyspark. За повече информация относно набора от данни вижте този урок.

Имайте предвид, че наборът от данни не е значителен и може да си помислите, че изчислението отнема много време. Spark е проектиран да обработва значително количество данни. SparkПроизводителността на се увеличава в сравнение с други библиотеки за машинно обучение, когато обработеният набор от данни нараства.

Стъпка 1) Основна операция с PySpark

На първо място, трябва да инициализирате SQLContext, който все още не е иницииран.

#from pyspark.sql import SQLContext
url = "https://raw.githubusercontent.com/guru99-edu/R-Programming/master/adult_data.csv"
from pyspark import SparkFiles
sc.addFile(url)
sqlContext = SQLContext(sc)

след това можете да прочетете cvs файла с sqlContext.read.csv. Използвате inferSchema, зададена на True, за да кажете Spark за автоматично отгатване на типа на данните. По подразбиране е обърнато на False.

df = sqlContext.read.csv(SparkFiles.get("adult_data.csv"), header=True, inferSchema= True)

Нека да разгледаме типа данни

df.printSchema()
root
 |-- age: integer (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: integer (nullable = true)
 |-- education: string (nullable = true)
 |-- education_num: integer (nullable = true)
 |-- marital: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital_gain: integer (nullable = true)
 |-- capital_loss: integer (nullable = true)
 |-- hours_week: integer (nullable = true)
 |-- native_country: string (nullable = true)
 |-- label: string (nullable = true)

Можете да видите данните с show.

df.show(5, truncate = False)
+---+----------------+------+---------+-------------+------------------+-----------------+-------------+-----+------+------------+------------+----------+--------------+-----+
|age|workclass       |fnlwgt|education|education_num|marital           |occupation       |relationship |race |sex   |capital_gain|capital_loss|hours_week|native_country|label|
+---+----------------+------+---------+-------------+------------------+-----------------+-------------+-----+------+------------+------------+----------+--------------+-----+
|39 |State-gov       |77516 |Bachelors|13           |Never-married     |Adm-clerical     |Not-in-family|White|Male  |2174        |0           |40        |United-States |<=50K|
|50 |Self-emp-not-inc|83311 |Bachelors|13           |Married-civ-spouse|Exec-managerial  |Husband      |White|Male  |0           |0           |13        |United-States |<=50K|
|38 |Private         |215646|HS-grad  |9            |Divorced          |Handlers-cleaners|Not-in-family|White|Male  |0           |0           |40        |United-States |<=50K|
|53 |Private         |234721|11th     |7            |Married-civ-spouse|Handlers-cleaners|Husband      |Black|Male  |0           |0           |40        |United-States |<=50K|
|28 |Private         |338409|Bachelors|13           |Married-civ-spouse|Prof-specialty   |Wife         |Black|Female|0           |0           |40        |Cuba          |<=50K|
+---+----------------+------+---------+-------------+------------------+-----------------+-------------+-----+------+------------+------------+----------+--------------+-----+
only showing top 5 rows

Ако не сте задали inderShema на True, ето какво се случва с типа. Има всички в низ.

df_string = sqlContext.read.csv(SparkFiles.get("adult.csv"), header=True, inferSchema=  False)
df_string.printSchema()
root
 |-- age: string (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: string (nullable = true)
 |-- education: string (nullable = true)
 |-- education_num: string (nullable = true)
 |-- marital: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital_gain: string (nullable = true)
 |-- capital_loss: string (nullable = true)
 |-- hours_week: string (nullable = true)
 |-- native_country: string (nullable = true)
 |-- label: string (nullable = true)

За да конвертирате непрекъснатата променлива в правилния формат, можете да използвате преработване на колоните. Можете да използвате withColumn, за да кажете Spark коя колона да оперира трансформацията.

# Import all from `sql.types`
from pyspark.sql.types import *

# Write a custom function to convert the data type of DataFrame columns
def convertColumn(df, names, newType):
    for name in names: 
        df = df.withColumn(name, df[name].cast(newType))
    return df 
# List of continuous features
CONTI_FEATURES  = ['age', 'fnlwgt','capital_gain', 'education_num', 'capital_loss', 'hours_week']
# Convert the type
df_string = convertColumn(df_string, CONTI_FEATURES, FloatType())
# Check the dataset
df_string.printSchema()
root
 |-- age: float (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: float (nullable = true)
 |-- education: string (nullable = true)
 |-- education_num: float (nullable = true)
 |-- marital: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital_gain: float (nullable = true)
 |-- capital_loss: float (nullable = true)
 |-- hours_week: float (nullable = true)
 |-- native_country: string (nullable = true)
 |-- label: string (nullable = true)

from pyspark.ml.feature import StringIndexer
#stringIndexer = StringIndexer(inputCol="label", outputCol="newlabel")
#model = stringIndexer.fit(df)
#df = model.transform(df)
df.printSchema()

Изберете колони

Можете да изберете и покажете редовете с select и имената на функциите. По-долу са избрани възраст и fnlwgt.

df.select('age','fnlwgt').show(5)
+---+------+
|age|fnlwgt|
+---+------+
| 39| 77516|
| 50| 83311|
| 38|215646|
| 53|234721|
| 28|338409|
+---+------+
only showing top 5 rows

Преброяване по групи

Ако искате да преброите броя на срещанията по група, можете да лансирате:

  • Групирай по()
  • броя()

заедно. В PySpark В примера по-долу броите броя на редовете според нивото на образование.

df.groupBy("education").count().sort("count",ascending=True).show()
+------------+-----+
|   education|count|
+------------+-----+
|   Preschool|   51|
|     1st-4th|  168|
|     5th-6th|  333|
|   Doctorate|  413|
|        12th|  433|
|         9th|  514|
| Prof-school|  576|
|     7th-8th|  646|
|        10th|  933|
|  Assoc-acdm| 1067|
|        11th| 1175|
|   Assoc-voc| 1382|
|     Masters| 1723|
|   Bachelors| 5355|
|Some-college| 7291|
|     HS-grad|10501|
+------------+-----+

Опишете данните

За да получите обобщена статистика за данните, можете да използвате describe(). Той ще изчисли:

  • броя
  • означава
  • стандартно отклонение
  • мин
  • макс
df.describe().show()
+-------+------------------+-----------+------------------+------------+-----------------+--------+----------------+------------+------------------+------+------------------+----------------+------------------+--------------+-----+
|summary|               age|  workclass|            fnlwgt|   education|    education_num| marital|      occupation|relationship|              race|   sex|      capital_gain|    capital_loss|        hours_week|native_country|label|
+-------+------------------+-----------+------------------+------------+-----------------+--------+----------------+------------+------------------+------+------------------+----------------+------------------+--------------+-----+
|  count|             32561|      32561|             32561|       32561|            32561|   32561|           32561|       32561|             32561| 32561|             32561|           32561|             32561|         32561|32561|
|   mean| 38.58164675532078|       null|189778.36651208502|        null| 10.0806793403151|    null|            null|        null|              null|  null|1077.6488437087312| 87.303829734959|40.437455852092995|          null| null|
| stddev|13.640432553581356|       null|105549.97769702227|        null|2.572720332067397|    null|            null|        null|              null|  null| 7385.292084840354|402.960218649002|12.347428681731838|          null| null|
|    min|                17|          ?|             12285|        10th|                1|Divorced|               ?|     Husband|Amer-Indian-Eskimo|Female|                 0|               0|                 1|             ?|<=50K|
|    max|                90|Without-pay|           1484705|Some-college|               16| Widowed|Transport-moving|        Wife|             White|  Male|             99999|            4356|                99|    Yugoslavia| >50K|
+-------+------------------+-----------+------------------+------------+-----------------+--------+----------------+------------+------------------+------+------------------+----------------+------------------+--------------+-----+

Ако искате обобщената статистика само на една колона, добавете името на колоната вътре в describe()

df.describe('capital_gain').show()
+-------+------------------+
|summary|      capital_gain|
+-------+------------------+
|  count|             32561|
|   mean|1077.6488437087312|
| stddev| 7385.292084840354|
|    min|                 0|
|    max|             99999|
+-------+------------------+

Crosstab изчисление

В някои случаи може да бъде интересно да видите описателната статистика между две колони по двойки. Например, можете да преброите броя на хората с доходи под или над 50 хиляди по ниво на образование. Тази операция се нарича кръстосана таблица.

df.crosstab('age', 'label').sort("age_label").show()
+---------+-----+----+
|age_label|<=50K|>50K|
+---------+-----+----+
|       17|  395|   0|
|       18|  550|   0|
|       19|  710|   2|
|       20|  753|   0|
|       21|  717|   3|
|       22|  752|  13|
|       23|  865|  12|
|       24|  767|  31|
|       25|  788|  53|
|       26|  722|  63|
|       27|  754|  81|
|       28|  748| 119|
|       29|  679| 134|
|       30|  690| 171|
|       31|  705| 183|
|       32|  639| 189|
|       33|  684| 191|
|       34|  643| 243|
|       35|  659| 217|
|       36|  635| 263|
+---------+-----+----+
only showing top 20 rows

Можете да видите, че нито един човек няма приходи над 50 хиляди, когато е млад.

Капка колона

Има два интуитивни API за премахване на колони:

  • drop(): Пускане на колона
  • dropna(): Премахване на NA

По-долу пускате колоната education_num

df.drop('education_num').columns

['age',
 'workclass',
 'fnlwgt',
 'education',
 'marital',
 'occupation',
 'relationship',
 'race',
 'sex',
 'capital_gain',
 'capital_loss',
 'hours_week',
 'native_country',
 'label']

Филтрирайте данните

Можете да използвате filter(), за да приложите описателна статистика в подмножество от данни. Например, можете да преброите броя на хората над 40 години

df.filter(df.age > 40).count()

13443

Descriptive статистика по групи

И накрая, можете да групирате данни по групи и да изчислявате статистически операции като средната стойност.

df.groupby('marital').agg({'capital_gain': 'mean'}).show()
+--------------------+------------------+
|             marital| avg(capital_gain)|
+--------------------+------------------+
|           Separated| 535.5687804878049|
|       Never-married|376.58831788823363|
|Married-spouse-ab...| 653.9832535885167|
|            Divorced| 728.4148098131893|
|             Widowed| 571.0715005035247|
|   Married-AF-spouse| 432.6521739130435|
|  Married-civ-spouse|1764.8595085470085|
+--------------------+------------------+

Стъпка 2) Предварителна обработка на данни

Обработката на данни е критична стъпка в машинното обучение. След като премахнете ненужните данни, получавате някои важни прозрения.

Например, знаете, че възрастта не е линейна функция с дохода. Когато хората са млади, техните доходи обикновено са по-ниски от тези на средна възраст. След пенсиониране домакинството използва своите спестявания, което означава намаляване на доходите. За да заснемете този модел, можете да добавите квадрат към функцията за възраст

Добавете квадрат за възраст

За да добавите нова функция, трябва:

  1. Изберете колоната
  2. Приложете трансформацията и я добавете към DataFrame
from pyspark.sql.functions import *

# 1 Select the column
age_square = df.select(col("age")**2)

# 2 Apply the transformation and add it to the DataFrame
df = df.withColumn("age_square", col("age")**2)

df.printSchema()
root
 |-- age: integer (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: integer (nullable = true)
 |-- education: string (nullable = true)
 |-- education_num: integer (nullable = true)
 |-- marital: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital_gain: integer (nullable = true)
 |-- capital_loss: integer (nullable = true)
 |-- hours_week: integer (nullable = true)
 |-- native_country: string (nullable = true)
 |-- label: string (nullable = true)
 |-- age_square: double (nullable = true)

Можете да видите, че age_square е добавен успешно към рамката с данни. Можете да промените реда на променливите с select. По-долу въвеждате age_square веднага след age.

COLUMNS = ['age', 'age_square', 'workclass', 'fnlwgt', 'education', 'education_num', 'marital',
           'occupation', 'relationship', 'race', 'sex', 'capital_gain', 'capital_loss',
           'hours_week', 'native_country', 'label']
df = df.select(COLUMNS)
df.first()
Row(age=39, age_square=1521.0, workclass='State-gov', fnlwgt=77516, education='Bachelors', education_num=13, marital='Never-married', occupation='Adm-clerical', relationship='Not-in-family', race='White', sex='Male', capital_gain=2174, capital_loss=0, hours_week=40, native_country='United-States', label='<=50K')

Изключете Холандия-Холандия

Когато група в рамките на характеристика има само едно наблюдение, тя не носи информация към модела. Напротив, това може да доведе до грешка по време на кръстосаното валидиране.

Да проверим произхода на домакинството

df.filter(df.native_country == 'Holand-Netherlands').count()
df.groupby('native_country').agg({'native_country': 'count'}).sort(asc("count(native_country)")).show()
+--------------------+---------------------+
|      native_country|count(native_country)|
+--------------------+---------------------+
|  Holand-Netherlands|                    1|
|            Scotland|                   12|
|             Hungary|                   13|
|            Honduras|                   13|
|Outlying-US(Guam-...|                   14|
|          Yugoslavia|                   16|
|            Thailand|                   18|
|                Laos|                   18|
|            Cambodia|                   19|
|     Trinadad&Tobago|                   19|
|                Hong|                   20|
|             Ireland|                   24|
|             Ecuador|                   28|
|              Greece|                   29|
|              France|                   29|
|                Peru|                   31|
|           Nicaragua|                   34|
|            Portugal|                   37|
|                Iran|                   43|
|               Haiti|                   44|
+--------------------+---------------------+
only showing top 20 rows

Функцията native_country има само едно домакинство от Холандия. Вие го изключвате.

df_remove = df.filter(df.native_country !=	'Holand-Netherlands')

Стъпка 3) Изградете тръбопровод за обработка на данни

Подобно на scikit-learn, Pyspark има API за тръбопровод.

Конвейерът е много удобен за поддържане на структурата на данните. Вкарвате данните в тръбопровода. Вътре в конвейера се извършват различни операции, изходът се използва за захранване на алгоритъма.

Например, една универсална трансформация в машинното обучение се състои от преобразуване на низ в един горещ енкодер, т.е. една колона от група. Един горещ енкодер обикновено е матрица, пълна с нули.

Стъпките за трансформиране на данните са много подобни на scikit-learn. Трябва да:

  • Индексирайте низа към числов
  • Създайте един горещ енкодер
  • Трансформирайте данните

Два API вършат работата: StringIndexer, OneHotEncoder

  1. Първо избирате колоната с низове за индексиране. InputCol е името на колоната в набора от данни. outputCol е новото име, дадено на трансформираната колона.
StringIndexer(inputCol="workclass", outputCol="workclass_encoded")
  1. Поставете данните и ги трансформирайте
model = stringIndexer.fit(df)		
`indexed = model.transform(df)``
  1. Създайте колоните с новини въз основа на групата. Например, ако има 10 групи във функцията, новата матрица ще има 10 колони, по една за всяка група.
OneHotEncoder(dropLast=False, inputCol="workclassencoded", outputCol="workclassvec")
### Example encoder
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler

stringIndexer = StringIndexer(inputCol="workclass", outputCol="workclass_encoded")
model = stringIndexer.fit(df)
indexed = model.transform(df)
encoder = OneHotEncoder(dropLast=False, inputCol="workclass_encoded", outputCol="workclass_vec")
encoded = encoder.transform(indexed)
encoded.show(2)
+---+----------+----------------+------+---------+-------------+------------------+---------------+-------------+-----+----+------------+------------+----------+--------------+-----+-----------------+-------------+
|age|age_square|       workclass|fnlwgt|education|education_num|           marital|     occupation| relationship| race| sex|capital_gain|capital_loss|hours_week|native_country|label|workclass_encoded|workclass_vec|
+---+----------+----------------+------+---------+-------------+------------------+---------------+-------------+-----+----+------------+------------+----------+--------------+-----+-----------------+-------------+
| 39|    1521.0|       State-gov| 77516|Bachelors|           13|     Never-married|   Adm-clerical|Not-in-family|White|Male|        2174|           0|        40| United-States|<=50K|              4.0|(9,[4],[1.0])|
| 50|    2500.0|Self-emp-not-inc| 83311|Bachelors|           13|Married-civ-spouse|Exec-managerial|      Husband|White|Male|           0|           0|        13| United-States|<=50K|              1.0|(9,[1],[1.0])|
+---+----------+----------------+------+---------+-------------+------------------+---------------+-------------+-----+----+------------+------------+----------+--------------+-----+-----------------+-------------+
only showing top 2 rows

Изградете тръбопровода

Ще изградите тръбопровод, за да конвертирате всички точни характеристики и да ги добавите към крайния набор от данни. Конвейерът ще има четири операции, но не се колебайте да добавите толкова операции, колкото желаете.

  1. Кодирайте категориалните данни
  2. Индексирайте функцията на етикета
  3. Добавете непрекъсната променлива
  4. Сглобете стъпалата.

Всяка стъпка се съхранява в списък, наречен етапи. Този списък ще каже на VectorAssembler каква операция да извърши вътре в конвейера.

1. Кодирайте категоричните данни

Тази стъпка е абсолютно същата като горния пример, с изключение на това, че преминавате през всички категориални характеристики.

from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoderEstimator
CATE_FEATURES = ['workclass', 'education', 'marital', 'occupation', 'relationship', 'race', 'sex', 'native_country']
stages = [] # stages in our Pipeline
for categoricalCol in CATE_FEATURES:
    stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol + "Index")
    encoder = OneHotEncoderEstimator(inputCols=[stringIndexer.getOutputCol()],
                                     outputCols=[categoricalCol + "classVec"])
    stages += [stringIndexer, encoder]

2. Индексирайте функцията на етикета

Spark, подобно на много други библиотеки, не приема низови стойности за етикета. Преобразувате функцията за етикет с StringIndexer и я добавяте към етапите на списъка

# Convert label into label indices using the StringIndexer
label_stringIdx =  StringIndexer(inputCol="label", outputCol="newlabel")
stages += [label_stringIdx]

3. Добавете непрекъсната променлива

InputCols на VectorAssembler е списък от колони. Можете да създадете нов списък, съдържащ всички нови колони. Кодът по-долу попълва списъка с кодирани категорични характеристики и непрекъснати характеристики.

assemblerInputs = [c + "classVec" for c in CATE_FEATURES] + CONTI_FEATURES

4. Сглобете стъпалата.

Накрая преминавате всички стъпки във VectorAssembler

assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")stages += [assembler]

Сега, когато всички стъпки са готови, изпращате данните към тръбопровода.

# Create a Pipeline.
pipeline = Pipeline(stages=stages)
pipelineModel = pipeline.fit(df_remove)
model = pipelineModel.transform(df_remove)

Ако проверите новия набор от данни, можете да видите, че той съдържа всички характеристики, трансформирани и нетрансформирани. Вие се интересувате само от новия етикет и характеристики. Характеристиките включват всички трансформирани характеристики и непрекъснатите променливи.

model.take(1)

[Row(age=39, age_square=1521.0, workclass='State-gov', fnlwgt=77516, education='Bachelors', education_num=13, marital='Never-married', occupation='Adm-clerical', relationship='Not-in-family', race='White', sex='Male', capital_gain=2174, capital_loss=0, hours_week=40, native_country='United-States', label='<=50K', workclassIndex=4.0, workclassclassVec=SparseVector(8, {4: 1.0}), educationIndex=2.0, educationclassVec=SparseVector(15, {2: 1.0}), maritalIndex=1.0, maritalclassVec=SparseVector(6, {1: 1.0}), occupationIndex=3.0, occupationclassVec=SparseVector(14, {3: 1.0}), relationshipIndex=1.0, relationshipclassVec=SparseVector(5, {1: 1.0}), raceIndex=0.0, raceclassVec=SparseVector(4, {0: 1.0}), sexIndex=0.0, sexclassVec=SparseVector(1, {0: 1.0}), native_countryIndex=0.0, native_countryclassVec=SparseVector(40, {0: 1.0}), newlabel=0.0, features=SparseVector(99, {4: 1.0, 10: 1.0, 24: 1.0, 32: 1.0, 44: 1.0, 48: 1.0, 52: 1.0, 53: 1.0, 93: 39.0, 94: 77516.0, 95: 2174.0, 96: 13.0, 98: 40.0}))]

Стъпка 4) Изградете класификатора: логистика

За да направите изчислението по-бързо, конвертирате модела в DataFrame.

Трябва да изберете нов етикет и функции от модела с помощта на карта.

from pyspark.ml.linalg import DenseVector
input_data = model.rdd.map(lambda x: (x["newlabel"], DenseVector(x["features"])))

Готови сте да създадете данните за влака като DataFrame. Използвате sqlContext

df_train = sqlContext.createDataFrame(input_data, ["label", "features"])

Проверете втория ред

df_train.show(2)
+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|[0.0,0.0,0.0,0.0,...|
|  0.0|[0.0,1.0,0.0,0.0,...|
+-----+--------------------+
only showing top 2 rows

Създайте набор от влак/тест

Разделяте набора от данни 80/20 с randomSplit.

# Split the data into train and test sets
train_data, test_data = df_train.randomSplit([.8,.2],seed=1234)

Нека преброим колко хора с доходи под/над 50 XNUMX както в набора за обучение, така и в теста

train_data.groupby('label').agg({'label': 'count'}).show()
+-----+------------+
|label|count(label)|
+-----+------------+
|  0.0|       19698|
|  1.0|        6263|
+-----+------------+
test_data.groupby('label').agg({'label': 'count'}).show()
+-----+------------+
|label|count(label)|
+-----+------------+
|  0.0|        5021|
|  1.0|        1578|
+-----+------------+

Изградете логистичния регресор

Не на последно място, можете да изградите класификатора. Pyspark има API, наречен LogisticRegression, за извършване на логистична регресия.

Вие инициализирате lr, като посочите колоната с етикети и колоните с характеристики. Задавате максимум 10 итерации и добавяте параметър за регулиране със стойност 0.3. Обърнете внимание, че в следващия раздел ще използвате кръстосано валидиране с мрежа от параметри, за да настроите модела

# Import `LinearRegression`
from pyspark.ml.classification import LogisticRegression

# Initialize `lr`
lr = LogisticRegression(labelCol="label",
                        featuresCol="features",
                        maxIter=10,
                        regParam=0.3)

# Fit the data to the model
linearModel = lr.fit(train_data)

#Можете да видите коефициентите от регресията

# Print the coefficients and intercept for logistic regression
print("Coefficients: " + str(linearModel.coefficients))
print("Intercept: " + str(linearModel.intercept))
Coefficients: [-0.0678914665262,-0.153425526813,-0.0706009536407,-0.164057586562,-0.120655298528,0.162922330862,0.149176870438,-0.626836362611,-0.193483661541,-0.0782269980838,0.222667203836,0.399571096381,-0.0222024341804,-0.311925857859,-0.0434497788688,-0.306007744328,-0.41318209688,0.547937504247,-0.395837350854,-0.23166535958,0.618743906733,-0.344088614546,-0.385266881369,0.317324463006,-0.350518889186,-0.201335923138,-0.232878560088,-0.13349278865,-0.119760542498,0.17500602491,-0.0480968101118,0.288484253943,-0.116314616745,0.0524163478063,-0.300952624551,-0.22046421474,-0.16557996579,-0.114676231939,-0.311966431453,-0.344226119233,0.105530129507,0.152243047814,-0.292774545497,0.263628334433,-0.199951374076,-0.30329422583,-0.231087515178,0.418918551,-0.0565930184279,-0.177818073048,-0.0733236680663,-0.267972912252,0.168491215697,-0.12181255723,-0.385648075442,-0.202101794517,0.0469791640782,-0.00842850210625,-0.00373211448629,-0.259296141281,-0.309896554133,-0.168434409756,-0.11048086026,0.0280647963877,-0.204187030092,-0.414392623536,-0.252806580669,0.143366465705,-0.516359222663,-0.435627370849,-0.301949286524,0.0878249035894,-0.210951740965,-0.621417928742,-0.099445190784,-0.232671473401,-0.1077745606,-0.360429419703,-0.420362959052,-0.379729467809,-0.395186242741,0.0826401853838,-0.280251589972,0.187313505214,-0.20295228799,-0.431177064626,0.149759018379,-0.107114299614,-0.319314858424,0.0028450133235,-0.651220387649,-0.327918792207,-0.143659581445,0.00691075160413,8.38517628783e-08,2.18856717378e-05,0.0266701216268,0.000231075966823,0.00893832698698]
Intercept: -1.9884177974805692

Стъпка 5) Обучете и оценете модела

За да генерирате прогноза за вашия набор от тестове,

Можете да използвате linearModel с transform() на test_data

# Make predictions on test data using the transform() method.
predictions = linearModel.transform(test_data)

Можете да отпечатате елементите в прогнозите

predictions.printSchema()
root
 |-- label: double (nullable = true)
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)

Интересувате се от етикета, прогнозата и вероятността

selected = predictions.select("label", "prediction", "probability")
selected.show(20)
+-----+----------+--------------------+
|label|prediction|         probability|
+-----+----------+--------------------+
|  0.0|       0.0|[0.91560704124179...|
|  0.0|       0.0|[0.92812140213994...|
|  0.0|       0.0|[0.92161406774159...|
|  0.0|       0.0|[0.96222760777142...|
|  0.0|       0.0|[0.66363283056957...|
|  0.0|       0.0|[0.65571324475477...|
|  0.0|       0.0|[0.73053376932829...|
|  0.0|       1.0|[0.31265053873570...|
|  0.0|       0.0|[0.80005907577390...|
|  0.0|       0.0|[0.76482251301640...|
|  0.0|       0.0|[0.84447301189069...|
|  0.0|       0.0|[0.75691912026619...|
|  0.0|       0.0|[0.60902504096722...|
|  0.0|       0.0|[0.80799228385509...|
|  0.0|       0.0|[0.87704364852567...|
|  0.0|       0.0|[0.83817652582377...|
|  0.0|       0.0|[0.79655423248500...|
|  0.0|       0.0|[0.82712311232246...|
|  0.0|       0.0|[0.81372823882016...|
|  0.0|       0.0|[0.59687710752201...|
+-----+----------+--------------------+
only showing top 20 rows

Оценете модела

Трябва да погледнете показателя за точност, за да видите колко добре (или лошо) се представя моделът. Понастоящем няма API за изчисляване на мярката за точност Spark. Стойността по подразбиране е ROC, крива на работната характеристика на приемника. Това е различна метрика, която отчита процента на фалшивите положителни резултати.

Преди да разгледате ROC, нека конструираме мярката за точност. Вие сте по-запознати с този показател. Мярката за точност е сумата от правилната прогноза спрямо общия брой наблюдения.

Създавате DataFrame с етикета и `предсказанието.

cm = predictions.select("label", "prediction")

Можете да проверите номера на класа в етикета и прогнозата

cm.groupby('label').agg({'label': 'count'}).show()
+-----+------------+
|label|count(label)|
+-----+------------+
|  0.0|        5021|
|  1.0|        1578|
+-----+------------+
cm.groupby('prediction').agg({'prediction': 'count'}).show()
+----------+-----------------+
|prediction|count(prediction)|
+----------+-----------------+
|       0.0|             5982|
|       1.0|              617|
+----------+-----------------+

Например в набора от тестове има 1578 домакинства с доход над 50k и 5021 под. Класификаторът обаче предвижда 617 домакинства с доход над 50 хиляди.

Можете да изчислите точността, като изчислите броя, когато етикетът е правилно класифициран спрямо общия брой редове.

cm.filter(cm.label == cm.prediction).count() / cm.count()

0.8237611759357478

Можете да обвиете всичко заедно и да напишете функция за изчисляване на точността.

def accuracy_m(model): 
    predictions = model.transform(test_data)
    cm = predictions.select("label", "prediction")
    acc = cm.filter(cm.label == cm.prediction).count() / cm.count()
    print("Model accuracy: %.3f%%" % (acc * 100)) 
accuracy_m(model = linearModel)
Model accuracy: 82.376%

ROC показатели

Модулът BinaryClassificationEvaluator включва ROC мерките. Приемникът OperaХарактеристичната крива е друг обичаен инструмент, използван с бинарна класификация. Тя е много подобна на кривата на прецизност/припомняне, но вместо да начертае прецизност спрямо припомняне, ROC кривата показва истинската положителна скорост (т.е. припомняне) срещу фалшиво положителна скорост. Процентът на фалшивите положителни резултати е съотношението на отрицателните случаи, които са неправилно класифицирани като положителни. Тя е равна на едно минус истинската отрицателна ставка. Истинският отрицателен процент се нарича още специфичност. Следователно ROC кривата изобразява чувствителността (припомнянето) спрямо 1 – специфичност

### Use ROC 
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Evaluate model
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
print(evaluator.evaluate(predictions))
print(evaluator.getMetricName())

0.8940481662695192област Под ROC

print(evaluator.evaluate(predictions))

0.8940481662695192

Стъпка 6) Настройте хиперпараметъра

Не на последно място, можете да настроите хиперпараметрите. Подобно на scikit научете създавате мрежа с параметри и добавяте параметрите, които искате да настроите.

За да намалите времето на изчислението, вие настройвате само параметъра за регулиране само с две стойности.

from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Create ParamGrid for Cross Validation
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.01, 0.5])
             .build())

Накрая оценявате модела с помощта на метода на кръстосана проверка с 5 пъти. Тренировката отнема около 16 минути.

from time import *
start_time = time()

# Create 5-fold CrossValidator
cv = CrossValidator(estimator=lr,
                    estimatorParamMaps=paramGrid,
                    evaluator=evaluator, numFolds=5)

# Run cross validations
cvModel = cv.fit(train_data)
# likely take a fair amount of time
end_time = time()
elapsed_time = end_time - start_time
print("Time to train model: %.3f seconds" % elapsed_time)

Време за обучение на модела: 978.807 секунди

Най-добрият хиперпараметър за регулиране е 0.01, с точност от 85.316 процента.

accuracy_m(model = cvModel)
Model accuracy: 85.316%

Можете да извлечете препоръчания параметър чрез верижно свързване на cvModel.bestModel с extractParamMap()

bestModel = cvModel.bestModel
bestModel.extractParamMap()
{Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='aggregationDepth', doc='suggested depth for treeAggregate (>= 2)'): 2,
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty'): 0.0,
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='family', doc='The name of family which is a description of the label distribution to be used in the model. Supported options: auto, binomial, multinomial.'): 'auto',
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='featuresCol', doc='features column name'): 'features',
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='fitIntercept', doc='whether to fit an intercept term'): True,
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='labelCol', doc='label column name'): 'label',
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='maxIter', doc='maximum number of iterations (>= 0)'): 10,
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='predictionCol', doc='prediction column name'): 'prediction',
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='probabilityCol', doc='Column name for predicted class conditional probabilities. Note: Not all models output well-calibrated probability estimates! These probabilities should be treated as confidences, not precise probabilities'): 'probability',
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='rawPredictionCol', doc='raw prediction (a.k.a. confidence) column name'): 'rawPrediction',
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='regParam', doc='regularization parameter (>= 0)'): 0.01,
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='standardization', doc='whether to standardize the training features before fitting the model'): True,
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='threshold', doc='threshold in binary classification prediction, in range [0, 1]'): 0.5,
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='tol', doc='the convergence tolerance for iterative algorithms (>= 0)'): 1e-06}

Oбобщение

Spark е основен инструмент за специалист по данни. Той позволява на практикуващия да свърже приложение с различни източници на данни, да извърши безпроблемно анализ на данни или да добави прогнозен модел.

Да започнем с Spark, трябва да инициирате a Spark Контекст с:

"SparkContext()'

и и SQL контекст за свързване към източник на данни:

„SQLContext()“

В урока научавате как да тренирате логистична регресия:

  1. Преобразувайте набора от данни в Dataframe с:
rdd.map(lambda x: (x["newlabel"], DenseVector(x["features"])))
sqlContext.createDataFrame(input_data, ["label", "features"])

Имайте предвид, че името на колоната на етикета е newlabel и всички функции са събрани във функции. Променете тези стойности, ако са различни във вашия набор от данни.

  1. Създайте набора за обучение/тест
randomSplit([.8,.2],seed=1234)
  1. Обучете модела
LogisticRegression(labelCol="label",featuresCol="features",maxIter=10, regParam=0.3)
lr.fit()
  1. Направете прогноза
linearModel.transform()