Esimerkkejä Hadoopista ja Mapreducesta: Luo ensimmäinen ohjelma Java

Tässä opetusohjelmassa opit käyttämään Hadoopia MapReduce-esimerkkien kanssa. Käytetty syöttötieto on MyyntiJan2009.csv. Se sisältää myyntiin liittyviä tietoja, kuten tuotteen nimen, hinnan, maksutavan, kaupungin, asiakkaan maan jne. Tavoitteena on Selvitä myytyjen tuotteiden määrä kussakin maassa.

Ensimmäinen Hadoop MapReduce -ohjelma

Nyt tässä MapReduce opetusohjelma, luomme ensimmäisen Java MapReduce-ohjelma:

Ensimmäinen Hadoop MapReduce -ohjelma

Myyntitiedot tammikuu 2009

Varmista, että sinulla on Hadoop asennettuna. Ennen kuin aloitat varsinaisen prosessin, vaihda käyttäjäksi "hduser" (Hadoop-määrityksen aikana käytetty tunnus, voit vaihtaa Hadoop-ohjelmointimäärityksen aikana käytettyyn käyttäjätunnukseen).

su - hduser_

Ensimmäinen Hadoop MapReduce -ohjelma

Vaihe 1)

Luo uusi hakemisto nimellä MapReduceTutorial kuten alla olevassa MapReduce-esimerkissä näkyy

sudo mkdir MapReduceTutorial

Ensimmäinen Hadoop MapReduce -ohjelma

Anna käyttöoikeudet

sudo chmod -R 777 MapReduceTutorial

Ensimmäinen Hadoop MapReduce -ohjelma

SalesMapper.java

package SalesCountry;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;

public class SalesMapper extends MapReduceBase implements Mapper <LongWritable, Text, Text, IntWritable> {
	private final static IntWritable one = new IntWritable(1);

	public void map(LongWritable key, Text value, OutputCollector <Text, IntWritable> output, Reporter reporter) throws IOException {

		String valueString = value.toString();
		String[] SingleCountryData = valueString.split(",");
		output.collect(new Text(SingleCountryData[7]), one);
	}
}

SalesCountryReducer.java

package SalesCountry;

import java.io.IOException;
import java.util.*;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;

public class SalesCountryReducer extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {

	public void reduce(Text t_key, Iterator<IntWritable> values, OutputCollector<Text,IntWritable> output, Reporter reporter) throws IOException {
		Text key = t_key;
		int frequencyForCountry = 0;
		while (values.hasNext()) {
			// replace type of value with the actual type of our value
			IntWritable value = (IntWritable) values.next();
			frequencyForCountry += value.get();
			
		}
		output.collect(key, new IntWritable(frequencyForCountry));
	}
}

SalesCountryDriver.java

package SalesCountry;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;

