Ejemplos de Hadoop y Mapreduce: crear el primer programa en Java

En este tutorial, aprenderá a utilizar Hadoop con ejemplos de MapReduce. Los datos de entrada utilizados son VentasEne2009.csv. Contiene información relacionada con las ventas, como nombre del producto, precio, forma de pago, ciudad, país del cliente, etc. El objetivo es Descubra la cantidad de productos vendidos en cada país.

Primer programa Hadoop MapReduce

Ahora en esto Tutorial de MapReduce, crearemos nuestro primer Java Programa MapReduce:

Primer programa Hadoop MapReduce

Datos de VentasEne2009

Asegúrese de tener Hadoop instalado. Antes de comenzar con el proceso real, cambie el usuario a "hduser" (id utilizado durante la configuración de Hadoop, puede cambiar al ID de usuario utilizado durante la configuración de programación de Hadoop).

su - hduser_

Primer programa Hadoop MapReduce

Paso 1)

Crear un nuevo directorio con nombre MapReduceTutorial como se muestra en el siguiente ejemplo de MapReduce

sudo mkdir MapReduceTutorial

Primer programa Hadoop MapReduce

Dar permisos

sudo chmod -R 777 MapReduceTutorial

Primer programa Hadoop MapReduce

SalesMapper.java

package SalesCountry;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;

public class SalesMapper extends MapReduceBase implements Mapper <LongWritable, Text, Text, IntWritable> {
	private final static IntWritable one = new IntWritable(1);

	public void map(LongWritable key, Text value, OutputCollector <Text, IntWritable> output, Reporter reporter) throws IOException {

		String valueString = value.toString();
		String[] SingleCountryData = valueString.split(",");
		output.collect(new Text(SingleCountryData[7]), one);
	}
}

SalesCountryReducer.java

package SalesCountry;

import java.io.IOException;
import java.util.*;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;

public class SalesCountryReducer extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {

	public void reduce(Text t_key, Iterator<IntWritable> values, OutputCollector<Text,IntWritable> output, Reporter reporter) throws IOException {
		Text key = t_key;
		int frequencyForCountry = 0;
		while (values.hasNext()) {
			// replace type of value with the actual type of our value
			IntWritable value = (IntWritable) values.next();
			frequencyForCountry += value.get();
			
		}
		output.collect(key, new IntWritable(frequencyForCountry));
	}
}

SalesCountryDriver.java

package SalesCountry;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;

public class SalesCountryDriver {
    public static void main(String[] args) {
        JobClient my_client = new JobClient();
        // Create a configuration object for the job
        JobConf job_conf = new JobConf(SalesCountryDriver.class);

        // Set a name of the Job
        job_conf.setJobName("SalePerCountry");

        // Specify data type of output key and value
        job_conf.setOutputKeyClass(Text.class);
        job_conf.setOutputValueClass(IntWritable.class);

        // Specify names of Mapper and Reducer Class
        job_conf.setMapperClass(SalesCountry.SalesMapper.class);
        job_conf.setReducerClass(SalesCountry.SalesCountryReducer.class);

        // Specify formats of the data type of Input and output
        job_conf.setInputFormat(TextInputFormat.class);
        job_conf.setOutputFormat(TextOutputFormat.class);

        // Set input and output directories using command line arguments, 
        //arg[0] = name of input directory on HDFS, and arg[1] =  name of output directory to be created to store the output file.

        FileInputFormat.setInputPaths(job_conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(job_conf, new Path(args[1]));

        my_client.setConf(job_conf);
        try {
            // Run the job 
            JobClient.runJob(job_conf);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

Descargar archivos aquí

Primer programa Hadoop MapReduce

Verifique los permisos de archivo de todos estos archivos.

Primer programa Hadoop MapReduce

y si faltan permisos de "lectura", conceda lo mismo.

Primer programa Hadoop MapReduce

Paso 2)

Exporte classpath como se muestra en el siguiente ejemplo de Hadoop

export CLASSPATH="$HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-client-core-2.2.0.jar:$HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-client-common-2.2.0.jar:$HADOOP_HOME/share/hadoop/common/hadoop-common-2.2.0.jar:~/MapReduceTutorial/SalesCountry/*:$HADOOP_HOME/lib/*"

Primer programa Hadoop MapReduce

Paso 3)

Compilar Java archivos (estos archivos están presentes en el directorio Final-MapReduceHandsOn). Sus archivos de clase se colocarán en el directorio del paquete.

javac -d . SalesMapper.java SalesCountryReducer.java SalesCountryDriver.java

Primer programa Hadoop MapReduce

Esta advertencia se puede ignorar con seguridad.

Esta compilación creará un directorio en un directorio actual cuyo nombre de paquete se especifica en el archivo fuente de Java (es decir, País de ventas en nuestro caso) y coloque todos los archivos de clase compilados en él.

Primer programa Hadoop MapReduce

Paso 4)

Crear un nuevo archivo Manifiesto.txt

sudo gedit Manifest.txt

Agregue las siguientes líneas:

Main-Class: SalesCountry.SalesCountryDriver

Primer programa Hadoop MapReduce

PaísVentas.PaísVentasDriver es el nombre de la clase principal. Tenga en cuenta que debe presionar la tecla Intro al final de esta línea.

Paso 5)

