Exemple Hadoop și Mapreduce: Creați primul program în Java

În acest tutorial, veți învăța să utilizați Hadoop cu exemple MapReduce. Datele de intrare utilizate sunt VânzăriJan2009.csv. Conține informații legate de vânzări, cum ar fi numele produsului, prețul, modul de plată, orașul, țara clientului etc. Scopul este să Aflați numărul de produse vândute în fiecare țară.

Primul program Hadoop MapReduce

Acum în asta Tutorial MapReduce, vom crea primul nostru Java Programul MapReduce:

Primul program Hadoop MapReduce

Datele vânzărilor ianuarie 2009

Asigurați-vă că aveți Hadoop instalat. Înainte de a începe procesul propriu-zis, schimbați utilizatorul în „hduser” (id-ul folosit în timpul configurației Hadoop, puteți comuta la userid-ul folosit în timpul configurației de programare Hadoop).

su - hduser_

Primul program Hadoop MapReduce

Pas 1)

Creați un director nou cu nume MapReduceTutorial așa cum se arată în exemplul MapReduce de mai jos

sudo mkdir MapReduceTutorial

Primul program Hadoop MapReduce

Acordați permisiuni

sudo chmod -R 777 MapReduceTutorial

Primul program Hadoop MapReduce

SalesMapper.java

package SalesCountry;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;

public class SalesMapper extends MapReduceBase implements Mapper <LongWritable, Text, Text, IntWritable> {
	private final static IntWritable one = new IntWritable(1);

	public void map(LongWritable key, Text value, OutputCollector <Text, IntWritable> output, Reporter reporter) throws IOException {

		String valueString = value.toString();
		String[] SingleCountryData = valueString.split(",");
		output.collect(new Text(SingleCountryData[7]), one);
	}
}

SalesCountryReducer.java

package SalesCountry;

import java.io.IOException;
import java.util.*;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;

public class SalesCountryReducer extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {

	public void reduce(Text t_key, Iterator<IntWritable> values, OutputCollector<Text,IntWritable> output, Reporter reporter) throws IOException {
		Text key = t_key;
		int frequencyForCountry = 0;
		while (values.hasNext()) {
			// replace type of value with the actual type of our value
			IntWritable value = (IntWritable) values.next();
			frequencyForCountry += value.get();
			
		}
		output.collect(key, new IntWritable(frequencyForCountry));
	}
}

SalesCountryDriver.java

package SalesCountry;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;

public class SalesCountryDriver {
    public static void main(String[] args) {
        JobClient my_client = new JobClient();
        // Create a configuration object for the job
        JobConf job_conf = new JobConf(SalesCountryDriver.class);

        // Set a name of the Job
        job_conf.setJobName("SalePerCountry");

        // Specify data type of output key and value
        job_conf.setOutputKeyClass(Text.class);
        job_conf.setOutputValueClass(IntWritable.class);

        // Specify names of Mapper and Reducer Class
        job_conf.setMapperClass(SalesCountry.SalesMapper.class);
        job_conf.setReducerClass(SalesCountry.SalesCountryReducer.class);

        // Specify formats of the data type of Input and output
        job_conf.setInputFormat(TextInputFormat.class);
        job_conf.setOutputFormat(TextOutputFormat.class);

        // Set input and output directories using command line arguments, 
        //arg[0] = name of input directory on HDFS, and arg[1] =  name of output directory to be created to store the output file.

        FileInputFormat.setInputPaths(job_conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(job_conf, new Path(args[1]));

        my_client.setConf(job_conf);
        try {
            // Run the job 
            JobClient.runJob(job_conf);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

Descărcați fișiere aici

Primul program Hadoop MapReduce

Verificați permisiunile pentru toate aceste fișiere

Primul program Hadoop MapReduce

și dacă lipsesc permisiunile de „citire”, acordați aceleași-

Primul program Hadoop MapReduce

Pas 2)

Exportați calea clasei, așa cum se arată în exemplul Hadoop de mai jos

export CLASSPATH="$HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-client-core-2.2.0.jar:$HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-client-common-2.2.0.jar:$HADOOP_HOME/share/hadoop/common/hadoop-common-2.2.0.jar:~/MapReduceTutorial/SalesCountry/*:$HADOOP_HOME/lib/*"

Primul program Hadoop MapReduce

Pas 3)

Compila Java fișiere (aceste fișiere sunt prezente în director Final-MapReduceHandsOn). Fișierele sale de clasă vor fi puse în directorul pachetului

javac -d . SalesMapper.java SalesCountryReducer.java SalesCountryDriver.java

Primul program Hadoop MapReduce

Acest avertisment poate fi ignorat în siguranță.

Această compilare va crea un director într-un director curent numit cu numele pachetului specificat în fișierul sursă java (de ex Vânzări Țara în cazul nostru) și puneți toate fișierele de clasă compilate în el.

Primul program Hadoop MapReduce

Pas 4)

Creați un fișier nou Manifest.txt

sudo gedit Manifest.txt

adăugați următoarele rânduri la el,

Main-Class: SalesCountry.SalesCountryDriver

Primul program Hadoop MapReduce

SalesCountry.SalesCountryDriver este numele clasei principale. Vă rugăm să rețineți că trebuie să apăsați tasta Enter la sfârșitul acestui rând.

Pas 5)

