MapReduce 常用算法


MapReduce 常用算法有排序、去重、过滤、TopN、单表关联和多表关联。下面介绍一下这些算法的简单实现示例。

1.排序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
import java.io.IOException; 
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
* hadoop默认排序:
* 如果k2、v2类型是Text-文本,结果是按照字典顺序
* 如果k2、v2类型是LongWritable-数字,结果是按照数字大小顺序
*/
public class SortTest {
/**
* 内部类:映射器 Mapper<KEY_IN, VALUE_IN, KEY_OUT, VALUE_OUT>
*/
public static class MyMapper extends Mapper<LongWritable, Text, LongWritable, NullWritable> {
/**
* 重写map方法
*/
public void map(LongWritable k1, Text v1, Context context)
throws IOException, InterruptedException {
//这里v1转为k2-数字类型,舍弃k1。null为v2
context.write(new LongWritable(Long.parseLong(v1.toString())), NullWritable.get());
//因为v1可能重复,这时,k2也是可能有重复的
}
}

/**
* 内部类:拆分器 Reducer<KEY_IN, VALUE_IN, KEY_OUT, VALUE_OUT>
*/
public static class MyReducer extends Reducer<LongWritable, NullWritable, LongWritable, NullWritable> {
/**
* 重写reduce方法
* 在此方法执行前,有个shuffle过程,会根据k2将对应的v2归并为v2[...]
*/
protected void reduce(
LongWritable k2, Iterable<NullWritable> v2,
Reducer<LongWritable, Context context
) throws IOException, InterruptedException {
//k2=>k3, v2[...]舍弃。null => v3
context.write(k2, NullWritable.get());
//此时,k3如果发生重复,根据默认算法会发生覆盖,即最终仅保存一个k3
}
}

public static void main(String[] args) throws Exception {
// 声明配置信息
Configuration conf = new Configuration();
conf.set("fs.default.name", "hdfs://localhost:9000");
// 创建作业
Job job = new Job(conf, "SortTest");
job.setJarByClass(SortTest.class);
// 设置mr
job.setMapperClass(MyMapper.class);
job.setReducerClass(MyReducer.class);
// 设置输出类型,和Context上下文对象write的参数类型一致
job.setOutputKeyClass(LongWritable.class);
job.setOutputValueClass(NullWritable.class);
// 设置输入输出路径
FileInputFormat.setInputPaths(job, new Path("/import/"));
FileOutputFormat.setOutputPath(job, new Path("/out"));
// 执行
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}

2.去重

  • 需求

查取手机号有哪些。这里的思路和排序算法的思路是一致的,仅仅多了分割出手机号这一步骤。

  • 数据

创建数据文件,手动输入一些测试内容,每个字段用制表符隔开。数据样例如下:

日期 电话 地址 方式 数据量

2016-02-23 20:17:35 13222229999 北京 wap 123456

2016-02-23 20:17:35 13222229999 北京 wap 123456

2016-02-23 20:17:35 13388887777 上海 wap 222222

2016-02-23 20:17:35 13388887777 上海 wap 222222

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
/** 
* 映射器 Mapper<KEY_IN, VALUE_IN, KEY_OUT, VALUE_OUT>
*/
public static class MyMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
/**
* 重写map方法
*/
protected void map(LongWritable k1, Text v1, Context context)
throws IOException ,InterruptedException {
//按照制表符进行分割
String[] tels = v1.toString().split("\t");
//k1 => k2-第2列手机号,null => v2
context.write(new Text(tels[1]), NullWritable.get());
}
}

/************************************************************
* 在map后,reduce前,有个shuffle过程,会根据k2将对应的v2归并为v2[...]
***********************************************************/

/**
* 拆分器 Reducer<KEY_IN, VALUE_IN, KEY_OUT, VALUE_OUT>
*/
public static class MyReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
/**
* 重写reduce方法
*/
protected void reduce(Text k2, Iterable<NullWritable> v2, Context context)
throws IOException ,InterruptedException {
//此时,k3如果发生重复,根据默认算法会发生覆盖,即最终仅保存一个k3,达到去重到效果
context.write(k2, NullWritable.get());
}
}

// 设置输出类型,和Context上下文对象write的参数类型一致

job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);

3.过滤

  • 需求:

查询在北京地区发生的上网记录。思路同上,当写出 k2 、 v2 时加一个判断即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
/** 
* 内部类:映射器 Mapper<KEY_IN, VALUE_IN, KEY_OUT, VALUE_OUT>
*/
public static class MyMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
/**
* 重写map方法
*/
protected void map(LongWritable k1, Text v1, Context context)
throws IOException ,InterruptedException {
//按照制表符进行分割
final String[] adds = v1.toString().split("\t");
//地址在第3列 //k1 => k2-地址,null => v2
if(adds[2].equals("北京")){
context.write(new Text(v1.toString()), NullWritable.get());
}
}
}

/**
* 内部类:拆分器 Reducer<KEY_IN, VALUE_IN, KEY_OUT, VALUE_OUT>
*/
public static class MyReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
/**
* 重写reduce方法
*/
protected void reduce(Text k2, Iterable<NullWritable> v2, Context context)
throws IOException ,InterruptedException {
context.write(k2, NullWritable.get());
}
}

// 设置输出类型,和Context上下文对象write的参数类型一致
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);

4.TopN

  • 需求:

