Παραδείγματα Hadoop & Mapreduce: Δημιουργία πρώτου προγράμματος στο Java

Σε αυτό το σεμινάριο, θα μάθετε να χρησιμοποιείτε το Hadoop με Παραδείγματα MapReduce. Τα δεδομένα εισόδου που χρησιμοποιούνται είναι SalesJan2009.csv. Περιέχει πληροφορίες σχετικές με τις πωλήσεις, όπως όνομα προϊόντος, τιμή, τρόπο πληρωμής, πόλη, χώρα πελάτη κ.λπ. Ο στόχος είναι να Μάθετε τον αριθμό των προϊόντων που πωλήθηκαν σε κάθε χώρα.

Πρώτο πρόγραμμα Hadoop MapReduce

Τώρα σε αυτό Οδηγός MapReduce, θα δημιουργήσουμε το πρώτο μας Java Πρόγραμμα MapReduce:

Πρώτο πρόγραμμα Hadoop MapReduce

Στοιχεία SalesJan2009

Βεβαιωθείτε ότι έχετε εγκαταστήσει το Hadoop. Πριν ξεκινήσετε με την πραγματική διαδικασία, αλλάξτε χρήστη σε 'hduser' (το αναγνωριστικό χρησιμοποιείται κατά τη διαμόρφωση του Hadoop, μπορείτε να μεταβείτε στο userid που χρησιμοποιείται κατά τη διαμόρφωση προγραμματισμού Hadoop ).

su - hduser_

Πρώτο πρόγραμμα Hadoop MapReduce

Βήμα 1)

Δημιουργήστε έναν νέο κατάλογο με όνομα MapReduceTutorial όπως φαίνεται στο παρακάτω παράδειγμα MapReduce

sudo mkdir MapReduceTutorial

Πρώτο πρόγραμμα Hadoop MapReduce

Δώστε δικαιώματα

sudo chmod -R 777 MapReduceTutorial

Πρώτο πρόγραμμα Hadoop MapReduce

SalesMapper.java

package SalesCountry;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;

public class SalesMapper extends MapReduceBase implements Mapper <LongWritable, Text, Text, IntWritable> {
	private final static IntWritable one = new IntWritable(1);

	public void map(LongWritable key, Text value, OutputCollector <Text, IntWritable> output, Reporter reporter) throws IOException {

		String valueString = value.toString();
		String[] SingleCountryData = valueString.split(",");
		output.collect(new Text(SingleCountryData[7]), one);
	}
}

SalesCountryReducer.java

package SalesCountry;

import java.io.IOException;
import java.util.*;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;

public class SalesCountryReducer extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {

	public void reduce(Text t_key, Iterator<IntWritable> values, OutputCollector<Text,IntWritable> output, Reporter reporter) throws IOException {
		Text key = t_key;
		int frequencyForCountry = 0;
		while (values.hasNext()) {
			// replace type of value with the actual type of our value
			IntWritable value = (IntWritable) values.next();
			frequencyForCountry += value.get();
			
		}
		output.collect(key, new IntWritable(frequencyForCountry));
	}
}

SalesCountryDriver.java

package SalesCountry;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;

