Hadoop MapReduce Join & Counter cu Exemplu
Ce este Join in Mapreduce?
Mapreduce Join operația este utilizată pentru a combina două seturi mari de date. Cu toate acestea, acest proces implică scrierea multor coduri pentru a efectua operația reală de conectare. Unirea a două seturi de date începe prin a compara dimensiunea fiecărui set de date. Dacă un set de date este mai mic în comparație cu celălalt set de date, atunci setul de date mai mic este distribuit fiecărui nod de date din cluster.
Odată ce o îmbinare în MapReduce este distribuită, fie Mapper, fie Reducer utilizează setul de date mai mic pentru a efectua o căutare a înregistrărilor potrivite din setul de date mare și apoi combina acele înregistrări pentru a forma înregistrări de ieșire.
Tipuri de alăturare
În funcție de locul în care se realizează îmbinarea reală, îmbinările în Hadoop sunt clasificate în-
1. Îmbinare pe hartă – Atunci când îmbinarea este efectuată de mapator, se numește îmbinarea pe hartă. În acest tip, îmbinarea este efectuată înainte ca datele să fie efectiv consumate de funcția map. Este obligatoriu ca intrarea în fiecare hartă să fie sub forma unei partiții și să fie în ordine sortată. De asemenea, trebuie să existe un număr egal de partiții și trebuie să fie sortate după cheia de unire.
2. Îmbinare laterală redusă – Atunci când îmbinarea este realizată de reductor, se numește îmbinare de reducere. Nu este nevoie ca această unire să aibă un set de date într-o formă structurată (sau partiționat).
Aici, procesarea laterală a hărții emite cheia de unire și tuplurile corespunzătoare ale ambelor tabele. Ca efect al acestei procesări, toate tuplurile cu aceeași cheie de unire cad în același reductor care apoi unește înregistrările cu aceeași cheie de unire.
Un flux general al procesului de îmbinări în Hadoop este descris în diagrama de mai jos.
Cum să alăturați două seturi de date: Exemplu MapReduce
Există două seturi de date în două fișiere diferite (prezentate mai jos). Key Dept_ID este comun în ambele fișiere. Scopul este de a utiliza MapReduce Join pentru a combina aceste fișiere
Intrare: Setul de date de intrare este un fișier txt, DeptName.txt și DepStrength.txt
Descărcați fișierele de intrare de aici
Asigurați-vă că aveți Hadoop instalat. Înainte de a începe cu procesul real de exemplu MapReduce Join, schimbați utilizatorul în „hduser” (id-ul folosit în timpul configurației Hadoop, puteți comuta la ID-ul de utilizator utilizat în timpul configurației Hadoop).
su - hduser_
Pas 1) Copiați fișierul zip în locația dorită
Pas 2) Decomprimați fișierul Zip
sudo tar -xvf MapReduceJoin.tar.gz
Pas 3) Accesați directorul MapReduceJoin/
cd MapReduceJoin/
Pas 4) Porniți Hadoop
$HADOOP_HOME/sbin/start-dfs.sh
$HADOOP_HOME/sbin/start-yarn.sh
Pas 5) DeptStrength.txt și DeptName.txt sunt fișierele de intrare utilizate pentru acest program exemplu MapReduce Join.
Aceste fișiere trebuie copiate în HDFS folosind comanda de mai jos:
$HADOOP_HOME/bin/hdfs dfs -copyFromLocal DeptStrength.txt DeptName.txt /
Pas 6) Rulați programul folosind comanda de mai jos -
$HADOOP_HOME/bin/hadoop jar MapReduceJoin.jar MapReduceJoin/JoinDriver/DeptStrength.txt /DeptName.txt /output_mapreducejoin
Pas 7) După execuție, fișierul de ieșire (numit „part-00000”) va fi stocat în directorul /output_mapreducejoin pe HDFS
Rezultatele pot fi văzute folosind interfața de linie de comandă
$HADOOP_HOME/bin/hdfs dfs -cat /output_mapreducejoin/part-00000
Rezultatele pot fi văzute și printr-o interfață web ca-
Acum selectați „Răsfoiește sistemul de fișiere” și navigați până la /output_mapreducejoin
Operatii Deschise piesa-r-00000
Rezultatele sunt afișate
NOTĂ: Vă rugăm să rețineți că înainte de a rula acest program pentru data viitoare, va trebui să ștergeți directorul de ieșire /output_mapreducejoin
$HADOOP_HOME/bin/hdfs dfs -rm -r /output_mapreducejoin
Alternativa este să folosiți un nume diferit pentru directorul de ieșire.
Ce este Counter în MapReduce?
A Contor în MapReduce este un mecanism folosit pentru colectarea și măsurarea informațiilor statistice despre joburile și evenimentele MapReduce. Contoarele țin evidența diferitelor statistici de lucru în MapReduce, cum ar fi numărul de operațiuni efectuate și progresul operațiunii. Contoarele sunt folosite pentru diagnosticarea problemelor în MapReduce.
Contoarele Hadoop sunt similare cu introducerea unui mesaj de jurnal în cod pentru o hartă sau reducere. Aceste informații ar putea fi utile pentru diagnosticarea unei probleme în procesarea jobului MapReduce.
De obicei, aceste contoare din Hadoop sunt definite într-un program (map sau reduce) și sunt incrementate în timpul execuției atunci când apare un anumit eveniment sau condiție (specifică contorului respectiv). O aplicație foarte bună a contoarelor Hadoop este urmărirea înregistrărilor valide și nevalide dintr-un set de date de intrare.
Tipuri de contoare MapReduce
Există practic 2 tipuri de MapReduce contoare
- Contoare Hadoop încorporate:Există câteva contoare Hadoop încorporate care există pentru fiecare job. Mai jos sunt grupuri de contoare încorporate-
- Contoare de sarcini MapReduce – Colectează informații specifice sarcinii (de exemplu, numărul de înregistrări de intrare) în timpul execuției acesteia.
- Contoare de sistem de fișiere – Colectează informații cum ar fi numărul de octeți citiți sau scrisi de o sarcină
- Contoare FileInputFormat – Colectează informații despre un număr de octeți citiți prin FileInputFormat
- Contoare FileOutputFormat – Colectează informații dintr-un număr de octeți scrise prin FileOutputFormat
- Contoare de locuri de muncă - Aceste contoare sunt folosite de JobTracker. Statisticile colectate de ei includ, de exemplu, numărul de sarcini lansate pentru un loc de muncă.
- Contoare definite de utilizator
În plus față de contoarele încorporate, un utilizator își poate defini propriile contoare folosind funcționalități similare oferite de limbaje de programare. De exemplu, în Java „enum” sunt folosite pentru a defini contoare definite de utilizator.
Exemplu de contoare
Un exemplu de MapClass cu contoare pentru a număra numărul de valori lipsă și nevalide. Fișier de date de intrare utilizat în acest tutorial Setul nostru de date de intrare este un fișier CSV, VânzăriJan2009.csv
public static class MapClass extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text> { static enum SalesCounters { MISSING, INVALID }; public void map ( LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException { //Input string is split using ',' and stored in 'fields' array String fields[] = value.toString().split(",", -20); //Value at 4th index is country. It is stored in 'country' variable String country = fields[4]; //Value at 8th index is sales data. It is stored in 'sales' variable String sales = fields[8]; if (country.length() == 0) { reporter.incrCounter(SalesCounters.MISSING, 1); } else if (sales.startsWith("\"")) { reporter.incrCounter(SalesCounters.INVALID, 1); } else { output.collect(new Text(country), new Text(sales + ",1")); } } }
Fragmentul de cod de mai sus arată un exemplu de implementare a contoarelor în Hadoop Map Reduce.
Aici, Contoare de vânzări este un contor definit folosind 'enum'. Este folosit pentru a număra LIPSĂ si INVALID înregistrări de intrare.
În fragmentul de cod, dacă 'țară' câmpul are lungime zero, atunci valoarea lui lipsește și, prin urmare, contorul corespunzător Contoare de vânzări.LIPSĂ este incrementat.
În continuare, dacă 'vânzări' câmpul începe cu a atunci înregistrarea este considerată INVALIDĂ. Acest lucru este indicat prin creșterea contorului SalesCounters.INVALID.