PySpark Tutorial für Anfänger: Lernen mit BEISPIELEN

Bevor Sie Py lernenSpark, lass uns verstehen:

Was ist Apache Spark?

Spark ist eine Big-Data-Lösung, die nachweislich einfacher und schneller ist als Hadoop MapReduce. Spark ist eine Open-Source-Software, die 2009 vom RAD-Labor der UC Berkeley entwickelt wurde. Seit ihrer Veröffentlichung im Jahr 2010 hat Spark erfreut sich wachsender Beliebtheit und wird in der Branche in beispiellosem Umfang eingesetzt.

In der Ära von Big Data, Praktiker benötigen mehr denn je schnelle und zuverlässige Tools zur Verarbeitung von Datenströmen. Frühere Tools wie MapReduce waren beliebt, aber langsam. Um dieses Problem zu lösen, Spark bietet eine Lösung, die sowohl schnell als auch universell ist. Der Hauptunterschied zwischen Spark und MapReduce ist das Spark führt Berechnungen im Speicher aus, während sie später auf der Festplatte ausgeführt werden. Dies ermöglicht Hochgeschwindigkeitszugriff und Datenverarbeitung und verkürzt die Zeit von Stunden auf Minuten.

Was ist PySpark?

PySpark ist ein von Apache erstelltes Tool Spark Community zur Nutzung Python mit Spark. Es ermöglicht das Arbeiten mit RDD (Resilient Distributed Dataset) in Python. Es bietet auch PySpark Shell zum Verknüpfen Python APIs mit Spark Kern zu initiieren Spark Kontext. Spark ist die Namensmaschine zur Realisierung von Cluster-Computing, während PySpark is Python's Bibliothek zu verwenden Spark.

Wie schneidet Spark ung?

Spark basiert auf einer Rechenmaschine, d. h. sie kümmert sich um die Planung, Verteilung und Überwachung der Anwendung. Jede Aufgabe wird auf verschiedenen Arbeitsmaschinen ausgeführt, die als Rechencluster bezeichnet werden. Ein Rechencluster bezieht sich auf die Aufgabenteilung. Eine Maschine führt eine Aufgabe aus, während die anderen durch eine andere Aufgabe zum Endergebnis beitragen. Am Ende werden alle Aufgaben zusammengefasst, um ein Ergebnis zu erzeugen. Das Spark Der Administrator bietet einen 360-Grad-Überblick über verschiedene Spark Jobs.

Wie funktioniert Spark Arbeiten
Wie funktioniert Spark Arbeiten

Spark ist entworfen, um mit zu arbeiten

  • Python
  • Java
  • Scala
  • SQL

Ein wesentliches Merkmal von Spark ist die große Menge an integrierten Bibliotheken, einschließlich MLlib für maschinelles Lernen. Spark ist auch für die Arbeit mit Hadoop-Clustern konzipiert und kann zahlreiche Dateitypen lesen, darunter unter anderem Hive-Daten, CSV-, JSON- und Casandra-Daten.

Warum Spark?

Als zukünftiger Datenpraktiker sollten Sie mit den berühmten Python-Bibliotheken vertraut sein: Pandas und scikit-learn. Diese beiden Bibliotheken eignen sich hervorragend zum Erkunden von Datensätzen bis zur mittleren Größe. Regelmäßige Machine-Learning-Projekte basieren auf der folgenden Methodik:

  • Laden Sie die Daten auf die Festplatte
  • Importieren Sie die Daten in den Speicher der Maschine
  • Verarbeiten/analysieren Sie die Daten
  • Erstellen Sie das Modell für maschinelles Lernen
  • Speichern Sie die Vorhersage wieder auf der Festplatte

Das Problem entsteht, wenn der Datenwissenschaftler Daten verarbeiten möchte, die für einen Computer zu groß sind. In früheren Zeiten der Datenwissenschaft probierten die Praktiker die Daten aus, da eine Schulung an riesigen Datenmengen nicht immer erforderlich war. Der Datenwissenschaftler würde eine gute statistische Stichprobe finden, eine zusätzliche Robustheitsprüfung durchführen und ein hervorragendes Modell erstellen.

Allerdings gibt es dabei einige Probleme:

  • Spiegelt der Datensatz die reale Welt wider?
  • Enthalten die Daten ein konkretes Beispiel?
  • Ist das Modell für die Bemusterung geeignet?

Nehmen Sie zum Beispiel die Empfehlung eines Benutzers. Empfehlungsgeber verlassen sich bei der Bewertung ihrer Präferenzen darauf, Benutzer mit anderen Benutzern zu vergleichen. Wenn der Datenpraktiker nur eine Teilmenge der Daten verwendet, wird es keine Kohorte von Benutzern geben, die einander sehr ähnlich sind. Empfehlungsgeber müssen entweder den gesamten Datensatz nutzen oder gar nicht.

Was ist die Lösung?

Die Lösung ist schon lange klar: das Problem auf mehrere Computer verteilen. Paralleles Rechnen bringt auch mehrere Probleme mit sich. Entwickler haben oft Probleme, parallelen Code zu schreiben und müssen am Ende eine Reihe komplexer Probleme rund um die Mehrfachverarbeitung selbst lösen.

Pyspark bietet dem Datenwissenschaftler eine API, mit der er Probleme der parallelen Datenverarbeitung lösen kann. Pyspark bewältigt die Komplexität der Mehrfachverarbeitung, wie z. B. die Verteilung der Daten, die Verteilung des Codes und das Sammeln der Ausgabe der Worker auf einem Maschinencluster.

Spark kann eigenständig ausgeführt werden, läuft aber meistens auf einem Cluster-Computing-Framework wie Hadoop. In Test und Entwicklung kann ein Datenwissenschaftler jedoch effizient ausführen Spark auf ihren Entwicklungsboxen oder Laptops ohne Cluster

• Einer der Hauptvorteile von Spark ist der Aufbau einer Architektur, die Datenstreaming-Management, nahtlose Datenabfragen, Vorhersagen durch maschinelles Lernen und Echtzeitzugriff auf verschiedene Analysen umfasst.

• Spark arbeitet eng mit der SQL-Sprache, also strukturierten Daten, zusammen. Es ermöglicht die Abfrage der Daten in Echtzeit.

• Die Hauptaufgabe eines Datenwissenschaftlers besteht darin, Vorhersagemodelle zu analysieren und zu erstellen. Kurz gesagt, ein Datenwissenschaftler muss wissen, wie er Daten abfragt SQL, erstellen Sie einen statistischen Bericht und nutzen Sie maschinelles Lernen, um Vorhersagen zu treffen. Datenwissenschaftler verbringen einen erheblichen Teil ihrer Zeit mit der Bereinigung, Transformation und Analyse der Daten. Sobald der Datensatz oder Datenworkflow fertig ist, verwendet der Datenwissenschaftler verschiedene Techniken, um Erkenntnisse und verborgene Muster zu entdecken. Die Datenmanipulation sollte robust und gleichzeitig einfach zu bedienen sein. Spark ist dank seiner Geschwindigkeit und umfangreichen APIs das richtige Tool.

