# MapReduce Experiment
# 1 Introduction
# 1.1 Experimental Environment
This experiment uses Ubuntu 20.04 64-bit as the operating system, with three 4-core / 8 GB cloud servers serving as the Hadoop cluster machines. The software used is as follows:
| Name | Version |
| --- | --- |
| Hadoop | 3.2.3 |
| IDEA | 2022.2.3 |
# 1.2 Cluster Plan
| Hostname | IP | DataNode | NameNode | JournalNode | ZKFC |
| --- | --- | --- | --- | --- | --- |
| node1 | 192.168.0.76 | Yes | Yes | Yes | Yes |
| node2 | 192.168.0.213 | Yes | Yes | Yes | Yes |
| node3 | 192.168.0.2 | Yes | No | Yes | No |
# 2 Creating the Project in IDEA
Open IDEA, click `File` -> `New Project`, select `Maven Archetype`, and create a Maven project named `MapReduce`. Then write the `pom.xml` file with the following content:

```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.cuterwrite</groupId>
    <artifactId>MapReduce</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.2.3</version>
        </dependency>
    </dependencies>
</project>
```
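To quickly confirm that the `hadoop-client` dependency resolves, a minimal sanity check can print the Hadoop version found on the classpath. The helper class below (`HadoopVersionCheck`) is hypothetical and not part of the experiment itself:

```java
package com.cuterwrite;

import org.apache.hadoop.util.VersionInfo;

// Hypothetical helper: prints the Hadoop version on the classpath,
// which should report 3.2.3 if the dependency in pom.xml resolved correctly.
public class HadoopVersionCheck {
    public static void main(String[] args) {
        System.out.println("Hadoop version: " + VersionInfo.getVersion());
    }
}
```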
# 3 Writing the MapReduce Application
Write the `IntSumReducer.java`, `TokenizerMapper.java`, and `WordCount.java` files:

```java
package com.cuterwrite;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    private final IntWritable result = new IntWritable();

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // Sum all counts emitted for this word by the mappers (and combiners).
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        result.set(sum);
        context.write(key, result);
    }
}
```
```java
package com.cuterwrite;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
    private static final IntWritable one = new IntWritable(1);
    private final Text word = new Text();

    @Override
    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        // Split each input line on whitespace and emit (word, 1) for every token.
        StringTokenizer tokenizer = new StringTokenizer(value.toString());
        while (tokenizer.hasMoreTokens()) {
            word.set(tokenizer.nextToken());
            context.write(word, one);
        }
    }
}
```
```java
package com.cuterwrite;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Address the HA cluster through its logical nameservice rather than a single NameNode.
        conf.set("fs.defaultFS", "hdfs://ha-cluster");
        conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");

        String[] filePath = {
            "hdfs://ha-cluster/user/root/input/news1.txt",
            "hdfs://ha-cluster/user/root/input/news2.txt",
            "hdfs://ha-cluster/user/root/input/news3.txt"
        };

        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(TokenizerMapper.class);
        // The reducer also serves as a combiner to pre-aggregate counts on the map side.
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        for (String path : filePath) {
            FileInputFormat.addInputPath(job, new Path(path));
        }
        String outputPath = "hdfs://ha-cluster/user/root/output/mapreduce";
        FileOutputFormat.setOutputPath(job, new Path(outputPath));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```
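The logical URI `hdfs://ha-cluster` can only be resolved if the client knows the HA nameservice mapping. When the job is submitted with `hadoop jar` on the cluster, these settings come from the cluster's `hdfs-site.xml`; the sketch below shows the equivalent client properties set programmatically. The NameNode IDs (`nn1`/`nn2`) and the RPC port 8020 are assumptions, so substitute the values from the actual cluster configuration:

```java
package com.cuterwrite;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

// Sketch only: HA client settings normally supplied by the cluster's hdfs-site.xml.
// The NameNode IDs (nn1/nn2) and port 8020 are assumed values.
public class HaClientConfig {
    public static Configuration create() {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://ha-cluster");
        conf.set("dfs.nameservices", "ha-cluster");
        conf.set("dfs.ha.namenodes.ha-cluster", "nn1,nn2");
        conf.set("dfs.namenode.rpc-address.ha-cluster.nn1", "node1:8020");
        conf.set("dfs.namenode.rpc-address.ha-cluster.nn2", "node2:8020");
        conf.set("dfs.client.failover.proxy.provider.ha-cluster",
                "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");
        return conf;
    }

    public static void main(String[] args) throws Exception {
        // Connecting succeeds only when one of the configured NameNodes is active.
        try (FileSystem fs = FileSystem.get(create())) {
            System.out.println("Connected to: " + fs.getUri());
        }
    }
}
```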
# 4 Experimental Results
Compile and package the application into a jar:

```bash
mvn clean install
```
Upload the jar to the `jars` directory in HDFS:

```bash
hdfs dfs -put MapReduce-1.0-SNAPSHOT.jar jars
```
Create the `input` and `output` directories and upload the data files to HDFS:

```bash
hdfs dfs -mkdir -p input
hdfs dfs -mkdir -p output
hdfs dfs -put news1.txt news2.txt news3.txt input
```
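The same directories and uploads can also be done through the HDFS `FileSystem` API. The following is a minimal sketch, assuming the data files sit in `/root` on the submitting machine and that the HA client settings (from `hdfs-site.xml` or as in the sketch above) are on the classpath:

```java
package com.cuterwrite;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Sketch of the same steps via the FileSystem API; local file locations are assumed.
public class UploadInput {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://ha-cluster");
        try (FileSystem fs = FileSystem.get(conf)) {
            // Relative paths resolve against the user's HDFS home directory, e.g. /user/root.
            fs.mkdirs(new Path("input"));
            fs.mkdirs(new Path("output"));
            for (String name : new String[]{"news1.txt", "news2.txt", "news3.txt"}) {
                fs.copyFromLocalFile(new Path("/root/" + name), new Path("input/" + name));
            }
        }
    }
}
```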
Run the jar:

```bash
hadoop jar MapReduce-1.0-SNAPSHOT.jar com.cuterwrite.WordCount
```
View the results:

```bash
hdfs dfs -cat output/mapreduce/*
```
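The output can also be read programmatically. Below is a minimal sketch that prints every reducer output file (`part-r-*`) under the job's output directory, assuming the path layout used by the job above:

```java
package com.cuterwrite;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Sketch: prints all reducer output files (part-r-*) under the job's output directory.
public class PrintResults {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://ha-cluster");
        try (FileSystem fs = FileSystem.get(conf)) {
            Path outputDir = new Path("/user/root/output/mapreduce");
            for (FileStatus status : fs.listStatus(outputDir)) {
                if (!status.getPath().getName().startsWith("part-")) {
                    continue; // skip _SUCCESS and other marker files
                }
                try (BufferedReader reader = new BufferedReader(
                        new InputStreamReader(fs.open(status.getPath()), StandardCharsets.UTF_8))) {
                    String line;
                    while ((line = reader.readLine()) != null) {
                        System.out.println(line);
                    }
                }
            }
        }
    }
}
```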