Tutorial PySpark para iniciantes: aprenda com EXEMPLOS

Antes de aprender o PySpark, vamos entender:

O que é o Apache Spark?

Spark é uma solução de big data comprovadamente mais fácil e rápida que o Hadoop MapReduce. Spark é um software de código aberto desenvolvido pelo laboratório RAD da UC Berkeley em 2009. Desde que foi lançado ao público em 2010, o Spark cresceu em popularidade e é usado pela indústria em uma escala sem precedentes.

Na era da Big Data, os profissionais precisam mais do que nunca de ferramentas rápidas e confiáveis ​​para processar o streaming de dados. Ferramentas anteriores como MapReduce eram as favoritas, mas eram lentas. Para superar esse problema, o Spark oferece uma solução rápida e de uso geral. A principal diferença entre Spark e MapReduce é que o Spark executa cálculos na memória posteriormente no disco rígido. Permite acesso e processamento de dados em alta velocidade, reduzindo o tempo de horas para minutos.

O que é PySpark?

PySparkGenericName é uma ferramenta criada pela comunidade Apache Spark para usar Python com Spark. Permite trabalhar com RDD (Resilient Distributed Dataset) em Python. Ele também oferece PySpark Shell para vincular APIs Python ao núcleo Spark para iniciar o Spark Context. Spark é o mecanismo de nome para realizar a computação em cluster, enquanto PySpark é a biblioteca do Python para usar o Spark.

Como funciona o Spark?

O Spark é baseado em motor computacional, ou seja, cuida do agendamento, distribuição e monitoramento da aplicação. Cada tarefa é realizada em várias máquinas de trabalho chamadas cluster de computação. Um cluster de computação refere-se à divisão de tarefas. Uma máquina executa uma tarefa, enquanto as outras contribuem para o resultado final por meio de uma tarefa diferente. No final, todas as tarefas são agregadas para produzir um resultado. O administrador do Spark oferece uma visão geral 360º de vários trabalhos do Spark.

Como funciona o Spark
Como funciona o Spark

Spark foi projetado para funcionar com

  • Python
  • Java
  • Scala
  • SQL

Um recurso significativo do Spark é a grande quantidade de bibliotecas integradas, incluindo MLlib para aprendizado de máquina. O Spark também foi projetado para funcionar com clusters Hadoop e pode ler vários tipos de arquivos, incluindo dados Hive, CSV, JSON, dados Casandra, entre outros.

Por que usar o Spark?

Como futuro profissional de dados, você deve estar familiarizado com as famosas bibliotecas do python: Pandas e scikit-learn. Essas duas bibliotecas são fantásticas para explorar conjuntos de dados de tamanho médio. Projetos regulares de aprendizado de máquina são construídos em torno do seguintewing metodologia:

  • Carregue os dados no disco
  • Importe os dados para a memória da máquina
  • Processar/analisar os dados
  • Construa o modelo de aprendizado de máquina
  • Armazene a previsão de volta no disco

O problema surge se o cientista de dados quiser processar dados grandes demais para um computador. Durante os primeiros dias da ciência de dados, os profissionais experimentavam o treinamento em grandes conjuntos de dados nem sempre era necessário. O cientista de dados encontraria uma boa amostra estatística, realizaria uma verificação adicional de robustez e chegaria a um modelo excelente.

No entanto, existem alguns problemas com isso:

  • O conjunto de dados reflete o mundo real?
  • Os dados incluem um exemplo específico?
  • O modelo é adequado para amostragem?

Veja a recomendação dos usuários, por exemplo. Os recomendadores baseiam-se na comparação de usuários com outros usuários para avaliar suas preferências. Se o profissional de dados obtiver apenas um subconjunto dos dados, não haverá um grupo de usuários muito semelhantes entre si. Os recomendadores precisam ser executados no conjunto de dados completo ou não.

Qual é a solução?

A solução já é evidente há muito tempo: dividir o problema em vários computadores. A computação paralela também apresenta vários problemas. Os desenvolvedores muitas vezes têm problemas para escrever código paralelo e acabam tendo que resolver vários problemas de comunicação.plex questões em torno do próprio multiprocessamento.

O Pyspark fornece ao cientista de dados uma API que pode ser usada para resolver problemas de processamento paralelo de dados. Pyspark cuida do complexcapacidades de multiprocessamento, como distribuição de dados, distribuição de código e coleta de resultados dos trabalhadores em um cluster de máquinas.

O Spark pode ser executado de forma independente, mas na maioria das vezes é executado em uma estrutura de computação em cluster, como o Hadoop. Em teste e desenvolvimento, no entanto, um cientista de dados pode executar o Spark com eficiência em seu desenvolvimento boxes ou laptops sem cluster

• Uma das principais vantagens do Spark é construir uma arquitetura que engloba gerenciamento de streaming de dados, consultas de dados contínuas, previsão de aprendizado de máquina e acesso em tempo real a diversas análises.

• O Spark trabalha em estreita colaboração com a linguagem SQL, ou seja, dados estruturados. Permite consultar os dados em tempo real.

• A principal função do cientista de dados é analisar e construir modelos preditivos. Resumindo, um cientista de dados precisa saber como consultar dados usando SQL, produza um relatório estatístico e faça uso do aprendizado de máquina para produzir previsões. O cientista de dados gasta uma quantidade significativa de seu tempo limpando, transformando e analisando os dados. Assim que o conjunto de dados ou fluxo de trabalho de dados estiver pronto, o cientista de dados usa várias técnicas para descobrir insights e padrões ocultos. A manipulação dos dados deve ser robusta e igualmente fácil de usar. Spark é a ferramenta certa graças à sua velocidade e APIs ricas.

Neste tutorial do PySpark, você aprenderá como construir um classificador com exemplos do PySpark.

Como instalar o PySpark com AWS

A Jupyter equipe cria uma imagem Docker para executar o Spark com eficiência. Abaixo estão as etapas que você pode seguir para instalar a instância PySpark na AWS.

