Hadoop & Mapreduce-eksempler: Opret første program i Java

I denne tutorial lærer du at bruge Hadoop med MapReduce-eksempler. De anvendte inputdata er SalgJan2009.csv. Den indeholder salgsrelaterede oplysninger som produktnavn, pris, betalingsmåde, by, klientland osv. Målet er at Find ud af antallet af solgte produkter i hvert land.

Første Hadoop MapReduce-program

Nu i dette MapReduce tutorial, vil vi skabe vores første Java MapReduce-program:

Første Hadoop MapReduce-program

Salgsdata januar 2009

Sørg for, at du har Hadoop installeret. Inden du starter med selve processen, skal du ændre bruger til 'hduser' (id brugt under Hadoop-konfigurationen, du kan skifte til det bruger-id, der blev brugt under din Hadoop-programmeringskonfiguration).

su - hduser_

Første Hadoop MapReduce-program

Trin 1)

Opret en ny mappe med navn MapReduce Tutorial som vist i nedenstående MapReduce-eksempel

sudo mkdir MapReduceTutorial

Første Hadoop MapReduce-program

Giv tilladelser

sudo chmod -R 777 MapReduceTutorial

Første Hadoop MapReduce-program

SalesMapper.java

package SalesCountry;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;

public class SalesMapper extends MapReduceBase implements Mapper <LongWritable, Text, Text, IntWritable> {
	private final static IntWritable one = new IntWritable(1);

	public void map(LongWritable key, Text value, OutputCollector <Text, IntWritable> output, Reporter reporter) throws IOException {

		String valueString = value.toString();
		String[] SingleCountryData = valueString.split(",");
		output.collect(new Text(SingleCountryData[7]), one);
	}
}

SalesCountryReducer.java

package SalesCountry;

import java.io.IOException;
import java.util.*;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;

public class SalesCountryReducer extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {

	public void reduce(Text t_key, Iterator<IntWritable> values, OutputCollector<Text,IntWritable> output, Reporter reporter) throws IOException {
		Text key = t_key;
		int frequencyForCountry = 0;
		while (values.hasNext()) {
			// replace type of value with the actual type of our value
			IntWritable value = (IntWritable) values.next();
			frequencyForCountry += value.get();
			
		}
		output.collect(key, new IntWritable(frequencyForCountry));
	}
}

SalesCountryDriver.java

package SalesCountry;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;

public class SalesCountryDriver {
    public static void main(String[] args) {
        JobClient my_client = new JobClient();
        // Create a configuration object for the job
        JobConf job_conf = new JobConf(SalesCountryDriver.class);

        // Set a name of the Job
        job_conf.setJobName("SalePerCountry");

        // Specify data type of output key and value
        job_conf.setOutputKeyClass(Text.class);
        job_conf.setOutputValueClass(IntWritable.class);

        // Specify names of Mapper and Reducer Class
        job_conf.setMapperClass(SalesCountry.SalesMapper.class);
        job_conf.setReducerClass(SalesCountry.SalesCountryReducer.class);

        // Specify formats of the data type of Input and output
        job_conf.setInputFormat(TextInputFormat.class);
        job_conf.setOutputFormat(TextOutputFormat.class);

        // Set input and output directories using command line arguments, 
        //arg[0] = name of input directory on HDFS, and arg[1] =  name of output directory to be created to store the output file.

        FileInputFormat.setInputPaths(job_conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(job_conf, new Path(args[1]));

        my_client.setConf(job_conf);
        try {
            // Run the job 
            JobClient.runJob(job_conf);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

Download filer her

Første Hadoop MapReduce-program

Kontroller filtilladelserne for alle disse filer

Første Hadoop MapReduce-program

og hvis "læse"-tilladelser mangler, så giv samme-

Første Hadoop MapReduce-program

Trin 2)

Eksportér klassesti som vist i Hadoop-eksemplet nedenfor

export CLASSPATH="$HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-client-core-2.2.0.jar:$HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-client-common-2.2.0.jar:$HADOOP_HOME/share/hadoop/common/hadoop-common-2.2.0.jar:~/MapReduceTutorial/SalesCountry/*:$HADOOP_HOME/lib/*"

Første Hadoop MapReduce-program

Trin 3)

Kompiler Java filer (disse filer er til stede i mappen Final-MapReduceHandsOn). Dens klassefiler vil blive lagt i pakkebiblioteket

javac -d . SalesMapper.java SalesCountryReducer.java SalesCountryDriver.java

Første Hadoop MapReduce-program

Denne advarsel kan sikkert ignoreres.

Denne kompilering vil oprette en mappe i en aktuel mappe navngivet med pakkenavnet angivet i java-kildefilen (dvs. Salgsland i vores tilfælde) og læg alle kompilerede klassefiler i den.

Første Hadoop MapReduce-program

Trin 4)

Opret en ny fil Manifest.txt

sudo gedit Manifest.txt

tilføje følgende linjer til det,

Main-Class: SalesCountry.SalesCountryDriver

Første Hadoop MapReduce-program

SalesCountry.SalesCountryDriver er navnet på hovedklassen. Bemærk venligst, at du skal trykke på Enter-tasten i slutningen af ​​denne linje.

Trin 5)

