PySpark Урок за начинаещи: Учете с ПРИМЕРИ
Преди да научите PySpark, да разберем:
Какво е Apache Spark?
Spark е решение за големи данни, което е доказано по-лесно и по-бързо от Hadoop MapReduce. Spark е софтуер с отворен код, разработен от UC Berkeley RAD lab през 2009 г. Откакто беше пуснат за обществеността през 2010 г., Spark нарасна в популярност и се използва в индустрията с безпрецедентен мащаб.
В ерата на Голямо количество от данни, практиците се нуждаят повече от всякога от бързи и надеждни инструменти за обработка на поточно предаване на данни. По-ранните инструменти като MapReduce бяха любими, но бяха бавни. За да преодолеете този проблем, Spark предлага решение, което е едновременно бързо и с общо предназначение. Основната разлика между Spark и MapReduce е това Spark изпълнява изчисления в паметта по-късно на твърдия диск. Позволява високоскоростен достъп и обработка на данни, намалявайки времето от часове до минути.
Какво е PySpark?
PySpark е инструмент, създаден от Apache Spark Общността за използване Python с Spark. Позволява работа с RDD (Resilient Distributed Dataset) в Python. Той също така предлага PySpark Shell за връзка Python API с Spark ядро за започване Spark Контекст. Spark е името двигател за реализиране на клъстерни изчисления, докато PySpark is Pythonбиблиотека за използване Spark.
Как Spark работа?
Spark е базиран на изчислителна машина, което означава, че се грижи за приложението за планиране, разпространение и наблюдение. Всяка задача се изпълнява на различни работни машини, наречени изчислителен клъстер. Компютърният клъстер се отнася до разделянето на задачите. Една машина изпълнява една задача, докато другите допринасят за крайния резултат чрез различна задача. В крайна сметка всички задачи се агрегират, за да се получи резултат. The Spark admin дава 360 преглед на различни Spark Работни места.
Spark е предназначен за работа с
- Python
- Java
- Scala
- SQL
Съществена характеристика на Spark е огромното количество вградена библиотека, включително MLlib за машинно обучение. Spark също така е проектиран да работи с Hadoop клъстери и може да чете широк тип файлове, включително данни от Hive, CSV, JSON, данни от Casandra между другото.
Защо ползване Spark?
Като бъдещ специалист по данни, трябва да сте запознати с известните библиотеки на Python: Pandas и scikit-learn. Тези две библиотеки са фантастични за изследване на набор от данни до среден размер. Редовните проекти за машинно обучение са изградени около следната методология:
- Заредете данните на диска
- Импортирайте данните в паметта на машината
- Обработвайте/анализирайте данните
- Изградете модела за машинно обучение
- Запазете прогнозата обратно на диска
Проблемът възниква, ако специалистът по данни иска да обработва данни, които са твърде големи за един компютър. По време на по-ранните дни на науката за данни, практикуващите ще вземат проби, тъй като обучението върху огромни масиви от данни не винаги е било необходимо. Специалистът по данни ще намери добра статистическа извадка, ще извърши допълнителна проверка за устойчивост и ще излезе с отличен модел.
Има обаче някои проблеми с това:
- Наборът от данни отразява ли реалния свят?
- Данните включват ли конкретен пример?
- Моделът годен ли е за вземане на проби?
Вземете например препоръките на потребителите. Препоръчителите разчитат на сравняване на потребителите с други потребители при оценката на техните предпочитания. Ако специалистът по данни вземе само подмножество от данните, няма да има кохорта от потребители, които са много сходни един с друг. Препоръчителите трябва да работят върху пълния набор от данни или изобщо да не работят.
Какво е решението?
Решението е очевидно от дълго време, разделете проблема на няколко компютъра. Паралелното изчисление идва и с множество проблеми. Разработчиците често имат проблеми с писането на паралелен код и в крайна сметка трябва да решават куп сложни проблеми около самата мултипроцесорност.
Pyspark дава на специалиста по данни API, който може да се използва за решаване на проблеми с обработката на паралелни данни. Pyspark се справя със сложността на мултипроцесирането, като разпространение на данни, разпространение на код и събиране на изход от работниците на клъстер от машини.
Spark може да работи самостоятелно, но най-често работи върху клъстерна изчислителна рамка като Hadoop. При тестване и разработка обаче специалистът по данни може да работи ефективно Spark на техните развойни кутии или лаптопи без клъстер
• Едно от основните предимства на Spark е да се изгради архитектура, която обхваща управление на поточно предаване на данни, безпроблемни заявки за данни, прогнозиране на машинно обучение и достъп в реално време до различни анализи.
• Spark работи в тясно сътрудничество с езика SQL, т.е. структурирани данни. Позволява търсене на данни в реално време.
• Основната работа на специалиста по данни е да анализира и изгражда прогнозни модели. Накратко, специалистът по данни трябва да знае как да прави заявки за данни, използвайки SQL, изготвяне на статистически отчет и използване на машинно обучение за създаване на прогнози. Учените по данни прекарват значителна част от времето си за почистване, трансформиране и анализиране на данните. След като наборът от данни или работният поток от данни е готов, специалистът по данни използва различни техники, за да открие прозрения и скрити модели. Манипулирането на данни трябва да бъде стабилно и също толкова лесно за използване. Spark е правилният инструмент благодарение на своята скорост и богат API.
В този PySpark урок, ще научите как да създадете класификатор с PySpark примери.
Как да инсталирате PySpark с AWS
- Jupyter екип изгради Docker изображение за стартиране Spark ефективно. По-долу са стъпките, които можете да следвате, за да инсталирате PySpark екземпляр в AWS.
Вижте нашия урок AWS намлява TensorFlow
Стъпка 1: Създайте екземпляр
На първо място, трябва да създадете екземпляр. Отидете във вашия AWS акаунт и стартирайте екземпляра. Можете да увеличите паметта до 15 g и да използвате същата група за сигурност като в урока за TensorFlow.
Стъпка 2: Отворете връзката
Отворете връзката и инсталирайте докер контейнер. За повече подробности вижте урока с TensorFlow с докер. Имайте предвид, че трябва да сте в правилната работна директория.
Просто стартирайте тези кодове, за да инсталирате Docker:
sudo yum update -y sudo yum install -y docker sudo service docker start sudo user-mod -a -G docker ec2-user exit
Стъпка 3: Отворете отново връзката и инсталирайте Spark
След като отворите отново връзката, можете да инсталирате изображението, съдържащо PySpark.
## Spark docker run -v ~/work:/home/jovyan/work -d -p 8888:8888 jupyter/pyspark-notebook ## Allow preserving Jupyter notebook sudo chown 1000 ~/work ## Install tree to see our working directory next sudo yum install -y tree
Стъпка 4: Отворете Jupyter
Проверете контейнера и името му
docker ps
Стартирайте докера с докер регистрационни файлове, последвани от името на докера. Например, докер регистрира zealous_goldwasser
Отидете до вашия браузър и стартирайте Jupyter. Адресът е http://localhost:8888/. Поставете паролата, дадена от терминала.
Забележка: ако искате да качите/изтеглите файл на вашата AWS машина, можете да използвате софтуера Cyberduck, https://cyberduck.io/.
Как да инсталирате PySpark on Windows/Mac с Conda
Следва подробен процес за инсталиране на PySpark on Windows/Mac използва Anaconda:
За да инсталирате Spark на вашата локална машина, препоръчителната практика е да създадете нова conda среда. Тази нова среда ще се инсталира Python 3.6, Spark и всички зависимости.
Потребител на Mac
cd anaconda3 touch hello-spark.yml vi hello-spark.yml
Windows Потребител
cd C:\Users\Admin\Anaconda3 echo.>hello-spark.yml notepad hello-spark.yml
Можете да редактирате .yml файла. Бъдете внимателни с отстъпа. Необходими са две интервали преди –
name: hello-spark dependencies: - python=3.6 - jupyter - ipython - numpy - numpy-base - pandas - py4j - pyspark - pytz
Запазете го и създайте среда. Отнема известно време
conda env create -f hello-spark.yml
За повече подробности относно местоположението, моля, проверете урока Инсталиране на TensorFlow
Можете да проверите цялата среда, инсталирана на вашето устройство
conda env list
Activate hello-spark
Потребител на Mac
source activate hello-spark
Windows Потребител
activate hello-spark
Забележка: Вече сте създали конкретна среда TensorFlow, за да изпълнявате уроците на TensorFlow. По-удобно е да създадете нова среда, различна от hello-tf. Няма смисъл да претоварвате hello-tf с Spark или всякакви други библиотеки за машинно обучение.
Представете си, че по-голямата част от вашия проект включва TensorFlow, но трябва да използвате Spark за един конкретен проект. Можете да зададете среда TensorFlow за целия си проект и да създадете отделна среда за Spark. Можете да добавите колкото се може повече библиотеки Spark среда, както искате, без да се намесвате в средата на TensorFlow. След като приключите с Sparkпроект на , можете да го изтриете, без да засягате средата на TensorFlow.
Jupyter
отворено Jupyter Бележник и опитайте дали PySpark работи. В нов бележник поставете следния PySpark примерен код:
import pyspark from pyspark import SparkContext sc =SparkContext()
Ако се покаже грешка, вероятно това е така Java не е инсталиран на вашата машина. В mac отворете терминала и напишете java -version, ако има версия на java, уверете се, че е 1.8. в Windows, отидете на Приложение и проверете дали има a Java папка. Ако има a Java папка, проверете това Java 1.8 е инсталирана. Към момента на писане, PySpark не е съвместим с Java9 и по-горе.
Ако трябва да инсталирате Java, ти да мислиш връзка и изтеглете jdk-8u181-windows-x64.exe
За потребители на Mac се препоръчва използването на `brew.`
brew tap caskroom/versions brew cask install java8
Вижте този урок стъпка по стъпка как да инсталирате Java
Забележка: Използвайте премахване, за да изтриете напълно среда.
conda env remove -n hello-spark -y
Spark Контекст
SparkКонтекстът е вътрешният двигател, който позволява връзките с клъстерите. Ако искате да извършите операция, имате нужда от SparkКонтекст.
Създаване на SparkКонтекст
На първо място, трябва да инициирате a SparkКонтекст.
import pyspark from pyspark import SparkContext sc =SparkContext()
Сега, че SparkКонтекстът е готов, можете да създадете колекция от данни, наречена RDD, Resilient Distributed Dataset. Изчисленията в RDD автоматично се паралелизират в клъстера.
nums= sc.parallelize([1,2,3,4])
Можете да получите достъп до първия ред с take
nums.take(1)
[1]
Можете да приложите трансформация към данните с ламбда функция. В PySpark в примера по-долу връщате квадрата на числата. Това е трансформация на карта
squared = nums.map(lambda x: x*x).collect() for num in squared: print('%i ' % (num))
1 4 9 16
SQLContext
По-удобен начин е да използвате DataFrame. SparkКонтекстът вече е зададен, можете да го използвате, за да създадете dataFrame. Трябва също така да декларирате SQLContext
SQLContext позволява свързване на двигателя с различни източници на данни. Използва се за стартиране на функционалностите на Spark sql.
from pyspark.sql import Row from pyspark.sql import SQLContext sqlContext = SQLContext(sc)
Сега в това Spark настойнически Python, нека създадем списък с кортежи. Всеки кортеж ще съдържа имената на хората и тяхната възраст. Необходими са четири стъпки:
Стъпка 1) Създайте списък от кортежи с информацията
[('John',19),('Smith',29),('Adam',35),('Henry',50)]
Стъпка 2) Изградете RDD
rdd = sc.parallelize(list_p)
Стъпка 3) Преобразувайте кортежите
rdd.map(lambda x: Row(name=x[0], age=int(x[1])))
Стъпка 4) Създайте контекст на DataFrame
sqlContext.createDataFrame(ppl) list_p = [('John',19),('Smith',29),('Adam',35),('Henry',50)] rdd = sc.parallelize(list_p) ppl = rdd.map(lambda x: Row(name=x[0], age=int(x[1]))) DF_ppl = sqlContext.createDataFrame(ppl)
Ако искате да получите достъп до типа на всяка функция, можете да използвате printSchema()
DF_ppl.printSchema() root |-- age: long (nullable = true) |-- name: string (nullable = true)
Пример за машинно обучение с PySpark
Сега, когато имате кратка представа за Spark и SQLContext, вие сте готови да създадете първата си програма за машинно обучение.
Следват стъпките за изграждане на програма за машинно обучение с PySpark:
- Стъпка 1) Основна работа с PySpark
- Стъпка 2) Предварителна обработка на данни
- Стъпка 3) Изградете тръбопровод за обработка на данни
- Стъпка 4) Изградете класификатора: логистика
- Стъпка 5) Обучете и оценете модела
- Стъпка 6) Настройте хиперпараметъра
В този PySpark Урок за машинно обучение, ще използваме набора от данни за възрастни. Целта на този урок е да научите как да използвате Pyspark. За повече информация относно набора от данни вижте този урок.
Имайте предвид, че наборът от данни не е значителен и може да си помислите, че изчислението отнема много време. Spark е проектиран да обработва значително количество данни. SparkПроизводителността на се увеличава в сравнение с други библиотеки за машинно обучение, когато обработеният набор от данни нараства.
Стъпка 1) Основна операция с PySpark
На първо място, трябва да инициализирате SQLContext, който все още не е иницииран.
#from pyspark.sql import SQLContext url = "https://raw.githubusercontent.com/guru99-edu/R-Programming/master/adult_data.csv" from pyspark import SparkFiles sc.addFile(url) sqlContext = SQLContext(sc)
след това можете да прочетете cvs файла с sqlContext.read.csv. Използвате inferSchema, зададена на True, за да кажете Spark за автоматично отгатване на типа на данните. По подразбиране е обърнато на False.
df = sqlContext.read.csv(SparkFiles.get("adult_data.csv"), header=True, inferSchema= True)
Нека да разгледаме типа данни
df.printSchema() root |-- age: integer (nullable = true) |-- workclass: string (nullable = true) |-- fnlwgt: integer (nullable = true) |-- education: string (nullable = true) |-- education_num: integer (nullable = true) |-- marital: string (nullable = true) |-- occupation: string (nullable = true) |-- relationship: string (nullable = true) |-- race: string (nullable = true) |-- sex: string (nullable = true) |-- capital_gain: integer (nullable = true) |-- capital_loss: integer (nullable = true) |-- hours_week: integer (nullable = true) |-- native_country: string (nullable = true) |-- label: string (nullable = true)
Можете да видите данните с show.
df.show(5, truncate = False)
+---+----------------+------+---------+-------------+------------------+-----------------+-------------+-----+------+------------+------------+----------+--------------+-----+ |age|workclass |fnlwgt|education|education_num|marital |occupation |relationship |race |sex |capital_gain|capital_loss|hours_week|native_country|label| +---+----------------+------+---------+-------------+------------------+-----------------+-------------+-----+------+------------+------------+----------+--------------+-----+ |39 |State-gov |77516 |Bachelors|13 |Never-married |Adm-clerical |Not-in-family|White|Male |2174 |0 |40 |United-States |<=50K| |50 |Self-emp-not-inc|83311 |Bachelors|13 |Married-civ-spouse|Exec-managerial |Husband |White|Male |0 |0 |13 |United-States |<=50K| |38 |Private |215646|HS-grad |9 |Divorced |Handlers-cleaners|Not-in-family|White|Male |0 |0 |40 |United-States |<=50K| |53 |Private |234721|11th |7 |Married-civ-spouse|Handlers-cleaners|Husband |Black|Male |0 |0 |40 |United-States |<=50K| |28 |Private |338409|Bachelors|13 |Married-civ-spouse|Prof-specialty |Wife |Black|Female|0 |0 |40 |Cuba |<=50K| +---+----------------+------+---------+-------------+------------------+-----------------+-------------+-----+------+------------+------------+----------+--------------+-----+ only showing top 5 rows
Ако не сте задали inderShema на True, ето какво се случва с типа. Има всички в низ.
df_string = sqlContext.read.csv(SparkFiles.get("adult.csv"), header=True, inferSchema= False) df_string.printSchema() root |-- age: string (nullable = true) |-- workclass: string (nullable = true) |-- fnlwgt: string (nullable = true) |-- education: string (nullable = true) |-- education_num: string (nullable = true) |-- marital: string (nullable = true) |-- occupation: string (nullable = true) |-- relationship: string (nullable = true) |-- race: string (nullable = true) |-- sex: string (nullable = true) |-- capital_gain: string (nullable = true) |-- capital_loss: string (nullable = true) |-- hours_week: string (nullable = true) |-- native_country: string (nullable = true) |-- label: string (nullable = true)
За да конвертирате непрекъснатата променлива в правилния формат, можете да използвате преработване на колоните. Можете да използвате withColumn, за да кажете Spark коя колона да оперира трансформацията.
# Import all from `sql.types` from pyspark.sql.types import * # Write a custom function to convert the data type of DataFrame columns def convertColumn(df, names, newType): for name in names: df = df.withColumn(name, df[name].cast(newType)) return df # List of continuous features CONTI_FEATURES = ['age', 'fnlwgt','capital_gain', 'education_num', 'capital_loss', 'hours_week'] # Convert the type df_string = convertColumn(df_string, CONTI_FEATURES, FloatType()) # Check the dataset df_string.printSchema() root |-- age: float (nullable = true) |-- workclass: string (nullable = true) |-- fnlwgt: float (nullable = true) |-- education: string (nullable = true) |-- education_num: float (nullable = true) |-- marital: string (nullable = true) |-- occupation: string (nullable = true) |-- relationship: string (nullable = true) |-- race: string (nullable = true) |-- sex: string (nullable = true) |-- capital_gain: float (nullable = true) |-- capital_loss: float (nullable = true) |-- hours_week: float (nullable = true) |-- native_country: string (nullable = true) |-- label: string (nullable = true) from pyspark.ml.feature import StringIndexer #stringIndexer = StringIndexer(inputCol="label", outputCol="newlabel") #model = stringIndexer.fit(df) #df = model.transform(df) df.printSchema()
Изберете колони
Можете да изберете и покажете редовете с select и имената на функциите. По-долу са избрани възраст и fnlwgt.
df.select('age','fnlwgt').show(5)
+---+------+ |age|fnlwgt| +---+------+ | 39| 77516| | 50| 83311| | 38|215646| | 53|234721| | 28|338409| +---+------+ only showing top 5 rows
Преброяване по групи
Ако искате да преброите броя на срещанията по група, можете да лансирате:
- Групирай по()
- броя()
заедно. В PySpark В примера по-долу броите броя на редовете според нивото на образование.
df.groupBy("education").count().sort("count",ascending=True).show()
+------------+-----+ | education|count| +------------+-----+ | Preschool| 51| | 1st-4th| 168| | 5th-6th| 333| | Doctorate| 413| | 12th| 433| | 9th| 514| | Prof-school| 576| | 7th-8th| 646| | 10th| 933| | Assoc-acdm| 1067| | 11th| 1175| | Assoc-voc| 1382| | Masters| 1723| | Bachelors| 5355| |Some-college| 7291| | HS-grad|10501| +------------+-----+
Опишете данните
За да получите обобщена статистика за данните, можете да използвате describe(). Той ще изчисли:
- броя
- означава
- стандартно отклонение
- мин
- макс
df.describe().show()
+-------+------------------+-----------+------------------+------------+-----------------+--------+----------------+------------+------------------+------+------------------+----------------+------------------+--------------+-----+ |summary| age| workclass| fnlwgt| education| education_num| marital| occupation|relationship| race| sex| capital_gain| capital_loss| hours_week|native_country|label| +-------+------------------+-----------+------------------+------------+-----------------+--------+----------------+------------+------------------+------+------------------+----------------+------------------+--------------+-----+ | count| 32561| 32561| 32561| 32561| 32561| 32561| 32561| 32561| 32561| 32561| 32561| 32561| 32561| 32561|32561| | mean| 38.58164675532078| null|189778.36651208502| null| 10.0806793403151| null| null| null| null| null|1077.6488437087312| 87.303829734959|40.437455852092995| null| null| | stddev|13.640432553581356| null|105549.97769702227| null|2.572720332067397| null| null| null| null| null| 7385.292084840354|402.960218649002|12.347428681731838| null| null| | min| 17| ?| 12285| 10th| 1|Divorced| ?| Husband|Amer-Indian-Eskimo|Female| 0| 0| 1| ?|<=50K| | max| 90|Without-pay| 1484705|Some-college| 16| Widowed|Transport-moving| Wife| White| Male| 99999| 4356| 99| Yugoslavia| >50K| +-------+------------------+-----------+------------------+------------+-----------------+--------+----------------+------------+------------------+------+------------------+----------------+------------------+--------------+-----+
Ако искате обобщената статистика само на една колона, добавете името на колоната вътре в describe()
df.describe('capital_gain').show()
+-------+------------------+ |summary| capital_gain| +-------+------------------+ | count| 32561| | mean|1077.6488437087312| | stddev| 7385.292084840354| | min| 0| | max| 99999| +-------+------------------+
Crosstab изчисление
В някои случаи може да бъде интересно да видите описателната статистика между две колони по двойки. Например, можете да преброите броя на хората с доходи под или над 50 хиляди по ниво на образование. Тази операция се нарича кръстосана таблица.
df.crosstab('age', 'label').sort("age_label").show()
+---------+-----+----+ |age_label|<=50K|>50K| +---------+-----+----+ | 17| 395| 0| | 18| 550| 0| | 19| 710| 2| | 20| 753| 0| | 21| 717| 3| | 22| 752| 13| | 23| 865| 12| | 24| 767| 31| | 25| 788| 53| | 26| 722| 63| | 27| 754| 81| | 28| 748| 119| | 29| 679| 134| | 30| 690| 171| | 31| 705| 183| | 32| 639| 189| | 33| 684| 191| | 34| 643| 243| | 35| 659| 217| | 36| 635| 263| +---------+-----+----+ only showing top 20 rows
Можете да видите, че нито един човек няма приходи над 50 хиляди, когато е млад.
Капка колона
Има два интуитивни API за премахване на колони:
- drop(): Пускане на колона
- dropna(): Премахване на NA
По-долу пускате колоната education_num
df.drop('education_num').columns ['age', 'workclass', 'fnlwgt', 'education', 'marital', 'occupation', 'relationship', 'race', 'sex', 'capital_gain', 'capital_loss', 'hours_week', 'native_country', 'label']
Филтрирайте данните
Можете да използвате filter(), за да приложите описателна статистика в подмножество от данни. Например, можете да преброите броя на хората над 40 години
df.filter(df.age > 40).count()
13443
Descriptive статистика по групи
И накрая, можете да групирате данни по групи и да изчислявате статистически операции като средната стойност.
df.groupby('marital').agg({'capital_gain': 'mean'}).show()
+--------------------+------------------+ | marital| avg(capital_gain)| +--------------------+------------------+ | Separated| 535.5687804878049| | Never-married|376.58831788823363| |Married-spouse-ab...| 653.9832535885167| | Divorced| 728.4148098131893| | Widowed| 571.0715005035247| | Married-AF-spouse| 432.6521739130435| | Married-civ-spouse|1764.8595085470085| +--------------------+------------------+
Стъпка 2) Предварителна обработка на данни
Обработката на данни е критична стъпка в машинното обучение. След като премахнете ненужните данни, получавате някои важни прозрения.
Например, знаете, че възрастта не е линейна функция с дохода. Когато хората са млади, техните доходи обикновено са по-ниски от тези на средна възраст. След пенсиониране домакинството използва своите спестявания, което означава намаляване на доходите. За да заснемете този модел, можете да добавите квадрат към функцията за възраст
Добавете квадрат за възраст
За да добавите нова функция, трябва:
- Изберете колоната
- Приложете трансформацията и я добавете към DataFrame
from pyspark.sql.functions import * # 1 Select the column age_square = df.select(col("age")**2) # 2 Apply the transformation and add it to the DataFrame df = df.withColumn("age_square", col("age")**2) df.printSchema() root |-- age: integer (nullable = true) |-- workclass: string (nullable = true) |-- fnlwgt: integer (nullable = true) |-- education: string (nullable = true) |-- education_num: integer (nullable = true) |-- marital: string (nullable = true) |-- occupation: string (nullable = true) |-- relationship: string (nullable = true) |-- race: string (nullable = true) |-- sex: string (nullable = true) |-- capital_gain: integer (nullable = true) |-- capital_loss: integer (nullable = true) |-- hours_week: integer (nullable = true) |-- native_country: string (nullable = true) |-- label: string (nullable = true) |-- age_square: double (nullable = true)
Можете да видите, че age_square е добавен успешно към рамката с данни. Можете да промените реда на променливите с select. По-долу въвеждате age_square веднага след age.
COLUMNS = ['age', 'age_square', 'workclass', 'fnlwgt', 'education', 'education_num', 'marital', 'occupation', 'relationship', 'race', 'sex', 'capital_gain', 'capital_loss', 'hours_week', 'native_country', 'label'] df = df.select(COLUMNS) df.first()
Row(age=39, age_square=1521.0, workclass='State-gov', fnlwgt=77516, education='Bachelors', education_num=13, marital='Never-married', occupation='Adm-clerical', relationship='Not-in-family', race='White', sex='Male', capital_gain=2174, capital_loss=0, hours_week=40, native_country='United-States', label='<=50K')
Изключете Холандия-Холандия
Когато група в рамките на характеристика има само едно наблюдение, тя не носи информация към модела. Напротив, това може да доведе до грешка по време на кръстосаното валидиране.
Да проверим произхода на домакинството
df.filter(df.native_country == 'Holand-Netherlands').count() df.groupby('native_country').agg({'native_country': 'count'}).sort(asc("count(native_country)")).show()
+--------------------+---------------------+ | native_country|count(native_country)| +--------------------+---------------------+ | Holand-Netherlands| 1| | Scotland| 12| | Hungary| 13| | Honduras| 13| |Outlying-US(Guam-...| 14| | Yugoslavia| 16| | Thailand| 18| | Laos| 18| | Cambodia| 19| | Trinadad&Tobago| 19| | Hong| 20| | Ireland| 24| | Ecuador| 28| | Greece| 29| | France| 29| | Peru| 31| | Nicaragua| 34| | Portugal| 37| | Iran| 43| | Haiti| 44| +--------------------+---------------------+ only showing top 20 rows
Функцията native_country има само едно домакинство от Холандия. Вие го изключвате.
df_remove = df.filter(df.native_country != 'Holand-Netherlands')
Стъпка 3) Изградете тръбопровод за обработка на данни
Подобно на scikit-learn, Pyspark има API за тръбопровод.
Конвейерът е много удобен за поддържане на структурата на данните. Вкарвате данните в тръбопровода. Вътре в конвейера се извършват различни операции, изходът се използва за захранване на алгоритъма.
Например, една универсална трансформация в машинното обучение се състои от преобразуване на низ в един горещ енкодер, т.е. една колона от група. Един горещ енкодер обикновено е матрица, пълна с нули.
Стъпките за трансформиране на данните са много подобни на scikit-learn. Трябва да:
- Индексирайте низа към числов
- Създайте един горещ енкодер
- Трансформирайте данните
Два API вършат работата: StringIndexer, OneHotEncoder
- Първо избирате колоната с низове за индексиране. InputCol е името на колоната в набора от данни. outputCol е новото име, дадено на трансформираната колона.
StringIndexer(inputCol="workclass", outputCol="workclass_encoded")
- Поставете данните и ги трансформирайте
model = stringIndexer.fit(df) `indexed = model.transform(df)``
- Създайте колоните с новини въз основа на групата. Например, ако има 10 групи във функцията, новата матрица ще има 10 колони, по една за всяка група.
OneHotEncoder(dropLast=False, inputCol="workclassencoded", outputCol="workclassvec")
### Example encoder from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler stringIndexer = StringIndexer(inputCol="workclass", outputCol="workclass_encoded") model = stringIndexer.fit(df) indexed = model.transform(df) encoder = OneHotEncoder(dropLast=False, inputCol="workclass_encoded", outputCol="workclass_vec") encoded = encoder.transform(indexed) encoded.show(2)
+---+----------+----------------+------+---------+-------------+------------------+---------------+-------------+-----+----+------------+------------+----------+--------------+-----+-----------------+-------------+ |age|age_square| workclass|fnlwgt|education|education_num| marital| occupation| relationship| race| sex|capital_gain|capital_loss|hours_week|native_country|label|workclass_encoded|workclass_vec| +---+----------+----------------+------+---------+-------------+------------------+---------------+-------------+-----+----+------------+------------+----------+--------------+-----+-----------------+-------------+ | 39| 1521.0| State-gov| 77516|Bachelors| 13| Never-married| Adm-clerical|Not-in-family|White|Male| 2174| 0| 40| United-States|<=50K| 4.0|(9,[4],[1.0])| | 50| 2500.0|Self-emp-not-inc| 83311|Bachelors| 13|Married-civ-spouse|Exec-managerial| Husband|White|Male| 0| 0| 13| United-States|<=50K| 1.0|(9,[1],[1.0])| +---+----------+----------------+------+---------+-------------+------------------+---------------+-------------+-----+----+------------+------------+----------+--------------+-----+-----------------+-------------+ only showing top 2 rows
Изградете тръбопровода
Ще изградите тръбопровод, за да конвертирате всички точни характеристики и да ги добавите към крайния набор от данни. Конвейерът ще има четири операции, но не се колебайте да добавите толкова операции, колкото желаете.
- Кодирайте категориалните данни
- Индексирайте функцията на етикета
- Добавете непрекъсната променлива
- Сглобете стъпалата.
Всяка стъпка се съхранява в списък, наречен етапи. Този списък ще каже на VectorAssembler каква операция да извърши вътре в конвейера.
1. Кодирайте категоричните данни
Тази стъпка е абсолютно същата като горния пример, с изключение на това, че преминавате през всички категориални характеристики.
from pyspark.ml import Pipeline from pyspark.ml.feature import OneHotEncoderEstimator CATE_FEATURES = ['workclass', 'education', 'marital', 'occupation', 'relationship', 'race', 'sex', 'native_country'] stages = [] # stages in our Pipeline for categoricalCol in CATE_FEATURES: stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol + "Index") encoder = OneHotEncoderEstimator(inputCols=[stringIndexer.getOutputCol()], outputCols=[categoricalCol + "classVec"]) stages += [stringIndexer, encoder]
2. Индексирайте функцията на етикета
Spark, подобно на много други библиотеки, не приема низови стойности за етикета. Преобразувате функцията за етикет с StringIndexer и я добавяте към етапите на списъка
# Convert label into label indices using the StringIndexer label_stringIdx = StringIndexer(inputCol="label", outputCol="newlabel") stages += [label_stringIdx]
3. Добавете непрекъсната променлива
InputCols на VectorAssembler е списък от колони. Можете да създадете нов списък, съдържащ всички нови колони. Кодът по-долу попълва списъка с кодирани категорични характеристики и непрекъснати характеристики.
assemblerInputs = [c + "classVec" for c in CATE_FEATURES] + CONTI_FEATURES
4. Сглобете стъпалата.
Накрая преминавате всички стъпки във VectorAssembler
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")stages += [assembler]
Сега, когато всички стъпки са готови, изпращате данните към тръбопровода.
# Create a Pipeline. pipeline = Pipeline(stages=stages) pipelineModel = pipeline.fit(df_remove) model = pipelineModel.transform(df_remove)
Ако проверите новия набор от данни, можете да видите, че той съдържа всички характеристики, трансформирани и нетрансформирани. Вие се интересувате само от новия етикет и характеристики. Характеристиките включват всички трансформирани характеристики и непрекъснатите променливи.
model.take(1) [Row(age=39, age_square=1521.0, workclass='State-gov', fnlwgt=77516, education='Bachelors', education_num=13, marital='Never-married', occupation='Adm-clerical', relationship='Not-in-family', race='White', sex='Male', capital_gain=2174, capital_loss=0, hours_week=40, native_country='United-States', label='<=50K', workclassIndex=4.0, workclassclassVec=SparseVector(8, {4: 1.0}), educationIndex=2.0, educationclassVec=SparseVector(15, {2: 1.0}), maritalIndex=1.0, maritalclassVec=SparseVector(6, {1: 1.0}), occupationIndex=3.0, occupationclassVec=SparseVector(14, {3: 1.0}), relationshipIndex=1.0, relationshipclassVec=SparseVector(5, {1: 1.0}), raceIndex=0.0, raceclassVec=SparseVector(4, {0: 1.0}), sexIndex=0.0, sexclassVec=SparseVector(1, {0: 1.0}), native_countryIndex=0.0, native_countryclassVec=SparseVector(40, {0: 1.0}), newlabel=0.0, features=SparseVector(99, {4: 1.0, 10: 1.0, 24: 1.0, 32: 1.0, 44: 1.0, 48: 1.0, 52: 1.0, 53: 1.0, 93: 39.0, 94: 77516.0, 95: 2174.0, 96: 13.0, 98: 40.0}))]
Стъпка 4) Изградете класификатора: логистика
За да направите изчислението по-бързо, конвертирате модела в DataFrame.
Трябва да изберете нов етикет и функции от модела с помощта на карта.
from pyspark.ml.linalg import DenseVector input_data = model.rdd.map(lambda x: (x["newlabel"], DenseVector(x["features"])))
Готови сте да създадете данните за влака като DataFrame. Използвате sqlContext
df_train = sqlContext.createDataFrame(input_data, ["label", "features"])
Проверете втория ред
df_train.show(2)
+-----+--------------------+ |label| features| +-----+--------------------+ | 0.0|[0.0,0.0,0.0,0.0,...| | 0.0|[0.0,1.0,0.0,0.0,...| +-----+--------------------+ only showing top 2 rows
Създайте набор от влак/тест
Разделяте набора от данни 80/20 с randomSplit.
# Split the data into train and test sets train_data, test_data = df_train.randomSplit([.8,.2],seed=1234)
Нека преброим колко хора с доходи под/над 50 XNUMX както в набора за обучение, така и в теста
train_data.groupby('label').agg({'label': 'count'}).show()
+-----+------------+ |label|count(label)| +-----+------------+ | 0.0| 19698| | 1.0| 6263| +-----+------------+
test_data.groupby('label').agg({'label': 'count'}).show()
+-----+------------+ |label|count(label)| +-----+------------+ | 0.0| 5021| | 1.0| 1578| +-----+------------+
Изградете логистичния регресор
Не на последно място, можете да изградите класификатора. Pyspark има API, наречен LogisticRegression, за извършване на логистична регресия.
Вие инициализирате lr, като посочите колоната с етикети и колоните с характеристики. Задавате максимум 10 итерации и добавяте параметър за регулиране със стойност 0.3. Обърнете внимание, че в следващия раздел ще използвате кръстосано валидиране с мрежа от параметри, за да настроите модела
# Import `LinearRegression` from pyspark.ml.classification import LogisticRegression # Initialize `lr` lr = LogisticRegression(labelCol="label", featuresCol="features", maxIter=10, regParam=0.3) # Fit the data to the model linearModel = lr.fit(train_data)
#Можете да видите коефициентите от регресията
# Print the coefficients and intercept for logistic regression print("Coefficients: " + str(linearModel.coefficients)) print("Intercept: " + str(linearModel.intercept))
Coefficients: [-0.0678914665262,-0.153425526813,-0.0706009536407,-0.164057586562,-0.120655298528,0.162922330862,0.149176870438,-0.626836362611,-0.193483661541,-0.0782269980838,0.222667203836,0.399571096381,-0.0222024341804,-0.311925857859,-0.0434497788688,-0.306007744328,-0.41318209688,0.547937504247,-0.395837350854,-0.23166535958,0.618743906733,-0.344088614546,-0.385266881369,0.317324463006,-0.350518889186,-0.201335923138,-0.232878560088,-0.13349278865,-0.119760542498,0.17500602491,-0.0480968101118,0.288484253943,-0.116314616745,0.0524163478063,-0.300952624551,-0.22046421474,-0.16557996579,-0.114676231939,-0.311966431453,-0.344226119233,0.105530129507,0.152243047814,-0.292774545497,0.263628334433,-0.199951374076,-0.30329422583,-0.231087515178,0.418918551,-0.0565930184279,-0.177818073048,-0.0733236680663,-0.267972912252,0.168491215697,-0.12181255723,-0.385648075442,-0.202101794517,0.0469791640782,-0.00842850210625,-0.00373211448629,-0.259296141281,-0.309896554133,-0.168434409756,-0.11048086026,0.0280647963877,-0.204187030092,-0.414392623536,-0.252806580669,0.143366465705,-0.516359222663,-0.435627370849,-0.301949286524,0.0878249035894,-0.210951740965,-0.621417928742,-0.099445190784,-0.232671473401,-0.1077745606,-0.360429419703,-0.420362959052,-0.379729467809,-0.395186242741,0.0826401853838,-0.280251589972,0.187313505214,-0.20295228799,-0.431177064626,0.149759018379,-0.107114299614,-0.319314858424,0.0028450133235,-0.651220387649,-0.327918792207,-0.143659581445,0.00691075160413,8.38517628783e-08,2.18856717378e-05,0.0266701216268,0.000231075966823,0.00893832698698] Intercept: -1.9884177974805692
Стъпка 5) Обучете и оценете модела
За да генерирате прогноза за вашия набор от тестове,
Можете да използвате linearModel с transform() на test_data
# Make predictions on test data using the transform() method. predictions = linearModel.transform(test_data)
Можете да отпечатате елементите в прогнозите
predictions.printSchema() root |-- label: double (nullable = true) |-- features: vector (nullable = true) |-- rawPrediction: vector (nullable = true) |-- probability: vector (nullable = true) |-- prediction: double (nullable = false)
Интересувате се от етикета, прогнозата и вероятността
selected = predictions.select("label", "prediction", "probability") selected.show(20)
+-----+----------+--------------------+ |label|prediction| probability| +-----+----------+--------------------+ | 0.0| 0.0|[0.91560704124179...| | 0.0| 0.0|[0.92812140213994...| | 0.0| 0.0|[0.92161406774159...| | 0.0| 0.0|[0.96222760777142...| | 0.0| 0.0|[0.66363283056957...| | 0.0| 0.0|[0.65571324475477...| | 0.0| 0.0|[0.73053376932829...| | 0.0| 1.0|[0.31265053873570...| | 0.0| 0.0|[0.80005907577390...| | 0.0| 0.0|[0.76482251301640...| | 0.0| 0.0|[0.84447301189069...| | 0.0| 0.0|[0.75691912026619...| | 0.0| 0.0|[0.60902504096722...| | 0.0| 0.0|[0.80799228385509...| | 0.0| 0.0|[0.87704364852567...| | 0.0| 0.0|[0.83817652582377...| | 0.0| 0.0|[0.79655423248500...| | 0.0| 0.0|[0.82712311232246...| | 0.0| 0.0|[0.81372823882016...| | 0.0| 0.0|[0.59687710752201...| +-----+----------+--------------------+ only showing top 20 rows
Оценете модела
Трябва да погледнете показателя за точност, за да видите колко добре (или лошо) се представя моделът. Понастоящем няма API за изчисляване на мярката за точност Spark. Стойността по подразбиране е ROC, крива на работната характеристика на приемника. Това е различна метрика, която отчита процента на фалшивите положителни резултати.
Преди да разгледате ROC, нека конструираме мярката за точност. Вие сте по-запознати с този показател. Мярката за точност е сумата от правилната прогноза спрямо общия брой наблюдения.
Създавате DataFrame с етикета и `предсказанието.
cm = predictions.select("label", "prediction")
Можете да проверите номера на класа в етикета и прогнозата
cm.groupby('label').agg({'label': 'count'}).show()
+-----+------------+ |label|count(label)| +-----+------------+ | 0.0| 5021| | 1.0| 1578| +-----+------------+
cm.groupby('prediction').agg({'prediction': 'count'}).show()
+----------+-----------------+ |prediction|count(prediction)| +----------+-----------------+ | 0.0| 5982| | 1.0| 617| +----------+-----------------+
Например в набора от тестове има 1578 домакинства с доход над 50k и 5021 под. Класификаторът обаче предвижда 617 домакинства с доход над 50 хиляди.
Можете да изчислите точността, като изчислите броя, когато етикетът е правилно класифициран спрямо общия брой редове.
cm.filter(cm.label == cm.prediction).count() / cm.count()
0.8237611759357478
Можете да обвиете всичко заедно и да напишете функция за изчисляване на точността.
def accuracy_m(model): predictions = model.transform(test_data) cm = predictions.select("label", "prediction") acc = cm.filter(cm.label == cm.prediction).count() / cm.count() print("Model accuracy: %.3f%%" % (acc * 100)) accuracy_m(model = linearModel) Model accuracy: 82.376%
ROC показатели
Модулът BinaryClassificationEvaluator включва ROC мерките. Приемникът OperaХарактеристичната крива е друг обичаен инструмент, използван с бинарна класификация. Тя е много подобна на кривата на прецизност/припомняне, но вместо да начертае прецизност спрямо припомняне, ROC кривата показва истинската положителна скорост (т.е. припомняне) срещу фалшиво положителна скорост. Процентът на фалшивите положителни резултати е съотношението на отрицателните случаи, които са неправилно класифицирани като положителни. Тя е равна на едно минус истинската отрицателна ставка. Истинският отрицателен процент се нарича още специфичност. Следователно ROC кривата изобразява чувствителността (припомнянето) спрямо 1 – специфичност
### Use ROC from pyspark.ml.evaluation import BinaryClassificationEvaluator # Evaluate model evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction") print(evaluator.evaluate(predictions)) print(evaluator.getMetricName())
0.8940481662695192област Под ROC
print(evaluator.evaluate(predictions))
0.8940481662695192
Стъпка 6) Настройте хиперпараметъра
Не на последно място, можете да настроите хиперпараметрите. Подобно на scikit научете създавате мрежа с параметри и добавяте параметрите, които искате да настроите.
За да намалите времето на изчислението, вие настройвате само параметъра за регулиране само с две стойности.
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator # Create ParamGrid for Cross Validation paramGrid = (ParamGridBuilder() .addGrid(lr.regParam, [0.01, 0.5]) .build())
Накрая оценявате модела с помощта на метода на кръстосана проверка с 5 пъти. Тренировката отнема около 16 минути.
from time import * start_time = time() # Create 5-fold CrossValidator cv = CrossValidator(estimator=lr, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5) # Run cross validations cvModel = cv.fit(train_data) # likely take a fair amount of time end_time = time() elapsed_time = end_time - start_time print("Time to train model: %.3f seconds" % elapsed_time)
Време за обучение на модела: 978.807 секунди
Най-добрият хиперпараметър за регулиране е 0.01, с точност от 85.316 процента.
accuracy_m(model = cvModel) Model accuracy: 85.316%
Можете да извлечете препоръчания параметър чрез верижно свързване на cvModel.bestModel с extractParamMap()
bestModel = cvModel.bestModel bestModel.extractParamMap()
{Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='aggregationDepth', doc='suggested depth for treeAggregate (>= 2)'): 2, Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty'): 0.0, Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='family', doc='The name of family which is a description of the label distribution to be used in the model. Supported options: auto, binomial, multinomial.'): 'auto', Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='featuresCol', doc='features column name'): 'features', Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='fitIntercept', doc='whether to fit an intercept term'): True, Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='labelCol', doc='label column name'): 'label', Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='maxIter', doc='maximum number of iterations (>= 0)'): 10, Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='predictionCol', doc='prediction column name'): 'prediction', Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='probabilityCol', doc='Column name for predicted class conditional probabilities. Note: Not all models output well-calibrated probability estimates! These probabilities should be treated as confidences, not precise probabilities'): 'probability', Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='rawPredictionCol', doc='raw prediction (a.k.a. confidence) column name'): 'rawPrediction', Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='regParam', doc='regularization parameter (>= 0)'): 0.01, Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='standardization', doc='whether to standardize the training features before fitting the model'): True, Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='threshold', doc='threshold in binary classification prediction, in range [0, 1]'): 0.5, Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='tol', doc='the convergence tolerance for iterative algorithms (>= 0)'): 1e-06}
Oбобщение
Spark е основен инструмент за специалист по данни. Той позволява на практикуващия да свърже приложение с различни източници на данни, да извърши безпроблемно анализ на данни или да добави прогнозен модел.
Да започнем с Spark, трябва да инициирате a Spark Контекст с:
"SparkContext()'
и и SQL контекст за свързване към източник на данни:
„SQLContext()“
В урока научавате как да тренирате логистична регресия:
- Преобразувайте набора от данни в Dataframe с:
rdd.map(lambda x: (x["newlabel"], DenseVector(x["features"]))) sqlContext.createDataFrame(input_data, ["label", "features"])
Имайте предвид, че името на колоната на етикета е newlabel и всички функции са събрани във функции. Променете тези стойности, ако са различни във вашия набор от данни.
- Създайте набора за обучение/тест
randomSplit([.8,.2],seed=1234)
- Обучете модела
LogisticRegression(labelCol="label",featuresCol="features",maxIter=10, regParam=0.3)
lr.fit()
- Направете прогноза
linearModel.transform()