Primjeri Hadoop & Mapreduce: Stvorite prvi program u Java

U ovom vodiču naučit ćete koristiti Hadoop s MapReduce primjerima. Korišteni ulazni podaci su ProdajaSiječanj2009.csv. Sadrži podatke vezane uz prodaju kao što su naziv proizvoda, cijena, način plaćanja, grad, država klijenta itd. Cilj je Saznajte broj proizvoda prodanih u svakoj zemlji.

Prvi Hadoop MapReduce program

Sada u ovome Vodič za MapReduce, stvorit ćemo svoj prvi Java Program MapReduce:

Prvi Hadoop MapReduce program

Podaci o prodaji siječanj 2009

Provjerite imate li instaliran Hadoop. Prije nego započnete sa stvarnim procesom, promijenite korisnika u 'hduser' (id koji se koristi tijekom konfiguracije Hadoop-a, možete se prebaciti na korisnički id koji se koristi tijekom vaše programske konfiguracije Hadoop-a).

su - hduser_

Prvi Hadoop MapReduce program

Korak 1)

Napravite novi imenik s imenom MapReduceTutorial kao što je prikazano u donjem primjeru MapReduce

sudo mkdir MapReduceTutorial

Prvi Hadoop MapReduce program

Dajte dozvole

sudo chmod -R 777 MapReduceTutorial

Prvi Hadoop MapReduce program

SalesMapper.java

package SalesCountry;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;

public class SalesMapper extends MapReduceBase implements Mapper <LongWritable, Text, Text, IntWritable> {
	private final static IntWritable one = new IntWritable(1);

	public void map(LongWritable key, Text value, OutputCollector <Text, IntWritable> output, Reporter reporter) throws IOException {

		String valueString = value.toString();
		String[] SingleCountryData = valueString.split(",");
		output.collect(new Text(SingleCountryData[7]), one);
	}
}

SalesCountryReducer.java

package SalesCountry;

import java.io.IOException;
import java.util.*;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;

public class SalesCountryReducer extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {

	public void reduce(Text t_key, Iterator<IntWritable> values, OutputCollector<Text,IntWritable> output, Reporter reporter) throws IOException {
		Text key = t_key;
		int frequencyForCountry = 0;
		while (values.hasNext()) {
			// replace type of value with the actual type of our value
			IntWritable value = (IntWritable) values.next();
			frequencyForCountry += value.get();
			
		}
		output.collect(key, new IntWritable(frequencyForCountry));
	}
}

SalesCountryDriver.java

package SalesCountry;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;

public class SalesCountryDriver {
    public static void main(String[] args) {
        JobClient my_client = new JobClient();
        // Create a configuration object for the job
        JobConf job_conf = new JobConf(SalesCountryDriver.class);

        // Set a name of the Job
        job_conf.setJobName("SalePerCountry");

        // Specify data type of output key and value
        job_conf.setOutputKeyClass(Text.class);
        job_conf.setOutputValueClass(IntWritable.class);

        // Specify names of Mapper and Reducer Class
        job_conf.setMapperClass(SalesCountry.SalesMapper.class);
        job_conf.setReducerClass(SalesCountry.SalesCountryReducer.class);

        // Specify formats of the data type of Input and output
        job_conf.setInputFormat(TextInputFormat.class);
        job_conf.setOutputFormat(TextOutputFormat.class);

        // Set input and output directories using command line arguments, 
        //arg[0] = name of input directory on HDFS, and arg[1] =  name of output directory to be created to store the output file.

        FileInputFormat.setInputPaths(job_conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(job_conf, new Path(args[1]));

        my_client.setConf(job_conf);
        try {
            // Run the job 
            JobClient.runJob(job_conf);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

Preuzmite datoteke ovdje

Prvi Hadoop MapReduce program

Provjerite dopuštenja za sve te datoteke

Prvi Hadoop MapReduce program

a ako nedostaju dopuštenja za 'čitanje', dodijelite ih isto-

Prvi Hadoop MapReduce program

Korak 2)

Izvezite classpath kao što je prikazano u donjem Hadoop primjeru

export CLASSPATH="$HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-client-core-2.2.0.jar:$HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-client-common-2.2.0.jar:$HADOOP_HOME/share/hadoop/common/hadoop-common-2.2.0.jar:~/MapReduceTutorial/SalesCountry/*:$HADOOP_HOME/lib/*"

Prvi Hadoop MapReduce program

Korak 3)

Sastaviti Java datoteke (ove datoteke su prisutne u direktoriju Final-MapReduceHandsOn). Njegove datoteke klase bit će smještene u direktorij paketa

javac -d . SalesMapper.java SalesCountryReducer.java SalesCountryDriver.java

Prvi Hadoop MapReduce program

Ovo upozorenje možete slobodno zanemariti.

Ova kompilacija će stvoriti direktorij u trenutnom direktoriju nazvan imenom paketa specificiranim u java izvornoj datoteci (tj. SalesCountry u našem slučaju) i u nju staviti sve kompilirane datoteke klasa.

Prvi Hadoop MapReduce program

Korak 4)

Stvorite novu datoteku Manifest.txt

sudo gedit Manifest.txt

dodajte mu sljedeće retke,

Main-Class: SalesCountry.SalesCountryDriver

Prvi Hadoop MapReduce program

SalesCountry.SalesCountryDriver je naziv glavne klase. Imajte na umu da morate pritisnuti tipku enter na kraju ovog retka.

Korak 5)

