PySpark Tutorial per principianti: impara con gli ESEMPI

Prima di imparare PySpark, capiamo:

Cos'è Apache Spark?

Spark è una soluzione per big data che ha dimostrato di essere più semplice e veloce di Hadoop MapReduce. Spark è un software open source sviluppato dal laboratorio RAD della UC Berkeley nel 2009. Da quando è stato rilasciato al pubblico nel 2010, Spark è cresciuto in popolarità e viene utilizzato nel settore con una scala senza precedenti.

Nell'era di Big Data, i professionisti necessitano più che mai di strumenti veloci e affidabili per elaborare lo streaming di dati. Strumenti precedenti come MapReduce erano i preferiti ma erano lenti. Per superare questo problema, Spark offre una soluzione rapida e generica. La differenza principale tra Spark e MapReduce è quello Spark esegue calcoli in memoria durante la successiva sul disco rigido. Consente l'accesso ad alta velocità e l'elaborazione dei dati, riducendo i tempi da ore a minuti.

Cos'è PySpark?

PySpark è uno strumento creato da Apache Spark Comunità per l'utilizzo Python con Spark. Permette di lavorare con RDD (Resilient Distributed Dataset) in Python. Offre anche PySpark Shell da collegare Python API con Spark nucleo da avviare Spark Contesto. Spark è il nome del motore per realizzare il cluster computing, mentre PySpark is Pythonda utilizzare Spark.

Come si confronta la Spark funziona?

Spark si basa su un motore computazionale, il che significa che si occupa della pianificazione, distribuzione e monitoraggio dell'applicazione. Ogni attività viene eseguita su varie macchine di lavoro chiamate cluster di elaborazione. Un cluster informatico si riferisce alla divisione dei compiti. Una macchina esegue un compito, mentre le altre contribuiscono al risultato finale attraverso un compito diverso. Alla fine, tutte le attività vengono aggregate per produrre un output. IL Spark L'amministratore fornisce una panoramica a 360 gradi di vari Spark Lavori.

Che aspetto ha e come funziona il Spark Lavora
Che aspetto ha e come funziona il Spark Lavora

Spark è progettato per funzionare con

  • Python
  • Java
  • Scala
  • SQL

Una caratteristica significativa di Spark è la grande quantità di libreria integrata, incluso MLlib per l'apprendimento automatico. Spark è inoltre progettato per funzionare con i cluster Hadoop e può leggere un'ampia tipologia di file, inclusi dati Hive, CSV, JSON, dati Casandra tra gli altri.

Perché usare Spark?

Come futuro data practitioner, dovresti avere familiarità con le famose librerie di Python: Pandas e scikit-learn. Queste due librerie sono fantastiche per esplorare dataset di medie dimensioni. I progetti di machine learning regolari sono costruiti attorno alla seguente metodologia:

  • Caricare i dati sul disco
  • Importare i dati nella memoria della macchina
  • Elaborare/analizzare i dati
  • Costruisci il modello di machine learning
  • Memorizza la previsione su disco

Il problema sorge se il data scientist vuole elaborare dati troppo grandi per un computer. Agli albori della scienza dei dati, i professionisti campionavano i dati poiché la formazione su enormi set di dati non era sempre necessaria. Lo scienziato dei dati troverebbe un buon campione statistico, eseguirebbe un ulteriore controllo di robustezza e fornirebbe un modello eccellente.

Tuttavia, ci sono alcuni problemi con questo:

  • Il set di dati riflette il mondo reale?
  • I dati includono un esempio specifico?
  • Il modello è adatto per il campionamento?

Prendi ad esempio i consigli degli utenti. I suggerimenti si basano sul confronto degli utenti con altri utenti per valutare le loro preferenze. Se il professionista dei dati prende solo un sottoinsieme dei dati, non ci sarà una coorte di utenti molto simili tra loro. I suggerimenti devono essere eseguiti sull'intero set di dati o non essere eseguiti affatto.

Qual'è la soluzione?

La soluzione è evidente da molto tempo: suddividere il problema su più computer. Anche il calcolo parallelo presenta più problemi. Gli sviluppatori hanno spesso difficoltà a scrivere codice parallelo e finiscono per dover risolvere un sacco di problemi complessi relativi al multi-processing stesso.

Pyspark fornisce allo scienziato dei dati un'API che può essere utilizzata per risolvere i problemi di elaborazione dati parallela. Pyspark gestisce le complessità del multiprocessing, come la distribuzione dei dati, la distribuzione del codice e la raccolta dell'output dai worker su un cluster di macchine.

Spark può essere eseguito in modo autonomo ma molto spesso viene eseguito su un framework di cluster computing come Hadoop. Nelle fasi di test e sviluppo, tuttavia, un data scientist può operare in modo efficiente Spark sui loro box di sviluppo o laptop senza cluster

• Uno dei principali vantaggi di Spark è quello di costruire un'architettura che comprenda la gestione dello streaming di dati, query di dati fluide, previsioni di apprendimento automatico e accesso in tempo reale a varie analisi.

· XNUMX€ Spark lavora a stretto contatto con il linguaggio SQL, ovvero i dati strutturati. Permette di interrogare i dati in tempo reale.

• Il compito principale del data scientist è analizzare e costruire modelli predittivi. In breve, uno scienziato dei dati deve sapere come interrogare i dati utilizzando SQL, produrre un rapporto statistico e utilizzare l'apprendimento automatico per produrre previsioni. Il data scientist dedica una notevole quantità di tempo alla pulizia, trasformazione e analisi dei dati. Una volta che il set di dati o il flusso di lavoro dei dati è pronto, il data scientist utilizza varie tecniche per scoprire approfondimenti e modelli nascosti. La manipolazione dei dati dovrebbe essere robusta e allo stesso tempo facile da usare. Spark è lo strumento giusto grazie alla sua velocità e alle ricche API.

In questo PySpark tutorial, imparerai come costruire un classificatore con PySpark esempi.

Come installare PySpark conAWS