Consulte nosso tutorial em AWS e TensorFlow

Etapa 1: crie uma instância

Primeiro de tudo, você precisa criar uma instância. Acesse sua conta AWS e execute a instância. Você pode aumentar o armazenamento em até 15g e usar o mesmo grupo de segurança do tutorial do TensorFlow.

Passo 2: Abra a conexão

Abra a conexão e instale o contêiner docker. Para mais details, consulte o tutorial com TensorFlow com Estivador. Observe que você precisa estar no diretório de trabalho correto.

Basta executar estes códigos para instalar o Docker:

sudo yum update -y
sudo yum install -y docker
sudo service docker start
sudo user-mod -a -G docker ec2-user
exit

Etapa 3: reabra a conexão e instale o Spark

Depois de reabrir a conexão, você poderá instalar a imagem que contém o PySpark.

## Spark
docker run -v ~/work:/home/jovyan/work -d -p 8888:8888 jupyter/pyspark-notebook

## Allow preserving Jupyter notebook
sudo chown 1000 ~/work

## Install tree to see our working directory next
sudo yum install -y tree

Etapa 4: Abrir Jupyter

Verifique o contêiner e seu nome

docker ps

Inicie a janela de encaixe com os logs da janela de encaixe seguidos do nome da janela de encaixe. Por exemplo, docker registra zealous_goldwasser

Vá para o seu navegador e inicie Jupyter. O endereço é http://localhost:8888/. Cole a senha fornecida pelo terminal.

Note: se você deseja fazer upload/download de um arquivo para sua máquina AWS, você pode usar o software Cyberduck, https://cyberduck.io/.

Como instalar o PySpark no Windows/Mac com Conda

Following é um processo detalhado sobre como instalar o PySpark no Windows/Mac usando Anaconda:

Para instalar o Spark em sua máquina local, uma prática recomendada é criar um novo ambiente conda. Este novo ambiente instalará Python 3.6, Spark e todas as dependências.

Usuário Mac

cd anaconda3
touch hello-spark.yml
vi hello-spark.yml

Usuário do Windows

cd C:\Users\Admin\Anaconda3
echo.>hello-spark.yml
notepad hello-spark.yml

Você pode editar o arquivo .yml. Tenha cuidado com o recuo. Dois espaços são necessários antes -

name: hello-spark 
    dependencies:
    
    - python=3.6
    - jupyter
    - ipython
    - numpy
    - numpy-base
    - pandas
    - py4j
    - pyspark
    - pytz

Salve-o e crie o ambiente. Isso leva algum tempo

conda env create -f hello-spark.yml

Para mais details sobre a localização, consulte o tutorial Instalar TensorFlow

Você pode verificar todo o ambiente instalado em sua máquina

conda env list
Activate hello-spark

Usuário Mac

source activate hello-spark

Usuário do Windows

activate hello-spark

Nota: Você já criou um ambiente específico do TensorFlow para executar os tutoriais no TensorFlow. É mais conveniente criar um novo ambiente diferente do hello-tf. Não faz sentido sobrecarregar o hello-tf com Spark ou qualquer outra biblioteca de aprendizado de máquina.

Imagine que a maior parte do seu projeto envolve o TensorFlow, mas você precisa usar o Spark para um projeto específico. Você pode definir um ambiente TensorFlow para todo o seu projeto e criar um ambiente separado para o Spark. Você pode adicionar quantas bibliotecas desejar no ambiente Spark, sem interferir no ambiente TensorFlow. Depois de concluir o projeto do Spark, você pode apagá-lo sem afetar o ambiente do TensorFlow.

Jupyter

Abra Jupyter Notebook e teste se o PySpark funciona. Em um novo caderno cole o seguintewing Código de amostra PySpark:

import pyspark
from pyspark import SparkContext
sc =SparkContext()

Se for exibido um erro, é provável que o Java não esteja instalado em sua máquina. No mac, abra o terminal e escreva java -version, se houver uma versão java, certifique-se de que seja 1.8. No Windows, acesse Aplicativo e verifique se existe uma pasta Java. Se houver uma pasta Java, verifique se o Java 1.8 está instalado. No momento em que este livro foi escrito, PySpark não era compatível com Java9 e superior.

Se você precisa instalar o Java, pense link e baixe jdk-8u181-windows-x64.exe

Jupyter

Para usuários de Mac, é recomendado usar `brew.`

brew tap caskroom/versions
brew cask install java8

Consulte este tutorial passo a passo em como instalar Java

Note: Use remover para apagar completamente um ambiente.

 conda env remove -n hello-spark -y

Contexto de faísca

SparkContext é o motor interno que permite as conexões com os clusters. Se quiser executar uma operação, você precisa de um SparkContext.

Crie um SparkContext

Primeiro de tudo, você precisa iniciar um SparkContext.

import pyspark
from pyspark import SparkContext
sc =SparkContext()

Agora que o SparkContext está pronto, você pode criar uma coleção de dados chamada RDD, Resilient Distributed Dataset. A computação em um RDD é automaticamente paralelizada em todo o cluster.

nums= sc.parallelize([1,2,3,4])

Você pode acessar a primeira linha com take

nums.take(1)
[1]

Você pode aplicar uma transformação aos dados com uma função lambda. No exemplo do PySpark abaixo, você retorna o quadrado dos números. É uma transformação de mapa

squared = nums.map(lambda x: x*x).collect()
for num in squared:
    print('%i ' % (num))
1 
4 
9 
16

SQLContexto

Uma maneira mais conveniente é usar o DataFrame. SparkContext já está definido, você pode usá-lo para criar o dataFrame. Você também precisa declarar o SQLContext

SQLContext permite conectar o mecanismo com diferentes fontes de dados. É usado para iniciar as funcionalidades do Spark SQL.

from pyspark.sql import Row
from pyspark.sql import SQLContext

