Przykłady Hadoop i Mapreduce: Utwórz pierwszy program w Java

W tym samouczku nauczysz się używać Hadoop z przykładami MapReduce. Stosowane dane wejściowe to SprzedażJan2009.csv. Zawiera informacje związane ze sprzedażą, takie jak nazwa produktu, cena, sposób płatności, miasto, kraj klienta itp. Celem jest Sprawdź liczbę produktów sprzedanych w każdym kraju.

Pierwszy program Hadoop MapReduce

Teraz w tym Samouczek MapReduce, stworzymy nasz pierwszy Java Program MapReduce:

Pierwszy program Hadoop MapReduce

Dane sprzedaży styczeń 2009

Upewnij się, że masz zainstalowany Hadoop. Zanim zaczniesz właściwy proces, zmień użytkownika na „hduser” (identyfikator używany podczas konfiguracji Hadoopa, możesz przełączyć się na identyfikator użytkownika używany podczas konfiguracji programowania Hadoopa).

su - hduser_

Pierwszy program Hadoop MapReduce

Krok 1)

Utwórz nowy katalog o nazwie MapReduceSamouczek jak pokazano w poniższym przykładzie MapReduce

sudo mkdir MapReduceTutorial

Pierwszy program Hadoop MapReduce

Przyznaj uprawnienia

sudo chmod -R 777 MapReduceTutorial

Pierwszy program Hadoop MapReduce

SalesMapper.java

package SalesCountry;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;

public class SalesMapper extends MapReduceBase implements Mapper <LongWritable, Text, Text, IntWritable> {
	private final static IntWritable one = new IntWritable(1);

	public void map(LongWritable key, Text value, OutputCollector <Text, IntWritable> output, Reporter reporter) throws IOException {

		String valueString = value.toString();
		String[] SingleCountryData = valueString.split(",");
		output.collect(new Text(SingleCountryData[7]), one);
	}
}

SalesCountryReducer.java

package SalesCountry;

import java.io.IOException;
import java.util.*;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;

public class SalesCountryReducer extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {

	public void reduce(Text t_key, Iterator<IntWritable> values, OutputCollector<Text,IntWritable> output, Reporter reporter) throws IOException {
		Text key = t_key;
		int frequencyForCountry = 0;
		while (values.hasNext()) {
			// replace type of value with the actual type of our value
			IntWritable value = (IntWritable) values.next();
			frequencyForCountry += value.get();
			
		}
		output.collect(key, new IntWritable(frequencyForCountry));
	}
}

SalesCountryDriver.java

package SalesCountry;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;

public class SalesCountryDriver {
    public static void main(String[] args) {
        JobClient my_client = new JobClient();
        // Create a configuration object for the job
        JobConf job_conf = new JobConf(SalesCountryDriver.class);

        // Set a name of the Job
        job_conf.setJobName("SalePerCountry");

        // Specify data type of output key and value
        job_conf.setOutputKeyClass(Text.class);
        job_conf.setOutputValueClass(IntWritable.class);

        // Specify names of Mapper and Reducer Class
        job_conf.setMapperClass(SalesCountry.SalesMapper.class);
        job_conf.setReducerClass(SalesCountry.SalesCountryReducer.class);

        // Specify formats of the data type of Input and output
        job_conf.setInputFormat(TextInputFormat.class);
        job_conf.setOutputFormat(TextOutputFormat.class);

        // Set input and output directories using command line arguments, 
        //arg[0] = name of input directory on HDFS, and arg[1] =  name of output directory to be created to store the output file.

        FileInputFormat.setInputPaths(job_conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(job_conf, new Path(args[1]));

        my_client.setConf(job_conf);
        try {
            // Run the job 
            JobClient.runJob(job_conf);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

Pobierz pliki tutaj

Pierwszy program Hadoop MapReduce

Sprawdź uprawnienia do wszystkich tych plików

Pierwszy program Hadoop MapReduce

a jeśli brakuje uprawnień „odczyt”, przyznaj to samo-

Pierwszy program Hadoop MapReduce

Krok 2)

Eksportuj ścieżkę klas, jak pokazano w poniższym przykładzie Hadoop

export CLASSPATH="$HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-client-core-2.2.0.jar:$HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-client-common-2.2.0.jar:$HADOOP_HOME/share/hadoop/common/hadoop-common-2.2.0.jar:~/MapReduceTutorial/SalesCountry/*:$HADOOP_HOME/lib/*"

Pierwszy program Hadoop MapReduce

Krok 3)

skompilować Java pliki (te pliki znajdują się w katalogu Final-MapReduceHandsOn). Pliki jego klas zostaną umieszczone w katalogu pakietu

javac -d . SalesMapper.java SalesCountryReducer.java SalesCountryDriver.java

Pierwszy program Hadoop MapReduce

To ostrzeżenie można bezpiecznie zignorować.

Ta kompilacja utworzy katalog w bieżącym katalogu o nazwie odpowiadającej nazwie pakietu określonej w pliku źródłowym Java (tj Kraj sprzedaży w naszym przypadku) i umieść w nim wszystkie skompilowane pliki klas.

Pierwszy program Hadoop MapReduce

Krok 4)

Utwórz nowy plik Manifest.txt

sudo gedit Manifest.txt

dodaj do niego następujące linie,

Main-Class: SalesCountry.SalesCountryDriver

Pierwszy program Hadoop MapReduce

SalesCountry.SalesCountryDriver to nazwa klasy głównej. Pamiętaj, że musisz nacisnąć klawisz Enter na końcu tej linii.

Krok 5)