找到流量最大值的前5个值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
//map     
public static class MyMapper extends Mapper<LongWritable, Text, LongWritable, NullWritable> {

//首先创建一个临时变量,保存一个可存储的最小值:Long.MIN_VALUE=-9223372036854775808
long temp = Long.MIN_VALUE;

//找出最大值
protected void map(LongWritable k1, Text v1, Context context) throws IOException ,InterruptedException {
//按照制表符进行分割
final String[] flows = v1.toString().split("\t");
//将文本转数值
final long val = Long.parseLong(flows[4]);
//如果v1比临时变量大,则保存v1的值
if(temp<val){
temp = val;
}
}

/** 此方法在全部的map任务结束后执行一次。这时仅输出临时变量到最大值 */
protected void cleanup(Context context) throws IOException ,InterruptedException {
context.write(new LongWritable(temp), NullWritable.get());
System.out.println("文件读取完毕");
}
}

//reduce
public static class MyReducer extends Reducer<LongWritable, NullWritable, LongWritable, NullWritable> {
//临时变量
Long temp = Long.MIN_VALUE;

//因为一个文件得到一个最大值,再次将这些值比对,得到最大的
protected void reduce(LongWritable k2, Iterable<NullWritable> v2, Context context)
throws IOException ,InterruptedException {
long long1 = Long.parseLong(k2.toString());
//如果k2比临时变量大,则保存k2的值
if(temp<long1){
temp = long1;
}
}

/** 此方法在全部的reduce任务结束后执行一次。这时仅输出临时变量到最大值 */
protected void cleanup(Context context) throws IOException, InterruptedException {
context.write(new LongWritable(temp), NullWritable.get());
}

}

// 设置输出类型
job.setOutputKeyClass(LongWritable.class);
job.setOutputValueClass(NullWritable.class);

5.单表关联

  • 需求:

将同一个文本中有关联的数据拿出来

  • 提供的数据

    张三 张二

    张二 张一

    李三 李二

    李二 李一

  • 希望得到的数据

    张三 张一

    李三 李一

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
//map     
public static class MyMapper extends Mapper<LongWritable, Text, Text, Text> {
//拆分原始数据
protected void map(LongWritable k1, Text v1, Context context) throws IOException ,InterruptedException {
//按制表符拆分记录
String[] splits = v1.toString().split("\t");
//一条k2v2记录:把孙辈作为k2;祖辈加下划线区分,作为v2
context.write(new Text(splits[0]), new Text("_"+splits[1]));
//一条k2v2记录:把祖辈作为k2;孙辈作为v2。就是把原两个单词调换位置保存
context.write(new Text(splits[1]), new Text(splits[0]));
}
}

//reduce
public static class MyReducer extends Reducer<Text, Text, Text, Text> {
// 拆分k2v2[...]数据
protected void reduce(Text k2, Iterable<Text> v2, Context context)
throws IOException, InterruptedException {
String grandchild = ""; // 孙辈
String grandfather = ""; // 祖辈

// 从迭代中遍历v2[...]
for (Text man : v2) {
String p = man.toString();
// 如果单词是以下划线开始的
if (p.startsWith("_")) {
// 从索引1开始截取字符串,保存到祖辈变量
grandfather = p.substring(1);
}
// 如果单词没有下划线起始
else {
// 直接赋值给孙辈变量
grandchild = p;
}
}

// 在得到有效数据的情况下
if (grandchild != "" && grandfather != "") {
// 写出得到的结果。
context.write(new Text(grandchild), new Text(grandfather));
}
}
}

// 设置输出类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);

6.多表关联

  • 需求:

将同一个文本中有关联的数据拿出来

  • 提供的数据

    • 数据 1

      小明 1

      小红 2

    • 数据 2

      1 山东

      2 河北

  • 希望得到的数据

    小明 山东

    小红 河北

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
//map     
public static class MyMapper extends Mapper<LongWritable, Text, Text, Text> {
//拆分原始数据
protected void map(LongWritable k1, Text v1, Context context)
throws IOException ,InterruptedException {
//拆分记录
String[] splited = v1.toString().split("\t");
//如果第一列是数字(使用正则判断),就是地址表
if(splited[0].matches("^[-+]?(([0-9]+)([.]([0-9]+))?|([.]([0-9]+))?)$")){
String addreId = splited[0];
String address = splited[1];
//k2,v2-加两条下划线作为前缀标识为地址
context.write(new Text(addreId), new Text("__"+address));
}
else{ //否则就是人员表
String personId = splited[1];
String persName = splited[0];
//k2,v2-加两条横线作为前缀标识为人员
context.write(new Text(personId), new Text("--"+persName));
}
}
}

//reduce
public static class MyReducer extends Reducer<Text, Text, Text, Text> {
//拆分k2v2[...]数据
protected void reduce(Text k2, Iterable<Text> v2, Context context)
throws IOException ,InterruptedException {
String address = ""; //地址
String person = ""; //人员
//迭代的是address或者person
for (Text text : v2) {
String tmp = text.toString();

if(tmp.startsWith("__")){
//如果是__开头的是address
address = tmp.substring(2); //从索引2开始截取字符串 }
if(tmp.startsWith("--")){
//如果是--开头的是person
person = tmp.substring(2);
}
}
context.write(new Text(person), new Text(address));
}
}

job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);

文章作者: hnbian
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 hnbian !
评论
  目录