public class SalesCountryDriver {
    public static void main(String[] args) {
        JobClient my_client = new JobClient();
        // Create a configuration object for the job
        JobConf job_conf = new JobConf(SalesCountryDriver.class);

        // Set a name of the Job
        job_conf.setJobName("SalePerCountry");

        // Specify data type of output key and value
        job_conf.setOutputKeyClass(Text.class);
        job_conf.setOutputValueClass(IntWritable.class);

        // Specify names of Mapper and Reducer Class
        job_conf.setMapperClass(SalesCountry.SalesMapper.class);
        job_conf.setReducerClass(SalesCountry.SalesCountryReducer.class);

        // Specify formats of the data type of Input and output
        job_conf.setInputFormat(TextInputFormat.class);
        job_conf.setOutputFormat(TextOutputFormat.class);

        // Set input and output directories using command line arguments, 
        //arg[0] = name of input directory on HDFS, and arg[1] =  name of output directory to be created to store the output file.

        FileInputFormat.setInputPaths(job_conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(job_conf, new Path(args[1]));

        my_client.setConf(job_conf);
        try {
            // Run the job 
            JobClient.runJob(job_conf);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

Κατεβάστε τα Αρχεία εδώ

Πρώτο πρόγραμμα Hadoop MapReduce

Ελέγξτε τα δικαιώματα αρχείων όλων αυτών των αρχείων

Πρώτο πρόγραμμα Hadoop MapReduce

και αν λείπουν τα δικαιώματα «ανάγνωσης», χορηγήστε τα ίδια-

Πρώτο πρόγραμμα Hadoop MapReduce

Βήμα 2)

Εξαγωγή διαδρομής κλάσης όπως φαίνεται στο παρακάτω παράδειγμα Hadoop

export CLASSPATH="$HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-client-core-2.2.0.jar:$HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-client-common-2.2.0.jar:$HADOOP_HOME/share/hadoop/common/hadoop-common-2.2.0.jar:~/MapReduceTutorial/SalesCountry/*:$HADOOP_HOME/lib/*"

Πρώτο πρόγραμμα Hadoop MapReduce

Βήμα 3)

Μεταγλωττίστε Java αρχεία (αυτά τα αρχεία υπάρχουν στον κατάλογο Final-MapReduceHandsOn). Τα αρχεία κλάσης του θα τεθούν στον κατάλογο του πακέτου

javac -d . SalesMapper.java SalesCountryReducer.java SalesCountryDriver.java

Πρώτο πρόγραμμα Hadoop MapReduce

Αυτή η προειδοποίηση μπορεί να αγνοηθεί με ασφάλεια.

Αυτή η μεταγλώττιση θα δημιουργήσει έναν κατάλογο σε έναν τρέχοντα κατάλογο με το όνομα του πακέτου που καθορίζεται στο αρχείο προέλευσης java (π.χ. Χώρα πωλήσεων στην περίπτωσή μας) και βάλτε όλα τα μεταγλωττισμένα αρχεία κλάσης σε αυτό.

Πρώτο πρόγραμμα Hadoop MapReduce

Βήμα 4)

Δημιουργήστε ένα νέο αρχείο Manifest.txt

sudo gedit Manifest.txt

προσθέστε τις ακόλουθες γραμμές σε αυτό,

Main-Class: SalesCountry.SalesCountryDriver

Πρώτο πρόγραμμα Hadoop MapReduce

SalesCountry.SalesCountryDriver είναι το όνομα της κύριας τάξης. Σημειώστε ότι πρέπει να πατήσετε το πλήκτρο enter στο τέλος αυτής της γραμμής.

Βήμα 5)

Δημιουργήστε ένα αρχείο Jar