Utwórz plik Jar

jar cfm ProductSalePerCountry.jar Manifest.txt SalesCountry/*.class

Pierwszy program Hadoop MapReduce

Sprawdź, czy plik jar został utworzony

Pierwszy program Hadoop MapReduce

Krok 6)

Uruchom Hadoopa

$HADOOP_HOME/sbin/start-dfs.sh
$HADOOP_HOME/sbin/start-yarn.sh

Krok 7)

Skopiuj plik SprzedażJan2009.csv najnowszych ~/inputMapReduce

Teraz użyj poniższego polecenia, aby skopiować ~/inputMapReduce do HDFS.

$HADOOP_HOME/bin/hdfs dfs -copyFromLocal ~/inputMapReduce /

Pierwszy program Hadoop MapReduce

Możemy spokojnie zignorować to ostrzeżenie.

Sprawdź, czy plik rzeczywiście został skopiowany, czy nie.

$HADOOP_HOME/bin/hdfs dfs -ls /inputMapReduce

Pierwszy program Hadoop MapReduce

Krok 8)

Uruchom zadanie MapReduce

$HADOOP_HOME/bin/hadoop jar ProductSalePerCountry.jar /inputMapReduce /mapreduce_output_sales

Pierwszy program Hadoop MapReduce

Spowoduje to utworzenie katalogu wyjściowego o nazwie mapreduce_output_sales on HDFS. Zawartość tego katalogu będzie plikiem zawierającym sprzedaż produktów w poszczególnych krajach.

Krok 9)

Wynik można zobaczyć w interfejsie poleceń jako:

$HADOOP_HOME/bin/hdfs dfs -cat /mapreduce_output_sales/part-00000

Pierwszy program Hadoop MapReduce

Wyniki można również zobaczyć za pośrednictwem interfejsu internetowego, ponieważ:

Otwórz r w przeglądarce internetowej.

Pierwszy program Hadoop MapReduce

Teraz wybierz „Przeglądaj system plików” i przejdź do /mapreduce_output_sales

Pierwszy program Hadoop MapReduce

Otwarte część-r-00000

Pierwszy program Hadoop MapReduce

Objaśnienie klasy SalesMapper

W tej sekcji zrozumiemy implementację SalesMapper class.

1. Zaczynamy od podania nazwy pakietu dla naszej klasy. Kraj sprzedaży to nazwa naszego pakietu. Należy pamiętać, że wynik kompilacji, SalesMapper.class przejdzie do katalogu o nazwie tej nazwy pakietu: Kraj sprzedaży.

Następnie importujemy pakiety bibliotek.

Poniższy zrzut ekranu przedstawia implementację SalesMapper klasa-

Objaśnienie klasy SalesMapper

Przykładowy kod wyjaśnienie:

1. Definicja klasy SalesMapper-

klasa publiczna SalesMapper rozszerza MapReduceBase implementującą Mapper {

Każda klasa mapująca musi zostać rozszerzona z MapReduceBase class i musi zostać zaimplementowany Twórca map berło.

2. Definiowanie funkcji „mapa”-

public void map(LongWritable key,
         Text value,
OutputCollector<Text, IntWritable> output,
Reporter reporter) throws IOException

Główną częścią klasy Mapper jest a 'mapa()' metoda, która przyjmuje cztery argumenty.

Przy każdym wezwaniu do 'mapa()' metoda, A kluczowa wartość para ('klucz' i 'wartość' w tym kodzie) jest przekazywane.

'mapa()' Metoda rozpoczyna się od podziału tekstu wejściowego, który jest odbierany jako argument. Używa tokenizera, aby podzielić te linie na słowa.

String valueString = value.toString();
String[] SingleCountryData = valueString.split(",");

Tutaj, „,” jest używany jako ogranicznik.

Następnie tworzona jest para przy użyciu rekordu znajdującego się na 7. indeksie tablicy „Dane jednego kraju” i wartość "1".

wyjście.collect(nowy tekst(SingleCountryData[7]), jeden);

Wybieramy rekord na 7. indeksie, bo tego potrzebujemy Państwo data i znajduje się pod siódmym indeksem tablicy „Dane jednego kraju”.

Należy pamiętać, że nasze dane wejściowe mają poniższy format (gdzie Państwo jest o 7th indeks, z 0 jako indeksem początkowym)-

Data_transakcji, Produkt, Cena, Typ_płatności, Nazwa, Miasto, Stan,Państwo,Utworzono_konto,Ostatnie_logowanie,Szerokość geograficzna,Długość geograficzna

Dane wyjściowe programu mapującego to ponownie a kluczowa wartość para, która jest wyprowadzana za pomocą 'zbierać()' metoda „Kolektor wyjściowy”.

Objaśnienie klasy SalesCountryReducer

W tej sekcji zrozumiemy implementację Kraj sprzedażyReduktor class.

1. Zaczynamy od podania nazwy pakietu dla naszej klasy. Kraj sprzedaży to nazwa pakietu. Należy pamiętać, że wynik kompilacji, SalesCountryReducer.class przejdzie do katalogu o nazwie tej nazwy pakietu: Kraj sprzedaży.

Następnie importujemy pakiety bibliotek.

Poniższy zrzut ekranu przedstawia implementację Kraj sprzedażyReduktor klasa-

Objaśnienie klasy SalesCountryReducer

Wyjaśnienie kodu:

1. Definicja klasy SalesCountryReducer-

klasa publiczna SalesCountryReducer rozszerza MapReduceBase implementuje Reduktor {

Tutaj pierwsze dwa typy danych, 'Tekst' i „Niezapisywalny” to typ danych wejściowej pary klucz-wartość do reduktora.

Dane wyjściowe programu mapującego mają postać , . To wyjście mapera staje się wejściem do reduktora. Aby więc dostosować się do typu danych, Tekst i Możliwość zapisu są tutaj używane jako typ danych.

Ostatnie dwa typy danych, „Text” i „IntWritable”, to typ danych wyjściowych generowanych przez reduktor w postaci pary klucz-wartość.

Każda klasa reduktora musi zostać rozszerzona z MapReduceBase class i musi zostać zaimplementowany Reduktor berło.

2. Definiowanie funkcji „zmniejsz”

public void reduce( Text t_key,
             Iterator<IntWritable> values,                           
             OutputCollector<Text,IntWritable> output,
             Reporter reporter) throws IOException {

Wejście do zmniejszyć() metoda jest kluczem z listą wielu wartości.

W naszym przypadku będzie to na przykład:

, , , , , .

Jest to podawane do reduktora jako

Aby zaakceptować argumenty tej formy, najpierw używa się dwóch typów danych, tj. Tekst i Iterator. Tekst jest typem danych klucza i Iterator to typ danych dla listy wartości tego klucza.

Następny argument jest typu Kolektor wyjściowy który zbiera moc wyjściową fazy reduktora.

zmniejszyć() Metoda rozpoczyna się od skopiowania wartości klucza i zainicjowania licznika częstotliwości na 0.

Klucz tekstowy = t_key;
int częstotliwośćDlaKraju = 0;

Następnie za pomocą 'chwila' pętli, iterujemy po liście wartości powiązanych z kluczem i obliczamy końcową częstotliwość, sumując wszystkie wartości.

 while (values.hasNext()) {
            // replace type of value with the actual type of our value
            IntWritable value = (IntWritable) values.next();
            frequencyForCountry += value.get();
            
        }

Teraz wypychamy wynik do kolektora wyjściowego w postaci klucz i uzyskany liczba częstotliwości.

Poniższy kod to robi-

output.collect(key, new IntWritable(frequencyForCountry));

Objaśnienie klasy SalesCountryDriver

W tej sekcji zrozumiemy implementację Krajowy kierowca sprzedaży klasa

1. Zaczynamy od podania nazwy pakietu dla naszej klasy. Kraj sprzedaży to nazwa pakietu. Należy pamiętać, że wynik kompilacji, Klasa SalesCountryDriver przejdzie do katalogu nazwanego tą nazwą pakietu: Kraj sprzedaży.

Oto linia określająca nazwę pakietu, po której następuje kod importowania pakietów bibliotek.

Objaśnienie klasy SalesCountryDriver

2. Zdefiniuj klasę sterownika, która utworzy nowe zadanie klienta, obiekt konfiguracyjny i rozgłosi klasy Mapper i Reduktor.

Klasa sterownika jest odpowiedzialna za ustawienie naszego zadania MapReduce do uruchomienia Hadoop. W tej klasie określamy nazwa zadania, typ danych wejścia/wyjścia oraz nazwy klas mapujących i redukujących.

Objaśnienie klasy SalesCountryDriver

3. W poniższym fragmencie kodu ustawiamy katalogi wejściowe i wyjściowe, które służą odpowiednio do pobierania wejściowego zbioru danych i generowania danych wyjściowych.

argument[0] i argument[1] to argumenty wiersza poleceń przekazywane wraz z poleceniem podanym w praktyce MapReduce, tj.

$HADOOP_HOME/bin/hadoop jar ProductSalePerCountry.jar /inputMapReduce /mapreduce_output_sales

Objaśnienie klasy SalesCountryDriver

4. Uruchom naszą pracę

Poniżej kod rozpoczyna wykonywanie zadania MapReduce-

try {
    // Run the job 
    JobClient.runJob(job_conf);
} catch (Exception e) {
    e.printStackTrace();
}