La sezione Currents, dedicata a opere audaci e innovative di artisti emergenti e affermati, include la prima statunitense di Mare’s Nest di Ben Rivers, descritto come “un enigmatico road movie ambientato in un mondo post-apocalittico governato da bambini”. Tra gli altri titoli spiccano Dracula di Radu Jude e With Hasan in Gaza di Kamal Aljafari. Jupyter il team crea un'immagine Docker da eseguire Spark in modo efficiente. Di seguito sono riportati i passaggi che è possibile seguire per installare PySpark istanza in AWS.

Consulta il nostro tutorial su AWS e TensorFlow

Passaggio 1: crea un'istanza

Prima di tutto, devi creare un'istanza. Vai al tuo account AWS e avvia l'istanza. Puoi aumentare lo spazio di archiviazione fino a 15 g e utilizzare lo stesso gruppo di sicurezza del tutorial di TensorFlow.

Passaggio 2: aprire la connessione

Apri la connessione e installa il contenitore docker. Per maggiori dettagli, fai riferimento al tutorial con TensorFlow con docker. Tieni presente che devi trovarti nella directory di lavoro corretta.

Basta eseguire questi codici per installare Docker:

sudo yum update -y
sudo yum install -y docker
sudo service docker start
sudo user-mod -a -G docker ec2-user
exit

Passaggio 3: riaprire la connessione e installare Spark

Dopo aver riaperto la connessione, puoi installare l'immagine contenente PySpark.

## Spark
docker run -v ~/work:/home/jovyan/work -d -p 8888:8888 jupyter/pyspark-notebook

## Allow preserving Jupyter notebook
sudo chown 1000 ~/work

## Install tree to see our working directory next
sudo yum install -y tree

Passo 4: aperto Jupyter

Controlla il contenitore e il suo nome

docker ps

Avvia la finestra mobile con i log della finestra mobile seguiti dal nome della finestra mobile. Ad esempio, la finestra mobile registra zealous_goldwasser

Vai al tuo browser e avvia Jupyter. L'indirizzo è http://localhost:8888/. Incolla la password fornita dal terminale.

Note:: se desideri caricare/scaricare un file sulla tua macchina AWS, puoi utilizzare il software Cyberduck, https://cyberduck.io/.

Come installare PySpark on Windows/Mac con Conda

Di seguito è riportato un processo dettagliato su come installare PySpark on Windows/Mac utilizzando Anaconda:

Per installare Spark sul tuo computer locale, una pratica consigliata è creare un nuovo ambiente conda. Questo nuovo ambiente verrà installato Python 3.6 Spark e tutte le dipendenze.

Utente Mac

cd anaconda3
touch hello-spark.yml
vi hello-spark.yml

Windows Utente

cd C:\Users\Admin\Anaconda3
echo.>hello-spark.yml
notepad hello-spark.yml

Puoi modificare il file .yml. Sii cauto con il trattino. Sono necessari due spazi prima –

name: hello-spark 
    dependencies:
    
    - python=3.6
    - jupyter
    - ipython
    - numpy
    - numpy-base
    - pandas
    - py4j
    - pyspark
    - pytz

Salvalo e crea l'ambiente. Ci vuole un po' di tempo

conda env create -f hello-spark.yml

Per maggiori dettagli sulla posizione, consultare il tutorial Installa TensorFlow

Puoi controllare tutto l'ambiente installato sulla tua macchina

conda env list
Activate hello-spark

Utente Mac

source activate hello-spark

Windows Utente

activate hello-spark

Nota: Hai già creato un ambiente TensorFlow specifico per eseguire i tutorial su TensorFlow. È più conveniente creare un nuovo ambiente diverso da hello-tf. Non ha senso sovraccaricare hello-tf con Spark o qualsiasi altra libreria di machine learning.

Immagina che la maggior parte del tuo progetto coinvolga TensorFlow, ma devi utilizzarlo Spark per un progetto particolare. Puoi impostare un ambiente TensorFlow per tutto il tuo progetto e creare un ambiente separato per Spark. Puoi aggiungere quante librerie vuoi Spark ambiente come desideri senza interferire con l'ambiente TensorFlow. Una volta che hai finito con Sparkdel progetto, puoi cancellarlo senza influire sull'ambiente TensorFlow.

Jupyter

Apri Jupyter Notebook e prova se PySpark funziona. In un nuovo notebook incolla il seguente PySpark Codice d'esempio:

import pyspark
from pyspark import SparkContext
sc =SparkContext()

Se viene visualizzato un errore, è probabile che sia così Java non è installato sulla tua macchina. In mac, apri il terminale e scrivi java -version, se c'è una versione java, assicurati che sia 1.8. In Windows, vai su Applicazione e controlla se è presente un file Java cartella. Se c'è un Java cartella, controllalo Java 1.8 è installato. Al momento della stesura di questo articolo, PySpark non è compatibile con Java9 e versioni successive.

Se è necessario installare Java, devi pensare link e scaricare jdk-8u181-windows-x64.exe

Jupyter

Per gli utenti Mac, si consiglia di utilizzare `brew.`

brew tap caskroom/versions
brew cask install java8

Fare riferimento a questo tutorial passo passo su come installare Java

Note:: utilizzare rimuovi per cancellare completamente un ambiente.

 conda env remove -n hello-spark -y

Spark Contesto

SparkIl contesto è il motore interno che permette le connessioni con i cluster. Se vuoi eseguire un'operazione, hai bisogno di un file SparkContesto.

Creare un SparkContesto

Prima di tutto, devi avviare a SparkContesto.

import pyspark
from pyspark import SparkContext
sc =SparkContext()

Ora che il SparkIl contesto è pronto, puoi creare una raccolta di dati chiamata RDD, Resilient Distributed Dataset. Il calcolo in un RDD viene automaticamente parallelizzato nel cluster.

nums= sc.parallelize([1,2,3,4])

Puoi accedere alla prima riga con take

nums.take(1)
[1]

È possibile applicare una trasformazione ai dati con una funzione lambda. Nel PySpark nell'esempio seguente, restituisci il quadrato dei num. È una trasformazione della mappa

squared = nums.map(lambda x: x*x).collect()
for num in squared:
    print('%i ' % (num))