jar cfm ProductSalePerCountry.jar Manifest.txt SalesCountry/*.class

Πρώτο πρόγραμμα Hadoop MapReduce

Ελέγξτε ότι το αρχείο jar έχει δημιουργηθεί

Πρώτο πρόγραμμα Hadoop MapReduce

Βήμα 6)

Ξεκινήστε το Hadoop

$HADOOP_HOME/sbin/start-dfs.sh
$HADOOP_HOME/sbin/start-yarn.sh

Βήμα 7)

Αντιγράψτε το Αρχείο SalesJan2009.csv σε ~/inputMapReduce

Τώρα χρησιμοποιήστε την παρακάτω εντολή για αντιγραφή ~/inputMapReduce στο HDFS.

$HADOOP_HOME/bin/hdfs dfs -copyFromLocal ~/inputMapReduce /

Πρώτο πρόγραμμα Hadoop MapReduce

Μπορούμε με ασφάλεια να αγνοήσουμε αυτήν την προειδοποίηση.

Επαληθεύστε εάν ένα αρχείο έχει αντιγραφεί πραγματικά ή όχι.

$HADOOP_HOME/bin/hdfs dfs -ls /inputMapReduce

Πρώτο πρόγραμμα Hadoop MapReduce

Βήμα 8)

Εκτελέστε την εργασία MapReduce

$HADOOP_HOME/bin/hadoop jar ProductSalePerCountry.jar /inputMapReduce /mapreduce_output_sales

Πρώτο πρόγραμμα Hadoop MapReduce

Αυτό θα δημιουργήσει έναν κατάλογο εξόδου με το όνομα mapreduce_output_sales on ΚΑΕ. Τα περιεχόμενα αυτού του καταλόγου θα είναι ένα αρχείο που περιέχει πωλήσεις προϊόντων ανά χώρα.

Βήμα 9)

Το αποτέλεσμα μπορεί να φανεί μέσω της διεπαφής εντολών ως:

$HADOOP_HOME/bin/hdfs dfs -cat /mapreduce_output_sales/part-00000

Πρώτο πρόγραμμα Hadoop MapReduce

Τα αποτελέσματα μπορούν επίσης να προβληθούν μέσω μιας διεπαφής ιστού ως-

Ανοίξτε το r σε ένα πρόγραμμα περιήγησης ιστού.

Πρώτο πρόγραμμα Hadoop MapReduce

Τώρα επιλέξτε 'Περιήγηση στο σύστημα αρχείων' και περιηγηθείτε στο /mapreduce_output_sales

Πρώτο πρόγραμμα Hadoop MapReduce

Ανοικτό part-r-00000

Πρώτο πρόγραμμα Hadoop MapReduce

Επεξήγηση της κλάσης SalesMapper

Σε αυτή την ενότητα, θα κατανοήσουμε την εφαρμογή του SalesMapper τάξη.

1. Ξεκινάμε καθορίζοντας ένα όνομα πακέτου για την τάξη μας. Χώρα πωλήσεων είναι ένα όνομα του πακέτου μας. Σημειώστε ότι το αποτέλεσμα της μεταγλώττισης, SalesMapper.class θα μεταβεί σε έναν κατάλογο που ονομάζεται με αυτό το όνομα πακέτου: Χώρα πωλήσεων.

Μετά από αυτό, εισάγουμε πακέτα βιβλιοθήκης.

Το παρακάτω στιγμιότυπο δείχνει μια υλοποίηση του SalesMapper τάξη-

Επεξήγηση κλάσης SalesMapper

Δείγμα επεξήγησης κώδικα:

1. Ορισμός κλάσης SalesMapper-

δημόσια κλάση SalesMapper επεκτείνει το MapReduceBase υλοποιεί το Mapper {

Κάθε κλάση χαρτογράφησης πρέπει να επεκταθεί από MapReduceBase τάξη και πρέπει να υλοποιήσει Χαρτογράφος Διεπαφή.

2. Καθορισμός συνάρτησης «χάρτης»-

public void map(LongWritable key,
         Text value,
OutputCollector<Text, IntWritable> output,
Reporter reporter) throws IOException

Το κύριο μέρος της κλάσης Mapper είναι α 'χάρτης()' μέθοδο που δέχεται τέσσερα ορίσματα.

Σε κάθε κλήση προς 'χάρτης()' μέθοδος, α κλειδί-τιμή ζευγάρι ('κλειδί' και 'αξία' σε αυτόν τον κωδικό) περνάει.

'χάρτης()' Η μέθοδος ξεκινά με διαίρεση του κειμένου εισόδου που λαμβάνεται ως όρισμα. Χρησιμοποιεί το tokenizer για να χωρίσει αυτές τις γραμμές σε λέξεις.

String valueString = value.toString();
String[] SingleCountryData = valueString.split(",");

Εδώ, ',' χρησιμοποιείται ως οριοθέτης.

Μετά από αυτό, σχηματίζεται ένα ζεύγος χρησιμοποιώντας μια εγγραφή στον 7ο δείκτη του πίνακα 'SingleCountryData' και μια αξία «1».

output.collect(new Text(SingleCountryData[7]), one);

Επιλέγουμε ρεκόρ στον 7ο δείκτη γιατί χρειαζόμαστε Χώρα δεδομένα και βρίσκεται στο 7ο ευρετήριο στη σειρά 'SingleCountryData'.

Λάβετε υπόψη ότι τα δεδομένα εισόδου μας είναι στην παρακάτω μορφή (όπου Χώρα είναι στο 7th δείκτης, με 0 ως αρχικό δείκτη)-

Ημερομηνία_συναλλαγής,Προϊόν,Τιμή,Τύπος_Πληρωμής,Όνομα,Πόλη,Πολιτεία,Χώρα,Account_Created,Last_Login,Latitude,Longitude

Μια έξοδος του χαρτογράφου είναι και πάλι α κλειδί-τιμή ζεύγος το οποίο εξάγεται χρησιμοποιώντας 'συλλέγω()' μέθοδος για 'OutputCollector'.

Επεξήγηση της κατηγορίας SalesCountryReducer

Σε αυτή την ενότητα, θα κατανοήσουμε την εφαρμογή του SalesCountryReducer τάξη.

1. Ξεκινάμε καθορίζοντας ένα όνομα του πακέτου για την τάξη μας. Χώρα πωλήσεων είναι ένα όνομα του πακέτου εκτός. Σημειώστε ότι το αποτέλεσμα της μεταγλώττισης, SalesCountryReducer.class θα μεταβεί σε έναν κατάλογο που ονομάζεται με αυτό το όνομα πακέτου: Χώρα πωλήσεων.

Μετά από αυτό, εισάγουμε πακέτα βιβλιοθήκης.

Το παρακάτω στιγμιότυπο δείχνει μια υλοποίηση του SalesCountryReducer τάξη-

Επεξήγηση κατηγορίας SalesCountryReducer

Επεξήγηση κώδικα:

1. Ορισμός κλάσης SalesCountryReducer-

δημόσια κλάση SalesCountryReducer επεκτείνει το MapReduceBase υλοποιεί Reducer {

Εδώ, οι δύο πρώτοι τύποι δεδομένων, 'Κείμενο' και 'IntWritable' είναι τύπος δεδομένων κλειδιού-τιμής εισόδου στον μειωτήρα.

Η έξοδος του χαρτογράφου έχει τη μορφή , . Αυτή η έξοδος του χαρτογράφου γίνεται είσοδος στον μειωτήρα. Έτσι, για να ευθυγραμμιστεί με τον τύπο δεδομένων του, Κείμενο και IntWritable χρησιμοποιούνται ως τύπος δεδομένων εδώ.

Οι δύο τελευταίοι τύποι δεδομένων, «Κείμενο» και «IntWritable» είναι τύποι δεδομένων εξόδου που παράγονται από μειωτήρα με τη μορφή ζεύγους κλειδιού-τιμής.

Κάθε κατηγορία μειωτή πρέπει να επεκταθεί από MapReduceBase τάξη και πρέπει να υλοποιήσει Reducer Διεπαφή.

2. Ορισμός της συνάρτησης 'μείωσης'-

public void reduce( Text t_key,
             Iterator<IntWritable> values,                           
             OutputCollector<Text,IntWritable> output,
             Reporter reporter) throws IOException {

Μια εισαγωγή στο περιορίζω() μέθοδος είναι ένα κλειδί με μια λίστα πολλαπλών τιμών.

Για παράδειγμα, στην περίπτωσή μας, θα είναι-

, , , , , .

Αυτό δίνεται στον μειωτήρα ως

Έτσι, για να δεχθούμε ορίσματα αυτής της φόρμας, χρησιμοποιούνται πρώτα δύο τύποι δεδομένων, δηλαδή, Κείμενο και Iterator. Κείμενο είναι ένας τύπος κλειδιού δεδομένων και Iterator είναι ένας τύπος δεδομένων για λίστα τιμών για αυτό το κλειδί.

Το επόμενο όρισμα είναι τύπου OutputCollector που συλλέγει την έξοδο της φάσης μειωτήρα.

περιορίζω() Η μέθοδος ξεκινά αντιγράφοντας την τιμή του κλειδιού και αρχικοποιώντας τον αριθμό συχνοτήτων στο 0.

Κλειδί κειμένου = t_key;
int συχνότηταForCountry = 0;

Στη συνέχεια, χρησιμοποιώντας το 'ενώ' βρόχο, επαναλαμβάνουμε τη λίστα των τιμών που σχετίζονται με το κλειδί και υπολογίζουμε την τελική συχνότητα αθροίζοντας όλες τις τιμές.

 while (values.hasNext()) {
            // replace type of value with the actual type of our value
            IntWritable value = (IntWritable) values.next();
            frequencyForCountry += value.get();
            
        }

Τώρα, σπρώχνουμε το αποτέλεσμα στον συλλέκτη εξόδου με τη μορφή κλειδί και αποκτήθηκε μέτρηση συχνότητας.

Ο παρακάτω κώδικας κάνει αυτό-

output.collect(key, new IntWritable(frequencyForCountry));

Επεξήγηση της κατηγορίας SalesCountryDriver

Σε αυτή την ενότητα, θα κατανοήσουμε την εφαρμογή του SalesCountryDriver τάξη

1. Ξεκινάμε καθορίζοντας ένα όνομα πακέτου για την τάξη μας. Χώρα πωλήσεων είναι ένα όνομα του πακέτου εκτός. Σημειώστε ότι το αποτέλεσμα της μεταγλώττισης, SalesCountryDriver.class θα μεταβεί στον κατάλογο με το όνομα αυτού του πακέτου: Χώρα πωλήσεων.

Εδώ είναι μια γραμμή που καθορίζει το όνομα του πακέτου ακολουθούμενο από κώδικα για την εισαγωγή πακέτων βιβλιοθήκης.

Επεξήγηση κατηγορίας SalesCountryDriver

2. Ορίστε μια κλάση προγραμμάτων οδήγησης που θα δημιουργήσει μια νέα εργασία πελάτη, ένα αντικείμενο διαμόρφωσης και θα διαφημίσει τις κατηγορίες Mapper και Reducer.

Η κατηγορία προγραμμάτων οδήγησης είναι υπεύθυνη για τη ρύθμιση της εργασίας MapReduce για εκτέλεση Hadoop. Σε αυτή την κλάση, καθορίζουμε όνομα εργασίας, τύπος δεδομένων εισόδου/εξόδου και ονόματα κατηγοριών αντιστοίχισης και μειωτήρα.

Επεξήγηση κατηγορίας SalesCountryDriver

3. Στο παρακάτω απόσπασμα κώδικα, ορίζουμε καταλόγους εισόδου και εξόδου που χρησιμοποιούνται για την κατανάλωση δεδομένων εισόδου και την παραγωγή εξόδου, αντίστοιχα.

arg[0] και arg[1] είναι τα ορίσματα της γραμμής εντολών που μεταβιβάζονται με μια εντολή που δίνεται στο MapReduce hands-on, π.χ.

$HADOOP_HOME/bin/hadoop jar ProductSalePerCountry.jar /inputMapReduce /mapreduce_output_sales

Επεξήγηση κατηγορίας SalesCountryDriver

4. Ενεργοποιήστε τη δουλειά μας

Κάτω από τον κώδικα έναρξη εκτέλεσης της εργασίας MapReduce-

try {
    // Run the job 
    JobClient.runJob(job_conf);
} catch (Exception e) {
    e.printStackTrace();
}