
MapReduce Experiment

Hadoop MapReduce word count


1 Introduction

1.1 Environment

This experiment uses Ubuntu 20.04 (64-bit) as the operating system, with three 4-core, 8 GB cloud servers as the Hadoop cluster machines. The software used is listed below:

Name     Version
Hadoop   3.2.3
IDEA     2022.2.3

1.2 Cluster Plan

Hostname   IP              DataNode   NameNode   JournalNode   ZKFC
node1      192.168.0.76
node2      192.168.0.213
node3      192.168.0.2
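
The WordCount driver in Section 3 addresses HDFS through the HA nameservice hdfs://ha-cluster. For reference, the following is a minimal sketch of how a client Configuration can describe such a nameservice programmatically. The NameNode placement (node1 and node2 on port 8020) and the class name HaClientSketch are assumptions for illustration; in practice these settings normally come from the cluster's hdfs-site.xml on the classpath.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    // Minimal HDFS HA client sketch; NameNode hosts and ports are assumed values.
    public class HaClientSketch {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            conf.set("fs.defaultFS", "hdfs://ha-cluster");
            // Logical nameservice and its two NameNodes.
            conf.set("dfs.nameservices", "ha-cluster");
            conf.set("dfs.ha.namenodes.ha-cluster", "nn1,nn2");
            conf.set("dfs.namenode.rpc-address.ha-cluster.nn1", "node1:8020");
            conf.set("dfs.namenode.rpc-address.ha-cluster.nn2", "node2:8020");
            // Client-side failover between the active and standby NameNode.
            conf.set("dfs.client.failover.proxy.provider.ha-cluster",
                    "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");

            try (FileSystem fs = FileSystem.get(conf)) {
                System.out.println(fs.exists(new Path("/user/root/input")));
            }
        }
    }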

2 Creating the Project in IDEA

  • Open IDEA, click File->New Project, select Maven Archetype, and create a Maven project named MapReduce:

  • Edit the pom.xml file as follows:

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>com.cuterwrite</groupId>
        <artifactId>MapReduce</artifactId>
        <version>1.0-SNAPSHOT</version>
    
        <properties>
            <maven.compiler.source>11</maven.compiler.source>
            <maven.compiler.target>11</maven.compiler.target>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        </properties>
        <dependencies>
          <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.2.3</version>
          </dependency>
        </dependencies>
    </project>
    

3 Writing the MapReduce Application

  • Write the IntSumReducer.java, TokenizerMapper.java, and WordCount.java files (a parameterized driver variant is sketched after these listings):

    package com.cuterwrite;

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    import java.io.IOException;

    // Sums the counts emitted for each word; also reused as the combiner.
    public class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private final IntWritable result = new IntWritable();

        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }
    
    package com.cuterwrite;
    
    import java.io.IOException;
    import java.util.StringTokenizer;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    // Splits each input line on whitespace and emits (word, 1) for every token.
    public class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
        private static final IntWritable one = new IntWritable(1);
        private final Text word = new Text();

        @Override
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            StringTokenizer tokenizer = new StringTokenizer(value.toString());
            while (tokenizer.hasMoreTokens()) {
                word.set(tokenizer.nextToken());
                context.write(word, one);
            }
        }
    }
    
    package com.cuterwrite;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    public class WordCount {
        public WordCount() {}
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            conf.set("fs.defaultFS", "hdfs://ha-cluster");
            conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
    
            String[] filePath = new String[] {
                    "hdfs://ha-cluster/user/root/input/news1.txt",
                    "hdfs://ha-cluster/user/root/input/news2.txt",
                    "hdfs://ha-cluster/user/root/input/news3.txt"
            };
    
            Job job = Job.getInstance(conf, "word count");
            job.setJarByClass(WordCount.class);
            job.setMapperClass(TokenizerMapper.class);
            job.setCombinerClass(IntSumReducer.class);
            job.setReducerClass(IntSumReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            for (int i = 0; i < filePath.length ; i++) {
                FileInputFormat.addInputPath(job, new Path(filePath[i]));
            }
            String outputPath = "hdfs://ha-cluster/user/root/output/mapreduce";
            FileOutputFormat.setOutputPath(job, new Path(outputPath));
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }
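
Because IntSumReducer's input and output types are identical (Text keys, IntWritable values), the same class is registered as the combiner, which pre-aggregates counts on the map side before the shuffle. The input and output paths above are hard-coded; the sketch below is a hypothetical variant (class name WordCountTool, not part of the original project) that takes the paths from the command line via Hadoop's ToolRunner.

    package com.cuterwrite;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;

    // Hypothetical variant of WordCount: paths come from the command line, e.g.
    // hadoop jar MapReduce-1.0-SNAPSHOT.jar com.cuterwrite.WordCountTool input output/mapreduce
    public class WordCountTool extends Configured implements Tool {
        @Override
        public int run(String[] args) throws Exception {
            Job job = Job.getInstance(getConf(), "word count");
            job.setJarByClass(WordCountTool.class);
            job.setMapperClass(TokenizerMapper.class);
            job.setCombinerClass(IntSumReducer.class);
            job.setReducerClass(IntSumReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            // All arguments except the last are input paths; the last is the output directory.
            for (int i = 0; i < args.length - 1; i++) {
                FileInputFormat.addInputPath(job, new Path(args[i]));
            }
            FileOutputFormat.setOutputPath(job, new Path(args[args.length - 1]));
            return job.waitForCompletion(true) ? 0 : 1;
        }

        public static void main(String[] args) throws Exception {
            System.exit(ToolRunner.run(new Configuration(), new WordCountTool(), args));
        }
    }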
    

4 Results

  • Compile and package the application into a jar:

    mvn clean install
    
  • Upload the jar to the jars directory in HDFS:

    hdfs dfs -put MapReduce-1.0-SNAPSHOT.jar jars
    
  • Create the input and output directories and upload the data files to HDFS:

    hdfs dfs -mkdir -p input
    hdfs dfs -mkdir -p output
    hdfs dfs -put news1.txt news2.txt news3.txt input
    
  • Run the jar:

    hadoop jar MapReduce-1.0-SNAPSHOT.jar com.cuterwrite.WordCount
    
  • View the results:

    hdfs dfs -cat output/mapreduce/*
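
The output can also be read programmatically. The sketch below (the class name PrintOutput is illustrative, and it assumes the HDFS HA client settings are available on the classpath, as outlined in Section 1.2) lists the reducer output files (part-r-00000, part-r-00001, ...) in the job's output directory and prints their contents through the HDFS FileSystem API.

    package com.cuterwrite;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    import java.io.BufferedReader;
    import java.io.InputStreamReader;
    import java.nio.charset.StandardCharsets;

    // Hypothetical helper: prints the word-count output without the hdfs CLI.
    public class PrintOutput {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            conf.set("fs.defaultFS", "hdfs://ha-cluster");
            try (FileSystem fs = FileSystem.get(conf)) {
                Path outputDir = new Path("/user/root/output/mapreduce");
                for (FileStatus status : fs.listStatus(outputDir)) {
                    // Reducer output files are named part-r-00000, part-r-00001, ...
                    if (!status.getPath().getName().startsWith("part-")) {
                        continue;
                    }
                    try (BufferedReader reader = new BufferedReader(
                            new InputStreamReader(fs.open(status.getPath()), StandardCharsets.UTF_8))) {
                        String line;
                        while ((line = reader.readLine()) != null) {
                            System.out.println(line);
                        }
                    }
                }
            }
        }
    }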
    
Licensed under CC BY-NC-SA 4.0