PySpark Veiledning for nybegynnere: Lær med EKSEMPLER
Før du lærer PySpark, la oss forstå:
Hva er Apache Spark?
Spark er en big data-løsning som har vist seg å være enklere og raskere enn Hadoop MapReduce. Spark er en åpen kildekode-programvare utviklet av UC Berkeley RAD lab i 2009. Siden den ble utgitt for offentligheten i 2010, Spark har vokst i popularitet og brukes gjennom industrien med en enestående skala.
I den tiden av Store data, trenger utøvere mer enn noen gang raske og pålitelige verktøy for å behandle strømming av data. Tidligere verktøy som MapReduce var favoritter, men var trege. For å løse dette problemet, Spark tilbyr en løsning som er både rask og generell. Hovedforskjellen mellom Spark og MapReduce er det Spark kjører beregninger i minnet under senere på harddisken. Den tillater høyhastighetstilgang og databehandling, og reduserer tiden fra timer til minutter.
Hva er PySpark?
PySpark er et verktøy laget av Apache Spark Fellesskap for bruk Python med Spark. Det gjør det mulig å jobbe med RDD (Resilient Distributed Dataset) i Python. Det tilbyr også PySpark Shell for å linke Python APIer med Spark kjerne å sette i gang Spark Kontekst. Spark er navnemotoren for å realisere cluster computing, mens PySpark is Pythonsitt bibliotek å bruke Spark.
Hvordan gjør Spark arbeide?
Spark er basert på beregningsmotor, noe som betyr at den tar seg av planleggings-, distribusjons- og overvåkingsapplikasjonen. Hver oppgave utføres på tvers av ulike arbeidsmaskiner kalt dataklynge. En dataklynge refererer til oppgavedeling. En maskin utfører én oppgave, mens de andre bidrar til den endelige produksjonen gjennom en annen oppgave. Til slutt blir alle oppgavene samlet for å produsere et resultat. De Spark admin gir en 360 graders oversikt over ulike Spark Arbeidsplasser.
Spark er designet for å jobbe med
- Python
- Java
- Skala
- SQL
Et vesentlig trekk ved Spark er den enorme mengden innebygde biblioteker, inkludert MLlib for maskinlæring. Spark er også designet for å fungere med Hadoop-klynger og kan lese den brede typen filer, inkludert Hive-data, CSV, JSON, Casandra-data blant annet.
Hvorfor bruke Spark?
Som fremtidig datautøver bør du være kjent med pythons berømte biblioteker: Pandaer og scikit-learn. Disse to bibliotekene er fantastiske for å utforske datasett opp til mellomstore. Vanlige maskinlæringsprosjekter er bygget rundt følgende metodikk:
- Last inn dataene til disken
- Importer dataene til maskinens minne
- Behandle/analysere dataene
- Bygg maskinlæringsmodellen
- Lagre prediksjonen tilbake til disken
Problemet oppstår hvis dataforskeren ønsker å behandle data som er for store for én datamaskin. I løpet av tidligere dager med datavitenskap, ville utøverne prøve det, da trening på enorme datasett ikke alltid var nødvendig. Dataforskeren ville finne et godt statistisk utvalg, utføre en ekstra robusthetssjekk og komme opp med en utmerket modell.
Det er imidlertid noen problemer med dette:
- Gjenspeiler datasettet den virkelige verden?
- Inneholder dataene et spesifikt eksempel?
- Er modellen egnet for prøvetaking?
Ta brukeranbefaling for eksempel. Anbefalere er avhengige av å sammenligne brukere med andre brukere for å evaluere deres preferanser. Hvis datautøveren bare tar en delmengde av dataene, vil det ikke være en kohort av brukere som er veldig like hverandre. Anbefalere må kjøre på hele datasettet eller ikke i det hele tatt.
Hva er løsningen?
Løsningen har vært tydelig i lang tid, delt problemet opp på flere datamaskiner. Parallell databehandling kommer også med flere problemer. Utviklere har ofte problemer med å skrive parallell kode og ender opp med å måtte løse en haug med de komplekse problemene rundt multiprosessering i seg selv.
Pyspark gir dataforskeren et API som kan brukes til å løse problemene med parallelle dataforløp. Pyspark håndterer kompleksiteten til multiprosessering, for eksempel distribusjon av data, distribusjon av kode og innsamling av utdata fra arbeiderne på en klynge av maskiner.
Spark kan kjøre frittstående, men kjører oftest på toppen av et cluster computing-rammeverk som Hadoop. I test og utvikling kan imidlertid en dataforsker kjøre effektivt Spark på deres utviklingsbokser eller bærbare datamaskiner uten en klynge
• En av hovedfordelene med Spark er å bygge en arkitektur som omfatter administrasjon av datastrømming, sømløse dataspørringer, maskinlæringsprediksjon og sanntidstilgang til ulike analyser.
• Spark jobber tett med SQL-språk, dvs. strukturerte data. Det gjør det mulig å spørre etter data i sanntid.
• Dataforskers hovedoppgave er å analysere og bygge prediktive modeller. Kort sagt, en dataforsker trenger å vite hvordan man spør etter data ved hjelp av SQL, lage en statistisk rapport og bruke maskinlæring for å produsere spådommer. Dataforsker bruker en betydelig del av tiden sin på å rense, transformere og analysere dataene. Når datasettet eller dataarbeidsflyten er klar, bruker dataforskeren ulike teknikker for å oppdage innsikt og skjulte mønstre. Datamanipulasjonen skal være robust og like enkel å bruke. Spark er det riktige verktøyet takket være dets hastighet og rike APIer.
I denne PySpark veiledning, vil du lære hvordan du bygger en klassifisering med PySpark eksempler.
Hvordan installere PySpark med AWS
Ocuco Jupyter team bygge et Docker-bilde for å kjøre Spark effektivt. Nedenfor er trinnene du kan følge for å installere PySpark forekomst i AWS.
Se vår veiledning på AWS og tensorflow
Trinn 1: Opprett en forekomst
Først av alt må du opprette en forekomst. Gå til AWS-kontoen din og start forekomsten. Du kan øke lagringsplassen opp til 15g og bruke samme sikkerhetsgruppe som i TensorFlow-opplæringen.
Trinn 2: Åpne tilkoblingen
Åpne tilkoblingen og installer docker-beholder. For flere detaljer, se veiledningen med TensorFlow med Docker. Merk at du må være i riktig arbeidskatalog.
Bare kjør disse kodene for å installere Docker:
sudo yum update -y sudo yum install -y docker sudo service docker start sudo user-mod -a -G docker ec2-user exit
Trinn 3: Åpne tilkoblingen på nytt og installer Spark
Etter at du har åpnet tilkoblingen på nytt, kan du installere bildet som inneholder PySpark.
## Spark docker run -v ~/work:/home/jovyan/work -d -p 8888:8888 jupyter/pyspark-notebook ## Allow preserving Jupyter notebook sudo chown 1000 ~/work ## Install tree to see our working directory next sudo yum install -y tree
Trinn 4: Åpen Jupyter
Sjekk beholderen og navnet
docker ps
Start docker med docker-logger etterfulgt av navnet på docker. For eksempel logger docker zealous_goldwasser
Gå til nettleseren din og start Jupyter. Adressen er http://localhost:8888/. Lim inn passordet gitt av terminalen.
Merknader: hvis du vil laste opp/laste ned en fil til AWS-maskinen din, kan du bruke programvaren Cyberduck, https://cyberduck.io/.
Hvordan installere PySpark on Windows/Mac med Conda
Følgende er en detaljert prosess for hvordan du installerer PySpark on Windows/Mac bruker Anaconda:
Slik installerer Spark på din lokale maskin er en anbefalt praksis å lage et nytt conda-miljø. Dette nye miljøet vil installeres Python 3.6, Spark og alle avhengighetene.
Mac-bruker
cd anaconda3 touch hello-spark.yml vi hello-spark.yml
Windows Bruker
cd C:\Users\Admin\Anaconda3 echo.>hello-spark.yml notepad hello-spark.yml
Du kan redigere .yml-filen. Vær forsiktig med innrykk. Det kreves to plasser før –
name: hello-spark dependencies: - python=3.6 - jupyter - ipython - numpy - numpy-base - pandas - py4j - pyspark - pytz
Lagre det og skap miljøet. Det tar litt tid
conda env create -f hello-spark.yml
For mer informasjon om plasseringen, sjekk veiledningen Installer TensorFlow
Du kan sjekke alt miljøet som er installert på maskinen din
conda env list
Activate hello-spark
Mac-bruker
source activate hello-spark
Windows Bruker
activate hello-spark
OBS: Du har allerede opprettet et spesifikt TensorFlow-miljø for å kjøre opplæringen på TensorFlow. Det er mer praktisk å skape et nytt miljø som er annerledes enn hello-tf. Det gir ingen mening å overbelaste hello-tf med Spark eller andre maskinlæringsbiblioteker.
Tenk deg at det meste av prosjektet ditt involverer TensorFlow, men du må bruke Spark for ett bestemt prosjekt. Du kan sette et TensorFlow-miljø for hele prosjektet ditt og lage et eget miljø for Spark. Du kan legge til så mange biblioteker Spark miljøet som du vil uten å forstyrre TensorFlow-miljøet. Når du er ferdig med Sparksitt prosjekt, kan du slette det uten å påvirke TensorFlow-miljøet.
Jupyter
Open Jupyter Notebook og prøv om PySpark fungerer. I en ny notatbok lim inn følgende PySpark eksempelkode:
import pyspark from pyspark import SparkContext sc =SparkContext()
Hvis det vises en feil, er det sannsynlig at Java er ikke installert på maskinen din. I mac, åpne terminalen og skriv java -versjon, hvis det er en java-versjon, sørg for at den er 1.8. I Windows, gå til Application og sjekk om det er en Java mappe. Hvis det er en Java mappe, sjekk det Java 1.8 er installert. Når dette skrives, PySpark er ikke kompatibel med Java9 og over.
Hvis du trenger å installere Java, du å tenke link og last ned jdk-8u181-windows-x64.exe
For Mac-brukere anbefales det å bruke `brew.`
brew tap caskroom/versions brew cask install java8
Se denne trinnvise opplæringen hvordan du installerer Java
Merknader: Bruk fjern for å slette et miljø fullstendig.
conda env remove -n hello-spark -y
Spark Kontekst
SparkKontekst er den interne motoren som tillater forbindelsene med klyngene. Hvis du vil kjøre en operasjon, trenger du en SparkKontekst.
Lag en SparkKontekst
Først av alt må du sette i gang en SparkKontekst.
import pyspark from pyspark import SparkContext sc =SparkContext()
Nå som SparkKonteksten er klar, du kan lage en samling av data kalt RDD, Resilient Distributed Dataset. Beregning i en RDD blir automatisk parallellisert over klyngen.
nums= sc.parallelize([1,2,3,4])
Du får tilgang til den første raden med take
nums.take(1)
[1]
Du kan bruke en transformasjon til dataene med en lambda-funksjon. I PySpark eksempel nedenfor returnerer du kvadratet av tall. Det er en karttransformasjon
squared = nums.map(lambda x: x*x).collect() for num in squared: print('%i ' % (num))
1 4 9 16
SQLContext
En mer praktisk måte er å bruke DataFrame. SparkKonteksten er allerede satt, du kan bruke den til å lage dataFrame. Du må også deklarere SQLContext
SQLContext gjør det mulig å koble motoren med forskjellige datakilder. Den brukes til å starte funksjonene til Spark sql.
from pyspark.sql import Row from pyspark.sql import SQLContext sqlContext = SQLContext(sc)
Nå i dette Spark tutorial Python, la oss lage en liste over tuple. Hver tuppel vil inneholde navnet på personene og deres alder. Fire trinn kreves:
Trinn 1) Lag listen over tuple med informasjonen
[('John',19),('Smith',29),('Adam',35),('Henry',50)]
Trinn 2) Bygg en RDD
rdd = sc.parallelize(list_p)
Trinn 3) Konverter tuplene
rdd.map(lambda x: Row(name=x[0], age=int(x[1])))
Trinn 4) Lag en DataFrame-kontekst
sqlContext.createDataFrame(ppl) list_p = [('John',19),('Smith',29),('Adam',35),('Henry',50)] rdd = sc.parallelize(list_p) ppl = rdd.map(lambda x: Row(name=x[0], age=int(x[1]))) DF_ppl = sqlContext.createDataFrame(ppl)
Hvis du vil ha tilgang til typen for hver funksjon, kan du bruke printSchema()
DF_ppl.printSchema() root |-- age: long (nullable = true) |-- name: string (nullable = true)
Eksempel på maskinlæring med PySpark
Nå som du har en kort ide om Spark og SQLContext, er du klar til å bygge ditt første maskinlæringsprogram.
Følgende er trinnene for å bygge et maskinlæringsprogram med PySpark:
- Trinn 1) Grunnleggende drift med PySpark
- Trinn 2) Forbehandling av data
- Trinn 3) Bygg en databehandlingspipeline
- Trinn 4) Bygg klassifisereren: logistikk
- Trinn 5) Trene og vurdere modellen
- Trinn 6) Still inn hyperparameteren
I denne PySpark Machine Learning tutorial, vi vil bruke voksendatasettet. Hensikten med denne opplæringen er å lære hvordan du bruker Pyspark. For mer informasjon om datasettet, se denne opplæringen.
Merk at datasettet ikke er signifikant, og du tror kanskje at beregningen tar lang tid. Spark er designet for å behandle en betydelig mengde data. Sparkytelsen øker i forhold til andre maskinlæringsbiblioteker når datasettet som behandles vokser seg større.
Trinn 1) Grunnleggende drift med PySpark
Først av alt, må du initialisere SQLContext er ikke allerede i initiert ennå.
#from pyspark.sql import SQLContext url = "https://raw.githubusercontent.com/guru99-edu/R-Programming/master/adult_data.csv" from pyspark import SparkFiles sc.addFile(url) sqlContext = SQLContext(sc)
deretter kan du lese cvs-filen med sqlContext.read.csv. Du bruker inferSchema satt til True for å fortelle Spark for å gjette automatisk type data. Som standard er den slått på False.
df = sqlContext.read.csv(SparkFiles.get("adult_data.csv"), header=True, inferSchema= True)
La oss ta en titt på datatypen
df.printSchema() root |-- age: integer (nullable = true) |-- workclass: string (nullable = true) |-- fnlwgt: integer (nullable = true) |-- education: string (nullable = true) |-- education_num: integer (nullable = true) |-- marital: string (nullable = true) |-- occupation: string (nullable = true) |-- relationship: string (nullable = true) |-- race: string (nullable = true) |-- sex: string (nullable = true) |-- capital_gain: integer (nullable = true) |-- capital_loss: integer (nullable = true) |-- hours_week: integer (nullable = true) |-- native_country: string (nullable = true) |-- label: string (nullable = true)
Du kan se dataene med show.
df.show(5, truncate = False)
+---+----------------+------+---------+-------------+------------------+-----------------+-------------+-----+------+------------+------------+----------+--------------+-----+ |age|workclass |fnlwgt|education|education_num|marital |occupation |relationship |race |sex |capital_gain|capital_loss|hours_week|native_country|label| +---+----------------+------+---------+-------------+------------------+-----------------+-------------+-----+------+------------+------------+----------+--------------+-----+ |39 |State-gov |77516 |Bachelors|13 |Never-married |Adm-clerical |Not-in-family|White|Male |2174 |0 |40 |United-States |<=50K| |50 |Self-emp-not-inc|83311 |Bachelors|13 |Married-civ-spouse|Exec-managerial |Husband |White|Male |0 |0 |13 |United-States |<=50K| |38 |Private |215646|HS-grad |9 |Divorced |Handlers-cleaners|Not-in-family|White|Male |0 |0 |40 |United-States |<=50K| |53 |Private |234721|11th |7 |Married-civ-spouse|Handlers-cleaners|Husband |Black|Male |0 |0 |40 |United-States |<=50K| |28 |Private |338409|Bachelors|13 |Married-civ-spouse|Prof-specialty |Wife |Black|Female|0 |0 |40 |Cuba |<=50K| +---+----------------+------+---------+-------------+------------------+-----------------+-------------+-----+------+------------+------------+----------+--------------+-----+ only showing top 5 rows
Hvis du ikke satte inderShema til True, her er hva som skjer med typen. Det er alle i streng.
df_string = sqlContext.read.csv(SparkFiles.get("adult.csv"), header=True, inferSchema= False) df_string.printSchema() root |-- age: string (nullable = true) |-- workclass: string (nullable = true) |-- fnlwgt: string (nullable = true) |-- education: string (nullable = true) |-- education_num: string (nullable = true) |-- marital: string (nullable = true) |-- occupation: string (nullable = true) |-- relationship: string (nullable = true) |-- race: string (nullable = true) |-- sex: string (nullable = true) |-- capital_gain: string (nullable = true) |-- capital_loss: string (nullable = true) |-- hours_week: string (nullable = true) |-- native_country: string (nullable = true) |-- label: string (nullable = true)
For å konvertere den kontinuerlige variabelen til riktig format, kan du bruke recast kolonnene. Du kan bruke withColumn for å fortelle Spark hvilken kolonne som skal utføre transformasjonen.
# Import all from `sql.types` from pyspark.sql.types import * # Write a custom function to convert the data type of DataFrame columns def convertColumn(df, names, newType): for name in names: df = df.withColumn(name, df[name].cast(newType)) return df # List of continuous features CONTI_FEATURES = ['age', 'fnlwgt','capital_gain', 'education_num', 'capital_loss', 'hours_week'] # Convert the type df_string = convertColumn(df_string, CONTI_FEATURES, FloatType()) # Check the dataset df_string.printSchema() root |-- age: float (nullable = true) |-- workclass: string (nullable = true) |-- fnlwgt: float (nullable = true) |-- education: string (nullable = true) |-- education_num: float (nullable = true) |-- marital: string (nullable = true) |-- occupation: string (nullable = true) |-- relationship: string (nullable = true) |-- race: string (nullable = true) |-- sex: string (nullable = true) |-- capital_gain: float (nullable = true) |-- capital_loss: float (nullable = true) |-- hours_week: float (nullable = true) |-- native_country: string (nullable = true) |-- label: string (nullable = true) from pyspark.ml.feature import StringIndexer #stringIndexer = StringIndexer(inputCol="label", outputCol="newlabel") #model = stringIndexer.fit(df) #df = model.transform(df) df.printSchema()
Velg kolonner
Du kan velge og vise radene med select og navnene på funksjonene. Nedenfor er alder og fnlwgt valgt.
df.select('age','fnlwgt').show(5)
+---+------+ |age|fnlwgt| +---+------+ | 39| 77516| | 50| 83311| | 38|215646| | 53|234721| | 28|338409| +---+------+ only showing top 5 rows
Tell etter gruppe
Hvis du vil telle antall forekomster etter gruppe, kan du kjede:
- gruppe av()
- telle()
sammen. I PySpark eksempel nedenfor, teller du antall rader etter utdanningsnivå.
df.groupBy("education").count().sort("count",ascending=True).show()
+------------+-----+ | education|count| +------------+-----+ | Preschool| 51| | 1st-4th| 168| | 5th-6th| 333| | Doctorate| 413| | 12th| 433| | 9th| 514| | Prof-school| 576| | 7th-8th| 646| | 10th| 933| | Assoc-acdm| 1067| | 11th| 1175| | Assoc-voc| 1382| | Masters| 1723| | Bachelors| 5355| |Some-college| 7291| | HS-grad|10501| +------------+-----+
Beskriv dataene
For å få en oppsummerende statistikk over dataene kan du bruke describe(). Den vil beregne:
- telle
- bety
- standardavvik
- minutter
- max
df.describe().show()
+-------+------------------+-----------+------------------+------------+-----------------+--------+----------------+------------+------------------+------+------------------+----------------+------------------+--------------+-----+ |summary| age| workclass| fnlwgt| education| education_num| marital| occupation|relationship| race| sex| capital_gain| capital_loss| hours_week|native_country|label| +-------+------------------+-----------+------------------+------------+-----------------+--------+----------------+------------+------------------+------+------------------+----------------+------------------+--------------+-----+ | count| 32561| 32561| 32561| 32561| 32561| 32561| 32561| 32561| 32561| 32561| 32561| 32561| 32561| 32561|32561| | mean| 38.58164675532078| null|189778.36651208502| null| 10.0806793403151| null| null| null| null| null|1077.6488437087312| 87.303829734959|40.437455852092995| null| null| | stddev|13.640432553581356| null|105549.97769702227| null|2.572720332067397| null| null| null| null| null| 7385.292084840354|402.960218649002|12.347428681731838| null| null| | min| 17| ?| 12285| 10th| 1|Divorced| ?| Husband|Amer-Indian-Eskimo|Female| 0| 0| 1| ?|<=50K| | max| 90|Without-pay| 1484705|Some-college| 16| Widowed|Transport-moving| Wife| White| Male| 99999| 4356| 99| Yugoslavia| >50K| +-------+------------------+-----------+------------------+------------+-----------------+--------+----------------+------------+------------------+------+------------------+----------------+------------------+--------------+-----+
Hvis du vil ha oppsummeringsstatistikken for bare én kolonne, legg til navnet på kolonnen i describe()
df.describe('capital_gain').show()
+-------+------------------+ |summary| capital_gain| +-------+------------------+ | count| 32561| | mean|1077.6488437087312| | stddev| 7385.292084840354| | min| 0| | max| 99999| +-------+------------------+
Krysstabellberegning
I noen tilfeller kan det være interessant å se den beskrivende statistikken mellom to parvise kolonner. For eksempel kan du telle antall personer med inntekt under eller over 50k etter utdanningsnivå. Denne operasjonen kalles en krysstabell.
df.crosstab('age', 'label').sort("age_label").show()
+---------+-----+----+ |age_label|<=50K|>50K| +---------+-----+----+ | 17| 395| 0| | 18| 550| 0| | 19| 710| 2| | 20| 753| 0| | 21| 717| 3| | 22| 752| 13| | 23| 865| 12| | 24| 767| 31| | 25| 788| 53| | 26| 722| 63| | 27| 754| 81| | 28| 748| 119| | 29| 679| 134| | 30| 690| 171| | 31| 705| 183| | 32| 639| 189| | 33| 684| 191| | 34| 643| 243| | 35| 659| 217| | 36| 635| 263| +---------+-----+----+ only showing top 20 rows
Du kan se at ingen mennesker har inntekter over 50 XNUMX når de er unge.
Slipp kolonne
Det er to intuitive API for å slippe kolonner:
- drop(): Slipp en kolonne
- dropna(): Slipp NA-er
Nedenfor slipper du kolonnen education_num
df.drop('education_num').columns ['age', 'workclass', 'fnlwgt', 'education', 'marital', 'occupation', 'relationship', 'race', 'sex', 'capital_gain', 'capital_loss', 'hours_week', 'native_country', 'label']
Filtrer data
Du kan bruke filter() for å bruke beskrivende statistikk i et undersett av data. For eksempel kan du telle antall personer over 40 år
df.filter(df.age > 40).count()
13443
Descriptive statistikk etter gruppe
Til slutt kan du gruppere data etter gruppe og beregne statistiske operasjoner som gjennomsnittet.
df.groupby('marital').agg({'capital_gain': 'mean'}).show()
+--------------------+------------------+ | marital| avg(capital_gain)| +--------------------+------------------+ | Separated| 535.5687804878049| | Never-married|376.58831788823363| |Married-spouse-ab...| 653.9832535885167| | Divorced| 728.4148098131893| | Widowed| 571.0715005035247| | Married-AF-spouse| 432.6521739130435| | Married-civ-spouse|1764.8595085470085| +--------------------+------------------+
Trinn 2) Dataforbehandling
Databehandling er et kritisk trinn i maskinlæring. Etter at du har fjernet søppeldata, får du noen viktige innsikter.
For eksempel vet du at alder ikke er en lineær funksjon med inntekten. Når folk er unge, er inntekten vanligvis lavere enn middelaldrende. Etter pensjonering bruker en husholdning sparepengene sine, noe som betyr en nedgang i inntekt. For å fange dette mønsteret kan du legge til en firkant til aldersfunksjonen
Legg til aldersrute
For å legge til en ny funksjon, må du:
- Velg kolonnen
- Bruk transformasjonen og legg den til DataFrame
from pyspark.sql.functions import * # 1 Select the column age_square = df.select(col("age")**2) # 2 Apply the transformation and add it to the DataFrame df = df.withColumn("age_square", col("age")**2) df.printSchema() root |-- age: integer (nullable = true) |-- workclass: string (nullable = true) |-- fnlwgt: integer (nullable = true) |-- education: string (nullable = true) |-- education_num: integer (nullable = true) |-- marital: string (nullable = true) |-- occupation: string (nullable = true) |-- relationship: string (nullable = true) |-- race: string (nullable = true) |-- sex: string (nullable = true) |-- capital_gain: integer (nullable = true) |-- capital_loss: integer (nullable = true) |-- hours_week: integer (nullable = true) |-- native_country: string (nullable = true) |-- label: string (nullable = true) |-- age_square: double (nullable = true)
Du kan se at age_square har blitt lagt til datarammen. Du kan endre rekkefølgen på variablene med select. Nedenfor tar du med age_square rett etter alder.
COLUMNS = ['age', 'age_square', 'workclass', 'fnlwgt', 'education', 'education_num', 'marital', 'occupation', 'relationship', 'race', 'sex', 'capital_gain', 'capital_loss', 'hours_week', 'native_country', 'label'] df = df.select(COLUMNS) df.first()
Row(age=39, age_square=1521.0, workclass='State-gov', fnlwgt=77516, education='Bachelors', education_num=13, marital='Never-married', occupation='Adm-clerical', relationship='Not-in-family', race='White', sex='Male', capital_gain=2174, capital_loss=0, hours_week=40, native_country='United-States', label='<=50K')
Ekskluder Holand-Nederland
Når en gruppe i en funksjon bare har én observasjon, gir den ingen informasjon til modellen. Tvert imot kan det føre til en feil under kryssvalideringen.
La oss sjekke opprinnelsen til husholdningen
df.filter(df.native_country == 'Holand-Netherlands').count() df.groupby('native_country').agg({'native_country': 'count'}).sort(asc("count(native_country)")).show()
+--------------------+---------------------+ | native_country|count(native_country)| +--------------------+---------------------+ | Holand-Netherlands| 1| | Scotland| 12| | Hungary| 13| | Honduras| 13| |Outlying-US(Guam-...| 14| | Yugoslavia| 16| | Thailand| 18| | Laos| 18| | Cambodia| 19| | Trinadad&Tobago| 19| | Hong| 20| | Ireland| 24| | Ecuador| 28| | Greece| 29| | France| 29| | Peru| 31| | Nicaragua| 34| | Portugal| 37| | Iran| 43| | Haiti| 44| +--------------------+---------------------+ only showing top 20 rows
Funksjonen native_country har bare én husstand som kommer fra Nederland. Du utelukker det.
df_remove = df.filter(df.native_country != 'Holand-Netherlands')
Trinn 3) Bygg en databehandlingspipeline
I likhet med scikit-learn har Pyspark en pipeline API.
En rørledning er veldig praktisk for å opprettholde strukturen til dataene. Du skyver dataene inn i rørledningen. Inne i rørledningen gjøres forskjellige operasjoner, utgangen brukes til å mate algoritmen.
For eksempel består én universell transformasjon i maskinlæring av å konvertere en streng til én varm enkoder, dvs. én kolonne av en gruppe. En varm koder er vanligvis en matrise full av nuller.
Trinnene for å transformere dataene ligner veldig på scikit-learn. Du må:
- Indekser strengen til numerisk
- Lag den ene varmekoderen
- Transformere dataene
To APIer gjør jobben: StringIndexer, OneHotEncoder
- Først av alt velger du strengkolonnen som skal indekseres. inputCol er navnet på kolonnen i datasettet. outputCol er det nye navnet gitt til den transformerte kolonnen.
StringIndexer(inputCol="workclass", outputCol="workclass_encoded")
- Tilpass dataene og transformer dem
model = stringIndexer.fit(df) `indexed = model.transform(df)``
- Lag nyhetsspaltene basert på gruppen. For eksempel, hvis det er 10 grupper i funksjonen, vil den nye matrisen ha 10 kolonner, en for hver gruppe.
OneHotEncoder(dropLast=False, inputCol="workclassencoded", outputCol="workclassvec")
### Example encoder from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler stringIndexer = StringIndexer(inputCol="workclass", outputCol="workclass_encoded") model = stringIndexer.fit(df) indexed = model.transform(df) encoder = OneHotEncoder(dropLast=False, inputCol="workclass_encoded", outputCol="workclass_vec") encoded = encoder.transform(indexed) encoded.show(2)
+---+----------+----------------+------+---------+-------------+------------------+---------------+-------------+-----+----+------------+------------+----------+--------------+-----+-----------------+-------------+ |age|age_square| workclass|fnlwgt|education|education_num| marital| occupation| relationship| race| sex|capital_gain|capital_loss|hours_week|native_country|label|workclass_encoded|workclass_vec| +---+----------+----------------+------+---------+-------------+------------------+---------------+-------------+-----+----+------------+------------+----------+--------------+-----+-----------------+-------------+ | 39| 1521.0| State-gov| 77516|Bachelors| 13| Never-married| Adm-clerical|Not-in-family|White|Male| 2174| 0| 40| United-States|<=50K| 4.0|(9,[4],[1.0])| | 50| 2500.0|Self-emp-not-inc| 83311|Bachelors| 13|Married-civ-spouse|Exec-managerial| Husband|White|Male| 0| 0| 13| United-States|<=50K| 1.0|(9,[1],[1.0])| +---+----------+----------------+------+---------+-------------+------------------+---------------+-------------+-----+----+------------+------------+----------+--------------+-----+-----------------+-------------+ only showing top 2 rows
Bygg rørledningen
Du vil bygge en pipeline for å konvertere alle de nøyaktige funksjonene og legge dem til det endelige datasettet. Rørledningen vil ha fire operasjoner, men legg gjerne til så mange operasjoner du ønsker.
- Kod inn de kategoriske dataene
- Indekser etikettfunksjonen
- Legg til kontinuerlig variabel
- Sett sammen trinnene.
Hvert trinn er lagret i en liste kalt stadier. Denne listen vil fortelle VectorAssembler hvilken operasjon som skal utføres inne i rørledningen.
1. Kod de kategoriske dataene
Dette trinnet er nøyaktig det samme som eksemplet ovenfor, bortsett fra at du går over alle de kategoriske funksjonene.
from pyspark.ml import Pipeline from pyspark.ml.feature import OneHotEncoderEstimator CATE_FEATURES = ['workclass', 'education', 'marital', 'occupation', 'relationship', 'race', 'sex', 'native_country'] stages = [] # stages in our Pipeline for categoricalCol in CATE_FEATURES: stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol + "Index") encoder = OneHotEncoderEstimator(inputCols=[stringIndexer.getOutputCol()], outputCols=[categoricalCol + "classVec"]) stages += [stringIndexer, encoder]
2. Indekser etikettfunksjonen
Spark, som mange andre biblioteker, godtar ikke strengverdier for etiketten. Du konverterer etikettfunksjonen med StringIndexer og legger den til listestadiene
# Convert label into label indices using the StringIndexer label_stringIdx = StringIndexer(inputCol="label", outputCol="newlabel") stages += [label_stringIdx]
3. Legg til kontinuerlig variabel
InputCols til VectorAssembler er en liste over kolonner. Du kan opprette en ny liste som inneholder alle de nye kolonnene. Koden nedenfor fyller listen med kodede kategoriske funksjoner og de kontinuerlige funksjonene.
assemblerInputs = [c + "classVec" for c in CATE_FEATURES] + CONTI_FEATURES
4. Sett sammen trinnene.
Til slutt passerer du alle trinnene i VectorAssembler
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")stages += [assembler]
Nå som alle trinnene er klare, skyver du dataene til rørledningen.
# Create a Pipeline. pipeline = Pipeline(stages=stages) pipelineModel = pipeline.fit(df_remove) model = pipelineModel.transform(df_remove)
Hvis du sjekker det nye datasettet, kan du se at det inneholder alle funksjonene, transformert og ikke transformert. Du er kun interessert i den nye etiketten og funksjonene. Funksjonene inkluderer alle de transformerte funksjonene og de kontinuerlige variablene.
model.take(1) [Row(age=39, age_square=1521.0, workclass='State-gov', fnlwgt=77516, education='Bachelors', education_num=13, marital='Never-married', occupation='Adm-clerical', relationship='Not-in-family', race='White', sex='Male', capital_gain=2174, capital_loss=0, hours_week=40, native_country='United-States', label='<=50K', workclassIndex=4.0, workclassclassVec=SparseVector(8, {4: 1.0}), educationIndex=2.0, educationclassVec=SparseVector(15, {2: 1.0}), maritalIndex=1.0, maritalclassVec=SparseVector(6, {1: 1.0}), occupationIndex=3.0, occupationclassVec=SparseVector(14, {3: 1.0}), relationshipIndex=1.0, relationshipclassVec=SparseVector(5, {1: 1.0}), raceIndex=0.0, raceclassVec=SparseVector(4, {0: 1.0}), sexIndex=0.0, sexclassVec=SparseVector(1, {0: 1.0}), native_countryIndex=0.0, native_countryclassVec=SparseVector(40, {0: 1.0}), newlabel=0.0, features=SparseVector(99, {4: 1.0, 10: 1.0, 24: 1.0, 32: 1.0, 44: 1.0, 48: 1.0, 52: 1.0, 53: 1.0, 93: 39.0, 94: 77516.0, 95: 2174.0, 96: 13.0, 98: 40.0}))]
Trinn 4) Bygg klassifisereren: logistikk
For å gjøre beregningen raskere, konverterer du modellen til en DataFrame.
Du må velge ny etikett og funksjoner fra modellen ved å bruke kartet.
from pyspark.ml.linalg import DenseVector input_data = model.rdd.map(lambda x: (x["newlabel"], DenseVector(x["features"])))
Du er klar til å lage togdataene som en DataFrame. Du bruker sqlContext
df_train = sqlContext.createDataFrame(input_data, ["label", "features"])
Sjekk den andre raden
df_train.show(2)
+-----+--------------------+ |label| features| +-----+--------------------+ | 0.0|[0.0,0.0,0.0,0.0,...| | 0.0|[0.0,1.0,0.0,0.0,...| +-----+--------------------+ only showing top 2 rows
Lag et tog/testsett
Du deler datasettet 80/20 med randomSplit.
# Split the data into train and test sets train_data, test_data = df_train.randomSplit([.8,.2],seed=1234)
La oss telle hvor mange personer med inntekt under/over 50k i både trening og testsett
train_data.groupby('label').agg({'label': 'count'}).show()
+-----+------------+ |label|count(label)| +-----+------------+ | 0.0| 19698| | 1.0| 6263| +-----+------------+
test_data.groupby('label').agg({'label': 'count'}).show()
+-----+------------+ |label|count(label)| +-----+------------+ | 0.0| 5021| | 1.0| 1578| +-----+------------+
Bygg den logistiske regressoren
Sist, men ikke minst, kan du bygge klassifisereren. Pyspark har et API kalt LogisticRegression for å utføre logistisk regresjon.
Du initialiserer lr ved å angi etikettkolonnen og funksjonskolonner. Du angir maksimalt 10 iterasjoner og legger til en regulariseringsparameter med en verdi på 0.3. Merk at i neste avsnitt vil du bruke kryssvalidering med et parameternett for å justere modellen
# Import `LinearRegression` from pyspark.ml.classification import LogisticRegression # Initialize `lr` lr = LogisticRegression(labelCol="label", featuresCol="features", maxIter=10, regParam=0.3) # Fit the data to the model linearModel = lr.fit(train_data)
#Du kan se koeffisientene fra regresjonen
# Print the coefficients and intercept for logistic regression print("Coefficients: " + str(linearModel.coefficients)) print("Intercept: " + str(linearModel.intercept))
Coefficients: [-0.0678914665262,-0.153425526813,-0.0706009536407,-0.164057586562,-0.120655298528,0.162922330862,0.149176870438,-0.626836362611,-0.193483661541,-0.0782269980838,0.222667203836,0.399571096381,-0.0222024341804,-0.311925857859,-0.0434497788688,-0.306007744328,-0.41318209688,0.547937504247,-0.395837350854,-0.23166535958,0.618743906733,-0.344088614546,-0.385266881369,0.317324463006,-0.350518889186,-0.201335923138,-0.232878560088,-0.13349278865,-0.119760542498,0.17500602491,-0.0480968101118,0.288484253943,-0.116314616745,0.0524163478063,-0.300952624551,-0.22046421474,-0.16557996579,-0.114676231939,-0.311966431453,-0.344226119233,0.105530129507,0.152243047814,-0.292774545497,0.263628334433,-0.199951374076,-0.30329422583,-0.231087515178,0.418918551,-0.0565930184279,-0.177818073048,-0.0733236680663,-0.267972912252,0.168491215697,-0.12181255723,-0.385648075442,-0.202101794517,0.0469791640782,-0.00842850210625,-0.00373211448629,-0.259296141281,-0.309896554133,-0.168434409756,-0.11048086026,0.0280647963877,-0.204187030092,-0.414392623536,-0.252806580669,0.143366465705,-0.516359222663,-0.435627370849,-0.301949286524,0.0878249035894,-0.210951740965,-0.621417928742,-0.099445190784,-0.232671473401,-0.1077745606,-0.360429419703,-0.420362959052,-0.379729467809,-0.395186242741,0.0826401853838,-0.280251589972,0.187313505214,-0.20295228799,-0.431177064626,0.149759018379,-0.107114299614,-0.319314858424,0.0028450133235,-0.651220387649,-0.327918792207,-0.143659581445,0.00691075160413,8.38517628783e-08,2.18856717378e-05,0.0266701216268,0.000231075966823,0.00893832698698] Intercept: -1.9884177974805692
Trinn 5) Tren og evaluer modellen
For å generere prediksjon for testsettet ditt,
Du kan bruke linearModel med transform() på test_data
# Make predictions on test data using the transform() method. predictions = linearModel.transform(test_data)
Du kan skrive ut elementene i spådommer
predictions.printSchema() root |-- label: double (nullable = true) |-- features: vector (nullable = true) |-- rawPrediction: vector (nullable = true) |-- probability: vector (nullable = true) |-- prediction: double (nullable = false)
Du er interessert i merkelappen, spådommen og sannsynligheten
selected = predictions.select("label", "prediction", "probability") selected.show(20)
+-----+----------+--------------------+ |label|prediction| probability| +-----+----------+--------------------+ | 0.0| 0.0|[0.91560704124179...| | 0.0| 0.0|[0.92812140213994...| | 0.0| 0.0|[0.92161406774159...| | 0.0| 0.0|[0.96222760777142...| | 0.0| 0.0|[0.66363283056957...| | 0.0| 0.0|[0.65571324475477...| | 0.0| 0.0|[0.73053376932829...| | 0.0| 1.0|[0.31265053873570...| | 0.0| 0.0|[0.80005907577390...| | 0.0| 0.0|[0.76482251301640...| | 0.0| 0.0|[0.84447301189069...| | 0.0| 0.0|[0.75691912026619...| | 0.0| 0.0|[0.60902504096722...| | 0.0| 0.0|[0.80799228385509...| | 0.0| 0.0|[0.87704364852567...| | 0.0| 0.0|[0.83817652582377...| | 0.0| 0.0|[0.79655423248500...| | 0.0| 0.0|[0.82712311232246...| | 0.0| 0.0|[0.81372823882016...| | 0.0| 0.0|[0.59687710752201...| +-----+----------+--------------------+ only showing top 20 rows
Evaluer modellen
Du må se på nøyaktighetsberegningen for å se hvor godt (eller dårlig) modellen presterer. Foreløpig er det ingen API for å beregne nøyaktighetsmålet i Spark. Standardverdien er ROC, mottakerens driftskarakteristikk. Det er en annen beregning som tar hensyn til falsk positiv rate.
Før du ser på ROC, la oss konstruere nøyaktighetsmålet. Du er mer kjent med denne beregningen. Nøyaktighetsmålet er summen av riktig prediksjon over det totale antallet observasjoner.
Du oppretter en DataFrame med etiketten og `prediksjon.
cm = predictions.select("label", "prediction")
Du kan sjekke antall klasser i etiketten og prediksjonen
cm.groupby('label').agg({'label': 'count'}).show()
+-----+------------+ |label|count(label)| +-----+------------+ | 0.0| 5021| | 1.0| 1578| +-----+------------+
cm.groupby('prediction').agg({'prediction': 'count'}).show()
+----------+-----------------+ |prediction|count(prediction)| +----------+-----------------+ | 0.0| 5982| | 1.0| 617| +----------+-----------------+
For eksempel, i testsettet er det 1578 husstander med en inntekt over 50k og 5021 under. Klassifisereren spådde imidlertid 617 husstander med inntekt over 50k.
Du kan beregne nøyaktigheten ved å beregne antallet når etiketten er riktig klassifisert over det totale antallet rader.
cm.filter(cm.label == cm.prediction).count() / cm.count()
0.8237611759357478
Du kan pakke alt sammen og skrive en funksjon for å beregne nøyaktigheten.
def accuracy_m(model): predictions = model.transform(test_data) cm = predictions.select("label", "prediction") acc = cm.filter(cm.label == cm.prediction).count() / cm.count() print("Model accuracy: %.3f%%" % (acc * 100)) accuracy_m(model = linearModel) Model accuracy: 82.376%
ROC-beregninger
Modulen BinaryClassificationEvaluator inkluderer ROC-målene. Mottakeren Operating Karakteristisk kurve er et annet vanlig verktøy som brukes med binær klassifisering. Den er veldig lik presisjon/gjenkallingskurven, men i stedet for å plotte presisjon versus gjenkalling, viser ROC-kurven den sanne positive raten (dvs. tilbakekalling) mot den falske positive raten. Den falske positive frekvensen er forholdet mellom negative forekomster som feilaktig er klassifisert som positive. Det er lik én minus den sanne negative satsen. Den sanne negative raten kalles også spesifisitet. Derfor plotter ROC-kurven sensitivitet (gjenkalling) versus 1 – spesifisitet
### Use ROC from pyspark.ml.evaluation import BinaryClassificationEvaluator # Evaluate model evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction") print(evaluator.evaluate(predictions)) print(evaluator.getMetricName())
0.8940481662695192areaUnderROC
print(evaluator.evaluate(predictions))
0.8940481662695192
Trinn 6) Still inn hyperparameteren
Sist, men ikke minst, kan du justere hyperparametrene. Ligner på scikit lære du oppretter et parameternett, og du legger til parameterne du vil justere.
For å redusere tiden for beregningen, justerer du bare regulariseringsparameteren med bare to verdier.
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator # Create ParamGrid for Cross Validation paramGrid = (ParamGridBuilder() .addGrid(lr.regParam, [0.01, 0.5]) .build())
Til slutt vurderer du modellen med å bruke kryssvalieringsmetoden med 5 folder. Det tar rundt 16 minutter å trene.
from time import * start_time = time() # Create 5-fold CrossValidator cv = CrossValidator(estimator=lr, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5) # Run cross validations cvModel = cv.fit(train_data) # likely take a fair amount of time end_time = time() elapsed_time = end_time - start_time print("Time to train model: %.3f seconds" % elapsed_time)
Tid til å trene modell: 978.807 sekunder
Den beste regulariseringshyperparameteren er 0.01, med en nøyaktighet på 85.316 prosent.
accuracy_m(model = cvModel) Model accuracy: 85.316%
Du kan trekke ut den anbefalte parameteren ved å kjede cvModel.bestModel med extractParamMap()
bestModel = cvModel.bestModel bestModel.extractParamMap()
{Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='aggregationDepth', doc='suggested depth for treeAggregate (>= 2)'): 2, Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty'): 0.0, Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='family', doc='The name of family which is a description of the label distribution to be used in the model. Supported options: auto, binomial, multinomial.'): 'auto', Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='featuresCol', doc='features column name'): 'features', Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='fitIntercept', doc='whether to fit an intercept term'): True, Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='labelCol', doc='label column name'): 'label', Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='maxIter', doc='maximum number of iterations (>= 0)'): 10, Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='predictionCol', doc='prediction column name'): 'prediction', Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='probabilityCol', doc='Column name for predicted class conditional probabilities. Note: Not all models output well-calibrated probability estimates! These probabilities should be treated as confidences, not precise probabilities'): 'probability', Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='rawPredictionCol', doc='raw prediction (a.k.a. confidence) column name'): 'rawPrediction', Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='regParam', doc='regularization parameter (>= 0)'): 0.01, Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='standardization', doc='whether to standardize the training features before fitting the model'): True, Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='threshold', doc='threshold in binary classification prediction, in range [0, 1]'): 0.5, Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='tol', doc='the convergence tolerance for iterative algorithms (>= 0)'): 1e-06}
Sammendrag
Spark er et grunnleggende verktøy for en dataforsker. Den lar utøveren koble en app til forskjellige datakilder, utføre dataanalyse sømløst eller legge til en prediktiv modell.
Til å begynne med Spark, må du sette i gang en Spark Kontekst med:
'SparkKontekst()'
Og og SQL kontekst for å koble til en datakilde:
'SQLContext()'
I opplæringen lærer du hvordan du trener en logistisk regresjon:
- Konverter datasettet til en dataramme med:
rdd.map(lambda x: (x["newlabel"], DenseVector(x["features"]))) sqlContext.createDataFrame(input_data, ["label", "features"])
Merk at etikettens kolonnenavn er newlabel og alle funksjonene er samlet i funksjoner. Endre disse verdiene hvis de er forskjellige i datasettet.
- Lag toget/testsettet
randomSplit([.8,.2],seed=1234)
- Tren modellen
LogisticRegression(labelCol="label",featuresCol="features",maxIter=10, regParam=0.3)
lr.fit()
- Gjør spådommer
linearModel.transform()