PySpark Yeni Başlayanlar İçin Eğitim: ÖRNEKLERLE Öğrenin

Py'yi öğrenmeden önceSpark, anlayalım:

Apache nedir? Spark?

Spark Hadoop MapReduce'tan daha kolay ve daha hızlı olduğu kanıtlanmış bir büyük veri çözümüdür. Spark UC Berkeley RAD laboratuvarı tarafından 2009 yılında geliştirilen açık kaynaklı bir yazılımdır. 2010 yılında halka sunulduğundan bu yana, Spark popülerliği arttı ve sektörde benzeri görülmemiş bir ölçekte kullanılıyor.

Döneminde büyük Veriuygulayıcıların veri akışını işlemek için her zamankinden daha hızlı ve güvenilir araçlara ihtiyacı var. MapReduce gibi eski araçlar favoriydi ancak yavaştı. Bu sorunun üstesinden gelmek için, Spark hem hızlı hem de genel amaçlı bir çözüm sunuyor. Aradaki temel fark Spark ve MapReduce bu Spark Daha sonra sabit diskte bellekte hesaplamalar çalıştırır. Yüksek hızlı erişim ve veri işlemeye izin vererek, süreleri saatlerden dakikalara düşürür.

Py nedir?Spark?

PySpark Apache tarafından oluşturulmuş bir araçtır Spark Kullanmak için topluluk Python ile Spark. RDD (Esnek Dağıtılmış Veri Kümesi) ile çalışmaya olanak tanır. Python. Ayrıca Py'yi de sunuyorSpark Bağlanacak kabuk Python API'ler Spark başlatmak için çekirdek Spark Bağlam. Spark küme hesaplamayı gerçekleştiren isim motorudur, Py iseSpark is PythonKullanılacak kütüphane Spark.

Nasıl Spark çalışır?

Spark hesaplamalı motora dayalıdır, yani planlama, dağıtım ve izleme uygulamasıyla ilgilenir. Her görev, bilgi işlem kümesi adı verilen çeşitli çalışan makinelerde gerçekleştirilir. Bilgi işlem kümesi, görevlerin bölünmesini ifade eder. Bir makine bir görevi yerine getirirken diğerleri farklı bir görev aracılığıyla nihai çıktıya katkıda bulunur. Sonunda, tüm görevler bir çıktı üretmek için bir araya getirilir. Spark admin çeşitli konularda 360 genel bakış sunar Spark Meslekler.

Nasıl Spark İş
Nasıl Spark İş

Spark ile çalışmak üzere tasarlanmıştır

  • Python
  • Java
  • Scala
  • SQL

Önemli bir özelliği Spark makine öğrenimi için MLlib'i de içeren çok sayıda yerleşik kitaplıktır. Spark ayrıca Hadoop kümeleriyle çalışacak şekilde tasarlanmıştır ve aralarında Hive verileri, CSV, JSON, Casandra verilerinin de bulunduğu geniş dosya türlerini okuyabilir.

Neden kullanım Spark?

Gelecekte bir veri uygulayıcısı olarak, Python'ın ünlü kütüphanelerine aşina olmalısınız: Pandas ve scikit-learn. Bu iki kütüphane, orta büyüklükteki veri kümelerini keşfetmek için harikadır. Düzenli makine öğrenimi projeleri aşağıdaki metodoloji etrafında oluşturulur:

  • Verileri diske yükleyin
  • Verileri makinenin belleğine aktarın
  • Verileri işleyin/analiz edin
  • Makine öğrenimi modelini oluşturma
  • Tahmini diske geri saklayın

Sorun, veri bilimcinin bir bilgisayar için çok büyük olan verileri işlemek istemesi durumunda ortaya çıkar. Veri biliminin ilk günlerinde, büyük veri kümeleri üzerinde eğitime her zaman ihtiyaç duyulmadığından uygulayıcılar bunu örnekliyorlardı. Veri bilimci iyi bir istatistiksel örnek bulacak, ek bir sağlamlık kontrolü gerçekleştirecek ve mükemmel bir model ortaya çıkaracaktır.

Ancak bununla ilgili bazı sorunlar var:

  • Veri seti gerçek dünyayı yansıtıyor mu?
  • Veriler spesifik bir örnek içeriyor mu?
  • Model örneklemeye uygun mu?

Örneğin kullanıcıların tavsiyesini alın. Önericiler, tercihlerini değerlendirirken kullanıcıları diğer kullanıcılarla karşılaştırmaya güvenir. Veri uygulayıcısı verilerin yalnızca bir alt kümesini alırsa, birbirine çok benzeyen bir kullanıcı grubu olmayacaktır. Öneri sahiplerinin tüm veri kümesi üzerinde çalışması veya hiç çalışmaması gerekir.

Çözüm nedir?

Çözüm uzun zamandır belliydi, sorunu birden fazla bilgisayara bölmek. Paralel hesaplama da birden fazla sorunla birlikte gelir. Geliştiriciler genellikle paralel kod yazmakta zorluk çeker ve sonunda çoklu işleme etrafındaki karmaşık sorunların bir kısmını çözmek zorunda kalırlar.

Pyspark, veri bilimcilerine paralel veri işleme sorunlarını çözmek için kullanılabilecek bir API sunar. Pyspark, verileri dağıtma, kodu dağıtma ve bir makine kümesindeki çalışanlardan çıktı toplama gibi çoklu işleme karmaşıklıklarını ele alır.

Spark bağımsız olarak çalışabilir ancak çoğu zaman Hadoop gibi bir küme bilişim çerçevesi üzerinde çalışır. Ancak test ve geliştirmede bir veri bilimci verimli bir şekilde çalışabilir Spark kümeleme olmadan geliştirme kutularında veya dizüstü bilgisayarlarında

• Başlıca avantajlarından biri Spark Veri akışı yönetimini, sorunsuz veri sorgularını, makine öğrenimi tahminlerini ve çeşitli analizlere gerçek zamanlı erişimi kapsayan bir mimari oluşturmaktır.

• Spark SQL diliyle, yani yapılandırılmış verilerle yakın çalışır. Verilerin gerçek zamanlı olarak sorgulanmasına olanak sağlar.

• Veri bilimcinin asıl görevi tahmine dayalı modelleri analiz etmek ve oluşturmaktır. Kısacası, bir veri bilimcinin veriyi kullanarak verileri nasıl sorgulayacağını bilmesi gerekir. SQL, istatistiksel bir rapor üretin ve tahminler üretmek için makine öğreniminden yararlanın. Veri bilimci zamanının önemli bir kısmını verileri temizlemeye, dönüştürmeye ve analiz etmeye harcıyor. Veri kümesi veya veri iş akışı hazır olduğunda veri bilimcisi, içgörüleri ve gizli kalıpları keşfetmek için çeşitli teknikler kullanır. Veri manipülasyonu sağlam ve aynı şekilde kullanımı kolay olmalıdır. Spark hızı ve zengin API'leri sayesinde doğru araçtır.

