MapReduce 常用算法


MapReduce 常用算法有排序、去重、过滤、TopN、单表关联和多表关联。下面介绍一下这些算法的简单实现示例。

1.排序

import java.io.IOException; 
import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.LongWritable; 
import org.apache.hadoop.io.NullWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.mapreduce.Mapper; 
import org.apache.hadoop.mapreduce.Reducer; 
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 
    /**
     * hadoop默认排序:  
     * 如果k2、v2类型是Text-文本,结果是按照字典顺序 
     * 如果k2、v2类型是LongWritable-数字,结果是按照数字大小顺序
     */
public class SortTest { 
    /** 
     * 内部类:映射器 Mapper<KEY_IN, VALUE_IN, KEY_OUT, VALUE_OUT> 
     */
    public static class MyMapper extends Mapper<LongWritable, Text, LongWritable, NullWritable> { 
        /** 
         * 重写map方法 
         */
        public void map(LongWritable k1, Text v1, Context context) 
          throws IOException, InterruptedException { 
            //这里v1转为k2-数字类型,舍弃k1。null为v2             
            context.write(new LongWritable(Long.parseLong(v1.toString())), NullWritable.get()); 
            //因为v1可能重复,这时,k2也是可能有重复的         
        } 
    } 

    /** 
     * 内部类:拆分器 Reducer<KEY_IN, VALUE_IN, KEY_OUT, VALUE_OUT> 
     */
    public static class MyReducer extends Reducer<LongWritable, NullWritable, LongWritable, NullWritable> { 
        /** 
         * 重写reduce方法 
          * 在此方法执行前,有个shuffle过程,会根据k2将对应的v2归并为v2[...]  
         */
        protected void reduce(
          LongWritable k2, Iterable<NullWritable> v2, 
          Reducer<LongWritable, Context context
        ) throws IOException, InterruptedException { 
            //k2=>k3, v2[...]舍弃。null => v3             
            context.write(k2, NullWritable.get()); 
                        //此时,k3如果发生重复,根据默认算法会发生覆盖,即最终仅保存一个k3          
            } 
    }

    public static void main(String[] args) throws Exception { 
        // 声明配置信息         
        Configuration conf = new Configuration(); 
        conf.set("fs.default.name", "hdfs://localhost:9000"); 
        // 创建作业        
        Job job = new Job(conf, "SortTest"); 
        job.setJarByClass(SortTest.class); 
        // 设置mr         
        job.setMapperClass(MyMapper.class); 
        job.setReducerClass(MyReducer.class); 
        // 设置输出类型,和Context上下文对象write的参数类型一致         
        job.setOutputKeyClass(LongWritable.class); 
        job.setOutputValueClass(NullWritable.class); 
        // 设置输入输出路径         
        FileInputFormat.setInputPaths(job, new Path("/import/")); 
        FileOutputFormat.setOutputPath(job, new Path("/out")); 
        // 执行         
        System.exit(job.waitForCompletion(true) ? 0 : 1); 
    } 
}

2.去重

  • 需求

查取手机号有哪些。这里的思路和排序算法的思路是一致的,仅仅多了分割出手机号这一步骤。

  • 数据

创建数据文件,手动输入一些测试内容,每个字段用制表符隔开。数据样例如下:

日期 电话 地址 方式 数据量

2016-02-23 20:17:35 13222229999 北京 wap 123456

2016-02-23 20:17:35 13222229999 北京 wap 123456

2016-02-23 20:17:35 13388887777 上海 wap 222222

2016-02-23 20:17:35 13388887777 上海 wap 222222

/** 
     * 映射器 Mapper<KEY_IN, VALUE_IN, KEY_OUT, VALUE_OUT> 
     */
    public static class MyMapper extends Mapper<LongWritable, Text, Text, NullWritable> { 
        /** 
         * 重写map方法 
         */
        protected void map(LongWritable k1, Text v1, Context context) 
                  throws IOException ,InterruptedException { 
          //按照制表符进行分割             
          String[] tels = v1.toString().split("\t"); 
          //k1 => k2-第2列手机号,null => v2             
          context.write(new Text(tels[1]), NullWritable.get()); 
        } 
    } 

    /************************************************************ 
     *  在map后,reduce前,有个shuffle过程,会根据k2将对应的v2归并为v2[...]  
     ***********************************************************/

    /** 
     * 拆分器 Reducer<KEY_IN, VALUE_IN, KEY_OUT, VALUE_OUT> 
     */
    public static class MyReducer extends Reducer<Text, NullWritable, Text, NullWritable> { 
        /** 
         * 重写reduce方法 
         */
        protected void reduce(Text k2, Iterable<NullWritable> v2, Context context) 
          throws IOException ,InterruptedException { 
            //此时,k3如果发生重复,根据默认算法会发生覆盖,即最终仅保存一个k3,达到去重到效果             
          context.write(k2, NullWritable.get()); 
        } 
    }

