أمثلة على Hadoop وMapreduce: إنشاء البرنامج الأول في Java

في هذا البرنامج التعليمي، سوف تتعلم كيفية استخدام Hadoop مع أمثلة MapReduce. بيانات الإدخال المستخدمة هي SalesJan2009.csv. أنه يحتوي على معلومات متعلقة بالمبيعات مثل اسم المنتج والسعر وطريقة الدفع والمدينة وبلد العميل وما إلى ذلك تعرف على عدد المنتجات المباعة في كل دولة.

أول برنامج Hadoop MapReduce

الآن في هذا البرنامج التعليمي MapReduce، سوف نقوم بإنشاء أول لدينا Java برنامج MapReduce:

أول برنامج Hadoop MapReduce

بيانات المبيعاتJan2009

تأكد من تثبيت Hadoop. قبل البدء بالعملية الفعلية، قم بتغيير المستخدم إلى "hduser" (المعرف المستخدم أثناء تكوين Hadoop، يمكنك التبديل إلى معرف المستخدم المستخدم أثناء تكوين برمجة Hadoop).

su - hduser_

أول برنامج Hadoop MapReduce

الخطوة 1)

قم بإنشاء دليل جديد بالاسم MapReduceTutorial كما هو موضح في مثال MapReduce أدناه

sudo mkdir MapReduceTutorial

أول برنامج Hadoop MapReduce

امنح الأذونات

sudo chmod -R 777 MapReduceTutorial

أول برنامج Hadoop MapReduce

SalesMapper.java

package SalesCountry;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;

public class SalesMapper extends MapReduceBase implements Mapper <LongWritable, Text, Text, IntWritable> {
	private final static IntWritable one = new IntWritable(1);

	public void map(LongWritable key, Text value, OutputCollector <Text, IntWritable> output, Reporter reporter) throws IOException {

		String valueString = value.toString();
		String[] SingleCountryData = valueString.split(",");
		output.collect(new Text(SingleCountryData[7]), one);
	}
}

SalesCountryReducer.java

package SalesCountry;

import java.io.IOException;
import java.util.*;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;

public class SalesCountryReducer extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {

	public void reduce(Text t_key, Iterator<IntWritable> values, OutputCollector<Text,IntWritable> output, Reporter reporter) throws IOException {
		Text key = t_key;
		int frequencyForCountry = 0;
		while (values.hasNext()) {
			// replace type of value with the actual type of our value
			IntWritable value = (IntWritable) values.next();
			frequencyForCountry += value.get();
			
		}
		output.collect(key, new IntWritable(frequencyForCountry));
	}
}

SalesCountryDriver.java

package SalesCountry;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;

