Příklady Hadoop & Mapreduce: Vytvořte první program v Java
V tomto tutoriálu se naučíte používat Hadoop s příklady MapReduce. Použitá vstupní data jsou ProdejJan2009.csv. Obsahuje informace související s prodejem, jako je název produktu, cena, způsob platby, město, země klienta atd. Cílem je Zjistěte počet prodaných produktů v každé zemi.
První program Hadoop MapReduce
Nyní v tomto Výukový program MapReduce, vytvoříme náš první Java Program MapReduce:
Ujistěte se, že máte nainstalovaný Hadoop. Než začnete se skutečným procesem, změňte uživatele na 'hduser' (ID používané při konfiguraci Hadoop, můžete přepnout na uživatelské ID používané během konfigurace programování Hadoop).
su - hduser_
Krok 1)
Vytvořte nový adresář s názvem MapReduceTutorial jak je uvedeno v níže uvedeném příkladu MapReduce
sudo mkdir MapReduceTutorial
Udělit oprávnění
sudo chmod -R 777 MapReduceTutorial
SalesMapper.java
package SalesCountry; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.*; public class SalesMapper extends MapReduceBase implements Mapper <LongWritable, Text, Text, IntWritable> { private final static IntWritable one = new IntWritable(1); public void map(LongWritable key, Text value, OutputCollector <Text, IntWritable> output, Reporter reporter) throws IOException { String valueString = value.toString(); String[] SingleCountryData = valueString.split(","); output.collect(new Text(SingleCountryData[7]), one); } }
SalesCountryReducer.java
package SalesCountry; import java.io.IOException; import java.util.*; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.*; public class SalesCountryReducer extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> { public void reduce(Text t_key, Iterator<IntWritable> values, OutputCollector<Text,IntWritable> output, Reporter reporter) throws IOException { Text key = t_key; int frequencyForCountry = 0; while (values.hasNext()) { // replace type of value with the actual type of our value IntWritable value = (IntWritable) values.next(); frequencyForCountry += value.get(); } output.collect(key, new IntWritable(frequencyForCountry)); } }
SalesCountryDriver.java
package SalesCountry; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.*; import org.apache.hadoop.mapred.*; public class SalesCountryDriver { public static void main(String[] args) { JobClient my_client = new JobClient(); // Create a configuration object for the job JobConf job_conf = new JobConf(SalesCountryDriver.class); // Set a name of the Job job_conf.setJobName("SalePerCountry"); // Specify data type of output key and value job_conf.setOutputKeyClass(Text.class); job_conf.setOutputValueClass(IntWritable.class); // Specify names of Mapper and Reducer Class job_conf.setMapperClass(SalesCountry.SalesMapper.class); job_conf.setReducerClass(SalesCountry.SalesCountryReducer.class); // Specify formats of the data type of Input and output job_conf.setInputFormat(TextInputFormat.class); job_conf.setOutputFormat(TextOutputFormat.class); // Set input and output directories using command line arguments, //arg[0] = name of input directory on HDFS, and arg[1] = name of output directory to be created to store the output file. FileInputFormat.setInputPaths(job_conf, new Path(args[0])); FileOutputFormat.setOutputPath(job_conf, new Path(args[1])); my_client.setConf(job_conf); try { // Run the job JobClient.runJob(job_conf); } catch (Exception e) { e.printStackTrace(); } } }
Zkontrolujte souborová oprávnění všech těchto souborů
a pokud chybí oprávnění ke čtení, udělte totéž-
Krok 2)
Export classpath, jak je znázorněno v níže uvedeném příkladu Hadoop
export CLASSPATH="$HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-client-core-2.2.0.jar:$HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-client-common-2.2.0.jar:$HADOOP_HOME/share/hadoop/common/hadoop-common-2.2.0.jar:~/MapReduceTutorial/SalesCountry/*:$HADOOP_HOME/lib/*"
Krok 3)
Sestavit Java soubory (tyto soubory se nacházejí v adresáři Final-MapReduceHandsOn). Soubory jeho tříd budou umístěny do adresáře balíčku
javac -d . SalesMapper.java SalesCountryReducer.java SalesCountryDriver.java
Toto varování lze bezpečně ignorovat.
Tato kompilace vytvoří adresář v aktuálním adresáři pojmenovaném názvem balíčku zadaným ve zdrojovém souboru java (tj Země prodeje v našem případě) a vložte do něj všechny zkompilované soubory tříd.
Krok 4)
Vytvořte nový soubor Manifest.txt
sudo gedit Manifest.txt
přidejte k tomu následující řádky,
Main-Class: SalesCountry.SalesCountryDriver
SalesCountry.SalesCountryDriver je název hlavní třídy. Upozorňujeme, že na konci tohoto řádku musíte stisknout klávesu Enter.
Krok 5)
Vytvořte soubor Jar
jar cfm ProductSalePerCountry.jar Manifest.txt SalesCountry/*.class
Zkontrolujte, zda je vytvořen soubor jar
Krok 6)
Spusťte Hadoop
$HADOOP_HOME/sbin/start-dfs.sh
$HADOOP_HOME/sbin/start-yarn.sh
Krok 7)
Zkopírujte soubor ProdejJan2009.csv do ~/inputMapReduce
Nyní ke kopírování použijte níže uvedený příkaz ~/inputMapReduce na HDFS.
$HADOOP_HOME/bin/hdfs dfs -copyFromLocal ~/inputMapReduce /
Toto varování můžeme klidně ignorovat.
Ověřte, zda je soubor skutečně zkopírován nebo ne.
$HADOOP_HOME/bin/hdfs dfs -ls /inputMapReduce
Krok 8)
Spusťte úlohu MapReduce
$HADOOP_HOME/bin/hadoop jar ProductSalePerCountry.jar /inputMapReduce /mapreduce_output_sales
Tím se vytvoří výstupní adresář s názvem mapreduce_output_sales on HDFS. Obsahem tohoto adresáře bude soubor obsahující prodeje produktů v jednotlivých zemích.
Krok 9)
Výsledek lze vidět prostřednictvím příkazového rozhraní jako,
$HADOOP_HOME/bin/hdfs dfs -cat /mapreduce_output_sales/part-00000
Výsledky lze také zobrazit prostřednictvím webového rozhraní jako-
Otevřete r ve webovém prohlížeči.
Nyní vyberte 'Procházet souborový systém' a přejděte na /mapreduce_output_sales
Otevřená část-r-00000
Vysvětlení třídy SalesMapper
V této části porozumíme implementaci SalesMapper třída.
1. Začneme zadáním názvu balíčku pro naši třídu. Země prodeje je název našeho balíčku. Vezměte prosím na vědomí, že výstup kompilace, SalesMapper.class přejde do adresáře pojmenovaného tímto názvem balíčku: Země prodeje.
Poté importujeme balíčky knihoven.
Níže uvedený snímek ukazuje implementaci SalesMapper třída-
Vysvětlení ukázkového kódu:
1. Definice třídy SalesMapper-
public class SalesMapper rozšiřuje MapReduceBase implementuje Mapper {
Každá třída mapovače musí být rozšířena z MapReduceBase třídy a musí ji implementovat Mapper rozhraní.
2. Definování funkce 'mapa'-
public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException
Hlavní částí třídy Mapper je a 'mapa()' metoda, která přijímá čtyři argumenty.
Při každém volání 'mapa()' metoda, a klíč – hodnota pár ('klíč' si 'hodnota' v tomto kódu) je předán.
'mapa()' metoda začíná rozdělením vstupního textu, který je přijat jako argument. K rozdělení těchto řádků na slova používá tokenizér.
String valueString = value.toString(); String[] SingleCountryData = valueString.split(",");
Zde, ',' se používá jako oddělovač.
Poté se vytvoří pár pomocí záznamu na 7. indexu pole 'SingleCountryData' a hodnotu '1'.
output.collect(new Text(SingleCountryData[7]), jeden);
Vybíráme záznam na 7. indexu, protože potřebujeme Země data a je umístěn na 7. indexu v poli 'SingleCountryData'.
Vezměte prosím na vědomí, že naše vstupní data jsou v níže uvedeném formátu (kde Země je na 7 XNUMXth index s 0 jako počátečním indexem)-
Datum_transakce,Produkt,Cena,Typ_platby,Název,Město,Stát,Země,Account_Created,Last_Login,Latitude,Longitude
Výstupem mapperu je opět a klíč – hodnota pár, který je vydán pomocí 'sbírat()' metoda "OutputCollector".
Vysvětlení třídy SalesCountryReducer
V této části porozumíme implementaci SalesCountryReducer třída.
1. Začneme zadáním názvu balíčku pro naši třídu. Země prodeje je název balíčku. Vezměte prosím na vědomí, že výstup kompilace, SalesCountryReducer.class přejde do adresáře pojmenovaného tímto názvem balíčku: Země prodeje.
Poté importujeme balíčky knihoven.
Níže uvedený snímek ukazuje implementaci SalesCountryReducer třída-
Vysvětlení kódu:
1. Definice třídy SalesCountryReducer-
public class SalesCountryReducer rozšiřuje MapReduceBase implementuje Reducer {
Zde jsou první dva datové typy, 'Text' si 'IntWritable' jsou datovým typem vstupního páru klíč-hodnota do reduktoru.
Výstup mapovače je ve formě , . Tento výstup mapovače se stává vstupem do reduktoru. Chcete-li se tedy sladit s jeho datovým typem, Text si IntWritable jsou zde použity jako datový typ.
Poslední dva datové typy, 'Text' a 'IntWritable' jsou datovým typem výstupu generovaného reduktorem ve formě páru klíč-hodnota.
Každá třída reduktoru musí být rozšířena z MapReduceBase třídy a musí ji implementovat Redukce rozhraní.
2. Definování funkce 'snížení'-
public void reduce( Text t_key, Iterator<IntWritable> values, OutputCollector<Text,IntWritable> output, Reporter reporter) throws IOException {
Vstup do snížit() metoda je klíč se seznamem více hodnot.
V našem případě to bude např.
, , , , , .
Toto je dáno reduktoru jako
Pro přijetí argumentů této formy se tedy používají první dva datové typy, tj. Text si Iterátor. Text je datový typ klíče a Iterátor je datový typ pro seznam hodnot pro tento klíč.
Další argument je typu OutputCollector který shromažďuje výstup redukční fáze.
snížit() metoda začíná zkopírováním hodnoty klíče a inicializací počtu frekvencí na 0.
Textový klíč = t_key;
int frekvenceForCountry = 0;
Poté pomocí 'zatímco' smyčky, iterujeme seznamem hodnot spojených s klíčem a vypočítáme konečnou frekvenci sečtením všech hodnot.
while (values.hasNext()) { // replace type of value with the actual type of our value IntWritable value = (IntWritable) values.next(); frequencyForCountry += value.get(); }
Nyní vložíme výsledek do výstupního kolektoru ve tvaru klíč a získal počet frekvencí.
Níže uvedený kód to dělá -
output.collect(key, new IntWritable(frequencyForCountry));
Vysvětlení třídy SalesCountryDriver
V této části porozumíme implementaci SalesCountryDriver třída
1. Začneme zadáním názvu balíčku pro naši třídu. Země prodeje je název balíčku. Vezměte prosím na vědomí, že výstup kompilace, SalesCountryDriver.class přejde do adresáře pojmenovaného tímto názvem balíčku: Země prodeje.
Zde je řádek určující název balíčku následovaný kódem pro import balíčků knihovny.
2. Definujte třídu ovladače, která vytvoří novou klientskou úlohu, konfigurační objekt a inzeruje třídy Mapper a Reducer.
Třída ovladače je zodpovědná za spuštění naší úlohy MapReduce Hadoop. V této třídě specifikujeme název úlohy, datový typ vstupu/výstupu a názvy tříd mapovače a reduktoru.
3. V níže uvedeném úryvku kódu nastavíme vstupní a výstupní adresáře, které se používají ke spotřebování vstupní datové sady a k produkci výstupu.
argument[0] si argument[1] jsou argumenty příkazového řádku předávané příkazem zadaným v MapReduce hands-on, tj.
$HADOOP_HOME/bin/hadoop jar ProductSalePerCountry.jar /inputMapReduce /mapreduce_output_sales
4. Spusťte naši práci
Níže uvedený kód spustí provádění úlohy MapReduce-
try { // Run the job JobClient.runJob(job_conf); } catch (Exception e) { e.printStackTrace(); }