Bu Py'deSpark öğreticide Py ile nasıl sınıflandırıcı oluşturulacağını öğreneceksinizSpark örnekleri.

Py Nasıl KurulurSpark AWS'li

The Jupyter ekip çalıştırılacak bir Docker görüntüsü oluşturdu Spark verimli bir şekilde. Py yüklemek için izleyebileceğiniz adımlar aşağıdadırSpark AWS'deki örnek.

Şuradaki eğitimimize bakın: AWS ve TensorFlow

1. Adım: Bir Örnek Oluşturun

Öncelikle bir örnek oluşturmanız gerekiyor. AWS hesabınıza gidin ve örneği başlatın. Depolamayı 15 g'a kadar artırabilir ve TensorFlow eğitimindekiyle aynı güvenlik grubunu kullanabilirsiniz.

2. Adım: Bağlantıyı açın

Bağlantıyı açın ve docker konteynerini yükleyin. Daha fazla ayrıntı için TensorFlow ile öğreticiye bakın liman işçisi. Doğru çalışma dizininde olmanız gerektiğini unutmayın.

Docker'ı yüklemek için şu kodları çalıştırmanız yeterlidir:

sudo yum update -y
sudo yum install -y docker
sudo service docker start
sudo user-mod -a -G docker ec2-user
exit

3. Adım: Bağlantıyı yeniden açın ve yükleyin Spark

Bağlantıyı yeniden açtıktan sonra Py içeren imajı yükleyebilirsiniz.Spark.

## Spark
docker run -v ~/work:/home/jovyan/work -d -p 8888:8888 jupyter/pyspark-notebook

## Allow preserving Jupyter notebook
sudo chown 1000 ~/work

## Install tree to see our working directory next
sudo yum install -y tree

Adım 4: Açık Jupyter

Kapsayıcıyı ve adını kontrol edin

docker ps

Docker'ı docker günlükleriyle ve ardından docker'ın adıyla başlatın. Örneğin, liman işçisi zealous_goldwasser'ı günlüğe kaydeder

Tarayıcınıza gidin ve başlatın Jupyter. Adres http://localhost:8888/. Terminal tarafından verilen şifreyi yapıştırın.

not: AWS makinenize dosya yüklemek/indirmek istiyorsanız Cyberduck yazılımını kullanabilirsiniz, https://cyberduck.io/.

Py Nasıl KurulurSpark on Windows/Conda ile Mac

Aşağıda Py'nin nasıl kurulacağına dair ayrıntılı bir süreç yer almaktadırSpark on Windows/Mac Anaconda'yı kullanıyor:

Yüklemek için Spark yerel makinenizde yeni bir conda ortamı oluşturmak önerilen bir uygulamadır. Bu yeni ortam yüklenecek Python 3.6, Spark ve tüm bağımlılıklar.

Mac Kullanıcısı

cd anaconda3
touch hello-spark.yml
vi hello-spark.yml

Windows kullanıcı

cd C:\Users\Admin\Anaconda3
echo.>hello-spark.yml
notepad hello-spark.yml

.yml dosyasını düzenleyebilirsiniz. Girinti konusunda dikkatli olun. Önce iki boşluk gereklidir –

name: hello-spark 
    dependencies:
    
    - python=3.6
    - jupyter
    - ipython
    - numpy
    - numpy-base
    - pandas
    - py4j
    - pyspark
    - pytz

Kaydedin ve ortamı yaratın. Biraz zaman alır

conda env create -f hello-spark.yml

Konum hakkında daha fazla ayrıntı için lütfen TensorFlow'u Yükleme öğreticisini inceleyin

Makinenizde yüklü olan tüm ortamı kontrol edebilirsiniz.

conda env list
Activate hello-spark

Mac Kullanıcısı

source activate hello-spark

Windows kullanıcı

activate hello-spark

Not: TensorFlow'daki eğitimleri çalıştırmak için zaten belirli bir TensorFlow ortamı oluşturdunuz. Hello-tf'den farklı yeni bir ortam yaratmak daha uygundur. Hello-tf'yi aşırı yüklemenin bir anlamı yok Spark veya başka herhangi bir makine öğrenimi kütüphanesi.

Projenizin çoğunun TensorFlow'u içerdiğini ancak kullanmanız gerektiğini düşünün. Spark belirli bir proje için. Tüm projeniz için TensorFlow ortamı ayarlayabilir ve projeniz için ayrı bir ortam oluşturabilirsiniz. Spark. İstediğiniz kadar kitaplık ekleyebilirsiniz Spark TensorFlow ortamına müdahale etmeden ortamı istediğiniz gibi kullanın. İşleminiz bittiğinde Sparkprojesini TensorFlow ortamını etkilemeden silebilirsiniz.

Jupyter

Açılış Jupyter Not defteri ve Py'yi deneyinSpark çalışır. Yeni bir not defterine aşağıdaki Py'yi yapıştırınSpark basit kod:

import pyspark
from pyspark import SparkContext
sc =SparkContext()

Bir hata gösterilirse büyük ihtimalle Java makinenize kurulu değil. Mac'te, terminali açın ve java -version yazın, eğer bir java sürümü varsa, 1.8 olduğundan emin olun. Windows, Uygulama'ya gidin ve bir uygulama olup olmadığını kontrol edin. Java dosya. eğer varsa Java klasör, şunu kontrol et Java 1.8 yüklü. Bu yazının yazıldığı an itibarıyla PySpark ile uyumlu değil Java9 ve üstü.

Yüklemeniz gerekiyorsa Java, düşünmelisin Link ve jdk-8u181-windows-x64.exe dosyasını indirin

Jupyter

Mac Kullanıcısı için "brew" kullanılması tavsiye edilir.

brew tap caskroom/versions
brew cask install java8

Bu adım adım eğitime bakın yükleme Java

not: Bir ortamı tamamen silmek için kaldır'ı kullanın.

 conda env remove -n hello-spark -y

Spark bağlam

SparkBağlam, kümelerle bağlantılara izin veren dahili motordur. Bir operasyonu yürütmek istiyorsanız, bir SparkBağlam.

Hat için bir Sparkbağlam

Öncelikle bir girişim başlatmanız gerekiyor SparkBağlam.