public class SalesCountryDriver {
    public static void main(String[] args) {
        JobClient my_client = new JobClient();
        // Create a configuration object for the job
        JobConf job_conf = new JobConf(SalesCountryDriver.class);

        // Set a name of the Job
        job_conf.setJobName("SalePerCountry");

        // Specify data type of output key and value
        job_conf.setOutputKeyClass(Text.class);
        job_conf.setOutputValueClass(IntWritable.class);

        // Specify names of Mapper and Reducer Class
        job_conf.setMapperClass(SalesCountry.SalesMapper.class);
        job_conf.setReducerClass(SalesCountry.SalesCountryReducer.class);

        // Specify formats of the data type of Input and output
        job_conf.setInputFormat(TextInputFormat.class);
        job_conf.setOutputFormat(TextOutputFormat.class);

        // Set input and output directories using command line arguments, 
        //arg[0] = name of input directory on HDFS, and arg[1] =  name of output directory to be created to store the output file.

        FileInputFormat.setInputPaths(job_conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(job_conf, new Path(args[1]));

        my_client.setConf(job_conf);
        try {
            // Run the job 
            JobClient.runJob(job_conf);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

تحميل الملفات هنا

أول برنامج Hadoop MapReduce

تحقق من أذونات الملف لجميع هذه الملفات

أول برنامج Hadoop MapReduce

وإذا كانت أذونات "القراءة" مفقودة، فامنحها نفس-

أول برنامج Hadoop MapReduce

الخطوة 2)

تصدير classpath كما هو موضح في مثال Hadoop أدناه

export CLASSPATH="$HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-client-core-2.2.0.jar:$HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-client-common-2.2.0.jar:$HADOOP_HOME/share/hadoop/common/hadoop-common-2.2.0.jar:~/MapReduceTutorial/SalesCountry/*:$HADOOP_HOME/lib/*"

أول برنامج Hadoop MapReduce

الخطوة 3)

جمع Java الملفات (هذه الملفات موجودة في الدليل Final-MapReduceHandsOn). سيتم وضع ملفات فئتها في دليل الحزمة

javac -d . SalesMapper.java SalesCountryReducer.java SalesCountryDriver.java

أول برنامج Hadoop MapReduce

يمكن تجاهل هذا التحذير بأمان.

سيقوم هذا التجميع بإنشاء دليل في الدليل الحالي المسمى باسم الحزمة المحدد في ملف جافا المصدر (أي بلد المبيعات في حالتنا) ووضع جميع ملفات الفصل المترجمة فيه.

أول برنامج Hadoop MapReduce

الخطوة 4)

قم بإنشاء ملف جديد البيان.txt

sudo gedit Manifest.txt

أضف إليه الأسطر التالية،

Main-Class: SalesCountry.SalesCountryDriver

أول برنامج Hadoop MapReduce

SalesCountry.SalesCountryDriver هو اسم الطبقة الرئيسية. يرجى ملاحظة أنه يجب عليك الضغط على مفتاح الإدخال في نهاية هذا السطر.

الخطوة 5)

قم بإنشاء ملف Jar

