PySpark บทช่วยสอนสำหรับผู้เริ่มต้น: เรียนรู้พร้อมตัวอย่าง

ก่อนจะเรียนพี่.Sparkมาทำความเข้าใจกันดีกว่า:

อาปาเช่คืออะไร Spark?

Spark เป็นโซลูชันข้อมูลขนาดใหญ่ที่ได้รับการพิสูจน์แล้วว่าง่ายและรวดเร็วกว่า Hadoop MapReduce Spark เป็นซอฟต์แวร์โอเพ่นซอร์สที่พัฒนาโดยห้องปฏิบัติการ UC Berkeley RAD ในปี 2009 นับตั้งแต่เปิดตัวสู่สาธารณะในปี 2010 Spark ได้รับความนิยมเพิ่มขึ้นและมีการใช้ในอุตสาหกรรมในขนาดที่ไม่เคยมีมาก่อน

ในยุคของ ข้อมูลขนาดใหญ่ผู้ปฏิบัติงานต้องการเครื่องมือที่รวดเร็วและเชื่อถือได้มากกว่าที่เคยเพื่อประมวลผลการสตรีมข้อมูล เครื่องมือก่อนหน้านี้อย่าง MapReduce เป็นที่ชื่นชอบแต่ทำงานได้ช้า เพื่อเอาชนะปัญหานี้ Spark นำเสนอโซลูชั่นที่รวดเร็วและใช้งานได้ทั่วไป ความแตกต่างที่สำคัญระหว่าง Spark และ MapReduce ก็คือสิ่งนั้น Spark รันการคำนวณในหน่วยความจำในฮาร์ดดิสก์ในภายหลัง ช่วยให้เข้าถึงข้อมูลและประมวลผลข้อมูลได้รวดเร็ว ลดเวลาจากหลายชั่วโมงเหลือเพียงไม่กี่นาที

พีคืออะไรSpark?

PySpark เป็นเครื่องมือที่สร้างโดย Apache Spark ชุมชนเพื่อการใช้งาน Python กับ Spark- ช่วยให้สามารถทำงานร่วมกับ RDD (Resilient Distributed Dataset) ได้ Python- นอกจากนี้ยังมีบริการ PySpark เชลล์เพื่อเชื่อมโยง Python API ด้วย Spark หลักในการเริ่มต้น Spark บริบท. Spark คือชื่อเอนจิ้นที่ใช้ในการทำการประมวลผลแบบคลัสเตอร์ ในขณะที่ PySpark is Pythonห้องสมุดที่จะใช้ Spark.

อย่างไรบ้าง Spark ทำงานอย่างไร

Spark อิงตามกลไกการคำนวณ ซึ่งหมายความว่ามันดูแลการจัดตาราง การกระจาย และการตรวจสอบแอปพลิเคชัน งานแต่ละงานจะดำเนินการผ่านเครื่องเวิร์กเกอร์ต่างๆ ที่เรียกว่าคลัสเตอร์คอมพิวเตอร์ คลัสเตอร์คอมพิวเตอร์หมายถึงการแบ่งงาน เครื่องหนึ่งทำงานหนึ่งงาน ในขณะที่เครื่องอื่นๆ มีส่วนสนับสนุนผลลัพธ์ขั้นสุดท้ายผ่านงานอื่น ในท้ายที่สุด งานทั้งหมดจะถูกรวบรวมเพื่อสร้างผลลัพธ์ Spark แอดมินให้ภาพรวม 360 องศาต่างๆ Spark งาน

อย่างไร Spark งาน
อย่างไร Spark งาน

Spark ได้รับการออกแบบมาให้ทำงานร่วมกับ

  • Python
  • Java
  • สกาล่า
  • SQL

ลักษณะสำคัญของ Spark คือไลบรารี่ในตัวจำนวนมหาศาล รวมถึง MLlib สำหรับการเรียนรู้ของเครื่องด้วย. Spark ได้รับการออกแบบมาให้ทำงานกับคลัสเตอร์ Hadoop และสามารถอ่านไฟล์ประเภทต่างๆ ได้หลากหลาย รวมถึงข้อมูล Hive, CSV, JSON, ข้อมูล Casandra และอื่นๆ อีกมากมาย

ทำไมต้องใช้ Spark?

ในฐานะผู้ปฏิบัติงานด้านข้อมูลในอนาคต คุณควรคุ้นเคยกับไลบรารีที่มีชื่อเสียงของ Python: Pandas และ scikit-learn ไลบรารีทั้งสองนี้ยอดเยี่ยมมากสำหรับการสำรวจชุดข้อมูลขนาดกลาง โปรเจ็กต์การเรียนรู้ของเครื่องทั่วไปจะสร้างขึ้นโดยใช้ระเบียบวิธีต่อไปนี้:

  • โหลดข้อมูลลงดิสก์
  • นำเข้าข้อมูลเข้าสู่หน่วยความจำของเครื่อง
  • ประมวลผล/วิเคราะห์ข้อมูล
  • สร้างโมเดลการเรียนรู้ของเครื่อง
  • เก็บคำทำนายกลับไปยังดิสก์

ปัญหาจะเกิดขึ้นหากนักวิทยาศาสตร์ข้อมูลต้องการประมวลผลข้อมูลที่มีขนาดใหญ่เกินไปสำหรับคอมพิวเตอร์เครื่องเดียว ในช่วงก่อนหน้านี้ของวิทยาศาสตร์ข้อมูล ผู้ปฏิบัติงานจะสุ่มตัวอย่างเนื่องจากการฝึกอบรมเกี่ยวกับชุดข้อมูลขนาดใหญ่ไม่จำเป็นเสมอไป นักวิทยาศาสตร์ข้อมูลจะค้นหาตัวอย่างทางสถิติที่ดี ทำการตรวจสอบความทนทานเพิ่มเติม และสร้างแบบจำลองที่ยอดเยี่ยมขึ้นมา

อย่างไรก็ตาม มีปัญหาบางประการเกี่ยวกับเรื่องนี้:

  • ชุดข้อมูลสะท้อนโลกแห่งความเป็นจริงหรือไม่?
  • ข้อมูลมีตัวอย่างที่เฉพาะเจาะจงหรือไม่?
  • โมเดลนี้เหมาะสำหรับการสุ่มตัวอย่างหรือไม่

รับคำแนะนำผู้ใช้เช่น ผู้แนะนำอาศัยการเปรียบเทียบผู้ใช้กับผู้ใช้รายอื่นในการประเมินการตั้งค่าของพวกเขา หากผู้ปฏิบัติงานด้านข้อมูลรับข้อมูลเพียงชุดย่อย ก็จะไม่มีกลุ่มผู้ใช้ที่คล้ายกันมาก ผู้แนะนำจำเป็นต้องรันบนชุดข้อมูลทั้งหมดหรือไม่ทำงานเลย

การแก้ปัญหาคืออะไร?

วิธีแก้ปัญหานี้ชัดเจนมานานแล้ว โดยแบ่งปัญหาออกเป็นคอมพิวเตอร์หลายเครื่อง การประมวลผลแบบคู่ขนานก็มีปัญหามากมายเช่นกัน นักพัฒนาซอฟต์แวร์มักประสบปัญหาในการเขียนโค้ดแบบคู่ขนานและต้องแก้ปัญหาที่ซับซ้อนมากมายเกี่ยวกับการประมวลผลหลายเครื่อง

Pyspark มอบ API ให้กับนักวิทยาศาสตร์ข้อมูลเพื่อใช้ในการแก้ปัญหาการประมวลผลข้อมูลแบบขนาน Pyspark จัดการกับความซับซ้อนของการประมวลผลหลายระบบ เช่น การกระจายข้อมูล การกระจายโค้ด และการรวบรวมเอาต์พุตจากเวิร์กเกอร์บนคลัสเตอร์ของเครื่องจักร

