ตัวอย่าง Hadoop & Mapreduce: สร้างโปรแกรมแรกใน Java

ในบทช่วยสอนนี้ คุณจะได้เรียนรู้การใช้ Hadoop กับตัวอย่าง MapReduce ข้อมูลอินพุตที่ใช้คือ ยอดขายมกราคม 2009.csv- ประกอบด้วยข้อมูลที่เกี่ยวข้องกับการขาย เช่น ชื่อผลิตภัณฑ์ ราคา วิธีการชำระเงิน เมือง ประเทศของลูกค้า ฯลฯ เป้าหมายคือ ค้นหาจำนวนผลิตภัณฑ์ที่ขายในแต่ละประเทศ

โปรแกรม Hadoop MapReduce ตัวแรก

ตอนนี้อยู่ในนี้ บทช่วยสอน MapReduceเราจะสร้างของเราก่อน Java โปรแกรม MapReduce:

โปรแกรม Hadoop MapReduce ตัวแรก

ข้อมูลการขายมกราคม 2009

ตรวจสอบให้แน่ใจว่าคุณได้ติดตั้ง Hadoop แล้ว ก่อนที่คุณจะเริ่มกระบวนการจริง ให้เปลี่ยนผู้ใช้เป็น 'hduser' (รหัสที่ใช้ขณะกำหนดค่า Hadoop คุณสามารถสลับเป็นรหัสผู้ใช้ที่ใช้ในระหว่างการกำหนดค่าการเขียนโปรแกรม Hadoop ของคุณ)

su - hduser_

โปรแกรม Hadoop MapReduce ตัวแรก

ขั้นตอน 1)

สร้างไดเรกทอรีใหม่ด้วยชื่อ แผนที่ReduceTutorial ดังแสดงในตัวอย่าง MapReduce ด้านล่าง

sudo mkdir MapReduceTutorial

โปรแกรม Hadoop MapReduce ตัวแรก

ให้สิทธิ์

sudo chmod -R 777 MapReduceTutorial

โปรแกรม Hadoop MapReduce ตัวแรก

SalesMapper.java

package SalesCountry;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;

public class SalesMapper extends MapReduceBase implements Mapper <LongWritable, Text, Text, IntWritable> {
	private final static IntWritable one = new IntWritable(1);

	public void map(LongWritable key, Text value, OutputCollector <Text, IntWritable> output, Reporter reporter) throws IOException {

		String valueString = value.toString();
		String[] SingleCountryData = valueString.split(",");
		output.collect(new Text(SingleCountryData[7]), one);
	}
}

ฝ่ายขาย CountryReducer.java

package SalesCountry;

import java.io.IOException;
import java.util.*;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;

public class SalesCountryReducer extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {

	public void reduce(Text t_key, Iterator<IntWritable> values, OutputCollector<Text,IntWritable> output, Reporter reporter) throws IOException {
		Text key = t_key;
		int frequencyForCountry = 0;
		while (values.hasNext()) {
			// replace type of value with the actual type of our value
			IntWritable value = (IntWritable) values.next();
			frequencyForCountry += value.get();
			
		}
		output.collect(key, new IntWritable(frequencyForCountry));
	}
}

SalesCountryDriver.java

package SalesCountry;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;

