Esempi di Hadoop e Mapreduce: crea il primo programma in Java

In questo tutorial imparerai a utilizzare Hadoop con gli esempi MapReduce. I dati di input utilizzati sono VenditeJan2009.csv. Contiene informazioni relative alle vendite come nome del prodotto, prezzo, modalità di pagamento, città, paese del cliente, ecc. L'obiettivo è quello di Scopri il numero di prodotti venduti in ciascun paese.

Primo programma Hadoop MapReduce

Ora in questo Tutorial su MapReduce, creeremo il nostro primo Java Programma MapReduce:

Primo programma Hadoop MapReduce

Dati delle vendite gennaio 2009

Assicurati di avere Hadoop installato. Prima di iniziare con il processo vero e proprio, cambia utente in "hduser" (id utilizzato durante la configurazione di Hadoop, puoi passare all'ID utente utilizzato durante la configurazione della programmazione Hadoop).

su - hduser_

Primo programma Hadoop MapReduce

Passo 1)

Crea una nuova directory con il nome MapReduceTutorial come mostrato nell'esempio MapReduce qui sotto

sudo mkdir MapReduceTutorial

Primo programma Hadoop MapReduce

Concedi i permessi

sudo chmod -R 777 MapReduceTutorial

Primo programma Hadoop MapReduce

SalesMapper.java

package SalesCountry;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;

public class SalesMapper extends MapReduceBase implements Mapper <LongWritable, Text, Text, IntWritable> {
	private final static IntWritable one = new IntWritable(1);

	public void map(LongWritable key, Text value, OutputCollector <Text, IntWritable> output, Reporter reporter) throws IOException {

		String valueString = value.toString();
		String[] SingleCountryData = valueString.split(",");
		output.collect(new Text(SingleCountryData[7]), one);
	}
}

SalesCountryReducer.java

package SalesCountry;

import java.io.IOException;
import java.util.*;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;

public class SalesCountryReducer extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {

	public void reduce(Text t_key, Iterator<IntWritable> values, OutputCollector<Text,IntWritable> output, Reporter reporter) throws IOException {
		Text key = t_key;
		int frequencyForCountry = 0;
		while (values.hasNext()) {
			// replace type of value with the actual type of our value
			IntWritable value = (IntWritable) values.next();
			frequencyForCountry += value.get();
			
		}
		output.collect(key, new IntWritable(frequencyForCountry));
	}
}

SalesCountryDriver.java

package SalesCountry;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;

public class SalesCountryDriver {
    public static void main(String[] args) {
        JobClient my_client = new JobClient();
        // Create a configuration object for the job
        JobConf job_conf = new JobConf(SalesCountryDriver.class);

        // Set a name of the Job
        job_conf.setJobName("SalePerCountry");

        // Specify data type of output key and value
        job_conf.setOutputKeyClass(Text.class);
        job_conf.setOutputValueClass(IntWritable.class);

        // Specify names of Mapper and Reducer Class
        job_conf.setMapperClass(SalesCountry.SalesMapper.class);
        job_conf.setReducerClass(SalesCountry.SalesCountryReducer.class);

        // Specify formats of the data type of Input and output
        job_conf.setInputFormat(TextInputFormat.class);
        job_conf.setOutputFormat(TextOutputFormat.class);

        // Set input and output directories using command line arguments, 
        //arg[0] = name of input directory on HDFS, and arg[1] =  name of output directory to be created to store the output file.

        FileInputFormat.setInputPaths(job_conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(job_conf, new Path(args[1]));

        my_client.setConf(job_conf);
        try {
            // Run the job 
            JobClient.runJob(job_conf);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

Scarica i file qui

Primo programma Hadoop MapReduce

Controlla i permessi di tutti questi file

Primo programma Hadoop MapReduce

e se mancano le autorizzazioni di "lettura", concedi le stesse-

Primo programma Hadoop MapReduce

Passo 2)

Esporta il percorso di classe come mostrato nell'esempio Hadoop seguente

export CLASSPATH="$HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-client-core-2.2.0.jar:$HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-client-common-2.2.0.jar:$HADOOP_HOME/share/hadoop/common/hadoop-common-2.2.0.jar:~/MapReduceTutorial/SalesCountry/*:$HADOOP_HOME/lib/*"

Primo programma Hadoop MapReduce

Passo 3)

Compilare Java file (questi file sono presenti nella directory Final-MapReduceHandsOn). I suoi file di classe verranno inseriti nella directory del pacchetto

javac -d . SalesMapper.java SalesCountryReducer.java SalesCountryDriver.java

Primo programma Hadoop MapReduce

Questo avviso può essere tranquillamente ignorato.

Questa compilazione creerà una directory in una directory corrente denominata con il nome del pacchetto specificato nel file sorgente Java (ad es Paese di vendita nel nostro caso) e inserirvi tutti i file di classe compilati.

Primo programma Hadoop MapReduce

Passo 4)

Crea un nuovo file Manifesto.txt

sudo gedit Manifest.txt

aggiungere le seguenti righe,

Main-Class: SalesCountry.SalesCountryDriver

Primo programma Hadoop MapReduce

SalesCountry.SalesCountryDriver è il nome della classe principale. Tieni presente che devi premere il tasto Invio alla fine di questa riga.

Passo 5)

