Hadoop 和 Mapreduce 示例:创建第一个程序 Java
在本教程中,您将学习如何使用 Hadoop 和 MapReduce 示例。使用的输入数据是 2009 年 XNUMX 月销售额.csv。它包含销售相关信息,如产品名称、价格、付款方式、城市、客户国家等。目标是 找出每个国家销售的产品数量。
第一个 Hadoop MapReduce 程序
现在在这个 MapReduce 教程,我们将创建我们的第一个 Java MapReduce 程序:
确保已安装 Hadoop。在开始实际操作之前,请将用户更改为“hduser”(Hadoop 配置时使用的 ID,您可以切换到 Hadoop 编程配置期间使用的用户 ID)。
su - hduser_
步骤1)
创建一个名为的新目录 MapReduce教程 如下面的 MapReduce 示例所示
sudo mkdir MapReduceTutorial
授予权限
sudo chmod -R 777 MapReduceTutorial
销售映射器.java
package SalesCountry;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;
public class SalesMapper extends MapReduceBase implements Mapper <LongWritable, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
public void map(LongWritable key, Text value, OutputCollector <Text, IntWritable> output, Reporter reporter) throws IOException {
String valueString = value.toString();
String[] SingleCountryData = valueString.split(",");
output.collect(new Text(SingleCountryData[7]), one);
}
}
销售国家/地区Reducer.java
package SalesCountry;
import java.io.IOException;
import java.util.*;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;
public class SalesCountryReducer extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
public void reduce(Text t_key, Iterator<IntWritable> values, OutputCollector<Text,IntWritable> output, Reporter reporter) throws IOException {
Text key = t_key;
int frequencyForCountry = 0;
while (values.hasNext()) {
// replace type of value with the actual type of our value
IntWritable value = (IntWritable) values.next();
frequencyForCountry += value.get();
}
output.collect(key, new IntWritable(frequencyForCountry));
}
}
销售国家驱动程序.java
package SalesCountry;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
public class SalesCountryDriver {
public static void main(String[] args) {
JobClient my_client = new JobClient();
// Create a configuration object for the job
JobConf job_conf = new JobConf(SalesCountryDriver.class);
// Set a name of the Job
job_conf.setJobName("SalePerCountry");
// Specify data type of output key and value
job_conf.setOutputKeyClass(Text.class);
job_conf.setOutputValueClass(IntWritable.class);
// Specify names of Mapper and Reducer Class
job_conf.setMapperClass(SalesCountry.SalesMapper.class);
job_conf.setReducerClass(SalesCountry.SalesCountryReducer.class);
// Specify formats of the data type of Input and output
job_conf.setInputFormat(TextInputFormat.class);
job_conf.setOutputFormat(TextOutputFormat.class);
// Set input and output directories using command line arguments,
//arg[0] = name of input directory on HDFS, and arg[1] = name of output directory to be created to store the output file.
FileInputFormat.setInputPaths(job_conf, new Path(args[0]));
FileOutputFormat.setOutputPath(job_conf, new Path(args[1]));
my_client.setConf(job_conf);
try {
// Run the job
JobClient.runJob(job_conf);
} catch (Exception e) {
e.printStackTrace();
}
}
}
检查所有这些文件的文件权限
如果缺少“读取”权限,则授予相同的权限-
步骤2)
导出类路径,如下面的 Hadoop 示例所示
export CLASSPATH="$HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-client-core-2.2.0.jar:$HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-client-common-2.2.0.jar:$HADOOP_HOME/share/hadoop/common/hadoop-common-2.2.0.jar:~/MapReduceTutorial/SalesCountry/*:$HADOOP_HOME/lib/*"
步骤3)
汇编 Java 文件(这些文件存在于目录中 最终版-MapReduce 实践)。其类文件将放在包目录中
javac -d . SalesMapper.java SalesCountryReducer.java SalesCountryDriver.java
您可以放心地忽略此警告。
此次编译将在当前目录中创建一个目录,该目录以 java 源文件中指定的包名命名(即 销售国家 在我们的例子中),并将所有已编译的类文件放入其中。
步骤4)
创建一个新文件 清单.txt
sudo gedit Manifest.txt
添加以下几行,
Main-Class: SalesCountry.SalesCountryDriver
销售国家.销售国家驱动程序 是主类的名称。请注意,您必须在此行末尾按回车键。
步骤5)
创建 Jar 文件
jar cfm ProductSalePerCountry.jar Manifest.txt SalesCountry/*.class
检查 jar 文件是否已创建
步骤6)
启动 Hadoop
$HADOOP_HOME/sbin/start-dfs.sh
$HADOOP_HOME/sbin/start-yarn.sh
步骤7)
复制文件 2009 年 XNUMX 月销售额.csv 成 〜/ inputMapReduce
现在使用下面的命令复制 〜/ inputMapReduce 到 HDFS。
$HADOOP_HOME/bin/hdfs dfs -copyFromLocal ~/inputMapReduce /
我们可以安全地忽略这个警告。
验证文件是否确实被复制。
$HADOOP_HOME/bin/hdfs dfs -ls /inputMapReduce
步骤8)
运行 MapReduce 作业
$HADOOP_HOME/bin/hadoop jar ProductSalePerCountry.jar /inputMapReduce /mapreduce_output_sales
这将在上创建一个名为 mapreduce_output_sales 的输出目录 高密度文件系统。此目录的内容将是一个包含每个国家/地区产品销售情况的文件。
步骤9)
通过命令界面可以看到结果如下:
$HADOOP_HOME/bin/hdfs dfs -cat /mapreduce_output_sales/part-00000
也可以通过 Web 界面查看结果:
在网络浏览器中打开 r。
现在选择 “浏览文件系统” 并导航到 /mapreduce_output_sales
可选 部分-r-00000
SalesMapper 类的解释
在本节中,我们将了解 销售地图 类。
1. 我们首先为我们的类指定一个包的名称。 销售国家 是我们的包的名称。请注意编译的输出, SalesMapper.类 将进入以此包名称命名的目录: 销售国家.
接下来,我们导入库包。
下面的快照显示了 销售地图 班级-
示例代码说明:
1.SalesMapper 类定义-
公共类 SalesMapper 扩展了 MapReduceBase 实现 Mapper {
每个映射器类都必须从 MapReduce基础 类,并且它必须实现 映射器 界面。
2. 定义“map”函数-
public void map(LongWritable key,
Text value,
OutputCollector<Text, IntWritable> output,
Reporter reporter) throws IOException
Mapper 类的主要部分是 '地图()' 接受四个参数的方法。
每次调用 '地图()' 方法,一个 核心价值 对 ('钥匙' 与 '价值' 在此代码中)被传递。
'地图()' 方法首先拆分作为参数接收的输入文本。它使用标记器将这些行拆分为单词。
String valueString = value.toString();
String[] SingleCountryData = valueString.split(",");
在这里, '' 用作分隔符。
此后,使用数组第 7 个索引处的记录形成一对 '单一国家数据' 和一个值 “1”.
输出.收集(新文本(SingleCountryData[7]),一);
我们选择第 7 个索引处的记录,因为我们需要 国家 数据,位于数组的第 7 个索引处 '单一国家数据'.
请注意,我们的输入数据采用以下格式(其中 国家 在7点th 索引,以 0 为起始索引)-
交易日期、产品、价格、付款类型、名称、城市、州、国家,帐户创建时间,上次登录,纬度,经度
mapper 的输出又是 核心价值 对,使用输出 '收集()' 的方法 ‘输出收集器’.
SalesCountryReducer 类说明
在本节中,我们将了解 销售国家/地区减少器 类。
1. 我们首先为我们的类指定一个包的名称。 销售国家 是我们的包名。请注意编译的输出, SalesCountryReducer.类 将进入以此包名称命名的目录: 销售国家.
接下来,我们导入库包。
下面的快照显示了 销售国家/地区减少器 班级-
代码说明:
1. SalesCountryReducer 类定义-
公共类 SalesCountryReducer 扩展了 MapReduceBase 实现 Reducer {
这里,前两种数据类型, '文本' 与 ‘可写入’ 是输入到 Reducer 的键值数据类型。
映射器的输出形式为, 。mapper 的输出将成为 Reducer 的输入。因此,为了与其数据类型保持一致, 文本 与 可写入 在这里用作数据类型。
最后两种数据类型“Text”和“IntWritable”是reducer以键值对的形式生成的输出的数据类型。
每个 Reducer 类都必须从 MapReduce基础 类,并且它必须实现 减速器 界面。
2. 定义“reduce”函数
public void reduce( Text t_key,
Iterator<IntWritable> values,
OutputCollector<Text,IntWritable> output,
Reporter reporter) throws IOException {
输入 降低() 方法是一个带有多个值列表的键。
例如,在我们的例子中,它将是-
, , , , , 。
这给 Reducer 的格式为
因此,要接受这种形式的参数,首先要使用两种数据类型,即, 文本 与 迭代器. 文本 是键的数据类型,并且 迭代器 是该键的值列表的数据类型。
下一个参数是类型 输出收集器 收集 Reducer 阶段的输出。
降低() 方法首先复制键值,并将频率计数初始化为 0。
文本键 = t_key;
int 国家频率 = 0;
然后,使用'尽管' 循环,我们遍历与键关联的值列表,并通过对所有值求和来计算最终频率。
while (values.hasNext()) {
// replace type of value with the actual type of our value
IntWritable value = (IntWritable) values.next();
frequencyForCountry += value.get();
}
现在,我们将结果以以下形式推送到输出收集器 键 并获得 频率计数.
下面的代码可以实现这个功能-
output.collect(key, new IntWritable(frequencyForCountry));
SalesCountryDriver 类说明
在本节中,我们将了解 销售国家司机 程
1. 我们首先为我们的类指定一个包的名称。 销售国家 是我们的包名。请注意编译的输出, SalesCountryDriver.类 将进入以此包名称命名的目录: 销售国家.
这是指定包名称的一行,后跟用于导入库包的代码。
2. 定义一个驱动程序类,它将创建一个新的客户端作业、配置对象并公布 Mapper 和 Reducer 类。
驱动程序类负责设置我们的 MapReduce 作业在 Hadoop的. 在本课程中,我们指定 作业名称、输入/输出的数据类型以及 mapper 和 Reducer 类的名称.
3. 在下面的代码片段中,我们设置了输入和输出目录,分别用于使用输入数据集和产生输出。
参数[0] 与 参数[1] 是 MapReduce 实践中给出的命令传递的命令行参数,即
$HADOOP_HOME/bin/hadoop jar ProductSalePerCountry.jar /inputMapReduce /mapreduce_output_sales
4. 触发我们的工作
下面的代码开始执行 MapReduce 作业-
try {
// Run the job
JobClient.runJob(job_conf);
} catch (Exception e) {
e.printStackTrace();
}





















