Hadoop 和 Mapreduce 示例:创建第一个程序 Java

在本教程中,您将学习如何使用 Hadoop 和 MapReduce 示例。使用的输入数据是 2009 年 XNUMX 月销售额.csv。它包含销售相关信息,如产品名称、价格、付款方式、城市、客户国家等。目标是 找出每个国家销售的产品数量。

第一个 Hadoop MapReduce 程序

现在在这个 MapReduce 教程,我们将创建我们的第一个 Java MapReduce 程序:

第一个 Hadoop MapReduce 程序

2009年XNUMX月销售数据

确保已安装 Hadoop。在开始实际操作之前,请将用户更改为“hduser”(Hadoop 配置时使用的 ID,您可以切换到 Hadoop 编程配置期间使用的用户 ID)。

su - hduser_

第一个 Hadoop MapReduce 程序

步骤1)

创建一个名为的新目录 MapReduce教程 如下面的 MapReduce 示例所示

sudo mkdir MapReduceTutorial

第一个 Hadoop MapReduce 程序

授予权限

sudo chmod -R 777 MapReduceTutorial

第一个 Hadoop MapReduce 程序

销售映射器.java

package SalesCountry;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;

public class SalesMapper extends MapReduceBase implements Mapper <LongWritable, Text, Text, IntWritable> {
	private final static IntWritable one = new IntWritable(1);

	public void map(LongWritable key, Text value, OutputCollector <Text, IntWritable> output, Reporter reporter) throws IOException {

		String valueString = value.toString();
		String[] SingleCountryData = valueString.split(",");
		output.collect(new Text(SingleCountryData[7]), one);
	}
}

销售国家/地区Reducer.java

package SalesCountry;

import java.io.IOException;
import java.util.*;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;

public class SalesCountryReducer extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {

	public void reduce(Text t_key, Iterator<IntWritable> values, OutputCollector<Text,IntWritable> output, Reporter reporter) throws IOException {
		Text key = t_key;
		int frequencyForCountry = 0;
		while (values.hasNext()) {
			// replace type of value with the actual type of our value
			IntWritable value = (IntWritable) values.next();
			frequencyForCountry += value.get();
			
		}
		output.collect(key, new IntWritable(frequencyForCountry));
	}
}

销售国家驱动程序.java

package SalesCountry;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;

public class SalesCountryDriver {
    public static void main(String[] args) {
        JobClient my_client = new JobClient();
        // Create a configuration object for the job
        JobConf job_conf = new JobConf(SalesCountryDriver.class);

        // Set a name of the Job
        job_conf.setJobName("SalePerCountry");

        // Specify data type of output key and value
        job_conf.setOutputKeyClass(Text.class);
        job_conf.setOutputValueClass(IntWritable.class);

        // Specify names of Mapper and Reducer Class
        job_conf.setMapperClass(SalesCountry.SalesMapper.class);
        job_conf.setReducerClass(SalesCountry.SalesCountryReducer.class);

        // Specify formats of the data type of Input and output
        job_conf.setInputFormat(TextInputFormat.class);
        job_conf.setOutputFormat(TextOutputFormat.class);

        // Set input and output directories using command line arguments, 
        //arg[0] = name of input directory on HDFS, and arg[1] =  name of output directory to be created to store the output file.

        FileInputFormat.setInputPaths(job_conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(job_conf, new Path(args[1]));

        my_client.setConf(job_conf);
        try {
            // Run the job 
            JobClient.runJob(job_conf);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

在此处下载文件

第一个 Hadoop MapReduce 程序

检查所有这些文件的文件权限

第一个 Hadoop MapReduce 程序

如果缺少“读取”权限,则授予相同的权限-

第一个 Hadoop MapReduce 程序

步骤2)

导出类路径,如下面的 Hadoop 示例所示

export CLASSPATH="$HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-client-core-2.2.0.jar:$HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-client-common-2.2.0.jar:$HADOOP_HOME/share/hadoop/common/hadoop-common-2.2.0.jar:~/MapReduceTutorial/SalesCountry/*:$HADOOP_HOME/lib/*"

第一个 Hadoop MapReduce 程序

步骤3)

汇编 Java 文件(这些文件存在于目录中 最终版-MapReduce 实践)。其类文件将放在包目录中

javac -d . SalesMapper.java SalesCountryReducer.java SalesCountryDriver.java

第一个 Hadoop MapReduce 程序

您可以放心地忽略此警告。

此次编译将在当前目录中创建一个目录,该目录以 java 源文件中指定的包名命名(即 销售国家 在我们的例子中),并将所有已编译的类文件放入其中。

第一个 Hadoop MapReduce 程序

步骤4)

创建一个新文件 清单.txt

sudo gedit Manifest.txt

添加以下几行,

Main-Class: SalesCountry.SalesCountryDriver

第一个 Hadoop MapReduce 程序

销售国家.销售国家驱动程序 是主类的名称。请注意,您必须在此行末尾按回车键。

步骤5)