Spark สามารถทำงานแบบสแตนด์อโลนได้ แต่ส่วนใหญ่มักจะทำงานบนกรอบงานการประมวลผลแบบคลัสเตอร์ เช่น Hadoop อย่างไรก็ตาม ในการทดสอบและการพัฒนา นักวิทยาศาสตร์ข้อมูลสามารถรันได้อย่างมีประสิทธิภาพ Spark บนกล่องพัฒนาหรือแล็ปท็อปที่ไม่มีคลัสเตอร์

• ข้อดีหลักประการหนึ่งของ Spark คือการสร้างสถาปัตยกรรมที่ครอบคลุมการบริหารจัดการสตรีมข้อมูล การสอบถามข้อมูลอย่างราบรื่น การคาดการณ์การเรียนรู้ของเครื่อง และการเข้าถึงการวิเคราะห์ต่างๆ แบบเรียลไทม์

• Spark ทำงานอย่างใกล้ชิดกับภาษา SQL เช่น ข้อมูลที่มีโครงสร้าง ช่วยให้สามารถสืบค้นข้อมูลแบบเรียลไทม์

• หน้าที่หลักของนักวิทยาศาสตร์ข้อมูลคือการวิเคราะห์และสร้างแบบจำลองการคาดการณ์ กล่าวโดยสรุป นักวิทยาศาสตร์ข้อมูลจำเป็นต้องรู้วิธีสืบค้นข้อมูลโดยใช้ SQLสร้างรายงานทางสถิติและใช้ประโยชน์จากการเรียนรู้ของเครื่องเพื่อสร้างการคาดการณ์ นักวิทยาศาสตร์ด้านข้อมูลใช้เวลาส่วนใหญ่ไปกับการทำความสะอาด เปลี่ยนแปลง และวิเคราะห์ข้อมูล เมื่อชุดข้อมูลหรือเวิร์กโฟลว์ข้อมูลพร้อม นักวิทยาศาสตร์ข้อมูลจะใช้เทคนิคต่างๆ เพื่อค้นหาข้อมูลเชิงลึกและรูปแบบที่ซ่อนอยู่ การจัดการข้อมูลควรมีความแข็งแกร่งและใช้งานง่ายเหมือนเดิม Spark เป็นเครื่องมือที่เหมาะสมด้วยความเร็วและ API ที่หลากหลาย

ในพีนี้Spark บทช่วยสอน คุณจะได้เรียนรู้วิธีสร้างตัวแยกประเภทด้วย PySpark ตัวอย่าง.

วิธีการติดตั้ง Py.mq4Spark ด้วย AWS

รางวัล Jupyter ทีมสร้างอิมเมจ Docker เพื่อรัน Spark อย่างมีประสิทธิภาพ ด้านล่างนี้เป็นขั้นตอนที่คุณสามารถปฏิบัติตามเพื่อติดตั้ง PySpark อินสแตนซ์ใน AWS

ดูบทช่วยสอนของเราที่ AWS และ TensorFlow

ขั้นตอนที่ 1: สร้างอินสแตนซ์

ก่อนอื่น คุณต้องสร้างอินสแตนซ์ก่อน ไปที่บัญชี AWS ของคุณแล้วเปิดใช้งานอินสแตนซ์ คุณสามารถเพิ่มพื้นที่จัดเก็บข้อมูลได้สูงสุดถึง 15g และใช้กลุ่มความปลอดภัยเดียวกันกับในบทช่วยสอน TensorFlow

ขั้นตอนที่ 2: เปิดการเชื่อมต่อ

เปิดการเชื่อมต่อและติดตั้งคอนเทนเนอร์ Docker สำหรับรายละเอียดเพิ่มเติม โปรดดูบทช่วยสอนด้วย TensorFlow นักเทียบท่า- โปรดทราบว่าคุณจะต้องอยู่ในไดเร็กทอรีการทำงานที่ถูกต้อง

เพียงเรียกใช้รหัสเหล่านี้เพื่อติดตั้ง Docker:

sudo yum update -y
sudo yum install -y docker
sudo service docker start
sudo user-mod -a -G docker ec2-user
exit

ขั้นตอนที่ 3: เปิดการเชื่อมต่ออีกครั้งและติดตั้ง Spark

หลังจากที่คุณเปิดการเชื่อมต่ออีกครั้ง คุณสามารถติดตั้งอิมเมจที่มี Py ได้Spark.

## Spark
docker run -v ~/work:/home/jovyan/work -d -p 8888:8888 jupyter/pyspark-notebook

## Allow preserving Jupyter notebook
sudo chown 1000 ~/work

## Install tree to see our working directory next
sudo yum install -y tree

ขั้นตอนที่ 4: เปิด Jupyter

ตรวจสอบคอนเทนเนอร์และชื่อ

docker ps

เปิดตัวนักเทียบท่าด้วยบันทึกนักเทียบท่าตามด้วยชื่อของนักเทียบท่า ตัวอย่างเช่น นักเทียบท่าบันทึก zealous_goldwasser

ไปที่เบราว์เซอร์ของคุณและเปิดใช้งาน Jupyter- ที่อยู่คือ http://localhost:8888/ วางรหัสผ่านที่กำหนดโดยเทอร์มินัล

หมายเหตุ: หากคุณต้องการอัพโหลด/ดาวน์โหลดไฟล์ไปยังเครื่อง AWS ของคุณ คุณสามารถใช้ซอฟต์แวร์ Cyberduck https://cyberduck.io/.

วิธีการติดตั้ง Py.mq4Spark on Windows/แมคกับคอนดา

ต่อไปนี้เป็นกระบวนการโดยละเอียดเกี่ยวกับวิธีการติดตั้ง PySpark on Windows/Mac ใช้อนาคอนดา:

หากต้องการติดตั้ง Spark บนเครื่องท้องถิ่นของคุณ แนวทางปฏิบัติที่แนะนำคือการสร้างสภาพแวดล้อม conda ใหม่ สภาพแวดล้อมใหม่นี้จะติดตั้ง Python 3.6, Spark และการพึ่งพาอาศัยกันทั้งหมด

ผู้ใช้ Mac

cd anaconda3
touch hello-spark.yml
vi hello-spark.yml

Windows ผู้ใช้งาน

cd C:\Users\Admin\Anaconda3
echo.>hello-spark.yml
notepad hello-spark.yml

คุณสามารถแก้ไขไฟล์ .yml ได้ ระวังการเยื้อง ต้องเว้นวรรคสองช่องก่อน –

name: hello-spark 
    dependencies:
    
    - python=3.6
    - jupyter
    - ipython
    - numpy
    - numpy-base
    - pandas
    - py4j
    - pyspark
    - pytz

บันทึกและสร้างสภาพแวดล้อม มันต้องใช้เวลาพอสมควร

conda env create -f hello-spark.yml

สำหรับรายละเอียดเพิ่มเติมเกี่ยวกับตำแหน่ง โปรดดูบทช่วยสอนการติดตั้ง TensorFlow

คุณสามารถตรวจสอบสภาพแวดล้อมทั้งหมดที่ติดตั้งในเครื่องของคุณได้

conda env list
Activate hello-spark

ผู้ใช้ Mac

source activate hello-spark

Windows ผู้ใช้งาน

activate hello-spark

หมายเหตุ คุณได้สร้างสภาพแวดล้อม TensorFlow เฉพาะเพื่อเรียกใช้บทช่วยสอนบน TensorFlow แล้ว การสร้างสภาพแวดล้อมใหม่ที่แตกต่างจาก hello-tf จะสะดวกกว่า มันไม่สมเหตุสมผลเลยที่จะโอเวอร์โหลด hello-tf ด้วย Spark หรือไลบรารีการเรียนรู้ของเครื่องอื่นๆ

