PySpark Tutorial per principianti: impara con gli ESEMPI
Prima di imparare PySpark, capiamo:
Cos'è Apache Spark?
Spark è una soluzione per big data che ha dimostrato di essere più semplice e veloce di Hadoop MapReduce. Spark è un software open source sviluppato dal laboratorio RAD della UC Berkeley nel 2009. Da quando è stato rilasciato al pubblico nel 2010, Spark è cresciuto in popolarità e viene utilizzato nel settore con una scala senza precedenti.
Nell'era di Big Data, i professionisti necessitano più che mai di strumenti veloci e affidabili per elaborare lo streaming di dati. Strumenti precedenti come MapReduce erano i preferiti ma erano lenti. Per superare questo problema, Spark offre una soluzione rapida e generica. La differenza principale tra Spark e MapReduce è quello Spark esegue calcoli in memoria durante la successiva sul disco rigido. Consente l'accesso ad alta velocità e l'elaborazione dei dati, riducendo i tempi da ore a minuti.
Cos'è PySpark?
PySpark è uno strumento creato da Apache Spark Comunità per l'utilizzo Python con Spark. Permette di lavorare con RDD (Resilient Distributed Dataset) in Python. Offre anche PySpark Shell da collegare Python API con Spark nucleo da avviare Spark Contesto. Spark è il nome del motore per realizzare il cluster computing, mentre PySpark is Pythonda utilizzare Spark.
Come si confronta la Spark funziona?
Spark si basa su un motore computazionale, il che significa che si occupa della pianificazione, distribuzione e monitoraggio dell'applicazione. Ogni attività viene eseguita su varie macchine di lavoro chiamate cluster di elaborazione. Un cluster informatico si riferisce alla divisione dei compiti. Una macchina esegue un compito, mentre le altre contribuiscono al risultato finale attraverso un compito diverso. Alla fine, tutte le attività vengono aggregate per produrre un output. IL Spark L'amministratore fornisce una panoramica a 360 gradi di vari Spark Lavori.
Spark è progettato per funzionare con
- Python
- Java
- Scala
- SQL
Una caratteristica significativa di Spark è la grande quantità di libreria integrata, incluso MLlib per l'apprendimento automatico. Spark è inoltre progettato per funzionare con i cluster Hadoop e può leggere un'ampia tipologia di file, inclusi dati Hive, CSV, JSON, dati Casandra tra gli altri.
Perché usare Spark?
Come futuro data practitioner, dovresti avere familiarità con le famose librerie di Python: Pandas e scikit-learn. Queste due librerie sono fantastiche per esplorare dataset di medie dimensioni. I progetti di machine learning regolari sono costruiti attorno alla seguente metodologia:
- Caricare i dati sul disco
- Importare i dati nella memoria della macchina
- Elaborare/analizzare i dati
- Costruisci il modello di machine learning
- Memorizza la previsione su disco
Il problema sorge se il data scientist vuole elaborare dati troppo grandi per un computer. Agli albori della scienza dei dati, i professionisti campionavano i dati poiché la formazione su enormi set di dati non era sempre necessaria. Lo scienziato dei dati troverebbe un buon campione statistico, eseguirebbe un ulteriore controllo di robustezza e fornirebbe un modello eccellente.
Tuttavia, ci sono alcuni problemi con questo:
- Il set di dati riflette il mondo reale?
- I dati includono un esempio specifico?
- Il modello è adatto per il campionamento?
Prendi ad esempio i consigli degli utenti. I suggerimenti si basano sul confronto degli utenti con altri utenti per valutare le loro preferenze. Se il professionista dei dati prende solo un sottoinsieme dei dati, non ci sarà una coorte di utenti molto simili tra loro. I suggerimenti devono essere eseguiti sull'intero set di dati o non essere eseguiti affatto.
Qual'è la soluzione?
La soluzione è evidente da molto tempo: suddividere il problema su più computer. Anche il calcolo parallelo presenta più problemi. Gli sviluppatori hanno spesso difficoltà a scrivere codice parallelo e finiscono per dover risolvere un sacco di problemi complessi relativi al multi-processing stesso.
Pyspark fornisce allo scienziato dei dati un'API che può essere utilizzata per risolvere i problemi di elaborazione dati parallela. Pyspark gestisce le complessità del multiprocessing, come la distribuzione dei dati, la distribuzione del codice e la raccolta dell'output dai worker su un cluster di macchine.
Spark può essere eseguito in modo autonomo ma molto spesso viene eseguito su un framework di cluster computing come Hadoop. Nelle fasi di test e sviluppo, tuttavia, un data scientist può operare in modo efficiente Spark sui loro box di sviluppo o laptop senza cluster
• Uno dei principali vantaggi di Spark è quello di costruire un'architettura che comprenda la gestione dello streaming di dati, query di dati fluide, previsioni di apprendimento automatico e accesso in tempo reale a varie analisi.
· XNUMX€ Spark lavora a stretto contatto con il linguaggio SQL, ovvero i dati strutturati. Permette di interrogare i dati in tempo reale.
• Il compito principale del data scientist è analizzare e costruire modelli predittivi. In breve, uno scienziato dei dati deve sapere come interrogare i dati utilizzando SQL, produrre un rapporto statistico e utilizzare l'apprendimento automatico per produrre previsioni. Il data scientist dedica una notevole quantità di tempo alla pulizia, trasformazione e analisi dei dati. Una volta che il set di dati o il flusso di lavoro dei dati è pronto, il data scientist utilizza varie tecniche per scoprire approfondimenti e modelli nascosti. La manipolazione dei dati dovrebbe essere robusta e allo stesso tempo facile da usare. Spark è lo strumento giusto grazie alla sua velocità e alle ricche API.
In questo PySpark tutorial, imparerai come costruire un classificatore con PySpark esempi.
Come installare PySpark conAWS
La sezione Currents, dedicata a opere audaci e innovative di artisti emergenti e affermati, include la prima statunitense di Mare’s Nest di Ben Rivers, descritto come “un enigmatico road movie ambientato in un mondo post-apocalittico governato da bambini”. Tra gli altri titoli spiccano Dracula di Radu Jude e With Hasan in Gaza di Kamal Aljafari. Jupyter il team crea un'immagine Docker da eseguire Spark in modo efficiente. Di seguito sono riportati i passaggi che è possibile seguire per installare PySpark istanza in AWS.
Consulta il nostro tutorial su AWS e TensorFlow
Passaggio 1: crea un'istanza
Prima di tutto, devi creare un'istanza. Vai al tuo account AWS e avvia l'istanza. Puoi aumentare lo spazio di archiviazione fino a 15 g e utilizzare lo stesso gruppo di sicurezza del tutorial di TensorFlow.
Passaggio 2: aprire la connessione
Apri la connessione e installa il contenitore docker. Per maggiori dettagli, fai riferimento al tutorial con TensorFlow con docker. Tieni presente che devi trovarti nella directory di lavoro corretta.
Basta eseguire questi codici per installare Docker:
sudo yum update -y sudo yum install -y docker sudo service docker start sudo user-mod -a -G docker ec2-user exit
Passaggio 3: riaprire la connessione e installare Spark
Dopo aver riaperto la connessione, puoi installare l'immagine contenente PySpark.
## Spark docker run -v ~/work:/home/jovyan/work -d -p 8888:8888 jupyter/pyspark-notebook ## Allow preserving Jupyter notebook sudo chown 1000 ~/work ## Install tree to see our working directory next sudo yum install -y tree
Passo 4: aperto Jupyter
Controlla il contenitore e il suo nome
docker ps
Avvia la finestra mobile con i log della finestra mobile seguiti dal nome della finestra mobile. Ad esempio, la finestra mobile registra zealous_goldwasser
Vai al tuo browser e avvia Jupyter. L'indirizzo è http://localhost:8888/. Incolla la password fornita dal terminale.
Note:: se desideri caricare/scaricare un file sulla tua macchina AWS, puoi utilizzare il software Cyberduck, https://cyberduck.io/.
Come installare PySpark on Windows/Mac con Conda
Di seguito è riportato un processo dettagliato su come installare PySpark on Windows/Mac utilizzando Anaconda:
Per installare Spark sul tuo computer locale, una pratica consigliata è creare un nuovo ambiente conda. Questo nuovo ambiente verrà installato Python 3.6 Spark e tutte le dipendenze.
Utente Mac
cd anaconda3 touch hello-spark.yml vi hello-spark.yml
Windows Utente
cd C:\Users\Admin\Anaconda3 echo.>hello-spark.yml notepad hello-spark.yml
Puoi modificare il file .yml. Sii cauto con il trattino. Sono necessari due spazi prima –
name: hello-spark dependencies: - python=3.6 - jupyter - ipython - numpy - numpy-base - pandas - py4j - pyspark - pytz
Salvalo e crea l'ambiente. Ci vuole un po' di tempo
conda env create -f hello-spark.yml
Per maggiori dettagli sulla posizione, consultare il tutorial Installa TensorFlow
Puoi controllare tutto l'ambiente installato sulla tua macchina
conda env list
Activate hello-spark
Utente Mac
source activate hello-spark
Windows Utente
activate hello-spark
Nota: Hai già creato un ambiente TensorFlow specifico per eseguire i tutorial su TensorFlow. È più conveniente creare un nuovo ambiente diverso da hello-tf. Non ha senso sovraccaricare hello-tf con Spark o qualsiasi altra libreria di machine learning.
Immagina che la maggior parte del tuo progetto coinvolga TensorFlow, ma devi utilizzarlo Spark per un progetto particolare. Puoi impostare un ambiente TensorFlow per tutto il tuo progetto e creare un ambiente separato per Spark. Puoi aggiungere quante librerie vuoi Spark ambiente come desideri senza interferire con l'ambiente TensorFlow. Una volta che hai finito con Sparkdel progetto, puoi cancellarlo senza influire sull'ambiente TensorFlow.
Jupyter
Apri Jupyter Notebook e prova se PySpark funziona. In un nuovo notebook incolla il seguente PySpark Codice d'esempio:
import pyspark from pyspark import SparkContext sc =SparkContext()
Se viene visualizzato un errore, è probabile che sia così Java non è installato sulla tua macchina. In mac, apri il terminale e scrivi java -version, se c'è una versione java, assicurati che sia 1.8. In Windows, vai su Applicazione e controlla se è presente un file Java cartella. Se c'è un Java cartella, controllalo Java 1.8 è installato. Al momento della stesura di questo articolo, PySpark non è compatibile con Java9 e versioni successive.
Se è necessario installare Java, devi pensare link e scaricare jdk-8u181-windows-x64.exe
Per gli utenti Mac, si consiglia di utilizzare `brew.`
brew tap caskroom/versions brew cask install java8
Fare riferimento a questo tutorial passo passo su come installare Java
Note:: utilizzare rimuovi per cancellare completamente un ambiente.
conda env remove -n hello-spark -y
Spark Contesto
SparkIl contesto è il motore interno che permette le connessioni con i cluster. Se vuoi eseguire un'operazione, hai bisogno di un file SparkContesto.
Creare un SparkContesto
Prima di tutto, devi avviare a SparkContesto.
import pyspark from pyspark import SparkContext sc =SparkContext()
Ora che il SparkIl contesto è pronto, puoi creare una raccolta di dati chiamata RDD, Resilient Distributed Dataset. Il calcolo in un RDD viene automaticamente parallelizzato nel cluster.
nums= sc.parallelize([1,2,3,4])
Puoi accedere alla prima riga con take
nums.take(1)
[1]
È possibile applicare una trasformazione ai dati con una funzione lambda. Nel PySpark nell'esempio seguente, restituisci il quadrato dei num. È una trasformazione della mappa
squared = nums.map(lambda x: x*x).collect() for num in squared: print('%i ' % (num))
1 4 9 16
SQLContesto
Un modo più conveniente è utilizzare DataFrame. SparkIl contesto è già impostato, puoi usarlo per creare dataFrame. È inoltre necessario dichiarare SQLContext
SQLContext consente di connettere il motore con diverse origini dati. Viene utilizzato per avviare le funzionalità di Spark SQL.
from pyspark.sql import Row from pyspark.sql import SQLContext sqlContext = SQLContext(sc)
Ora in questo Spark lezione Python, creiamo un elenco di tuple. Ogni tupla conterrà il nome delle persone e la loro età. Sono necessari quattro passaggi:
Passo 1) Crea l'elenco di tuple con le informazioni
[('John',19),('Smith',29),('Adam',35),('Henry',50)]
Passo 2) Costruisci un RDD
rdd = sc.parallelize(list_p)
Passo 3) Converti le tuple
rdd.map(lambda x: Row(name=x[0], age=int(x[1])))
Passo 4) Creare un contesto DataFrame
sqlContext.createDataFrame(ppl) list_p = [('John',19),('Smith',29),('Adam',35),('Henry',50)] rdd = sc.parallelize(list_p) ppl = rdd.map(lambda x: Row(name=x[0], age=int(x[1]))) DF_ppl = sqlContext.createDataFrame(ppl)
Se vuoi accedere al tipo di ciascuna funzionalità, puoi usare printSchema()
DF_ppl.printSchema() root |-- age: long (nullable = true) |-- name: string (nullable = true)
Esempio di machine learning con PySpark
Ora che hai una breve idea di Spark e SQLContext, sei pronto per creare il tuo primo programma di machine learning.
Di seguito sono riportati i passaggi per creare un programma di apprendimento automatico con PySpark:
- Passo 1) Operazioni di base con PySpark
- Passo 2) Preelaborazione dei dati
- Passo 3) Costruisci una pipeline di elaborazione dati
- Passo 4) Costruisci il classificatore: logistico
- Passo 5) Formare e valutare il modello
- Passo 6) Ottimizza l'iperparametro
In questo PySpark Tutorial di Machine Learning, useremo il dataset per adulti. Lo scopo di questo tutorial è imparare a usare Pyspark. Per maggiori informazioni sul dataset, fare riferimento a questo tutorial.
Tieni presente che il set di dati non è significativo e potresti pensare che il calcolo richieda molto tempo. Spark è progettato per elaborare una notevole quantità di dati. SparkLe prestazioni di aumentano rispetto ad altre librerie di machine learning quando il set di dati elaborato diventa più grande.
Passaggio 1) Operazioni di base con PySpark
Prima di tutto, è necessario inizializzare SQLContext che non sia ancora iniziato.
#from pyspark.sql import SQLContext url = "https://raw.githubusercontent.com/guru99-edu/R-Programming/master/adult_data.csv" from pyspark import SparkFiles sc.addFile(url) sqlContext = SQLContext(sc)
quindi, puoi leggere il file cvs con sqlContext.read.csv. Per dirlo si utilizza inferSchema impostato su True Spark per indovinare automaticamente il tipo di dati. Per impostazione predefinita, è impostato su False.
df = sqlContext.read.csv(SparkFiles.get("adult_data.csv"), header=True, inferSchema= True)
Diamo un'occhiata al tipo di dati
df.printSchema() root |-- age: integer (nullable = true) |-- workclass: string (nullable = true) |-- fnlwgt: integer (nullable = true) |-- education: string (nullable = true) |-- education_num: integer (nullable = true) |-- marital: string (nullable = true) |-- occupation: string (nullable = true) |-- relationship: string (nullable = true) |-- race: string (nullable = true) |-- sex: string (nullable = true) |-- capital_gain: integer (nullable = true) |-- capital_loss: integer (nullable = true) |-- hours_week: integer (nullable = true) |-- native_country: string (nullable = true) |-- label: string (nullable = true)
Puoi vedere i dati con show.
df.show(5, truncate = False)
+---+----------------+------+---------+-------------+------------------+-----------------+-------------+-----+------+------------+------------+----------+--------------+-----+ |age|workclass |fnlwgt|education|education_num|marital |occupation |relationship |race |sex |capital_gain|capital_loss|hours_week|native_country|label| +---+----------------+------+---------+-------------+------------------+-----------------+-------------+-----+------+------------+------------+----------+--------------+-----+ |39 |State-gov |77516 |Bachelors|13 |Never-married |Adm-clerical |Not-in-family|White|Male |2174 |0 |40 |United-States |<=50K| |50 |Self-emp-not-inc|83311 |Bachelors|13 |Married-civ-spouse|Exec-managerial |Husband |White|Male |0 |0 |13 |United-States |<=50K| |38 |Private |215646|HS-grad |9 |Divorced |Handlers-cleaners|Not-in-family|White|Male |0 |0 |40 |United-States |<=50K| |53 |Private |234721|11th |7 |Married-civ-spouse|Handlers-cleaners|Husband |Black|Male |0 |0 |40 |United-States |<=50K| |28 |Private |338409|Bachelors|13 |Married-civ-spouse|Prof-specialty |Wife |Black|Female|0 |0 |40 |Cuba |<=50K| +---+----------------+------+---------+-------------+------------------+-----------------+-------------+-----+------+------------+------------+----------+--------------+-----+ only showing top 5 rows
Se non hai impostato inderShema su True, ecco cosa sta succedendo al tipo. Ci sono tutti in corda.
df_string = sqlContext.read.csv(SparkFiles.get("adult.csv"), header=True, inferSchema= False) df_string.printSchema() root |-- age: string (nullable = true) |-- workclass: string (nullable = true) |-- fnlwgt: string (nullable = true) |-- education: string (nullable = true) |-- education_num: string (nullable = true) |-- marital: string (nullable = true) |-- occupation: string (nullable = true) |-- relationship: string (nullable = true) |-- race: string (nullable = true) |-- sex: string (nullable = true) |-- capital_gain: string (nullable = true) |-- capital_loss: string (nullable = true) |-- hours_week: string (nullable = true) |-- native_country: string (nullable = true) |-- label: string (nullable = true)
Per convertire la variabile continua nel formato corretto, puoi utilizzare la rifusione delle colonne. Puoi usare withColumn per dirlo Spark su quale colonna operare la trasformazione.
# Import all from `sql.types` from pyspark.sql.types import * # Write a custom function to convert the data type of DataFrame columns def convertColumn(df, names, newType): for name in names: df = df.withColumn(name, df[name].cast(newType)) return df # List of continuous features CONTI_FEATURES = ['age', 'fnlwgt','capital_gain', 'education_num', 'capital_loss', 'hours_week'] # Convert the type df_string = convertColumn(df_string, CONTI_FEATURES, FloatType()) # Check the dataset df_string.printSchema() root |-- age: float (nullable = true) |-- workclass: string (nullable = true) |-- fnlwgt: float (nullable = true) |-- education: string (nullable = true) |-- education_num: float (nullable = true) |-- marital: string (nullable = true) |-- occupation: string (nullable = true) |-- relationship: string (nullable = true) |-- race: string (nullable = true) |-- sex: string (nullable = true) |-- capital_gain: float (nullable = true) |-- capital_loss: float (nullable = true) |-- hours_week: float (nullable = true) |-- native_country: string (nullable = true) |-- label: string (nullable = true) from pyspark.ml.feature import StringIndexer #stringIndexer = StringIndexer(inputCol="label", outputCol="newlabel") #model = stringIndexer.fit(df) #df = model.transform(df) df.printSchema()
Seleziona colonne
Puoi selezionare e mostrare le righe con select e i nomi delle funzionalità. Di seguito vengono selezionati età e fnlwgt.
df.select('age','fnlwgt').show(5)
+---+------+ |age|fnlwgt| +---+------+ | 39| 77516| | 50| 83311| | 38|215646| | 53|234721| | 28|338409| +---+------+ only showing top 5 rows
Contare per gruppo
Se vuoi contare il numero di occorrenze per gruppo, puoi concatenare:
- raggruppa per()
- contare()
insieme. Nel PySpark nell'esempio seguente, conti il numero di righe in base al livello di istruzione.
df.groupBy("education").count().sort("count",ascending=True).show()
+------------+-----+ | education|count| +------------+-----+ | Preschool| 51| | 1st-4th| 168| | 5th-6th| 333| | Doctorate| 413| | 12th| 433| | 9th| 514| | Prof-school| 576| | 7th-8th| 646| | 10th| 933| | Assoc-acdm| 1067| | 11th| 1175| | Assoc-voc| 1382| | Masters| 1723| | Bachelors| 5355| |Some-college| 7291| | HS-grad|10501| +------------+-----+
Descrivi i dati
Per ottenere una statistica riepilogativa dei dati, è possibile utilizzare description(). Calcolerà:
- contare
- significare
- deviazione standard
- verbale
- max
df.describe().show()
+-------+------------------+-----------+------------------+------------+-----------------+--------+----------------+------------+------------------+------+------------------+----------------+------------------+--------------+-----+ |summary| age| workclass| fnlwgt| education| education_num| marital| occupation|relationship| race| sex| capital_gain| capital_loss| hours_week|native_country|label| +-------+------------------+-----------+------------------+------------+-----------------+--------+----------------+------------+------------------+------+------------------+----------------+------------------+--------------+-----+ | count| 32561| 32561| 32561| 32561| 32561| 32561| 32561| 32561| 32561| 32561| 32561| 32561| 32561| 32561|32561| | mean| 38.58164675532078| null|189778.36651208502| null| 10.0806793403151| null| null| null| null| null|1077.6488437087312| 87.303829734959|40.437455852092995| null| null| | stddev|13.640432553581356| null|105549.97769702227| null|2.572720332067397| null| null| null| null| null| 7385.292084840354|402.960218649002|12.347428681731838| null| null| | min| 17| ?| 12285| 10th| 1|Divorced| ?| Husband|Amer-Indian-Eskimo|Female| 0| 0| 1| ?|<=50K| | max| 90|Without-pay| 1484705|Some-college| 16| Widowed|Transport-moving| Wife| White| Male| 99999| 4356| 99| Yugoslavia| >50K| +-------+------------------+-----------+------------------+------------+-----------------+--------+----------------+------------+------------------+------+------------------+----------------+------------------+--------------+-----+
Se desideri la statistica riassuntiva di una sola colonna, aggiungi il nome della colonna all'interno di description()
df.describe('capital_gain').show()
+-------+------------------+ |summary| capital_gain| +-------+------------------+ | count| 32561| | mean|1077.6488437087312| | stddev| 7385.292084840354| | min| 0| | max| 99999| +-------+------------------+
Calcolo della tabella incrociata
In alcune occasioni, può essere interessante vedere le statistiche descrittive tra due colonne a coppie. Ad esempio, puoi contare il numero di persone con un reddito inferiore o superiore a 50k in base al livello di istruzione. Questa operazione è chiamata tabella incrociata.
df.crosstab('age', 'label').sort("age_label").show()
+---------+-----+----+ |age_label|<=50K|>50K| +---------+-----+----+ | 17| 395| 0| | 18| 550| 0| | 19| 710| 2| | 20| 753| 0| | 21| 717| 3| | 22| 752| 13| | 23| 865| 12| | 24| 767| 31| | 25| 788| 53| | 26| 722| 63| | 27| 754| 81| | 28| 748| 119| | 29| 679| 134| | 30| 690| 171| | 31| 705| 183| | 32| 639| 189| | 33| 684| 191| | 34| 643| 243| | 35| 659| 217| | 36| 635| 263| +---------+-----+----+ only showing top 20 rows
Puoi vedere che nessuna persona ha entrate superiori a 50 quando è giovane.
Colonna di rilascio
Esistono due API intuitive per eliminare le colonne:
- drop(): rilascia una colonna
- dropna(): elimina gli NA
Sotto trascini la colonna education_num
df.drop('education_num').columns ['age', 'workclass', 'fnlwgt', 'education', 'marital', 'occupation', 'relationship', 'race', 'sex', 'capital_gain', 'capital_loss', 'hours_week', 'native_country', 'label']
Filtra i dati
È possibile utilizzare filter() per applicare statistiche descrittive in un sottoinsieme di dati. Ad esempio, puoi contare il numero di persone di età superiore a 40 anni
df.filter(df.age > 40).count()
13443
Descriptcinque statistiche per gruppo
Infine, puoi raggruppare i dati per gruppo e calcolare operazioni statistiche come la media.
df.groupby('marital').agg({'capital_gain': 'mean'}).show()
+--------------------+------------------+ | marital| avg(capital_gain)| +--------------------+------------------+ | Separated| 535.5687804878049| | Never-married|376.58831788823363| |Married-spouse-ab...| 653.9832535885167| | Divorced| 728.4148098131893| | Widowed| 571.0715005035247| | Married-AF-spouse| 432.6521739130435| | Married-civ-spouse|1764.8595085470085| +--------------------+------------------+
Passaggio 2) Preelaborazione dei dati
L’elaborazione dei dati è un passaggio fondamentale nell’apprendimento automatico. Dopo aver rimosso i dati inutili, ottieni alcune informazioni importanti.
Ad esempio, sai che l'età non è una funzione lineare del reddito. Quando le persone sono giovani, il loro reddito è solitamente inferiore a quello della mezza età. Dopo il pensionamento, una famiglia utilizza i propri risparmi, il che significa una diminuzione del reddito. Per catturare questo modello, puoi aggiungere un quadrato alla funzione età
Aggiungi il quadrato dell'età
Per aggiungere una nuova funzionalità, è necessario:
- Seleziona la colonna
- Applica la trasformazione e aggiungila a DataFrame
from pyspark.sql.functions import * # 1 Select the column age_square = df.select(col("age")**2) # 2 Apply the transformation and add it to the DataFrame df = df.withColumn("age_square", col("age")**2) df.printSchema() root |-- age: integer (nullable = true) |-- workclass: string (nullable = true) |-- fnlwgt: integer (nullable = true) |-- education: string (nullable = true) |-- education_num: integer (nullable = true) |-- marital: string (nullable = true) |-- occupation: string (nullable = true) |-- relationship: string (nullable = true) |-- race: string (nullable = true) |-- sex: string (nullable = true) |-- capital_gain: integer (nullable = true) |-- capital_loss: integer (nullable = true) |-- hours_week: integer (nullable = true) |-- native_country: string (nullable = true) |-- label: string (nullable = true) |-- age_square: double (nullable = true)
Puoi vedere che age_square è stato aggiunto con successo al data frame. Puoi cambiare l'ordine delle variabili con select. Di seguito, riporti age_square subito dopo age.
COLUMNS = ['age', 'age_square', 'workclass', 'fnlwgt', 'education', 'education_num', 'marital', 'occupation', 'relationship', 'race', 'sex', 'capital_gain', 'capital_loss', 'hours_week', 'native_country', 'label'] df = df.select(COLUMNS) df.first()
Row(age=39, age_square=1521.0, workclass='State-gov', fnlwgt=77516, education='Bachelors', education_num=13, marital='Never-married', occupation='Adm-clerical', relationship='Not-in-family', race='White', sex='Male', capital_gain=2174, capital_loss=0, hours_week=40, native_country='United-States', label='<=50K')
Escludere Olanda-Paesi Bassi
Quando un gruppo all'interno di una caratteristica ha una sola osservazione, non apporta alcuna informazione al modello. Al contrario, può portare ad un errore durante la convalida incrociata.
Controlliamo l'origine della famiglia
df.filter(df.native_country == 'Holand-Netherlands').count() df.groupby('native_country').agg({'native_country': 'count'}).sort(asc("count(native_country)")).show()
+--------------------+---------------------+ | native_country|count(native_country)| +--------------------+---------------------+ | Holand-Netherlands| 1| | Scotland| 12| | Hungary| 13| | Honduras| 13| |Outlying-US(Guam-...| 14| | Yugoslavia| 16| | Thailand| 18| | Laos| 18| | Cambodia| 19| | Trinadad&Tobago| 19| | Hong| 20| | Ireland| 24| | Ecuador| 28| | Greece| 29| | France| 29| | Peru| 31| | Nicaragua| 34| | Portugal| 37| | Iran| 43| | Haiti| 44| +--------------------+---------------------+ only showing top 20 rows
La funzione native_country ha solo una famiglia proveniente dai Paesi Bassi. Lo escludi.
df_remove = df.filter(df.native_country != 'Holand-Netherlands')
Passaggio 3) Costruire una pipeline di elaborazione dati
Simile a scikit-learn, Pyspark ha una pipeline API.
Una pipeline è molto comoda per mantenere la struttura dei dati. Inserisci i dati nella pipeline. All'interno della pipeline vengono eseguite varie operazioni, l'output viene utilizzato per alimentare l'algoritmo.
Ad esempio, una trasformazione universale nell'apprendimento automatico consiste nel convertire una stringa in un codificatore attivo, ovvero una colonna per gruppo. Un codificatore attivo è solitamente una matrice piena di zeri.
I passaggi per trasformare i dati sono molto simili a scikit-learn. Devi:
- Indicizzare la stringa in numerico
- Crea l'unico codificatore interessante
- Trasforma i dati
Due API svolgono il lavoro: StringIndexer, OneHotEncoder
- Prima di tutto, seleziona la colonna stringa da indicizzare. inputCol è il nome della colonna nel set di dati. outputCol è il nuovo nome assegnato alla colonna trasformata.
StringIndexer(inputCol="workclass", outputCol="workclass_encoded")
- Adatta i dati e trasformali
model = stringIndexer.fit(df) `indexed = model.transform(df)``
- Crea le colonne di notizie in base al gruppo. Ad esempio, se nella funzione sono presenti 10 gruppi, la nuova matrice avrà 10 colonne, una per ciascun gruppo.
OneHotEncoder(dropLast=False, inputCol="workclassencoded", outputCol="workclassvec")
### Example encoder from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler stringIndexer = StringIndexer(inputCol="workclass", outputCol="workclass_encoded") model = stringIndexer.fit(df) indexed = model.transform(df) encoder = OneHotEncoder(dropLast=False, inputCol="workclass_encoded", outputCol="workclass_vec") encoded = encoder.transform(indexed) encoded.show(2)
+---+----------+----------------+------+---------+-------------+------------------+---------------+-------------+-----+----+------------+------------+----------+--------------+-----+-----------------+-------------+ |age|age_square| workclass|fnlwgt|education|education_num| marital| occupation| relationship| race| sex|capital_gain|capital_loss|hours_week|native_country|label|workclass_encoded|workclass_vec| +---+----------+----------------+------+---------+-------------+------------------+---------------+-------------+-----+----+------------+------------+----------+--------------+-----+-----------------+-------------+ | 39| 1521.0| State-gov| 77516|Bachelors| 13| Never-married| Adm-clerical|Not-in-family|White|Male| 2174| 0| 40| United-States|<=50K| 4.0|(9,[4],[1.0])| | 50| 2500.0|Self-emp-not-inc| 83311|Bachelors| 13|Married-civ-spouse|Exec-managerial| Husband|White|Male| 0| 0| 13| United-States|<=50K| 1.0|(9,[1],[1.0])| +---+----------+----------------+------+---------+-------------+------------------+---------------+-------------+-----+----+------------+------------+----------+--------------+-----+-----------------+-------------+ only showing top 2 rows
Costruisci la pipeline
Costruirai una pipeline per convertire tutte le funzionalità precise e aggiungerle al set di dati finale. La pipeline avrà quattro operazioni, ma sentiti libero di aggiungere quante operazioni desideri.
- Codificare i dati categorici
- Indicizzare la caratteristica dell'etichetta
- Aggiungi variabile continua
- Assemblare i passaggi.
Ogni passaggio viene memorizzato in un elenco denominato fasi. Questo elenco indicherà a VectorAssembler quale operazione eseguire all'interno della pipeline.
1. Codificare i dati categorici
Questo passaggio è esattamente lo stesso dell'esempio precedente, tranne per il fatto che si ripetono tutte le funzionalità categoriche.
from pyspark.ml import Pipeline from pyspark.ml.feature import OneHotEncoderEstimator CATE_FEATURES = ['workclass', 'education', 'marital', 'occupation', 'relationship', 'race', 'sex', 'native_country'] stages = [] # stages in our Pipeline for categoricalCol in CATE_FEATURES: stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol + "Index") encoder = OneHotEncoderEstimator(inputCols=[stringIndexer.getOutputCol()], outputCols=[categoricalCol + "classVec"]) stages += [stringIndexer, encoder]
2. Indicizzare la caratteristica dell'etichetta
Spark, come molte altre librerie, non accetta valori stringa per l'etichetta. Converti la funzionalità etichetta con StringIndexer e la aggiungi alle fasi dell'elenco
# Convert label into label indices using the StringIndexer label_stringIdx = StringIndexer(inputCol="label", outputCol="newlabel") stages += [label_stringIdx]
3. Aggiungi variabile continua
L'inputCols di VectorAssembler è un elenco di colonne. È possibile creare un nuovo elenco contenente tutte le nuove colonne. Il codice seguente popola l'elenco con le caratteristiche categoriche codificate e le caratteristiche continue.
assemblerInputs = [c + "classVec" for c in CATE_FEATURES] + CONTI_FEATURES
4. Assemblare i gradini.
Infine, esegui tutti i passaggi in VectorAssembler
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")stages += [assembler]
Ora che tutti i passaggi sono pronti, esegui il push dei dati nella pipeline.
# Create a Pipeline. pipeline = Pipeline(stages=stages) pipelineModel = pipeline.fit(df_remove) model = pipelineModel.transform(df_remove)
Se controlli il nuovo set di dati, puoi vedere che contiene tutte le funzionalità, trasformate e non trasformate. Ti interessano solo la nuova etichetta e le funzionalità. Le feature includono tutte le feature trasformate e le variabili continue.
model.take(1) [Row(age=39, age_square=1521.0, workclass='State-gov', fnlwgt=77516, education='Bachelors', education_num=13, marital='Never-married', occupation='Adm-clerical', relationship='Not-in-family', race='White', sex='Male', capital_gain=2174, capital_loss=0, hours_week=40, native_country='United-States', label='<=50K', workclassIndex=4.0, workclassclassVec=SparseVector(8, {4: 1.0}), educationIndex=2.0, educationclassVec=SparseVector(15, {2: 1.0}), maritalIndex=1.0, maritalclassVec=SparseVector(6, {1: 1.0}), occupationIndex=3.0, occupationclassVec=SparseVector(14, {3: 1.0}), relationshipIndex=1.0, relationshipclassVec=SparseVector(5, {1: 1.0}), raceIndex=0.0, raceclassVec=SparseVector(4, {0: 1.0}), sexIndex=0.0, sexclassVec=SparseVector(1, {0: 1.0}), native_countryIndex=0.0, native_countryclassVec=SparseVector(40, {0: 1.0}), newlabel=0.0, features=SparseVector(99, {4: 1.0, 10: 1.0, 24: 1.0, 32: 1.0, 44: 1.0, 48: 1.0, 52: 1.0, 53: 1.0, 93: 39.0, 94: 77516.0, 95: 2174.0, 96: 13.0, 98: 40.0}))]
Passaggio 4) Costruisci il classificatore: logistico
Per rendere il calcolo più veloce, converti il modello in un DataFrame.
È necessario selezionare la nuova etichetta e le funzionalità dal modello utilizzando la mappa.
from pyspark.ml.linalg import DenseVector input_data = model.rdd.map(lambda x: (x["newlabel"], DenseVector(x["features"])))
Sei pronto per creare i dati del treno come DataFrame. Si utilizza sqlContext
df_train = sqlContext.createDataFrame(input_data, ["label", "features"])
Controlla la seconda riga
df_train.show(2)
+-----+--------------------+ |label| features| +-----+--------------------+ | 0.0|[0.0,0.0,0.0,0.0,...| | 0.0|[0.0,1.0,0.0,0.0,...| +-----+--------------------+ only showing top 2 rows
Creare un set di treni/test
Dividi il set di dati 80/20 con randomSplit.
# Split the data into train and test sets train_data, test_data = df_train.randomSplit([.8,.2],seed=1234)
Contiamo quante persone con un reddito inferiore/superiore a 50 sia nel set di formazione che in quello di test
train_data.groupby('label').agg({'label': 'count'}).show()
+-----+------------+ |label|count(label)| +-----+------------+ | 0.0| 19698| | 1.0| 6263| +-----+------------+
test_data.groupby('label').agg({'label': 'count'}).show()
+-----+------------+ |label|count(label)| +-----+------------+ | 0.0| 5021| | 1.0| 1578| +-----+------------+
Costruire il regressore logistico
Ultimo ma non meno importante, puoi costruire il classificatore. Pyspark ha un'API chiamata LogisticRegression per eseguire la regressione logistica.
Inizializzi lr indicando la colonna label e le colonne feature. Imposti un massimo di 10 iterazioni e aggiungi un parametro di regolarizzazione con un valore di 0.3. Tieni presente che nella sezione successiva utilizzerai la convalida incrociata con una griglia di parametri per ottimizzare il modello
# Import `LinearRegression` from pyspark.ml.classification import LogisticRegression # Initialize `lr` lr = LogisticRegression(labelCol="label", featuresCol="features", maxIter=10, regParam=0.3) # Fit the data to the model linearModel = lr.fit(train_data)
#Puoi vedere i coefficienti della regressione
# Print the coefficients and intercept for logistic regression print("Coefficients: " + str(linearModel.coefficients)) print("Intercept: " + str(linearModel.intercept))
Coefficients: [-0.0678914665262,-0.153425526813,-0.0706009536407,-0.164057586562,-0.120655298528,0.162922330862,0.149176870438,-0.626836362611,-0.193483661541,-0.0782269980838,0.222667203836,0.399571096381,-0.0222024341804,-0.311925857859,-0.0434497788688,-0.306007744328,-0.41318209688,0.547937504247,-0.395837350854,-0.23166535958,0.618743906733,-0.344088614546,-0.385266881369,0.317324463006,-0.350518889186,-0.201335923138,-0.232878560088,-0.13349278865,-0.119760542498,0.17500602491,-0.0480968101118,0.288484253943,-0.116314616745,0.0524163478063,-0.300952624551,-0.22046421474,-0.16557996579,-0.114676231939,-0.311966431453,-0.344226119233,0.105530129507,0.152243047814,-0.292774545497,0.263628334433,-0.199951374076,-0.30329422583,-0.231087515178,0.418918551,-0.0565930184279,-0.177818073048,-0.0733236680663,-0.267972912252,0.168491215697,-0.12181255723,-0.385648075442,-0.202101794517,0.0469791640782,-0.00842850210625,-0.00373211448629,-0.259296141281,-0.309896554133,-0.168434409756,-0.11048086026,0.0280647963877,-0.204187030092,-0.414392623536,-0.252806580669,0.143366465705,-0.516359222663,-0.435627370849,-0.301949286524,0.0878249035894,-0.210951740965,-0.621417928742,-0.099445190784,-0.232671473401,-0.1077745606,-0.360429419703,-0.420362959052,-0.379729467809,-0.395186242741,0.0826401853838,-0.280251589972,0.187313505214,-0.20295228799,-0.431177064626,0.149759018379,-0.107114299614,-0.319314858424,0.0028450133235,-0.651220387649,-0.327918792207,-0.143659581445,0.00691075160413,8.38517628783e-08,2.18856717378e-05,0.0266701216268,0.000231075966823,0.00893832698698] Intercept: -1.9884177974805692
Passaggio 5) Addestrare e valutare il modello
Per generare una previsione per il set di test,
Puoi utilizzare linearModel con trasforma() su test_data
# Make predictions on test data using the transform() method. predictions = linearModel.transform(test_data)
È possibile stampare gli elementi nelle previsioni
predictions.printSchema() root |-- label: double (nullable = true) |-- features: vector (nullable = true) |-- rawPrediction: vector (nullable = true) |-- probability: vector (nullable = true) |-- prediction: double (nullable = false)
Ti interessano l'etichetta, la previsione e la probabilità
selected = predictions.select("label", "prediction", "probability") selected.show(20)
+-----+----------+--------------------+ |label|prediction| probability| +-----+----------+--------------------+ | 0.0| 0.0|[0.91560704124179...| | 0.0| 0.0|[0.92812140213994...| | 0.0| 0.0|[0.92161406774159...| | 0.0| 0.0|[0.96222760777142...| | 0.0| 0.0|[0.66363283056957...| | 0.0| 0.0|[0.65571324475477...| | 0.0| 0.0|[0.73053376932829...| | 0.0| 1.0|[0.31265053873570...| | 0.0| 0.0|[0.80005907577390...| | 0.0| 0.0|[0.76482251301640...| | 0.0| 0.0|[0.84447301189069...| | 0.0| 0.0|[0.75691912026619...| | 0.0| 0.0|[0.60902504096722...| | 0.0| 0.0|[0.80799228385509...| | 0.0| 0.0|[0.87704364852567...| | 0.0| 0.0|[0.83817652582377...| | 0.0| 0.0|[0.79655423248500...| | 0.0| 0.0|[0.82712311232246...| | 0.0| 0.0|[0.81372823882016...| | 0.0| 0.0|[0.59687710752201...| +-----+----------+--------------------+ only showing top 20 rows
Valuta il modello
È necessario esaminare la metrica di precisione per vedere quanto bene (o male) funziona il modello. Attualmente non esiste alcuna API per calcolare la misura di precisione Spark. Il valore predefinito è il ROC, curva caratteristica operativa del ricevitore. Si tratta di una metrica diversa che tiene conto del tasso di falsi positivi.
Prima di esaminare il ROC, costruiamo la misura di precisione. Hai più familiarità con questa metrica. La misura di precisione è la somma della previsione corretta sul numero totale di osservazioni.
Crei un DataFrame con l'etichetta e il file `prediction.
cm = predictions.select("label", "prediction")
Puoi controllare il numero di classe nell'etichetta e nel pronostico
cm.groupby('label').agg({'label': 'count'}).show()
+-----+------------+ |label|count(label)| +-----+------------+ | 0.0| 5021| | 1.0| 1578| +-----+------------+
cm.groupby('prediction').agg({'prediction': 'count'}).show()
+----------+-----------------+ |prediction|count(prediction)| +----------+-----------------+ | 0.0| 5982| | 1.0| 617| +----------+-----------------+
Ad esempio, nel set di test, ci sono 1578 famiglie con un reddito superiore a 50 e 5021 inferiori. Il classificatore, tuttavia, prevedeva 617 famiglie con reddito superiore a 50.
È possibile calcolare la precisione calcolando il conteggio quando le etichette vengono classificate correttamente sul numero totale di righe.
cm.filter(cm.label == cm.prediction).count() / cm.count()
0.8237611759357478
Puoi racchiudere tutto insieme e scrivere una funzione per calcolare la precisione.
def accuracy_m(model): predictions = model.transform(test_data) cm = predictions.select("label", "prediction") acc = cm.filter(cm.label == cm.prediction).count() / cm.count() print("Model accuracy: %.3f%%" % (acc * 100)) accuracy_m(model = linearModel) Model accuracy: 82.376%
Metriche ROC
Il modulo BinaryClassificationEvaluator include le misure ROC. Il ricevente OperaLa curva caratteristica è un altro strumento comune utilizzato con la classificazione binaria. È molto simile alla curva precisione/richiamo, ma invece di tracciare la precisione rispetto al richiamo, la curva ROC mostra il tasso di veri positivi (ovvero il richiamo) rispetto al tasso di falsi positivi. Il tasso di falsi positivi è il rapporto tra i casi negativi erroneamente classificati come positivi. È uguale a uno meno il vero tasso negativo. Il vero tasso negativo è anche chiamato specificità. Pertanto la curva ROC traccia la sensibilità (richiamo) rispetto a 1 – specificità
### Use ROC from pyspark.ml.evaluation import BinaryClassificationEvaluator # Evaluate model evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction") print(evaluator.evaluate(predictions)) print(evaluator.getMetricName())
0.8940481662695192areaSottoROC
print(evaluator.evaluate(predictions))
0.8940481662695192
Passaggio 6) Ottimizzare l'iperparametro
Ultimo ma non meno importante, puoi ottimizzare gli iperparametri. Simile a scikit impara crei una griglia di parametri e aggiungi i parametri che desideri ottimizzare.
Per ridurre il tempo del calcolo, ottimizzi solo il parametro di regolarizzazione con solo due valori.
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator # Create ParamGrid for Cross Validation paramGrid = (ParamGridBuilder() .addGrid(lr.regParam, [0.01, 0.5]) .build())
Infine, valuti il modello utilizzando il metodo di valutazione incrociata con 5 pieghe. Per allenarsi ci vogliono circa 16 minuti.
from time import * start_time = time() # Create 5-fold CrossValidator cv = CrossValidator(estimator=lr, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5) # Run cross validations cvModel = cv.fit(train_data) # likely take a fair amount of time end_time = time() elapsed_time = end_time - start_time print("Time to train model: %.3f seconds" % elapsed_time)
Tempo per addestrare il modello: 978.807 secondi
Il miglior iperparametro di regolarizzazione è 0.01, con una precisione dell'85.316%.
accuracy_m(model = cvModel) Model accuracy: 85.316%
Puoi estrarre il parametro consigliato concatenando cvModel.bestModel con extractParamMap()
bestModel = cvModel.bestModel bestModel.extractParamMap()
{Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='aggregationDepth', doc='suggested depth for treeAggregate (>= 2)'): 2, Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty'): 0.0, Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='family', doc='The name of family which is a description of the label distribution to be used in the model. Supported options: auto, binomial, multinomial.'): 'auto', Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='featuresCol', doc='features column name'): 'features', Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='fitIntercept', doc='whether to fit an intercept term'): True, Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='labelCol', doc='label column name'): 'label', Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='maxIter', doc='maximum number of iterations (>= 0)'): 10, Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='predictionCol', doc='prediction column name'): 'prediction', Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='probabilityCol', doc='Column name for predicted class conditional probabilities. Note: Not all models output well-calibrated probability estimates! These probabilities should be treated as confidences, not precise probabilities'): 'probability', Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='rawPredictionCol', doc='raw prediction (a.k.a. confidence) column name'): 'rawPrediction', Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='regParam', doc='regularization parameter (>= 0)'): 0.01, Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='standardization', doc='whether to standardize the training features before fitting the model'): True, Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='threshold', doc='threshold in binary classification prediction, in range [0, 1]'): 0.5, Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='tol', doc='the convergence tolerance for iterative algorithms (>= 0)'): 1e-06}
Sintesi
Spark è uno strumento fondamentale per un data scientist. Consente al professionista di connettere un'app a diverse origini dati, eseguire analisi dei dati senza problemi o aggiungere un modello predittivo.
Iniziare con Spark, è necessario avviare un Spark Contesto con:
'SparkContesto()'
e e SQL contesto per connettersi a un'origine dati:
'SQLContesto()'
Nel tutorial imparerai come addestrare una regressione logistica:
- Converti il set di dati in un Dataframe con:
rdd.map(lambda x: (x["newlabel"], DenseVector(x["features"]))) sqlContext.createDataFrame(input_data, ["label", "features"])
Tieni presente che il nome della colonna dell'etichetta è newlabel e tutte le funzionalità sono raccolte in funzionalità. Modifica questi valori se diversi nel tuo set di dati.
- Creare il set di treno/test
randomSplit([.8,.2],seed=1234)
- Allena il modello
LogisticRegression(labelCol="label",featuresCol="features",maxIter=10, regParam=0.3)
lr.fit()
- Fai una previsione
linearModel.transform()