HadoopとMapreduceの例: 最初のプログラムを作成する Java

このチュートリアルでは、MapReduce の例で Hadoop の使用方法を学習します。 使用される入力データは、 SalesJan2009.csv。 これには、製品名、価格、支払い方法、都市、顧客の国などの販売関連情報が含まれています。目標は、 各国で販売された製品の数を調べます。

最初の Hadoop MapReduce プログラム

今これで MapReduce チュートリアル、私たちは最初の Java MapReduce プログラム:

最初の Hadoop MapReduce プログラム

2009年XNUMX月の売上データ

Hadoop がインストールされていることを確認してください。 実際のプロセスを開始する前に、ユーザーを「hduser」に変更します (ID は Hadoop 構成中に使用されます。Hadoop プログラミング構成中に使用されるユーザー ID に切り替えることができます)。

su - hduser_

最初の Hadoop MapReduce プログラム

ステップ1)

という名前で新しいディレクトリを作成します MapReduceチュートリアル 以下の MapReduce の例に示されているように

sudo mkdir MapReduceTutorial

最初の Hadoop MapReduce プログラム

権限を与える

sudo chmod -R 777 MapReduceTutorial

最初の Hadoop MapReduce プログラム

SalesMapper.java

package SalesCountry;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;

public class SalesMapper extends MapReduceBase implements Mapper <LongWritable, Text, Text, IntWritable> {
	private final static IntWritable one = new IntWritable(1);

	public void map(LongWritable key, Text value, OutputCollector <Text, IntWritable> output, Reporter reporter) throws IOException {

		String valueString = value.toString();
		String[] SingleCountryData = valueString.split(",");
		output.collect(new Text(SingleCountryData[7]), one);
	}
}

Sales CountryReducer.java

package SalesCountry;

import java.io.IOException;
import java.util.*;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;

public class SalesCountryReducer extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {

	public void reduce(Text t_key, Iterator<IntWritable> values, OutputCollector<Text,IntWritable> output, Reporter reporter) throws IOException {
		Text key = t_key;
		int frequencyForCountry = 0;
		while (values.hasNext()) {
			// replace type of value with the actual type of our value
			IntWritable value = (IntWritable) values.next();
			frequencyForCountry += value.get();
			
		}
		output.collect(key, new IntWritable(frequencyForCountry));
	}
}

Sales CountryDriver.java

package SalesCountry;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;

public class SalesCountryDriver {
    public static void main(String[] args) {
        JobClient my_client = new JobClient();
        // Create a configuration object for the job
        JobConf job_conf = new JobConf(SalesCountryDriver.class);

        // Set a name of the Job
        job_conf.setJobName("SalePerCountry");

        // Specify data type of output key and value
        job_conf.setOutputKeyClass(Text.class);
        job_conf.setOutputValueClass(IntWritable.class);

        // Specify names of Mapper and Reducer Class
        job_conf.setMapperClass(SalesCountry.SalesMapper.class);
        job_conf.setReducerClass(SalesCountry.SalesCountryReducer.class);

        // Specify formats of the data type of Input and output
        job_conf.setInputFormat(TextInputFormat.class);
        job_conf.setOutputFormat(TextOutputFormat.class);

        // Set input and output directories using command line arguments, 
        //arg[0] = name of input directory on HDFS, and arg[1] =  name of output directory to be created to store the output file.

        FileInputFormat.setInputPaths(job_conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(job_conf, new Path(args[1]));

        my_client.setConf(job_conf);
        try {
            // Run the job 
            JobClient.runJob(job_conf);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

ここからファイルをダウンロード

最初の Hadoop MapReduce プログラム

これらすべてのファイルのファイル権限を確認してください

最初の Hadoop MapReduce プログラム

「読み取り」権限がない場合は、同じ権限を付与します。

最初の Hadoop MapReduce プログラム

ステップ2)

以下の Hadoop の例に示すようにクラスパスをエクスポートします。

export CLASSPATH="$HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-client-core-2.2.0.jar:$HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-client-common-2.2.0.jar:$HADOOP_HOME/share/hadoop/common/hadoop-common-2.2.0.jar:~/MapReduceTutorial/SalesCountry/*:$HADOOP_HOME/lib/*"

最初の Hadoop MapReduce プログラム

ステップ3)

コンパイル Java ファイル (これらのファイルはディレクトリに存在します) Final-MapReduceHandsOn)。 そのクラスファイルはパッケージディレクトリに置かれます

javac -d . SalesMapper.java SalesCountryReducer.java SalesCountryDriver.java

最初の Hadoop MapReduce プログラム

この警告は無視しても問題ありません。

このコンパイルにより、Java ソース ファイルで指定されたパッケージ名と同じ名前のディレクトリが現在のディレクトリに作成されます (つまり、 販売国 私たちの場合)、コンパイルされたすべてのクラスファイルをその中に置きます。

最初の Hadoop MapReduce プログラム

ステップ4)

