Hadoop MapReduce 连接和计数器示例
Mapreduce 中的 Join 是什么?
MapReduce 连接 操作用于合并两个大型数据集。但是,此过程涉及编写大量代码来执行实际的连接操作。连接两个数据集首先要比较每个数据集的大小。如果一个数据集与另一个数据集相比较小,则较小的数据集将分发到集群中的每个数据节点。
一旦 MapReduce 中的连接被分发,Mapper 或 Reducer 就会使用较小的数据集从大数据集中查找匹配的记录,然后组合这些记录以形成输出记录。
连接类型
根据实际执行连接的位置,Hadoop 中的连接可分为:
1. Map 端连接 – 当连接由映射器执行时,它被称为映射端连接。在这种类型中,连接是在映射函数实际使用数据之前执行的。每个映射的输入必须采用分区的形式并按排序顺序排列。此外,分区数量必须相等,并且必须按连接键排序。
2. Reduce 侧连接 – 当连接由 Reducer 执行时,它被称为 Reduce 侧连接。这种连接不需要具有结构化形式(或分区)的数据集。
在这里,map 端处理会发出两个表的连接键和相应的元组。此处理的效果是,所有具有相同连接键的元组都会落入同一个 Reducer,然后该 Reducer 会连接具有相同连接键的记录。
下图描述了 Hadoop 中连接的总体过程流程。
如何连接两个数据集:MapReduce 示例
两个不同的文件中有两组数据(如下所示)。关键的 Dept_ID 在两个文件中是相同的。目标是使用 MapReduce Join 来合并这些文件
输入: 输入数据集是一个txt文件, 部门名称.txt 和 部门实力.txt
确保已安装 Hadoop。在开始 MapReduce Join 示例实际流程之前,将用户更改为“hduser”(Hadoop 配置时使用的 id,您可以切换到 Hadoop 配置期间使用的用户 id)。
su - hduser_
步骤1) 将 zip 文件复制到你选择的位置
步骤2) 解压 Zip 文件
sudo tar -xvf MapReduceJoin.tar.gz
步骤3) 进入目录 MapReduceJoin/
cd MapReduceJoin/
步骤4) 启动 Hadoop
$HADOOP_HOME/sbin/start-dfs.sh
$HADOOP_HOME/sbin/start-yarn.sh
步骤5) DeptStrength.txt 和 DeptName.txt 是此 MapReduce Join 示例程序使用的输入文件。
需要使用以下命令将这些文件复制到 HDFS -
$HADOOP_HOME/bin/hdfs dfs -copyFromLocal DeptStrength.txt DeptName.txt /
步骤6) 使用以下命令运行程序 -
$HADOOP_HOME/bin/hadoop jar MapReduceJoin.jar MapReduceJoin/JoinDriver/DeptStrength.txt /DeptName.txt /output_mapreducejoin
步骤7) 执行后,输出文件(名为“part-00000”)将存储在 HDFS 上的目录 /output_mapreducejoin 中
可以使用命令行界面查看结果
$HADOOP_HOME/bin/hdfs dfs -cat /output_mapreducejoin/part-00000
也可以通过 Web 界面查看结果:
现在选择 “浏览文件系统” 并导航至 /output_mapreducejoin
可选 部分-r-00000
结果显示
注意: 请注意,下次运行此程序之前,您需要删除输出目录 /output_mapreducejoin
$HADOOP_HOME/bin/hdfs dfs -rm -r /output_mapreducejoin
另一种方法是对输出目录使用不同的名称。
MapReduce 中的 Counter 是什么?
A MapReduce 中的计数器 是一种用于收集和测量有关 MapReduce 作业和事件的统计信息的机制。计数器跟踪 MapReduce 中的各种作业统计信息,如发生的操作数和操作进度。计数器用于 MapReduce 中的问题诊断。
Hadoop 计数器类似于将日志消息放入 map 或 Reduce 代码中。此信息可能有助于诊断 MapReduce 作业处理中的问题。
通常,Hadoop 中的这些计数器是在程序(map 或 Reduce)中定义的,当发生特定事件或条件(特定于该计数器)时,这些计数器会在执行过程中递增。Hadoop 计数器的一个非常好的应用是跟踪输入数据集中的有效和无效记录。
MapReduce 计数器的类型
基本上有2种类型 映射简化 计数器
- Hadoop 内置计数器:每个作业都有一些内置的 Hadoop 计数器。以下是内置计数器组:
- MapReduce 任务计数器 – 在执行期间收集任务特定信息(例如输入记录的数量)。
- 文件系统计数器 – 收集任务读取或写入的字节数等信息
- 文件输入格式计数器 – 收集通过 FileInputFormat 读取的字节数的信息
- 文件输出格式计数器 – 收集通过 FileOutputFormat 写入的字节数的信息
- 工作计数器 – 这些计数器由 JobTracker 使用。它们收集的统计数据包括例如为某项作业启动的任务数。
- 用户定义计数器
除了内置计数器外,用户还可以使用以下类似功能定义自己的计数器: 编程语言。 例如,在 Java “enum”用于定义用户定义的计数器。
计数器示例
带有计数器的示例 MapClass,用于计算缺失值和无效值的数量。本教程中使用的输入数据文件我们的输入数据集是一个 CSV 文件, 2009 年 XNUMX 月销售额.csv
public static class MapClass extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text> { static enum SalesCounters { MISSING, INVALID }; public void map ( LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException { //Input string is split using ',' and stored in 'fields' array String fields[] = value.toString().split(",", -20); //Value at 4th index is country. It is stored in 'country' variable String country = fields[4]; //Value at 8th index is sales data. It is stored in 'sales' variable String sales = fields[8]; if (country.length() == 0) { reporter.incrCounter(SalesCounters.MISSING, 1); } else if (sales.startsWith("\"")) { reporter.incrCounter(SalesCounters.INVALID, 1); } else { output.collect(new Text(country), new Text(sales + ",1")); } } }
上面的代码片段展示了 Hadoop Map Reduce 中计数器的一个示例实现。
在这里, 销售柜台 是使用定义的计数器 ‘枚举’. 它用于计数 失踪 和 无效的 输入記錄。
在代码片段中,如果 '国家' 字段长度为零,则其值缺失,因此相应的计数器 销售柜台。缺失 已递增。
接下来,如果 '销售量' 字段以 “ 那么该记录就被视为无效。这通过增加计数器来表示 销售计数器.INVALID。