PySpark Vodič za početnike: Učite uz PRIMJERE
Prije učenja PySpark, da se razumijemo:
Što je Apache Spark?
Spark je rješenje za velike podatke koje je dokazano lakše i brže od Hadoop MapReduce. Spark je softver otvorenog koda koji je razvio RAD laboratorij UC Berkeley 2009. Otkako je 2010. pušten u javnost, Spark porasla je u popularnosti i koristi se u industriji u neviđenom opsegu.
U eri od Big Podacipraktičarima više nego ikad trebaju brzi i pouzdani alati za obradu toka podataka. Raniji alati poput MapReduce bili su omiljeni, ali su bili spori. Da biste prevladali ovaj problem, Spark nudi rješenje koje je istovremeno brzo i opće namjene. Glavna razlika između Spark a MapReduce je to Spark izvodi izračune u memoriji tijekom kasnijeg na tvrdom disku. Omogućuje brzi pristup i obradu podataka, skraćujući vrijeme sa sati na minute.
Što je PySpark?
PySpark je alat koji je stvorio Apache Spark Zajednica za korištenje Python s Spark. Omogućuje rad s RDD (Resilient Distributed Dataset) u Python. Također nudi PySpark Shell za povezivanje Python API-ji sa Spark jezgra pokrenuti Spark Kontekst. Spark naziv je motor za realizaciju klasterskog računalstva, dok je PySpark is Pythonknjižnica za korištenje Spark.
Kako Spark raditi?
Spark temelji se na računalnom stroju, što znači da brine o aplikaciji za planiranje, distribuciju i nadzor. Svaki se zadatak obavlja na različitim radnim strojevima koji se nazivaju računalni klaster. Računalni klaster odnosi se na podjelu zadataka. Jedan stroj obavlja jedan zadatak, dok ostali doprinose konačnom rezultatu kroz različite zadatke. Na kraju se svi zadaci zbrajaju kako bi proizveli izlaz. The Spark admin daje 360 pregled raznih Spark Poslovi.
Spark je dizajniran za rad s
- Python
- Java
- Skala
- SQL
Značajna karakteristika od Spark je velika količina ugrađene biblioteke, uključujući MLlib za strojno učenje. Spark također je dizajniran za rad s Hadoop klasterima i može čitati široku vrstu datoteka, uključujući Hive podatke, CSV, JSON, Casandra podatke između ostalog.
Zašto koristiti Spark?
Kao budući praktičar podataka, trebali biste biti upoznati s poznatim bibliotekama pythona: Pandas i scikit-learn. Ove dvije biblioteke su fantastične za istraživanje skupova podataka do srednje veličine. Redoviti projekti strojnog učenja izgrađeni su oko sljedeće metodologije:
- Učitajte podatke na disk
- Uvezite podatke u memoriju stroja
- Obraditi/analizirati podatke
- Izgradite model strojnog učenja
- Pohranite predviđanje natrag na disk
Problem nastaje ako podatkovni znanstvenik želi obraditi podatke koji su preveliki za jedno računalo. Tijekom ranijih dana znanosti o podacima, praktičari bi uzorkovali budući da obuka na velikim skupovima podataka nije uvijek bila potrebna. Znanstvenik bi pronašao dobar statistički uzorak, izvršio dodatnu provjeru robusnosti i došao do izvrsnog modela.
Međutim, postoje neki problemi s ovim:
- Odražava li skup podataka stvarni svijet?
- Sadrže li podaci određeni primjer?
- Je li model prikladan za uzorkovanje?
Uzmimo za primjer preporuke korisnika. Preporučitelji se oslanjaju na usporedbu korisnika s drugim korisnicima u procjeni njihovih preferencija. Ako stručnjak za podatke uzme samo podskup podataka, neće postojati kohorta korisnika koji su međusobno vrlo slični. Preporučitelji moraju raditi na cijelom skupu podataka ili uopće ne.
Što je to rješenje?
Rješenje je očito već duže vrijeme, podijelite problem na više računala. Paralelno računanje također dolazi s više problema. Programeri često imaju problema s pisanjem paralelnog koda i na kraju moraju riješiti hrpu složenih problema oko same višestruke obrade.
Pyspark daje podatkovnom znanstveniku API koji se može koristiti za rješavanje problema paralelnih podataka. Pyspark se nosi sa složenošću multiprocesiranja, kao što je distribucija podataka, distribucija koda i prikupljanje izlaznih podataka od radnika na klasteru strojeva.
Spark može raditi samostalno, ali najčešće radi na vrhu računalnog okvira klastera kao što je Hadoop. U testiranju i razvoju, međutim, podatkovni znanstvenik može učinkovito raditi Spark na svojim razvojnim kutijama ili laptopima bez klastera
• Jedna od glavnih prednosti Spark je izgraditi arhitekturu koja obuhvaća upravljanje protokom podataka, besprijekorne upite podataka, predviđanje strojnog učenja i pristup različitim analizama u stvarnom vremenu.
• Spark blisko radi sa SQL jezikom, tj. strukturiranim podacima. Omogućuje upite podataka u stvarnom vremenu.
• Glavni posao znanstvenika podataka je analiza i izgradnja prediktivnih modela. Ukratko, podatkovni znanstvenik mora znati kako postavljati upite podacima pomoću SQL, izraditi statističko izvješće i koristiti strojno učenje za izradu predviđanja. Podatkovni znanstvenik troši značajnu količinu svog vremena na čišćenje, transformaciju i analizu podataka. Nakon što je skup podataka ili tijek rada podataka spreman, podatkovni znanstvenik koristi se raznim tehnikama za otkrivanje uvida i skrivenih obrazaca. Manipulacija podacima treba biti robusna i jednako jednostavna za korištenje. Spark je pravi alat zahvaljujući brzini i bogatim API-jima.
U ovom PySpark tutorial, naučit ćete kako izgraditi klasifikator s PySpark primjeri.
Kako instalirati PySpark s AWS-om
The Jupyter tim izgraditi Docker sliku za pokretanje Spark efikasno. U nastavku su navedeni koraci koje možete slijediti da biste instalirali PySpark instanca u AWS-u.
Pogledajte naš vodič AWS i TensorFlow
Korak 1: Stvorite instancu
Prije svega, trebate stvoriti instancu. Idite na svoj AWS račun i pokrenite instancu. Možete povećati pohranu do 15 g i koristiti istu sigurnosnu grupu kao u TensorFlow vodiču.
Korak 2: Otvorite vezu
Otvorite vezu i instalirajte docker spremnik. Za više detalja pogledajte vodič za TensorFlow s Lučki radnik. Imajte na umu da morate biti u ispravnom radnom direktoriju.
Jednostavno pokrenite ove kodove da instalirate Docker:
sudo yum update -y sudo yum install -y docker sudo service docker start sudo user-mod -a -G docker ec2-user exit
Korak 3: Ponovno otvorite vezu i instalirajte Spark
Nakon što ponovno otvorite vezu, možete instalirati sliku koja sadrži PySpark.
## Spark docker run -v ~/work:/home/jovyan/work -d -p 8888:8888 jupyter/pyspark-notebook ## Allow preserving Jupyter notebook sudo chown 1000 ~/work ## Install tree to see our working directory next sudo yum install -y tree
Korak 4: Otvoren Jupyter
Provjerite spremnik i njegov naziv
docker ps
Pokrenite docker s docker zapisima nakon kojih slijedi naziv dockera. Na primjer, docker bilježi zealous_goldwasser
Idite na svoj preglednik i pokrenite ga Jupyter. Adresa je http://localhost:8888/. Zalijepite lozinku koju je dao terminal.
bilješke: ako želite prenijeti/preuzeti datoteku na svoj AWS stroj, možete koristiti softver Cyberduck, https://cyberduck.io/.
Kako instalirati PySpark on Windows/Mac s Condom
Slijedi detaljan postupak kako instalirati PySpark on Windows/Mac koristi Anacondu:
Da biste instalirali Spark na vašem lokalnom računalu, preporučena praksa je stvaranje novog conda okruženja. Ovo novo okruženje će se instalirati Python 3.6, Spark i sve ovisnosti.
Mac korisnik
cd anaconda3 touch hello-spark.yml vi hello-spark.yml
Windows korisnik
cd C:\Users\Admin\Anaconda3 echo.>hello-spark.yml notepad hello-spark.yml
Možete urediti .yml datoteku. Budite oprezni s uvlakom. Prije su potrebna dva razmaka –
name: hello-spark dependencies: - python=3.6 - jupyter - ipython - numpy - numpy-base - pandas - py4j - pyspark - pytz
Sačuvajte i stvorite okruženje. Potrebno je neko vrijeme
conda env create -f hello-spark.yml
Za više pojedinosti o lokaciji pogledajte vodič za instalaciju TensorFlowa
Možete provjeriti sva okruženja instalirana na vašem stroju
conda env list
Activate hello-spark
Mac korisnik
source activate hello-spark
Windows korisnik
activate hello-spark
Bilješka: Već ste izradili specifično TensorFlow okruženje za pokretanje vodiča na TensorFlowu. Pogodnije je stvoriti novo okruženje različito od hello-tf. Nema smisla preopteretiti hello-tf s Spark ili bilo koje druge biblioteke za strojno učenje.
Zamislite da većina vašeg projekta uključuje TensorFlow, ali morate ga koristiti Spark za jedan određeni projekt. Možete postaviti TensorFlow okruženje za sve svoje projekte i stvoriti zasebno okruženje za Spark. Možete dodati koliko god biblioteka Spark okruženje kakvo želite bez uplitanja u okruženje TensorFlow. Nakon što završite s Sparkprojekta, možete ga izbrisati bez utjecaja na TensorFlow okruženje.
Jupyter
Otvoren Jupyter Bilježnica i pokušaj PySpark djela. U novu bilježnicu zalijepite sljedeći PySpark uzorak koda:
import pyspark from pyspark import SparkContext sc =SparkContext()
Ako se prikaže pogreška, vjerojatno je da Java nije instaliran na vašem računalu. U Macu otvorite terminal i napišite java -version, ako postoji java verzija, provjerite je li 1.8. U Windows, idite na Aplikacija i provjerite postoji li Java mapa. Ako postoji Java mapu, provjerite to Java Instaliran je 1.8. U trenutku pisanja ovog teksta, PySpark nije kompatibilno sa Java9 i više.
Ako trebate instalirati Java, ti misliti link i preuzmite jdk-8u181-windows-x64.exe
Za Mac korisnike preporučuje se korištenje `brew.`
brew tap caskroom/versions brew cask install java8
Pogledajte ovaj vodič korak po korak kako instalirati Java
bilješke: Koristite remove za potpuno brisanje okruženja.
conda env remove -n hello-spark -y
Spark Kontekst
SparkKontekst je unutarnji motor koji omogućuje veze s klasterima. Ako želite pokrenuti operaciju, trebate a SparkKontekst.
Napravite SparkKontekst
Prije svega, trebate pokrenuti a SparkKontekst.
import pyspark from pyspark import SparkContext sc =SparkContext()
Sada kada je SparkKontekst je spreman, možete stvoriti zbirku podataka pod nazivom RDD, Resilient Distributed Dataset. Računanje u RDD-u automatski se paralelizira u clusteru.
nums= sc.parallelize([1,2,3,4])
Prvom redu možete pristupiti uzmi
nums.take(1)
[1]
Na podatke možete primijeniti transformaciju pomoću lambda funkcije. U PySpark primjer ispod, vraćate kvadrat brojeva. To je transformacija karte
squared = nums.map(lambda x: x*x).collect() for num in squared: print('%i ' % (num))
1 4 9 16
SQLContext
Prikladniji način je korištenje DataFramea. SparkKontekst je već postavljen, možete ga koristiti za izradu dataFramea. Također morate deklarirati SQLContext
SQLContext omogućuje povezivanje motora s različitim izvorima podataka. Koristi se za pokretanje funkcionalnosti Spark sql.
from pyspark.sql import Row from pyspark.sql import SQLContext sqlContext = SQLContext(sc)
Sada u ovome Spark udžbenik Python, stvorimo popis torki. Svaka torka će sadržavati ime ljudi i njihovu dob. Potrebna su četiri koraka:
Korak 1) Napravite popis torki s informacijama
[('John',19),('Smith',29),('Adam',35),('Henry',50)]
Korak 2) Izgradite RDD
rdd = sc.parallelize(list_p)
Korak 3) Pretvorite torke
rdd.map(lambda x: Row(name=x[0], age=int(x[1])))
Korak 4) Stvorite DataFrame kontekst
sqlContext.createDataFrame(ppl) list_p = [('John',19),('Smith',29),('Adam',35),('Henry',50)] rdd = sc.parallelize(list_p) ppl = rdd.map(lambda x: Row(name=x[0], age=int(x[1]))) DF_ppl = sqlContext.createDataFrame(ppl)
Ako želite pristupiti vrsti svake značajke, možete koristiti printSchema()
DF_ppl.printSchema() root |-- age: long (nullable = true) |-- name: string (nullable = true)
Primjer strojnog učenja s PySpark
Sada kada imate kratku ideju o Spark i SQLContext, spremni ste izgraditi svoj prvi program strojnog učenja.
Slijede koraci za izradu programa strojnog učenja s PySpark:
- Korak 1) Osnovna operacija s PySpark
- Korak 2) Predobrada podataka
- Korak 3) Izgradite cjevovod za obradu podataka
- Korak 4) Izradite klasifikator: logistika
- Korak 5) Obučite i ocijenite model
- Korak 6) Podesite hiperparametar
U ovom PySpark Vodič za strojno učenje, koristit ćemo skup podataka za odrasle. Svrha ovog vodiča je naučiti kako koristiti Pyspark. Za više informacija o skupu podataka pogledajte ovaj vodič.
Imajte na umu da skup podataka nije značajan i možda mislite da izračun traje dugo. Spark dizajniran je za obradu značajne količine podataka. SparkPerformanse se povećavaju u odnosu na druge biblioteke strojnog učenja kada se obrađeni skup podataka poveća.
Korak 1) Osnovna operacija s PySpark
Prije svega, trebate inicijalizirati SQLContext koji još nije pokrenut.
#from pyspark.sql import SQLContext url = "https://raw.githubusercontent.com/guru99-edu/R-Programming/master/adult_data.csv" from pyspark import SparkFiles sc.addFile(url) sqlContext = SQLContext(sc)
zatim, možete pročitati cvs datoteku s sqlContext.read.csv. Koristite inferSchema postavljenu na True da kažete Spark da automatski pogodite vrstu podataka. Prema zadanim postavkama postavljeno je na False.
df = sqlContext.read.csv(SparkFiles.get("adult_data.csv"), header=True, inferSchema= True)
Pogledajmo vrstu podataka
df.printSchema() root |-- age: integer (nullable = true) |-- workclass: string (nullable = true) |-- fnlwgt: integer (nullable = true) |-- education: string (nullable = true) |-- education_num: integer (nullable = true) |-- marital: string (nullable = true) |-- occupation: string (nullable = true) |-- relationship: string (nullable = true) |-- race: string (nullable = true) |-- sex: string (nullable = true) |-- capital_gain: integer (nullable = true) |-- capital_loss: integer (nullable = true) |-- hours_week: integer (nullable = true) |-- native_country: string (nullable = true) |-- label: string (nullable = true)
Podatke možete vidjeti pomoću prikaza.
df.show(5, truncate = False)
+---+----------------+------+---------+-------------+------------------+-----------------+-------------+-----+------+------------+------------+----------+--------------+-----+ |age|workclass |fnlwgt|education|education_num|marital |occupation |relationship |race |sex |capital_gain|capital_loss|hours_week|native_country|label| +---+----------------+------+---------+-------------+------------------+-----------------+-------------+-----+------+------------+------------+----------+--------------+-----+ |39 |State-gov |77516 |Bachelors|13 |Never-married |Adm-clerical |Not-in-family|White|Male |2174 |0 |40 |United-States |<=50K| |50 |Self-emp-not-inc|83311 |Bachelors|13 |Married-civ-spouse|Exec-managerial |Husband |White|Male |0 |0 |13 |United-States |<=50K| |38 |Private |215646|HS-grad |9 |Divorced |Handlers-cleaners|Not-in-family|White|Male |0 |0 |40 |United-States |<=50K| |53 |Private |234721|11th |7 |Married-civ-spouse|Handlers-cleaners|Husband |Black|Male |0 |0 |40 |United-States |<=50K| |28 |Private |338409|Bachelors|13 |Married-civ-spouse|Prof-specialty |Wife |Black|Female|0 |0 |40 |Cuba |<=50K| +---+----------------+------+---------+-------------+------------------+-----------------+-------------+-----+------+------------+------------+----------+--------------+-----+ only showing top 5 rows
Ako inderShema niste postavili na True, evo što se događa s tipom. Ima ih sve u nizu.
df_string = sqlContext.read.csv(SparkFiles.get("adult.csv"), header=True, inferSchema= False) df_string.printSchema() root |-- age: string (nullable = true) |-- workclass: string (nullable = true) |-- fnlwgt: string (nullable = true) |-- education: string (nullable = true) |-- education_num: string (nullable = true) |-- marital: string (nullable = true) |-- occupation: string (nullable = true) |-- relationship: string (nullable = true) |-- race: string (nullable = true) |-- sex: string (nullable = true) |-- capital_gain: string (nullable = true) |-- capital_loss: string (nullable = true) |-- hours_week: string (nullable = true) |-- native_country: string (nullable = true) |-- label: string (nullable = true)
Za pretvaranje kontinuirane varijable u pravi format, možete upotrijebiti preinaku stupaca. Možete koristiti withColumn da kažete Spark koji stupac raditi transformaciju.
# Import all from `sql.types` from pyspark.sql.types import * # Write a custom function to convert the data type of DataFrame columns def convertColumn(df, names, newType): for name in names: df = df.withColumn(name, df[name].cast(newType)) return df # List of continuous features CONTI_FEATURES = ['age', 'fnlwgt','capital_gain', 'education_num', 'capital_loss', 'hours_week'] # Convert the type df_string = convertColumn(df_string, CONTI_FEATURES, FloatType()) # Check the dataset df_string.printSchema() root |-- age: float (nullable = true) |-- workclass: string (nullable = true) |-- fnlwgt: float (nullable = true) |-- education: string (nullable = true) |-- education_num: float (nullable = true) |-- marital: string (nullable = true) |-- occupation: string (nullable = true) |-- relationship: string (nullable = true) |-- race: string (nullable = true) |-- sex: string (nullable = true) |-- capital_gain: float (nullable = true) |-- capital_loss: float (nullable = true) |-- hours_week: float (nullable = true) |-- native_country: string (nullable = true) |-- label: string (nullable = true) from pyspark.ml.feature import StringIndexer #stringIndexer = StringIndexer(inputCol="label", outputCol="newlabel") #model = stringIndexer.fit(df) #df = model.transform(df) df.printSchema()
Odaberite stupce
Možete odabrati i prikazati retke s odabirom i nazivima značajki. Ispod su odabrane dob i fnlwgt.
df.select('age','fnlwgt').show(5)
+---+------+ |age|fnlwgt| +---+------+ | 39| 77516| | 50| 83311| | 38|215646| | 53|234721| | 28|338409| +---+------+ only showing top 5 rows
Broji po grupama
Ako želite prebrojati broj pojavljivanja po grupi, možete lančano:
- grupiraj()
- računati()
zajedno. U PySpark u primjeru ispod, brojite broj redaka prema razini obrazovanja.
df.groupBy("education").count().sort("count",ascending=True).show()
+------------+-----+ | education|count| +------------+-----+ | Preschool| 51| | 1st-4th| 168| | 5th-6th| 333| | Doctorate| 413| | 12th| 433| | 9th| 514| | Prof-school| 576| | 7th-8th| 646| | 10th| 933| | Assoc-acdm| 1067| | 11th| 1175| | Assoc-voc| 1382| | Masters| 1723| | Bachelors| 5355| |Some-college| 7291| | HS-grad|10501| +------------+-----+
Opišite podatke
Da biste dobili sažetak statistike podataka, možete koristiti describe(). Izračunat će:
- računati
- značiti
- standardna devijacija
- minuta
- max
df.describe().show()
+-------+------------------+-----------+------------------+------------+-----------------+--------+----------------+------------+------------------+------+------------------+----------------+------------------+--------------+-----+ |summary| age| workclass| fnlwgt| education| education_num| marital| occupation|relationship| race| sex| capital_gain| capital_loss| hours_week|native_country|label| +-------+------------------+-----------+------------------+------------+-----------------+--------+----------------+------------+------------------+------+------------------+----------------+------------------+--------------+-----+ | count| 32561| 32561| 32561| 32561| 32561| 32561| 32561| 32561| 32561| 32561| 32561| 32561| 32561| 32561|32561| | mean| 38.58164675532078| null|189778.36651208502| null| 10.0806793403151| null| null| null| null| null|1077.6488437087312| 87.303829734959|40.437455852092995| null| null| | stddev|13.640432553581356| null|105549.97769702227| null|2.572720332067397| null| null| null| null| null| 7385.292084840354|402.960218649002|12.347428681731838| null| null| | min| 17| ?| 12285| 10th| 1|Divorced| ?| Husband|Amer-Indian-Eskimo|Female| 0| 0| 1| ?|<=50K| | max| 90|Without-pay| 1484705|Some-college| 16| Widowed|Transport-moving| Wife| White| Male| 99999| 4356| 99| Yugoslavia| >50K| +-------+------------------+-----------+------------------+------------+-----------------+--------+----------------+------------+------------------+------+------------------+----------------+------------------+--------------+-----+
Ako želite sumarnu statistiku samo jednog stupca, dodajte naziv stupca unutar describe()
df.describe('capital_gain').show()
+-------+------------------+ |summary| capital_gain| +-------+------------------+ | count| 32561| | mean|1077.6488437087312| | stddev| 7385.292084840354| | min| 0| | max| 99999| +-------+------------------+
Unakrsno računanje
U nekim slučajevima može biti zanimljivo vidjeti deskriptivnu statistiku između dva stupca u paru. Na primjer, možete izračunati broj ljudi s prihodom ispod ili iznad 50 tisuća prema razini obrazovanja. Ova se operacija naziva unakrsnom tablicom.
df.crosstab('age', 'label').sort("age_label").show()
+---------+-----+----+ |age_label|<=50K|>50K| +---------+-----+----+ | 17| 395| 0| | 18| 550| 0| | 19| 710| 2| | 20| 753| 0| | 21| 717| 3| | 22| 752| 13| | 23| 865| 12| | 24| 767| 31| | 25| 788| 53| | 26| 722| 63| | 27| 754| 81| | 28| 748| 119| | 29| 679| 134| | 30| 690| 171| | 31| 705| 183| | 32| 639| 189| | 33| 684| 191| | 34| 643| 243| | 35| 659| 217| | 36| 635| 263| +---------+-----+----+ only showing top 20 rows
Vidite da nitko nema prihod veći od 50 tisuća kad je mlad.
Ispusti stupac
Postoje dva intuitivna API-ja za ispuštanje stupaca:
- drop(): Ispusti stupac
- dropna(): Ispusti NA
Ispod ispuštate stupac broj obrazovanja
df.drop('education_num').columns ['age', 'workclass', 'fnlwgt', 'education', 'marital', 'occupation', 'relationship', 'race', 'sex', 'capital_gain', 'capital_loss', 'hours_week', 'native_country', 'label']
Filtrirajte podatke
Možete koristiti filter() za primjenu deskriptivne statistike u podskupu podataka. Na primjer, možete izračunati broj ljudi starijih od 40 godina
df.filter(df.age > 40).count()
13443
Descriptive statistike po grupama
Konačno, možete grupirati podatke po grupama i izračunati statističke operacije poput srednje vrijednosti.
df.groupby('marital').agg({'capital_gain': 'mean'}).show()
+--------------------+------------------+ | marital| avg(capital_gain)| +--------------------+------------------+ | Separated| 535.5687804878049| | Never-married|376.58831788823363| |Married-spouse-ab...| 653.9832535885167| | Divorced| 728.4148098131893| | Widowed| 571.0715005035247| | Married-AF-spouse| 432.6521739130435| | Married-civ-spouse|1764.8595085470085| +--------------------+------------------+
Korak 2) Predobrada podataka
Obrada podataka kritičan je korak u strojnom učenju. Nakon što uklonite otpadne podatke, dobit ćete neke važne uvide.
Na primjer, znate da dob nije linearna funkcija s prihodom. Kad su ljudi mladi, njihovi su prihodi obično niži nego u srednjim godinama. Nakon odlaska u mirovinu kućanstvo koristi svoju ušteđevinu, što znači smanjenje prihoda. Da biste uhvatili ovaj uzorak, možete dodati kvadrat značajki dobi
Dodajte dobni kvadrat
Da biste dodali novu značajku, trebate:
- Odaberite stupac
- Primijenite transformaciju i dodajte je u DataFrame
from pyspark.sql.functions import * # 1 Select the column age_square = df.select(col("age")**2) # 2 Apply the transformation and add it to the DataFrame df = df.withColumn("age_square", col("age")**2) df.printSchema() root |-- age: integer (nullable = true) |-- workclass: string (nullable = true) |-- fnlwgt: integer (nullable = true) |-- education: string (nullable = true) |-- education_num: integer (nullable = true) |-- marital: string (nullable = true) |-- occupation: string (nullable = true) |-- relationship: string (nullable = true) |-- race: string (nullable = true) |-- sex: string (nullable = true) |-- capital_gain: integer (nullable = true) |-- capital_loss: integer (nullable = true) |-- hours_week: integer (nullable = true) |-- native_country: string (nullable = true) |-- label: string (nullable = true) |-- age_square: double (nullable = true)
Možete vidjeti da je age_square uspješno dodan u podatkovni okvir. Možete promijeniti redoslijed varijabli pomoću select. Dolje donosite age_square odmah iza age.
COLUMNS = ['age', 'age_square', 'workclass', 'fnlwgt', 'education', 'education_num', 'marital', 'occupation', 'relationship', 'race', 'sex', 'capital_gain', 'capital_loss', 'hours_week', 'native_country', 'label'] df = df.select(COLUMNS) df.first()
Row(age=39, age_square=1521.0, workclass='State-gov', fnlwgt=77516, education='Bachelors', education_num=13, marital='Never-married', occupation='Adm-clerical', relationship='Not-in-family', race='White', sex='Male', capital_gain=2174, capital_loss=0, hours_week=40, native_country='United-States', label='<=50K')
Isključi Nizozemsku-Nizozemsku
Kada grupa unutar značajke ima samo jedno opažanje, ono ne donosi nikakve informacije modelu. Naprotiv, to može dovesti do pogreške tijekom unakrsne provjere.
Provjerimo porijeklo kućanstva
df.filter(df.native_country == 'Holand-Netherlands').count() df.groupby('native_country').agg({'native_country': 'count'}).sort(asc("count(native_country)")).show()
+--------------------+---------------------+ | native_country|count(native_country)| +--------------------+---------------------+ | Holand-Netherlands| 1| | Scotland| 12| | Hungary| 13| | Honduras| 13| |Outlying-US(Guam-...| 14| | Yugoslavia| 16| | Thailand| 18| | Laos| 18| | Cambodia| 19| | Trinadad&Tobago| 19| | Hong| 20| | Ireland| 24| | Ecuador| 28| | Greece| 29| | France| 29| | Peru| 31| | Nicaragua| 34| | Portugal| 37| | Iran| 43| | Haiti| 44| +--------------------+---------------------+ only showing top 20 rows
Značajka native_country ima samo jedno kućanstvo koje dolazi iz Nizozemske. Vi ga isključite.
df_remove = df.filter(df.native_country != 'Holand-Netherlands')
Korak 3) Izgradite cjevovod za obradu podataka
Slično scikit-learn-u, Pyspark ima API za cjevovod.
Cjevovod je vrlo prikladan za održavanje strukture podataka. Gurate podatke u cjevovod. Unutar cjevovoda obavljaju se razne operacije, izlaz se koristi za napajanje algoritma.
Na primjer, jedna univerzalna transformacija u strojnom učenju sastoji se od pretvaranja niza u jedan vrući koder, tj. jedan stupac po grupi. Jedan vrući enkoder obično je matrica puna nula.
Koraci za transformaciju podataka vrlo su slični scikit-learnu. Trebaš:
- Indeksirajte niz u numerički
- Stvorite jedan vrući koder
- Transformirajte podatke
Dva API-ja obavljaju posao: StringIndexer, OneHotEncoder
- Prije svega, odabirete stupac niza za indeksiranje. InputCol je naziv stupca u skupu podataka. outputCol je novo ime dano transformiranom stupcu.
StringIndexer(inputCol="workclass", outputCol="workclass_encoded")
- Prilagodite podatke i transformirajte ih
model = stringIndexer.fit(df) `indexed = model.transform(df)``
- Kreirajte stupce vijesti na temelju grupe. Na primjer, ako postoji 10 grupa u značajci, nova matrica će imati 10 stupaca, po jedan za svaku grupu.
OneHotEncoder(dropLast=False, inputCol="workclassencoded", outputCol="workclassvec")
### Example encoder from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler stringIndexer = StringIndexer(inputCol="workclass", outputCol="workclass_encoded") model = stringIndexer.fit(df) indexed = model.transform(df) encoder = OneHotEncoder(dropLast=False, inputCol="workclass_encoded", outputCol="workclass_vec") encoded = encoder.transform(indexed) encoded.show(2)
+---+----------+----------------+------+---------+-------------+------------------+---------------+-------------+-----+----+------------+------------+----------+--------------+-----+-----------------+-------------+ |age|age_square| workclass|fnlwgt|education|education_num| marital| occupation| relationship| race| sex|capital_gain|capital_loss|hours_week|native_country|label|workclass_encoded|workclass_vec| +---+----------+----------------+------+---------+-------------+------------------+---------------+-------------+-----+----+------------+------------+----------+--------------+-----+-----------------+-------------+ | 39| 1521.0| State-gov| 77516|Bachelors| 13| Never-married| Adm-clerical|Not-in-family|White|Male| 2174| 0| 40| United-States|<=50K| 4.0|(9,[4],[1.0])| | 50| 2500.0|Self-emp-not-inc| 83311|Bachelors| 13|Married-civ-spouse|Exec-managerial| Husband|White|Male| 0| 0| 13| United-States|<=50K| 1.0|(9,[1],[1.0])| +---+----------+----------------+------+---------+-------------+------------------+---------------+-------------+-----+----+------------+------------+----------+--------------+-----+-----------------+-------------+ only showing top 2 rows
Izgradite cjevovod
Izgradit ćete cjevovod za pretvaranje svih preciznih značajki i njihovo dodavanje konačnom skupu podataka. Cjevovod će imati četiri operacije, ali slobodno dodajte onoliko operacija koliko želite.
- Kodirajte kategoričke podatke
- Indeksirajte značajku oznake
- Dodajte kontinuiranu varijablu
- Sastavite korake.
Svaki korak je pohranjen na popisu pod nazivom faze. Ovaj popis će reći VectorAssembleru koju operaciju treba izvesti unutar cjevovoda.
1. Kodirajte kategoričke podatke
Ovaj je korak u potpunosti isti kao gornji primjer, osim što prelazite kroz sve kategoričke značajke.
from pyspark.ml import Pipeline from pyspark.ml.feature import OneHotEncoderEstimator CATE_FEATURES = ['workclass', 'education', 'marital', 'occupation', 'relationship', 'race', 'sex', 'native_country'] stages = [] # stages in our Pipeline for categoricalCol in CATE_FEATURES: stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol + "Index") encoder = OneHotEncoderEstimator(inputCols=[stringIndexer.getOutputCol()], outputCols=[categoricalCol + "classVec"]) stages += [stringIndexer, encoder]
2. Indeksirajte značajku oznake
Spark, kao i mnoge druge biblioteke, ne prihvaća vrijednosti niza za oznaku. Značajku oznake pretvarate pomoću StringIndexera i dodajete je na popis faza
# Convert label into label indices using the StringIndexer label_stringIdx = StringIndexer(inputCol="label", outputCol="newlabel") stages += [label_stringIdx]
3. Dodajte kontinuiranu varijablu
InputCols VectorAssemblera je popis stupaca. Možete stvoriti novi popis koji će sadržavati sve nove stupce. Kod u nastavku popunjava popis kodiranim kategoričkim značajkama i kontinuiranim značajkama.
assemblerInputs = [c + "classVec" for c in CATE_FEATURES] + CONTI_FEATURES
4. Sastavite korake.
Konačno, prolazite sve korake u VectorAssembleru
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")stages += [assembler]
Sada kada su svi koraci spremni, gurnete podatke u cjevovod.
# Create a Pipeline. pipeline = Pipeline(stages=stages) pipelineModel = pipeline.fit(df_remove) model = pipelineModel.transform(df_remove)
Ako provjerite novi skup podataka, možete vidjeti da sadrži sve značajke, transformirane i netransformirane. Zanimaju vas samo nova etiketa i karakteristike. Značajke uključuju sve transformirane značajke i kontinuirane varijable.
model.take(1) [Row(age=39, age_square=1521.0, workclass='State-gov', fnlwgt=77516, education='Bachelors', education_num=13, marital='Never-married', occupation='Adm-clerical', relationship='Not-in-family', race='White', sex='Male', capital_gain=2174, capital_loss=0, hours_week=40, native_country='United-States', label='<=50K', workclassIndex=4.0, workclassclassVec=SparseVector(8, {4: 1.0}), educationIndex=2.0, educationclassVec=SparseVector(15, {2: 1.0}), maritalIndex=1.0, maritalclassVec=SparseVector(6, {1: 1.0}), occupationIndex=3.0, occupationclassVec=SparseVector(14, {3: 1.0}), relationshipIndex=1.0, relationshipclassVec=SparseVector(5, {1: 1.0}), raceIndex=0.0, raceclassVec=SparseVector(4, {0: 1.0}), sexIndex=0.0, sexclassVec=SparseVector(1, {0: 1.0}), native_countryIndex=0.0, native_countryclassVec=SparseVector(40, {0: 1.0}), newlabel=0.0, features=SparseVector(99, {4: 1.0, 10: 1.0, 24: 1.0, 32: 1.0, 44: 1.0, 48: 1.0, 52: 1.0, 53: 1.0, 93: 39.0, 94: 77516.0, 95: 2174.0, 96: 13.0, 98: 40.0}))]
Korak 4) Izradite klasifikator: logistika
Kako bi izračun bio brži, pretvarate model u DataFrame.
Morate odabrati novu oznaku i značajke iz modela pomoću karte.
from pyspark.ml.linalg import DenseVector input_data = model.rdd.map(lambda x: (x["newlabel"], DenseVector(x["features"])))
Spremni ste za izradu podataka o vlaku kao DataFrame. Koristite sqlContext
df_train = sqlContext.createDataFrame(input_data, ["label", "features"])
Provjerite drugi red
df_train.show(2)
+-----+--------------------+ |label| features| +-----+--------------------+ | 0.0|[0.0,0.0,0.0,0.0,...| | 0.0|[0.0,1.0,0.0,0.0,...| +-----+--------------------+ only showing top 2 rows
Stvorite set vlakova/testiranja
Podijelite skup podataka 80/20 pomoću randomSplit.
# Split the data into train and test sets train_data, test_data = df_train.randomSplit([.8,.2],seed=1234)
Izbrojimo koliko je ljudi s prihodom ispod/iznad 50 tisuća u obuci i testu
train_data.groupby('label').agg({'label': 'count'}).show()
+-----+------------+ |label|count(label)| +-----+------------+ | 0.0| 19698| | 1.0| 6263| +-----+------------+
test_data.groupby('label').agg({'label': 'count'}).show()
+-----+------------+ |label|count(label)| +-----+------------+ | 0.0| 5021| | 1.0| 1578| +-----+------------+
Izgradite logistički regresor
Na kraju, ali ne manje važno, možete izgraditi klasifikator. Pyspark ima API pod nazivom LogisticRegression za izvođenje logističke regresije.
Inicijalizirate lr označavanjem stupca oznake i stupaca značajki. Postavljate najviše 10 ponavljanja i dodajete parametar regularizacije s vrijednošću 0.3. Imajte na umu da ćete u sljedećem odjeljku koristiti unakrsnu provjeru valjanosti s rešetkom parametara za podešavanje modela
# Import `LinearRegression` from pyspark.ml.classification import LogisticRegression # Initialize `lr` lr = LogisticRegression(labelCol="label", featuresCol="features", maxIter=10, regParam=0.3) # Fit the data to the model linearModel = lr.fit(train_data)
#Možete vidjeti koeficijente iz regresije
# Print the coefficients and intercept for logistic regression print("Coefficients: " + str(linearModel.coefficients)) print("Intercept: " + str(linearModel.intercept))
Coefficients: [-0.0678914665262,-0.153425526813,-0.0706009536407,-0.164057586562,-0.120655298528,0.162922330862,0.149176870438,-0.626836362611,-0.193483661541,-0.0782269980838,0.222667203836,0.399571096381,-0.0222024341804,-0.311925857859,-0.0434497788688,-0.306007744328,-0.41318209688,0.547937504247,-0.395837350854,-0.23166535958,0.618743906733,-0.344088614546,-0.385266881369,0.317324463006,-0.350518889186,-0.201335923138,-0.232878560088,-0.13349278865,-0.119760542498,0.17500602491,-0.0480968101118,0.288484253943,-0.116314616745,0.0524163478063,-0.300952624551,-0.22046421474,-0.16557996579,-0.114676231939,-0.311966431453,-0.344226119233,0.105530129507,0.152243047814,-0.292774545497,0.263628334433,-0.199951374076,-0.30329422583,-0.231087515178,0.418918551,-0.0565930184279,-0.177818073048,-0.0733236680663,-0.267972912252,0.168491215697,-0.12181255723,-0.385648075442,-0.202101794517,0.0469791640782,-0.00842850210625,-0.00373211448629,-0.259296141281,-0.309896554133,-0.168434409756,-0.11048086026,0.0280647963877,-0.204187030092,-0.414392623536,-0.252806580669,0.143366465705,-0.516359222663,-0.435627370849,-0.301949286524,0.0878249035894,-0.210951740965,-0.621417928742,-0.099445190784,-0.232671473401,-0.1077745606,-0.360429419703,-0.420362959052,-0.379729467809,-0.395186242741,0.0826401853838,-0.280251589972,0.187313505214,-0.20295228799,-0.431177064626,0.149759018379,-0.107114299614,-0.319314858424,0.0028450133235,-0.651220387649,-0.327918792207,-0.143659581445,0.00691075160413,8.38517628783e-08,2.18856717378e-05,0.0266701216268,0.000231075966823,0.00893832698698] Intercept: -1.9884177974805692
Korak 5) Obučite i ocijenite model
Za generiranje predviđanja za vaš skup testova,
Možete koristiti linearModel s transform() na test_data
# Make predictions on test data using the transform() method. predictions = linearModel.transform(test_data)
Možete ispisati elemente u predviđanjima
predictions.printSchema() root |-- label: double (nullable = true) |-- features: vector (nullable = true) |-- rawPrediction: vector (nullable = true) |-- probability: vector (nullable = true) |-- prediction: double (nullable = false)
Zanimaju vas oznaka, predviđanje i vjerojatnost
selected = predictions.select("label", "prediction", "probability") selected.show(20)
+-----+----------+--------------------+ |label|prediction| probability| +-----+----------+--------------------+ | 0.0| 0.0|[0.91560704124179...| | 0.0| 0.0|[0.92812140213994...| | 0.0| 0.0|[0.92161406774159...| | 0.0| 0.0|[0.96222760777142...| | 0.0| 0.0|[0.66363283056957...| | 0.0| 0.0|[0.65571324475477...| | 0.0| 0.0|[0.73053376932829...| | 0.0| 1.0|[0.31265053873570...| | 0.0| 0.0|[0.80005907577390...| | 0.0| 0.0|[0.76482251301640...| | 0.0| 0.0|[0.84447301189069...| | 0.0| 0.0|[0.75691912026619...| | 0.0| 0.0|[0.60902504096722...| | 0.0| 0.0|[0.80799228385509...| | 0.0| 0.0|[0.87704364852567...| | 0.0| 0.0|[0.83817652582377...| | 0.0| 0.0|[0.79655423248500...| | 0.0| 0.0|[0.82712311232246...| | 0.0| 0.0|[0.81372823882016...| | 0.0| 0.0|[0.59687710752201...| +-----+----------+--------------------+ only showing top 20 rows
Ocijenite model
Morate pogledati metriku točnosti da vidite koliko je model dobar (ili loš). Trenutačno ne postoji API za izračunavanje mjere točnosti Spark. Zadana vrijednost je ROC, radna karakteristična krivulja prijemnika. Drugačija je metrika koja uzima u obzir lažno pozitivnu stopu.
Prije nego pogledate ROC, konstruirajmo mjeru točnosti. Više ste upoznati s ovom metrikom. Mjera točnosti je zbroj točnog predviđanja ukupnog broja opažanja.
Izradite DataFrame s oznakom i `predviđanjem.
cm = predictions.select("label", "prediction")
Možete provjeriti broj klase u naljepnici i predviđanje
cm.groupby('label').agg({'label': 'count'}).show()
+-----+------------+ |label|count(label)| +-----+------------+ | 0.0| 5021| | 1.0| 1578| +-----+------------+
cm.groupby('prediction').agg({'prediction': 'count'}).show()
+----------+-----------------+ |prediction|count(prediction)| +----------+-----------------+ | 0.0| 5982| | 1.0| 617| +----------+-----------------+
Na primjer, u testnom skupu postoji 1578 kućanstava s prihodom iznad 50 tisuća i 5021 ispod. Klasifikator je, međutim, predvidio 617 kućanstava s prihodom iznad 50 tisuća kuna.
Točnost možete izračunati izračunavanjem broja kada su oznake ispravno klasificirane prema ukupnom broju redaka.
cm.filter(cm.label == cm.prediction).count() / cm.count()
0.8237611759357478
Možete zamotati sve zajedno i napisati funkciju za izračunavanje točnosti.
def accuracy_m(model): predictions = model.transform(test_data) cm = predictions.select("label", "prediction") acc = cm.filter(cm.label == cm.prediction).count() / cm.count() print("Model accuracy: %.3f%%" % (acc * 100)) accuracy_m(model = linearModel) Model accuracy: 82.376%
ROC metrika
Modul BinaryClassificationEvaluator uključuje ROC mjere. Primatelj OperaKarakteristična krivulja još je jedan uobičajeni alat koji se koristi s binarnom klasifikacijom. Vrlo je slična krivulji preciznost/prisjećanje, ali umjesto iscrtavanja preciznosti u odnosu na prisjećanje, ROC krivulja pokazuje pravu pozitivnu stopu (tj. prisjećanje) naspram lažno pozitivne stope. Lažno pozitivna stopa je omjer negativnih slučajeva koji su netočno klasificirani kao pozitivni. Jednaka je jedan minus prava negativna stopa. Prava negativna stopa također se naziva specifičnost. Stoga ROC krivulja prikazuje osjetljivost (prisjećanje) u odnosu na 1 – specifičnost
### Use ROC from pyspark.ml.evaluation import BinaryClassificationEvaluator # Evaluate model evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction") print(evaluator.evaluate(predictions)) print(evaluator.getMetricName())
0.8940481662695192područjePod ROC
print(evaluator.evaluate(predictions))
0.8940481662695192
Korak 6) Podesite hiperparametar
Posljednje, ali ne manje važno, možete podesiti hiperparametre. Slično scikit naučiti stvorite rešetku parametara i dodate parametre koje želite podesiti.
Kako biste smanjili vrijeme izračuna, podešavate samo parametar regularizacije sa samo dvije vrijednosti.
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator # Create ParamGrid for Cross Validation paramGrid = (ParamGridBuilder() .addGrid(lr.regParam, [0.01, 0.5]) .build())
Na kraju, procjenjujete model pomoću metode unakrsne provjere s 5 puta. Za trening je potrebno oko 16 minuta.
from time import * start_time = time() # Create 5-fold CrossValidator cv = CrossValidator(estimator=lr, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5) # Run cross validations cvModel = cv.fit(train_data) # likely take a fair amount of time end_time = time() elapsed_time = end_time - start_time print("Time to train model: %.3f seconds" % elapsed_time)
Vrijeme za treniranje modela: 978.807 sekundi
Najbolji regularizacijski hiperparametar je 0.01, s točnošću od 85.316 posto.
accuracy_m(model = cvModel) Model accuracy: 85.316%
Preporučeni parametar možete izdvojiti ulančavanjem cvModel.bestModel s extractParamMap()
bestModel = cvModel.bestModel bestModel.extractParamMap()
{Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='aggregationDepth', doc='suggested depth for treeAggregate (>= 2)'): 2, Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty'): 0.0, Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='family', doc='The name of family which is a description of the label distribution to be used in the model. Supported options: auto, binomial, multinomial.'): 'auto', Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='featuresCol', doc='features column name'): 'features', Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='fitIntercept', doc='whether to fit an intercept term'): True, Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='labelCol', doc='label column name'): 'label', Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='maxIter', doc='maximum number of iterations (>= 0)'): 10, Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='predictionCol', doc='prediction column name'): 'prediction', Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='probabilityCol', doc='Column name for predicted class conditional probabilities. Note: Not all models output well-calibrated probability estimates! These probabilities should be treated as confidences, not precise probabilities'): 'probability', Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='rawPredictionCol', doc='raw prediction (a.k.a. confidence) column name'): 'rawPrediction', Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='regParam', doc='regularization parameter (>= 0)'): 0.01, Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='standardization', doc='whether to standardize the training features before fitting the model'): True, Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='threshold', doc='threshold in binary classification prediction, in range [0, 1]'): 0.5, Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='tol', doc='the convergence tolerance for iterative algorithms (>= 0)'): 1e-06}
rezime
Spark temeljni je alat za podatkovnog znanstvenika. Stručnjaku omogućuje povezivanje aplikacije s različitim izvorima podataka, besprijekornu analizu podataka ili dodavanje prediktivnog modela.
Početi sa Spark, trebate pokrenuti a Spark Kontekst sa:
'SparkKontekst()'
I i SQL kontekst za povezivanje s izvorom podataka:
'SQLContext()'
U tutorialu ćete naučiti kako trenirati logističku regresiju:
- Pretvorite skup podataka u Dataframe pomoću:
rdd.map(lambda x: (x["newlabel"], DenseVector(x["features"]))) sqlContext.createDataFrame(input_data, ["label", "features"])
Imajte na umu da je naziv stupca oznake nova oznaka i da su sve značajke okupljene u značajkama. Promijenite ove vrijednosti ako se razlikuju u vašem skupu podataka.
- Stvorite set vlakova/testiranja
randomSplit([.8,.2],seed=1234)
- Uvježbajte model
LogisticRegression(labelCol="label",featuresCol="features",maxIter=10, regParam=0.3)
lr.fit()
- Napravite predviđanje
linearModel.transform()