Creați un fișier Jar

jar cfm ProductSalePerCountry.jar Manifest.txt SalesCountry/*.class

Primul program Hadoop MapReduce

Verificați dacă fișierul jar este creat

Primul program Hadoop MapReduce

Pas 6)

Porniți Hadoop

$HADOOP_HOME/sbin/start-dfs.sh
$HADOOP_HOME/sbin/start-yarn.sh

Pas 7)

Copiați fișierul VânzăriJan2009.csv în ~/inputMapReduce

Acum, utilizați comanda de mai jos pentru a copia ~/inputMapReduce la HDFS.

$HADOOP_HOME/bin/hdfs dfs -copyFromLocal ~/inputMapReduce /

Primul program Hadoop MapReduce

Putem ignora în siguranță acest avertisment.

Verificați dacă un fișier este copiat sau nu.

$HADOOP_HOME/bin/hdfs dfs -ls /inputMapReduce

Primul program Hadoop MapReduce

Pas 8)

Rulați jobul MapReduce

$HADOOP_HOME/bin/hadoop jar ProductSalePerCountry.jar /inputMapReduce /mapreduce_output_sales

Primul program Hadoop MapReduce

Aceasta va crea un director de ieșire numit mapreduce_output_sales on HDFS. Conținutul acestui director va fi un fișier care conține vânzările de produse pe țară.

Pas 9)

Rezultatul poate fi văzut prin interfața de comandă ca,

$HADOOP_HOME/bin/hdfs dfs -cat /mapreduce_output_sales/part-00000

Primul program Hadoop MapReduce

Rezultatele pot fi văzute și printr-o interfață web ca-

Deschideți r într-un browser web.

Primul program Hadoop MapReduce

Acum selectați „Răsfoiește sistemul de fișiere” și navigați la /mapreduce_output_sales

Primul program Hadoop MapReduce

Operatii Deschise piesa-r-00000

Primul program Hadoop MapReduce

Explicația clasei SalesMapper

În această secțiune, vom înțelege implementarea SalesMapper clasă.

1. Începem prin a specifica un nume de pachet pentru clasa noastră. Vânzări Țara este un nume al pachetului nostru. Vă rugăm să rețineți că rezultatul compilației, SalesMapper.class va intra într-un director numit după acest nume de pachet: Vânzări Țara.

După aceasta, importăm pachete de bibliotecă.

Instantaneul de mai jos arată o implementare a SalesMapper clasă-

Explicația clasei SalesMapper

Exemplu de cod explicație:

1. Definiția clasei SalesMapper-

Clasa publică SalesMapper extinde MapReduceBase implementează Mapper {

Fiecare clasă mapper trebuie extinsă de la MapReduceBase clasa și trebuie implementat Cartograf interfață.

2. Definirea funcției „hartă”-

public void map(LongWritable key,
         Text value,
OutputCollector<Text, IntWritable> output,
Reporter reporter) throws IOException

Partea principală a clasei Mapper este a 'Hartă()' metodă care acceptă patru argumente.

La fiecare apel către 'Hartă()' metoda, a valoare cheie pereche ('cheie' si 'valoare' în acest cod) este trecut.

'Hartă()' metoda începe prin împărțirea textului de intrare care este primit ca argument. Folosește tokenizerul pentru a împărți aceste linii în cuvinte.

String valueString = value.toString();
String[] SingleCountryData = valueString.split(",");

Aici, '' este folosit ca delimitator.

După aceasta, se formează o pereche folosind o înregistrare la al 7-lea index al matricei „SingleCountryData” si o valoare „1”.

output.collect(text nou (SingleCountryData[7]), unu);

Alegem înregistrarea la indicele 7 pentru că avem nevoie Țară date și este situat la al șaptelea index în matrice „SingleCountryData”.

Vă rugăm să rețineți că datele noastre de intrare sunt în formatul de mai jos (unde Țară este la 7th index, cu 0 ca indice de pornire)-

Data_tranzacției,Produs,Preț,Tip_Plată,Nume,Oraș,Stat,Țară,Cont_creat,Ultima_conectare,Latitudine,Longitudine

O ieșire a mapper este din nou a valoare cheie pereche care este ieșită folosind 'colectarea()' Metodă de „OutputCollector”.

Explicația clasei SalesCountryReducer

În această secțiune, vom înțelege implementarea SalesCountryReducer clasă.

1. Începem prin a specifica un nume al pachetului pentru clasa noastră. Vânzări Țara este un nume de pachet. Vă rugăm să rețineți că rezultatul compilației, SalesCountryReducer.class va intra într-un director numit după acest nume de pachet: Vânzări Țara.

După aceasta, importăm pachete de bibliotecă.

Instantaneul de mai jos arată o implementare a SalesCountryReducer clasă-

Explicația clasei SalesCountryReducer

Explicația codului:

1. Definiția clasei SalesCountryReducer-

clasa publică SalesCountryReducer extinde MapReduceBase implementează Reducer {

Aici, primele două tipuri de date, 'Text' si „IntWritable” sunt tipul de date de intrare cheie-valoare către reductor.

Ieșirea mapperului este sub formă de , . Această ieșire a mapperului devine intrare la reductor. Deci, pentru a se alinia cu tipul său de date, Text si IntWritable sunt folosite ca tip de date aici.

Ultimele două tipuri de date, „Text” și „IntWritable” sunt tipuri de date de ieșire generate de reductor sub formă de pereche cheie-valoare.

Fiecare clasă de reductoare trebuie extinsă de la MapReduceBase clasa și trebuie implementat reductor interfață.

2. Definirea funcției „reduce”-

public void reduce( Text t_key,
             Iterator<IntWritable> values,                           
             OutputCollector<Text,IntWritable> output,
             Reporter reporter) throws IOException {

O intrare la reduce() metoda este o cheie cu o listă de valori multiple.

De exemplu, în cazul nostru, va fi...

, , , , , .

Acesta este dat reductorului ca

Deci, pentru a accepta argumente de această formă, sunt utilizate primele două tipuri de date, și anume, Text si Iterator. Text este un tip de date de cheie și Iterator este un tip de date pentru lista de valori pentru acea cheie.

Următorul argument este de tip OutputCollector care colectează ieșirea fazei reductorului.

reduce() metoda începe prin copierea valorii cheii și inițializarea numărului de frecvență la 0.

Tasta text = t_key;
int frequencyForCountry = 0;

Apoi, folosind 'in timp ce' buclă, repetăm ​​lista de valori asociate cu cheia și calculăm frecvența finală prin însumarea tuturor valorilor.

 while (values.hasNext()) {
            // replace type of value with the actual type of our value
            IntWritable value = (IntWritable) values.next();
            frequencyForCountry += value.get();
            
        }

Acum, împingem rezultatul în colectorul de ieșire sub forma cheie si obtinut număr de frecvențe.

Codul de mai jos face asta -

output.collect(key, new IntWritable(frequencyForCountry));

Explicația clasei SalesCountryDriver

În această secțiune, vom înțelege implementarea SalesCountryDriver clasă

1. Începem prin a specifica un nume de pachet pentru clasa noastră. Vânzări Țara este un nume de pachet. Vă rugăm să rețineți că rezultatul compilației, SalesCountryDriver.class va intra în directorul numit de acest nume de pachet: Vânzări Țara.

Aici este o linie care specifică numele pachetului urmat de cod pentru a importa pachete de bibliotecă.

Explicația clasei SalesCountryDriver

2. Definiți o clasă de driver care va crea un nou loc de muncă client, un obiect de configurare și va face publicitate claselor Mapper și Reducer.

Clasa de șoferi este responsabilă pentru setarea jobului nostru MapReduce pentru a rula Hadoop. În această clasă, precizăm numele jobului, tipul de date de intrare/ieșire și numele claselor de cartografiere și reductoare.

Explicația clasei SalesCountryDriver

3. În fragmentul de cod de mai jos, setăm directoarele de intrare și de ieșire care sunt folosite pentru a consuma setul de date de intrare și, respectiv, a produce ieșiri.

arg[0] si arg[1] sunt argumentele liniei de comandă transmise cu o comandă dată în MapReduce hands-on, adică,

$HADOOP_HOME/bin/hadoop jar ProductSalePerCountry.jar /inputMapReduce /mapreduce_output_sales

Explicația clasei SalesCountryDriver

4. Declanșează-ne jobul

Mai jos codul începe execuția jobului MapReduce-

try {
    // Run the job 
    JobClient.runJob(job_conf);
} catch (Exception e) {
    e.printStackTrace();
}