PySpark Учебник для начинающих: учитесь на ПРИМЕРАХ

Прежде чем изучать PySpark, давайте разбираться:

Что такое Апач Spark?

Spark — это решение для больших данных, которое оказалось проще и быстрее, чем Hadoop MapReduce. Spark — это программное обеспечение с открытым исходным кодом, разработанное лабораторией RAD Калифорнийского университета в Беркли в 2009 году. С тех пор, как оно было выпущено для широкой публики в 2010 году, Spark приобрела популярность и используется в отрасли в беспрецедентных масштабах.

В эпоху Big Data, практикам как никогда нужны быстрые и надежные инструменты для обработки потоковой передачи данных. Более ранние инструменты, такие как MapReduce, были любимыми, но работали медленно. Чтобы преодолеть эту проблему, Spark предлагает решение, которое является одновременно быстрым и универсальным. Основное различие между Spark и MapReduce - это Spark выполняет вычисления в памяти во время later на жестком диске. Это обеспечивает высокоскоростной доступ и обработку данных, сокращая время с часов до минут.

Что такое ПиSpark?

PySpark это инструмент, созданный Apache Spark Сообщество для использования Python с Spark. Он позволяет работать с RDD (Resilient Distributed Dataset) в Python. Он также предлагает PySpark Shell для связи API Python с Spark ядро для инициации Spark Контекст. Spark — это механизм имен для реализации кластерных вычислений, а PySpark это библиотека Python для использования Spark.

Каким Spark работать?

Spark основан на вычислительном механизме, что означает, что он заботится о приложении планирования, распределения и мониторинга. Каждая задача выполняется на различных рабочих машинах, называемых вычислительным кластером. Вычислительный кластер относится к разделению задач. Одна машина выполняет одну задачу, в то время как другие вносят вклад в конечный результат посредством другой задачи. В конечном итоге все задачи объединяются для получения результата. Spark администратор дает 360 обзор различных Spark Вакансии.

  Spark Работа
Spark Работа

Spark предназначен для работы с

  • Питон
  • Java
  • масштаб
  • SQL

Существенная особенность Spark — это огромное количество встроенных библиотек, включая MLlib для машинного обучения.. Spark также предназначен для работы с кластерами Hadoop и может читать файлы широкого типа, включая данные Hive, CSV, JSON, данные Casandra и другие.

Зачем использовать Spark?

Как будущий специалист по обработке данных, вы должны быть знакомы со знаменитыми библиотеками Python: Pandas и scikit-learn. Эти две библиотеки отлично подходят для изучения наборов данных среднего размера. Регулярные проекты машинного обучения строятся вокруг следующихwing методология:

  • Загрузите данные на диск
  • Импортируйте данные в память машины.
  • Обработка/анализ данных
  • Создайте модель машинного обучения
  • Сохраните прогноз обратно на диск

Проблема возникает, если специалист по данным хочет обработать данные, которые слишком велики для одного компьютера. На заре развития науки о данных практики использовали образцы, поскольку обучение работе с огромными наборами данных не всегда было необходимо. Специалист по данным найдет хорошую статистическую выборку, проведет дополнительную проверку на надежность и создаст отличную модель.

Однако здесь есть некоторые проблемы:

  • Отражает ли набор данных реальный мир?
  • Включают ли данные конкретный пример?
  • Подходит ли модель для выборки?

Возьмем, к примеру, рекомендации пользователей. Рекомендатели полагаются на сравнение пользователей с другими пользователями при оценке их предпочтений. Если специалист по обработке данных возьмет только часть данных, не будет когорты пользователей, очень похожих друг на друга. Рекомендации должны работать с полным набором данных или не работать вообще.

Каково решение?

Решение было очевидным уже давно: разбить проблему на несколько компьютеров. Параллельные вычисления также сопряжены с множеством проблем. У разработчиков часто возникают проблемы с написанием параллельного кода, и в конечном итоге им приходится решать кучу проблем.plex проблемы, связанные с самой мультиобработкой.

Pyspark предоставляет специалисту по данным API, который можно использовать для решения проблем параллельной обработки данных. Пиspark занимается связьюplexособенности многопроцессорной обработки, такие как распределение данных, распространение кода и сбор выходных данных от рабочих групп в кластере машин.

Spark может работать автономно, но чаще всего работает поверх инфраструктуры кластерных вычислений, такой как Hadoop. Однако при тестировании и разработке специалист по данным может эффективно запускать Spark по их развитию boxкомпьютеры или ноутбуки без кластера

• Одно из основных преимуществ Spark заключается в создании archiТехнология, которая включает в себя управление потоковой передачей данных, плавные запросы данных, прогнозирование машинного обучения и доступ в режиме реального времени к различным анализам.

• Spark тесно работает с языком SQL, т.е. со структурированными данными. Это позволяет запрашивать данные в режиме реального времени.

• Основная работа специалиста по данным — анализировать и строить прогнозные модели. Короче говоря, специалист по данным должен знать, как запрашивать данные, используя SQL, составить статистический отчет и использовать машинное обучение для прогнозирования. Специалист по данным тратит значительную часть своего времени на очистку, преобразование и анализ данных. Как только набор данных или рабочий процесс с данными готов, специалист по обработке данных использует различные методы для обнаружения идей и скрытых закономерностей. Манипулирование данными должно быть надежным и таким же простым в использовании. Spark это правильный инструмент благодаря своей скорости и богатому API.

В этом ПиSpark В этом уроке вы узнаете, как создать классификатор с помощью Py.Spark примеры.

Как установить РуSpark с AWS

Jupyter команда создаёт образ Docker для запуска Spark эффективно. Ниже приведены шаги, которые вы можете выполнить для установки Py.Spark экземпляр в AWS.

Обратитесь к нашему руководству по AWS и TensorFlow

Шаг 1. Создайте экземпляр

