PySpark Veiledning for nybegynnere: Lær med EKSEMPLER

Før du lærer PySpark, la oss forstå:

Hva er Apache Spark?

Spark er en big data-løsning som har vist seg å være enklere og raskere enn Hadoop MapReduce. Spark er en åpen kildekode-programvare utviklet av UC Berkeley RAD lab i 2009. Siden den ble utgitt for offentligheten i 2010, Spark har vokst i popularitet og brukes gjennom industrien med en enestående skala.

I den tiden av Store data, trenger utøvere mer enn noen gang raske og pålitelige verktøy for å behandle strømming av data. Tidligere verktøy som MapReduce var favoritter, men var trege. For å løse dette problemet, Spark tilbyr en løsning som er både rask og generell. Hovedforskjellen mellom Spark og MapReduce er det Spark kjører beregninger i minnet under senere på harddisken. Den tillater høyhastighetstilgang og databehandling, og reduserer tiden fra timer til minutter.

Hva er PySpark?

PySpark er et verktøy laget av Apache Spark Fellesskap for bruk Python med Spark. Det gjør det mulig å jobbe med RDD (Resilient Distributed Dataset) i Python. Det tilbyr også PySpark Shell for å linke Python APIer med Spark kjerne å sette i gang Spark Kontekst. Spark er navnemotoren for å realisere cluster computing, mens PySpark is Pythonsitt bibliotek å bruke Spark.

Hvordan gjør Spark arbeide?

Spark er basert på beregningsmotor, noe som betyr at den tar seg av planleggings-, distribusjons- og overvåkingsapplikasjonen. Hver oppgave utføres på tvers av ulike arbeidsmaskiner kalt dataklynge. En dataklynge refererer til oppgavedeling. En maskin utfører én oppgave, mens de andre bidrar til den endelige produksjonen gjennom en annen oppgave. Til slutt blir alle oppgavene samlet for å produsere et resultat. De Spark admin gir en 360 graders oversikt over ulike Spark Arbeidsplasser.

Hvordan gjør Spark vi
Hvordan gjør Spark vi

Spark er designet for å jobbe med

  • Python
  • Java
  • Skala
  • SQL

Et vesentlig trekk ved Spark er den enorme mengden innebygde biblioteker, inkludert MLlib for maskinlæring. Spark er også designet for å fungere med Hadoop-klynger og kan lese den brede typen filer, inkludert Hive-data, CSV, JSON, Casandra-data blant annet.

Hvorfor bruke Spark?

Som fremtidig datautøver bør du være kjent med pythons berømte biblioteker: Pandaer og scikit-learn. Disse to bibliotekene er fantastiske for å utforske datasett opp til mellomstore. Vanlige maskinlæringsprosjekter er bygget rundt følgende metodikk:

  • Last inn dataene til disken
  • Importer dataene til maskinens minne
  • Behandle/analysere dataene
  • Bygg maskinlæringsmodellen
  • Lagre prediksjonen tilbake til disken

Problemet oppstår hvis dataforskeren ønsker å behandle data som er for store for én datamaskin. I løpet av tidligere dager med datavitenskap, ville utøverne prøve det, da trening på enorme datasett ikke alltid var nødvendig. Dataforskeren ville finne et godt statistisk utvalg, utføre en ekstra robusthetssjekk og komme opp med en utmerket modell.

Det er imidlertid noen problemer med dette:

  • Gjenspeiler datasettet den virkelige verden?
  • Inneholder dataene et spesifikt eksempel?
  • Er modellen egnet for prøvetaking?

Ta brukeranbefaling for eksempel. Anbefalere er avhengige av å sammenligne brukere med andre brukere for å evaluere deres preferanser. Hvis datautøveren bare tar en delmengde av dataene, vil det ikke være en kohort av brukere som er veldig like hverandre. Anbefalere må kjøre på hele datasettet eller ikke i det hele tatt.

Hva er løsningen?

Løsningen har vært tydelig i lang tid, delt problemet opp på flere datamaskiner. Parallell databehandling kommer også med flere problemer. Utviklere har ofte problemer med å skrive parallell kode og ender opp med å måtte løse en haug med de komplekse problemene rundt multiprosessering i seg selv.

Pyspark gir dataforskeren et API som kan brukes til å løse problemene med parallelle dataforløp. Pyspark håndterer kompleksiteten til multiprosessering, for eksempel distribusjon av data, distribusjon av kode og innsamling av utdata fra arbeiderne på en klynge av maskiner.

Spark kan kjøre frittstående, men kjører oftest på toppen av et cluster computing-rammeverk som Hadoop. I test og utvikling kan imidlertid en dataforsker kjøre effektivt Spark på deres utviklingsbokser eller bærbare datamaskiner uten en klynge

• En av hovedfordelene med Spark er å bygge en arkitektur som omfatter administrasjon av datastrømming, sømløse dataspørringer, maskinlæringsprediksjon og sanntidstilgang til ulike analyser.

• Spark jobber tett med SQL-språk, dvs. strukturerte data. Det gjør det mulig å spørre etter data i sanntid.

• Dataforskers hovedoppgave er å analysere og bygge prediktive modeller. Kort sagt, en dataforsker trenger å vite hvordan man spør etter data ved hjelp av SQL, lage en statistisk rapport og bruke maskinlæring for å produsere spådommer. Dataforsker bruker en betydelig del av tiden sin på å rense, transformere og analysere dataene. Når datasettet eller dataarbeidsflyten er klar, bruker dataforskeren ulike teknikker for å oppdage innsikt og skjulte mønstre. Datamanipulasjonen skal være robust og like enkel å bruke. Spark er det riktige verktøyet takket være dets hastighet og rike APIer.

I denne PySpark veiledning, vil du lære hvordan du bygger en klassifisering med PySpark eksempler.

Hvordan installere PySpark med AWS

Ocuco Jupyter team bygge et Docker-bilde for å kjøre Spark effektivt. Nedenfor er trinnene du kan følge for å installere PySpark forekomst i AWS.