ลองนึกภาพโปรเจ็กต์ส่วนใหญ่ของคุณเกี่ยวข้องกับ TensorFlow แต่คุณจำเป็นต้องใช้ Spark เพื่อโครงการหนึ่งโดยเฉพาะ คุณสามารถตั้งค่าสภาพแวดล้อม TensorFlow ให้กับโปรเจ็กต์ทั้งหมดและสร้างสภาพแวดล้อมแยกต่างหากได้ Spark- คุณสามารถเพิ่มไลบรารี่ได้มากเท่าที่ต้องการ Spark ตามที่คุณต้องการโดยไม่รบกวนสภาพแวดล้อม TensorFlow เมื่อคุณทำเสร็จแล้วกับ Sparkของโปรเจ็กต์ คุณสามารถลบมันได้โดยไม่ส่งผลกระทบต่อสภาพแวดล้อม TensorFlow

Jupyter

จุดเปิด Jupyter Notebook แล้วลอง Py ดูครับSpark งาน. ในสมุดบันทึกใหม่วาง Py ต่อไปนี้Spark รหัสตัวอย่าง:

import pyspark
from pyspark import SparkContext
sc =SparkContext()

หากแสดงข้อผิดพลาดก็มีแนวโน้มว่าจะเป็นเช่นนั้น Java ไม่ได้ติดตั้งบนเครื่องของคุณ ใน Mac ให้เปิดเทอร์มินัลและเขียน java -version หากมีเวอร์ชัน Java ตรวจสอบให้แน่ใจว่าเป็น 1.8 Windowsให้ไปที่ Application แล้วตรวจสอบว่ามีหรือไม่ Java โฟลเดอร์ หากมีก Java โฟลเดอร์ ให้ตรวจสอบสิ่งนั้น Java ติดตั้ง 1.8 แล้ว ขณะที่เขียนนี้ PySpark เข้ากันไม่ได้กับ Java9 ขึ้นไป

หากคุณต้องการติดตั้ง Javaคุณต้องคิด ลิงค์ และดาวน์โหลด jdk-8u181-windows-x64.exe

Jupyter

สำหรับผู้ใช้ Mac แนะนำให้ใช้ `brew.`

brew tap caskroom/versions
brew cask install java8

อ้างอิงบทช่วยสอนทีละขั้นตอนนี้ใน วิธีการติดตั้ง Java

หมายเหตุ: ใช้การลบเพื่อลบสภาพแวดล้อมทั้งหมด

 conda env remove -n hello-spark -y

Spark บริบท

Sparkบริบทเป็นเครื่องมือภายในที่ช่วยให้สามารถเชื่อมต่อกับคลัสเตอร์ได้ หากคุณต้องการเรียกใช้การดำเนินการ คุณต้องมี Sparkบริบท.

สร้าง Sparkบริบท

ก่อนอื่นคุณต้องเริ่มต้นก Sparkบริบท.

import pyspark
from pyspark import SparkContext
sc =SparkContext()

ตอนนี้ที่ Sparkเมื่อบริบทพร้อมแล้ว คุณสามารถสร้างชุดข้อมูลที่เรียกว่า RDD หรือ Resilient Distributed Dataset การคำนวณใน RDD จะถูกขนานกันโดยอัตโนมัติทั่วทั้งคลัสเตอร์

nums= sc.parallelize([1,2,3,4])

คุณสามารถเข้าถึงแถวแรกโดยใช้เทค

nums.take(1)
[1]

คุณสามารถใช้การแปลงกับข้อมูลด้วยฟังก์ชันแลมบ์ดา ในส่วนของปี่Spark ตัวอย่างด้านล่าง คุณจะคืนค่ากำลังสองของตัวเลข เป็นการแปลงแผนที่

squared = nums.map(lambda x: x*x).collect()
for num in squared:
    print('%i ' % (num))
1 
4 
9 
16

SQLContext

วิธีที่สะดวกกว่าคือการใช้ DataFrame Sparkบริบทได้รับการตั้งค่าแล้ว คุณสามารถใช้เพื่อสร้าง dataFrame ได้ คุณต้องประกาศ SQLContext ด้วย

SQLContext อนุญาตให้เชื่อมต่อเอ็นจิ้นกับแหล่งข้อมูลที่แตกต่างกัน ใช้เพื่อเริ่มต้นการทำงานของ Spark SQL

from pyspark.sql import Row
from pyspark.sql import SQLContext

sqlContext = SQLContext(sc)

ตอนนี้อยู่ในนี้ Spark เกี่ยวกับการสอน Pythonเรามาสร้างรายการสิ่งอันดับกันดีกว่า แต่ละสิ่งอันดับจะมีชื่อของบุคคลและอายุของพวกเขา จำเป็นต้องมีสี่ขั้นตอน:

ขั้นตอน 1) สร้างรายการสิ่งอันดับพร้อมข้อมูล

[('John',19),('Smith',29),('Adam',35),('Henry',50)]

ขั้นตอน 2) สร้าง RDD

rdd = sc.parallelize(list_p)

ขั้นตอน 3) แปลงสิ่งอันดับ

rdd.map(lambda x: Row(name=x[0], age=int(x[1])))

ขั้นตอน 4) สร้างบริบท DataFrame

sqlContext.createDataFrame(ppl)
list_p = [('John',19),('Smith',29),('Adam',35),('Henry',50)]
rdd = sc.parallelize(list_p)
ppl = rdd.map(lambda x: Row(name=x[0], age=int(x[1])))
DF_ppl = sqlContext.createDataFrame(ppl)

หากคุณต้องการเข้าถึงประเภทของฟีเจอร์แต่ละรายการ คุณสามารถใช้ printSchema()

DF_ppl.printSchema()
root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)

ตัวอย่างการเรียนรู้ของเครื่องด้วย PySpark

ตอนนี้คุณมีความคิดสั้น ๆ เกี่ยวกับ Spark และ SQLContext คุณก็พร้อมที่จะสร้างโปรแกรม Machine Learning โปรแกรมแรกของคุณแล้ว

ต่อไปนี้เป็นขั้นตอนในการสร้างโปรแกรมการเรียนรู้ของเครื่องด้วย PySpark:

  • ขั้นตอน 1) การใช้งานพื้นฐานด้วย PySpark
  • ขั้นตอน 2) การประมวลผลข้อมูลล่วงหน้า
  • ขั้นตอน 3) สร้างไปป์ไลน์การประมวลผลข้อมูล
  • ขั้นตอน 4) สร้างลักษณนาม: ลอจิสติกส์
  • ขั้นตอน 5) ฝึกอบรมและประเมินแบบจำลอง
  • ขั้นตอน 6) ปรับไฮเปอร์พารามิเตอร์

ในพีนี้Spark บทช่วยสอนเกี่ยวกับการเรียนรู้ของเครื่อง เราจะใช้ชุดข้อมูลสำหรับผู้ใหญ่ จุดประสงค์ของบทช่วยสอนนี้คือเพื่อเรียนรู้วิธีใช้ Pyspark สำหรับข้อมูลเพิ่มเติมเกี่ยวกับชุดข้อมูล โปรดดูบทช่วยสอนนี้

โปรดทราบว่าชุดข้อมูลไม่มีนัยสำคัญและคุณอาจคิดว่าการคำนวณใช้เวลานาน Spark ได้รับการออกแบบมาเพื่อประมวลผลข้อมูลจำนวนมาก Sparkประสิทธิภาพของจะเพิ่มขึ้นเมื่อเทียบกับไลบรารีการเรียนรู้ของเครื่องอื่นๆ เมื่อชุดข้อมูลที่ประมวลผลมีขนาดใหญ่ขึ้น

ขั้นตอนที่ 1) การใช้งานพื้นฐานด้วย PySpark

ก่อนอื่น คุณต้องเริ่มต้นก่อน SQLContext ยังไม่ได้เริ่มต้น

#from pyspark.sql import SQLContext
url = "https://raw.githubusercontent.com/guru99-edu/R-Programming/master/adult_data.csv"
from pyspark import SparkFiles
sc.addFile(url)
sqlContext = SQLContext(sc)

จากนั้นคุณสามารถอ่านไฟล์ cvs ด้วย sqlContext.read.csv คุณใช้ inferSchema ตั้งค่าเป็น True เพื่อบอก Spark เพื่อคาดเดาประเภทของข้อมูลโดยอัตโนมัติ โดยค่าเริ่มต้น จะกลายเป็นเท็จ