Opret en Jar-fil

jar cfm ProductSalePerCountry.jar Manifest.txt SalesCountry/*.class

Første Hadoop MapReduce-program

Tjek, at jar-filen er oprettet

Første Hadoop MapReduce-program

Trin 6)

Start Hadoop

$HADOOP_HOME/sbin/start-dfs.sh
$HADOOP_HOME/sbin/start-yarn.sh

Trin 7)

Kopier filen SalgJan2009.csv ind ~/inputMapReduce

Brug nu kommandoen nedenfor til at kopiere ~/inputMapReduce til HDFS.

$HADOOP_HOME/bin/hdfs dfs -copyFromLocal ~/inputMapReduce /

Første Hadoop MapReduce-program

Vi kan roligt ignorere denne advarsel.

Kontroller, om en fil faktisk er kopieret eller ej.

$HADOOP_HOME/bin/hdfs dfs -ls /inputMapReduce

Første Hadoop MapReduce-program

Trin 8)

Kør MapReduce-job

$HADOOP_HOME/bin/hadoop jar ProductSalePerCountry.jar /inputMapReduce /mapreduce_output_sales

Første Hadoop MapReduce-program

Dette vil oprette en output-mappe med navnet mapreduce_output_sales on HDFS. Indholdet af denne mappe vil være en fil, der indeholder produktsalg pr. land.

Trin 9)

Resultatet kan ses gennem kommandogrænsefladen som,

$HADOOP_HOME/bin/hdfs dfs -cat /mapreduce_output_sales/part-00000

Første Hadoop MapReduce-program

Resultater kan også ses via en webgrænseflade som-

Åbn r i en webbrowser.

Første Hadoop MapReduce-program

Vælg nu 'Gennemse filsystemet' og navigere til /mapreduce_output_sales

Første Hadoop MapReduce-program

Åbne del-r-00000

Første Hadoop MapReduce-program

Forklaring af SalesMapper Class

I dette afsnit vil vi forstå implementeringen af SalesMapper klasse.

1. Vi begynder med at angive et navn på pakken til vores klasse. Salgsland er et navn på vores pakke. Bemærk venligst, at output fra kompilering, SalesMapper.class vil gå ind i en mappe navngivet af dette pakkenavn: Salgsland.

Efterfulgt af dette importerer vi bibliotekspakker.

Nedenstående snapshot viser en implementering af SalesMapper klasse-

SalesMapper Klasseforklaring

Eksempelkodeforklaring:

1. SalesMapper Klasse Definition-

offentlig klasse SalesMapper udvider MapReduceBase implementerer Mapper {

Hver mapper-klasse skal udvides fra MapReduceBase klasse, og den skal gennemføre Mapper interface.

2. Definition af 'kort' funktion-

public void map(LongWritable key,
         Text value,
OutputCollector<Text, IntWritable> output,
Reporter reporter) throws IOException

Hoveddelen af ​​Mapper-klassen er en 'kort()' metode, der accepterer fire argumenter.

Ved hvert opkald til 'kort()' metode, a nøgleværdi par ('nøgle' og 'værdi' i denne kode) er bestået.

'kort()' metoden begynder med at opdele inputtekst, som modtages som et argument. Den bruger tokenizeren til at opdele disse linjer i ord.

String valueString = value.toString();
String[] SingleCountryData = valueString.split(",");

Her, '' bruges som afgrænsning.

Herefter dannes et par ved hjælp af en post ved arrays 7. indeks 'SingleCountryData' og en værdi '1'.

output.collect(ny tekst(SingleCountryData[7]), en);

Vi vælger rekord på 7. indeks, fordi vi har brug for Land data, og det er placeret på 7. indeks i array 'SingleCountryData'.

Bemærk venligst, at vores inputdata er i nedenstående format (hvor Land er på 7th indeks, med 0 som startindeks)-

Transaktionsdato, Produkt, Pris, Betalingstype, Navn, By, Stat,Land,Konto_Oprettet,Sidste_Login,Latitude,Længdegrad

Et output fra mapper er igen en nøgleværdi par, som udsendes vha 'indsamle()' metode til 'OutputCollector'.

Forklaring af SalesCountryReducer Class

I dette afsnit vil vi forstå implementeringen af SalgsCountryReducer klasse.

1. Vi begynder med at angive et navn på pakken til vores klasse. Salgsland er et navn på udpakken. Bemærk venligst, at output fra kompilering, SalgsCountryReducer.class vil gå ind i en mappe navngivet af dette pakkenavn: Salgsland.

Efterfulgt af dette importerer vi bibliotekspakker.

Nedenstående snapshot viser en implementering af SalgsCountryReducer klasse-

SalesCountryReducer Klasseforklaring

Kodeforklaring:

1. SalesCountryReducer Klassedefinition-

offentlig klasse SalesCountryReducer udvider MapReduceBase implementerer Reducer {

Her er de to første datatyper, 'Tekst' og 'IntWritable' er datatype for input-nøgle-værdi til reducereren.

Output af mapper er i form af , . Dette output fra mapper bliver input til reducereren. Så for at tilpasse dens datatype, tekst og IntWritable bruges som datatype her.

De sidste to datatyper, 'Tekst' og 'IntWritable' er datatype output genereret af reducer i form af nøgle-værdi-par.

Hver reduktionsklasse skal forlænges fra MapReduceBase klasse, og den skal gennemføre Reducer interface.

2. Definition af 'reducer' funktion-

public void reduce( Text t_key,
             Iterator<IntWritable> values,                           
             OutputCollector<Text,IntWritable> output,
             Reporter reporter) throws IOException {

Et input til reducere() metode er en nøgle med en liste med flere værdier.

For eksempel vil det i vores tilfælde være-

, , , , , .

Dette gives til reducer som

Så for at acceptere argumenter af denne form, bruges de første to datatyper, dvs. tekst og Iterator. tekst er en datatype nøgle og Iterator er en datatype for liste over værdier for den pågældende nøgle.

Det næste argument er af typen OutputCollector som samler output fra reduktionsfasen.

reducere() metoden begynder med at kopiere nøgleværdien og initialisere frekvenstællingen til 0.

Teksttast = t_key;
int frequencyForCountry = 0;

Brug derefter 'mens' loop, gentager vi listen over værdier, der er knyttet til nøglen, og beregner den endelige frekvens ved at summere alle værdierne.

 while (values.hasNext()) {
            // replace type of value with the actual type of our value
            IntWritable value = (IntWritable) values.next();
            frequencyForCountry += value.get();
            
        }

Nu skubber vi resultatet til outputkollektoren i form af nøgle og opnået hyppighedstælling.

Nedenstående kode gør dette-

output.collect(key, new IntWritable(frequencyForCountry));

Forklaring af SalesCountryDriver Class

I dette afsnit vil vi forstå implementeringen af SalesCountryDriver klasse

1. Vi begynder med at angive et navn på pakken til vores klasse. Salgsland er et navn på udpakken. Bemærk venligst, at output fra kompilering, SalesCountryDriver.class vil gå ind i mappe navngivet af dette pakkenavn: Salgsland.

Her er en linje, der specificerer pakkenavn efterfulgt af kode for at importere bibliotekspakker.

SalesCountryDriver Klasseforklaring

2. Definer en driverklasse, som vil oprette et nyt klientjob, konfigurationsobjekt og annoncere Mapper- og Reducer-klasser.

Chaufførklassen er ansvarlig for at indstille vores MapReduce-job til at køre ind Hadoop. I denne klasse specificerer vi jobnavn, datatype for input/output og navne på mapper- og reduceringsklasser.

SalesCountryDriver Klasseforklaring

3. I nedenstående kodestykke indstiller vi input- og outputmapper, som bruges til henholdsvis at forbruge inputdatasæt og producere output.

arg[0] og arg[1] er kommandolinjeargumenterne sendt med en kommando givet i MapReduce hands-on, dvs.

$HADOOP_HOME/bin/hadoop jar ProductSalePerCountry.jar /inputMapReduce /mapreduce_output_sales

SalesCountryDriver Klasseforklaring

4. Udløs vores job

Nedenstående kode start udførelse af MapReduce job-

try {
    // Run the job 
    JobClient.runJob(job_conf);
} catch (Exception e) {
    e.printStackTrace();
}