Se vår veiledning på AWS og tensorflow

Trinn 1: Opprett en forekomst

Først av alt må du opprette en forekomst. Gå til AWS-kontoen din og start forekomsten. Du kan øke lagringsplassen opp til 15g og bruke samme sikkerhetsgruppe som i TensorFlow-opplæringen.

Trinn 2: Åpne tilkoblingen

Åpne tilkoblingen og installer docker-beholder. For flere detaljer, se veiledningen med TensorFlow med Docker. Merk at du må være i riktig arbeidskatalog.

Bare kjør disse kodene for å installere Docker:

sudo yum update -y
sudo yum install -y docker
sudo service docker start
sudo user-mod -a -G docker ec2-user
exit

Trinn 3: Åpne tilkoblingen på nytt og installer Spark

Etter at du har åpnet tilkoblingen på nytt, kan du installere bildet som inneholder PySpark.

## Spark
docker run -v ~/work:/home/jovyan/work -d -p 8888:8888 jupyter/pyspark-notebook

## Allow preserving Jupyter notebook
sudo chown 1000 ~/work

## Install tree to see our working directory next
sudo yum install -y tree

Trinn 4: Åpen Jupyter

Sjekk beholderen og navnet

docker ps

Start docker med docker-logger etterfulgt av navnet på docker. For eksempel logger docker zealous_goldwasser

Gå til nettleseren din og start Jupyter. Adressen er http://localhost:8888/. Lim inn passordet gitt av terminalen.

Merknader: hvis du vil laste opp/laste ned en fil til AWS-maskinen din, kan du bruke programvaren Cyberduck, https://cyberduck.io/.

Hvordan installere PySpark on Windows/Mac med Conda

Følgende er en detaljert prosess for hvordan du installerer PySpark on Windows/Mac bruker Anaconda:

Slik installerer Spark på din lokale maskin er en anbefalt praksis å lage et nytt conda-miljø. Dette nye miljøet vil installeres Python 3.6, Spark og alle avhengighetene.

Mac-bruker

cd anaconda3
touch hello-spark.yml
vi hello-spark.yml

Windows Bruker

cd C:\Users\Admin\Anaconda3
echo.>hello-spark.yml
notepad hello-spark.yml

Du kan redigere .yml-filen. Vær forsiktig med innrykk. Det kreves to plasser før –

name: hello-spark 
    dependencies:
    
    - python=3.6
    - jupyter
    - ipython
    - numpy
    - numpy-base
    - pandas
    - py4j
    - pyspark
    - pytz

Lagre det og skap miljøet. Det tar litt tid

conda env create -f hello-spark.yml

For mer informasjon om plasseringen, sjekk veiledningen Installer TensorFlow

Du kan sjekke alt miljøet som er installert på maskinen din

conda env list
Activate hello-spark

Mac-bruker

source activate hello-spark

Windows Bruker

activate hello-spark

OBS: Du har allerede opprettet et spesifikt TensorFlow-miljø for å kjøre opplæringen på TensorFlow. Det er mer praktisk å skape et nytt miljø som er annerledes enn hello-tf. Det gir ingen mening å overbelaste hello-tf med Spark eller andre maskinlæringsbiblioteker.

Tenk deg at det meste av prosjektet ditt involverer TensorFlow, men du må bruke Spark for ett bestemt prosjekt. Du kan sette et TensorFlow-miljø for hele prosjektet ditt og lage et eget miljø for Spark. Du kan legge til så mange biblioteker Spark miljøet som du vil uten å forstyrre TensorFlow-miljøet. Når du er ferdig med Sparksitt prosjekt, kan du slette det uten å påvirke TensorFlow-miljøet.

Jupyter

Open Jupyter Notebook og prøv om PySpark fungerer. I en ny notatbok lim inn følgende PySpark eksempelkode:

import pyspark
from pyspark import SparkContext
sc =SparkContext()

Hvis det vises en feil, er det sannsynlig at Java er ikke installert på maskinen din. I mac, åpne terminalen og skriv java -versjon, hvis det er en java-versjon, sørg for at den er 1.8. I Windows, gå til Application og sjekk om det er en Java mappe. Hvis det er en Java mappe, sjekk det Java 1.8 er installert. Når dette skrives, PySpark er ikke kompatibel med Java9 og over.

Hvis du trenger å installere Java, du å tenke link og last ned jdk-8u181-windows-x64.exe

Jupyter

For Mac-brukere anbefales det å bruke `brew.`

brew tap caskroom/versions
brew cask install java8

Se denne trinnvise opplæringen hvordan du installerer Java

Merknader: Bruk fjern for å slette et miljø fullstendig.

 conda env remove -n hello-spark -y

Spark Kontekst

SparkKontekst er den interne motoren som tillater forbindelsene med klyngene. Hvis du vil kjøre en operasjon, trenger du en SparkKontekst.

Lag en SparkKontekst

Først av alt må du sette i gang en SparkKontekst.

import pyspark
from pyspark import SparkContext
sc =SparkContext()

Nå som SparkKonteksten er klar, du kan lage en samling av data kalt RDD, Resilient Distributed Dataset. Beregning i en RDD blir automatisk parallellisert over klyngen.

nums= sc.parallelize([1,2,3,4])

Du får tilgang til den første raden med take

nums.take(1)
[1]

Du kan bruke en transformasjon til dataene med en lambda-funksjon. I PySpark eksempel nedenfor returnerer du kvadratet av tall. Det er en karttransformasjon

squared = nums.map(lambda x: x*x).collect()
for num in squared:
    print('%i ' % (num))
1 
4 
9 
16

SQLContext

En mer praktisk måte er å bruke DataFrame. SparkKonteksten er allerede satt, du kan bruke den til å lage dataFrame. Du må også deklarere SQLContext

SQLContext gjør det mulig å koble motoren med forskjellige datakilder. Den brukes til å starte funksjonene til Spark sql.

