Hadoop & Mapreduce-eksempler: Lag første program i Java

I denne opplæringen lærer du å bruke Hadoop med MapReduce-eksempler. Inndataene som brukes er SalgJan2009.csv. Den inneholder salgsrelatert informasjon som produktnavn, pris, betalingsmåte, by, klientland osv. Målet er å Finn ut antall produkter som selges i hvert land.

Første Hadoop MapReduce-program

Nå i dette MapReduce opplæring, vil vi lage vår første Java MapReduce-program:

Første Hadoop MapReduce-program

Data for salg januar 2009

Sørg for at du har Hadoop installert. Før du starter med selve prosessen, endre bruker til 'hduser' (id brukt mens Hadoop konfigurasjon, kan du bytte til bruker-ID som ble brukt under Hadoop programmeringskonfigurasjonen).

su - hduser_

Første Hadoop MapReduce-program

Trinn 1)

Opprett en ny katalog med navn MapReduceTutorial som vist i MapReduce-eksemplet nedenfor

sudo mkdir MapReduceTutorial

Første Hadoop MapReduce-program

Gi tillatelser

sudo chmod -R 777 MapReduceTutorial

Første Hadoop MapReduce-program

SalesMapper.java

package SalesCountry;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;

public class SalesMapper extends MapReduceBase implements Mapper <LongWritable, Text, Text, IntWritable> {
	private final static IntWritable one = new IntWritable(1);

	public void map(LongWritable key, Text value, OutputCollector <Text, IntWritable> output, Reporter reporter) throws IOException {

		String valueString = value.toString();
		String[] SingleCountryData = valueString.split(",");
		output.collect(new Text(SingleCountryData[7]), one);
	}
}

SalesCountryReducer.java

package SalesCountry;

import java.io.IOException;
import java.util.*;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;

public class SalesCountryReducer extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {

	public void reduce(Text t_key, Iterator<IntWritable> values, OutputCollector<Text,IntWritable> output, Reporter reporter) throws IOException {
		Text key = t_key;
		int frequencyForCountry = 0;
		while (values.hasNext()) {
			// replace type of value with the actual type of our value
			IntWritable value = (IntWritable) values.next();
			frequencyForCountry += value.get();
			
		}
		output.collect(key, new IntWritable(frequencyForCountry));
	}
}

SalesCountryDriver.java

package SalesCountry;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;

public class SalesCountryDriver {
    public static void main(String[] args) {
        JobClient my_client = new JobClient();
        // Create a configuration object for the job
        JobConf job_conf = new JobConf(SalesCountryDriver.class);

        // Set a name of the Job
        job_conf.setJobName("SalePerCountry");

        // Specify data type of output key and value
        job_conf.setOutputKeyClass(Text.class);
        job_conf.setOutputValueClass(IntWritable.class);

        // Specify names of Mapper and Reducer Class
        job_conf.setMapperClass(SalesCountry.SalesMapper.class);
        job_conf.setReducerClass(SalesCountry.SalesCountryReducer.class);

        // Specify formats of the data type of Input and output
        job_conf.setInputFormat(TextInputFormat.class);
        job_conf.setOutputFormat(TextOutputFormat.class);

        // Set input and output directories using command line arguments, 
        //arg[0] = name of input directory on HDFS, and arg[1] =  name of output directory to be created to store the output file.

        FileInputFormat.setInputPaths(job_conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(job_conf, new Path(args[1]));

        my_client.setConf(job_conf);
        try {
            // Run the job 
            JobClient.runJob(job_conf);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

Last ned filer her

Første Hadoop MapReduce-program

Sjekk filtillatelsene til alle disse filene

Første Hadoop MapReduce-program

og hvis "lese"-tillatelser mangler, gi de samme-

Første Hadoop MapReduce-program

Trinn 2)

Eksporter klassebane som vist i Hadoop-eksemplet nedenfor

export CLASSPATH="$HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-client-core-2.2.0.jar:$HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-client-common-2.2.0.jar:$HADOOP_HOME/share/hadoop/common/hadoop-common-2.2.0.jar:~/MapReduceTutorial/SalesCountry/*:$HADOOP_HOME/lib/*"

Første Hadoop MapReduce-program

Trinn 3)

Kompilere Java filer (disse filene finnes i katalogen Final-MapReduceHandsOn). Klassefilene vil bli lagt i pakkekatalogen

javac -d . SalesMapper.java SalesCountryReducer.java SalesCountryDriver.java

Første Hadoop MapReduce-program

Denne advarselen kan trygt ignoreres.

Denne kompileringen vil opprette en katalog i en gjeldende katalog navngitt med pakkenavnet spesifisert i java-kildefilen (dvs. Salgsland i vårt tilfelle) og legg alle kompilerte klassefiler i den.

Første Hadoop MapReduce-program

Trinn 4)

Opprett en ny fil Manifest.txt

sudo gedit Manifest.txt

legg til følgende linjer,

Main-Class: SalesCountry.SalesCountryDriver

Første Hadoop MapReduce-program

SalesCountry.SalesCountryDriver er navnet på hovedklassen. Vær oppmerksom på at du må trykke enter-tasten på slutten av denne linjen.

Trinn 5)

