Hadoop 3. The MapReduce Programming Framework

27 Jan 2021

The MapReduce Idea

The core idea of MapReduce is divide and conquer, which makes full use of parallel processing. Even Google, which published the papers and built a distributed computing implementation, only applied this idea rather than inventing it.

A MapReduce job runs in two processing phases: a Map phase and a Reduce phase.
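For example, with a hypothetical two-line input, the WordCount job implemented below would move through the two phases roughly like this (the framework groups the map output by key before handing it to reduce):

input lines:    "hello hadoop"            "hello mapreduce"  
Map output:     <hello,1> <hadoop,1>      <hello,1> <mapreduce,1>  
grouped by key: <hadoop,[1]>  <hello,[1,1]>  <mapreduce,[1]>  
Reduce output:  <hadoop,1>  <hello,2>  <mapreduce,1>  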

Hadoop Serialization and Data Types

Serialization is needed when we send data over the network or persist an object to a file: the object must be serialized into a binary form.

To keep the serialized data compact, everything Hadoop transmits must implement the Writable interface. Compared with Java's built-in Serializable, Hadoop's serialized form is smaller, which effectively reduces the amount of data sent over the network.
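As a quick illustration, the Writable contract is just a write()/readFields() pair over a raw binary stream. The class below is a hypothetical standalone demo (not part of the WordCount project) showing that an IntWritable serializes to exactly 4 bytes:

import org.apache.hadoop.io.IntWritable;  
  
import java.io.ByteArrayInputStream;  
import java.io.ByteArrayOutputStream;  
import java.io.DataInputStream;  
import java.io.DataOutputStream;  
import java.io.IOException;  
  
public class WritableDemo {  
    public static void main(String[] args) throws IOException {  
        // Serialize: write() emits a compact fixed-size binary form (4 bytes for an int)  
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();  
        new IntWritable(42).write(new DataOutputStream(bytes));  
        System.out.println("serialized size = " + bytes.size() + " bytes");  
  
        // Deserialize: readFields() restores the value from the same binary stream  
        IntWritable restored = new IntWritable();  
        restored.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));  
        System.out.println("restored value = " + restored.get());  
    }  
}  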

MapReduce Programming Quick Start

WordCount Implementation

1. Create a Maven project

  1. Import the Hadoop dependencies
<dependencies>  
    <dependency>  
        <groupId>org.apache.logging.log4j</groupId>  
        <artifactId>log4j-core</artifactId>  
        <version>2.8.2</version>  
    </dependency>  
    <dependency>  
        <groupId>org.apache.hadoop</groupId>  
        <artifactId>hadoop-common</artifactId>  
        <version>2.9.2</version>  
    </dependency>  
    <dependency>  
        <groupId>org.apache.hadoop</groupId>  
        <artifactId>hadoop-client</artifactId>  
        <version>2.9.2</version>  
    </dependency>  
    <dependency>  
        <groupId>org.apache.hadoop</groupId>  
        <artifactId>hadoop-hdfs</artifactId>  
        <version>2.9.2</version>  
    </dependency>  
</dependencies>  
  
<!-- Maven packaging plugins -->  
<build>  
    <plugins>  
        <plugin>  
            <artifactId>maven-compiler-plugin</artifactId>  
            <version>2.3.2</version>  
            <configuration>  
                <source>1.8</source>  
                <target>1.8</target>  
            </configuration>  
        </plugin>  
        <plugin>  
            <artifactId>maven-assembly-plugin</artifactId>  
            <configuration>  
                <descriptorRefs>  
                    <descriptorRef>jar-with-dependencies</descriptorRef>  
                </descriptorRefs>  
            </configuration>  
            <executions>  
                <execution>  
                    <id>make-assembly</id>  
                    <phase>package</phase>  
                    <goals>  
                        <goal>single</goal>  
                    </goals>  
                </execution>  
            </executions>  
        </plugin>  
    </plugins>  
</build>  
  2. Add log4j.properties
    log4j.rootLogger=INFO, stdout  
    log4j.appender.stdout=org.apache.log4j.ConsoleAppender  
    log4j.appender.stdout.layout=org.apache.log4j.PatternLayout  
    log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n  
    log4j.appender.logfile=org.apache.log4j.FileAppender  
    log4j.appender.logfile.File=target/spring.log  
    log4j.appender.logfile.layout=org.apache.log4j.PatternLayout  
    log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n  
    

2. Mapper phase

import org.apache.hadoop.io.IntWritable;  
import org.apache.hadoop.io.Text;  
import org.apache.hadoop.mapreduce.Mapper;  
  
import java.io.IOException;  
import java.util.StringTokenizer;  
  
public class WordMapper extends Mapper<Object, Text, Text, IntWritable> {  
  
//    1. In map(), convert the incoming value to a String  
//    2. Split the line into words on whitespace  
//    3. Emit <word, 1>  
  
    private Text k = new Text();  
    private IntWritable one = new IntWritable(1);  
  
    @Override  
    protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {  
  
        //    1. In map(), convert the incoming value to a String  
        //    2. Split the line into words on whitespace  
        StringTokenizer stringTokenizer = new StringTokenizer(value.toString());  
  
        while (stringTokenizer.hasMoreTokens()) {  
  
            k.set(stringTokenizer.nextToken());  
  
            //    3. Emit <word, 1>  
            context.write(k, one);  
        }  
  
    }  
}  
  

3. Reducer phase

import org.apache.hadoop.io.IntWritable;  
import org.apache.hadoop.io.Text;  
import org.apache.hadoop.mapreduce.Reducer;  
  
import java.io.IOException;  
  
// Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>  
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {  
  
    private IntWritable count = new IntWritable();  
  
    @Override  
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {  
  
//        1. Sum the counts for each key (word) by accumulating over its values  
        int sum = 0;  
        for (IntWritable value : values) {  
            sum += value.get();  
        }  
//        2. Emit the total count for this key  
        count.set(sum);  
        context.write(key, count);  
    }  
}  
  

4. Driver class

import org.apache.hadoop.conf.Configuration;  
import org.apache.hadoop.fs.Path;  
import org.apache.hadoop.io.IntWritable;  
import org.apache.hadoop.io.Text;  
import org.apache.hadoop.mapreduce.Job;  
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  
  
import java.io.IOException;  
  
/**  
 * Driver phase  
 * Builds the Job object that is submitted to the YARN cluster for execution. It encapsulates the parameters the MapReduce program needs to run, such as the input and output data paths, and acts as a client of the YARN cluster whose main purpose is to submit our MapReduce program to run.  
 */  
public class WordCountDriver {  
  
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {  
//        1. Create the Configuration object and get a Job instance  
        Configuration configuration = new Configuration();  
  
        Job job = Job.getInstance(configuration, "wordCount");  
//        2. Specify the local path of the program jar  
        job.setJarByClass(WordCountDriver.class);  
//        3. Specify the Mapper/Reducer classes  
        job.setMapperClass(WordMapper.class);  
        job.setReducerClass(WordCountReducer.class);  
//        job.setCombinerClass(WordCountReducer.class);   // optional: the reducer can also serve as the combiner  
  
//        4. Specify the Mapper output key/value types  
        job.setMapOutputKeyClass(Text.class);  
        job.setMapOutputValueClass(IntWritable.class);  
//        5. Specify the final output key/value types  
        job.setOutputKeyClass(Text.class);  
        job.setOutputValueClass(IntWritable.class);  
  
//        6. Specify the input path of the raw data for the job  
        FileInputFormat.setInputPaths(job, new Path(args[0]));  
//        7. Specify the output path for the job results  
        FileOutputFormat.setOutputPath(job, new Path(args[1]));  
  
//        8. Submit the job and wait for completion  
        boolean completion = job.waitForCompletion(true);  
  
        System.exit(completion ? 0 : 1);  
    }  
}  
  

5. Running from IDEA
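To run from IDEA, launch WordCountDriver directly and pass the input and output paths as program arguments (the local paths below are hypothetical); note that the output directory must not exist yet, or FileOutputFormat will refuse to start the job:

/path/to/wordcount/input /path/to/wordcount/output  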

6. YARN cluster mode

  1. Package with Maven, pick the jar without bundled dependencies, and upload it to the Hadoop cluster

  2. Run the following command on the Hadoop cluster

# hadoop jar <wordcount jar> <fully qualified driver class> <HDFS input dir> <HDFS output dir>  
hadoop jar wc.jar com.lagou.wordcount.WordCountDriver /user/lagou/input /user/lagou/output  

Custom beans implementing Writable

The basic serialization types often cannot cover every need. For example, to pass a custom bean object around inside the Hadoop framework, that object must implement the Writable serialization interface.

Example code

import org.apache.hadoop.io.Writable;  
  
import java.io.DataInput;  
import java.io.DataOutput;  
import java.io.IOException;  
  
/**  
 *  If the custom bean is used as the key of the Mapper output KV pair, it must also be sortable, because the shuffle in the MapReduce framework requires keys to be sorted; in that case implement WritableComparable (see the sketch after this class).  
 */  
public class SpeakBean implements Writable {  
    private Long self_duration;  
    private Long third_party_duration;  
    private Long sum_duration;  
  
    // Write the fields in order  
    @Override  
    public void write(DataOutput dataOutput) throws IOException {  
        dataOutput.writeLong(self_duration);  
        dataOutput.writeLong(third_party_duration);  
        dataOutput.writeLong(sum_duration);  
    }  
  
    // The field order in readFields() must exactly match the order used in write()  
    // Read the fields in order  
    @Override  
    public void readFields(DataInput dataInput) throws IOException {  
        this.self_duration = dataInput.readLong();  
        this.third_party_duration = dataInput.readLong();  
        this.sum_duration = dataInput.readLong();  
    }  
  
    // Deserialization creates the object via reflection, so a no-arg constructor is required  
    public SpeakBean() {  
    }  
  
    // Override toString() so the result data is easy to read; the separator can be customized  
    @Override  
    public String toString() {  
        return  self_duration +  
                "\t" + third_party_duration +  
                "\t" + sum_duration;  
    }  
  
    public SpeakBean(Long self_duration, Long third_party_duration) {  
        this.self_duration = self_duration;  
        this.third_party_duration = third_party_duration;  
        this.sum_duration = self_duration + third_party_duration;  
    }  
  
    public Long getSelf_duration() {  
        return self_duration;  
    }  
  
    public void setSelf_duration(Long self_duration) {  
        this.self_duration = self_duration;  
    }  
  
    public Long getThird_party_duration() {  
        return third_party_duration;  
    }  
  
    public void setThird_party_duration(Long third_party_duration) {  
        this.third_party_duration = third_party_duration;  
    }  
  
    public Long getSum_duration() {  
        return sum_duration;  
    }  
  
    public void setSum_duration(Long sum_duration) {  
        this.sum_duration = sum_duration;  
    }  
      
}  
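
As noted in the class comment above, a bean that is used as a Mapper output key must be sortable. A minimal sketch of such a bean (the DurationKey class name and the descending sort order are assumptions for illustration) implements WritableComparable instead of plain Writable:

import org.apache.hadoop.io.WritableComparable;  
  
import java.io.DataInput;  
import java.io.DataOutput;  
import java.io.IOException;  
  
public class DurationKey implements WritableComparable<DurationKey> {  
    private long duration;  
  
    // Required no-arg constructor for reflection during deserialization  
    public DurationKey() {  
    }  
  
    public DurationKey(long duration) {  
        this.duration = duration;  
    }  
  
    @Override  
    public void write(DataOutput out) throws IOException {  
        out.writeLong(duration);  
    }  
  
    @Override  
    public void readFields(DataInput in) throws IOException {  
        this.duration = in.readLong();  
    }  
  
    // The shuffle sorts keys with this method; here: descending by duration  
    @Override  
    public int compareTo(DurationKey other) {  
        return Long.compare(other.duration, this.duration);  
    }  
  
    @Override  
    public String toString() {  
        return String.valueOf(duration);  
    }  
}  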

MapReduce Internals

MapTask

ReduceTask

The Shuffle Mechanism

How the data processed in the map phase is handed over to the reduce phase is the most critical flow in the MapReduce framework; this flow is called shuffle.
Shuffle (as in shuffling and dealing cards) covers the core mechanisms of data partitioning, sorting, grouping, combining, and merging.
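For instance, the partitioning step can be customized by plugging a Partitioner into the job. The sketch below is a hypothetical example (the class name and routing rule are made up) that sends words starting with a–m to reducer 0 and everything else to reducer 1; it would be registered with job.setPartitionerClass(FirstLetterPartitioner.class) together with job.setNumReduceTasks(2):

import org.apache.hadoop.io.IntWritable;  
import org.apache.hadoop.io.Text;  
import org.apache.hadoop.mapreduce.Partitioner;  
  
public class FirstLetterPartitioner extends Partitioner<Text, IntWritable> {  
  
    @Override  
    public int getPartition(Text key, IntWritable value, int numPartitions) {  
        String word = key.toString();  
        if (numPartitions < 2 || word.isEmpty()) {  
            return 0;  
        }  
        // Route by the first letter of the word  
        char first = Character.toLowerCase(word.charAt(0));  
        return (first >= 'a' && first <= 'm') ? 0 : 1;  
    }  
}  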

MapReduce Join in Practice

MapReduce Input and Output

Data Compression in the Shuffle Phase

Data compression has two big benefits: it saves disk space and speeds up data transfer over the network and to/from disk.
You can run bin/hadoop checknative to see which compression codecs your compiled Hadoop build supports; if openssl shows false, install the missing dependency package online.
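As a sketch of how compression is switched on (the class name and codec choice below are assumptions; GzipCodec is used because it works without native libraries, and SnappyCodec could be swapped in if checknative reports it available), both the intermediate map output and the final job output can be compressed from the driver:

import org.apache.hadoop.conf.Configuration;  
import org.apache.hadoop.io.compress.CompressionCodec;  
import org.apache.hadoop.io.compress.GzipCodec;  
import org.apache.hadoop.mapreduce.Job;  
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  
  
import java.io.IOException;  
  
public class CompressionConfigDemo {  
  
    public static Job buildCompressedJob() throws IOException {  
        Configuration conf = new Configuration();  
        // Compress the intermediate map output that is shuffled to the reducers  
        conf.setBoolean("mapreduce.map.output.compress", true);  
        conf.setClass("mapreduce.map.output.compress.codec", GzipCodec.class, CompressionCodec.class);  
  
        Job job = Job.getInstance(conf, "wordCount-compressed");  
        // ... set jar, Mapper/Reducer classes, key/value types and input path as in WordCountDriver ...  
  
        // Also compress the final job output files  
        FileOutputFormat.setCompressOutput(job, true);  
        FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);  
        return job;  
    }  
}  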

Install openssl

yum install -y openssl-devel