import pyspark
from pyspark import SparkContext
sc =SparkContext()

Şimdi SparkBağlam hazır, RDD, Resilient Distributed Dataset adı verilen bir veri koleksiyonu oluşturabilirsiniz. Bir RDD'deki hesaplama, küme genelinde otomatik olarak paralelleştirilir.

nums= sc.parallelize([1,2,3,4])

İlk satıra take ile ulaşabilirsiniz.

nums.take(1)
[1]

Lambda fonksiyonuyla verilere dönüşüm uygulayabilirsiniz. Py'deSpark Aşağıdaki örnekte sayıların karesini döndürürsünüz. Bu bir harita dönüşümüdür

squared = nums.map(lambda x: x*x).collect()
for num in squared:
    print('%i ' % (num))
1 
4 
9 
16

SQL Bağlamı

Daha uygun bir yol DataFrame'i kullanmaktır. SparkBağlam zaten ayarlanmıştır, bunu dataFrame'i oluşturmak için kullanabilirsiniz. Ayrıca SQLContext'i de bildirmeniz gerekir.

SQLContext, motorun farklı veri kaynaklarına bağlanmasına olanak tanır. İşlevsellikleri başlatmak için kullanılır. Spark SQL.

from pyspark.sql import Row
from pyspark.sql import SQLContext

sqlContext = SQLContext(sc)

Şimdi bunda Spark öğretici Python, bir tuple listesi oluşturalım. Her bir demet, kişilerin adını ve yaşlarını içerecektir. Dört adım gereklidir:

) 1 Adım Bilgileri içeren Tuple listesini oluşturun

[('John',19),('Smith',29),('Adam',35),('Henry',50)]

) 2 Adım Bir RDD oluşturun

rdd = sc.parallelize(list_p)

) 3 Adım Tuple'ları dönüştür

rdd.map(lambda x: Row(name=x[0], age=int(x[1])))

) 4 Adım DataFrame bağlamı oluşturma

sqlContext.createDataFrame(ppl)
list_p = [('John',19),('Smith',29),('Adam',35),('Henry',50)]
rdd = sc.parallelize(list_p)
ppl = rdd.map(lambda x: Row(name=x[0], age=int(x[1])))
DF_ppl = sqlContext.createDataFrame(ppl)

Her özelliğin türüne erişmek istiyorsanız printSchema() işlevini kullanabilirsiniz.

DF_ppl.printSchema()
root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)

Py ile Makine Öğrenimi ÖrneğiSpark

Artık kısa bir fikriniz olduğuna göre Spark ve SQLContext ile ilk Makine öğrenimi programınızı oluşturmaya hazırsınız.

Py ile bir Makine Öğrenmesi programı oluşturmanın adımları şunlardır:Spark:

  • ) 1 Adım Py ile temel işlemSpark
  • ) 2 Adım Veri ön işleme
  • ) 3 Adım Veri işleme hattı oluşturun
  • ) 4 Adım Sınıflandırıcıyı oluşturun: lojistik
  • ) 5 Adım Modeli eğitin ve değerlendirin
  • ) 6 Adım Hiperparametreyi ayarlayın

Bu Py'deSpark Makine Öğrenmesi eğitiminde yetişkin veri setini kullanacağız. Bu eğitimin amacı Pyspark'ı nasıl kullanacağınızı öğrenmektir. Veri seti hakkında daha fazla bilgi için bu eğitime bakın.

Veri kümesinin anlamlı olmadığını ve hesaplamanın uzun sürdüğünü düşünebileceğinizi unutmayın. Spark Önemli miktarda veriyi işlemek için tasarlanmıştır. Sparkİşlenen veri kümesi büyüdükçe performansı diğer makine öğrenimi kitaplıklarına göre artar.

Adım 1) Py ile temel işlemSpark

Her şeyden önce, henüz başlatılmamış olan SQLContext'i başlatmanız gerekir.

#from pyspark.sql import SQLContext
url = "https://raw.githubusercontent.com/guru99-edu/R-Programming/master/adult_data.csv"
from pyspark import SparkFiles
sc.addFile(url)
sqlContext = SQLContext(sc)

daha sonra cvs dosyasını sqlContext.read.csv ile okuyabilirsiniz. Bunu söylemek için inferSchema'yı True olarak ayarlayarak kullanırsınız Spark Veri türünü otomatik olarak tahmin etmek için. Varsayılan olarak False'a dönüşür.

df = sqlContext.read.csv(SparkFiles.get("adult_data.csv"), header=True, inferSchema= True)

Veri türüne bir göz atalım

