Примери за Hadoop & Mapreduce: Създаване на първа програма в Java
В този урок ще се научите да използвате Hadoop с примери за MapReduce. Използваните входни данни са SalesJan2009.csv. Той съдържа информация, свързана с продажбите, като име на продукта, цена, начин на плащане, град, държава на клиента и т.н. Целта е да Разберете броя на продадените продукти във всяка държава.
Първата програма Hadoop MapReduce
Сега в това Урок за MapReduce, ние ще създадем нашия първи Java Програма MapReduce:
Уверете се, че имате инсталиран Hadoop. Преди да започнете с действителния процес, променете потребителя на „hduser“ (идентификатор, използван при конфигурацията на Hadoop, можете да превключите към потребителския идентификатор, използван по време на вашата програмна конфигурация на Hadoop).
su - hduser_
Стъпка 1)
Създайте нова директория с име MapReduceTutorial както е показано в примера за MapReduce по-долу
sudo mkdir MapReduceTutorial
Дайте разрешения
sudo chmod -R 777 MapReduceTutorial
SalesMapper.java
package SalesCountry; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.*; public class SalesMapper extends MapReduceBase implements Mapper <LongWritable, Text, Text, IntWritable> { private final static IntWritable one = new IntWritable(1); public void map(LongWritable key, Text value, OutputCollector <Text, IntWritable> output, Reporter reporter) throws IOException { String valueString = value.toString(); String[] SingleCountryData = valueString.split(","); output.collect(new Text(SingleCountryData[7]), one); } }
SalesCountryReducer.java
package SalesCountry; import java.io.IOException; import java.util.*; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.*; public class SalesCountryReducer extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> { public void reduce(Text t_key, Iterator<IntWritable> values, OutputCollector<Text,IntWritable> output, Reporter reporter) throws IOException { Text key = t_key; int frequencyForCountry = 0; while (values.hasNext()) { // replace type of value with the actual type of our value IntWritable value = (IntWritable) values.next(); frequencyForCountry += value.get(); } output.collect(key, new IntWritable(frequencyForCountry)); } }
SalesCountryDriver.java
package SalesCountry; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.*; import org.apache.hadoop.mapred.*; public class SalesCountryDriver { public static void main(String[] args) { JobClient my_client = new JobClient(); // Create a configuration object for the job JobConf job_conf = new JobConf(SalesCountryDriver.class); // Set a name of the Job job_conf.setJobName("SalePerCountry"); // Specify data type of output key and value job_conf.setOutputKeyClass(Text.class); job_conf.setOutputValueClass(IntWritable.class); // Specify names of Mapper and Reducer Class job_conf.setMapperClass(SalesCountry.SalesMapper.class); job_conf.setReducerClass(SalesCountry.SalesCountryReducer.class); // Specify formats of the data type of Input and output job_conf.setInputFormat(TextInputFormat.class); job_conf.setOutputFormat(TextOutputFormat.class); // Set input and output directories using command line arguments, //arg[0] = name of input directory on HDFS, and arg[1] = name of output directory to be created to store the output file. FileInputFormat.setInputPaths(job_conf, new Path(args[0])); FileOutputFormat.setOutputPath(job_conf, new Path(args[1])); my_client.setConf(job_conf); try { // Run the job JobClient.runJob(job_conf); } catch (Exception e) { e.printStackTrace(); } } }
Проверете файловите разрешения на всички тези файлове
и ако липсват разрешения за четене, дайте същите-
Стъпка 2)
Експортирайте classpath, както е показано в примера на Hadoop по-долу
export CLASSPATH="$HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-client-core-2.2.0.jar:$HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-client-common-2.2.0.jar:$HADOOP_HOME/share/hadoop/common/hadoop-common-2.2.0.jar:~/MapReduceTutorial/SalesCountry/*:$HADOOP_HOME/lib/*"
Стъпка 3)
Компилирам Java файлове (тези файлове присъстват в директорията Final-MapReduceHandsOn). Неговите клас файлове ще бъдат поставени в директорията на пакета
javac -d . SalesMapper.java SalesCountryReducer.java SalesCountryDriver.java
Това предупреждение може спокойно да бъде игнорирано.
Тази компилация ще създаде директория в текуща директория, наречена с името на пакета, указано в изходния файл на java (т.е. SalesCountry в нашия случай) и поставете всички компилирани клас файлове в него.
Стъпка 4)
Създайте нов файл Manifest.txt
sudo gedit Manifest.txt
добавете следните редове към него,
Main-Class: SalesCountry.SalesCountryDriver
SalesCountry.SalesCountryDriver е името на главния клас. Моля, обърнете внимание, че трябва да натиснете клавиша Enter в края на този ред.
Стъпка 5)
Създайте Jar файл
jar cfm ProductSalePerCountry.jar Manifest.txt SalesCountry/*.class
Проверете дали jar файлът е създаден
Стъпка 6)
Стартирайте Hadoop
$HADOOP_HOME/sbin/start-dfs.sh
$HADOOP_HOME/sbin/start-yarn.sh
Стъпка 7)
Копирайте файла SalesJan2009.csv в ~/inputMapReduce
Сега използвайте командата по-долу, за да копирате ~/inputMapReduce към HDFS.
$HADOOP_HOME/bin/hdfs dfs -copyFromLocal ~/inputMapReduce /
Спокойно можем да пренебрегнем това предупреждение.
Проверете дали файлът наистина е копиран или не.
$HADOOP_HOME/bin/hdfs dfs -ls /inputMapReduce
Стъпка 8)
Изпълнете задание MapReduce
$HADOOP_HOME/bin/hadoop jar ProductSalePerCountry.jar /inputMapReduce /mapreduce_output_sales
Това ще създаде изходна директория с име mapreduce_output_sales HDFS. Съдържанието на тази директория ще бъде файл, съдържащ продажбите на продукти по държави.
Стъпка 9)
Резултатът може да се види чрез командния интерфейс като,
$HADOOP_HOME/bin/hdfs dfs -cat /mapreduce_output_sales/part-00000
Резултатите могат да се видят и чрез уеб интерфейс като-
Отворете r в уеб браузър.
Сега изберете „Преглед на файловата система“ и отидете до /mapreduce_output_sales
отворено част-r-00000
Обяснение на клас SalesMapper
В този раздел ще разберем изпълнението на SalesMapper клас.
1. Започваме, като посочим име на пакет за нашия клас. SalesCountry е име на нашия пакет. Моля, обърнете внимание, че резултатът от компилацията, SalesMapper.class ще отиде в директория, наречена с това име на пакет: SalesCountry.
След това импортираме библиотечни пакети.
По-долу моментната снимка показва изпълнение на SalesMapper клас-
Примерно обяснение на кода:
1. Дефиниция на клас SalesMapper-
публичен клас SalesMapper разширява MapReduceBase прилага Mapper {
Всеки клас на картограф трябва да бъде разширен от MapReduceBase клас и трябва да се реализира Картограф интерфейс.
2. Дефиниране на функцията "карта"-
public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException
Основната част от класа Mapper е a 'map()' метод, който приема четири аргумента.
При всяко обаждане до 'map()' метод, а ключ-стойност двойка ("ключ" намлява "стойност" в този код) се предава.
'map()' методът започва с разделяне на входния текст, който се получава като аргумент. Той използва токенизатора, за да раздели тези редове на думи.
String valueString = value.toString(); String[] SingleCountryData = valueString.split(",");
Тук ',' се използва като разделител.
След това се формира двойка, като се използва запис на 7-ми индекс на масива „SingleCountryData“ и стойност "1".
output.collect(нов текст(SingleCountryData[7]), един);
Избираме запис на 7-ми индекс, защото имаме нужда Страна данни и се намира на 7-ми индекс в масива „SingleCountryData“.
Моля, обърнете внимание, че нашите входни данни са във формат по-долу (където Страна е на 7th индекс, с 0 като начален индекс)-
Transaction_date,Product,Price,Payment_Type,Name,City,State,Страна,Account_Created,Last_Login,Latitude,Longitude
Изходът на mapper отново е a ключ-стойност двойка, която се извежда с помощта на 'collect()' метод на "OutputCollector".
Обяснение на клас SalesCountryReducer
В този раздел ще разберем изпълнението на SalesCountryReducer клас.
1. Започваме, като посочим име на пакета за нашия клас. SalesCountry е име на пакет. Моля, обърнете внимание, че резултатът от компилацията, SalesCountryReducer.class ще отиде в директория, наречена с това име на пакет: SalesCountry.
След това импортираме библиотечни пакети.
По-долу моментната снимка показва изпълнение на SalesCountryReducer клас-
Обяснение на кода:
1. Дефиниция на клас SalesCountryReducer-
публичен клас SalesCountryReducer разширява MapReduceBase прилага Reducer {
Тук първите два типа данни, 'Текст' намлява „IntWritable“ са тип данни на входен ключ-стойност към редуктора.
Изходът на картографа е под формата на , . Този изход на mapper става вход за редуктора. Така че, за да се приведе в съответствие с неговия тип данни, Текст намлява IntWritable се използват като тип данни тук.
Последните два типа данни, „Текст“ и „IntWritable“ са тип данни на изход, генериран от редуктора под формата на двойка ключ-стойност.
Всеки клас редуктор трябва да бъде разширен от MapReduceBase клас и трябва да се реализира Редуктор интерфейс.
2. Дефиниране на функцията „намаляване“-
public void reduce( Text t_key, Iterator<IntWritable> values, OutputCollector<Text,IntWritable> output, Reporter reporter) throws IOException {
Вход към намали () методът е ключ със списък от множество стойности.
Например, в нашия случай ще бъде-
, , , , , .
Това се дава на редуктора като
Така че, за да се приемат аргументи от тази форма, се използват първите два типа данни, а именно, Текст намлява Итератор. Текст е тип данни на ключ и Итератор е тип данни за списък със стойности за този ключ.
Следващият аргумент е от тип OutputCollector който събира изхода от фазата на редуктора.
намали () методът започва с копиране на ключовата стойност и инициализиране на брояча на честотите до 0.
Текстов ключ = t_key;
int frequencyForCountry = 0;
След това, използвайки 'докато' цикъл, преминаваме през списъка със стойности, свързани с ключа, и изчисляваме крайната честота, като сумираме всички стойности.
while (values.hasNext()) { // replace type of value with the actual type of our value IntWritable value = (IntWritable) values.next(); frequencyForCountry += value.get(); }
Сега изпращаме резултата към изходния колектор под формата на ключ и получено честотен брой.
Кодът по-долу прави това-
output.collect(key, new IntWritable(frequencyForCountry));
Обяснение на клас SalesCountryDriver
В този раздел ще разберем изпълнението на SalesCountryDriver клас
1. Започваме, като посочим име на пакет за нашия клас. SalesCountry е име на пакет. Моля, обърнете внимание, че резултатът от компилацията, SalesCountryDriver.class ще отиде в директория, наречена с това име на пакет: SalesCountry.
Ето ред, указващ име на пакет, последван от код за импортиране на библиотечни пакети.
2. Дефинирайте клас драйвер, който ще създаде нова клиентска работа, конфигурационен обект и ще рекламира класове Mapper и Reducer.
Класът на драйвера е отговорен за настройването на нашата задача MapReduce да се изпълнява Hadoop. В този клас ние уточняваме име на задание, тип данни за вход/изход и имена на класове картограф и редуктор.
3. В кодовия фрагмент по-долу задаваме входни и изходни директории, които се използват съответно за използване на набор от входни данни и генериране на изход.
аргумент [0] намлява аргумент [1] са аргументите на командния ред, предавани с команда, дадена в MapReduce практически, т.е.
$HADOOP_HOME/bin/hadoop jar ProductSalePerCountry.jar /inputMapReduce /mapreduce_output_sales
4. Задействайте нашата работа
Кодът по-долу стартира изпълнението на заданието MapReduce-
try { // Run the job JobClient.runJob(job_conf); } catch (Exception e) { e.printStackTrace(); }