jar cfm ProductSalePerCountry.jar Manifest.txt SalesCountry/*.class

أول برنامج Hadoop MapReduce

تأكد من إنشاء ملف الجرة

أول برنامج Hadoop MapReduce

الخطوة 6)

ابدأ هادوب

$HADOOP_HOME/sbin/start-dfs.sh
$HADOOP_HOME/sbin/start-yarn.sh

الخطوة 7)

انسخ الملف SalesJan2009.csv إلى ~/inputMapReduce

الآن استخدم الأمر أدناه للنسخ ~/inputMapReduce إلى HDFS.

$HADOOP_HOME/bin/hdfs dfs -copyFromLocal ~/inputMapReduce /

أول برنامج Hadoop MapReduce

يمكننا تجاهل هذا التحذير بأمان.

التحقق مما إذا كان الملف قد تم نسخه بالفعل أم لا.

$HADOOP_HOME/bin/hdfs dfs -ls /inputMapReduce

أول برنامج Hadoop MapReduce

الخطوة 8)

قم بتشغيل مهمة MapReduce

$HADOOP_HOME/bin/hadoop jar ProductSalePerCountry.jar /inputMapReduce /mapreduce_output_sales

أول برنامج Hadoop MapReduce

سيؤدي هذا إلى إنشاء دليل إخراج باسم Mapreduce_output_sales on HDFS. ستكون محتويات هذا الدليل عبارة عن ملف يحتوي على مبيعات المنتجات لكل بلد.

الخطوة 9)

يمكن رؤية النتيجة من خلال واجهة الأوامر كما يلي:

$HADOOP_HOME/bin/hdfs dfs -cat /mapreduce_output_sales/part-00000

أول برنامج Hadoop MapReduce

يمكن أيضًا رؤية النتائج عبر واجهة الويب كما يلي:

افتح r في متصفح الويب.

أول برنامج Hadoop MapReduce

الآن حدد "تصفح نظام الملفات" وانتقل إلى /mapreduce_output_sales

أول برنامج Hadoop MapReduce

ساعات العمل جزء-ص-00000

أول برنامج Hadoop MapReduce

شرح فئة SalesMapper

في هذا القسم سوف نفهم التنفيذ SalesMapper فئة.

1. نبدأ بتحديد اسم الحزمة لفصلنا. بلد المبيعات هو اسم الحزمة لدينا. يرجى ملاحظة أن إخراج التجميع، SalesMapper.class سينتقل إلى دليل يسمى باسم الحزمة هذا: بلد المبيعات.

وبعد ذلك، نقوم باستيراد حزم المكتبة.

تظهر اللقطة أدناه تنفيذًا لـ SalesMapper صف دراسي-

شرح فئة SalesMapper

شرح نموذج التعليمات البرمجية:

1. تعريف فئة SalesMapper-

يقوم SalesMapper من الفئة العامة بتوسيع MapReduceBase وتنفيذ Mapper {

يجب أن يتم تمديد كل فئة مصمم خرائط من MapReduceBase الطبقة ويجب تنفيذها مخطط واجهة.

2. تحديد وظيفة "الخريطة"-

public void map(LongWritable key,
         Text value,
OutputCollector<Text, IntWritable> output,
Reporter reporter) throws IOException

الجزء الرئيسي من فئة Mapper هو أ 'خريطة()' الطريقة التي تقبل أربع وسائط.

في كل مكالمة ل 'خريطة()' الطريقة، أ الرئيسية ذات القيمة زوج ('مفتاح' 'قيمة' في هذا الرمز) تم تمريره.

'خريطة()' تبدأ الطريقة بتقسيم نص الإدخال الذي يتم تلقيه كوسيطة. يستخدم الرمز المميز لتقسيم هذه السطور إلى كلمات.

String valueString = value.toString();
String[] SingleCountryData = valueString.split(",");

هنا، '،' يستخدم كمحدد.

بعد ذلك، يتم تشكيل زوج باستخدام سجل في الفهرس السابع للصفيف "بيانات بلد واحد" وقيمة "1".

output.collect(new Text(SingleCountryData[7]), one);

نحن نختار السجل في الفهرس السابع لأننا نحتاج الدولة data وهي موجودة في الفهرس السابع في الصفيف "بيانات بلد واحد".

يرجى ملاحظة أن بيانات الإدخال الخاصة بنا موجودة بالتنسيق أدناه (حيث الدولة الساعة 7th الفهرس، مع 0 كمؤشر بداية)-

تاريخ_المعاملة، المنتج، السعر، نوع_الدفع، الاسم، المدينة، الولاية،الدولة، تم إنشاء الحساب، آخر تسجيل دخول، خط العرض، خط الطول

إخراج مخطط الخرائط هو مرة أخرى أ الرئيسية ذات القيمة الزوج الذي يتم إخراجه باستخدام 'يجمع()' طريقة "مجمع الإخراج".

شرح فئة SalesCountryReducer

في هذا القسم سوف نفهم التنفيذ SalesCountryReducer فئة.

1. نبدأ بتحديد اسم الحزمة لفصلنا. بلد المبيعات هو اسم الحزمة خارج. يرجى ملاحظة أن إخراج التجميع، SalesCountryReducer.class سينتقل إلى دليل يسمى باسم الحزمة هذا: بلد المبيعات.

وبعد ذلك، نقوم باستيراد حزم المكتبة.

تظهر اللقطة أدناه تنفيذًا لـ SalesCountryReducer صف دراسي-

شرح فئة SalesCountryReducer

شرح الكود:

1. تعريف فئة SalesCountryReducer-

تعمل الفئة العامة SalesCountryReducer على توسيع MapReduceBase وتنفيذ المخفض {

هنا، أول نوعين من البيانات، 'نص' "غير قابل للكتابة" هي نوع البيانات من قيمة مفتاح الإدخال إلى المخفض.

إخراج مخطط الخرائط هو في شكل , . يصبح هذا الإخراج من مصمم الخرائط مدخلاً إلى المخفض. لذلك، للتوافق مع نوع البيانات الخاص به، نص غير قابل للكتابة يتم استخدامها كنوع البيانات هنا.

آخر نوعين من البيانات، "نص" و"IntWritable"، هما نوع بيانات من المخرجات التي تم إنشاؤها بواسطة المخفض في شكل زوج من القيمة الرئيسية.

يجب أن تمتد كل فئة المخفض من MapReduceBase الطبقة ويجب تنفيذها المخفض واجهة.

2. تحديد وظيفة "التقليل"-

public void reduce( Text t_key,
             Iterator<IntWritable> values,                           
             OutputCollector<Text,IntWritable> output,
             Reporter reporter) throws IOException {

مدخل إلى خفض() الطريقة هي مفتاح يحتوي على قائمة من القيم المتعددة.

على سبيل المثال، في حالتنا، سيكون -

, , , , , .

ويعطى هذا للمخفض كما

لذلك، لقبول حجج من هذا النموذج، يتم استخدام أول نوعين من البيانات، وهما، نص مكرر. نص هو نوع بيانات المفتاح و مكرر هو نوع بيانات لقائمة القيم لهذا المفتاح.

الوسيطة التالية هي من النوع جامع الإخراج الذي يجمع مخرجات مرحلة المخفض.

خفض() تبدأ الطريقة بنسخ قيمة المفتاح وتهيئة عدد التكرارات إلى 0.

مفتاح النص = t_key؛
تردد intForCountry = 0;

ثم باستخدام "بينما' حلقة، نكرر قائمة القيم المرتبطة بالمفتاح ونحسب التردد النهائي من خلال جمع كل القيم.

 while (values.hasNext()) {
            // replace type of value with the actual type of our value
            IntWritable value = (IntWritable) values.next();
            frequencyForCountry += value.get();
            
        }

الآن، ندفع النتيجة إلى جامع الإخراج في شكل مفتاح وحصلت عليها عدد الترددات.

الكود أدناه يفعل هذا-

output.collect(key, new IntWritable(frequencyForCountry));

شرح فئة SalesCountryDriver

في هذا القسم سوف نفهم التنفيذ SalesCountryDriver فئة

1. نبدأ بتحديد اسم الحزمة لفصلنا. بلد المبيعات هو اسم الحزمة خارج. يرجى ملاحظة أن إخراج التجميع، SalesCountryDriver.class سينتقل إلى الدليل المسمى باسم الحزمة هذا: بلد المبيعات.

فيما يلي سطر يحدد اسم الحزمة متبوعًا برمز لاستيراد حزم المكتبة.

شرح فئة SalesCountryDriver

2. حدد فئة برنامج التشغيل التي ستقوم بإنشاء مهمة عميل جديدة وكائن التكوين والإعلان عن فئات Mapper وReducer.

فئة برنامج التشغيل مسؤولة عن إعداد وظيفة MapReduce الخاصة بنا ليتم تشغيلها Hadoop. في هذه الفئة نحدد اسم الوظيفة ونوع بيانات الإدخال/الإخراج وأسماء فئات مصمم الخرائط والمخفض.

شرح فئة SalesCountryDriver

3. في مقتطف الكود أدناه، قمنا بتعيين أدلة الإدخال والإخراج التي يتم استخدامها لاستهلاك مجموعة بيانات الإدخال وإنتاج المخرجات، على التوالي.

حج [0] حج [1] هي وسيطات سطر الأوامر التي تم تمريرها باستخدام أمر محدد في التدريب العملي على MapReduce، على سبيل المثال،

$HADOOP_HOME/bin/hadoop jar ProductSalePerCountry.jar /inputMapReduce /mapreduce_output_sales

شرح فئة SalesCountryDriver

4. تفعيل مهمتنا

أدناه الكود يبدأ تنفيذ وظيفة MapReduce-

try {
    // Run the job 
    JobClient.runJob(job_conf);
} catch (Exception e) {
    e.printStackTrace();
}