PySpark Tutorial para principiantes: aprende con EJEMPLOS

Antes de aprender PySpark, entendamos:

que es apache Spark?

Spark es una solución de big data que ha demostrado ser más fácil y rápida que Hadoop MapReduce. Spark es un software de código abierto desarrollado por el laboratorio RAD de UC Berkeley en 2009. Desde su lanzamiento al público en 2010, Spark ha ganado popularidad y se utiliza en la industria a una escala sin precedentes.

En la era de Big Data, los profesionales necesitan más que nunca herramientas rápidas y confiables para procesar la transmisión de datos. Las herramientas anteriores como MapReduce eran las favoritas, pero eran lentas. Para superar este problema, Spark ofrece una solución rápida y de uso general. La principal diferencia entre Spark y MapReduce es que Spark Ejecuta cálculos en la memoria durante el transcurso del tiempo en el disco duro. Permite el acceso y procesamiento de datos a alta velocidad, reduciendo los tiempos de horas a minutos.

¿Qué es Py?Spark?

PySpark es una herramienta creada por Apache Spark Comunidad para usar Python con Spark. Permite trabajar con RDD (Resilient Distributed Dataset) en Python. También ofrece PySpark Shell para vincular Python API con Spark núcleo para iniciar Spark Contexto. Spark es el nombre del motor para realizar la computación en clúster, mientras que PySpark is Pythonbiblioteca para usar Spark.

Cómo Se Compara Spark funciona?

Spark se basa en un motor computacional, lo que significa que se encarga de la programación, distribución y monitoreo de la aplicación. Cada tarea se realiza en varias máquinas de trabajo llamadas clúster de computación. Un clúster de computación se refiere a la división de tareas. Una máquina realiza una tarea, mientras que las demás contribuyen al resultado final a través de una tarea diferente. Al final, todas las tareas se agregan para producir un resultado. Spark El administrador ofrece una descripción general de 360 ​​grados de varios Spark Empleo.

¿Cómo Spark Trabaja
¿Cómo Spark Trabaja

Spark está diseñado para trabajar con

  • Python
  • Java
  • Scala
  • SQL

Una característica significativa de Spark es la gran cantidad de biblioteca incorporada, incluido MLlib para aprendizaje automático. Spark También está diseñado para trabajar con clústeres Hadoop y puede leer una amplia variedad de tipos de archivos, incluidos datos de Hive, CSV, JSON, datos de Casandra, entre otros.

¿Por qué utilizar Spark?

Como futuro profesional de datos, deberías estar familiarizado con las famosas bibliotecas de Python: Pandas y scikit-learn. Estas dos bibliotecas son fantásticas para explorar conjuntos de datos de tamaño mediano. Los proyectos de aprendizaje automático habituales se basan en la siguiente metodología:

  • Cargar los datos al disco.
  • Importar los datos a la memoria de la máquina.
  • Procesar/analizar los datos
  • Construya el modelo de aprendizaje automático
  • Almacenar la predicción en el disco

El problema surge si el científico de datos quiere procesar datos que son demasiado grandes para una computadora. Durante los primeros días de la ciencia de datos, los profesionales tomaban muestras, ya que no siempre era necesaria la capacitación en grandes conjuntos de datos. El científico de datos encontraría una buena muestra estadística, realizaría una verificación de robustez adicional y generaría un modelo excelente.

Sin embargo, existen algunos problemas con esto:

  • ¿El conjunto de datos refleja el mundo real?
  • ¿Los datos incluyen un ejemplo específico?
  • ¿Es el modelo apto para el muestreo?

Tomemos como ejemplo las recomendaciones de los usuarios. Los recomendadores se basan en comparar usuarios con otros usuarios para evaluar sus preferencias. Si el profesional de datos toma solo un subconjunto de datos, no habrá una cohorte de usuarios que sean muy similares entre sí. Los recomendadores deben ejecutarse en todo el conjunto de datos o no ejecutarse en absoluto.

¿Cuál es la solución?

La solución ha sido evidente durante mucho tiempo: dividir el problema en varias computadoras. La computación paralela también conlleva múltiples problemas. Los desarrolladores a menudo tienen problemas para escribir código paralelo y terminan teniendo que resolver una serie de problemas complejos relacionados con el multiprocesamiento en sí.

Pyspark ofrece al científico de datos una API que puede utilizarse para resolver problemas de procesamiento de datos en paralelo. Pyspark se encarga de las complejidades del multiprocesamiento, como la distribución de datos, la distribución de código y la recopilación de resultados de los trabajadores en un clúster de máquinas.

Spark Puede ejecutarse de forma independiente, pero la mayoría de las veces se ejecuta sobre un marco de computación en clúster como Hadoop. Sin embargo, en pruebas y desarrollo, un científico de datos puede ejecutar de manera eficiente Spark en sus cajas de desarrollo o portátiles sin clúster

• Una de las principales ventajas de Spark Se trata de construir una arquitectura que abarque la gestión de la transmisión de datos, consultas de datos sin problemas, predicción de aprendizaje automático y acceso en tiempo real a diversos análisis.

• Spark Trabaja en estrecha colaboración con el lenguaje SQL, es decir, datos estructurados. Permite consultar los datos en tiempo real.

• El trabajo principal del científico de datos es analizar y construir modelos predictivos. En resumen, un científico de datos necesita saber cómo consultar datos utilizando SQL, producir un informe estadístico y utilizar el aprendizaje automático para producir predicciones. Los científicos de datos dedican una cantidad significativa de su tiempo a limpiar, transformar y analizar los datos. Una vez que el conjunto de datos o el flujo de trabajo de datos está listo, el científico de datos utiliza varias técnicas para descubrir conocimientos y patrones ocultos. La manipulación de datos debe ser sólida e igualmente fácil de usar. Spark es la herramienta adecuada gracias a su velocidad y sus ricas API.

En este PySpark tutorial, aprenderá cómo construir un clasificador con PySpark ejemplos.