public class SalesCountryDriver {
    public static void main(String[] args) {
        JobClient my_client = new JobClient();
        // Create a configuration object for the job
        JobConf job_conf = new JobConf(SalesCountryDriver.class);

        // Set a name of the Job
        job_conf.setJobName("SalePerCountry");

        // Specify data type of output key and value
        job_conf.setOutputKeyClass(Text.class);
        job_conf.setOutputValueClass(IntWritable.class);

        // Specify names of Mapper and Reducer Class
        job_conf.setMapperClass(SalesCountry.SalesMapper.class);
        job_conf.setReducerClass(SalesCountry.SalesCountryReducer.class);

        // Specify formats of the data type of Input and output
        job_conf.setInputFormat(TextInputFormat.class);
        job_conf.setOutputFormat(TextOutputFormat.class);

        // Set input and output directories using command line arguments, 
        //arg[0] = name of input directory on HDFS, and arg[1] =  name of output directory to be created to store the output file.

        FileInputFormat.setInputPaths(job_conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(job_conf, new Path(args[1]));

        my_client.setConf(job_conf);
        try {
            // Run the job 
            JobClient.runJob(job_conf);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

ดาวน์โหลดไฟล์ที่นี่

โปรแกรม Hadoop MapReduce ตัวแรก

ตรวจสอบการอนุญาตไฟล์ของไฟล์เหล่านี้ทั้งหมด

โปรแกรม Hadoop MapReduce ตัวแรก

และหากไม่มีการอนุญาต 'อ่าน' ให้อนุญาตแบบเดียวกัน

โปรแกรม Hadoop MapReduce ตัวแรก

ขั้นตอน 2)

ส่งออก classpath ดังแสดงในตัวอย่าง Hadoop ด้านล่าง

export CLASSPATH="$HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-client-core-2.2.0.jar:$HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-client-common-2.2.0.jar:$HADOOP_HOME/share/hadoop/common/hadoop-common-2.2.0.jar:~/MapReduceTutorial/SalesCountry/*:$HADOOP_HOME/lib/*"

โปรแกรม Hadoop MapReduce ตัวแรก

ขั้นตอน 3)

คอมไพล์ Java ไฟล์ (ไฟล์เหล่านี้มีอยู่ในไดเรกทอรี Final-MapReduceHandsOn- ไฟล์คลาสของมันจะถูกใส่ไว้ในไดเร็กทอรีแพ็คเกจ

javac -d . SalesMapper.java SalesCountryReducer.java SalesCountryDriver.java

โปรแกรม Hadoop MapReduce ตัวแรก

คำเตือนนี้สามารถละเว้นได้อย่างปลอดภัย

การคอมไพล์นี้จะสร้างไดเร็กทอรีในไดเร็กทอรีปัจจุบันที่มีชื่อแพ็คเกจที่ระบุในไฟล์ต้นฉบับ Java (เช่น ขายประเทศ ในกรณีของเรา) และใส่ไฟล์คลาสที่คอมไพล์แล้วทั้งหมดลงไป

โปรแกรม Hadoop MapReduce ตัวแรก

ขั้นตอน 4)

สร้างไฟล์ใหม่ รายการ.txt

sudo gedit Manifest.txt

เพิ่มบรรทัดต่อไปนี้ลงไป

Main-Class: SalesCountry.SalesCountryDriver

โปรแกรม Hadoop MapReduce ตัวแรก

SalesCountry.SalesCountryDriver เป็นชื่อของคลาสหลัก โปรดทราบว่าคุณต้องกดปุ่ม Enter ที่ท้ายบรรทัดนี้

ขั้นตอน 5)

สร้างไฟล์ Jar

jar cfm ProductSalePerCountry.jar Manifest.txt SalesCountry/*.class

โปรแกรม Hadoop MapReduce ตัวแรก

ตรวจสอบว่าไฟล์ jar ถูกสร้างขึ้น

โปรแกรม Hadoop MapReduce ตัวแรก

ขั้นตอน 6)

เริ่ม Hadoop

$HADOOP_HOME/sbin/start-dfs.sh
$HADOOP_HOME/sbin/start-yarn.sh

ขั้นตอน 7)

คัดลอกไฟล์ ยอดขายมกราคม 2009.csv เข้าไป ~/inputMapReduce

ตอนนี้ใช้คำสั่งด้านล่างเพื่อคัดลอก ~/inputMapReduce ถึง HDFS

$HADOOP_HOME/bin/hdfs dfs -copyFromLocal ~/inputMapReduce /

โปรแกรม Hadoop MapReduce ตัวแรก

เราสามารถเพิกเฉยต่อคำเตือนนี้ได้อย่างปลอดภัย

ตรวจสอบว่าไฟล์ถูกคัดลอกจริงหรือไม่

$HADOOP_HOME/bin/hdfs dfs -ls /inputMapReduce

โปรแกรม Hadoop MapReduce ตัวแรก

ขั้นตอน 8)

เรียกใช้งาน MapReduce

$HADOOP_HOME/bin/hadoop jar ProductSalePerCountry.jar /inputMapReduce /mapreduce_output_sales

โปรแกรม Hadoop MapReduce ตัวแรก

สิ่งนี้จะสร้างไดเร็กทอรีเอาต์พุตชื่อ mapreduce_output_sales on เอชดีเอฟเอส- เนื้อหาของไดเร็กทอรีนี้จะเป็นไฟล์ที่มียอดขายผลิตภัณฑ์ตามประเทศ

ขั้นตอน 9)

ผลลัพธ์สามารถเห็นได้ผ่านอินเทอร์เฟซคำสั่งเป็น

$HADOOP_HOME/bin/hdfs dfs -cat /mapreduce_output_sales/part-00000

โปรแกรม Hadoop MapReduce ตัวแรก

สามารถดูผลลัพธ์ได้ผ่านทางเว็บอินเตอร์เฟสเช่นกัน

เปิด r ในเว็บเบราว์เซอร์

โปรแกรม Hadoop MapReduce ตัวแรก

ตอนนี้เลือก 'เรียกดูระบบไฟล์' และไปที่ /mapreduce_output_sales

โปรแกรม Hadoop MapReduce ตัวแรก

จุดเปิด ส่วน-r-00000

โปรแกรม Hadoop MapReduce ตัวแรก

คำอธิบายของคลาส SalesMapper

ในส่วนนี้เราจะมาทำความเข้าใจถึงการดำเนินการของ SalesMapper ชั้นเรียน

1. เราเริ่มต้นด้วยการระบุชื่อแพ็คเกจสำหรับคลาสของเรา ขายประเทศ เป็นชื่อแพ็คเกจของเรา โปรดทราบว่าผลลัพธ์ของการรวบรวม SalesMapper.คลาส จะเข้าไปในไดเร็กทอรีที่ตั้งชื่อตามชื่อแพ็กเกจนี้: ขายประเทศ.

ตามด้วยสิ่งนี้ เรานำเข้าแพ็คเกจไลบรารี

สแน็ปช็อตด้านล่างแสดงการใช้งานของ SalesMapper ชั้น -

คำอธิบายคลาส SalesMapper

คำอธิบายโค้ดตัวอย่าง:

1. คำจำกัดความของคลาส SalesMapper-

SalesMapper คลาสสาธารณะขยาย MapReduceBase ใช้งาน Mapper -

ทุกคลาส mapper จะต้องขยายจาก แผนที่ReduceBase และจะต้องนำไปปฏิบัติ ผู้ทำแผนที่ อินเตอร์เฟซ.

2. การกำหนดฟังก์ชัน 'แผนที่'-

public void map(LongWritable key,
         Text value,
OutputCollector<Text, IntWritable> output,
Reporter reporter) throws IOException

ส่วนหลักของคลาส Mapper คือ 'แผนที่()' วิธีการที่ยอมรับสี่ข้อโต้แย้ง

ทุกครั้งที่โทรมา 'แผนที่()' วิธีการก คีย์ - ค่า คู่ ('สำคัญ' และ 'ค่า' ในรหัสนี้) ผ่านแล้ว

'แผนที่()' วิธีการเริ่มต้นด้วยการแยกข้อความอินพุตที่ได้รับเป็นอาร์กิวเมนต์ ใช้ tokenizer เพื่อแยกบรรทัดเหล่านี้เป็นคำ

String valueString = value.toString();
String[] SingleCountryData = valueString.split(",");

ที่นี่ '' ถูกใช้เป็นตัวคั่น

หลังจากนี้ คู่จะถูกสร้างขึ้นโดยใช้บันทึกที่ดัชนีอาร์เรย์ที่ 7 'ข้อมูลประเทศเดียว' และค่า '1'.

output.collect (ข้อความใหม่ (SingleCountryData [7]) หนึ่ง);

เรากำลังเลือกบันทึกที่ดัชนีที่ 7 เพราะเราต้องการ ประเทศ data และอยู่ที่ดัชนีที่ 7 ในอาร์เรย์ 'ข้อมูลประเทศเดียว'.

โปรดทราบว่าข้อมูลที่ป้อนของเราอยู่ในรูปแบบด้านล่าง (โดยที่ ประเทศ อยู่ที่ 7th ดัชนี โดยมี 0 เป็นดัชนีเริ่มต้น)-

Transaction_date,สินค้า,ราคา,Payment_Type,ชื่อ,เมือง,รัฐ,ประเทศ,สร้างบัญชี,Last_Login,ละติจูด,ลองจิจูด

ผลลัพธ์ของ mapper นั้นเป็น a อีกครั้ง คีย์ - ค่า คู่ที่ส่งออกโดยใช้ 'เก็บรวบรวม()' วิธีการของ 'นักสะสมเอาท์พุท'.

คำอธิบายของคลาส SalesCountryReducer

ในส่วนนี้เราจะมาทำความเข้าใจถึงการดำเนินการของ การขายประเทศลด ชั้นเรียน

1. เราเริ่มต้นด้วยการระบุชื่อแพ็คเกจสำหรับคลาสของเรา ขายประเทศ เป็นชื่อของแพ็คเกจออก โปรดทราบว่าผลลัพธ์ของการรวบรวม SalesCountryReducer.คลาส จะเข้าไปในไดเร็กทอรีที่ตั้งชื่อตามชื่อแพ็กเกจนี้: ขายประเทศ.

ตามด้วยสิ่งนี้ เรานำเข้าแพ็คเกจไลบรารี

สแน็ปช็อตด้านล่างแสดงการใช้งานของ การขายประเทศลด ชั้น -

คำอธิบายคลาส SalesCountryReducer

คำอธิบายรหัส:

1. คำจำกัดความของคลาส SalesCountryReducer-

SalesCountryReducer ระดับสาธารณะขยาย MapReduceBase ดำเนินการลด -

ที่นี่ สองประเภทข้อมูลแรก 'ข้อความ' และ 'เขียนไม่ได้' เป็นชนิดข้อมูลของคีย์-ค่าอินพุตไปยังตัวลด

ผลลัพธ์ของ mapper จะอยู่ในรูปแบบของ - - เอาต์พุตของ mapper นี้จะกลายเป็นอินพุตไปยังตัวลด เพื่อให้สอดคล้องกับประเภทข้อมูล ข้อความ และ ไม่สามารถเขียนได้ ถูกใช้เป็นประเภทข้อมูลที่นี่

สองประเภทข้อมูลสุดท้าย 'ข้อความ' และ 'IntWritable' เป็นประเภทข้อมูลของเอาต์พุตที่สร้างโดยตัวลดในรูปแบบของคู่คีย์-ค่า

ทุกคลาสตัวลดจะต้องขยายจาก แผนที่ReduceBase และจะต้องนำไปปฏิบัติ ลด อินเตอร์เฟซ.

2. การกำหนดฟังก์ชัน 'ลด'-

public void reduce( Text t_key,
             Iterator<IntWritable> values,                           
             OutputCollector<Text,IntWritable> output,
             Reporter reporter) throws IOException {

ข้อมูลเข้าไปยัง ลด() method เป็นคีย์ที่มีรายการค่าหลายค่า

ตัวอย่างเช่น ในกรณีของเรา มันจะเป็น-

- - - - - -

สิ่งนี้มอบให้กับตัวลดเป็น

ดังนั้น เพื่อยอมรับข้อโต้แย้งในรูปแบบนี้ จึงใช้ประเภทข้อมูลสองชนิดแรก ดังนี้ ข้อความ และ ตัววนซ้ำ. ข้อความ เป็นประเภทข้อมูลของคีย์และ ตัววนซ้ำ เป็นชนิดข้อมูลสำหรับรายการค่าสำหรับคีย์นั้น

อาร์กิวเมนต์ถัดไปเป็นประเภท OutputCollector ซึ่งรวบรวมเอาต์พุตของเฟสรีดิวเซอร์

ลด() วิธีการเริ่มต้นด้วยการคัดลอกค่าคีย์และเริ่มต้นการนับความถี่เป็น 0

ปุ่มข้อความ = t_key;
ความถี่ intForCountry = 0;

จากนั้นใช้ 'ในขณะที่' วนซ้ำ เราจะวนซ้ำรายการค่าที่เกี่ยวข้องกับคีย์และคำนวณความถี่สุดท้ายโดยการรวมค่าทั้งหมด

 while (values.hasNext()) {
            // replace type of value with the actual type of our value
            IntWritable value = (IntWritable) values.next();
            frequencyForCountry += value.get();
            
        }

ตอนนี้เราส่งผลลัพธ์ไปยังตัวรวบรวมเอาต์พุตในรูปแบบของ สำคัญ และได้รับ นับความถี่.

รหัสด้านล่างทำสิ่งนี้ -

output.collect(key, new IntWritable(frequencyForCountry));

คำอธิบายของคลาส SalesCountryDriver

ในส่วนนี้เราจะมาทำความเข้าใจถึงการดำเนินการของ พนักงานขายประจำประเทศ ชั้น

1. เราเริ่มต้นด้วยการระบุชื่อแพ็คเกจสำหรับคลาสของเรา ขายประเทศ เป็นชื่อของแพ็คเกจออก โปรดทราบว่าผลลัพธ์ของการรวบรวม SalesCountryDriver.คลาส จะเข้าสู่ไดเร็กทอรีที่ตั้งชื่อตามชื่อแพ็คเกจนี้: ขายประเทศ.

นี่คือบรรทัดที่ระบุชื่อแพ็คเกจตามด้วยรหัสเพื่อนำเข้าแพ็คเกจไลบรารี

คำอธิบายคลาส SalesCountryDriver

2. กำหนดคลาสไดรเวอร์ซึ่งจะสร้างงานไคลเอนต์ใหม่ ออบเจ็กต์การกำหนดค่า และโฆษณาคลาส Mapper และ ลด

คลาสไดรเวอร์มีหน้าที่ตั้งค่างาน MapReduce ของเราให้ทำงาน Hadoop- ในคลาสนี้เราระบุ ชื่องาน ประเภทข้อมูลของอินพุต/เอาท์พุต และชื่อของคลาสผู้ทำแผนที่และคลาสตัวลด.

คำอธิบายคลาส SalesCountryDriver

3. ในโค้ดด้านล่าง เราได้ตั้งค่าไดเร็กทอรีอินพุตและเอาต์พุตซึ่งใช้เพื่อใช้ชุดข้อมูลอินพุตและสร้างเอาต์พุตตามลำดับ

หาเรื่อง[0] และ หาเรื่อง[1] เป็นอาร์กิวเมนต์บรรทัดคำสั่งที่ส่งผ่านด้วยคำสั่งที่ให้ไว้ใน MapReduce เชิงปฏิบัติ เช่น

$HADOOP_HOME/bin/hadoop jar ProductSalePerCountry.jar /inputMapReduce /mapreduce_output_sales

คำอธิบายคลาส SalesCountryDriver

4. กระตุ้นงานของเรา

โค้ดด้านล่างเริ่มการทำงานของงาน MapReduce-

try {
    // Run the job 
    JobClient.runJob(job_conf);
} catch (Exception e) {
    e.printStackTrace();
}