sqlContext = SQLContext(sc)

Agora, neste tutorial do Spark em Python, vamos criar uma lista de tuplas. Cada tupla conterá o nome das pessoas e sua idade. São necessárias quatro etapas:

Passo 1) Crie a lista de tupla com as informações

[('John',19),('Smith',29),('Adam',35),('Henry',50)]

Passo 2) Construa um RDD

rdd = sc.parallelize(list_p)

Passo 3) Converta as tuplas

rdd.map(lambda x: Row(name=x[0], age=int(x[1])))

Passo 4) Crie um contexto DataFrame

sqlContext.createDataFrame(ppl)
list_p = [('John',19),('Smith',29),('Adam',35),('Henry',50)]
rdd = sc.parallelize(list_p)
ppl = rdd.map(lambda x: Row(name=x[0], age=int(x[1])))
DF_ppl = sqlContext.createDataFrame(ppl)

Se quiser acessar o tipo de cada recurso, você pode usar printSchema()

DF_ppl.printSchema()
root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)

Exemplo de aprendizado de máquina com PySpark

Agora que você tem uma breve ideia do Spark e do SQLContext, está pronto para construir seu primeiro programa de aprendizado de máquina.

Following estão as etapas para construir um programa de aprendizado de máquina com PySpark:

  • Passo 1) Operação básica com PySpark
  • Passo 2) Pré-processamento de dados
  • Passo 3) Crie um pipeline de processamento de dados
  • Passo 4) Construa o classificador: logístico
  • Passo 5) Treine e avalie o modelo
  • Passo 6) Ajuste o hiperparâmetro

Neste tutorial de aprendizado de máquina PySpark, usaremos o conjunto de dados adulto. O objetivo deste tutorial é aprender como usar o Pyspark. Para obter mais informações sobre o conjunto de dados, consulte este tutorial.

Observe que o conjunto de dados não é significativo e você pode pensar que o cálculo demora muito. O Spark foi projetado para processar uma quantidade considerável de dados. O desempenho do Spark aumenta em relação a outras bibliotecas de aprendizado de máquina quando o conjunto de dados processado aumenta.

Etapa 1) Operação básica com PySpark

Primeiro de tudo, você precisa inicializar o SQLContext que ainda não foi iniciado.

#from pyspark.sql import SQLContext
url = "https://raw.githubusercontent.com/guru99-edu/R-Programming/master/adult_data.csv"
from pyspark import SparkFiles
sc.addFile(url)
sqlContext = SQLContext(sc)

então, você pode ler o arquivo cvs com sqlContext.read.csv. Você usa inferSchema definido como True para dizer ao Spark para adivinhar automaticamente o tipo de dados. Por padrão, é alterado para False.

df = sqlContext.read.csv(SparkFiles.get("adult_data.csv"), header=True, inferSchema= True)

Vamos dar uma olhada no tipo de dados

