Hadoop & Mapreduce exempel: Skapa första program i Java

I den här handledningen lär du dig att använda Hadoop med MapReduce-exempel. Indata som används är FörsäljningJan2009.csv. Den innehåller försäljningsrelaterad information som produktnamn, pris, betalningssätt, stad, kundland etc. Målet är att Ta reda på antalet sålda produkter i varje land.

Första Hadoop MapReduce-programmet

Nu i detta Handledning för MapReduce, kommer vi att skapa vår första Java MapReduce-program:

Första Hadoop MapReduce-programmet

Försäljningsdata januari 2009

Se till att du har Hadoop installerat. Innan du börjar med den faktiska processen, ändra användare till 'hduser' (id som användes under Hadoop-konfigurationen, du kan byta till användar-id som användes under din Hadoop-programmeringskonfiguration ).

su - hduser_

Första Hadoop MapReduce-programmet

Steg 1)

Skapa en ny katalog med namn MapReduce Tutorial som visas i MapReduce-exemplet nedan

sudo mkdir MapReduceTutorial

Första Hadoop MapReduce-programmet

Ge behörigheter

sudo chmod -R 777 MapReduceTutorial

Första Hadoop MapReduce-programmet

SalesMapper.java

package SalesCountry;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;

public class SalesMapper extends MapReduceBase implements Mapper <LongWritable, Text, Text, IntWritable> {
	private final static IntWritable one = new IntWritable(1);

	public void map(LongWritable key, Text value, OutputCollector <Text, IntWritable> output, Reporter reporter) throws IOException {

		String valueString = value.toString();
		String[] SingleCountryData = valueString.split(",");
		output.collect(new Text(SingleCountryData[7]), one);
	}
}

SalesCountryReducer.java

package SalesCountry;

import java.io.IOException;
import java.util.*;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;

public class SalesCountryReducer extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {

	public void reduce(Text t_key, Iterator<IntWritable> values, OutputCollector<Text,IntWritable> output, Reporter reporter) throws IOException {
		Text key = t_key;
		int frequencyForCountry = 0;
		while (values.hasNext()) {
			// replace type of value with the actual type of our value
			IntWritable value = (IntWritable) values.next();
			frequencyForCountry += value.get();
			
		}
		output.collect(key, new IntWritable(frequencyForCountry));
	}
}

SalesCountryDriver.java

package SalesCountry;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;

public class SalesCountryDriver {
    public static void main(String[] args) {
        JobClient my_client = new JobClient();
        // Create a configuration object for the job
        JobConf job_conf = new JobConf(SalesCountryDriver.class);

        // Set a name of the Job
        job_conf.setJobName("SalePerCountry");

        // Specify data type of output key and value
        job_conf.setOutputKeyClass(Text.class);
        job_conf.setOutputValueClass(IntWritable.class);

        // Specify names of Mapper and Reducer Class
        job_conf.setMapperClass(SalesCountry.SalesMapper.class);
        job_conf.setReducerClass(SalesCountry.SalesCountryReducer.class);

        // Specify formats of the data type of Input and output
        job_conf.setInputFormat(TextInputFormat.class);
        job_conf.setOutputFormat(TextOutputFormat.class);

        // Set input and output directories using command line arguments, 
        //arg[0] = name of input directory on HDFS, and arg[1] =  name of output directory to be created to store the output file.

        FileInputFormat.setInputPaths(job_conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(job_conf, new Path(args[1]));

        my_client.setConf(job_conf);
        try {
            // Run the job 
            JobClient.runJob(job_conf);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

Ladda ner filer här

Första Hadoop MapReduce-programmet

Kontrollera filbehörigheterna för alla dessa filer

Första Hadoop MapReduce-programmet

och om "läs"-behörigheter saknas, ge samma-

Första Hadoop MapReduce-programmet

Steg 2)

Exportera klasssökväg som visas i Hadoop-exemplet nedan

export CLASSPATH="$HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-client-core-2.2.0.jar:$HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-client-common-2.2.0.jar:$HADOOP_HOME/share/hadoop/common/hadoop-common-2.2.0.jar:~/MapReduceTutorial/SalesCountry/*:$HADOOP_HOME/lib/*"

Första Hadoop MapReduce-programmet

Steg 3)

Sammanställa Java filer (dessa filer finns i katalogen Final-MapReduceHandsOn). Dess klassfiler kommer att läggas i paketkatalogen

javac -d . SalesMapper.java SalesCountryReducer.java SalesCountryDriver.java

Första Hadoop MapReduce-programmet

Denna varning kan säkert ignoreras.

Denna kompilering kommer att skapa en katalog i en aktuell katalog som heter med paketnamnet som anges i java-källfilen (dvs. Försäljningsland i vårt fall) och placera alla kompilerade klassfiler i den.

Första Hadoop MapReduce-programmet

Steg 4)

Skapa en ny fil Manifest.txt

sudo gedit Manifest.txt

lägg till följande rader,

Main-Class: SalesCountry.SalesCountryDriver

Första Hadoop MapReduce-programmet

SalesCountry.SalesCountryDriver är namnet på huvudklassen. Observera att du måste trycka på enter-tangenten i slutet av denna rad.

Steg 5)