public class SalesCountryDriver {
    public static void main(String[] args) {
        JobClient my_client = new JobClient();
        // Create a configuration object for the job
        JobConf job_conf = new JobConf(SalesCountryDriver.class);

        // Set a name of the Job
        job_conf.setJobName("SalePerCountry");

        // Specify data type of output key and value
        job_conf.setOutputKeyClass(Text.class);
        job_conf.setOutputValueClass(IntWritable.class);

        // Specify names of Mapper and Reducer Class
        job_conf.setMapperClass(SalesCountry.SalesMapper.class);
        job_conf.setReducerClass(SalesCountry.SalesCountryReducer.class);

        // Specify formats of the data type of Input and output
        job_conf.setInputFormat(TextInputFormat.class);
        job_conf.setOutputFormat(TextOutputFormat.class);

        // Set input and output directories using command line arguments, 
        //arg[0] = name of input directory on HDFS, and arg[1] =  name of output directory to be created to store the output file.

        FileInputFormat.setInputPaths(job_conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(job_conf, new Path(args[1]));

        my_client.setConf(job_conf);
        try {
            // Run the job 
            JobClient.runJob(job_conf);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

Lataa tiedostot tästä

Ensimmäinen Hadoop MapReduce -ohjelma

Tarkista kaikkien näiden tiedostojen tiedostooikeudet

Ensimmäinen Hadoop MapReduce -ohjelma

ja jos lukuoikeudet puuttuvat, myönnä samat

Ensimmäinen Hadoop MapReduce -ohjelma

Vaihe 2)

Vie luokkapolku alla olevan Hadoop-esimerkin mukaisesti

export CLASSPATH="$HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-client-core-2.2.0.jar:$HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-client-common-2.2.0.jar:$HADOOP_HOME/share/hadoop/common/hadoop-common-2.2.0.jar:~/MapReduceTutorial/SalesCountry/*:$HADOOP_HOME/lib/*"

Ensimmäinen Hadoop MapReduce -ohjelma

Vaihe 3)

Koota Java tiedostot (nämä tiedostot ovat hakemistossa Final-MapReduceHandsOn). Sen luokkatiedostot sijoitetaan pakettihakemistoon

javac -d . SalesMapper.java SalesCountryReducer.java SalesCountryDriver.java

Ensimmäinen Hadoop MapReduce -ohjelma

Tämä varoitus voidaan jättää huomiotta.

Tämä kokoelma luo hakemiston nykyiseen hakemistoon, jonka nimi on Java-lähdetiedostossa määritetyllä paketin nimellä (esim. Myyntimaa meidän tapauksessamme) ja laita kaikki käännetyt luokkatiedostot siihen.

Ensimmäinen Hadoop MapReduce -ohjelma

Vaihe 4)

Luo uusi tiedosto Manifest.txt

sudo gedit Manifest.txt

lisää siihen seuraavat rivit,

Main-Class: SalesCountry.SalesCountryDriver

Ensimmäinen Hadoop MapReduce -ohjelma

SalesCountry.SalesCountryDriver on pääluokan nimi. Huomaa, että sinun on painettava Enter-näppäintä tämän rivin lopussa.

Vaihe 5)

Luo Jar-tiedosto

