초보자를 위한 PySpark 튜토리얼: 예제를 통해 학습

PySpark를 배우기 전에 다음 사항을 이해해 봅시다.

아파치 스파크란 무엇입니까?

Spark는 Hadoop MapReduce보다 쉽고 빠른 것으로 검증된 빅데이터 솔루션입니다. Spark는 2009년 UC Berkeley RAD 연구소에서 개발한 오픈 소스 소프트웨어입니다. 2010년 대중에게 공개된 이후 Spark는 전례 없는 규모로 업계에서 널리 사용되고 있습니다.

시대에 빅 데이터따라서 실무자는 데이터 스트리밍을 처리하기 위해 그 어느 때보다 빠르고 안정적인 도구가 필요합니다. MapReduce와 같은 이전 도구는 인기가 있었지만 속도가 느렸습니다. 이 문제를 극복하기 위해 Spark는 빠르고 범용적인 솔루션을 제공합니다. Spark와 MapReduce의 주요 차이점은 Spark가 나중에 하드 디스크에서 메모리에서 계산을 실행한다는 것입니다. 이를 통해 고속 액세스 및 데이터 처리가 가능해 시간이 몇 시간에서 몇 분으로 단축됩니다.

PySpark란 무엇입니까?

파이 스파크 Python을 Spark와 함께 사용하기 위해 Apache Spark 커뮤니티에서 만든 도구입니다. Python에서 RDD(Resilient Distributed Dataset)로 작업할 수 있습니다. 또한 Python API를 Spark 코어와 연결하여 Spark 컨텍스트를 시작하는 PySpark Shell을 제공합니다. Spark는 클러스터 컴퓨팅을 구현하기 위한 네임 엔진인 반면, PySpark는 Spark를 사용하기 위한 Python의 라이브러리입니다.

스파크는 어떻게 작동하나요?

Spark는 컴퓨팅 엔진을 기반으로 합니다. 즉, 애플리케이션 예약, 배포 및 모니터링을 관리합니다. 각 작업은 컴퓨팅 클러스터라고 하는 다양한 작업자 시스템에서 수행됩니다. 컴퓨팅 클러스터는 작업 분할을 의미합니다. 한 기계는 하나의 작업을 수행하고 다른 기계는 다른 작업을 통해 최종 출력에 기여합니다. 결국 모든 작업이 집계되어 출력이 생성됩니다. Spark 관리자는 다양한 Spark 작업에 대한 360도 개요를 제공합니다.

스파크는 어떻게 작동하나요?
스파크는 어떻게 작동하나요?

Spark는 다음과 함께 작동하도록 설계되었습니다.

  • Python
  • 자바
  • 스칼라
  • SQL

Spark의 중요한 특징은 기계 학습을 위한 MLlib를 포함하여 방대한 양의 내장 라이브러리입니다.. Spark는 또한 Hadoop 클러스터와 함께 작동하도록 설계되었으며 Hive 데이터, CSV, JSON, Casandra 데이터를 포함한 광범위한 유형의 파일을 읽을 수 있습니다.

스파크를 사용하는 이유는 무엇입니까?

미래의 데이터 실무자로서 여러분은 Python의 유명한 라이브러리인 Pandas 및 scikit-learn에 익숙해야 합니다. 이 두 라이브러리는 최대 중간 크기의 데이터 세트를 탐색하는 데 환상적입니다. 일반 기계 학습 프로젝트는 Followo를 중심으로 구축됩니다.wing 방법론:

  • 데이터를 디스크에 로드
  • 데이터를 기기의 메모리로 가져오기
  • 데이터 처리/분석
  • 기계 학습 모델 구축
  • 예측을 다시 디스크에 저장

데이터 과학자가 한 대의 컴퓨터에 비해 너무 큰 데이터를 처리하려는 경우 문제가 발생합니다. 데이터 과학 초기에는 대규모 데이터 세트에 대한 교육이 항상 필요한 것은 아니었기 때문에 실무자들은 데이터를 샘플링했습니다. 데이터 과학자는 좋은 통계 샘플을 찾고 추가 견고성 검사를 수행하여 우수한 모델을 제시합니다.

그러나 여기에는 몇 가지 문제가 있습니다.

  • 데이터 세트가 현실 세계를 반영하고 있나요?
  • 데이터에 구체적인 예가 포함되어 있나요?
  • 모델이 샘플링에 적합한가?

예를 들어 사용자 추천을 들어보세요. 추천자는 선호도를 평가할 때 사용자를 다른 사용자와 비교하는 데 의존합니다. 데이터 실무자가 데이터의 하위 집합만 취한다면 서로 매우 유사한 사용자 집단은 존재하지 않을 것입니다. 추천자는 전체 데이터 세트에서 실행되거나 전혀 실행되지 않아야 합니다.

해결 방안은 무엇인가?

해결책은 오랫동안 분명해졌으며 문제를 여러 컴퓨터로 분할했습니다. 병렬 컴퓨팅에도 여러 가지 문제가 있습니다. 개발자는 종종 병렬 코드를 작성하는 데 어려움을 겪고 결국 여러 가지 COM 문제를 해결해야 합니다.plex 다중 처리 자체에 관한 문제.

Pyspark는 데이터 과학자에게 병렬 데이터 진행 문제를 해결하는 데 사용할 수 있는 API를 제공합니다. Pyspark가 com을 처리합니다.plex데이터 배포, 코드 배포, 기계 클러스터의 작업자로부터 출력 수집과 같은 다중 처리의 특성.

Spark는 독립형으로 실행될 수도 있지만 Hadoop과 같은 클러스터 컴퓨팅 프레임워크 위에서 실행되는 경우가 가장 많습니다. 그러나 테스트 및 개발에서 데이터 과학자는 개발 과정에서 Spark를 효율적으로 실행할 수 있습니다. box클러스터가 없는 ES 또는 노트북

• Spark의 주요 장점 중 하나는 데이터 스트리밍 관리, 원활한 데이터 쿼리, 머신 러닝 예측, 다양한 분석에 대한 실시간 액세스를 포괄하는 아키텍처를 구축하는 것입니다.

• Spark는 SQL 언어, 즉 구조화된 데이터와 긴밀하게 작동합니다. 실시간으로 데이터를 쿼리할 수 있습니다.