from pyspark.sql import Row
from pyspark.sql import SQLContext

sqlContext = SQLContext(sc)

Nå i dette Spark tutorial Python, la oss lage en liste over tuple. Hver tuppel vil inneholde navnet på personene og deres alder. Fire trinn kreves:

Trinn 1) Lag listen over tuple med informasjonen

[('John',19),('Smith',29),('Adam',35),('Henry',50)]

Trinn 2) Bygg en RDD

rdd = sc.parallelize(list_p)

Trinn 3) Konverter tuplene

rdd.map(lambda x: Row(name=x[0], age=int(x[1])))

Trinn 4) Lag en DataFrame-kontekst

sqlContext.createDataFrame(ppl)
list_p = [('John',19),('Smith',29),('Adam',35),('Henry',50)]
rdd = sc.parallelize(list_p)
ppl = rdd.map(lambda x: Row(name=x[0], age=int(x[1])))
DF_ppl = sqlContext.createDataFrame(ppl)

Hvis du vil ha tilgang til typen for hver funksjon, kan du bruke printSchema()

DF_ppl.printSchema()
root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)

Eksempel på maskinlæring med PySpark

Nå som du har en kort ide om Spark og SQLContext, er du klar til å bygge ditt første maskinlæringsprogram.

Følgende er trinnene for å bygge et maskinlæringsprogram med PySpark:

  • Trinn 1) Grunnleggende drift med PySpark
  • Trinn 2) Forbehandling av data
  • Trinn 3) Bygg en databehandlingspipeline
  • Trinn 4) Bygg klassifisereren: logistikk
  • Trinn 5) Trene og vurdere modellen
  • Trinn 6) Still inn hyperparameteren

I denne PySpark Machine Learning tutorial, vi vil bruke voksendatasettet. Hensikten med denne opplæringen er å lære hvordan du bruker Pyspark. For mer informasjon om datasettet, se denne opplæringen.

Merk at datasettet ikke er signifikant, og du tror kanskje at beregningen tar lang tid. Spark er designet for å behandle en betydelig mengde data. Sparkytelsen øker i forhold til andre maskinlæringsbiblioteker når datasettet som behandles vokser seg større.

Trinn 1) Grunnleggende drift med PySpark

Først av alt, må du initialisere SQLContext er ikke allerede i initiert ennå.

#from pyspark.sql import SQLContext
url = "https://raw.githubusercontent.com/guru99-edu/R-Programming/master/adult_data.csv"
from pyspark import SparkFiles
sc.addFile(url)
sqlContext = SQLContext(sc)

deretter kan du lese cvs-filen med sqlContext.read.csv. Du bruker inferSchema satt til True for å fortelle Spark for å gjette automatisk type data. Som standard er den slått på False.

df = sqlContext.read.csv(SparkFiles.get("adult_data.csv"), header=True, inferSchema= True)

La oss ta en titt på datatypen