df = sqlContext.read.csv(SparkFiles.get("adult_data.csv"), header=True, inferSchema= True)

มาดูประเภทข้อมูลกันดีกว่า

df.printSchema()
root
 |-- age: integer (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: integer (nullable = true)
 |-- education: string (nullable = true)
 |-- education_num: integer (nullable = true)
 |-- marital: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital_gain: integer (nullable = true)
 |-- capital_loss: integer (nullable = true)
 |-- hours_week: integer (nullable = true)
 |-- native_country: string (nullable = true)
 |-- label: string (nullable = true)

คุณสามารถดูข้อมูลพร้อมการแสดง

df.show(5, truncate = False)
+---+----------------+------+---------+-------------+------------------+-----------------+-------------+-----+------+------------+------------+----------+--------------+-----+
|age|workclass       |fnlwgt|education|education_num|marital           |occupation       |relationship |race |sex   |capital_gain|capital_loss|hours_week|native_country|label|
+---+----------------+------+---------+-------------+------------------+-----------------+-------------+-----+------+------------+------------+----------+--------------+-----+
|39 |State-gov       |77516 |Bachelors|13           |Never-married     |Adm-clerical     |Not-in-family|White|Male  |2174        |0           |40        |United-States |<=50K|
|50 |Self-emp-not-inc|83311 |Bachelors|13           |Married-civ-spouse|Exec-managerial  |Husband      |White|Male  |0           |0           |13        |United-States |<=50K|
|38 |Private         |215646|HS-grad  |9            |Divorced          |Handlers-cleaners|Not-in-family|White|Male  |0           |0           |40        |United-States |<=50K|
|53 |Private         |234721|11th     |7            |Married-civ-spouse|Handlers-cleaners|Husband      |Black|Male  |0           |0           |40        |United-States |<=50K|
|28 |Private         |338409|Bachelors|13           |Married-civ-spouse|Prof-specialty   |Wife         |Black|Female|0           |0           |40        |Cuba          |<=50K|
+---+----------------+------+---------+-------------+------------------+-----------------+-------------+-----+------+------------+------------+----------+--------------+-----+
only showing top 5 rows

หากคุณไม่ได้ตั้งค่า inderShema เป็น True นี่คือสิ่งที่เกิดขึ้นกับประเภทดังกล่าว มีทั้งหมดในสตริง

df_string = sqlContext.read.csv(SparkFiles.get("adult.csv"), header=True, inferSchema=  False)
df_string.printSchema()
root
 |-- age: string (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: string (nullable = true)
 |-- education: string (nullable = true)
 |-- education_num: string (nullable = true)
 |-- marital: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital_gain: string (nullable = true)
 |-- capital_loss: string (nullable = true)
 |-- hours_week: string (nullable = true)
 |-- native_country: string (nullable = true)
 |-- label: string (nullable = true)

หากต้องการแปลงตัวแปรต่อเนื่องในรูปแบบที่ถูกต้อง คุณสามารถใช้การหล่อคอลัมน์ใหม่ได้ คุณสามารถใช้ withColumn เพื่อบอกได้ Spark คอลัมน์ใดที่จะดำเนินการแปลง

# Import all from `sql.types`
from pyspark.sql.types import *

# Write a custom function to convert the data type of DataFrame columns
def convertColumn(df, names, newType):
    for name in names: 
        df = df.withColumn(name, df[name].cast(newType))
    return df 
# List of continuous features
CONTI_FEATURES  = ['age', 'fnlwgt','capital_gain', 'education_num', 'capital_loss', 'hours_week']
# Convert the type
df_string = convertColumn(df_string, CONTI_FEATURES, FloatType())
# Check the dataset
df_string.printSchema()
root
 |-- age: float (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: float (nullable = true)
 |-- education: string (nullable = true)
 |-- education_num: float (nullable = true)
 |-- marital: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital_gain: float (nullable = true)
 |-- capital_loss: float (nullable = true)
 |-- hours_week: float (nullable = true)
 |-- native_country: string (nullable = true)
 |-- label: string (nullable = true)

from pyspark.ml.feature import StringIndexer
#stringIndexer = StringIndexer(inputCol="label", outputCol="newlabel")
#model = stringIndexer.fit(df)
#df = model.transform(df)
df.printSchema()

เลือกคอลัมน์

คุณสามารถเลือกและแสดงแถวโดยเลือกและชื่อของคุณสมบัติได้ ด้านล่าง อายุและ fnlwgt จะถูกเลือก

df.select('age','fnlwgt').show(5)
+---+------+
|age|fnlwgt|
+---+------+
| 39| 77516|
| 50| 83311|
| 38|215646|
| 53|234721|
| 28|338409|
+---+------+
only showing top 5 rows

นับตามกลุ่ม

หากคุณต้องการนับจำนวนครั้งที่เกิดขึ้นตามกลุ่ม คุณสามารถโยง:

  • กลุ่มโดย()
  • นับ()

ด้วยกัน. ในส่วนของปี่Spark ตัวอย่างด้านล่าง คุณนับจำนวนแถวตามระดับการศึกษา

df.groupBy("education").count().sort("count",ascending=True).show()
+------------+-----+
|   education|count|
+------------+-----+
|   Preschool|   51|
|     1st-4th|  168|
|     5th-6th|  333|
|   Doctorate|  413|
|        12th|  433|
|         9th|  514|
| Prof-school|  576|
|     7th-8th|  646|
|        10th|  933|
|  Assoc-acdm| 1067|
|        11th| 1175|
|   Assoc-voc| 1382|
|     Masters| 1723|
|   Bachelors| 5355|
|Some-college| 7291|
|     HS-grad|10501|
+------------+-----+

อธิบายข้อมูล

หากต้องการดูสถิติสรุปของข้อมูล คุณสามารถใช้ description() มันจะคำนวณ:

  • นับ
  • หมายความ
  • ส่วนเบี่ยงเบนมาตรฐาน
  • นาที
  • แม็กซ์
df.describe().show()
+-------+------------------+-----------+------------------+------------+-----------------+--------+----------------+------------+------------------+------+------------------+----------------+------------------+--------------+-----+
|summary|               age|  workclass|            fnlwgt|   education|    education_num| marital|      occupation|relationship|              race|   sex|      capital_gain|    capital_loss|        hours_week|native_country|label|
+-------+------------------+-----------+------------------+------------+-----------------+--------+----------------+------------+------------------+------+------------------+----------------+------------------+--------------+-----+
|  count|             32561|      32561|             32561|       32561|            32561|   32561|           32561|       32561|             32561| 32561|             32561|           32561|             32561|         32561|32561|
|   mean| 38.58164675532078|       null|189778.36651208502|        null| 10.0806793403151|    null|            null|        null|              null|  null|1077.6488437087312| 87.303829734959|40.437455852092995|          null| null|
| stddev|13.640432553581356|       null|105549.97769702227|        null|2.572720332067397|    null|            null|        null|              null|  null| 7385.292084840354|402.960218649002|12.347428681731838|          null| null|
|    min|                17|          ?|             12285|        10th|                1|Divorced|               ?|     Husband|Amer-Indian-Eskimo|Female|                 0|               0|                 1|             ?|<=50K|
|    max|                90|Without-pay|           1484705|Some-college|               16| Widowed|Transport-moving|        Wife|             White|  Male|             99999|            4356|                99|    Yugoslavia| >50K|
+-------+------------------+-----------+------------------+------------+-----------------+--------+----------------+------------+------------------+------+------------------+----------------+------------------+--------------+-----+

หากคุณต้องการสรุปสถิติของคอลัมน์เดียว ให้เพิ่มชื่อคอลัมน์เข้าไปข้างใน description()

df.describe('capital_gain').show()
+-------+------------------+
|summary|      capital_gain|
+-------+------------------+
|  count|             32561|
|   mean|1077.6488437087312|
| stddev| 7385.292084840354|
|    min|                 0|
|    max|             99999|
+-------+------------------+

การคำนวณครอสแท็บ

ในบางกรณี การดูสถิติเชิงพรรณนาระหว่างคอลัมน์คู่สองคอลัมน์อาจเป็นเรื่องที่น่าสนใจ ตัวอย่างเช่น คุณสามารถนับจำนวนคนที่รายได้ต่ำกว่าหรือสูงกว่า 50 ดอลลาร์โดยจำแนกตามระดับการศึกษา การดำเนินการนี้เรียกว่าตารางไขว้

df.crosstab('age', 'label').sort("age_label").show()
+---------+-----+----+
|age_label|<=50K|>50K|
+---------+-----+----+
|       17|  395|   0|
|       18|  550|   0|
|       19|  710|   2|
|       20|  753|   0|
|       21|  717|   3|
|       22|  752|  13|
|       23|  865|  12|
|       24|  767|  31|
|       25|  788|  53|
|       26|  722|  63|
|       27|  754|  81|
|       28|  748| 119|
|       29|  679| 134|
|       30|  690| 171|
|       31|  705| 183|
|       32|  639| 189|
|       33|  684| 191|
|       34|  643| 243|
|       35|  659| 217|
|       36|  635| 263|
+---------+-----+----+
only showing top 20 rows

คุณจะเห็นว่าไม่มีใครมีรายได้เกิน 50 เมื่อพวกเขายังเป็นเด็ก

วางคอลัมน์

มี API ที่ใช้งานง่ายสองตัวในการวางคอลัมน์:

  • drop(): วางคอลัมน์
  • dropna(): วาง NA's

ด้านล่างคุณวางคอลัมน์ education_num

df.drop('education_num').columns

['age',
 'workclass',
 'fnlwgt',
 'education',
 'marital',
 'occupation',
 'relationship',
 'race',
 'sex',
 'capital_gain',
 'capital_loss',
 'hours_week',
 'native_country',
 'label']

กรองข้อมูล

คุณสามารถใช้ filter() เพื่อใช้สถิติเชิงพรรณนาในชุดย่อยของข้อมูล เช่น คุณสามารถนับจำนวนผู้ที่มีอายุมากกว่า 40 ปีได้

df.filter(df.age > 40).count()

13443

Descriptสถิติตามกลุ่ม

สุดท้าย คุณสามารถจัดกลุ่มข้อมูลตามกลุ่ม และคำนวณการดำเนินการทางสถิติ เช่น ค่าเฉลี่ย

df.groupby('marital').agg({'capital_gain': 'mean'}).show()
+--------------------+------------------+
|             marital| avg(capital_gain)|
+--------------------+------------------+
|           Separated| 535.5687804878049|
|       Never-married|376.58831788823363|
|Married-spouse-ab...| 653.9832535885167|
|            Divorced| 728.4148098131893|
|             Widowed| 571.0715005035247|
|   Married-AF-spouse| 432.6521739130435|
|  Married-civ-spouse|1764.8595085470085|
+--------------------+------------------+

ขั้นตอนที่ 2) การประมวลผลข้อมูลล่วงหน้า

การประมวลผลข้อมูลเป็นขั้นตอนสำคัญในการเรียนรู้ของเครื่อง หลังจากที่คุณลบข้อมูลขยะแล้ว คุณจะได้รับข้อมูลเชิงลึกที่สำคัญบางประการ

ตัวอย่างเช่น คุณรู้ว่าอายุไม่ใช่ฟังก์ชันเชิงเส้นของรายได้ เมื่อคนหนุ่มสาวรายได้มักจะต่ำกว่าวัยกลางคน หลังจากเกษียณอายุ ครัวเรือนจะใช้เงินออม ส่งผลให้รายได้ลดลง หากต้องการจับภาพรูปแบบนี้ คุณสามารถเพิ่มรูปสี่เหลี่ยมจัตุรัสให้กับฟีเจอร์อายุได้

เพิ่มตารางอายุ

หากต้องการเพิ่มคุณลักษณะใหม่ คุณต้อง:

  1. เลือกคอลัมน์
  2. ใช้การแปลงและเพิ่มลงใน DataFrame
from pyspark.sql.functions import *

# 1 Select the column
age_square = df.select(col("age")**2)

# 2 Apply the transformation and add it to the DataFrame
df = df.withColumn("age_square", col("age")**2)

df.printSchema()
root
 |-- age: integer (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: integer (nullable = true)
 |-- education: string (nullable = true)
 |-- education_num: integer (nullable = true)
 |-- marital: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital_gain: integer (nullable = true)
 |-- capital_loss: integer (nullable = true)
 |-- hours_week: integer (nullable = true)
 |-- native_country: string (nullable = true)
 |-- label: string (nullable = true)
 |-- age_square: double (nullable = true)

คุณจะเห็นว่าการเพิ่ม age_square ลงในกรอบข้อมูลสำเร็จแล้ว คุณสามารถเปลี่ยนลำดับของตัวแปรได้ด้วยการเลือก ด้านล่างนี้ คุณนำ age_square ทีละอายุ

COLUMNS = ['age', 'age_square', 'workclass', 'fnlwgt', 'education', 'education_num', 'marital',
           'occupation', 'relationship', 'race', 'sex', 'capital_gain', 'capital_loss',
           'hours_week', 'native_country', 'label']
df = df.select(COLUMNS)
df.first()
Row(age=39, age_square=1521.0, workclass='State-gov', fnlwgt=77516, education='Bachelors', education_num=13, marital='Never-married', occupation='Adm-clerical', relationship='Not-in-family', race='White', sex='Male', capital_gain=2174, capital_loss=0, hours_week=40, native_country='United-States', label='<=50K')

ไม่รวมฮอลแลนด์-เนเธอร์แลนด์

เมื่อกลุ่มภายในคุณลักษณะมีการสังเกตเพียงครั้งเดียว ระบบจะไม่นำข้อมูลมาสู่แบบจำลอง ในทางตรงกันข้าม อาจทำให้เกิดข้อผิดพลาดระหว่างการตรวจสอบความถูกต้องข้ามได้

เรามาตรวจสอบที่มาของครัวเรือนกันดีกว่า

df.filter(df.native_country == 'Holand-Netherlands').count()
df.groupby('native_country').agg({'native_country': 'count'}).sort(asc("count(native_country)")).show()
+--------------------+---------------------+
|      native_country|count(native_country)|
+--------------------+---------------------+
|  Holand-Netherlands|                    1|
|            Scotland|                   12|
|             Hungary|                   13|
|            Honduras|                   13|
|Outlying-US(Guam-...|                   14|
|          Yugoslavia|                   16|
|            Thailand|                   18|
|                Laos|                   18|
|            Cambodia|                   19|
|     Trinadad&Tobago|                   19|
|                Hong|                   20|
|             Ireland|                   24|
|             Ecuador|                   28|
|              Greece|                   29|
|              France|                   29|
|                Peru|                   31|
|           Nicaragua|                   34|
|            Portugal|                   37|
|                Iran|                   43|
|               Haiti|                   44|
+--------------------+---------------------+
only showing top 20 rows

ฟีเจอร์ Native_country มีเพียงครัวเรือนเดียวที่มาจากเนเธอร์แลนด์ คุณยกเว้นมัน

df_remove = df.filter(df.native_country !=	'Holand-Netherlands')

ขั้นตอนที่ 3) สร้างไปป์ไลน์การประมวลผลข้อมูล

Pyspark มี API แบบไปป์ไลน์คล้ายกับ scikit-learn

ไพพ์ไลน์นั้นสะดวกมากสำหรับการรักษาโครงสร้างของข้อมูล คุณใส่ข้อมูลเข้าไปในไพพ์ไลน์ ภายในไพพ์ไลน์นั้นจะมีการดำเนินการต่างๆ เกิดขึ้น และเอาท์พุตจะถูกใช้เพื่อป้อนให้กับอัลกอริทึม

ตัวอย่างเช่น การเปลี่ยนแปลงสากลอย่างหนึ่งในแมชชีนเลิร์นนิงประกอบด้วยการแปลงสตริงเป็นตัวเข้ารหัสแบบฮอตตัวเดียว กล่าวคือ ทีละคอลัมน์ ตัวเข้ารหัสแบบร้อนตัวหนึ่งมักเป็นเมทริกซ์ที่เต็มไปด้วยเลขศูนย์

ขั้นตอนในการแปลงข้อมูลนั้นคล้ายกับ scikit-learn มาก คุณต้อง:

  • จัดทำดัชนีสตริงเป็นตัวเลข
  • สร้างตัวเข้ารหัสที่ร้อนแรงตัวเดียว
  • แปลงข้อมูล

API สองตัวทำงาน: StringIndexer, OneHotEncoder

  1. ก่อนอื่น คุณเลือกคอลัมน์สตริงที่จะทำดัชนี inputCol คือชื่อของคอลัมน์ในชุดข้อมูล outputCol เป็นชื่อใหม่ที่กำหนดให้กับคอลัมน์ที่ถูกแปลง
StringIndexer(inputCol="workclass", outputCol="workclass_encoded")
  1. ปรับข้อมูลให้เหมาะสมและแปลงข้อมูล
model = stringIndexer.fit(df)		
`indexed = model.transform(df)``
  1. สร้างคอลัมน์ข่าวตามกลุ่ม ตัวอย่างเช่น หากมี 10 กลุ่มในฟีเจอร์นี้ เมทริกซ์ใหม่จะมี 10 คอลัมน์ XNUMX คอลัมน์สำหรับแต่ละกลุ่ม
OneHotEncoder(dropLast=False, inputCol="workclassencoded", outputCol="workclassvec")
### Example encoder
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler

stringIndexer = StringIndexer(inputCol="workclass", outputCol="workclass_encoded")
model = stringIndexer.fit(df)
indexed = model.transform(df)
encoder = OneHotEncoder(dropLast=False, inputCol="workclass_encoded", outputCol="workclass_vec")
encoded = encoder.transform(indexed)
encoded.show(2)
+---+----------+----------------+------+---------+-------------+------------------+---------------+-------------+-----+----+------------+------------+----------+--------------+-----+-----------------+-------------+
|age|age_square|       workclass|fnlwgt|education|education_num|           marital|     occupation| relationship| race| sex|capital_gain|capital_loss|hours_week|native_country|label|workclass_encoded|workclass_vec|
+---+----------+----------------+------+---------+-------------+------------------+---------------+-------------+-----+----+------------+------------+----------+--------------+-----+-----------------+-------------+
| 39|    1521.0|       State-gov| 77516|Bachelors|           13|     Never-married|   Adm-clerical|Not-in-family|White|Male|        2174|           0|        40| United-States|<=50K|              4.0|(9,[4],[1.0])|
| 50|    2500.0|Self-emp-not-inc| 83311|Bachelors|           13|Married-civ-spouse|Exec-managerial|      Husband|White|Male|           0|           0|        13| United-States|<=50K|              1.0|(9,[1],[1.0])|
+---+----------+----------------+------+---------+-------------+------------------+---------------+-------------+-----+----+------------+------------+----------+--------------+-----+-----------------+-------------+
only showing top 2 rows

สร้างท่อส่งน้ำ

คุณจะสร้างไปป์ไลน์เพื่อแปลงฟีเจอร์ที่แม่นยำทั้งหมดและเพิ่มลงในชุดข้อมูลขั้นสุดท้าย ไปป์ไลน์จะมีสี่การดำเนินการ แต่คุณสามารถเพิ่มการดำเนินการได้มากเท่าที่คุณต้องการ

  1. เข้ารหัสข้อมูลหมวดหมู่
  2. จัดทำดัชนีคุณลักษณะป้ายกำกับ
  3. เพิ่มตัวแปรต่อเนื่อง
  4. ประกอบขั้นตอน.

แต่ละขั้นตอนจะถูกเก็บไว้ในรายการที่มีชื่อขั้นตอน รายการนี้จะแจ้งให้ VectorAssembler ทราบว่าจะต้องดำเนินการใดภายในไปป์ไลน์

1. เข้ารหัสข้อมูลหมวดหมู่

ขั้นตอนนี้เหมือนกันทุกประการกับตัวอย่างข้างต้น ยกเว้นว่าคุณวนซ้ำคุณลักษณะตามหมวดหมู่ทั้งหมด

from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoderEstimator
CATE_FEATURES = ['workclass', 'education', 'marital', 'occupation', 'relationship', 'race', 'sex', 'native_country']
stages = [] # stages in our Pipeline
for categoricalCol in CATE_FEATURES:
    stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol + "Index")
    encoder = OneHotEncoderEstimator(inputCols=[stringIndexer.getOutputCol()],
                                     outputCols=[categoricalCol + "classVec"])
    stages += [stringIndexer, encoder]

2. จัดทำดัชนีคุณสมบัติฉลาก

Sparkเช่นเดียวกับไลบรารีอื่นๆ มากมาย ไม่ยอมรับค่าสตริงสำหรับป้ายกำกับ คุณแปลงฟีเจอร์ป้ายกำกับด้วย StringIndexer และเพิ่มลงในสเตจรายการ

# Convert label into label indices using the StringIndexer
label_stringIdx =  StringIndexer(inputCol="label", outputCol="newlabel")
stages += [label_stringIdx]

3. เพิ่มตัวแปรต่อเนื่อง

inputCols ของ VectorAssembler คือรายการคอลัมน์ คุณสามารถสร้างรายการใหม่ที่มีคอลัมน์ใหม่ทั้งหมดได้ รหัสด้านล่างเติมรายการด้วยคุณสมบัติหมวดหมู่ที่เข้ารหัสและคุณสมบัติต่อเนื่อง

assemblerInputs = [c + "classVec" for c in CATE_FEATURES] + CONTI_FEATURES

4. ประกอบขั้นตอนต่างๆ

ในที่สุด คุณก็ผ่านทุกขั้นตอนใน VectorAssembler

assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")stages += [assembler]

เมื่อทุกขั้นตอนพร้อมแล้ว คุณก็ส่งข้อมูลไปที่ไปป์ไลน์ได้

# Create a Pipeline.
pipeline = Pipeline(stages=stages)
pipelineModel = pipeline.fit(df_remove)
model = pipelineModel.transform(df_remove)

หากคุณตรวจสอบชุดข้อมูลใหม่ คุณจะเห็นว่าชุดข้อมูลดังกล่าวมีคุณสมบัติทั้งหมด ทั้งที่ได้รับการแปลงและไม่ถูกแปลง คุณสนใจเฉพาะป้ายกำกับและฟีเจอร์ใหม่เท่านั้น คุณลักษณะนี้รวมถึงคุณลักษณะที่ได้รับการแปลงทั้งหมดและตัวแปรต่อเนื่อง

model.take(1)

[Row(age=39, age_square=1521.0, workclass='State-gov', fnlwgt=77516, education='Bachelors', education_num=13, marital='Never-married', occupation='Adm-clerical', relationship='Not-in-family', race='White', sex='Male', capital_gain=2174, capital_loss=0, hours_week=40, native_country='United-States', label='<=50K', workclassIndex=4.0, workclassclassVec=SparseVector(8, {4: 1.0}), educationIndex=2.0, educationclassVec=SparseVector(15, {2: 1.0}), maritalIndex=1.0, maritalclassVec=SparseVector(6, {1: 1.0}), occupationIndex=3.0, occupationclassVec=SparseVector(14, {3: 1.0}), relationshipIndex=1.0, relationshipclassVec=SparseVector(5, {1: 1.0}), raceIndex=0.0, raceclassVec=SparseVector(4, {0: 1.0}), sexIndex=0.0, sexclassVec=SparseVector(1, {0: 1.0}), native_countryIndex=0.0, native_countryclassVec=SparseVector(40, {0: 1.0}), newlabel=0.0, features=SparseVector(99, {4: 1.0, 10: 1.0, 24: 1.0, 32: 1.0, 44: 1.0, 48: 1.0, 52: 1.0, 53: 1.0, 93: 39.0, 94: 77516.0, 95: 2174.0, 96: 13.0, 98: 40.0}))]

ขั้นตอนที่ 4) สร้างลักษณนาม: ลอจิสติกส์

เพื่อให้การคำนวณเร็วขึ้น คุณต้องแปลงโมเดลเป็น DataFrame

คุณต้องเลือกป้ายกำกับใหม่และคุณสมบัติจากโมเดลโดยใช้แผนที่

from pyspark.ml.linalg import DenseVector
input_data = model.rdd.map(lambda x: (x["newlabel"], DenseVector(x["features"])))

คุณพร้อมที่จะสร้างข้อมูลรถไฟเป็น DataFrame แล้ว คุณใช้ sqlContext

df_train = sqlContext.createDataFrame(input_data, ["label", "features"])

ตรวจสอบแถวที่สอง

df_train.show(2)
+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|[0.0,0.0,0.0,0.0,...|
|  0.0|[0.0,1.0,0.0,0.0,...|
+-----+--------------------+
only showing top 2 rows

สร้างชุดฝึก/ชุดทดสอบ

คุณแบ่งชุดข้อมูล 80/20 ด้วย RandomSplit

# Split the data into train and test sets
train_data, test_data = df_train.randomSplit([.8,.2],seed=1234)

ลองนับดูว่ามีกี่คนที่มีรายได้ต่ำกว่า/มากกว่า 50 ทั้งชุดฝึกอบรมและชุดทดสอบ

train_data.groupby('label').agg({'label': 'count'}).show()
+-----+------------+
|label|count(label)|
+-----+------------+
|  0.0|       19698|
|  1.0|        6263|
+-----+------------+
test_data.groupby('label').agg({'label': 'count'}).show()
+-----+------------+
|label|count(label)|
+-----+------------+
|  0.0|        5021|
|  1.0|        1578|
+-----+------------+

สร้างตัวถดถอยโลจิสติก

สุดท้ายแต่ไม่ท้ายสุด คุณสามารถสร้างตัวจำแนกประเภทได้ Pyspark มี API ที่เรียกว่า LogisticRegression เพื่อดำเนินการถดถอยแบบลอจิสติกส์

คุณเริ่มต้น lr โดยระบุคอลัมน์ป้ายกำกับและคอลัมน์คุณลักษณะ คุณตั้งค่าการวนซ้ำสูงสุด 10 ครั้งและเพิ่มพารามิเตอร์การทำให้เป็นมาตรฐานด้วยค่า 0.3 โปรดทราบว่าในส่วนถัดไป คุณจะใช้การตรวจสอบข้ามกับตารางพารามิเตอร์เพื่อปรับแต่งโมเดล

# Import `LinearRegression`
from pyspark.ml.classification import LogisticRegression

# Initialize `lr`
lr = LogisticRegression(labelCol="label",
                        featuresCol="features",
                        maxIter=10,
                        regParam=0.3)

# Fit the data to the model
linearModel = lr.fit(train_data)

#สามารถดูค่าสัมประสิทธิ์จากการถดถอยได้

# Print the coefficients and intercept for logistic regression
print("Coefficients: " + str(linearModel.coefficients))
print("Intercept: " + str(linearModel.intercept))
Coefficients: [-0.0678914665262,-0.153425526813,-0.0706009536407,-0.164057586562,-0.120655298528,0.162922330862,0.149176870438,-0.626836362611,-0.193483661541,-0.0782269980838,0.222667203836,0.399571096381,-0.0222024341804,-0.311925857859,-0.0434497788688,-0.306007744328,-0.41318209688,0.547937504247,-0.395837350854,-0.23166535958,0.618743906733,-0.344088614546,-0.385266881369,0.317324463006,-0.350518889186,-0.201335923138,-0.232878560088,-0.13349278865,-0.119760542498,0.17500602491,-0.0480968101118,0.288484253943,-0.116314616745,0.0524163478063,-0.300952624551,-0.22046421474,-0.16557996579,-0.114676231939,-0.311966431453,-0.344226119233,0.105530129507,0.152243047814,-0.292774545497,0.263628334433,-0.199951374076,-0.30329422583,-0.231087515178,0.418918551,-0.0565930184279,-0.177818073048,-0.0733236680663,-0.267972912252,0.168491215697,-0.12181255723,-0.385648075442,-0.202101794517,0.0469791640782,-0.00842850210625,-0.00373211448629,-0.259296141281,-0.309896554133,-0.168434409756,-0.11048086026,0.0280647963877,-0.204187030092,-0.414392623536,-0.252806580669,0.143366465705,-0.516359222663,-0.435627370849,-0.301949286524,0.0878249035894,-0.210951740965,-0.621417928742,-0.099445190784,-0.232671473401,-0.1077745606,-0.360429419703,-0.420362959052,-0.379729467809,-0.395186242741,0.0826401853838,-0.280251589972,0.187313505214,-0.20295228799,-0.431177064626,0.149759018379,-0.107114299614,-0.319314858424,0.0028450133235,-0.651220387649,-0.327918792207,-0.143659581445,0.00691075160413,8.38517628783e-08,2.18856717378e-05,0.0266701216268,0.000231075966823,0.00893832698698]
Intercept: -1.9884177974805692

ขั้นตอนที่ 5) ฝึกอบรมและประเมินแบบจำลอง

เพื่อสร้างการทำนายสำหรับชุดทดสอบของคุณ

คุณสามารถใช้ linearModel กับ Transformer() บน test_data

# Make predictions on test data using the transform() method.
predictions = linearModel.transform(test_data)

คุณสามารถพิมพ์องค์ประกอบในการทำนายได้

predictions.printSchema()
root
 |-- label: double (nullable = true)
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)

คุณสนใจป้ายกำกับ การทำนาย และความน่าจะเป็น

selected = predictions.select("label", "prediction", "probability")
selected.show(20)
+-----+----------+--------------------+
|label|prediction|         probability|
+-----+----------+--------------------+
|  0.0|       0.0|[0.91560704124179...|
|  0.0|       0.0|[0.92812140213994...|
|  0.0|       0.0|[0.92161406774159...|
|  0.0|       0.0|[0.96222760777142...|
|  0.0|       0.0|[0.66363283056957...|
|  0.0|       0.0|[0.65571324475477...|
|  0.0|       0.0|[0.73053376932829...|
|  0.0|       1.0|[0.31265053873570...|
|  0.0|       0.0|[0.80005907577390...|
|  0.0|       0.0|[0.76482251301640...|
|  0.0|       0.0|[0.84447301189069...|
|  0.0|       0.0|[0.75691912026619...|
|  0.0|       0.0|[0.60902504096722...|
|  0.0|       0.0|[0.80799228385509...|
|  0.0|       0.0|[0.87704364852567...|
|  0.0|       0.0|[0.83817652582377...|
|  0.0|       0.0|[0.79655423248500...|
|  0.0|       0.0|[0.82712311232246...|
|  0.0|       0.0|[0.81372823882016...|
|  0.0|       0.0|[0.59687710752201...|
+-----+----------+--------------------+
only showing top 20 rows

ประเมินแบบจำลอง

คุณต้องดูเมตริกความแม่นยำเพื่อดูว่าโมเดลทำงานได้ดีเพียงใด (หรือไม่ดี) ปัจจุบันไม่มี API ที่จะคำนวณการวัดความแม่นยำ Sparkค่าเริ่มต้นคือ ROC, กราฟลักษณะการทำงานของตัวรับ เป็นเมตริกที่แตกต่างกันซึ่งคำนึงถึงอัตราการเกิดบวกปลอม

ก่อนที่คุณจะดู ROC มาสร้างการวัดความแม่นยำกันก่อน คุณคุ้นเคยกับตัวชี้วัดนี้มากขึ้น การวัดความแม่นยำคือผลรวมของการทำนายที่ถูกต้องต่อจำนวนการสังเกตทั้งหมด

คุณสร้าง DataFrame ด้วยป้ายกำกับและ `prediction

cm = predictions.select("label", "prediction")

คุณสามารถตรวจสอบจำนวนคลาสได้ในฉลากและการทำนาย

cm.groupby('label').agg({'label': 'count'}).show()
+-----+------------+
|label|count(label)|
+-----+------------+
|  0.0|        5021|
|  1.0|        1578|
+-----+------------+
cm.groupby('prediction').agg({'prediction': 'count'}).show()
+----------+-----------------+
|prediction|count(prediction)|
+----------+-----------------+
|       0.0|             5982|
|       1.0|              617|
+----------+-----------------+

ตัวอย่างเช่น ในชุดทดสอบ มี 1578 ครัวเรือนที่มีรายได้มากกว่า 50 และต่ำกว่า 5021 ครัวเรือน อย่างไรก็ตาม ตัวแยกประเภทได้ทำนายครัวเรือน 617 ครัวเรือนที่มีรายได้มากกว่า 50 ครัวเรือน

คุณสามารถคำนวณความแม่นยำได้โดยคำนวณการนับเมื่อมีการจำแนกป้ายกำกับตามจำนวนแถวทั้งหมดอย่างถูกต้อง

cm.filter(cm.label == cm.prediction).count() / cm.count()

0.8237611759357478

คุณสามารถรวมทุกอย่างเข้าด้วยกันและเขียนฟังก์ชันเพื่อคำนวณความแม่นยำได้

def accuracy_m(model): 
    predictions = model.transform(test_data)
    cm = predictions.select("label", "prediction")
    acc = cm.filter(cm.label == cm.prediction).count() / cm.count()
    print("Model accuracy: %.3f%%" % (acc * 100)) 
accuracy_m(model = linearModel)
Model accuracy: 82.376%

ตัวชี้วัด ROC

โมดูล BinaryClassificationEvaluator มีการวัด ROC ผู้รับ Operating Characteristic curve เป็นอีกหนึ่งเครื่องมือทั่วไปที่ใช้กับการจำแนกไบนารี มันคล้ายกับเส้นโค้งความแม่นยำ/การเรียกคืนมาก แต่แทนที่จะวางแผนความแม่นยำกับการเรียกคืน เส้นโค้ง ROC จะแสดงอัตราบวกที่แท้จริง (เช่น การเรียกคืน) เทียบกับอัตราบวกลวง อัตราผลบวกลวงคืออัตราส่วนของอินสแตนซ์เชิงลบที่จัดประเภทไม่ถูกต้องว่าเป็นบวก มันเท่ากับ 1 ลบอัตราติดลบจริง อัตราลบที่แท้จริงเรียกอีกอย่างว่าความจำเพาะ ดังนั้นเส้นโค้ง ROC จะพล็อตความไว (การเรียกคืน) เทียบกับ XNUMX – ความจำเพาะ

### Use ROC 
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Evaluate model
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
print(evaluator.evaluate(predictions))
print(evaluator.getMetricName())

0.8940481662695192พื้นที่UnderROC

print(evaluator.evaluate(predictions))

0.8940481662695192

ขั้นตอนที่ 6) ปรับไฮเปอร์พารามิเตอร์

สุดท้ายแต่ไม่ท้ายสุด คุณสามารถปรับแต่งไฮเปอร์พารามิเตอร์ได้ คล้ายกับ scikit เรียนรู้ คุณสร้างตารางพารามิเตอร์ และเพิ่มพารามิเตอร์ที่คุณต้องการปรับแต่ง

เพื่อลดเวลาในการคำนวณ คุณเพียงปรับพารามิเตอร์การทำให้เป็นมาตรฐานด้วยค่าเพียงสองค่าเท่านั้น

from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Create ParamGrid for Cross Validation
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.01, 0.5])
             .build())