• 데이터 과학자 메인의 임무는 예측 모델을 분석하고 구축하는 것입니다. 간단히 말해서, 데이터 과학자는 다음을 사용하여 데이터를 쿼리하는 방법을 알아야 합니다. SQL, 통계 보고서를 생성하고 기계 학습을 활용하여 예측을 생성합니다. 데이터 과학자는 데이터 정리, 변환 및 분석에 상당한 시간을 소비합니다. 데이터 세트 또는 데이터 워크플로우가 준비되면 데이터 과학자는 다양한 기술을 사용하여 통찰력과 숨겨진 패턴을 발견합니다. 데이터 조작은 강력해야 하며 사용하기 쉬워야 합니다. Spark는 속도와 풍부한 API 덕분에 적합한 도구입니다.

이 PySpark 튜토리얼에서는 PySpark 예제를 사용하여 분류자를 구축하는 방법을 배웁니다.

AWS로 PySpark를 설치하는 방법

Jupyter 팀은 Spark를 효율적으로 실행하기 위해 Docker 이미지를 구축합니다. 다음은 AWS에 PySpark 인스턴스를 설치하기 위해 수행할 수 있는 단계입니다.

튜토리얼을 참조하세요. AWS 또한 TensorFlow

1단계: 인스턴스 생성

우선 인스턴스를 생성해야 합니다. AWS 계정으로 이동하여 인스턴스를 시작하십시오. 스토리지를 최대 15g까지 늘릴 수 있으며 TensorFlow 튜토리얼과 동일한 보안 그룹을 사용할 수 있습니다.

2단계: 연결 열기

연결을 열고 도커 컨테이너를 설치합니다. 더 많은 드를 위해tails, TensorFlow 튜토리얼을 참조하세요. 도커. 올바른 작업 디렉토리에 있어야 합니다.

Docker를 설치하려면 다음 코드를 실행하세요.

sudo yum update -y
sudo yum install -y docker
sudo service docker start
sudo user-mod -a -G docker ec2-user
exit

3단계: 연결을 다시 열고 Spark 설치

연결을 다시 연 후 PySpark가 포함된 이미지를 설치할 수 있습니다.

## Spark
docker run -v ~/work:/home/jovyan/work -d -p 8888:8888 jupyter/pyspark-notebook

## Allow preserving Jupyter notebook
sudo chown 1000 ~/work

## Install tree to see our working directory next
sudo yum install -y tree

4 단계 : 열기 Jupyter

컨테이너와 이름을 확인하세요.

docker ps

docker 로그와 docker 이름을 사용하여 docker를 시작합니다. 예를 들어 docker는 zealous_goldwasser를 기록합니다.

브라우저로 이동하여 실행하세요. Jupyter. 주소는 http://localhost:8888/입니다. 터미널에서 제공한 비밀번호를 붙여넣으세요.

주의 사항: AWS 시스템에 파일을 업로드/다운로드하려면 Cyberduck 소프트웨어를 사용할 수 있습니다. https://cyberduck.io/.

Conda를 사용하여 Windows/Mac에 PySpark를 설치하는 방법

FOLLOwing Anaconda를 사용하여 Windows/Mac에 PySpark를 설치하는 방법에 대한 자세한 프로세스는 다음과 같습니다.

로컬 컴퓨터에 Spark를 설치하려면 새 Conda 환경을 만드는 것이 좋습니다. 이 새로운 환경은 Python 3.6, Spark 및 모든 종속성을 설치합니다.

맥 사용자

cd anaconda3
touch hello-spark.yml
vi hello-spark.yml

Windows 사용자

cd C:\Users\Admin\Anaconda3
echo.>hello-spark.yml
notepad hello-spark.yml

.yml 파일을 편집할 수 있습니다. 들여쓰기에 주의하세요. 이전에 두 개의 공간이 필요합니다 –

name: hello-spark 
    dependencies:
    
    - python=3.6
    - jupyter
    - ipython
    - numpy
    - numpy-base
    - pandas
    - py4j
    - pyspark
    - pytz

저장하고 환경을 만들어주세요. 시간이 좀 걸립니다

conda env create -f hello-spark.yml

더 많은 드를 위해tails 위치에 대해서는 튜토리얼 TensorFlow 설치를 확인하세요.

귀하의 컴퓨터에 설치된 모든 환경을 확인할 수 있습니다

conda env list
Activate hello-spark

맥 사용자

source activate hello-spark

Windows 사용자

activate hello-spark

참고 : TensorFlow에서 튜토리얼을 실행하기 위해 특정 TensorFlow 환경을 이미 만들었습니다. hello-tf와는 다른 새로운 환경을 만드는 것이 더 편리합니다. Spark나 다른 기계 학습 라이브러리로 hello-tf를 오버로드하는 것은 의미가 없습니다.

대부분의 프로젝트에 TensorFlow가 포함되어 있지만 특정 프로젝트 하나에 Spark를 사용해야 한다고 상상해 보세요. 모든 프로젝트에 대해 TensorFlow 환경을 설정하고 Spark에 대한 별도의 환경을 만들 수 있습니다. TensorFlow 환경을 방해하지 않고 Spark 환경에 원하는 만큼 많은 라이브러리를 추가할 수 있습니다. Spark 프로젝트가 완료되면 TensorFlow 환경에 영향을 주지 않고 지울 수 있습니다.

Jupyter

엽니다 Jupyter 노트북을 사용하고 PySpark가 작동하는지 시도해 보세요. 새 노트북에 다음 내용을 붙여넣으세요.wing PySpark 샘플 코드:

import pyspark
from pyspark import SparkContext
sc =SparkContext()

오류가 표시되면 시스템에 Java가 설치되지 않았을 가능성이 높습니다. Mac에서는 터미널을 열고 java -version을 입력하고, Java 버전이 있으면 1.8인지 확인하세요. Windows에서 애플리케이션으로 이동하여 Java 폴더가 있는지 확인하세요. Java 폴더가 있는 경우 Java 1.8이 설치되어 있는지 확인하세요. 이 글을 쓰는 시점에서 PySpark는 Java9 이상과 호환되지 않습니다.

Java를 설치해야 한다면 다음과 같이 생각하세요. 링크 jdk-8u181-windows-x64.exe를 다운로드하세요.

Jupyter

Mac 사용자의 경우 `brew` 사용을 권장합니다.

brew tap caskroom/versions
brew cask install java8

이 단계별 튜토리얼을 참조하세요. 자바를 설치하는 방법

주의 사항: 환경을 완전히 지우려면 제거를 사용합니다.

 conda env remove -n hello-spark -y

스파크 컨텍스트

SparkContext는 클러스터와의 연결을 허용하는 내부 엔진입니다. 작업을 실행하려면 SparkContext가 필요합니다.

SparkContext 생성

먼저 SparkContext를 시작해야 합니다.

import pyspark
from pyspark import SparkContext
sc =SparkContext()

이제 SparkContext가 준비되었으므로 RDD(Resilient Distributed Dataset)라는 데이터 컬렉션을 생성할 수 있습니다. RDD의 계산은 클러스터 전체에서 자동으로 병렬화됩니다.

nums= sc.parallelize([1,2,3,4])

take를 사용하여 첫 번째 행에 액세스할 수 있습니다.

nums.take(1)
[1]

람다 함수를 사용하여 데이터에 변환을 적용할 수 있습니다. 아래 PySpark 예제에서는 숫자의 제곱을 반환합니다. 지도변형이다

squared = nums.map(lambda x: x*x).collect()
for num in squared:
    print('%i ' % (num))
1 
4 
9 
16

SQL컨텍스트

더 편리한 방법은 DataFrame을 사용하는 것입니다. SparkContext는 이미 설정되어 있으므로 이를 사용하여 dataFrame을 생성할 수 있습니다. 또한 SQLContext를 선언해야 합니다.

SQLContext를 사용하면 엔진을 다른 데이터 소스와 연결할 수 있습니다. Spark SQL의 기능을 시작하는 데 사용됩니다.

from pyspark.sql import Row
from pyspark.sql import SQLContext

sqlContext = SQLContext(sc)

이제 이 Spark 튜토리얼 Python에서 튜플 목록을 만들어 보겠습니다. 각 튜플에는 사람의 이름과 나이가 포함됩니다. 다음 네 단계가 필요합니다.

단계 1) 정보를 사용하여 튜플 목록을 만듭니다.

[('John',19),('Smith',29),('Adam',35),('Henry',50)]

단계 2) RDD 구축

rdd = sc.parallelize(list_p)

단계 3) 튜플 변환

rdd.map(lambda x: Row(name=x[0], age=int(x[1])))

단계 4) DataFrame 컨텍스트 만들기

sqlContext.createDataFrame(ppl)
list_p = [('John',19),('Smith',29),('Adam',35),('Henry',50)]
rdd = sc.parallelize(list_p)
ppl = rdd.map(lambda x: Row(name=x[0], age=int(x[1])))
DF_ppl = sqlContext.createDataFrame(ppl)

각 기능의 유형에 액세스하려면 printSchema()를 사용할 수 있습니다.

DF_ppl.printSchema()
root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)

PySpark를 사용한 머신러닝 예시

이제 Spark 및 SQLContext에 대한 간단한 아이디어를 얻었으므로 첫 번째 기계 학습 프로그램을 구축할 준비가 되었습니다.

FOLLOwing PySpark를 사용하여 기계 학습 프로그램을 구축하는 단계는 다음과 같습니다.

  • 단계 1) PySpark를 사용한 기본 작업
  • 단계 2) 데이터 전처리
  • 단계 3) 데이터 처리 파이프라인 구축
  • 단계 4) 분류기 구축: 로지스틱
  • 단계 5) 모델 학습 및 평가
  • 단계 6) 초매개변수 조정

이 PySpark Machine Learning 튜토리얼에서는 성인 데이터 세트를 사용합니다. 이 튜토리얼의 목적은 Pyspark 사용법을 배우는 것입니다. 데이터세트에 대한 자세한 내용은 이 튜토리얼을 참조하세요.

데이터 세트는 중요하지 않으며 계산에 오랜 시간이 걸린다고 생각할 수도 있습니다. Spark는 상당한 양의 데이터를 처리하도록 설계되었습니다. 처리되는 데이터세트가 커질수록 Spark의 성능은 다른 기계 학습 라이브러리에 비해 향상됩니다.

1단계) PySpark를 사용한 기본 작업

우선, 아직 시작되지 않은 SQLContext를 초기화해야 합니다.

#from pyspark.sql import SQLContext
url = "https://raw.githubusercontent.com/guru99-edu/R-Programming/master/adult_data.csv"
from pyspark import SparkFiles
sc.addFile(url)
sqlContext = SQLContext(sc)

그런 다음 sqlContext.read.csv를 사용하여 cvs 파일을 읽을 수 있습니다. InferSchema를 True로 설정하여 Spark에 자동으로 데이터 유형을 추측하도록 지시합니다. 기본적으로 False로 설정되어 있습니다.

df = sqlContext.read.csv(SparkFiles.get("adult_data.csv"), header=True, inferSchema= True)

데이터 유형을 살펴보겠습니다.

df.printSchema()
root
 |-- age: integer (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: integer (nullable = true)
 |-- education: string (nullable = true)
 |-- education_num: integer (nullable = true)
 |-- marital: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital_gain: integer (nullable = true)
 |-- capital_loss: integer (nullable = true)
 |-- hours_week: integer (nullable = true)
 |-- native_country: string (nullable = true)
 |-- label: string (nullable = true)

show를 통해 데이터를 볼 수 있습니다.

df.show(5, truncate = False)
+---+----------------+------+---------+-------------+------------------+-----------------+-------------+-----+------+------------+------------+----------+--------------+-----+
|age|workclass       |fnlwgt|education|education_num|marital           |occupation       |relationship |race |sex   |capital_gain|capital_loss|hours_week|native_country|label|
+---+----------------+------+---------+-------------+------------------+-----------------+-------------+-----+------+------------+------------+----------+--------------+-----+
|39 |State-gov       |77516 |Bachelors|13           |Never-married     |Adm-clerical     |Not-in-family|White|Male  |2174        |0           |40        |United-States |<=50K|
|50 |Self-emp-not-inc|83311 |Bachelors|13           |Married-civ-spouse|Exec-managerial  |Husband      |White|Male  |0           |0           |13        |United-States |<=50K|
|38 |Private         |215646|HS-grad  |9            |Divorced          |Handlers-cleaners|Not-in-family|White|Male  |0           |0           |40        |United-States |<=50K|
|53 |Private         |234721|11th     |7            |Married-civ-spouse|Handlers-cleaners|Husband      |Black|Male  |0           |0           |40        |United-States |<=50K|
|28 |Private         |338409|Bachelors|13           |Married-civ-spouse|Prof-specialty   |Wife         |Black|Female|0           |0           |40        |Cuba          |<=50K|
+---+----------------+------+---------+-------------+------------------+-----------------+-------------+-----+------+------------+------------+----------+--------------+-----+
only showing top 5 rows

inderShema를 True로 설정하지 않은 경우 유형에 발생하는 상황은 다음과 같습니다. 문자열에는 모두 있습니다.

df_string = sqlContext.read.csv(SparkFiles.get("adult.csv"), header=True, inferSchema=  False)
df_string.printSchema()
root
 |-- age: string (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: string (nullable = true)
 |-- education: string (nullable = true)
 |-- education_num: string (nullable = true)
 |-- marital: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital_gain: string (nullable = true)
 |-- capital_loss: string (nullable = true)
 |-- hours_week: string (nullable = true)
 |-- native_country: string (nullable = true)
 |-- label: string (nullable = true)

연속형 변수를 올바른 형식으로 변환하려면 열을 다시 캐스팅하면 됩니다. withColumn을 사용하여 변환을 수행할 열을 Spark에 알릴 수 있습니다.

# Import all from `sql.types`
from pyspark.sql.types import *

# Write a custom function to convert the data type of DataFrame columns
def convertColumn(df, names, newType):
    for name in names: 
        df = df.withColumn(name, df[name].cast(newType))
    return df 
# List of continuous features
CONTI_FEATURES  = ['age', 'fnlwgt','capital_gain', 'education_num', 'capital_loss', 'hours_week']
# Convert the type
df_string = convertColumn(df_string, CONTI_FEATURES, FloatType())
# Check the dataset
df_string.printSchema()
root
 |-- age: float (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: float (nullable = true)
 |-- education: string (nullable = true)
 |-- education_num: float (nullable = true)
 |-- marital: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital_gain: float (nullable = true)
 |-- capital_loss: float (nullable = true)
 |-- hours_week: float (nullable = true)
 |-- native_country: string (nullable = true)
 |-- label: string (nullable = true)

from pyspark.ml.feature import StringIndexer
#stringIndexer = StringIndexer(inputCol="label", outputCol="newlabel")
#model = stringIndexer.fit(df)
#df = model.transform(df)
df.printSchema()

열 선택

선택 및 기능 이름을 사용하여 행을 선택하고 표시할 수 있습니다. 아래에서는 age 및 fnlwgt가 선택되었습니다.

df.select('age','fnlwgt').show(5)
+---+------+
|age|fnlwgt|
+---+------+
| 39| 77516|
| 50| 83311|
| 38|215646|
| 53|234721|
| 28|338409|
+---+------+
only showing top 5 rows

그룹별로 계산

그룹별로 발생 횟수를 계산하려면 다음을 연결하면 됩니다.

  • 그룹바이()
  • 카운트()

함께. 아래 PySpark 예에서는 교육 수준에 따라 행 수를 계산합니다.

df.groupBy("education").count().sort("count",ascending=True).show()
+------------+-----+
|   education|count|
+------------+-----+
|   Preschool|   51|
|     1st-4th|  168|
|     5th-6th|  333|
|   Doctorate|  413|
|        12th|  433|
|         9th|  514|
| Prof-school|  576|
|     7th-8th|  646|
|        10th|  933|
|  Assoc-acdm| 1067|
|        11th| 1175|
|   Assoc-voc| 1382|
|     Masters| 1723|
|   Bachelors| 5355|
|Some-college| 7291|
|     HS-grad|10501|
+------------+-----+

데이터 설명

데이터의 요약 통계를 얻으려면 explain()을 사용할 수 있습니다. 다음을 계산합니다.

  • 계산
  • 평균
  • 표준 편차
  • 최대
df.describe().show()
+-------+------------------+-----------+------------------+------------+-----------------+--------+----------------+------------+------------------+------+------------------+----------------+------------------+--------------+-----+
|summary|               age|  workclass|            fnlwgt|   education|    education_num| marital|      occupation|relationship|              race|   sex|      capital_gain|    capital_loss|        hours_week|native_country|label|
+-------+------------------+-----------+------------------+------------+-----------------+--------+----------------+------------+------------------+------+------------------+----------------+------------------+--------------+-----+
|  count|             32561|      32561|             32561|       32561|            32561|   32561|           32561|       32561|             32561| 32561|             32561|           32561|             32561|         32561|32561|
|   mean| 38.58164675532078|       null|189778.36651208502|        null| 10.0806793403151|    null|            null|        null|              null|  null|1077.6488437087312| 87.303829734959|40.437455852092995|          null| null|
| stddev|13.640432553581356|       null|105549.97769702227|        null|2.572720332067397|    null|            null|        null|              null|  null| 7385.292084840354|402.960218649002|12.347428681731838|          null| null|
|    min|                17|          ?|             12285|        10th|                1|Divorced|               ?|     Husband|Amer-Indian-Eskimo|Female|                 0|               0|                 1|             ?|<=50K|
|    max|                90|Without-pay|           1484705|Some-college|               16| Widowed|Transport-moving|        Wife|             White|  Male|             99999|            4356|                99|    Yugoslavia| >50K|
+-------+------------------+-----------+------------------+------------+-----------------+--------+----------------+------------+------------------+------+------------------+----------------+------------------+--------------+-----+

한 열의 요약 통계를 원할 경우, explain() 안에 해당 열 이름을 추가합니다.

df.describe('capital_gain').show()
+-------+------------------+
|summary|      capital_gain|
+-------+------------------+
|  count|             32561|
|   mean|1077.6488437087312|
| stddev| 7385.292084840354|
|    min|                 0|
|    max|             99999|
+-------+------------------+

크로스탭 계산

어떤 경우에는 두 쌍의 열 사이의 기술 통계를 보는 것이 흥미로울 수 있습니다. 예를 들어, 교육 수준에 따라 소득이 50 이하 또는 이상인 사람의 수를 계산할 수 있습니다. 이 작업을 크로스탭이라고 합니다.

df.crosstab('age', 'label').sort("age_label").show()
+---------+-----+----+
|age_label|<=50K|>50K|
+---------+-----+----+
|       17|  395|   0|
|       18|  550|   0|
|       19|  710|   2|
|       20|  753|   0|
|       21|  717|   3|
|       22|  752|  13|
|       23|  865|  12|
|       24|  767|  31|
|       25|  788|  53|
|       26|  722|  63|
|       27|  754|  81|
|       28|  748| 119|
|       29|  679| 134|
|       30|  690| 171|
|       31|  705| 183|
|       32|  639| 189|
|       33|  684| 191|
|       34|  643| 243|
|       35|  659| 217|
|       36|  635| 263|
+---------+-----+----+
only showing top 20 rows

젊을 때 50달러 이상의 수익을 올린 사람이 없다는 것을 알 수 있습니다.

열 삭제

열을 삭제하는 두 가지 직관적인 API가 있습니다.

  • drop(): 열 삭제
  • dropna(): NA를 삭제합니다.

아래에 education_num 열을 삭제합니다.

df.drop('education_num').columns

['age',
 'workclass',
 'fnlwgt',
 'education',
 'marital',
 'occupation',
 'relationship',
 'race',
 'sex',
 'capital_gain',
 'capital_loss',
 'hours_week',
 'native_country',
 'label']

데이터 필터링

filter()를 사용하여 데이터 하위 집합에 기술 통계를 적용할 수 있습니다. 예를 들어 40세 이상의 사람 수를 셀 수 있습니다.

df.filter(df.age > 40).count()

13443

그룹별 기술통계

마지막으로 데이터를 그룹별로 그룹화하고 평균과 같은 통계 연산을 계산할 수 있습니다.

df.groupby('marital').agg({'capital_gain': 'mean'}).show()
+--------------------+------------------+
|             marital| avg(capital_gain)|
+--------------------+------------------+
|           Separated| 535.5687804878049|
|       Never-married|376.58831788823363|
|Married-spouse-ab...| 653.9832535885167|
|            Divorced| 728.4148098131893|
|             Widowed| 571.0715005035247|
|   Married-AF-spouse| 432.6521739130435|
|  Married-civ-spouse|1764.8595085470085|
+--------------------+------------------+

2단계) 데이터 전처리

데이터 처리는 머신러닝에서 중요한 단계입니다. 가비지 데이터를 제거한 후에는 몇 가지 중요한 통찰력을 얻을 수 있습니다.

예를 들어, 나이는 소득에 대한 선형 함수가 아니라는 것을 알고 있습니다. 젊은 사람들의 소득은 대개 중년보다 낮습니다. 은퇴 후에는 가계가 저축한 돈을 사용하게 되므로 소득이 감소하게 됩니다. 이 패턴을 포착하려면 연령 특성에 사각형을 추가하면 됩니다.

연령 제곱 추가

새로운 기능을 추가하려면 다음을 수행해야 합니다.

  1. 열 선택
  2. 변환을 적용하고 DataFrame에 추가합니다.
from pyspark.sql.functions import *

# 1 Select the column
age_square = df.select(col("age")**2)

# 2 Apply the transformation and add it to the DataFrame
df = df.withColumn("age_square", col("age")**2)

df.printSchema()
root
 |-- age: integer (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: integer (nullable = true)
 |-- education: string (nullable = true)
 |-- education_num: integer (nullable = true)
 |-- marital: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital_gain: integer (nullable = true)
 |-- capital_loss: integer (nullable = true)
 |-- hours_week: integer (nullable = true)
 |-- native_country: string (nullable = true)
 |-- label: string (nullable = true)
 |-- age_square: double (nullable = true)

age_square가 데이터 프레임에 성공적으로 추가된 것을 확인할 수 있습니다. 선택을 사용하여 변수의 순서를 변경할 수 있습니다. 아래에서는 age 바로 뒤에 age_square를 가져옵니다.

COLUMNS = ['age', 'age_square', 'workclass', 'fnlwgt', 'education', 'education_num', 'marital',
           'occupation', 'relationship', 'race', 'sex', 'capital_gain', 'capital_loss',
           'hours_week', 'native_country', 'label']
df = df.select(COLUMNS)
df.first()
Row(age=39, age_square=1521.0, workclass='State-gov', fnlwgt=77516, education='Bachelors', education_num=13, marital='Never-married', occupation='Adm-clerical', relationship='Not-in-family', race='White', sex='Male', capital_gain=2174, capital_loss=0, hours_week=40, native_country='United-States', label='<=50K')

네덜란드-네덜란드 제외

기능 내의 그룹에 관찰이 하나만 있는 경우 모델에 정보를 가져오지 않습니다. 반대로 교차 검증 중에 오류가 발생할 수 있습니다.

세대의 출신지를 확인해보자

df.filter(df.native_country == 'Holand-Netherlands').count()
df.groupby('native_country').agg({'native_country': 'count'}).sort(asc("count(native_country)")).show()
+--------------------+---------------------+
|      native_country|count(native_country)|
+--------------------+---------------------+
|  Holand-Netherlands|                    1|
|            Scotland|                   12|
|             Hungary|                   13|
|            Honduras|                   13|
|Outlying-US(Guam-...|                   14|
|          Yugoslavia|                   16|
|            Thailand|                   18|
|                Laos|                   18|
|            Cambodia|                   19|
|     Trinadad&Tobago|                   19|
|                Hong|                   20|
|             Ireland|                   24|
|             Ecuador|                   28|
|              Greece|                   29|
|              France|                   29|
|                Peru|                   31|
|           Nicaragua|                   34|
|            Portugal|                   37|
|                Iran|                   43|
|               Haiti|                   44|
+--------------------+---------------------+
only showing top 20 rows

Native_country 기능에는 네덜란드에서 온 가구가 한 가구뿐입니다. 당신은 그것을 제외합니다.

df_remove = df.filter(df.native_country !=	'Holand-Netherlands')

3단계) ​​데이터 처리 파이프라인 구축

scikit-learn과 유사하게 Pyspark에는 파이프라인 API가 있습니다.

파이프라인은 데이터 구조를 유지하는 데 매우 편리합니다. 데이터를 파이프라인으로 푸시합니다. 파이프라인 내부에서는 다양한 작업이 수행되며 출력은 알고리즘에 공급되는 데 사용됩니다.

예를 들어, 기계 학습의 하나의 범용 변환은 문자열을 하나의 핫 인코더(즉, 그룹별로 하나의 열)로 변환하는 것으로 구성됩니다. 하나의 핫 인코더는 일반적으로 XNUMX으로 가득 찬 행렬입니다.

데이터를 변환하는 단계는 scikit-learn과 매우 유사합니다. 다음을 수행해야 합니다.

  • 문자열을 숫자로 인덱싱
  • 하나의 핫 인코더 만들기
  • 데이터 변환

두 가지 API가 작업을 수행합니다: StringIndexer, OneHotEncoder

  1. 먼저 인덱싱할 문자열 열을 선택합니다. inputCol은 데이터 세트에 있는 열의 이름입니다. outputCol은 변환된 열에 지정된 새 이름입니다.
StringIndexer(inputCol="workclass", outputCol="workclass_encoded")
  1. 데이터를 피팅하고 변환
model = stringIndexer.fit(df)		
`indexed = model.transform(df)``
  1. 그룹을 기반으로 뉴스 칼럼을 생성합니다. 예를 들어 특성에 10개의 그룹이 있는 경우 새 행렬에는 각 그룹에 하나씩 10개의 열이 있습니다.
OneHotEncoder(dropLast=False, inputCol="workclassencoded", outputCol="workclassvec")
### Example encoder
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler

stringIndexer = StringIndexer(inputCol="workclass", outputCol="workclass_encoded")
model = stringIndexer.fit(df)
indexed = model.transform(df)
encoder = OneHotEncoder(dropLast=False, inputCol="workclass_encoded", outputCol="workclass_vec")
encoded = encoder.transform(indexed)
encoded.show(2)
+---+----------+----------------+------+---------+-------------+------------------+---------------+-------------+-----+----+------------+------------+----------+--------------+-----+-----------------+-------------+
|age|age_square|       workclass|fnlwgt|education|education_num|           marital|     occupation| relationship| race| sex|capital_gain|capital_loss|hours_week|native_country|label|workclass_encoded|workclass_vec|
+---+----------+----------------+------+---------+-------------+------------------+---------------+-------------+-----+----+------------+------------+----------+--------------+-----+-----------------+-------------+
| 39|    1521.0|       State-gov| 77516|Bachelors|           13|     Never-married|   Adm-clerical|Not-in-family|White|Male|        2174|           0|        40| United-States|<=50K|              4.0|(9,[4],[1.0])|
| 50|    2500.0|Self-emp-not-inc| 83311|Bachelors|           13|Married-civ-spouse|Exec-managerial|      Husband|White|Male|           0|           0|        13| United-States|<=50K|              1.0|(9,[1],[1.0])|
+---+----------+----------------+------+---------+-------------+------------------+---------------+-------------+-----+----+------------+------------+----------+--------------+-----+-----------------+-------------+
only showing top 2 rows

파이프라인 빌드

모든 정확한 기능을 변환하고 이를 최종 데이터 세트에 추가하는 파이프라인을 구축합니다. 파이프라인에는 XNUMX개의 작업이 있지만 원하는 만큼 작업을 추가할 수 있습니다.

  1. 범주형 데이터 인코딩
  2. 레이블 기능 색인화
  3. 연속변수 추가
  4. 단계를 조립합니다.

각 단계는 stage라는 목록에 저장됩니다. 이 목록은 VectorAssembler에게 파이프라인 내에서 수행할 작업을 알려줍니다.

1. 범주형 데이터 인코딩

이 단계는 모든 범주형 기능을 반복한다는 점을 제외하면 위의 예와 완전히 동일합니다.

from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoderEstimator
CATE_FEATURES = ['workclass', 'education', 'marital', 'occupation', 'relationship', 'race', 'sex', 'native_country']
stages = [] # stages in our Pipeline
for categoricalCol in CATE_FEATURES:
    stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol + "Index")
    encoder = OneHotEncoderEstimator(inputCols=[stringIndexer.getOutputCol()],
                                     outputCols=[categoricalCol + "classVec"])
    stages += [stringIndexer, encoder]

2. 라벨 기능 색인화

Spark는 다른 많은 라이브러리와 마찬가지로 레이블에 문자열 값을 허용하지 않습니다. StringIndexer를 사용하여 레이블 기능을 변환하고 목록 단계에 추가합니다.

# Convert label into label indices using the StringIndexer
label_stringIdx =  StringIndexer(inputCol="label", outputCol="newlabel")
stages += [label_stringIdx]

3. 연속변수 추가

VectorAssembler의 inputCols는 열 목록입니다. 모든 새 열을 포함하는 새 목록을 만들 수 있습니다. 아래 코드는 인코딩된 범주형 기능과 연속형 기능으로 목록을 채웁니다.

assemblerInputs = [c + "classVec" for c in CATE_FEATURES] + CONTI_FEATURES

4. 계단을 조립합니다.

마지막으로 VectorAssembler의 모든 단계를 통과합니다.

assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")stages += [assembler]

이제 모든 단계가 준비되었으므로 데이터를 파이프라인에 푸시합니다.

# Create a Pipeline.
pipeline = Pipeline(stages=stages)
pipelineModel = pipeline.fit(df_remove)
model = pipelineModel.transform(df_remove)

새 데이터 세트를 확인하면 변환된 기능과 변환되지 않은 기능이 모두 포함되어 있음을 알 수 있습니다. 당신은 새로운 라벨과 기능에만 관심이 있습니다. 특성에는 변환된 모든 특성과 연속형 변수가 포함됩니다.

model.take(1)

[Row(age=39, age_square=1521.0, workclass='State-gov', fnlwgt=77516, education='Bachelors', education_num=13, marital='Never-married', occupation='Adm-clerical', relationship='Not-in-family', race='White', sex='Male', capital_gain=2174, capital_loss=0, hours_week=40, native_country='United-States', label='<=50K', workclassIndex=4.0, workclassclassVec=SparseVector(8, {4: 1.0}), educationIndex=2.0, educationclassVec=SparseVector(15, {2: 1.0}), maritalIndex=1.0, maritalclassVec=SparseVector(6, {1: 1.0}), occupationIndex=3.0, occupationclassVec=SparseVector(14, {3: 1.0}), relationshipIndex=1.0, relationshipclassVec=SparseVector(5, {1: 1.0}), raceIndex=0.0, raceclassVec=SparseVector(4, {0: 1.0}), sexIndex=0.0, sexclassVec=SparseVector(1, {0: 1.0}), native_countryIndex=0.0, native_countryclassVec=SparseVector(40, {0: 1.0}), newlabel=0.0, features=SparseVector(99, {4: 1.0, 10: 1.0, 24: 1.0, 32: 1.0, 44: 1.0, 48: 1.0, 52: 1.0, 53: 1.0, 93: 39.0, 94: 77516.0, 95: 2174.0, 96: 13.0, 98: 40.0}))]

4단계) 분류기 구축: 로지스틱

계산 속도를 높이려면 모델을 DataFrame으로 변환합니다.

지도를 사용하여 모델에서 새 라벨과 기능을 선택해야 합니다.

from pyspark.ml.linalg import DenseVector
input_data = model.rdd.map(lambda x: (x["newlabel"], DenseVector(x["features"])))

열차 데이터를 DataFrame으로 생성할 준비가 되었습니다. sqlContext를 사용합니다.

df_train = sqlContext.createDataFrame(input_data, ["label", "features"])

두 번째 행을 확인하세요.

df_train.show(2)
+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|[0.0,0.0,0.0,0.0,...|
|  0.0|[0.0,1.0,0.0,0.0,...|
+-----+--------------------+
only showing top 2 rows

열차/테스트 세트 생성

RandomSplit을 사용하여 데이터 세트를 80/20으로 분할합니다.

# Split the data into train and test sets
train_data, test_data = df_train.randomSplit([.8,.2],seed=1234)

훈련 세트와 테스트 세트 모두에서 소득이 50 이하/이상인 사람의 수를 세어 보겠습니다.

train_data.groupby('label').agg({'label': 'count'}).show()
+-----+------------+
|label|count(label)|
+-----+------------+
|  0.0|       19698|
|  1.0|        6263|
+-----+------------+
test_data.groupby('label').agg({'label': 'count'}).show()
+-----+------------+
|label|count(label)|
+-----+------------+
|  0.0|        5021|
|  1.0|        1578|
+-----+------------+

로지스틱 회귀 분석기 구축

마지막으로 분류기를 구축할 수 있습니다. Pyspark에는 로지스틱 회귀를 수행하는 LogisticRegression이라는 API가 있습니다.

레이블 열과 기능 열을 표시하여 lr을 초기화합니다. 최대 10번의 반복을 설정하고 정규화 매개변수 값을 0.3으로 추가합니다. 다음 섹션에서는 모델을 조정하기 위해 매개변수 그리드와 교차 검증을 사용합니다.

# Import `LinearRegression`
from pyspark.ml.classification import LogisticRegression

# Initialize `lr`
lr = LogisticRegression(labelCol="label",
                        featuresCol="features",
                        maxIter=10,
                        regParam=0.3)

# Fit the data to the model
linearModel = lr.fit(train_data)

#회귀분석에서 계수를 볼 수 있습니다

# Print the coefficients and intercept for logistic regression
print("Coefficients: " + str(linearModel.coefficients))
print("Intercept: " + str(linearModel.intercept))
Coefficients: [-0.0678914665262,-0.153425526813,-0.0706009536407,-0.164057586562,-0.120655298528,0.162922330862,0.149176870438,-0.626836362611,-0.193483661541,-0.0782269980838,0.222667203836,0.399571096381,-0.0222024341804,-0.311925857859,-0.0434497788688,-0.306007744328,-0.41318209688,0.547937504247,-0.395837350854,-0.23166535958,0.618743906733,-0.344088614546,-0.385266881369,0.317324463006,-0.350518889186,-0.201335923138,-0.232878560088,-0.13349278865,-0.119760542498,0.17500602491,-0.0480968101118,0.288484253943,-0.116314616745,0.0524163478063,-0.300952624551,-0.22046421474,-0.16557996579,-0.114676231939,-0.311966431453,-0.344226119233,0.105530129507,0.152243047814,-0.292774545497,0.263628334433,-0.199951374076,-0.30329422583,-0.231087515178,0.418918551,-0.0565930184279,-0.177818073048,-0.0733236680663,-0.267972912252,0.168491215697,-0.12181255723,-0.385648075442,-0.202101794517,0.0469791640782,-0.00842850210625,-0.00373211448629,-0.259296141281,-0.309896554133,-0.168434409756,-0.11048086026,0.0280647963877,-0.204187030092,-0.414392623536,-0.252806580669,0.143366465705,-0.516359222663,-0.435627370849,-0.301949286524,0.0878249035894,-0.210951740965,-0.621417928742,-0.099445190784,-0.232671473401,-0.1077745606,-0.360429419703,-0.420362959052,-0.379729467809,-0.395186242741,0.0826401853838,-0.280251589972,0.187313505214,-0.20295228799,-0.431177064626,0.149759018379,-0.107114299614,-0.319314858424,0.0028450133235,-0.651220387649,-0.327918792207,-0.143659581445,0.00691075160413,8.38517628783e-08,2.18856717378e-05,0.0266701216268,0.000231075966823,0.00893832698698]
Intercept: -1.9884177974805692

5단계) 모델 학습 및 평가

테스트 세트에 대한 예측을 생성하려면

test_data에서 변환()과 함께 선형 모델을 사용할 수 있습니다.

# Make predictions on test data using the transform() method.
predictions = linearModel.transform(test_data)

예측의 요소를 인쇄할 수 있습니다.

predictions.printSchema()
root
 |-- label: double (nullable = true)
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)

라벨, 예측, 확률에 관심이 있습니다.

selected = predictions.select("label", "prediction", "probability")
selected.show(20)
+-----+----------+--------------------+
|label|prediction|         probability|
+-----+----------+--------------------+
|  0.0|       0.0|[0.91560704124179...|
|  0.0|       0.0|[0.92812140213994...|
|  0.0|       0.0|[0.92161406774159...|
|  0.0|       0.0|[0.96222760777142...|
|  0.0|       0.0|[0.66363283056957...|
|  0.0|       0.0|[0.65571324475477...|
|  0.0|       0.0|[0.73053376932829...|
|  0.0|       1.0|[0.31265053873570...|
|  0.0|       0.0|[0.80005907577390...|
|  0.0|       0.0|[0.76482251301640...|
|  0.0|       0.0|[0.84447301189069...|
|  0.0|       0.0|[0.75691912026619...|
|  0.0|       0.0|[0.60902504096722...|
|  0.0|       0.0|[0.80799228385509...|
|  0.0|       0.0|[0.87704364852567...|
|  0.0|       0.0|[0.83817652582377...|
|  0.0|       0.0|[0.79655423248500...|
|  0.0|       0.0|[0.82712311232246...|
|  0.0|       0.0|[0.81372823882016...|
|  0.0|       0.0|[0.59687710752201...|
+-----+----------+--------------------+
only showing top 20 rows

모델 평가

모델의 성능이 얼마나 좋은지(또는 나쁜지) 확인하려면 정확도 측정항목을 살펴봐야 합니다. 현재 Spark에는 정확도 측정을 계산하는 API가 없습니다. 기본값은 ROC, 수신기 작동 특성 곡선입니다. 거짓양성률을 고려한 다른 측정항목입니다.

ROC를 살펴보기 전에 정확도 측정을 구성해 보겠습니다. 이 측정항목에 더 익숙합니다. 정확도 측정값은 총 관측치 수에 대한 올바른 예측의 합입니다.

레이블과 'prediction'을 사용하여 DataFrame을 만듭니다.

cm = predictions.select("label", "prediction")

라벨과 예측에서 클래스 수를 확인할 수 있습니다.

cm.groupby('label').agg({'label': 'count'}).show()
+-----+------------+
|label|count(label)|
+-----+------------+
|  0.0|        5021|
|  1.0|        1578|
+-----+------------+
cm.groupby('prediction').agg({'prediction': 'count'}).show()
+----------+-----------------+
|prediction|count(prediction)|
+----------+-----------------+
|       0.0|             5982|
|       1.0|              617|
+----------+-----------------+

예를 들어, 테스트 세트에는 소득이 1578 이상인 가구가 50개 있고 그 이하인 가구가 5021개 있습니다. 그러나 분류자는 소득이 617만 달러 이상인 가구를 50개로 예측했습니다.

전체 행 수에 대해 레이블이 올바르게 분류되었을 때 개수를 계산하여 정확도를 계산할 수 있습니다.

cm.filter(cm.label == cm.prediction).count() / cm.count()

0.8237611759357478

모든 것을 하나로 묶고 정확도를 계산하는 함수를 작성할 수 있습니다.

def accuracy_m(model): 
    predictions = model.transform(test_data)
    cm = predictions.select("label", "prediction")
    acc = cm.filter(cm.label == cm.prediction).count() / cm.count()
    print("Model accuracy: %.3f%%" % (acc * 100)) 
accuracy_m(model = linearModel)
Model accuracy: 82.376%

ROC 지표

BinaryClassificationEvaluator 모듈에는 ROC 측정값이 포함되어 있습니다. 수신기 작동 특성 곡선은 이진 분류에 사용되는 또 다른 일반적인 도구입니다. 정밀도/재현율 곡선과 매우 유사하지만 정밀도 대 재현율을 표시하는 대신 ROC 곡선은 위양성률에 대한 참양성률(즉, 재현율)을 표시합니다. 위양성률은 양성으로 잘못 분류된 음성 인스턴스의 비율입니다. 1에서 진음성률을 뺀 것과 같습니다. 진음성률을 특이성이라고도 합니다. 따라서 ROC 곡선은 민감도(재현율) 대 XNUMX - 특이성을 표시합니다.

### Use ROC 
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Evaluate model
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
print(evaluator.evaluate(predictions))
print(evaluator.getMetricName())

0.8940481662695192areaUnderROC

print(evaluator.evaluate(predictions))

0.8940481662695192

6단계) 초매개변수 조정

마지막으로 하이퍼파라미터를 조정할 수 있습니다. 비슷하다 사이킷 런 매개변수 그리드를 생성하고 조정하려는 매개변수를 추가합니다.

계산 시간을 줄이려면 두 개의 값만 사용하여 정규화 매개변수만 조정하면 됩니다.

from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Create ParamGrid for Cross Validation
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.01, 0.5])
             .build())

마지막으로 5겹 교차 검증 방법을 사용하여 모델을 평가합니다. 훈련하는 데 약 16분이 소요됩니다.

from time import *
start_time = time()

# Create 5-fold CrossValidator
cv = CrossValidator(estimator=lr,
                    estimatorParamMaps=paramGrid,
                    evaluator=evaluator, numFolds=5)

# Run cross validations
cvModel = cv.fit(train_data)
# likely take a fair amount of time
end_time = time()
elapsed_time = end_time - start_time
print("Time to train model: %.3f seconds" % elapsed_time)

모델 학습 시간: 978.807초

최고의 정규화 하이퍼파라미터는 0.01이며 정확도는 85.316%입니다.

accuracy_m(model = cvModel)
Model accuracy: 85.316%

cvModel.bestModel을 extractParamMap()과 연결하여 권장 매개변수를 추출할 수 있습니다.

bestModel = cvModel.bestModel
bestModel.extractParamMap()
{Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='aggregationDepth', doc='suggested depth for treeAggregate (>= 2)'): 2,
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty'): 0.0,
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='family', doc='The name of family which is a description of the label distribution to be used in the model. Supported options: auto, binomial, multinomial.'): 'auto',
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='featuresCol', doc='features column name'): 'features',
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='fitIntercept', doc='whether to fit an intercept term'): True,
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='labelCol', doc='label column name'): 'label',
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='maxIter', doc='maximum number of iterations (>= 0)'): 10,
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='predictionCol', doc='prediction column name'): 'prediction',
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='probabilityCol', doc='Column name for predicted class conditional probabilities. Note: Not all models output well-calibrated probability estimates! These probabilities should be treated as confidences, not precise probabilities'): 'probability',
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='rawPredictionCol', doc='raw prediction (a.k.a. confidence) column name'): 'rawPrediction',
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='regParam', doc='regularization parameter (>= 0)'): 0.01,
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='standardization', doc='whether to standardize the training features before fitting the model'): True,
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='threshold', doc='threshold in binary classification prediction, in range [0, 1]'): 0.5,
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='tol', doc='the convergence tolerance for iterative algorithms (>= 0)'): 1e-06}

요약

Spark는 데이터 과학자를 위한 기본 도구입니다. 이를 통해 실무자는 앱을 다양한 데이터 소스에 연결하고, 데이터 분석을 원활하게 수행하거나 예측 모델을 추가할 수 있습니다.

Spark를 시작하려면 다음을 사용하여 Spark Context를 시작해야 합니다.

'스파크컨텍스트()'

SQL 데이터 소스에 연결하기 위한 컨텍스트:

'SQL컨텍스트()'

튜토리얼에서는 로지스틱 회귀를 학습하는 방법을 배웁니다.

  1. 다음을 사용하여 데이터세트를 데이터프레임으로 변환합니다.
rdd.map(lambda x: (x["newlabel"], DenseVector(x["features"])))
sqlContext.createDataFrame(input_data, ["label", "features"])

레이블의 열 이름은 newlabel이고 모든 기능은 기능에 수집됩니다. 데이터 세트가 다른 경우 이 값을 변경하세요.

  1. 열차/테스트 세트 생성
randomSplit([.8,.2],seed=1234)
  1. 모델 훈련
LogisticRegression(labelCol="label",featuresCol="features",maxIter=10, regParam=0.3)
lr.fit()
  1. 예측하기
linearModel.transform()