Common MapReduce algorithms include sorting, deduplication, filtering, TopN, single-table joins, and multi-table joins. Below are simple example implementations of each.
1. Sorting
MapReduce sorts map output keys during the shuffle phase, so emitting each input number as the key (with a NullWritable value) is enough to produce sorted output.
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SortTest {

    public static class MyMapper extends Mapper<LongWritable, Text, LongWritable, NullWritable> {
        @Override
        protected void map(LongWritable k1, Text v1, Context context)
                throws IOException, InterruptedException {
            // Emit each number as the key; the framework sorts keys during shuffle.
            context.write(new LongWritable(Long.parseLong(v1.toString())), NullWritable.get());
        }
    }

    public static class MyReducer extends Reducer<LongWritable, NullWritable, LongWritable, NullWritable> {
        @Override
        protected void reduce(LongWritable k2, Iterable<NullWritable> v2, Context context)
                throws IOException, InterruptedException {
            // Keys arrive in sorted order; write each one out.
            context.write(k2, NullWritable.get());
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://localhost:9000");
        Job job = Job.getInstance(conf, "SortTest");
        job.setJarByClass(SortTest.class);
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(NullWritable.class);
        FileInputFormat.setInputPaths(job, new Path("/import/"));
        FileOutputFormat.setOutputPath(job, new Path("/out"));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
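The shuffle sorts keys in ascending order by default. For descending order, Hadoop ships a decreasing comparator for LongWritable keys that can be plugged into the driver; this one-liner is an addition of mine, not part of the original example:

// Sort LongWritable keys in descending order instead of ascending.
job.setSortComparatorClass(LongWritable.DecreasingComparator.class);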
2. Deduplication
Find the distinct phone numbers. The idea is the same as in the sorting example; the only extra step is splitting out the phone-number field.
Create a data file and enter some test records by hand, with the fields separated by tabs. Sample data:
Date	Phone	Location	Method	Traffic
2016-02-23 20:17:35 13222229999 北京 wap 123456
2016-02-23 20:17:35 13222229999 北京 wap 123456
2016-02-23 20:17:35 13388887777 上海 wap 222222
2016-02-23 20:17:35 13388887777 上海 wap 222222
public static class MyMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    @Override
    protected void map(LongWritable k1, Text v1, Context context)
            throws IOException, InterruptedException {
        // Fields are tab-separated; the phone number is the second field.
        String[] tels = v1.toString().split("\t");
        context.write(new Text(tels[1]), NullWritable.get());
    }
}

public static class MyReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
    @Override
    protected void reduce(Text k2, Iterable<NullWritable> v2, Context context)
            throws IOException, InterruptedException {
        // Duplicates are grouped under one key, so each phone number is written once.
        context.write(k2, NullWritable.get());
    }
}

// In the driver, only the output types differ from the sorting example:
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
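Because the reducer only collapses duplicates, it can also double as a combiner, removing duplicates on the map side before the shuffle. This is an optimization I'm adding, not part of the original:

// Optional: run the same dedup logic on map output to cut shuffle traffic.
job.setCombinerClass(MyReducer.class);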
3. Filtering
Find the access records that occurred in 北京. Same approach as above; just add a check before writing out k2 and v2.
public static class MyMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    @Override
    protected void map(LongWritable k1, Text v1, Context context)
            throws IOException, InterruptedException {
        // Keep only lines whose third field (the location) is 北京.
        final String[] adds = v1.toString().split("\t");
        if (adds[2].equals("北京")) {
            context.write(new Text(v1.toString()), NullWritable.get());
        }
    }
}

public static class MyReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
    @Override
    protected void reduce(Text k2, Iterable<NullWritable> v2, Context context)
            throws IOException, InterruptedException {
        context.write(k2, NullWritable.get());
    }
}

// In the driver:
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
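Since filtering needs no aggregation, the job could also run map-only by setting job.setNumReduceTasks(0) in the driver, in which case map output is written directly. Note that this skips the reduce pass, which in the version above also deduplicates identical matching lines; this alternative is an observation of mine, not part of the original example.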
4. TopN
Find the five largest traffic values. The code below shows the simpler special case of finding the single maximum: each mapper tracks its local maximum and emits it in cleanup(), and the reducer keeps the largest of those. A sketch that generalizes to a true top 5 follows the code.
public static class MyMapper extends Mapper<LongWritable, Text, LongWritable, NullWritable> {
    long temp = Long.MIN_VALUE;

    @Override
    protected void map(LongWritable k1, Text v1, Context context)
            throws IOException, InterruptedException {
        // Track the largest traffic value (fifth field) seen by this map task.
        final String[] flows = v1.toString().split("\t");
        final long val = Long.parseLong(flows[4]);
        if (temp < val) {
            temp = val;
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        // Emit one value per map task: its local maximum.
        context.write(new LongWritable(temp), NullWritable.get());
        System.out.println("finished reading the input split");
    }
}

public static class MyReducer extends Reducer<LongWritable, NullWritable, LongWritable, NullWritable> {
    long temp = Long.MIN_VALUE;

    @Override
    protected void reduce(LongWritable k2, Iterable<NullWritable> v2, Context context)
            throws IOException, InterruptedException {
        // Keep the largest of the per-task maxima.
        long val = k2.get();
        if (temp < val) {
            temp = val;
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        context.write(new LongWritable(temp), NullWritable.get());
    }
}

// In the driver:
job.setOutputKeyClass(LongWritable.class);
job.setOutputValueClass(NullWritable.class);
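A minimal top-5 sketch, assuming a single reducer so that all per-mapper candidates meet in one place; the class names and the TreeMap approach are my own, not from the original. Because the TreeMap is keyed on the value itself, duplicates collapse, so this yields the five largest distinct values:

import java.util.TreeMap;

public static class Top5Mapper extends Mapper<LongWritable, Text, LongWritable, NullWritable> {
    private final TreeMap<Long, Long> top = new TreeMap<>();

    @Override
    protected void map(LongWritable k1, Text v1, Context context)
            throws IOException, InterruptedException {
        long val = Long.parseLong(v1.toString().split("\t")[4]);
        top.put(val, val);
        if (top.size() > 5) {
            top.remove(top.firstKey()); // drop the smallest, keep the 5 largest
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        // Emit this mapper's top-5 candidates.
        for (Long val : top.keySet()) {
            context.write(new LongWritable(val), NullWritable.get());
        }
    }
}

public static class Top5Reducer extends Reducer<LongWritable, NullWritable, LongWritable, NullWritable> {
    private final TreeMap<Long, Long> top = new TreeMap<>();

    @Override
    protected void reduce(LongWritable k2, Iterable<NullWritable> v2, Context context)
            throws IOException, InterruptedException {
        top.put(k2.get(), k2.get());
        if (top.size() > 5) {
            top.remove(top.firstKey());
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        // Emit the global top 5 in descending order.
        for (Long val : top.descendingKeySet()) {
            context.write(new LongWritable(val), NullWritable.get());
        }
    }
}

In the driver, job.setNumReduceTasks(1) ensures a single reducer sees every candidate.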
5. Single-Table Join
Join a file against itself to connect related records: each line is a "child <tab> parent" pair, and we want the (grandchild, grandparent) pairs.
Input data:
张三 张二
张二 张一
李三 李二
李二 李一
Expected output:
张三 张一
李三 李一
public static class MyMapper extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    protected void map(LongWritable k1, Text v1, Context context)
            throws IOException, InterruptedException {
        // Each line is "child <tab> parent". Emit the pair under both names:
        // under the child, tag the parent with "_"; under the parent, emit the child untagged.
        String[] splits = v1.toString().split("\t");
        context.write(new Text(splits[0]), new Text("_" + splits[1]));
        context.write(new Text(splits[1]), new Text(splits[0]));
    }
}

public static class MyReducer extends Reducer<Text, Text, Text, Text> {
    @Override
    protected void reduce(Text k2, Iterable<Text> v2, Context context)
            throws IOException, InterruptedException {
        // For a middle-generation key, the values hold both that person's parent
        // (tagged "_") and child; pairing them gives (grandchild, grandparent).
        String grandchild = "";
        String grandfather = "";
        for (Text man : v2) {
            String p = man.toString();
            if (p.startsWith("_")) {
                grandfather = p.substring(1);
            } else {
                grandchild = p;
            }
        }
        // Test with isEmpty(), not != "", which compares object identity.
        if (!grandchild.isEmpty() && !grandfather.isEmpty()) {
            context.write(new Text(grandchild), new Text(grandfather));
        }
    }
}

// In the driver:
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
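Tracing one key makes the trick clear: the reducer for 张二 receives _张一 (张二's parent, tagged with the underscore) and 张三 (张二's child), so it emits (张三, 张一).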
6. Multi-Table Join
Join related records across two files: one maps a person to a region ID, the other maps region IDs to region names.
Input data:
File 1:
小明 1
小红 2
File 2:
1 山东
2 河北
Expected output:
小明 山东
小红 河北
public static class MyMapper extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    protected void map(LongWritable k1, Text v1, Context context)
            throws IOException, InterruptedException {
        String[] splited = v1.toString().split("\t");
        // Lines whose first field is numeric come from the region file
        // ("id <tab> region"); the rest come from the person file ("name <tab> id").
        if (splited[0].matches("^[-+]?(([0-9]+)([.]([0-9]+))?|([.]([0-9]+))?)$")) {
            String addreId = splited[0];
            String address = splited[1];
            context.write(new Text(addreId), new Text("__" + address));
        } else {
            String personId = splited[1];
            String persName = splited[0];
            context.write(new Text(personId), new Text("--" + persName));
        }
    }
}

public static class MyReducer extends Reducer<Text, Text, Text, Text> {
    @Override
    protected void reduce(Text k2, Iterable<Text> v2, Context context)
            throws IOException, InterruptedException {
        // Records from both files meet here under the shared ID; the prefixes
        // tell them apart. The two checks must be separate branches, not nested.
        String address = "";
        String person = "";
        for (Text text : v2) {
            String tmp = text.toString();
            if (tmp.startsWith("__")) {
                address = tmp.substring(2);
            } else if (tmp.startsWith("--")) {
                person = tmp.substring(2);
            }
        }
        context.write(new Text(person), new Text(address));
    }
}

// In the driver:
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
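One caveat: as written, the reducer remembers only a single person per region ID, so if several people shared a region only the last one seen would be kept. A variant that collects all of them, as a sketch of my own (the class name is illustrative):

import java.util.ArrayList;
import java.util.List;

public static class MultiPersonReducer extends Reducer<Text, Text, Text, Text> {
    @Override
    protected void reduce(Text k2, Iterable<Text> v2, Context context)
            throws IOException, InterruptedException {
        String address = "";
        List<String> people = new ArrayList<>();
        for (Text text : v2) {
            String tmp = text.toString();
            if (tmp.startsWith("__")) {
                address = tmp.substring(2);
            } else if (tmp.startsWith("--")) {
                people.add(tmp.substring(2));
            }
        }
        // Pair every person under this ID with the region name.
        for (String person : people) {
            context.write(new Text(person), new Text(address));
        }
    }
}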