สุดท้าย คุณประเมินโมเดลโดยใช้วิธีตรวจสอบความถูกต้องแบบไขว้ 5 เท่า ใช้เวลาในการฝึกประมาณ 16 นาที

from time import *
start_time = time()

# Create 5-fold CrossValidator
cv = CrossValidator(estimator=lr,
                    estimatorParamMaps=paramGrid,
                    evaluator=evaluator, numFolds=5)

# Run cross validations
cvModel = cv.fit(train_data)
# likely take a fair amount of time
end_time = time()
elapsed_time = end_time - start_time
print("Time to train model: %.3f seconds" % elapsed_time)

เวลาในการฝึกโมเดล: 978.807 วินาที

ไฮเปอร์พารามิเตอร์การทำให้เป็นมาตรฐานที่ดีที่สุดคือ 0.01 โดยมีความแม่นยำ 85.316 เปอร์เซ็นต์

accuracy_m(model = cvModel)
Model accuracy: 85.316%

คุณสามารถแยกพารามิเตอร์ที่แนะนำได้โดยการผูกมัด cvModel.bestModel ด้วย extractParamMap()

bestModel = cvModel.bestModel
bestModel.extractParamMap()
{Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='aggregationDepth', doc='suggested depth for treeAggregate (>= 2)'): 2,
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty'): 0.0,
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='family', doc='The name of family which is a description of the label distribution to be used in the model. Supported options: auto, binomial, multinomial.'): 'auto',
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='featuresCol', doc='features column name'): 'features',
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='fitIntercept', doc='whether to fit an intercept term'): True,
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='labelCol', doc='label column name'): 'label',
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='maxIter', doc='maximum number of iterations (>= 0)'): 10,
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='predictionCol', doc='prediction column name'): 'prediction',
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='probabilityCol', doc='Column name for predicted class conditional probabilities. Note: Not all models output well-calibrated probability estimates! These probabilities should be treated as confidences, not precise probabilities'): 'probability',
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='rawPredictionCol', doc='raw prediction (a.k.a. confidence) column name'): 'rawPrediction',
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='regParam', doc='regularization parameter (>= 0)'): 0.01,
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='standardization', doc='whether to standardize the training features before fitting the model'): True,
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='threshold', doc='threshold in binary classification prediction, in range [0, 1]'): 0.5,
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='tol', doc='the convergence tolerance for iterative algorithms (>= 0)'): 1e-06}

