PySpark Tutoriel pour débutants : Apprenez avec des EXEMPLES
Avant d'apprendre PySpark, comprenons :
Qu'est-ce qu'Apache Spark?
Spark est une solution Big Data qui s'est avérée plus simple et plus rapide que Hadoop MapReduce. Spark est un logiciel open source développé par le laboratoire RAD de l'UC Berkeley en 2009. Depuis sa sortie publique en 2010, Spark a gagné en popularité et est utilisé dans l’industrie à une échelle sans précédent.
À l'ère de Big Data, les praticiens ont plus que jamais besoin d’outils rapides et fiables pour traiter le streaming des données. Les outils antérieurs comme MapReduce étaient préférés mais étaient lents. Pour surmonter ce problème, Spark propose une solution à la fois rapide et polyvalente. La principale différence entre Spark et MapReduce c'est ça Spark exécute des calculs en mémoire pendant ceux-ci sur le disque dur. Il permet un accès et un traitement des données à grande vitesse, réduisant les temps de plusieurs heures à quelques minutes.
Qu'est-ce que PySpark?
PySpark est un outil créé par Apache Spark Communauté d'utilisation Python avec Spark. Il permet de travailler avec RDD (Resilient Distributed Dataset) dans Python. Il propose également PySpark Shell à lier Python API avec Spark noyau pour initier Spark Le contexte. Spark est le moteur de nom pour réaliser le cluster computing, tandis que PySpark is Pythonla bibliothèque à utiliser Spark.
Comment La Spark marche ?
Spark est basé sur un moteur de calcul, ce qui signifie qu'il prend en charge l'application de planification, de distribution et de surveillance. Chaque tâche est effectuée sur diverses machines de travail appelées cluster informatique. Un cluster informatique fait référence à la division des tâches. Une machine effectue une tâche, tandis que les autres contribuent au résultat final via une tâche différente. En fin de compte, toutes les tâches sont regroupées pour produire un résultat. Le Spark L'administrateur donne un aperçu à 360 ° de divers Spark Offres d'emploi.
Spark est conçu pour fonctionner avec
- Python
- Java
- Scala
- SQL
Une caractéristique importante de Spark est la grande quantité de bibliothèques intégrées, y compris MLlib pour l'apprentissage automatique. Spark est également conçu pour fonctionner avec les clusters Hadoop et peut lire un large type de fichiers, notamment les données Hive, CSV, JSON, Casandra, entre autres.
Pourquoi utiliser Spark?
En tant que futur praticien des données, vous devez être familier avec les célèbres bibliothèques de python : Pandas et scikit-learn. Ces deux bibliothèques sont fantastiques pour explorer des ensembles de données de taille moyenne. Les projets réguliers de machine learning sont construits autour de la méthodologie suivante :
- Charger les données sur le disque
- Importer les données dans la mémoire de la machine
- Traiter/analyser les données
- Construire le modèle d'apprentissage automatique
- Stocker la prédiction sur le disque
Le problème se pose si le data scientist souhaite traiter des données trop volumineuses pour un seul ordinateur. Au début de la science des données, les praticiens échantillonnaient les données, car une formation sur d'énormes ensembles de données n'était pas toujours nécessaire. Le data scientist trouverait un bon échantillon statistique, effectuerait un contrôle de robustesse supplémentaire et proposerait un excellent modèle.
Cependant, cela pose quelques problèmes :
- L'ensemble de données reflète-t-il le monde réel ?
- Les données incluent-elles un exemple spécifique ?
- Le modèle est-il adapté à l’échantillonnage ?
Prenez par exemple les recommandations des utilisateurs. Les recommandateurs s'appuient sur la comparaison des utilisateurs avec d'autres utilisateurs pour évaluer leurs préférences. Si le praticien des données ne prend qu’un sous-ensemble des données, il n’y aura pas de cohorte d’utilisateurs très similaires les uns aux autres. Les recommandataires doivent fonctionner sur l'ensemble de données complet, voire pas du tout.
Quelle est la solution?
La solution était évidente depuis longtemps : répartir le problème sur plusieurs ordinateurs. L’informatique parallèle pose également de nombreux problèmes. Les développeurs ont souvent du mal à écrire du code parallèle et finissent par devoir résoudre de nombreux problèmes complexes liés au multitraitement lui-même.
Pyspark donne au data scientist une API qui peut être utilisée pour résoudre les problèmes de traitement des données parallèles. Pyspark gère les complexités du multitraitement, telles que la distribution des données, la distribution du code et la collecte des résultats des travailleurs sur un cluster de machines.
Spark peut fonctionner de manière autonome, mais s'exécute le plus souvent sur un framework informatique en cluster tel que Hadoop. En test et en développement, cependant, un data scientist peut exécuter efficacement Spark sur leurs boîtiers de développement ou leurs ordinateurs portables sans cluster
• L'un des principaux avantages de Spark Il s'agit de construire une architecture qui englobe la gestion du streaming de données, les requêtes de données transparentes, la prédiction par apprentissage automatique et l'accès en temps réel à diverses analyses.
• Spark travaille en étroite collaboration avec le langage SQL, c'est-à-dire les données structurées. Il permet d'interroger les données en temps réel.
• Le travail principal du Data Scientist est d'analyser et de construire des modèles prédictifs. En bref, un data scientist doit savoir comment interroger les données à l'aide de SQL, produire un rapport statistique et utiliser l'apprentissage automatique pour produire des prédictions. Les data scientists consacrent une grande partie de leur temps au nettoyage, à la transformation et à l'analyse des données. Une fois l’ensemble de données ou le flux de données prêt, le data scientist utilise diverses techniques pour découvrir des informations et des modèles cachés. La manipulation des données doit être robuste et tout aussi facile à utiliser. Spark est le bon outil grâce à sa rapidité et ses API riches.
Dans ce PySpark tutoriel, vous apprendrez à créer un classificateur avec PySpark exemples.
Comment installer PySpark avec AWS
La Jupyter l'équipe crée une image Docker à exécuter Spark efficacement. Voici les étapes que vous pouvez suivre pour installer PySpark instance dans AWS.
Référez-vous à notre tutoriel sur AWS et TensorFlow
Étape 1 : Créer une instance
Tout d'abord, vous devez créer une instance. Accédez à votre compte AWS et lancez l'instance. Vous pouvez augmenter le stockage jusqu'à 15 Go et utiliser le même groupe de sécurité que dans le didacticiel TensorFlow.
Étape 2 : ouvrez la connexion
Ouvrez la connexion et installez le conteneur Docker. Pour plus de détails, reportez-vous au tutoriel avec TensorFlow avec Docker. Notez que vous devez être dans le bon répertoire de travail.
Exécutez simplement ces codes pour installer Docker :
sudo yum update -y sudo yum install -y docker sudo service docker start sudo user-mod -a -G docker ec2-user exit
Étape 3 : Rouvrez la connexion et installez Spark
Après avoir rouvert la connexion, vous pouvez installer l'image contenant PySpark.
## Spark docker run -v ~/work:/home/jovyan/work -d -p 8888:8888 jupyter/pyspark-notebook ## Allow preserving Jupyter notebook sudo chown 1000 ~/work ## Install tree to see our working directory next sudo yum install -y tree
Étape 4: Ouvert Jupyter
Vérifiez le conteneur et son nom
docker ps
Lancez le docker avec les journaux du docker suivis du nom du docker. Par exemple, Docker enregistre zealous_goldwasser
Allez sur votre navigateur et lancez Jupyter. L'adresse est http://localhost:8888/. Collez le mot de passe donné par le terminal.
Note: si vous souhaitez charger/télécharger un fichier sur votre machine AWS, vous pouvez utiliser le logiciel Cyberduck, https://cyberduck.io/.
Comment installer PySpark on Windows/Mac avec Conda
Vous trouverez ci-dessous un processus détaillé sur la façon d'installer PySpark on Windows/Mac utilisant Anaconda :
Pour installer Spark sur votre ordinateur local, une pratique recommandée consiste à créer un nouvel environnement conda. Ce nouvel environnement installera Python 3.6 Spark et toutes les dépendances.
Utilisateur Mac
cd anaconda3 touch hello-spark.yml vi hello-spark.yml
Windows Utilisateur
cd C:\Users\Admin\Anaconda3 echo.>hello-spark.yml notepad hello-spark.yml
Vous pouvez modifier le fichier .yml. Soyez prudent avec le retrait. Deux espaces sont requis avant –
name: hello-spark dependencies: - python=3.6 - jupyter - ipython - numpy - numpy-base - pandas - py4j - pyspark - pytz
Enregistrez-le et créez l’environnement. Cela prend du temps
conda env create -f hello-spark.yml
Pour plus de détails sur l'emplacement, veuillez consulter le tutoriel Installer TensorFlow
Vous pouvez vérifier tout l’environnement installé sur votre machine
conda env list
Activate hello-spark
Utilisateur Mac
source activate hello-spark
Windows Utilisateur
activate hello-spark
Remarque: Vous avez déjà créé un environnement TensorFlow spécifique pour exécuter les tutoriels sur TensorFlow. Il est plus pratique de créer un nouvel environnement différent de hello-tf. Cela n'a aucun sens de surcharger hello-tf avec Spark ou toute autre bibliothèque d’apprentissage automatique.
Imaginez que la majeure partie de votre projet implique TensorFlow, mais que vous devez utiliser Spark pour un projet particulier. Vous pouvez définir un environnement TensorFlow pour l'ensemble de votre projet et créer un environnement distinct pour Spark. Vous pouvez ajouter autant de bibliothèques dans Spark l'environnement comme vous le souhaitez sans interférer avec l'environnement TensorFlow. Une fois que vous avez terminé avec le Sparkdu projet, vous pouvez l'effacer sans affecter l'environnement TensorFlow.
Jupyter
Open Jupyter Notebook et essayez si PySpark fonctionne. Dans un nouveau bloc-notes, collez le Py suivantSpark exemple de code :
import pyspark from pyspark import SparkContext sc =SparkContext()
Si une erreur s'affiche, il est probable que Java n'est pas installé sur votre machine. Sous Mac, ouvrez le terminal et écrivez java -version, s'il existe une version Java, assurez-vous qu'il s'agit de 1.8. Windows, allez dans Application et vérifiez s'il existe un Java dossier. S'il y a un Java dossier, vérifiez que Java 1.8 est installé. Au moment d'écrire ces lignes, PySpark n'est pas compatible avec Java9 et ci-dessus.
Si vous devez installer Java, tu dois penser lien et téléchargez jdk-8u181-windows-x64.exe
Pour les utilisateurs Mac, il est recommandé d'utiliser « brew ».
brew tap caskroom/versions brew cask install java8
Reportez-vous à ce tutoriel étape par étape sur l'installation Java
Note: Utilisez Remove pour effacer complètement un environnement.
conda env remove -n hello-spark -y
Spark Contexte
SparkLe contexte est le moteur interne qui permet les connexions avec les clusters. Si vous souhaitez exécuter une opération, vous avez besoin d'un SparkLe contexte.
Créer un SparkContexte
Tout d'abord, vous devez lancer une SparkLe contexte.
import pyspark from pyspark import SparkContext sc =SparkContext()
Maintenant que la SparkLe contexte est prêt, vous pouvez créer une collection de données appelée RDD, Resilient Distributed Dataset. Le calcul dans un RDD est automatiquement parallélisé sur l’ensemble du cluster.
nums= sc.parallelize([1,2,3,4])
Vous pouvez accéder à la première ligne avec take
nums.take(1)
[1]
Vous pouvez appliquer une transformation aux données avec une fonction lambda. Au PySpark exemple ci-dessous, vous renvoyez le carré des nombres. C'est une transformation de carte
squared = nums.map(lambda x: x*x).collect() for num in squared: print('%i ' % (num))
1 4 9 16
Contexte SQL
Un moyen plus pratique consiste à utiliser le DataFrame. SparkLe contexte est déjà défini, vous pouvez l'utiliser pour créer le dataFrame. Vous devez également déclarer le SQLContext
SQLContext permet de connecter le moteur à différentes sources de données. Il est utilisé pour initier les fonctionnalités de Spark SQL.
from pyspark.sql import Row from pyspark.sql import SQLContext sqlContext = SQLContext(sc)
Maintenant dans ce Spark tutoriel Python, créons une liste de tuples. Chaque tuple contiendra le nom des personnes et leur âge. Quatre étapes sont nécessaires :
Étape 1) Créer la liste des tuples avec les informations
[('John',19),('Smith',29),('Adam',35),('Henry',50)]
Étape 2) Construire un RDD
rdd = sc.parallelize(list_p)
Étape 3) Convertir les tuples
rdd.map(lambda x: Row(name=x[0], age=int(x[1])))
Étape 4) Créer un contexte DataFrame
sqlContext.createDataFrame(ppl) list_p = [('John',19),('Smith',29),('Adam',35),('Henry',50)] rdd = sc.parallelize(list_p) ppl = rdd.map(lambda x: Row(name=x[0], age=int(x[1]))) DF_ppl = sqlContext.createDataFrame(ppl)
Si vous souhaitez accéder au type de chaque fonctionnalité, vous pouvez utiliser printSchema()
DF_ppl.printSchema() root |-- age: long (nullable = true) |-- name: string (nullable = true)
Exemple d'apprentissage automatique avec PySpark
Maintenant que vous avez une brève idée de Spark et SQLContext, vous êtes prêt à créer votre premier programme d'apprentissage automatique.
Voici les étapes pour créer un programme de Machine Learning avec PySpark:
- Étape 1) Opération de base avec PySpark
- Étape 2) Pré-traitement des données
- Étape 3) Construire un pipeline de traitement de données
- Étape 4) Construire le classificateur : logistique
- Étape 5) Former et évaluer le modèle
- Étape 6) Ajuster l'hyperparamètre
Dans ce PySpark Tutoriel Machine Learning, nous utiliserons le jeu de données pour adultes. Le but de ce tutoriel est d'apprendre à utiliser Pyspark. Pour plus d'informations sur le jeu de données, reportez-vous à ce tutoriel.
Notez que l'ensemble de données n'est pas significatif et vous pouvez penser que le calcul prend beaucoup de temps. Spark est conçu pour traiter une quantité considérable de données. SparkLes performances de augmentent par rapport aux autres bibliothèques d'apprentissage automatique lorsque l'ensemble de données traité augmente.
Étape 1) Opération de base avec PySpark
Tout d'abord, vous devez initialiser le SQLContext qui n'est pas encore lancé.
#from pyspark.sql import SQLContext url = "https://raw.githubusercontent.com/guru99-edu/R-Programming/master/adult_data.csv" from pyspark import SparkFiles sc.addFile(url) sqlContext = SQLContext(sc)
ensuite, vous pouvez lire le fichier cvs avec sqlContext.read.csv. Vous utilisez inferSchema défini sur True pour le dire Spark deviner automatiquement le type de données. Par défaut, il s'agit de False.
df = sqlContext.read.csv(SparkFiles.get("adult_data.csv"), header=True, inferSchema= True)
Jetons un coup d'oeil au type de données
df.printSchema() root |-- age: integer (nullable = true) |-- workclass: string (nullable = true) |-- fnlwgt: integer (nullable = true) |-- education: string (nullable = true) |-- education_num: integer (nullable = true) |-- marital: string (nullable = true) |-- occupation: string (nullable = true) |-- relationship: string (nullable = true) |-- race: string (nullable = true) |-- sex: string (nullable = true) |-- capital_gain: integer (nullable = true) |-- capital_loss: integer (nullable = true) |-- hours_week: integer (nullable = true) |-- native_country: string (nullable = true) |-- label: string (nullable = true)
Vous pouvez voir les données avec show.
df.show(5, truncate = False)
+---+----------------+------+---------+-------------+------------------+-----------------+-------------+-----+------+------------+------------+----------+--------------+-----+ |age|workclass |fnlwgt|education|education_num|marital |occupation |relationship |race |sex |capital_gain|capital_loss|hours_week|native_country|label| +---+----------------+------+---------+-------------+------------------+-----------------+-------------+-----+------+------------+------------+----------+--------------+-----+ |39 |State-gov |77516 |Bachelors|13 |Never-married |Adm-clerical |Not-in-family|White|Male |2174 |0 |40 |United-States |<=50K| |50 |Self-emp-not-inc|83311 |Bachelors|13 |Married-civ-spouse|Exec-managerial |Husband |White|Male |0 |0 |13 |United-States |<=50K| |38 |Private |215646|HS-grad |9 |Divorced |Handlers-cleaners|Not-in-family|White|Male |0 |0 |40 |United-States |<=50K| |53 |Private |234721|11th |7 |Married-civ-spouse|Handlers-cleaners|Husband |Black|Male |0 |0 |40 |United-States |<=50K| |28 |Private |338409|Bachelors|13 |Married-civ-spouse|Prof-specialty |Wife |Black|Female|0 |0 |40 |Cuba |<=50K| +---+----------------+------+---------+-------------+------------------+-----------------+-------------+-----+------+------------+------------+----------+--------------+-----+ only showing top 5 rows
Si vous n'avez pas défini inderShema sur True, voici ce qui arrive au type. Il y en a tous en ficelle.
df_string = sqlContext.read.csv(SparkFiles.get("adult.csv"), header=True, inferSchema= False) df_string.printSchema() root |-- age: string (nullable = true) |-- workclass: string (nullable = true) |-- fnlwgt: string (nullable = true) |-- education: string (nullable = true) |-- education_num: string (nullable = true) |-- marital: string (nullable = true) |-- occupation: string (nullable = true) |-- relationship: string (nullable = true) |-- race: string (nullable = true) |-- sex: string (nullable = true) |-- capital_gain: string (nullable = true) |-- capital_loss: string (nullable = true) |-- hours_week: string (nullable = true) |-- native_country: string (nullable = true) |-- label: string (nullable = true)
Pour convertir la variable continue au bon format, vous pouvez utiliser la refonte des colonnes. Vous pouvez utiliser withColumn pour le dire Spark quelle colonne opérer la transformation.
# Import all from `sql.types` from pyspark.sql.types import * # Write a custom function to convert the data type of DataFrame columns def convertColumn(df, names, newType): for name in names: df = df.withColumn(name, df[name].cast(newType)) return df # List of continuous features CONTI_FEATURES = ['age', 'fnlwgt','capital_gain', 'education_num', 'capital_loss', 'hours_week'] # Convert the type df_string = convertColumn(df_string, CONTI_FEATURES, FloatType()) # Check the dataset df_string.printSchema() root |-- age: float (nullable = true) |-- workclass: string (nullable = true) |-- fnlwgt: float (nullable = true) |-- education: string (nullable = true) |-- education_num: float (nullable = true) |-- marital: string (nullable = true) |-- occupation: string (nullable = true) |-- relationship: string (nullable = true) |-- race: string (nullable = true) |-- sex: string (nullable = true) |-- capital_gain: float (nullable = true) |-- capital_loss: float (nullable = true) |-- hours_week: float (nullable = true) |-- native_country: string (nullable = true) |-- label: string (nullable = true) from pyspark.ml.feature import StringIndexer #stringIndexer = StringIndexer(inputCol="label", outputCol="newlabel") #model = stringIndexer.fit(df) #df = model.transform(df) df.printSchema()
Sélectionnez les colonnes
Vous pouvez sélectionner et afficher les lignes avec select et les noms des fonctionnalités. Ci-dessous, l'âge et le poids fnlwgt sont sélectionnés.
df.select('age','fnlwgt').show(5)
+---+------+ |age|fnlwgt| +---+------+ | 39| 77516| | 50| 83311| | 38|215646| | 53|234721| | 28|338409| +---+------+ only showing top 5 rows
Compter par groupe
Si vous souhaitez compter le nombre d'occurrences par groupe, vous pouvez enchaîner :
- par groupe()
- count ()
ensemble. Au PySpark Dans l'exemple ci-dessous, vous comptez le nombre de lignes par niveau d'éducation.
df.groupBy("education").count().sort("count",ascending=True).show()
+------------+-----+ | education|count| +------------+-----+ | Preschool| 51| | 1st-4th| 168| | 5th-6th| 333| | Doctorate| 413| | 12th| 433| | 9th| 514| | Prof-school| 576| | 7th-8th| 646| | 10th| 933| | Assoc-acdm| 1067| | 11th| 1175| | Assoc-voc| 1382| | Masters| 1723| | Bachelors| 5355| |Some-college| 7291| | HS-grad|10501| +------------+-----+
Décrire les données
Pour obtenir un résumé des statistiques des données, vous pouvez utiliser décrire(). Il calculera :
- compter
- signifier
- écart-type
- m.
- max
df.describe().show()
+-------+------------------+-----------+------------------+------------+-----------------+--------+----------------+------------+------------------+------+------------------+----------------+------------------+--------------+-----+ |summary| age| workclass| fnlwgt| education| education_num| marital| occupation|relationship| race| sex| capital_gain| capital_loss| hours_week|native_country|label| +-------+------------------+-----------+------------------+------------+-----------------+--------+----------------+------------+------------------+------+------------------+----------------+------------------+--------------+-----+ | count| 32561| 32561| 32561| 32561| 32561| 32561| 32561| 32561| 32561| 32561| 32561| 32561| 32561| 32561|32561| | mean| 38.58164675532078| null|189778.36651208502| null| 10.0806793403151| null| null| null| null| null|1077.6488437087312| 87.303829734959|40.437455852092995| null| null| | stddev|13.640432553581356| null|105549.97769702227| null|2.572720332067397| null| null| null| null| null| 7385.292084840354|402.960218649002|12.347428681731838| null| null| | min| 17| ?| 12285| 10th| 1|Divorced| ?| Husband|Amer-Indian-Eskimo|Female| 0| 0| 1| ?|<=50K| | max| 90|Without-pay| 1484705|Some-college| 16| Widowed|Transport-moving| Wife| White| Male| 99999| 4356| 99| Yugoslavia| >50K| +-------+------------------+-----------+------------------+------------+-----------------+--------+----------------+------------+------------------+------+------------------+----------------+------------------+--------------+-----+
Si vous souhaitez les statistiques récapitulatives d'une seule colonne, ajoutez le nom de la colonne dans décrire()
df.describe('capital_gain').show()
+-------+------------------+ |summary| capital_gain| +-------+------------------+ | count| 32561| | mean|1077.6488437087312| | stddev| 7385.292084840354| | min| 0| | max| 99999| +-------+------------------+
Calcul de tableau croisé
Dans certaines occasions, il peut être intéressant de voir les statistiques descriptives entre deux colonnes par paires. Par exemple, vous pouvez compter le nombre de personnes dont le revenu est inférieur ou supérieur à 50 par niveau d'éducation. Cette opération est appelée tableau croisé.
df.crosstab('age', 'label').sort("age_label").show()
+---------+-----+----+ |age_label|<=50K|>50K| +---------+-----+----+ | 17| 395| 0| | 18| 550| 0| | 19| 710| 2| | 20| 753| 0| | 21| 717| 3| | 22| 752| 13| | 23| 865| 12| | 24| 767| 31| | 25| 788| 53| | 26| 722| 63| | 27| 754| 81| | 28| 748| 119| | 29| 679| 134| | 30| 690| 171| | 31| 705| 183| | 32| 639| 189| | 33| 684| 191| | 34| 643| 243| | 35| 659| 217| | 36| 635| 263| +---------+-----+----+ only showing top 20 rows
Vous pouvez voir que personne n’a de revenus supérieurs à 50 lorsqu’il est jeune.
Déposer la colonne
Il existe deux API intuitives pour supprimer des colonnes :
- drop() : Supprime une colonne
- dropna() : supprimer les NA
Ci-dessous, déposez la colonne education_num
df.drop('education_num').columns ['age', 'workclass', 'fnlwgt', 'education', 'marital', 'occupation', 'relationship', 'race', 'sex', 'capital_gain', 'capital_loss', 'hours_week', 'native_country', 'label']
Filtrer les données
Vous pouvez utiliser filter() pour appliquer des statistiques descriptives à un sous-ensemble de données. Par exemple, vous pouvez compter le nombre de personnes de plus de 40 ans.
df.filter(df.age > 40).count()
13443
Descriptive statistiques par groupe
Enfin, vous pouvez regrouper les données par groupe et calculer des opérations statistiques comme la moyenne.
df.groupby('marital').agg({'capital_gain': 'mean'}).show()
+--------------------+------------------+ | marital| avg(capital_gain)| +--------------------+------------------+ | Separated| 535.5687804878049| | Never-married|376.58831788823363| |Married-spouse-ab...| 653.9832535885167| | Divorced| 728.4148098131893| | Widowed| 571.0715005035247| | Married-AF-spouse| 432.6521739130435| | Married-civ-spouse|1764.8595085470085| +--------------------+------------------+
Étape 2) Prétraitement des données
Le traitement des données est une étape critique de l’apprentissage automatique. Après avoir supprimé les données inutiles, vous obtenez des informations importantes.
Par exemple, vous savez que l’âge n’est pas une fonction linéaire avec le revenu. Lorsque les gens sont jeunes, leurs revenus sont généralement inférieurs à ceux d’âge moyen. Après la retraite, un ménage utilise son épargne, ce qui entraîne une diminution de ses revenus. Pour capturer ce modèle, vous pouvez ajouter un carré à la fonction d'âge
Ajouter un carré d'âge
Pour ajouter une nouvelle fonctionnalité, vous devez :
- Sélectionnez la colonne
- Appliquez la transformation et ajoutez-la au DataFrame
from pyspark.sql.functions import * # 1 Select the column age_square = df.select(col("age")**2) # 2 Apply the transformation and add it to the DataFrame df = df.withColumn("age_square", col("age")**2) df.printSchema() root |-- age: integer (nullable = true) |-- workclass: string (nullable = true) |-- fnlwgt: integer (nullable = true) |-- education: string (nullable = true) |-- education_num: integer (nullable = true) |-- marital: string (nullable = true) |-- occupation: string (nullable = true) |-- relationship: string (nullable = true) |-- race: string (nullable = true) |-- sex: string (nullable = true) |-- capital_gain: integer (nullable = true) |-- capital_loss: integer (nullable = true) |-- hours_week: integer (nullable = true) |-- native_country: string (nullable = true) |-- label: string (nullable = true) |-- age_square: double (nullable = true)
Vous pouvez voir que age_square a été ajouté avec succès au bloc de données. Vous pouvez modifier l'ordre des variables avec select. Ci-dessous, vous apportez age_square juste après age.
COLUMNS = ['age', 'age_square', 'workclass', 'fnlwgt', 'education', 'education_num', 'marital', 'occupation', 'relationship', 'race', 'sex', 'capital_gain', 'capital_loss', 'hours_week', 'native_country', 'label'] df = df.select(COLUMNS) df.first()
Row(age=39, age_square=1521.0, workclass='State-gov', fnlwgt=77516, education='Bachelors', education_num=13, marital='Never-married', occupation='Adm-clerical', relationship='Not-in-family', race='White', sex='Male', capital_gain=2174, capital_loss=0, hours_week=40, native_country='United-States', label='<=50K')
Exclure Hollande-Pays-Bas
Lorsqu'un groupe au sein d'une entité n'a qu'une seule observation, il n'apporte aucune information au modèle. Au contraire, cela peut conduire à une erreur lors de la validation croisée.
Vérifions l'origine du ménage
df.filter(df.native_country == 'Holand-Netherlands').count() df.groupby('native_country').agg({'native_country': 'count'}).sort(asc("count(native_country)")).show()
+--------------------+---------------------+ | native_country|count(native_country)| +--------------------+---------------------+ | Holand-Netherlands| 1| | Scotland| 12| | Hungary| 13| | Honduras| 13| |Outlying-US(Guam-...| 14| | Yugoslavia| 16| | Thailand| 18| | Laos| 18| | Cambodia| 19| | Trinadad&Tobago| 19| | Hong| 20| | Ireland| 24| | Ecuador| 28| | Greece| 29| | France| 29| | Peru| 31| | Nicaragua| 34| | Portugal| 37| | Iran| 43| | Haiti| 44| +--------------------+---------------------+ only showing top 20 rows
La fonctionnalité native_country n'a qu'un seul foyer venant des Pays-Bas. Vous l'excluez.
df_remove = df.filter(df.native_country != 'Holand-Netherlands')
Étape 3) Créer un pipeline de traitement des données
Semblable à scikit-learn, Pyspark dispose d'une API de pipeline.
Un pipeline est très pratique pour maintenir la structure des données. Vous poussez les données dans le pipeline. À l’intérieur du pipeline, diverses opérations sont effectuées, la sortie est utilisée pour alimenter l’algorithme.
Par exemple, une transformation universelle en apprentissage automatique consiste à convertir une chaîne en un encodeur à chaud, c'est-à-dire une colonne par groupe. Un encodeur chaud est généralement une matrice pleine de zéros.
Les étapes pour transformer les données sont très similaires à celles de scikit-learn. Vous devez:
- Indexer la chaîne en numérique
- Créez le seul encodeur chaud
- Transformez les données
Deux API font le travail : StringIndexer, OneHotEncoder
- Tout d’abord, vous sélectionnez la colonne de chaîne à indexer. Le inputCol est le nom de la colonne dans l'ensemble de données. outputCol est le nouveau nom donné à la colonne transformée.
StringIndexer(inputCol="workclass", outputCol="workclass_encoded")
- Ajuster les données et les transformer
model = stringIndexer.fit(df) `indexed = model.transform(df)``
- Créez les colonnes d'actualités en fonction du groupe. Par exemple, s'il y a 10 groupes dans la fonctionnalité, la nouvelle matrice comportera 10 colonnes, une pour chaque groupe.
OneHotEncoder(dropLast=False, inputCol="workclassencoded", outputCol="workclassvec")
### Example encoder from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler stringIndexer = StringIndexer(inputCol="workclass", outputCol="workclass_encoded") model = stringIndexer.fit(df) indexed = model.transform(df) encoder = OneHotEncoder(dropLast=False, inputCol="workclass_encoded", outputCol="workclass_vec") encoded = encoder.transform(indexed) encoded.show(2)
+---+----------+----------------+------+---------+-------------+------------------+---------------+-------------+-----+----+------------+------------+----------+--------------+-----+-----------------+-------------+ |age|age_square| workclass|fnlwgt|education|education_num| marital| occupation| relationship| race| sex|capital_gain|capital_loss|hours_week|native_country|label|workclass_encoded|workclass_vec| +---+----------+----------------+------+---------+-------------+------------------+---------------+-------------+-----+----+------------+------------+----------+--------------+-----+-----------------+-------------+ | 39| 1521.0| State-gov| 77516|Bachelors| 13| Never-married| Adm-clerical|Not-in-family|White|Male| 2174| 0| 40| United-States|<=50K| 4.0|(9,[4],[1.0])| | 50| 2500.0|Self-emp-not-inc| 83311|Bachelors| 13|Married-civ-spouse|Exec-managerial| Husband|White|Male| 0| 0| 13| United-States|<=50K| 1.0|(9,[1],[1.0])| +---+----------+----------------+------+---------+-------------+------------------+---------------+-------------+-----+----+------------+------------+----------+--------------+-----+-----------------+-------------+ only showing top 2 rows
Construire le pipeline
Vous construirez un pipeline pour convertir toutes les fonctionnalités précises et les ajouter à l'ensemble de données final. Le pipeline comportera quatre opérations, mais n'hésitez pas à ajouter autant d'opérations que vous le souhaitez.
- Encoder les données catégorielles
- Indexer la fonctionnalité d'étiquette
- Ajouter une variable continue
- Assemblez les marches.
Chaque étape est stockée dans une liste nommée étapes. Cette liste indiquera au VectorAssembler quelle opération effectuer à l’intérieur du pipeline.
1. Encodez les données catégorielles
Cette étape est exactement la même que l'exemple ci-dessus, sauf que vous parcourez toutes les fonctionnalités catégorielles.
from pyspark.ml import Pipeline from pyspark.ml.feature import OneHotEncoderEstimator CATE_FEATURES = ['workclass', 'education', 'marital', 'occupation', 'relationship', 'race', 'sex', 'native_country'] stages = [] # stages in our Pipeline for categoricalCol in CATE_FEATURES: stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol + "Index") encoder = OneHotEncoderEstimator(inputCols=[stringIndexer.getOutputCol()], outputCols=[categoricalCol + "classVec"]) stages += [stringIndexer, encoder]
2. Indexez la fonction d'étiquette
Spark, comme beaucoup d'autres bibliothèques, n'accepte pas les valeurs de chaîne pour l'étiquette. Vous convertissez la fonctionnalité d'étiquette avec StringIndexer et l'ajoutez aux étapes de la liste
# Convert label into label indices using the StringIndexer label_stringIdx = StringIndexer(inputCol="label", outputCol="newlabel") stages += [label_stringIdx]
3. Ajouter une variable continue
Le inputCols du VectorAssembler est une liste de colonnes. Vous pouvez créer une nouvelle liste contenant toutes les nouvelles colonnes. Le code ci-dessous remplit la liste avec les fonctionnalités catégorielles codées et les fonctionnalités continues.
assemblerInputs = [c + "classVec" for c in CATE_FEATURES] + CONTI_FEATURES
4. Assemblez les marches.
Enfin, vous passez toutes les étapes dans VectorAssembler
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")stages += [assembler]
Maintenant que toutes les étapes sont prêtes, vous transférez les données vers le pipeline.
# Create a Pipeline. pipeline = Pipeline(stages=stages) pipelineModel = pipeline.fit(df_remove) model = pipelineModel.transform(df_remove)
Si vous vérifiez le nouvel ensemble de données, vous pouvez voir qu'il contient toutes les entités, transformées et non transformées. Vous n'êtes intéressé que par le nouveau label et les fonctionnalités. Les fonctionnalités incluent toutes les fonctionnalités transformées et les variables continues.
model.take(1) [Row(age=39, age_square=1521.0, workclass='State-gov', fnlwgt=77516, education='Bachelors', education_num=13, marital='Never-married', occupation='Adm-clerical', relationship='Not-in-family', race='White', sex='Male', capital_gain=2174, capital_loss=0, hours_week=40, native_country='United-States', label='<=50K', workclassIndex=4.0, workclassclassVec=SparseVector(8, {4: 1.0}), educationIndex=2.0, educationclassVec=SparseVector(15, {2: 1.0}), maritalIndex=1.0, maritalclassVec=SparseVector(6, {1: 1.0}), occupationIndex=3.0, occupationclassVec=SparseVector(14, {3: 1.0}), relationshipIndex=1.0, relationshipclassVec=SparseVector(5, {1: 1.0}), raceIndex=0.0, raceclassVec=SparseVector(4, {0: 1.0}), sexIndex=0.0, sexclassVec=SparseVector(1, {0: 1.0}), native_countryIndex=0.0, native_countryclassVec=SparseVector(40, {0: 1.0}), newlabel=0.0, features=SparseVector(99, {4: 1.0, 10: 1.0, 24: 1.0, 32: 1.0, 44: 1.0, 48: 1.0, 52: 1.0, 53: 1.0, 93: 39.0, 94: 77516.0, 95: 2174.0, 96: 13.0, 98: 40.0}))]
Étape 4) Construire le classificateur : logistique
Pour accélérer le calcul, vous convertissez le modèle en DataFrame.
Vous devez sélectionner une nouvelle étiquette et des fonctionnalités du modèle à l'aide de la carte.
from pyspark.ml.linalg import DenseVector input_data = model.rdd.map(lambda x: (x["newlabel"], DenseVector(x["features"])))
Vous êtes prêt à créer les données du train sous forme de DataFrame. Vous utilisez le sqlContext
df_train = sqlContext.createDataFrame(input_data, ["label", "features"])
Vérifiez la deuxième ligne
df_train.show(2)
+-----+--------------------+ |label| features| +-----+--------------------+ | 0.0|[0.0,0.0,0.0,0.0,...| | 0.0|[0.0,1.0,0.0,0.0,...| +-----+--------------------+ only showing top 2 rows
Créer un ensemble d'entraînement/test
Vous divisez l'ensemble de données 80/20 avec randomSplit.
# Split the data into train and test sets train_data, test_data = df_train.randomSplit([.8,.2],seed=1234)
Comptons combien de personnes avec un revenu inférieur/supérieur à 50 dans l'ensemble de formation et de test
train_data.groupby('label').agg({'label': 'count'}).show()
+-----+------------+ |label|count(label)| +-----+------------+ | 0.0| 19698| | 1.0| 6263| +-----+------------+
test_data.groupby('label').agg({'label': 'count'}).show()
+-----+------------+ |label|count(label)| +-----+------------+ | 0.0| 5021| | 1.0| 1578| +-----+------------+
Construire le régresseur logistique
Enfin et surtout, vous pouvez créer le classificateur. Pyspark dispose d'une API appelée LogisticRegression pour effectuer une régression logistique.
Vous initialisez lr en indiquant la colonne d'étiquette et les colonnes de fonctionnalités. Vous définissez un maximum de 10 itérations et ajoutez un paramètre de régularisation avec une valeur de 0.3. Notez que dans la section suivante, vous utiliserez la validation croisée avec une grille de paramètres pour affiner le modèle.
# Import `LinearRegression` from pyspark.ml.classification import LogisticRegression # Initialize `lr` lr = LogisticRegression(labelCol="label", featuresCol="features", maxIter=10, regParam=0.3) # Fit the data to the model linearModel = lr.fit(train_data)
#Vous pouvez voir les coefficients de la régression
# Print the coefficients and intercept for logistic regression print("Coefficients: " + str(linearModel.coefficients)) print("Intercept: " + str(linearModel.intercept))
Coefficients: [-0.0678914665262,-0.153425526813,-0.0706009536407,-0.164057586562,-0.120655298528,0.162922330862,0.149176870438,-0.626836362611,-0.193483661541,-0.0782269980838,0.222667203836,0.399571096381,-0.0222024341804,-0.311925857859,-0.0434497788688,-0.306007744328,-0.41318209688,0.547937504247,-0.395837350854,-0.23166535958,0.618743906733,-0.344088614546,-0.385266881369,0.317324463006,-0.350518889186,-0.201335923138,-0.232878560088,-0.13349278865,-0.119760542498,0.17500602491,-0.0480968101118,0.288484253943,-0.116314616745,0.0524163478063,-0.300952624551,-0.22046421474,-0.16557996579,-0.114676231939,-0.311966431453,-0.344226119233,0.105530129507,0.152243047814,-0.292774545497,0.263628334433,-0.199951374076,-0.30329422583,-0.231087515178,0.418918551,-0.0565930184279,-0.177818073048,-0.0733236680663,-0.267972912252,0.168491215697,-0.12181255723,-0.385648075442,-0.202101794517,0.0469791640782,-0.00842850210625,-0.00373211448629,-0.259296141281,-0.309896554133,-0.168434409756,-0.11048086026,0.0280647963877,-0.204187030092,-0.414392623536,-0.252806580669,0.143366465705,-0.516359222663,-0.435627370849,-0.301949286524,0.0878249035894,-0.210951740965,-0.621417928742,-0.099445190784,-0.232671473401,-0.1077745606,-0.360429419703,-0.420362959052,-0.379729467809,-0.395186242741,0.0826401853838,-0.280251589972,0.187313505214,-0.20295228799,-0.431177064626,0.149759018379,-0.107114299614,-0.319314858424,0.0028450133235,-0.651220387649,-0.327918792207,-0.143659581445,0.00691075160413,8.38517628783e-08,2.18856717378e-05,0.0266701216268,0.000231075966823,0.00893832698698] Intercept: -1.9884177974805692
Étape 5) Former et évaluer le modèle
Pour générer une prédiction pour votre ensemble de test,
Vous pouvez utiliser LinearModel avec transform() sur test_data
# Make predictions on test data using the transform() method. predictions = linearModel.transform(test_data)
Vous pouvez imprimer les éléments dans les prédictions
predictions.printSchema() root |-- label: double (nullable = true) |-- features: vector (nullable = true) |-- rawPrediction: vector (nullable = true) |-- probability: vector (nullable = true) |-- prediction: double (nullable = false)
Vous êtes intéressé par le label, la prédiction et la probabilité
selected = predictions.select("label", "prediction", "probability") selected.show(20)
+-----+----------+--------------------+ |label|prediction| probability| +-----+----------+--------------------+ | 0.0| 0.0|[0.91560704124179...| | 0.0| 0.0|[0.92812140213994...| | 0.0| 0.0|[0.92161406774159...| | 0.0| 0.0|[0.96222760777142...| | 0.0| 0.0|[0.66363283056957...| | 0.0| 0.0|[0.65571324475477...| | 0.0| 0.0|[0.73053376932829...| | 0.0| 1.0|[0.31265053873570...| | 0.0| 0.0|[0.80005907577390...| | 0.0| 0.0|[0.76482251301640...| | 0.0| 0.0|[0.84447301189069...| | 0.0| 0.0|[0.75691912026619...| | 0.0| 0.0|[0.60902504096722...| | 0.0| 0.0|[0.80799228385509...| | 0.0| 0.0|[0.87704364852567...| | 0.0| 0.0|[0.83817652582377...| | 0.0| 0.0|[0.79655423248500...| | 0.0| 0.0|[0.82712311232246...| | 0.0| 0.0|[0.81372823882016...| | 0.0| 0.0|[0.59687710752201...| +-----+----------+--------------------+ only showing top 20 rows
Évaluer le modèle
Vous devez examiner la mesure de précision pour voir dans quelle mesure (ou mauvaise) le modèle fonctionne. Actuellement, il n'existe aucune API pour calculer la mesure de précision dans Spark. La valeur par défaut est la ROC, courbe caractéristique de fonctionnement du récepteur. Il s’agit d’une métrique différente qui prend en compte le taux de faux positifs.
Avant d'examiner le ROC, construisons la mesure de précision. Vous connaissez mieux cette métrique. La mesure de précision est la somme des prédictions correctes sur le nombre total d’observations.
Vous créez un DataFrame avec le label et la `prediction.
cm = predictions.select("label", "prediction")
Vous pouvez vérifier le numéro de classe dans l'étiquette et la prédiction
cm.groupby('label').agg({'label': 'count'}).show()
+-----+------------+ |label|count(label)| +-----+------------+ | 0.0| 5021| | 1.0| 1578| +-----+------------+
cm.groupby('prediction').agg({'prediction': 'count'}).show()
+----------+-----------------+ |prediction|count(prediction)| +----------+-----------------+ | 0.0| 5982| | 1.0| 617| +----------+-----------------+
Par exemple, dans l’ensemble de test, il y a 1578 50 ménages avec un revenu supérieur à 5021 617 et 50 en dessous. Le classificateur prédit cependant ménages avec un revenu supérieur à .
Vous pouvez calculer la précision en calculant le nombre lorsque les étiquettes sont correctement classées sur le nombre total de lignes.
cm.filter(cm.label == cm.prediction).count() / cm.count()
0.8237611759357478
Vous pouvez tout regrouper et écrire une fonction pour calculer la précision.
def accuracy_m(model): predictions = model.transform(test_data) cm = predictions.select("label", "prediction") acc = cm.filter(cm.label == cm.prediction).count() / cm.count() print("Model accuracy: %.3f%%" % (acc * 100)) accuracy_m(model = linearModel) Model accuracy: 82.376%
Métriques ROC
Le module BinaryClassificationEvaluator inclut les mesures ROC. Le récepteur OperaLa courbe caractéristique est un autre outil couramment utilisé avec la classification binaire. Elle est très similaire à la courbe précision/rappel, mais au lieu de tracer la précision par rapport au rappel, la courbe ROC montre le taux de vrais positifs (c'est-à-dire le rappel) par rapport au taux de faux positifs. Le taux de faux positifs est le rapport des instances négatives qui sont incorrectement classées comme positives. Il est égal à un moins le taux véritablement négatif. Le taux véritablement négatif est également appelé spécificité. Par conséquent, la courbe ROC représente la sensibilité (rappel) par rapport à 1 – spécificité
### Use ROC from pyspark.ml.evaluation import BinaryClassificationEvaluator # Evaluate model evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction") print(evaluator.evaluate(predictions)) print(evaluator.getMetricName())
0.8940481662695192zoneSous ROC
print(evaluator.evaluate(predictions))
0.8940481662695192
Étape 6) Ajustez l'hyperparamètre
Enfin et surtout, vous pouvez régler les hyperparamètres. Semblable à scikit apprendre vous créez une grille de paramètres et vous ajoutez les paramètres que vous souhaitez régler.
Pour réduire le temps de calcul, vous ajustez uniquement le paramètre de régularisation avec seulement deux valeurs.
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator # Create ParamGrid for Cross Validation paramGrid = (ParamGridBuilder() .addGrid(lr.regParam, [0.01, 0.5]) .build())
Enfin, vous évaluez le modèle en utilisant la méthode de valorisation croisée à 5 plis. Il faut environ 16 minutes pour s'entraîner.
from time import * start_time = time() # Create 5-fold CrossValidator cv = CrossValidator(estimator=lr, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5) # Run cross validations cvModel = cv.fit(train_data) # likely take a fair amount of time end_time = time() elapsed_time = end_time - start_time print("Time to train model: %.3f seconds" % elapsed_time)
Temps d'entraînement du modèle : 978.807 secondes
Le meilleur hyperparamètre de régularisation est 0.01, avec une précision de 85.316 %.
accuracy_m(model = cvModel) Model accuracy: 85.316%
Vous pouvez extraire le paramètre recommandé en chaînant cvModel.bestModel avec extractParamMap()
bestModel = cvModel.bestModel bestModel.extractParamMap()
{Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='aggregationDepth', doc='suggested depth for treeAggregate (>= 2)'): 2, Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty'): 0.0, Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='family', doc='The name of family which is a description of the label distribution to be used in the model. Supported options: auto, binomial, multinomial.'): 'auto', Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='featuresCol', doc='features column name'): 'features', Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='fitIntercept', doc='whether to fit an intercept term'): True, Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='labelCol', doc='label column name'): 'label', Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='maxIter', doc='maximum number of iterations (>= 0)'): 10, Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='predictionCol', doc='prediction column name'): 'prediction', Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='probabilityCol', doc='Column name for predicted class conditional probabilities. Note: Not all models output well-calibrated probability estimates! These probabilities should be treated as confidences, not precise probabilities'): 'probability', Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='rawPredictionCol', doc='raw prediction (a.k.a. confidence) column name'): 'rawPrediction', Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='regParam', doc='regularization parameter (>= 0)'): 0.01, Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='standardization', doc='whether to standardize the training features before fitting the model'): True, Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='threshold', doc='threshold in binary classification prediction, in range [0, 1]'): 0.5, Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='tol', doc='the convergence tolerance for iterative algorithms (>= 0)'): 1e-06}
Résumé
Spark est un outil fondamental pour un data scientist. Il permet au praticien de connecter une application à différentes sources de données, d'effectuer des analyses de données de manière transparente ou d'ajouter un modèle prédictif.
Pour commencer Spark, vous devez lancer un Spark Contexte avec :
»SparkContexte()'
et et SQL contexte pour se connecter à une source de données :
'SQLContext()'
Dans le didacticiel, vous apprenez à entraîner une régression logistique :
- Convertissez l'ensemble de données en Dataframe avec :
rdd.map(lambda x: (x["newlabel"], DenseVector(x["features"]))) sqlContext.createDataFrame(input_data, ["label", "features"])
Notez que le nom de la colonne de l'étiquette est newlabel et que toutes les fonctionnalités sont regroupées dans Features. Modifiez ces valeurs si elles sont différentes dans votre ensemble de données.
- Créer l'ensemble d'entraînement/test
randomSplit([.8,.2],seed=1234)
- Former le modèle
LogisticRegression(labelCol="label",featuresCol="features",maxIter=10, regParam=0.3)
lr.fit()
- Faire des prédictions
linearModel.transform()