Прежде всего, вам нужно создать экземпляр. Перейдите в свою учетную запись AWS и запустите экземпляр. Вы можете увеличить объем хранилища до 15 ГБ и использовать ту же группу безопасности, что и в руководстве по TensorFlow.

Шаг 2. Откройте соединение.

Откройте соединение и установите докер-контейнер. Для получения дополнительной информацииtails, обратитесь к руководству по TensorFlow с Docker. Обратите внимание, что вы должны находиться в правильном рабочем каталоге.

Просто запустите эти коды, чтобы установить Docker:

sudo yum update -y
sudo yum install -y docker
sudo service docker start
sudo user-mod -a -G docker ec2-user
exit

Шаг 3. Снова откройте соединение и установите. Spark

После повторного открытия соединения вы можете установить образ, содержащий Py.Spark.

## Spark
docker run -v ~/work:/home/jovyan/work -d -p 8888:8888 jupyter/pyspark-notebook

## Allow preserving Jupyter notebook
sudo chown 1000 ~/work

## Install tree to see our working directory next
sudo yum install -y tree

Шаг 4: Открыть Jupyter

Проверьте контейнер и его имя

docker ps

Запустите докер с журналами докера, за которыми следует имя докера. Например, докер регистрирует zealous_goldwasser

Зайдите в браузер и запустите Jupyter. Адрес http://localhost:8888/. Вставьте пароль, предоставленный терминалом.

Внимание: если вы хотите загрузить/загрузить файл на свой компьютер AWS, вы можете использовать программное обеспечение Cyberduck, https://cyberduck.io/.

Как установить РуSpark on Windows/Mac с Кондой

Фоллоwing это подробный процесс установки PySpark on Windows/Mac с использованием Анаконды:

Для установки Spark на вашем локальном компьютере рекомендуется создать новую среду conda. Эта новая среда установит Python 3.6, Spark и все зависимости.

Пользователь Mac

cd anaconda3
touch hello-spark.yml
vi hello-spark.yml

Windows Информация о пользователе

cd C:\Users\Admin\Anaconda3
echo.>hello-spark.yml
notepad hello-spark.yml

Вы можете редактировать файл .yml. Будьте осторожны с отступом. Прежде необходимо два пробела –

name: hello-spark 
    dependencies:
    
    - python=3.6
    - jupyter
    - ipython
    - numpy
    - numpy-base
    - pandas
    - py4j
    - pyspark
    - pytz

Сохраните его и создайте среду. Это занимает некоторое время

conda env create -f hello-spark.yml

Для более деtails о местоположении, пожалуйста, ознакомьтесь с руководством. Установите TensorFlow.

Вы можете проверить всю среду, установленную на вашем компьютере.

conda env list
Activate hello-spark

Пользователь Mac

source activate hello-spark

Windows Информация о пользователе

activate hello-spark

Примечание: Вы уже создали специальную среду TensorFlow для запуска учебных пособий по TensorFlow. Удобнее создать новую среду, отличную от hello-tf. Нет смысла перегружать hello-tf Spark или любые другие библиотеки машинного обучения.

Представьте, что большая часть вашего проекта включает в себя TensorFlow, но вам нужно использовать Spark для одного конкретного проекта. Вы можете установить среду TensorFlow для всего вашего проекта и создать отдельную среду для Spark. Вы можете добавить столько библиотек в Spark среду так, как вы хотите, не вмешиваясь в среду TensorFlow. Как только вы закончите с Sparkпроект, вы можете удалить его, не затрагивая среду TensorFlow.

Jupyter

Откройте Jupyter Блокнот и попробуйте, если PySpark работает. В новый блокнот вставьте следующееwing PySpark образец кода:

import pyspark
from pyspark import SparkContext
sc =SparkContext()

Если отображается ошибка, вероятно, на вашем компьютере не установлена ​​Java. В Mac откройте терминал и напишите java-версию, если есть версия Java, убедитесь, что она 1.8. В Windows, перейдите в Приложение и проверьте, есть ли папка Java. Если есть папка Java, проверьте, установлена ​​ли Java 1.8. На момент написания этой статьи ПиSpark несовместим с Java9 и выше.

Если вам нужно установить Java, подумайте ссылке и скачайте jdk-8u181-windows-x64.exe

Jupyter

Пользователям Mac рекомендуется использовать «brew».

brew tap caskroom/versions
brew cask install java8

Обратитесь к этому пошаговому руководству по как установить Яву

Внимание: используйте команду «Удалить», чтобы полностью стереть среду.

 conda env remove -n hello-spark -y

Spark Контекст

SparkКонтекст — это внутренний механизм, который обеспечивает соединения с кластерами. Если вы хотите выполнить операцию, вам понадобится SparkКонтекст.

Создайте SparkКонтекст

Прежде всего, вам необходимо инициировать SparkКонтекст.

import pyspark
from pyspark import SparkContext
sc =SparkContext()

Теперь, когда SparkКонтекст готов, вы можете создать коллекцию данных под названием RDD, Resilient Distributed Dataset. Вычисления в RDD автоматически распараллеливаются в кластере.

nums= sc.parallelize([1,2,3,4])

Вы можете получить доступ к первой строке с помощью take

nums.take(1)
[1]

Вы можете применить преобразование к данным с помощью лямбда-функции. В ПиSpark В примере ниже вы возвращаете квадрат чисел. Это трансформация карты

squared = nums.map(lambda x: x*x).collect()
for num in squared:
    print('%i ' % (num))
1 
4 
9 
16

SQLКонтекст

Более удобный способ — использовать DataFrame. SparkКонтекст уже установлен, вы можете использовать его для создания dataFrame. Вам также необходимо объявить SQLContext

SQLContext позволяет подключать движок к различным источникам данных. Он используется для запуска функций Spark SQL.

from pyspark.sql import Row
from pyspark.sql import SQLContext

sqlContext = SQLContext(sc)