In diesem PySpark Im Tutorial erfahren Sie, wie Sie mit Py einen Klassifikator erstellenSpark Beispiele.

So installieren Sie PySpark mit AWS

Der Jupyter Das Team erstellt ein Docker-Image zur Ausführung Spark effizient. Im Folgenden finden Sie die Schritte, die Sie zur Installation von Py ausführen könnenSpark Instanz in AWS.

Weitere Informationen finden Sie in unserem Tutorial AWS und TensorFlow

Schritt 1: Erstellen Sie eine Instanz

Zunächst müssen Sie eine Instanz erstellen. Gehen Sie zu Ihrem AWS-Konto und starten Sie die Instanz. Sie können den Speicher auf bis zu 15 g erhöhen und dieselbe Sicherheitsgruppe wie im TensorFlow-Tutorial verwenden.

Schritt 2: Öffnen Sie die Verbindung

Öffnen Sie die Verbindung und installieren Sie den Docker-Container. Weitere Einzelheiten finden Sie im Tutorial mit TensorFlow mit Docker. Beachten Sie, dass Sie sich im richtigen Arbeitsverzeichnis befinden müssen.

Führen Sie einfach diese Codes aus, um Docker zu installieren:

sudo yum update -y
sudo yum install -y docker
sudo service docker start
sudo user-mod -a -G docker ec2-user
exit

Schritt 3: Öffnen Sie die Verbindung erneut und installieren Sie Spark

Nachdem Sie die Verbindung erneut geöffnet haben, können Sie das Image mit Py installierenSpark.

## Spark
docker run -v ~/work:/home/jovyan/work -d -p 8888:8888 jupyter/pyspark-notebook

## Allow preserving Jupyter notebook
sudo chown 1000 ~/work

## Install tree to see our working directory next
sudo yum install -y tree

Schritt 4: Öffnen Jupyter

Überprüfen Sie den Container und seinen Namen

docker ps

Starten Sie den Docker mit Docker-Protokollen gefolgt vom Namen des Dockers. Beispielsweise protokolliert Docker zealous_goldwasser

Gehen Sie zu Ihrem Browser und starten Sie ihn Jupyter. Die Adresse lautet http://localhost:8888/. Fügen Sie das vom Terminal angegebene Passwort ein.

Hinweis: Wenn Sie eine Datei auf Ihre AWS-Maschine hoch-/herunterladen möchten, können Sie die Software Cyberduck verwenden. https://cyberduck.io/.

So installieren Sie PySpark on Windows/Mac mit Conda

Nachfolgend finden Sie eine detaillierte Anleitung zur Installation von PySpark on Windows/Mac mit Anaconda:

So installieren Sie Spark Auf Ihrem lokalen Computer empfiehlt es sich, eine neue Conda-Umgebung zu erstellen. Diese neue Umgebung wird installiert Python 3.6, Spark und alle Abhängigkeiten.

Mac-Benutzer

cd anaconda3
touch hello-spark.yml
vi hello-spark.yml

Windows Mitglied

cd C:\Users\Admin\Anaconda3
echo.>hello-spark.yml
notepad hello-spark.yml

Sie können die .yml-Datei bearbeiten. Seien Sie vorsichtig mit dem Einzug. Vorher sind zwei Leerzeichen erforderlich –

name: hello-spark 
    dependencies:
    
    - python=3.6
    - jupyter
    - ipython
    - numpy
    - numpy-base
    - pandas
    - py4j
    - pyspark
    - pytz

Speichern Sie es und erstellen Sie die Umgebung. Es braucht etwas Zeit

conda env create -f hello-spark.yml

Weitere Einzelheiten zum Standort finden Sie im Tutorial „TensorFlow installieren“

Sie können die gesamte auf Ihrem Computer installierte Umgebung überprüfen

conda env list
Activate hello-spark

Mac-Benutzer

source activate hello-spark

Windows Mitglied

activate hello-spark

Hinweis: Sie haben bereits eine bestimmte TensorFlow-Umgebung erstellt, um die Tutorials auf TensorFlow auszuführen. Es ist bequemer, eine andere neue Umgebung als hello-tf zu erstellen. Es macht keinen Sinn, hello-tf damit zu überladen Spark oder andere Bibliotheken für maschinelles Lernen.

Stellen Sie sich vor, der größte Teil Ihres Projekts beinhaltet TensorFlow, Sie müssen es jedoch verwenden Spark für ein bestimmtes Projekt. Sie können eine TensorFlow-Umgebung für Ihr gesamtes Projekt festlegen und eine separate Umgebung dafür erstellen Spark. Sie können beliebig viele Bibliotheken hinzufügen Spark Umgebung nach Ihren Wünschen, ohne die TensorFlow-Umgebung zu beeinträchtigen. Sobald Sie damit fertig sind Sparkkönnen Sie das Projekt löschen, ohne dass dies Auswirkungen auf die TensorFlow-Umgebung hat.

Jupyter

Öffne Jupyter Notizbuch und versuchen Sie, ob PySpark funktioniert. Fügen Sie in ein neues Notebook den folgenden Py einSpark Beispielcode:

import pyspark
from pyspark import SparkContext
sc =SparkContext()

Wenn ein Fehler angezeigt wird, ist es wahrscheinlich, dass Java ist nicht auf Ihrem Computer installiert. Öffnen Sie auf dem Mac das Terminal und schreiben Sie java -version. Wenn eine Java-Version vorhanden ist, stellen Sie sicher, dass sie 1.8 ist. In Windows, gehen Sie zu Anwendung und prüfen Sie, ob es eine Java Ordner. Wenn es einen Java Ordner, überprüfen Sie, ob Java 1.8 ist installiert. Zum Zeitpunkt dieses Schreibens ist PySpark ist nicht kompatibel mit Java9 und höher.

Wenn Sie installieren müssen Java, du denkst Link und laden Sie jdk-8u181-windows-x64.exe herunter

Jupyter

Für Mac-Benutzer wird die Verwendung von „brew“ empfohlen

brew tap caskroom/versions
brew cask install java8

Weitere Informationen finden Sie in dieser Schritt-für-Schritt-Anleitung zur Installation Java

Hinweis: Verwenden Sie Remove, um eine Umgebung vollständig zu löschen.

 conda env remove -n hello-spark -y

Spark Kontext

SparkKontext ist die interne Engine, die die Verbindungen mit den Clustern ermöglicht. Wenn Sie eine Operation ausführen möchten, benötigen Sie einen SparkKontext.

