PySpark 초보자를 위한 튜토리얼: 예시를 통해 학습
Py를 배우기 전에Spark, 이해해 봅시다:
아파치란? Spark?
Spark Hadoop MapReduce보다 쉽고 빠르다는 것이 검증된 빅데이터 솔루션입니다. Spark UC Berkeley RAD 연구소에서 2009년에 개발한 오픈소스 소프트웨어입니다. 2010년 대중에게 공개된 이후, Spark 인기가 높아졌으며 전례 없는 규모로 업계 전반에 걸쳐 사용되고 있습니다.
시대에 빅 데이터따라서 실무자는 데이터 스트리밍을 처리하기 위해 그 어느 때보다 빠르고 안정적인 도구가 필요합니다. MapReduce와 같은 이전 도구는 인기가 있었지만 속도가 느렸습니다. 이 문제를 극복하기 위해, Spark 빠르고 범용적인 솔루션을 제공합니다. 주요 차이점은 Spark MapReduce는 Spark 하드 디스크에서 나중에 메모리에서 계산을 실행합니다. 고속 액세스 및 데이터 처리를 허용하여 시간을 몇 시간에서 몇 분으로 단축합니다.
파이(Py)란 무엇인가Spark?
PySpark Apache에서 만든 도구입니다. Spark 사용을 위한 커뮤니티 Python 과 Spark. RDD(Resilient Distributed Dataset) 작업을 허용합니다. Python. 또한 Py도 제공합니다.Spark 링크할 쉘 Python API Spark 시작하는 핵심 Spark 문맥. Spark 클러스터 컴퓨팅을 실현하는 이름 엔진인 PySpark is Python님의 라이브러리를 사용할 수 있습니다. Spark.
어떻게합니까 Spark 일하세요?
Spark 계산 엔진을 기반으로 하며, 이는 스케줄링, 배포 및 모니터링 애플리케이션을 처리한다는 의미입니다. 각 작업은 컴퓨팅 클러스터라고 하는 다양한 작업자 머신에서 수행됩니다. 컴퓨팅 클러스터는 작업 분할을 의미합니다. 한 머신은 한 가지 작업을 수행하는 반면, 다른 머신은 다른 작업을 통해 최종 출력에 기여합니다. 결국 모든 작업이 집계되어 출력을 생성합니다. Spark 관리자는 다양한 상황에 대한 360도 개요를 제공합니다. Spark 일자리.
Spark 함께 작동하도록 설계되었습니다.
- Python
- Java
- 스칼라
- SQL
중요한 특징은 Spark 기계 학습을 위한 MLlib를 포함한 방대한 양의 내장 라이브러리입니다.. Spark 또한 Hadoop 클러스터와 함께 작동하도록 설계되었으며 Hive 데이터, CSV, JSON, Casandra 데이터 등을 포함한 다양한 유형의 파일을 읽을 수 있습니다.
왜 사용 Spark?
미래의 데이터 실무자로서, 파이썬의 유명한 라이브러리인 판다스와 scikit-learn에 대해 잘 알고 있어야 합니다. 이 두 라이브러리는 중간 크기까지의 데이터 세트를 탐색하기에 환상적입니다. 일반적인 머신 러닝 프로젝트는 다음 방법론을 중심으로 구축됩니다.
- 데이터를 디스크에 로드
- 데이터를 기기의 메모리로 가져오기
- 데이터 처리/분석
- 기계 학습 모델 구축
- 예측을 다시 디스크에 저장
데이터 과학자가 한 대의 컴퓨터에 비해 너무 큰 데이터를 처리하려는 경우 문제가 발생합니다. 데이터 과학 초기에는 대규모 데이터 세트에 대한 교육이 항상 필요한 것은 아니었기 때문에 실무자들은 데이터를 샘플링했습니다. 데이터 과학자는 좋은 통계 샘플을 찾고 추가 견고성 검사를 수행하여 우수한 모델을 제시합니다.
그러나 여기에는 몇 가지 문제가 있습니다.
- 데이터 세트가 현실 세계를 반영하고 있나요?
- 데이터에 구체적인 예가 포함되어 있나요?
- 모델이 샘플링에 적합한가?
예를 들어 사용자 추천을 들어보세요. 추천자는 선호도를 평가할 때 사용자를 다른 사용자와 비교하는 데 의존합니다. 데이터 실무자가 데이터의 하위 집합만 취한다면 서로 매우 유사한 사용자 집단은 존재하지 않을 것입니다. 추천자는 전체 데이터 세트에서 실행되거나 전혀 실행되지 않아야 합니다.
해결 방안은 무엇인가?
해결책은 오랫동안 분명했습니다. 문제를 여러 대의 컴퓨터로 분할하는 것입니다. 병렬 컴퓨팅에도 여러 가지 문제가 있습니다. 개발자는 종종 병렬 코드를 작성하는 데 어려움을 겪고 결국 멀티 프로세싱 자체에 대한 복잡한 문제를 해결해야 합니다.
Pyspark는 데이터 과학자에게 병렬 데이터 처리 문제를 해결하는 데 사용할 수 있는 API를 제공합니다. Pyspark는 데이터 분산, 코드 분산, 머신 클러스터의 작업자로부터 출력 수집과 같은 멀티프로세싱의 복잡성을 처리합니다.
Spark 단독으로 실행할 수 있지만 대부분 Hadoop과 같은 클러스터 컴퓨팅 프레임워크 위에서 실행됩니다. 그러나 테스트 및 개발에서는 데이터 과학자가 효율적으로 실행할 수 있습니다. Spark 클러스터가 없는 개발 상자나 노트북에서
• 주요 장점 중 하나 Spark 데이터 스트리밍 관리, 원활한 데이터 쿼리, 머신 러닝 예측, 다양한 분석에 대한 실시간 액세스를 포괄하는 아키텍처를 구축하는 것입니다.
• Spark SQL 언어, 즉 구조화된 데이터와 긴밀하게 작동합니다. 실시간으로 데이터를 쿼리할 수 있습니다.
• 데이터 과학자 메인의 임무는 예측 모델을 분석하고 구축하는 것입니다. 간단히 말해서, 데이터 과학자는 다음을 사용하여 데이터를 쿼리하는 방법을 알아야 합니다. SQL, 통계 보고서를 생성하고 기계 학습을 활용하여 예측을 생성합니다. 데이터 과학자는 데이터 정리, 변환 및 분석에 상당한 시간을 소비합니다. 데이터 세트 또는 데이터 워크플로우가 준비되면 데이터 과학자는 다양한 기술을 사용하여 통찰력과 숨겨진 패턴을 발견합니다. 데이터 조작은 강력해야 하며 사용하기 쉬워야 합니다. Spark 속도와 풍부한 API 덕분에 적합한 도구입니다.
이 파이에서는Spark 튜토리얼에서는 Py로 분류기를 구축하는 방법을 배우게 됩니다.Spark 예.
파이를 설치하는 방법Spark AWS와 함께
이 어플리케이션에는 XNUMXµm 및 XNUMXµm 파장에서 최대 XNUMXW의 평균 출력을 제공하는 Jupyter 팀에서 실행할 Docker 이미지 빌드 Spark 효율적으로. 다음은 Py를 설치하기 위해 따를 수 있는 단계입니다.Spark AWS의 인스턴스.
튜토리얼을 참조하세요. AWS and TensorFlow
1단계: 인스턴스 생성
우선 인스턴스를 생성해야 합니다. AWS 계정으로 이동하여 인스턴스를 시작하십시오. 스토리지를 최대 15g까지 늘릴 수 있으며 TensorFlow 튜토리얼과 동일한 보안 그룹을 사용할 수 있습니다.
2단계: 연결 열기
연결을 열고 docker 컨테이너를 설치합니다. 자세한 내용은 TensorFlow를 사용한 튜토리얼을 참조하세요. 도커. 올바른 작업 디렉토리에 있어야 합니다.
Docker를 설치하려면 다음 코드를 실행하세요.
sudo yum update -y sudo yum install -y docker sudo service docker start sudo user-mod -a -G docker ec2-user exit
3단계: 연결을 다시 열고 설치 Spark
연결을 다시 연 후 Py가 포함된 이미지를 설치할 수 있습니다.Spark.
## Spark docker run -v ~/work:/home/jovyan/work -d -p 8888:8888 jupyter/pyspark-notebook ## Allow preserving Jupyter notebook sudo chown 1000 ~/work ## Install tree to see our working directory next sudo yum install -y tree
4 단계 : 열기 Jupyter
컨테이너와 이름을 확인하세요.
docker ps
docker 로그와 docker 이름을 사용하여 docker를 시작합니다. 예를 들어 docker는 zealous_goldwasser를 기록합니다.
브라우저로 이동하여 실행하세요. Jupyter. 주소는 http://localhost:8888/입니다. 터미널에서 제공한 비밀번호를 붙여넣으세요.
주의 사항: AWS 시스템에 파일을 업로드/다운로드하려면 Cyberduck 소프트웨어를 사용할 수 있습니다. https://cyberduck.io/.
파이를 설치하는 방법Spark on Windows/Mac과 Conda
다음은 Py를 설치하는 방법에 대한 자세한 프로세스입니다.Spark on Windows/Anaconda를 사용하는 Mac:
설치하려면 Spark 로컬 컴퓨터에서 권장되는 방법은 새 Conda 환경을 만드는 것입니다. 이 새로운 환경이 설치됩니다. Python 3.6, Spark 그리고 모든 종속성.
맥 사용자
cd anaconda3 touch hello-spark.yml vi hello-spark.yml
Windows 사용자
cd C:\Users\Admin\Anaconda3 echo.>hello-spark.yml notepad hello-spark.yml
.yml 파일을 편집할 수 있습니다. 들여쓰기에 주의하세요. 이전에 두 개의 공간이 필요합니다 –
name: hello-spark dependencies: - python=3.6 - jupyter - ipython - numpy - numpy-base - pandas - py4j - pyspark - pytz
저장하고 환경을 만들어주세요. 시간이 좀 걸립니다
conda env create -f hello-spark.yml
위치에 대한 자세한 내용은 TensorFlow 설치 튜토리얼을 확인하세요.
귀하의 컴퓨터에 설치된 모든 환경을 확인할 수 있습니다
conda env list
Activate hello-spark
맥 사용자
source activate hello-spark
Windows 사용자
activate hello-spark
참고 : TensorFlow에서 튜토리얼을 실행하기 위해 특정 TensorFlow 환경을 이미 만들었습니다. hello-tf와는 다른 새로운 환경을 만드는 것이 더 편리합니다. hello-tf를 과부하시키는 것은 의미가 없습니다. Spark 또는 다른 기계 학습 라이브러리.
프로젝트의 대부분이 TensorFlow와 관련되어 있지만 다음을 사용해야 한다고 상상해 보세요. Spark 하나의 특정 프로젝트에 대해. 모든 프로젝트에 대해 TensorFlow 환경을 설정하고 별도의 환경을 생성할 수 있습니다. Spark. 라이브러리를 원하는 만큼 추가할 수 있습니다. Spark TensorFlow 환경을 방해하지 않고 원하는 대로 환경을 구축할 수 있습니다. 일단 당신이 작업을 마치면 Spark의 프로젝트를 삭제하면 TensorFlow 환경에 영향을 주지 않고 삭제할 수 있습니다.
Jupyter
엽니다 Jupyter 노트북을 사용하고 Py가 있는지 시도해 보세요.Spark 작동합니다. 새 노트북에 다음 Py를 붙여넣습니다.Spark 샘플 코드 :
import pyspark from pyspark import SparkContext sc =SparkContext()
오류가 표시되면 다음과 같은 가능성이 있습니다. Java 귀하의 컴퓨터에 설치되어 있지 않습니다. Mac에서 터미널을 열고 java -version을 작성합니다. Java 버전이 있는 경우 1.8인지 확인합니다. Windows, 애플리케이션으로 이동하여 Java 접는 사람. 있는 경우 Java 폴더, 확인해 보세요 Java 1.8이 설치되어 있습니다. 이 글을 쓰는 시점에서 PySpark 와 호환되지 않습니다 Java9 이상.
설치해야 할 경우 Java, 당신은 생각 링크 그리고 jdk-8u181-windows-x64.exe를 다운로드하세요
Mac 사용자의 경우 `brew` 사용을 권장합니다.
brew tap caskroom/versions brew cask install java8
이 단계별 튜토리얼을 참조하세요. 설치하는 방법 Java
주의 사항: 환경을 완전히 지우려면 제거를 사용합니다.
conda env remove -n hello-spark -y
Spark 문맥
Spark컨텍스트는 클러스터와의 연결을 허용하는 내부 엔진입니다. 작업을 실행하려면 다음이 필요합니다. Spark문맥.
만들기 Spark문맥
우선, Spark문맥.
import pyspark from pyspark import SparkContext sc =SparkContext()
이제 Spark컨텍스트가 준비되면 RDD(Resilient Distributed Dataset)라는 데이터 컬렉션을 만들 수 있습니다. RDD의 계산은 클러스터 전체에서 자동으로 병렬화됩니다.
nums= sc.parallelize([1,2,3,4])
take를 사용하여 첫 번째 행에 액세스할 수 있습니다.
nums.take(1)
[1]
람다 함수를 사용하여 데이터에 변환을 적용할 수 있습니다. 파이에서Spark 아래 예에서는 숫자의 제곱을 반환합니다. 지도변형이다
squared = nums.map(lambda x: x*x).collect() for num in squared: print('%i ' % (num))
1 4 9 16
SQL컨텍스트
더 편리한 방법은 DataFrame을 사용하는 것입니다. Spark컨텍스트가 이미 설정되어 있으므로 이를 사용하여 dataFrame을 생성할 수 있습니다. 또한 SQLContext를 선언해야 합니다.
SQLContext를 사용하면 엔진을 다른 데이터 소스와 연결할 수 있습니다. 기능을 시작하는 데 사용됩니다. Spark sql.sql.
from pyspark.sql import Row from pyspark.sql import SQLContext sqlContext = SQLContext(sc)
이제 이것에서 Spark 지도 시간 Python, 튜플 목록을 만들어 보겠습니다. 각 튜플에는 사람의 이름과 나이가 포함됩니다. 다음 네 단계가 필요합니다.
단계 1) 정보를 사용하여 튜플 목록을 만듭니다.
[('John',19),('Smith',29),('Adam',35),('Henry',50)]
단계 2) RDD 구축
rdd = sc.parallelize(list_p)
단계 3) 튜플 변환
rdd.map(lambda x: Row(name=x[0], age=int(x[1])))
단계 4) DataFrame 컨텍스트 만들기
sqlContext.createDataFrame(ppl) list_p = [('John',19),('Smith',29),('Adam',35),('Henry',50)] rdd = sc.parallelize(list_p) ppl = rdd.map(lambda x: Row(name=x[0], age=int(x[1]))) DF_ppl = sqlContext.createDataFrame(ppl)
각 기능의 유형에 액세스하려면 printSchema()를 사용할 수 있습니다.
DF_ppl.printSchema() root |-- age: long (nullable = true) |-- name: string (nullable = true)
Py를 사용한 머신러닝 예시Spark
이제 간략하게 알아보았으니 Spark 및 SQLContext를 사용하면 첫 번째 기계 학습 프로그램을 구축할 준비가 되었습니다.
Py를 사용하여 머신 러닝 프로그램을 빌드하는 단계는 다음과 같습니다.Spark:
- 단계 1) Py를 이용한 기본 작업Spark
- 단계 2) 데이터 전처리
- 단계 3) 데이터 처리 파이프라인 구축
- 단계 4) 분류기 구축: 로지스틱
- 단계 5) 모델 학습 및 평가
- 단계 6) 초매개변수 조정
이 파이에서는Spark 머신 러닝 튜토리얼, 우리는 성인 데이터 세트를 사용할 것입니다. 이 튜토리얼의 목적은 Pyspark를 사용하는 방법을 배우는 것입니다. 데이터 세트에 대한 자세한 내용은 이 튜토리얼을 참조하세요.
데이터 세트는 중요하지 않으며 계산에 오랜 시간이 걸린다고 생각할 수도 있습니다. Spark 상당한 양의 데이터를 처리하도록 설계되었습니다. Spark처리되는 데이터 세트가 커지면 다른 기계 학습 라이브러리에 비해 의 성능이 향상됩니다.
1단계) Py를 이용한 기본 연산Spark
우선, 아직 시작되지 않은 SQLContext를 초기화해야 합니다.
#from pyspark.sql import SQLContext url = "https://raw.githubusercontent.com/guru99-edu/R-Programming/master/adult_data.csv" from pyspark import SparkFiles sc.addFile(url) sqlContext = SQLContext(sc)
그런 다음 sqlContext.read.csv를 사용하여 cvs 파일을 읽을 수 있습니다. True로 설정된 inferSchema를 사용하여 알려줍니다. Spark 데이터 유형을 자동으로 추측합니다. 기본적으로 False로 설정되어 있습니다.
df = sqlContext.read.csv(SparkFiles.get("adult_data.csv"), header=True, inferSchema= True)
데이터 유형을 살펴보겠습니다.
df.printSchema() root |-- age: integer (nullable = true) |-- workclass: string (nullable = true) |-- fnlwgt: integer (nullable = true) |-- education: string (nullable = true) |-- education_num: integer (nullable = true) |-- marital: string (nullable = true) |-- occupation: string (nullable = true) |-- relationship: string (nullable = true) |-- race: string (nullable = true) |-- sex: string (nullable = true) |-- capital_gain: integer (nullable = true) |-- capital_loss: integer (nullable = true) |-- hours_week: integer (nullable = true) |-- native_country: string (nullable = true) |-- label: string (nullable = true)
show를 통해 데이터를 볼 수 있습니다.
df.show(5, truncate = False)
+---+----------------+------+---------+-------------+------------------+-----------------+-------------+-----+------+------------+------------+----------+--------------+-----+ |age|workclass |fnlwgt|education|education_num|marital |occupation |relationship |race |sex |capital_gain|capital_loss|hours_week|native_country|label| +---+----------------+------+---------+-------------+------------------+-----------------+-------------+-----+------+------------+------------+----------+--------------+-----+ |39 |State-gov |77516 |Bachelors|13 |Never-married |Adm-clerical |Not-in-family|White|Male |2174 |0 |40 |United-States |<=50K| |50 |Self-emp-not-inc|83311 |Bachelors|13 |Married-civ-spouse|Exec-managerial |Husband |White|Male |0 |0 |13 |United-States |<=50K| |38 |Private |215646|HS-grad |9 |Divorced |Handlers-cleaners|Not-in-family|White|Male |0 |0 |40 |United-States |<=50K| |53 |Private |234721|11th |7 |Married-civ-spouse|Handlers-cleaners|Husband |Black|Male |0 |0 |40 |United-States |<=50K| |28 |Private |338409|Bachelors|13 |Married-civ-spouse|Prof-specialty |Wife |Black|Female|0 |0 |40 |Cuba |<=50K| +---+----------------+------+---------+-------------+------------------+-----------------+-------------+-----+------+------------+------------+----------+--------------+-----+ only showing top 5 rows
inderShema를 True로 설정하지 않은 경우 유형에 발생하는 상황은 다음과 같습니다. 문자열에는 모두 있습니다.
df_string = sqlContext.read.csv(SparkFiles.get("adult.csv"), header=True, inferSchema= False) df_string.printSchema() root |-- age: string (nullable = true) |-- workclass: string (nullable = true) |-- fnlwgt: string (nullable = true) |-- education: string (nullable = true) |-- education_num: string (nullable = true) |-- marital: string (nullable = true) |-- occupation: string (nullable = true) |-- relationship: string (nullable = true) |-- race: string (nullable = true) |-- sex: string (nullable = true) |-- capital_gain: string (nullable = true) |-- capital_loss: string (nullable = true) |-- hours_week: string (nullable = true) |-- native_country: string (nullable = true) |-- label: string (nullable = true)
연속형 변수를 올바른 형식으로 변환하려면 열을 다시 캐스팅하면 됩니다. withColumn을 사용하여 알 수 있습니다. Spark 어떤 열에서 변환을 수행할 것인가.
# Import all from `sql.types` from pyspark.sql.types import * # Write a custom function to convert the data type of DataFrame columns def convertColumn(df, names, newType): for name in names: df = df.withColumn(name, df[name].cast(newType)) return df # List of continuous features CONTI_FEATURES = ['age', 'fnlwgt','capital_gain', 'education_num', 'capital_loss', 'hours_week'] # Convert the type df_string = convertColumn(df_string, CONTI_FEATURES, FloatType()) # Check the dataset df_string.printSchema() root |-- age: float (nullable = true) |-- workclass: string (nullable = true) |-- fnlwgt: float (nullable = true) |-- education: string (nullable = true) |-- education_num: float (nullable = true) |-- marital: string (nullable = true) |-- occupation: string (nullable = true) |-- relationship: string (nullable = true) |-- race: string (nullable = true) |-- sex: string (nullable = true) |-- capital_gain: float (nullable = true) |-- capital_loss: float (nullable = true) |-- hours_week: float (nullable = true) |-- native_country: string (nullable = true) |-- label: string (nullable = true) from pyspark.ml.feature import StringIndexer #stringIndexer = StringIndexer(inputCol="label", outputCol="newlabel") #model = stringIndexer.fit(df) #df = model.transform(df) df.printSchema()
열 선택
선택 및 기능 이름을 사용하여 행을 선택하고 표시할 수 있습니다. 아래에서는 age 및 fnlwgt가 선택되었습니다.
df.select('age','fnlwgt').show(5)
+---+------+ |age|fnlwgt| +---+------+ | 39| 77516| | 50| 83311| | 38|215646| | 53|234721| | 28|338409| +---+------+ only showing top 5 rows
그룹별로 계산
그룹별로 발생 횟수를 계산하려면 다음을 연결하면 됩니다.
- 그룹바이()
- 카운트()
함께. 파이에서Spark 아래 예에서는 교육 수준에 따라 행 수를 계산합니다.
df.groupBy("education").count().sort("count",ascending=True).show()
+------------+-----+ | education|count| +------------+-----+ | Preschool| 51| | 1st-4th| 168| | 5th-6th| 333| | Doctorate| 413| | 12th| 433| | 9th| 514| | Prof-school| 576| | 7th-8th| 646| | 10th| 933| | Assoc-acdm| 1067| | 11th| 1175| | Assoc-voc| 1382| | Masters| 1723| | Bachelors| 5355| |Some-college| 7291| | HS-grad|10501| +------------+-----+
데이터 설명
데이터의 요약 통계를 얻으려면 explain()을 사용할 수 있습니다. 다음을 계산합니다.
- 계산
- 평균
- 표준 편차
- 분
- 최대
df.describe().show()
+-------+------------------+-----------+------------------+------------+-----------------+--------+----------------+------------+------------------+------+------------------+----------------+------------------+--------------+-----+ |summary| age| workclass| fnlwgt| education| education_num| marital| occupation|relationship| race| sex| capital_gain| capital_loss| hours_week|native_country|label| +-------+------------------+-----------+------------------+------------+-----------------+--------+----------------+------------+------------------+------+------------------+----------------+------------------+--------------+-----+ | count| 32561| 32561| 32561| 32561| 32561| 32561| 32561| 32561| 32561| 32561| 32561| 32561| 32561| 32561|32561| | mean| 38.58164675532078| null|189778.36651208502| null| 10.0806793403151| null| null| null| null| null|1077.6488437087312| 87.303829734959|40.437455852092995| null| null| | stddev|13.640432553581356| null|105549.97769702227| null|2.572720332067397| null| null| null| null| null| 7385.292084840354|402.960218649002|12.347428681731838| null| null| | min| 17| ?| 12285| 10th| 1|Divorced| ?| Husband|Amer-Indian-Eskimo|Female| 0| 0| 1| ?|<=50K| | max| 90|Without-pay| 1484705|Some-college| 16| Widowed|Transport-moving| Wife| White| Male| 99999| 4356| 99| Yugoslavia| >50K| +-------+------------------+-----------+------------------+------------+-----------------+--------+----------------+------------+------------------+------+------------------+----------------+------------------+--------------+-----+
한 열의 요약 통계를 원할 경우, explain() 안에 해당 열 이름을 추가합니다.
df.describe('capital_gain').show()
+-------+------------------+ |summary| capital_gain| +-------+------------------+ | count| 32561| | mean|1077.6488437087312| | stddev| 7385.292084840354| | min| 0| | max| 99999| +-------+------------------+
크로스탭 계산
어떤 경우에는 두 쌍의 열 사이의 설명적 통계를 보는 것이 흥미로울 수 있습니다. 예를 들어, 교육 수준에 따라 50k 미만 또는 초과 소득을 가진 사람의 수를 셀 수 있습니다. 이 작업을 교차표라고 합니다.
df.crosstab('age', 'label').sort("age_label").show()
+---------+-----+----+ |age_label|<=50K|>50K| +---------+-----+----+ | 17| 395| 0| | 18| 550| 0| | 19| 710| 2| | 20| 753| 0| | 21| 717| 3| | 22| 752| 13| | 23| 865| 12| | 24| 767| 31| | 25| 788| 53| | 26| 722| 63| | 27| 754| 81| | 28| 748| 119| | 29| 679| 134| | 30| 690| 171| | 31| 705| 183| | 32| 639| 189| | 33| 684| 191| | 34| 643| 243| | 35| 659| 217| | 36| 635| 263| +---------+-----+----+ only showing top 20 rows
젊을 때 50달러 이상의 수익을 올린 사람이 없다는 것을 알 수 있습니다.
열 삭제
열을 삭제하는 두 가지 직관적인 API가 있습니다.
- drop(): 열 삭제
- dropna(): NA를 삭제합니다.
아래에 education_num 열을 삭제합니다.
df.drop('education_num').columns ['age', 'workclass', 'fnlwgt', 'education', 'marital', 'occupation', 'relationship', 'race', 'sex', 'capital_gain', 'capital_loss', 'hours_week', 'native_country', 'label']
데이터 필터링
filter()를 사용하여 데이터 하위 집합에 기술 통계를 적용할 수 있습니다. 예를 들어 40세 이상의 사람 수를 셀 수 있습니다.
df.filter(df.age > 40).count()
13443
Descript그룹별 통계
마지막으로, 그룹별로 데이터를 그룹화하고 평균과 같은 통계 연산을 계산할 수 있습니다.
df.groupby('marital').agg({'capital_gain': 'mean'}).show()
+--------------------+------------------+ | marital| avg(capital_gain)| +--------------------+------------------+ | Separated| 535.5687804878049| | Never-married|376.58831788823363| |Married-spouse-ab...| 653.9832535885167| | Divorced| 728.4148098131893| | Widowed| 571.0715005035247| | Married-AF-spouse| 432.6521739130435| | Married-civ-spouse|1764.8595085470085| +--------------------+------------------+
2단계) 데이터 전처리
데이터 처리는 머신러닝에서 중요한 단계입니다. 가비지 데이터를 제거한 후에는 몇 가지 중요한 통찰력을 얻을 수 있습니다.
예를 들어, 나이는 소득에 대한 선형 함수가 아니라는 것을 알고 있습니다. 젊은 사람들의 소득은 대개 중년보다 낮습니다. 은퇴 후에는 가계가 저축한 돈을 사용하게 되므로 소득이 감소하게 됩니다. 이 패턴을 포착하려면 연령 특성에 사각형을 추가하면 됩니다.
연령 제곱 추가
새로운 기능을 추가하려면 다음을 수행해야 합니다.
- 열 선택
- 변환을 적용하고 DataFrame에 추가합니다.
from pyspark.sql.functions import * # 1 Select the column age_square = df.select(col("age")**2) # 2 Apply the transformation and add it to the DataFrame df = df.withColumn("age_square", col("age")**2) df.printSchema() root |-- age: integer (nullable = true) |-- workclass: string (nullable = true) |-- fnlwgt: integer (nullable = true) |-- education: string (nullable = true) |-- education_num: integer (nullable = true) |-- marital: string (nullable = true) |-- occupation: string (nullable = true) |-- relationship: string (nullable = true) |-- race: string (nullable = true) |-- sex: string (nullable = true) |-- capital_gain: integer (nullable = true) |-- capital_loss: integer (nullable = true) |-- hours_week: integer (nullable = true) |-- native_country: string (nullable = true) |-- label: string (nullable = true) |-- age_square: double (nullable = true)
age_square가 데이터 프레임에 성공적으로 추가된 것을 확인할 수 있습니다. 선택을 사용하여 변수의 순서를 변경할 수 있습니다. 아래에서는 age 바로 뒤에 age_square를 가져옵니다.
COLUMNS = ['age', 'age_square', 'workclass', 'fnlwgt', 'education', 'education_num', 'marital', 'occupation', 'relationship', 'race', 'sex', 'capital_gain', 'capital_loss', 'hours_week', 'native_country', 'label'] df = df.select(COLUMNS) df.first()
Row(age=39, age_square=1521.0, workclass='State-gov', fnlwgt=77516, education='Bachelors', education_num=13, marital='Never-married', occupation='Adm-clerical', relationship='Not-in-family', race='White', sex='Male', capital_gain=2174, capital_loss=0, hours_week=40, native_country='United-States', label='<=50K')
네덜란드-네덜란드 제외
기능 내의 그룹에 관찰이 하나만 있는 경우 모델에 정보를 가져오지 않습니다. 반대로 교차 검증 중에 오류가 발생할 수 있습니다.
세대의 출신지를 확인해보자
df.filter(df.native_country == 'Holand-Netherlands').count() df.groupby('native_country').agg({'native_country': 'count'}).sort(asc("count(native_country)")).show()
+--------------------+---------------------+ | native_country|count(native_country)| +--------------------+---------------------+ | Holand-Netherlands| 1| | Scotland| 12| | Hungary| 13| | Honduras| 13| |Outlying-US(Guam-...| 14| | Yugoslavia| 16| | Thailand| 18| | Laos| 18| | Cambodia| 19| | Trinadad&Tobago| 19| | Hong| 20| | Ireland| 24| | Ecuador| 28| | Greece| 29| | France| 29| | Peru| 31| | Nicaragua| 34| | Portugal| 37| | Iran| 43| | Haiti| 44| +--------------------+---------------------+ only showing top 20 rows
Native_country 기능에는 네덜란드에서 온 가구가 한 가구뿐입니다. 당신은 그것을 제외합니다.
df_remove = df.filter(df.native_country != 'Holand-Netherlands')
3단계) 데이터 처리 파이프라인 구축
scikit-learn과 비슷하게 Pyspark에는 파이프라인 API가 있습니다.
파이프라인은 데이터 구조를 유지하는 데 매우 편리합니다. 파이프라인에 데이터를 푸시합니다. 파이프라인 내부에서 다양한 작업이 수행되고 출력은 알고리즘에 피드하는 데 사용됩니다.
예를 들어, 기계 학습의 하나의 범용 변환은 문자열을 하나의 핫 인코더(즉, 그룹별로 하나의 열)로 변환하는 것으로 구성됩니다. 하나의 핫 인코더는 일반적으로 XNUMX으로 가득 찬 행렬입니다.
데이터를 변환하는 단계는 scikit-learn과 매우 유사합니다. 다음을 수행해야 합니다.
- 문자열을 숫자로 인덱싱
- 하나의 핫 인코더 만들기
- 데이터 변환
두 가지 API가 작업을 수행합니다: StringIndexer, OneHotEncoder
- 먼저 인덱싱할 문자열 열을 선택합니다. inputCol은 데이터 세트에 있는 열의 이름입니다. outputCol은 변환된 열에 지정된 새 이름입니다.
StringIndexer(inputCol="workclass", outputCol="workclass_encoded")
- 데이터를 피팅하고 변환
model = stringIndexer.fit(df) `indexed = model.transform(df)``
- 그룹을 기반으로 뉴스 칼럼을 생성합니다. 예를 들어 특성에 10개의 그룹이 있는 경우 새 행렬에는 각 그룹에 하나씩 10개의 열이 있습니다.
OneHotEncoder(dropLast=False, inputCol="workclassencoded", outputCol="workclassvec")
### Example encoder from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler stringIndexer = StringIndexer(inputCol="workclass", outputCol="workclass_encoded") model = stringIndexer.fit(df) indexed = model.transform(df) encoder = OneHotEncoder(dropLast=False, inputCol="workclass_encoded", outputCol="workclass_vec") encoded = encoder.transform(indexed) encoded.show(2)
+---+----------+----------------+------+---------+-------------+------------------+---------------+-------------+-----+----+------------+------------+----------+--------------+-----+-----------------+-------------+ |age|age_square| workclass|fnlwgt|education|education_num| marital| occupation| relationship| race| sex|capital_gain|capital_loss|hours_week|native_country|label|workclass_encoded|workclass_vec| +---+----------+----------------+------+---------+-------------+------------------+---------------+-------------+-----+----+------------+------------+----------+--------------+-----+-----------------+-------------+ | 39| 1521.0| State-gov| 77516|Bachelors| 13| Never-married| Adm-clerical|Not-in-family|White|Male| 2174| 0| 40| United-States|<=50K| 4.0|(9,[4],[1.0])| | 50| 2500.0|Self-emp-not-inc| 83311|Bachelors| 13|Married-civ-spouse|Exec-managerial| Husband|White|Male| 0| 0| 13| United-States|<=50K| 1.0|(9,[1],[1.0])| +---+----------+----------------+------+---------+-------------+------------------+---------------+-------------+-----+----+------------+------------+----------+--------------+-----+-----------------+-------------+ only showing top 2 rows
파이프라인 빌드
모든 정확한 기능을 변환하고 최종 데이터 세트에 추가하는 파이프라인을 빌드합니다. 파이프라인에는 4개의 작업이 있지만 원하는 만큼 많은 작업을 추가해도 됩니다.
- 범주형 데이터 인코딩
- 레이블 기능 색인화
- 연속변수 추가
- 단계를 조립합니다.
각 단계는 stages라는 목록에 저장됩니다. 이 목록은 VectorAssembler에게 파이프라인 내부에서 수행할 작업을 알려줍니다.
1. 범주형 데이터 인코딩
이 단계는 모든 범주형 기능을 반복한다는 점을 제외하면 위의 예와 완전히 동일합니다.
from pyspark.ml import Pipeline from pyspark.ml.feature import OneHotEncoderEstimator CATE_FEATURES = ['workclass', 'education', 'marital', 'occupation', 'relationship', 'race', 'sex', 'native_country'] stages = [] # stages in our Pipeline for categoricalCol in CATE_FEATURES: stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol + "Index") encoder = OneHotEncoderEstimator(inputCols=[stringIndexer.getOutputCol()], outputCols=[categoricalCol + "classVec"]) stages += [stringIndexer, encoder]
2. 라벨 기능 색인화
Spark는 다른 많은 라이브러리와 마찬가지로 레이블에 문자열 값을 허용하지 않습니다. StringIndexer를 사용하여 레이블 기능을 변환하고 목록 단계에 추가합니다.
# Convert label into label indices using the StringIndexer label_stringIdx = StringIndexer(inputCol="label", outputCol="newlabel") stages += [label_stringIdx]
3. 연속변수 추가
VectorAssembler의 inputCols는 열 목록입니다. 모든 새 열을 포함하는 새 목록을 만들 수 있습니다. 아래 코드는 인코딩된 범주형 기능과 연속형 기능으로 목록을 채웁니다.
assemblerInputs = [c + "classVec" for c in CATE_FEATURES] + CONTI_FEATURES
4. 계단을 조립합니다.
마지막으로 VectorAssembler의 모든 단계를 통과합니다.
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")stages += [assembler]
이제 모든 단계가 준비되었으므로 데이터를 파이프라인에 푸시합니다.
# Create a Pipeline. pipeline = Pipeline(stages=stages) pipelineModel = pipeline.fit(df_remove) model = pipelineModel.transform(df_remove)
새 데이터 세트를 확인하면 변환된 기능과 변환되지 않은 기능이 모두 포함되어 있음을 알 수 있습니다. 당신은 새로운 라벨과 기능에만 관심이 있습니다. 특성에는 변환된 모든 특성과 연속형 변수가 포함됩니다.
model.take(1) [Row(age=39, age_square=1521.0, workclass='State-gov', fnlwgt=77516, education='Bachelors', education_num=13, marital='Never-married', occupation='Adm-clerical', relationship='Not-in-family', race='White', sex='Male', capital_gain=2174, capital_loss=0, hours_week=40, native_country='United-States', label='<=50K', workclassIndex=4.0, workclassclassVec=SparseVector(8, {4: 1.0}), educationIndex=2.0, educationclassVec=SparseVector(15, {2: 1.0}), maritalIndex=1.0, maritalclassVec=SparseVector(6, {1: 1.0}), occupationIndex=3.0, occupationclassVec=SparseVector(14, {3: 1.0}), relationshipIndex=1.0, relationshipclassVec=SparseVector(5, {1: 1.0}), raceIndex=0.0, raceclassVec=SparseVector(4, {0: 1.0}), sexIndex=0.0, sexclassVec=SparseVector(1, {0: 1.0}), native_countryIndex=0.0, native_countryclassVec=SparseVector(40, {0: 1.0}), newlabel=0.0, features=SparseVector(99, {4: 1.0, 10: 1.0, 24: 1.0, 32: 1.0, 44: 1.0, 48: 1.0, 52: 1.0, 53: 1.0, 93: 39.0, 94: 77516.0, 95: 2174.0, 96: 13.0, 98: 40.0}))]
4단계) 분류기 구축: 로지스틱
계산 속도를 높이려면 모델을 DataFrame으로 변환합니다.
지도를 사용하여 모델에서 새 라벨과 기능을 선택해야 합니다.
from pyspark.ml.linalg import DenseVector input_data = model.rdd.map(lambda x: (x["newlabel"], DenseVector(x["features"])))
열차 데이터를 DataFrame으로 생성할 준비가 되었습니다. sqlContext를 사용합니다.
df_train = sqlContext.createDataFrame(input_data, ["label", "features"])
두 번째 행을 확인하세요.
df_train.show(2)
+-----+--------------------+ |label| features| +-----+--------------------+ | 0.0|[0.0,0.0,0.0,0.0,...| | 0.0|[0.0,1.0,0.0,0.0,...| +-----+--------------------+ only showing top 2 rows
열차/테스트 세트 생성
RandomSplit을 사용하여 데이터 세트를 80/20으로 분할합니다.
# Split the data into train and test sets train_data, test_data = df_train.randomSplit([.8,.2],seed=1234)
훈련 세트와 테스트 세트 모두에서 소득이 50 이하/이상인 사람의 수를 세어 보겠습니다.
train_data.groupby('label').agg({'label': 'count'}).show()
+-----+------------+ |label|count(label)| +-----+------------+ | 0.0| 19698| | 1.0| 6263| +-----+------------+
test_data.groupby('label').agg({'label': 'count'}).show()
+-----+------------+ |label|count(label)| +-----+------------+ | 0.0| 5021| | 1.0| 1578| +-----+------------+
로지스틱 회귀 분석기 구축
마지막으로 분류기를 빌드할 수 있습니다. Pyspark에는 로지스틱 회귀를 수행하는 LogisticRegression이라는 API가 있습니다.
레이블 열과 기능 열을 표시하여 lr을 초기화합니다. 최대 10번의 반복을 설정하고 정규화 매개변수 값을 0.3으로 추가합니다. 다음 섹션에서는 모델을 조정하기 위해 매개변수 그리드와 교차 검증을 사용합니다.
# Import `LinearRegression` from pyspark.ml.classification import LogisticRegression # Initialize `lr` lr = LogisticRegression(labelCol="label", featuresCol="features", maxIter=10, regParam=0.3) # Fit the data to the model linearModel = lr.fit(train_data)
#회귀분석에서 계수를 볼 수 있습니다
# Print the coefficients and intercept for logistic regression print("Coefficients: " + str(linearModel.coefficients)) print("Intercept: " + str(linearModel.intercept))
Coefficients: [-0.0678914665262,-0.153425526813,-0.0706009536407,-0.164057586562,-0.120655298528,0.162922330862,0.149176870438,-0.626836362611,-0.193483661541,-0.0782269980838,0.222667203836,0.399571096381,-0.0222024341804,-0.311925857859,-0.0434497788688,-0.306007744328,-0.41318209688,0.547937504247,-0.395837350854,-0.23166535958,0.618743906733,-0.344088614546,-0.385266881369,0.317324463006,-0.350518889186,-0.201335923138,-0.232878560088,-0.13349278865,-0.119760542498,0.17500602491,-0.0480968101118,0.288484253943,-0.116314616745,0.0524163478063,-0.300952624551,-0.22046421474,-0.16557996579,-0.114676231939,-0.311966431453,-0.344226119233,0.105530129507,0.152243047814,-0.292774545497,0.263628334433,-0.199951374076,-0.30329422583,-0.231087515178,0.418918551,-0.0565930184279,-0.177818073048,-0.0733236680663,-0.267972912252,0.168491215697,-0.12181255723,-0.385648075442,-0.202101794517,0.0469791640782,-0.00842850210625,-0.00373211448629,-0.259296141281,-0.309896554133,-0.168434409756,-0.11048086026,0.0280647963877,-0.204187030092,-0.414392623536,-0.252806580669,0.143366465705,-0.516359222663,-0.435627370849,-0.301949286524,0.0878249035894,-0.210951740965,-0.621417928742,-0.099445190784,-0.232671473401,-0.1077745606,-0.360429419703,-0.420362959052,-0.379729467809,-0.395186242741,0.0826401853838,-0.280251589972,0.187313505214,-0.20295228799,-0.431177064626,0.149759018379,-0.107114299614,-0.319314858424,0.0028450133235,-0.651220387649,-0.327918792207,-0.143659581445,0.00691075160413,8.38517628783e-08,2.18856717378e-05,0.0266701216268,0.000231075966823,0.00893832698698] Intercept: -1.9884177974805692
5단계) 모델 학습 및 평가
테스트 세트에 대한 예측을 생성하려면
test_data에서 변환()과 함께 선형 모델을 사용할 수 있습니다.
# Make predictions on test data using the transform() method. predictions = linearModel.transform(test_data)
예측의 요소를 인쇄할 수 있습니다.
predictions.printSchema() root |-- label: double (nullable = true) |-- features: vector (nullable = true) |-- rawPrediction: vector (nullable = true) |-- probability: vector (nullable = true) |-- prediction: double (nullable = false)
라벨, 예측, 확률에 관심이 있습니다.
selected = predictions.select("label", "prediction", "probability") selected.show(20)
+-----+----------+--------------------+ |label|prediction| probability| +-----+----------+--------------------+ | 0.0| 0.0|[0.91560704124179...| | 0.0| 0.0|[0.92812140213994...| | 0.0| 0.0|[0.92161406774159...| | 0.0| 0.0|[0.96222760777142...| | 0.0| 0.0|[0.66363283056957...| | 0.0| 0.0|[0.65571324475477...| | 0.0| 0.0|[0.73053376932829...| | 0.0| 1.0|[0.31265053873570...| | 0.0| 0.0|[0.80005907577390...| | 0.0| 0.0|[0.76482251301640...| | 0.0| 0.0|[0.84447301189069...| | 0.0| 0.0|[0.75691912026619...| | 0.0| 0.0|[0.60902504096722...| | 0.0| 0.0|[0.80799228385509...| | 0.0| 0.0|[0.87704364852567...| | 0.0| 0.0|[0.83817652582377...| | 0.0| 0.0|[0.79655423248500...| | 0.0| 0.0|[0.82712311232246...| | 0.0| 0.0|[0.81372823882016...| | 0.0| 0.0|[0.59687710752201...| +-----+----------+--------------------+ only showing top 20 rows
모델 평가
모델의 성능이 얼마나 좋은지(또는 나쁜지) 확인하려면 정확도 측정항목을 살펴봐야 합니다. 현재 정확도 측정을 계산하는 API는 없습니다. Spark. 기본값은 ROC, 수신기 작동 특성 곡선입니다. 거짓 양성률을 고려하는 다른 지표입니다.
ROC를 살펴보기 전에 정확도 측정을 구성해 보겠습니다. 이 측정항목에 더 익숙합니다. 정확도 측정값은 총 관측치 수에 대한 올바른 예측의 합입니다.
레이블과 'prediction'을 사용하여 DataFrame을 만듭니다.
cm = predictions.select("label", "prediction")
라벨과 예측에서 클래스 수를 확인할 수 있습니다.
cm.groupby('label').agg({'label': 'count'}).show()
+-----+------------+ |label|count(label)| +-----+------------+ | 0.0| 5021| | 1.0| 1578| +-----+------------+
cm.groupby('prediction').agg({'prediction': 'count'}).show()
+----------+-----------------+ |prediction|count(prediction)| +----------+-----------------+ | 0.0| 5982| | 1.0| 617| +----------+-----------------+
예를 들어, 테스트 세트에는 소득이 1578 이상인 가구가 50개 있고 그 이하인 가구가 5021개 있습니다. 그러나 분류자는 소득이 617만 달러 이상인 가구를 50개로 예측했습니다.
전체 행 수에 대해 레이블이 올바르게 분류되었을 때 개수를 계산하여 정확도를 계산할 수 있습니다.
cm.filter(cm.label == cm.prediction).count() / cm.count()
0.8237611759357478
모든 것을 하나로 묶고 정확도를 계산하는 함수를 작성할 수 있습니다.
def accuracy_m(model): predictions = model.transform(test_data) cm = predictions.select("label", "prediction") acc = cm.filter(cm.label == cm.prediction).count() / cm.count() print("Model accuracy: %.3f%%" % (acc * 100)) accuracy_m(model = linearModel) Model accuracy: 82.376%
ROC 지표
BinaryClassificationEvaluator 모듈에는 ROC 측정값이 포함되어 있습니다. 수신자 Opera특성 곡선은 이진 분류에 사용되는 또 다른 일반적인 도구입니다. 정밀도/재현율 곡선과 매우 유사하지만 정밀도 대 재현율을 표시하는 대신 ROC 곡선은 위양성률에 대한 참양성률(즉, 재현율)을 표시합니다. 위양성률은 양성으로 잘못 분류된 음성 인스턴스의 비율입니다. 1에서 진음성률을 뺀 것과 같습니다. 진음성률을 특이성이라고도 합니다. 따라서 ROC 곡선은 민감도(재현율) 대 XNUMX - 특이성을 표시합니다.
### Use ROC from pyspark.ml.evaluation import BinaryClassificationEvaluator # Evaluate model evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction") print(evaluator.evaluate(predictions)) print(evaluator.getMetricName())
0.8940481662695192areaUnderROC
print(evaluator.evaluate(predictions))
0.8940481662695192
6단계) 초매개변수 조정
마지막으로 하이퍼파라미터를 조정할 수 있습니다. 비슷하다 사이킷 런 매개변수 그리드를 생성하고 조정하려는 매개변수를 추가합니다.
계산 시간을 줄이려면 두 개의 값만 사용하여 정규화 매개변수만 조정하면 됩니다.
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator # Create ParamGrid for Cross Validation paramGrid = (ParamGridBuilder() .addGrid(lr.regParam, [0.01, 0.5]) .build())
마지막으로 5겹 교차 검증 방법을 사용하여 모델을 평가합니다. 훈련하는 데 약 16분이 소요됩니다.
from time import * start_time = time() # Create 5-fold CrossValidator cv = CrossValidator(estimator=lr, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5) # Run cross validations cvModel = cv.fit(train_data) # likely take a fair amount of time end_time = time() elapsed_time = end_time - start_time print("Time to train model: %.3f seconds" % elapsed_time)
모델 학습 시간: 978.807초
최고의 정규화 하이퍼파라미터는 0.01이며 정확도는 85.316%입니다.
accuracy_m(model = cvModel) Model accuracy: 85.316%
cvModel.bestModel을 extractParamMap()과 연결하여 권장 매개변수를 추출할 수 있습니다.
bestModel = cvModel.bestModel bestModel.extractParamMap()
{Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='aggregationDepth', doc='suggested depth for treeAggregate (>= 2)'): 2, Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty'): 0.0, Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='family', doc='The name of family which is a description of the label distribution to be used in the model. Supported options: auto, binomial, multinomial.'): 'auto', Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='featuresCol', doc='features column name'): 'features', Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='fitIntercept', doc='whether to fit an intercept term'): True, Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='labelCol', doc='label column name'): 'label', Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='maxIter', doc='maximum number of iterations (>= 0)'): 10, Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='predictionCol', doc='prediction column name'): 'prediction', Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='probabilityCol', doc='Column name for predicted class conditional probabilities. Note: Not all models output well-calibrated probability estimates! These probabilities should be treated as confidences, not precise probabilities'): 'probability', Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='rawPredictionCol', doc='raw prediction (a.k.a. confidence) column name'): 'rawPrediction', Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='regParam', doc='regularization parameter (>= 0)'): 0.01, Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='standardization', doc='whether to standardize the training features before fitting the model'): True, Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='threshold', doc='threshold in binary classification prediction, in range [0, 1]'): 0.5, Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='tol', doc='the convergence tolerance for iterative algorithms (>= 0)'): 1e-06}
요약
Spark 데이터 과학자를 위한 기본 도구입니다. 이를 통해 실무자는 앱을 다양한 데이터 소스에 연결하고, 데이터 분석을 원활하게 수행하거나 예측 모델을 추가할 수 있습니다.
우선 첫째로 Spark, 당신은 Spark 다음과 관련된 맥락:
'Spark문맥()'
및 SQL 데이터 소스에 연결하기 위한 컨텍스트:
'SQL컨텍스트()'
튜토리얼에서는 로지스틱 회귀를 학습하는 방법을 배웁니다.
- 다음을 사용하여 데이터세트를 데이터프레임으로 변환합니다.
rdd.map(lambda x: (x["newlabel"], DenseVector(x["features"]))) sqlContext.createDataFrame(input_data, ["label", "features"])
레이블의 열 이름은 newlabel이고 모든 기능은 기능에 수집됩니다. 데이터 세트가 다른 경우 이 값을 변경하세요.
- 열차/테스트 세트 생성
randomSplit([.8,.2],seed=1234)
- 모델 훈련
LogisticRegression(labelCol="label",featuresCol="features",maxIter=10, regParam=0.3)
lr.fit()
- 예측하기
linearModel.transform()