Теперь в этом Spark учебник Python, давайте создадим список кортежей. Каждый кортеж будет содержать имена людей и их возраст. Требуется четыре шага:

Шаг 1) Создайте список кортежей с информацией

[('John',19),('Smith',29),('Adam',35),('Henry',50)]

Шаг 2) Создайте СДРД

rdd = sc.parallelize(list_p)

Шаг 3) Преобразование кортежей

rdd.map(lambda x: Row(name=x[0], age=int(x[1])))

Шаг 4) Создайте контекст DataFrame

sqlContext.createDataFrame(ppl)
list_p = [('John',19),('Smith',29),('Adam',35),('Henry',50)]
rdd = sc.parallelize(list_p)
ppl = rdd.map(lambda x: Row(name=x[0], age=int(x[1])))
DF_ppl = sqlContext.createDataFrame(ppl)

Если вы хотите получить доступ к типу каждой функции, вы можете использовать printSchema().

DF_ppl.printSchema()
root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)

Пример машинного обучения с PySpark

Теперь, когда вы имеете краткое представление о Spark и SQLContext, вы готовы создать свою первую программу машинного обучения.

Фоллоwing шаги по созданию программы машинного обучения с помощью PySpark:

  • Шаг 1) Основные операции с PySpark
  • Шаг 2) Предварительная обработка данных
  • Шаг 3) Создайте конвейер обработки данных
  • Шаг 4) Постройте классификатор: логистический.
  • Шаг 5) Обучите и оцените модель
  • Шаг 6) Настройте гиперпараметр

В этом ПиSpark В учебнике по машинному обучению мы будем использовать набор данных для взрослых. Цель этого урока — научиться использовать Py.spark. Для получения дополнительной информации о наборе данных обратитесь к этому руководству.

Обратите внимание, что набор данных не имеет существенного значения, и вы можете подумать, что вычисления занимают много времени. Spark предназначен для обработки значительного объема данных. SparkПроизводительность увеличивается по сравнению с другими библиотеками машинного обучения, когда обрабатываемый набор данных становится больше.

Шаг 1) Основные операции с PySpark

Прежде всего, вам необходимо инициализировать SQLContext, который еще не запущен.

#from pyspark.sql import SQLContext
url = "https://raw.githubusercontent.com/guru99-edu/R-Programming/master/adult_data.csv"
from pyspark import SparkFiles
sc.addFile(url)
sqlContext = SQLContext(sc)

затем вы можете прочитать файл cvs с помощью sqlContext.read.csv. Вы используете inferSchema со значением True, чтобы сказать Spark автоматически угадывать тип данных. По умолчанию установлено значение False.

df = sqlContext.read.csv(SparkFiles.get("adult_data.csv"), header=True, inferSchema= True)

Давайте посмотрим на тип данных