df.printSchema()
root
 |-- age: integer (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: integer (nullable = true)
 |-- education: string (nullable = true)
 |-- education_num: integer (nullable = true)
 |-- marital: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital_gain: integer (nullable = true)
 |-- capital_loss: integer (nullable = true)
 |-- hours_week: integer (nullable = true)
 |-- native_country: string (nullable = true)
 |-- label: string (nullable = true)

Verileri show ile görebilirsiniz.

df.show(5, truncate = False)
+---+----------------+------+---------+-------------+------------------+-----------------+-------------+-----+------+------------+------------+----------+--------------+-----+
|age|workclass       |fnlwgt|education|education_num|marital           |occupation       |relationship |race |sex   |capital_gain|capital_loss|hours_week|native_country|label|
+---+----------------+------+---------+-------------+------------------+-----------------+-------------+-----+------+------------+------------+----------+--------------+-----+
|39 |State-gov       |77516 |Bachelors|13           |Never-married     |Adm-clerical     |Not-in-family|White|Male  |2174        |0           |40        |United-States |<=50K|
|50 |Self-emp-not-inc|83311 |Bachelors|13           |Married-civ-spouse|Exec-managerial  |Husband      |White|Male  |0           |0           |13        |United-States |<=50K|
|38 |Private         |215646|HS-grad  |9            |Divorced          |Handlers-cleaners|Not-in-family|White|Male  |0           |0           |40        |United-States |<=50K|
|53 |Private         |234721|11th     |7            |Married-civ-spouse|Handlers-cleaners|Husband      |Black|Male  |0           |0           |40        |United-States |<=50K|
|28 |Private         |338409|Bachelors|13           |Married-civ-spouse|Prof-specialty   |Wife         |Black|Female|0           |0           |40        |Cuba          |<=50K|
+---+----------------+------+---------+-------------+------------------+-----------------+-------------+-----+------+------------+------------+----------+--------------+-----+
only showing top 5 rows

inderShema'yı True olarak ayarlamadıysanız, türe şunlar olur: Hepsi dizide var.

df_string = sqlContext.read.csv(SparkFiles.get("adult.csv"), header=True, inferSchema=  False)
df_string.printSchema()
root
 |-- age: string (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: string (nullable = true)
 |-- education: string (nullable = true)
 |-- education_num: string (nullable = true)
 |-- marital: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital_gain: string (nullable = true)
 |-- capital_loss: string (nullable = true)
 |-- hours_week: string (nullable = true)
 |-- native_country: string (nullable = true)
 |-- label: string (nullable = true)

Sürekli değişkeni doğru formatta dönüştürmek için sütunları yeniden düzenlemeyi kullanabilirsiniz. Bunu söylemek için withColumn'u kullanabilirsiniz. Spark Dönüşümün hangi sütunda gerçekleştirileceği.

# Import all from `sql.types`
from pyspark.sql.types import *

# Write a custom function to convert the data type of DataFrame columns
def convertColumn(df, names, newType):
    for name in names: 
        df = df.withColumn(name, df[name].cast(newType))
    return df 
# List of continuous features
CONTI_FEATURES  = ['age', 'fnlwgt','capital_gain', 'education_num', 'capital_loss', 'hours_week']
# Convert the type
df_string = convertColumn(df_string, CONTI_FEATURES, FloatType())
# Check the dataset
df_string.printSchema()
root
 |-- age: float (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: float (nullable = true)
 |-- education: string (nullable = true)
 |-- education_num: float (nullable = true)
 |-- marital: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital_gain: float (nullable = true)
 |-- capital_loss: float (nullable = true)
 |-- hours_week: float (nullable = true)
 |-- native_country: string (nullable = true)
 |-- label: string (nullable = true)

from pyspark.ml.feature import StringIndexer
#stringIndexer = StringIndexer(inputCol="label", outputCol="newlabel")
#model = stringIndexer.fit(df)
#df = model.transform(df)
df.printSchema()

Sütun seçin

Select ile satırları ve özellik adlarını seçip gösterebilirsiniz. Aşağıda yaş ve fnlwgt seçilmiştir.

df.select('age','fnlwgt').show(5)
+---+------+
|age|fnlwgt|
+---+------+
| 39| 77516|
| 50| 83311|
| 38|215646|
| 53|234721|
| 28|338409|
+---+------+
only showing top 5 rows

Gruba göre say

Oluşum sayısını gruba göre saymak istiyorsanız aşağıdakileri zincirleyebilirsiniz:

  • grupBy()
  • Miktar()

birlikte. Py'deSpark Aşağıdaki örnekte satır sayısını eğitim düzeyine göre sayarsınız.

df.groupBy("education").count().sort("count",ascending=True).show()
+------------+-----+
|   education|count|
+------------+-----+
|   Preschool|   51|
|     1st-4th|  168|
|     5th-6th|  333|
|   Doctorate|  413|
|        12th|  433|
|         9th|  514|
| Prof-school|  576|
|     7th-8th|  646|
|        10th|  933|
|  Assoc-acdm| 1067|
|        11th| 1175|
|   Assoc-voc| 1382|
|     Masters| 1723|
|   Bachelors| 5355|
|Some-college| 7291|
|     HS-grad|10501|
+------------+-----+

Verileri açıklayın

Verilerin özet istatistiklerini almak için, define() işlevini kullanabilirsiniz. Şunları hesaplayacaktır:

  • saymak
  • ortalama
  • standart sapma
  • dk
  • maksimum
df.describe().show()
+-------+------------------+-----------+------------------+------------+-----------------+--------+----------------+------------+------------------+------+------------------+----------------+------------------+--------------+-----+
|summary|               age|  workclass|            fnlwgt|   education|    education_num| marital|      occupation|relationship|              race|   sex|      capital_gain|    capital_loss|        hours_week|native_country|label|
+-------+------------------+-----------+------------------+------------+-----------------+--------+----------------+------------+------------------+------+------------------+----------------+------------------+--------------+-----+
|  count|             32561|      32561|             32561|       32561|            32561|   32561|           32561|       32561|             32561| 32561|             32561|           32561|             32561|         32561|32561|
|   mean| 38.58164675532078|       null|189778.36651208502|        null| 10.0806793403151|    null|            null|        null|              null|  null|1077.6488437087312| 87.303829734959|40.437455852092995|          null| null|
| stddev|13.640432553581356|       null|105549.97769702227|        null|2.572720332067397|    null|            null|        null|              null|  null| 7385.292084840354|402.960218649002|12.347428681731838|          null| null|
|    min|                17|          ?|             12285|        10th|                1|Divorced|               ?|     Husband|Amer-Indian-Eskimo|Female|                 0|               0|                 1|             ?|<=50K|
|    max|                90|Without-pay|           1484705|Some-college|               16| Widowed|Transport-moving|        Wife|             White|  Male|             99999|            4356|                99|    Yugoslavia| >50K|
+-------+------------------+-----------+------------------+------------+-----------------+--------+----------------+------------+------------------+------+------------------+----------------+------------------+--------------+-----+

Yalnızca bir sütunun özet istatistiğini istiyorsanız, sütunun adını define() içine ekleyin.

df.describe('capital_gain').show()
+-------+------------------+
|summary|      capital_gain|
+-------+------------------+
|  count|             32561|
|   mean|1077.6488437087312|
| stddev| 7385.292084840354|
|    min|                 0|
|    max|             99999|
+-------+------------------+

Çapraz tablo hesaplaması

Bazı durumlarda, iki çift sütun arasındaki tanımlayıcı istatistikleri görmek ilginç olabilir. Örneğin, eğitim seviyesine göre geliri 50k'nın altında veya üstünde olan kişilerin sayısını sayabilirsiniz. Bu işleme çapraz tablo denir.

df.crosstab('age', 'label').sort("age_label").show()
+---------+-----+----+
|age_label|<=50K|>50K|
+---------+-----+----+
|       17|  395|   0|
|       18|  550|   0|
|       19|  710|   2|
|       20|  753|   0|
|       21|  717|   3|
|       22|  752|  13|
|       23|  865|  12|
|       24|  767|  31|
|       25|  788|  53|
|       26|  722|  63|
|       27|  754|  81|
|       28|  748| 119|
|       29|  679| 134|
|       30|  690| 171|
|       31|  705| 183|
|       32|  639| 189|
|       33|  684| 191|
|       34|  643| 243|
|       35|  659| 217|
|       36|  635| 263|
+---------+-----+----+
only showing top 20 rows

Gençken hiçbir insanın 50 binin üzerinde geliri olmadığını görebilirsiniz.

Sütunu bırak

Sütunları bırakmak için iki sezgisel API vardır:

  • drop(): Bir sütunu bırakır
  • dropna(): NA'ları bırakır

Aşağıya education_num sütununu bırakın

df.drop('education_num').columns

['age',
 'workclass',
 'fnlwgt',
 'education',
 'marital',
 'occupation',
 'relationship',
 'race',
 'sex',
 'capital_gain',
 'capital_loss',
 'hours_week',
 'native_country',
 'label']

Verileri filtrele

Tanımlayıcı istatistikleri bir veri alt kümesine uygulamak için filter() işlevini kullanabilirsiniz. Örneğin 40 yaş üstü kişileri sayabilirsiniz.

df.filter(df.age > 40).count()

13443

DescriptGruba göre canlı istatistikler

Son olarak, verileri gruba göre gruplayabilir ve ortalama gibi istatistiksel işlemleri hesaplayabilirsiniz.

df.groupby('marital').agg({'capital_gain': 'mean'}).show()
+--------------------+------------------+
|             marital| avg(capital_gain)|
+--------------------+------------------+
|           Separated| 535.5687804878049|
|       Never-married|376.58831788823363|
|Married-spouse-ab...| 653.9832535885167|
|            Divorced| 728.4148098131893|
|             Widowed| 571.0715005035247|
|   Married-AF-spouse| 432.6521739130435|
|  Married-civ-spouse|1764.8595085470085|
+--------------------+------------------+

Adım 2) Veri ön işleme

Veri işleme, makine öğreniminde kritik bir adımdır. Çöp verilerini kaldırdıktan sonra bazı önemli bilgiler elde edersiniz.

Mesela yaşın gelirle doğrusal bir fonksiyon olmadığını biliyorsunuz. İnsanlar gençken gelirleri genellikle orta yaşın altındadır. Emeklilik sonrasında hanehalkı tasarruflarını kullanır, bu da gelirin azalması anlamına gelir. Bu modeli yakalamak için yaş özelliğine bir kare ekleyebilirsiniz.

Yaş karesi ekle

Yeni bir özellik eklemek için şunları yapmanız gerekir:

  1. sütunu seçin
  2. Dönüşümü uygulayın ve DataFrame'e ekleyin
from pyspark.sql.functions import *

# 1 Select the column
age_square = df.select(col("age")**2)

# 2 Apply the transformation and add it to the DataFrame
df = df.withColumn("age_square", col("age")**2)

df.printSchema()
root
 |-- age: integer (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: integer (nullable = true)
 |-- education: string (nullable = true)
 |-- education_num: integer (nullable = true)
 |-- marital: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital_gain: integer (nullable = true)
 |-- capital_loss: integer (nullable = true)
 |-- hours_week: integer (nullable = true)
 |-- native_country: string (nullable = true)
 |-- label: string (nullable = true)
 |-- age_square: double (nullable = true)

Age_square'in veri çerçevesine başarıyla eklendiğini görebilirsiniz. Select ile değişkenlerin sırasını değiştirebilirsiniz. Aşağıda, yaştan hemen sonra age_square'i getiriyorsunuz.

COLUMNS = ['age', 'age_square', 'workclass', 'fnlwgt', 'education', 'education_num', 'marital',
           'occupation', 'relationship', 'race', 'sex', 'capital_gain', 'capital_loss',
           'hours_week', 'native_country', 'label']
df = df.select(COLUMNS)
df.first()
Row(age=39, age_square=1521.0, workclass='State-gov', fnlwgt=77516, education='Bachelors', education_num=13, marital='Never-married', occupation='Adm-clerical', relationship='Not-in-family', race='White', sex='Male', capital_gain=2174, capital_loss=0, hours_week=40, native_country='United-States', label='<=50K')

Hollanda-Hollanda'yı hariç tut

Bir özellik içindeki bir grubun yalnızca bir gözlemi varsa, bu durum modele hiçbir bilgi getirmez. Tam tersine çapraz doğrulama sırasında hataya yol açabilir.

Hane halkının kökenini kontrol edelim

df.filter(df.native_country == 'Holand-Netherlands').count()
df.groupby('native_country').agg({'native_country': 'count'}).sort(asc("count(native_country)")).show()
+--------------------+---------------------+
|      native_country|count(native_country)|
+--------------------+---------------------+
|  Holand-Netherlands|                    1|
|            Scotland|                   12|
|             Hungary|                   13|
|            Honduras|                   13|
|Outlying-US(Guam-...|                   14|
|          Yugoslavia|                   16|
|            Thailand|                   18|
|                Laos|                   18|
|            Cambodia|                   19|
|     Trinadad&Tobago|                   19|
|                Hong|                   20|
|             Ireland|                   24|
|             Ecuador|                   28|
|              Greece|                   29|
|              France|                   29|
|                Peru|                   31|
|           Nicaragua|                   34|
|            Portugal|                   37|
|                Iran|                   43|
|               Haiti|                   44|
+--------------------+---------------------+
only showing top 20 rows

Native_country özelliğinde Hollanda'dan gelen yalnızca bir hane var. Siz onu hariç tutun.

df_remove = df.filter(df.native_country !=	'Holand-Netherlands')

Adım 3) Bir veri işleme hattı oluşturun

Scikit-learn'e benzer şekilde Pyspark'ın da bir pipeline API'si var.

Verilerin yapısını korumak için bir boru hattı çok uygundur. Verileri boru hattına aktarırsınız. Boru hattının içinde çeşitli işlemler yapılır, çıktı algoritmayı beslemek için kullanılır.

Örneğin, makine öğrenimindeki evrensel dönüşümlerden biri, bir dizenin bir sıcak kodlayıcıya, yani grup bazında bir sütuna dönüştürülmesinden oluşur. Bir sıcak kodlayıcı genellikle sıfırlarla dolu bir matristir.

Verileri dönüştürme adımları scikit-learn'e çok benzer. Gerek:

  • Dizeyi sayısal olarak indeksleyin
  • Tek etkin kodlayıcıyı oluşturun
  • Verileri dönüştürün

İki API işi yapar: StringIndexer, OneHotEncoder

  1. Öncelikle dizine eklenecek dize sütununu seçersiniz. inputCol, veri kümesindeki sütunun adıdır. OutputCol, dönüştürülen sütuna verilen yeni addır.
StringIndexer(inputCol="workclass", outputCol="workclass_encoded")
  1. Verileri sığdırın ve dönüştürün
model = stringIndexer.fit(df)		
`indexed = model.transform(df)``
  1. Haber sütunlarını gruba göre oluşturun. Örneğin, özellikte 10 grup varsa, yeni matrisin her grup için bir tane olmak üzere 10 sütunu olacaktır.
OneHotEncoder(dropLast=False, inputCol="workclassencoded", outputCol="workclassvec")
### Example encoder
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler

stringIndexer = StringIndexer(inputCol="workclass", outputCol="workclass_encoded")
model = stringIndexer.fit(df)
indexed = model.transform(df)
encoder = OneHotEncoder(dropLast=False, inputCol="workclass_encoded", outputCol="workclass_vec")
encoded = encoder.transform(indexed)
encoded.show(2)
+---+----------+----------------+------+---------+-------------+------------------+---------------+-------------+-----+----+------------+------------+----------+--------------+-----+-----------------+-------------+
|age|age_square|       workclass|fnlwgt|education|education_num|           marital|     occupation| relationship| race| sex|capital_gain|capital_loss|hours_week|native_country|label|workclass_encoded|workclass_vec|
+---+----------+----------------+------+---------+-------------+------------------+---------------+-------------+-----+----+------------+------------+----------+--------------+-----+-----------------+-------------+
| 39|    1521.0|       State-gov| 77516|Bachelors|           13|     Never-married|   Adm-clerical|Not-in-family|White|Male|        2174|           0|        40| United-States|<=50K|              4.0|(9,[4],[1.0])|
| 50|    2500.0|Self-emp-not-inc| 83311|Bachelors|           13|Married-civ-spouse|Exec-managerial|      Husband|White|Male|           0|           0|        13| United-States|<=50K|              1.0|(9,[1],[1.0])|
+---+----------+----------------+------+---------+-------------+------------------+---------------+-------------+-----+----+------------+------------+----------+--------------+-----+-----------------+-------------+
only showing top 2 rows

Boru hattını oluşturun

Tüm kesin özellikleri dönüştürmek ve bunları son veri kümesine eklemek için bir işlem hattı oluşturacaksınız. İşlem hattında dört işlem olacaktır, ancak istediğiniz kadar işlem eklemekten çekinmeyin.

  1. Kategorik verileri kodlayın
  2. Etiket özelliğini indeksleyin
  3. Sürekli değişken ekle
  4. Adımları birleştirin.

Her adım, aşamalar adı verilen bir listede saklanır. Bu liste, VectorAssembler'a işlem hattı içinde hangi işlemin gerçekleştirileceğini söyleyecektir.

1. Kategorik verileri kodlayın

Bu adım, tüm kategorik özelliklerin üzerinden geçmeniz dışında yukarıdaki örnekle tamamen aynıdır.

from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoderEstimator
CATE_FEATURES = ['workclass', 'education', 'marital', 'occupation', 'relationship', 'race', 'sex', 'native_country']
stages = [] # stages in our Pipeline
for categoricalCol in CATE_FEATURES:
    stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol + "Index")
    encoder = OneHotEncoderEstimator(inputCols=[stringIndexer.getOutputCol()],
                                     outputCols=[categoricalCol + "classVec"])
    stages += [stringIndexer, encoder]

2. Etiket özelliğini indeksleyin

Sparkdiğer birçok kitaplık gibi, etiket için dize değerlerini kabul etmez. Etiket özelliğini StringIndexer ile dönüştürüp liste aşamalarına eklersiniz

# Convert label into label indices using the StringIndexer
label_stringIdx =  StringIndexer(inputCol="label", outputCol="newlabel")
stages += [label_stringIdx]

3. Sürekli değişken ekleyin

VectorAssembler'ın inputCol'leri bir sütun listesidir. Tüm yeni sütunları içeren yeni bir liste oluşturabilirsiniz. Aşağıdaki kod, listeyi kodlanmış kategorik özellikler ve sürekli özelliklerle doldurur.

assemblerInputs = [c + "classVec" for c in CATE_FEATURES] + CONTI_FEATURES

4. Adımları birleştirin.

Son olarak VectorAssembler'daki tüm adımları geçersiniz

assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")stages += [assembler]

Artık tüm adımlar hazır olduğuna göre verileri ardışık düzene aktarırsınız.

# Create a Pipeline.
pipeline = Pipeline(stages=stages)
pipelineModel = pipeline.fit(df_remove)
model = pipelineModel.transform(df_remove)

Yeni veri setini kontrol ederseniz dönüştürülmüş ve dönüştürülmemiş tüm özellikleri içerdiğini görebilirsiniz. Yalnızca yeni etiket ve özelliklerle ilgileniyorsunuz. Özellikler, dönüştürülen tüm özellikleri ve sürekli değişkenleri içerir.

model.take(1)

[Row(age=39, age_square=1521.0, workclass='State-gov', fnlwgt=77516, education='Bachelors', education_num=13, marital='Never-married', occupation='Adm-clerical', relationship='Not-in-family', race='White', sex='Male', capital_gain=2174, capital_loss=0, hours_week=40, native_country='United-States', label='<=50K', workclassIndex=4.0, workclassclassVec=SparseVector(8, {4: 1.0}), educationIndex=2.0, educationclassVec=SparseVector(15, {2: 1.0}), maritalIndex=1.0, maritalclassVec=SparseVector(6, {1: 1.0}), occupationIndex=3.0, occupationclassVec=SparseVector(14, {3: 1.0}), relationshipIndex=1.0, relationshipclassVec=SparseVector(5, {1: 1.0}), raceIndex=0.0, raceclassVec=SparseVector(4, {0: 1.0}), sexIndex=0.0, sexclassVec=SparseVector(1, {0: 1.0}), native_countryIndex=0.0, native_countryclassVec=SparseVector(40, {0: 1.0}), newlabel=0.0, features=SparseVector(99, {4: 1.0, 10: 1.0, 24: 1.0, 32: 1.0, 44: 1.0, 48: 1.0, 52: 1.0, 53: 1.0, 93: 39.0, 94: 77516.0, 95: 2174.0, 96: 13.0, 98: 40.0}))]

Adım 4) Sınıflandırıcıyı oluşturun: lojistik

Hesaplamayı daha hızlı hale getirmek için modeli bir DataFrame'e dönüştürürsünüz.

Haritayı kullanarak modelden yeni etiketi ve özellikleri seçmeniz gerekir.

from pyspark.ml.linalg import DenseVector
input_data = model.rdd.map(lambda x: (x["newlabel"], DenseVector(x["features"])))

Tren verilerini DataFrame olarak oluşturmaya hazırsınız. SqlContext'i kullanıyorsunuz

df_train = sqlContext.createDataFrame(input_data, ["label", "features"])

İkinci satırı kontrol edin

df_train.show(2)
+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|[0.0,0.0,0.0,0.0,...|
|  0.0|[0.0,1.0,0.0,0.0,...|
+-----+--------------------+
only showing top 2 rows

Bir eğitim/test seti oluşturun

Veri kümesini 80/20'ye randomSplit ile bölersiniz.

# Split the data into train and test sets
train_data, test_data = df_train.randomSplit([.8,.2],seed=1234)

Hem eğitim hem de test setinde geliri 50k'nin altında/üstünde olan kaç kişi olduğunu sayalım

train_data.groupby('label').agg({'label': 'count'}).show()
+-----+------------+
|label|count(label)|
+-----+------------+
|  0.0|       19698|
|  1.0|        6263|
+-----+------------+
test_data.groupby('label').agg({'label': 'count'}).show()
+-----+------------+
|label|count(label)|
+-----+------------+
|  0.0|        5021|
|  1.0|        1578|
+-----+------------+

Lojistik regresörü oluşturun

Son olarak sınıflandırıcıyı oluşturabilirsiniz. Pyspark'ın lojistik regresyon gerçekleştirmek için LogisticRegression adında bir API'si vardır.

Etiket sütununu ve özellik sütunlarını belirterek lr'yi başlatırsınız. Maksimum 10 yineleme belirlersiniz ve 0.3 değerinde bir düzenleme parametresi eklersiniz. Bir sonraki bölümde modeli ayarlamak için parametre ızgarasıyla çapraz doğrulamayı kullanacağınızı unutmayın.

# Import `LinearRegression`
from pyspark.ml.classification import LogisticRegression

# Initialize `lr`
lr = LogisticRegression(labelCol="label",
                        featuresCol="features",
                        maxIter=10,
                        regParam=0.3)

# Fit the data to the model
linearModel = lr.fit(train_data)

#Regresyondan katsayıları görebilirsiniz

# Print the coefficients and intercept for logistic regression
print("Coefficients: " + str(linearModel.coefficients))
print("Intercept: " + str(linearModel.intercept))
Coefficients: [-0.0678914665262,-0.153425526813,-0.0706009536407,-0.164057586562,-0.120655298528,0.162922330862,0.149176870438,-0.626836362611,-0.193483661541,-0.0782269980838,0.222667203836,0.399571096381,-0.0222024341804,-0.311925857859,-0.0434497788688,-0.306007744328,-0.41318209688,0.547937504247,-0.395837350854,-0.23166535958,0.618743906733,-0.344088614546,-0.385266881369,0.317324463006,-0.350518889186,-0.201335923138,-0.232878560088,-0.13349278865,-0.119760542498,0.17500602491,-0.0480968101118,0.288484253943,-0.116314616745,0.0524163478063,-0.300952624551,-0.22046421474,-0.16557996579,-0.114676231939,-0.311966431453,-0.344226119233,0.105530129507,0.152243047814,-0.292774545497,0.263628334433,-0.199951374076,-0.30329422583,-0.231087515178,0.418918551,-0.0565930184279,-0.177818073048,-0.0733236680663,-0.267972912252,0.168491215697,-0.12181255723,-0.385648075442,-0.202101794517,0.0469791640782,-0.00842850210625,-0.00373211448629,-0.259296141281,-0.309896554133,-0.168434409756,-0.11048086026,0.0280647963877,-0.204187030092,-0.414392623536,-0.252806580669,0.143366465705,-0.516359222663,-0.435627370849,-0.301949286524,0.0878249035894,-0.210951740965,-0.621417928742,-0.099445190784,-0.232671473401,-0.1077745606,-0.360429419703,-0.420362959052,-0.379729467809,-0.395186242741,0.0826401853838,-0.280251589972,0.187313505214,-0.20295228799,-0.431177064626,0.149759018379,-0.107114299614,-0.319314858424,0.0028450133235,-0.651220387649,-0.327918792207,-0.143659581445,0.00691075160413,8.38517628783e-08,2.18856717378e-05,0.0266701216268,0.000231075966823,0.00893832698698]
Intercept: -1.9884177974805692

Adım 5) Modeli eğitin ve değerlendirin

Test kümeniz için tahmin oluşturmak üzere,

LineerModel'i test_data'da transform() ile kullanabilirsiniz.

# Make predictions on test data using the transform() method.
predictions = linearModel.transform(test_data)

Tahminlerdeki öğeleri yazdırabilirsiniz

predictions.printSchema()
root
 |-- label: double (nullable = true)
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)

Etiket, tahmin ve olasılık ile ilgileniyorsunuz

selected = predictions.select("label", "prediction", "probability")
selected.show(20)
+-----+----------+--------------------+
|label|prediction|         probability|
+-----+----------+--------------------+
|  0.0|       0.0|[0.91560704124179...|
|  0.0|       0.0|[0.92812140213994...|
|  0.0|       0.0|[0.92161406774159...|
|  0.0|       0.0|[0.96222760777142...|
|  0.0|       0.0|[0.66363283056957...|
|  0.0|       0.0|[0.65571324475477...|
|  0.0|       0.0|[0.73053376932829...|
|  0.0|       1.0|[0.31265053873570...|
|  0.0|       0.0|[0.80005907577390...|
|  0.0|       0.0|[0.76482251301640...|
|  0.0|       0.0|[0.84447301189069...|
|  0.0|       0.0|[0.75691912026619...|
|  0.0|       0.0|[0.60902504096722...|
|  0.0|       0.0|[0.80799228385509...|
|  0.0|       0.0|[0.87704364852567...|
|  0.0|       0.0|[0.83817652582377...|
|  0.0|       0.0|[0.79655423248500...|
|  0.0|       0.0|[0.82712311232246...|
|  0.0|       0.0|[0.81372823882016...|
|  0.0|       0.0|[0.59687710752201...|
+-----+----------+--------------------+
only showing top 20 rows

Modeli değerlendirin

Modelin ne kadar iyi (veya kötü) performans gösterdiğini görmek için doğruluk ölçüsüne bakmanız gerekir. Şu anda doğruluk ölçüsünü hesaplayacak bir API bulunmamaktadır. Spark. Varsayılan değer ROC, alıcı çalışma karakteristik eğrisidir. Yanlış pozitif oranını dikkate alan farklı bir ölçümdür.

ROC'ye bakmadan önce doğruluk ölçüsünü oluşturalım. Bu metriğe daha aşinasınız. Doğruluk ölçüsü, toplam gözlem sayısı üzerinden doğru tahminin toplamıdır.

Etiketi ve `tahmini' içeren bir DataFrame oluşturursunuz.

cm = predictions.select("label", "prediction")

Etiketteki sınıf sayısını ve tahmini kontrol edebilirsiniz.

cm.groupby('label').agg({'label': 'count'}).show()
+-----+------------+
|label|count(label)|
+-----+------------+
|  0.0|        5021|
|  1.0|        1578|
+-----+------------+
cm.groupby('prediction').agg({'prediction': 'count'}).show()
+----------+-----------------+
|prediction|count(prediction)|
+----------+-----------------+
|       0.0|             5982|
|       1.0|              617|
+----------+-----------------+

Örneğin test setinde geliri 1578 binin üzerinde ve 50'in altında olan 5021 hane var. Ancak sınıflandırıcı, geliri 617 binin üzerinde olan 50 haneyi tahmin etti.

Etiketin toplam satır sayısı üzerinden doğru şekilde sınıflandırılması durumunda sayıyı hesaplayarak doğruluğu hesaplayabilirsiniz.

cm.filter(cm.label == cm.prediction).count() / cm.count()

0.8237611759357478

Her şeyi bir araya toplayabilir ve doğruluğu hesaplayacak bir fonksiyon yazabilirsiniz.

def accuracy_m(model): 
    predictions = model.transform(test_data)
    cm = predictions.select("label", "prediction")
    acc = cm.filter(cm.label == cm.prediction).count() / cm.count()
    print("Model accuracy: %.3f%%" % (acc * 100)) 
accuracy_m(model = linearModel)
Model accuracy: 82.376%

ROC metrikleri

BinaryClassificationEvaluator modülü ROC ölçümlerini içerir. Alıcı OperaKarakteristik eğri ikili sınıflandırmada kullanılan diğer bir yaygın araçtır. Kesinlik/geri çağırma eğrisine çok benzer, ancak kesinlik ve geri çağırmanın grafiğini çizmek yerine ROC eğrisi, yanlış pozitif oranına karşı gerçek pozitif oranı (yani geri çağırma) gösterir. Yanlış pozitif oranı, yanlışlıkla pozitif olarak sınıflandırılan negatif örneklerin oranıdır. Bir eksi gerçek negatif orana eşittir. Gerçek negatif orana aynı zamanda özgüllük de denir. Dolayısıyla ROC eğrisi duyarlılığı (geri çağırma) karşı 1 – özgüllüğü gösterir

### Use ROC 
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Evaluate model
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
print(evaluator.evaluate(predictions))
print(evaluator.getMetricName())

0.8940481662695192alanROC Altında

print(evaluator.evaluate(predictions))

0.8940481662695192

Adım 6) Hiperparametreyi ayarlayın

Son fakat bir o kadar da önemlisi, hiperparametreleri ayarlayabilirsiniz. Benzer Scikit öğrenmek bir parametre kılavuzu oluşturursunuz ve ayarlamak istediğiniz parametreleri eklersiniz.

Hesaplama süresini azaltmak için, düzenleme parametresini yalnızca iki değerle ayarlarsınız.

from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Create ParamGrid for Cross Validation
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.01, 0.5])
             .build())