Erstellen Sie SparkKontext

Zunächst müssen Sie eine initiieren SparkKontext.

import pyspark
from pyspark import SparkContext
sc =SparkContext()

Nun, dass die SparkDer Kontext ist bereit. Sie können eine Datensammlung namens RDD (Resilient Distributed Dataset) erstellen. Die Berechnung in einem RDD wird automatisch über den Cluster hinweg parallelisiert.

nums= sc.parallelize([1,2,3,4])

Mit take gelangt man in die erste Zeile

nums.take(1)
[1]

Sie können mit einer Lambda-Funktion eine Transformation auf die Daten anwenden. Im PySpark Im folgenden Beispiel geben Sie das Quadrat von Zahlen zurück. Es handelt sich um eine Kartentransformation

squared = nums.map(lambda x: x*x).collect()
for num in squared:
    print('%i ' % (num))
1 
4 
9 
16

SQLContext

Eine bequemere Möglichkeit ist die Verwendung des DataFrame. SparkDer Kontext ist bereits festgelegt. Sie können ihn zum Erstellen des DataFrame verwenden. Sie müssen auch den SQLContext deklarieren

SQLContext ermöglicht die Verbindung der Engine mit verschiedenen Datenquellen. Es wird verwendet, um die Funktionen von zu starten Spark SQL.

from pyspark.sql import Row
from pyspark.sql import SQLContext

sqlContext = SQLContext(sc)

Jetzt hier Spark Lernprogramm Python, erstellen wir eine Liste von Tupeln. Jedes Tupel enthält den Namen der Personen und ihr Alter. Vier Schritte sind erforderlich:

Schritt 1) Erstellen Sie die Liste der Tupel mit den Informationen

[('John',19),('Smith',29),('Adam',35),('Henry',50)]

Schritt 2) Erstellen Sie ein RDD

rdd = sc.parallelize(list_p)

Schritt 3) Konvertieren Sie die Tupel

rdd.map(lambda x: Row(name=x[0], age=int(x[1])))

Schritt 4) Erstellen Sie einen DataFrame-Kontext

sqlContext.createDataFrame(ppl)
list_p = [('John',19),('Smith',29),('Adam',35),('Henry',50)]
rdd = sc.parallelize(list_p)
ppl = rdd.map(lambda x: Row(name=x[0], age=int(x[1])))
DF_ppl = sqlContext.createDataFrame(ppl)

Wenn Sie auf den Typ jedes Features zugreifen möchten, können Sie printSchema() verwenden

DF_ppl.printSchema()
root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)

Beispiel für maschinelles Lernen mit PySpark

Jetzt haben Sie eine kurze Vorstellung davon Spark und SQLContext sind Sie bereit, Ihr erstes Programm für maschinelles Lernen zu erstellen.

Im Folgenden finden Sie die Schritte zum Erstellen eines Machine-Learning-Programms mit PySpark:

  • Schritt 1) Grundlegende Bedienung mit PySpark
  • Schritt 2) Datenvorverarbeitung
  • Schritt 3) Erstellen Sie eine Datenverarbeitungspipeline
  • Schritt 4) Erstellen Sie den Klassifikator: logistic
  • Schritt 5) Trainieren und evaluieren Sie das Modell
  • Schritt 6) Optimieren Sie den Hyperparameter

In diesem PySpark Im Machine-Learning-Tutorial verwenden wir den Erwachsenendatensatz. Ziel dieses Tutorials ist es, die Verwendung von Pyspark zu erlernen. Weitere Informationen zum Datensatz finden Sie in diesem Tutorial.

Beachten Sie, dass der Datensatz nicht aussagekräftig ist und Sie möglicherweise denken, dass die Berechnung lange dauert. Spark ist darauf ausgelegt, große Datenmengen zu verarbeiten. SparkDie Leistung steigt im Vergleich zu anderen Bibliotheken für maschinelles Lernen, wenn der verarbeitete Datensatz größer wird.

Schritt 1) ​​Grundlegende Bedienung mit PySpark

Zunächst müssen Sie den SQLContext initialisieren, der noch nicht initiiert ist.

#from pyspark.sql import SQLContext
url = "https://raw.githubusercontent.com/guru99-edu/R-Programming/master/adult_data.csv"
from pyspark import SparkFiles
sc.addFile(url)
sqlContext = SQLContext(sc)

Anschließend können Sie die CVS-Datei mit sqlContext.read.csv lesen. Um dies festzustellen, verwenden Sie inferSchema auf „True“. Spark um die Art der Daten automatisch zu erraten. Standardmäßig ist die Einstellung auf „Falsch“ gesetzt.

df = sqlContext.read.csv(SparkFiles.get("adult_data.csv"), header=True, inferSchema= True)

Werfen wir einen Blick auf den Datentyp