df.printSchema()
root
 |-- age: integer (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: integer (nullable = true)
 |-- education: string (nullable = true)
 |-- education_num: integer (nullable = true)
 |-- marital: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital_gain: integer (nullable = true)
 |-- capital_loss: integer (nullable = true)
 |-- hours_week: integer (nullable = true)
 |-- native_country: string (nullable = true)
 |-- label: string (nullable = true)

Você pode ver os dados com show.

df.show(5, truncate = False)
+---+----------------+------+---------+-------------+------------------+-----------------+-------------+-----+------+------------+------------+----------+--------------+-----+
|age|workclass       |fnlwgt|education|education_num|marital           |occupation       |relationship |race |sex   |capital_gain|capital_loss|hours_week|native_country|label|
+---+----------------+------+---------+-------------+------------------+-----------------+-------------+-----+------+------------+------------+----------+--------------+-----+
|39 |State-gov       |77516 |Bachelors|13           |Never-married     |Adm-clerical     |Not-in-family|White|Male  |2174        |0           |40        |United-States |<=50K|
|50 |Self-emp-not-inc|83311 |Bachelors|13           |Married-civ-spouse|Exec-managerial  |Husband      |White|Male  |0           |0           |13        |United-States |<=50K|
|38 |Private         |215646|HS-grad  |9            |Divorced          |Handlers-cleaners|Not-in-family|White|Male  |0           |0           |40        |United-States |<=50K|
|53 |Private         |234721|11th     |7            |Married-civ-spouse|Handlers-cleaners|Husband      |Black|Male  |0           |0           |40        |United-States |<=50K|
|28 |Private         |338409|Bachelors|13           |Married-civ-spouse|Prof-specialty   |Wife         |Black|Female|0           |0           |40        |Cuba          |<=50K|
+---+----------------+------+---------+-------------+------------------+-----------------+-------------+-----+------+------------+------------+----------+--------------+-----+
only showing top 5 rows

Se você não definiu inderShema como True, aqui está o que está acontecendo com o tipo. Estão todos em string.

df_string = sqlContext.read.csv(SparkFiles.get("adult.csv"), header=True, inferSchema=  False)
df_string.printSchema()
root
 |-- age: string (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: string (nullable = true)
 |-- education: string (nullable = true)
 |-- education_num: string (nullable = true)
 |-- marital: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital_gain: string (nullable = true)
 |-- capital_loss: string (nullable = true)
 |-- hours_week: string (nullable = true)
 |-- native_country: string (nullable = true)
 |-- label: string (nullable = true)

Para converter a variável contínua no formato correto, você pode usar a reformulação das colunas. Você pode usar withColumn para informar ao Spark em qual coluna operar a transformação.

# Import all from `sql.types`
from pyspark.sql.types import *

# Write a custom function to convert the data type of DataFrame columns
def convertColumn(df, names, newType):
    for name in names: 
        df = df.withColumn(name, df[name].cast(newType))
    return df 
# List of continuous features
CONTI_FEATURES  = ['age', 'fnlwgt','capital_gain', 'education_num', 'capital_loss', 'hours_week']
# Convert the type
df_string = convertColumn(df_string, CONTI_FEATURES, FloatType())
# Check the dataset
df_string.printSchema()
root
 |-- age: float (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: float (nullable = true)
 |-- education: string (nullable = true)
 |-- education_num: float (nullable = true)
 |-- marital: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital_gain: float (nullable = true)
 |-- capital_loss: float (nullable = true)
 |-- hours_week: float (nullable = true)
 |-- native_country: string (nullable = true)
 |-- label: string (nullable = true)

from pyspark.ml.feature import StringIndexer
#stringIndexer = StringIndexer(inputCol="label", outputCol="newlabel")
#model = stringIndexer.fit(df)
#df = model.transform(df)
df.printSchema()

Selecionar colunas

Você pode selecionar e mostrar as linhas com select e os nomes dos recursos. Abaixo, idade e fnlwgt estão selecionados.

df.select('age','fnlwgt').show(5)
+---+------+
|age|fnlwgt|
+---+------+
| 39| 77516|
| 50| 83311|
| 38|215646|
| 53|234721|
| 28|338409|
+---+------+
only showing top 5 rows

Contar por grupo

Se quiser contar o número de ocorrências por grupo, você pode encadear:

  • grupoBy()
  • contagem()

junto. No exemplo do PySpark abaixo, você conta o número de linhas por nível de escolaridade.

df.groupBy("education").count().sort("count",ascending=True).show()
+------------+-----+
|   education|count|
+------------+-----+
|   Preschool|   51|
|     1st-4th|  168|
|     5th-6th|  333|
|   Doctorate|  413|
|        12th|  433|
|         9th|  514|
| Prof-school|  576|
|     7th-8th|  646|
|        10th|  933|
|  Assoc-acdm| 1067|
|        11th| 1175|
|   Assoc-voc| 1382|
|     Masters| 1723|
|   Bachelors| 5355|
|Some-college| 7291|
|     HS-grad|10501|
+------------+-----+

Descreva os dados

Para obter estatísticas resumidas dos dados, você pode usar description(). Ele irá calcular:

  • contar
  • significar
  • desvio padrão
  • minutos
  • max
df.describe().show()
+-------+------------------+-----------+------------------+------------+-----------------+--------+----------------+------------+------------------+------+------------------+----------------+------------------+--------------+-----+
|summary|               age|  workclass|            fnlwgt|   education|    education_num| marital|      occupation|relationship|              race|   sex|      capital_gain|    capital_loss|        hours_week|native_country|label|
+-------+------------------+-----------+------------------+------------+-----------------+--------+----------------+------------+------------------+------+------------------+----------------+------------------+--------------+-----+
|  count|             32561|      32561|             32561|       32561|            32561|   32561|           32561|       32561|             32561| 32561|             32561|           32561|             32561|         32561|32561|
|   mean| 38.58164675532078|       null|189778.36651208502|        null| 10.0806793403151|    null|            null|        null|              null|  null|1077.6488437087312| 87.303829734959|40.437455852092995|          null| null|
| stddev|13.640432553581356|       null|105549.97769702227|        null|2.572720332067397|    null|            null|        null|              null|  null| 7385.292084840354|402.960218649002|12.347428681731838|          null| null|
|    min|                17|          ?|             12285|        10th|                1|Divorced|               ?|     Husband|Amer-Indian-Eskimo|Female|                 0|               0|                 1|             ?|<=50K|
|    max|                90|Without-pay|           1484705|Some-college|               16| Widowed|Transport-moving|        Wife|             White|  Male|             99999|            4356|                99|    Yugoslavia| >50K|
+-------+------------------+-----------+------------------+------------+-----------------+--------+----------------+------------+------------------+------+------------------+----------------+------------------+--------------+-----+

Se você deseja a estatística resumida de apenas uma coluna, adicione o nome da coluna dentro describe()

df.describe('capital_gain').show()
+-------+------------------+
|summary|      capital_gain|
+-------+------------------+
|  count|             32561|
|   mean|1077.6488437087312|
| stddev| 7385.292084840354|
|    min|                 0|
|    max|             99999|
+-------+------------------+

Cálculo de tabela cruzada

Em algumas ocasiões, pode ser interessante ver as estatísticas descritivas entre duas colunas emparelhadas. Por exemplo, você pode contar o número de pessoas com renda abaixo ou acima de 50 mil por nível de escolaridade. Esta operação é chamada de crosstab.

df.crosstab('age', 'label').sort("age_label").show()
+---------+-----+----+
|age_label|<=50K|>50K|
+---------+-----+----+
|       17|  395|   0|
|       18|  550|   0|
|       19|  710|   2|
|       20|  753|   0|
|       21|  717|   3|
|       22|  752|  13|
|       23|  865|  12|
|       24|  767|  31|
|       25|  788|  53|
|       26|  722|  63|
|       27|  754|  81|
|       28|  748| 119|
|       29|  679| 134|
|       30|  690| 171|
|       31|  705| 183|
|       32|  639| 189|
|       33|  684| 191|
|       34|  643| 243|
|       35|  659| 217|
|       36|  635| 263|
+---------+-----+----+
only showing top 20 rows

Você pode ver que nenhuma pessoa tem receita acima de 50 mil quando é jovem.

Coluna de queda

Existem duas APIs intuitivas para eliminar colunas:

  • drop(): descarta uma coluna
  • dropna(): Elimina NA's

Abaixo você solta a coluna education_num

df.drop('education_num').columns

['age',
 'workclass',
 'fnlwgt',
 'education',
 'marital',
 'occupation',
 'relationship',
 'race',
 'sex',
 'capital_gain',
 'capital_loss',
 'hours_week',
 'native_country',
 'label']

Dados de filtro

Você pode usar filter() para aplicar estatísticas descritivas em um subconjunto de dados. Por exemplo, você pode contar o número de pessoas com mais de 40 anos

df.filter(df.age > 40).count()

13443

Estatísticas descritivas por grupo

Finalmente, você pode agrupar dados por grupo e calcular operações estatísticas como a média.

df.groupby('marital').agg({'capital_gain': 'mean'}).show()
+--------------------+------------------+
|             marital| avg(capital_gain)|
+--------------------+------------------+
|           Separated| 535.5687804878049|
|       Never-married|376.58831788823363|
|Married-spouse-ab...| 653.9832535885167|
|            Divorced| 728.4148098131893|
|             Widowed| 571.0715005035247|
|   Married-AF-spouse| 432.6521739130435|
|  Married-civ-spouse|1764.8595085470085|
+--------------------+------------------+

Etapa 2) Pré-processamento de dados

O processamento de dados é uma etapa crítica no aprendizado de máquina. Depois de remover dados inúteis, você obtém alguns insights importantes.

Por exemplo, você sabe que a idade não é uma função linear com a renda. Quando as pessoas são jovens, o seu rendimento é geralmente inferior ao da meia-idade. Após a reforma, o agregado familiar utiliza as suas poupanças, o que significa uma diminuição do rendimento. Para capturar esse padrão, você pode adicionar um quadrado ao recurso de idade

Adicionar quadrado de idade

Para adicionar um novo recurso, você precisa:

  1. Selecione a coluna
  2. Aplique a transformação e adicione-a ao DataFrame
from pyspark.sql.functions import *

# 1 Select the column
age_square = df.select(col("age")**2)

# 2 Apply the transformation and add it to the DataFrame
df = df.withColumn("age_square", col("age")**2)

df.printSchema()
root
 |-- age: integer (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: integer (nullable = true)
 |-- education: string (nullable = true)
 |-- education_num: integer (nullable = true)
 |-- marital: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital_gain: integer (nullable = true)
 |-- capital_loss: integer (nullable = true)
 |-- hours_week: integer (nullable = true)
 |-- native_country: string (nullable = true)
 |-- label: string (nullable = true)
 |-- age_square: double (nullable = true)

Você pode ver que age_square foi adicionado com sucesso ao quadro de dados. Você pode alterar a ordem das variáveis ​​com select. Abaixo, você traz age_square logo após age.

COLUMNS = ['age', 'age_square', 'workclass', 'fnlwgt', 'education', 'education_num', 'marital',
           'occupation', 'relationship', 'race', 'sex', 'capital_gain', 'capital_loss',
           'hours_week', 'native_country', 'label']
df = df.select(COLUMNS)
df.first()
Row(age=39, age_square=1521.0, workclass='State-gov', fnlwgt=77516, education='Bachelors', education_num=13, marital='Never-married', occupation='Adm-clerical', relationship='Not-in-family', race='White', sex='Male', capital_gain=2174, capital_loss=0, hours_week=40, native_country='United-States', label='<=50K')

Excluir Holanda-Holanda

Quando um grupo dentro de um recurso possui apenas uma observação, ele não traz nenhuma informação para o modelo. Pelo contrário, pode levar a um erro durante a validação cruzada.

Vamos verificar a origem do agregado familiar

df.filter(df.native_country == 'Holand-Netherlands').count()
df.groupby('native_country').agg({'native_country': 'count'}).sort(asc("count(native_country)")).show()
+--------------------+---------------------+
|      native_country|count(native_country)|
+--------------------+---------------------+
|  Holand-Netherlands|                    1|
|            Scotland|                   12|
|             Hungary|                   13|
|            Honduras|                   13|
|Outlying-US(Guam-...|                   14|
|          Yugoslavia|                   16|
|            Thailand|                   18|
|                Laos|                   18|
|            Cambodia|                   19|
|     Trinadad&Tobago|                   19|
|                Hong|                   20|
|             Ireland|                   24|
|             Ecuador|                   28|
|              Greece|                   29|
|              France|                   29|
|                Peru|                   31|
|           Nicaragua|                   34|
|            Portugal|                   37|
|                Iran|                   43|
|               Haiti|                   44|
+--------------------+---------------------+
only showing top 20 rows

O recurso native_country possui apenas um domicílio vindo da Holanda. Você exclui isso.

df_remove = df.filter(df.native_country !=	'Holand-Netherlands')

Etapa 3) Construir um pipeline de processamento de dados

Semelhante ao scikit-learn, o Pyspark possui uma API de pipeline.

Um pipeline é muito conveniente para manter a estrutura dos dados. Você envia os dados para o pipeline. Dentro do pipeline, diversas operações são realizadas, a saída é usada para alimentar o algoritmo.

Por exemplo, uma transformação universal em aprendizado de máquina consiste em converter uma string em um codificador ativo, ou seja, uma coluna por grupo. Um codificador ativo geralmente é uma matriz cheia de zeros.

As etapas para transformar os dados são muito semelhantes às do scikit-learn. Você precisa:

  • Indexar a string para numérico
  • Crie o único codificador ativo
  • Transforme os dados

Duas APIs fazem o trabalho: StringIndexer, OneHotEncoder

  1. Primeiro de tudo, você seleciona a coluna de string a ser indexada. O inputCol é o nome da coluna no conjunto de dados. outputCol é o novo nome dado à coluna transformada.
StringIndexer(inputCol="workclass", outputCol="workclass_encoded")
  1. Ajuste os dados e transforme-os
model = stringIndexer.fit(df)		
`indexed = model.transform(df)``
  1. Crie as colunas de notícias com base no grupo. Por exemplo, se houver 10 grupos no recurso, a nova matriz terá 10 colunas, uma para cada grupo.
OneHotEncoder(dropLast=False, inputCol="workclassencoded", outputCol="workclassvec")
### Example encoder
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler

stringIndexer = StringIndexer(inputCol="workclass", outputCol="workclass_encoded")
model = stringIndexer.fit(df)
indexed = model.transform(df)
encoder = OneHotEncoder(dropLast=False, inputCol="workclass_encoded", outputCol="workclass_vec")
encoded = encoder.transform(indexed)
encoded.show(2)
+---+----------+----------------+------+---------+-------------+------------------+---------------+-------------+-----+----+------------+------------+----------+--------------+-----+-----------------+-------------+
|age|age_square|       workclass|fnlwgt|education|education_num|           marital|     occupation| relationship| race| sex|capital_gain|capital_loss|hours_week|native_country|label|workclass_encoded|workclass_vec|
+---+----------+----------------+------+---------+-------------+------------------+---------------+-------------+-----+----+------------+------------+----------+--------------+-----+-----------------+-------------+
| 39|    1521.0|       State-gov| 77516|Bachelors|           13|     Never-married|   Adm-clerical|Not-in-family|White|Male|        2174|           0|        40| United-States|<=50K|              4.0|(9,[4],[1.0])|
| 50|    2500.0|Self-emp-not-inc| 83311|Bachelors|           13|Married-civ-spouse|Exec-managerial|      Husband|White|Male|           0|           0|        13| United-States|<=50K|              1.0|(9,[1],[1.0])|
+---+----------+----------------+------+---------+-------------+------------------+---------------+-------------+-----+----+------------+------------+----------+--------------+-----+-----------------+-------------+
only showing top 2 rows

Construir o pipeline

Você construirá um pipeline para converter todos os recursos precisos e adicioná-los ao conjunto de dados final. O pipeline terá quatro operações, mas fique à vontade para adicionar quantas operações desejar.

  1. Codifique os dados categóricos
  2. Indexar o recurso de rótulo
  3. Adicionar variável contínua
  4. Monte as etapas.

Cada etapa é armazenada em uma lista denominada estágios. Esta lista informará ao VectorAssembler qual operação executar dentro do pipeline.

1. Codifique os dados categóricos

Esta etapa é exatamente igual ao exemplo acima, exceto que você percorre todos os recursos categóricos.

from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoderEstimator
CATE_FEATURES = ['workclass', 'education', 'marital', 'occupation', 'relationship', 'race', 'sex', 'native_country']
stages = [] # stages in our Pipeline
for categoricalCol in CATE_FEATURES:
    stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol + "Index")
    encoder = OneHotEncoderEstimator(inputCols=[stringIndexer.getOutputCol()],
                                     outputCols=[categoricalCol + "classVec"])
    stages += [stringIndexer, encoder]

2. Indexe o recurso de rótulo

O Spark, como muitas outras bibliotecas, não aceita valores de string para o rótulo. Você converte o recurso de rótulo com StringIndexer e o adiciona aos estágios da lista

# Convert label into label indices using the StringIndexer
label_stringIdx =  StringIndexer(inputCol="label", outputCol="newlabel")
stages += [label_stringIdx]

3. Adicione variável contínua

O inputCols do VectorAssembler é uma lista de colunas. Você pode criar uma nova lista contendo todas as novas colunas. O código abaixo preenche a lista com recursos categóricos codificados e recursos contínuos.

assemblerInputs = [c + "classVec" for c in CATE_FEATURES] + CONTI_FEATURES

4. Monte as etapas.

Finalmente, você passa todas as etapas no VectorAssembler

assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")stages += [assembler]

Agora que todas as etapas estão prontas, você envia os dados para o pipeline.

# Create a Pipeline.
pipeline = Pipeline(stages=stages)
pipelineModel = pipeline.fit(df_remove)
model = pipelineModel.transform(df_remove)

Se você verificar o novo conjunto de dados, verá que ele contém todos os recursos, transformados e não transformados. Você está interessado apenas no novo rótulo e nos recursos. Os recursos incluem todos os recursos transformados e as variáveis ​​contínuas.

model.take(1)

[Row(age=39, age_square=1521.0, workclass='State-gov', fnlwgt=77516, education='Bachelors', education_num=13, marital='Never-married', occupation='Adm-clerical', relationship='Not-in-family', race='White', sex='Male', capital_gain=2174, capital_loss=0, hours_week=40, native_country='United-States', label='<=50K', workclassIndex=4.0, workclassclassVec=SparseVector(8, {4: 1.0}), educationIndex=2.0, educationclassVec=SparseVector(15, {2: 1.0}), maritalIndex=1.0, maritalclassVec=SparseVector(6, {1: 1.0}), occupationIndex=3.0, occupationclassVec=SparseVector(14, {3: 1.0}), relationshipIndex=1.0, relationshipclassVec=SparseVector(5, {1: 1.0}), raceIndex=0.0, raceclassVec=SparseVector(4, {0: 1.0}), sexIndex=0.0, sexclassVec=SparseVector(1, {0: 1.0}), native_countryIndex=0.0, native_countryclassVec=SparseVector(40, {0: 1.0}), newlabel=0.0, features=SparseVector(99, {4: 1.0, 10: 1.0, 24: 1.0, 32: 1.0, 44: 1.0, 48: 1.0, 52: 1.0, 53: 1.0, 93: 39.0, 94: 77516.0, 95: 2174.0, 96: 13.0, 98: 40.0}))]

Etapa 4) Construa o classificador: logístico

Para tornar o cálculo mais rápido, você converte o modelo em um DataFrame.

Você precisa selecionar um novo rótulo e recursos do modelo usando o mapa.

from pyspark.ml.linalg import DenseVector
input_data = model.rdd.map(lambda x: (x["newlabel"], DenseVector(x["features"])))

Você está pronto para criar os dados do trem como um DataFrame. Você usa o sqlContext

df_train = sqlContext.createDataFrame(input_data, ["label", "features"])

Verifique a segunda linha

df_train.show(2)
+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|[0.0,0.0,0.0,0.0,...|
|  0.0|[0.0,1.0,0.0,0.0,...|
+-----+--------------------+
only showing top 2 rows

Crie um conjunto de treinamento/teste

Você divide o conjunto de dados 80/20 com randomSplit.

# Split the data into train and test sets
train_data, test_data = df_train.randomSplit([.8,.2],seed=1234)

Vamos contar quantas pessoas com renda abaixo/acima de 50 mil tanto no conjunto de treinamento quanto no conjunto de teste

train_data.groupby('label').agg({'label': 'count'}).show()
+-----+------------+
|label|count(label)|
+-----+------------+
|  0.0|       19698|
|  1.0|        6263|
+-----+------------+
test_data.groupby('label').agg({'label': 'count'}).show()
+-----+------------+
|label|count(label)|
+-----+------------+
|  0.0|        5021|
|  1.0|        1578|
+-----+------------+

Construa o regressor logístico

Por último, mas não menos importante, você pode construir o classificador. Pyspark possui uma API chamada LogisticRegression para realizar regressão logística.

Você inicializa lr indicando a coluna de rótulo e as colunas de recursos. Você define um máximo de 10 iterações e adiciona um parâmetro de regularização com valor 0.3. Observe que na próxima seção, você usará a validação cruzada com uma grade de parâmetros para ajustar o modelo

# Import `LinearRegression`
from pyspark.ml.classification import LogisticRegression

# Initialize `lr`
lr = LogisticRegression(labelCol="label",
                        featuresCol="features",
                        maxIter=10,
                        regParam=0.3)

# Fit the data to the model
linearModel = lr.fit(train_data)

#Você pode ver os coeficientes da regressão

# Print the coefficients and intercept for logistic regression
print("Coefficients: " + str(linearModel.coefficients))
print("Intercept: " + str(linearModel.intercept))
Coefficients: [-0.0678914665262,-0.153425526813,-0.0706009536407,-0.164057586562,-0.120655298528,0.162922330862,0.149176870438,-0.626836362611,-0.193483661541,-0.0782269980838,0.222667203836,0.399571096381,-0.0222024341804,-0.311925857859,-0.0434497788688,-0.306007744328,-0.41318209688,0.547937504247,-0.395837350854,-0.23166535958,0.618743906733,-0.344088614546,-0.385266881369,0.317324463006,-0.350518889186,-0.201335923138,-0.232878560088,-0.13349278865,-0.119760542498,0.17500602491,-0.0480968101118,0.288484253943,-0.116314616745,0.0524163478063,-0.300952624551,-0.22046421474,-0.16557996579,-0.114676231939,-0.311966431453,-0.344226119233,0.105530129507,0.152243047814,-0.292774545497,0.263628334433,-0.199951374076,-0.30329422583,-0.231087515178,0.418918551,-0.0565930184279,-0.177818073048,-0.0733236680663,-0.267972912252,0.168491215697,-0.12181255723,-0.385648075442,-0.202101794517,0.0469791640782,-0.00842850210625,-0.00373211448629,-0.259296141281,-0.309896554133,-0.168434409756,-0.11048086026,0.0280647963877,-0.204187030092,-0.414392623536,-0.252806580669,0.143366465705,-0.516359222663,-0.435627370849,-0.301949286524,0.0878249035894,-0.210951740965,-0.621417928742,-0.099445190784,-0.232671473401,-0.1077745606,-0.360429419703,-0.420362959052,-0.379729467809,-0.395186242741,0.0826401853838,-0.280251589972,0.187313505214,-0.20295228799,-0.431177064626,0.149759018379,-0.107114299614,-0.319314858424,0.0028450133235,-0.651220387649,-0.327918792207,-0.143659581445,0.00691075160413,8.38517628783e-08,2.18856717378e-05,0.0266701216268,0.000231075966823,0.00893832698698]
Intercept: -1.9884177974805692

Etapa 5) Treine e avalie o modelo

Para gerar previsão para seu conjunto de testes,

Você pode usar linearModel com transform() em test_data

# Make predictions on test data using the transform() method.
predictions = linearModel.transform(test_data)

Você pode imprimir os elementos nas previsões

predictions.printSchema()
root
 |-- label: double (nullable = true)
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)

Você está interessado no rótulo, na previsão e na probabilidade

selected = predictions.select("label", "prediction", "probability")
selected.show(20)
+-----+----------+--------------------+
|label|prediction|         probability|
+-----+----------+--------------------+
|  0.0|       0.0|[0.91560704124179...|
|  0.0|       0.0|[0.92812140213994...|
|  0.0|       0.0|[0.92161406774159...|
|  0.0|       0.0|[0.96222760777142...|
|  0.0|       0.0|[0.66363283056957...|
|  0.0|       0.0|[0.65571324475477...|
|  0.0|       0.0|[0.73053376932829...|
|  0.0|       1.0|[0.31265053873570...|
|  0.0|       0.0|[0.80005907577390...|
|  0.0|       0.0|[0.76482251301640...|
|  0.0|       0.0|[0.84447301189069...|
|  0.0|       0.0|[0.75691912026619...|
|  0.0|       0.0|[0.60902504096722...|
|  0.0|       0.0|[0.80799228385509...|
|  0.0|       0.0|[0.87704364852567...|
|  0.0|       0.0|[0.83817652582377...|
|  0.0|       0.0|[0.79655423248500...|
|  0.0|       0.0|[0.82712311232246...|
|  0.0|       0.0|[0.81372823882016...|
|  0.0|       0.0|[0.59687710752201...|
+-----+----------+--------------------+
only showing top 20 rows

Avalie o modelo

Você precisa observar a métrica de precisão para ver o desempenho (ou ruim) do modelo. Atualmente, não há API para calcular a medida de precisão no Spark. O valor padrão é o ROC, curva característica de operação do receptor. É uma métrica diferente que leva em consideração a taxa de falsos positivos.

Antes de olhar para o ROC, vamos construir a medida de precisão. Você está mais familiarizado com essa métrica. A medida de precisão é a soma da previsão correta sobre o número total de observações.

Você cria um DataFrame com o rótulo e a `predição.

cm = predictions.select("label", "prediction")

Você pode verificar o número da turma no rótulo e na previsão

cm.groupby('label').agg({'label': 'count'}).show()
+-----+------------+
|label|count(label)|
+-----+------------+
|  0.0|        5021|
|  1.0|        1578|
+-----+------------+
cm.groupby('prediction').agg({'prediction': 'count'}).show()
+----------+-----------------+
|prediction|count(prediction)|
+----------+-----------------+
|       0.0|             5982|
|       1.0|              617|
+----------+-----------------+

Por exemplo, no conjunto de teste, há 1578 famílias com renda acima de 50 mil e 5021 abaixo. O classificador, no entanto, previu 617 famílias com rendimentos acima de 50 mil.

Você pode calcular a precisão calculando a contagem quando os rótulos são classificados corretamente no número total de linhas.

cm.filter(cm.label == cm.prediction).count() / cm.count()

0.8237611759357478

Você pode agrupar tudo e escrever uma função para calcular a precisão.

def accuracy_m(model): 
    predictions = model.transform(test_data)
    cm = predictions.select("label", "prediction")
    acc = cm.filter(cm.label == cm.prediction).count() / cm.count()
    print("Model accuracy: %.3f%%" % (acc * 100)) 
accuracy_m(model = linearModel)
Model accuracy: 82.376%

Métricas ROC

O módulo BinaryClassificationEvaluator inclui as medidas ROC. A curva Receiver Operating Characteristic é outra ferramenta comum usada com classificação binária. É muito semelhante à curva de precisão/recall, mas em vez de representar graficamente precisão versus recall, a curva ROC mostra a taxa de verdadeiros positivos (ou seja, recall) contra a taxa de falsos positivos. A taxa de falsos positivos é a proporção de instâncias negativas que são classificadas incorretamente como positivas. É igual a um menos a taxa verdadeiramente negativa. A verdadeira taxa negativa também é chamada de especificidade. Portanto, a curva ROC representa a sensibilidade (recall) versus 1 – especificidade

### Use ROC 
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Evaluate model
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
print(evaluator.evaluate(predictions))
print(evaluator.getMetricName())

0.8940481662695192áreaSobROC

print(evaluator.evaluate(predictions))

0.8940481662695192

Etapa 6) Ajustar o hiperparâmetro

Por último, mas não menos importante, você pode ajustar os hiperparâmetros. Igual a scikit aprender você cria uma grade de parâmetros e adiciona os parâmetros que deseja ajustar.

Para reduzir o tempo de cálculo, você ajusta apenas o parâmetro de regularização com apenas dois valores.

from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Create ParamGrid for Cross Validation
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.01, 0.5])
             .build())