1 
4 
9 
16

SQLContesto

Un modo più conveniente è utilizzare DataFrame. SparkIl contesto è già impostato, puoi usarlo per creare dataFrame. È inoltre necessario dichiarare SQLContext

SQLContext consente di connettere il motore con diverse origini dati. Viene utilizzato per avviare le funzionalità di Spark SQL.

from pyspark.sql import Row
from pyspark.sql import SQLContext

sqlContext = SQLContext(sc)

Ora in questo Spark lezione Python, creiamo un elenco di tuple. Ogni tupla conterrà il nome delle persone e la loro età. Sono necessari quattro passaggi:

Passo 1) Crea l'elenco di tuple con le informazioni

[('John',19),('Smith',29),('Adam',35),('Henry',50)]

Passo 2) Costruisci un RDD

rdd = sc.parallelize(list_p)

Passo 3) Converti le tuple

rdd.map(lambda x: Row(name=x[0], age=int(x[1])))

Passo 4) Creare un contesto DataFrame

sqlContext.createDataFrame(ppl)
list_p = [('John',19),('Smith',29),('Adam',35),('Henry',50)]
rdd = sc.parallelize(list_p)
ppl = rdd.map(lambda x: Row(name=x[0], age=int(x[1])))
DF_ppl = sqlContext.createDataFrame(ppl)

Se vuoi accedere al tipo di ciascuna funzionalità, puoi usare printSchema()

DF_ppl.printSchema()
root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)

Esempio di machine learning con PySpark

Ora che hai una breve idea di Spark e SQLContext, sei pronto per creare il tuo primo programma di machine learning.

Di seguito sono riportati i passaggi per creare un programma di apprendimento automatico con PySpark:

  • Passo 1) Operazioni di base con PySpark
  • Passo 2) Preelaborazione dei dati
  • Passo 3) Costruisci una pipeline di elaborazione dati
  • Passo 4) Costruisci il classificatore: logistico
  • Passo 5) Formare e valutare il modello
  • Passo 6) Ottimizza l'iperparametro

In questo PySpark Tutorial di Machine Learning, useremo il dataset per adulti. Lo scopo di questo tutorial è imparare a usare Pyspark. Per maggiori informazioni sul dataset, fare riferimento a questo tutorial.

Tieni presente che il set di dati non è significativo e potresti pensare che il calcolo richieda molto tempo. Spark è progettato per elaborare una notevole quantità di dati. SparkLe prestazioni di aumentano rispetto ad altre librerie di machine learning quando il set di dati elaborato diventa più grande.

Passaggio 1) Operazioni di base con PySpark

Prima di tutto, è necessario inizializzare SQLContext che non sia ancora iniziato.

#from pyspark.sql import SQLContext
url = "https://raw.githubusercontent.com/guru99-edu/R-Programming/master/adult_data.csv"
from pyspark import SparkFiles
sc.addFile(url)
sqlContext = SQLContext(sc)

quindi, puoi leggere il file cvs con sqlContext.read.csv. Per dirlo si utilizza inferSchema impostato su True Spark per indovinare automaticamente il tipo di dati. Per impostazione predefinita, è impostato su False.

df = sqlContext.read.csv(SparkFiles.get("adult_data.csv"), header=True, inferSchema= True)

Diamo un'occhiata al tipo di dati

