PySpark Οδηγός για αρχάριους: Μάθετε με ΠΑΡΑΔΕΙΓΜΑΤΑ

Πριν μάθετε το PySpark, ας καταλάβουμε:

Τι είναι το Apache Spark?

Spark είναι μια λύση μεγάλων δεδομένων που έχει αποδειχθεί ευκολότερη και ταχύτερη από το Hadoop MapReduce. Spark είναι ένα λογισμικό ανοιχτού κώδικα που αναπτύχθηκε από το UC Berkeley RAD lab το 2009. Από τότε που κυκλοφόρησε στο κοινό το 2010, Spark έχει αυξηθεί σε δημοτικότητα και χρησιμοποιείται μέσω του κλάδου με πρωτοφανή κλίμακα.

Στην εποχή του Big Data, οι επαγγελματίες χρειάζονται περισσότερο από ποτέ γρήγορα και αξιόπιστα εργαλεία για την επεξεργασία ροής δεδομένων. Παλαιότερα εργαλεία όπως το MapReduce ήταν αγαπημένα αλλά ήταν αργά. Για να ξεπεραστεί αυτό το ζήτημα, Spark προσφέρει μια λύση που είναι τόσο γρήγορη όσο και γενικής χρήσης. Η κύρια διαφορά μεταξύ Spark και MapReduce είναι αυτό Spark εκτελεί υπολογισμούς στη μνήμη κατά τη διάρκεια του τελευταίου στο σκληρό δίσκο. Επιτρέπει την πρόσβαση υψηλής ταχύτητας και την επεξεργασία δεδομένων, μειώνοντας τους χρόνους από ώρες σε λεπτά.

Τι είναι το PySpark?

PySpark είναι ένα εργαλείο που δημιουργήθηκε από τον Apache Spark Κοινότητα για χρήση Python με Spark. Επιτρέπει την εργασία με RDD (Resilient Distributed Dataset) στο Python. Προσφέρει επίσης PySpark Κέλυφος για σύνδεση Python API με Spark πυρήνα για την έναρξη Spark Συμφραζόμενα. Spark είναι η μηχανή ονομασίας για την πραγματοποίηση cluster computing, ενώ η PySpark is Pythonβιβλιοθήκη του για χρήση Spark.

Πώς κάνει Spark λειτουργεί;

Spark βασίζεται σε υπολογιστική μηχανή, που σημαίνει ότι φροντίζει για τον προγραμματισμό, τη διανομή και την παρακολούθηση της εφαρμογής. Κάθε εργασία εκτελείται σε διάφορες μηχανές εργασίας που ονομάζονται συμπλέγματα υπολογιστών. Ένα υπολογιστικό σύμπλεγμα αναφέρεται στην κατανομή των εργασιών. Ένα μηχάνημα εκτελεί μια εργασία, ενώ τα άλλα συμβάλλουν στην τελική έξοδο μέσω μιας διαφορετικής εργασίας. Στο τέλος, όλες οι εργασίες συγκεντρώνονται για να παραχθεί ένα αποτέλεσμα. Ο Spark Ο διαχειριστής δίνει μια επισκόπηση 360 των διαφόρων Spark Θέσεις εργασίας.

Πώς λειτουργεί Spark Εργασία
Πώς λειτουργεί Spark Εργασία

Spark έχει σχεδιαστεί για να λειτουργεί με

  • Python
  • Java
  • Scala
  • SQL

Ένα σημαντικό χαρακτηριστικό του Spark είναι ο τεράστιος όγκος ενσωματωμένης βιβλιοθήκης, συμπεριλαμβανομένου του MLlib για μηχανική εκμάθηση. Spark έχει επίσης σχεδιαστεί για να λειτουργεί με συμπλέγματα Hadoop και μπορεί να διαβάσει τον ευρύ τύπο αρχείων, συμπεριλαμβανομένων των δεδομένων Hive, CSV, JSON, Casandra μεταξύ άλλων.

Γιατί να χρησιμοποιήσετε Spark?

Ως μελλοντικός επαγγελματίας δεδομένων, θα πρέπει να είστε εξοικειωμένοι με τις διάσημες βιβλιοθήκες του python: Pandas και scikit-learn. Αυτές οι δύο βιβλιοθήκες είναι φανταστικές για εξερεύνηση δεδομένων μέχρι μεσαίου μεγέθους. Τα τακτικά έργα μηχανικής εκμάθησης βασίζονται στην ακόλουθη μεθοδολογία:

  • Φορτώστε τα δεδομένα στο δίσκο
  • Εισαγάγετε τα δεδομένα στη μνήμη του μηχανήματος
  • Επεξεργασία/ανάλυση των δεδομένων
  • Δημιουργήστε το μοντέλο μηχανικής μάθησης
  • Αποθηκεύστε την πρόβλεψη πίσω στο δίσκο

Το πρόβλημα προκύπτει εάν ο επιστήμονας δεδομένων θέλει να επεξεργαστεί δεδομένα που είναι πολύ μεγάλα για έναν υπολογιστή. Κατά τη διάρκεια των προηγούμενων ημερών της επιστήμης δεδομένων, οι επαγγελματίες δειγματοληπτούσαν το δεδομένο, καθώς δεν χρειαζόταν πάντα εκπαίδευση σε τεράστια σύνολα δεδομένων. Ο επιστήμονας δεδομένων θα έβρισκε ένα καλό στατιστικό δείγμα, θα έκανε έναν πρόσθετο έλεγχο ευρωστίας και θα καταλήξει σε ένα εξαιρετικό μοντέλο.

Ωστόσο, υπάρχουν ορισμένα προβλήματα με αυτό:

  • Το σύνολο δεδομένων αντικατοπτρίζει τον πραγματικό κόσμο;
  • Περιλαμβάνουν τα δεδομένα κάποιο συγκεκριμένο παράδειγμα;
  • Είναι το μοντέλο κατάλληλο για δειγματοληψία;

Πάρτε για παράδειγμα τις συστάσεις των χρηστών. Οι προτεινόμενοι βασίζονται στη σύγκριση χρηστών με άλλους χρήστες για την αξιολόγηση των προτιμήσεών τους. Εάν ο επαγγελματίας δεδομένων λάβει μόνο ένα υποσύνολο των δεδομένων, δεν θα υπάρχει μια ομάδα χρηστών που να μοιάζουν πολύ μεταξύ τους. Οι προτεινόμενοι πρέπει να εκτελούνται με το πλήρες σύνολο δεδομένων ή καθόλου.

Ποια είναι η λύση?

Η λύση ήταν εμφανής εδώ και πολύ καιρό, χωρίστε το πρόβλημα σε πολλούς υπολογιστές. Οι παράλληλοι υπολογιστές έχουν επίσης πολλά προβλήματα. Οι προγραμματιστές συχνά αντιμετωπίζουν προβλήματα με τη σύνταξη παράλληλου κώδικα και καταλήγουν να πρέπει να λύσουν ένα σωρό σύνθετα ζητήματα σχετικά με την ίδια την πολυεπεξεργασία.

Το Pyspark δίνει στον επιστήμονα δεδομένων ένα API που μπορεί να χρησιμοποιηθεί για την επίλυση προβλημάτων παράλληλων δεδομένων. Το Pyspark χειρίζεται την πολυπλοκότητα της πολυεπεξεργασίας, όπως η διανομή των δεδομένων, η διανομή κώδικα και η συλλογή εξόδου από τους εργαζόμενους σε ένα σύμπλεγμα μηχανών.

Spark μπορεί να τρέξει αυτόνομα, αλλά τις περισσότερες φορές εκτελείται πάνω από ένα πλαίσιο υπολογιστών συμπλέγματος όπως το Hadoop. Στη δοκιμή και την ανάπτυξη, ωστόσο, ένας επιστήμονας δεδομένων μπορεί να τρέξει αποτελεσματικά Spark σε κουτιά ανάπτυξης ή φορητούς υπολογιστές χωρίς σύμπλεγμα