jar cfm ProductSalePerCountry.jar Manifest.txt SalesCountry/*.class

Ensimmäinen Hadoop MapReduce -ohjelma

Tarkista, että jar-tiedosto on luotu

Ensimmäinen Hadoop MapReduce -ohjelma

Vaihe 6)

Käynnistä Hadoop

$HADOOP_HOME/sbin/start-dfs.sh
$HADOOP_HOME/sbin/start-yarn.sh

Vaihe 7)

Kopioi tiedosto MyyntiJan2009.csv tulee ~/inputMapReduce

Käytä nyt alla olevaa komentoa kopioidaksesi ~/inputMapReduce HDFS:ään.

$HADOOP_HOME/bin/hdfs dfs -copyFromLocal ~/inputMapReduce /

Ensimmäinen Hadoop MapReduce -ohjelma

Voimme jättää tämän varoituksen huomiotta.

Tarkista, onko tiedosto todella kopioitu vai ei.

$HADOOP_HOME/bin/hdfs dfs -ls /inputMapReduce

Ensimmäinen Hadoop MapReduce -ohjelma

Vaihe 8)

Suorita MapReduce-työ

$HADOOP_HOME/bin/hadoop jar ProductSalePerCountry.jar /inputMapReduce /mapreduce_output_sales

Ensimmäinen Hadoop MapReduce -ohjelma

Tämä luo tuloshakemiston nimeltä mapreduce_output_sales on HDFS. Tämän hakemiston sisältö on tiedosto, joka sisältää tuotteiden myynnin maittain.

Vaihe 9)

Tulos näkyy komentorajapinnan kautta kuten

$HADOOP_HOME/bin/hdfs dfs -cat /mapreduce_output_sales/part-00000

Ensimmäinen Hadoop MapReduce -ohjelma

Tulokset ovat nähtävissä myös verkkokäyttöliittymän kautta mm.

Avaa r verkkoselaimessa.

Ensimmäinen Hadoop MapReduce -ohjelma

Valitse nyt "Selaa tiedostojärjestelmää" ja navigoida /mapreduce_output_sales

Ensimmäinen Hadoop MapReduce -ohjelma

avoin osa-r-00000

Ensimmäinen Hadoop MapReduce -ohjelma

SalesMapper-luokan selitys

Tässä osiossa ymmärrämme täytäntöönpanon SalesMapper luokka.

1. Aloitamme määrittämällä paketin nimen luokallemme. Myyntimaa on pakettimme nimi. Huomaa, että kokoelman tulos, SalesMapper.class menee hakemistoon, joka on nimetty tällä paketin nimellä: Myyntimaa.

Tämän jälkeen tuomme kirjastopaketteja.

Alla tilannekuva näyttää toteutuksen SalesMapper luokka-

SalesMapper-luokan selitys

Esimerkkikoodin selitys:

1. SalesMapper-luokan määritelmä-

julkinen luokka SalesMapper laajentaa MapReduceBase toteuttaa Mapperin {

Jokaista kartoitusluokkaa on laajennettava alkaen MapReduceBase luokka ja se on toteutettava mapper käyttöliittymä.

2. Karttafunktion määrittäminen-

public void map(LongWritable key,
         Text value,
OutputCollector<Text, IntWritable> output,
Reporter reporter) throws IOException

Mapper-luokan pääosa on a 'kartta()' menetelmä, joka hyväksyy neljä argumenttia.

Jokaisessa puhelussa 'kartta()' menetelmä, a avainarvo pari ('avain' ja 'arvo' tässä koodissa) on hyväksytty.

'kartta()' menetelmä alkaa jakamalla syöttöteksti, joka vastaanotetaan argumenttina. Se käyttää merkkiä jakaa nämä rivit sanoiksi.

String valueString = value.toString();
String[] SingleCountryData = valueString.split(",");

Täällä '' käytetään erottimena.

Tämän jälkeen muodostetaan pari käyttämällä tietuetta taulukon 7. indeksissä "SingleCountryData" ja arvo '1'.

output.collect(new Text(SingleCountryData[7]), yksi);

Valitsemme ennätyksen 7. indeksistä, koska tarvitsemme Maa tiedot ja se sijaitsee taulukon 7. indeksissä "SingleCountryData".

Huomaa, että syöttötietomme ovat alla olevassa muodossa (jossa Maa on 7th indeksi, 0 aloitusindeksinä)-

Tapahtuman_päivä,Tuote,Hinta,Maksutyyppi,Nimi,Kaupunki,Osavaltio,Maa,Tili_Luotu,Viimeinen_kirjautuminen,Leveysaste,Pituusaste

Mapperin tulos on jälleen a avainarvo pari, joka tulostetaan käyttämällä 'kerätä()' menetelmä "OutputCollector".

SalesCountryReducer-luokan selitys

Tässä osiossa ymmärrämme täytäntöönpanon SalesCountryReducer luokka.

1. Aloitamme määrittämällä paketin nimen luokallemme. Myyntimaa on paketin nimi. Huomaa, että kokoelman tulos, SalesCountryReducer.class menee hakemistoon, joka on nimetty tällä paketin nimellä: Myyntimaa.

Tämän jälkeen tuomme kirjastopaketteja.

Alla tilannekuva näyttää toteutuksen SalesCountryReducer luokka-

SalesCountryReducer-luokan selitys

Koodin selitys:

1. SalesCountryReducer-luokan määritelmä-

julkisen luokan SalesCountryReducer laajentaa MapReduceBase toteuttaa Reducer {

Tässä kaksi ensimmäistä tietotyyppiä, 'Teksti' ja "IntWritable" ovat supistimeen syötetyn avainarvon tietotyyppi.

Mapperin tulos on muodossa , . Tästä kartoittimen ulostulosta tulee syöte supistimeen. Joten, jotta se vastaa sen tietotyyppiä, teksti ja IntWritable käytetään tässä tietotyyppinä.

Kaksi viimeistä tietotyyppiä, 'Teksti' ja 'IntWritable', ovat supistimen tuottamia datatyyppejä avain-arvo-parin muodossa.

Jokaista alennusluokkaa on laajennettava alkaen MapReduceBase luokka ja se on toteutettava Lyhentäjällä käyttöliittymä.

2. Vähennä-funktion määrittäminen

public void reduce( Text t_key,
             Iterator<IntWritable> values,                           
             OutputCollector<Text,IntWritable> output,
             Reporter reporter) throws IOException {

Syöte kohteeseen vähentää() menetelmä on avain, jossa on luettelo useista arvoista.

Esimerkiksi meidän tapauksessamme se on

, , , , , .

Tämä annetaan reduktorille as

Joten tämän muotoisten argumenttien hyväksymiseksi käytetään kahta ensimmäistä tietotyyppiä, nimittäin teksti ja Iteraattori. teksti on avaimen ja Iteraattori on tietotyyppi kyseisen avaimen arvoluettelolle.

Seuraava argumentti on tyyppinen OutputCollector joka kerää vähennysvaiheen lähdön.

vähentää() menetelmä alkaa kopioimalla avaimen arvo ja alustamalla taajuuden laskenta nollaan.

Tekstiavain = t_avain;
int taajuusMaalle = 0;

Sitten käyttämällä 'sillä aikaa' silmukka, toistamme avaimeen liittyvien arvojen luetteloa ja laskemme lopullisen taajuuden summaamalla kaikki arvot.

 while (values.hasNext()) {
            // replace type of value with the actual type of our value
            IntWritable value = (IntWritable) values.next();
            frequencyForCountry += value.get();
            
        }

Nyt työnnämme tuloksen lähtökerääjälle muodossa avain ja saatu taajuuslaskenta.

Alla oleva koodi tekee tämän -

output.collect(key, new IntWritable(frequencyForCountry));

SalesCountryDriver-luokan selitys

Tässä osiossa ymmärrämme täytäntöönpanon SalesCountryDriver luokka

1. Aloitamme määrittämällä paketin nimen luokallemme. Myyntimaa on paketin nimi. Huomaa, että kokoelman tulos, SalesCountryDriver.class menee hakemistoon, joka on nimetty tämän paketin nimellä: Myyntimaa.

Tässä on rivi, joka määrittää paketin nimen ja sen jälkeen koodin kirjastopakettien tuontia varten.

SalesCountryDriver-luokan selitys

2. Määritä ajuriluokka, joka luo uuden asiakastyön, konfigurointiobjektin ja mainostaa Mapper- ja Reducer-luokkia.

Kuljettajaluokka on vastuussa MapReduce-työmme asettamisesta käynnistymään Hadoop. Tässä luokassa määritämme työn nimi, syötteen/tulosteen tietotyyppi sekä kartoitus- ja vähennysluokkien nimet.

SalesCountryDriver-luokan selitys

3. Alla olevassa koodinpätkässä asetamme syöttö- ja lähtöhakemistot, joita käytetään syötetietojoukon kuluttamiseen ja tulosteen tuottamiseen, vastaavasti.

arg[0] ja arg[1] ovat komentorivin argumentit, jotka välitetään MapReducen käytännönläheisessä komennossa, eli

$HADOOP_HOME/bin/hadoop jar ProductSalePerCountry.jar /inputMapReduce /mapreduce_output_sales

SalesCountryDriver-luokan selitys

4. Käynnistä työmme

Koodin alapuolella alkaa MapReduce-työn suorittaminen-

try {
    // Run the job 
    JobClient.runJob(job_conf);
} catch (Exception e) {
    e.printStackTrace();
}