df.printSchema()
root
 |-- age: integer (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: integer (nullable = true)
 |-- education: string (nullable = true)
 |-- education_num: integer (nullable = true)
 |-- marital: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital_gain: integer (nullable = true)
 |-- capital_loss: integer (nullable = true)
 |-- hours_week: integer (nullable = true)
 |-- native_country: string (nullable = true)
 |-- label: string (nullable = true)

Puoi vedere i dati con show.

df.show(5, truncate = False)
+---+----------------+------+---------+-------------+------------------+-----------------+-------------+-----+------+------------+------------+----------+--------------+-----+
|age|workclass       |fnlwgt|education|education_num|marital           |occupation       |relationship |race |sex   |capital_gain|capital_loss|hours_week|native_country|label|
+---+----------------+------+---------+-------------+------------------+-----------------+-------------+-----+------+------------+------------+----------+--------------+-----+
|39 |State-gov       |77516 |Bachelors|13           |Never-married     |Adm-clerical     |Not-in-family|White|Male  |2174        |0           |40        |United-States |<=50K|
|50 |Self-emp-not-inc|83311 |Bachelors|13           |Married-civ-spouse|Exec-managerial  |Husband      |White|Male  |0           |0           |13        |United-States |<=50K|
|38 |Private         |215646|HS-grad  |9            |Divorced          |Handlers-cleaners|Not-in-family|White|Male  |0           |0           |40        |United-States |<=50K|
|53 |Private         |234721|11th     |7            |Married-civ-spouse|Handlers-cleaners|Husband      |Black|Male  |0           |0           |40        |United-States |<=50K|
|28 |Private         |338409|Bachelors|13           |Married-civ-spouse|Prof-specialty   |Wife         |Black|Female|0           |0           |40        |Cuba          |<=50K|
+---+----------------+------+---------+-------------+------------------+-----------------+-------------+-----+------+------------+------------+----------+--------------+-----+
only showing top 5 rows

Se non hai impostato inderShema su True, ecco cosa sta succedendo al tipo. Ci sono tutti in corda.

df_string = sqlContext.read.csv(SparkFiles.get("adult.csv"), header=True, inferSchema=  False)
df_string.printSchema()
root
 |-- age: string (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: string (nullable = true)
 |-- education: string (nullable = true)
 |-- education_num: string (nullable = true)
 |-- marital: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital_gain: string (nullable = true)
 |-- capital_loss: string (nullable = true)
 |-- hours_week: string (nullable = true)
 |-- native_country: string (nullable = true)
 |-- label: string (nullable = true)

Per convertire la variabile continua nel formato corretto, puoi utilizzare la rifusione delle colonne. Puoi usare withColumn per dirlo Spark su quale colonna operare la trasformazione.

# Import all from `sql.types`
from pyspark.sql.types import *

# Write a custom function to convert the data type of DataFrame columns
def convertColumn(df, names, newType):
    for name in names: 
        df = df.withColumn(name, df[name].cast(newType))
    return df 
# List of continuous features
CONTI_FEATURES  = ['age', 'fnlwgt','capital_gain', 'education_num', 'capital_loss', 'hours_week']
# Convert the type
df_string = convertColumn(df_string, CONTI_FEATURES, FloatType())
# Check the dataset
df_string.printSchema()
root
 |-- age: float (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: float (nullable = true)
 |-- education: string (nullable = true)
 |-- education_num: float (nullable = true)
 |-- marital: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital_gain: float (nullable = true)
 |-- capital_loss: float (nullable = true)
 |-- hours_week: float (nullable = true)
 |-- native_country: string (nullable = true)
 |-- label: string (nullable = true)

from pyspark.ml.feature import StringIndexer
#stringIndexer = StringIndexer(inputCol="label", outputCol="newlabel")
#model = stringIndexer.fit(df)
#df = model.transform(df)
df.printSchema()

Seleziona colonne

Puoi selezionare e mostrare le righe con select e i nomi delle funzionalità. Di seguito vengono selezionati età e fnlwgt.

df.select('age','fnlwgt').show(5)
+---+------+
|age|fnlwgt|
+---+------+
| 39| 77516|
| 50| 83311|
| 38|215646|
| 53|234721|
| 28|338409|
+---+------+
only showing top 5 rows

Contare per gruppo

Se vuoi contare il numero di occorrenze per gruppo, puoi concatenare:

  • raggruppa per()
  • contare()

insieme. Nel PySpark nell'esempio seguente, conti il ​​numero di righe in base al livello di istruzione.

df.groupBy("education").count().sort("count",ascending=True).show()
+------------+-----+
|   education|count|
+------------+-----+
|   Preschool|   51|
|     1st-4th|  168|
|     5th-6th|  333|
|   Doctorate|  413|
|        12th|  433|
|         9th|  514|
| Prof-school|  576|
|     7th-8th|  646|
|        10th|  933|
|  Assoc-acdm| 1067|
|        11th| 1175|
|   Assoc-voc| 1382|
|     Masters| 1723|
|   Bachelors| 5355|
|Some-college| 7291|
|     HS-grad|10501|
+------------+-----+

Descrivi i dati

Per ottenere una statistica riepilogativa dei dati, è possibile utilizzare description(). Calcolerà:

  • contare
  • significare
  • deviazione standard
  • verbale
  • max
df.describe().show()
+-------+------------------+-----------+------------------+------------+-----------------+--------+----------------+------------+------------------+------+------------------+----------------+------------------+--------------+-----+
|summary|               age|  workclass|            fnlwgt|   education|    education_num| marital|      occupation|relationship|              race|   sex|      capital_gain|    capital_loss|        hours_week|native_country|label|
+-------+------------------+-----------+------------------+------------+-----------------+--------+----------------+------------+------------------+------+------------------+----------------+------------------+--------------+-----+
|  count|             32561|      32561|             32561|       32561|            32561|   32561|           32561|       32561|             32561| 32561|             32561|           32561|             32561|         32561|32561|
|   mean| 38.58164675532078|       null|189778.36651208502|        null| 10.0806793403151|    null|            null|        null|              null|  null|1077.6488437087312| 87.303829734959|40.437455852092995|          null| null|
| stddev|13.640432553581356|       null|105549.97769702227|        null|2.572720332067397|    null|            null|        null|              null|  null| 7385.292084840354|402.960218649002|12.347428681731838|          null| null|
|    min|                17|          ?|             12285|        10th|                1|Divorced|               ?|     Husband|Amer-Indian-Eskimo|Female|                 0|               0|                 1|             ?|<=50K|
|    max|                90|Without-pay|           1484705|Some-college|               16| Widowed|Transport-moving|        Wife|             White|  Male|             99999|            4356|                99|    Yugoslavia| >50K|
+-------+------------------+-----------+------------------+------------+-----------------+--------+----------------+------------+------------------+------+------------------+----------------+------------------+--------------+-----+

Se desideri la statistica riassuntiva di una sola colonna, aggiungi il nome della colonna all'interno di description()

df.describe('capital_gain').show()
+-------+------------------+
|summary|      capital_gain|
+-------+------------------+
|  count|             32561|
|   mean|1077.6488437087312|
| stddev| 7385.292084840354|
|    min|                 0|
|    max|             99999|
+-------+------------------+

Calcolo della tabella incrociata

In alcune occasioni, può essere interessante vedere le statistiche descrittive tra due colonne a coppie. Ad esempio, puoi contare il numero di persone con un reddito inferiore o superiore a 50k in base al livello di istruzione. Questa operazione è chiamata tabella incrociata.

df.crosstab('age', 'label').sort("age_label").show()
+---------+-----+----+
|age_label|<=50K|>50K|
+---------+-----+----+
|       17|  395|   0|
|       18|  550|   0|
|       19|  710|   2|
|       20|  753|   0|
|       21|  717|   3|
|       22|  752|  13|
|       23|  865|  12|
|       24|  767|  31|
|       25|  788|  53|
|       26|  722|  63|
|       27|  754|  81|
|       28|  748| 119|
|       29|  679| 134|
|       30|  690| 171|
|       31|  705| 183|
|       32|  639| 189|
|       33|  684| 191|
|       34|  643| 243|
|       35|  659| 217|
|       36|  635| 263|
+---------+-----+----+
only showing top 20 rows

Puoi vedere che nessuna persona ha entrate superiori a 50 quando è giovane.

Colonna di rilascio

Esistono due API intuitive per eliminare le colonne:

  • drop(): rilascia una colonna
  • dropna(): elimina gli NA

Sotto trascini la colonna education_num

df.drop('education_num').columns

['age',
 'workclass',
 'fnlwgt',
 'education',
 'marital',
 'occupation',
 'relationship',
 'race',
 'sex',
 'capital_gain',
 'capital_loss',
 'hours_week',
 'native_country',
 'label']

Filtra i dati

È possibile utilizzare filter() per applicare statistiche descrittive in un sottoinsieme di dati. Ad esempio, puoi contare il numero di persone di età superiore a 40 anni

df.filter(df.age > 40).count()

13443

Descriptcinque statistiche per gruppo

Infine, puoi raggruppare i dati per gruppo e calcolare operazioni statistiche come la media.

df.groupby('marital').agg({'capital_gain': 'mean'}).show()
+--------------------+------------------+
|             marital| avg(capital_gain)|
+--------------------+------------------+
|           Separated| 535.5687804878049|
|       Never-married|376.58831788823363|
|Married-spouse-ab...| 653.9832535885167|
|            Divorced| 728.4148098131893|
|             Widowed| 571.0715005035247|
|   Married-AF-spouse| 432.6521739130435|
|  Married-civ-spouse|1764.8595085470085|
+--------------------+------------------+

Passaggio 2) Preelaborazione dei dati

L’elaborazione dei dati è un passaggio fondamentale nell’apprendimento automatico. Dopo aver rimosso i dati inutili, ottieni alcune informazioni importanti.

Ad esempio, sai che l'età non è una funzione lineare del reddito. Quando le persone sono giovani, il loro reddito è solitamente inferiore a quello della mezza età. Dopo il pensionamento, una famiglia utilizza i propri risparmi, il che significa una diminuzione del reddito. Per catturare questo modello, puoi aggiungere un quadrato alla funzione età

Aggiungi il quadrato dell'età

Per aggiungere una nuova funzionalità, è necessario:

  1. Seleziona la colonna
  2. Applica la trasformazione e aggiungila a DataFrame
from pyspark.sql.functions import *

# 1 Select the column
age_square = df.select(col("age")**2)

# 2 Apply the transformation and add it to the DataFrame
df = df.withColumn("age_square", col("age")**2)

df.printSchema()
root
 |-- age: integer (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: integer (nullable = true)
 |-- education: string (nullable = true)
 |-- education_num: integer (nullable = true)
 |-- marital: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital_gain: integer (nullable = true)
 |-- capital_loss: integer (nullable = true)
 |-- hours_week: integer (nullable = true)
 |-- native_country: string (nullable = true)
 |-- label: string (nullable = true)
 |-- age_square: double (nullable = true)

Puoi vedere che age_square è stato aggiunto con successo al data frame. Puoi cambiare l'ordine delle variabili con select. Di seguito, riporti age_square subito dopo age.

COLUMNS = ['age', 'age_square', 'workclass', 'fnlwgt', 'education', 'education_num', 'marital',
           'occupation', 'relationship', 'race', 'sex', 'capital_gain', 'capital_loss',
           'hours_week', 'native_country', 'label']
df = df.select(COLUMNS)
df.first()
Row(age=39, age_square=1521.0, workclass='State-gov', fnlwgt=77516, education='Bachelors', education_num=13, marital='Never-married', occupation='Adm-clerical', relationship='Not-in-family', race='White', sex='Male', capital_gain=2174, capital_loss=0, hours_week=40, native_country='United-States', label='<=50K')

Escludere Olanda-Paesi Bassi

Quando un gruppo all'interno di una caratteristica ha una sola osservazione, non apporta alcuna informazione al modello. Al contrario, può portare ad un errore durante la convalida incrociata.

Controlliamo l'origine della famiglia

df.filter(df.native_country == 'Holand-Netherlands').count()
df.groupby('native_country').agg({'native_country': 'count'}).sort(asc("count(native_country)")).show()
+--------------------+---------------------+
|      native_country|count(native_country)|
+--------------------+---------------------+
|  Holand-Netherlands|                    1|
|            Scotland|                   12|
|             Hungary|                   13|
|            Honduras|                   13|
|Outlying-US(Guam-...|                   14|
|          Yugoslavia|                   16|
|            Thailand|                   18|
|                Laos|                   18|
|            Cambodia|                   19|
|     Trinadad&Tobago|                   19|
|                Hong|                   20|
|             Ireland|                   24|
|             Ecuador|                   28|
|              Greece|                   29|
|              France|                   29|
|                Peru|                   31|
|           Nicaragua|                   34|
|            Portugal|                   37|
|                Iran|                   43|
|               Haiti|                   44|
+--------------------+---------------------+
only showing top 20 rows

La funzione native_country ha solo una famiglia proveniente dai Paesi Bassi. Lo escludi.

df_remove = df.filter(df.native_country !=	'Holand-Netherlands')

Passaggio 3) Costruire una pipeline di elaborazione dati

Simile a scikit-learn, Pyspark ha una pipeline API.

Una pipeline è molto comoda per mantenere la struttura dei dati. Inserisci i dati nella pipeline. All'interno della pipeline vengono eseguite varie operazioni, l'output viene utilizzato per alimentare l'algoritmo.

Ad esempio, una trasformazione universale nell'apprendimento automatico consiste nel convertire una stringa in un codificatore attivo, ovvero una colonna per gruppo. Un codificatore attivo è solitamente una matrice piena di zeri.

I passaggi per trasformare i dati sono molto simili a scikit-learn. Devi:

  • Indicizzare la stringa in numerico
  • Crea l'unico codificatore interessante
  • Trasforma i dati

Due API svolgono il lavoro: StringIndexer, OneHotEncoder

  1. Prima di tutto, seleziona la colonna stringa da indicizzare. inputCol è il nome della colonna nel set di dati. outputCol è il nuovo nome assegnato alla colonna trasformata.
StringIndexer(inputCol="workclass", outputCol="workclass_encoded")
  1. Adatta i dati e trasformali
model = stringIndexer.fit(df)		
`indexed = model.transform(df)``
  1. Crea le colonne di notizie in base al gruppo. Ad esempio, se nella funzione sono presenti 10 gruppi, la nuova matrice avrà 10 colonne, una per ciascun gruppo.
OneHotEncoder(dropLast=False, inputCol="workclassencoded", outputCol="workclassvec")
### Example encoder
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler

stringIndexer = StringIndexer(inputCol="workclass", outputCol="workclass_encoded")
model = stringIndexer.fit(df)
indexed = model.transform(df)
encoder = OneHotEncoder(dropLast=False, inputCol="workclass_encoded", outputCol="workclass_vec")
encoded = encoder.transform(indexed)
encoded.show(2)
+---+----------+----------------+------+---------+-------------+------------------+---------------+-------------+-----+----+------------+------------+----------+--------------+-----+-----------------+-------------+
|age|age_square|       workclass|fnlwgt|education|education_num|           marital|     occupation| relationship| race| sex|capital_gain|capital_loss|hours_week|native_country|label|workclass_encoded|workclass_vec|
+---+----------+----------------+------+---------+-------------+------------------+---------------+-------------+-----+----+------------+------------+----------+--------------+-----+-----------------+-------------+
| 39|    1521.0|       State-gov| 77516|Bachelors|           13|     Never-married|   Adm-clerical|Not-in-family|White|Male|        2174|           0|        40| United-States|<=50K|              4.0|(9,[4],[1.0])|
| 50|    2500.0|Self-emp-not-inc| 83311|Bachelors|           13|Married-civ-spouse|Exec-managerial|      Husband|White|Male|           0|           0|        13| United-States|<=50K|              1.0|(9,[1],[1.0])|
+---+----------+----------------+------+---------+-------------+------------------+---------------+-------------+-----+----+------------+------------+----------+--------------+-----+-----------------+-------------+
only showing top 2 rows

Costruisci la pipeline

Costruirai una pipeline per convertire tutte le funzionalità precise e aggiungerle al set di dati finale. La pipeline avrà quattro operazioni, ma sentiti libero di aggiungere quante operazioni desideri.

  1. Codificare i dati categorici
  2. Indicizzare la caratteristica dell'etichetta
  3. Aggiungi variabile continua
  4. Assemblare i passaggi.

Ogni passaggio viene memorizzato in un elenco denominato fasi. Questo elenco indicherà a VectorAssembler quale operazione eseguire all'interno della pipeline.

1. Codificare i dati categorici

Questo passaggio è esattamente lo stesso dell'esempio precedente, tranne per il fatto che si ripetono tutte le funzionalità categoriche.

from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoderEstimator
CATE_FEATURES = ['workclass', 'education', 'marital', 'occupation', 'relationship', 'race', 'sex', 'native_country']
stages = [] # stages in our Pipeline
for categoricalCol in CATE_FEATURES:
    stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol + "Index")
    encoder = OneHotEncoderEstimator(inputCols=[stringIndexer.getOutputCol()],
                                     outputCols=[categoricalCol + "classVec"])
    stages += [stringIndexer, encoder]

2. Indicizzare la caratteristica dell'etichetta

Spark, come molte altre librerie, non accetta valori stringa per l'etichetta. Converti la funzionalità etichetta con StringIndexer e la aggiungi alle fasi dell'elenco

# Convert label into label indices using the StringIndexer
label_stringIdx =  StringIndexer(inputCol="label", outputCol="newlabel")
stages += [label_stringIdx]

3. Aggiungi variabile continua

L'inputCols di VectorAssembler è un elenco di colonne. È possibile creare un nuovo elenco contenente tutte le nuove colonne. Il codice seguente popola l'elenco con le caratteristiche categoriche codificate e le caratteristiche continue.

assemblerInputs = [c + "classVec" for c in CATE_FEATURES] + CONTI_FEATURES

4. Assemblare i gradini.

Infine, esegui tutti i passaggi in VectorAssembler

assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")stages += [assembler]

Ora che tutti i passaggi sono pronti, esegui il push dei dati nella pipeline.

# Create a Pipeline.
pipeline = Pipeline(stages=stages)
pipelineModel = pipeline.fit(df_remove)
model = pipelineModel.transform(df_remove)

Se controlli il nuovo set di dati, puoi vedere che contiene tutte le funzionalità, trasformate e non trasformate. Ti interessano solo la nuova etichetta e le funzionalità. Le feature includono tutte le feature trasformate e le variabili continue.

model.take(1)

[Row(age=39, age_square=1521.0, workclass='State-gov', fnlwgt=77516, education='Bachelors', education_num=13, marital='Never-married', occupation='Adm-clerical', relationship='Not-in-family', race='White', sex='Male', capital_gain=2174, capital_loss=0, hours_week=40, native_country='United-States', label='<=50K', workclassIndex=4.0, workclassclassVec=SparseVector(8, {4: 1.0}), educationIndex=2.0, educationclassVec=SparseVector(15, {2: 1.0}), maritalIndex=1.0, maritalclassVec=SparseVector(6, {1: 1.0}), occupationIndex=3.0, occupationclassVec=SparseVector(14, {3: 1.0}), relationshipIndex=1.0, relationshipclassVec=SparseVector(5, {1: 1.0}), raceIndex=0.0, raceclassVec=SparseVector(4, {0: 1.0}), sexIndex=0.0, sexclassVec=SparseVector(1, {0: 1.0}), native_countryIndex=0.0, native_countryclassVec=SparseVector(40, {0: 1.0}), newlabel=0.0, features=SparseVector(99, {4: 1.0, 10: 1.0, 24: 1.0, 32: 1.0, 44: 1.0, 48: 1.0, 52: 1.0, 53: 1.0, 93: 39.0, 94: 77516.0, 95: 2174.0, 96: 13.0, 98: 40.0}))]

Passaggio 4) Costruisci il classificatore: logistico

Per rendere il calcolo più veloce, converti il ​​modello in un DataFrame.

È necessario selezionare la nuova etichetta e le funzionalità dal modello utilizzando la mappa.

from pyspark.ml.linalg import DenseVector
input_data = model.rdd.map(lambda x: (x["newlabel"], DenseVector(x["features"])))

Sei pronto per creare i dati del treno come DataFrame. Si utilizza sqlContext

df_train = sqlContext.createDataFrame(input_data, ["label", "features"])

Controlla la seconda riga

df_train.show(2)
+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|[0.0,0.0,0.0,0.0,...|
|  0.0|[0.0,1.0,0.0,0.0,...|
+-----+--------------------+
only showing top 2 rows

Creare un set di treni/test

Dividi il set di dati 80/20 con randomSplit.

# Split the data into train and test sets
train_data, test_data = df_train.randomSplit([.8,.2],seed=1234)

Contiamo quante persone con un reddito inferiore/superiore a 50 sia nel set di formazione che in quello di test

train_data.groupby('label').agg({'label': 'count'}).show()
+-----+------------+
|label|count(label)|
+-----+------------+
|  0.0|       19698|
|  1.0|        6263|
+-----+------------+
test_data.groupby('label').agg({'label': 'count'}).show()
+-----+------------+
|label|count(label)|
+-----+------------+
|  0.0|        5021|
|  1.0|        1578|
+-----+------------+

Costruire il regressore logistico

Ultimo ma non meno importante, puoi costruire il classificatore. Pyspark ha un'API chiamata LogisticRegression per eseguire la regressione logistica.

Inizializzi lr indicando la colonna label e le colonne feature. Imposti un massimo di 10 iterazioni e aggiungi un parametro di regolarizzazione con un valore di 0.3. Tieni presente che nella sezione successiva utilizzerai la convalida incrociata con una griglia di parametri per ottimizzare il modello

# Import `LinearRegression`
from pyspark.ml.classification import LogisticRegression

# Initialize `lr`
lr = LogisticRegression(labelCol="label",
                        featuresCol="features",
                        maxIter=10,
                        regParam=0.3)

# Fit the data to the model
linearModel = lr.fit(train_data)

#Puoi vedere i coefficienti della regressione

# Print the coefficients and intercept for logistic regression
print("Coefficients: " + str(linearModel.coefficients))
print("Intercept: " + str(linearModel.intercept))
Coefficients: [-0.0678914665262,-0.153425526813,-0.0706009536407,-0.164057586562,-0.120655298528,0.162922330862,0.149176870438,-0.626836362611,-0.193483661541,-0.0782269980838,0.222667203836,0.399571096381,-0.0222024341804,-0.311925857859,-0.0434497788688,-0.306007744328,-0.41318209688,0.547937504247,-0.395837350854,-0.23166535958,0.618743906733,-0.344088614546,-0.385266881369,0.317324463006,-0.350518889186,-0.201335923138,-0.232878560088,-0.13349278865,-0.119760542498,0.17500602491,-0.0480968101118,0.288484253943,-0.116314616745,0.0524163478063,-0.300952624551,-0.22046421474,-0.16557996579,-0.114676231939,-0.311966431453,-0.344226119233,0.105530129507,0.152243047814,-0.292774545497,0.263628334433,-0.199951374076,-0.30329422583,-0.231087515178,0.418918551,-0.0565930184279,-0.177818073048,-0.0733236680663,-0.267972912252,0.168491215697,-0.12181255723,-0.385648075442,-0.202101794517,0.0469791640782,-0.00842850210625,-0.00373211448629,-0.259296141281,-0.309896554133,-0.168434409756,-0.11048086026,0.0280647963877,-0.204187030092,-0.414392623536,-0.252806580669,0.143366465705,-0.516359222663,-0.435627370849,-0.301949286524,0.0878249035894,-0.210951740965,-0.621417928742,-0.099445190784,-0.232671473401,-0.1077745606,-0.360429419703,-0.420362959052,-0.379729467809,-0.395186242741,0.0826401853838,-0.280251589972,0.187313505214,-0.20295228799,-0.431177064626,0.149759018379,-0.107114299614,-0.319314858424,0.0028450133235,-0.651220387649,-0.327918792207,-0.143659581445,0.00691075160413,8.38517628783e-08,2.18856717378e-05,0.0266701216268,0.000231075966823,0.00893832698698]
Intercept: -1.9884177974805692

Passaggio 5) Addestrare e valutare il modello

Per generare una previsione per il set di test,

Puoi utilizzare linearModel con trasforma() su test_data

# Make predictions on test data using the transform() method.
predictions = linearModel.transform(test_data)

È possibile stampare gli elementi nelle previsioni

predictions.printSchema()
root
 |-- label: double (nullable = true)
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)

Ti interessano l'etichetta, la previsione e la probabilità

selected = predictions.select("label", "prediction", "probability")
selected.show(20)
+-----+----------+--------------------+
|label|prediction|         probability|
+-----+----------+--------------------+
|  0.0|       0.0|[0.91560704124179...|
|  0.0|       0.0|[0.92812140213994...|
|  0.0|       0.0|[0.92161406774159...|
|  0.0|       0.0|[0.96222760777142...|
|  0.0|       0.0|[0.66363283056957...|
|  0.0|       0.0|[0.65571324475477...|
|  0.0|       0.0|[0.73053376932829...|
|  0.0|       1.0|[0.31265053873570...|
|  0.0|       0.0|[0.80005907577390...|
|  0.0|       0.0|[0.76482251301640...|
|  0.0|       0.0|[0.84447301189069...|
|  0.0|       0.0|[0.75691912026619...|
|  0.0|       0.0|[0.60902504096722...|
|  0.0|       0.0|[0.80799228385509...|
|  0.0|       0.0|[0.87704364852567...|
|  0.0|       0.0|[0.83817652582377...|
|  0.0|       0.0|[0.79655423248500...|
|  0.0|       0.0|[0.82712311232246...|
|  0.0|       0.0|[0.81372823882016...|
|  0.0|       0.0|[0.59687710752201...|
+-----+----------+--------------------+
only showing top 20 rows

Valuta il modello

È necessario esaminare la metrica di precisione per vedere quanto bene (o male) funziona il modello. Attualmente non esiste alcuna API per calcolare la misura di precisione Spark. Il valore predefinito è il ROC, curva caratteristica operativa del ricevitore. Si tratta di una metrica diversa che tiene conto del tasso di falsi positivi.

Prima di esaminare il ROC, costruiamo la misura di precisione. Hai più familiarità con questa metrica. La misura di precisione è la somma della previsione corretta sul numero totale di osservazioni.

Crei un DataFrame con l'etichetta e il file `prediction.

cm = predictions.select("label", "prediction")

Puoi controllare il numero di classe nell'etichetta e nel pronostico

cm.groupby('label').agg({'label': 'count'}).show()
+-----+------------+
|label|count(label)|
+-----+------------+
|  0.0|        5021|
|  1.0|        1578|
+-----+------------+
cm.groupby('prediction').agg({'prediction': 'count'}).show()
+----------+-----------------+
|prediction|count(prediction)|
+----------+-----------------+
|       0.0|             5982|
|       1.0|              617|
+----------+-----------------+

Ad esempio, nel set di test, ci sono 1578 famiglie con un reddito superiore a 50 e 5021 inferiori. Il classificatore, tuttavia, prevedeva 617 famiglie con reddito superiore a 50.

È possibile calcolare la precisione calcolando il conteggio quando le etichette vengono classificate correttamente sul numero totale di righe.

cm.filter(cm.label == cm.prediction).count() / cm.count()

0.8237611759357478

Puoi racchiudere tutto insieme e scrivere una funzione per calcolare la precisione.

def accuracy_m(model): 
    predictions = model.transform(test_data)
    cm = predictions.select("label", "prediction")
    acc = cm.filter(cm.label == cm.prediction).count() / cm.count()
    print("Model accuracy: %.3f%%" % (acc * 100)) 
accuracy_m(model = linearModel)
Model accuracy: 82.376%

Metriche ROC

Il modulo BinaryClassificationEvaluator include le misure ROC. Il ricevente OperaLa curva caratteristica è un altro strumento comune utilizzato con la classificazione binaria. È molto simile alla curva precisione/richiamo, ma invece di tracciare la precisione rispetto al richiamo, la curva ROC mostra il tasso di veri positivi (ovvero il richiamo) rispetto al tasso di falsi positivi. Il tasso di falsi positivi è il rapporto tra i casi negativi erroneamente classificati come positivi. È uguale a uno meno il vero tasso negativo. Il vero tasso negativo è anche chiamato specificità. Pertanto la curva ROC traccia la sensibilità (richiamo) rispetto a 1 – specificità

### Use ROC 
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Evaluate model
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
print(evaluator.evaluate(predictions))
print(evaluator.getMetricName())

0.8940481662695192areaSottoROC

print(evaluator.evaluate(predictions))

0.8940481662695192

Passaggio 6) Ottimizzare l'iperparametro

Ultimo ma non meno importante, puoi ottimizzare gli iperparametri. Simile a scikit impara crei una griglia di parametri e aggiungi i parametri che desideri ottimizzare.

Per ridurre il tempo del calcolo, ottimizzi solo il parametro di regolarizzazione con solo due valori.

from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Create ParamGrid for Cross Validation
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.01, 0.5])
             .build())

