Exemple Hadoop și Mapreduce: Creați primul program în Java
În acest tutorial, veți învăța să utilizați Hadoop cu exemple MapReduce. Datele de intrare utilizate sunt VânzăriJan2009.csv. Conține informații legate de vânzări, cum ar fi numele produsului, prețul, modul de plată, orașul, țara clientului etc. Scopul este să Aflați numărul de produse vândute în fiecare țară.
Primul program Hadoop MapReduce
Acum în asta Tutorial MapReduce, vom crea primul nostru Java Programul MapReduce:
Asigurați-vă că aveți Hadoop instalat. Înainte de a începe procesul propriu-zis, schimbați utilizatorul în „hduser” (id-ul folosit în timpul configurației Hadoop, puteți comuta la userid-ul folosit în timpul configurației de programare Hadoop).
su - hduser_
Pas 1)
Creați un director nou cu nume MapReduceTutorial așa cum se arată în exemplul MapReduce de mai jos
sudo mkdir MapReduceTutorial
Acordați permisiuni
sudo chmod -R 777 MapReduceTutorial
SalesMapper.java
package SalesCountry; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.*; public class SalesMapper extends MapReduceBase implements Mapper <LongWritable, Text, Text, IntWritable> { private final static IntWritable one = new IntWritable(1); public void map(LongWritable key, Text value, OutputCollector <Text, IntWritable> output, Reporter reporter) throws IOException { String valueString = value.toString(); String[] SingleCountryData = valueString.split(","); output.collect(new Text(SingleCountryData[7]), one); } }
SalesCountryReducer.java
package SalesCountry; import java.io.IOException; import java.util.*; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.*; public class SalesCountryReducer extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> { public void reduce(Text t_key, Iterator<IntWritable> values, OutputCollector<Text,IntWritable> output, Reporter reporter) throws IOException { Text key = t_key; int frequencyForCountry = 0; while (values.hasNext()) { // replace type of value with the actual type of our value IntWritable value = (IntWritable) values.next(); frequencyForCountry += value.get(); } output.collect(key, new IntWritable(frequencyForCountry)); } }
SalesCountryDriver.java
package SalesCountry; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.*; import org.apache.hadoop.mapred.*; public class SalesCountryDriver { public static void main(String[] args) { JobClient my_client = new JobClient(); // Create a configuration object for the job JobConf job_conf = new JobConf(SalesCountryDriver.class); // Set a name of the Job job_conf.setJobName("SalePerCountry"); // Specify data type of output key and value job_conf.setOutputKeyClass(Text.class); job_conf.setOutputValueClass(IntWritable.class); // Specify names of Mapper and Reducer Class job_conf.setMapperClass(SalesCountry.SalesMapper.class); job_conf.setReducerClass(SalesCountry.SalesCountryReducer.class); // Specify formats of the data type of Input and output job_conf.setInputFormat(TextInputFormat.class); job_conf.setOutputFormat(TextOutputFormat.class); // Set input and output directories using command line arguments, //arg[0] = name of input directory on HDFS, and arg[1] = name of output directory to be created to store the output file. FileInputFormat.setInputPaths(job_conf, new Path(args[0])); FileOutputFormat.setOutputPath(job_conf, new Path(args[1])); my_client.setConf(job_conf); try { // Run the job JobClient.runJob(job_conf); } catch (Exception e) { e.printStackTrace(); } } }
Verificați permisiunile pentru toate aceste fișiere
și dacă lipsesc permisiunile de „citire”, acordați aceleași-
Pas 2)
Exportați calea clasei, așa cum se arată în exemplul Hadoop de mai jos
export CLASSPATH="$HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-client-core-2.2.0.jar:$HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-client-common-2.2.0.jar:$HADOOP_HOME/share/hadoop/common/hadoop-common-2.2.0.jar:~/MapReduceTutorial/SalesCountry/*:$HADOOP_HOME/lib/*"
Pas 3)
Compila Java fișiere (aceste fișiere sunt prezente în director Final-MapReduceHandsOn). Fișierele sale de clasă vor fi puse în directorul pachetului
javac -d . SalesMapper.java SalesCountryReducer.java SalesCountryDriver.java
Acest avertisment poate fi ignorat în siguranță.
Această compilare va crea un director într-un director curent numit cu numele pachetului specificat în fișierul sursă java (de ex Vânzări Țara în cazul nostru) și puneți toate fișierele de clasă compilate în el.
Pas 4)
Creați un fișier nou Manifest.txt
sudo gedit Manifest.txt
adăugați următoarele rânduri la el,
Main-Class: SalesCountry.SalesCountryDriver
SalesCountry.SalesCountryDriver este numele clasei principale. Vă rugăm să rețineți că trebuie să apăsați tasta Enter la sfârșitul acestui rând.
Pas 5)
Creați un fișier Jar
jar cfm ProductSalePerCountry.jar Manifest.txt SalesCountry/*.class
Verificați dacă fișierul jar este creat
Pas 6)
Porniți Hadoop
$HADOOP_HOME/sbin/start-dfs.sh
$HADOOP_HOME/sbin/start-yarn.sh
Pas 7)
Copiați fișierul VânzăriJan2009.csv în ~/inputMapReduce
Acum, utilizați comanda de mai jos pentru a copia ~/inputMapReduce la HDFS.
$HADOOP_HOME/bin/hdfs dfs -copyFromLocal ~/inputMapReduce /
Putem ignora în siguranță acest avertisment.
Verificați dacă un fișier este copiat sau nu.
$HADOOP_HOME/bin/hdfs dfs -ls /inputMapReduce
Pas 8)
Rulați jobul MapReduce
$HADOOP_HOME/bin/hadoop jar ProductSalePerCountry.jar /inputMapReduce /mapreduce_output_sales
Aceasta va crea un director de ieșire numit mapreduce_output_sales on HDFS. Conținutul acestui director va fi un fișier care conține vânzările de produse pe țară.
Pas 9)
Rezultatul poate fi văzut prin interfața de comandă ca,
$HADOOP_HOME/bin/hdfs dfs -cat /mapreduce_output_sales/part-00000
Rezultatele pot fi văzute și printr-o interfață web ca-
Deschideți r într-un browser web.
Acum selectați „Răsfoiește sistemul de fișiere” și navigați la /mapreduce_output_sales
Operatii Deschise piesa-r-00000
Explicația clasei SalesMapper
În această secțiune, vom înțelege implementarea SalesMapper clasă.
1. Începem prin a specifica un nume de pachet pentru clasa noastră. Vânzări Țara este un nume al pachetului nostru. Vă rugăm să rețineți că rezultatul compilației, SalesMapper.class va intra într-un director numit după acest nume de pachet: Vânzări Țara.
După aceasta, importăm pachete de bibliotecă.
Instantaneul de mai jos arată o implementare a SalesMapper clasă-
Exemplu de cod explicație:
1. Definiția clasei SalesMapper-
Clasa publică SalesMapper extinde MapReduceBase implementează Mapper {
Fiecare clasă mapper trebuie extinsă de la MapReduceBase clasa și trebuie implementat Cartograf interfață.
2. Definirea funcției „hartă”-
public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException
Partea principală a clasei Mapper este a 'Hartă()' metodă care acceptă patru argumente.
La fiecare apel către 'Hartă()' metoda, a valoare cheie pereche ('cheie' si 'valoare' în acest cod) este trecut.
'Hartă()' metoda începe prin împărțirea textului de intrare care este primit ca argument. Folosește tokenizerul pentru a împărți aceste linii în cuvinte.
String valueString = value.toString(); String[] SingleCountryData = valueString.split(",");
Aici, '' este folosit ca delimitator.
După aceasta, se formează o pereche folosind o înregistrare la al 7-lea index al matricei „SingleCountryData” si o valoare „1”.
output.collect(text nou (SingleCountryData[7]), unu);
Alegem înregistrarea la indicele 7 pentru că avem nevoie Țară date și este situat la al șaptelea index în matrice „SingleCountryData”.
Vă rugăm să rețineți că datele noastre de intrare sunt în formatul de mai jos (unde Țară este la 7th index, cu 0 ca indice de pornire)-
Data_tranzacției,Produs,Preț,Tip_Plată,Nume,Oraș,Stat,Țară,Cont_creat,Ultima_conectare,Latitudine,Longitudine
O ieșire a mapper este din nou a valoare cheie pereche care este ieșită folosind 'colectarea()' Metodă de „OutputCollector”.
Explicația clasei SalesCountryReducer
În această secțiune, vom înțelege implementarea SalesCountryReducer clasă.
1. Începem prin a specifica un nume al pachetului pentru clasa noastră. Vânzări Țara este un nume de pachet. Vă rugăm să rețineți că rezultatul compilației, SalesCountryReducer.class va intra într-un director numit după acest nume de pachet: Vânzări Țara.
După aceasta, importăm pachete de bibliotecă.
Instantaneul de mai jos arată o implementare a SalesCountryReducer clasă-
Explicația codului:
1. Definiția clasei SalesCountryReducer-
clasa publică SalesCountryReducer extinde MapReduceBase implementează Reducer {
Aici, primele două tipuri de date, 'Text' si „IntWritable” sunt tipul de date de intrare cheie-valoare către reductor.
Ieșirea mapperului este sub formă de , . Această ieșire a mapperului devine intrare la reductor. Deci, pentru a se alinia cu tipul său de date, Text si IntWritable sunt folosite ca tip de date aici.
Ultimele două tipuri de date, „Text” și „IntWritable” sunt tipuri de date de ieșire generate de reductor sub formă de pereche cheie-valoare.
Fiecare clasă de reductoare trebuie extinsă de la MapReduceBase clasa și trebuie implementat reductor interfață.
2. Definirea funcției „reduce”-
public void reduce( Text t_key, Iterator<IntWritable> values, OutputCollector<Text,IntWritable> output, Reporter reporter) throws IOException {
O intrare la reduce() metoda este o cheie cu o listă de valori multiple.
De exemplu, în cazul nostru, va fi...
, , , , , .
Acesta este dat reductorului ca
Deci, pentru a accepta argumente de această formă, sunt utilizate primele două tipuri de date, și anume, Text si Iterator. Text este un tip de date de cheie și Iterator este un tip de date pentru lista de valori pentru acea cheie.
Următorul argument este de tip OutputCollector care colectează ieșirea fazei reductorului.
reduce() metoda începe prin copierea valorii cheii și inițializarea numărului de frecvență la 0.
Tasta text = t_key;
int frequencyForCountry = 0;
Apoi, folosind 'in timp ce' buclă, repetăm lista de valori asociate cu cheia și calculăm frecvența finală prin însumarea tuturor valorilor.
while (values.hasNext()) { // replace type of value with the actual type of our value IntWritable value = (IntWritable) values.next(); frequencyForCountry += value.get(); }
Acum, împingem rezultatul în colectorul de ieșire sub forma cheie si obtinut număr de frecvențe.
Codul de mai jos face asta -
output.collect(key, new IntWritable(frequencyForCountry));
Explicația clasei SalesCountryDriver
În această secțiune, vom înțelege implementarea SalesCountryDriver clasă
1. Începem prin a specifica un nume de pachet pentru clasa noastră. Vânzări Țara este un nume de pachet. Vă rugăm să rețineți că rezultatul compilației, SalesCountryDriver.class va intra în directorul numit de acest nume de pachet: Vânzări Țara.
Aici este o linie care specifică numele pachetului urmat de cod pentru a importa pachete de bibliotecă.
2. Definiți o clasă de driver care va crea un nou loc de muncă client, un obiect de configurare și va face publicitate claselor Mapper și Reducer.
Clasa de șoferi este responsabilă pentru setarea jobului nostru MapReduce pentru a rula Hadoop. În această clasă, precizăm numele jobului, tipul de date de intrare/ieșire și numele claselor de cartografiere și reductoare.
3. În fragmentul de cod de mai jos, setăm directoarele de intrare și de ieșire care sunt folosite pentru a consuma setul de date de intrare și, respectiv, a produce ieșiri.
arg[0] si arg[1] sunt argumentele liniei de comandă transmise cu o comandă dată în MapReduce hands-on, adică,
$HADOOP_HOME/bin/hadoop jar ProductSalePerCountry.jar /inputMapReduce /mapreduce_output_sales
4. Declanșează-ne jobul
Mai jos codul începe execuția jobului MapReduce-
try { // Run the job JobClient.runJob(job_conf); } catch (Exception e) { e.printStackTrace(); }