Son olarak modeli 5 katlamalı çapraz doğrulama yöntemini kullanarak değerlendiriyorsunuz. Antrenman yaklaşık 16 dakika sürer.

from time import *
start_time = time()

# Create 5-fold CrossValidator
cv = CrossValidator(estimator=lr,
                    estimatorParamMaps=paramGrid,
                    evaluator=evaluator, numFolds=5)

# Run cross validations
cvModel = cv.fit(train_data)
# likely take a fair amount of time
end_time = time()
elapsed_time = end_time - start_time
print("Time to train model: %.3f seconds" % elapsed_time)

Modeli eğitme süresi: 978.807 saniye

En iyi düzenleme hiperparametresi yüzde 0.01 doğrulukla 85.316'dir.

accuracy_m(model = cvModel)
Model accuracy: 85.316%

cvModel.bestModel'i extractParamMap() ile zincirleyerek önerilen parametreyi çıkarabilirsiniz.

bestModel = cvModel.bestModel
bestModel.extractParamMap()
{Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='aggregationDepth', doc='suggested depth for treeAggregate (>= 2)'): 2,
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty'): 0.0,
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='family', doc='The name of family which is a description of the label distribution to be used in the model. Supported options: auto, binomial, multinomial.'): 'auto',
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='featuresCol', doc='features column name'): 'features',
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='fitIntercept', doc='whether to fit an intercept term'): True,
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='labelCol', doc='label column name'): 'label',
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='maxIter', doc='maximum number of iterations (>= 0)'): 10,
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='predictionCol', doc='prediction column name'): 'prediction',
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='probabilityCol', doc='Column name for predicted class conditional probabilities. Note: Not all models output well-calibrated probability estimates! These probabilities should be treated as confidences, not precise probabilities'): 'probability',
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='rawPredictionCol', doc='raw prediction (a.k.a. confidence) column name'): 'rawPrediction',
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='regParam', doc='regularization parameter (>= 0)'): 0.01,
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='standardization', doc='whether to standardize the training features before fitting the model'): True,
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='threshold', doc='threshold in binary classification prediction, in range [0, 1]'): 0.5,
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='tol', doc='the convergence tolerance for iterative algorithms (>= 0)'): 1e-06}

