PySpark Учебник для начинающих: учитесь на ПРИМЕРАХ
Прежде чем изучать PySpark, давайте разбираться:
Что такое Апач Spark?
Spark — это решение для больших данных, которое оказалось проще и быстрее, чем Hadoop MapReduce. Spark — это программное обеспечение с открытым исходным кодом, разработанное лабораторией RAD Калифорнийского университета в Беркли в 2009 году. С тех пор, как оно было выпущено для широкой публики в 2010 году, Spark приобрела популярность и используется в отрасли в беспрецедентных масштабах.
В эпоху Big Data, практикам как никогда нужны быстрые и надежные инструменты для обработки потоковой передачи данных. Более ранние инструменты, такие как MapReduce, были любимыми, но работали медленно. Чтобы преодолеть эту проблему, Spark предлагает решение, которое является одновременно быстрым и универсальным. Основное различие между Spark и MapReduce - это Spark запускает вычисления в памяти во время последующего на жестком диске. Это обеспечивает высокоскоростной доступ и обработку данных, сокращая время с часов до минут.
Что такое ПиSpark?
PySpark это инструмент, созданный Apache Spark Сообщество для использования Python Spark. Это позволяет работать с RDD (Resilient Distributed Dataset) в Python. Он также предлагает PySpark Оболочка для ссылки Python API с Spark ядро для инициации Spark Контекст. Spark — это механизм имен для реализации кластерных вычислений, а PySpark is Pythonбиблиотека для использования Spark.
Каким Spark работать?
Spark основан на вычислительном механизме, что означает, что он заботится о приложении планирования, распределения и мониторинга. Каждая задача выполняется на различных рабочих машинах, называемых вычислительным кластером. Вычислительный кластер относится к разделению задач. Одна машина выполняет одну задачу, в то время как другие вносят вклад в конечный результат посредством другой задачи. В конечном итоге все задачи объединяются для получения результата. Spark администратор дает 360 обзор различных Spark Вакансии.
Spark предназначен для работы с
- Python
- Java
- масштаб
- SQL
Существенная особенность Spark — это огромное количество встроенных библиотек, включая MLlib для машинного обучения.. Spark также предназначен для работы с кластерами Hadoop и может читать файлы широкого типа, включая данные Hive, CSV, JSON, данные Casandra и другие.
Зачем использовать Spark?
Как будущий специалист по обработке данных, вы должны быть знакомы со знаменитыми библиотеками Python: Pandas и scikit-learn. Эти две библиотеки отлично подходят для изучения наборов данных среднего размера. Регулярные проекты машинного обучения строятся на основе следующей методологии:
- Загрузите данные на диск
- Импортируйте данные в память машины.
- Обработка/анализ данных
- Создайте модель машинного обучения
- Сохраните прогноз обратно на диск
Проблема возникает, если специалист по данным хочет обработать данные, которые слишком велики для одного компьютера. На заре развития науки о данных практики использовали образцы, поскольку обучение работе с огромными наборами данных не всегда было необходимо. Специалист по данным найдет хорошую статистическую выборку, проведет дополнительную проверку на надежность и создаст отличную модель.
Однако здесь есть некоторые проблемы:
- Отражает ли набор данных реальный мир?
- Включают ли данные конкретный пример?
- Подходит ли модель для выборки?
Возьмем, к примеру, рекомендации пользователей. Рекомендатели полагаются на сравнение пользователей с другими пользователями при оценке их предпочтений. Если специалист по обработке данных возьмет только часть данных, не будет когорты пользователей, очень похожих друг на друга. Рекомендации должны работать с полным набором данных или не работать вообще.
Каково решение?
Решение было очевидным уже давно: разбить проблему на несколько компьютеров. Параллельные вычисления также сопряжены с множеством проблем. У разработчиков часто возникают проблемы с написанием параллельного кода, и в конечном итоге им приходится решать кучу сложных проблем, связанных с самой многопроцессорной обработкой.
Pyspark предоставляет специалисту по данным API, который можно использовать для решения проблем параллельной обработки данных. Pyspark справляется со сложностями многопроцессорной обработки, такими как распространение данных, распространение кода и сбор выходных данных от рабочих процессов в кластере машин.
Spark может работать автономно, но чаще всего работает поверх инфраструктуры кластерных вычислений, такой как Hadoop. Однако при тестировании и разработке специалист по данным может эффективно запускать Spark на своих корпусах для разработки или ноутбуках без кластера
• Одно из основных преимуществ Spark заключается в создании архитектуры, которая охватывает управление потоковой передачей данных, бесперебойные запросы данных, прогнозирование машинного обучения и доступ в режиме реального времени к различным анализам.
• Spark тесно работает с языком SQL, т.е. со структурированными данными. Это позволяет запрашивать данные в режиме реального времени.
• Основная работа специалиста по данным — анализировать и строить прогнозные модели. Короче говоря, специалист по данным должен знать, как запрашивать данные, используя SQL, составить статистический отчет и использовать машинное обучение для прогнозирования. Специалист по данным тратит значительную часть своего времени на очистку, преобразование и анализ данных. Как только набор данных или рабочий процесс с данными готов, специалист по обработке данных использует различные методы для обнаружения идей и скрытых закономерностей. Манипулирование данными должно быть надежным и таким же простым в использовании. Spark это правильный инструмент благодаря своей скорости и богатому API.
В этом ПиSpark В этом уроке вы узнаете, как создать классификатор с помощью Py.Spark примеры.
Как установить РуSpark с AWS
Команда Jupyter команда создаёт образ Docker для запуска Spark эффективно. Ниже приведены шаги, которые вы можете выполнить для установки Py.Spark экземпляр в AWS.
Обратитесь к нашему руководству по AWS и TensorFlow
Шаг 1. Создайте экземпляр
Прежде всего, вам нужно создать экземпляр. Перейдите в свою учетную запись AWS и запустите экземпляр. Вы можете увеличить объем хранилища до 15 ГБ и использовать ту же группу безопасности, что и в руководстве по TensorFlow.
Шаг 2. Откройте соединение.
Откройте соединение и установите докер-контейнер. Для получения более подробной информации обратитесь к руководству по TensorFlow с Docker. Обратите внимание, что вы должны находиться в правильном рабочем каталоге.
Просто запустите эти коды, чтобы установить Docker:
sudo yum update -y sudo yum install -y docker sudo service docker start sudo user-mod -a -G docker ec2-user exit
Шаг 3. Снова откройте соединение и установите. Spark
После повторного открытия соединения вы можете установить образ, содержащий Py.Spark.
## Spark docker run -v ~/work:/home/jovyan/work -d -p 8888:8888 jupyter/pyspark-notebook ## Allow preserving Jupyter notebook sudo chown 1000 ~/work ## Install tree to see our working directory next sudo yum install -y tree
Шаг 4: Открыть Jupyter
Проверьте контейнер и его имя
docker ps
Запустите докер с журналами докера, за которыми следует имя докера. Например, докер регистрирует zealous_goldwasser
Зайдите в браузер и запустите Jupyter. Адрес http://localhost:8888/. Вставьте пароль, предоставленный терминалом.
Внимание: если вы хотите загрузить/загрузить файл на свой компьютер AWS, вы можете использовать программное обеспечение Cyberduck, https://cyberduck.io/.
Как установить РуSpark on Windows/Mac с Кондой
Ниже приведен подробный процесс установки Py.Spark on Windows/Mac с использованием Анаконды:
Для установки Spark на вашем локальном компьютере рекомендуется создать новую среду conda. Эта новая среда установит Python 3.6 году Spark и все зависимости.
Пользователь Mac
cd anaconda3 touch hello-spark.yml vi hello-spark.yml
Windows Информация о пользователе
cd C:\Users\Admin\Anaconda3 echo.>hello-spark.yml notepad hello-spark.yml
Вы можете редактировать файл .yml. Будьте осторожны с отступом. Прежде необходимо два пробела –
name: hello-spark dependencies: - python=3.6 - jupyter - ipython - numpy - numpy-base - pandas - py4j - pyspark - pytz
Сохраните его и создайте среду. Это занимает некоторое время
conda env create -f hello-spark.yml
Для получения более подробной информации о местоположении ознакомьтесь с руководством по установке TensorFlow.
Вы можете проверить всю среду, установленную на вашем компьютере.
conda env list
Activate hello-spark
Пользователь Mac
source activate hello-spark
Windows Информация о пользователе
activate hello-spark
Примечание: Вы уже создали специальную среду TensorFlow для запуска учебных пособий по TensorFlow. Удобнее создать новую среду, отличную от hello-tf. Нет смысла перегружать hello-tf Spark или любые другие библиотеки машинного обучения.
Представьте, что большая часть вашего проекта включает в себя TensorFlow, но вам нужно использовать Spark для одного конкретного проекта. Вы можете установить среду TensorFlow для всего вашего проекта и создать отдельную среду для Spark. Вы можете добавить столько библиотек в Spark среду так, как вы хотите, не вмешиваясь в среду TensorFlow. Как только вы закончите с Sparkпроект, вы можете удалить его, не затрагивая среду TensorFlow.
Jupyter
Откройте Jupyter Блокнот и попробуйте, если PySpark работает. В новом блокноте вставьте следующий PySpark образец кода:
import pyspark from pyspark import SparkContext sc =SparkContext()
Если отображается ошибка, вполне вероятно, что Java не установлен на вашем компьютере. В Mac откройте терминал и напишите java -version, если есть версия java, убедитесь, что это 1.8. В Windows, перейдите в Приложение и проверьте, есть ли Java папка. Если есть Java папку, проверьте это Java 1.8 установлен. На момент написания этой статьи ПиSpark не совместим с Java9 и выше.
Если вам нужно установить Java, тебе подумать ссылке. и загрузите jdk-8u181-windows-x64.exe
Пользователям Mac рекомендуется использовать «brew».
brew tap caskroom/versions brew cask install java8
Обратитесь к этому пошаговому руководству по как установить Java
Внимание: используйте команду «Удалить», чтобы полностью стереть среду.
conda env remove -n hello-spark -y
Spark Контекст
SparkКонтекст — это внутренний механизм, который обеспечивает соединения с кластерами. Если вы хотите выполнить операцию, вам понадобится SparkКонтекст.
Создайте SparkКонтекст
Прежде всего, вам необходимо инициировать SparkКонтекст.
import pyspark from pyspark import SparkContext sc =SparkContext()
Теперь, когда SparkКонтекст готов, вы можете создать коллекцию данных под названием RDD, Resilient Distributed Dataset. Вычисления в RDD автоматически распараллеливаются в кластере.
nums= sc.parallelize([1,2,3,4])
Вы можете получить доступ к первой строке с помощью take
nums.take(1)
[1]
Вы можете применить преобразование к данным с помощью лямбда-функции. В ПиSpark В примере ниже вы возвращаете квадрат чисел. Это трансформация карты
squared = nums.map(lambda x: x*x).collect() for num in squared: print('%i ' % (num))
1 4 9 16
SQLКонтекст
Более удобный способ — использовать DataFrame. SparkКонтекст уже установлен, вы можете использовать его для создания dataFrame. Вам также необходимо объявить SQLContext
SQLContext позволяет подключать движок к различным источникам данных. Он используется для запуска функций Spark SQL.
from pyspark.sql import Row from pyspark.sql import SQLContext sqlContext = SQLContext(sc)
Теперь в этом Spark учебник Python, давайте создадим список кортежей. Каждый кортеж будет содержать имена людей и их возраст. Требуется четыре шага:
Шаг 1) Создайте список кортежей с информацией
[('John',19),('Smith',29),('Adam',35),('Henry',50)]
Шаг 2) Создайте СДРД
rdd = sc.parallelize(list_p)
Шаг 3) Преобразование кортежей
rdd.map(lambda x: Row(name=x[0], age=int(x[1])))
Шаг 4) Создайте контекст DataFrame
sqlContext.createDataFrame(ppl) list_p = [('John',19),('Smith',29),('Adam',35),('Henry',50)] rdd = sc.parallelize(list_p) ppl = rdd.map(lambda x: Row(name=x[0], age=int(x[1]))) DF_ppl = sqlContext.createDataFrame(ppl)
Если вы хотите получить доступ к типу каждой функции, вы можете использовать printSchema().
DF_ppl.printSchema() root |-- age: long (nullable = true) |-- name: string (nullable = true)
Пример машинного обучения с PySpark
Теперь, когда вы имеете краткое представление о Spark и SQLContext, вы готовы создать свою первую программу машинного обучения.
Ниже приведены шаги по созданию программы машинного обучения с помощью Py.Spark:
- Шаг 1) Основные операции с PySpark
- Шаг 2) Предварительная обработка данных
- Шаг 3) Создайте конвейер обработки данных
- Шаг 4) Постройте классификатор: логистический.
- Шаг 5) Обучите и оцените модель
- Шаг 6) Настройте гиперпараметр
В этом ПиSpark Учебник по машинному обучению, мы будем использовать набор данных для взрослых. Цель этого руководства — научиться использовать Pyspark. Для получения дополнительной информации о наборе данных обратитесь к этому руководству.
Обратите внимание, что набор данных не имеет существенного значения, и вы можете подумать, что вычисления занимают много времени. Spark предназначен для обработки значительного объема данных. SparkПроизводительность увеличивается по сравнению с другими библиотеками машинного обучения, когда обрабатываемый набор данных становится больше.
Шаг 1) Основные операции с PySpark
Прежде всего, вам необходимо инициализировать SQLContext, который еще не запущен.
#from pyspark.sql import SQLContext url = "https://raw.githubusercontent.com/guru99-edu/R-Programming/master/adult_data.csv" from pyspark import SparkFiles sc.addFile(url) sqlContext = SQLContext(sc)
затем вы можете прочитать файл cvs с помощью sqlContext.read.csv. Вы используете inferSchema со значением True, чтобы сказать Spark автоматически угадывать тип данных. По умолчанию установлено значение False.
df = sqlContext.read.csv(SparkFiles.get("adult_data.csv"), header=True, inferSchema= True)
Давайте посмотрим на тип данных
df.printSchema() root |-- age: integer (nullable = true) |-- workclass: string (nullable = true) |-- fnlwgt: integer (nullable = true) |-- education: string (nullable = true) |-- education_num: integer (nullable = true) |-- marital: string (nullable = true) |-- occupation: string (nullable = true) |-- relationship: string (nullable = true) |-- race: string (nullable = true) |-- sex: string (nullable = true) |-- capital_gain: integer (nullable = true) |-- capital_loss: integer (nullable = true) |-- hours_week: integer (nullable = true) |-- native_country: string (nullable = true) |-- label: string (nullable = true)
Вы можете просмотреть данные с помощью show.
df.show(5, truncate = False)
+---+----------------+------+---------+-------------+------------------+-----------------+-------------+-----+------+------------+------------+----------+--------------+-----+ |age|workclass |fnlwgt|education|education_num|marital |occupation |relationship |race |sex |capital_gain|capital_loss|hours_week|native_country|label| +---+----------------+------+---------+-------------+------------------+-----------------+-------------+-----+------+------------+------------+----------+--------------+-----+ |39 |State-gov |77516 |Bachelors|13 |Never-married |Adm-clerical |Not-in-family|White|Male |2174 |0 |40 |United-States |<=50K| |50 |Self-emp-not-inc|83311 |Bachelors|13 |Married-civ-spouse|Exec-managerial |Husband |White|Male |0 |0 |13 |United-States |<=50K| |38 |Private |215646|HS-grad |9 |Divorced |Handlers-cleaners|Not-in-family|White|Male |0 |0 |40 |United-States |<=50K| |53 |Private |234721|11th |7 |Married-civ-spouse|Handlers-cleaners|Husband |Black|Male |0 |0 |40 |United-States |<=50K| |28 |Private |338409|Bachelors|13 |Married-civ-spouse|Prof-specialty |Wife |Black|Female|0 |0 |40 |Cuba |<=50K| +---+----------------+------+---------+-------------+------------------+-----------------+-------------+-----+------+------------+------------+----------+--------------+-----+ only showing top 5 rows
Если вы не установили для inderShema значение True, вот что происходит с типом. Там все в строке.
df_string = sqlContext.read.csv(SparkFiles.get("adult.csv"), header=True, inferSchema= False) df_string.printSchema() root |-- age: string (nullable = true) |-- workclass: string (nullable = true) |-- fnlwgt: string (nullable = true) |-- education: string (nullable = true) |-- education_num: string (nullable = true) |-- marital: string (nullable = true) |-- occupation: string (nullable = true) |-- relationship: string (nullable = true) |-- race: string (nullable = true) |-- sex: string (nullable = true) |-- capital_gain: string (nullable = true) |-- capital_loss: string (nullable = true) |-- hours_week: string (nullable = true) |-- native_country: string (nullable = true) |-- label: string (nullable = true)
Чтобы преобразовать непрерывную переменную в нужный формат, вы можете использовать преобразование столбцов. Вы можете использовать withColumn, чтобы сообщить Spark в каком столбце выполнять преобразование.
# Import all from `sql.types` from pyspark.sql.types import * # Write a custom function to convert the data type of DataFrame columns def convertColumn(df, names, newType): for name in names: df = df.withColumn(name, df[name].cast(newType)) return df # List of continuous features CONTI_FEATURES = ['age', 'fnlwgt','capital_gain', 'education_num', 'capital_loss', 'hours_week'] # Convert the type df_string = convertColumn(df_string, CONTI_FEATURES, FloatType()) # Check the dataset df_string.printSchema() root |-- age: float (nullable = true) |-- workclass: string (nullable = true) |-- fnlwgt: float (nullable = true) |-- education: string (nullable = true) |-- education_num: float (nullable = true) |-- marital: string (nullable = true) |-- occupation: string (nullable = true) |-- relationship: string (nullable = true) |-- race: string (nullable = true) |-- sex: string (nullable = true) |-- capital_gain: float (nullable = true) |-- capital_loss: float (nullable = true) |-- hours_week: float (nullable = true) |-- native_country: string (nullable = true) |-- label: string (nullable = true) from pyspark.ml.feature import StringIndexer #stringIndexer = StringIndexer(inputCol="label", outputCol="newlabel") #model = stringIndexer.fit(df) #df = model.transform(df) df.printSchema()
Выберите столбцы
Вы можете выбрать и отобразить строки с помощью выбора и названий объектов. Ниже выбраны возраст и fnlwgt.
df.select('age','fnlwgt').show(5)
+---+------+ |age|fnlwgt| +---+------+ | 39| 77516| | 50| 83311| | 38|215646| | 53|234721| | 28|338409| +---+------+ only showing top 5 rows
Считать по группам
Если вы хотите подсчитать количество вхождений по группам, вы можете создать цепочку:
- группа по()
- count ()
вместе. В ПиSpark В примере ниже вы подсчитываете количество строк по уровню образования.
df.groupBy("education").count().sort("count",ascending=True).show()
+------------+-----+ | education|count| +------------+-----+ | Preschool| 51| | 1st-4th| 168| | 5th-6th| 333| | Doctorate| 413| | 12th| 433| | 9th| 514| | Prof-school| 576| | 7th-8th| 646| | 10th| 933| | Assoc-acdm| 1067| | 11th| 1175| | Assoc-voc| 1382| | Masters| 1723| | Bachelors| 5355| |Some-college| 7291| | HS-grad|10501| +------------+-----+
Опишите данные
Чтобы получить сводную статистику данных, вы можете использовать описать(). Он вычислит:
- считать
- значить
- стандартное отклонение
- мин
- Макс
df.describe().show()
+-------+------------------+-----------+------------------+------------+-----------------+--------+----------------+------------+------------------+------+------------------+----------------+------------------+--------------+-----+ |summary| age| workclass| fnlwgt| education| education_num| marital| occupation|relationship| race| sex| capital_gain| capital_loss| hours_week|native_country|label| +-------+------------------+-----------+------------------+------------+-----------------+--------+----------------+------------+------------------+------+------------------+----------------+------------------+--------------+-----+ | count| 32561| 32561| 32561| 32561| 32561| 32561| 32561| 32561| 32561| 32561| 32561| 32561| 32561| 32561|32561| | mean| 38.58164675532078| null|189778.36651208502| null| 10.0806793403151| null| null| null| null| null|1077.6488437087312| 87.303829734959|40.437455852092995| null| null| | stddev|13.640432553581356| null|105549.97769702227| null|2.572720332067397| null| null| null| null| null| 7385.292084840354|402.960218649002|12.347428681731838| null| null| | min| 17| ?| 12285| 10th| 1|Divorced| ?| Husband|Amer-Indian-Eskimo|Female| 0| 0| 1| ?|<=50K| | max| 90|Without-pay| 1484705|Some-college| 16| Widowed|Transport-moving| Wife| White| Male| 99999| 4356| 99| Yugoslavia| >50K| +-------+------------------+-----------+------------------+------------+-----------------+--------+----------------+------------+------------------+------+------------------+----------------+------------------+--------------+-----+
Если вам нужна сводная статистика только для одного столбца, добавьте имя столбца внутри метода описания().
df.describe('capital_gain').show()
+-------+------------------+ |summary| capital_gain| +-------+------------------+ | count| 32561| | mean|1077.6488437087312| | stddev| 7385.292084840354| | min| 0| | max| 99999| +-------+------------------+
Вычисление перекрестной таблицы
В некоторых случаях может быть интересно увидеть описательную статистику между двумя парными столбцами. Например, вы можете посчитать количество людей с доходом ниже или выше 50 тысяч по уровню образования. Эта операция называется перекрестной таблицей.
df.crosstab('age', 'label').sort("age_label").show()
+---------+-----+----+ |age_label|<=50K|>50K| +---------+-----+----+ | 17| 395| 0| | 18| 550| 0| | 19| 710| 2| | 20| 753| 0| | 21| 717| 3| | 22| 752| 13| | 23| 865| 12| | 24| 767| 31| | 25| 788| 53| | 26| 722| 63| | 27| 754| 81| | 28| 748| 119| | 29| 679| 134| | 30| 690| 171| | 31| 705| 183| | 32| 639| 189| | 33| 684| 191| | 34| 643| 243| | 35| 659| 217| | 36| 635| 263| +---------+-----+----+ only showing top 20 rows
Вы можете видеть, что ни один человек не имеет дохода выше 50 тысяч, когда он молод.
Удалить столбец
Существует два интуитивно понятных API для удаления столбцов:
- drop(): удалить столбец
- dropna(): удалить NA
Ниже вы опускаете столбец education_num
df.drop('education_num').columns ['age', 'workclass', 'fnlwgt', 'education', 'marital', 'occupation', 'relationship', 'race', 'sex', 'capital_gain', 'capital_loss', 'hours_week', 'native_country', 'label']
Данные фильтра
Вы можете использовать filter() для применения описательной статистики к подмножеству данных. Например, вы можете посчитать количество людей старше 40 лет.
df.filter(df.age > 40).count()
13443
Descriptive статистика по группам
Наконец, вы можете группировать данные по группам и выполнять статистические операции, такие как среднее значение.
df.groupby('marital').agg({'capital_gain': 'mean'}).show()
+--------------------+------------------+ | marital| avg(capital_gain)| +--------------------+------------------+ | Separated| 535.5687804878049| | Never-married|376.58831788823363| |Married-spouse-ab...| 653.9832535885167| | Divorced| 728.4148098131893| | Widowed| 571.0715005035247| | Married-AF-spouse| 432.6521739130435| | Married-civ-spouse|1764.8595085470085| +--------------------+------------------+
Шаг 2) Предварительная обработка данных
Обработка данных — важнейший шаг в машинном обучении. После удаления мусорных данных вы получите важную информацию.
Например, вы знаете, что возраст не является линейной функцией дохода. Когда люди молоды, их доход обычно ниже, чем у людей среднего возраста. После выхода на пенсию семья использует свои сбережения, что означает снижение дохода. Чтобы уловить эту закономерность, вы можете добавить квадрат к возрастному признаку.
Добавить возрастной квадрат
Чтобы добавить новую функцию, вам необходимо:
- Выберите столбец
- Примените преобразование и добавьте его в DataFrame.
from pyspark.sql.functions import * # 1 Select the column age_square = df.select(col("age")**2) # 2 Apply the transformation and add it to the DataFrame df = df.withColumn("age_square", col("age")**2) df.printSchema() root |-- age: integer (nullable = true) |-- workclass: string (nullable = true) |-- fnlwgt: integer (nullable = true) |-- education: string (nullable = true) |-- education_num: integer (nullable = true) |-- marital: string (nullable = true) |-- occupation: string (nullable = true) |-- relationship: string (nullable = true) |-- race: string (nullable = true) |-- sex: string (nullable = true) |-- capital_gain: integer (nullable = true) |-- capital_loss: integer (nullable = true) |-- hours_week: integer (nullable = true) |-- native_country: string (nullable = true) |-- label: string (nullable = true) |-- age_square: double (nullable = true)
Вы можете видеть, что age_square был успешно добавлен во фрейм данных. Вы можете изменить порядок переменных с помощью select. Ниже вы указываете age_square сразу после возраста.
COLUMNS = ['age', 'age_square', 'workclass', 'fnlwgt', 'education', 'education_num', 'marital', 'occupation', 'relationship', 'race', 'sex', 'capital_gain', 'capital_loss', 'hours_week', 'native_country', 'label'] df = df.select(COLUMNS) df.first()
Row(age=39, age_square=1521.0, workclass='State-gov', fnlwgt=77516, education='Bachelors', education_num=13, marital='Never-married', occupation='Adm-clerical', relationship='Not-in-family', race='White', sex='Male', capital_gain=2174, capital_loss=0, hours_week=40, native_country='United-States', label='<=50K')
Исключить Голландию-Нидерланды
Когда группа внутри объекта имеет только одно наблюдение, это не приносит никакой информации в модель. Напротив, это может привести к ошибке во время перекрестной проверки.
Давайте проверим происхождение домохозяйства
df.filter(df.native_country == 'Holand-Netherlands').count() df.groupby('native_country').agg({'native_country': 'count'}).sort(asc("count(native_country)")).show()
+--------------------+---------------------+ | native_country|count(native_country)| +--------------------+---------------------+ | Holand-Netherlands| 1| | Scotland| 12| | Hungary| 13| | Honduras| 13| |Outlying-US(Guam-...| 14| | Yugoslavia| 16| | Thailand| 18| | Laos| 18| | Cambodia| 19| | Trinadad&Tobago| 19| | Hong| 20| | Ireland| 24| | Ecuador| 28| | Greece| 29| | France| 29| | Peru| 31| | Nicaragua| 34| | Portugal| 37| | Iran| 43| | Haiti| 44| +--------------------+---------------------+ only showing top 20 rows
В объекте Native_country указано только одно домохозяйство из Нидерландов. Вы это исключаете.
df_remove = df.filter(df.native_country != 'Holand-Netherlands')
Шаг 3) Создайте конвейер обработки данных
Подобно scikit-learn, Pyspark имеет конвейерный API.
Конвейер очень удобен для поддержания структуры данных. Вы отправляете данные в конвейер. Внутри конвейера выполняются различные операции, выходные данные используются для питания алгоритма.
Например, одно универсальное преобразование в машинном обучении состоит из преобразования строки в один горячий кодировщик, т. е. одного столбца группой. Один горячий энкодер обычно представляет собой матрицу, полную нулей.
Шаги по преобразованию данных очень похожи на scikit-learn. Вам нужно:
- Индексировать строку в числовом формате
- Создайте один горячий кодировщик
- Преобразуйте данные
Эту работу выполняют два API: StringIndexer, OneHotEncoder.
- Прежде всего, вы выбираете строковый столбец для индексации. inputCol — это имя столбца в наборе данных. outputCol — это новое имя, присвоенное преобразованному столбцу.
StringIndexer(inputCol="workclass", outputCol="workclass_encoded")
- Сопоставьте данные и преобразуйте их
model = stringIndexer.fit(df) `indexed = model.transform(df)``
- Создайте столбцы новостей на основе группы. Например, если в объекте 10 групп, новая матрица будет иметь 10 столбцов, по одному для каждой группы.
OneHotEncoder(dropLast=False, inputCol="workclassencoded", outputCol="workclassvec")
### Example encoder from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler stringIndexer = StringIndexer(inputCol="workclass", outputCol="workclass_encoded") model = stringIndexer.fit(df) indexed = model.transform(df) encoder = OneHotEncoder(dropLast=False, inputCol="workclass_encoded", outputCol="workclass_vec") encoded = encoder.transform(indexed) encoded.show(2)
+---+----------+----------------+------+---------+-------------+------------------+---------------+-------------+-----+----+------------+------------+----------+--------------+-----+-----------------+-------------+ |age|age_square| workclass|fnlwgt|education|education_num| marital| occupation| relationship| race| sex|capital_gain|capital_loss|hours_week|native_country|label|workclass_encoded|workclass_vec| +---+----------+----------------+------+---------+-------------+------------------+---------------+-------------+-----+----+------------+------------+----------+--------------+-----+-----------------+-------------+ | 39| 1521.0| State-gov| 77516|Bachelors| 13| Never-married| Adm-clerical|Not-in-family|White|Male| 2174| 0| 40| United-States|<=50K| 4.0|(9,[4],[1.0])| | 50| 2500.0|Self-emp-not-inc| 83311|Bachelors| 13|Married-civ-spouse|Exec-managerial| Husband|White|Male| 0| 0| 13| United-States|<=50K| 1.0|(9,[1],[1.0])| +---+----------+----------------+------+---------+-------------+------------------+---------------+-------------+-----+----+------------+------------+----------+--------------+-----+-----------------+-------------+ only showing top 2 rows
Постройте трубопровод
Вы построите конвейер для преобразования всех точных объектов и добавления их в окончательный набор данных. В конвейере будет четыре операции, но вы можете добавлять столько операций, сколько захотите.
- Кодируйте категориальные данные
- Индексируйте функцию метки
- Добавить непрерывную переменную
- Соберите ступеньки.
Каждый шаг сохраняется в списке с именем «этапы». Этот список сообщит VectorAssembler, какую операцию следует выполнить внутри конвейера.
1. Закодируйте категориальные данные
Этот шаг точно такой же, как и в приведенном выше примере, за исключением того, что вы перебираете все категориальные функции.
from pyspark.ml import Pipeline from pyspark.ml.feature import OneHotEncoderEstimator CATE_FEATURES = ['workclass', 'education', 'marital', 'occupation', 'relationship', 'race', 'sex', 'native_country'] stages = [] # stages in our Pipeline for categoricalCol in CATE_FEATURES: stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol + "Index") encoder = OneHotEncoderEstimator(inputCols=[stringIndexer.getOutputCol()], outputCols=[categoricalCol + "classVec"]) stages += [stringIndexer, encoder]
2. Индексируйте функцию метки
Spark, как и многие другие библиотеки, не принимает строковые значения для метки. Вы конвертируете функцию метки с помощью StringIndexer и добавляете ее на этапы списка.
# Convert label into label indices using the StringIndexer label_stringIdx = StringIndexer(inputCol="label", outputCol="newlabel") stages += [label_stringIdx]
3. Добавьте непрерывную переменную
InputCols VectorAssembler представляет собой список столбцов. Вы можете создать новый список, содержащий все новые столбцы. Код ниже заполняет список закодированными категориальными признаками и непрерывными признаками.
assemblerInputs = [c + "classVec" for c in CATE_FEATURES] + CONTI_FEATURES
4. Соберите ступени.
Наконец, вы проходите все шаги в VectorAssembler.
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")stages += [assembler]
Теперь, когда все шаги готовы, вы отправляете данные в конвейер.
# Create a Pipeline. pipeline = Pipeline(stages=stages) pipelineModel = pipeline.fit(df_remove) model = pipelineModel.transform(df_remove)
Если вы проверите новый набор данных, вы увидите, что он содержит все объекты, преобразованные и не преобразованные. Вас интересуют только новая метка и функции. Объекты включают в себя все преобразованные объекты и непрерывные переменные.
model.take(1) [Row(age=39, age_square=1521.0, workclass='State-gov', fnlwgt=77516, education='Bachelors', education_num=13, marital='Never-married', occupation='Adm-clerical', relationship='Not-in-family', race='White', sex='Male', capital_gain=2174, capital_loss=0, hours_week=40, native_country='United-States', label='<=50K', workclassIndex=4.0, workclassclassVec=SparseVector(8, {4: 1.0}), educationIndex=2.0, educationclassVec=SparseVector(15, {2: 1.0}), maritalIndex=1.0, maritalclassVec=SparseVector(6, {1: 1.0}), occupationIndex=3.0, occupationclassVec=SparseVector(14, {3: 1.0}), relationshipIndex=1.0, relationshipclassVec=SparseVector(5, {1: 1.0}), raceIndex=0.0, raceclassVec=SparseVector(4, {0: 1.0}), sexIndex=0.0, sexclassVec=SparseVector(1, {0: 1.0}), native_countryIndex=0.0, native_countryclassVec=SparseVector(40, {0: 1.0}), newlabel=0.0, features=SparseVector(99, {4: 1.0, 10: 1.0, 24: 1.0, 32: 1.0, 44: 1.0, 48: 1.0, 52: 1.0, 53: 1.0, 93: 39.0, 94: 77516.0, 95: 2174.0, 96: 13.0, 98: 40.0}))]
Шаг 4) Постройте классификатор: логистический.
Чтобы ускорить вычисления, вы конвертируете модель в DataFrame.
Вам необходимо выбрать новую метку и объекты из модели с помощью карты.
from pyspark.ml.linalg import DenseVector input_data = model.rdd.map(lambda x: (x["newlabel"], DenseVector(x["features"])))
Вы готовы создать данные поезда в виде DataFrame. Вы используете sqlContext
df_train = sqlContext.createDataFrame(input_data, ["label", "features"])
Проверьте вторую строку
df_train.show(2)
+-----+--------------------+ |label| features| +-----+--------------------+ | 0.0|[0.0,0.0,0.0,0.0,...| | 0.0|[0.0,1.0,0.0,0.0,...| +-----+--------------------+ only showing top 2 rows
Создайте набор поездов/тестов
Вы разделяете набор данных 80/20 с помощью randomSplit.
# Split the data into train and test sets train_data, test_data = df_train.randomSplit([.8,.2],seed=1234)
Давайте посчитаем, сколько людей с доходом ниже/выше 50 тыс. как в обучающей, так и в тестовой выборке.
train_data.groupby('label').agg({'label': 'count'}).show()
+-----+------------+ |label|count(label)| +-----+------------+ | 0.0| 19698| | 1.0| 6263| +-----+------------+
test_data.groupby('label').agg({'label': 'count'}).show()
+-----+------------+ |label|count(label)| +-----+------------+ | 0.0| 5021| | 1.0| 1578| +-----+------------+
Постройте логистический регрессор
И последнее, но не менее важное: вы можете создать классификатор. У Pyspark есть API под названием LogisticRegrade для выполнения логистической регрессии.
Вы инициализируете lr, указывая столбец метки и столбцы функций. Вы устанавливаете максимум 10 итераций и добавляете параметр регуляризации со значением 0.3. Обратите внимание, что в следующем разделе вы будете использовать перекрестную проверку с сеткой параметров для настройки модели.
# Import `LinearRegression` from pyspark.ml.classification import LogisticRegression # Initialize `lr` lr = LogisticRegression(labelCol="label", featuresCol="features", maxIter=10, regParam=0.3) # Fit the data to the model linearModel = lr.fit(train_data)
#Вы можете увидеть коэффициенты регрессии
# Print the coefficients and intercept for logistic regression print("Coefficients: " + str(linearModel.coefficients)) print("Intercept: " + str(linearModel.intercept))
Coefficients: [-0.0678914665262,-0.153425526813,-0.0706009536407,-0.164057586562,-0.120655298528,0.162922330862,0.149176870438,-0.626836362611,-0.193483661541,-0.0782269980838,0.222667203836,0.399571096381,-0.0222024341804,-0.311925857859,-0.0434497788688,-0.306007744328,-0.41318209688,0.547937504247,-0.395837350854,-0.23166535958,0.618743906733,-0.344088614546,-0.385266881369,0.317324463006,-0.350518889186,-0.201335923138,-0.232878560088,-0.13349278865,-0.119760542498,0.17500602491,-0.0480968101118,0.288484253943,-0.116314616745,0.0524163478063,-0.300952624551,-0.22046421474,-0.16557996579,-0.114676231939,-0.311966431453,-0.344226119233,0.105530129507,0.152243047814,-0.292774545497,0.263628334433,-0.199951374076,-0.30329422583,-0.231087515178,0.418918551,-0.0565930184279,-0.177818073048,-0.0733236680663,-0.267972912252,0.168491215697,-0.12181255723,-0.385648075442,-0.202101794517,0.0469791640782,-0.00842850210625,-0.00373211448629,-0.259296141281,-0.309896554133,-0.168434409756,-0.11048086026,0.0280647963877,-0.204187030092,-0.414392623536,-0.252806580669,0.143366465705,-0.516359222663,-0.435627370849,-0.301949286524,0.0878249035894,-0.210951740965,-0.621417928742,-0.099445190784,-0.232671473401,-0.1077745606,-0.360429419703,-0.420362959052,-0.379729467809,-0.395186242741,0.0826401853838,-0.280251589972,0.187313505214,-0.20295228799,-0.431177064626,0.149759018379,-0.107114299614,-0.319314858424,0.0028450133235,-0.651220387649,-0.327918792207,-0.143659581445,0.00691075160413,8.38517628783e-08,2.18856717378e-05,0.0266701216268,0.000231075966823,0.00893832698698] Intercept: -1.9884177974805692
Шаг 5) Обучите и оцените модель
Чтобы создать прогноз для вашего набора тестов,
Вы можете использовать LinearModel с Transform() для test_data.
# Make predictions on test data using the transform() method. predictions = linearModel.transform(test_data)
Вы можете распечатать элементы в прогнозах
predictions.printSchema() root |-- label: double (nullable = true) |-- features: vector (nullable = true) |-- rawPrediction: vector (nullable = true) |-- probability: vector (nullable = true) |-- prediction: double (nullable = false)
Вас интересует метка, прогноз и вероятность
selected = predictions.select("label", "prediction", "probability") selected.show(20)
+-----+----------+--------------------+ |label|prediction| probability| +-----+----------+--------------------+ | 0.0| 0.0|[0.91560704124179...| | 0.0| 0.0|[0.92812140213994...| | 0.0| 0.0|[0.92161406774159...| | 0.0| 0.0|[0.96222760777142...| | 0.0| 0.0|[0.66363283056957...| | 0.0| 0.0|[0.65571324475477...| | 0.0| 0.0|[0.73053376932829...| | 0.0| 1.0|[0.31265053873570...| | 0.0| 0.0|[0.80005907577390...| | 0.0| 0.0|[0.76482251301640...| | 0.0| 0.0|[0.84447301189069...| | 0.0| 0.0|[0.75691912026619...| | 0.0| 0.0|[0.60902504096722...| | 0.0| 0.0|[0.80799228385509...| | 0.0| 0.0|[0.87704364852567...| | 0.0| 0.0|[0.83817652582377...| | 0.0| 0.0|[0.79655423248500...| | 0.0| 0.0|[0.82712311232246...| | 0.0| 0.0|[0.81372823882016...| | 0.0| 0.0|[0.59687710752201...| +-----+----------+--------------------+ only showing top 20 rows
Оцените модель
Вам нужно посмотреть на показатель точности, чтобы увидеть, насколько хорошо (или плохо) работает модель. В настоящее время не существует API для вычисления меры точности в Spark. Значением по умолчанию является ROC, кривая рабочей характеристики приемника. Это другие показатели, которые учитывают уровень ложноположительных результатов.
Прежде чем рассматривать ROC, давайте построим меру точности. Вы более знакомы с этой метрикой. Мера точности — это сумма правильных предсказаний по общему числу наблюдений.
Вы создаете DataFrame с меткой и прогнозом.
cm = predictions.select("label", "prediction")
Вы можете проверить номер класса в метке и прогнозе.
cm.groupby('label').agg({'label': 'count'}).show()
+-----+------------+ |label|count(label)| +-----+------------+ | 0.0| 5021| | 1.0| 1578| +-----+------------+
cm.groupby('prediction').agg({'prediction': 'count'}).show()
+----------+-----------------+ |prediction|count(prediction)| +----------+-----------------+ | 0.0| 5982| | 1.0| 617| +----------+-----------------+
Например, в тестовом наборе 1578 домохозяйств с доходом выше 50 тыс. и 5021 домохозяйство ниже. Однако классификатор предсказал 617 домохозяйств с доходом выше 50 тысяч.
Вы можете вычислить точность, вычислив количество случаев, когда метки правильно классифицированы по общему количеству строк.
cm.filter(cm.label == cm.prediction).count() / cm.count()
0.8237611759357478
Вы можете объединить все вместе и написать функцию для вычисления точности.
def accuracy_m(model): predictions = model.transform(test_data) cm = predictions.select("label", "prediction") acc = cm.filter(cm.label == cm.prediction).count() / cm.count() print("Model accuracy: %.3f%%" % (acc * 100)) accuracy_m(model = linearModel) Model accuracy: 82.376%
ROC-метрики
Модуль BinaryClassificationEvaluator включает меры ROC. Получатель OperaХарактеристическая кривая — еще один распространенный инструмент, используемый при бинарной классификации. Она очень похожа на кривую точности/отзыва, но вместо построения графика зависимости точности от полноты кривая ROC показывает соотношение истинного положительного результата (т.е. отзыва) и уровня ложноположительного результата. Уровень ложноположительных результатов — это соотношение отрицательных случаев, которые ошибочно классифицируются как положительные. Он равен единице минус истинно отрицательная ставка. Истинно отрицательный показатель также называется специфичностью. Следовательно, кривая ROC отображает чувствительность (напомнимость) в зависимости от 1 – специфичность.
### Use ROC from pyspark.ml.evaluation import BinaryClassificationEvaluator # Evaluate model evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction") print(evaluator.evaluate(predictions)) print(evaluator.getMetricName())
0.8940481662695192областьПодROC
print(evaluator.evaluate(predictions))
0.8940481662695192
Шаг 6) Настройте гиперпараметр
И последнее, но не менее важное: вы можете настроить гиперпараметры. Похожий на scikit учиться вы создаете сетку параметров и добавляете параметры, которые хотите настроить.
Чтобы сократить время вычислений, вы настраиваете параметр регуляризации только с двумя значениями.
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator # Create ParamGrid for Cross Validation paramGrid = (ParamGridBuilder() .addGrid(lr.regParam, [0.01, 0.5]) .build())
Наконец, вы оцениваете модель, используя метод перекрестной проверки с 5 сгибами. Обучение занимает около 16 минут.
from time import * start_time = time() # Create 5-fold CrossValidator cv = CrossValidator(estimator=lr, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5) # Run cross validations cvModel = cv.fit(train_data) # likely take a fair amount of time end_time = time() elapsed_time = end_time - start_time print("Time to train model: %.3f seconds" % elapsed_time)
Время обучения модели: 978.807 секунды.
Лучший гиперпараметр регуляризации — 0.01 с точностью 85.316 процента.
accuracy_m(model = cvModel) Model accuracy: 85.316%
Вы можете извлечь рекомендуемый параметр, связав cvModel.bestModel с ExtractParamMap().
bestModel = cvModel.bestModel bestModel.extractParamMap()
{Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='aggregationDepth', doc='suggested depth for treeAggregate (>= 2)'): 2, Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty'): 0.0, Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='family', doc='The name of family which is a description of the label distribution to be used in the model. Supported options: auto, binomial, multinomial.'): 'auto', Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='featuresCol', doc='features column name'): 'features', Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='fitIntercept', doc='whether to fit an intercept term'): True, Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='labelCol', doc='label column name'): 'label', Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='maxIter', doc='maximum number of iterations (>= 0)'): 10, Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='predictionCol', doc='prediction column name'): 'prediction', Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='probabilityCol', doc='Column name for predicted class conditional probabilities. Note: Not all models output well-calibrated probability estimates! These probabilities should be treated as confidences, not precise probabilities'): 'probability', Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='rawPredictionCol', doc='raw prediction (a.k.a. confidence) column name'): 'rawPrediction', Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='regParam', doc='regularization parameter (>= 0)'): 0.01, Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='standardization', doc='whether to standardize the training features before fitting the model'): True, Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='threshold', doc='threshold in binary classification prediction, in range [0, 1]'): 0.5, Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='tol', doc='the convergence tolerance for iterative algorithms (>= 0)'): 1e-06}
Итого
Spark является фундаментальным инструментом для специалиста по данным. Это позволяет практикующему специалисту подключать приложение к различным источникам данных, беспрепятственно выполнять анализ данных или добавлять прогнозную модель.
Начать с Spark, вам необходимо инициировать Spark Контекст с:
"SparkКонтекст()'
и и SQL контекст для подключения к источнику данных:
'SQLContext()'
В этом уроке вы узнаете, как тренировать логистическую регрессию:
- Преобразуйте набор данных в Dataframe с помощью:
rdd.map(lambda x: (x["newlabel"], DenseVector(x["features"]))) sqlContext.createDataFrame(input_data, ["label", "features"])
Обратите внимание, что имя столбца метки — newlabel, и все объекты собраны в объектах. Измените эти значения, если они отличаются в вашем наборе данных.
- Создайте набор поездов/тестов
randomSplit([.8,.2],seed=1234)
- Тренируй модель
LogisticRegression(labelCol="label",featuresCol="features",maxIter=10, regParam=0.3)
lr.fit()
- Сделать прогноз
linearModel.transform()