// 设置输出类型,和Context上下文对象write的参数类型一致 

job.setOutputKeyClass(Text.class); 
job.setOutputValueClass(NullWritable.class);

3.过滤

  • 需求:

查询在北京地区发生的上网记录。思路同上,当写出 k2 、 v2 时加一个判断即可。

/** 
 * 内部类:映射器 Mapper<KEY_IN, VALUE_IN, KEY_OUT, VALUE_OUT> 
 */
public static class MyMapper extends Mapper<LongWritable, Text, Text, NullWritable> { 
  /** 
   * 重写map方法 
   */
  protected void map(LongWritable k1, Text v1, Context context) 
    throws IOException ,InterruptedException { 
    //按照制表符进行分割             
    final String[] adds = v1.toString().split("\t"); 
    //地址在第3列             //k1 => k2-地址,null => v2             
    if(adds[2].equals("北京")){ 
      context.write(new Text(v1.toString()), NullWritable.get()); 
    } 
  } 
} 

/** 
 * 内部类:拆分器 Reducer<KEY_IN, VALUE_IN, KEY_OUT, VALUE_OUT> 
 */
public static class MyReducer extends Reducer<Text, NullWritable, Text, NullWritable> { 
  /** 
   * 重写reduce方法 
   */
  protected void reduce(Text k2, Iterable<NullWritable> v2, Context context) 
    throws IOException ,InterruptedException { 
    context.write(k2, NullWritable.get()); 
  }
}

// 设置输出类型,和Context上下文对象write的参数类型一致 
job.setOutputKeyClass(Text.class); 
job.setOutputValueClass(NullWritable.class);

4.TopN

  • 需求:

找到流量最大值的前5个值

//map     
public static class MyMapper extends Mapper<LongWritable, Text, LongWritable, NullWritable> { 

    //首先创建一个临时变量,保存一个可存储的最小值:Long.MIN_VALUE=-9223372036854775808         
        long temp = Long.MIN_VALUE; 

    //找出最大值         
        protected void map(LongWritable k1, Text v1, Context context) throws IOException ,InterruptedException { 
    //按照制表符进行分割             
        final String[] flows = v1.toString().split("\t"); 
    //将文本转数值             
        final long val = Long.parseLong(flows[4]); 
    //如果v1比临时变量大,则保存v1的值             
        if(temp<val){ 
                temp = val; 
            } 
    } 

        /** 此方法在全部的map任务结束后执行一次。这时仅输出临时变量到最大值 */
        protected void cleanup(Context context) throws IOException ,InterruptedException { 
            context.write(new LongWritable(temp), NullWritable.get()); 
            System.out.println("文件读取完毕"); 
        }
}

//reduce     
public static class MyReducer extends Reducer<LongWritable, NullWritable, LongWritable, NullWritable> { 
    //临时变量         
        Long temp = Long.MIN_VALUE; 

    //因为一个文件得到一个最大值,再次将这些值比对,得到最大的         
        protected void reduce(LongWritable k2, Iterable<NullWritable> v2, Context context) 
      throws IOException ,InterruptedException { 
       long long1 = Long.parseLong(k2.toString()); 
       //如果k2比临时变量大,则保存k2的值             
             if(temp<long1){ 
                temp = long1; 
            } 
        } 

        /** 此方法在全部的reduce任务结束后执行一次。这时仅输出临时变量到最大值 */
        protected void cleanup(Context context) throws IOException, InterruptedException { 
            context.write(new LongWritable(temp), NullWritable.get()); 
        } 

}

// 设置输出类型 
job.setOutputKeyClass(LongWritable.class); 
job.setOutputValueClass(NullWritable.class);

5.单表关联

  • 需求:

将同一个文本中有关联的数据拿出来

  • 提供的数据

    张三 张二

    张二 张一

    李三 李二

    李二 李一

  • 希望得到的数据

    张三 张一

    李三 李一