Cómo instalar PySpark con AWS

La Jupyter El equipo crea una imagen de Docker para ejecutar. Spark eficientemente. A continuación se detallan los pasos que puede seguir para instalar PySpark instancia en AWS.

Consulte nuestro tutorial sobre AWS y TensorFlow

Paso 1: crear una instancia

En primer lugar, necesitas crear una instancia. Vaya a su cuenta de AWS e inicie la instancia. Puede aumentar el almacenamiento hasta 15 gy utilizar el mismo grupo de seguridad que en el tutorial de TensorFlow.

Paso 2: abre la conexión

Abra la conexión e instale el contenedor Docker. Para obtener más detalles, consulte el tutorial con TensorFlow con Docker. Tenga en cuenta que debe estar en el directorio de trabajo correcto.

Simplemente ejecute estos códigos para instalar Docker:

sudo yum update -y
sudo yum install -y docker
sudo service docker start
sudo user-mod -a -G docker ec2-user
exit

Paso 3: Vuelva a abrir la conexión e instale Spark

Después de volver a abrir la conexión, puede instalar la imagen que contiene PySpark.

## Spark
docker run -v ~/work:/home/jovyan/work -d -p 8888:8888 jupyter/pyspark-notebook

## Allow preserving Jupyter notebook
sudo chown 1000 ~/work

## Install tree to see our working directory next
sudo yum install -y tree

Paso 4: abierto Jupyter

Consulta el contenedor y su nombre.

docker ps

Inicie la ventana acoplable con los registros de la ventana acoplable seguidos del nombre de la ventana acoplable. Por ejemplo, Docker registra zealous_goldwasser

Vaya a su navegador y ejecute Jupyter. La dirección es http://localhost:8888/. Pega la contraseña proporcionada por el terminal.

Nota: si desea cargar/descargar un archivo en su máquina AWS, puede utilizar el software Cyberduck, https://cyberduck.io/.

Cómo instalar PySpark on Windows/Mac con Conda

A continuación se muestra un proceso detallado sobre cómo instalar PySpark on Windows/Mac usando Anaconda:

Para instalar Spark en su máquina local, una práctica recomendada es crear un nuevo entorno conda. Este nuevo entorno se instalará Python 3.6, Spark y todas las dependencias.

Usuario de Mac

cd anaconda3
touch hello-spark.yml
vi hello-spark.yml

Windows User

cd C:\Users\Admin\Anaconda3
echo.>hello-spark.yml
notepad hello-spark.yml

Puede editar el archivo .yml. Tenga cuidado con la sangría. Se requieren dos espacios antes –

name: hello-spark 
    dependencies:
    
    - python=3.6
    - jupyter
    - ipython
    - numpy
    - numpy-base
    - pandas
    - py4j
    - pyspark
    - pytz

Guárdalo y crea el entorno. lleva algo de tiempo

conda env create -f hello-spark.yml

Para obtener más detalles sobre la ubicación, consulte el tutorial Instalar TensorFlow

Puede comprobar todo el entorno instalado en su máquina.

conda env list
Activate hello-spark

Usuario de Mac

source activate hello-spark

Windows User

activate hello-spark

Nota: Ya ha creado un entorno de TensorFlow específico para ejecutar los tutoriales en TensorFlow. Es más conveniente crear un nuevo entorno diferente a hello-tf. No tiene sentido sobrecargar hello-tf con Spark o cualquier otra biblioteca de aprendizaje automático.

Imagine que la mayor parte de su proyecto involucra TensorFlow, pero necesita usar Spark para un proyecto en particular. Puede configurar un entorno TensorFlow para todo su proyecto y crear un entorno separado para Spark. Puede agregar tantas bibliotecas en Spark entorno como desee sin interferir con el entorno de TensorFlow. Una vez que haya terminado con el Sparkdel proyecto, puedes borrarlo sin afectar el entorno de TensorFlow.

Jupyter

Abierto Jupyter Cuaderno y prueba si PySpark Funciona. En un nuevo cuaderno pegue el siguiente PySpark Código de muestra:

import pyspark
from pyspark import SparkContext
sc =SparkContext()

Si se muestra un error, es probable que Java No está instalado en su máquina. En Mac, abra la terminal y escriba java -version. Si hay una versión de Java, asegúrese de que sea la 1.8. Windows, vaya a Aplicación y verifique si hay una Java carpeta. Si hay un Java carpeta, verifique que Java 1.8 está instalado. Al momento de escribir este artículo, PySpark no es compatible con Java9 y superiores.

Si necesita instalar Java, tu a pensar enlace y descargar jdk-8u181-windows-x64.exe

Jupyter

Para usuarios de Mac, se recomienda utilizar "brew".

brew tap caskroom/versions
brew cask install java8

Consulte este tutorial paso a paso sobre cómo instalar Java

Nota: Utilice eliminar para borrar un entorno por completo.

 conda env remove -n hello-spark -y

Spark Contexto

SparkEl contexto es el motor interno que permite las conexiones con los clústeres. Si desea ejecutar una operación, necesita un SparkContexto.

Créar un SparkContexto

En primer lugar, es necesario iniciar un SparkContexto.

import pyspark
from pyspark import SparkContext
sc =SparkContext()

Ahora que la SparkEl contexto está listo, puedes crear una colección de datos llamada RDD, Resilient Distributed Dataset. El cálculo en un RDD se paraleliza automáticamente en todo el clúster.

nums= sc.parallelize([1,2,3,4])

Puedes acceder a la primera fila con tomar

nums.take(1)
[1]

Puede aplicar una transformación a los datos con una función lambda. en el pySpark En el siguiente ejemplo, se devuelve el cuadrado de números. Es una transformación del mapa.

squared = nums.map(lambda x: x*x).collect()
for num in squared:
    print('%i ' % (num))
1 
4 
9 
16

Contexto SQL

Una forma más conveniente es utilizar DataFrame. SparkEl contexto ya está configurado, puede usarlo para crear el marco de datos. También necesitas declarar el SQLContext