Crea un file Jar

jar cfm ProductSalePerCountry.jar Manifest.txt SalesCountry/*.class

Primo programma Hadoop MapReduce

Verifica che il file jar sia stato creato

Primo programma Hadoop MapReduce

Passo 6)

Avvia Hadoop

$HADOOP_HOME/sbin/start-dfs.sh
$HADOOP_HOME/sbin/start-yarn.sh

Passo 7)

Copia il file VenditeJan2009.csv ai miglioramenti ~/inputMapReduce

Ora usa il comando seguente per copiare ~/inputMapReduce a HDFS.

$HADOOP_HOME/bin/hdfs dfs -copyFromLocal ~/inputMapReduce /

Primo programma Hadoop MapReduce

Possiamo tranquillamente ignorare questo avvertimento.

Verificare se un file è effettivamente copiato o meno.

$HADOOP_HOME/bin/hdfs dfs -ls /inputMapReduce

Primo programma Hadoop MapReduce

Passo 8)

Esegui il lavoro MapReduce

$HADOOP_HOME/bin/hadoop jar ProductSalePerCountry.jar /inputMapReduce /mapreduce_output_sales

Primo programma Hadoop MapReduce

Ciò creerà una directory di output denominata mapreduce_output_sales su HDFS. Il contenuto di questa directory sarà un file contenente le vendite di prodotti per paese.

Passo 9)

Il risultato può essere visto attraverso l'interfaccia di comando come,

$HADOOP_HOME/bin/hdfs dfs -cat /mapreduce_output_sales/part-00000

Primo programma Hadoop MapReduce

I risultati possono anche essere visualizzati tramite un'interfaccia web come-

Apri r in un browser web.

Primo programma Hadoop MapReduce

ora scegliere 'Sfoglia il filesystem' e vai a /mapreduce_output_sales

Primo programma Hadoop MapReduce

Apri parte-r-00000

Primo programma Hadoop MapReduce

Spiegazione della classe SalesMapper

In questa sezione ne comprenderemo l'implementazione SalesMapper classe.

1. Iniziamo specificando il nome del pacchetto per la nostra classe. Paese di vendita è il nome del nostro pacchetto. Si prega di notare che l'output della compilazione, SalesMapper.class andrà in una directory denominata con questo nome di pacchetto: Paese di vendita.

Successivamente importiamo i pacchetti di librerie.

Di seguito l'istantanea mostra un'implementazione di SalesMapper classe-

Spiegazione della classe SalesMapper

Spiegazione del codice di esempio:

1. Definizione della classe SalesMapper-

la classe pubblica SalesMapper estende MapReduceBase implementa Mapper {

Ogni classe del mapper deve essere estesa da MapReduceBase classe e deve implementare Mapper interfaccia.

2. Definizione della funzione 'mappa'-

public void map(LongWritable key,
         Text value,
OutputCollector<Text, IntWritable> output,
Reporter reporter) throws IOException

La parte principale della classe Mapper è a 'carta geografica()' metodo che accetta quattro argomenti.

Ad ogni chiamata a 'carta geografica()' metodo, a valore-chiave paio ('chiave' e a 'valore' in questo codice) viene passato.

'carta geografica()' Il metodo inizia suddividendo il testo di input ricevuto come argomento. Utilizza il tokenizer per dividere queste righe in parole.

String valueString = value.toString();
String[] SingleCountryData = valueString.split(",");

Qui, '' viene utilizzato come delimitatore.

Successivamente, viene formata una coppia utilizzando un record al 7° indice dell'array "Dati per Paese singolo" e un valore '1'.

output.collect(new Text(SingleCountryData[7]), uno);

Stiamo scegliendo il record al 7° indice perché ne abbiamo bisogno Paese dati e si trova al 7° indice dell'array "Dati per Paese singolo".

Tieni presente che i nostri dati di input sono nel formato seguente (dove Paese è alle 7th indice, con 0 come indice iniziale)-

Data_transazione,Prodotto,Prezzo,Tipo_pagamento,Nome,Città,Stato,Paese,Account_Creato,Ultimo_Accesso,Latitudine,Longitudine

Un output del mapper è ancora una volta a valore-chiave coppia che viene emessa utilizzando 'raccogliere()' metodo di 'Collettore di output'.

Spiegazione della classe SalesCountryReducer

In questa sezione ne comprenderemo l'implementazione SalesCountryReducer classe.

1. Iniziamo specificando il nome del pacchetto per la nostra classe. Paese di vendita è il nome del nostro pacchetto. Si prega di notare che l'output della compilazione, SalesCountryReducer.class andrà in una directory denominata con questo nome di pacchetto: Paese di vendita.

Successivamente importiamo i pacchetti di librerie.

Di seguito l'istantanea mostra un'implementazione di SalesCountryReducer classe-

Spiegazione della classe SalesCountryReducer

Spiegazione del codice:

1. Definizione della classe SalesCountryReducer-

la classe pubblica SalesCountryReducer estende MapReduceBase implementa Reducer {

Qui, i primi due tipi di dati, 'Testo' e a 'IntWritable' sono il tipo di dati del valore-chiave di input per il riduttore.

L'output del mapper è sotto forma di , . Questa uscita del mappatore diventa input per il riduttore. Quindi, per allinearsi al tipo di dati, Testo e a IntWritable vengono utilizzati come tipo di dati qui.

Gli ultimi due tipi di dati, "Text" e "IntWritable" sono tipi di dati di output generati dal riduttore sotto forma di coppia chiave-valore.

Ogni classe di riduttore deve essere estesa da MapReduceBase classe e deve implementare Reducer interfaccia.

2. Definizione della funzione "riduci"

public void reduce( Text t_key,
             Iterator<IntWritable> values,                           
             OutputCollector<Text,IntWritable> output,
             Reporter reporter) throws IOException {

Un input per il ridurre() Il metodo è una chiave con un elenco di più valori.

Ad esempio, nel nostro caso, sarà-

, , , , , .

Questo è dato al riduttore come

Quindi, per accettare argomenti di questa forma, vengono utilizzati prima due tipi di dati, vale a dire, Testo e a Iteratore. Testo è un tipo di dati di chiave e Iteratore è un tipo di dati per l'elenco di valori per quella chiave.

L'argomento successivo è di tipo OutputCollector che raccoglie l'uscita della fase riduttore.

ridurre() Il metodo inizia copiando il valore della chiave e inizializzando il conteggio della frequenza su 0.

Chiave testo = t_key;
int frequenzaForCountry = 0;

Quindi, utilizzando 'Mentre' loop, iteriamo attraverso l'elenco dei valori associati alla chiave e calcoliamo la frequenza finale sommando tutti i valori.

 while (values.hasNext()) {
            // replace type of value with the actual type of our value
            IntWritable value = (IntWritable) values.next();
            frequencyForCountry += value.get();
            
        }

Ora inviamo il risultato al raccoglitore di output sotto forma di chiave e ottenuto conteggio delle frequenze.

Il codice seguente fa questo-

output.collect(key, new IntWritable(frequencyForCountry));

Spiegazione della classe SalesCountryDriver

In questa sezione ne comprenderemo l'implementazione SalesCountryDriver classe

1. Iniziamo specificando il nome del pacchetto per la nostra classe. Paese di vendita è il nome del nostro pacchetto. Si prega di notare che l'output della compilazione, SalesCountryDriver.class andrà nella directory denominata con questo nome di pacchetto: Paese di vendita.

Ecco una riga che specifica il nome del pacchetto seguito dal codice per importare i pacchetti della libreria.

Spiegazione della classe SalesCountryDriver

2. Definire una classe driver che creerà un nuovo lavoro client, un oggetto di configurazione e pubblicizzerà le classi Mapper e Reducer.

La classe driver è responsabile dell'impostazione del nostro lavoro MapReduce per l'esecuzione Hadoop. In questa classe, specifichiamo nome del lavoro, tipo di dati di input/output e nomi delle classi di mapper e reducer.

Spiegazione della classe SalesCountryDriver

3. Nello snippet di codice seguente, impostiamo le directory di input e output che vengono utilizzate rispettivamente per consumare il set di dati di input e produrre output.

argomento[0] e a argomento[1] sono gli argomenti della riga di comando passati con un comando fornito in MapReduce, ovvero

$HADOOP_HOME/bin/hadoop jar ProductSalePerCountry.jar /inputMapReduce /mapreduce_output_sales

Spiegazione della classe SalesCountryDriver

4. Attiva il nostro lavoro

Di seguito il codice avvia l'esecuzione del lavoro MapReduce-

try {
    // Run the job 
    JobClient.runJob(job_conf);
} catch (Exception e) {
    e.printStackTrace();
}