df.printSchema()
root
 |-- age: integer (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: integer (nullable = true)
 |-- education: string (nullable = true)
 |-- education_num: integer (nullable = true)
 |-- marital: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital_gain: integer (nullable = true)
 |-- capital_loss: integer (nullable = true)
 |-- hours_week: integer (nullable = true)
 |-- native_country: string (nullable = true)
 |-- label: string (nullable = true)

Sie können die Daten mit anzeigen sehen.

df.show(5, truncate = False)
+---+----------------+------+---------+-------------+------------------+-----------------+-------------+-----+------+------------+------------+----------+--------------+-----+
|age|workclass       |fnlwgt|education|education_num|marital           |occupation       |relationship |race |sex   |capital_gain|capital_loss|hours_week|native_country|label|
+---+----------------+------+---------+-------------+------------------+-----------------+-------------+-----+------+------------+------------+----------+--------------+-----+
|39 |State-gov       |77516 |Bachelors|13           |Never-married     |Adm-clerical     |Not-in-family|White|Male  |2174        |0           |40        |United-States |<=50K|
|50 |Self-emp-not-inc|83311 |Bachelors|13           |Married-civ-spouse|Exec-managerial  |Husband      |White|Male  |0           |0           |13        |United-States |<=50K|
|38 |Private         |215646|HS-grad  |9            |Divorced          |Handlers-cleaners|Not-in-family|White|Male  |0           |0           |40        |United-States |<=50K|
|53 |Private         |234721|11th     |7            |Married-civ-spouse|Handlers-cleaners|Husband      |Black|Male  |0           |0           |40        |United-States |<=50K|
|28 |Private         |338409|Bachelors|13           |Married-civ-spouse|Prof-specialty   |Wife         |Black|Female|0           |0           |40        |Cuba          |<=50K|
+---+----------------+------+---------+-------------+------------------+-----------------+-------------+-----+------+------------+------------+----------+--------------+-----+
only showing top 5 rows

Wenn Sie inderShema nicht auf „True“ gesetzt haben, passiert Folgendes mit dem Typ. Es sind alle in einer Reihe.

df_string = sqlContext.read.csv(SparkFiles.get("adult.csv"), header=True, inferSchema=  False)
df_string.printSchema()
root
 |-- age: string (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: string (nullable = true)
 |-- education: string (nullable = true)
 |-- education_num: string (nullable = true)
 |-- marital: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital_gain: string (nullable = true)
 |-- capital_loss: string (nullable = true)
 |-- hours_week: string (nullable = true)
 |-- native_country: string (nullable = true)
 |-- label: string (nullable = true)

Um die kontinuierliche Variable in das richtige Format umzuwandeln, können Sie die Spalten neu umwandeln. Sie können withColumn verwenden, um dies zu erkennen Spark in welcher Spalte die Transformation durchgeführt werden soll.

# Import all from `sql.types`
from pyspark.sql.types import *

# Write a custom function to convert the data type of DataFrame columns
def convertColumn(df, names, newType):
    for name in names: 
        df = df.withColumn(name, df[name].cast(newType))
    return df 
# List of continuous features
CONTI_FEATURES  = ['age', 'fnlwgt','capital_gain', 'education_num', 'capital_loss', 'hours_week']
# Convert the type
df_string = convertColumn(df_string, CONTI_FEATURES, FloatType())
# Check the dataset
df_string.printSchema()
root
 |-- age: float (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: float (nullable = true)
 |-- education: string (nullable = true)
 |-- education_num: float (nullable = true)
 |-- marital: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital_gain: float (nullable = true)
 |-- capital_loss: float (nullable = true)
 |-- hours_week: float (nullable = true)
 |-- native_country: string (nullable = true)
 |-- label: string (nullable = true)

from pyspark.ml.feature import StringIndexer
#stringIndexer = StringIndexer(inputCol="label", outputCol="newlabel")
#model = stringIndexer.fit(df)
#df = model.transform(df)
df.printSchema()

Spalten auswählen

Sie können die Zeilen mit „select“ und den Namen der Features auswählen und anzeigen. Unten werden Alter und Fnlwgt ausgewählt.

df.select('age','fnlwgt').show(5)
+---+------+
|age|fnlwgt|
+---+------+
| 39| 77516|
| 50| 83311|
| 38|215646|
| 53|234721|
| 28|338409|
+---+------+
only showing top 5 rows

Zählen Sie nach Gruppe

Wenn Sie die Anzahl der Vorkommen nach Gruppen zählen möchten, können Sie Folgendes verketten:

  • gruppiere nach()
  • Anzahl ()

zusammen. Im PySpark Im folgenden Beispiel zählen Sie die Anzahl der Zeilen nach dem Bildungsniveau.

df.groupBy("education").count().sort("count",ascending=True).show()
+------------+-----+
|   education|count|
+------------+-----+
|   Preschool|   51|
|     1st-4th|  168|
|     5th-6th|  333|
|   Doctorate|  413|
|        12th|  433|
|         9th|  514|
| Prof-school|  576|
|     7th-8th|  646|
|        10th|  933|
|  Assoc-acdm| 1067|
|        11th| 1175|
|   Assoc-voc| 1382|
|     Masters| 1723|
|   Bachelors| 5355|
|Some-college| 7291|
|     HS-grad|10501|
+------------+-----+

Beschreiben Sie die Daten

Um eine zusammenfassende Statistik der Daten zu erhalten, können Sie beschreiben() verwenden. Es wird Folgendes berechnet:

  • zählen
  • bedeuten
  • Standardabweichung
  • Min.
  • max
df.describe().show()
+-------+------------------+-----------+------------------+------------+-----------------+--------+----------------+------------+------------------+------+------------------+----------------+------------------+--------------+-----+
|summary|               age|  workclass|            fnlwgt|   education|    education_num| marital|      occupation|relationship|              race|   sex|      capital_gain|    capital_loss|        hours_week|native_country|label|
+-------+------------------+-----------+------------------+------------+-----------------+--------+----------------+------------+------------------+------+------------------+----------------+------------------+--------------+-----+
|  count|             32561|      32561|             32561|       32561|            32561|   32561|           32561|       32561|             32561| 32561|             32561|           32561|             32561|         32561|32561|
|   mean| 38.58164675532078|       null|189778.36651208502|        null| 10.0806793403151|    null|            null|        null|              null|  null|1077.6488437087312| 87.303829734959|40.437455852092995|          null| null|
| stddev|13.640432553581356|       null|105549.97769702227|        null|2.572720332067397|    null|            null|        null|              null|  null| 7385.292084840354|402.960218649002|12.347428681731838|          null| null|
|    min|                17|          ?|             12285|        10th|                1|Divorced|               ?|     Husband|Amer-Indian-Eskimo|Female|                 0|               0|                 1|             ?|<=50K|
|    max|                90|Without-pay|           1484705|Some-college|               16| Widowed|Transport-moving|        Wife|             White|  Male|             99999|            4356|                99|    Yugoslavia| >50K|
+-------+------------------+-----------+------------------+------------+-----------------+--------+----------------+------------+------------------+------+------------------+----------------+------------------+--------------+-----+

Wenn Sie die zusammenfassende Statistik nur einer Spalte wünschen, fügen Sie den Namen der Spalte in beschreiben() ein.

df.describe('capital_gain').show()
+-------+------------------+
|summary|      capital_gain|
+-------+------------------+
|  count|             32561|
|   mean|1077.6488437087312|
| stddev| 7385.292084840354|
|    min|                 0|
|    max|             99999|
+-------+------------------+

Kreuztabellenberechnung

In manchen Fällen kann es interessant sein, die beschreibenden Statistiken zwischen zwei paarweisen Spalten zu sehen. Sie können beispielsweise die Anzahl der Personen mit einem Einkommen unter oder über 50 nach Bildungsniveau zählen. Diese Operation wird als Kreuztabelle bezeichnet.

df.crosstab('age', 'label').sort("age_label").show()
+---------+-----+----+
|age_label|<=50K|>50K|
+---------+-----+----+
|       17|  395|   0|
|       18|  550|   0|
|       19|  710|   2|
|       20|  753|   0|
|       21|  717|   3|
|       22|  752|  13|
|       23|  865|  12|
|       24|  767|  31|
|       25|  788|  53|
|       26|  722|  63|
|       27|  754|  81|
|       28|  748| 119|
|       29|  679| 134|
|       30|  690| 171|
|       31|  705| 183|
|       32|  639| 189|
|       33|  684| 191|
|       34|  643| 243|
|       35|  659| 217|
|       36|  635| 263|
+---------+-----+----+
only showing top 20 rows

Sie können sehen, dass kein junger Mensch einen Umsatz von mehr als 50 hat.

Spalte löschen

Es gibt zwei intuitive APIs zum Löschen von Spalten:

  • drop(): Eine Spalte löschen
  • dropna(): NAs löschen

Darunter fügen Sie die Spalte education_num ein

df.drop('education_num').columns

['age',
 'workclass',
 'fnlwgt',
 'education',
 'marital',
 'occupation',
 'relationship',
 'race',
 'sex',
 'capital_gain',
 'capital_loss',
 'hours_week',
 'native_country',
 'label']

Daten filtern

Sie können filter() verwenden, um deskriptive Statistiken auf eine Teilmenge von Daten anzuwenden. Sie können beispielsweise die Anzahl der Personen über 40 Jahre zählen

df.filter(df.age > 40).count()

13443

Descriptive Statistiken nach Gruppe

Schließlich können Sie Daten nach Gruppen gruppieren und statistische Operationen wie den Mittelwert berechnen.

df.groupby('marital').agg({'capital_gain': 'mean'}).show()
+--------------------+------------------+
|             marital| avg(capital_gain)|
+--------------------+------------------+
|           Separated| 535.5687804878049|
|       Never-married|376.58831788823363|
|Married-spouse-ab...| 653.9832535885167|
|            Divorced| 728.4148098131893|
|             Widowed| 571.0715005035247|
|   Married-AF-spouse| 432.6521739130435|
|  Married-civ-spouse|1764.8595085470085|
+--------------------+------------------+

Schritt 2) Datenvorverarbeitung

Die Datenverarbeitung ist ein entscheidender Schritt beim maschinellen Lernen. Nachdem Sie Mülldaten entfernt haben, erhalten Sie einige wichtige Erkenntnisse.

Sie wissen beispielsweise, dass das Alter keine lineare Funktion des Einkommens ist. Wenn die Menschen jung sind, ist ihr Einkommen normalerweise niedriger als im mittleren Alter. Nach der Pensionierung verbraucht ein Haushalt seine Ersparnisse, was zu einem Einkommensrückgang führt. Um dieses Muster zu erfassen, können Sie der Altersfunktion ein Quadrat hinzufügen

Altersquadrat hinzufügen

Um eine neue Funktion hinzuzufügen, müssen Sie Folgendes tun:

  1. Wählen Sie die Spalte aus
  2. Wenden Sie die Transformation an und fügen Sie sie dem DataFrame hinzu
from pyspark.sql.functions import *

# 1 Select the column
age_square = df.select(col("age")**2)

# 2 Apply the transformation and add it to the DataFrame
df = df.withColumn("age_square", col("age")**2)

df.printSchema()
root
 |-- age: integer (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: integer (nullable = true)
 |-- education: string (nullable = true)
 |-- education_num: integer (nullable = true)
 |-- marital: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital_gain: integer (nullable = true)
 |-- capital_loss: integer (nullable = true)
 |-- hours_week: integer (nullable = true)
 |-- native_country: string (nullable = true)
 |-- label: string (nullable = true)
 |-- age_square: double (nullable = true)

Sie können sehen, dass age_square erfolgreich zum Datenrahmen hinzugefügt wurde. Mit „select“ können Sie die Reihenfolge der Variablen ändern. Unten geben Sie age_square direkt nach age ein.

COLUMNS = ['age', 'age_square', 'workclass', 'fnlwgt', 'education', 'education_num', 'marital',
           'occupation', 'relationship', 'race', 'sex', 'capital_gain', 'capital_loss',
           'hours_week', 'native_country', 'label']
df = df.select(COLUMNS)
df.first()
Row(age=39, age_square=1521.0, workclass='State-gov', fnlwgt=77516, education='Bachelors', education_num=13, marital='Never-married', occupation='Adm-clerical', relationship='Not-in-family', race='White', sex='Male', capital_gain=2174, capital_loss=0, hours_week=40, native_country='United-States', label='<=50K')

Ohne Holland-Niederlande

Wenn eine Gruppe innerhalb eines Features nur eine Beobachtung hat, bringt sie keine Informationen in das Modell ein. Im Gegenteil kann es zu einem Fehler bei der Kreuzvalidierung kommen.

Lassen Sie uns die Herkunft des Haushalts überprüfen

df.filter(df.native_country == 'Holand-Netherlands').count()
df.groupby('native_country').agg({'native_country': 'count'}).sort(asc("count(native_country)")).show()
+--------------------+---------------------+
|      native_country|count(native_country)|
+--------------------+---------------------+
|  Holand-Netherlands|                    1|
|            Scotland|                   12|
|             Hungary|                   13|
|            Honduras|                   13|
|Outlying-US(Guam-...|                   14|
|          Yugoslavia|                   16|
|            Thailand|                   18|
|                Laos|                   18|
|            Cambodia|                   19|
|     Trinadad&Tobago|                   19|
|                Hong|                   20|
|             Ireland|                   24|
|             Ecuador|                   28|
|              Greece|                   29|
|              France|                   29|
|                Peru|                   31|
|           Nicaragua|                   34|
|            Portugal|                   37|
|                Iran|                   43|
|               Haiti|                   44|
+--------------------+---------------------+
only showing top 20 rows

Das Feature native_country hat nur einen Haushalt, der aus den Niederlanden stammt. Sie schließen es aus.

df_remove = df.filter(df.native_country !=	'Holand-Netherlands')

Schritt 3) Erstellen Sie eine Datenverarbeitungspipeline

Ähnlich wie scikit-learn verfügt Pyspark über eine Pipeline-API.

Eine Pipeline ist sehr praktisch, um die Struktur der Daten beizubehalten. Sie schieben die Daten in die Pipeline. Innerhalb der Pipeline werden verschiedene Operationen ausgeführt, die Ausgabe wird verwendet, um den Algorithmus zu füttern.

Eine universelle Transformation beim maschinellen Lernen besteht beispielsweise darin, einen String in einen Hot-Encoder, also eine Spalte pro Gruppe, umzuwandeln. Ein Hot-Encoder ist normalerweise eine Matrix voller Nullen.

Die Schritte zum Transformieren der Daten sind denen von scikit-learn sehr ähnlich. Du brauchst:

  • Indizieren Sie die Zeichenfolge auf numerisch
  • Erstellen Sie den einen Hot-Encoder
  • Transformieren Sie die Daten

Zwei APIs erledigen die Aufgabe: StringIndexer, OneHotEncoder

  1. Zunächst wählen Sie die zu indizierende Zeichenfolgenspalte aus. Die Eingabespalte ist der Name der Spalte im Datensatz. „outputCol“ ist der neue Name, der der transformierten Spalte gegeben wird.
StringIndexer(inputCol="workclass", outputCol="workclass_encoded")
  1. Passen Sie die Daten an und transformieren Sie sie
model = stringIndexer.fit(df)		
`indexed = model.transform(df)``
  1. Erstellen Sie die Nachrichtenspalten basierend auf der Gruppe. Wenn das Feature beispielsweise 10 Gruppen enthält, verfügt die neue Matrix über 10 Spalten, eine für jede Gruppe.
OneHotEncoder(dropLast=False, inputCol="workclassencoded", outputCol="workclassvec")
### Example encoder
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler

stringIndexer = StringIndexer(inputCol="workclass", outputCol="workclass_encoded")
model = stringIndexer.fit(df)
indexed = model.transform(df)
encoder = OneHotEncoder(dropLast=False, inputCol="workclass_encoded", outputCol="workclass_vec")
encoded = encoder.transform(indexed)
encoded.show(2)
+---+----------+----------------+------+---------+-------------+------------------+---------------+-------------+-----+----+------------+------------+----------+--------------+-----+-----------------+-------------+
|age|age_square|       workclass|fnlwgt|education|education_num|           marital|     occupation| relationship| race| sex|capital_gain|capital_loss|hours_week|native_country|label|workclass_encoded|workclass_vec|
+---+----------+----------------+------+---------+-------------+------------------+---------------+-------------+-----+----+------------+------------+----------+--------------+-----+-----------------+-------------+
| 39|    1521.0|       State-gov| 77516|Bachelors|           13|     Never-married|   Adm-clerical|Not-in-family|White|Male|        2174|           0|        40| United-States|<=50K|              4.0|(9,[4],[1.0])|
| 50|    2500.0|Self-emp-not-inc| 83311|Bachelors|           13|Married-civ-spouse|Exec-managerial|      Husband|White|Male|           0|           0|        13| United-States|<=50K|              1.0|(9,[1],[1.0])|
+---+----------+----------------+------+---------+-------------+------------------+---------------+-------------+-----+----+------------+------------+----------+--------------+-----+-----------------+-------------+
only showing top 2 rows

Bauen Sie die Pipeline

Sie erstellen eine Pipeline, um alle präzisen Features zu konvertieren und sie dem endgültigen Datensatz hinzuzufügen. Die Pipeline besteht aus vier Operationen, Sie können jedoch beliebig viele Operationen hinzufügen.

  1. Kodieren Sie die kategorialen Daten
  2. Indizieren Sie die Beschriftungsfunktion
  3. Kontinuierliche Variable hinzufügen
  4. Montieren Sie die Stufen.

Jeder Schritt wird in einer Liste mit dem Namen „Stages“ gespeichert. Diese Liste teilt dem VectorAssembler mit, welche Operation innerhalb der Pipeline ausgeführt werden soll.

1. Kodieren Sie die kategorialen Daten

Dieser Schritt ist genau derselbe wie im obigen Beispiel, außer dass Sie alle kategorialen Features durchlaufen.

from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoderEstimator
CATE_FEATURES = ['workclass', 'education', 'marital', 'occupation', 'relationship', 'race', 'sex', 'native_country']
stages = [] # stages in our Pipeline
for categoricalCol in CATE_FEATURES:
    stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol + "Index")
    encoder = OneHotEncoderEstimator(inputCols=[stringIndexer.getOutputCol()],
                                     outputCols=[categoricalCol + "classVec"])
    stages += [stringIndexer, encoder]

2. Indizieren Sie die Beschriftungsfunktion

SparkWie viele andere Bibliotheken akzeptiert es keine Zeichenfolgenwerte für die Bezeichnung. Sie konvertieren die Beschriftungsfunktion mit StringIndexer und fügen sie den Listenstufen hinzu

# Convert label into label indices using the StringIndexer
label_stringIdx =  StringIndexer(inputCol="label", outputCol="newlabel")
stages += [label_stringIdx]

3. Fügen Sie eine kontinuierliche Variable hinzu

Die inputCols des VectorAssembler sind eine Liste von Spalten. Sie können eine neue Liste erstellen, die alle neuen Spalten enthält. Der folgende Code füllt die Liste mit codierten kategorialen Merkmalen und den kontinuierlichen Merkmalen.

assemblerInputs = [c + "classVec" for c in CATE_FEATURES] + CONTI_FEATURES

4. Montieren Sie die Stufen.

Abschließend übergeben Sie alle Schritte im VectorAssembler

assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")stages += [assembler]

Nachdem alle Schritte abgeschlossen sind, übertragen Sie die Daten in die Pipeline.

# Create a Pipeline.
pipeline = Pipeline(stages=stages)
pipelineModel = pipeline.fit(df_remove)
model = pipelineModel.transform(df_remove)

Wenn Sie den neuen Datensatz überprüfen, können Sie sehen, dass er alle transformierten und nicht transformierten Features enthält. Sie interessieren sich nur für das neue Label und die neuen Funktionen. Die Features umfassen alle transformierten Features und die kontinuierlichen Variablen.

model.take(1)

[Row(age=39, age_square=1521.0, workclass='State-gov', fnlwgt=77516, education='Bachelors', education_num=13, marital='Never-married', occupation='Adm-clerical', relationship='Not-in-family', race='White', sex='Male', capital_gain=2174, capital_loss=0, hours_week=40, native_country='United-States', label='<=50K', workclassIndex=4.0, workclassclassVec=SparseVector(8, {4: 1.0}), educationIndex=2.0, educationclassVec=SparseVector(15, {2: 1.0}), maritalIndex=1.0, maritalclassVec=SparseVector(6, {1: 1.0}), occupationIndex=3.0, occupationclassVec=SparseVector(14, {3: 1.0}), relationshipIndex=1.0, relationshipclassVec=SparseVector(5, {1: 1.0}), raceIndex=0.0, raceclassVec=SparseVector(4, {0: 1.0}), sexIndex=0.0, sexclassVec=SparseVector(1, {0: 1.0}), native_countryIndex=0.0, native_countryclassVec=SparseVector(40, {0: 1.0}), newlabel=0.0, features=SparseVector(99, {4: 1.0, 10: 1.0, 24: 1.0, 32: 1.0, 44: 1.0, 48: 1.0, 52: 1.0, 53: 1.0, 93: 39.0, 94: 77516.0, 95: 2174.0, 96: 13.0, 98: 40.0}))]

Schritt 4) Erstellen Sie den Klassifikator: logistic

Um die Berechnung zu beschleunigen, konvertieren Sie das Modell in einen DataFrame.

Sie müssen mithilfe der Karte ein neues Label und Features aus dem Modell auswählen.

from pyspark.ml.linalg import DenseVector
input_data = model.rdd.map(lambda x: (x["newlabel"], DenseVector(x["features"])))

Sie sind bereit, die Zugdaten als DataFrame zu erstellen. Sie verwenden den sqlContext

df_train = sqlContext.createDataFrame(input_data, ["label", "features"])

Überprüfen Sie die zweite Reihe

df_train.show(2)
+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|[0.0,0.0,0.0,0.0,...|
|  0.0|[0.0,1.0,0.0,0.0,...|
+-----+--------------------+
only showing top 2 rows

Erstellen Sie einen Zug-/Testsatz

Mit randomSplit teilen Sie den Datensatz im Verhältnis 80/20 auf.

# Split the data into train and test sets
train_data, test_data = df_train.randomSplit([.8,.2],seed=1234)

Zählen wir, wie viele Personen im Trainings- und Testsatz ein Einkommen unter/über 50 haben

train_data.groupby('label').agg({'label': 'count'}).show()
+-----+------------+
|label|count(label)|
+-----+------------+
|  0.0|       19698|
|  1.0|        6263|
+-----+------------+
test_data.groupby('label').agg({'label': 'count'}).show()
+-----+------------+
|label|count(label)|
+-----+------------+
|  0.0|        5021|
|  1.0|        1578|
+-----+------------+

Erstellen Sie den logistischen Regressor

Zu guter Letzt können Sie den Klassifikator erstellen. Pyspark verfügt über eine API namens LogisticRegression zur Durchführung einer logistischen Regression.

Sie initialisieren lr, indem Sie die Beschriftungsspalte und die Feature-Spalten angeben. Sie legen maximal 10 Iterationen fest und fügen einen Regularisierungsparameter mit einem Wert von 0.3 hinzu. Beachten Sie, dass Sie im nächsten Abschnitt die Kreuzvalidierung mit einem Parameterraster verwenden, um das Modell zu optimieren

# Import `LinearRegression`
from pyspark.ml.classification import LogisticRegression

# Initialize `lr`
lr = LogisticRegression(labelCol="label",
                        featuresCol="features",
                        maxIter=10,
                        regParam=0.3)

# Fit the data to the model
linearModel = lr.fit(train_data)

#Sie können die Koeffizienten aus der Regression sehen

# Print the coefficients and intercept for logistic regression
print("Coefficients: " + str(linearModel.coefficients))
print("Intercept: " + str(linearModel.intercept))
Coefficients: [-0.0678914665262,-0.153425526813,-0.0706009536407,-0.164057586562,-0.120655298528,0.162922330862,0.149176870438,-0.626836362611,-0.193483661541,-0.0782269980838,0.222667203836,0.399571096381,-0.0222024341804,-0.311925857859,-0.0434497788688,-0.306007744328,-0.41318209688,0.547937504247,-0.395837350854,-0.23166535958,0.618743906733,-0.344088614546,-0.385266881369,0.317324463006,-0.350518889186,-0.201335923138,-0.232878560088,-0.13349278865,-0.119760542498,0.17500602491,-0.0480968101118,0.288484253943,-0.116314616745,0.0524163478063,-0.300952624551,-0.22046421474,-0.16557996579,-0.114676231939,-0.311966431453,-0.344226119233,0.105530129507,0.152243047814,-0.292774545497,0.263628334433,-0.199951374076,-0.30329422583,-0.231087515178,0.418918551,-0.0565930184279,-0.177818073048,-0.0733236680663,-0.267972912252,0.168491215697,-0.12181255723,-0.385648075442,-0.202101794517,0.0469791640782,-0.00842850210625,-0.00373211448629,-0.259296141281,-0.309896554133,-0.168434409756,-0.11048086026,0.0280647963877,-0.204187030092,-0.414392623536,-0.252806580669,0.143366465705,-0.516359222663,-0.435627370849,-0.301949286524,0.0878249035894,-0.210951740965,-0.621417928742,-0.099445190784,-0.232671473401,-0.1077745606,-0.360429419703,-0.420362959052,-0.379729467809,-0.395186242741,0.0826401853838,-0.280251589972,0.187313505214,-0.20295228799,-0.431177064626,0.149759018379,-0.107114299614,-0.319314858424,0.0028450133235,-0.651220387649,-0.327918792207,-0.143659581445,0.00691075160413,8.38517628783e-08,2.18856717378e-05,0.0266701216268,0.000231075966823,0.00893832698698]
Intercept: -1.9884177974805692

Schritt 5) Trainieren und bewerten Sie das Modell

Um eine Vorhersage für Ihren Testsatz zu generieren,

Sie können linearModel mit transform() für test_data verwenden

# Make predictions on test data using the transform() method.
predictions = linearModel.transform(test_data)

Sie können die Elemente in Vorhersagen drucken

predictions.printSchema()
root
 |-- label: double (nullable = true)
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)

Sie interessieren sich für die Bezeichnung, die Vorhersage und die Wahrscheinlichkeit

selected = predictions.select("label", "prediction", "probability")
selected.show(20)
+-----+----------+--------------------+
|label|prediction|         probability|
+-----+----------+--------------------+
|  0.0|       0.0|[0.91560704124179...|
|  0.0|       0.0|[0.92812140213994...|
|  0.0|       0.0|[0.92161406774159...|
|  0.0|       0.0|[0.96222760777142...|
|  0.0|       0.0|[0.66363283056957...|
|  0.0|       0.0|[0.65571324475477...|
|  0.0|       0.0|[0.73053376932829...|
|  0.0|       1.0|[0.31265053873570...|
|  0.0|       0.0|[0.80005907577390...|
|  0.0|       0.0|[0.76482251301640...|
|  0.0|       0.0|[0.84447301189069...|
|  0.0|       0.0|[0.75691912026619...|
|  0.0|       0.0|[0.60902504096722...|
|  0.0|       0.0|[0.80799228385509...|
|  0.0|       0.0|[0.87704364852567...|
|  0.0|       0.0|[0.83817652582377...|
|  0.0|       0.0|[0.79655423248500...|
|  0.0|       0.0|[0.82712311232246...|
|  0.0|       0.0|[0.81372823882016...|
|  0.0|       0.0|[0.59687710752201...|
+-----+----------+--------------------+
only showing top 20 rows

Bewerten Sie das Modell

Sie müssen sich die Genauigkeitsmetrik ansehen, um zu sehen, wie gut (oder schlecht) das Modell funktioniert. Derzeit gibt es keine API zur Berechnung des Genauigkeitsmaßes Spark. Der Standardwert ist die ROC-Kurve (Receiver Operating Characteristic Curve). Es handelt sich um eine andere Metrik, die die Rate falsch-positiver Ergebnisse berücksichtigt.

Bevor wir uns den ROC ansehen, konstruieren wir das Genauigkeitsmaß. Sie sind mit dieser Metrik besser vertraut. Das Genauigkeitsmaß ist die Summe der korrekten Vorhersage über die Gesamtzahl der Beobachtungen.

Sie erstellen einen DataFrame mit der Bezeichnung und der „Vorhersage“.

cm = predictions.select("label", "prediction")

Sie können die Anzahl der Klassen im Etikett und in der Vorhersage überprüfen

cm.groupby('label').agg({'label': 'count'}).show()
+-----+------------+
|label|count(label)|
+-----+------------+
|  0.0|        5021|
|  1.0|        1578|
+-----+------------+
cm.groupby('prediction').agg({'prediction': 'count'}).show()
+----------+-----------------+
|prediction|count(prediction)|
+----------+-----------------+
|       0.0|             5982|
|       1.0|              617|
+----------+-----------------+

Im Testsatz gibt es beispielsweise 1578 Haushalte mit einem Einkommen über 50 und 5021 darunter. Der Klassifikator prognostizierte jedoch 617 Haushalte mit einem Einkommen über 50.

Sie können die Genauigkeit berechnen, indem Sie die Anzahl berechnen, wenn die Beschriftung über die Gesamtzahl der Zeilen korrekt klassifiziert wurde.

cm.filter(cm.label == cm.prediction).count() / cm.count()

0.8237611759357478

Sie können alles zusammenfassen und eine Funktion schreiben, um die Genauigkeit zu berechnen.

def accuracy_m(model): 
    predictions = model.transform(test_data)
    cm = predictions.select("label", "prediction")
    acc = cm.filter(cm.label == cm.prediction).count() / cm.count()
    print("Model accuracy: %.3f%%" % (acc * 100)) 
accuracy_m(model = linearModel)
Model accuracy: 82.376%

ROC-Metriken

Das Modul BinaryClassificationEvaluator beinhaltet die ROC-Kennzahlen. Der Empfänger OperaDie Ting-Charakteristikkurve ist ein weiteres gängiges Werkzeug, das bei der binären Klassifizierung verwendet wird. Sie ist der Präzisions-/Erinnerungskurve sehr ähnlich, aber anstatt Präzision gegen Erinnerung darzustellen, zeigt die ROC-Kurve die wahre positive Rate (d. h. Erinnerung) gegenüber der falsch positiven Rate. Die Falsch-Positiv-Rate ist das Verhältnis der negativen Fälle, die fälschlicherweise als positiv eingestuft werden. Er entspricht eins minus der echten Negativrate. Die wahre Negativrate wird auch Spezifität genannt. Daher stellt die ROC-Kurve die Sensitivität (Erinnerung) im Vergleich zu 1 – Spezifität dar

### Use ROC 
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Evaluate model
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
print(evaluator.evaluate(predictions))
print(evaluator.getMetricName())

0.8940481662695192areaUnderROC

print(evaluator.evaluate(predictions))

0.8940481662695192

Schritt 6) Optimieren Sie den Hyperparameter

Zu guter Letzt können Sie die Hyperparameter optimieren. Ähnlich zu Scikit lernen Sie erstellen ein Parameterraster und fügen die Parameter hinzu, die Sie optimieren möchten.

Um die Berechnungszeit zu verkürzen, optimieren Sie den Regularisierungsparameter nur mit nur zwei Werten.

from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Create ParamGrid for Cross Validation
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.01, 0.5])
             .build())

Abschließend bewerten Sie das Modell mithilfe der Kreuzvalidierungsmethode mit 5 Falten. Das Training dauert etwa 16 Minuten.

from time import *
start_time = time()

# Create 5-fold CrossValidator
cv = CrossValidator(estimator=lr,
                    estimatorParamMaps=paramGrid,
                    evaluator=evaluator, numFolds=5)

# Run cross validations
cvModel = cv.fit(train_data)
# likely take a fair amount of time
end_time = time()
elapsed_time = end_time - start_time
print("Time to train model: %.3f seconds" % elapsed_time)

Zeit zum Trainieren des Modells: 978.807 Sekunden

Der beste Regularisierungshyperparameter ist 0.01 mit einer Genauigkeit von 85.316 Prozent.

accuracy_m(model = cvModel)
Model accuracy: 85.316%

Sie können den empfohlenen Parameter extrahieren, indem Sie cvModel.bestModel mit extractParamMap() verketten.

bestModel = cvModel.bestModel
bestModel.extractParamMap()
{Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='aggregationDepth', doc='suggested depth for treeAggregate (>= 2)'): 2,
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty'): 0.0,
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='family', doc='The name of family which is a description of the label distribution to be used in the model. Supported options: auto, binomial, multinomial.'): 'auto',
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='featuresCol', doc='features column name'): 'features',
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='fitIntercept', doc='whether to fit an intercept term'): True,
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='labelCol', doc='label column name'): 'label',
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='maxIter', doc='maximum number of iterations (>= 0)'): 10,
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='predictionCol', doc='prediction column name'): 'prediction',
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='probabilityCol', doc='Column name for predicted class conditional probabilities. Note: Not all models output well-calibrated probability estimates! These probabilities should be treated as confidences, not precise probabilities'): 'probability',
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='rawPredictionCol', doc='raw prediction (a.k.a. confidence) column name'): 'rawPrediction',
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='regParam', doc='regularization parameter (>= 0)'): 0.01,
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='standardization', doc='whether to standardize the training features before fitting the model'): True,
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='threshold', doc='threshold in binary classification prediction, in range [0, 1]'): 0.5,
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='tol', doc='the convergence tolerance for iterative algorithms (>= 0)'): 1e-06}

Zusammenfassung

Spark ist ein grundlegendes Werkzeug für einen Datenwissenschaftler. Es ermöglicht dem Praktiker, eine App mit verschiedenen Datenquellen zu verbinden, Datenanalysen nahtlos durchzuführen oder ein Vorhersagemodell hinzuzufügen.

Beginnen mit Spark, müssen Sie eine initiieren Spark Kontext mit:

'SparkKontext()'

und und SQL Kontext zum Herstellen einer Verbindung mit einer Datenquelle:

'SQLContext()'

Im Tutorial erfahren Sie, wie Sie eine logistische Regression trainieren:

  1. Konvertieren Sie den Datensatz in einen Datenrahmen mit:
rdd.map(lambda x: (x["newlabel"], DenseVector(x["features"])))
sqlContext.createDataFrame(input_data, ["label", "features"])

Beachten Sie, dass der Spaltenname des Labels „newlabel“ lautet und alle Features in „features“ zusammengefasst sind. Ändern Sie diese Werte, wenn sie in Ihrem Datensatz abweichen.

  1. Erstellen Sie den Zug-/Testsatz
randomSplit([.8,.2],seed=1234)
  1. Trainiere das Modell
LogisticRegression(labelCol="label",featuresCol="features",maxIter=10, regParam=0.3)
lr.fit()
  1. Machen Sie eine Vorhersage
linearModel.transform()