Crear un archivo Jar

jar cfm ProductSalePerCountry.jar Manifest.txt SalesCountry/*.class

Primer programa Hadoop MapReduce

Compruebe que el archivo jar esté creado

Primer programa Hadoop MapReduce

Paso 6)

Iniciar Hadoop

$HADOOP_HOME/sbin/start-dfs.sh
$HADOOP_HOME/sbin/start-yarn.sh

Paso 7)

Copiar el archivo VentasEne2009.csv into ~/inputMapReduce

Ahora use el siguiente comando para copiar ~/inputMapReduce a HDFS.

$HADOOP_HOME/bin/hdfs dfs -copyFromLocal ~/inputMapReduce /

Primer programa Hadoop MapReduce

Podemos ignorar con seguridad esta advertencia.

Verifique si un archivo realmente se copia o no.

$HADOOP_HOME/bin/hdfs dfs -ls /inputMapReduce

Primer programa Hadoop MapReduce

Paso 8)

Ejecute el trabajo MapReduce

$HADOOP_HOME/bin/hadoop jar ProductSalePerCountry.jar /inputMapReduce /mapreduce_output_sales

Primer programa Hadoop MapReduce

Esto creará un directorio de salida llamado mapreduce_output_sales en HDFS. El contenido de este directorio será un archivo que contendrá las ventas de productos por país.

Paso 9)

El resultado se puede ver a través de la interfaz de comando como,

$HADOOP_HOME/bin/hdfs dfs -cat /mapreduce_output_sales/part-00000

Primer programa Hadoop MapReduce

Los resultados también se pueden ver a través de una interfaz web como:

Abra r en un navegador web.

Primer programa Hadoop MapReduce

Ahora selecciona 'Explorar el sistema de archivos' y navega a /mapreduce_output_ventas

Primer programa Hadoop MapReduce

Abierto parte-r-00000

Primer programa Hadoop MapReduce

Explicación de la clase SalesMapper

En esta sección entenderemos la implementación de Mapeador de ventas clase.

1. Comenzamos especificando un nombre de paquete para nuestra clase. País de ventas es un nombre de nuestro paquete. Tenga en cuenta que el resultado de la compilación, SalesMapper.clase irá a un directorio nombrado con este nombre de paquete: País de ventas.

Seguido de esto, importamos paquetes de biblioteca.

La siguiente instantánea muestra una implementación de Mapeador de ventas clase-

Explicación de la clase SalesMapper

Explicación del código de muestra:

1. Definición de clase SalesMapper

la clase pública SalesMapper extiende MapReduceBase implementa Mapper {

Cada clase de mapeador debe extenderse desde MapaReducirBase clase y debe implementar Mapper de la interfaz del.

2. Definición de la función 'mapa'

public void map(LongWritable key,
         Text value,
OutputCollector<Text, IntWritable> output,
Reporter reporter) throws IOException

La parte principal de la clase Mapper es una 'mapa()' método que acepta cuatro argumentos.

En cada llamada a 'mapa()' método, una valor clave par ('llave' y 'valor' en este código) se pasa.

'mapa()' El método comienza dividiendo el texto de entrada que se recibe como argumento. Utiliza el tokenizador para dividir estas líneas en palabras.

String valueString = value.toString();
String[] SingleCountryData = valueString.split(",");

Aquí, ',' se utiliza como delimitador.

Después de esto, se forma un par usando un registro en el séptimo índice de la matriz. 'Datos de un solo país' y un valor '1'.

salida.collect(nuevo texto(SingleCountryData[7]), uno);

Elegimos el récord en el séptimo índice porque necesitamos País datos y se encuentra en el séptimo índice de la matriz 'Datos de un solo país'.

Tenga en cuenta que nuestros datos de entrada están en el siguiente formato (donde País es a las 7th índice, con 0 como índice inicial)-

Fecha_transacción,Producto,Precio,Tipo_pago,Nombre,Ciudad,Estado,País,Cuenta_Creada,Último_Inicio de sesión,Latitud,Longitud

Una salida de mapper es nuevamente una valor clave par que se genera usando 'recolectar()' método de 'Coleccionista de salida'.

Explicación de la clase SalesCountryReducer

En esta sección entenderemos la implementación de VentasPaísReductor clase.

1. Comenzamos especificando un nombre del paquete para nuestra clase. País de ventas es el nombre de nuestro paquete. Tenga en cuenta que el resultado de la compilación, Clase SalesCountryReducer irá a un directorio nombrado con este nombre de paquete: País de ventas.

Seguido de esto, importamos paquetes de biblioteca.

La siguiente instantánea muestra una implementación de VentasPaísReductor clase-

Explicación de la clase SalesCountryReducer

Explicación del código:

1. Definición de clase SalesCountryReducer

clase pública SalesCountryReducer extiende MapReduceBase implementa Reducer {

Aquí, los dos primeros tipos de datos, 'Texto' y 'Intescribible' son tipos de datos de valor-clave de entrada para el reductor.

La salida del asignador tiene el formato , . Esta salida del mapeador se convierte en entrada al reductor. Entonces, para alinearse con su tipo de datos, Texto y Intescribible se utilizan como tipo de datos aquí.

Los dos últimos tipos de datos, 'Texto' e 'IntWritable' son tipos de datos de salida generados por el reductor en forma de par clave-valor.

Cada clase de reductor debe ampliarse desde MapaReducirBase clase y debe implementar Reductor de la interfaz del.

2. Definición de la función 'reducir'

public void reduce( Text t_key,
             Iterator<IntWritable> values,                           
             OutputCollector<Text,IntWritable> output,
             Reporter reporter) throws IOException {

Una entrada a la reducir() El método es una clave con una lista de múltiples valores.

Por ejemplo, en nuestro caso, será-

, , ,, , .

Esto se le da al reductor como

Entonces, para aceptar argumentos de esta forma, primero se utilizan dos tipos de datos, a saber: Texto y Iterador. Texto es un tipo de datos de clave y Iterador es un tipo de datos para la lista de valores para esa clave.

El siguiente argumento es de tipo OutputCollector que recoge la salida de la fase reductora.

reducir() El método comienza copiando el valor clave e inicializando el recuento de frecuencia en 0.

Clave de texto = t_key;
int frecuenciaParaPaís = 0;

Luego, usando 'mientras' bucle, iteramos a través de la lista de valores asociados con la clave y calculamos la frecuencia final sumando todos los valores.

 while (values.hasNext()) {
            // replace type of value with the actual type of our value
            IntWritable value = (IntWritable) values.next();
            frequencyForCountry += value.get();
            
        }

Ahora, enviamos el resultado al recopilador de salida en forma de clave y obtenido recuento de frecuencia.

El siguiente código hace esto:

output.collect(key, new IntWritable(frequencyForCountry));

Explicación de la clase SalesCountryDriver

En esta sección entenderemos la implementación de VentasPaísConductor clase

1. Comenzamos especificando un nombre de paquete para nuestra clase. País de ventas es el nombre de nuestro paquete. Tenga en cuenta que el resultado de la compilación, Clase SalesCountryDriver irá al directorio nombrado con este nombre de paquete: País de ventas.

Aquí hay una línea que especifica el nombre del paquete seguido de un código para importar paquetes de biblioteca.

Explicación de la clase SalesCountryDriver

2. Defina una clase de controlador que creará un nuevo trabajo de cliente, un objeto de configuración y anunciará las clases Mapper y Reducer.

La clase de controlador es responsable de configurar nuestro trabajo MapReduce para que se ejecute Hadoop. En esta clase especificamos nombre del trabajo, tipo de datos de entrada/salida y nombres de las clases de asignador y reductor.

Explicación de la clase SalesCountryDriver

3. En el siguiente fragmento de código, configuramos directorios de entrada y salida que se utilizan para consumir el conjunto de datos de entrada y producir resultados, respectivamente.

arg [0] y arg [1] son los argumentos de la línea de comandos pasados ​​con un comando dado en la práctica de MapReduce, es decir,

$HADOOP_HOME/bin/hadoop jar ProductSalePerCountry.jar /inputMapReduce /mapreduce_output_sales

Explicación de la clase SalesCountryDriver

4. Activar nuestro trabajo

A continuación el código inicia la ejecución del trabajo MapReduce.

try {
    // Run the job 
    JobClient.runJob(job_conf);
} catch (Exception e) {
    e.printStackTrace();
}