• Ένα από τα κύρια πλεονεκτήματα του Spark είναι η κατασκευή μιας αρχιτεκτονικής που να περιλαμβάνει διαχείριση ροής δεδομένων, απρόσκοπτα ερωτήματα δεδομένων, πρόβλεψη μηχανικής μάθησης και πρόσβαση σε πραγματικό χρόνο σε διάφορες αναλύσεις.

• Spark συνεργάζεται στενά με τη γλώσσα SQL, δηλαδή με δομημένα δεδομένα. Επιτρέπει την αναζήτηση δεδομένων σε πραγματικό χρόνο.

• Η δουλειά του Data scientist main είναι να αναλύει και να δημιουργεί προγνωστικά μοντέλα. Εν ολίγοις, ένας επιστήμονας δεδομένων πρέπει να γνωρίζει πώς να αναζητά δεδομένα χρησιμοποιώντας SQL, δημιουργήστε μια στατιστική αναφορά και χρησιμοποιήστε τη μηχανική μάθηση για την παραγωγή προβλέψεων. Οι επιστήμονες δεδομένων ξοδεύουν σημαντικό μέρος του χρόνου τους στον καθαρισμό, τον μετασχηματισμό και την ανάλυση των δεδομένων. Μόλις το σύνολο δεδομένων ή η ροή εργασίας δεδομένων είναι έτοιμο, ο επιστήμονας δεδομένων χρησιμοποιεί διάφορες τεχνικές για να ανακαλύψει πληροφορίες και κρυφά μοτίβα. Ο χειρισμός των δεδομένων πρέπει να είναι ισχυρός και το ίδιο εύκολος στη χρήση. Spark είναι το σωστό εργαλείο χάρη στην ταχύτητα και τα πλούσια API του.

Σε αυτό το PySpark tutorial, θα μάθετε πώς να δημιουργείτε έναν ταξινομητή με το PySpark παραδείγματα.

Πώς να εγκαταστήσετε το PySpark με AWS

The Jupyter η ομάδα δημιουργεί μια εικόνα Docker για εκτέλεση Spark αποτελεσματικά. Παρακάτω είναι τα βήματα που μπορείτε να ακολουθήσετε για να εγκαταστήσετε το PySpark παράδειγμα στο AWS.

Ανατρέξτε στο σεμινάριο μας στο AWS και TensorFlow

Βήμα 1: Δημιουργήστε ένα παράδειγμα

Πρώτα απ 'όλα, πρέπει να δημιουργήσετε ένα παράδειγμα. Μεταβείτε στον λογαριασμό σας AWS και ξεκινήστε την παρουσία. Μπορείτε να αυξήσετε τον αποθηκευτικό χώρο έως και 15 γραμμάρια και να χρησιμοποιήσετε την ίδια ομάδα ασφαλείας όπως στο φροντιστήριο TensorFlow.

Βήμα 2: Ανοίξτε τη σύνδεση

Ανοίξτε τη σύνδεση και εγκαταστήστε το κοντέινερ docker. Για περισσότερες λεπτομέρειες, ανατρέξτε στο σεμινάριο με το TensorFlow with Λιμενεργάτης. Σημειώστε ότι, πρέπει να βρίσκεστε στον σωστό κατάλογο εργασίας.

Απλώς εκτελέστε αυτούς τους κωδικούς για να εγκαταστήσετε το Docker:

sudo yum update -y
sudo yum install -y docker
sudo service docker start
sudo user-mod -a -G docker ec2-user
exit

Βήμα 3: Ανοίξτε ξανά τη σύνδεση και εγκαταστήστε Spark

Αφού ανοίξετε ξανά τη σύνδεση, μπορείτε να εγκαταστήσετε την εικόνα που περιέχει το PySpark.

## Spark
docker run -v ~/work:/home/jovyan/work -d -p 8888:8888 jupyter/pyspark-notebook

## Allow preserving Jupyter notebook
sudo chown 1000 ~/work

## Install tree to see our working directory next
sudo yum install -y tree

Βήμα 4: Άνοιγμα Jupyter

Ελέγξτε το δοχείο και το όνομά του

docker ps

Εκκινήστε το docker με αρχεία καταγραφής docker ακολουθούμενο από το όνομα του docker. Για παράδειγμα, το docker logs zealous_goldwasser

Μεταβείτε στο πρόγραμμα περιήγησής σας και ξεκινήστε Jupyter. Η διεύθυνση είναι http://localhost:8888/. Επικολλήστε τον κωδικό πρόσβασης που δίνεται από το τερματικό.

Σημείωση: εάν θέλετε να ανεβάσετε/κατεβάσετε ένα αρχείο στον υπολογιστή σας AWS, μπορείτε να χρησιμοποιήσετε το λογισμικό Cyberduck, https://cyberduck.io/.

Πώς να εγκαταστήσετε το PySpark on Windows/Μακ με Κόντα

Ακολουθεί μια λεπτομερής διαδικασία για τον τρόπο εγκατάστασης του PySpark on Windows/Mac χρησιμοποιώντας Anaconda:

Για να εγκαταστήσετε Spark στο τοπικό σας μηχάνημα, μια συνιστώμενη πρακτική είναι να δημιουργήσετε ένα νέο περιβάλλον conda. Αυτό το νέο περιβάλλον θα εγκατασταθεί Python 3.6, Spark και όλες τις εξαρτήσεις.

Χρήστης Mac

cd anaconda3
touch hello-spark.yml
vi hello-spark.yml

Windows Χρήστες

cd C:\Users\Admin\Anaconda3
echo.>hello-spark.yml
notepad hello-spark.yml

Μπορείτε να επεξεργαστείτε το αρχείο .yml. Να είστε προσεκτικοί με την εσοχή. Απαιτούνται δύο κενά πριν -

name: hello-spark 
    dependencies:
    
    - python=3.6
    - jupyter
    - ipython
    - numpy
    - numpy-base
    - pandas
    - py4j
    - pyspark
    - pytz

Αποθηκεύστε το και δημιουργήστε το περιβάλλον. Χρειάζεται λίγος χρόνος

conda env create -f hello-spark.yml

Για περισσότερες λεπτομέρειες σχετικά με την τοποθεσία, ανατρέξτε στον οδηγό Εγκατάσταση του TensorFlow

Μπορείτε να ελέγξετε όλο το περιβάλλον που είναι εγκατεστημένο στο μηχάνημά σας

conda env list
Activate hello-spark

Χρήστης Mac

source activate hello-spark

Windows Χρήστες

activate hello-spark

Σημείωση: Έχετε ήδη δημιουργήσει ένα συγκεκριμένο περιβάλλον TensorFlow για την εκτέλεση των σεμιναρίων στο TensorFlow. Είναι πιο βολικό να δημιουργήσετε ένα νέο περιβάλλον διαφορετικό από το hello-tf. Δεν έχει νόημα να υπερφορτώνουμε το hello-tf Spark ή οποιεσδήποτε άλλες βιβλιοθήκες μηχανικής εκμάθησης.

Φανταστείτε ότι το μεγαλύτερο μέρος του έργου σας περιλαμβάνει το TensorFlow, αλλά πρέπει να το χρησιμοποιήσετε Spark για ένα συγκεκριμένο έργο. Μπορείτε να ορίσετε ένα περιβάλλον TensorFlow για όλο το έργο σας και να δημιουργήσετε ένα ξεχωριστό περιβάλλον για Spark. Μπορείτε να προσθέσετε τόσες βιβλιοθήκες Spark περιβάλλον όπως θέλετε χωρίς να παρεμβαίνετε στο περιβάλλον TensorFlow. Μόλις τελειώσετε με το Sparkτου έργου, μπορείτε να το διαγράψετε χωρίς να επηρεάσετε το περιβάλλον TensorFlow.

Jupyter

Ανοικτό Jupyter Σημειωματάριο και δοκιμάστε εάν PySpark εργοστάσιο. Σε ένα νέο σημειωματάριο επικολλήστε το ακόλουθο PySpark δείγμα κώδικα:

