PySpark บทช่วยสอนสำหรับผู้เริ่มต้น: เรียนรู้พร้อมตัวอย่าง
ก่อนจะเรียนพี่.Sparkมาทำความเข้าใจกันดีกว่า:
อาปาเช่คืออะไร Spark?
Spark เป็นโซลูชันข้อมูลขนาดใหญ่ที่ได้รับการพิสูจน์แล้วว่าง่ายและรวดเร็วกว่า Hadoop MapReduce Spark เป็นซอฟต์แวร์โอเพ่นซอร์สที่พัฒนาโดยห้องปฏิบัติการ UC Berkeley RAD ในปี 2009 นับตั้งแต่เปิดตัวสู่สาธารณะในปี 2010 Spark ได้รับความนิยมเพิ่มขึ้นและมีการใช้ในอุตสาหกรรมในขนาดที่ไม่เคยมีมาก่อน
ในยุคของ ข้อมูลขนาดใหญ่ผู้ปฏิบัติงานต้องการเครื่องมือที่รวดเร็วและเชื่อถือได้มากกว่าที่เคยเพื่อประมวลผลการสตรีมข้อมูล เครื่องมือก่อนหน้านี้อย่าง MapReduce เป็นที่ชื่นชอบแต่ทำงานได้ช้า เพื่อเอาชนะปัญหานี้ Spark นำเสนอโซลูชั่นที่รวดเร็วและใช้งานได้ทั่วไป ความแตกต่างที่สำคัญระหว่าง Spark และ MapReduce ก็คือสิ่งนั้น Spark รันการคำนวณในหน่วยความจำในฮาร์ดดิสก์ในภายหลัง ช่วยให้เข้าถึงข้อมูลและประมวลผลข้อมูลได้รวดเร็ว ลดเวลาจากหลายชั่วโมงเหลือเพียงไม่กี่นาที
พีคืออะไรSpark?
PySpark เป็นเครื่องมือที่สร้างโดย Apache Spark ชุมชนเพื่อการใช้งาน Python กับ Spark- ช่วยให้สามารถทำงานร่วมกับ RDD (Resilient Distributed Dataset) ได้ Python- นอกจากนี้ยังมีบริการ PySpark เชลล์เพื่อเชื่อมโยง Python API ด้วย Spark หลักในการเริ่มต้น Spark บริบท. Spark คือชื่อเอนจิ้นที่ใช้ในการทำการประมวลผลแบบคลัสเตอร์ ในขณะที่ PySpark is Pythonห้องสมุดที่จะใช้ Spark.
อย่างไรบ้าง Spark ทำงานอย่างไร
Spark อิงตามกลไกการคำนวณ ซึ่งหมายความว่ามันดูแลการจัดตาราง การกระจาย และการตรวจสอบแอปพลิเคชัน งานแต่ละงานจะดำเนินการผ่านเครื่องเวิร์กเกอร์ต่างๆ ที่เรียกว่าคลัสเตอร์คอมพิวเตอร์ คลัสเตอร์คอมพิวเตอร์หมายถึงการแบ่งงาน เครื่องหนึ่งทำงานหนึ่งงาน ในขณะที่เครื่องอื่นๆ มีส่วนสนับสนุนผลลัพธ์ขั้นสุดท้ายผ่านงานอื่น ในท้ายที่สุด งานทั้งหมดจะถูกรวบรวมเพื่อสร้างผลลัพธ์ Spark แอดมินให้ภาพรวม 360 องศาต่างๆ Spark งาน
Spark ได้รับการออกแบบมาให้ทำงานร่วมกับ
- Python
- Java
- สกาล่า
- SQL
ลักษณะสำคัญของ Spark คือไลบรารี่ในตัวจำนวนมหาศาล รวมถึง MLlib สำหรับการเรียนรู้ของเครื่องด้วย. Spark ได้รับการออกแบบมาให้ทำงานกับคลัสเตอร์ Hadoop และสามารถอ่านไฟล์ประเภทต่างๆ ได้หลากหลาย รวมถึงข้อมูล Hive, CSV, JSON, ข้อมูล Casandra และอื่นๆ อีกมากมาย
ทำไมต้องใช้ Spark?
ในฐานะผู้ปฏิบัติงานด้านข้อมูลในอนาคต คุณควรคุ้นเคยกับไลบรารีที่มีชื่อเสียงของ Python: Pandas และ scikit-learn ไลบรารีทั้งสองนี้ยอดเยี่ยมมากสำหรับการสำรวจชุดข้อมูลขนาดกลาง โปรเจ็กต์การเรียนรู้ของเครื่องทั่วไปจะสร้างขึ้นโดยใช้ระเบียบวิธีต่อไปนี้:
- โหลดข้อมูลลงดิสก์
- นำเข้าข้อมูลเข้าสู่หน่วยความจำของเครื่อง
- ประมวลผล/วิเคราะห์ข้อมูล
- สร้างโมเดลการเรียนรู้ของเครื่อง
- เก็บคำทำนายกลับไปยังดิสก์
ปัญหาจะเกิดขึ้นหากนักวิทยาศาสตร์ข้อมูลต้องการประมวลผลข้อมูลที่มีขนาดใหญ่เกินไปสำหรับคอมพิวเตอร์เครื่องเดียว ในช่วงก่อนหน้านี้ของวิทยาศาสตร์ข้อมูล ผู้ปฏิบัติงานจะสุ่มตัวอย่างเนื่องจากการฝึกอบรมเกี่ยวกับชุดข้อมูลขนาดใหญ่ไม่จำเป็นเสมอไป นักวิทยาศาสตร์ข้อมูลจะค้นหาตัวอย่างทางสถิติที่ดี ทำการตรวจสอบความทนทานเพิ่มเติม และสร้างแบบจำลองที่ยอดเยี่ยมขึ้นมา
อย่างไรก็ตาม มีปัญหาบางประการเกี่ยวกับเรื่องนี้:
- ชุดข้อมูลสะท้อนโลกแห่งความเป็นจริงหรือไม่?
- ข้อมูลมีตัวอย่างที่เฉพาะเจาะจงหรือไม่?
- โมเดลนี้เหมาะสำหรับการสุ่มตัวอย่างหรือไม่
รับคำแนะนำผู้ใช้เช่น ผู้แนะนำอาศัยการเปรียบเทียบผู้ใช้กับผู้ใช้รายอื่นในการประเมินการตั้งค่าของพวกเขา หากผู้ปฏิบัติงานด้านข้อมูลรับข้อมูลเพียงชุดย่อย ก็จะไม่มีกลุ่มผู้ใช้ที่คล้ายกันมาก ผู้แนะนำจำเป็นต้องรันบนชุดข้อมูลทั้งหมดหรือไม่ทำงานเลย
การแก้ปัญหาคืออะไร?
วิธีแก้ปัญหานี้ชัดเจนมานานแล้ว โดยแบ่งปัญหาออกเป็นคอมพิวเตอร์หลายเครื่อง การประมวลผลแบบคู่ขนานก็มีปัญหามากมายเช่นกัน นักพัฒนาซอฟต์แวร์มักประสบปัญหาในการเขียนโค้ดแบบคู่ขนานและต้องแก้ปัญหาที่ซับซ้อนมากมายเกี่ยวกับการประมวลผลหลายเครื่อง
Pyspark มอบ API ให้กับนักวิทยาศาสตร์ข้อมูลเพื่อใช้ในการแก้ปัญหาการประมวลผลข้อมูลแบบขนาน Pyspark จัดการกับความซับซ้อนของการประมวลผลหลายระบบ เช่น การกระจายข้อมูล การกระจายโค้ด และการรวบรวมเอาต์พุตจากเวิร์กเกอร์บนคลัสเตอร์ของเครื่องจักร
Spark สามารถทำงานแบบสแตนด์อโลนได้ แต่ส่วนใหญ่มักจะทำงานบนกรอบงานการประมวลผลแบบคลัสเตอร์ เช่น Hadoop อย่างไรก็ตาม ในการทดสอบและการพัฒนา นักวิทยาศาสตร์ข้อมูลสามารถรันได้อย่างมีประสิทธิภาพ Spark บนกล่องพัฒนาหรือแล็ปท็อปที่ไม่มีคลัสเตอร์
• ข้อดีหลักประการหนึ่งของ Spark คือการสร้างสถาปัตยกรรมที่ครอบคลุมการบริหารจัดการสตรีมข้อมูล การสอบถามข้อมูลอย่างราบรื่น การคาดการณ์การเรียนรู้ของเครื่อง และการเข้าถึงการวิเคราะห์ต่างๆ แบบเรียลไทม์
• Spark ทำงานอย่างใกล้ชิดกับภาษา SQL เช่น ข้อมูลที่มีโครงสร้าง ช่วยให้สามารถสืบค้นข้อมูลแบบเรียลไทม์
• หน้าที่หลักของนักวิทยาศาสตร์ข้อมูลคือการวิเคราะห์และสร้างแบบจำลองการคาดการณ์ กล่าวโดยสรุป นักวิทยาศาสตร์ข้อมูลจำเป็นต้องรู้วิธีสืบค้นข้อมูลโดยใช้ SQLสร้างรายงานทางสถิติและใช้ประโยชน์จากการเรียนรู้ของเครื่องเพื่อสร้างการคาดการณ์ นักวิทยาศาสตร์ด้านข้อมูลใช้เวลาส่วนใหญ่ไปกับการทำความสะอาด เปลี่ยนแปลง และวิเคราะห์ข้อมูล เมื่อชุดข้อมูลหรือเวิร์กโฟลว์ข้อมูลพร้อม นักวิทยาศาสตร์ข้อมูลจะใช้เทคนิคต่างๆ เพื่อค้นหาข้อมูลเชิงลึกและรูปแบบที่ซ่อนอยู่ การจัดการข้อมูลควรมีความแข็งแกร่งและใช้งานง่ายเหมือนเดิม Spark เป็นเครื่องมือที่เหมาะสมด้วยความเร็วและ API ที่หลากหลาย
ในพีนี้Spark บทช่วยสอน คุณจะได้เรียนรู้วิธีสร้างตัวแยกประเภทด้วย PySpark ตัวอย่าง.
วิธีการติดตั้ง Py.mq4Spark ด้วย AWS
รางวัล Jupyter ทีมสร้างอิมเมจ Docker เพื่อรัน Spark อย่างมีประสิทธิภาพ ด้านล่างนี้เป็นขั้นตอนที่คุณสามารถปฏิบัติตามเพื่อติดตั้ง PySpark อินสแตนซ์ใน AWS
ดูบทช่วยสอนของเราที่ AWS และ TensorFlow
ขั้นตอนที่ 1: สร้างอินสแตนซ์
ก่อนอื่น คุณต้องสร้างอินสแตนซ์ก่อน ไปที่บัญชี AWS ของคุณแล้วเปิดใช้งานอินสแตนซ์ คุณสามารถเพิ่มพื้นที่จัดเก็บข้อมูลได้สูงสุดถึง 15g และใช้กลุ่มความปลอดภัยเดียวกันกับในบทช่วยสอน TensorFlow
ขั้นตอนที่ 2: เปิดการเชื่อมต่อ
เปิดการเชื่อมต่อและติดตั้งคอนเทนเนอร์ Docker สำหรับรายละเอียดเพิ่มเติม โปรดดูบทช่วยสอนด้วย TensorFlow นักเทียบท่า- โปรดทราบว่าคุณจะต้องอยู่ในไดเร็กทอรีการทำงานที่ถูกต้อง
เพียงเรียกใช้รหัสเหล่านี้เพื่อติดตั้ง Docker:
sudo yum update -y sudo yum install -y docker sudo service docker start sudo user-mod -a -G docker ec2-user exit
ขั้นตอนที่ 3: เปิดการเชื่อมต่ออีกครั้งและติดตั้ง Spark
หลังจากที่คุณเปิดการเชื่อมต่ออีกครั้ง คุณสามารถติดตั้งอิมเมจที่มี Py ได้Spark.
## Spark docker run -v ~/work:/home/jovyan/work -d -p 8888:8888 jupyter/pyspark-notebook ## Allow preserving Jupyter notebook sudo chown 1000 ~/work ## Install tree to see our working directory next sudo yum install -y tree
ขั้นตอนที่ 4: เปิด Jupyter
ตรวจสอบคอนเทนเนอร์และชื่อ
docker ps
เปิดตัวนักเทียบท่าด้วยบันทึกนักเทียบท่าตามด้วยชื่อของนักเทียบท่า ตัวอย่างเช่น นักเทียบท่าบันทึก zealous_goldwasser
ไปที่เบราว์เซอร์ของคุณและเปิดใช้งาน Jupyter- ที่อยู่คือ http://localhost:8888/ วางรหัสผ่านที่กำหนดโดยเทอร์มินัล
หมายเหตุ: หากคุณต้องการอัพโหลด/ดาวน์โหลดไฟล์ไปยังเครื่อง AWS ของคุณ คุณสามารถใช้ซอฟต์แวร์ Cyberduck https://cyberduck.io/.
วิธีการติดตั้ง Py.mq4Spark on Windows/แมคกับคอนดา
ต่อไปนี้เป็นกระบวนการโดยละเอียดเกี่ยวกับวิธีการติดตั้ง PySpark on Windows/Mac ใช้อนาคอนดา:
หากต้องการติดตั้ง Spark บนเครื่องท้องถิ่นของคุณ แนวทางปฏิบัติที่แนะนำคือการสร้างสภาพแวดล้อม conda ใหม่ สภาพแวดล้อมใหม่นี้จะติดตั้ง Python 3.6, Spark และการพึ่งพาอาศัยกันทั้งหมด
ผู้ใช้ Mac
cd anaconda3 touch hello-spark.yml vi hello-spark.yml
Windows ผู้ใช้งาน
cd C:\Users\Admin\Anaconda3 echo.>hello-spark.yml notepad hello-spark.yml
คุณสามารถแก้ไขไฟล์ .yml ได้ ระวังการเยื้อง ต้องเว้นวรรคสองช่องก่อน –
name: hello-spark dependencies: - python=3.6 - jupyter - ipython - numpy - numpy-base - pandas - py4j - pyspark - pytz
บันทึกและสร้างสภาพแวดล้อม มันต้องใช้เวลาพอสมควร
conda env create -f hello-spark.yml
สำหรับรายละเอียดเพิ่มเติมเกี่ยวกับตำแหน่ง โปรดดูบทช่วยสอนการติดตั้ง TensorFlow
คุณสามารถตรวจสอบสภาพแวดล้อมทั้งหมดที่ติดตั้งในเครื่องของคุณได้
conda env list
Activate hello-spark
ผู้ใช้ Mac
source activate hello-spark
Windows ผู้ใช้งาน
activate hello-spark
หมายเหตุ คุณได้สร้างสภาพแวดล้อม TensorFlow เฉพาะเพื่อเรียกใช้บทช่วยสอนบน TensorFlow แล้ว การสร้างสภาพแวดล้อมใหม่ที่แตกต่างจาก hello-tf จะสะดวกกว่า มันไม่สมเหตุสมผลเลยที่จะโอเวอร์โหลด hello-tf ด้วย Spark หรือไลบรารีการเรียนรู้ของเครื่องอื่นๆ
ลองนึกภาพโปรเจ็กต์ส่วนใหญ่ของคุณเกี่ยวข้องกับ TensorFlow แต่คุณจำเป็นต้องใช้ Spark เพื่อโครงการหนึ่งโดยเฉพาะ คุณสามารถตั้งค่าสภาพแวดล้อม TensorFlow ให้กับโปรเจ็กต์ทั้งหมดและสร้างสภาพแวดล้อมแยกต่างหากได้ Spark- คุณสามารถเพิ่มไลบรารี่ได้มากเท่าที่ต้องการ Spark ตามที่คุณต้องการโดยไม่รบกวนสภาพแวดล้อม TensorFlow เมื่อคุณทำเสร็จแล้วกับ Sparkของโปรเจ็กต์ คุณสามารถลบมันได้โดยไม่ส่งผลกระทบต่อสภาพแวดล้อม TensorFlow
Jupyter
จุดเปิด Jupyter Notebook แล้วลอง Py ดูครับSpark งาน. ในสมุดบันทึกใหม่วาง Py ต่อไปนี้Spark รหัสตัวอย่าง:
import pyspark from pyspark import SparkContext sc =SparkContext()
หากแสดงข้อผิดพลาดก็มีแนวโน้มว่าจะเป็นเช่นนั้น Java ไม่ได้ติดตั้งบนเครื่องของคุณ ใน Mac ให้เปิดเทอร์มินัลและเขียน java -version หากมีเวอร์ชัน Java ตรวจสอบให้แน่ใจว่าเป็น 1.8 Windowsให้ไปที่ Application แล้วตรวจสอบว่ามีหรือไม่ Java โฟลเดอร์ หากมีก Java โฟลเดอร์ ให้ตรวจสอบสิ่งนั้น Java ติดตั้ง 1.8 แล้ว ขณะที่เขียนนี้ PySpark เข้ากันไม่ได้กับ Java9 ขึ้นไป
หากคุณต้องการติดตั้ง Javaคุณต้องคิด ลิงค์ และดาวน์โหลด jdk-8u181-windows-x64.exe
สำหรับผู้ใช้ Mac แนะนำให้ใช้ `brew.`
brew tap caskroom/versions brew cask install java8
อ้างอิงบทช่วยสอนทีละขั้นตอนนี้ใน วิธีการติดตั้ง Java
หมายเหตุ: ใช้การลบเพื่อลบสภาพแวดล้อมทั้งหมด
conda env remove -n hello-spark -y
Spark บริบท
Sparkบริบทเป็นเครื่องมือภายในที่ช่วยให้สามารถเชื่อมต่อกับคลัสเตอร์ได้ หากคุณต้องการเรียกใช้การดำเนินการ คุณต้องมี Sparkบริบท.
สร้าง Sparkบริบท
ก่อนอื่นคุณต้องเริ่มต้นก Sparkบริบท.
import pyspark from pyspark import SparkContext sc =SparkContext()
ตอนนี้ที่ Sparkเมื่อบริบทพร้อมแล้ว คุณสามารถสร้างชุดข้อมูลที่เรียกว่า RDD หรือ Resilient Distributed Dataset การคำนวณใน RDD จะถูกขนานกันโดยอัตโนมัติทั่วทั้งคลัสเตอร์
nums= sc.parallelize([1,2,3,4])
คุณสามารถเข้าถึงแถวแรกโดยใช้เทค
nums.take(1)
[1]
คุณสามารถใช้การแปลงกับข้อมูลด้วยฟังก์ชันแลมบ์ดา ในส่วนของปี่Spark ตัวอย่างด้านล่าง คุณจะคืนค่ากำลังสองของตัวเลข เป็นการแปลงแผนที่
squared = nums.map(lambda x: x*x).collect() for num in squared: print('%i ' % (num))
1 4 9 16
SQLContext
วิธีที่สะดวกกว่าคือการใช้ DataFrame Sparkบริบทได้รับการตั้งค่าแล้ว คุณสามารถใช้เพื่อสร้าง dataFrame ได้ คุณต้องประกาศ SQLContext ด้วย
SQLContext อนุญาตให้เชื่อมต่อเอ็นจิ้นกับแหล่งข้อมูลที่แตกต่างกัน ใช้เพื่อเริ่มต้นการทำงานของ Spark SQL
from pyspark.sql import Row from pyspark.sql import SQLContext sqlContext = SQLContext(sc)
ตอนนี้อยู่ในนี้ Spark เกี่ยวกับการสอน Pythonเรามาสร้างรายการสิ่งอันดับกันดีกว่า แต่ละสิ่งอันดับจะมีชื่อของบุคคลและอายุของพวกเขา จำเป็นต้องมีสี่ขั้นตอน:
ขั้นตอน 1) สร้างรายการสิ่งอันดับพร้อมข้อมูล
[('John',19),('Smith',29),('Adam',35),('Henry',50)]
ขั้นตอน 2) สร้าง RDD
rdd = sc.parallelize(list_p)
ขั้นตอน 3) แปลงสิ่งอันดับ
rdd.map(lambda x: Row(name=x[0], age=int(x[1])))
ขั้นตอน 4) สร้างบริบท DataFrame
sqlContext.createDataFrame(ppl) list_p = [('John',19),('Smith',29),('Adam',35),('Henry',50)] rdd = sc.parallelize(list_p) ppl = rdd.map(lambda x: Row(name=x[0], age=int(x[1]))) DF_ppl = sqlContext.createDataFrame(ppl)
หากคุณต้องการเข้าถึงประเภทของฟีเจอร์แต่ละรายการ คุณสามารถใช้ printSchema()
DF_ppl.printSchema() root |-- age: long (nullable = true) |-- name: string (nullable = true)
ตัวอย่างการเรียนรู้ของเครื่องด้วย PySpark
ตอนนี้คุณมีความคิดสั้น ๆ เกี่ยวกับ Spark และ SQLContext คุณก็พร้อมที่จะสร้างโปรแกรม Machine Learning โปรแกรมแรกของคุณแล้ว
ต่อไปนี้เป็นขั้นตอนในการสร้างโปรแกรมการเรียนรู้ของเครื่องด้วย PySpark:
- ขั้นตอน 1) การใช้งานพื้นฐานด้วย PySpark
- ขั้นตอน 2) การประมวลผลข้อมูลล่วงหน้า
- ขั้นตอน 3) สร้างไปป์ไลน์การประมวลผลข้อมูล
- ขั้นตอน 4) สร้างลักษณนาม: ลอจิสติกส์
- ขั้นตอน 5) ฝึกอบรมและประเมินแบบจำลอง
- ขั้นตอน 6) ปรับไฮเปอร์พารามิเตอร์
ในพีนี้Spark บทช่วยสอนเกี่ยวกับการเรียนรู้ของเครื่อง เราจะใช้ชุดข้อมูลสำหรับผู้ใหญ่ จุดประสงค์ของบทช่วยสอนนี้คือเพื่อเรียนรู้วิธีใช้ Pyspark สำหรับข้อมูลเพิ่มเติมเกี่ยวกับชุดข้อมูล โปรดดูบทช่วยสอนนี้
โปรดทราบว่าชุดข้อมูลไม่มีนัยสำคัญและคุณอาจคิดว่าการคำนวณใช้เวลานาน Spark ได้รับการออกแบบมาเพื่อประมวลผลข้อมูลจำนวนมาก Sparkประสิทธิภาพของจะเพิ่มขึ้นเมื่อเทียบกับไลบรารีการเรียนรู้ของเครื่องอื่นๆ เมื่อชุดข้อมูลที่ประมวลผลมีขนาดใหญ่ขึ้น
ขั้นตอนที่ 1) การใช้งานพื้นฐานด้วย PySpark
ก่อนอื่น คุณต้องเริ่มต้นก่อน SQLContext ยังไม่ได้เริ่มต้น
#from pyspark.sql import SQLContext url = "https://raw.githubusercontent.com/guru99-edu/R-Programming/master/adult_data.csv" from pyspark import SparkFiles sc.addFile(url) sqlContext = SQLContext(sc)
จากนั้นคุณสามารถอ่านไฟล์ cvs ด้วย sqlContext.read.csv คุณใช้ inferSchema ตั้งค่าเป็น True เพื่อบอก Spark เพื่อคาดเดาประเภทของข้อมูลโดยอัตโนมัติ โดยค่าเริ่มต้น จะกลายเป็นเท็จ
df = sqlContext.read.csv(SparkFiles.get("adult_data.csv"), header=True, inferSchema= True)
มาดูประเภทข้อมูลกันดีกว่า
df.printSchema() root |-- age: integer (nullable = true) |-- workclass: string (nullable = true) |-- fnlwgt: integer (nullable = true) |-- education: string (nullable = true) |-- education_num: integer (nullable = true) |-- marital: string (nullable = true) |-- occupation: string (nullable = true) |-- relationship: string (nullable = true) |-- race: string (nullable = true) |-- sex: string (nullable = true) |-- capital_gain: integer (nullable = true) |-- capital_loss: integer (nullable = true) |-- hours_week: integer (nullable = true) |-- native_country: string (nullable = true) |-- label: string (nullable = true)
คุณสามารถดูข้อมูลพร้อมการแสดง
df.show(5, truncate = False)
+---+----------------+------+---------+-------------+------------------+-----------------+-------------+-----+------+------------+------------+----------+--------------+-----+ |age|workclass |fnlwgt|education|education_num|marital |occupation |relationship |race |sex |capital_gain|capital_loss|hours_week|native_country|label| +---+----------------+------+---------+-------------+------------------+-----------------+-------------+-----+------+------------+------------+----------+--------------+-----+ |39 |State-gov |77516 |Bachelors|13 |Never-married |Adm-clerical |Not-in-family|White|Male |2174 |0 |40 |United-States |<=50K| |50 |Self-emp-not-inc|83311 |Bachelors|13 |Married-civ-spouse|Exec-managerial |Husband |White|Male |0 |0 |13 |United-States |<=50K| |38 |Private |215646|HS-grad |9 |Divorced |Handlers-cleaners|Not-in-family|White|Male |0 |0 |40 |United-States |<=50K| |53 |Private |234721|11th |7 |Married-civ-spouse|Handlers-cleaners|Husband |Black|Male |0 |0 |40 |United-States |<=50K| |28 |Private |338409|Bachelors|13 |Married-civ-spouse|Prof-specialty |Wife |Black|Female|0 |0 |40 |Cuba |<=50K| +---+----------------+------+---------+-------------+------------------+-----------------+-------------+-----+------+------------+------------+----------+--------------+-----+ only showing top 5 rows
หากคุณไม่ได้ตั้งค่า inderShema เป็น True นี่คือสิ่งที่เกิดขึ้นกับประเภทดังกล่าว มีทั้งหมดในสตริง
df_string = sqlContext.read.csv(SparkFiles.get("adult.csv"), header=True, inferSchema= False) df_string.printSchema() root |-- age: string (nullable = true) |-- workclass: string (nullable = true) |-- fnlwgt: string (nullable = true) |-- education: string (nullable = true) |-- education_num: string (nullable = true) |-- marital: string (nullable = true) |-- occupation: string (nullable = true) |-- relationship: string (nullable = true) |-- race: string (nullable = true) |-- sex: string (nullable = true) |-- capital_gain: string (nullable = true) |-- capital_loss: string (nullable = true) |-- hours_week: string (nullable = true) |-- native_country: string (nullable = true) |-- label: string (nullable = true)
หากต้องการแปลงตัวแปรต่อเนื่องในรูปแบบที่ถูกต้อง คุณสามารถใช้การหล่อคอลัมน์ใหม่ได้ คุณสามารถใช้ withColumn เพื่อบอกได้ Spark คอลัมน์ใดที่จะดำเนินการแปลง
# Import all from `sql.types` from pyspark.sql.types import * # Write a custom function to convert the data type of DataFrame columns def convertColumn(df, names, newType): for name in names: df = df.withColumn(name, df[name].cast(newType)) return df # List of continuous features CONTI_FEATURES = ['age', 'fnlwgt','capital_gain', 'education_num', 'capital_loss', 'hours_week'] # Convert the type df_string = convertColumn(df_string, CONTI_FEATURES, FloatType()) # Check the dataset df_string.printSchema() root |-- age: float (nullable = true) |-- workclass: string (nullable = true) |-- fnlwgt: float (nullable = true) |-- education: string (nullable = true) |-- education_num: float (nullable = true) |-- marital: string (nullable = true) |-- occupation: string (nullable = true) |-- relationship: string (nullable = true) |-- race: string (nullable = true) |-- sex: string (nullable = true) |-- capital_gain: float (nullable = true) |-- capital_loss: float (nullable = true) |-- hours_week: float (nullable = true) |-- native_country: string (nullable = true) |-- label: string (nullable = true) from pyspark.ml.feature import StringIndexer #stringIndexer = StringIndexer(inputCol="label", outputCol="newlabel") #model = stringIndexer.fit(df) #df = model.transform(df) df.printSchema()
เลือกคอลัมน์
คุณสามารถเลือกและแสดงแถวโดยเลือกและชื่อของคุณสมบัติได้ ด้านล่าง อายุและ fnlwgt จะถูกเลือก
df.select('age','fnlwgt').show(5)
+---+------+ |age|fnlwgt| +---+------+ | 39| 77516| | 50| 83311| | 38|215646| | 53|234721| | 28|338409| +---+------+ only showing top 5 rows
นับตามกลุ่ม
หากคุณต้องการนับจำนวนครั้งที่เกิดขึ้นตามกลุ่ม คุณสามารถโยง:
- กลุ่มโดย()
- นับ()
ด้วยกัน. ในส่วนของปี่Spark ตัวอย่างด้านล่าง คุณนับจำนวนแถวตามระดับการศึกษา
df.groupBy("education").count().sort("count",ascending=True).show()
+------------+-----+ | education|count| +------------+-----+ | Preschool| 51| | 1st-4th| 168| | 5th-6th| 333| | Doctorate| 413| | 12th| 433| | 9th| 514| | Prof-school| 576| | 7th-8th| 646| | 10th| 933| | Assoc-acdm| 1067| | 11th| 1175| | Assoc-voc| 1382| | Masters| 1723| | Bachelors| 5355| |Some-college| 7291| | HS-grad|10501| +------------+-----+
อธิบายข้อมูล
หากต้องการดูสถิติสรุปของข้อมูล คุณสามารถใช้ description() มันจะคำนวณ:
- นับ
- หมายความ
- ส่วนเบี่ยงเบนมาตรฐาน
- นาที
- แม็กซ์
df.describe().show()
+-------+------------------+-----------+------------------+------------+-----------------+--------+----------------+------------+------------------+------+------------------+----------------+------------------+--------------+-----+ |summary| age| workclass| fnlwgt| education| education_num| marital| occupation|relationship| race| sex| capital_gain| capital_loss| hours_week|native_country|label| +-------+------------------+-----------+------------------+------------+-----------------+--------+----------------+------------+------------------+------+------------------+----------------+------------------+--------------+-----+ | count| 32561| 32561| 32561| 32561| 32561| 32561| 32561| 32561| 32561| 32561| 32561| 32561| 32561| 32561|32561| | mean| 38.58164675532078| null|189778.36651208502| null| 10.0806793403151| null| null| null| null| null|1077.6488437087312| 87.303829734959|40.437455852092995| null| null| | stddev|13.640432553581356| null|105549.97769702227| null|2.572720332067397| null| null| null| null| null| 7385.292084840354|402.960218649002|12.347428681731838| null| null| | min| 17| ?| 12285| 10th| 1|Divorced| ?| Husband|Amer-Indian-Eskimo|Female| 0| 0| 1| ?|<=50K| | max| 90|Without-pay| 1484705|Some-college| 16| Widowed|Transport-moving| Wife| White| Male| 99999| 4356| 99| Yugoslavia| >50K| +-------+------------------+-----------+------------------+------------+-----------------+--------+----------------+------------+------------------+------+------------------+----------------+------------------+--------------+-----+
หากคุณต้องการสรุปสถิติของคอลัมน์เดียว ให้เพิ่มชื่อคอลัมน์เข้าไปข้างใน description()
df.describe('capital_gain').show()
+-------+------------------+ |summary| capital_gain| +-------+------------------+ | count| 32561| | mean|1077.6488437087312| | stddev| 7385.292084840354| | min| 0| | max| 99999| +-------+------------------+
การคำนวณครอสแท็บ
ในบางกรณี การดูสถิติเชิงพรรณนาระหว่างคอลัมน์คู่สองคอลัมน์อาจเป็นเรื่องที่น่าสนใจ ตัวอย่างเช่น คุณสามารถนับจำนวนคนที่รายได้ต่ำกว่าหรือสูงกว่า 50 ดอลลาร์โดยจำแนกตามระดับการศึกษา การดำเนินการนี้เรียกว่าตารางไขว้
df.crosstab('age', 'label').sort("age_label").show()
+---------+-----+----+ |age_label|<=50K|>50K| +---------+-----+----+ | 17| 395| 0| | 18| 550| 0| | 19| 710| 2| | 20| 753| 0| | 21| 717| 3| | 22| 752| 13| | 23| 865| 12| | 24| 767| 31| | 25| 788| 53| | 26| 722| 63| | 27| 754| 81| | 28| 748| 119| | 29| 679| 134| | 30| 690| 171| | 31| 705| 183| | 32| 639| 189| | 33| 684| 191| | 34| 643| 243| | 35| 659| 217| | 36| 635| 263| +---------+-----+----+ only showing top 20 rows
คุณจะเห็นว่าไม่มีใครมีรายได้เกิน 50 เมื่อพวกเขายังเป็นเด็ก
วางคอลัมน์
มี API ที่ใช้งานง่ายสองตัวในการวางคอลัมน์:
- drop(): วางคอลัมน์
- dropna(): วาง NA's
ด้านล่างคุณวางคอลัมน์ education_num
df.drop('education_num').columns ['age', 'workclass', 'fnlwgt', 'education', 'marital', 'occupation', 'relationship', 'race', 'sex', 'capital_gain', 'capital_loss', 'hours_week', 'native_country', 'label']
กรองข้อมูล
คุณสามารถใช้ filter() เพื่อใช้สถิติเชิงพรรณนาในชุดย่อยของข้อมูล เช่น คุณสามารถนับจำนวนผู้ที่มีอายุมากกว่า 40 ปีได้
df.filter(df.age > 40).count()
13443
Descriptสถิติตามกลุ่ม
สุดท้าย คุณสามารถจัดกลุ่มข้อมูลตามกลุ่ม และคำนวณการดำเนินการทางสถิติ เช่น ค่าเฉลี่ย
df.groupby('marital').agg({'capital_gain': 'mean'}).show()
+--------------------+------------------+ | marital| avg(capital_gain)| +--------------------+------------------+ | Separated| 535.5687804878049| | Never-married|376.58831788823363| |Married-spouse-ab...| 653.9832535885167| | Divorced| 728.4148098131893| | Widowed| 571.0715005035247| | Married-AF-spouse| 432.6521739130435| | Married-civ-spouse|1764.8595085470085| +--------------------+------------------+
ขั้นตอนที่ 2) การประมวลผลข้อมูลล่วงหน้า
การประมวลผลข้อมูลเป็นขั้นตอนสำคัญในการเรียนรู้ของเครื่อง หลังจากที่คุณลบข้อมูลขยะแล้ว คุณจะได้รับข้อมูลเชิงลึกที่สำคัญบางประการ
ตัวอย่างเช่น คุณรู้ว่าอายุไม่ใช่ฟังก์ชันเชิงเส้นของรายได้ เมื่อคนหนุ่มสาวรายได้มักจะต่ำกว่าวัยกลางคน หลังจากเกษียณอายุ ครัวเรือนจะใช้เงินออม ส่งผลให้รายได้ลดลง หากต้องการจับภาพรูปแบบนี้ คุณสามารถเพิ่มรูปสี่เหลี่ยมจัตุรัสให้กับฟีเจอร์อายุได้
เพิ่มตารางอายุ
หากต้องการเพิ่มคุณลักษณะใหม่ คุณต้อง:
- เลือกคอลัมน์
- ใช้การแปลงและเพิ่มลงใน DataFrame
from pyspark.sql.functions import * # 1 Select the column age_square = df.select(col("age")**2) # 2 Apply the transformation and add it to the DataFrame df = df.withColumn("age_square", col("age")**2) df.printSchema() root |-- age: integer (nullable = true) |-- workclass: string (nullable = true) |-- fnlwgt: integer (nullable = true) |-- education: string (nullable = true) |-- education_num: integer (nullable = true) |-- marital: string (nullable = true) |-- occupation: string (nullable = true) |-- relationship: string (nullable = true) |-- race: string (nullable = true) |-- sex: string (nullable = true) |-- capital_gain: integer (nullable = true) |-- capital_loss: integer (nullable = true) |-- hours_week: integer (nullable = true) |-- native_country: string (nullable = true) |-- label: string (nullable = true) |-- age_square: double (nullable = true)
คุณจะเห็นว่าการเพิ่ม age_square ลงในกรอบข้อมูลสำเร็จแล้ว คุณสามารถเปลี่ยนลำดับของตัวแปรได้ด้วยการเลือก ด้านล่างนี้ คุณนำ age_square ทีละอายุ
COLUMNS = ['age', 'age_square', 'workclass', 'fnlwgt', 'education', 'education_num', 'marital', 'occupation', 'relationship', 'race', 'sex', 'capital_gain', 'capital_loss', 'hours_week', 'native_country', 'label'] df = df.select(COLUMNS) df.first()
Row(age=39, age_square=1521.0, workclass='State-gov', fnlwgt=77516, education='Bachelors', education_num=13, marital='Never-married', occupation='Adm-clerical', relationship='Not-in-family', race='White', sex='Male', capital_gain=2174, capital_loss=0, hours_week=40, native_country='United-States', label='<=50K')
ไม่รวมฮอลแลนด์-เนเธอร์แลนด์
เมื่อกลุ่มภายในคุณลักษณะมีการสังเกตเพียงครั้งเดียว ระบบจะไม่นำข้อมูลมาสู่แบบจำลอง ในทางตรงกันข้าม อาจทำให้เกิดข้อผิดพลาดระหว่างการตรวจสอบความถูกต้องข้ามได้
เรามาตรวจสอบที่มาของครัวเรือนกันดีกว่า
df.filter(df.native_country == 'Holand-Netherlands').count() df.groupby('native_country').agg({'native_country': 'count'}).sort(asc("count(native_country)")).show()
+--------------------+---------------------+ | native_country|count(native_country)| +--------------------+---------------------+ | Holand-Netherlands| 1| | Scotland| 12| | Hungary| 13| | Honduras| 13| |Outlying-US(Guam-...| 14| | Yugoslavia| 16| | Thailand| 18| | Laos| 18| | Cambodia| 19| | Trinadad&Tobago| 19| | Hong| 20| | Ireland| 24| | Ecuador| 28| | Greece| 29| | France| 29| | Peru| 31| | Nicaragua| 34| | Portugal| 37| | Iran| 43| | Haiti| 44| +--------------------+---------------------+ only showing top 20 rows
ฟีเจอร์ Native_country มีเพียงครัวเรือนเดียวที่มาจากเนเธอร์แลนด์ คุณยกเว้นมัน
df_remove = df.filter(df.native_country != 'Holand-Netherlands')
ขั้นตอนที่ 3) สร้างไปป์ไลน์การประมวลผลข้อมูล
Pyspark มี API แบบไปป์ไลน์คล้ายกับ scikit-learn
ไพพ์ไลน์นั้นสะดวกมากสำหรับการรักษาโครงสร้างของข้อมูล คุณใส่ข้อมูลเข้าไปในไพพ์ไลน์ ภายในไพพ์ไลน์นั้นจะมีการดำเนินการต่างๆ เกิดขึ้น และเอาท์พุตจะถูกใช้เพื่อป้อนให้กับอัลกอริทึม
ตัวอย่างเช่น การเปลี่ยนแปลงสากลอย่างหนึ่งในแมชชีนเลิร์นนิงประกอบด้วยการแปลงสตริงเป็นตัวเข้ารหัสแบบฮอตตัวเดียว กล่าวคือ ทีละคอลัมน์ ตัวเข้ารหัสแบบร้อนตัวหนึ่งมักเป็นเมทริกซ์ที่เต็มไปด้วยเลขศูนย์
ขั้นตอนในการแปลงข้อมูลนั้นคล้ายกับ scikit-learn มาก คุณต้อง:
- จัดทำดัชนีสตริงเป็นตัวเลข
- สร้างตัวเข้ารหัสที่ร้อนแรงตัวเดียว
- แปลงข้อมูล
API สองตัวทำงาน: StringIndexer, OneHotEncoder
- ก่อนอื่น คุณเลือกคอลัมน์สตริงที่จะทำดัชนี inputCol คือชื่อของคอลัมน์ในชุดข้อมูล outputCol เป็นชื่อใหม่ที่กำหนดให้กับคอลัมน์ที่ถูกแปลง
StringIndexer(inputCol="workclass", outputCol="workclass_encoded")
- ปรับข้อมูลให้เหมาะสมและแปลงข้อมูล
model = stringIndexer.fit(df) `indexed = model.transform(df)``
- สร้างคอลัมน์ข่าวตามกลุ่ม ตัวอย่างเช่น หากมี 10 กลุ่มในฟีเจอร์นี้ เมทริกซ์ใหม่จะมี 10 คอลัมน์ XNUMX คอลัมน์สำหรับแต่ละกลุ่ม
OneHotEncoder(dropLast=False, inputCol="workclassencoded", outputCol="workclassvec")
### Example encoder from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler stringIndexer = StringIndexer(inputCol="workclass", outputCol="workclass_encoded") model = stringIndexer.fit(df) indexed = model.transform(df) encoder = OneHotEncoder(dropLast=False, inputCol="workclass_encoded", outputCol="workclass_vec") encoded = encoder.transform(indexed) encoded.show(2)
+---+----------+----------------+------+---------+-------------+------------------+---------------+-------------+-----+----+------------+------------+----------+--------------+-----+-----------------+-------------+ |age|age_square| workclass|fnlwgt|education|education_num| marital| occupation| relationship| race| sex|capital_gain|capital_loss|hours_week|native_country|label|workclass_encoded|workclass_vec| +---+----------+----------------+------+---------+-------------+------------------+---------------+-------------+-----+----+------------+------------+----------+--------------+-----+-----------------+-------------+ | 39| 1521.0| State-gov| 77516|Bachelors| 13| Never-married| Adm-clerical|Not-in-family|White|Male| 2174| 0| 40| United-States|<=50K| 4.0|(9,[4],[1.0])| | 50| 2500.0|Self-emp-not-inc| 83311|Bachelors| 13|Married-civ-spouse|Exec-managerial| Husband|White|Male| 0| 0| 13| United-States|<=50K| 1.0|(9,[1],[1.0])| +---+----------+----------------+------+---------+-------------+------------------+---------------+-------------+-----+----+------------+------------+----------+--------------+-----+-----------------+-------------+ only showing top 2 rows
สร้างท่อส่งน้ำ
คุณจะสร้างไปป์ไลน์เพื่อแปลงฟีเจอร์ที่แม่นยำทั้งหมดและเพิ่มลงในชุดข้อมูลขั้นสุดท้าย ไปป์ไลน์จะมีสี่การดำเนินการ แต่คุณสามารถเพิ่มการดำเนินการได้มากเท่าที่คุณต้องการ
- เข้ารหัสข้อมูลหมวดหมู่
- จัดทำดัชนีคุณลักษณะป้ายกำกับ
- เพิ่มตัวแปรต่อเนื่อง
- ประกอบขั้นตอน.
แต่ละขั้นตอนจะถูกเก็บไว้ในรายการที่มีชื่อขั้นตอน รายการนี้จะแจ้งให้ VectorAssembler ทราบว่าจะต้องดำเนินการใดภายในไปป์ไลน์
1. เข้ารหัสข้อมูลหมวดหมู่
ขั้นตอนนี้เหมือนกันทุกประการกับตัวอย่างข้างต้น ยกเว้นว่าคุณวนซ้ำคุณลักษณะตามหมวดหมู่ทั้งหมด
from pyspark.ml import Pipeline from pyspark.ml.feature import OneHotEncoderEstimator CATE_FEATURES = ['workclass', 'education', 'marital', 'occupation', 'relationship', 'race', 'sex', 'native_country'] stages = [] # stages in our Pipeline for categoricalCol in CATE_FEATURES: stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol + "Index") encoder = OneHotEncoderEstimator(inputCols=[stringIndexer.getOutputCol()], outputCols=[categoricalCol + "classVec"]) stages += [stringIndexer, encoder]
2. จัดทำดัชนีคุณสมบัติฉลาก
Sparkเช่นเดียวกับไลบรารีอื่นๆ มากมาย ไม่ยอมรับค่าสตริงสำหรับป้ายกำกับ คุณแปลงฟีเจอร์ป้ายกำกับด้วย StringIndexer และเพิ่มลงในสเตจรายการ
# Convert label into label indices using the StringIndexer label_stringIdx = StringIndexer(inputCol="label", outputCol="newlabel") stages += [label_stringIdx]
3. เพิ่มตัวแปรต่อเนื่อง
inputCols ของ VectorAssembler คือรายการคอลัมน์ คุณสามารถสร้างรายการใหม่ที่มีคอลัมน์ใหม่ทั้งหมดได้ รหัสด้านล่างเติมรายการด้วยคุณสมบัติหมวดหมู่ที่เข้ารหัสและคุณสมบัติต่อเนื่อง
assemblerInputs = [c + "classVec" for c in CATE_FEATURES] + CONTI_FEATURES
4. ประกอบขั้นตอนต่างๆ
ในที่สุด คุณก็ผ่านทุกขั้นตอนใน VectorAssembler
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")stages += [assembler]
เมื่อทุกขั้นตอนพร้อมแล้ว คุณก็ส่งข้อมูลไปที่ไปป์ไลน์ได้
# Create a Pipeline. pipeline = Pipeline(stages=stages) pipelineModel = pipeline.fit(df_remove) model = pipelineModel.transform(df_remove)
หากคุณตรวจสอบชุดข้อมูลใหม่ คุณจะเห็นว่าชุดข้อมูลดังกล่าวมีคุณสมบัติทั้งหมด ทั้งที่ได้รับการแปลงและไม่ถูกแปลง คุณสนใจเฉพาะป้ายกำกับและฟีเจอร์ใหม่เท่านั้น คุณลักษณะนี้รวมถึงคุณลักษณะที่ได้รับการแปลงทั้งหมดและตัวแปรต่อเนื่อง
model.take(1) [Row(age=39, age_square=1521.0, workclass='State-gov', fnlwgt=77516, education='Bachelors', education_num=13, marital='Never-married', occupation='Adm-clerical', relationship='Not-in-family', race='White', sex='Male', capital_gain=2174, capital_loss=0, hours_week=40, native_country='United-States', label='<=50K', workclassIndex=4.0, workclassclassVec=SparseVector(8, {4: 1.0}), educationIndex=2.0, educationclassVec=SparseVector(15, {2: 1.0}), maritalIndex=1.0, maritalclassVec=SparseVector(6, {1: 1.0}), occupationIndex=3.0, occupationclassVec=SparseVector(14, {3: 1.0}), relationshipIndex=1.0, relationshipclassVec=SparseVector(5, {1: 1.0}), raceIndex=0.0, raceclassVec=SparseVector(4, {0: 1.0}), sexIndex=0.0, sexclassVec=SparseVector(1, {0: 1.0}), native_countryIndex=0.0, native_countryclassVec=SparseVector(40, {0: 1.0}), newlabel=0.0, features=SparseVector(99, {4: 1.0, 10: 1.0, 24: 1.0, 32: 1.0, 44: 1.0, 48: 1.0, 52: 1.0, 53: 1.0, 93: 39.0, 94: 77516.0, 95: 2174.0, 96: 13.0, 98: 40.0}))]
ขั้นตอนที่ 4) สร้างลักษณนาม: ลอจิสติกส์
เพื่อให้การคำนวณเร็วขึ้น คุณต้องแปลงโมเดลเป็น DataFrame
คุณต้องเลือกป้ายกำกับใหม่และคุณสมบัติจากโมเดลโดยใช้แผนที่
from pyspark.ml.linalg import DenseVector input_data = model.rdd.map(lambda x: (x["newlabel"], DenseVector(x["features"])))
คุณพร้อมที่จะสร้างข้อมูลรถไฟเป็น DataFrame แล้ว คุณใช้ sqlContext
df_train = sqlContext.createDataFrame(input_data, ["label", "features"])
ตรวจสอบแถวที่สอง
df_train.show(2)
+-----+--------------------+ |label| features| +-----+--------------------+ | 0.0|[0.0,0.0,0.0,0.0,...| | 0.0|[0.0,1.0,0.0,0.0,...| +-----+--------------------+ only showing top 2 rows
สร้างชุดฝึก/ชุดทดสอบ
คุณแบ่งชุดข้อมูล 80/20 ด้วย RandomSplit
# Split the data into train and test sets train_data, test_data = df_train.randomSplit([.8,.2],seed=1234)
ลองนับดูว่ามีกี่คนที่มีรายได้ต่ำกว่า/มากกว่า 50 ทั้งชุดฝึกอบรมและชุดทดสอบ
train_data.groupby('label').agg({'label': 'count'}).show()
+-----+------------+ |label|count(label)| +-----+------------+ | 0.0| 19698| | 1.0| 6263| +-----+------------+
test_data.groupby('label').agg({'label': 'count'}).show()
+-----+------------+ |label|count(label)| +-----+------------+ | 0.0| 5021| | 1.0| 1578| +-----+------------+
สร้างตัวถดถอยโลจิสติก
สุดท้ายแต่ไม่ท้ายสุด คุณสามารถสร้างตัวจำแนกประเภทได้ Pyspark มี API ที่เรียกว่า LogisticRegression เพื่อดำเนินการถดถอยแบบลอจิสติกส์
คุณเริ่มต้น lr โดยระบุคอลัมน์ป้ายกำกับและคอลัมน์คุณลักษณะ คุณตั้งค่าการวนซ้ำสูงสุด 10 ครั้งและเพิ่มพารามิเตอร์การทำให้เป็นมาตรฐานด้วยค่า 0.3 โปรดทราบว่าในส่วนถัดไป คุณจะใช้การตรวจสอบข้ามกับตารางพารามิเตอร์เพื่อปรับแต่งโมเดล
# Import `LinearRegression` from pyspark.ml.classification import LogisticRegression # Initialize `lr` lr = LogisticRegression(labelCol="label", featuresCol="features", maxIter=10, regParam=0.3) # Fit the data to the model linearModel = lr.fit(train_data)
#สามารถดูค่าสัมประสิทธิ์จากการถดถอยได้
# Print the coefficients and intercept for logistic regression print("Coefficients: " + str(linearModel.coefficients)) print("Intercept: " + str(linearModel.intercept))
Coefficients: [-0.0678914665262,-0.153425526813,-0.0706009536407,-0.164057586562,-0.120655298528,0.162922330862,0.149176870438,-0.626836362611,-0.193483661541,-0.0782269980838,0.222667203836,0.399571096381,-0.0222024341804,-0.311925857859,-0.0434497788688,-0.306007744328,-0.41318209688,0.547937504247,-0.395837350854,-0.23166535958,0.618743906733,-0.344088614546,-0.385266881369,0.317324463006,-0.350518889186,-0.201335923138,-0.232878560088,-0.13349278865,-0.119760542498,0.17500602491,-0.0480968101118,0.288484253943,-0.116314616745,0.0524163478063,-0.300952624551,-0.22046421474,-0.16557996579,-0.114676231939,-0.311966431453,-0.344226119233,0.105530129507,0.152243047814,-0.292774545497,0.263628334433,-0.199951374076,-0.30329422583,-0.231087515178,0.418918551,-0.0565930184279,-0.177818073048,-0.0733236680663,-0.267972912252,0.168491215697,-0.12181255723,-0.385648075442,-0.202101794517,0.0469791640782,-0.00842850210625,-0.00373211448629,-0.259296141281,-0.309896554133,-0.168434409756,-0.11048086026,0.0280647963877,-0.204187030092,-0.414392623536,-0.252806580669,0.143366465705,-0.516359222663,-0.435627370849,-0.301949286524,0.0878249035894,-0.210951740965,-0.621417928742,-0.099445190784,-0.232671473401,-0.1077745606,-0.360429419703,-0.420362959052,-0.379729467809,-0.395186242741,0.0826401853838,-0.280251589972,0.187313505214,-0.20295228799,-0.431177064626,0.149759018379,-0.107114299614,-0.319314858424,0.0028450133235,-0.651220387649,-0.327918792207,-0.143659581445,0.00691075160413,8.38517628783e-08,2.18856717378e-05,0.0266701216268,0.000231075966823,0.00893832698698] Intercept: -1.9884177974805692
ขั้นตอนที่ 5) ฝึกอบรมและประเมินแบบจำลอง
เพื่อสร้างการทำนายสำหรับชุดทดสอบของคุณ
คุณสามารถใช้ linearModel กับ Transformer() บน test_data
# Make predictions on test data using the transform() method. predictions = linearModel.transform(test_data)
คุณสามารถพิมพ์องค์ประกอบในการทำนายได้
predictions.printSchema() root |-- label: double (nullable = true) |-- features: vector (nullable = true) |-- rawPrediction: vector (nullable = true) |-- probability: vector (nullable = true) |-- prediction: double (nullable = false)
คุณสนใจป้ายกำกับ การทำนาย และความน่าจะเป็น
selected = predictions.select("label", "prediction", "probability") selected.show(20)
+-----+----------+--------------------+ |label|prediction| probability| +-----+----------+--------------------+ | 0.0| 0.0|[0.91560704124179...| | 0.0| 0.0|[0.92812140213994...| | 0.0| 0.0|[0.92161406774159...| | 0.0| 0.0|[0.96222760777142...| | 0.0| 0.0|[0.66363283056957...| | 0.0| 0.0|[0.65571324475477...| | 0.0| 0.0|[0.73053376932829...| | 0.0| 1.0|[0.31265053873570...| | 0.0| 0.0|[0.80005907577390...| | 0.0| 0.0|[0.76482251301640...| | 0.0| 0.0|[0.84447301189069...| | 0.0| 0.0|[0.75691912026619...| | 0.0| 0.0|[0.60902504096722...| | 0.0| 0.0|[0.80799228385509...| | 0.0| 0.0|[0.87704364852567...| | 0.0| 0.0|[0.83817652582377...| | 0.0| 0.0|[0.79655423248500...| | 0.0| 0.0|[0.82712311232246...| | 0.0| 0.0|[0.81372823882016...| | 0.0| 0.0|[0.59687710752201...| +-----+----------+--------------------+ only showing top 20 rows
ประเมินแบบจำลอง
คุณต้องดูเมตริกความแม่นยำเพื่อดูว่าโมเดลทำงานได้ดีเพียงใด (หรือไม่ดี) ปัจจุบันไม่มี API ที่จะคำนวณการวัดความแม่นยำ Sparkค่าเริ่มต้นคือ ROC, กราฟลักษณะการทำงานของตัวรับ เป็นเมตริกที่แตกต่างกันซึ่งคำนึงถึงอัตราการเกิดบวกปลอม
ก่อนที่คุณจะดู ROC มาสร้างการวัดความแม่นยำกันก่อน คุณคุ้นเคยกับตัวชี้วัดนี้มากขึ้น การวัดความแม่นยำคือผลรวมของการทำนายที่ถูกต้องต่อจำนวนการสังเกตทั้งหมด
คุณสร้าง DataFrame ด้วยป้ายกำกับและ `prediction
cm = predictions.select("label", "prediction")
คุณสามารถตรวจสอบจำนวนคลาสได้ในฉลากและการทำนาย
cm.groupby('label').agg({'label': 'count'}).show()
+-----+------------+ |label|count(label)| +-----+------------+ | 0.0| 5021| | 1.0| 1578| +-----+------------+
cm.groupby('prediction').agg({'prediction': 'count'}).show()
+----------+-----------------+ |prediction|count(prediction)| +----------+-----------------+ | 0.0| 5982| | 1.0| 617| +----------+-----------------+
ตัวอย่างเช่น ในชุดทดสอบ มี 1578 ครัวเรือนที่มีรายได้มากกว่า 50 และต่ำกว่า 5021 ครัวเรือน อย่างไรก็ตาม ตัวแยกประเภทได้ทำนายครัวเรือน 617 ครัวเรือนที่มีรายได้มากกว่า 50 ครัวเรือน
คุณสามารถคำนวณความแม่นยำได้โดยคำนวณการนับเมื่อมีการจำแนกป้ายกำกับตามจำนวนแถวทั้งหมดอย่างถูกต้อง
cm.filter(cm.label == cm.prediction).count() / cm.count()
0.8237611759357478
คุณสามารถรวมทุกอย่างเข้าด้วยกันและเขียนฟังก์ชันเพื่อคำนวณความแม่นยำได้
def accuracy_m(model): predictions = model.transform(test_data) cm = predictions.select("label", "prediction") acc = cm.filter(cm.label == cm.prediction).count() / cm.count() print("Model accuracy: %.3f%%" % (acc * 100)) accuracy_m(model = linearModel) Model accuracy: 82.376%
ตัวชี้วัด ROC
โมดูล BinaryClassificationEvaluator มีการวัด ROC ผู้รับ Operating Characteristic curve เป็นอีกหนึ่งเครื่องมือทั่วไปที่ใช้กับการจำแนกไบนารี มันคล้ายกับเส้นโค้งความแม่นยำ/การเรียกคืนมาก แต่แทนที่จะวางแผนความแม่นยำกับการเรียกคืน เส้นโค้ง ROC จะแสดงอัตราบวกที่แท้จริง (เช่น การเรียกคืน) เทียบกับอัตราบวกลวง อัตราผลบวกลวงคืออัตราส่วนของอินสแตนซ์เชิงลบที่จัดประเภทไม่ถูกต้องว่าเป็นบวก มันเท่ากับ 1 ลบอัตราติดลบจริง อัตราลบที่แท้จริงเรียกอีกอย่างว่าความจำเพาะ ดังนั้นเส้นโค้ง ROC จะพล็อตความไว (การเรียกคืน) เทียบกับ XNUMX – ความจำเพาะ
### Use ROC from pyspark.ml.evaluation import BinaryClassificationEvaluator # Evaluate model evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction") print(evaluator.evaluate(predictions)) print(evaluator.getMetricName())
0.8940481662695192พื้นที่UnderROC
print(evaluator.evaluate(predictions))
0.8940481662695192
ขั้นตอนที่ 6) ปรับไฮเปอร์พารามิเตอร์
สุดท้ายแต่ไม่ท้ายสุด คุณสามารถปรับแต่งไฮเปอร์พารามิเตอร์ได้ คล้ายกับ scikit เรียนรู้ คุณสร้างตารางพารามิเตอร์ และเพิ่มพารามิเตอร์ที่คุณต้องการปรับแต่ง
เพื่อลดเวลาในการคำนวณ คุณเพียงปรับพารามิเตอร์การทำให้เป็นมาตรฐานด้วยค่าเพียงสองค่าเท่านั้น
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator # Create ParamGrid for Cross Validation paramGrid = (ParamGridBuilder() .addGrid(lr.regParam, [0.01, 0.5]) .build())
สุดท้าย คุณประเมินโมเดลโดยใช้วิธีตรวจสอบความถูกต้องแบบไขว้ 5 เท่า ใช้เวลาในการฝึกประมาณ 16 นาที
from time import * start_time = time() # Create 5-fold CrossValidator cv = CrossValidator(estimator=lr, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5) # Run cross validations cvModel = cv.fit(train_data) # likely take a fair amount of time end_time = time() elapsed_time = end_time - start_time print("Time to train model: %.3f seconds" % elapsed_time)
เวลาในการฝึกโมเดล: 978.807 วินาที
ไฮเปอร์พารามิเตอร์การทำให้เป็นมาตรฐานที่ดีที่สุดคือ 0.01 โดยมีความแม่นยำ 85.316 เปอร์เซ็นต์
accuracy_m(model = cvModel) Model accuracy: 85.316%
คุณสามารถแยกพารามิเตอร์ที่แนะนำได้โดยการผูกมัด cvModel.bestModel ด้วย extractParamMap()
bestModel = cvModel.bestModel bestModel.extractParamMap()
{Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='aggregationDepth', doc='suggested depth for treeAggregate (>= 2)'): 2, Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty'): 0.0, Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='family', doc='The name of family which is a description of the label distribution to be used in the model. Supported options: auto, binomial, multinomial.'): 'auto', Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='featuresCol', doc='features column name'): 'features', Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='fitIntercept', doc='whether to fit an intercept term'): True, Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='labelCol', doc='label column name'): 'label', Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='maxIter', doc='maximum number of iterations (>= 0)'): 10, Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='predictionCol', doc='prediction column name'): 'prediction', Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='probabilityCol', doc='Column name for predicted class conditional probabilities. Note: Not all models output well-calibrated probability estimates! These probabilities should be treated as confidences, not precise probabilities'): 'probability', Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='rawPredictionCol', doc='raw prediction (a.k.a. confidence) column name'): 'rawPrediction', Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='regParam', doc='regularization parameter (>= 0)'): 0.01, Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='standardization', doc='whether to standardize the training features before fitting the model'): True, Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='threshold', doc='threshold in binary classification prediction, in range [0, 1]'): 0.5, Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='tol', doc='the convergence tolerance for iterative algorithms (>= 0)'): 1e-06}
สรุป
Spark เป็นเครื่องมือพื้นฐานสำหรับนักวิทยาศาสตร์ข้อมูล ช่วยให้ผู้ปฏิบัติงานเชื่อมต่อแอปกับแหล่งข้อมูลต่างๆ ทำการวิเคราะห์ข้อมูลได้อย่างราบรื่น หรือเพิ่มแบบจำลองการคาดการณ์
เริ่มต้นกับ Sparkคุณต้องเริ่มต้นก Spark บริบทด้วย:
'Sparkบริบท()'
และและ SQL บริบทเพื่อเชื่อมต่อกับแหล่งข้อมูล:
'SQLContext()'
ในบทช่วยสอน คุณจะได้เรียนรู้วิธีฝึกการถดถอยโลจิสติก:
- แปลงชุดข้อมูลเป็น Dataframe ด้วย:
rdd.map(lambda x: (x["newlabel"], DenseVector(x["features"]))) sqlContext.createDataFrame(input_data, ["label", "features"])
โปรดทราบว่าชื่อคอลัมน์ของป้ายกำกับคือ newlabel และคุณลักษณะทั้งหมดจะรวมอยู่ในคุณลักษณะต่างๆ เปลี่ยนค่าเหล่านี้หากแตกต่างในชุดข้อมูลของคุณ
- สร้างชุดฝึก/ชุดทดสอบ
randomSplit([.8,.2],seed=1234)
- ฝึกโมเดล
LogisticRegression(labelCol="label",featuresCol="features",maxIter=10, regParam=0.3)
lr.fit()
- ทำนายกัน
linearModel.transform()