//map     
public static class MyMapper extends Mapper<LongWritable, Text, Text, Text> { 
        //拆分原始数据         
    protected void map(LongWritable k1, Text v1, Context context) throws IOException ,InterruptedException { 
        //按制表符拆分记录             
        String[] splits = v1.toString().split("\t"); 
        //一条k2v2记录:把孙辈作为k2;祖辈加下划线区分,作为v2             
        context.write(new Text(splits[0]), new Text("_"+splits[1])); 
        //一条k2v2记录:把祖辈作为k2;孙辈作为v2。就是把原两个单词调换位置保存             
        context.write(new Text(splits[1]), new Text(splits[0])); 
    }
}

//reduce     
public static class MyReducer extends Reducer<Text, Text, Text, Text> {
    // 拆分k2v2[...]数据
    protected void reduce(Text k2, Iterable<Text> v2, Context context)
            throws IOException, InterruptedException {
        String grandchild = ""; // 孙辈
        String grandfather = ""; // 祖辈

        // 从迭代中遍历v2[...]
        for (Text man : v2) {
            String p = man.toString();
            // 如果单词是以下划线开始的
            if (p.startsWith("_")) {
                // 从索引1开始截取字符串,保存到祖辈变量
                grandfather = p.substring(1);
            }
            // 如果单词没有下划线起始
            else {
                // 直接赋值给孙辈变量
                grandchild = p;
            }
        }

        // 在得到有效数据的情况下
        if (grandchild != "" && grandfather != "") {
            // 写出得到的结果。
            context.write(new Text(grandchild), new Text(grandfather));
        }
    }
}

// 设置输出类型 
job.setOutputKeyClass(Text.class); 
job.setOutputValueClass(Text.class);

6.多表关联

  • 需求:

将同一个文本中有关联的数据拿出来

  • 提供的数据

    • 数据 1

      小明 1

      小红 2

    • 数据 2

      1 山东

      2 河北

  • 希望得到的数据

    小明 山东

    小红 河北

//map     
public static class MyMapper extends Mapper<LongWritable, Text, Text, Text> { 
  //拆分原始数据         
    protected void map(LongWritable k1, Text v1, Context context) 
            throws IOException ,InterruptedException { 
    //拆分记录             
        String[] splited = v1.toString().split("\t"); 
    //如果第一列是数字(使用正则判断),就是地址表            
        if(splited[0].matches("^[-+]?(([0-9]+)([.]([0-9]+))?|([.]([0-9]+))?)$")){ 
                String addreId = splited[0]; 
                String address = splited[1]; 
                //k2,v2-加两条下划线作为前缀标识为地址                 
                context.write(new Text(addreId), new Text("__"+address)); 
            } 
        else{ //否则就是人员表  
                String personId = splited[1]; 
                String persName = splited[0]; 
                //k2,v2-加两条横线作为前缀标识为人员                
                context.write(new Text(personId), new Text("--"+persName)); 
            } 
   } 
} 

//reduce     
public static class MyReducer extends Reducer<Text, Text, Text, Text> { 
        //拆分k2v2[...]数据         
    protected void reduce(Text k2, Iterable<Text> v2, Context context) 
    throws IOException ,InterruptedException { 
            String address = "";    //地址             
            String person = "";     //人员            
            //迭代的是address或者person             
            for (Text text : v2) { 
                String tmp = text.toString(); 

                if(tmp.startsWith("__")){ 
                    //如果是__开头的是address                     
                    address = tmp.substring(2); //从索引2开始截取字符串                 } 
                if(tmp.startsWith("--")){ 
                    //如果是--开头的是person                     
                    person = tmp.substring(2); 
                } 
            } 
                context.write(new Text(person), new Text(address)); 
        } 
}

job.setOutputKeyClass(Text.class); 
job.setOutputValueClass(Text.class);

文章作者: hnbian
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 hnbian !
评论
 上一篇
使用 Java 调用 Sqoop 示例 使用 Java 调用 Sqoop 示例
1. 介绍Sqoop主要用于在HADOOP(Hive、hdfs、hbase)与传统的数据库(mysql、postgresql、db2、oracle…)间进行数据的传递。 但为了更方便的控制数据的导入,且sqoop是apache下的JAVA开
2016-02-25
下一篇 
使用Sqoop 对HDFS与Mysql数据导入导出示例 使用Sqoop 对HDFS与Mysql数据导入导出示例
Sqoop是一款数据迁移工具,将关系型数据库数据库与hdfs里面的数据相互迁移的工具。在Hadoop之上,将自己的语法转换成MapReduce来并行读取数据库里面的数据,将分析之后的数据保存在hdfs。 1.mysql –> hdfs
2016-02-23
  目录