SQLContext permite conectar el motor con diferentes fuentes de datos. Se utiliza para iniciar las funcionalidades de Spark SQL

from pyspark.sql import Row
from pyspark.sql import SQLContext

sqlContext = SQLContext(sc)

Ahora en esto Spark tutoriales Python, creemos una lista de tuplas. Cada tupla contendrá el nombre de las personas y su edad. Se requieren cuatro pasos:

Paso 1) Crea la lista de tuplas con la información.

[('John',19),('Smith',29),('Adam',35),('Henry',50)]

Paso 2) Construir un RDD

rdd = sc.parallelize(list_p)

Paso 3) Convertir las tuplas

rdd.map(lambda x: Row(name=x[0], age=int(x[1])))

Paso 4) Crear un contexto de marco de datos

sqlContext.createDataFrame(ppl)
list_p = [('John',19),('Smith',29),('Adam',35),('Henry',50)]
rdd = sc.parallelize(list_p)
ppl = rdd.map(lambda x: Row(name=x[0], age=int(x[1])))
DF_ppl = sqlContext.createDataFrame(ppl)

Si desea acceder al tipo de cada característica, puede usar printSchema()

DF_ppl.printSchema()
root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)

Ejemplo de aprendizaje automático con PySpark

Ahora que tienes una breve idea de Spark y SQLContext, está listo para crear su primer programa de aprendizaje automático.

A continuación se muestran los pasos para crear un programa de aprendizaje automático con PySpark:

  • Paso 1) Operación básica con PySpark
  • Paso 2) Preprocesamiento de datos
  • Paso 3) Construya un canal de procesamiento de datos
  • Paso 4) Construir el clasificador: logístico
  • Paso 5) Entrenar y evaluar el modelo.
  • Paso 6) Ajustar el hiperparámetro

En este PySpark Tutorial de aprendizaje automático. Utilizaremos el conjunto de datos para adultos. El objetivo de este tutorial es aprender a utilizar Pyspark. Para obtener más información sobre el conjunto de datos, consulte este tutorial.

Tenga en cuenta que el conjunto de datos no es significativo y puede pensar que el cálculo lleva mucho tiempo. Spark está diseñado para procesar una cantidad considerable de datos. SparkEl rendimiento de aumenta en relación con otras bibliotecas de aprendizaje automático cuando el conjunto de datos procesado crece.

Paso 1) Operación básica con PySpark

En primer lugar, debe inicializar el SQLContext que aún no se ha iniciado.

#from pyspark.sql import SQLContext
url = "https://raw.githubusercontent.com/guru99-edu/R-Programming/master/adult_data.csv"
from pyspark import SparkFiles
sc.addFile(url)
sqlContext = SQLContext(sc)

luego, puede leer el archivo cvs con sqlContext.read.csv. Usas inferSchema configurado en True para saber Spark para adivinar automáticamente el tipo de datos. De forma predeterminada, cambia a Falso.

df = sqlContext.read.csv(SparkFiles.get("adult_data.csv"), header=True, inferSchema= True)

Echemos un vistazo al tipo de datos.