Stvorite Jar datoteku

jar cfm ProductSalePerCountry.jar Manifest.txt SalesCountry/*.class

Prvi Hadoop MapReduce program

Provjerite je li jar datoteka stvorena

Prvi Hadoop MapReduce program

Korak 6)

Pokrenite Hadoop

$HADOOP_HOME/sbin/start-dfs.sh
$HADOOP_HOME/sbin/start-yarn.sh

Korak 7)

Kopiraj datoteku ProdajaSiječanj2009.csv u ~/inputMapReduce

Sada koristite donju naredbu za kopiranje ~/inputMapReduce na HDFS.

$HADOOP_HOME/bin/hdfs dfs -copyFromLocal ~/inputMapReduce /

Prvi Hadoop MapReduce program

Možemo slobodno zanemariti ovo upozorenje.

Provjerite je li datoteka stvarno kopirana ili ne.

$HADOOP_HOME/bin/hdfs dfs -ls /inputMapReduce

Prvi Hadoop MapReduce program

Korak 8)

Pokrenite zadatak MapReduce

$HADOOP_HOME/bin/hadoop jar ProductSalePerCountry.jar /inputMapReduce /mapreduce_output_sales

Prvi Hadoop MapReduce program

Ovo će stvoriti izlazni direktorij pod nazivom mapreduce_output_sales on HDFS. Sadržaj ovog imenika bit će datoteka koja će sadržavati prodaju proizvoda po zemlji.

Korak 9)

Rezultat se može vidjeti kroz naredbeno sučelje kao,

$HADOOP_HOME/bin/hdfs dfs -cat /mapreduce_output_sales/part-00000

Prvi Hadoop MapReduce program

Rezultati se također mogu vidjeti putem web sučelja kao-

Otvorite r u web pregledniku.

Prvi Hadoop MapReduce program

Sada odaberite "Pregledaj datotečni sustav" i krenite u /mapreduce_output_sales

Prvi Hadoop MapReduce program

Otvoren dio-r-00000

Prvi Hadoop MapReduce program

Objašnjenje klase SalesMapper

U ovom odjeljku ćemo razumjeti implementaciju SalesMapper klase.

1. Počinjemo određivanjem imena paketa za našu klasu. SalesCountry je naziv našeg paketa. Imajte na umu da izlaz kompilacije, SalesMapper.class otići će u direktorij nazvan ovim nazivom paketa: SalesCountry.

Nakon toga uvozimo knjižnične pakete.

Donja snimka prikazuje implementaciju SalesMapper razred-

Objašnjenje klase SalesMapper

Primjer objašnjenja koda:

1. Definicija klase SalesMapper-

javna klasa SalesMapper proširuje MapReduceBase implementira Mapper {

Svaka klasa preslikača mora se proširiti iz MapReduceBase klase i mora se implementirati Maper sučelje.

2. Definiranje funkcije 'map'-

public void map(LongWritable key,
         Text value,
OutputCollector<Text, IntWritable> output,
Reporter reporter) throws IOException

Glavni dio Mapper klase je a 'karta()' metoda koja prihvaća četiri argumenta.

Na svaki poziv na 'karta()' metoda, a ključ-vrijednost par ('ključ' i 'vrijednost' u ovom kodu) se prenosi.

'karta()' metoda počinje dijeljenjem ulaznog teksta koji se prima kao argument. Koristi tokenizer za dijeljenje ovih redaka u riječi.

String valueString = value.toString();
String[] SingleCountryData = valueString.split(",");

Ovdje, „” koristi se kao graničnik.

Nakon toga, formira se par pomoću zapisa na 7. indeksu niza 'SingleCountryData' i vrijednost '1'.

output.collect(novi tekst(SingleCountryData[7]), jedan);

Biramo zapis na 7. indeksu jer nam je potreban Zemlja podataka i nalazi se na 7. indeksu u nizu 'SingleCountryData'.

Imajte na umu da su naši ulazni podaci u donjem formatu (gdje Zemlja je u 7th indeks, s 0 kao početnim indeksom)-

Transaction_date, Product, Price, Payment_Type, Name, City, State,Zemlja,Account_Created,Last_Login,Latitude,Longitude

Izlaz mapera je opet a ključ-vrijednost par koji se ispisuje pomoću 'prikupiti()' metoda 'OutputCollector'.

Objašnjenje klase SalesCountryReducer

U ovom odjeljku ćemo razumjeti implementaciju SalesCountryReducer klase.

1. Počinjemo određivanjem imena paketa za našu klasu. SalesCountry je naziv našeg paketa. Imajte na umu da izlaz kompilacije, SalesCountryReducer.class otići će u direktorij nazvan ovim nazivom paketa: SalesCountry.

Nakon toga uvozimo knjižnične pakete.

Donja snimka prikazuje implementaciju SalesCountryReducer razred-

Objašnjenje klase SalesCountryReducer

Objašnjenje koda:

1. Definicija klase SalesCountryReducer-

javna klasa SalesCountryReducer proširuje MapReduceBase implementira Reducer {

Ovdje su prva dva tipa podataka, 'Tekst' i 'IntWritable' su tip podataka ulaznog ključa/vrijednosti u reduktor.

Izlaz mapera je u obliku , . Ovaj izlaz mapera postaje ulaz reduktora. Dakle, za usklađivanje s njegovom vrstom podataka, Tekst i IntWritable ovdje se koriste kao tip podataka.

Posljednje dvije vrste podataka, 'Tekst' i 'IntWritable' su vrste podataka koje generira reduktor u obliku para ključ-vrijednost.

Svaka klasa reduktora mora se proširiti s MapReduceBase klase i mora se implementirati Redukcija sučelje.

2. Definiranje funkcije reduciranja-

public void reduce( Text t_key,
             Iterator<IntWritable> values,                           
             OutputCollector<Text,IntWritable> output,
             Reporter reporter) throws IOException {

Unos u smanjiti() metoda je ključ s popisom višestrukih vrijednosti.

Na primjer, u našem slučaju to će biti-

, , , , , .

Ovo se daje reduktoru kao

Dakle, za prihvaćanje argumenata ovog oblika koriste se prva dva tipa podataka, tj. Tekst i Iterator. Tekst je podatkovni tip ključa i Iterator je tip podataka za popis vrijednosti za taj ključ.

Sljedeći argument je tipa OutputCollector koji prikuplja izlaz reduktorske faze.

smanjiti() metoda počinje kopiranjem vrijednosti ključa i inicijalizacijom brojača frekvencije na 0.

Tekstni ključ = t_ključ;
int frekvencijaZaDržavu = 0;

Zatim, koristeći 'dok' petlje, ponavljamo kroz popis vrijednosti povezanih s ključem i izračunavamo konačnu učestalost zbrajanjem svih vrijednosti.

 while (values.hasNext()) {
            // replace type of value with the actual type of our value
            IntWritable value = (IntWritable) values.next();
            frequencyForCountry += value.get();
            
        }

Sada guramo rezultat u izlazni kolektor u obliku ključ i dobiveno broj frekvencija.

Donji kod radi ovo-

output.collect(key, new IntWritable(frequencyForCountry));

Objašnjenje klase SalesCountryDriver

U ovom odjeljku ćemo razumjeti implementaciju SalesCountryDriver razred

1. Počinjemo određivanjem imena paketa za našu klasu. SalesCountry je naziv našeg paketa. Imajte na umu da izlaz kompilacije, SalesCountryDriver.class otići će u direktorij nazvan ovim nazivom paketa: SalesCountry.

Ovdje je redak koji navodi naziv paketa nakon kojeg slijedi kod za uvoz paketa knjižnice.

Objašnjenje klase SalesCountryDriver

2. Definirajte klasu pokretača koja će kreirati novi posao klijenta, konfiguracijski objekt i oglašavati klase Mapper i Reducer.

Klasa upravljačkog programa odgovorna je za postavljanje našeg MapReduce posla za izvođenje Hadoop. U ovoj klasi specificiramo naziv posla, tip podataka ulaza/izlaza i imena klasa mapera i reduktora.

Objašnjenje klase SalesCountryDriver

3. U donjem isječku koda postavljamo ulazne i izlazne direktorije koji se koriste za potrošnju ulaznog skupa podataka i proizvodnju izlaza.

arg[0] i arg[1] su argumenti naredbenog retka proslijeđeni s naredbom danom u MapReduce hands-on, tj.

$HADOOP_HOME/bin/hadoop jar ProductSalePerCountry.jar /inputMapReduce /mapreduce_output_sales

Objašnjenje klase SalesCountryDriver

4. Pokrenite naš posao

Ispod koda započinje izvršenje posla MapReduce-

try {
    // Run the job 
    JobClient.runJob(job_conf);
} catch (Exception e) {
    e.printStackTrace();
}