Παραδείγματα Hadoop & Mapreduce: Δημιουργία πρώτου προγράμματος στο Java
Σε αυτό το σεμινάριο, θα μάθετε να χρησιμοποιείτε το Hadoop με Παραδείγματα MapReduce. Τα δεδομένα εισόδου που χρησιμοποιούνται είναι SalesJan2009.csv. Περιέχει πληροφορίες σχετικές με τις πωλήσεις, όπως όνομα προϊόντος, τιμή, τρόπο πληρωμής, πόλη, χώρα πελάτη κ.λπ. Ο στόχος είναι να Μάθετε τον αριθμό των προϊόντων που πωλήθηκαν σε κάθε χώρα.
Πρώτο πρόγραμμα Hadoop MapReduce
Τώρα σε αυτό Οδηγός MapReduce, θα δημιουργήσουμε το πρώτο μας Java Πρόγραμμα MapReduce:
Βεβαιωθείτε ότι έχετε εγκαταστήσει το Hadoop. Πριν ξεκινήσετε με την πραγματική διαδικασία, αλλάξτε χρήστη σε 'hduser' (το αναγνωριστικό χρησιμοποιείται κατά τη διαμόρφωση του Hadoop, μπορείτε να μεταβείτε στο userid που χρησιμοποιείται κατά τη διαμόρφωση προγραμματισμού Hadoop ).
su - hduser_
Βήμα 1)
Δημιουργήστε έναν νέο κατάλογο με όνομα MapReduceTutorial όπως φαίνεται στο παρακάτω παράδειγμα MapReduce
sudo mkdir MapReduceTutorial
Δώστε δικαιώματα
sudo chmod -R 777 MapReduceTutorial
SalesMapper.java
package SalesCountry; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.*; public class SalesMapper extends MapReduceBase implements Mapper <LongWritable, Text, Text, IntWritable> { private final static IntWritable one = new IntWritable(1); public void map(LongWritable key, Text value, OutputCollector <Text, IntWritable> output, Reporter reporter) throws IOException { String valueString = value.toString(); String[] SingleCountryData = valueString.split(","); output.collect(new Text(SingleCountryData[7]), one); } }
SalesCountryReducer.java
package SalesCountry; import java.io.IOException; import java.util.*; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.*; public class SalesCountryReducer extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> { public void reduce(Text t_key, Iterator<IntWritable> values, OutputCollector<Text,IntWritable> output, Reporter reporter) throws IOException { Text key = t_key; int frequencyForCountry = 0; while (values.hasNext()) { // replace type of value with the actual type of our value IntWritable value = (IntWritable) values.next(); frequencyForCountry += value.get(); } output.collect(key, new IntWritable(frequencyForCountry)); } }
SalesCountryDriver.java
package SalesCountry; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.*; import org.apache.hadoop.mapred.*; public class SalesCountryDriver { public static void main(String[] args) { JobClient my_client = new JobClient(); // Create a configuration object for the job JobConf job_conf = new JobConf(SalesCountryDriver.class); // Set a name of the Job job_conf.setJobName("SalePerCountry"); // Specify data type of output key and value job_conf.setOutputKeyClass(Text.class); job_conf.setOutputValueClass(IntWritable.class); // Specify names of Mapper and Reducer Class job_conf.setMapperClass(SalesCountry.SalesMapper.class); job_conf.setReducerClass(SalesCountry.SalesCountryReducer.class); // Specify formats of the data type of Input and output job_conf.setInputFormat(TextInputFormat.class); job_conf.setOutputFormat(TextOutputFormat.class); // Set input and output directories using command line arguments, //arg[0] = name of input directory on HDFS, and arg[1] = name of output directory to be created to store the output file. FileInputFormat.setInputPaths(job_conf, new Path(args[0])); FileOutputFormat.setOutputPath(job_conf, new Path(args[1])); my_client.setConf(job_conf); try { // Run the job JobClient.runJob(job_conf); } catch (Exception e) { e.printStackTrace(); } } }
Ελέγξτε τα δικαιώματα αρχείων όλων αυτών των αρχείων
και αν λείπουν τα δικαιώματα «ανάγνωσης», χορηγήστε τα ίδια-
Βήμα 2)
Εξαγωγή διαδρομής κλάσης όπως φαίνεται στο παρακάτω παράδειγμα Hadoop
export CLASSPATH="$HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-client-core-2.2.0.jar:$HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-client-common-2.2.0.jar:$HADOOP_HOME/share/hadoop/common/hadoop-common-2.2.0.jar:~/MapReduceTutorial/SalesCountry/*:$HADOOP_HOME/lib/*"
Βήμα 3)
Μεταγλωττίστε Java αρχεία (αυτά τα αρχεία υπάρχουν στον κατάλογο Final-MapReduceHandsOn). Τα αρχεία κλάσης του θα τεθούν στον κατάλογο του πακέτου
javac -d . SalesMapper.java SalesCountryReducer.java SalesCountryDriver.java
Αυτή η προειδοποίηση μπορεί να αγνοηθεί με ασφάλεια.
Αυτή η μεταγλώττιση θα δημιουργήσει έναν κατάλογο σε έναν τρέχοντα κατάλογο με το όνομα του πακέτου που καθορίζεται στο αρχείο προέλευσης java (π.χ. Χώρα πωλήσεων στην περίπτωσή μας) και βάλτε όλα τα μεταγλωττισμένα αρχεία κλάσης σε αυτό.
Βήμα 4)
Δημιουργήστε ένα νέο αρχείο Manifest.txt
sudo gedit Manifest.txt
προσθέστε τις ακόλουθες γραμμές σε αυτό,
Main-Class: SalesCountry.SalesCountryDriver
SalesCountry.SalesCountryDriver είναι το όνομα της κύριας τάξης. Σημειώστε ότι πρέπει να πατήσετε το πλήκτρο enter στο τέλος αυτής της γραμμής.
Βήμα 5)
Δημιουργήστε ένα αρχείο Jar
jar cfm ProductSalePerCountry.jar Manifest.txt SalesCountry/*.class
Ελέγξτε ότι το αρχείο jar έχει δημιουργηθεί
Βήμα 6)
Ξεκινήστε το Hadoop
$HADOOP_HOME/sbin/start-dfs.sh
$HADOOP_HOME/sbin/start-yarn.sh
Βήμα 7)
Αντιγράψτε το Αρχείο SalesJan2009.csv σε ~/inputMapReduce
Τώρα χρησιμοποιήστε την παρακάτω εντολή για αντιγραφή ~/inputMapReduce στο HDFS.
$HADOOP_HOME/bin/hdfs dfs -copyFromLocal ~/inputMapReduce /
Μπορούμε με ασφάλεια να αγνοήσουμε αυτήν την προειδοποίηση.
Επαληθεύστε εάν ένα αρχείο έχει αντιγραφεί πραγματικά ή όχι.
$HADOOP_HOME/bin/hdfs dfs -ls /inputMapReduce
Βήμα 8)
Εκτελέστε την εργασία MapReduce
$HADOOP_HOME/bin/hadoop jar ProductSalePerCountry.jar /inputMapReduce /mapreduce_output_sales
Αυτό θα δημιουργήσει έναν κατάλογο εξόδου με το όνομα mapreduce_output_sales on ΚΑΕ. Τα περιεχόμενα αυτού του καταλόγου θα είναι ένα αρχείο που περιέχει πωλήσεις προϊόντων ανά χώρα.
Βήμα 9)
Το αποτέλεσμα μπορεί να φανεί μέσω της διεπαφής εντολών ως:
$HADOOP_HOME/bin/hdfs dfs -cat /mapreduce_output_sales/part-00000
Τα αποτελέσματα μπορούν επίσης να προβληθούν μέσω μιας διεπαφής ιστού ως-
Ανοίξτε το r σε ένα πρόγραμμα περιήγησης ιστού.
Τώρα επιλέξτε 'Περιήγηση στο σύστημα αρχείων' και περιηγηθείτε στο /mapreduce_output_sales
Ανοικτό part-r-00000
Επεξήγηση της κλάσης SalesMapper
Σε αυτή την ενότητα, θα κατανοήσουμε την εφαρμογή του SalesMapper τάξη.
1. Ξεκινάμε καθορίζοντας ένα όνομα πακέτου για την τάξη μας. Χώρα πωλήσεων είναι ένα όνομα του πακέτου μας. Σημειώστε ότι το αποτέλεσμα της μεταγλώττισης, SalesMapper.class θα μεταβεί σε έναν κατάλογο που ονομάζεται με αυτό το όνομα πακέτου: Χώρα πωλήσεων.
Μετά από αυτό, εισάγουμε πακέτα βιβλιοθήκης.
Το παρακάτω στιγμιότυπο δείχνει μια υλοποίηση του SalesMapper τάξη-
Δείγμα επεξήγησης κώδικα:
1. Ορισμός κλάσης SalesMapper-
δημόσια κλάση SalesMapper επεκτείνει το MapReduceBase υλοποιεί το Mapper {
Κάθε κλάση χαρτογράφησης πρέπει να επεκταθεί από MapReduceBase τάξη και πρέπει να υλοποιήσει Χαρτογράφος Διεπαφή.
2. Καθορισμός συνάρτησης «χάρτης»-
public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException
Το κύριο μέρος της κλάσης Mapper είναι α 'χάρτης()' μέθοδο που δέχεται τέσσερα ορίσματα.
Σε κάθε κλήση προς 'χάρτης()' μέθοδος, α κλειδί-τιμή ζευγάρι ('κλειδί' και 'αξία' σε αυτόν τον κωδικό) περνάει.
'χάρτης()' Η μέθοδος ξεκινά με διαίρεση του κειμένου εισόδου που λαμβάνεται ως όρισμα. Χρησιμοποιεί το tokenizer για να χωρίσει αυτές τις γραμμές σε λέξεις.
String valueString = value.toString(); String[] SingleCountryData = valueString.split(",");
Εδώ, ',' χρησιμοποιείται ως οριοθέτης.
Μετά από αυτό, σχηματίζεται ένα ζεύγος χρησιμοποιώντας μια εγγραφή στον 7ο δείκτη του πίνακα 'SingleCountryData' και μια αξία «1».
output.collect(new Text(SingleCountryData[7]), one);
Επιλέγουμε ρεκόρ στον 7ο δείκτη γιατί χρειαζόμαστε Χώρα δεδομένα και βρίσκεται στο 7ο ευρετήριο στη σειρά 'SingleCountryData'.
Λάβετε υπόψη ότι τα δεδομένα εισόδου μας είναι στην παρακάτω μορφή (όπου Χώρα είναι στο 7th δείκτης, με 0 ως αρχικό δείκτη)-
Ημερομηνία_συναλλαγής,Προϊόν,Τιμή,Τύπος_Πληρωμής,Όνομα,Πόλη,Πολιτεία,Χώρα,Account_Created,Last_Login,Latitude,Longitude
Μια έξοδος του χαρτογράφου είναι και πάλι α κλειδί-τιμή ζεύγος το οποίο εξάγεται χρησιμοποιώντας 'συλλέγω()' μέθοδος για 'OutputCollector'.
Επεξήγηση της κατηγορίας SalesCountryReducer
Σε αυτή την ενότητα, θα κατανοήσουμε την εφαρμογή του SalesCountryReducer τάξη.
1. Ξεκινάμε καθορίζοντας ένα όνομα του πακέτου για την τάξη μας. Χώρα πωλήσεων είναι ένα όνομα του πακέτου εκτός. Σημειώστε ότι το αποτέλεσμα της μεταγλώττισης, SalesCountryReducer.class θα μεταβεί σε έναν κατάλογο που ονομάζεται με αυτό το όνομα πακέτου: Χώρα πωλήσεων.
Μετά από αυτό, εισάγουμε πακέτα βιβλιοθήκης.
Το παρακάτω στιγμιότυπο δείχνει μια υλοποίηση του SalesCountryReducer τάξη-
Επεξήγηση κώδικα:
1. Ορισμός κλάσης SalesCountryReducer-
δημόσια κλάση SalesCountryReducer επεκτείνει το MapReduceBase υλοποιεί Reducer {
Εδώ, οι δύο πρώτοι τύποι δεδομένων, 'Κείμενο' και 'IntWritable' είναι τύπος δεδομένων κλειδιού-τιμής εισόδου στον μειωτήρα.
Η έξοδος του χαρτογράφου έχει τη μορφή , . Αυτή η έξοδος του χαρτογράφου γίνεται είσοδος στον μειωτήρα. Έτσι, για να ευθυγραμμιστεί με τον τύπο δεδομένων του, Κείμενο και IntWritable χρησιμοποιούνται ως τύπος δεδομένων εδώ.
Οι δύο τελευταίοι τύποι δεδομένων, «Κείμενο» και «IntWritable» είναι τύποι δεδομένων εξόδου που παράγονται από μειωτήρα με τη μορφή ζεύγους κλειδιού-τιμής.
Κάθε κατηγορία μειωτή πρέπει να επεκταθεί από MapReduceBase τάξη και πρέπει να υλοποιήσει Reducer Διεπαφή.
2. Ορισμός της συνάρτησης 'μείωσης'-
public void reduce( Text t_key, Iterator<IntWritable> values, OutputCollector<Text,IntWritable> output, Reporter reporter) throws IOException {
Μια εισαγωγή στο περιορίζω() μέθοδος είναι ένα κλειδί με μια λίστα πολλαπλών τιμών.
Για παράδειγμα, στην περίπτωσή μας, θα είναι-
, , , , , .
Αυτό δίνεται στον μειωτήρα ως
Έτσι, για να δεχθούμε ορίσματα αυτής της φόρμας, χρησιμοποιούνται πρώτα δύο τύποι δεδομένων, δηλαδή, Κείμενο και Iterator. Κείμενο είναι ένας τύπος κλειδιού δεδομένων και Iterator είναι ένας τύπος δεδομένων για λίστα τιμών για αυτό το κλειδί.
Το επόμενο όρισμα είναι τύπου OutputCollector που συλλέγει την έξοδο της φάσης μειωτήρα.
περιορίζω() Η μέθοδος ξεκινά αντιγράφοντας την τιμή του κλειδιού και αρχικοποιώντας τον αριθμό συχνοτήτων στο 0.
Κλειδί κειμένου = t_key;
int συχνότηταForCountry = 0;
Στη συνέχεια, χρησιμοποιώντας το 'ενώ' βρόχο, επαναλαμβάνουμε τη λίστα των τιμών που σχετίζονται με το κλειδί και υπολογίζουμε την τελική συχνότητα αθροίζοντας όλες τις τιμές.
while (values.hasNext()) { // replace type of value with the actual type of our value IntWritable value = (IntWritable) values.next(); frequencyForCountry += value.get(); }
Τώρα, σπρώχνουμε το αποτέλεσμα στον συλλέκτη εξόδου με τη μορφή κλειδί και αποκτήθηκε μέτρηση συχνότητας.
Ο παρακάτω κώδικας κάνει αυτό-
output.collect(key, new IntWritable(frequencyForCountry));
Επεξήγηση της κατηγορίας SalesCountryDriver
Σε αυτή την ενότητα, θα κατανοήσουμε την εφαρμογή του SalesCountryDriver τάξη
1. Ξεκινάμε καθορίζοντας ένα όνομα πακέτου για την τάξη μας. Χώρα πωλήσεων είναι ένα όνομα του πακέτου εκτός. Σημειώστε ότι το αποτέλεσμα της μεταγλώττισης, SalesCountryDriver.class θα μεταβεί στον κατάλογο με το όνομα αυτού του πακέτου: Χώρα πωλήσεων.
Εδώ είναι μια γραμμή που καθορίζει το όνομα του πακέτου ακολουθούμενο από κώδικα για την εισαγωγή πακέτων βιβλιοθήκης.
2. Ορίστε μια κλάση προγραμμάτων οδήγησης που θα δημιουργήσει μια νέα εργασία πελάτη, ένα αντικείμενο διαμόρφωσης και θα διαφημίσει τις κατηγορίες Mapper και Reducer.
Η κατηγορία προγραμμάτων οδήγησης είναι υπεύθυνη για τη ρύθμιση της εργασίας MapReduce για εκτέλεση Hadoop. Σε αυτή την κλάση, καθορίζουμε όνομα εργασίας, τύπος δεδομένων εισόδου/εξόδου και ονόματα κατηγοριών αντιστοίχισης και μειωτήρα.
3. Στο παρακάτω απόσπασμα κώδικα, ορίζουμε καταλόγους εισόδου και εξόδου που χρησιμοποιούνται για την κατανάλωση δεδομένων εισόδου και την παραγωγή εξόδου, αντίστοιχα.
arg[0] και arg[1] είναι τα ορίσματα της γραμμής εντολών που μεταβιβάζονται με μια εντολή που δίνεται στο MapReduce hands-on, π.χ.
$HADOOP_HOME/bin/hadoop jar ProductSalePerCountry.jar /inputMapReduce /mapreduce_output_sales
4. Ενεργοποιήστε τη δουλειά μας
Κάτω από τον κώδικα έναρξη εκτέλεσης της εργασίας MapReduce-
try { // Run the job JobClient.runJob(job_conf); } catch (Exception e) { e.printStackTrace(); }