Lag en Jar-fil

jar cfm ProductSalePerCountry.jar Manifest.txt SalesCountry/*.class

Første Hadoop MapReduce-program

Sjekk at jar-filen er opprettet

Første Hadoop MapReduce-program

Trinn 6)

Start Hadoop

$HADOOP_HOME/sbin/start-dfs.sh
$HADOOP_HOME/sbin/start-yarn.sh

Trinn 7)

Kopier filen SalgJan2009.csv inn ~/inputMapReduce

Bruk nå kommandoen nedenfor for å kopiere ~/inputMapReduce til HDFS.

$HADOOP_HOME/bin/hdfs dfs -copyFromLocal ~/inputMapReduce /

Første Hadoop MapReduce-program

Vi kan trygt ignorere denne advarselen.

Kontroller om en fil faktisk er kopiert eller ikke.

$HADOOP_HOME/bin/hdfs dfs -ls /inputMapReduce

Første Hadoop MapReduce-program

Trinn 8)

Kjør MapReduce-jobben

$HADOOP_HOME/bin/hadoop jar ProductSalePerCountry.jar /inputMapReduce /mapreduce_output_sales

Første Hadoop MapReduce-program

Dette vil opprette en utdatakatalog kalt mapreduce_output_sales on HDFS. Innholdet i denne katalogen vil være en fil som inneholder produktsalg per land.

Trinn 9)

Resultatet kan sees gjennom kommandogrensesnittet som,

$HADOOP_HOME/bin/hdfs dfs -cat /mapreduce_output_sales/part-00000

Første Hadoop MapReduce-program

Resultatene kan også sees via et nettgrensesnitt som-

Åpne r i en nettleser.

Første Hadoop MapReduce-program

Velg nå "Bla gjennom filsystemet" og naviger til /mapreduce_output_sales

Første Hadoop MapReduce-program

Åpen del-r-00000

Første Hadoop MapReduce-program

Forklaring av SalesMapper Class

I denne delen vil vi forstå implementeringen av SalesMapper klasse.

1. Vi begynner med å spesifisere et navn på pakken for klassen vår. Salgsland er navnet på pakken vår. Vær oppmerksom på at utdata fra kompilering, SalesMapper.class vil gå inn i en katalog som heter dette pakkenavnet: Salgsland.

Etterfulgt av dette importerer vi bibliotekpakker.

Nedenfor øyeblikksbilde viser en implementering av SalesMapper klasse-

SalesMapper Klasseforklaring

Eksempelkodeforklaring:

1. SalesMapper Klassedefinisjon-

offentlig klasse SalesMapper utvider MapReduceBase implementerer Mapper {

Hver kartleggerklasse må utvides fra MapReduceBase klasse og den må gjennomføre Kartlegger grensesnitt.

2. Definere 'kart' funksjon-

public void map(LongWritable key,
         Text value,
OutputCollector<Text, IntWritable> output,
Reporter reporter) throws IOException

Hoveddelen av Mapper-klassen er en 'kart()' metode som aksepterer fire argumenter.

Ved hver samtale til 'kart()' metode, a nøkkelverdi par ('nøkkel' og 'verdi' i denne koden) er bestått.

'kart()' metoden begynner med å dele inn tekst som mottas som et argument. Den bruker tokenizeren til å dele disse linjene i ord.

String valueString = value.toString();
String[] SingleCountryData = valueString.split(",");

Her '' brukes som skilletegn.

Etter dette dannes et par ved å bruke en post ved 7. indeks av array 'SingleCountryData' og en verdi '1'.

output.collect(ny tekst(SingleCountryData[7]), en);

Vi velger rekord på 7. indeks fordi vi trenger Land data og den er plassert på 7. indeks i array 'SingleCountryData'.

Vær oppmerksom på at våre inndata er i formatet nedenfor (hvor Land er på 7th indeks, med 0 som startindeks)-

Transaksjonsdato, Produkt, Pris, Betalingstype, Navn, By, Stat,Land,Konto_opprettet,Siste_pålogging,Breddegrad,Lengdegrad

En utgang fra mapper er igjen en nøkkelverdi par som sendes ut ved hjelp av 'samle()' Metode av 'OutputCollector'.

Forklaring av SalesCountryReducer Class

I denne delen vil vi forstå implementeringen av SalgsCountryReducer klasse.

1. Vi begynner med å spesifisere et navn på pakken for klassen vår. Salgsland er navnet på utpakken. Vær oppmerksom på at utdata fra kompilering, SalesCountryReducer.class vil gå inn i en katalog som heter dette pakkenavnet: Salgsland.

Etterfulgt av dette importerer vi bibliotekpakker.

Nedenfor øyeblikksbilde viser en implementering av SalgsCountryReducer klasse-

SalesCountryReducer Klasseforklaring

Kodeforklaring:

1. SalesCountryReducer Klassedefinisjon-

offentlig klasse SalesCountryReducer utvider MapReduceBase implementerer Reducer {

Her er de to første datatypene, 'Tekst' og 'IntWritable' er datatype for inngangsnøkkel-verdi til reduksjonen.

Utgang av kartlegger er i form av , . Denne utgangen fra mapper blir input til redusering. Så for å tilpasse seg datatypen, tekst og IntWritable brukes som datatype her.

De to siste datatypene, 'Tekst' og 'IntWritable' er datatype utdata generert av redusering i form av nøkkelverdi-par.

Hver reduksjonsklasse må utvides fra MapReduceBase klasse og den må gjennomføre Redusering grensesnitt.

2. Definere 'redusere' funksjon-

public void reduce( Text t_key,
             Iterator<IntWritable> values,                           
             OutputCollector<Text,IntWritable> output,
             Reporter reporter) throws IOException {

Et innspill til redusere() metode er en nøkkel med en liste over flere verdier.

For eksempel, i vårt tilfelle vil det være-

, , , , , .

Dette er gitt til redusering som

Så for å akseptere argumenter av denne formen, brukes de to første datatypene, nemlig tekst og Iterator. tekst er en datatype nøkkel og Iterator er en datatype for liste over verdier for den nøkkelen.

Det neste argumentet er av typen OutputCollector som samler utgangen fra reduksjonsfasen.

redusere() metoden begynner med å kopiere nøkkelverdien og initialisere frekvenstellingen til 0.

Teksttast = t_key;
int frequencyForCountry = 0;

Deretter bruker du 'mens' løkke, itererer vi gjennom listen over verdier knyttet til nøkkelen og beregner den endelige frekvensen ved å summere alle verdiene.

 while (values.hasNext()) {
            // replace type of value with the actual type of our value
            IntWritable value = (IntWritable) values.next();
            frequencyForCountry += value.get();
            
        }

Nå skyver vi resultatet til utgangssamleren i form av nøkkel og oppnådd frekvensantall.

Nedenfor koden gjør dette-

output.collect(key, new IntWritable(frequencyForCountry));

Forklaring av SalesCountryDriver Class

I denne delen vil vi forstå implementeringen av SalesCountryDriver klasse

1. Vi begynner med å spesifisere et navn på pakken for klassen vår. Salgsland er navnet på utpakken. Vær oppmerksom på at utdata fra kompilering, SalesCountryDriver.class vil gå inn i katalogen navngitt med dette pakkenavnet: Salgsland.

Her er en linje som spesifiserer pakkenavn etterfulgt av kode for å importere bibliotekpakker.

SalesCountryDriver Klasseforklaring

2. Definer en driverklasse som vil opprette en ny klientjobb, konfigurasjonsobjekt og annonsere Mapper- og Reducer-klasser.

Førerklassen er ansvarlig for å sette MapReduce-jobben vår til å kjøre inn Hadoop. I denne klassen spesifiserer vi jobbnavn, datatype for input/output og navn på mapper og reduseringsklasser.

SalesCountryDriver Klasseforklaring

3. I kodebiten nedenfor setter vi inn- og utdatakataloger som brukes til å konsumere henholdsvis input-datasett og produsere utdata.

arg[0] og arg[1] er kommandolinjeargumentene sendt med en kommando gitt i MapReduce hands-on, dvs.

$HADOOP_HOME/bin/hadoop jar ProductSalePerCountry.jar /inputMapReduce /mapreduce_output_sales

SalesCountryDriver Klasseforklaring

4. Trigger jobben vår

Nedenfor koden starter kjøringen av MapReduce jobb-

try {
    // Run the job 
    JobClient.runJob(job_conf);
} catch (Exception e) {
    e.printStackTrace();
}