1. MapReduce概述
MapReduce是一种分布式计算模型,由google提出,主要用于搜索领域,解决海量数据的计算问题。
mapr由两个阶段组成:MAP和Reduce,用户只需要实现map()和reduce()两个函数,即可实现分布式计算,非常简单。
这两个函数的形参是key、value对,表示函数的输入信息。
2. MapReduce的原理
要实现MapReduce模型 需要重写map与reduce方法。
map的输入 k1 V1 ,map的输出也是reduce的输入 K2V2 ,reduce的输出 K3V3
2.1map任务处理
读取输入文件内容,解析成key、value对。对输入文件的每一行,解析成key、value对。每一个键值对调用一次map函数
写自己的逻辑,对输入的key、value处理,转换成新的key、value输出。
对输出的key、value进行分区
对不同分区的数据,按照key进行排序、分组。相同key的value放到一个集合中//。
(可选)分组后的数据进行归约。
2.2 reduce任务处理
对于多个map任务的输出,按照不同的分区,通过网络copy到不同的reduce节点
对多个map任务的输出进行合并、排序。写reduce函数自己的逻辑,对输入的key、value处理,转换成新的key、value输出。
把reduce的输出保存到文件中。
3. MapReduce 代码示例
分析具体的业务逻辑
确定输入输出的数据的样式
自定义一个类,这个类要继承mapper类 重写map()方法在map方法里实现具体逻辑,然后将新的key,value输出
自己定义一个类,这个类要继承reduce类,重写reduce()方法,在reduce方法里实现自己的逻辑
将自定义的mapper与reducer通过job对象组装起来
// map
package mr;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/**
*
* @author hnbian
* k1 字符偏移量
* v1 一行字符
* k2 字符 串
* v2 出现次数
* LongWritable hadoop对Long实现的序列化方法
* Text hadoop对String实现的序列化方法
*/
public class WcMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
//重写父类的map方法
@Override
protected void map(LongWritable key, Text value,Context context)
throws IOException, InterruptedException {
//接收数据 v1
String line = value.toString();
//切分数据
String words[] = line.split(" ");
//循环words
for(String w:words){
//循环出现一次,记一个一,输出
context.write(new Text(w), new LongWritable(1));
}
}
}
// reduce
package mr;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
/**
*
* @author admin
* k2 类型text
* v2 long 实参1
* k3
* v3
*/
public class WcReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
/**
* k2 text类型
* v2s 迭代器 存放字符出现的次数
*/
@Override
protected void reduce(Text k2, Iterable<LongWritable> v2s,Context context)
throws IOException, InterruptedException {
//接收数据
Text k3 = k2;
//定义一个计数器
long conter = 0;
//循环迭代v2s
for(LongWritable i:v2s){
conter +=i.get();
}
//截下来输出
context.write(k3,new LongWritable(conter));
}
}
// 组装任务
package mr;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class WordCount {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
//将自己定义的map与reduce组装起来
//将MapReduce作业抽象成job对象,以后提交job对象就可以
Job job = Job.getInstance(new Configuration());
//注意:main 方法所在的类
job.setJarByClass(WordCount.class);
//通过job对象将map与reduce组装起来
//设置mapper
job.setMapperClass(WcMapper.class);
//设置mapper输出的key类型
job.setMapOutputKeyClass(Text.class);
//设置mapper输出的value类型
job.setMapOutputValueClass(LongWritable.class);
//设置mapper的输入数据 hdfs路径 设置输入的路径 可以输入多个
FileInputFormat.setInputPaths(job, new Path("/words.txt"));
//设置reduce
job.setReducerClass(WcReducer.class);
//设置reduce输出路径 map 或者reduce的输出都可以指定
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
//设置输出的路径
FileOutputFormat.setOutputPath(job, new Path("/wcOunt"));
//提交作业 true打印作业详情 false 不打印作业详情
job.waitForCompletion(true);
//如果安装了eclipse 的hadoop插件是可以提交任务的 但是如果没有安装插件 可以打成jar包 执行
}
}
打好jar文件之后
在Client运行hadoop -jar 实际上是运行MapReduce的main方法,要构建一个job, 然后job将jar提交到hadoop集群。
- job里面还持有一个jobClient,任务先给jobClient,jobClient持有rm的代理对象与rm进行通信,rm先给jobclient一个jobid 又给jobclient一个存放jar的位置。
- jobclient将返jar要存放的位置作为前缀,jobid作为后缀拼接起来作为路径,将jar文件存到hdfs中。(通过FileSystem写十份)
将作业提交给rm,提交作业的描述信息(作业id,jar位置等等)
rm接收到消息之后,将信息初始化,然后将作业存放到调度器里面。
决定启动多少mapper 与多少reducer(由数据量决定)
nm通过心跳机制向rm领取任务
nm领取到任务之后到hdfs中下载jar文件
下载完jar之后,nm启动另外的java进程,在进程中之后map或reduce
4. MapReduce中的角色
- jobClient:提交作业
- RM(ResourceManager):初始化作业、分配作业,nm与其进行通信,协调监控整个作业
- NM(NodeManager):定期与rm通信,执行map和reduce任务
- Hdfs:保存作业的数据、配置、jar包、结果。
5. 序列化概念
- 序列化(Serialization)是指把结构化对象转换为字节流。
- 反序列化(Deserizlization)是序列化的逆过程。即把字节流转回结构化对象
- Java序列化(java.io.Serializable)
5.1 hadoop序列化的特点
序列化格式特点
- 紧凑:高效使用存储空间
- 快速:读写数据的额外开销小
- 可扩展:可透明地读取老格式的数据
- 互操作:支持多语言的交互
hadoop的序列化格式 Writable
5.2 hadoop序列化的作用
- 序列化在分布式环境中的两大作用:进程间通信,永久储存。
- hadoop节点之间通信
5.3 常用的Writable实现类
Text一般认为它等价于Java.lang.String的Writable。针对UTF-8序列。
例:Text t = new Text(“text”);
IntWritable one = new intWritable(1);
5.4 Writable接口
Writable接口,是根据DataInput 和 DataOutput 实现的简单,有效的序列化对象。
MR的任意key和value必须实现Writable接口
MR的任意key必须实现WritableComparable接口
java基本类型 | Writable | 序列化大小(字节) |
---|---|---|
boolean | BooleanWritable | 1 |
byte | ByteWritable | 1 |
int | IntWritable | 4 |
vintWritable | 1~5 | |
float | FloatWritable | 4 |
long | LongWritable | 9 |
vlongWritable | 1~9 | |
double | DoubleWritable | 8 |
5.5 Writable
- write是把每个对象序列化到输出流
- readFields是把输入流字节反序列化
5.6 InputSplit
- 在执行MapReduce之前,原始数据被分割成若干split,每个split作为一个map任务的输入,在map执行过程中split会被分解成一个个记录(key-value对),map会依次处理每一个记录
- FileInputFormat只划分比HDFS block大的文件,所以FileInputFormat划分的结果是这个文件或者是这个文件中的一部分
- 如果一个文件的大小比block小,将不会划分,这也是Hadoop处理大文件的效率要比处理很多小文件的效率高的原因。
- 当Hadoop处理很多小文件(文件大小小于HDFS block大小)的时候,由于FileInputFormat 不会对小文件进行划分,所以每一个小文件都会被当做一个split并分配一个map任务,导致效率低下。
- 例如:一个G的文件,将会划分成8个128MB的split,并分配成8个map任务处理,而10000个100kb的文件会被10000个map任务处理。
5.7 hadoop自定义对象序列化
编写mapreduce 统计手机在一定时间内的上网流量
- 数据格式
序号 | 字段 | 字段类型 | 描述信息 |
---|---|---|---|
0 | reportTime | long | 记录报告时间戳 |
1 | msisdn | String | 手机号码 |
2 | apmac | String | Ap mac 手机地址 |
3 | acmac | String | Ac mac 运营商ip |
4 | host | String | 访问的网址 |
5 | siteType | String | 网站种类 |
6 | upPackNum | long | 上行数据包,单位:个 |
7 | downPackNum | long | 下行数据包,单位:个 |
8 | upPayLoad | long | 上行总流量。要注意单位的转换,单位,byte |
9 | downPayLoad | long | 下行总流量。要注意单位的转换,单位,byte |
10 | httpStatus | String | HTTP Response的状态 |
- 配置依赖
<dependency>
<groupId>org.apathe.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.2.0</version>
</dependency>
<dependency>
<groupId>org.apathe.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>2.2.0</version>
</dependency>
package com.dc;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class DataCount {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
//编写入口程序
//将MapReduce作业抽象成job对象,以后提交job对象就可以
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(DataCount.class);
job.setMapperClass(DCMapper.class);
//当满足k2 v2 与k3 v3的类型一一对应的时候可以省略
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(DataBean.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
job.setReducerClass(DCReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(DataBean.class);
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.waitForCompletion(true);
}
//这里为了节省代码我门写成内部类
//首先 定义类 继承mapper 确定泛型
public static class DCMapper extends Mapper<LongWritable, Text, Text, DataBean>{
//重写map方法
@Override
protected void map(LongWritable key, Text value,Context context)
throws IOException, InterruptedException {
//接收数据
String line = value.toString();
//切分数据
String files[] = line.split("/t");
String telNo = files[1];
Long up = Long.parseLong(files[8]);
Long down = Long.parseLong(files[9]);
//每次执行map都要new新的对象 影响效率
DataBean db = new DataBean(telNo, up, down);
context.write(new Text(telNo), db);
//数据清洗
}
}
public static class DCReducer extends Reducer<Text, DataBean, Text, DataBean>{
@Override
protected void reduce(Text key, Iterable<DataBean> v2s,Context context)
throws IOException, InterruptedException {
//上行流量总和
long up_sum = 0;
//下行流量总和
long down_sum = 0;
for(DataBean bean:v2s){
//计算上行流量和
up_sum += bean.getUpPayLoad();
//计算下行流量和
up_sum += bean.getDownPayLoad();
}
//创建对象
DataBean db = new DataBean("", up_sum, down_sum);
//输出对象
context.write(key, db);
}
}
}
package com.dc;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
public class DataBean implements Writable {
//手机号
private String telNo;
//上行流量
private long upPayLoad;
//下行流量
private long downPayLoad;
//总流量
private long totalPayLoad;
//有参构造方法
public DataBean(String telNo, long upPayLoad, long downPayLoad) {
super();
this.telNo = telNo;
this.upPayLoad = upPayLoad;
this.downPayLoad = downPayLoad;
this.totalPayLoad = upPayLoad+downPayLoad;
}
//无参构造方法
public DataBean() {
}
//序列化
public void write(DataOutput out) throws IOException {
// 将成员变量序列化
out.writeUTF(telNo);
out.writeLong(upPayLoad);
out.writeLong(downPayLoad);
out.writeLong(totalPayLoad);
}
//反序列化
public void readFields(DataInput in) throws IOException {
// 将成员变量反序列化
this.telNo = in.readUTF();
this.upPayLoad = in.readLong();
this.downPayLoad = in.readLong();
this.totalPayLoad = in.readLong();
}
@Override
public String toString() {
return this.upPayLoad+"/"+this.downPayLoad+"/"+this.totalPayLoad;
};
public String getTelNo() {
return telNo;
}
public void setTelNo(String telNo) {
this.telNo = telNo;
}
public long getUpPayLoad() {
return upPayLoad;
}
public void setUpPayLoad(long upPayLoad) {
this.upPayLoad = upPayLoad;
}
public long getDownPayLoad() {
return downPayLoad;
}
public void setDownPayLoad(long downPayLoad) {
this.downPayLoad = downPayLoad;
}
public long getTotalPayLoad() {
return totalPayLoad;
}
public void setTotalPayLoad(long totalPayLoad) {
this.totalPayLoad = totalPayLoad;
}
}
6. Partitioner编程
- Partitioner是Partitioner的基类,如果需要指定Partitioner也需要继承该类。
- HashPartitioner是MapReduce的默认Partitioner,计算方法是
Which reduce =(key.hashCode()&Integer.MAX_VALUE)%numReduceTasks,得到房前的目的reduce。
- 例子以jar形式运行(代码将手机号码按照指定运营商写到不同的reduce)其中databean同上
当设置的reducer数量 小于指定的reducer数量的时候会报错
当设置的reducer数量 大雨指定的reducer数量的时候不报错,但是后面几个结果为空
Map task 的并发数量是由切片的数量决定的,有多少个切片,就启动多少个map task,切片是一个逻辑概念,指的是文件中的偏移量范围,切片的具体大小应该根据所处理的文件大小调整的。
package com.dc;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class DataCount {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
//编写入口程序
//将MapReduce作业抽象成job对象,以后提交job对象就可以
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(DataCount.class);
job.setMapperClass(DCMapper.class);
//当满足k2 v2 与k3 v3的类型一一对应的时候可以省略
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(DataBean.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
job.setReducerClass(DCReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(DataBean.class);
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.setPartitionerClass(ProviderPartitioner.class);
//设置reducer的数量参数
job.setNumReduceTasks(Integer.parseInt(args[2]));
job.waitForCompletion(true);
}
//这里为了节省代码我门写成内部类
//首先 定义类 继承mapper 确定泛型
public static class DCMapper extends Mapper<LongWritable, Text, Text, DataBean>{
//重写map方法
@Override
protected void map(LongWritable key, Text value,Context context)
throws IOException, InterruptedException {
//接收数据
String line = value.toString();
//切分数据
String files[] = line.split("/t");
String telNo = files[1];
Long up = Long.parseLong(files[8]);
Long down = Long.parseLong(files[9]);
//每次执行map都要new新的对象 影响效率
DataBean db = new DataBean(telNo, up, down);
context.write(new Text(telNo), db);
//数据清洗
}
}
public static class DCReducer extends Reducer<Text, DataBean, Text, DataBean>{
@Override
protected void reduce(Text key, Iterable<DataBean> v2s,Context context)
throws IOException, InterruptedException {
//上行流量总和
long up_sum = 0;
//下行流量总和
long down_sum = 0;
for(DataBean bean:v2s){
//计算上行流量和
up_sum += bean.getUpPayLoad();
//计算下行流量和
up_sum += bean.getDownPayLoad();
}
//创建对象
DataBean db = new DataBean("", up_sum, down_sum);
//输出对象
context.write(key, db);
}
}
//创建partitioner 类
public static class ProviderPartitioner extends Partitioner<Text, DataBean>{
private static Map<String,Integer> providerMap = new HashMap<String,Integer>();
static{
providerMap.put("135", 1);
providerMap.put("138", 1);
providerMap.put("150", 2);
providerMap.put("159", 2);
providerMap.put("182", 3);
providerMap.put("183", 3);
}
//重写方法
//return int为分区号
@Override
public int getPartition(Text key, DataBean value, int arg2) {
//获得key
String account = key.toString();
//截取前三位
String sub_acc = account.substring(0, 3);
Integer code = providerMap.get(sub_acc);
if(code==null){
code = 0;
}
return code;
}
}
}
- 代码示例
对商家收入支出排序 按照收入高的排在前面 收入相同 按支出少的在前
将要排序的对象作为k2即可排序
如果是long 按照自然数
如果是text按照字典顺序
如果是bean按照自定义顺序
// 首先 先计算总的收入与支出
package com.data;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class SumStep {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
//将刚刚写的MapReduce 组装起来
Configuration con = new Configuration();
Job job = Job.getInstance(con);
job.setJarByClass(SumStep.class);
job.setMapperClass(SumMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setOutputValueClass(InfoBean.class);
//设置map读取数据路径
FileInputFormat.setInputPaths(job, new Path(args[0]));
job.setReducerClass(SumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(InfoBean.class);
FileOutputFormat.setOutputPath(job, new Path(args[1]));
//提交作业
job.waitForCompletion(true);
}
//重写mapper
public static class SumMapper extends Mapper<LongWritable, Text, Text, InfoBean>{
private Text k = new Text();
private InfoBean v = new InfoBean();
@Override
protected void map(LongWritable key, Text value,Context context)
throws IOException, InterruptedException {
String line = value.toString();
String files[] = line.split("\t");
String account = files[0];
double in = Double.parseDouble(files[1]);
double out = Double.parseDouble(files[2]);
//set key
k.set(account);
//set value
v.set(account, in, out);
context.write(k, v);
}
}
//
public static class SumReducer extends Reducer<Text, InfoBean, Text, InfoBean>{
private InfoBean v = new InfoBean();
@Override
protected void reduce(Text key, Iterable<InfoBean> values,Context context)
throws IOException, InterruptedException {
double in_sum = 0;
double out_sum = 0;
for(InfoBean bean:values){
in_sum +=bean.getIncome();
out_sum +=bean.getExpenses();
}
v.set("", in_sum, out_sum);
context.write(key, v);
}
}
}
// 按照定制规则排序
package com.data;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class SortStep {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
//将刚刚写的MapReduce 组装起来
Configuration con = new Configuration();
Job job = Job.getInstance(con);
job.setJarByClass(SortStep.class);
job.setMapperClass(SortMapper.class);
job.setMapOutputKeyClass(InfoBean.class);
job.setOutputValueClass(NullWritable.class);
//设置map读取数据路径
FileInputFormat.setInputPaths(job, new Path(args[0]));
job.setReducerClass(SortReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(InfoBean.class);
FileOutputFormat.setOutputPath(job, new Path(args[1]));
//提交作业
job.waitForCompletion(true);
}
public static class SortMapper extends Mapper<LongWritable, Text,InfoBean, NullWritable>{
private InfoBean k = new InfoBean();
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
String files[] = line.split("\t");
String account = files[0];
double in = Double.parseDouble(files[1]);
double out = Double.parseDouble(files[2]);
k.set(account, in, out);
context.write(k, NullWritable.get());
}
}
//
public static class SortReducer extends Reducer<InfoBean, NullWritable, Text, InfoBean>{
private Text k = new Text();
@Override
protected void reduce(InfoBean bean, Iterable<NullWritable> values,Context context)
throws IOException, InterruptedException {
String accten = bean.getAccount();
k.set(accten);
context.write(k, bean);
}
}
}
// Bean 对象
package com.data;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
public class InfoBean implements WritableComparable<InfoBean> {
private String account;
//收入
private double income;
//支出
private double expenses;
//结余
private double surplus;
public void set(String account,double income,double expenses){
this.account = account;
this.income = income;
this.expenses = expenses;
this.surplus = income - expenses;
}
//反序列化
public void readFields(DataInput in) throws IOException {
this.account = in.readUTF();
//会根据长度从流里面读取
this.income = in.readDouble();
this.expenses = in.readDouble();
this.surplus = in.readDouble();
}
//序列化
public void write(DataOutput out) throws IOException {
out.writeUTF(account);
out.writeDouble(income);
out.writeDouble(expenses);
out.writeDouble(surplus);
}
//制订排序规则
public int compareTo(InfoBean ib) {
//收入相等的情况下
if(this.income == ib.getIncome()){
//支出少的在前面
return this.expenses > ib.getExpenses() ? 1 : -1;
}else{//收入不相等的情况下
//收入收入高的在前面
return this.income>ib.getIncome() ? -1 : 1;
}
}
@Override
public String toString() {
return this.income+"\t"+this.expenses+"\t"+this.surplus;
}
public String getAccount() {
return account;
}
public void setAccount(String account) {
this.account = account;
}
public double getIncome() {
return income;
}
public void setIncome(double income) {
this.income = income;
}
public double getExpenses() {
return expenses;
}
public void setExpenses(double expenses) {
this.expenses = expenses;
}
public double getSurplus() {
return surplus;
}
public void setSurplus(double surplus) {
this.surplus = surplus;
}
}
7. Combiners编程
- 每一个map可能会产生大量的输出,combiner的作用就是在map端对输出先做一次合并,减少输出到reduce的数据量
- combiners最基本是实现本地key的递归,combiner具有类似本地的reduce功能。
- 如果不用combiner,那么,所有的结果都是reduce完成,效率会相对低下,使用Combiner,先完成的map会在本地聚合,提升速度。
- 注意:Combiner的输出是Reduce的输入,结果Combiner是可插拔的,添加Combiner绝不能该不变最终的计算结果,所以Combiner只应该用于那种Reduce的输入key/value与输出key/value类型完全一致,且不影响最终结果的场景,比如累加、最大值等。
- map的输出是Combiner的输入,Combiner的输出是Reduce的输入
package mr;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class WordCount {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
//将自己定义的map与reduce组装起来
//将MapReduce作业抽象成job对象,以后提交job对象就可以
Job job = Job.getInstance(new Configuration());
//注意:main 方法所在的类
job.setJarByClass(WordCount.class);
//通过job对象将map与reduce组装起来
//设置mapper
job.setMapperClass(WcMapper.class);
//设置mapper输出的key类型
job.setMapOutputKeyClass(Text.class);
//设置mapper输出的value类型
job.setMapOutputValueClass(LongWritable.class);
//设置mapper的输入数据 hdfs路径 设置输入的路径 可以输入多个
FileInputFormat.setInputPaths(job, new Path("/words.txt"));
//设置reduce
job.setReducerClass(WcReducer.class);
//设置reduce输出路径 map 或者reduce的输出都可以指定
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
//设置输出的路径
FileOutputFormat.setOutputPath(job, new Path("/wcOunt"));
//添加一个Combiner 这里我们就将reducer 作为Combiner
job.setCombinerClass(WcReducer.class);
//提交作业 true打印作业详情 false 不打印作业详情
job.waitForCompletion(true);
//如果安装了eclipse 的hadoop插件是可以提交任务的 但是如果没有安装插件 可以打成jar包 执行
}
/**
*
* @author admin
* k2 类型text
* v2 long 实参1
* k3
* v3
*/
public static class WcReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
/**
* k2 text类型
* v2s 迭代器 存放字符出现的次数
*/
@Override
protected void reduce(Text k2, Iterable<LongWritable> v2s,Context context)
throws IOException, InterruptedException {
//接收数据
Text k3 = k2;
//定义一个计数器
long conter = 0;
//循环迭代v2s
for(LongWritable i:v2s){
conter +=i.get();
}
//截下来输出
context.write(k3,new LongWritable(conter));
}
}
/**
*
* @author hnbian
* k1 字符偏移量
* v1 一行字符
* k2 字符 串
* v2 出现次数
* LongWritable hadoop对Long实现的序列化方法
* Text hadoop对String实现的序列化方法
*/
public static class WcMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
//重写父类的map方法
@Override
protected void map(LongWritable key, Text value,Context context)
throws IOException, InterruptedException {
//接收数据 v1
String line = value.toString();
//切分数据
String words[] = line.split(" ");
//循环words
for(String w:words){
//循环出现一次,记一个一,输出
context.write(new Text(w), new LongWritable(1));
}
}
}
}
8. Shuffle
一个输入切片对应一个mapper,
一个mapper读取文件的一部分,每一个mapper都有一个环形缓冲区(环形缓冲区在内存中)
环形缓冲区用来存储map的输出,如果内存中的数据满了(默认100M),
当map向内存中写的数据达到一定阀值(80%),后台进程便将数据溢写到磁盘,
(首先对数据进行分区)Partitioner 分区(默认分区hashPartitioner)
- 对每个分区里面的数据进行排序(按照分区号)
- 在每一个分区里面的数据再进行排序(按照k2的规则)
mapper向环形缓冲区速度快,内存向磁盘写数据慢,当内存写满之后Mapper堵塞,直到内存中的数据写入到小文件全部写完,内存写到下一个小文件。
会产生很多分区且排序的小文件
小文件还会合并成大文件
相同分区号的文件进行合并(0号分区的数据与0号分区的数据进行合并。。。)合并之后再进行一次排序,得到分区切排序的大文件
mapper的任务完成了
接下来到reducer
reducer获取mapper的数据
reducer启动之初就对其进行编号,在获取数据的时候按照(0号reducer取0号分区里面的数据。。。)来进行
8.1 Shuffer 代码示例(倒排索引)
倒排索引过程
Map阶段
<0,”hello tom”>
….
context.write(“hello->a.txt”,1);
context.write(“hello->a.txt”,1);
context.write(“hello->a.txt”,1);
context.write(“hello->a.txt”,1);
context.write(“hello->a.txt”,1);
context.write(“hello->b.txt”,1);
context.write(“hello->b.txt”,1);
context.write(“hello->b.txt”,1);
-——————————————————-
combiner阶段
<”hello->a.txt”,1>
<”hello->a.txt”,1>
<”hello->a.txt”,1>
<”hello->a.txt”,1>
<”hello->a.txt”,1>
<”hello->b.txt”,1>
<”hello->b.txt”,1>
<”hello->b.txt”,1>
context.write(“hello”,”a.txt->5”);
context.write(“hello”,”b.txt->3”);
-——————————————————-
Reducer阶段
<”hello”,{“a.txt->5”,”b.txt->3”}>
context.write(“hello”,”a.txt->5 b.txt->3”);
-——————————————————
hello “a.txt->5 b.txt->3”
tom “a.txt->2 b.txt->1”
kitty “a.txt->1”
…….
package com.ii;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class InverseIndex {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
//将刚刚写的MapReduce 组装起来
Configuration con = new Configuration();
Job job = Job.getInstance(con);
job.setJarByClass(InverseIndex.class);
job.setMapperClass(IndexMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
//设置map读取数据路径
FileInputFormat.setInputPaths(job, new Path(args[0]));
//设置Combiner class
job.setCombinerClass(IndexCombinner.class);
job.setReducerClass(IndexReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
FileOutputFormat.setOutputPath(job, new Path(args[1]));
//提交作业
job.waitForCompletion(true);
}
//定义mappeer
public static class IndexMapper extends Mapper<LongWritable, Text, Text, Text>{
private Text k = new Text();
private Text v = new Text();
//覆盖map方法
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
String words[] = line.split(" ");
//定义切片 用于获得文件名
FileSplit is = (FileSplit) context.getInputSplit();
String path = is.getPath().toString();//获得路径
for(String w:words){
k.set(w+"->"+path);
v.set("1");;
context.write(k, v);
}
}
}
public static class IndexCombinner extends Reducer<Text, Text, Text, Text>{
private Text k = new Text();
private Text v = new Text();
@Override
protected void reduce(Text key, Iterable<Text> values,Context context)
throws IOException, InterruptedException {
String wordAndPath [] = key.toString().split("->");
String word = wordAndPath[0];
String path = wordAndPath[1];
int counter = 0;
for(Text t:values){
counter += Integer.parseInt(t.toString());
}
k.set(word);
v.set(path+"->"+counter);
context.write(k,v);
}
}
//定义reducer
public static class IndexReducer extends Reducer<Text, Text, Text, Text>{
private Text v = new Text();
@Override
protected void reduce(Text key, Iterable<Text> values,Context context)
throws IOException, InterruptedException {
String result = "";
for(Text t:values){
result +=t.toString()+"\t";
}
v.set(result);
context.write(key, v);
}
}
}
切片对应mapper的个数
决定切片大小的要因 一般一个切片对应一个块 一个块对应一个mapper