创建 Jar 文件

jar cfm ProductSalePerCountry.jar Manifest.txt SalesCountry/*.class

第一个 Hadoop MapReduce 程序

检查 jar 文件是否已创建

第一个 Hadoop MapReduce 程序

步骤6)

启动 Hadoop

$HADOOP_HOME/sbin/start-dfs.sh
$HADOOP_HOME/sbin/start-yarn.sh

步骤7)

复制文件 2009 年 XNUMX 月销售额.csv〜/ inputMapReduce

现在使用下面的命令复制 〜/ inputMapReduce 到 HDFS。

$HADOOP_HOME/bin/hdfs dfs -copyFromLocal ~/inputMapReduce /

第一个 Hadoop MapReduce 程序

我们可以安全地忽略这个警告。

验证文件是否确实被复制。

$HADOOP_HOME/bin/hdfs dfs -ls /inputMapReduce

第一个 Hadoop MapReduce 程序

步骤8)

运行 MapReduce 作业

$HADOOP_HOME/bin/hadoop jar ProductSalePerCountry.jar /inputMapReduce /mapreduce_output_sales

第一个 Hadoop MapReduce 程序

这将在上创建一个名为 mapreduce_output_sales 的输出目录 高密度文件系统。此目录的内容将是一个包含每个国家/地区产品销售情况的文件。

步骤9)

通过命令界面可以看到结果如下:

$HADOOP_HOME/bin/hdfs dfs -cat /mapreduce_output_sales/part-00000

第一个 Hadoop MapReduce 程序

也可以通过 Web 界面查看结果:

在网络浏览器中打开 r。

第一个 Hadoop MapReduce 程序

现在选择 “浏览文件系统” 并导航到 /mapreduce_output_sales

第一个 Hadoop MapReduce 程序

可选 部分-r-00000

第一个 Hadoop MapReduce 程序

SalesMapper 类的解释

在本节中,我们将了解 销售地图 类。

1. 我们首先为我们的类指定一个包的名称。 销售国家 是我们的包的名称。请注意编译的输出, SalesMapper.类 将进入以此包名称命名的目录: 销售国家.

接下来,我们导入库包。

下面的快照显示了 销售地图 班级-

SalesMapper 类说明

示例代码说明:

1.SalesMapper 类定义-

公共类 SalesMapper 扩展了 MapReduceBase 实现 Mapper {

每个映射器类都必须从 MapReduce基础 类,并且它必须实现 映射器 界面。

2. 定义“map”函数-

public void map(LongWritable key,
         Text value,
OutputCollector<Text, IntWritable> output,
Reporter reporter) throws IOException

Mapper 类的主要部分是 '地图()' 接受四个参数的方法。

每次调用 '地图()' 方法,一个 核心价值 对 ('钥匙''价值' 在此代码中)被传递。

'地图()' 方法首先拆分作为参数接收的输入文本。它使用标记器将这些行拆分为单词。

String valueString = value.toString();
String[] SingleCountryData = valueString.split(",");

在这里, '' 用作分隔符。

此后,使用数组第 7 个索引处的记录形成一对 '单一国家数据' 和一个值 “1”.

输出.收集(新文本(SingleCountryData[7]),一);

我们选择第 7 个索引处的记录,因为我们需要 国家 数据,位于数组的第 7 个索引处 '单一国家数据'.

请注意,我们的输入数据采用以下格式(其中 国家 在7点th 索引,以 0 为起始索引)-

交易日期、产品、价格、付款类型、名称、城市、州、国家,帐户创建时间,上次登录,纬度,经度

mapper 的输出又是 核心价值 对,使用输出 '收集()' 的方法 ‘输出收集器’.

SalesCountryReducer 类说明

在本节中,我们将了解 销售国家/地区减少器 类。

1. 我们首先为我们的类指定一个包的名称。 销售国家 是我们的包名。请注意编译的输出, SalesCountryReducer.类 将进入以此包名称命名的目录: 销售国家.

接下来,我们导入库包。

下面的快照显示了 销售国家/地区减少器 班级-

SalesCountryReducer 类说明

代码说明:

1. SalesCountryReducer 类定义-

公共类 SalesCountryReducer 扩展了 MapReduceBase 实现 Reducer {

这里,前两种数据类型, '文本'‘可写入’ 是输入到 Reducer 的键值数据类型。

映射器的输出形式为, 。mapper 的输出将成为 Reducer 的输入。因此,为了与其数据类型保持一致, 文本可写入 在这里用作数据类型。

最后两种数据类型“Text”和“IntWritable”是reducer以键值对的形式生成的输出的数据类型。

每个 Reducer 类都必须从 MapReduce基础 类,并且它必须实现 减速器 界面。

2. 定义“reduce”函数

public void reduce( Text t_key,
             Iterator<IntWritable> values,                           
             OutputCollector<Text,IntWritable> output,
             Reporter reporter) throws IOException {

输入 降低() 方法是一个带有多个值列表的键。

例如,在我们的例子中,它将是-

, , , , , 。

这给 Reducer 的格式为

因此,要接受这种形式的参数,首先要使用两种数据类型,即, 文本迭代器. 文本 是键的数据类型,并且 迭代器 是该键的值列表的数据类型。

下一个参数是类型 输出收集器 收集 Reducer 阶段的输出。

降低() 方法首先复制键值,并将频率计数初始化为 0。

文本键 = t_key;
int 国家频率 = 0;

然后,使用'尽管' 循环,我们遍历与键关联的值列表,并通过对所有值求和来计算最终频率。

 while (values.hasNext()) {
            // replace type of value with the actual type of our value
            IntWritable value = (IntWritable) values.next();
            frequencyForCountry += value.get();
            
        }

现在,我们将结果以以下形式推送到输出收集器 并获得 频率计数.

下面的代码可以实现这个功能-

output.collect(key, new IntWritable(frequencyForCountry));

SalesCountryDriver 类说明

在本节中,我们将了解 销售国家司机

1. 我们首先为我们的类指定一个包的名称。 销售国家 是我们的包名。请注意编译的输出, SalesCountryDriver.类 将进入以此包名称命名的目录: 销售国家.

这是指定包名称的一行,后跟用于导入库包的代码。

SalesCountryDriver 类说明

2. 定义一个驱动程序类,它将创建一个新的客户端作业、配置对象并公布 Mapper 和 Reducer 类。

驱动程序类负责设置我们的 MapReduce 作业在 Hadoop的. 在本课程中,我们指定 作业名称、输入/输出的数据类型以及 mapper 和 Reducer 类的名称.

SalesCountryDriver 类说明

3. 在下面的代码片段中,我们设置了输入和输出目录,分别用于使用输入数据集和产生输出。

参数[0]参数[1] 是 MapReduce 实践中给出的命令传递的命令行参数,即

$HADOOP_HOME/bin/hadoop jar ProductSalePerCountry.jar /inputMapReduce /mapreduce_output_sales

SalesCountryDriver 类说明

4. 触发我们的工作

下面的代码开始执行 MapReduce 作业-

try {
    // Run the job 
    JobClient.runJob(job_conf);
} catch (Exception e) {
    e.printStackTrace();
}

总结一下这篇文章: