PySpark Výukový program pro začátečníky: Učte se s PŘÍKLADY
Před učením PySpark, pochopme:
Co je Apache Spark?
Spark je řešení pro velká data, které je prokazatelně jednodušší a rychlejší než Hadoop MapReduce. Spark je software s otevřeným zdrojovým kódem vyvinutý laboratoří UC Berkeley RAD v roce 2009. Vzhledem k tomu, že byl v roce 2010 uvolněn pro veřejnost, Spark roste v popularitě a je používán v průmyslu v nebývalém měřítku.
V éře Big dat, praktici potřebují více než kdy jindy rychlé a spolehlivé nástroje pro zpracování streamování dat. Dřívější nástroje jako MapReduce byly oblíbené, ale byly pomalé. Chcete-li tento problém překonat, Spark nabízí řešení, které je rychlé a univerzální. Hlavní rozdíl mezi Spark a MapReduce je to Spark spouští výpočty v paměti během později na pevném disku. Umožňuje vysokorychlostní přístup a zpracování dat a zkracuje dobu z hodin na minuty.
Co je PySpark?
PySpark je nástroj vytvořený Apache Spark Komunita k použití Python s Spark. Umožňuje práci s RDD (Resilient Distributed Dataset) v Python. Nabízí také PySpark Shell k propojení Python API s Spark jádro zahájit Spark Kontext. Spark je jmenný engine pro realizaci clusterových výpočtů, zatímco PySpark is Python's knihovna k použití Spark.
Jak to funguje? Spark práce?
Spark je založen na výpočetním jádru, což znamená, že se stará o plánování, distribuci a monitorování aplikace. Každá úloha se provádí na různých pracovních strojích nazývaných výpočetní cluster. Výpočetní cluster označuje rozdělení úkolů. Jeden stroj provádí jeden úkol, zatímco ostatní přispívají ke konečnému výstupu prostřednictvím jiného úkolu. Nakonec jsou všechny úkoly agregovány, aby vytvořily výstup. The Spark admin poskytuje 360° přehled různých Spark Pracovní místa.
Spark je navržen pro práci s
- Python
- Java
- Scala
- SQL
Významnou vlastností Spark je obrovské množství vestavěných knihoven, včetně MLlib pro strojové učení. Spark je také navržen pro práci s clustery Hadoop a dokáže číst široký typ souborů, včetně dat Hive, CSV, JSON, Casandra mimo jiné.
Proč používat Spark?
Jako budoucí uživatel dat byste měli být obeznámeni se slavnými knihovnami pythonu: Pandy a scikit-learn. Tyto dvě knihovny jsou fantastické pro prozkoumání datové sady až do střední velikosti. Pravidelné projekty strojového učení jsou postaveny na následující metodologii:
- Nahrajte data na disk
- Importujte data do paměti stroje
- Zpracovat/analyzovat data
- Sestavte model strojového učení
- Uložte předpověď zpět na disk
Problém nastává, pokud chce datový vědec zpracovat data, která jsou pro jeden počítač příliš velká. Během dřívějších dnů vědy o datech si praktici vybírali vzorky, protože školení na velkých souborech dat nebylo vždy potřeba. Datový vědec by našel dobrý statistický vzorek, provedl dodatečnou kontrolu robustnosti a přišel s vynikajícím modelem.
S tím jsou však spojeny některé problémy:
- Odráží soubor dat skutečný svět?
- Zahrnují data konkrétní příklad?
- Je model vhodný pro odběr vzorků?
Vezměte si například doporučení uživatelů. Doporučující se při hodnocení jejich preferencí spoléhají na porovnání uživatelů s ostatními uživateli. Pokud odborník na data vezme pouze podmnožinu dat, nebude existovat kohorta uživatelů, kteří jsou si navzájem velmi podobní. Doporučení musí běžet na úplné datové sadě nebo vůbec.
Co je řešení?
Řešení je zřejmé již dlouhou dobu, rozdělit problém na více počítačů. Paralelní počítání také přináší řadu problémů. Vývojáři mají často problémy s psaním paralelního kódu a nakonec musí vyřešit spoustu složitých problémů kolem samotného multiprocesingu.
Pyspark poskytuje datovým vědcům API, které lze použít k řešení problémů s paralelním zpracováním dat. Pyspark řeší složitosti multiprocesingu, jako je distribuce dat, distribuce kódu a shromažďování výstupu od pracovníků na clusteru strojů.
Spark může běžet samostatně, ale nejčastěji běží na vrcholu clusterového výpočetního rámce, jako je Hadoop. Při testování a vývoji však může datový vědec efektivně pracovat Spark na jejich vývojových boxech nebo noteboocích bez clusteru
• Jednou z hlavních výhod Spark je vybudovat architekturu, která zahrnuje správu streamování dat, bezproblémové datové dotazy, predikci strojového učení a přístup k různým analýzám v reálném čase.
• Spark úzce spolupracuje s jazykem SQL, tedy se strukturovanými daty. Umožňuje dotazování na data v reálném čase.
• Hlavním úkolem datových vědců je analyzovat a vytvářet prediktivní modely. Stručně řečeno, datový vědec potřebuje vědět, jak se dotazovat na data pomocí SQL, vytvářet statistickou zprávu a využívat strojové učení k vytváření předpovědí. Datoví vědci tráví značné množství času čištěním, transformací a analýzou dat. Jakmile je datová sada nebo pracovní tok dat připraven, datový vědec používá různé techniky k objevování poznatků a skrytých vzorců. Manipulace s daty by měla být robustní a stejně snadno použitelná. Spark je ten správný nástroj díky své rychlosti a bohatým API.
V tomto PySpark tutoriálu se naučíte, jak vytvořit klasifikátor s PySpark příklady.
Jak nainstalovat PySpark s AWS
Jedno Jupyter tým vytvoří image Dockeru ke spuštění Spark efektivně. Níže jsou uvedeny kroky, které můžete provést při instalaci PySpark instance v AWS.
Podívejte se na náš tutoriál AWS si TensorFlow
Krok 1: Vytvořte instanci
Nejprve musíte vytvořit instanci. Přejděte na svůj účet AWS a spusťte instanci. Úložiště můžete zvýšit až na 15 g a použít stejnou skupinu zabezpečení jako v tutoriálu TensorFlow.
Krok 2: Otevřete připojení
Otevřete připojení a nainstalujte kontejner dockeru. Další podrobnosti najdete ve výukovém programu s TensorFlow with přístavní dělník. Všimněte si, že musíte být ve správném pracovním adresáři.
Jednoduše spusťte tyto kódy a nainstalujte Docker:
sudo yum update -y sudo yum install -y docker sudo service docker start sudo user-mod -a -G docker ec2-user exit
Krok 3: Znovu otevřete připojení a nainstalujte Spark
Po opětovném otevření připojení můžete nainstalovat obraz obsahující PySpark.
## Spark docker run -v ~/work:/home/jovyan/work -d -p 8888:8888 jupyter/pyspark-notebook ## Allow preserving Jupyter notebook sudo chown 1000 ~/work ## Install tree to see our working directory next sudo yum install -y tree
Krok 4: Otevřete Jupyter
Zkontrolujte kontejner a jeho název
docker ps
Spusťte ukotvitelný panel s protokoly ukotvitelného panelu následovanými názvem ukotvitelného panelu. Například docker zaznamenává zealous_goldwasser
Přejděte do prohlížeče a spusťte Jupyter. Adresa je http://localhost:8888/. Vložte heslo dané terminálem.
Pozor: pokud chcete nahrát/stáhnout soubor do svého stroje AWS, můžete použít software Cyberduck, https://cyberduck.io/.
Jak nainstalovat PySpark on Windows/Mac s Condou
Následuje podrobný postup, jak nainstalovat PySpark on Windows/Mac používající Anaconda:
Chcete-li nainstalovat Spark na vašem místním počítači je doporučeným postupem vytvořit nové prostředí conda. Toto nové prostředí se nainstaluje Python 3.6, Spark a všechny závislosti.
Uživatel systému Mac
cd anaconda3 touch hello-spark.yml vi hello-spark.yml
Windows Uživatel
cd C:\Users\Admin\Anaconda3 echo.>hello-spark.yml notepad hello-spark.yml
Soubor .yml můžete upravit. Buďte opatrní s odsazením. Předtím jsou vyžadována dvě místa –
name: hello-spark dependencies: - python=3.6 - jupyter - ipython - numpy - numpy-base - pandas - py4j - pyspark - pytz
Uložte to a vytvořte prostředí. Chvíli to trvá
conda env create -f hello-spark.yml
Další podrobnosti o umístění naleznete ve výukovém programu Instalace TensorFlow
Můžete zkontrolovat všechna prostředí nainstalovaná ve vašem počítači
conda env list
Activate hello-spark
Uživatel systému Mac
source activate hello-spark
Windows Uživatel
activate hello-spark
Poznámka: Již jste vytvořili specifické prostředí TensorFlow pro spouštění výukových programů na TensorFlow. Je pohodlnější vytvořit nové prostředí odlišné od hello-tf. Nemá smysl přetěžovat hello-tf Spark nebo jakékoli jiné knihovny strojového učení.
Představte si, že většina vašeho projektu zahrnuje TensorFlow, ale musíte jej použít Spark pro jeden konkrétní projekt. Pro celý svůj projekt můžete nastavit prostředí TensorFlow a vytvořit pro něj samostatné prostředí Spark. Můžete přidat tolik knihoven Spark prostředí, jak chcete, aniž byste zasahovali do prostředí TensorFlow. Jakmile budete hotovi s Sparkprojektu, můžete jej vymazat, aniž by to ovlivnilo prostředí TensorFlow.
Jupyter
Otevřená Jupyter Notebook a zkusit, jestli PySpark funguje. Do nového poznámkového bloku vložte následující PySpark ukázkový kód:
import pyspark from pyspark import SparkContext sc =SparkContext()
Pokud se zobrazí chyba, je to pravděpodobné Java není na vašem počítači nainstalováno. V mac otevřete terminál a napište java -version, pokud existuje verze java, ujistěte se, že je 1.8. V Windows, přejděte na Aplikace a zkontrolujte, zda existuje Java složku. Pokud existuje a Java složku, zkontrolujte to Java 1.8 je nainstalován. V době psaní tohoto článku PySpark není kompatibilní s Java9 a výše.
Pokud potřebujete nainstalovat Java, abyste si mysleli https://trials.autocruitment.com a stáhněte si jdk-8u181-windows-x64.exe
Pro uživatele Mac se doporučuje použít `brew.`
brew tap caskroom/versions brew cask install java8
Podívejte se na tento tutoriál krok za krokem jak nainstalovat Java
Pozor: K úplnému vymazání prostředí použijte tlačítko Odebrat.
conda env remove -n hello-spark -y
Spark Kontext
SparkKontext je interní engine, který umožňuje spojení s clustery. Pokud chcete spustit operaci, potřebujete a SparkKontext.
Vytvořit SparkKontext
Nejprve musíte zahájit a SparkKontext.
import pyspark from pyspark import SparkContext sc =SparkContext()
Nyní, když je SparkKontext je připraven, můžete vytvořit kolekci dat nazvanou RDD, Resilient Distributed Dataset. Výpočet v RDD je automaticky paralelizován napříč clusterem.
nums= sc.parallelize([1,2,3,4])
Do první řady se dostanete pomocí funkce Take
nums.take(1)
[1]
Na data můžete použít transformaci pomocí funkce lambda. V PySpark v příkladu níže vrátíte druhou mocninu čísel. Jedná se o transformaci mapy
squared = nums.map(lambda x: x*x).collect() for num in squared: print('%i ' % (num))
1 4 9 16
SQLContext
Pohodlnějším způsobem je použití DataFrame. SparkKontext je již nastaven, můžete jej použít k vytvoření dataFrame. Musíte také deklarovat SQLContext
SQLContext umožňuje propojení motoru s různými zdroji dat. Používá se ke spuštění funkcí Spark SQL
from pyspark.sql import Row from pyspark.sql import SQLContext sqlContext = SQLContext(sc)
Nyní v tomto Spark konzultace Python, vytvoříme seznam n-tic. Každá n-tice bude obsahovat jména lidí a jejich věk. Jsou vyžadovány čtyři kroky:
Krok 1) Vytvořte seznam n-tic s informacemi
[('John',19),('Smith',29),('Adam',35),('Henry',50)]
Krok 2) Sestavte RDD
rdd = sc.parallelize(list_p)
Krok 3) Převeďte n-tice
rdd.map(lambda x: Row(name=x[0], age=int(x[1])))
Krok 4) Vytvořte kontext DataFrame
sqlContext.createDataFrame(ppl) list_p = [('John',19),('Smith',29),('Adam',35),('Henry',50)] rdd = sc.parallelize(list_p) ppl = rdd.map(lambda x: Row(name=x[0], age=int(x[1]))) DF_ppl = sqlContext.createDataFrame(ppl)
Pokud chcete získat přístup k typu každé funkce, můžete použít printSchema()
DF_ppl.printSchema() root |-- age: long (nullable = true) |-- name: string (nullable = true)
Příklad strojového učení s PySpark
Nyní, když máte krátkou představu Spark a SQLContext, jste připraveni vytvořit svůj první program strojového učení.
Níže jsou uvedeny kroky k vytvoření programu strojového učení s PySpark:
- Krok 1) Základní operace s PySpark
- Krok 2) Předzpracování dat
- Krok 3) Vytvořte kanál pro zpracování dat
- Krok 4) Sestavte klasifikátor: logistiku
- Krok 5) Trénujte a vyhodnocujte model
- Krok 6) Vylaďte hyperparametr
V tomto PySpark Výukový program strojového učení, použijeme datovou sadu pro dospělé. Účelem tohoto tutoriálu je naučit se používat Pyspark. Další informace o datové sadě naleznete v tomto kurzu.
Všimněte si, že soubor dat není významný a můžete si myslet, že výpočet trvá dlouho. Spark je navržen tak, aby zpracovával značné množství dat. SparkVýkonnost se ve srovnání s jinými knihovnami strojového učení zvyšuje, když se zpracovávaná datová sada zvětšuje.
Krok 1) Základní operace s PySpark
Nejprve musíte inicializovat SQLContext ještě není iniciován.
#from pyspark.sql import SQLContext url = "https://raw.githubusercontent.com/guru99-edu/R-Programming/master/adult_data.csv" from pyspark import SparkFiles sc.addFile(url) sqlContext = SQLContext(sc)
pak můžete číst soubor cvs pomocí sqlContext.read.csv. Ke zjištění použijte inferSchema nastavené na True Spark automaticky odhadnout typ dat. Ve výchozím nastavení je nastaveno na False.
df = sqlContext.read.csv(SparkFiles.get("adult_data.csv"), header=True, inferSchema= True)
Pojďme se podívat na datový typ
df.printSchema() root |-- age: integer (nullable = true) |-- workclass: string (nullable = true) |-- fnlwgt: integer (nullable = true) |-- education: string (nullable = true) |-- education_num: integer (nullable = true) |-- marital: string (nullable = true) |-- occupation: string (nullable = true) |-- relationship: string (nullable = true) |-- race: string (nullable = true) |-- sex: string (nullable = true) |-- capital_gain: integer (nullable = true) |-- capital_loss: integer (nullable = true) |-- hours_week: integer (nullable = true) |-- native_country: string (nullable = true) |-- label: string (nullable = true)
Data můžete zobrazit pomocí show.
df.show(5, truncate = False)
+---+----------------+------+---------+-------------+------------------+-----------------+-------------+-----+------+------------+------------+----------+--------------+-----+ |age|workclass |fnlwgt|education|education_num|marital |occupation |relationship |race |sex |capital_gain|capital_loss|hours_week|native_country|label| +---+----------------+------+---------+-------------+------------------+-----------------+-------------+-----+------+------------+------------+----------+--------------+-----+ |39 |State-gov |77516 |Bachelors|13 |Never-married |Adm-clerical |Not-in-family|White|Male |2174 |0 |40 |United-States |<=50K| |50 |Self-emp-not-inc|83311 |Bachelors|13 |Married-civ-spouse|Exec-managerial |Husband |White|Male |0 |0 |13 |United-States |<=50K| |38 |Private |215646|HS-grad |9 |Divorced |Handlers-cleaners|Not-in-family|White|Male |0 |0 |40 |United-States |<=50K| |53 |Private |234721|11th |7 |Married-civ-spouse|Handlers-cleaners|Husband |Black|Male |0 |0 |40 |United-States |<=50K| |28 |Private |338409|Bachelors|13 |Married-civ-spouse|Prof-specialty |Wife |Black|Female|0 |0 |40 |Cuba |<=50K| +---+----------------+------+---------+-------------+------------------+-----------------+-------------+-----+------+------------+------------+----------+--------------+-----+ only showing top 5 rows
Pokud jste nenastavili inderShema na True, zde je to, co se děje s typem. Všechny jsou v řetězci.
df_string = sqlContext.read.csv(SparkFiles.get("adult.csv"), header=True, inferSchema= False) df_string.printSchema() root |-- age: string (nullable = true) |-- workclass: string (nullable = true) |-- fnlwgt: string (nullable = true) |-- education: string (nullable = true) |-- education_num: string (nullable = true) |-- marital: string (nullable = true) |-- occupation: string (nullable = true) |-- relationship: string (nullable = true) |-- race: string (nullable = true) |-- sex: string (nullable = true) |-- capital_gain: string (nullable = true) |-- capital_loss: string (nullable = true) |-- hours_week: string (nullable = true) |-- native_country: string (nullable = true) |-- label: string (nullable = true)
Chcete-li převést spojitou proměnnou do správného formátu, můžete použít přetypování sloupců. Ke zjištění můžete použít withColumn Spark který sloupec transformaci provozovat.
# Import all from `sql.types` from pyspark.sql.types import * # Write a custom function to convert the data type of DataFrame columns def convertColumn(df, names, newType): for name in names: df = df.withColumn(name, df[name].cast(newType)) return df # List of continuous features CONTI_FEATURES = ['age', 'fnlwgt','capital_gain', 'education_num', 'capital_loss', 'hours_week'] # Convert the type df_string = convertColumn(df_string, CONTI_FEATURES, FloatType()) # Check the dataset df_string.printSchema() root |-- age: float (nullable = true) |-- workclass: string (nullable = true) |-- fnlwgt: float (nullable = true) |-- education: string (nullable = true) |-- education_num: float (nullable = true) |-- marital: string (nullable = true) |-- occupation: string (nullable = true) |-- relationship: string (nullable = true) |-- race: string (nullable = true) |-- sex: string (nullable = true) |-- capital_gain: float (nullable = true) |-- capital_loss: float (nullable = true) |-- hours_week: float (nullable = true) |-- native_country: string (nullable = true) |-- label: string (nullable = true) from pyspark.ml.feature import StringIndexer #stringIndexer = StringIndexer(inputCol="label", outputCol="newlabel") #model = stringIndexer.fit(df) #df = model.transform(df) df.printSchema()
Vyberte sloupce
Můžete vybrat a zobrazit řádky s výběrem a názvy prvků. Níže jsou vybrány věk a fnlwgt.
df.select('age','fnlwgt').show(5)
+---+------+ |age|fnlwgt| +---+------+ | 39| 77516| | 50| 83311| | 38|215646| | 53|234721| | 28|338409| +---+------+ only showing top 5 rows
Počítejte podle skupiny
Pokud chcete spočítat počet výskytů podle skupiny, můžete řetězit:
- skupina vytvořená()
- počet()
spolu. V PySpark v níže uvedeném příkladu spočítáte počet řádků podle úrovně vzdělání.
df.groupBy("education").count().sort("count",ascending=True).show()
+------------+-----+ | education|count| +------------+-----+ | Preschool| 51| | 1st-4th| 168| | 5th-6th| 333| | Doctorate| 413| | 12th| 433| | 9th| 514| | Prof-school| 576| | 7th-8th| 646| | 10th| 933| | Assoc-acdm| 1067| | 11th| 1175| | Assoc-voc| 1382| | Masters| 1723| | Bachelors| 5355| |Some-college| 7291| | HS-grad|10501| +------------+-----+
Popište data
Chcete-li získat souhrnnou statistiku dat, můžete použít description(). Vypočítá to:
- počítat
- střední
- standardní odchylka
- min
- max
df.describe().show()
+-------+------------------+-----------+------------------+------------+-----------------+--------+----------------+------------+------------------+------+------------------+----------------+------------------+--------------+-----+ |summary| age| workclass| fnlwgt| education| education_num| marital| occupation|relationship| race| sex| capital_gain| capital_loss| hours_week|native_country|label| +-------+------------------+-----------+------------------+------------+-----------------+--------+----------------+------------+------------------+------+------------------+----------------+------------------+--------------+-----+ | count| 32561| 32561| 32561| 32561| 32561| 32561| 32561| 32561| 32561| 32561| 32561| 32561| 32561| 32561|32561| | mean| 38.58164675532078| null|189778.36651208502| null| 10.0806793403151| null| null| null| null| null|1077.6488437087312| 87.303829734959|40.437455852092995| null| null| | stddev|13.640432553581356| null|105549.97769702227| null|2.572720332067397| null| null| null| null| null| 7385.292084840354|402.960218649002|12.347428681731838| null| null| | min| 17| ?| 12285| 10th| 1|Divorced| ?| Husband|Amer-Indian-Eskimo|Female| 0| 0| 1| ?|<=50K| | max| 90|Without-pay| 1484705|Some-college| 16| Widowed|Transport-moving| Wife| White| Male| 99999| 4356| 99| Yugoslavia| >50K| +-------+------------------+-----------+------------------+------------+-----------------+--------+----------------+------------+------------------+------+------------------+----------------+------------------+--------------+-----+
Pokud chcete souhrnnou statistiku pouze jednoho sloupce, přidejte název sloupce uvnitř description()
df.describe('capital_gain').show()
+-------+------------------+ |summary| capital_gain| +-------+------------------+ | count| 32561| | mean|1077.6488437087312| | stddev| 7385.292084840354| | min| 0| | max| 99999| +-------+------------------+
Křížový výpočet
V některých případech může být zajímavé vidět popisnou statistiku mezi dvěma párovými sloupci. Můžete například spočítat počet lidí s příjmem nižším nebo vyšším než 50 tisíc podle úrovně vzdělání. Tato operace se nazývá kontingenční tabulka.
df.crosstab('age', 'label').sort("age_label").show()
+---------+-----+----+ |age_label|<=50K|>50K| +---------+-----+----+ | 17| 395| 0| | 18| 550| 0| | 19| 710| 2| | 20| 753| 0| | 21| 717| 3| | 22| 752| 13| | 23| 865| 12| | 24| 767| 31| | 25| 788| 53| | 26| 722| 63| | 27| 754| 81| | 28| 748| 119| | 29| 679| 134| | 30| 690| 171| | 31| 705| 183| | 32| 639| 189| | 33| 684| 191| | 34| 643| 243| | 35| 659| 217| | 36| 635| 263| +---------+-----+----+ only showing top 20 rows
Můžete vidět, že žádní lidé nemají tržby vyšší než 50 tisíc, když jsou mladí.
Vypustit sloupec
Existují dvě intuitivní rozhraní API pro vypouštění sloupců:
- drop(): Vypuštění sloupce
- dropna(): Drop NA's
Níže vynechte sloupec education_num
df.drop('education_num').columns ['age', 'workclass', 'fnlwgt', 'education', 'marital', 'occupation', 'relationship', 'race', 'sex', 'capital_gain', 'capital_loss', 'hours_week', 'native_country', 'label']
Filtrovat data
Pomocí funkce filter() můžete použít popisnou statistiku v podmnožině dat. Můžete například spočítat počet lidí starších 40 let
df.filter(df.age > 40).count()
13443
Descriptstatistiky podle skupin
Nakonec můžete seskupit data podle skupiny a vypočítat statistické operace, jako je průměr.
df.groupby('marital').agg({'capital_gain': 'mean'}).show()
+--------------------+------------------+ | marital| avg(capital_gain)| +--------------------+------------------+ | Separated| 535.5687804878049| | Never-married|376.58831788823363| |Married-spouse-ab...| 653.9832535885167| | Divorced| 728.4148098131893| | Widowed| 571.0715005035247| | Married-AF-spouse| 432.6521739130435| | Married-civ-spouse|1764.8595085470085| +--------------------+------------------+
Krok 2) Předzpracování dat
Zpracování dat je kritickým krokem ve strojovém učení. Po odstranění nesmyslných dat získáte několik důležitých informací.
Například víte, že věk není lineární funkcí s příjmem. Když jsou lidé mladí, jejich příjem je obvykle nižší než ve středním věku. Po odchodu do důchodu domácnost využívá své úspory, což znamená pokles příjmu. Chcete-li zachytit tento vzor, můžete k funkci věku přidat čtverec
Přidat věkový čtverec
Chcete-li přidat novou funkci, musíte:
- Vyberte sloupec
- Aplikujte transformaci a přidejte ji do DataFrame
from pyspark.sql.functions import * # 1 Select the column age_square = df.select(col("age")**2) # 2 Apply the transformation and add it to the DataFrame df = df.withColumn("age_square", col("age")**2) df.printSchema() root |-- age: integer (nullable = true) |-- workclass: string (nullable = true) |-- fnlwgt: integer (nullable = true) |-- education: string (nullable = true) |-- education_num: integer (nullable = true) |-- marital: string (nullable = true) |-- occupation: string (nullable = true) |-- relationship: string (nullable = true) |-- race: string (nullable = true) |-- sex: string (nullable = true) |-- capital_gain: integer (nullable = true) |-- capital_loss: integer (nullable = true) |-- hours_week: integer (nullable = true) |-- native_country: string (nullable = true) |-- label: string (nullable = true) |-- age_square: double (nullable = true)
Můžete vidět, že age_square byl úspěšně přidán do datového rámce. Pořadí proměnných můžete změnit pomocí select. Níže přinášíte age_square hned po věku.
COLUMNS = ['age', 'age_square', 'workclass', 'fnlwgt', 'education', 'education_num', 'marital', 'occupation', 'relationship', 'race', 'sex', 'capital_gain', 'capital_loss', 'hours_week', 'native_country', 'label'] df = df.select(COLUMNS) df.first()
Row(age=39, age_square=1521.0, workclass='State-gov', fnlwgt=77516, education='Bachelors', education_num=13, marital='Never-married', occupation='Adm-clerical', relationship='Not-in-family', race='White', sex='Male', capital_gain=2174, capital_loss=0, hours_week=40, native_country='United-States', label='<=50K')
Vyloučit Holandsko-Nizozemsko
Pokud má skupina v rámci prvku pouze jedno pozorování, nepřináší do modelu žádné informace. Naopak to může vést k chybě při křížové validaci.
Ověřme původ domácnosti
df.filter(df.native_country == 'Holand-Netherlands').count() df.groupby('native_country').agg({'native_country': 'count'}).sort(asc("count(native_country)")).show()
+--------------------+---------------------+ | native_country|count(native_country)| +--------------------+---------------------+ | Holand-Netherlands| 1| | Scotland| 12| | Hungary| 13| | Honduras| 13| |Outlying-US(Guam-...| 14| | Yugoslavia| 16| | Thailand| 18| | Laos| 18| | Cambodia| 19| | Trinadad&Tobago| 19| | Hong| 20| | Ireland| 24| | Ecuador| 28| | Greece| 29| | France| 29| | Peru| 31| | Nicaragua| 34| | Portugal| 37| | Iran| 43| | Haiti| 44| +--------------------+---------------------+ only showing top 20 rows
Funkce native_country má pouze jednu domácnost pocházející z Nizozemska. Vyloučíte to.
df_remove = df.filter(df.native_country != 'Holand-Netherlands')
Krok 3) Vytvořte kanál pro zpracování dat
Podobně jako scikit-learn má Pyspark pipeline API.
Potrubí je velmi výhodné pro zachování struktury dat. Vložíte data do potrubí. Uvnitř potrubí se provádějí různé operace, výstup se používá k napájení algoritmu.
Například jedna univerzální transformace ve strojovém učení sestává z převodu řetězce na jeden horký kodér, tj. jeden sloupec po skupině. Jeden horký kodér je obvykle matice plná nul.
Kroky k transformaci dat jsou velmi podobné scikit-learn. Musíš:
- Indexujte řetězec na číselný
- Vytvořte jeden horký kodér
- Transformujte data
Tuto práci dělají dvě rozhraní API: StringIndexer, OneHotEncoder
- Nejprve vyberte sloupec řetězce, který chcete indexovat. InputCol je název sloupce v datové sadě. outputCol je nový název přiřazený transformovanému sloupci.
StringIndexer(inputCol="workclass", outputCol="workclass_encoded")
- Přizpůsobte data a transformujte je
model = stringIndexer.fit(df) `indexed = model.transform(df)``
- Vytvořte sloupce zpráv na základě skupiny. Pokud je v prvku například 10 skupin, nová matice bude mít 10 sloupců, jeden pro každou skupinu.
OneHotEncoder(dropLast=False, inputCol="workclassencoded", outputCol="workclassvec")
### Example encoder from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler stringIndexer = StringIndexer(inputCol="workclass", outputCol="workclass_encoded") model = stringIndexer.fit(df) indexed = model.transform(df) encoder = OneHotEncoder(dropLast=False, inputCol="workclass_encoded", outputCol="workclass_vec") encoded = encoder.transform(indexed) encoded.show(2)
+---+----------+----------------+------+---------+-------------+------------------+---------------+-------------+-----+----+------------+------------+----------+--------------+-----+-----------------+-------------+ |age|age_square| workclass|fnlwgt|education|education_num| marital| occupation| relationship| race| sex|capital_gain|capital_loss|hours_week|native_country|label|workclass_encoded|workclass_vec| +---+----------+----------------+------+---------+-------------+------------------+---------------+-------------+-----+----+------------+------------+----------+--------------+-----+-----------------+-------------+ | 39| 1521.0| State-gov| 77516|Bachelors| 13| Never-married| Adm-clerical|Not-in-family|White|Male| 2174| 0| 40| United-States|<=50K| 4.0|(9,[4],[1.0])| | 50| 2500.0|Self-emp-not-inc| 83311|Bachelors| 13|Married-civ-spouse|Exec-managerial| Husband|White|Male| 0| 0| 13| United-States|<=50K| 1.0|(9,[1],[1.0])| +---+----------+----------------+------+---------+-------------+------------------+---------------+-------------+-----+----+------------+------------+----------+--------------+-----+-----------------+-------------+ only showing top 2 rows
Postavte potrubí
Vytvoříte kanál pro převod všech přesných prvků a jejich přidání do konečné datové sady. Potrubí bude mít čtyři operace, ale můžete přidat tolik operací, kolik chcete.
- Kódujte kategorická data
- Indexujte funkci štítku
- Přidejte spojitou proměnnou
- Sestavte schůdky.
Každý krok je uložen v seznamu pojmenovaných etap. Tento seznam řekne VectorAssembleru, jakou operaci má provést uvnitř potrubí.
1. Kódujte kategorická data
Tento krok je přesně stejný jako výše uvedený příklad, kromě toho, že procházíte všechny kategorické prvky.
from pyspark.ml import Pipeline from pyspark.ml.feature import OneHotEncoderEstimator CATE_FEATURES = ['workclass', 'education', 'marital', 'occupation', 'relationship', 'race', 'sex', 'native_country'] stages = [] # stages in our Pipeline for categoricalCol in CATE_FEATURES: stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol + "Index") encoder = OneHotEncoderEstimator(inputCols=[stringIndexer.getOutputCol()], outputCols=[categoricalCol + "classVec"]) stages += [stringIndexer, encoder]
2. Indexujte funkci štítku
Spark, stejně jako mnoho jiných knihoven, nepřijímá řetězcové hodnoty pro štítek. Funkci štítku převedete pomocí StringIndexer a přidáte ji do fází seznamu
# Convert label into label indices using the StringIndexer label_stringIdx = StringIndexer(inputCol="label", outputCol="newlabel") stages += [label_stringIdx]
3. Přidejte spojitou proměnnou
Vstupní sloupce VectorAssembler je seznam sloupců. Můžete vytvořit nový seznam obsahující všechny nové sloupce. Níže uvedený kód naplní seznam zakódovanými kategorickými prvky a spojitými prvky.
assemblerInputs = [c + "classVec" for c in CATE_FEATURES] + CONTI_FEATURES
4. Sestavte schůdky.
Nakonec projdete všechny kroky ve VectorAssembleru
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")stages += [assembler]
Nyní, když jsou všechny kroky připraveny, odešlete data do potrubí.
# Create a Pipeline. pipeline = Pipeline(stages=stages) pipelineModel = pipeline.fit(df_remove) model = pipelineModel.transform(df_remove)
Pokud zkontrolujete novou datovou sadu, uvidíte, že obsahuje všechny prvky, transformované i netransformované. Zajímá vás pouze nový štítek a funkce. Prvky zahrnují všechny transformované prvky a spojité proměnné.
model.take(1) [Row(age=39, age_square=1521.0, workclass='State-gov', fnlwgt=77516, education='Bachelors', education_num=13, marital='Never-married', occupation='Adm-clerical', relationship='Not-in-family', race='White', sex='Male', capital_gain=2174, capital_loss=0, hours_week=40, native_country='United-States', label='<=50K', workclassIndex=4.0, workclassclassVec=SparseVector(8, {4: 1.0}), educationIndex=2.0, educationclassVec=SparseVector(15, {2: 1.0}), maritalIndex=1.0, maritalclassVec=SparseVector(6, {1: 1.0}), occupationIndex=3.0, occupationclassVec=SparseVector(14, {3: 1.0}), relationshipIndex=1.0, relationshipclassVec=SparseVector(5, {1: 1.0}), raceIndex=0.0, raceclassVec=SparseVector(4, {0: 1.0}), sexIndex=0.0, sexclassVec=SparseVector(1, {0: 1.0}), native_countryIndex=0.0, native_countryclassVec=SparseVector(40, {0: 1.0}), newlabel=0.0, features=SparseVector(99, {4: 1.0, 10: 1.0, 24: 1.0, 32: 1.0, 44: 1.0, 48: 1.0, 52: 1.0, 53: 1.0, 93: 39.0, 94: 77516.0, 95: 2174.0, 96: 13.0, 98: 40.0}))]
Krok 4) Sestavte klasifikátor: logistiku
Aby byl výpočet rychlejší, převedete model na DataFrame.
Musíte vybrat nový štítek a prvky z modelu pomocí mapy.
from pyspark.ml.linalg import DenseVector input_data = model.rdd.map(lambda x: (x["newlabel"], DenseVector(x["features"])))
Jste připraveni vytvořit data vlaku jako DataFrame. Používáte sqlContext
df_train = sqlContext.createDataFrame(input_data, ["label", "features"])
Zkontrolujte druhý řádek
df_train.show(2)
+-----+--------------------+ |label| features| +-----+--------------------+ | 0.0|[0.0,0.0,0.0,0.0,...| | 0.0|[0.0,1.0,0.0,0.0,...| +-----+--------------------+ only showing top 2 rows
Vytvořte vlakovou/testovací soupravu
Dataset rozdělíte 80/20 pomocí randomSplit.
# Split the data into train and test sets train_data, test_data = df_train.randomSplit([.8,.2],seed=1234)
Spočítejme si, kolik lidí s příjmem pod/nad 50 tisíc v tréninkové i testovací sadě
train_data.groupby('label').agg({'label': 'count'}).show()
+-----+------------+ |label|count(label)| +-----+------------+ | 0.0| 19698| | 1.0| 6263| +-----+------------+
test_data.groupby('label').agg({'label': 'count'}).show()
+-----+------------+ |label|count(label)| +-----+------------+ | 0.0| 5021| | 1.0| 1578| +-----+------------+
Sestavte logistický regresor
V neposlední řadě můžete sestavit klasifikátor. Pyspark má API nazvané LogisticRegression k provádění logistické regrese.
lr inicializujete označením sloupce štítků a sloupců funkcí. Nastavíte maximálně 10 iterací a přidáte parametr regularizace s hodnotou 0.3. Všimněte si, že v další části použijete k vyladění modelu křížové ověření s mřížkou parametrů
# Import `LinearRegression` from pyspark.ml.classification import LogisticRegression # Initialize `lr` lr = LogisticRegression(labelCol="label", featuresCol="features", maxIter=10, regParam=0.3) # Fit the data to the model linearModel = lr.fit(train_data)
#Můžete vidět koeficienty z regrese
# Print the coefficients and intercept for logistic regression print("Coefficients: " + str(linearModel.coefficients)) print("Intercept: " + str(linearModel.intercept))
Coefficients: [-0.0678914665262,-0.153425526813,-0.0706009536407,-0.164057586562,-0.120655298528,0.162922330862,0.149176870438,-0.626836362611,-0.193483661541,-0.0782269980838,0.222667203836,0.399571096381,-0.0222024341804,-0.311925857859,-0.0434497788688,-0.306007744328,-0.41318209688,0.547937504247,-0.395837350854,-0.23166535958,0.618743906733,-0.344088614546,-0.385266881369,0.317324463006,-0.350518889186,-0.201335923138,-0.232878560088,-0.13349278865,-0.119760542498,0.17500602491,-0.0480968101118,0.288484253943,-0.116314616745,0.0524163478063,-0.300952624551,-0.22046421474,-0.16557996579,-0.114676231939,-0.311966431453,-0.344226119233,0.105530129507,0.152243047814,-0.292774545497,0.263628334433,-0.199951374076,-0.30329422583,-0.231087515178,0.418918551,-0.0565930184279,-0.177818073048,-0.0733236680663,-0.267972912252,0.168491215697,-0.12181255723,-0.385648075442,-0.202101794517,0.0469791640782,-0.00842850210625,-0.00373211448629,-0.259296141281,-0.309896554133,-0.168434409756,-0.11048086026,0.0280647963877,-0.204187030092,-0.414392623536,-0.252806580669,0.143366465705,-0.516359222663,-0.435627370849,-0.301949286524,0.0878249035894,-0.210951740965,-0.621417928742,-0.099445190784,-0.232671473401,-0.1077745606,-0.360429419703,-0.420362959052,-0.379729467809,-0.395186242741,0.0826401853838,-0.280251589972,0.187313505214,-0.20295228799,-0.431177064626,0.149759018379,-0.107114299614,-0.319314858424,0.0028450133235,-0.651220387649,-0.327918792207,-0.143659581445,0.00691075160413,8.38517628783e-08,2.18856717378e-05,0.0266701216268,0.000231075966823,0.00893832698698] Intercept: -1.9884177974805692
Krok 5) Trénujte a vyhodnoťte model
Chcete-li vygenerovat předpověď pro svou testovací sadu,
LinearModel můžete použít s transform() na test_data
# Make predictions on test data using the transform() method. predictions = linearModel.transform(test_data)
Prvky můžete vytisknout v předpovědích
predictions.printSchema() root |-- label: double (nullable = true) |-- features: vector (nullable = true) |-- rawPrediction: vector (nullable = true) |-- probability: vector (nullable = true) |-- prediction: double (nullable = false)
Zajímá vás označení, předpověď a pravděpodobnost
selected = predictions.select("label", "prediction", "probability") selected.show(20)
+-----+----------+--------------------+ |label|prediction| probability| +-----+----------+--------------------+ | 0.0| 0.0|[0.91560704124179...| | 0.0| 0.0|[0.92812140213994...| | 0.0| 0.0|[0.92161406774159...| | 0.0| 0.0|[0.96222760777142...| | 0.0| 0.0|[0.66363283056957...| | 0.0| 0.0|[0.65571324475477...| | 0.0| 0.0|[0.73053376932829...| | 0.0| 1.0|[0.31265053873570...| | 0.0| 0.0|[0.80005907577390...| | 0.0| 0.0|[0.76482251301640...| | 0.0| 0.0|[0.84447301189069...| | 0.0| 0.0|[0.75691912026619...| | 0.0| 0.0|[0.60902504096722...| | 0.0| 0.0|[0.80799228385509...| | 0.0| 0.0|[0.87704364852567...| | 0.0| 0.0|[0.83817652582377...| | 0.0| 0.0|[0.79655423248500...| | 0.0| 0.0|[0.82712311232246...| | 0.0| 0.0|[0.81372823882016...| | 0.0| 0.0|[0.59687710752201...| +-----+----------+--------------------+ only showing top 20 rows
Vyhodnoťte model
Musíte se podívat na metriku přesnosti, abyste viděli, jak dobře (nebo špatně) model funguje. V současné době neexistuje API pro výpočet míry přesnosti Spark. Výchozí hodnota je ROC, provozní charakteristika přijímače. Jde o jiné metriky, které berou v úvahu míru falešné pozitivity.
Než se podíváte na ROC, pojďme sestrojit míru přesnosti. Tuto metriku znáte lépe. Míra přesnosti je součtem správné předpovědi z celkového počtu pozorování.
Vytvoříte DataFrame s popiskem a `predikcí.
cm = predictions.select("label", "prediction")
Můžete zkontrolovat číslo třídy v popisku a předpověď
cm.groupby('label').agg({'label': 'count'}).show()
+-----+------------+ |label|count(label)| +-----+------------+ | 0.0| 5021| | 1.0| 1578| +-----+------------+
cm.groupby('prediction').agg({'prediction': 'count'}).show()
+----------+-----------------+ |prediction|count(prediction)| +----------+-----------------+ | 0.0| 5982| | 1.0| 617| +----------+-----------------+
Například v testovací sadě je 1578 domácností s příjmem nad 50 5021 a 617 pod. Klasifikátor však předpověděl 50 domácností s příjmem nad XNUMX tisíc.
Přesnost můžete vypočítat výpočtem počtu, když jsou štítky správně klasifikovány v celkovém počtu řádků.
cm.filter(cm.label == cm.prediction).count() / cm.count()
0.8237611759357478
Můžete vše zabalit a napsat funkci pro výpočet přesnosti.
def accuracy_m(model): predictions = model.transform(test_data) cm = predictions.select("label", "prediction") acc = cm.filter(cm.label == cm.prediction).count() / cm.count() print("Model accuracy: %.3f%%" % (acc * 100)) accuracy_m(model = linearModel) Model accuracy: 82.376%
metriky ROC
Modul BinaryClassificationEvaluator obsahuje měření ROC. Příjemce Operating Charakteristická křivka je dalším běžným nástrojem používaným s binární klasifikací. Je velmi podobná křivce přesnosti/vybavení, ale místo vynesení přesnosti versus vybavování ukazuje křivka ROC skutečnou pozitivní míru (tj. vybavování) proti frekvenci falešně pozitivních. Míra falešně pozitivních výsledků je poměr negativních instancí, které jsou nesprávně klasifikovány jako pozitivní. Rovná se jedné mínus skutečná záporná sazba. Skutečná negativní míra se také nazývá specificita. Křivka ROC tedy vykresluje citlivost (recall) versus 1 – specificitu
### Use ROC from pyspark.ml.evaluation import BinaryClassificationEvaluator # Evaluate model evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction") print(evaluator.evaluate(predictions)) print(evaluator.getMetricName())
0.8940481662695192oblast Pod ROC
print(evaluator.evaluate(predictions))
0.8940481662695192
Krok 6) Vylaďte hyperparametr
V neposlední řadě můžete vyladit hyperparametry. Podobný scikit učit se vytvoříte mřížku parametrů a přidáte parametry, které chcete vyladit.
Chcete-li zkrátit dobu výpočtu, vyladíte parametr regularizace pouze se dvěma hodnotami.
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator # Create ParamGrid for Cross Validation paramGrid = (ParamGridBuilder() .addGrid(lr.regParam, [0.01, 0.5]) .build())
Nakonec model vyhodnotíte pomocí metody křížového ověření s 5 záhyby. Trénink trvá asi 16 minut.
from time import * start_time = time() # Create 5-fold CrossValidator cv = CrossValidator(estimator=lr, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5) # Run cross validations cvModel = cv.fit(train_data) # likely take a fair amount of time end_time = time() elapsed_time = end_time - start_time print("Time to train model: %.3f seconds" % elapsed_time)
Čas na trénování modelu: 978.807 sekund
Nejlepší regularizační hyperparametr je 0.01 s přesností 85.316 procenta.
accuracy_m(model = cvModel) Model accuracy: 85.316%
Doporučený parametr můžete extrahovat zřetězením cvModel.bestModel pomocí extractParamMap()
bestModel = cvModel.bestModel bestModel.extractParamMap()
{Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='aggregationDepth', doc='suggested depth for treeAggregate (>= 2)'): 2, Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty'): 0.0, Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='family', doc='The name of family which is a description of the label distribution to be used in the model. Supported options: auto, binomial, multinomial.'): 'auto', Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='featuresCol', doc='features column name'): 'features', Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='fitIntercept', doc='whether to fit an intercept term'): True, Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='labelCol', doc='label column name'): 'label', Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='maxIter', doc='maximum number of iterations (>= 0)'): 10, Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='predictionCol', doc='prediction column name'): 'prediction', Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='probabilityCol', doc='Column name for predicted class conditional probabilities. Note: Not all models output well-calibrated probability estimates! These probabilities should be treated as confidences, not precise probabilities'): 'probability', Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='rawPredictionCol', doc='raw prediction (a.k.a. confidence) column name'): 'rawPrediction', Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='regParam', doc='regularization parameter (>= 0)'): 0.01, Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='standardization', doc='whether to standardize the training features before fitting the model'): True, Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='threshold', doc='threshold in binary classification prediction, in range [0, 1]'): 0.5, Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='tol', doc='the convergence tolerance for iterative algorithms (>= 0)'): 1e-06}
Shrnutí
Spark je základním nástrojem pro datové vědce. Umožňuje praktikovi připojit aplikaci k různým zdrojům dat, bezproblémově provádět analýzu dat nebo přidat prediktivní model.
Začít s Spark, musíte zahájit a Spark Souvislosti s:
"SparkKontext()'
A a SQL kontext pro připojení ke zdroji dat:
'SQLContext()'
V tutoriálu se naučíte, jak trénovat logistickou regresi:
- Převeďte datovou sadu na datový rámec pomocí:
rdd.map(lambda x: (x["newlabel"], DenseVector(x["features"]))) sqlContext.createDataFrame(input_data, ["label", "features"])
Všimněte si, že název sloupce štítku je nový štítek a všechny funkce jsou shromážděny ve funkcích. Změňte tyto hodnoty, pokud se ve vaší datové sadě liší.
- Vytvořte vlak/testovací sadu
randomSplit([.8,.2],seed=1234)
- Trénujte model
LogisticRegression(labelCol="label",featuresCol="features",maxIter=10, regParam=0.3)
lr.fit()
- Proveďte předpověď
linearModel.transform()