Infine, valuti il ​​modello utilizzando il metodo di valutazione incrociata con 5 pieghe. Per allenarsi ci vogliono circa 16 minuti.

from time import *
start_time = time()

# Create 5-fold CrossValidator
cv = CrossValidator(estimator=lr,
                    estimatorParamMaps=paramGrid,
                    evaluator=evaluator, numFolds=5)

# Run cross validations
cvModel = cv.fit(train_data)
# likely take a fair amount of time
end_time = time()
elapsed_time = end_time - start_time
print("Time to train model: %.3f seconds" % elapsed_time)

Tempo per addestrare il modello: 978.807 secondi

Il miglior iperparametro di regolarizzazione è 0.01, con una precisione dell'85.316%.

accuracy_m(model = cvModel)
Model accuracy: 85.316%

Puoi estrarre il parametro consigliato concatenando cvModel.bestModel con extractParamMap()

bestModel = cvModel.bestModel
bestModel.extractParamMap()
{Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='aggregationDepth', doc='suggested depth for treeAggregate (>= 2)'): 2,
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty'): 0.0,
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='family', doc='The name of family which is a description of the label distribution to be used in the model. Supported options: auto, binomial, multinomial.'): 'auto',
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='featuresCol', doc='features column name'): 'features',
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='fitIntercept', doc='whether to fit an intercept term'): True,
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='labelCol', doc='label column name'): 'label',
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='maxIter', doc='maximum number of iterations (>= 0)'): 10,
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='predictionCol', doc='prediction column name'): 'prediction',
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='probabilityCol', doc='Column name for predicted class conditional probabilities. Note: Not all models output well-calibrated probability estimates! These probabilities should be treated as confidences, not precise probabilities'): 'probability',
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='rawPredictionCol', doc='raw prediction (a.k.a. confidence) column name'): 'rawPrediction',
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='regParam', doc='regularization parameter (>= 0)'): 0.01,
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='standardization', doc='whether to standardize the training features before fitting the model'): True,
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='threshold', doc='threshold in binary classification prediction, in range [0, 1]'): 0.5,
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='tol', doc='the convergence tolerance for iterative algorithms (>= 0)'): 1e-06}

Sintesi

Spark è uno strumento fondamentale per un data scientist. Consente al professionista di connettere un'app a diverse origini dati, eseguire analisi dei dati senza problemi o aggiungere un modello predittivo.

Iniziare con Spark, è necessario avviare un Spark Contesto con:

'SparkContesto()'

e e SQL contesto per connettersi a un'origine dati:

'SQLContesto()'

Nel tutorial imparerai come addestrare una regressione logistica:

  1. Converti il ​​set di dati in un Dataframe con:
rdd.map(lambda x: (x["newlabel"], DenseVector(x["features"])))
sqlContext.createDataFrame(input_data, ["label", "features"])

Tieni presente che il nome della colonna dell'etichetta è newlabel e tutte le funzionalità sono raccolte in funzionalità. Modifica questi valori se diversi nel tuo set di dati.

  1. Creare il set di treno/test
randomSplit([.8,.2],seed=1234)
  1. Allena il modello
LogisticRegression(labelCol="label",featuresCol="features",maxIter=10, regParam=0.3)
lr.fit()
  1. Fai una previsione
linearModel.transform()