Skapa en Jar-fil

jar cfm ProductSalePerCountry.jar Manifest.txt SalesCountry/*.class

Första Hadoop MapReduce-programmet

Kontrollera att jar-filen är skapad

Första Hadoop MapReduce-programmet

Steg 6)

Starta Hadoop

$HADOOP_HOME/sbin/start-dfs.sh
$HADOOP_HOME/sbin/start-yarn.sh

Steg 7)

Kopiera filen FörsäljningJan2009.csv in ~/inputMapReduce

Använd nu kommandot nedan för att kopiera ~/inputMapReduce till HDFS.

$HADOOP_HOME/bin/hdfs dfs -copyFromLocal ~/inputMapReduce /

Första Hadoop MapReduce-programmet

Vi kan lugnt ignorera denna varning.

Kontrollera om en fil faktiskt har kopierats eller inte.

$HADOOP_HOME/bin/hdfs dfs -ls /inputMapReduce

Första Hadoop MapReduce-programmet

Steg 8)

Kör MapReduce-jobbet

$HADOOP_HOME/bin/hadoop jar ProductSalePerCountry.jar /inputMapReduce /mapreduce_output_sales

Första Hadoop MapReduce-programmet

Detta kommer att skapa en utdatakatalog med namnet mapreduce_output_sales on HDFS. Innehållet i denna katalog kommer att vara en fil som innehåller produktförsäljning per land.

Steg 9)

Resultatet kan ses genom kommandogränssnittet som,

$HADOOP_HOME/bin/hdfs dfs -cat /mapreduce_output_sales/part-00000

Första Hadoop MapReduce-programmet

Resultaten kan också ses via ett webbgränssnitt som-

Öppna r i en webbläsare.

Första Hadoop MapReduce-programmet

Välj nu "Bläddra i filsystemet" och navigera till /mapreduce_output_sales

Första Hadoop MapReduce-programmet

Öppen del-r-00000

Första Hadoop MapReduce-programmet

Förklaring av SalesMapper Class

I det här avsnittet kommer vi att förstå implementeringen av SalesMapper klass.

1. Vi börjar med att ange ett namn på paketet för vår klass. Försäljningsland är ett namn på vårt paket. Observera att utdata från kompilering, SalesMapper.class kommer att gå in i en katalog som heter detta paketnamn: Försäljningsland.

Därefter importerar vi bibliotekspaket.

Nedan ögonblicksbild visar en implementering av SalesMapper klass-

SalesMapper Klassförklaring

Exempelkod förklaring:

1. SalesMapper Klassdefinition-

public class SalesMapper utökar MapReduceBase implementerar Mapper {

Varje kartläggarklass måste utökas från MapReduceBase klass och den måste genomföra mapper gränssnitt.

2. Definiera 'karta' funktion-

public void map(LongWritable key,
         Text value,
OutputCollector<Text, IntWritable> output,
Reporter reporter) throws IOException

Huvuddelen av Mapper-klassen är en 'Karta()' metod som accepterar fyra argument.

Vid varje samtal till 'Karta()' metod, a nyckelvärde par ('nyckel' och 'värde' i denna kod) godkänns.

'Karta()' metoden börjar med att dela upp inmatad text som tas emot som ett argument. Den använder tokenizern för att dela upp dessa rader i ord.

String valueString = value.toString();
String[] SingleCountryData = valueString.split(",");

Här, '' används som avgränsare.

Efter detta bildas ett par med användning av en post vid arrayens sjunde index "SingleCountryData" och ett värde '1'.

output.collect(ny text(SingleCountryData[7]), en);

Vi väljer rekord på 7:e index eftersom vi behöver Land data och den är belägen på 7:e index i array "SingleCountryData".

Observera att vår indata är i formatet nedan (där Land är vid 7th index, med 0 som startindex)-

Transaktionsdatum, Produkt, Pris, Betalningstyp, Namn, Stad, Stat,Land,Konto_Skapat,Sista_inloggning,Latitud,Longitud

En utdata från mapper är återigen en nyckelvärde par som matas ut med hjälp av 'samla()' metod av 'OutputCollector'.

Förklaring av SalesCountryReducer Class

I det här avsnittet kommer vi att förstå implementeringen av SalesCountryReducer klass.

1. Vi börjar med att ange ett namn på paketet för vår klass. Försäljningsland är ett namn på utpaketet. Observera att utdata från kompilering, SalesCountryReducer.class kommer att gå in i en katalog som heter detta paketnamn: Försäljningsland.

Därefter importerar vi bibliotekspaket.

Nedan ögonblicksbild visar en implementering av SalesCountryReducer klass-

SalesCountryReducer Klass Förklaring

Kodförklaring:

1. SalesCountryReducer Class Definition-

public class SalesCountryReducer utökar MapReduceBase implementerar Reducer {

Här är de två första datatyperna, 'Text' och "IntWritable" är datatyp av inmatat nyckel-värde till reduceraren.

Utdata från mapper är i form av , . Denna utdata från mapparen blir indata till reduceraren. Så för att anpassa sig till dess datatyp, text och IntWritable används som datatyp här.

De två sista datatyperna, "Text" och "IntWritable" är datatyp av utdata som genereras av reducerare i form av nyckel-värdepar.

Varje reducerklass måste förlängas från MapReduceBase klass och den måste genomföra Reducer gränssnitt.

2. Definiera "reducera" funktion-

public void reduce( Text t_key,
             Iterator<IntWritable> values,                           
             OutputCollector<Text,IntWritable> output,
             Reporter reporter) throws IOException {

En input till minska() metod är en nyckel med en lista med flera värden.

Till exempel, i vårt fall kommer det att vara-

, , , , , .

Detta ges till reducer som

Så för att acceptera argument av denna form används de två första datatyperna, nämligen, text och Iterator. text är en datatyp av nyckel och Iterator är en datatyp för värdelistan för den nyckeln.

Nästa argument är av typ OutputCollector som samlar utsignalen från reducerfasen.

minska() Metoden börjar med att kopiera nyckelvärdet och initialisera frekvensräkningen till 0.

Textnyckel = t_key;
int frequencyForCountry = 0;

Använd sedan 'medan' loop, itererar vi genom listan med värden som är associerade med nyckeln och beräknar den slutliga frekvensen genom att summera alla värden.

 while (values.hasNext()) {
            // replace type of value with the actual type of our value
            IntWritable value = (IntWritable) values.next();
            frequencyForCountry += value.get();
            
        }

Nu skjuter vi resultatet till utdatasamlaren i form av nyckel och erhållits frekvensräkning.

Nedanstående kod gör detta-

output.collect(key, new IntWritable(frequencyForCountry));

Förklaring av SalesCountryDriver Class

I det här avsnittet kommer vi att förstå implementeringen av SalesCountryDriver klass

1. Vi börjar med att ange ett namn på paketet för vår klass. Försäljningsland är ett namn på utpaketet. Observera att utdata från kompilering, SalesCountryDriver.class kommer att gå in i katalogen med detta paketnamn: Försäljningsland.

Här är en rad som anger paketnamn följt av kod för att importera bibliotekspaket.

SalesCountryDriver Klass Förklaring

2. Definiera en drivrutinsklass som skapar ett nytt klientjobb, konfigurationsobjekt och annonserar Mapper- och Reducer-klasser.

Förarklassen ansvarar för att ställa in vårt MapReduce-jobb att köra in Hadoop. I denna klass specificerar vi jobbnamn, datatyp för input/output och namn på mappar- och reducerklasser.

SalesCountryDriver Klass Förklaring

3. I kodavsnittet nedan ställer vi in ​​indata- och utdatakataloger som används för att konsumera indatauppsättning respektive producera utdata.

arg[0] och arg[1] är kommandoradsargumenten som skickas med ett kommando som ges i MapReduce praktiskt, dvs.

$HADOOP_HOME/bin/hadoop jar ProductSalePerCountry.jar /inputMapReduce /mapreduce_output_sales

SalesCountryDriver Klass Förklaring

4. Trigga vårt jobb

Nedan kod börjar körningen av MapReduce-jobbet-

try {
    // Run the job 
    JobClient.runJob(job_conf);
} catch (Exception e) {
    e.printStackTrace();
}