Примери за Hadoop & Mapreduce: Създаване на първа програма в Java

В този урок ще се научите да използвате Hadoop с примери за MapReduce. Използваните входни данни са SalesJan2009.csv. Той съдържа информация, свързана с продажбите, като име на продукта, цена, начин на плащане, град, държава на клиента и т.н. Целта е да Разберете броя на продадените продукти във всяка държава.

Първата програма Hadoop MapReduce

Сега в това Урок за MapReduce, ние ще създадем нашия първи Java Програма MapReduce:

Първата програма Hadoop MapReduce

Данни за продажби януари 2009 г

Уверете се, че имате инсталиран Hadoop. Преди да започнете с действителния процес, променете потребителя на „hduser“ (идентификатор, използван при конфигурацията на Hadoop, можете да превключите към потребителския идентификатор, използван по време на вашата програмна конфигурация на Hadoop).

su - hduser_

Първата програма Hadoop MapReduce

Стъпка 1)

Създайте нова директория с име MapReduceTutorial както е показано в примера за MapReduce по-долу

sudo mkdir MapReduceTutorial

Първата програма Hadoop MapReduce

Дайте разрешения

sudo chmod -R 777 MapReduceTutorial

Първата програма Hadoop MapReduce

SalesMapper.java

package SalesCountry;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;

public class SalesMapper extends MapReduceBase implements Mapper <LongWritable, Text, Text, IntWritable> {
	private final static IntWritable one = new IntWritable(1);

	public void map(LongWritable key, Text value, OutputCollector <Text, IntWritable> output, Reporter reporter) throws IOException {

		String valueString = value.toString();
		String[] SingleCountryData = valueString.split(",");
		output.collect(new Text(SingleCountryData[7]), one);
	}
}

SalesCountryReducer.java

package SalesCountry;

import java.io.IOException;
import java.util.*;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;

public class SalesCountryReducer extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {

	public void reduce(Text t_key, Iterator<IntWritable> values, OutputCollector<Text,IntWritable> output, Reporter reporter) throws IOException {
		Text key = t_key;
		int frequencyForCountry = 0;
		while (values.hasNext()) {
			// replace type of value with the actual type of our value
			IntWritable value = (IntWritable) values.next();
			frequencyForCountry += value.get();
			
		}
		output.collect(key, new IntWritable(frequencyForCountry));
	}
}

SalesCountryDriver.java

package SalesCountry;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;

public class SalesCountryDriver {
    public static void main(String[] args) {
        JobClient my_client = new JobClient();
        // Create a configuration object for the job
        JobConf job_conf = new JobConf(SalesCountryDriver.class);

        // Set a name of the Job
        job_conf.setJobName("SalePerCountry");

        // Specify data type of output key and value
        job_conf.setOutputKeyClass(Text.class);
        job_conf.setOutputValueClass(IntWritable.class);

        // Specify names of Mapper and Reducer Class
        job_conf.setMapperClass(SalesCountry.SalesMapper.class);
        job_conf.setReducerClass(SalesCountry.SalesCountryReducer.class);

        // Specify formats of the data type of Input and output
        job_conf.setInputFormat(TextInputFormat.class);
        job_conf.setOutputFormat(TextOutputFormat.class);

        // Set input and output directories using command line arguments, 
        //arg[0] = name of input directory on HDFS, and arg[1] =  name of output directory to be created to store the output file.

        FileInputFormat.setInputPaths(job_conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(job_conf, new Path(args[1]));

        my_client.setConf(job_conf);
        try {
            // Run the job 
            JobClient.runJob(job_conf);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

Изтеглете файлове тук

Първата програма Hadoop MapReduce

Проверете файловите разрешения на всички тези файлове

Първата програма Hadoop MapReduce

и ако липсват разрешения за четене, дайте същите-

Първата програма Hadoop MapReduce

Стъпка 2)

Експортирайте classpath, както е показано в примера на Hadoop по-долу

export CLASSPATH="$HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-client-core-2.2.0.jar:$HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-client-common-2.2.0.jar:$HADOOP_HOME/share/hadoop/common/hadoop-common-2.2.0.jar:~/MapReduceTutorial/SalesCountry/*:$HADOOP_HOME/lib/*"

Първата програма Hadoop MapReduce

Стъпка 3)

Компилирам Java файлове (тези файлове присъстват в директорията Final-MapReduceHandsOn). Неговите клас файлове ще бъдат поставени в директорията на пакета

javac -d . SalesMapper.java SalesCountryReducer.java SalesCountryDriver.java

Първата програма Hadoop MapReduce

Това предупреждение може спокойно да бъде игнорирано.

Тази компилация ще създаде директория в текуща директория, наречена с името на пакета, указано в изходния файл на java (т.е. SalesCountry в нашия случай) и поставете всички компилирани клас файлове в него.

Първата програма Hadoop MapReduce

Стъпка 4)

Създайте нов файл Manifest.txt

sudo gedit Manifest.txt

добавете следните редове към него,

Main-Class: SalesCountry.SalesCountryDriver

Първата програма Hadoop MapReduce

SalesCountry.SalesCountryDriver е името на главния клас. Моля, обърнете внимание, че трябва да натиснете клавиша Enter в края на този ред.

Стъпка 5)