import pyspark
from pyspark import SparkContext
sc =SparkContext()

Εάν εμφανιστεί κάποιο σφάλμα, είναι πιθανό ότι Java δεν είναι εγκατεστημένο στο μηχάνημά σας. Στο mac, ανοίξτε το τερματικό και γράψτε java -version, αν υπάρχει έκδοση java, φροντίστε να είναι 1.8. Σε Windows, μεταβείτε στην Εφαρμογή και ελέγξτε αν υπάρχει α Java ντοσιέ. Εάν υπάρχει α Java φάκελο, ελέγξτε το Java 1.8 είναι εγκατεστημένο. Μέχρι τη στιγμή που γράφει αυτό το άρθρο, ο PySpark δεν είναι συμβατό με Java9 και παραπάνω.

Εάν πρέπει να εγκαταστήσετε Java, να σκεφτείς σύνδεσμος και κατεβάστε το jdk-8u181-windows-x64.exe

Jupyter

Για χρήστες Mac, συνιστάται η χρήση του «brew.».

brew tap caskroom/versions
brew cask install java8

Ανατρέξτε σε αυτό το βήμα προς βήμα εκμάθηση πώς να εγκαταστήσετε Java

Σημείωση: Χρησιμοποιήστε το remove για να διαγράψετε εντελώς ένα περιβάλλον.

 conda env remove -n hello-spark -y

Spark Πλαίσιο

SparkΤο πλαίσιο είναι ο εσωτερικός κινητήρας που επιτρέπει τις συνδέσεις με τα συμπλέγματα. Εάν θέλετε να εκτελέσετε μια λειτουργία, χρειάζεστε ένα SparkΣυμφραζόμενα.

Δημιουργία SparkΠλαίσιο

Πρώτα απ 'όλα, πρέπει να ξεκινήσετε ένα SparkΣυμφραζόμενα.

import pyspark
from pyspark import SparkContext
sc =SparkContext()

Τώρα που η SparkΤο περιβάλλον είναι έτοιμο, μπορείτε να δημιουργήσετε μια συλλογή δεδομένων που ονομάζεται RDD, Resilient Distributed Dataset. Ο υπολογισμός σε ένα RDD παραλληλίζεται αυτόματα σε όλο το σύμπλεγμα.

nums= sc.parallelize([1,2,3,4])

Μπορείτε να αποκτήσετε πρόσβαση στην πρώτη σειρά με λήψη

nums.take(1)
[1]

Μπορείτε να εφαρμόσετε έναν μετασχηματισμό στα δεδομένα με μια συνάρτηση λάμδα. Στο PySpark Στο παρακάτω παράδειγμα, επιστρέφετε το τετράγωνο των αριθμών. Είναι ένας μετασχηματισμός χάρτη

squared = nums.map(lambda x: x*x).collect()
for num in squared:
    print('%i ' % (num))
1 
4 
9 
16

SQLCcontext

Ένας πιο βολικός τρόπος είναι να χρησιμοποιήσετε το DataFrame. SparkΤο πλαίσιο έχει ήδη οριστεί, μπορείτε να το χρησιμοποιήσετε για να δημιουργήσετε το dataFrame. Πρέπει επίσης να δηλώσετε το SQLContext

Το SQLContext επιτρέπει τη σύνδεση του κινητήρα με διαφορετικές πηγές δεδομένων. Χρησιμοποιείται για την εκκίνηση των λειτουργιών του Spark sql.

from pyspark.sql import Row
from pyspark.sql import SQLContext

sqlContext = SQLContext(sc)

Τώρα σε αυτό Spark φροντιστήριο Python, ας δημιουργήσουμε μια λίστα με πλειάδες. Κάθε πλειάδα θα περιέχει το όνομα των ανθρώπων και την ηλικία τους. Απαιτούνται τέσσερα βήματα:

Βήμα 1) Δημιουργήστε τη λίστα με τις πλειάδες με τις πληροφορίες

[('John',19),('Smith',29),('Adam',35),('Henry',50)]

Βήμα 2) Κατασκευάστε ένα RDD

rdd = sc.parallelize(list_p)

Βήμα 3) Μετατρέψτε τις πλειάδες

rdd.map(lambda x: Row(name=x[0], age=int(x[1])))

Βήμα 4) Δημιουργήστε ένα πλαίσιο DataFrame

sqlContext.createDataFrame(ppl)
list_p = [('John',19),('Smith',29),('Adam',35),('Henry',50)]
rdd = sc.parallelize(list_p)
ppl = rdd.map(lambda x: Row(name=x[0], age=int(x[1])))
DF_ppl = sqlContext.createDataFrame(ppl)

Εάν θέλετε να αποκτήσετε πρόσβαση στον τύπο κάθε δυνατότητας, μπορείτε να χρησιμοποιήσετε το printSchema()

DF_ppl.printSchema()
root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)

Παράδειγμα μηχανικής εκμάθησης με PySpark

Τώρα που έχετε μια σύντομη ιδέα Spark και SQLContext, είστε έτοιμοι να δημιουργήσετε το πρώτο σας πρόγραμμα Μηχανικής εκμάθησης.

Ακολουθούν τα βήματα για τη δημιουργία ενός προγράμματος Machine Learning με το PySpark:

  • Βήμα 1) Βασική λειτουργία με PySpark
  • Βήμα 2) Προεπεξεργασία δεδομένων
  • Βήμα 3) Δημιουργήστε έναν αγωγό επεξεργασίας δεδομένων
  • Βήμα 4) Κατασκευάστε τον ταξινομητή: logistic
  • Βήμα 5) Εκπαιδεύστε και αξιολογήστε το μοντέλο
  • Βήμα 6) Συντονίστε την υπερπαράμετρο

Σε αυτό το PySpark Οδηγός Μηχανικής Μάθησης, θα χρησιμοποιήσουμε το σύνολο δεδομένων για ενήλικες. Ο σκοπός αυτού του σεμιναρίου είναι να μάθετε πώς να χρησιμοποιείτε το Pyspark. Για περισσότερες πληροφορίες σχετικά με το σύνολο δεδομένων, ανατρέξτε σε αυτό το σεμινάριο.

Σημειώστε ότι το σύνολο δεδομένων δεν είναι σημαντικό και μπορεί να πιστεύετε ότι ο υπολογισμός διαρκεί πολύ. Spark έχει σχεδιαστεί για να επεξεργάζεται σημαντικό όγκο δεδομένων. SparkΟι επιδόσεις του αυξάνονται σε σχέση με άλλες βιβλιοθήκες μηχανικής εκμάθησης όταν το σύνολο δεδομένων που υποβάλλεται σε επεξεργασία μεγαλώνει.

Βήμα 1) Βασική λειτουργία με το PySpark

Πρώτα απ 'όλα, πρέπει να αρχικοποιήσετε το SQLCcontext δεν έχει ήδη ξεκινήσει.

#from pyspark.sql import SQLContext
url = "https://raw.githubusercontent.com/guru99-edu/R-Programming/master/adult_data.csv"
from pyspark import SparkFiles
sc.addFile(url)
sqlContext = SQLContext(sc)

Στη συνέχεια, μπορείτε να διαβάσετε το αρχείο cvs με το sqlContext.read.csv. Χρησιμοποιείτε το inferSchema που έχει οριστεί σε True για να το πω Spark για να μαντέψει αυτόματα τον τύπο των δεδομένων. Από προεπιλογή, είναι στροφή σε False.

df = sqlContext.read.csv(SparkFiles.get("adult_data.csv"), header=True, inferSchema= True)

Ας ρίξουμε μια ματιά στον τύπο δεδομένων