新しいファイルを作成する マニフェスト.txt

sudo gedit Manifest.txt

次の行を追加します。

Main-Class: SalesCountry.SalesCountryDriver

最初の Hadoop MapReduce プログラム

Sales Country.Sales CountryDriver メインクラスの名前です。 この行の最後で Enter キーを押す必要があることに注意してください。

ステップ5)

Jarファイルを作成する

jar cfm ProductSalePerCountry.jar Manifest.txt SalesCountry/*.class

最初の Hadoop MapReduce プログラム

jarファイルが作成されたことを確認する

最初の Hadoop MapReduce プログラム

ステップ6)

Hadoopを開始する

$HADOOP_HOME/sbin/start-dfs.sh
$HADOOP_HOME/sbin/start-yarn.sh

ステップ7)

ファイルをコピーする SalesJan2009.csv~/inputMapReduce

次のコマンドを使用してコピーします ~/inputMapReduce HDFSに。

$HADOOP_HOME/bin/hdfs dfs -copyFromLocal ~/inputMapReduce /

最初の Hadoop MapReduce プログラム

この警告は無視しても問題ありません。

実際にファイルがコピーされたかどうかを確認します。

$HADOOP_HOME/bin/hdfs dfs -ls /inputMapReduce

最初の Hadoop MapReduce プログラム

ステップ8)

MapReduce ジョブを実行する

$HADOOP_HOME/bin/hadoop jar ProductSalePerCountry.jar /inputMapReduce /mapreduce_output_sales

最初の Hadoop MapReduce プログラム

これにより、mapreduce_output_sales という名前の出力ディレクトリが作成されます。 HDFS。 このディレクトリの内容は、国別の製品売上高を含むファイルになります。

ステップ9)

結果はコマンド インターフェイスから次のように確認できます。

$HADOOP_HOME/bin/hdfs dfs -cat /mapreduce_output_sales/part-00000

最初の Hadoop MapReduce プログラム

結果は Web インターフェイス経由でも確認できます。

Web ブラウザで r を開きます。

最初の Hadoop MapReduce プログラム

今すぐ選択 「ファイルシステムを参照」 に移動して /mapreduce_output_sales

最初の Hadoop MapReduce プログラム

Open パート-r-00000

最初の Hadoop MapReduce プログラム

SalesMapperクラスの説明

このセクションでは、次の実装について理解します。 セールスマッパー とに提供されます。

1. まず、クラスのパッケージ名を指定します。 販売国 はパッケージの名前です。 コンパイルの出力に注意してください。 SalesMapper.class このパッケージ名で指定されたディレクトリに移動します。 販売国.

続いて、ライブラリ パッケージをインポートします。

以下のスナップショットは、 セールスマッパー クラス-

SalesMapperクラスの説明

サンプルコードの説明:

1. SalesMapper クラスの定義 -

パブリック クラス SalesMapper は MapReduceBase を拡張し、Mapper を実装します。 {

すべてのマッパー クラスは以下から拡張する必要があります。 MapReduceBase クラスを実装する必要があります マッパー インタフェース。

2.「マップ」関数の定義 -

public void map(LongWritable key,
         Text value,
OutputCollector<Text, IntWritable> output,
Reporter reporter) throws IOException

Mapper クラスの主要部分は次のとおりです。 '地図()' XNUMX つの引数を受け取るメソッド。

電話をかけるたびに '地図()' 方法、 Key-Value ペア('鍵''価値' このコードでは) が渡されます。

'地図()' メソッドは、引数として受け取った入力テキストを分割することから始まります。 トークナイザーを使用してこれらの行を単語に分割します。

String valueString = value.toString();
String[] SingleCountryData = valueString.split(",");

ここでは、 「、」 区切り文字として使用されます。

この後、配列の 7 番目のインデックスにあるレコードを使用してペアが形成されます '単一国データ' と値 '1'.

Output.collect(new Text(Single CountryData[7]), one);

必要なため、7 番目のインデックスのレコードを選択しています。 Country データであり、配列の 7 番目のインデックスに位置します。 '単一国データ'.

入力データは以下の形式であることに注意してください (ここで、 Country 7時ですth インデックス、開始インデックスは 0)-

取引日、製品、価格、支払いタイプ、名前、都市、州、Country,Account_Created,Last_Login,緯度,経度

マッパーの出力もまた次のようになります。 Key-Value を使用して出力されるペア '集める()' 方法 '出力コレクター'.

Sales CountryReducerクラスの説明

このセクションでは、次の実装について理解します。 Sales CountryReducer とに提供されます。

1. まず、クラスのパッケージの名前を指定します。 販売国 アウトパッケージの名前です。 コンパイルの出力に注意してください。 Sales CountryReducer.class このパッケージ名で指定されたディレクトリに移動します。 販売国.

続いて、ライブラリ パッケージをインポートします。

以下のスナップショットは、 Sales CountryReducer クラス-

Sales CountryReducer クラスの説明

コードの説明:

1. Sales CountryReducer クラスの定義 -

public class Sales CountryReducer extends MapReduceBase 実装 Reducer {

ここで、最初の XNUMX つのデータ型は、 '文章'「書き込み可能」 Reducer への入力 Key-Value のデータ型です。

マッパーの出力は次の形式になります。 、 。 マッパーのこの出力はリデューサーへの入力になります。 したがって、そのデータ型に合わせるために、 テキストIntWritable ここではデータ型として使用されます。

最後の XNUMX つのデータ型「Text」と「IntWritable」は、reducer によってキーと値のペアの形式で生成される出力のデータ型です。

すべてのリデューサー クラスは以下から拡張する必要があります。 MapReduceBase クラスを実装する必要があります レデューサー インタフェース。

2. 「reduce」関数の定義 -

public void reduce( Text t_key,
             Iterator<IntWritable> values,                           
             OutputCollector<Text,IntWritable> output,
             Reporter reporter) throws IOException {

への入力 reduce() メソッドは、複数の値のリストを持つキーです。

たとえば、私たちの場合は次のようになります-

、 、 、 、 、 。

これはレデューサに次のように与えられます

したがって、この形式の引数を受け入れるには、まず2つのデータ型が使用されます。 テキストイテレータ. テキスト はキーのデータ型であり、 イテレータ は、そのキーの値のリストのデータ型です。

次の引数の型は 出力コレクター これは減速機フェーズの出力を収集します。

reduce() このメソッドは、キー値をコピーし、頻度カウントを 0 に初期化することから始まります。

テキストキー = t_key;
int 頻度の国 = 0;

次に、「」を使用して、その間' ループでは、キーに関連付けられた値のリストを繰り返し処理し、すべての値を合計することで最終的な頻度を計算します。

 while (values.hasNext()) {
            // replace type of value with the actual type of our value
            IntWritable value = (IntWritable) values.next();
            frequencyForCountry += value.get();
            
        }

ここで、結果を次の形式で出力コレクターにプッシュします。 キー そして得た 周波数カウント.

以下のコードはこれを行います-

output.collect(key, new IntWritable(frequencyForCountry));

Sales CountryDriverクラスの説明

このセクションでは、次の実装について理解します。 販売国ドライバー class

1. まず、クラスのパッケージ名を指定します。 販売国 アウトパッケージの名前です。 コンパイルの出力に注意してください。 Sales CountryDriver.class このパッケージ名で指定されたディレクトリに移動します。 販売国.

ここでは、パッケージ名を指定する行と、その後にライブラリ パッケージをインポートするコードが続きます。

Sales CountryDriver クラスの説明

2. 新しいクライアント ジョブ、構成オブジェクトを作成し、Mapper クラスと Reducer クラスをアドバタイズするドライバー クラスを定義します。

ドライバー クラスは、MapReduce ジョブを実行するように設定する責任があります。 Hadoopの。 このクラスでは、次のように指定します。 ジョブ名、入出力のデータ型、マッパークラスとリデューサークラスの名前.

Sales CountryDriver クラスの説明

3. 以下のコード スニペットでは、入力データセットの使用と出力の生成にそれぞれ使用される入力ディレクトリと出力ディレクトリを設定します。

arg [0]arg [1] MapReduce のハンズオンで指定されたコマンドで渡されるコマンドライン引数です。

$HADOOP_HOME/bin/hadoop jar ProductSalePer Country.jar /inputMapReduce /mapreduce_output_sales

Sales CountryDriver クラスの説明

4. ジョブをトリガーする

以下のコードは、MapReduce ジョブの実行を開始します。

try {
    // Run the job 
    JobClient.runJob(job_conf);
} catch (Exception e) {
    e.printStackTrace();
}