Hadoop MapReduce 连接和计数器示例

Mapreduce 中的 Join 是什么?

MapReduce 连接 操作用于合并两个大型数据集。但是,此过程涉及编写大量代码来执行实际的连接操作。连接两个数据集首先要比较每个数据集的大小。如果一个数据集与另一个数据集相比较小,则较小的数据集将分发到集群中的每个数据节点。

一旦 MapReduce 中的连接被分发,Mapper 或 Reducer 就会使用较小的数据集从大数据集中查找匹配的记录,然后组合这些记录以形成输出记录。

连接类型

根据实际执行连接的位置,Hadoop 中的连接可分为:

1. Map 端连接 – 当连接由映射器执行时,它被称为映射端连接。在这种类型中,连接是在映射函数实际使用数据之前执行的。每个映射的输入必须采用分区的形式并按排序顺序排列。此外,分区数量必须相等,并且必须按连接键排序。

2. Reduce 侧连接 – 当连接由 Reducer 执行时,它被称为 Reduce 侧连接。这种连接不需要具有结构化形式(或分区)的数据集。

在这里,map 端处理会发出两个表的连接键和相应的元组。此处理的效果是,所有具有相同连接键的元组都会落入同一个 Reducer,然后该 Reducer 会连接具有相同连接键的记录。

下图描述了 Hadoop 中连接的总体过程流程。

Hadoop MapReduce 中的连接类型
Hadoop MapReduce 中的连接类型

如何连接两个数据集:MapReduce 示例

两个不同的文件中有两组数据(如下所示)。关键的 Dept_ID 在两个文件中是相同的。目标是使用 MapReduce Join 来合并这些文件

MapReduce 示例
文件1
MapReduce 示例
文件2

输入: 输入数据集是一个txt文件, 部门名称.txt 和 部门实力.txt

从这里下载输入文件

确保已安装 Hadoop。在开始 MapReduce Join 示例实际流程之前,将用户更改为“hduser”(Hadoop 配置时使用的 id,您可以切换到 Hadoop 配置期间使用的用户 id)。

su - hduser_

MapReduce 示例

步骤1) 将 zip 文件复制到你选择的位置

MapReduce 示例

步骤2) 解压 Zip 文件

sudo tar -xvf MapReduceJoin.tar.gz

MapReduce 示例

步骤3) 进入目录 MapReduceJoin/

cd MapReduceJoin/

MapReduce 示例

步骤4) 启动 Hadoop

$HADOOP_HOME/sbin/start-dfs.sh
$HADOOP_HOME/sbin/start-yarn.sh

MapReduce 示例

步骤5) DeptStrength.txt 和 DeptName.txt 是此 MapReduce Join 示例程序使用的输入文件。

需要使用以下命令将这些文件复制到 HDFS -

$HADOOP_HOME/bin/hdfs dfs -copyFromLocal DeptStrength.txt DeptName.txt /

MapReduce 示例

步骤6) 使用以下命令运行程序 -

$HADOOP_HOME/bin/hadoop jar MapReduceJoin.jar MapReduceJoin/JoinDriver/DeptStrength.txt /DeptName.txt /output_mapreducejoin

MapReduce 示例

MapReduce 示例

步骤7) 执行后,输出文件(名为“part-00000”)将存储在 HDFS 上的目录 /output_mapreducejoin 中

可以使用命令行界面查看结果

$HADOOP_HOME/bin/hdfs dfs -cat /output_mapreducejoin/part-00000

MapReduce 示例

也可以通过 Web 界面查看结果:

MapReduce 示例

现在选择 “浏览文件系统” 并导航至 /output_mapreducejoin

MapReduce 示例

可选 部分-r-00000

MapReduce 示例

结果显示

MapReduce 示例

注意: 请注意,下次运行此程序之前,您需要删除输出目录 /output_mapreducejoin

$HADOOP_HOME/bin/hdfs dfs -rm -r /output_mapreducejoin

另一种方法是对输出目录使用不同的名称。

MapReduce 中的 Counter 是什么?

A MapReduce 中的计数器 是一种用于收集和测量有关 MapReduce 作业和事件的统计信息的机制。计数器跟踪 MapReduce 中的各种作业统计信息,如发生的操作数和操作进度。计数器用于 MapReduce 中的问题诊断。

Hadoop 计数器类似于将日志消息放入 map 或 Reduce 代码中。此信息可能有助于诊断 MapReduce 作业处理中的问题。

通常,Hadoop 中的这些计数器是在程序(map 或 Reduce)中定义的,当发生特定事件或条件(特定于该计数器)时,这些计数器会在执行过程中递增。Hadoop 计数器的一个非常好的应用是跟踪输入数据集中的有效和无效记录。

MapReduce 计数器的类型

基本上有2种类型 映射简化 计数器

  1. Hadoop 内置计数器:每个作业都有一些内置的 Hadoop 计数器。以下是内置计数器组:
    • MapReduce 任务计数器 – 在执行期间收集任务特定信息(例如输入记录的数量)。
    • 文件系统计数器 – 收集任务读取或写入的字节数等信息
    • 文件输入格式计数器 – 收集通过 FileInputFormat 读取的字节数的信息
    • 文件输出格式计数器 – 收集通过 FileOutputFormat 写入的字节数的信息
    • 工作计数器 – 这些计数器由 JobTracker 使用。它们收集的统计数据包括例如为某项作业启动的任务数。
  2. 用户定义计数器

除了内置计数器外,用户还可以使用以下类似功能定义自己的计数器: 编程语言。 例如,在 Java “enum”用于定义用户定义的计数器。

计数器示例

带有计数器的示例 MapClass,用于计算缺失值和无效值的数量。本教程中使用的输入数据文件我们的输入数据集是一个 CSV 文件, 2009 年 XNUMX 月销售额.csv

public static class MapClass
            extends MapReduceBase
            implements Mapper<LongWritable, Text, Text, Text>
{
    static enum SalesCounters { MISSING, INVALID };
    public void map ( LongWritable key, Text value,
                 OutputCollector<Text, Text> output,
                 Reporter reporter) throws IOException
    {
        
        //Input string is split using ',' and stored in 'fields' array
        String fields[] = value.toString().split(",", -20);
        //Value at 4th index is country. It is stored in 'country' variable
        String country = fields[4];
        
        //Value at 8th index is sales data. It is stored in 'sales' variable
        String sales = fields[8];
      
        if (country.length() == 0) {
            reporter.incrCounter(SalesCounters.MISSING, 1);
        } else if (sales.startsWith("\"")) {
            reporter.incrCounter(SalesCounters.INVALID, 1);
        } else {
            output.collect(new Text(country), new Text(sales + ",1"));
        }
    }
}

上面的代码片段展示了 Hadoop Map Reduce 中计数器的一个示例实现。

在这里, 销售柜台 是使用定义的计数器 ‘枚举’. 它用于计数 失踪无效的 输入記錄。

在代码片段中,如果 '国家' 字段长度为零,则其值缺失,因此相应的计数器 销售柜台。缺失 已递增。

接下来,如果 '销售量' 字段以 那么该记录就被视为无效。这通过增加计数器来表示 销售计数器.INVALID。