df.printSchema()
root
 |-- age: integer (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: integer (nullable = true)
 |-- education: string (nullable = true)
 |-- education_num: integer (nullable = true)
 |-- marital: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital_gain: integer (nullable = true)
 |-- capital_loss: integer (nullable = true)
 |-- hours_week: integer (nullable = true)
 |-- native_country: string (nullable = true)
 |-- label: string (nullable = true)

Du kan se dataene med show.

df.show(5, truncate = False)
+---+----------------+------+---------+-------------+------------------+-----------------+-------------+-----+------+------------+------------+----------+--------------+-----+
|age|workclass       |fnlwgt|education|education_num|marital           |occupation       |relationship |race |sex   |capital_gain|capital_loss|hours_week|native_country|label|
+---+----------------+------+---------+-------------+------------------+-----------------+-------------+-----+------+------------+------------+----------+--------------+-----+
|39 |State-gov       |77516 |Bachelors|13           |Never-married     |Adm-clerical     |Not-in-family|White|Male  |2174        |0           |40        |United-States |<=50K|
|50 |Self-emp-not-inc|83311 |Bachelors|13           |Married-civ-spouse|Exec-managerial  |Husband      |White|Male  |0           |0           |13        |United-States |<=50K|
|38 |Private         |215646|HS-grad  |9            |Divorced          |Handlers-cleaners|Not-in-family|White|Male  |0           |0           |40        |United-States |<=50K|
|53 |Private         |234721|11th     |7            |Married-civ-spouse|Handlers-cleaners|Husband      |Black|Male  |0           |0           |40        |United-States |<=50K|
|28 |Private         |338409|Bachelors|13           |Married-civ-spouse|Prof-specialty   |Wife         |Black|Female|0           |0           |40        |Cuba          |<=50K|
+---+----------------+------+---------+-------------+------------------+-----------------+-------------+-----+------+------------+------------+----------+--------------+-----+
only showing top 5 rows

Hvis du ikke satte inderShema til True, her er hva som skjer med typen. Det er alle i streng.

df_string = sqlContext.read.csv(SparkFiles.get("adult.csv"), header=True, inferSchema=  False)
df_string.printSchema()
root
 |-- age: string (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: string (nullable = true)
 |-- education: string (nullable = true)
 |-- education_num: string (nullable = true)
 |-- marital: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital_gain: string (nullable = true)
 |-- capital_loss: string (nullable = true)
 |-- hours_week: string (nullable = true)
 |-- native_country: string (nullable = true)
 |-- label: string (nullable = true)

For å konvertere den kontinuerlige variabelen til riktig format, kan du bruke recast kolonnene. Du kan bruke withColumn for å fortelle Spark hvilken kolonne som skal utføre transformasjonen.

# Import all from `sql.types`
from pyspark.sql.types import *

# Write a custom function to convert the data type of DataFrame columns
def convertColumn(df, names, newType):
    for name in names: 
        df = df.withColumn(name, df[name].cast(newType))
    return df 
# List of continuous features
CONTI_FEATURES  = ['age', 'fnlwgt','capital_gain', 'education_num', 'capital_loss', 'hours_week']
# Convert the type
df_string = convertColumn(df_string, CONTI_FEATURES, FloatType())
# Check the dataset
df_string.printSchema()
root
 |-- age: float (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: float (nullable = true)
 |-- education: string (nullable = true)
 |-- education_num: float (nullable = true)
 |-- marital: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital_gain: float (nullable = true)
 |-- capital_loss: float (nullable = true)
 |-- hours_week: float (nullable = true)
 |-- native_country: string (nullable = true)
 |-- label: string (nullable = true)

from pyspark.ml.feature import StringIndexer
#stringIndexer = StringIndexer(inputCol="label", outputCol="newlabel")
#model = stringIndexer.fit(df)
#df = model.transform(df)
df.printSchema()

Velg kolonner

Du kan velge og vise radene med select og navnene på funksjonene. Nedenfor er alder og fnlwgt valgt.

df.select('age','fnlwgt').show(5)
+---+------+
|age|fnlwgt|
+---+------+
| 39| 77516|
| 50| 83311|
| 38|215646|
| 53|234721|
| 28|338409|
+---+------+
only showing top 5 rows

Tell etter gruppe

Hvis du vil telle antall forekomster etter gruppe, kan du kjede:

  • gruppe av()
  • telle()

sammen. I PySpark eksempel nedenfor, teller du antall rader etter utdanningsnivå.

df.groupBy("education").count().sort("count",ascending=True).show()
+------------+-----+
|   education|count|
+------------+-----+
|   Preschool|   51|
|     1st-4th|  168|
|     5th-6th|  333|
|   Doctorate|  413|
|        12th|  433|
|         9th|  514|
| Prof-school|  576|
|     7th-8th|  646|
|        10th|  933|
|  Assoc-acdm| 1067|
|        11th| 1175|
|   Assoc-voc| 1382|
|     Masters| 1723|
|   Bachelors| 5355|
|Some-college| 7291|
|     HS-grad|10501|
+------------+-----+

Beskriv dataene

For å få en oppsummerende statistikk over dataene kan du bruke describe(). Den vil beregne:

  • telle
  • bety
  • standardavvik
  • minutter
  • max
df.describe().show()
+-------+------------------+-----------+------------------+------------+-----------------+--------+----------------+------------+------------------+------+------------------+----------------+------------------+--------------+-----+
|summary|               age|  workclass|            fnlwgt|   education|    education_num| marital|      occupation|relationship|              race|   sex|      capital_gain|    capital_loss|        hours_week|native_country|label|
+-------+------------------+-----------+------------------+------------+-----------------+--------+----------------+------------+------------------+------+------------------+----------------+------------------+--------------+-----+
|  count|             32561|      32561|             32561|       32561|            32561|   32561|           32561|       32561|             32561| 32561|             32561|           32561|             32561|         32561|32561|
|   mean| 38.58164675532078|       null|189778.36651208502|        null| 10.0806793403151|    null|            null|        null|              null|  null|1077.6488437087312| 87.303829734959|40.437455852092995|          null| null|
| stddev|13.640432553581356|       null|105549.97769702227|        null|2.572720332067397|    null|            null|        null|              null|  null| 7385.292084840354|402.960218649002|12.347428681731838|          null| null|
|    min|                17|          ?|             12285|        10th|                1|Divorced|               ?|     Husband|Amer-Indian-Eskimo|Female|                 0|               0|                 1|             ?|<=50K|
|    max|                90|Without-pay|           1484705|Some-college|               16| Widowed|Transport-moving|        Wife|             White|  Male|             99999|            4356|                99|    Yugoslavia| >50K|
+-------+------------------+-----------+------------------+------------+-----------------+--------+----------------+------------+------------------+------+------------------+----------------+------------------+--------------+-----+

Hvis du vil ha oppsummeringsstatistikken for bare én kolonne, legg til navnet på kolonnen i describe()

df.describe('capital_gain').show()
+-------+------------------+
|summary|      capital_gain|
+-------+------------------+
|  count|             32561|
|   mean|1077.6488437087312|
| stddev| 7385.292084840354|
|    min|                 0|
|    max|             99999|
+-------+------------------+

Krysstabellberegning

I noen tilfeller kan det være interessant å se den beskrivende statistikken mellom to parvise kolonner. For eksempel kan du telle antall personer med inntekt under eller over 50k etter utdanningsnivå. Denne operasjonen kalles en krysstabell.

df.crosstab('age', 'label').sort("age_label").show()
+---------+-----+----+
|age_label|<=50K|>50K|
+---------+-----+----+
|       17|  395|   0|
|       18|  550|   0|
|       19|  710|   2|
|       20|  753|   0|
|       21|  717|   3|
|       22|  752|  13|
|       23|  865|  12|
|       24|  767|  31|
|       25|  788|  53|
|       26|  722|  63|
|       27|  754|  81|
|       28|  748| 119|
|       29|  679| 134|
|       30|  690| 171|
|       31|  705| 183|
|       32|  639| 189|
|       33|  684| 191|
|       34|  643| 243|
|       35|  659| 217|
|       36|  635| 263|
+---------+-----+----+
only showing top 20 rows

Du kan se at ingen mennesker har inntekter over 50 XNUMX når de er unge.

Slipp kolonne

Det er to intuitive API for å slippe kolonner:

  • drop(): Slipp en kolonne
  • dropna(): Slipp NA-er

Nedenfor slipper du kolonnen education_num

df.drop('education_num').columns

['age',
 'workclass',
 'fnlwgt',
 'education',
 'marital',
 'occupation',
 'relationship',
 'race',
 'sex',
 'capital_gain',
 'capital_loss',
 'hours_week',
 'native_country',
 'label']

Filtrer data

Du kan bruke filter() for å bruke beskrivende statistikk i et undersett av data. For eksempel kan du telle antall personer over 40 år

df.filter(df.age > 40).count()

13443

Descriptive statistikk etter gruppe

Til slutt kan du gruppere data etter gruppe og beregne statistiske operasjoner som gjennomsnittet.

df.groupby('marital').agg({'capital_gain': 'mean'}).show()
+--------------------+------------------+
|             marital| avg(capital_gain)|
+--------------------+------------------+
|           Separated| 535.5687804878049|
|       Never-married|376.58831788823363|
|Married-spouse-ab...| 653.9832535885167|
|            Divorced| 728.4148098131893|
|             Widowed| 571.0715005035247|
|   Married-AF-spouse| 432.6521739130435|
|  Married-civ-spouse|1764.8595085470085|
+--------------------+------------------+

Trinn 2) Dataforbehandling

Databehandling er et kritisk trinn i maskinlæring. Etter at du har fjernet søppeldata, får du noen viktige innsikter.

For eksempel vet du at alder ikke er en lineær funksjon med inntekten. Når folk er unge, er inntekten vanligvis lavere enn middelaldrende. Etter pensjonering bruker en husholdning sparepengene sine, noe som betyr en nedgang i inntekt. For å fange dette mønsteret kan du legge til en firkant til aldersfunksjonen

Legg til aldersrute

For å legge til en ny funksjon, må du:

  1. Velg kolonnen
  2. Bruk transformasjonen og legg den til DataFrame
from pyspark.sql.functions import *

# 1 Select the column
age_square = df.select(col("age")**2)

# 2 Apply the transformation and add it to the DataFrame
df = df.withColumn("age_square", col("age")**2)

df.printSchema()
root
 |-- age: integer (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: integer (nullable = true)
 |-- education: string (nullable = true)
 |-- education_num: integer (nullable = true)
 |-- marital: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital_gain: integer (nullable = true)
 |-- capital_loss: integer (nullable = true)
 |-- hours_week: integer (nullable = true)
 |-- native_country: string (nullable = true)
 |-- label: string (nullable = true)
 |-- age_square: double (nullable = true)

Du kan se at age_square har blitt lagt til datarammen. Du kan endre rekkefølgen på variablene med select. Nedenfor tar du med age_square rett etter alder.

COLUMNS = ['age', 'age_square', 'workclass', 'fnlwgt', 'education', 'education_num', 'marital',
           'occupation', 'relationship', 'race', 'sex', 'capital_gain', 'capital_loss',
           'hours_week', 'native_country', 'label']
df = df.select(COLUMNS)
df.first()
Row(age=39, age_square=1521.0, workclass='State-gov', fnlwgt=77516, education='Bachelors', education_num=13, marital='Never-married', occupation='Adm-clerical', relationship='Not-in-family', race='White', sex='Male', capital_gain=2174, capital_loss=0, hours_week=40, native_country='United-States', label='<=50K')

Ekskluder Holand-Nederland

Når en gruppe i en funksjon bare har én observasjon, gir den ingen informasjon til modellen. Tvert imot kan det føre til en feil under kryssvalideringen.

La oss sjekke opprinnelsen til husholdningen

df.filter(df.native_country == 'Holand-Netherlands').count()
df.groupby('native_country').agg({'native_country': 'count'}).sort(asc("count(native_country)")).show()
+--------------------+---------------------+
|      native_country|count(native_country)|
+--------------------+---------------------+
|  Holand-Netherlands|                    1|
|            Scotland|                   12|
|             Hungary|                   13|
|            Honduras|                   13|
|Outlying-US(Guam-...|                   14|
|          Yugoslavia|                   16|
|            Thailand|                   18|
|                Laos|                   18|
|            Cambodia|                   19|
|     Trinadad&Tobago|                   19|
|                Hong|                   20|
|             Ireland|                   24|
|             Ecuador|                   28|
|              Greece|                   29|
|              France|                   29|
|                Peru|                   31|
|           Nicaragua|                   34|
|            Portugal|                   37|
|                Iran|                   43|
|               Haiti|                   44|
+--------------------+---------------------+
only showing top 20 rows

Funksjonen native_country har bare én husstand som kommer fra Nederland. Du utelukker det.

df_remove = df.filter(df.native_country !=	'Holand-Netherlands')

Trinn 3) Bygg en databehandlingspipeline

I likhet med scikit-learn har Pyspark en pipeline API.

En rørledning er veldig praktisk for å opprettholde strukturen til dataene. Du skyver dataene inn i rørledningen. Inne i rørledningen gjøres forskjellige operasjoner, utgangen brukes til å mate algoritmen.

For eksempel består én universell transformasjon i maskinlæring av å konvertere en streng til én varm enkoder, dvs. én kolonne av en gruppe. En varm koder er vanligvis en matrise full av nuller.

Trinnene for å transformere dataene ligner veldig på scikit-learn. Du må:

  • Indekser strengen til numerisk
  • Lag den ene varmekoderen
  • Transformere dataene

To APIer gjør jobben: StringIndexer, OneHotEncoder

  1. Først av alt velger du strengkolonnen som skal indekseres. inputCol er navnet på kolonnen i datasettet. outputCol er det nye navnet gitt til den transformerte kolonnen.
StringIndexer(inputCol="workclass", outputCol="workclass_encoded")
  1. Tilpass dataene og transformer dem
model = stringIndexer.fit(df)		
`indexed = model.transform(df)``
  1. Lag nyhetsspaltene basert på gruppen. For eksempel, hvis det er 10 grupper i funksjonen, vil den nye matrisen ha 10 kolonner, en for hver gruppe.
OneHotEncoder(dropLast=False, inputCol="workclassencoded", outputCol="workclassvec")
### Example encoder
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler

stringIndexer = StringIndexer(inputCol="workclass", outputCol="workclass_encoded")
model = stringIndexer.fit(df)
indexed = model.transform(df)
encoder = OneHotEncoder(dropLast=False, inputCol="workclass_encoded", outputCol="workclass_vec")
encoded = encoder.transform(indexed)
encoded.show(2)
+---+----------+----------------+------+---------+-------------+------------------+---------------+-------------+-----+----+------------+------------+----------+--------------+-----+-----------------+-------------+
|age|age_square|       workclass|fnlwgt|education|education_num|           marital|     occupation| relationship| race| sex|capital_gain|capital_loss|hours_week|native_country|label|workclass_encoded|workclass_vec|
+---+----------+----------------+------+---------+-------------+------------------+---------------+-------------+-----+----+------------+------------+----------+--------------+-----+-----------------+-------------+
| 39|    1521.0|       State-gov| 77516|Bachelors|           13|     Never-married|   Adm-clerical|Not-in-family|White|Male|        2174|           0|        40| United-States|<=50K|              4.0|(9,[4],[1.0])|
| 50|    2500.0|Self-emp-not-inc| 83311|Bachelors|           13|Married-civ-spouse|Exec-managerial|      Husband|White|Male|           0|           0|        13| United-States|<=50K|              1.0|(9,[1],[1.0])|
+---+----------+----------------+------+---------+-------------+------------------+---------------+-------------+-----+----+------------+------------+----------+--------------+-----+-----------------+-------------+
only showing top 2 rows

Bygg rørledningen

Du vil bygge en pipeline for å konvertere alle de nøyaktige funksjonene og legge dem til det endelige datasettet. Rørledningen vil ha fire operasjoner, men legg gjerne til så mange operasjoner du ønsker.

  1. Kod inn de kategoriske dataene
  2. Indekser etikettfunksjonen
  3. Legg til kontinuerlig variabel
  4. Sett sammen trinnene.

Hvert trinn er lagret i en liste kalt stadier. Denne listen vil fortelle VectorAssembler hvilken operasjon som skal utføres inne i rørledningen.

1. Kod de kategoriske dataene

Dette trinnet er nøyaktig det samme som eksemplet ovenfor, bortsett fra at du går over alle de kategoriske funksjonene.

from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoderEstimator
CATE_FEATURES = ['workclass', 'education', 'marital', 'occupation', 'relationship', 'race', 'sex', 'native_country']
stages = [] # stages in our Pipeline
for categoricalCol in CATE_FEATURES:
    stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol + "Index")
    encoder = OneHotEncoderEstimator(inputCols=[stringIndexer.getOutputCol()],
                                     outputCols=[categoricalCol + "classVec"])
    stages += [stringIndexer, encoder]

2. Indekser etikettfunksjonen

Spark, som mange andre biblioteker, godtar ikke strengverdier for etiketten. Du konverterer etikettfunksjonen med StringIndexer og legger den til listestadiene

# Convert label into label indices using the StringIndexer
label_stringIdx =  StringIndexer(inputCol="label", outputCol="newlabel")
stages += [label_stringIdx]

3. Legg til kontinuerlig variabel

InputCols til VectorAssembler er en liste over kolonner. Du kan opprette en ny liste som inneholder alle de nye kolonnene. Koden nedenfor fyller listen med kodede kategoriske funksjoner og de kontinuerlige funksjonene.

assemblerInputs = [c + "classVec" for c in CATE_FEATURES] + CONTI_FEATURES

4. Sett sammen trinnene.

Til slutt passerer du alle trinnene i VectorAssembler

assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")stages += [assembler]

Nå som alle trinnene er klare, skyver du dataene til rørledningen.

# Create a Pipeline.
pipeline = Pipeline(stages=stages)
pipelineModel = pipeline.fit(df_remove)
model = pipelineModel.transform(df_remove)

Hvis du sjekker det nye datasettet, kan du se at det inneholder alle funksjonene, transformert og ikke transformert. Du er kun interessert i den nye etiketten og funksjonene. Funksjonene inkluderer alle de transformerte funksjonene og de kontinuerlige variablene.

model.take(1)

[Row(age=39, age_square=1521.0, workclass='State-gov', fnlwgt=77516, education='Bachelors', education_num=13, marital='Never-married', occupation='Adm-clerical', relationship='Not-in-family', race='White', sex='Male', capital_gain=2174, capital_loss=0, hours_week=40, native_country='United-States', label='<=50K', workclassIndex=4.0, workclassclassVec=SparseVector(8, {4: 1.0}), educationIndex=2.0, educationclassVec=SparseVector(15, {2: 1.0}), maritalIndex=1.0, maritalclassVec=SparseVector(6, {1: 1.0}), occupationIndex=3.0, occupationclassVec=SparseVector(14, {3: 1.0}), relationshipIndex=1.0, relationshipclassVec=SparseVector(5, {1: 1.0}), raceIndex=0.0, raceclassVec=SparseVector(4, {0: 1.0}), sexIndex=0.0, sexclassVec=SparseVector(1, {0: 1.0}), native_countryIndex=0.0, native_countryclassVec=SparseVector(40, {0: 1.0}), newlabel=0.0, features=SparseVector(99, {4: 1.0, 10: 1.0, 24: 1.0, 32: 1.0, 44: 1.0, 48: 1.0, 52: 1.0, 53: 1.0, 93: 39.0, 94: 77516.0, 95: 2174.0, 96: 13.0, 98: 40.0}))]

Trinn 4) Bygg klassifisereren: logistikk

For å gjøre beregningen raskere, konverterer du modellen til en DataFrame.

Du må velge ny etikett og funksjoner fra modellen ved å bruke kartet.

from pyspark.ml.linalg import DenseVector
input_data = model.rdd.map(lambda x: (x["newlabel"], DenseVector(x["features"])))

Du er klar til å lage togdataene som en DataFrame. Du bruker sqlContext

df_train = sqlContext.createDataFrame(input_data, ["label", "features"])

Sjekk den andre raden

df_train.show(2)
+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|[0.0,0.0,0.0,0.0,...|
|  0.0|[0.0,1.0,0.0,0.0,...|
+-----+--------------------+
only showing top 2 rows

Lag et tog/testsett

Du deler datasettet 80/20 med randomSplit.

# Split the data into train and test sets
train_data, test_data = df_train.randomSplit([.8,.2],seed=1234)

La oss telle hvor mange personer med inntekt under/over 50k i både trening og testsett

train_data.groupby('label').agg({'label': 'count'}).show()
+-----+------------+
|label|count(label)|
+-----+------------+
|  0.0|       19698|
|  1.0|        6263|
+-----+------------+
test_data.groupby('label').agg({'label': 'count'}).show()
+-----+------------+
|label|count(label)|
+-----+------------+
|  0.0|        5021|
|  1.0|        1578|
+-----+------------+

Bygg den logistiske regressoren

Sist, men ikke minst, kan du bygge klassifisereren. Pyspark har et API kalt LogisticRegression for å utføre logistisk regresjon.

Du initialiserer lr ved å angi etikettkolonnen og funksjonskolonner. Du angir maksimalt 10 iterasjoner og legger til en regulariseringsparameter med en verdi på 0.3. Merk at i neste avsnitt vil du bruke kryssvalidering med et parameternett for å justere modellen

# Import `LinearRegression`
from pyspark.ml.classification import LogisticRegression

# Initialize `lr`
lr = LogisticRegression(labelCol="label",
                        featuresCol="features",
                        maxIter=10,
                        regParam=0.3)

# Fit the data to the model
linearModel = lr.fit(train_data)

#Du kan se koeffisientene fra regresjonen

# Print the coefficients and intercept for logistic regression
print("Coefficients: " + str(linearModel.coefficients))
print("Intercept: " + str(linearModel.intercept))
Coefficients: [-0.0678914665262,-0.153425526813,-0.0706009536407,-0.164057586562,-0.120655298528,0.162922330862,0.149176870438,-0.626836362611,-0.193483661541,-0.0782269980838,0.222667203836,0.399571096381,-0.0222024341804,-0.311925857859,-0.0434497788688,-0.306007744328,-0.41318209688,0.547937504247,-0.395837350854,-0.23166535958,0.618743906733,-0.344088614546,-0.385266881369,0.317324463006,-0.350518889186,-0.201335923138,-0.232878560088,-0.13349278865,-0.119760542498,0.17500602491,-0.0480968101118,0.288484253943,-0.116314616745,0.0524163478063,-0.300952624551,-0.22046421474,-0.16557996579,-0.114676231939,-0.311966431453,-0.344226119233,0.105530129507,0.152243047814,-0.292774545497,0.263628334433,-0.199951374076,-0.30329422583,-0.231087515178,0.418918551,-0.0565930184279,-0.177818073048,-0.0733236680663,-0.267972912252,0.168491215697,-0.12181255723,-0.385648075442,-0.202101794517,0.0469791640782,-0.00842850210625,-0.00373211448629,-0.259296141281,-0.309896554133,-0.168434409756,-0.11048086026,0.0280647963877,-0.204187030092,-0.414392623536,-0.252806580669,0.143366465705,-0.516359222663,-0.435627370849,-0.301949286524,0.0878249035894,-0.210951740965,-0.621417928742,-0.099445190784,-0.232671473401,-0.1077745606,-0.360429419703,-0.420362959052,-0.379729467809,-0.395186242741,0.0826401853838,-0.280251589972,0.187313505214,-0.20295228799,-0.431177064626,0.149759018379,-0.107114299614,-0.319314858424,0.0028450133235,-0.651220387649,-0.327918792207,-0.143659581445,0.00691075160413,8.38517628783e-08,2.18856717378e-05,0.0266701216268,0.000231075966823,0.00893832698698]
Intercept: -1.9884177974805692

Trinn 5) Tren og evaluer modellen

For å generere prediksjon for testsettet ditt,

Du kan bruke linearModel med transform() på test_data

# Make predictions on test data using the transform() method.
predictions = linearModel.transform(test_data)

Du kan skrive ut elementene i spådommer

predictions.printSchema()
root
 |-- label: double (nullable = true)
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)

Du er interessert i merkelappen, spådommen og sannsynligheten

selected = predictions.select("label", "prediction", "probability")
selected.show(20)
+-----+----------+--------------------+
|label|prediction|         probability|
+-----+----------+--------------------+
|  0.0|       0.0|[0.91560704124179...|
|  0.0|       0.0|[0.92812140213994...|
|  0.0|       0.0|[0.92161406774159...|
|  0.0|       0.0|[0.96222760777142...|
|  0.0|       0.0|[0.66363283056957...|
|  0.0|       0.0|[0.65571324475477...|
|  0.0|       0.0|[0.73053376932829...|
|  0.0|       1.0|[0.31265053873570...|
|  0.0|       0.0|[0.80005907577390...|
|  0.0|       0.0|[0.76482251301640...|
|  0.0|       0.0|[0.84447301189069...|
|  0.0|       0.0|[0.75691912026619...|
|  0.0|       0.0|[0.60902504096722...|
|  0.0|       0.0|[0.80799228385509...|
|  0.0|       0.0|[0.87704364852567...|
|  0.0|       0.0|[0.83817652582377...|
|  0.0|       0.0|[0.79655423248500...|
|  0.0|       0.0|[0.82712311232246...|
|  0.0|       0.0|[0.81372823882016...|
|  0.0|       0.0|[0.59687710752201...|
+-----+----------+--------------------+
only showing top 20 rows

Evaluer modellen

Du må se på nøyaktighetsberegningen for å se hvor godt (eller dårlig) modellen presterer. Foreløpig er det ingen API for å beregne nøyaktighetsmålet i Spark. Standardverdien er ROC, mottakerens driftskarakteristikk. Det er en annen beregning som tar hensyn til falsk positiv rate.

Før du ser på ROC, la oss konstruere nøyaktighetsmålet. Du er mer kjent med denne beregningen. Nøyaktighetsmålet er summen av riktig prediksjon over det totale antallet observasjoner.

Du oppretter en DataFrame med etiketten og `prediksjon.

cm = predictions.select("label", "prediction")

Du kan sjekke antall klasser i etiketten og prediksjonen

cm.groupby('label').agg({'label': 'count'}).show()
+-----+------------+
|label|count(label)|
+-----+------------+
|  0.0|        5021|
|  1.0|        1578|
+-----+------------+
cm.groupby('prediction').agg({'prediction': 'count'}).show()
+----------+-----------------+
|prediction|count(prediction)|
+----------+-----------------+
|       0.0|             5982|
|       1.0|              617|
+----------+-----------------+

For eksempel, i testsettet er det 1578 husstander med en inntekt over 50k og 5021 under. Klassifisereren spådde imidlertid 617 husstander med inntekt over 50k.

Du kan beregne nøyaktigheten ved å beregne antallet når etiketten er riktig klassifisert over det totale antallet rader.

cm.filter(cm.label == cm.prediction).count() / cm.count()

0.8237611759357478

Du kan pakke alt sammen og skrive en funksjon for å beregne nøyaktigheten.

def accuracy_m(model): 
    predictions = model.transform(test_data)
    cm = predictions.select("label", "prediction")
    acc = cm.filter(cm.label == cm.prediction).count() / cm.count()
    print("Model accuracy: %.3f%%" % (acc * 100)) 
accuracy_m(model = linearModel)
Model accuracy: 82.376%

ROC-beregninger

Modulen BinaryClassificationEvaluator inkluderer ROC-målene. Mottakeren Operating Karakteristisk kurve er et annet vanlig verktøy som brukes med binær klassifisering. Den er veldig lik presisjon/gjenkallingskurven, men i stedet for å plotte presisjon versus gjenkalling, viser ROC-kurven den sanne positive raten (dvs. tilbakekalling) mot den falske positive raten. Den falske positive frekvensen er forholdet mellom negative forekomster som feilaktig er klassifisert som positive. Det er lik én minus den sanne negative satsen. Den sanne negative raten kalles også spesifisitet. Derfor plotter ROC-kurven sensitivitet (gjenkalling) versus 1 – spesifisitet

### Use ROC 
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Evaluate model
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
print(evaluator.evaluate(predictions))
print(evaluator.getMetricName())

0.8940481662695192areaUnderROC

print(evaluator.evaluate(predictions))

0.8940481662695192

Trinn 6) Still inn hyperparameteren

Sist, men ikke minst, kan du justere hyperparametrene. Ligner på scikit lære du oppretter et parameternett, og du legger til parameterne du vil justere.

For å redusere tiden for beregningen, justerer du bare regulariseringsparameteren med bare to verdier.

from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Create ParamGrid for Cross Validation
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.01, 0.5])
             .build())

