Exemplos de Hadoop e Mapreduce: crie o primeiro programa em Java

Neste tutorial, você aprenderá a usar o Hadoop com exemplos de MapReduce. Os dados de entrada usados ​​são VendasJan2009.csv. Ele contém informações relacionadas a vendas, como nome do produto, preço, forma de pagamento, cidade, país do cliente, etc. Descubra o número de produtos vendidos em cada país.

Primeiro programa Hadoop MapReduce

Agora neste Tutorial MapReduce, criaremos nosso primeiro Java Programa MapReduce:

Primeiro programa Hadoop MapReduce

Dados de VendasJan2009

Certifique-se de ter o Hadoop instalado. Antes de iniciar o processo real, altere o usuário para 'hduser' (id usado durante a configuração do Hadoop, você pode mudar para o ID do usuário usado durante a configuração de programação do Hadoop).

su - hduser_

Primeiro programa Hadoop MapReduce

Passo 1)

Crie um novo diretório com nome Tutorial MapReduce como mostrado no exemplo MapReduce abaixo

sudo mkdir MapReduceTutorial

Primeiro programa Hadoop MapReduce

Dê permissões

sudo chmod -R 777 MapReduceTutorial

Primeiro programa Hadoop MapReduce

SalesMapper.java

package SalesCountry;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;

public class SalesMapper extends MapReduceBase implements Mapper <LongWritable, Text, Text, IntWritable> {
	private final static IntWritable one = new IntWritable(1);

	public void map(LongWritable key, Text value, OutputCollector <Text, IntWritable> output, Reporter reporter) throws IOException {

		String valueString = value.toString();
		String[] SingleCountryData = valueString.split(",");
		output.collect(new Text(SingleCountryData[7]), one);
	}
}

SalesCountryReducer.java

package SalesCountry;

import java.io.IOException;
import java.util.*;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;

public class SalesCountryReducer extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {

	public void reduce(Text t_key, Iterator<IntWritable> values, OutputCollector<Text,IntWritable> output, Reporter reporter) throws IOException {
		Text key = t_key;
		int frequencyForCountry = 0;
		while (values.hasNext()) {
			// replace type of value with the actual type of our value
			IntWritable value = (IntWritable) values.next();
			frequencyForCountry += value.get();
			
		}
		output.collect(key, new IntWritable(frequencyForCountry));
	}
}

SalesCountryDriver.java

package SalesCountry;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;

public class SalesCountryDriver {
    public static void main(String[] args) {
        JobClient my_client = new JobClient();
        // Create a configuration object for the job
        JobConf job_conf = new JobConf(SalesCountryDriver.class);

        // Set a name of the Job
        job_conf.setJobName("SalePerCountry");

        // Specify data type of output key and value
        job_conf.setOutputKeyClass(Text.class);
        job_conf.setOutputValueClass(IntWritable.class);

        // Specify names of Mapper and Reducer Class
        job_conf.setMapperClass(SalesCountry.SalesMapper.class);
        job_conf.setReducerClass(SalesCountry.SalesCountryReducer.class);

        // Specify formats of the data type of Input and output
        job_conf.setInputFormat(TextInputFormat.class);
        job_conf.setOutputFormat(TextOutputFormat.class);

        // Set input and output directories using command line arguments, 
        //arg[0] = name of input directory on HDFS, and arg[1] =  name of output directory to be created to store the output file.

        FileInputFormat.setInputPaths(job_conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(job_conf, new Path(args[1]));

        my_client.setConf(job_conf);
        try {
            // Run the job 
            JobClient.runJob(job_conf);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

Baixe os arquivos aqui

Primeiro programa Hadoop MapReduce

Verifique as permissões de arquivo de todos esses arquivos

Primeiro programa Hadoop MapReduce

e se as permissões de 'leitura' estiverem faltando, conceda o mesmo-

Primeiro programa Hadoop MapReduce

Passo 2)

Exporte o caminho de classe conforme mostrado no exemplo do Hadoop abaixo

export CLASSPATH="$HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-client-core-2.2.0.jar:$HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-client-common-2.2.0.jar:$HADOOP_HOME/share/hadoop/common/hadoop-common-2.2.0.jar:~/MapReduceTutorial/SalesCountry/*:$HADOOP_HOME/lib/*"

Primeiro programa Hadoop MapReduce

Passo 3)

Compilar Java arquivos (esses arquivos estão presentes no diretório Final-MapReduceHandsOn). Seus arquivos de classe serão colocados no diretório do pacote

javac -d . SalesMapper.java SalesCountryReducer.java SalesCountryDriver.java

Primeiro programa Hadoop MapReduce

Este aviso pode ser ignorado com segurança.

Esta compilação criará um diretório em um diretório atual nomeado com o nome do pacote especificado no arquivo fonte java (ou seja, País de Vendas no nosso caso) e coloque todos os arquivos de classe compilados nele.

Primeiro programa Hadoop MapReduce

Passo 4)

Crie um novo arquivo Manifesto.txt

sudo gedit Manifest.txt

adicione as seguintes linhas a ele,

Main-Class: SalesCountry.SalesCountryDriver

Primeiro programa Hadoop MapReduce

SalesCountry.SalesCountryDriver é o nome da classe principal. Observe que você deve pressionar a tecla Enter no final desta linha.

Passo 5)