สรุป

Spark เป็นเครื่องมือพื้นฐานสำหรับนักวิทยาศาสตร์ข้อมูล ช่วยให้ผู้ปฏิบัติงานเชื่อมต่อแอปกับแหล่งข้อมูลต่างๆ ทำการวิเคราะห์ข้อมูลได้อย่างราบรื่น หรือเพิ่มแบบจำลองการคาดการณ์

เริ่มต้นกับ Sparkคุณต้องเริ่มต้นก Spark บริบทด้วย:

'Sparkบริบท()'

และและ SQL บริบทเพื่อเชื่อมต่อกับแหล่งข้อมูล:

'SQLContext()'

ในบทช่วยสอน คุณจะได้เรียนรู้วิธีฝึกการถดถอยโลจิสติก:

  1. แปลงชุดข้อมูลเป็น Dataframe ด้วย:
rdd.map(lambda x: (x["newlabel"], DenseVector(x["features"])))
sqlContext.createDataFrame(input_data, ["label", "features"])

โปรดทราบว่าชื่อคอลัมน์ของป้ายกำกับคือ newlabel และคุณลักษณะทั้งหมดจะรวมอยู่ในคุณลักษณะต่างๆ เปลี่ยนค่าเหล่านี้หากแตกต่างในชุดข้อมูลของคุณ

  1. สร้างชุดฝึก/ชุดทดสอบ
randomSplit([.8,.2],seed=1234)
  1. ฝึกโมเดล
LogisticRegression(labelCol="label",featuresCol="features",maxIter=10, regParam=0.3)
lr.fit()
  1. ทำนายกัน
linearModel.transform()