df.printSchema()
root
 |-- age: integer (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: integer (nullable = true)
 |-- education: string (nullable = true)
 |-- education_num: integer (nullable = true)
 |-- marital: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital_gain: integer (nullable = true)
 |-- capital_loss: integer (nullable = true)
 |-- hours_week: integer (nullable = true)
 |-- native_country: string (nullable = true)
 |-- label: string (nullable = true)

Μπορείτε να δείτε τα δεδομένα με εμφάνιση.

df.show(5, truncate = False)
+---+----------------+------+---------+-------------+------------------+-----------------+-------------+-----+------+------------+------------+----------+--------------+-----+
|age|workclass       |fnlwgt|education|education_num|marital           |occupation       |relationship |race |sex   |capital_gain|capital_loss|hours_week|native_country|label|
+---+----------------+------+---------+-------------+------------------+-----------------+-------------+-----+------+------------+------------+----------+--------------+-----+
|39 |State-gov       |77516 |Bachelors|13           |Never-married     |Adm-clerical     |Not-in-family|White|Male  |2174        |0           |40        |United-States |<=50K|
|50 |Self-emp-not-inc|83311 |Bachelors|13           |Married-civ-spouse|Exec-managerial  |Husband      |White|Male  |0           |0           |13        |United-States |<=50K|
|38 |Private         |215646|HS-grad  |9            |Divorced          |Handlers-cleaners|Not-in-family|White|Male  |0           |0           |40        |United-States |<=50K|
|53 |Private         |234721|11th     |7            |Married-civ-spouse|Handlers-cleaners|Husband      |Black|Male  |0           |0           |40        |United-States |<=50K|
|28 |Private         |338409|Bachelors|13           |Married-civ-spouse|Prof-specialty   |Wife         |Black|Female|0           |0           |40        |Cuba          |<=50K|
+---+----------------+------+---------+-------------+------------------+-----------------+-------------+-----+------+------------+------------+----------+--------------+-----+
only showing top 5 rows

Εάν δεν ρυθμίσατε το inderShema σε True, ορίστε τι συμβαίνει με τον τύπο. Υπάρχουν όλα σε σειρά.

df_string = sqlContext.read.csv(SparkFiles.get("adult.csv"), header=True, inferSchema=  False)
df_string.printSchema()
root
 |-- age: string (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: string (nullable = true)
 |-- education: string (nullable = true)
 |-- education_num: string (nullable = true)
 |-- marital: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital_gain: string (nullable = true)
 |-- capital_loss: string (nullable = true)
 |-- hours_week: string (nullable = true)
 |-- native_country: string (nullable = true)
 |-- label: string (nullable = true)

Για να μετατρέψετε τη συνεχή μεταβλητή στη σωστή μορφή, μπορείτε να χρησιμοποιήσετε την αναδιατύπωση των στηλών. Μπορείτε να χρησιμοποιήσετε το withColumn για να το πείτε Spark ποια στήλη για να λειτουργήσει ο μετασχηματισμός.

# Import all from `sql.types`
from pyspark.sql.types import *

# Write a custom function to convert the data type of DataFrame columns
def convertColumn(df, names, newType):
    for name in names: 
        df = df.withColumn(name, df[name].cast(newType))
    return df 
# List of continuous features
CONTI_FEATURES  = ['age', 'fnlwgt','capital_gain', 'education_num', 'capital_loss', 'hours_week']
# Convert the type
df_string = convertColumn(df_string, CONTI_FEATURES, FloatType())
# Check the dataset
df_string.printSchema()
root
 |-- age: float (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: float (nullable = true)
 |-- education: string (nullable = true)
 |-- education_num: float (nullable = true)
 |-- marital: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital_gain: float (nullable = true)
 |-- capital_loss: float (nullable = true)
 |-- hours_week: float (nullable = true)
 |-- native_country: string (nullable = true)
 |-- label: string (nullable = true)

from pyspark.ml.feature import StringIndexer
#stringIndexer = StringIndexer(inputCol="label", outputCol="newlabel")
#model = stringIndexer.fit(df)
#df = model.transform(df)
df.printSchema()

Επιλέξτε στήλες

Μπορείτε να επιλέξετε και να εμφανίσετε τις σειρές με επιλογή και τα ονόματα των χαρακτηριστικών. Παρακάτω, επιλέγονται η ηλικία και το fnlwgt.

df.select('age','fnlwgt').show(5)
+---+------+
|age|fnlwgt|
+---+------+
| 39| 77516|
| 50| 83311|
| 38|215646|
| 53|234721|
| 28|338409|
+---+------+
only showing top 5 rows

Μετρήστε ανά ομάδα

Εάν θέλετε να μετρήσετε τον αριθμό των εμφανίσεων ανά ομάδα, μπορείτε να αλυσιδώσετε:

  • groupBy()
  • μετρώ()

μαζί. Στο PySpark Στο παρακάτω παράδειγμα, μετράτε τον αριθμό των σειρών με βάση το επίπεδο εκπαίδευσης.

df.groupBy("education").count().sort("count",ascending=True).show()
+------------+-----+
|   education|count|
+------------+-----+
|   Preschool|   51|
|     1st-4th|  168|
|     5th-6th|  333|
|   Doctorate|  413|
|        12th|  433|
|         9th|  514|
| Prof-school|  576|
|     7th-8th|  646|
|        10th|  933|
|  Assoc-acdm| 1067|
|        11th| 1175|
|   Assoc-voc| 1382|
|     Masters| 1723|
|   Bachelors| 5355|
|Some-college| 7291|
|     HS-grad|10501|
+------------+-----+

Περιγράψτε τα δεδομένα

Για να λάβετε μια συνοπτική στατιστική, των δεδομένων, μπορείτε να χρησιμοποιήσετε το describe(). Θα υπολογίσει το:

  • μετράνε
  • εννοώ
  • τυπική απόκλιση
  • πρακτικά
  • max
df.describe().show()
+-------+------------------+-----------+------------------+------------+-----------------+--------+----------------+------------+------------------+------+------------------+----------------+------------------+--------------+-----+
|summary|               age|  workclass|            fnlwgt|   education|    education_num| marital|      occupation|relationship|              race|   sex|      capital_gain|    capital_loss|        hours_week|native_country|label|
+-------+------------------+-----------+------------------+------------+-----------------+--------+----------------+------------+------------------+------+------------------+----------------+------------------+--------------+-----+
|  count|             32561|      32561|             32561|       32561|            32561|   32561|           32561|       32561|             32561| 32561|             32561|           32561|             32561|         32561|32561|
|   mean| 38.58164675532078|       null|189778.36651208502|        null| 10.0806793403151|    null|            null|        null|              null|  null|1077.6488437087312| 87.303829734959|40.437455852092995|          null| null|
| stddev|13.640432553581356|       null|105549.97769702227|        null|2.572720332067397|    null|            null|        null|              null|  null| 7385.292084840354|402.960218649002|12.347428681731838|          null| null|
|    min|                17|          ?|             12285|        10th|                1|Divorced|               ?|     Husband|Amer-Indian-Eskimo|Female|                 0|               0|                 1|             ?|<=50K|
|    max|                90|Without-pay|           1484705|Some-college|               16| Widowed|Transport-moving|        Wife|             White|  Male|             99999|            4356|                99|    Yugoslavia| >50K|
+-------+------------------+-----------+------------------+------------+-----------------+--------+----------------+------------+------------------+------+------------------+----------------+------------------+--------------+-----+

Εάν θέλετε το συνοπτικό στατιστικό στοιχείο μιας μόνο στήλης, προσθέστε το όνομα της στήλης μέσα στο describe()

df.describe('capital_gain').show()
+-------+------------------+
|summary|      capital_gain|
+-------+------------------+
|  count|             32561|
|   mean|1077.6488437087312|
| stddev| 7385.292084840354|
|    min|                 0|
|    max|             99999|
+-------+------------------+

Υπολογισμός crosstab

Σε ορισμένες περιπτώσεις, μπορεί να είναι ενδιαφέρον να δούμε τα περιγραφικά στατιστικά στοιχεία μεταξύ δύο στηλών κατά ζεύγη. Για παράδειγμα, μπορείτε να μετρήσετε τον αριθμό των ατόμων με εισόδημα κάτω ή πάνω από 50 χιλιάδες ανά επίπεδο εκπαίδευσης. Αυτή η λειτουργία ονομάζεται crosstab.

df.crosstab('age', 'label').sort("age_label").show()
+---------+-----+----+
|age_label|<=50K|>50K|
+---------+-----+----+
|       17|  395|   0|
|       18|  550|   0|
|       19|  710|   2|
|       20|  753|   0|
|       21|  717|   3|
|       22|  752|  13|
|       23|  865|  12|
|       24|  767|  31|
|       25|  788|  53|
|       26|  722|  63|
|       27|  754|  81|
|       28|  748| 119|
|       29|  679| 134|
|       30|  690| 171|
|       31|  705| 183|
|       32|  639| 189|
|       33|  684| 191|
|       34|  643| 243|
|       35|  659| 217|
|       36|  635| 263|
+---------+-----+----+
only showing top 20 rows

Μπορείτε να δείτε ότι κανένας δεν έχει έσοδα πάνω από 50 χιλιάδες όταν είναι νέος.

Πτώση στήλης

Υπάρχουν δύο διαισθητικά API για απόθεση στηλών:

  • drop(): Απόθεση στήλης
  • dropna(): Ρίξτε NA's

Παρακάτω ρίχνετε τη στήλη Education_num

df.drop('education_num').columns

['age',
 'workclass',
 'fnlwgt',
 'education',
 'marital',
 'occupation',
 'relationship',
 'race',
 'sex',
 'capital_gain',
 'capital_loss',
 'hours_week',
 'native_country',
 'label']

Φιλτράρισμα δεδομένων

Μπορείτε να χρησιμοποιήσετε το filter() για να εφαρμόσετε περιγραφικά στατιστικά στοιχεία σε ένα υποσύνολο δεδομένων. Για παράδειγμα, μπορείτε να μετρήσετε τον αριθμό των ατόμων άνω των 40 ετών

df.filter(df.age > 40).count()

13443

Descriptive στατιστικά ανά ομάδα

Τέλος, μπορείτε να ομαδοποιήσετε δεδομένα ανά ομάδα και να υπολογίσετε στατιστικές πράξεις όπως ο μέσος όρος.

df.groupby('marital').agg({'capital_gain': 'mean'}).show()
+--------------------+------------------+
|             marital| avg(capital_gain)|
+--------------------+------------------+
|           Separated| 535.5687804878049|
|       Never-married|376.58831788823363|
|Married-spouse-ab...| 653.9832535885167|
|            Divorced| 728.4148098131893|
|             Widowed| 571.0715005035247|
|   Married-AF-spouse| 432.6521739130435|
|  Married-civ-spouse|1764.8595085470085|
+--------------------+------------------+

Βήμα 2) Προεπεξεργασία δεδομένων

Η επεξεργασία δεδομένων είναι ένα κρίσιμο βήμα στη μηχανική μάθηση. Αφού αφαιρέσετε τα δεδομένα σκουπιδιών, λαμβάνετε ορισμένες σημαντικές πληροφορίες.

Για παράδειγμα, γνωρίζετε ότι η ηλικία δεν είναι γραμμική συνάρτηση με το εισόδημα. Όταν οι άνθρωποι είναι νέοι, το εισόδημά τους είναι συνήθως χαμηλότερο από τη μέση ηλικία. Μετά τη συνταξιοδότηση, ένα νοικοκυριό χρησιμοποιεί την αποταμίευσή του, που σημαίνει μείωση του εισοδήματος. Για να καταγράψετε αυτό το μοτίβο, μπορείτε να προσθέσετε ένα τετράγωνο στο χαρακτηριστικό ηλικίας

Προσθέστε τετράγωνο ηλικίας

Για να προσθέσετε μια νέα δυνατότητα, πρέπει:

  1. Επιλέξτε τη στήλη
  2. Εφαρμόστε τον μετασχηματισμό και προσθέστε τον στο DataFrame
from pyspark.sql.functions import *

# 1 Select the column
age_square = df.select(col("age")**2)

# 2 Apply the transformation and add it to the DataFrame
df = df.withColumn("age_square", col("age")**2)

df.printSchema()
root
 |-- age: integer (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: integer (nullable = true)
 |-- education: string (nullable = true)
 |-- education_num: integer (nullable = true)
 |-- marital: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital_gain: integer (nullable = true)
 |-- capital_loss: integer (nullable = true)
 |-- hours_week: integer (nullable = true)
 |-- native_country: string (nullable = true)
 |-- label: string (nullable = true)
 |-- age_square: double (nullable = true)

Μπορείτε να δείτε ότι το age_square προστέθηκε με επιτυχία στο πλαίσιο δεδομένων. Μπορείτε να αλλάξετε τη σειρά των μεταβλητών με επιλογή. Παρακάτω, φέρετε το age_square αμέσως μετά την ηλικία.

COLUMNS = ['age', 'age_square', 'workclass', 'fnlwgt', 'education', 'education_num', 'marital',
           'occupation', 'relationship', 'race', 'sex', 'capital_gain', 'capital_loss',
           'hours_week', 'native_country', 'label']
df = df.select(COLUMNS)
df.first()
Row(age=39, age_square=1521.0, workclass='State-gov', fnlwgt=77516, education='Bachelors', education_num=13, marital='Never-married', occupation='Adm-clerical', relationship='Not-in-family', race='White', sex='Male', capital_gain=2174, capital_loss=0, hours_week=40, native_country='United-States', label='<=50K')

Εξαιρούνται Ολλανδία-Ολλανδία

Όταν μια ομάδα μέσα σε ένα χαρακτηριστικό έχει μόνο μία παρατήρηση, δεν φέρνει πληροφορίες στο μοντέλο. Αντίθετα, μπορεί να οδηγήσει σε σφάλμα κατά τη διασταυρούμενη επικύρωση.

Ας ελέγξουμε την προέλευση του νοικοκυριού

df.filter(df.native_country == 'Holand-Netherlands').count()
df.groupby('native_country').agg({'native_country': 'count'}).sort(asc("count(native_country)")).show()
+--------------------+---------------------+
|      native_country|count(native_country)|
+--------------------+---------------------+
|  Holand-Netherlands|                    1|
|            Scotland|                   12|
|             Hungary|                   13|
|            Honduras|                   13|
|Outlying-US(Guam-...|                   14|
|          Yugoslavia|                   16|
|            Thailand|                   18|
|                Laos|                   18|
|            Cambodia|                   19|
|     Trinadad&Tobago|                   19|
|                Hong|                   20|
|             Ireland|                   24|
|             Ecuador|                   28|
|              Greece|                   29|
|              France|                   29|
|                Peru|                   31|
|           Nicaragua|                   34|
|            Portugal|                   37|
|                Iran|                   43|
|               Haiti|                   44|
+--------------------+---------------------+
only showing top 20 rows

Το χαρακτηριστικό native_country έχει μόνο ένα νοικοκυριό που προέρχεται από την Ολλανδία. Το αποκλείεις.

df_remove = df.filter(df.native_country !=	'Holand-Netherlands')

Βήμα 3) Δημιουργήστε έναν αγωγό επεξεργασίας δεδομένων

Παρόμοια με το scikit-learn, το Pyspark έχει ένα Pipeline API.

Ένας αγωγός είναι πολύ βολικός για τη διατήρηση της δομής των δεδομένων. Σπρώχνετε τα δεδομένα στη διοχέτευση. Μέσα στον αγωγό, γίνονται διάφορες λειτουργίες, η έξοδος χρησιμοποιείται για την τροφοδοσία του αλγόριθμου.

Για παράδειγμα, ένας καθολικός μετασχηματισμός στη μηχανική μάθηση συνίσταται στη μετατροπή μιας συμβολοσειράς σε έναν καυτό κωδικοποιητή, δηλαδή, μια στήλη από μια ομάδα. Ένας καυτός κωδικοποιητής είναι συνήθως ένας πίνακας γεμάτος μηδενικά.

Τα βήματα για τον μετασχηματισμό των δεδομένων είναι πολύ παρόμοια με το scikit-learn. Χρειάζεται να:

  • Ευρετηριάστε τη συμβολοσειρά σε αριθμητικό
  • Δημιουργήστε τον ένα καυτό κωδικοποιητή
  • Μεταμορφώστε τα δεδομένα

Δύο API κάνουν τη δουλειά: StringIndexer, OneHotEncoder

  1. Πρώτα απ 'όλα, επιλέγετε τη στήλη συμβολοσειράς για ευρετηρίαση. Το inputCol είναι το όνομα της στήλης στο σύνολο δεδομένων. outputCol είναι το νέο όνομα που δίνεται στη μετασχηματισμένη στήλη.
StringIndexer(inputCol="workclass", outputCol="workclass_encoded")
  1. Προσαρμόστε τα δεδομένα και μεταμορφώστε τα
model = stringIndexer.fit(df)		
`indexed = model.transform(df)``
  1. Δημιουργήστε τις στήλες ειδήσεων με βάση την ομάδα. Για παράδειγμα, εάν υπάρχουν 10 ομάδες στο χαρακτηριστικό, ο νέος πίνακας θα έχει 10 στήλες, μία για κάθε ομάδα.
OneHotEncoder(dropLast=False, inputCol="workclassencoded", outputCol="workclassvec")
### Example encoder
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler

stringIndexer = StringIndexer(inputCol="workclass", outputCol="workclass_encoded")
model = stringIndexer.fit(df)
indexed = model.transform(df)
encoder = OneHotEncoder(dropLast=False, inputCol="workclass_encoded", outputCol="workclass_vec")
encoded = encoder.transform(indexed)
encoded.show(2)
+---+----------+----------------+------+---------+-------------+------------------+---------------+-------------+-----+----+------------+------------+----------+--------------+-----+-----------------+-------------+
|age|age_square|       workclass|fnlwgt|education|education_num|           marital|     occupation| relationship| race| sex|capital_gain|capital_loss|hours_week|native_country|label|workclass_encoded|workclass_vec|
+---+----------+----------------+------+---------+-------------+------------------+---------------+-------------+-----+----+------------+------------+----------+--------------+-----+-----------------+-------------+
| 39|    1521.0|       State-gov| 77516|Bachelors|           13|     Never-married|   Adm-clerical|Not-in-family|White|Male|        2174|           0|        40| United-States|<=50K|              4.0|(9,[4],[1.0])|
| 50|    2500.0|Self-emp-not-inc| 83311|Bachelors|           13|Married-civ-spouse|Exec-managerial|      Husband|White|Male|           0|           0|        13| United-States|<=50K|              1.0|(9,[1],[1.0])|
+---+----------+----------------+------+---------+-------------+------------------+---------------+-------------+-----+----+------------+------------+----------+--------------+-----+-----------------+-------------+
only showing top 2 rows

Φτιάξτε τον αγωγό

Θα δημιουργήσετε μια διοχέτευση για να μετατρέψετε όλα τα ακριβή χαρακτηριστικά και να τα προσθέσετε στο τελικό σύνολο δεδομένων. Ο αγωγός θα έχει τέσσερις λειτουργίες, αλλά μη διστάσετε να προσθέσετε όσες λειτουργίες θέλετε.

  1. Κωδικοποιήστε τα κατηγοριοποιημένα δεδομένα
  2. Ευρετηριάστε το χαρακτηριστικό της ετικέτας
  3. Προσθήκη συνεχούς μεταβλητής
  4. Συναρμολογήστε τα βήματα.

Κάθε βήμα αποθηκεύεται σε μια λίστα με το όνομα στάδια. Αυτή η λίστα θα πει στο VectorAssembler ποια λειτουργία πρέπει να εκτελέσει μέσα στο pipeline.

1. Κωδικοποιήστε τα κατηγοριοποιημένα δεδομένα

Αυτό το βήμα είναι ακριβώς το ίδιο με το παραπάνω παράδειγμα, με τη διαφορά ότι κάνετε βρόχο πάνω από όλα τα κατηγορικά χαρακτηριστικά.

from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoderEstimator
CATE_FEATURES = ['workclass', 'education', 'marital', 'occupation', 'relationship', 'race', 'sex', 'native_country']
stages = [] # stages in our Pipeline
for categoricalCol in CATE_FEATURES:
    stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol + "Index")
    encoder = OneHotEncoderEstimator(inputCols=[stringIndexer.getOutputCol()],
                                     outputCols=[categoricalCol + "classVec"])
    stages += [stringIndexer, encoder]

2. Καταχωρίστε το χαρακτηριστικό της ετικέτας

Spark, όπως πολλές άλλες βιβλιοθήκες, δεν δέχεται τιμές συμβολοσειράς για την ετικέτα. Μετατρέπετε τη δυνατότητα ετικέτας με το StringIndexer και την προσθέτετε στα στάδια της λίστας

# Convert label into label indices using the StringIndexer
label_stringIdx =  StringIndexer(inputCol="label", outputCol="newlabel")
stages += [label_stringIdx]

3. Προσθέστε συνεχή μεταβλητή

Τα inputCols του VectorAssembler είναι μια λίστα στηλών. Μπορείτε να δημιουργήσετε μια νέα λίστα που περιέχει όλες τις νέες στήλες. Ο παρακάτω κώδικας συμπληρώνει τη λίστα με κωδικοποιημένα χαρακτηριστικά κατηγοριών και τα συνεχή χαρακτηριστικά.

assemblerInputs = [c + "classVec" for c in CATE_FEATURES] + CONTI_FEATURES

4. Συναρμολογήστε τα βήματα.

Τέλος, περνάτε όλα τα βήματα στο VectorAssembler

assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")stages += [assembler]

Τώρα που όλα τα βήματα είναι έτοιμα, ωθήστε τα δεδομένα στη διοχέτευση.

# Create a Pipeline.
pipeline = Pipeline(stages=stages)
pipelineModel = pipeline.fit(df_remove)
model = pipelineModel.transform(df_remove)

Εάν ελέγξετε το νέο σύνολο δεδομένων, μπορείτε να δείτε ότι περιέχει όλα τα χαρακτηριστικά, μετασχηματισμένα και όχι μετασχηματισμένα. Σας ενδιαφέρει μόνο η νέα ετικέτα και τα χαρακτηριστικά. Τα χαρακτηριστικά περιλαμβάνουν όλα τα μετασχηματισμένα χαρακτηριστικά και τις συνεχείς μεταβλητές.

model.take(1)

[Row(age=39, age_square=1521.0, workclass='State-gov', fnlwgt=77516, education='Bachelors', education_num=13, marital='Never-married', occupation='Adm-clerical', relationship='Not-in-family', race='White', sex='Male', capital_gain=2174, capital_loss=0, hours_week=40, native_country='United-States', label='<=50K', workclassIndex=4.0, workclassclassVec=SparseVector(8, {4: 1.0}), educationIndex=2.0, educationclassVec=SparseVector(15, {2: 1.0}), maritalIndex=1.0, maritalclassVec=SparseVector(6, {1: 1.0}), occupationIndex=3.0, occupationclassVec=SparseVector(14, {3: 1.0}), relationshipIndex=1.0, relationshipclassVec=SparseVector(5, {1: 1.0}), raceIndex=0.0, raceclassVec=SparseVector(4, {0: 1.0}), sexIndex=0.0, sexclassVec=SparseVector(1, {0: 1.0}), native_countryIndex=0.0, native_countryclassVec=SparseVector(40, {0: 1.0}), newlabel=0.0, features=SparseVector(99, {4: 1.0, 10: 1.0, 24: 1.0, 32: 1.0, 44: 1.0, 48: 1.0, 52: 1.0, 53: 1.0, 93: 39.0, 94: 77516.0, 95: 2174.0, 96: 13.0, 98: 40.0}))]

Βήμα 4) Κατασκευάστε τον ταξινομητή: logistic

Για να κάνετε τον υπολογισμό πιο γρήγορο, μετατρέπετε το μοντέλο σε DataFrame.

Πρέπει να επιλέξετε νέα ετικέτα και χαρακτηριστικά από το μοντέλο χρησιμοποιώντας χάρτη.

from pyspark.ml.linalg import DenseVector
input_data = model.rdd.map(lambda x: (x["newlabel"], DenseVector(x["features"])))

Είστε έτοιμοι να δημιουργήσετε τα δεδομένα τρένου ως DataFrame. Χρησιμοποιείτε το sqlContext

df_train = sqlContext.createDataFrame(input_data, ["label", "features"])

Ελέγξτε τη δεύτερη σειρά

df_train.show(2)
+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|[0.0,0.0,0.0,0.0,...|
|  0.0|[0.0,1.0,0.0,0.0,...|
+-----+--------------------+
only showing top 2 rows

Δημιουργήστε ένα σετ τρένου/δοκιμών

Διαχωρίζετε το σύνολο δεδομένων 80/20 με το randomSplit.

# Split the data into train and test sets
train_data, test_data = df_train.randomSplit([.8,.2],seed=1234)

Ας μετρήσουμε πόσα άτομα με εισόδημα κάτω/πάνω από 50 χιλ. τόσο στο σετ εκπαίδευσης όσο και στο σετ δοκιμών

train_data.groupby('label').agg({'label': 'count'}).show()
+-----+------------+
|label|count(label)|
+-----+------------+
|  0.0|       19698|
|  1.0|        6263|
+-----+------------+
test_data.groupby('label').agg({'label': 'count'}).show()
+-----+------------+
|label|count(label)|
+-----+------------+
|  0.0|        5021|
|  1.0|        1578|
+-----+------------+

Κατασκευάστε το logistic regressor

Τελευταίο αλλά εξίσου σημαντικό, μπορείτε να δημιουργήσετε τον ταξινομητή. Το Pyspark έχει ένα API που ονομάζεται LogisticRegression για την εκτέλεση λογιστικής παλινδρόμησης.

Αρχικοποιείτε το lr υποδεικνύοντας τη στήλη ετικέτας και τις στήλες χαρακτηριστικών. Ορίζετε έως και 10 επαναλήψεις και προσθέτετε μια παράμετρο τακτοποίησης με τιμή 0.3. Σημειώστε ότι στην επόμενη ενότητα, θα χρησιμοποιήσετε διασταυρούμενη επικύρωση με ένα πλέγμα παραμέτρων για να συντονίσετε το μοντέλο

# Import `LinearRegression`
from pyspark.ml.classification import LogisticRegression

# Initialize `lr`
lr = LogisticRegression(labelCol="label",
                        featuresCol="features",
                        maxIter=10,
                        regParam=0.3)

# Fit the data to the model
linearModel = lr.fit(train_data)

#Μπορείτε να δείτε τους συντελεστές από την παλινδρόμηση

# Print the coefficients and intercept for logistic regression
print("Coefficients: " + str(linearModel.coefficients))
print("Intercept: " + str(linearModel.intercept))
Coefficients: [-0.0678914665262,-0.153425526813,-0.0706009536407,-0.164057586562,-0.120655298528,0.162922330862,0.149176870438,-0.626836362611,-0.193483661541,-0.0782269980838,0.222667203836,0.399571096381,-0.0222024341804,-0.311925857859,-0.0434497788688,-0.306007744328,-0.41318209688,0.547937504247,-0.395837350854,-0.23166535958,0.618743906733,-0.344088614546,-0.385266881369,0.317324463006,-0.350518889186,-0.201335923138,-0.232878560088,-0.13349278865,-0.119760542498,0.17500602491,-0.0480968101118,0.288484253943,-0.116314616745,0.0524163478063,-0.300952624551,-0.22046421474,-0.16557996579,-0.114676231939,-0.311966431453,-0.344226119233,0.105530129507,0.152243047814,-0.292774545497,0.263628334433,-0.199951374076,-0.30329422583,-0.231087515178,0.418918551,-0.0565930184279,-0.177818073048,-0.0733236680663,-0.267972912252,0.168491215697,-0.12181255723,-0.385648075442,-0.202101794517,0.0469791640782,-0.00842850210625,-0.00373211448629,-0.259296141281,-0.309896554133,-0.168434409756,-0.11048086026,0.0280647963877,-0.204187030092,-0.414392623536,-0.252806580669,0.143366465705,-0.516359222663,-0.435627370849,-0.301949286524,0.0878249035894,-0.210951740965,-0.621417928742,-0.099445190784,-0.232671473401,-0.1077745606,-0.360429419703,-0.420362959052,-0.379729467809,-0.395186242741,0.0826401853838,-0.280251589972,0.187313505214,-0.20295228799,-0.431177064626,0.149759018379,-0.107114299614,-0.319314858424,0.0028450133235,-0.651220387649,-0.327918792207,-0.143659581445,0.00691075160413,8.38517628783e-08,2.18856717378e-05,0.0266701216268,0.000231075966823,0.00893832698698]
Intercept: -1.9884177974805692

Βήμα 5) Εκπαιδεύστε και αξιολογήστε το μοντέλο

Για να δημιουργήσετε πρόβλεψη για το δοκιμαστικό σας σύνολο,

Μπορείτε να χρησιμοποιήσετε το linearModel με τη transform() στο test_data

# Make predictions on test data using the transform() method.
predictions = linearModel.transform(test_data)

Μπορείτε να εκτυπώσετε τα στοιχεία σε προβλέψεις

predictions.printSchema()
root
 |-- label: double (nullable = true)
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)

Σας ενδιαφέρει η ετικέτα, η πρόβλεψη και η πιθανότητα

selected = predictions.select("label", "prediction", "probability")
selected.show(20)
+-----+----------+--------------------+
|label|prediction|         probability|
+-----+----------+--------------------+
|  0.0|       0.0|[0.91560704124179...|
|  0.0|       0.0|[0.92812140213994...|
|  0.0|       0.0|[0.92161406774159...|
|  0.0|       0.0|[0.96222760777142...|
|  0.0|       0.0|[0.66363283056957...|
|  0.0|       0.0|[0.65571324475477...|
|  0.0|       0.0|[0.73053376932829...|
|  0.0|       1.0|[0.31265053873570...|
|  0.0|       0.0|[0.80005907577390...|
|  0.0|       0.0|[0.76482251301640...|
|  0.0|       0.0|[0.84447301189069...|
|  0.0|       0.0|[0.75691912026619...|
|  0.0|       0.0|[0.60902504096722...|
|  0.0|       0.0|[0.80799228385509...|
|  0.0|       0.0|[0.87704364852567...|
|  0.0|       0.0|[0.83817652582377...|
|  0.0|       0.0|[0.79655423248500...|
|  0.0|       0.0|[0.82712311232246...|
|  0.0|       0.0|[0.81372823882016...|
|  0.0|       0.0|[0.59687710752201...|
+-----+----------+--------------------+
only showing top 20 rows

Αξιολογήστε το μοντέλο

Πρέπει να εξετάσετε τη μέτρηση ακρίβειας για να δείτε πόσο καλά (ή κακά) αποδίδει το μοντέλο. Επί του παρόντος, δεν υπάρχει API για τον υπολογισμό του μέτρου ακρίβειας Spark. Η προεπιλεγμένη τιμή είναι η χαρακτηριστική καμπύλη λειτουργίας του δέκτη ROC. Είναι μια διαφορετική μέτρηση που λαμβάνει υπόψη το ψευδώς θετικό ποσοστό.

Πριν κοιτάξετε το ROC, ας κατασκευάσουμε το μέτρο ακρίβειας. Είστε πιο εξοικειωμένοι με αυτήν τη μέτρηση. Το μέτρο ακρίβειας είναι το άθροισμα της σωστής πρόβλεψης επί του συνολικού αριθμού των παρατηρήσεων.

Δημιουργείτε ένα DataFrame με την ετικέτα και την πρόβλεψη.

cm = predictions.select("label", "prediction")

Μπορείτε να ελέγξετε τον αριθμό της τάξης στην ετικέτα και την πρόβλεψη

cm.groupby('label').agg({'label': 'count'}).show()
+-----+------------+
|label|count(label)|
+-----+------------+
|  0.0|        5021|
|  1.0|        1578|
+-----+------------+
cm.groupby('prediction').agg({'prediction': 'count'}).show()
+----------+-----------------+
|prediction|count(prediction)|
+----------+-----------------+
|       0.0|             5982|
|       1.0|              617|
+----------+-----------------+

Για παράδειγμα, στο δοκιμαστικό σύνολο, υπάρχουν 1578 νοικοκυριά με εισόδημα πάνω από 50 χιλιάδες και 5021 κάτω. Ο ταξινομητής, ωστόσο, προέβλεψε 617 νοικοκυριά με εισόδημα άνω των 50 χιλ.

Μπορείτε να υπολογίσετε την ακρίβεια υπολογίζοντας την καταμέτρηση όταν η ετικέτα ταξινομείται σωστά στον συνολικό αριθμό σειρών.

cm.filter(cm.label == cm.prediction).count() / cm.count()

0.8237611759357478

Μπορείτε να τυλίξετε τα πάντα μαζί και να γράψετε μια συνάρτηση για να υπολογίσετε την ακρίβεια.

def accuracy_m(model): 
    predictions = model.transform(test_data)
    cm = predictions.select("label", "prediction")
    acc = cm.filter(cm.label == cm.prediction).count() / cm.count()
    print("Model accuracy: %.3f%%" % (acc * 100)) 
accuracy_m(model = linearModel)
Model accuracy: 82.376%

Μετρήσεις ROC

Η ενότητα BinaryClassificationEvaluator περιλαμβάνει τα μέτρα ROC. Ο λαβων OperaΗ χαρακτηριστική καμπύλη είναι ένα άλλο κοινό εργαλείο που χρησιμοποιείται με τη δυαδική ταξινόμηση. Μοιάζει πολύ με την καμπύλη ακριβείας/ανάκλησης, αλλά αντί να σχεδιάζεται η ακρίβεια έναντι της ανάκλησης, η καμπύλη ROC εμφανίζει τον πραγματικό θετικό ρυθμό (δηλαδή ανάκληση) έναντι του ψευδώς θετικού ποσοστού. Το ποσοστό ψευδώς θετικών είναι ο λόγος των αρνητικών περιπτώσεων που ταξινομούνται εσφαλμένα ως θετικές. Είναι ίσο με ένα μείον το πραγματικό αρνητικό ποσοστό. Το πραγματικό αρνητικό ποσοστό ονομάζεται επίσης ειδικότητα. Ως εκ τούτου, η καμπύλη ROC απεικονίζει την ευαισθησία (ανάκληση) έναντι 1 – ειδικότητα

### Use ROC 
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Evaluate model
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
print(evaluator.evaluate(predictions))
print(evaluator.getMetricName())

0.8940481662695192περιοχή UnderROC

print(evaluator.evaluate(predictions))

0.8940481662695192

Βήμα 6) Συντονίστε την υπερπαράμετρο

Τελευταίο αλλά εξίσου σημαντικό, μπορείτε να συντονίσετε τις υπερπαραμέτρους. Παρόμοιο με scikit μαθαίνω δημιουργείτε ένα πλέγμα παραμέτρων και προσθέτετε τις παραμέτρους που θέλετε να συντονίσετε.

Για να μειώσετε το χρόνο του υπολογισμού, συντονίζετε την παράμετρο κανονικοποίησης μόνο με δύο μόνο τιμές.

from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Create ParamGrid for Cross Validation
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.01, 0.5])
             .build())

Τέλος, αξιολογείτε το μοντέλο χρησιμοποιώντας τη μέθοδο cross Valiation με 5 πτυχές. Χρειάζονται περίπου 16 λεπτά για να προπονηθείτε.

from time import *
start_time = time()

# Create 5-fold CrossValidator
cv = CrossValidator(estimator=lr,
                    estimatorParamMaps=paramGrid,
                    evaluator=evaluator, numFolds=5)

# Run cross validations
cvModel = cv.fit(train_data)
# likely take a fair amount of time
end_time = time()
elapsed_time = end_time - start_time
print("Time to train model: %.3f seconds" % elapsed_time)

Χρόνος για εκπαίδευση μοντέλο: 978.807 δευτερόλεπτα

Η καλύτερη υπερπαράμετρος τακτοποίησης είναι 0.01, με ακρίβεια 85.316 τοις εκατό.

accuracy_m(model = cvModel)
Model accuracy: 85.316%

Μπορείτε να εξαγάγετε τη συνιστώμενη παράμετρο συνδέοντας το cvModel.bestModel με το extractParamMap()

bestModel = cvModel.bestModel
bestModel.extractParamMap()
{Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='aggregationDepth', doc='suggested depth for treeAggregate (>= 2)'): 2,
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty'): 0.0,
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='family', doc='The name of family which is a description of the label distribution to be used in the model. Supported options: auto, binomial, multinomial.'): 'auto',
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='featuresCol', doc='features column name'): 'features',
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='fitIntercept', doc='whether to fit an intercept term'): True,
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='labelCol', doc='label column name'): 'label',
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='maxIter', doc='maximum number of iterations (>= 0)'): 10,
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='predictionCol', doc='prediction column name'): 'prediction',
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='probabilityCol', doc='Column name for predicted class conditional probabilities. Note: Not all models output well-calibrated probability estimates! These probabilities should be treated as confidences, not precise probabilities'): 'probability',
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='rawPredictionCol', doc='raw prediction (a.k.a. confidence) column name'): 'rawPrediction',
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='regParam', doc='regularization parameter (>= 0)'): 0.01,
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='standardization', doc='whether to standardize the training features before fitting the model'): True,
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='threshold', doc='threshold in binary classification prediction, in range [0, 1]'): 0.5,
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='tol', doc='the convergence tolerance for iterative algorithms (>= 0)'): 1e-06}

Σύνοψη

Spark είναι ένα θεμελιώδες εργαλείο για έναν επιστήμονα δεδομένων. Επιτρέπει στον επαγγελματία να συνδέσει μια εφαρμογή με διαφορετικές πηγές δεδομένων, να εκτελέσει ανάλυση δεδομένων απρόσκοπτα ή να προσθέσει ένα μοντέλο πρόβλεψης.

Για αρχή Spark, πρέπει να ξεκινήσετε ένα Spark Πλαίσιο με:

»SparkΣυμφραζόμενα()'

Και και SQL πλαίσιο σύνδεσης σε πηγή δεδομένων:

'SQLContext()'

Στο σεμινάριο, μαθαίνετε πώς να εκπαιδεύετε μια λογιστική παλινδρόμηση:

  1. Μετατρέψτε το σύνολο δεδομένων σε πλαίσιο δεδομένων με:
rdd.map(lambda x: (x["newlabel"], DenseVector(x["features"])))
sqlContext.createDataFrame(input_data, ["label", "features"])

Σημειώστε ότι το όνομα της στήλης της ετικέτας είναι newlabel και όλα τα χαρακτηριστικά συγκεντρώνονται σε χαρακτηριστικά. Αλλάξτε αυτές τις τιμές εάν διαφέρουν στο σύνολο δεδομένων σας.

  1. Δημιουργήστε το σετ τρένου/δοκιμών
randomSplit([.8,.2],seed=1234)
  1. Εκπαιδεύστε το μοντέλο
LogisticRegression(labelCol="label",featuresCol="features",maxIter=10, regParam=0.3)
lr.fit()
  1. Κάντε πρόβλεψη
linearModel.transform()