Total Sorting with MapReduce

    This article introduces three ways to implement a total sort with MapReduce and compares the pros and cons of each.

Generating Test Data

#!/bin/sh
for i in {1..100000}; do
    echo $RANDOM
done

    $RANDOM is a built-in shell variable that yields a random integer of at most five digits (0 to 32767). Save the script above as iteblog.sh, then run the following two commands to generate two data files, each line of which is a single random number; place data1 and data2 in the input directory that the jobs below read from.

$ sh iteblog.sh > data1
$ sh iteblog.sh > data2

Method 1: Total Sort with a Single Reducer

    By default, MapReduce guarantees that keys are sorted within each partition, but not globally. If all of the data is sent to a single Reducer, its output is therefore fully sorted.
The problem: when the data volume is large, that single Reducer can run out of memory (OOM), and the approach abandons MapReduce's divide-and-conquer design.

Implementation
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class TotalSort1 implements Tool {
    private Configuration conf = new Configuration();

    public static void main(String[] args) {
        try {
            ToolRunner.run(new Configuration(), new TotalSort1(), args);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = this.getConf();
        Job job = Job.getInstance(conf, "TotalSort");
        // Input/output paths; delete the output path if it already exists
        Path inpath = new Path("input");
        FileInputFormat.addInputPath(job, inpath);
        Path outpath = new Path("output");
        if (outpath.getFileSystem(conf).exists(outpath)) {
            outpath.getFileSystem(conf).delete(outpath, true);
        }
        FileOutputFormat.setOutputPath(job, outpath);
        // Mapper-side input format; the default would be TextInputFormat
        job.setInputFormatClass(KeyValueTextInputFormat.class);
        // Mapper side
        job.setMapperClass(TotalSort1Mapper.class);
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(IntWritable.class);
        // Reducer side: a single reducer (also the default) makes the output globally sorted
        job.setNumReduceTasks(1);
        job.setReducerClass(TotalSort1Reduce.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(NullWritable.class);
        return job.waitForCompletion(true) ? 0 : -1;
    }

    @Override
    public void setConf(Configuration conf) {
        // Cross-platform job submission
        conf.set("mapreduce.app-submission.cross-platform", "true");
        // Run locally in a single process instead of on a cluster; no jar packaging needed
        conf.set("mapreduce.framework.name", "local");
        this.conf = conf;
    }

    @Override
    public Configuration getConf() {
        return this.conf;
    }

    private static class TotalSort1Mapper extends Mapper<Text, Text, IntWritable, IntWritable> {
        @Override
        protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
            IntWritable val = new IntWritable(Integer.parseInt(key.toString()));
            context.write(val, val);
        }
    }

    private static class TotalSort1Reduce extends Reducer<IntWritable, IntWritable, IntWritable, NullWritable> {
        @Override
        protected void reduce(IntWritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            // Emit the key once per occurrence so duplicates are preserved
            for (IntWritable val : values) {
                context.write(key, NullWritable.get());
            }
        }
    }
}

Method 2: Total Sort with a Custom Partitioner

    A Partitioner splits the mapper's intermediate output so that records in the same group go to the same Reducer, so it directly determines how evenly the Reduce stage is loaded. Here we configure three partitions: keys from 0 to 10000 go to Reducer 0, keys from 10001 to 20000 go to Reducer 1, and the rest go to Reducer 2. Each Reducer's output is sorted, and because the key ranges themselves are ordered, the concatenated outputs form a globally sorted sequence.
    Compared with Method 1, this makes better use of cluster resources, but when the keys in one range far outnumber those in the others, the job suffers from data skew. The code follows:

Implementation
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class TotalSort2 {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        // Cross-platform job submission
        conf.set("mapreduce.app-submission.cross-platform", "true");
        // Run locally in a single process instead of on a cluster; no jar packaging needed
        conf.set("mapreduce.framework.name", "local");
        Job job = Job.getInstance(conf, "TotalSort");
        // Input/output paths; delete the output path if it already exists
        Path input = new Path("input");
        FileInputFormat.addInputPath(job, input);
        Path output = new Path("output");
        if (output.getFileSystem(conf).exists(output)) {
            output.getFileSystem(conf).delete(output, true);
        }
        FileOutputFormat.setOutputPath(job, output);
        // Input format
        job.setInputFormatClass(KeyValueTextInputFormat.class);
        // Mapper side
        job.setMapperClass(TotalSort2Mapper.class);
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(IntWritable.class);
        // Reducer side: three reducers, one per key range
        job.setNumReduceTasks(3);
        job.setPartitionerClass(MyPartitioner.class);
        job.setReducerClass(TotalSort2Reduce.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(NullWritable.class);

        System.out.println(job.waitForCompletion(true) ? 0 : -1);
    }

    private static class MyPartitioner extends Partitioner<IntWritable, IntWritable> {
        @Override
        public int getPartition(IntWritable key, IntWritable val, int numPartitions) {
            // Route each key range to a fixed reducer; the ranges are ordered,
            // so concatenating the reducer outputs yields a total order
            int tmp = key.get();
            if (tmp >= 0 && tmp <= 10000) {
                return 0;
            } else if (tmp > 10000 && tmp <= 20000) {
                return 1;
            } else {
                return 2;
            }
        }
    }

    private static class TotalSort2Mapper extends Mapper<Text, Text, IntWritable, IntWritable> {
        @Override
        protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
            IntWritable val = new IntWritable(Integer.parseInt(key.toString()));
            context.write(val, val);
        }
    }

    private static class TotalSort2Reduce extends Reducer<IntWritable, IntWritable, IntWritable, NullWritable> {
        @Override
        protected void reduce(IntWritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            for (IntWritable value : values) {
                context.write(value, NullWritable.get());
            }
        }
    }
}

Method 3: Total Sort with TotalOrderPartitioner

    Hadoop's default partitioner is HashPartitioner. TotalOrderPartitioner is another built-in partitioner, designed specifically for total sorting: it divides the data into intervals (partitions) by key such that every key in one interval is greater than every key in the preceding interval. Unlike Method 2, where the split points are chosen by hand, here they are computed by the framework from a sample of the data, so the intervals come out fairly even.
The total sort proceeds as follows:
    1. Sampling. The client samples the input data, sorts the sample, and derives the split points. Hadoop ships several samplers: IntervalSampler samples each split at a fixed interval, which usually suits already-sorted data and is more efficient than random sampling; RandomSampler samples all of the data at a given frequency, which is slow; SplitSampler takes the first n records of each split, which is fastest. For example:
    Sampled data: b, abc, abd, bcd, abcd, efg, hii, afd, rrr, mnk
    After sorting: abc, abcd, abd, afd, b, bcd, efg, hii, mnk, rrr
    With 3 Reduce Tasks, the two tercile points of the sample, afd and hii, become the split points.
    2. On the map side, each record is assigned to the interval between two split points; the data within each interval is sorted locally and sent to the Reducer with the matching ID.
Note: if the key (split point) type is BinaryComparable (BytesWritable or Text) and the mapreduce.totalorderpartitioner.naturalorder property is true, a trie is built for the lookup, with time complexity O(h), where h is the depth of the trie; otherwise a binary search over the split points is used (a BinarySearchNode), with complexity O(log t), where t is the number of Reduce Tasks. A sketch of the binary-search lookup appears after this list.

In computer science, a trie, also called a prefix tree or dictionary tree, is an ordered tree used to store an associative array whose keys are usually strings. Unlike in a binary search tree, a key is not stored in a single node; instead it is determined by the node's position in the tree. All descendants of a node share that node's string as a common prefix, and the root corresponds to the empty string. In general not every node carries a value; only the leaves and some internal nodes correspond to keys with values.
[Figure: a trie built over the split points]
Given the input strings "abg" and "mnz" and the split points above, "abg" falls into the first partition and "mnz" into the third.

    3. Each Reducer writes the data it receives straight to the output.
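
To make the O(log t) lookup concrete, here is a minimal sketch, not Hadoop's actual TotalOrderPartitioner code; the class name SplitPointLookup and its layout are invented for illustration. It binary-searches the sorted split points from the sampling example above to find a key's partition; with t Reduce Tasks there are t - 1 split points, so each lookup costs O(log t).

import java.util.Arrays;

// Illustrative sketch of split-point lookup via binary search (not Hadoop's code).
public class SplitPointLookup {
    // With t reduce tasks there are t - 1 split points, kept sorted.
    private final String[] splitPoints;

    public SplitPointLookup(String[] splitPoints) {
        this.splitPoints = splitPoints;
    }

    public int getPartition(String key) {
        // Arrays.binarySearch returns the index when found,
        // or -(insertionPoint) - 1 when not found.
        int pos = Arrays.binarySearch(splitPoints, key);
        // A key equal to a split point goes to the partition after it, so every
        // key in partition i+1 compares greater than every key in partition i.
        return pos >= 0 ? pos + 1 : -pos - 1;
    }

    public static void main(String[] args) {
        // Split points from the sampling example: afd and hii (partitions are 0-based here).
        SplitPointLookup lookup = new SplitPointLookup(new String[]{"afd", "hii"});
        System.out.println(lookup.getPartition("abg")); // 0, the first partition
        System.out.println(lookup.getPartition("bcd")); // 1
        System.out.println(lookup.getPartition("mnz")); // 2, the third partition
    }
}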

Implementation
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.InputSampler;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class TotalSort3 implements Tool {
    private Configuration conf = new Configuration();

    public static void main(String[] args) throws Exception {
        ToolRunner.run(new Configuration(), new TotalSort3(), args);
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = this.getConf();
        Job job = Job.getInstance(conf, "TotalSort");
        // Input/output paths; delete the output path if it already exists
        Path input = new Path("input");
        FileInputFormat.addInputPath(job, input);
        Path output = new Path("output");
        if (output.getFileSystem(conf).exists(output)) {
            output.getFileSystem(conf).delete(output, true);
        }
        FileOutputFormat.setOutputPath(job, output);
        // Input format
        job.setInputFormatClass(KeyValueTextInputFormat.class);
        // Mapper side: the key stays Text because InputSampler requires the map
        // input and output key types to match (see the summary)
        job.setMapperClass(TotalSort3Mapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // Custom comparator so Text keys sort numerically, not lexicographically
        job.setSortComparatorClass(KeyComparator.class);
        job.setNumReduceTasks(3);
        Path partitions = new Path("partitions/_partitions");
        TotalOrderPartitioner.setPartitionFile(job.getConfiguration(), partitions);
        // Sampler arguments: the probability of a key being chosen, the number of
        // samples to take, and the maximum number of input splits to read
        InputSampler.Sampler<Text, Text> sampler = new InputSampler.RandomSampler<>(0.1, 1000, 10);
        InputSampler.writePartitionFile(job, sampler);
        job.setPartitionerClass(TotalOrderPartitioner.class);
        // Reducer side
        job.setReducerClass(TotalSort3Reduce.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(NullWritable.class);

        return job.waitForCompletion(true) ? 0 : -1;
    }

    @Override
    public void setConf(Configuration conf) {
        conf.set("mapreduce.app-submission.cross-platform", "true");
        conf.set("mapreduce.framework.name", "local");
        // With "mapreduce.totalorderpartitioner.naturalorder" set to true a trie is
        // built for the lookup; otherwise binary search over the split points is used
        conf.set("mapreduce.totalorderpartitioner.naturalorder", "false");
        this.conf = conf;
    }

    @Override
    public Configuration getConf() {
        return this.conf;
    }

    public static class KeyComparator extends WritableComparator {
        protected KeyComparator() {
            super(Text.class, true);
        }

        @Override
        public int compare(WritableComparable w1, WritableComparable w2) {
            // Compare the numeric values of the Text keys (overflow-safe)
            int v1 = Integer.parseInt(w1.toString());
            int v2 = Integer.parseInt(w2.toString());
            return Integer.compare(v1, v2);
        }
    }

    private static class TotalSort3Mapper extends Mapper<Text, Text, Text, IntWritable> {
        @Override
        protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
            IntWritable val = new IntWritable(Integer.parseInt(key.toString()));
            context.write(key, val);
        }
    }

    private static class TotalSort3Reduce extends Reducer<Text, IntWritable, IntWritable, NullWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            for (IntWritable value : values) {
                context.write(value, NullWritable.get());
            }
        }
    }
}

Summary

  1. If the map output key is of type Text, the shuffle sorts keys in dictionary (lexicographic) order; if it is IntWritable/LongWritable, keys are sorted numerically. (A small demo follows this list.)
  2. KeyValueTextInputFormat splits each line at a separator (a tab by default, configurable through mapreduce.input.keyvaluelinerecordreader.key.value.separator) and uses the part before it as the Text key; our input lines contain no tab, so each whole line becomes the key. The default TextInputFormat instead uses each line's byte offset as the key, of type LongWritable.
  3. Hadoop instantiates mapper and reducer classes through reflection. A non-static inner class has no no-argument constructor that reflection can invoke, so Hadoop cannot obtain an instance and the job fails with an exception like:

    java.lang.Exception: java.lang.RuntimeException: java.lang.NoSuchMethodException: TotalSort2$TotalSort2Mapper.<init>()
  4. Because of how InputSampler.writePartitionFile() is implemented, the map input key type and the map output key type must be identical; otherwise you get:

    Exception in thread "main" java.io.IOException: wrong key class: org.apache.hadoop.io.Text is not class org.apache.hadoop.io.IntWritable
    at org.apache.hadoop.io.SequenceFile$RecordCompressWriter.append(SequenceFile.java:1448)
    at org.apache.hadoop.mapreduce.lib.partition.InputSampler.writePartitionFile(InputSampler.java:340)
    at TotalSort3.run(TotalSort3.java:57)
    at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
    at TotalSort3.main(TotalSort3.java:28)
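
As a quick illustration of point 1 above, here is a minimal, self-contained sketch (the class name KeyOrderDemo is made up for this example) showing that Text keys compare lexicographically while IntWritable keys compare numerically:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

// Demonstrates why the shuffle orders Text and IntWritable keys differently.
public class KeyOrderDemo {
    public static void main(String[] args) {
        // Lexicographic: "10" sorts before "9" because '1' < '9'.
        System.out.println(new Text("10").compareTo(new Text("9")) < 0);           // true
        // Numeric: 10 sorts after 9.
        System.out.println(new IntWritable(10).compareTo(new IntWritable(9)) > 0); // true
    }
}

This is exactly why Method 3 needs the custom KeyComparator: its map output key is Text, so without the comparator the numbers would come out in dictionary order.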