Por fim, você avalia o modelo usando o método de avaliação cruzada com 5 dobras. Demora cerca de 16 minutos para treinar.

from time import *
start_time = time()

# Create 5-fold CrossValidator
cv = CrossValidator(estimator=lr,
                    estimatorParamMaps=paramGrid,
                    evaluator=evaluator, numFolds=5)

# Run cross validations
cvModel = cv.fit(train_data)
# likely take a fair amount of time
end_time = time()
elapsed_time = end_time - start_time
print("Time to train model: %.3f seconds" % elapsed_time)

Tempo para treinar o modelo: 978.807 segundos

O melhor hiperparâmetro de regularização é 0.01, com precisão de 85.316 por cento.

accuracy_m(model = cvModel)
Model accuracy: 85.316%

Você pode extrair o parâmetro recomendado encadeando cvModel.bestModel com extractParamMap()

bestModel = cvModel.bestModel
bestModel.extractParamMap()
{Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='aggregationDepth', doc='suggested depth for treeAggregate (>= 2)'): 2,
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty'): 0.0,
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='family', doc='The name of family which is a description of the label distribution to be used in the model. Supported options: auto, binomial, multinomial.'): 'auto',
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='featuresCol', doc='features column name'): 'features',
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='fitIntercept', doc='whether to fit an intercept term'): True,
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='labelCol', doc='label column name'): 'label',
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='maxIter', doc='maximum number of iterations (>= 0)'): 10,
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='predictionCol', doc='prediction column name'): 'prediction',
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='probabilityCol', doc='Column name for predicted class conditional probabilities. Note: Not all models output well-calibrated probability estimates! These probabilities should be treated as confidences, not precise probabilities'): 'probability',
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='rawPredictionCol', doc='raw prediction (a.k.a. confidence) column name'): 'rawPrediction',
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='regParam', doc='regularization parameter (>= 0)'): 0.01,
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='standardization', doc='whether to standardize the training features before fitting the model'): True,
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='threshold', doc='threshold in binary classification prediction, in range [0, 1]'): 0.5,
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='tol', doc='the convergence tolerance for iterative algorithms (>= 0)'): 1e-06}

Resumo

Spark é uma ferramenta fundamental para um cientista de dados. Ele permite ao profissional conectar um aplicativo a diferentes fontes de dados, realizar análises de dados perfeitamente ou adicionar um modelo preditivo.

Para começar com o Spark, você precisa iniciar um contexto do Spark com:

'SparkContext()'

e e SQL contexto para se conectar a uma fonte de dados:

'SQLContext()'

No tutorial, você aprende como treinar uma regressão logística:

  1. Converta o conjunto de dados em um Dataframe com:
rdd.map(lambda x: (x["newlabel"], DenseVector(x["features"])))
sqlContext.createDataFrame(input_data, ["label", "features"])

Observe que o nome da coluna do rótulo é newlabel e todos os recursos estão reunidos em recursos. Altere esses valores se forem diferentes em seu conjunto de dados.

  1. Crie o conjunto de treinamento/teste
randomSplit([.8,.2],seed=1234)
  1. Treine o modelo
LogisticRegression(labelCol="label",featuresCol="features",maxIter=10, regParam=0.3)
lr.fit()
  1. Fazer previsão
linearModel.transform()