Crie um arquivo Jar

jar cfm ProductSalePerCountry.jar Manifest.txt SalesCountry/*.class

Primeiro programa Hadoop MapReduce

Verifique se o arquivo jar foi criado

Primeiro programa Hadoop MapReduce

Passo 6)

Inicie o Hadoop

$HADOOP_HOME/sbin/start-dfs.sh
$HADOOP_HOME/sbin/start-yarn.sh

Passo 7)

Copie o arquivo VendasJan2009.csv para dentro ~/inputMapReduce

Agora use o comando abaixo para copiar ~/inputMapReduce para HDFS.

$HADOOP_HOME/bin/hdfs dfs -copyFromLocal ~/inputMapReduce /

Primeiro programa Hadoop MapReduce

Podemos ignorar este aviso com segurança.

Verifique se um arquivo foi realmente copiado ou não.

$HADOOP_HOME/bin/hdfs dfs -ls /inputMapReduce

Primeiro programa Hadoop MapReduce

Passo 8)

Execute o trabalho MapReduce

$HADOOP_HOME/bin/hadoop jar ProductSalePerCountry.jar /inputMapReduce /mapreduce_output_sales

Primeiro programa Hadoop MapReduce

Isto criará um diretório de saída chamado mapreduce_output_sales em HDFS. O conteúdo deste diretório será um arquivo contendo as vendas de produtos por país.

Passo 9)

O resultado pode ser visto através da interface de comando como,

$HADOOP_HOME/bin/hdfs dfs -cat /mapreduce_output_sales/part-00000

Primeiro programa Hadoop MapReduce

Os resultados também podem ser vistos através de uma interface web como-

Abra r em um navegador da web.

Primeiro programa Hadoop MapReduce

Agora selecione 'Navegar no sistema de arquivos' e navegue até /mapreduce_output_sales

Primeiro programa Hadoop MapReduce

Abra parte-r-00000

Primeiro programa Hadoop MapReduce

Explicação da classe SalesMapper

Nesta seção, entenderemos a implementação de SalesMapper classe.

1. Começamos especificando um nome de pacote para nossa classe. País de Vendas é o nome do nosso pacote. Observe que a saída da compilação, SalesMapper.class irá para um diretório nomeado por este nome de pacote: País de Vendas.

Em seguida, importamos pacotes de biblioteca.

O instantâneo abaixo mostra uma implementação de SalesMapper aula-

Explicação da classe SalesMapper

Explicação do código de exemplo:

1. Definição de classe SalesMapper-

classe pública SalesMapper estende MapReduceBase implementa Mapper {

Cada classe de mapeador deve ser estendida de MapReduceBase classe e deve implementar Mapper interface.

2. Definindo a função 'mapa'-

public void map(LongWritable key,
         Text value,
OutputCollector<Text, IntWritable> output,
Reporter reporter) throws IOException

A parte principal da classe Mapper é um 'mapa()' método que aceita quatro argumentos.

A cada chamada para 'mapa()' método, um valor chave par ('chave' e 'valor' neste código) é passado.

'mapa()' O método começa dividindo o texto de entrada que é recebido como argumento. Ele usa o tokenizer para dividir essas linhas em palavras.

String valueString = value.toString();
String[] SingleCountryData = valueString.split(",");

Aqui, ',' é usado como delimitador.

Depois disso, um par é formado usando um registro no 7º índice do array 'Dados de País Único' e um valor '1'.

output.collect(novo Texto(SingleCountryData[7]), um);

Estamos escolhendo o registro no 7º índice porque precisamos País dados e está localizado no 7º índice da matriz 'Dados de País Único'.

Observe que nossos dados de entrada estão no formato abaixo (onde País é às 7th índice, com 0 como índice inicial)-

Data_da_transação,Produto,Preço,Tipo_de_pagamento,Nome,Cidade,Estado,País,Conta_criada,Último_login,Latitude,Longitude

Uma saída do mapper é novamente um valor chave par que é emitido usando 'coletar ()' método de 'Coletor de saída'.

Explicação da classe SalesCountryReducer

Nesta seção, entenderemos a implementação de SalesCountryReducer classe.

1. Começamos especificando o nome do pacote para nossa classe. País de Vendas é o nome do nosso pacote. Observe que a saída da compilação, SalesCountryReducer.class irá para um diretório nomeado por este nome de pacote: País de Vendas.

Em seguida, importamos pacotes de biblioteca.

O instantâneo abaixo mostra uma implementação de SalesCountryReducer aula-

Explicação da classe SalesCountryReducer

Explicação do código:

1. Definição da classe SalesCountryReducer-

classe pública SalesCountryReducer estende MapReduceBase implementa Redutor {

Aqui, os dois primeiros tipos de dados, 'Texto' e 'Intgravável' são tipos de dados de valor-chave de entrada para o redutor.

A saída do mapeador está na forma de , . Esta saída do mapeador torna-se uma entrada para o redutor. Então, para alinhar com seu tipo de dados, Texto e Intgravável são usados ​​​​como tipo de dados aqui.

Os dois últimos tipos de dados, 'Texto' e 'IntWritable' são tipos de dados de saída gerados pelo redutor na forma de par chave-valor.

Toda classe redutora deve ser estendida de MapReduceBase classe e deve implementar Redutor interface.

2. Definindo a função 'reduzir'-

public void reduce( Text t_key,
             Iterator<IntWritable> values,                           
             OutputCollector<Text,IntWritable> output,
             Reporter reporter) throws IOException {

Uma entrada para o reduzir() método é uma chave com uma lista de vários valores.

Por exemplo, no nosso caso, será-

, , , , , .

Isso é dado ao redutor como

Portanto, para aceitar argumentos desta forma, os primeiros dois tipos de dados são usados, a saber, Texto e Iterador. Texto é um tipo de dados de chave e Iterador é um tipo de dados para a lista de valores dessa chave.

O próximo argumento é do tipo Coletor de saída que coleta a saída da fase redutora.

reduzir() O método começa copiando o valor da chave e inicializando a contagem de frequência como 0.

Chave de texto = t_key;
int frequênciaForCountry = 0;

Então, usando 'enquanto' loop, iteramos pela lista de valores associados à chave e calculamos a frequência final somando todos os valores.

 while (values.hasNext()) {
            // replace type of value with the actual type of our value
            IntWritable value = (IntWritable) values.next();
            frequencyForCountry += value.get();
            
        }

Agora, enviamos o resultado para o coletor de saída na forma de chave e obtido contagem de freqüência.

O código abaixo faz isso-

output.collect(key, new IntWritable(frequencyForCountry));

Explicação da classe SalesCountryDriver

Nesta seção, entenderemos a implementação de SalesCountryDriver classe

1. Começamos especificando um nome de pacote para nossa classe. País de Vendas é o nome do nosso pacote. Observe que a saída da compilação, SalesCountryDriver.class irá para o diretório nomeado por este nome de pacote: País de Vendas.

Aqui está uma linha especificando o nome do pacote seguido do código para importar pacotes de biblioteca.

Explicação da classe SalesCountryDriver

2. Defina uma classe de driver que criará um novo trabalho de cliente, objeto de configuração e anunciará classes Mapeadora e Redutora.

A classe driver é responsável por configurar nosso trabalho MapReduce para ser executado em Hadoop. Nesta classe, especificamos nome do trabalho, tipo de dados de entrada/saída e nomes das classes mapeadoras e redutoras.

Explicação da classe SalesCountryDriver

3. No trecho de código abaixo, definimos diretórios de entrada e saída que são usados ​​para consumir o conjunto de dados de entrada e produzir saída, respectivamente.

arg [0] e arg [1] são os argumentos de linha de comando passados ​​​​com um comando fornecido no MapReduce prático, ou seja,

$HADOOP_HOME/bin/hadoop jar ProductSalePerCountry.jar /inputMapReduce /mapreduce_output_sales

Explicação da classe SalesCountryDriver

4. Acione nosso trabalho

Abaixo do código inicia a execução do trabalho MapReduce-

try {
    // Run the job 
    JobClient.runJob(job_conf);
} catch (Exception e) {
    e.printStackTrace();
}