df.printSchema()
root
 |-- age: integer (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: integer (nullable = true)
 |-- education: string (nullable = true)
 |-- education_num: integer (nullable = true)
 |-- marital: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital_gain: integer (nullable = true)
 |-- capital_loss: integer (nullable = true)
 |-- hours_week: integer (nullable = true)
 |-- native_country: string (nullable = true)
 |-- label: string (nullable = true)

Вы можете просмотреть данные с помощью show.

df.show(5, truncate = False)
+---+----------------+------+---------+-------------+------------------+-----------------+-------------+-----+------+------------+------------+----------+--------------+-----+
|age|workclass       |fnlwgt|education|education_num|marital           |occupation       |relationship |race |sex   |capital_gain|capital_loss|hours_week|native_country|label|
+---+----------------+------+---------+-------------+------------------+-----------------+-------------+-----+------+------------+------------+----------+--------------+-----+
|39 |State-gov       |77516 |Bachelors|13           |Never-married     |Adm-clerical     |Not-in-family|White|Male  |2174        |0           |40        |United-States |<=50K|
|50 |Self-emp-not-inc|83311 |Bachelors|13           |Married-civ-spouse|Exec-managerial  |Husband      |White|Male  |0           |0           |13        |United-States |<=50K|
|38 |Private         |215646|HS-grad  |9            |Divorced          |Handlers-cleaners|Not-in-family|White|Male  |0           |0           |40        |United-States |<=50K|
|53 |Private         |234721|11th     |7            |Married-civ-spouse|Handlers-cleaners|Husband      |Black|Male  |0           |0           |40        |United-States |<=50K|
|28 |Private         |338409|Bachelors|13           |Married-civ-spouse|Prof-specialty   |Wife         |Black|Female|0           |0           |40        |Cuba          |<=50K|
+---+----------------+------+---------+-------------+------------------+-----------------+-------------+-----+------+------------+------------+----------+--------------+-----+
only showing top 5 rows

Если вы не установили для inderShema значение True, вот что происходит с типом. Там все в строке.

df_string = sqlContext.read.csv(SparkFiles.get("adult.csv"), header=True, inferSchema=  False)
df_string.printSchema()
root
 |-- age: string (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: string (nullable = true)
 |-- education: string (nullable = true)
 |-- education_num: string (nullable = true)
 |-- marital: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital_gain: string (nullable = true)
 |-- capital_loss: string (nullable = true)
 |-- hours_week: string (nullable = true)
 |-- native_country: string (nullable = true)
 |-- label: string (nullable = true)

Чтобы преобразовать непрерывную переменную в нужный формат, вы можете использовать преобразование столбцов. Вы можете использовать withColumn, чтобы сообщить Spark в каком столбце выполнять преобразование.

# Import all from `sql.types`
from pyspark.sql.types import *

# Write a custom function to convert the data type of DataFrame columns
def convertColumn(df, names, newType):
    for name in names: 
        df = df.withColumn(name, df[name].cast(newType))
    return df 
# List of continuous features
CONTI_FEATURES  = ['age', 'fnlwgt','capital_gain', 'education_num', 'capital_loss', 'hours_week']
# Convert the type
df_string = convertColumn(df_string, CONTI_FEATURES, FloatType())
# Check the dataset
df_string.printSchema()
root
 |-- age: float (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: float (nullable = true)
 |-- education: string (nullable = true)
 |-- education_num: float (nullable = true)
 |-- marital: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital_gain: float (nullable = true)
 |-- capital_loss: float (nullable = true)
 |-- hours_week: float (nullable = true)
 |-- native_country: string (nullable = true)
 |-- label: string (nullable = true)

from pyspark.ml.feature import StringIndexer
#stringIndexer = StringIndexer(inputCol="label", outputCol="newlabel")
#model = stringIndexer.fit(df)
#df = model.transform(df)
df.printSchema()

Выберите столбцы

Вы можете выбрать и отобразить строки с помощью выбора и названий объектов. Ниже выбраны возраст и fnlwgt.

df.select('age','fnlwgt').show(5)
+---+------+
|age|fnlwgt|
+---+------+
| 39| 77516|
| 50| 83311|
| 38|215646|
| 53|234721|
| 28|338409|
+---+------+
only showing top 5 rows

Считать по группам

Если вы хотите подсчитать количество вхождений по группам, вы можете создать цепочку:

  • группа по()
  • count ()

вместе. В ПиSpark В примере ниже вы подсчитываете количество строк по уровню образования.

df.groupBy("education").count().sort("count",ascending=True).show()
+------------+-----+
|   education|count|
+------------+-----+
|   Preschool|   51|
|     1st-4th|  168|
|     5th-6th|  333|
|   Doctorate|  413|
|        12th|  433|
|         9th|  514|
| Prof-school|  576|
|     7th-8th|  646|
|        10th|  933|
|  Assoc-acdm| 1067|
|        11th| 1175|
|   Assoc-voc| 1382|
|     Masters| 1723|
|   Bachelors| 5355|
|Some-college| 7291|
|     HS-grad|10501|
+------------+-----+

Опишите данные

Чтобы получить сводную статистику данных, вы можете использовать описать(). Он вычислит:

  • считать
  • значить
  • стандартное отклонение
  • мин
  • Макс
df.describe().show()
+-------+------------------+-----------+------------------+------------+-----------------+--------+----------------+------------+------------------+------+------------------+----------------+------------------+--------------+-----+
|summary|               age|  workclass|            fnlwgt|   education|    education_num| marital|      occupation|relationship|              race|   sex|      capital_gain|    capital_loss|        hours_week|native_country|label|
+-------+------------------+-----------+------------------+------------+-----------------+--------+----------------+------------+------------------+------+------------------+----------------+------------------+--------------+-----+
|  count|             32561|      32561|             32561|       32561|            32561|   32561|           32561|       32561|             32561| 32561|             32561|           32561|             32561|         32561|32561|
|   mean| 38.58164675532078|       null|189778.36651208502|        null| 10.0806793403151|    null|            null|        null|              null|  null|1077.6488437087312| 87.303829734959|40.437455852092995|          null| null|
| stddev|13.640432553581356|       null|105549.97769702227|        null|2.572720332067397|    null|            null|        null|              null|  null| 7385.292084840354|402.960218649002|12.347428681731838|          null| null|
|    min|                17|          ?|             12285|        10th|                1|Divorced|               ?|     Husband|Amer-Indian-Eskimo|Female|                 0|               0|                 1|             ?|<=50K|
|    max|                90|Without-pay|           1484705|Some-college|               16| Widowed|Transport-moving|        Wife|             White|  Male|             99999|            4356|                99|    Yugoslavia| >50K|
+-------+------------------+-----------+------------------+------------+-----------------+--------+----------------+------------+------------------+------+------------------+----------------+------------------+--------------+-----+

Если вам нужна сводная статистика только для одного столбца, добавьте имя столбца внутри метода описания().

df.describe('capital_gain').show()
+-------+------------------+
|summary|      capital_gain|
+-------+------------------+
|  count|             32561|
|   mean|1077.6488437087312|
| stddev| 7385.292084840354|
|    min|                 0|
|    max|             99999|
+-------+------------------+

Вычисление перекрестной таблицы

В некоторых случаях может быть интересно увидеть описательную статистику между двумя парами.wise столбцы. Например, вы можете посчитать количество людей с доходом ниже или выше 50 тысяч по уровню образования. Эта операция называется перекрестной таблицей.

df.crosstab('age', 'label').sort("age_label").show()
+---------+-----+----+
|age_label|<=50K|>50K|
+---------+-----+----+
|       17|  395|   0|
|       18|  550|   0|
|       19|  710|   2|
|       20|  753|   0|
|       21|  717|   3|
|       22|  752|  13|
|       23|  865|  12|
|       24|  767|  31|
|       25|  788|  53|
|       26|  722|  63|
|       27|  754|  81|
|       28|  748| 119|
|       29|  679| 134|
|       30|  690| 171|
|       31|  705| 183|
|       32|  639| 189|
|       33|  684| 191|
|       34|  643| 243|
|       35|  659| 217|
|       36|  635| 263|
+---------+-----+----+
only showing top 20 rows

Вы можете видеть, что ни один человек не имеет дохода выше 50 тысяч, когда он молод.

Удалить столбец

Существует два интуитивно понятных API для удаления столбцов:

  • drop(): удалить столбец
  • dropna(): удалить NA

Ниже вы опускаете столбец education_num

df.drop('education_num').columns

['age',
 'workclass',
 'fnlwgt',
 'education',
 'marital',
 'occupation',
 'relationship',
 'race',
 'sex',
 'capital_gain',
 'capital_loss',
 'hours_week',
 'native_country',
 'label']

Данные фильтра

Вы можете использовать filter() для применения описательной статистики к подмножеству данных. Например, вы можете посчитать количество людей старше 40 лет.

df.filter(df.age > 40).count()

13443

Описательная статистика по группам

Наконец, вы можете группировать данные по группам и выполнять статистические операции, такие как среднее значение.

df.groupby('marital').agg({'capital_gain': 'mean'}).show()
+--------------------+------------------+
|             marital| avg(capital_gain)|
+--------------------+------------------+
|           Separated| 535.5687804878049|
|       Never-married|376.58831788823363|
|Married-spouse-ab...| 653.9832535885167|
|            Divorced| 728.4148098131893|
|             Widowed| 571.0715005035247|
|   Married-AF-spouse| 432.6521739130435|
|  Married-civ-spouse|1764.8595085470085|
+--------------------+------------------+

Шаг 2) Предварительная обработка данных

Обработка данных — важнейший шаг в машинном обучении. После удаления мусорных данных вы получите важную информацию.

Например, вы знаете, что возраст не является линейной функцией дохода. Когда люди молоды, их доход обычно ниже, чем у людей среднего возраста. После выхода на пенсию семья использует свои сбережения, что означает снижение дохода. Чтобы уловить эту закономерность, вы можете добавить квадрат к возрастному признаку.

Добавить возрастной квадрат

Чтобы добавить новую функцию, вам необходимо:

  1. Выберите столбец
  2. Примените преобразование и добавьте его в DataFrame.
from pyspark.sql.functions import *

# 1 Select the column
age_square = df.select(col("age")**2)

# 2 Apply the transformation and add it to the DataFrame
df = df.withColumn("age_square", col("age")**2)

df.printSchema()
root
 |-- age: integer (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: integer (nullable = true)
 |-- education: string (nullable = true)
 |-- education_num: integer (nullable = true)
 |-- marital: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital_gain: integer (nullable = true)
 |-- capital_loss: integer (nullable = true)
 |-- hours_week: integer (nullable = true)
 |-- native_country: string (nullable = true)
 |-- label: string (nullable = true)
 |-- age_square: double (nullable = true)

Вы можете видеть, что age_square был успешно добавлен во фрейм данных. Вы можете изменить порядок переменных с помощью select. Ниже вы указываете age_square сразу после возраста.

COLUMNS = ['age', 'age_square', 'workclass', 'fnlwgt', 'education', 'education_num', 'marital',
           'occupation', 'relationship', 'race', 'sex', 'capital_gain', 'capital_loss',
           'hours_week', 'native_country', 'label']
df = df.select(COLUMNS)
df.first()
Row(age=39, age_square=1521.0, workclass='State-gov', fnlwgt=77516, education='Bachelors', education_num=13, marital='Never-married', occupation='Adm-clerical', relationship='Not-in-family', race='White', sex='Male', capital_gain=2174, capital_loss=0, hours_week=40, native_country='United-States', label='<=50K')

Исключить Голландию-Нидерланды

Когда группа внутри объекта имеет только одно наблюдение, это не приносит никакой информации в модель. Напротив, это может привести к ошибке во время перекрестной проверки.

Давайте проверим происхождение домохозяйства

df.filter(df.native_country == 'Holand-Netherlands').count()
df.groupby('native_country').agg({'native_country': 'count'}).sort(asc("count(native_country)")).show()
+--------------------+---------------------+
|      native_country|count(native_country)|
+--------------------+---------------------+
|  Holand-Netherlands|                    1|
|            Scotland|                   12|
|             Hungary|                   13|
|            Honduras|                   13|
|Outlying-US(Guam-...|                   14|
|          Yugoslavia|                   16|
|            Thailand|                   18|
|                Laos|                   18|
|            Cambodia|                   19|
|     Trinadad&Tobago|                   19|
|                Hong|                   20|
|             Ireland|                   24|
|             Ecuador|                   28|
|              Greece|                   29|
|              France|                   29|
|                Peru|                   31|
|           Nicaragua|                   34|
|            Portugal|                   37|
|                Iran|                   43|
|               Haiti|                   44|
+--------------------+---------------------+
only showing top 20 rows

В объекте Native_country указано только одно домохозяйство из Нидерландов. Вы это исключаете.

df_remove = df.filter(df.native_country !=	'Holand-Netherlands')

Шаг 3) Создайте конвейер обработки данных

Подобно scikit-learn, Pyspark имеет конвейерный API.

Конвейер очень удобен для поддержания структуры данных. Вы отправляете данные в конвейер. Внутри конвейера выполняются различные операции, выходные данные используются для питания алгоритма.

Например, одно универсальное преобразование в машинном обучении состоит из преобразования строки в один горячий кодировщик, т. е. одного столбца группой. Один горячий энкодер обычно представляет собой матрицу, полную нулей.

Шаги по преобразованию данных очень похожи на scikit-learn. Вам нужно:

  • Индексировать строку в числовом формате
  • Создайте один горячий кодировщик
  • Преобразуйте данные

Эту работу выполняют два API: StringIndexer, OneHotEncoder.

  1. Прежде всего, вы выбираете строковый столбец для индексации. inputCol — это имя столбца в наборе данных. outputCol — это новое имя, присвоенное преобразованному столбцу.
StringIndexer(inputCol="workclass", outputCol="workclass_encoded")
  1. Сопоставьте данные и преобразуйте их
model = stringIndexer.fit(df)		
`indexed = model.transform(df)``
  1. Создайте столбцы новостей на основе группы. Например, если в объекте 10 групп, новая матрица будет иметь 10 столбцов, по одному для каждой группы.
OneHotEncoder(dropLast=False, inputCol="workclassencoded", outputCol="workclassvec")
### Example encoder
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler

stringIndexer = StringIndexer(inputCol="workclass", outputCol="workclass_encoded")
model = stringIndexer.fit(df)
indexed = model.transform(df)
encoder = OneHotEncoder(dropLast=False, inputCol="workclass_encoded", outputCol="workclass_vec")
encoded = encoder.transform(indexed)
encoded.show(2)
+---+----------+----------------+------+---------+-------------+------------------+---------------+-------------+-----+----+------------+------------+----------+--------------+-----+-----------------+-------------+
|age|age_square|       workclass|fnlwgt|education|education_num|           marital|     occupation| relationship| race| sex|capital_gain|capital_loss|hours_week|native_country|label|workclass_encoded|workclass_vec|
+---+----------+----------------+------+---------+-------------+------------------+---------------+-------------+-----+----+------------+------------+----------+--------------+-----+-----------------+-------------+
| 39|    1521.0|       State-gov| 77516|Bachelors|           13|     Never-married|   Adm-clerical|Not-in-family|White|Male|        2174|           0|        40| United-States|<=50K|              4.0|(9,[4],[1.0])|
| 50|    2500.0|Self-emp-not-inc| 83311|Bachelors|           13|Married-civ-spouse|Exec-managerial|      Husband|White|Male|           0|           0|        13| United-States|<=50K|              1.0|(9,[1],[1.0])|
+---+----------+----------------+------+---------+-------------+------------------+---------------+-------------+-----+----+------------+------------+----------+--------------+-----+-----------------+-------------+
only showing top 2 rows

Постройте трубопровод

Вы построите конвейер для преобразования всех точных объектов и добавления их в окончательный набор данных. В конвейере будет четыре операции, но вы можете добавлять столько операций, сколько захотите.

  1. Кодируйте категориальные данные
  2. Индексируйте функцию метки
  3. Добавить непрерывную переменную
  4. Соберите ступеньки.

Каждый шаг сохраняется в списке с именем «этапы». Этот список сообщит VectorAssembler, какую операцию следует выполнить внутри конвейера.

1. Закодируйте категориальные данные

Этот шаг точно такой же, как и в приведенном выше примере, за исключением того, что вы перебираете все категориальные функции.

from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoderEstimator
CATE_FEATURES = ['workclass', 'education', 'marital', 'occupation', 'relationship', 'race', 'sex', 'native_country']
stages = [] # stages in our Pipeline
for categoricalCol in CATE_FEATURES:
    stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol + "Index")
    encoder = OneHotEncoderEstimator(inputCols=[stringIndexer.getOutputCol()],
                                     outputCols=[categoricalCol + "classVec"])
    stages += [stringIndexer, encoder]

2. Индексируйте функцию метки

Spark, как и многие другие библиотеки, не принимает строковые значения для метки. Вы конвертируете функцию метки с помощью StringIndexer и добавляете ее на этапы списка.

# Convert label into label indices using the StringIndexer
label_stringIdx =  StringIndexer(inputCol="label", outputCol="newlabel")
stages += [label_stringIdx]

3. Добавьте непрерывную переменную

InputCols VectorAssembler представляет собой список столбцов. Вы можете создать новый список, содержащий все новые столбцы. Код ниже заполняет список закодированными категориальными признаками и непрерывными признаками.

assemblerInputs = [c + "classVec" for c in CATE_FEATURES] + CONTI_FEATURES

4. Соберите ступени.

Наконец, вы проходите все шаги в VectorAssembler.

assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")stages += [assembler]

Теперь, когда все шаги готовы, вы отправляете данные в конвейер.

# Create a Pipeline.
pipeline = Pipeline(stages=stages)
pipelineModel = pipeline.fit(df_remove)
model = pipelineModel.transform(df_remove)

Если вы проверите новый набор данных, вы увидите, что он содержит все объекты, преобразованные и не преобразованные. Вас интересуют только новая метка и функции. Объекты включают в себя все преобразованные объекты и непрерывные переменные.

model.take(1)

[Row(age=39, age_square=1521.0, workclass='State-gov', fnlwgt=77516, education='Bachelors', education_num=13, marital='Never-married', occupation='Adm-clerical', relationship='Not-in-family', race='White', sex='Male', capital_gain=2174, capital_loss=0, hours_week=40, native_country='United-States', label='<=50K', workclassIndex=4.0, workclassclassVec=SparseVector(8, {4: 1.0}), educationIndex=2.0, educationclassVec=SparseVector(15, {2: 1.0}), maritalIndex=1.0, maritalclassVec=SparseVector(6, {1: 1.0}), occupationIndex=3.0, occupationclassVec=SparseVector(14, {3: 1.0}), relationshipIndex=1.0, relationshipclassVec=SparseVector(5, {1: 1.0}), raceIndex=0.0, raceclassVec=SparseVector(4, {0: 1.0}), sexIndex=0.0, sexclassVec=SparseVector(1, {0: 1.0}), native_countryIndex=0.0, native_countryclassVec=SparseVector(40, {0: 1.0}), newlabel=0.0, features=SparseVector(99, {4: 1.0, 10: 1.0, 24: 1.0, 32: 1.0, 44: 1.0, 48: 1.0, 52: 1.0, 53: 1.0, 93: 39.0, 94: 77516.0, 95: 2174.0, 96: 13.0, 98: 40.0}))]

Шаг 4) Постройте классификатор: логистический.

Чтобы ускорить вычисления, вы конвертируете модель в DataFrame.

Вам необходимо выбрать новую метку и объекты из модели с помощью карты.

from pyspark.ml.linalg import DenseVector
input_data = model.rdd.map(lambda x: (x["newlabel"], DenseVector(x["features"])))

Вы готовы создать данные поезда в виде DataFrame. Вы используете sqlContext

df_train = sqlContext.createDataFrame(input_data, ["label", "features"])

Проверьте вторую строку

df_train.show(2)
+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|[0.0,0.0,0.0,0.0,...|
|  0.0|[0.0,1.0,0.0,0.0,...|
+-----+--------------------+
only showing top 2 rows

Создайте набор поездов/тестов

Вы разделяете набор данных 80/20 с помощью randomSplit.

# Split the data into train and test sets
train_data, test_data = df_train.randomSplit([.8,.2],seed=1234)

Давайте посчитаем, сколько людей с доходом ниже/выше 50 тыс. как в обучающей, так и в тестовой выборке.

train_data.groupby('label').agg({'label': 'count'}).show()
+-----+------------+
|label|count(label)|
+-----+------------+
|  0.0|       19698|
|  1.0|        6263|
+-----+------------+
test_data.groupby('label').agg({'label': 'count'}).show()
+-----+------------+
|label|count(label)|
+-----+------------+
|  0.0|        5021|
|  1.0|        1578|
+-----+------------+

Постройте логистический регрессор

И последнее, но не менее важное: вы можете создать классификатор. Пиspark имеет API под названием LogisticRegrade для выполнения логистической регрессии.

Вы инициализируете lr, указывая столбец метки и столбцы функций. Вы устанавливаете максимум 10 итераций и добавляете параметр регуляризации со значением 0.3. Обратите внимание, что в следующем разделе вы будете использовать перекрестную проверку с сеткой параметров для настройки модели.

# Import `LinearRegression`
from pyspark.ml.classification import LogisticRegression

# Initialize `lr`
lr = LogisticRegression(labelCol="label",
                        featuresCol="features",
                        maxIter=10,
                        regParam=0.3)

# Fit the data to the model
linearModel = lr.fit(train_data)

#Вы можете увидеть коэффициенты регрессии

# Print the coefficients and intercept for logistic regression
print("Coefficients: " + str(linearModel.coefficients))
print("Intercept: " + str(linearModel.intercept))
Coefficients: [-0.0678914665262,-0.153425526813,-0.0706009536407,-0.164057586562,-0.120655298528,0.162922330862,0.149176870438,-0.626836362611,-0.193483661541,-0.0782269980838,0.222667203836,0.399571096381,-0.0222024341804,-0.311925857859,-0.0434497788688,-0.306007744328,-0.41318209688,0.547937504247,-0.395837350854,-0.23166535958,0.618743906733,-0.344088614546,-0.385266881369,0.317324463006,-0.350518889186,-0.201335923138,-0.232878560088,-0.13349278865,-0.119760542498,0.17500602491,-0.0480968101118,0.288484253943,-0.116314616745,0.0524163478063,-0.300952624551,-0.22046421474,-0.16557996579,-0.114676231939,-0.311966431453,-0.344226119233,0.105530129507,0.152243047814,-0.292774545497,0.263628334433,-0.199951374076,-0.30329422583,-0.231087515178,0.418918551,-0.0565930184279,-0.177818073048,-0.0733236680663,-0.267972912252,0.168491215697,-0.12181255723,-0.385648075442,-0.202101794517,0.0469791640782,-0.00842850210625,-0.00373211448629,-0.259296141281,-0.309896554133,-0.168434409756,-0.11048086026,0.0280647963877,-0.204187030092,-0.414392623536,-0.252806580669,0.143366465705,-0.516359222663,-0.435627370849,-0.301949286524,0.0878249035894,-0.210951740965,-0.621417928742,-0.099445190784,-0.232671473401,-0.1077745606,-0.360429419703,-0.420362959052,-0.379729467809,-0.395186242741,0.0826401853838,-0.280251589972,0.187313505214,-0.20295228799,-0.431177064626,0.149759018379,-0.107114299614,-0.319314858424,0.0028450133235,-0.651220387649,-0.327918792207,-0.143659581445,0.00691075160413,8.38517628783e-08,2.18856717378e-05,0.0266701216268,0.000231075966823,0.00893832698698]
Intercept: -1.9884177974805692

Шаг 5) Обучите и оцените модель

Чтобы создать прогноз для вашего набора тестов,

Вы можете использовать LinearModel с Transform() для test_data.

# Make predictions on test data using the transform() method.
predictions = linearModel.transform(test_data)

Вы можете распечатать элементы в прогнозах

predictions.printSchema()
root
 |-- label: double (nullable = true)
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)

Вас интересует метка, прогноз и вероятность

selected = predictions.select("label", "prediction", "probability")
selected.show(20)
+-----+----------+--------------------+
|label|prediction|         probability|
+-----+----------+--------------------+
|  0.0|       0.0|[0.91560704124179...|
|  0.0|       0.0|[0.92812140213994...|
|  0.0|       0.0|[0.92161406774159...|
|  0.0|       0.0|[0.96222760777142...|
|  0.0|       0.0|[0.66363283056957...|
|  0.0|       0.0|[0.65571324475477...|
|  0.0|       0.0|[0.73053376932829...|
|  0.0|       1.0|[0.31265053873570...|
|  0.0|       0.0|[0.80005907577390...|
|  0.0|       0.0|[0.76482251301640...|
|  0.0|       0.0|[0.84447301189069...|
|  0.0|       0.0|[0.75691912026619...|
|  0.0|       0.0|[0.60902504096722...|
|  0.0|       0.0|[0.80799228385509...|
|  0.0|       0.0|[0.87704364852567...|
|  0.0|       0.0|[0.83817652582377...|
|  0.0|       0.0|[0.79655423248500...|
|  0.0|       0.0|[0.82712311232246...|
|  0.0|       0.0|[0.81372823882016...|
|  0.0|       0.0|[0.59687710752201...|
+-----+----------+--------------------+
only showing top 20 rows

Оцените модель

Вам нужно посмотреть на показатель точности, чтобы увидеть, насколько хорошо (или плохо) работает модель. В настоящее время не существует API для вычисления меры точности в Spark. Значением по умолчанию является ROC, кривая рабочей характеристики приемника. Это другие показатели, которые учитывают уровень ложноположительных результатов.

Прежде чем рассматривать ROC, давайте построим меру точности. Вы более знакомы с этой метрикой. Мера точности — это сумма правильных предсказаний по общему числу наблюдений.

Вы создаете DataFrame с меткой и прогнозом.

cm = predictions.select("label", "prediction")

Вы можете проверить номер класса в метке и прогнозе.

cm.groupby('label').agg({'label': 'count'}).show()
+-----+------------+
|label|count(label)|
+-----+------------+
|  0.0|        5021|
|  1.0|        1578|
+-----+------------+
cm.groupby('prediction').agg({'prediction': 'count'}).show()
+----------+-----------------+
|prediction|count(prediction)|
+----------+-----------------+
|       0.0|             5982|
|       1.0|              617|
+----------+-----------------+

Например, в тестовом наборе 1578 домохозяйств с доходом выше 50 тыс. и 5021 домохозяйство ниже. Однако классификатор предсказал 617 домохозяйств с доходом выше 50 тысяч.

Вы можете вычислить точность, вычислив количество случаев, когда метки правильно классифицированы по общему количеству строк.

cm.filter(cm.label == cm.prediction).count() / cm.count()

0.8237611759357478

Вы можете объединить все вместе и написать функцию для вычисления точности.

def accuracy_m(model): 
    predictions = model.transform(test_data)
    cm = predictions.select("label", "prediction")
    acc = cm.filter(cm.label == cm.prediction).count() / cm.count()
    print("Model accuracy: %.3f%%" % (acc * 100)) 
accuracy_m(model = linearModel)
Model accuracy: 82.376%

ROC-метрики

Модуль BinaryClassificationEvaluator включает меры ROC. Кривая рабочих характеристик приемника — еще один распространенный инструмент, используемый при двоичной классификации. Она очень похожа на кривую точности/отзыва, но вместо построения графика зависимости точности от полноты кривая ROC показывает соотношение истинного положительного результата (т.е. отзыва) и уровня ложноположительного результата. Уровень ложноположительных результатов — это соотношение отрицательных случаев, которые ошибочно классифицируются как положительные. Он равен единице минус истинно отрицательная ставка. Истинно отрицательный показатель также называется специфичностью. Следовательно, кривая ROC отображает чувствительность (напомнимость) в зависимости от 1 – специфичность.

### Use ROC 
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Evaluate model
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
print(evaluator.evaluate(predictions))
print(evaluator.getMetricName())

0.8940481662695192областьПодROC

print(evaluator.evaluate(predictions))

0.8940481662695192

Шаг 6) Настройте гиперпараметр

И последнее, но не менее важное: вы можете настроить гиперпараметры. Похожий на scikit учиться вы создаете сетку параметров и добавляете параметры, которые хотите настроить.

Чтобы сократить время вычислений, вы настраиваете параметр регуляризации только с двумя значениями.

from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Create ParamGrid for Cross Validation
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.01, 0.5])
             .build())

Наконец, вы оцениваете модель, используя метод перекрестной проверки с 5 сгибами. Обучение занимает около 16 минут.

from time import *
start_time = time()

# Create 5-fold CrossValidator
cv = CrossValidator(estimator=lr,
                    estimatorParamMaps=paramGrid,
                    evaluator=evaluator, numFolds=5)

# Run cross validations
cvModel = cv.fit(train_data)
# likely take a fair amount of time
end_time = time()
elapsed_time = end_time - start_time
print("Time to train model: %.3f seconds" % elapsed_time)

Время обучения модели: 978.807 секунды.

Лучший гиперпараметр регуляризации — 0.01 с точностью 85.316 процента.

accuracy_m(model = cvModel)
Model accuracy: 85.316%

Вы можете извлечь рекомендуемый параметр, связав cvModel.bestModel с ExtractParamMap().

bestModel = cvModel.bestModel
bestModel.extractParamMap()
{Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='aggregationDepth', doc='suggested depth for treeAggregate (>= 2)'): 2,
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty'): 0.0,
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='family', doc='The name of family which is a description of the label distribution to be used in the model. Supported options: auto, binomial, multinomial.'): 'auto',
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='featuresCol', doc='features column name'): 'features',
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='fitIntercept', doc='whether to fit an intercept term'): True,
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='labelCol', doc='label column name'): 'label',
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='maxIter', doc='maximum number of iterations (>= 0)'): 10,
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='predictionCol', doc='prediction column name'): 'prediction',
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='probabilityCol', doc='Column name for predicted class conditional probabilities. Note: Not all models output well-calibrated probability estimates! These probabilities should be treated as confidences, not precise probabilities'): 'probability',
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='rawPredictionCol', doc='raw prediction (a.k.a. confidence) column name'): 'rawPrediction',
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='regParam', doc='regularization parameter (>= 0)'): 0.01,
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='standardization', doc='whether to standardize the training features before fitting the model'): True,
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='threshold', doc='threshold in binary classification prediction, in range [0, 1]'): 0.5,
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='tol', doc='the convergence tolerance for iterative algorithms (>= 0)'): 1e-06}

Итоги

Spark является фундаментальным инструментом для специалиста по данным. Это позволяет практикующему специалисту подключать приложение к различным источникам данных, беспрепятственно выполнять анализ данных или добавлять прогнозную модель.

Начать с Spark, вам необходимо инициировать Spark Контекст с:

"SparkКонтекст()'

и и SQL контекст для подключения к источнику данных:

'SQLContext()'

В этом уроке вы узнаете, как тренировать логистическую регрессию:

  1. Преобразуйте набор данных в Dataframe с помощью:
rdd.map(lambda x: (x["newlabel"], DenseVector(x["features"])))
sqlContext.createDataFrame(input_data, ["label", "features"])

Обратите внимание, что имя столбца метки — newlabel, и все объекты собраны в объектах. Измените эти значения, если они отличаются в вашем наборе данных.

  1. Создайте набор поездов/тестов
randomSplit([.8,.2],seed=1234)
  1. Тренируй модель
LogisticRegression(labelCol="label",featuresCol="features",maxIter=10, regParam=0.3)
lr.fit()
  1. Сделать прогноз
linearModel.transform()