df.printSchema()
root
 |-- age: integer (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: integer (nullable = true)
 |-- education: string (nullable = true)
 |-- education_num: integer (nullable = true)
 |-- marital: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital_gain: integer (nullable = true)
 |-- capital_loss: integer (nullable = true)
 |-- hours_week: integer (nullable = true)
 |-- native_country: string (nullable = true)
 |-- label: string (nullable = true)

Puedes ver los datos con show.

df.show(5, truncate = False)
+---+----------------+------+---------+-------------+------------------+-----------------+-------------+-----+------+------------+------------+----------+--------------+-----+
|age|workclass       |fnlwgt|education|education_num|marital           |occupation       |relationship |race |sex   |capital_gain|capital_loss|hours_week|native_country|label|
+---+----------------+------+---------+-------------+------------------+-----------------+-------------+-----+------+------------+------------+----------+--------------+-----+
|39 |State-gov       |77516 |Bachelors|13           |Never-married     |Adm-clerical     |Not-in-family|White|Male  |2174        |0           |40        |United-States |<=50K|
|50 |Self-emp-not-inc|83311 |Bachelors|13           |Married-civ-spouse|Exec-managerial  |Husband      |White|Male  |0           |0           |13        |United-States |<=50K|
|38 |Private         |215646|HS-grad  |9            |Divorced          |Handlers-cleaners|Not-in-family|White|Male  |0           |0           |40        |United-States |<=50K|
|53 |Private         |234721|11th     |7            |Married-civ-spouse|Handlers-cleaners|Husband      |Black|Male  |0           |0           |40        |United-States |<=50K|
|28 |Private         |338409|Bachelors|13           |Married-civ-spouse|Prof-specialty   |Wife         |Black|Female|0           |0           |40        |Cuba          |<=50K|
+---+----------------+------+---------+-------------+------------------+-----------------+-------------+-----+------+------------+------------+----------+--------------+-----+
only showing top 5 rows

Si no configuró inderShema en True, esto es lo que sucede con el tipo. Están todos en hilo.

df_string = sqlContext.read.csv(SparkFiles.get("adult.csv"), header=True, inferSchema=  False)
df_string.printSchema()
root
 |-- age: string (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: string (nullable = true)
 |-- education: string (nullable = true)
 |-- education_num: string (nullable = true)
 |-- marital: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital_gain: string (nullable = true)
 |-- capital_loss: string (nullable = true)
 |-- hours_week: string (nullable = true)
 |-- native_country: string (nullable = true)
 |-- label: string (nullable = true)

Para convertir la variable continua al formato correcto, puede utilizar la refundición de las columnas. Puedes usar withColumn para saber Spark En qué columna operar la transformación.

# Import all from `sql.types`
from pyspark.sql.types import *

# Write a custom function to convert the data type of DataFrame columns
def convertColumn(df, names, newType):
    for name in names: 
        df = df.withColumn(name, df[name].cast(newType))
    return df 
# List of continuous features
CONTI_FEATURES  = ['age', 'fnlwgt','capital_gain', 'education_num', 'capital_loss', 'hours_week']
# Convert the type
df_string = convertColumn(df_string, CONTI_FEATURES, FloatType())
# Check the dataset
df_string.printSchema()
root
 |-- age: float (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: float (nullable = true)
 |-- education: string (nullable = true)
 |-- education_num: float (nullable = true)
 |-- marital: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital_gain: float (nullable = true)
 |-- capital_loss: float (nullable = true)
 |-- hours_week: float (nullable = true)
 |-- native_country: string (nullable = true)
 |-- label: string (nullable = true)

from pyspark.ml.feature import StringIndexer
#stringIndexer = StringIndexer(inputCol="label", outputCol="newlabel")
#model = stringIndexer.fit(df)
#df = model.transform(df)
df.printSchema()

Seleccionar columnas

Puede seleccionar y mostrar las filas con select y los nombres de las funciones. A continuación, se seleccionan edad y fnlwgt.

df.select('age','fnlwgt').show(5)
+---+------+
|age|fnlwgt|
+---+------+
| 39| 77516|
| 50| 83311|
| 38|215646|
| 53|234721|
| 28|338409|
+---+------+
only showing top 5 rows

Contar por grupo

Si desea contar el número de ocurrencias por grupo, puede encadenar:

  • agrupar por()
  • contar()

juntos. en el pySpark En el siguiente ejemplo, se cuenta el número de filas según el nivel educativo.

df.groupBy("education").count().sort("count",ascending=True).show()
+------------+-----+
|   education|count|
+------------+-----+
|   Preschool|   51|
|     1st-4th|  168|
|     5th-6th|  333|
|   Doctorate|  413|
|        12th|  433|
|         9th|  514|
| Prof-school|  576|
|     7th-8th|  646|
|        10th|  933|
|  Assoc-acdm| 1067|
|        11th| 1175|
|   Assoc-voc| 1382|
|     Masters| 1723|
|   Bachelors| 5355|
|Some-college| 7291|
|     HS-grad|10501|
+------------+-----+

Describe los datos

Para obtener un resumen estadístico de los datos, puede utilizar describe(). Calculará:

  • contar
  • mean
  • Desviación Estándar
  • min
  • max
df.describe().show()
+-------+------------------+-----------+------------------+------------+-----------------+--------+----------------+------------+------------------+------+------------------+----------------+------------------+--------------+-----+
|summary|               age|  workclass|            fnlwgt|   education|    education_num| marital|      occupation|relationship|              race|   sex|      capital_gain|    capital_loss|        hours_week|native_country|label|
+-------+------------------+-----------+------------------+------------+-----------------+--------+----------------+------------+------------------+------+------------------+----------------+------------------+--------------+-----+
|  count|             32561|      32561|             32561|       32561|            32561|   32561|           32561|       32561|             32561| 32561|             32561|           32561|             32561|         32561|32561|
|   mean| 38.58164675532078|       null|189778.36651208502|        null| 10.0806793403151|    null|            null|        null|              null|  null|1077.6488437087312| 87.303829734959|40.437455852092995|          null| null|
| stddev|13.640432553581356|       null|105549.97769702227|        null|2.572720332067397|    null|            null|        null|              null|  null| 7385.292084840354|402.960218649002|12.347428681731838|          null| null|
|    min|                17|          ?|             12285|        10th|                1|Divorced|               ?|     Husband|Amer-Indian-Eskimo|Female|                 0|               0|                 1|             ?|<=50K|
|    max|                90|Without-pay|           1484705|Some-college|               16| Widowed|Transport-moving|        Wife|             White|  Male|             99999|            4356|                99|    Yugoslavia| >50K|
+-------+------------------+-----------+------------------+------------+-----------------+--------+----------------+------------+------------------+------+------------------+----------------+------------------+--------------+-----+

Si desea el resumen estadístico de una sola columna, agregue el nombre de la columna dentro de describe()

df.describe('capital_gain').show()
+-------+------------------+
|summary|      capital_gain|
+-------+------------------+
|  count|             32561|
|   mean|1077.6488437087312|
| stddev| 7385.292084840354|
|    min|                 0|
|    max|             99999|
+-------+------------------+

Cálculo de tablas cruzadas

En algunas ocasiones, puede resultar interesante ver las estadísticas descriptivas entre dos columnas por pares. Por ejemplo, se puede contar la cantidad de personas con ingresos inferiores o superiores a 50 XNUMX dólares por nivel de educación. Esta operación se denomina tabla de referencias cruzadas.

df.crosstab('age', 'label').sort("age_label").show()
+---------+-----+----+
|age_label|<=50K|>50K|
+---------+-----+----+
|       17|  395|   0|
|       18|  550|   0|
|       19|  710|   2|
|       20|  753|   0|
|       21|  717|   3|
|       22|  752|  13|
|       23|  865|  12|
|       24|  767|  31|
|       25|  788|  53|
|       26|  722|  63|
|       27|  754|  81|
|       28|  748| 119|
|       29|  679| 134|
|       30|  690| 171|
|       31|  705| 183|
|       32|  639| 189|
|       33|  684| 191|
|       34|  643| 243|
|       35|  659| 217|
|       36|  635| 263|
+---------+-----+----+
only showing top 20 rows

Se puede ver que ninguna persona tiene ingresos superiores a 50 cuando son jóvenes.

Columna de caída

Hay dos API intuitivas para eliminar columnas:

  • soltar(): soltar una columna
  • dropna(): Elimina NA

Debajo sueltas la columna num_educación

df.drop('education_num').columns

['age',
 'workclass',
 'fnlwgt',
 'education',
 'marital',
 'occupation',
 'relationship',
 'race',
 'sex',
 'capital_gain',
 'capital_loss',
 'hours_week',
 'native_country',
 'label']

Filtrar datos

Puede utilizar filter() para aplicar estadísticas descriptivas en un subconjunto de datos. Por ejemplo, puedes contar el número de personas mayores de 40 años.

df.filter(df.age > 40).count()

13443

DescriptCinco estadísticas por grupo.

Finalmente, puedes agrupar datos por grupo y calcular operaciones estadísticas como la media.

df.groupby('marital').agg({'capital_gain': 'mean'}).show()
+--------------------+------------------+
|             marital| avg(capital_gain)|
+--------------------+------------------+
|           Separated| 535.5687804878049|
|       Never-married|376.58831788823363|
|Married-spouse-ab...| 653.9832535885167|
|            Divorced| 728.4148098131893|
|             Widowed| 571.0715005035247|
|   Married-AF-spouse| 432.6521739130435|
|  Married-civ-spouse|1764.8595085470085|
+--------------------+------------------+

Paso 2) Preprocesamiento de datos

El procesamiento de datos es un paso crítico en el aprendizaje automático. Después de eliminar los datos basura, obtendrá información importante.

Por ejemplo, sabes que la edad no es una función lineal con los ingresos. Cuando las personas son jóvenes, sus ingresos suelen ser inferiores a los de la mediana edad. Después de la jubilación, un hogar utiliza sus ahorros, lo que significa una disminución de sus ingresos. Para capturar este patrón, puede agregar un cuadrado a la función de edad.

Añadir cuadro de edad

Para agregar una nueva función, debe:

  1. Seleccione la columna
  2. Aplicar la transformación y agregarla al DataFrame.
from pyspark.sql.functions import *

# 1 Select the column
age_square = df.select(col("age")**2)

# 2 Apply the transformation and add it to the DataFrame
df = df.withColumn("age_square", col("age")**2)

df.printSchema()
root
 |-- age: integer (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: integer (nullable = true)
 |-- education: string (nullable = true)
 |-- education_num: integer (nullable = true)
 |-- marital: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital_gain: integer (nullable = true)
 |-- capital_loss: integer (nullable = true)
 |-- hours_week: integer (nullable = true)
 |-- native_country: string (nullable = true)
 |-- label: string (nullable = true)
 |-- age_square: double (nullable = true)

Puede ver que age_square se agregó correctamente al marco de datos. Puede cambiar el orden de las variables con select. A continuación, traes age_square justo después de edad.

COLUMNS = ['age', 'age_square', 'workclass', 'fnlwgt', 'education', 'education_num', 'marital',
           'occupation', 'relationship', 'race', 'sex', 'capital_gain', 'capital_loss',
           'hours_week', 'native_country', 'label']
df = df.select(COLUMNS)
df.first()
Row(age=39, age_square=1521.0, workclass='State-gov', fnlwgt=77516, education='Bachelors', education_num=13, marital='Never-married', occupation='Adm-clerical', relationship='Not-in-family', race='White', sex='Male', capital_gain=2174, capital_loss=0, hours_week=40, native_country='United-States', label='<=50K')

Excluir Holanda-Países Bajos

Cuando un grupo dentro de una característica tiene solo una observación, no aporta información al modelo. Por el contrario, puede dar lugar a un error durante la validación cruzada.

Comprobemos el origen del hogar.

df.filter(df.native_country == 'Holand-Netherlands').count()
df.groupby('native_country').agg({'native_country': 'count'}).sort(asc("count(native_country)")).show()
+--------------------+---------------------+
|      native_country|count(native_country)|
+--------------------+---------------------+
|  Holand-Netherlands|                    1|
|            Scotland|                   12|
|             Hungary|                   13|
|            Honduras|                   13|
|Outlying-US(Guam-...|                   14|
|          Yugoslavia|                   16|
|            Thailand|                   18|
|                Laos|                   18|
|            Cambodia|                   19|
|     Trinadad&Tobago|                   19|
|                Hong|                   20|
|             Ireland|                   24|
|             Ecuador|                   28|
|              Greece|                   29|
|              France|                   29|
|                Peru|                   31|
|           Nicaragua|                   34|
|            Portugal|                   37|
|                Iran|                   43|
|               Haiti|                   44|
+--------------------+---------------------+
only showing top 20 rows

La característica país_nativo tiene solo un hogar procedente de Holanda. Lo excluyes.

df_remove = df.filter(df.native_country !=	'Holand-Netherlands')

Paso 3) Construya un canal de procesamiento de datos

Similar a scikit-learn, Pyspark tiene una API de canalización.

Una canalización es muy conveniente para mantener la estructura de los datos. Insertas los datos en la tubería. Dentro de la canalización, se realizan varias operaciones y la salida se utiliza para alimentar el algoritmo.

Por ejemplo, una transformación universal en el aprendizaje automático consiste en convertir una cadena en un codificador activo, es decir, una columna por grupo. Un codificador activo suele ser una matriz llena de ceros.

Los pasos para transformar los datos son muy similares a los de scikit-learn. Necesitas:

  • Indexar la cadena a numérico
  • Crea el codificador activo
  • Transforma los datos

Dos API hacen el trabajo: StringIndexer, OneHotEncoder

  1. En primer lugar, selecciona la columna de cadena a indexar. inputCol es el nombre de la columna en el conjunto de datos. OutputCol es el nuevo nombre dado a la columna transformada.
StringIndexer(inputCol="workclass", outputCol="workclass_encoded")
  1. Ajustar los datos y transformarlos
model = stringIndexer.fit(df)		
`indexed = model.transform(df)``
  1. Crea las columnas de noticias según el grupo. Por ejemplo, si hay 10 grupos en la función, la nueva matriz tendrá 10 columnas, una para cada grupo.
OneHotEncoder(dropLast=False, inputCol="workclassencoded", outputCol="workclassvec")
### Example encoder
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler

stringIndexer = StringIndexer(inputCol="workclass", outputCol="workclass_encoded")
model = stringIndexer.fit(df)
indexed = model.transform(df)
encoder = OneHotEncoder(dropLast=False, inputCol="workclass_encoded", outputCol="workclass_vec")
encoded = encoder.transform(indexed)
encoded.show(2)
+---+----------+----------------+------+---------+-------------+------------------+---------------+-------------+-----+----+------------+------------+----------+--------------+-----+-----------------+-------------+
|age|age_square|       workclass|fnlwgt|education|education_num|           marital|     occupation| relationship| race| sex|capital_gain|capital_loss|hours_week|native_country|label|workclass_encoded|workclass_vec|
+---+----------+----------------+------+---------+-------------+------------------+---------------+-------------+-----+----+------------+------------+----------+--------------+-----+-----------------+-------------+
| 39|    1521.0|       State-gov| 77516|Bachelors|           13|     Never-married|   Adm-clerical|Not-in-family|White|Male|        2174|           0|        40| United-States|<=50K|              4.0|(9,[4],[1.0])|
| 50|    2500.0|Self-emp-not-inc| 83311|Bachelors|           13|Married-civ-spouse|Exec-managerial|      Husband|White|Male|           0|           0|        13| United-States|<=50K|              1.0|(9,[1],[1.0])|
+---+----------+----------------+------+---------+-------------+------------------+---------------+-------------+-----+----+------------+------------+----------+--------------+-----+-----------------+-------------+
only showing top 2 rows

construir el oleoducto

Construirá una canalización para convertir todas las características precisas y agregarlas al conjunto de datos final. La canalización tendrá cuatro operaciones, pero siéntase libre de agregar tantas operaciones como desee.

  1. Codificar los datos categóricos.
  2. Indexar la característica de la etiqueta
  3. Agregar variable continua
  4. Montar los escalones.

Cada paso se almacena en una lista denominada etapas. Esta lista le indicará a VectorAssembler qué operación realizar dentro de la tubería.

1. Codifique los datos categóricos.

Este paso es exactamente igual que el ejemplo anterior, excepto que recorre todas las características categóricas.

from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoderEstimator
CATE_FEATURES = ['workclass', 'education', 'marital', 'occupation', 'relationship', 'race', 'sex', 'native_country']
stages = [] # stages in our Pipeline
for categoricalCol in CATE_FEATURES:
    stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol + "Index")
    encoder = OneHotEncoderEstimator(inputCols=[stringIndexer.getOutputCol()],
                                     outputCols=[categoricalCol + "classVec"])
    stages += [stringIndexer, encoder]

2. Indexar la característica de la etiqueta

Spark, como muchas otras bibliotecas, no acepta valores de cadena para la etiqueta. Convierte la función de etiqueta con StringIndexer y la agrega a las etapas de la lista

# Convert label into label indices using the StringIndexer
label_stringIdx =  StringIndexer(inputCol="label", outputCol="newlabel")
stages += [label_stringIdx]

3. Agregar variable continua

Los inputCols de VectorAssembler son una lista de columnas. Puede crear una nueva lista que contenga todas las columnas nuevas. El siguiente código completa la lista con características categóricas codificadas y características continuas.

assemblerInputs = [c + "classVec" for c in CATE_FEATURES] + CONTI_FEATURES

4. Ensamble los escalones.

Finalmente, pasas todos los pasos en el VectorAssembler.

assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")stages += [assembler]

Ahora que todos los pasos están listos, envía los datos a la canalización.

# Create a Pipeline.
pipeline = Pipeline(stages=stages)
pipelineModel = pipeline.fit(df_remove)
model = pipelineModel.transform(df_remove)

Si revisa el nuevo conjunto de datos, puede ver que contiene todas las características, transformadas y no transformadas. Solo le interesa la nueva etiqueta y las características. Las características incluyen todas las características transformadas y las variables continuas.

model.take(1)

[Row(age=39, age_square=1521.0, workclass='State-gov', fnlwgt=77516, education='Bachelors', education_num=13, marital='Never-married', occupation='Adm-clerical', relationship='Not-in-family', race='White', sex='Male', capital_gain=2174, capital_loss=0, hours_week=40, native_country='United-States', label='<=50K', workclassIndex=4.0, workclassclassVec=SparseVector(8, {4: 1.0}), educationIndex=2.0, educationclassVec=SparseVector(15, {2: 1.0}), maritalIndex=1.0, maritalclassVec=SparseVector(6, {1: 1.0}), occupationIndex=3.0, occupationclassVec=SparseVector(14, {3: 1.0}), relationshipIndex=1.0, relationshipclassVec=SparseVector(5, {1: 1.0}), raceIndex=0.0, raceclassVec=SparseVector(4, {0: 1.0}), sexIndex=0.0, sexclassVec=SparseVector(1, {0: 1.0}), native_countryIndex=0.0, native_countryclassVec=SparseVector(40, {0: 1.0}), newlabel=0.0, features=SparseVector(99, {4: 1.0, 10: 1.0, 24: 1.0, 32: 1.0, 44: 1.0, 48: 1.0, 52: 1.0, 53: 1.0, 93: 39.0, 94: 77516.0, 95: 2174.0, 96: 13.0, 98: 40.0}))]

Paso 4) Construir el clasificador: logístico

Para agilizar el cálculo, convierta el modelo en un DataFrame.

Debe seleccionar una nueva etiqueta y características del modelo usando el mapa.

from pyspark.ml.linalg import DenseVector
input_data = model.rdd.map(lambda x: (x["newlabel"], DenseVector(x["features"])))

Está listo para crear los datos del tren como un DataFrame. Usas el sqlContext

df_train = sqlContext.createDataFrame(input_data, ["label", "features"])

Revisa la segunda fila

df_train.show(2)
+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|[0.0,0.0,0.0,0.0,...|
|  0.0|[0.0,1.0,0.0,0.0,...|
+-----+--------------------+
only showing top 2 rows

Crear un conjunto de tren/prueba

Divide el conjunto de datos 80/20 con randomSplit.

# Split the data into train and test sets
train_data, test_data = df_train.randomSplit([.8,.2],seed=1234)

Contemos cuántas personas con ingresos inferiores o superiores a 50 XNUMX tanto en el conjunto de entrenamiento como en el de prueba

train_data.groupby('label').agg({'label': 'count'}).show()
+-----+------------+
|label|count(label)|
+-----+------------+
|  0.0|       19698|
|  1.0|        6263|
+-----+------------+
test_data.groupby('label').agg({'label': 'count'}).show()
+-----+------------+
|label|count(label)|
+-----+------------+
|  0.0|        5021|
|  1.0|        1578|
+-----+------------+

Construir el regresor logístico

Por último, pero no por ello menos importante, puedes crear el clasificador. Pyspark tiene una API llamada LogisticRegression para realizar regresiones logísticas.

Inicializa lr indicando la columna de etiqueta y las columnas de características. Establece un máximo de 10 iteraciones y agrega un parámetro de regularización con un valor de 0.3. Tenga en cuenta que en la siguiente sección utilizará la validación cruzada con una cuadrícula de parámetros para ajustar el modelo.

# Import `LinearRegression`
from pyspark.ml.classification import LogisticRegression

# Initialize `lr`
lr = LogisticRegression(labelCol="label",
                        featuresCol="features",
                        maxIter=10,
                        regParam=0.3)

# Fit the data to the model
linearModel = lr.fit(train_data)

#Puedes ver los coeficientes de la regresión.

# Print the coefficients and intercept for logistic regression
print("Coefficients: " + str(linearModel.coefficients))
print("Intercept: " + str(linearModel.intercept))
Coefficients: [-0.0678914665262,-0.153425526813,-0.0706009536407,-0.164057586562,-0.120655298528,0.162922330862,0.149176870438,-0.626836362611,-0.193483661541,-0.0782269980838,0.222667203836,0.399571096381,-0.0222024341804,-0.311925857859,-0.0434497788688,-0.306007744328,-0.41318209688,0.547937504247,-0.395837350854,-0.23166535958,0.618743906733,-0.344088614546,-0.385266881369,0.317324463006,-0.350518889186,-0.201335923138,-0.232878560088,-0.13349278865,-0.119760542498,0.17500602491,-0.0480968101118,0.288484253943,-0.116314616745,0.0524163478063,-0.300952624551,-0.22046421474,-0.16557996579,-0.114676231939,-0.311966431453,-0.344226119233,0.105530129507,0.152243047814,-0.292774545497,0.263628334433,-0.199951374076,-0.30329422583,-0.231087515178,0.418918551,-0.0565930184279,-0.177818073048,-0.0733236680663,-0.267972912252,0.168491215697,-0.12181255723,-0.385648075442,-0.202101794517,0.0469791640782,-0.00842850210625,-0.00373211448629,-0.259296141281,-0.309896554133,-0.168434409756,-0.11048086026,0.0280647963877,-0.204187030092,-0.414392623536,-0.252806580669,0.143366465705,-0.516359222663,-0.435627370849,-0.301949286524,0.0878249035894,-0.210951740965,-0.621417928742,-0.099445190784,-0.232671473401,-0.1077745606,-0.360429419703,-0.420362959052,-0.379729467809,-0.395186242741,0.0826401853838,-0.280251589972,0.187313505214,-0.20295228799,-0.431177064626,0.149759018379,-0.107114299614,-0.319314858424,0.0028450133235,-0.651220387649,-0.327918792207,-0.143659581445,0.00691075160413,8.38517628783e-08,2.18856717378e-05,0.0266701216268,0.000231075966823,0.00893832698698]
Intercept: -1.9884177974805692

Paso 5) Entrenar y evaluar el modelo.

Para generar predicción para su conjunto de prueba,

Puedes usar linearModel con transform() en test_data

# Make predictions on test data using the transform() method.
predictions = linearModel.transform(test_data)

Puedes imprimir los elementos en las predicciones.

predictions.printSchema()
root
 |-- label: double (nullable = true)
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)

Te interesa la etiqueta, la predicción y la probabilidad.

selected = predictions.select("label", "prediction", "probability")
selected.show(20)
+-----+----------+--------------------+
|label|prediction|         probability|
+-----+----------+--------------------+
|  0.0|       0.0|[0.91560704124179...|
|  0.0|       0.0|[0.92812140213994...|
|  0.0|       0.0|[0.92161406774159...|
|  0.0|       0.0|[0.96222760777142...|
|  0.0|       0.0|[0.66363283056957...|
|  0.0|       0.0|[0.65571324475477...|
|  0.0|       0.0|[0.73053376932829...|
|  0.0|       1.0|[0.31265053873570...|
|  0.0|       0.0|[0.80005907577390...|
|  0.0|       0.0|[0.76482251301640...|
|  0.0|       0.0|[0.84447301189069...|
|  0.0|       0.0|[0.75691912026619...|
|  0.0|       0.0|[0.60902504096722...|
|  0.0|       0.0|[0.80799228385509...|
|  0.0|       0.0|[0.87704364852567...|
|  0.0|       0.0|[0.83817652582377...|
|  0.0|       0.0|[0.79655423248500...|
|  0.0|       0.0|[0.82712311232246...|
|  0.0|       0.0|[0.81372823882016...|
|  0.0|       0.0|[0.59687710752201...|
+-----+----------+--------------------+
only showing top 20 rows

Evaluar el modelo

Es necesario observar la métrica de precisión para ver qué tan bien (o mal) se desempeña el modelo. Actualmente, no existe una API para calcular la medida de precisión en Spark. El valor predeterminado es la curva característica operativa del receptor, ROC. Es una métrica diferente que tiene en cuenta la tasa de falsos positivos.

Antes de mirar la República de China, construyamos la medida de precisión. Estás más familiarizado con esta métrica. La medida de precisión es la suma de la predicción correcta sobre el número total de observaciones.

Creas un DataFrame con la etiqueta y la `predicción.

cm = predictions.select("label", "prediction")

Puedes consultar el número de clase en la etiqueta y la predicción.

cm.groupby('label').agg({'label': 'count'}).show()
+-----+------------+
|label|count(label)|
+-----+------------+
|  0.0|        5021|
|  1.0|        1578|
+-----+------------+
cm.groupby('prediction').agg({'prediction': 'count'}).show()
+----------+-----------------+
|prediction|count(prediction)|
+----------+-----------------+
|       0.0|             5982|
|       1.0|              617|
+----------+-----------------+

Por ejemplo, en el conjunto de prueba, hay 1578 hogares con ingresos superiores a 50 y 5021 por debajo. El clasificador, sin embargo, predijo 617 hogares con ingresos superiores a 50.

Puede calcular la precisión calculando el recuento cuando las etiquetas están clasificadas correctamente sobre el número total de filas.

cm.filter(cm.label == cm.prediction).count() / cm.count()

0.8237611759357478

Puede unir todo y escribir una función para calcular la precisión.

def accuracy_m(model): 
    predictions = model.transform(test_data)
    cm = predictions.select("label", "prediction")
    acc = cm.filter(cm.label == cm.prediction).count() / cm.count()
    print("Model accuracy: %.3f%%" % (acc * 100)) 
accuracy_m(model = linearModel)
Model accuracy: 82.376%

Métricas de la República de China

El módulo BinaryClassificationEvaluator incluye las medidas ROC. El receptor OperaLa curva característica es otra herramienta común utilizada con la clasificación binaria. Es muy similar a la curva de precisión/recuperación, pero en lugar de representar gráficamente la precisión versus la recuperación, la curva ROC muestra la tasa de verdaderos positivos (es decir, recuperación) frente a la tasa de falsos positivos. La tasa de falsos positivos es la proporción de casos negativos que se clasifican incorrectamente como positivos. Es igual a uno menos la tasa negativa verdadera. La tasa de verdaderos negativos también se llama especificidad. Por lo tanto, la curva ROC representa la sensibilidad (recuerdo) versus 1 – especificidad.

### Use ROC 
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Evaluate model
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
print(evaluator.evaluate(predictions))
print(evaluator.getMetricName())

0.8940481662695192áreaBajoROC

print(evaluator.evaluate(predictions))

0.8940481662695192

Paso 6) Ajusta el hiperparámetro

Por último, pero no menos importante, puedes ajustar los hiperparámetros. Similar a scikit aprende usted crea una cuadrícula de parámetros y agrega los parámetros que desea ajustar.

Para reducir el tiempo de cálculo, solo ajusta el parámetro de regularización con solo dos valores.

from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Create ParamGrid for Cross Validation
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.01, 0.5])
             .build())

