MapReduce思想
MapReduce的思想核心是分而治之,充分利用了并行处理的优势。 即使是发布过论文实现分布式计算的谷歌也只是实现了这种思想,而不是自己原创
MapReduce任务过程是分为两个处理阶段:
- Map阶段:Map阶段的主要作用是“分”,即把复杂的任务分解为若干个“简单的任务”来并行处理。 Map阶段的这些任务可以并行计算,彼此间没有依赖关系。
- Reduce阶段:Reduce阶段的主要作用是“合”,即对map阶段的结果进行全局汇总。
Hadoop序列化及数据类型
序列化主要是我们通过网络通信传输数据时或者把对象持久化到文件,需要把对象序列化成二进制的结构。
为了得到更加紧凑的序列化数据,Hadoop 传输的数据都需要实现Writable接口,和java自带的Serializable相比Hadoop的序列化数据更小,可以有效的减小网络传输的数据量
MR编程快速入门
WordCount代码实现
1. 新建Maven工程
- 导入hadoop依赖
<dependencies>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.8.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.9.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.9.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.9.2</version>
</dependency>
</dependencies>
<!--maven打包插件 -->
<build>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.3.2</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
- 添加log4j.properties
log4j.rootLogger=INFO, stdout log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n log4j.appender.logfile=org.apache.log4j.FileAppender log4j.appender.logfile.File=target/spring.log log4j.appender.logfile.layout=org.apache.log4j.PatternLayout log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
2. Mapper阶段
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
import java.util.StringTokenizer;
public class WordMapper extends Mapper<Object, Text, Text, IntWritable> {
// 1. map()方法中把传入的数据转为String类型
// 2. 根据空格切分出单词
// 3. 输出<单词,1>
private Text k = new Text();
private IntWritable one = new IntWritable(1);
@Override
protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
// 1. map()方法中把传入的数据转为String类型
// 2. 根据空格切分出单词
StringTokenizer stringTokenizer = new StringTokenizer(value.toString());
while (stringTokenizer.hasMoreTokens()) {
k.set(stringTokenizer.nextToken());
// 3. 输出<单词,1>
context.write(k, one);
}
}
}
3. Reducer阶段
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
import java.util.Iterator;
// Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable count = new IntWritable();
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
// 1. 汇总各个key(单词)的个数,遍历value数据进行累加
int sum = 0;
for (Iterator<IntWritable> it = values.iterator(); it.hasNext(); ) {
sum += it.next().get();
}
// 2. 输出key的总数
count.set(sum);
context.write(key, count);
}
}
4. Driver驱动类
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
/**
* Driver阶段
* 创建提交YARN集群运行的Job对象,其中封装了MapReduce程序运行所需要的相关参数入输入数据路 径,输出数据路径等,也相当于是一个YARN集群的客户端,主要作用就是提交我们MapReduce程序运 行。
*/
public class WordCountDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
// 1. 获取配置文件对象,获取job对象实例
Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration, "wordCount");
// 2. 指定程序jar的本地路径
job.setJarByClass(WordCountDriver.class);
// 3. 指定Mapper/Reducer类
job.setMapperClass(WordMapper.class);
job.setReducerClass(WordCountReducer.class);
// job.setCombinerClass(WordCountReducer.class);
// 4. 指定Mapper输出的kv数据类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// 5. 指定最终输出的kv数据类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 6. 指定job处理的原始数据路径
FileInputFormat.setInputPaths(job, new Path(args[0]));
// 7. 指定job输出结果路径
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 8. 提交作业
boolean completion = job.waitForCompletion(true);
System.exit(completion ? 0 : 1);
}
}
5. idea端运行
6. Yarn集群模式
-
使用maven打包, 选择没有依赖的jar包,上传到Hadoop集群
-
在hadoop上面调用指令
# hadoop jar wordcount包.jar driver的全名
hadoop的输入文件夹 hadoop的输出文件夹
hadoop jar wc.jar com.lagou.wordcount.WordcountDriver
/user/lagou/input /user/lagou/output
自定义bean实现writable
基本序列化类型往往不能满足所有需求,比如在Hadoop框架内部传递一个自定义bean对象,那么该对象就需要实现Writable序列化接口。
示例代码
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/**
* 如果自定义Bean对象需要放在Mapper输出KV中的K,则该对象还需实现Comparable接口,因为因 为MapReduce框中的Shuffle过程要求对key必须能排序!!
*/
public class SpeakBean implements Writable {
private Long self_duration;
private Long third_party_duration;
private Long sum_duration;
// 按顺序写入
@Override
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeLong(self_duration);
dataOutput.writeLong(third_party_duration);
dataOutput.writeLong(sum_duration);
}
// 反序列化的字段顺序和序列化字段的顺序必须完全一致
// 按顺序读取
@Override
public void readFields(DataInput dataInput) throws IOException {
this.self_duration = dataInput.readLong();
this.third_party_duration = dataInput.readLong();
this.sum_duration = dataInput.readLong();
}
// 反序列化时,需要反射调用空参构造函数,所以必须有空参构造
public SpeakBean() {
}
// 方便展示结果数据,需要重写bean对象的toString()方法,可以自定义分隔符
@Override
public String toString() {
return self_duration +
"\t" + third_party_duration +
"\t" + sum_duration;
}
public SpeakBean(Long self_duration, Long third_party_duration) {
this.self_duration = self_duration;
this.third_party_duration = third_party_duration;
this.sum_duration = self_duration + third_party_duration;
}
public Long getSelf_duration() {
return self_duration;
}
public void setSelf_duration(Long self_duration) {
this.self_duration = self_duration;
}
public Long getThird_party_duration() {
return third_party_duration;
}
public void setThird_party_duration(Long third_party_duration) {
this.third_party_duration = third_party_duration;
}
public Long getSum_duration() {
return sum_duration;
}
public void setSum_duration(Long sum_duration) {
this.sum_duration = sum_duration;
}
}
MapReduce原理分析
MapTask
- MapTask运行机制
-
首先,读取数据组件InputFormat(默认TextInputFormat)会通过getSplits方法对输入目录中文件进行逻辑切片规划得到splits,有多少个split就对应启动多少个MapTask。split与block的对应关 系默认是一对一。
-
将输入文件切分为splits之后,由RecordReader对象(默认LineRecordReader)进行读取,以\n 作为分隔符,读取一行数据,返回<key,value>。Key表示每行首字符偏移值,value表示这一行文本内容。
-
读取split返回<key,value>,进入用户自己继承的Mapper类中,执行用户重写的map函数。 RecordReader读取一行这里调用一次。
-
map逻辑完之后,将map的每条结果通过context.write进行collect数据收集。在collect中,会先 对其进行分区处理,默认使用HashPartitioner。MapReduce提供Partitioner接口,它的作用就是根据key或value及reduce的数量来决定当前的这对 输出数据最终应该交由哪个reduce task处理。默认对key hash后再以reduce task数量取模。默认的 取模方式只是为了平均reduce的处理能力,如果用户自己对Partitioner有需求,可以订制并设置到 job上。
- 接下来,会将数据写入内存,内存中这片区域叫做环形缓冲区,缓冲区的作用是批量收集map结 果,减少磁盘IO的影响。我们的key/value对以及Partition的结果都会被写入缓冲区。当然写入之 前,key与value值都会被序列化成字节数组。
- 环形缓冲区其实是一个数组,数组中存放着key、value的序列化数据和key、value的元数据信 息,包括partition、key的起始位置、value的起始位置以及value的长度。环形结构是一个抽象概 念。
- 缓冲区是有大小限制,默认是100MB。当map task的输出结果很多时,就可能会撑爆内存,所以 需要在一定条件下将缓冲区中的数据临时写入磁盘,然后重新利用这块缓冲区。这个从内存往磁盘 写数据的过程被称为Spill,中文可译为溢写。这个溢写是由单独线程来完成,不影响往缓冲区写 map结果的线程。溢写线程启动时不应该阻止map的结果输出,所以整个缓冲区有个溢写的比例 spill.percent。这个比例默认是0.8,也就是当缓冲区的数据已经达到阈值(buffer size * spill percent = 100MB * 0.8 = 80MB),溢写线程启动,锁定这80MB的内存,执行溢写过程。Map task的输出结果还可以往剩下的20MB内存中写,互不影响。
- 当溢写线程启动后,需要对这80MB空间内的key做排序(Sort)。排序是MapReduce模型默认的行为!
- 如果job设置过Combiner,那么现在就是使用Combiner的时候了。将有相同key的key/value对的 value加起来,减少溢写到磁盘的数据量。Combiner会优化MapReduce的中间结果,所以它在整 个模型中会多次使用。
- 那哪些场景才能使用Combiner呢?从这里分析,Combiner的输出是Reducer的输入,Combiner 绝不能改变最终的计算结果。Combiner只应该用于那种Reduce的输入key/value与输出key/value 类型完全一致,且不影响最终结果的场景。比如累加,最大值等。Combiner的使用一定得慎重, 如果用好,它对job执行效率有帮助,反之会影响reduce的最终结果。
Combiner的父类是Reducer,所以Combiner可以共用Reducer类
- 合并溢写文件:每次溢写会在磁盘上生成一个临时文件(写之前判断是否有combiner),如果 map的输出结果真的很大,有多次这样的溢写发生,磁盘上相应的就会有多个临时文件存在。当 整个数据处理结束之后开始对磁盘中的临时文件进行merge合并,因为最终的文件只有一个,写入 磁盘,并且为这个文件提供了一个索引文件,以记录每个reduce对应数据的偏移量。
- MapTask的配置 https://hadoop.apache.org/docs/r2.9.2/hadoop-mapreduce-client/hadoop-mapreduce-client-core/mapred-default.xml
-
-
MapTask的并行度
MapTask的并行度决定Map阶段的任务处理并发度,从而影响到整个Job的处理速度。- 数据块:Block是HDFS物理上把数据分成一块一块。
-
切片:数据切片只是在逻辑上对输入进行分片,并不会在磁盘上将其切分成片进行存储。
-
问题:MapTask并行任务是否越多越好呢?哪些因素影响了MapTask并行度?
答案不是,如果一个文件仅仅比128M大一点点也被当成一个split来对待,而不是多个split. MR框架在并行运算的同时也会消耗更多资源,并行度越高资源消耗也越高,假设129M文件分为两个分片,一个是128M,一个是1M;
对于1M的切片的Maptask来说,太浪费资源。影响并行度的因素:
- 默认分块的数量就是maptask的数量
- 如果文件大小<= blockSize*SPLIT_SLOP,会被放在同一个split中
- 有很多小文件的话默认每个文件会被放在一个maptask中,很浪费资源,所以可以使用CombineTextInputFormat合并处理同类小文件,以减少maptask的数量
-
ReduceTask
-
ReduceTask 工作机制 Reduce大致分为copy、sort、reduce三个阶段,重点在前两个阶段。copy阶段包含一个 eventFetcher来获取已完成的map列表,由Fetcher线程去copy数据,在此过程中会启动两个merge线 程,分别为inMemoryMerger和onDiskMerger,分别将内存中的数据merge到磁盘和将磁盘中的数据 进行merge。待数据copy完成之后,copy阶段就完成了,开始进行sort阶段,sort阶段主要是执行 finalMerge操作,纯粹的sort阶段,完成之后就是reduce阶段,调用用户定义的reduce函数进行处理。
详细步骤:
* Copy阶段,简单地拉取数据。Reduce进程启动一些数据copy线程(Fetcher),通过HTTP方式请求 maptask获取属于自己的文件。
* Merge阶段。这里的merge如map端的merge动作,只是数组中存放的是不同map端copy来的数 值。Copy过来的数据会先放入内存缓冲区中,这里的缓冲区大小要比map端的更为灵活。merge 有三种形式:内存到内存;内存到磁盘;磁盘到磁盘。默认情况下第一种形式不启用。当内存中的 数据量到达一定阈值,就启动内存到磁盘的merge。与map 端类似,这也是溢写的过程,这个过 程中如果你设置有Combiner,也是会启用的,然后在磁盘中生成了众多的溢写文件。第二种 merge方式一直在运行,直到没有map端的数据时才结束,然后启动第三种磁盘到磁盘的merge 方式生成最终的文件。
* 合并排序。把分散的数据合并成一个大的数据后,还会再对合并后的数据排序。
* 对排序后的键值对调用reduce方法,键相等的键值对调用一次reduce方法,每次调用会产生零个 或者多个键值对,最后把这些输出的键值对写入到HDFS文件中。 -
ReduceTask并行度 ReduceTask的并行度同样影响整个Job的执行并发度和执行效率,但与MapTask的并发数由切片数决定
不同,ReduceTask数量的决定是可以直接手动设置:// 默认值是1,手动设置为4 job.setNumReduceTasks(4);
注意事项
- ReduceTask=0,表示没有Reduce阶段,输出文件数和MapTask数量保持一致;
- ReduceTask数量不设置默认就是一个,输出文件数量为1个;
- 如果数据分布不均匀,可能在Reduce阶段产生倾斜,数据倾斜就是某个reducetask处理的数据远远大于其他的task
Shuffle机制
map阶段处理的数据如何传递给reduce阶段,是MapReduce框架中最关键的一个流程,这个流程就叫shuffle。
shuffle: 洗牌、发牌——(核心机制:数据分区,排序,分组,combine,合并等过程)
-
自定义分区
在MapReduce中,通过我们指定分区,会将同一个分区的数据发送到同一个reduce当中进行处理(默认 是key相同去往同个分区),例如我们为了数据的统计,我们可以把一批类似的数据发送到同一个reduce 当中去,在同一个reduce当中统计相同类型的数据,如何才能保证相同key的数据去往同个reduce呢?只需要保证相同key的数据分发到同个分区即可。结 合以上原理分析我们知道MR程序shuffle机制默认就是这种规则!如何制定自己需要的分区规则?
具体步骤:
1. 自定义类继承Partitioner,重写 getPartition()方法
2. 在Driver驱动中,指定使用自定义Partitioner
3. 在Driver驱动中,要根据自定义Partitioner的逻辑设置相应数量的ReduceTask数量。注意事项:
1. 自定义分区器时最好保证分区数量与reduceTask数量保持一致;
2. 如果分区数量不止1个,但是reduceTask数量1个,此时只会输出一个文件。
3. 如果reduceTask数量大于分区数量,会输出多个空文件
4. 如果reduceTask数量小于分区数量,有可能会报错。 - Combiner
- Combiner是MR程序中Mapper和Reducer之外的一种组件
- Combiner组件的父类就是Reducer
- Combiner和reducer的区别在于运行的位置
- Combiner是在每一个maptask所在的节点运行;
- Combiner的意义就是对每一个maptask的输出进行局部汇总,以减小网络传输量。
- Combiner能够应用的前提是不能影响最终的业务逻辑,此外,Combiner的输出kv应该跟reducer的输入kv类型要对应起来。如果reducer的业务是计算平均值就不适合用combiner,计算累加就适合
- 自定义Combiner实现步骤
- 自定义一个Combiner继承Reducer,重写Reduce方法
- 在驱动(Driver)设置使用Combiner(默认是不适用Combiner组件)
其实大多数情况可以直接调用reducer
-
MapReduce中的排序 MapTask和ReduceTask均会对数据按照key进行排序。该操作属于Hadoop的默认行为。任何应用程序 中的数据均会被排序,而不管逻辑.上是否需要。默认排序是按照字典顺序排序,且实现该排序的方法是 快速排序。
- MapTask
- 它会将处理的结果暂时放到环形缓冲区中,当环形缓冲区使用率达到一定阈值后,再对缓冲区中的数据进行一次快速排序,并将这些有序数据溢写到磁盘上,
- 溢写完毕后,它会对磁盘上所有文件进行归并排序。
-
ReduceTask 当所有数据拷贝完毕后,ReduceTask统-对内存和磁盘上的所有数据进行一次归并排序。
-
全排序 最终输出结果只有一个文件,且文件内部有序。实现方式是只设置一个ReduceTask。但该方法在处理大型文件时效率极低,因为一台机器处理所有文件,完全丧失了MapReduce所提供的并行架构。
- 实现方式
- 让需要排序的
bean
实现WritableComparable
- 在
compareTo
方法中指定要用来排序的键值和逻辑 - 在
Mapper
中设置自定义的bean为key,value设置为NullWritable - 在
Reducer
中遍历所有values取出对应的key(遍历是为了防止key相同时没有被取出)
@Override protected void reduce(SpeakSortBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException { for (NullWritable b: values) { // 遍历value的时候key也会随之遍历 context.write(key, b); } }
- 设置
Job.setNumReduceTasks(1)
, 保证输出在同一个文件;
- 让需要排序的
- 实现方式
-
分区排序 MapReduce根据输入记录的键对数据集排序。保证输出的每个文件内部有序。默认的分区规则,区内有序
-
辅助排序辅助排序: (GroupingComparator分组) 在Reduce端对key进行分组。应用于:在接收的key为bean对象时,想让一个或几个字段相同(全部 字段比较不相同)的key进入到同一个reduce方法时,可以采用分组排序。
GroupingComparator是mapreduce当中reduce端的一个功能组件,主要的作用是决定哪些数据作为 一组,调用一次reduce的逻辑,默认是每个不同的key,作为多个不同的组,每个组调用一次reduce逻 辑,我们可以自定义GroupingComparator实现不同的key作为同一个组,调用一次reduce逻辑。
-
案例实现
需要求出每一个订单中成交金额最大的一笔交易。- 实现思路
- Mapper
- 读取一行文本数据,切分出每个字段;
- 订单id和金额封装为一个Bean对象,Bean对象的排序规则指定为先按照订单Id排序,订单Id 相等再按照金额降序排;
- map()方法输出kv;key–>bean对象,value–>NullWritable.get();
- Shuffle
- 指定分区器,保证相同订单id的数据去往同个分区(自定义分区器)
- 指定GroupingComparator,分组规则指定只要订单Id相等则认为属于同一组;
- Reduce
- 每个reduce()方法写出一组key的第一个
-
- 自定义实现WritableComparable的bean
public class OrderBean implements WritableComparable<OrderBean> { private String orderId; private Double price; @Override public int compareTo(OrderBean o) { //比较订单id的排序顺序 int res = this.orderId.compareTo(o.orderId); //如果订单id相同,则比较金额,金额大的排在前面 res = res == 0 ? -this.price.compareTo(o.price) : res; return res; } @Override public void write(DataOutput dataOutput) throws IOException { dataOutput.writeUTF(orderId); dataOutput.writeDouble(price); } @Override public void readFields(DataInput dataInput) throws IOException { this.orderId = dataInput.readUTF(); this.price = dataInput.readDouble(); } public OrderBean() { } getter, setter, toString… }
-
- 自定义分区器Partitioner
import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.Partitioner; public class GroupPartitioner extends Partitioner<OrderBean, NullWritable> { @Override public int getPartition(OrderBean orderBean, NullWritable nullWritable, int numPartitions) { //自定义分区,将相同订单id的数据发送到同一个reduce里面去 return (orderBean.getOrderId().hashCode() & Integer.MAX_VALUE) % numPartitions; } }
-
- 自定义GroupingComparator 保证id相同的订单进入一个分组中,进入分组的数据已经是按照金额降序排序。reduce()方法取出 第一个即是金额最高的交易
import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator; public class GroupComparator extends WritableComparator { public GroupComparator() { // 需要创建要比较对象的实例, 不然会空指针异常 super(OrderBean.class, true); } @Override public int compare(WritableComparable a, WritableComparable b) { if (a instanceof OrderBean) { // 只要ID相同就会合并到同一组 return ((OrderBean)a).getOrderId().compareTo(((OrderBean)b).getOrderId()); } return super.compare(a, b); } }
-
- Mapper
public class GroupMapper extends Mapper<Object, Text, OrderBean, NullWritable> { private OrderBean bean = new OrderBean(); @Override protected void map(Object key, Text value, Context context) throws IOException, InterruptedException { String[] fields = value.toString().split("\t"); bean.setOrderId(fields[0]); bean.setPrice(Double.valueOf(fields[2])); context.write(bean, NullWritable.get()); } }
-
- Reducer
import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; public class GroupReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable> { // key是被聚合到同一组的kv中的第一个, // 通过遍历values可以取到对应实际的key @Override protected void reduce(OrderBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException { context.write(key, NullWritable.get()); } }
-
6.Driver
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; public class GroupDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration configuration = new Configuration(); Job job = Job.getInstance(configuration); job.setJarByClass(GroupDriver.class); job.setMapperClass(GroupMapper.class); job.setReducerClass(GroupReducer.class); job.setOutputKeyClass(OrderBean.class); job.setOutputValueClass(NullWritable.class); job.setMapOutputKeyClass(OrderBean.class); job.setMapOutputValueClass(NullWritable.class); // 指定分区器 job.setPartitionerClass(GroupPartitioner.class); // 指定组合并规则 job.setGroupingComparatorClass(GroupComparator.class); // 指定reduceTask的数量 job.setNumReduceTasks(2); FileInputFormat.setInputPaths(job, new Path("/Users/april/Desktop/JAVA_BigData/Hadoop/MapReduece_Demo/groupingComparator.txt")); FileOutputFormat.setOutputPath(job, new Path("/Users/april/Desktop/JAVA_BigData/Hadoop/MapReduece_Demo/group/out")); boolean completion = job.waitForCompletion(true); System.exit(completion ? 0 : 1); } }
- Mapper
- 实现思路
-
-
二次排序 在自定义排序过程中,如果compareTo中的判断条件为两个即为二次排序。
-
- MapTask
MapReduce Join实战
-
需求
输出的时候要把两张表通过PositionId合并到一起 -
reduce端join 缺点:这种方式中,join的操作是在reduce阶段完成,reduce端的处理压力太大,map节点的运算负载 则很低,资源利用率不高,且在reduce阶段极易产生数据倾斜
-
- Bean的定义 把两张表的数据都定义到同一个bean,外加一个flag用来区分不同的表
public class DeliverBean implements Writable { private String userId; private String positionId; private String date; private String positionName; private String flag; … }
-
- Mapper阶段 Key在PositionId,让有相同的id的数据都聚到同一组
public class ReduceJoinMapper extends Mapper<Object, Text, Text, DeliverBean> { private String name = ""; private DeliverBean deliverBean = new DeliverBean(); private Text theKey = new Text(); @Override protected void setup(Context context) throws IOException, InterruptedException { // 获取文件名,后面map的时候做分支处理 InputSplit inputSplit = context.getInputSplit(); name = ((FileSplit)inputSplit).getPath().getName(); } @Override protected void map(Object key, Text value, Context context) throws IOException, InterruptedException { String[] fields = value.toString().split("\t"); if (name.startsWith("deliver")) { deliverBean.setUserId(fields[0]); deliverBean.setPositionId(fields[1]); deliverBean.setDate(fields[2]); deliverBean.setPositionName(""); deliverBean.setFlag("deliver"); } else { deliverBean.setUserId(""); deliverBean.setPositionId(fields[0]); deliverBean.setDate(""); deliverBean.setPositionName(fields[1]); deliverBean.setFlag("position"); } theKey.set(deliverBean.getPositionId()); context.write(theKey, deliverBean); } }
-
- Reducer阶段 根据flag对组内数据再次分组,然后遍历deliver的数据,setname,拼接成完整的bean
public class ReducerJoinReducer extends Reducer<Text, DeliverBean, DeliverBean, NullWritable> { @Override protected void reduce(Text key, Iterable<DeliverBean> values, Context context) throws IOException, InterruptedException { ArrayList<DeliverBean> deliverBeans = new ArrayList<>(); DeliverBean positionBean = new DeliverBean(); for (DeliverBean bean: values) { // 必须使用深拷贝的对象,不然会有问题 DeliverBean tempBean = new DeliverBean(); try { BeanUtils.copyProperties(tempBean, bean); } catch (IllegalAccessException e) { e.printStackTrace(); } catch (InvocationTargetException e) { e.printStackTrace(); } if (bean.getFlag().equals("deliver")) { deliverBeans.add(tempBean); } else { positionBean = tempBean ; } } for(DeliverBean bean: deliverBeans) { bean.setPositionName(positionBean.getPositionName()); context.write(bean, NullWritable.get()); } } }
-
- Driver
public class ReducerJoinDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration configuration = new Configuration(); Job job = Job.getInstance(configuration); job.setJarByClass(ReducerJoinDriver.class); job.setMapperClass(ReduceJoinMapper.class); job.setReducerClass(ReducerJoinReducer.class); job.setOutputKeyClass(DeliverBean.class); job.setOutputValueClass(NullWritable.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(DeliverBean.class); FileInputFormat.setInputPaths(job, new Path("/Users/april/Desktop/JAVA_BigData/Hadoop/MapReduece_Demo/reduce_join/input")); FileOutputFormat.setOutputPath(job, new Path("/Users/april/Desktop/JAVA_BigData/Hadoop/MapReduece_Demo/reduce_join/out")); boolean completion = job.waitForCompletion(true); System.exit(completion ? 0 : 1); } }
-
-
map端join 适用于关联表中有小表的情形; 可以将小表分发到所有的map节点,这样,map节点就可以在本地对自己所读到的大表数据进行join并输出最终结果,可以大大提高join操作的并发度,加快处理速度
实现思路:
* 在Mapper的setup阶段,将文件读取到缓存集合中
* 在驱动函数中加载缓存。// 缓存普通文件到Task运行节点。 job.addCacheFile(new URI("file:///e:/cache/position.txt"));
-
- Mapper阶段
public class MapJoinMapper extends Mapper<Object, Text, Text, NullWritable> { private Map<String, String> positionMap = new HashMap<>(); private Text theKey = new Text(); /** * 读取缓存的文件 */ @Override protected void setup(Context context) throws IOException, InterruptedException { // 直接通过文件名获取文件 BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream("position.txt"), "UTF-8")); String line; while (!StringUtils.isEmpty(line=reader.readLine())) { String[] fields = line.split("\t"); positionMap.put(fields[0], fields[1]); } reader.close(); } @Override protected void map(Object key, Text value, Context context) throws IOException, InterruptedException { String[] fields = value.toString().split("\t"); // 拼接成完整数据 String deliver = value.toString() + "\t" + positionMap.get(fields[1]); theKey.set(deliver); context.write(theKey, NullWritable.get()); } }
-
- Driver阶段
public class MapJoinDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException { Configuration configuration = new Configuration(); Job job = Job.getInstance(configuration); job.setJarByClass(MapJoinDriver.class); job.setMapperClass(MapJoinMapper.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class); // 不需要reducer,map之后直接合并输出 job.setNumReduceTasks(0); // 添加缓存文件 job.addCacheFile(new URI("file:///Users/april/Desktop/JAVA_BigData/Hadoop/MapReduece_Demo/reduce_join/input/position.txt")); FileInputFormat.setInputPaths(job, new Path("/Users/april/Desktop/JAVA_BigData/Hadoop/MapReduece_Demo/reduce_join/input/deliver_info.txt")); FileOutputFormat.setOutputPath(job, new Path("/Users/april/Desktop/JAVA_BigData/Hadoop/MapReduece_Demo/reduce_join/out")); boolean completion = job.waitForCompletion(true); System.exit(completion ? 0 : 1); } }
-
-
数据倾斜解决方案 什么是数据倾斜?
* 数据倾斜无非就是大量的相同key被partition分配到一个分区里,
现象
* 绝大多数task执行得都非常快,但个别task执行的极慢。甚至失败!通用解决方案:
* 分为两次MR来执行任务
* 第一阶段:对key增加随机数
* 第二阶段:去掉key的随机数
MapReduce读取和输出数据
-
InputFormat
-
TextInputFormat 普通文本文件,MR框架默认的读取实现类型
-
KeyValueTextInputFormat 读取一行文本数据按照指定分隔符,把数据封装为kv类型
-
NLineInputFormat 读取数据按照行数进行划分分片
-
CombineTextInputFormat 合并小文件,避免启动过多MapTask任务
-
案例 MR框架默认的TextInputFormat切片机制按文件划分切片,文件无论多小,都是单独一个切片, 然后由一个MapTask处理,如果有大量小文件,就对应的会生成并启动大量的 MapTask,而每个 MapTask处理的数据量很小大量时间浪费在初始化资源启动收回等阶段,这种方式导致资源利用 率不高。
CombineTextInputFormat
用于小文件过多的场景,它可以将多个小文件从逻辑上划分成一个切 片,这样多个小文件就可以交给一个MapTask处理,提高资源利用率。import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat; // 如果不设置InputFormat,它默认用的是TextInputFormat.class job.setInputFormatClass(CombineTextInputFormat.class); //虚拟存储切片最大值设置4m CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);
-
CombineTextInputFormat切片原理 切片生成过程分为两部分:虚拟存储过程和切片过程
假设设置setMaxInputSplitSize值为4M
四个小文件:1.txt –>2M ;2.txt–>7M;3.txt–>0.3M;4.txt—>8.2M-
虚拟存储过程:把输入目录下所有文件大小,依次和设置的setMaxInputSplitSize值进行比 较,如果不大于设置的最大值,逻辑上划分一个块。如果输入文件大于设置的最大值且大于 两倍,那么以最大值切割一块;当剩余数据大小超过设置的最大值且不大于最大值2倍,此时 将文件均分成2个虚拟存储块(防止出现太小切片)。 比如如setMaxInputSplitSize值为4M,输入文件大小为8.02M,则先逻辑上分出一个4M的 块。剩余的大小为4.02M,如果按照4M逻辑划分,就会出现0.02M的非常小的虚拟存储文 件,所以将剩余的4.02M文件切分成(2.01M和2.01M)两个文件。
1.txt–>2M;2M<4M;一个块;
2.txt–>7M;7M>4M,但是不大于两倍,均匀分成两块;两块:每块3.5M;
3.txt–>0.3M;0.3<4M ,0.3M<4M ,一个块
4.txt–>8.2M;大于最大值且大于两倍;一个4M的块,剩余4.2M分成两块,每块2.1M
所有块信息:
2M,3.5M,3.5M,0.3M,4M,2.1M,2.1M 共7个虚拟存储块。 -
切片过程
- 判断虚拟存储的文件大小是否大于setMaxInputSplitSize值,大于等于则单独形成一个 切片。
- 如果不大于则跟下一个虚拟存储文件进行合并,共同形成一个切片。
- 按照之前输入文件:有4个小文件大小分别为2M、7M、0.3M以及8.2M这四个小文件, 则虚拟存储之后形成7个文件块,大小分别为: 2M,3.5M,3.5M,0.3M,4M,2.1M,2.1M
- 最终会形成3个切片,大小分别为: (2+3.5)M,(3.5+0.3+4)M,(2.1+2.1)M
虚拟存储切片最大值设置最好根据实际的小文件大小情况来设置具体的值。
-
-
-
自定义InputFormat HDFS还是MapReduce,在处理小文件时效率都非常低,但又难免面临处理大量小文件的场景,
此时,就需要有相应解决方案。可以自定义InputFormat实现小文件的合并。需求:
将多个小文件合并成一个SequenceFile文件(SequenceFile文件是Hadoop用来存储二进制形式的 key-value对的文件格式),SequenceFile里面存储着多个文件,存储的形式为文件路径+名称为 key,文件内容为value。
整体思路
- 定义一个类继承FileInputFormat
- 重写isSplitable()指定为不可切分;重写createRecordReader()方法,创建自己的
RecorderReader对象 - 改变默认读取数据方式,实现一次读取一个完整文件作为kv输出;
- Driver指定使用的InputFormat类型
-
- CustomFileInputformat
import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import java.io.IOException; public class CustomInputFormat extends FileInputFormat<Text, BytesWritable> { //文件不可切分 @Override protected boolean isSplitable(JobContext context, Path filename) { return false; } //获取自定义RecordReader对象用来读取数据 @Override public RecordReader<Text, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { CustomRecorder customRecorder = new CustomRecorder(); customRecorder.initialize(split, context); return customRecorder; } }
-
- CustomRecorder
public class CustomRecorder extends RecordReader<Text, BytesWritable> { private FileSplit split; private Configuration job; private Text theKey = new Text(); private BytesWritable theValue = new BytesWritable(); @Override public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { this.split = (FileSplit) split; job = context.getConfiguration(); } private Boolean flag = true; @Override public boolean nextKeyValue() throws IOException, InterruptedException { if (flag) { // 一次把所有数据都读完 final Path file = this.split.getPath(); theKey.set(split.getPath().toString()); byte[] buffer = new byte[(int)split.getLength()]; // open the file and seek to the start of the split final FileSystem fs = file.getFileSystem(job); FSDataInputStream inputStream = fs.open(file); inputStream.readFully(0, buffer); theValue.set(buffer, 0, buffer.length); flag = false; return true; } return false; } @Override public Text getCurrentKey() throws IOException, InterruptedException { return theKey; } @Override public BytesWritable getCurrentValue() throws IOException, InterruptedException { return theValue; } @Override public float getProgress() throws IOException, InterruptedException { return 0; } @Override public void close() throws IOException { } }
-
- Mapper& Reducer
public class CustomInputMapper extends Mapper<Text, BytesWritable, Text, BytesWritable> { @Override protected void map(Text key, BytesWritable value, Context context) throws IOException, InterruptedException { context.write(key, value); } } public class CustomInputReducer extends Reducer<Text, BytesWritable, Text, BytesWritable> { @Override protected void reduce(Text key, Iterable<BytesWritable> values, Context context) throws IOException, InterruptedException { context.write(key, values.iterator().next()); } }
-
- Driver
public class CustomInputDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { // 1. 获取配置文件对象,获取job对象实例 Configuration configuration = new Configuration(); Job job = Job.getInstance(configuration, "customInput"); // 2. 指定程序jar的本地路径 job.setJarByClass(CustomInputDriver.class); // 3. 指定Mapper/Reducer类 job.setMapperClass(CustomInputMapper.class); // 4. 指定Mapper输出的kv数据类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(BytesWritable.class); // 5. 指定最终输出的kv数据类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(BytesWritable.class); // 指定自定义的输入类 job.setInputFormatClass(CustomInputFormat.class); // 指定文件输出格式为 SequenceFileOutputFormat job.setOutputFormatClass(SequenceFileOutputFormat.class); // 6. 指定job处理的原始数据路径 FileInputFormat.setInputPaths(job, new Path("/Users/april/Desktop/Big Data/大数据正式班第一阶段模块一/模块一/资料/data/小文件")); // 7. 指定job输出结果路径 FileOutputFormat.setOutputPath(job, new Path("/Users/april/Desktop/out")); // 8. 提交作业 boolean completion = job.waitForCompletion(true); System.exit(completion ? 0 : 1); } }
-
-
OutputFormat OutputFormat:是MapReduce输出数据的基类,所有MapReduce的数据输出都实现了OutputFormat 抽象类。
-
TextOutputFormat 默认的输出格式是TextOutputFormat,它把每条记录写为文本行。它的键和值可以是任意类型,
因为TextOutputFormat调用toString()方 法把它们转换为字符串。 -
SequenceFileOutputFormat 将SequenceFileOutputFormat输出作为后续MapReduce任务的输入,这是一种好的输出格式, 因为它的格式紧凑,很容易被压缩。
-
自定义OutputFormat 要在一个MapReduce程序中根据数据的不同输出两类结果到不同目录,这类输出需求可以通过自定义 OutputFormat来实现。
需求
网络请求日志数据
http://www.baidu.com http://www.google.com http://cn.bing.com http://www.lagou.com http://www.sohu.com http://www.sina.com http://www.sin2a.com http://www.sin2desa.com [http://www.sindsafa.com](http://www.sindsafa.com)
把lagou单独放到一个文件,其他放一个文件
-
- CustomOutputFormat
public class CustomOutputFormat extends FileOutputFormat<Text, NullWritable> { @Override public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException { Configuration configuration = context.getConfiguration(); FileSystem fileSystem = FileSystem.get(configuration); // 指定不同的输出流 String rootPath = configuration.get(FileOutputFormat.OUTDIR) + "/"; FSDataOutputStream lagouOut = fileSystem.create(new Path(rootPath + "lagou.log")); FSDataOutputStream otherOut = fileSystem.create(new Path(rootPath + "other.log")); CustomRecorderWriter recorderWriter = new CustomRecorderWriter(lagouOut, otherOut); return recorderWriter; } }
-
- CustomRecorderWriter
public class CustomRecorderWriter extends RecordWriter<Text, NullWritable> { FSDataOutputStream lagouOut; FSDataOutputStream otherOut; public CustomRecorderWriter(FSDataOutputStream lagouOut, FSDataOutputStream otherOut) { this.lagouOut = lagouOut; this.otherOut = otherOut; } @Override public void write(Text key, NullWritable value) throws IOException, InterruptedException { // write的数据来自reducer // 根据key值,输出到不同的文件 FSDataOutputStream tempOut = key.toString().contains("lagou") ? lagouOut : otherOut; tempOut.write(key.toString().getBytes()); tempOut.write("\r\n".getBytes()); } @Override public void close(TaskAttemptContext context) throws IOException, InterruptedException { IOUtils.closeStream(lagouOut); IOUtils.closeStream(otherOut); } }
-
- Mapper& Reducer
public class CustomOutputMapper extends Mapper<LongWritable, Text, Text, NullWritable> { @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { context.write(value, NullWritable.get()); } } public class CustomOutputReducer extends Reducer<Text, NullWritable, Text, NullWritable> { @Override protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException { context.write(key, values.iterator().next()); } }
-
4.Driver
public class CustomOutputDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { // 1. 获取配置文件对象,获取job对象实例 Configuration configuration = new Configuration(); Job job = Job.getInstance(configuration, "custom output"); // 2. 指定程序jar的本地路径 job.setJarByClass(CustomOutputDriver.class); // 3. 指定Mapper/Reducer类 job.setMapperClass(CustomOutputMapper.class); job.setReducerClass(CustomOutputReducer.class); // 4. 指定Mapper输出的kv数据类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(NullWritable.class); // 5. 指定最终输出的kv数据类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class); job.setOutputFormatClass(CustomOutputFormat.class); // 6. 指定job处理的原始数据路径 FileInputFormat.setInputPaths(job, new Path("/Users/april/Desktop/Big Data/大数据正式班第一阶段模块一/模块一/资料/data/click_log")); // 7. 指定job输出结果路径 FileOutputFormat.setOutputPath(job, new Path("/Users/april/Desktop/out")); // 8. 提交作业 boolean completion = job.waitForCompletion(true); System.exit(completion ? 0 : 1); } }
-
-
shuffle阶段数据的压缩机制
数据压缩有两大好处,节约磁盘空间,加速数据在网络和磁盘上的传输!!
我们可以使用bin/hadoop checknative 来查看我们编译之后的hadoop支持的各种压缩,如果出现 openssl为false,那么就在线安装一下依赖包!!
安装openssl
yum install -y openssl-devel
-
压缩方式比较
-
为了支持多种压缩/解压缩算法,Hadoop引入了编码/解码器
-
常见压缩方式对比分析
- 压缩位置
- Map输入端压缩
此处使用压缩文件作为Map的输入数据,无需显示指定编解码方式,Hadoop会自动检查文件扩展名,如果压缩方式能够匹配,Hadoop就会选择合适的编解码方式对文件进行压缩和解压。
- Map输出端压缩
Shuffle是Hadoop MR过程中资源消耗最多的阶段,如果有数据量过大造成网络传输速度缓慢,可以考虑使用压缩
- Reduce端输出压缩
输出的结果数据使用压缩能够减少存储的数据量,降低所需磁盘的空间,并且作为第二个MR的输 入时可以复用压缩。
- Map输入端压缩
-
压缩配置方式
-
Configuration 在驱动代码中通过Configuration直接设置使用的压缩方式,可以开启Map输出和Reduce输出压缩
// 设置map阶段压缩 Configuration configuration = new Configuration(); configuration.set("mapreduce.map.output.compress","true"); configuration.set("mapreduce.map.output.compress.codec","org.apache.hadoop.i o.compress.SnappyCodec"); // 设置reduce阶段的压缩 configuration.set("mapreduce.output.fileoutputformat.compress","true"); configuration.set("mapreduce.output.fileoutputformat.compress.type","RECORD" ); configuration.set("mapreduce.output.fileoutputformat.compress.codec","org.ap ache.hadoop.io.compress.SnappyCodec");
-
mapred-site.xml 配置mapred-site.xml(修改后分发到集群其它节点,重启Hadoop集群),此种方式对运行在集群的 所有MR任务都会执行压缩
<property> <name>mapreduce.output.fileoutputformat.compress</name> <value>true</value> </property> <property> <name>mapreduce.output.fileoutputformat.compress.type</name> <value>RECORD</value> </property> <property> <name>mapreduce.output.fileoutputformat.compress.codec</name> <value>org.apache.hadoop.io.compress.SnappyCodec</value> </property>
-
设置输出端压缩 比较灵活的一种方式,只针对当前代码
// 压缩输出文件, 如果是系统的inputformat就会自动解压,最简单的方式,打开压缩即可 FileOutputFormat.setCompressOutput(job, true); SequenceFileOutputFormat.setOutputCompressorClass(job, DefaultCodec.class); SequenceFileOutputFormat.setOutputCompressionType(job, SequenceFile.CompressionType.BLOCK);
-