ÖZET

Spark bir veri bilimci için temel bir araçtır. Uygulayıcının bir uygulamayı farklı veri kaynaklarına bağlamasına, sorunsuz bir şekilde veri analizi yapmasına veya tahmine dayalı bir model eklemesine olanak tanır.

Başlangıç ​​olarak Sparkbir başlatmanız gerekiyor Spark Bağlam:

'SparkBağlam()'

ve ve SQL bir veri kaynağına bağlanmak için bağlam:

'SQLBağlamı()'

Eğitimde lojistik regresyonun nasıl eğitileceğini öğreneceksiniz:

  1. Veri kümesini aşağıdakilerle bir Veri Çerçevesine dönüştürün:
rdd.map(lambda x: (x["newlabel"], DenseVector(x["features"])))
sqlContext.createDataFrame(input_data, ["label", "features"])

Etiketin sütun adının newlabel olduğunu ve tüm özelliklerin özelliklerde toplandığını unutmayın. Veri kümenizde farklıysa bu değerleri değiştirin.

  1. Eğitim/test setini oluşturun
randomSplit([.8,.2],seed=1234)
  1. Modeli eğitin
LogisticRegression(labelCol="label",featuresCol="features",maxIter=10, regParam=0.3)
lr.fit()
  1. Tahmin etmek
linearModel.transform()