Finalmente, se evalúa el modelo utilizando el método de valoración cruzada con 5 pliegues. Se necesitan unos 16 minutos para entrenar.

from time import *
start_time = time()

# Create 5-fold CrossValidator
cv = CrossValidator(estimator=lr,
                    estimatorParamMaps=paramGrid,
                    evaluator=evaluator, numFolds=5)

# Run cross validations
cvModel = cv.fit(train_data)
# likely take a fair amount of time
end_time = time()
elapsed_time = end_time - start_time
print("Time to train model: %.3f seconds" % elapsed_time)

Tiempo para entrenar el modelo: 978.807 segundos.

El mejor hiperparámetro de regularización es 0.01, con una precisión del 85.316 por ciento.

accuracy_m(model = cvModel)
Model accuracy: 85.316%

Puede extraer el parámetro recomendado encadenando cvModel.bestModel con extractParamMap()

bestModel = cvModel.bestModel
bestModel.extractParamMap()
{Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='aggregationDepth', doc='suggested depth for treeAggregate (>= 2)'): 2,
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty'): 0.0,
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='family', doc='The name of family which is a description of the label distribution to be used in the model. Supported options: auto, binomial, multinomial.'): 'auto',
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='featuresCol', doc='features column name'): 'features',
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='fitIntercept', doc='whether to fit an intercept term'): True,
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='labelCol', doc='label column name'): 'label',
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='maxIter', doc='maximum number of iterations (>= 0)'): 10,
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='predictionCol', doc='prediction column name'): 'prediction',
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='probabilityCol', doc='Column name for predicted class conditional probabilities. Note: Not all models output well-calibrated probability estimates! These probabilities should be treated as confidences, not precise probabilities'): 'probability',
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='rawPredictionCol', doc='raw prediction (a.k.a. confidence) column name'): 'rawPrediction',
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='regParam', doc='regularization parameter (>= 0)'): 0.01,
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='standardization', doc='whether to standardize the training features before fitting the model'): True,
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='threshold', doc='threshold in binary classification prediction, in range [0, 1]'): 0.5,
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='tol', doc='the convergence tolerance for iterative algorithms (>= 0)'): 1e-06}

Resumen

Spark es una herramienta fundamental para un científico de datos. Permite al profesional conectar una aplicación a diferentes fuentes de datos, realizar análisis de datos sin problemas o agregar un modelo predictivo.

Para empezar Spark, es necesario iniciar un Spark Contexto con:

"SparkContexto()'

y y SQL contexto para conectarse a una fuente de datos:

'SQLContexto()'

En el tutorial, aprenderá cómo entrenar una regresión logística:

  1. Convierta el conjunto de datos en un marco de datos con:
rdd.map(lambda x: (x["newlabel"], DenseVector(x["features"])))
sqlContext.createDataFrame(input_data, ["label", "features"])

Tenga en cuenta que el nombre de la columna de la etiqueta es nueva etiqueta y todas las funciones están reunidas en funciones. Cambie estos valores si son diferentes en su conjunto de datos.

  1. Crear el conjunto de tren/prueba
randomSplit([.8,.2],seed=1234)
  1. Entrenar a la modelo
LogisticRegression(labelCol="label",featuresCol="features",maxIter=10, regParam=0.3)
lr.fit()
  1. Hacer predicción
linearModel.transform()