Създайте Jar файл

jar cfm ProductSalePerCountry.jar Manifest.txt SalesCountry/*.class

Първата програма Hadoop MapReduce

Проверете дали jar файлът е създаден

Първата програма Hadoop MapReduce

Стъпка 6)

Стартирайте Hadoop

$HADOOP_HOME/sbin/start-dfs.sh
$HADOOP_HOME/sbin/start-yarn.sh

Стъпка 7)

Копирайте файла SalesJan2009.csv в ~/inputMapReduce

Сега използвайте командата по-долу, за да копирате ~/inputMapReduce към HDFS.

$HADOOP_HOME/bin/hdfs dfs -copyFromLocal ~/inputMapReduce /

Първата програма Hadoop MapReduce

Спокойно можем да пренебрегнем това предупреждение.

Проверете дали файлът наистина е копиран или не.

$HADOOP_HOME/bin/hdfs dfs -ls /inputMapReduce

Първата програма Hadoop MapReduce

Стъпка 8)

Изпълнете задание MapReduce

$HADOOP_HOME/bin/hadoop jar ProductSalePerCountry.jar /inputMapReduce /mapreduce_output_sales

Първата програма Hadoop MapReduce

Това ще създаде изходна директория с име mapreduce_output_sales HDFS. Съдържанието на тази директория ще бъде файл, съдържащ продажбите на продукти по държави.

Стъпка 9)

Резултатът може да се види чрез командния интерфейс като,

$HADOOP_HOME/bin/hdfs dfs -cat /mapreduce_output_sales/part-00000

Първата програма Hadoop MapReduce

Резултатите могат да се видят и чрез уеб интерфейс като-

Отворете r в уеб браузър.

Първата програма Hadoop MapReduce

Сега изберете „Преглед на файловата система“ и отидете до /mapreduce_output_sales

Първата програма Hadoop MapReduce

отворено част-r-00000

Първата програма Hadoop MapReduce

Обяснение на клас SalesMapper

В този раздел ще разберем изпълнението на SalesMapper клас.

1. Започваме, като посочим име на пакет за нашия клас. SalesCountry е име на нашия пакет. Моля, обърнете внимание, че резултатът от компилацията, SalesMapper.class ще отиде в директория, наречена с това име на пакет: SalesCountry.

След това импортираме библиотечни пакети.

По-долу моментната снимка показва изпълнение на SalesMapper клас-

Обяснение на клас SalesMapper

Примерно обяснение на кода:

1. Дефиниция на клас SalesMapper-

публичен клас SalesMapper разширява MapReduceBase прилага Mapper {

Всеки клас на картограф трябва да бъде разширен от MapReduceBase клас и трябва да се реализира Картограф интерфейс.

2. Дефиниране на функцията "карта"-

public void map(LongWritable key,
         Text value,
OutputCollector<Text, IntWritable> output,
Reporter reporter) throws IOException

Основната част от класа Mapper е a 'map()' метод, който приема четири аргумента.

При всяко обаждане до 'map()' метод, а ключ-стойност двойка ("ключ" намлява "стойност" в този код) се предава.

'map()' методът започва с разделяне на входния текст, който се получава като аргумент. Той използва токенизатора, за да раздели тези редове на думи.

String valueString = value.toString();
String[] SingleCountryData = valueString.split(",");

Тук ',' се използва като разделител.

След това се формира двойка, като се използва запис на 7-ми индекс на масива „SingleCountryData“ и стойност "1".

output.collect(нов текст(SingleCountryData[7]), един);

Избираме запис на 7-ми индекс, защото имаме нужда Страна данни и се намира на 7-ми индекс в масива „SingleCountryData“.

Моля, обърнете внимание, че нашите входни данни са във формат по-долу (където Страна е на 7th индекс, с 0 като начален индекс)-

Transaction_date,Product,Price,Payment_Type,Name,City,State,Страна,Account_Created,Last_Login,Latitude,Longitude

Изходът на mapper отново е a ключ-стойност двойка, която се извежда с помощта на 'collect()' метод на "OutputCollector".

Обяснение на клас SalesCountryReducer

В този раздел ще разберем изпълнението на SalesCountryReducer клас.

1. Започваме, като посочим име на пакета за нашия клас. SalesCountry е име на пакет. Моля, обърнете внимание, че резултатът от компилацията, SalesCountryReducer.class ще отиде в директория, наречена с това име на пакет: SalesCountry.

След това импортираме библиотечни пакети.

По-долу моментната снимка показва изпълнение на SalesCountryReducer клас-

Обяснение на клас SalesCountryReducer

Обяснение на кода:

1. Дефиниция на клас SalesCountryReducer-

публичен клас SalesCountryReducer разширява MapReduceBase прилага Reducer {

Тук първите два типа данни, 'Текст' намлява „IntWritable“ са тип данни на входен ключ-стойност към редуктора.

Изходът на картографа е под формата на , . Този изход на mapper става вход за редуктора. Така че, за да се приведе в съответствие с неговия тип данни, Текст намлява IntWritable се използват като тип данни тук.

Последните два типа данни, „Текст“ и „IntWritable“ са тип данни на изход, генериран от редуктора под формата на двойка ключ-стойност.

Всеки клас редуктор трябва да бъде разширен от MapReduceBase клас и трябва да се реализира Редуктор интерфейс.

2. Дефиниране на функцията „намаляване“-

public void reduce( Text t_key,
             Iterator<IntWritable> values,                           
             OutputCollector<Text,IntWritable> output,
             Reporter reporter) throws IOException {

Вход към намали () методът е ключ със списък от множество стойности.

Например, в нашия случай ще бъде-

, , , , , .

Това се дава на редуктора като

Така че, за да се приемат аргументи от тази форма, се използват първите два типа данни, а именно, Текст намлява Итератор. Текст е тип данни на ключ и Итератор е тип данни за списък със стойности за този ключ.

Следващият аргумент е от тип OutputCollector който събира изхода от фазата на редуктора.

намали () методът започва с копиране на ключовата стойност и инициализиране на брояча на честотите до 0.

Текстов ключ = t_key;
int frequencyForCountry = 0;

След това, използвайки 'докато' цикъл, преминаваме през списъка със стойности, свързани с ключа, и изчисляваме крайната честота, като сумираме всички стойности.

 while (values.hasNext()) {
            // replace type of value with the actual type of our value
            IntWritable value = (IntWritable) values.next();
            frequencyForCountry += value.get();
            
        }

Сега изпращаме резултата към изходния колектор под формата на ключ и получено честотен брой.

Кодът по-долу прави това-

output.collect(key, new IntWritable(frequencyForCountry));

Обяснение на клас SalesCountryDriver

В този раздел ще разберем изпълнението на SalesCountryDriver клас

1. Започваме, като посочим име на пакет за нашия клас. SalesCountry е име на пакет. Моля, обърнете внимание, че резултатът от компилацията, SalesCountryDriver.class ще отиде в директория, наречена с това име на пакет: SalesCountry.

Ето ред, указващ име на пакет, последван от код за импортиране на библиотечни пакети.

Обяснение на клас SalesCountryDriver

2. Дефинирайте клас драйвер, който ще създаде нова клиентска работа, конфигурационен обект и ще рекламира класове Mapper и Reducer.

Класът на драйвера е отговорен за настройването на нашата задача MapReduce да се изпълнява Hadoop. В този клас ние уточняваме име на задание, тип данни за вход/изход и имена на класове картограф и редуктор.

Обяснение на клас SalesCountryDriver

3. В кодовия фрагмент по-долу задаваме входни и изходни директории, които се използват съответно за използване на набор от входни данни и генериране на изход.

аргумент [0] намлява аргумент [1] са аргументите на командния ред, предавани с команда, дадена в MapReduce практически, т.е.

$HADOOP_HOME/bin/hadoop jar ProductSalePerCountry.jar /inputMapReduce /mapreduce_output_sales

Обяснение на клас SalesCountryDriver

4. Задействайте нашата работа

Кодът по-долу стартира изпълнението на заданието MapReduce-

try {
    // Run the job 
    JobClient.runJob(job_conf);
} catch (Exception e) {
    e.printStackTrace();
}