Til slutt vurderer du modellen med å bruke kryssvalieringsmetoden med 5 folder. Det tar rundt 16 minutter å trene.

from time import *
start_time = time()

# Create 5-fold CrossValidator
cv = CrossValidator(estimator=lr,
                    estimatorParamMaps=paramGrid,
                    evaluator=evaluator, numFolds=5)

# Run cross validations
cvModel = cv.fit(train_data)
# likely take a fair amount of time
end_time = time()
elapsed_time = end_time - start_time
print("Time to train model: %.3f seconds" % elapsed_time)

Tid til å trene modell: 978.807 sekunder

Den beste regulariseringshyperparameteren er 0.01, med en nøyaktighet på 85.316 prosent.

accuracy_m(model = cvModel)
Model accuracy: 85.316%

Du kan trekke ut den anbefalte parameteren ved å kjede cvModel.bestModel med extractParamMap()

bestModel = cvModel.bestModel
bestModel.extractParamMap()
{Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='aggregationDepth', doc='suggested depth for treeAggregate (>= 2)'): 2,
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty'): 0.0,
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='family', doc='The name of family which is a description of the label distribution to be used in the model. Supported options: auto, binomial, multinomial.'): 'auto',
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='featuresCol', doc='features column name'): 'features',
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='fitIntercept', doc='whether to fit an intercept term'): True,
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='labelCol', doc='label column name'): 'label',
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='maxIter', doc='maximum number of iterations (>= 0)'): 10,
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='predictionCol', doc='prediction column name'): 'prediction',
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='probabilityCol', doc='Column name for predicted class conditional probabilities. Note: Not all models output well-calibrated probability estimates! These probabilities should be treated as confidences, not precise probabilities'): 'probability',
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='rawPredictionCol', doc='raw prediction (a.k.a. confidence) column name'): 'rawPrediction',
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='regParam', doc='regularization parameter (>= 0)'): 0.01,
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='standardization', doc='whether to standardize the training features before fitting the model'): True,
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='threshold', doc='threshold in binary classification prediction, in range [0, 1]'): 0.5,
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='tol', doc='the convergence tolerance for iterative algorithms (>= 0)'): 1e-06}

Sammendrag

Spark er et grunnleggende verktøy for en dataforsker. Den lar utøveren koble en app til forskjellige datakilder, utføre dataanalyse sømløst eller legge til en prediktiv modell.

Til å begynne med Spark, må du sette i gang en Spark Kontekst med:

'SparkKontekst()'

Og og SQL kontekst for å koble til en datakilde:

'SQLContext()'

I opplæringen lærer du hvordan du trener en logistisk regresjon:

  1. Konverter datasettet til en dataramme med:
rdd.map(lambda x: (x["newlabel"], DenseVector(x["features"])))
sqlContext.createDataFrame(input_data, ["label", "features"])

Merk at etikettens kolonnenavn er newlabel og alle funksjonene er samlet i funksjoner. Endre disse verdiene hvis de er forskjellige i datasettet.

  1. Lag toget/testsettet
randomSplit([.8,.2],seed=1234)
  1. Tren modellen
LogisticRegression(labelCol="label",featuresCol="features",maxIter=10, regParam=0.3)
lr.fit()
  1. Gjør spådommer
linearModel.transform()