Skip to content

Commit 4173e15

Browse files
committed
MAPREDUCE-4574. Fix TotalOrderPartitioner to work with non-WritableComparable key types. Contributed by Harsh J. (harsh)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1395936 13f79535-47bb-0310-9956-ffa450edef68
1 parent 472bda2 commit 4173e15

File tree

4 files changed

+81
-11
lines changed

4 files changed

+81
-11
lines changed

hadoop-mapreduce-project/CHANGES.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,9 @@ Trunk (Unreleased)
138138
MAPREDUCE-4695. Fix LocalRunner on trunk after MAPREDUCE-3223 broke it
139139
(harsh)
140140

141+
MAPREDUCE-4574. Fix TotalOrderPartitioner to work with
142+
non-WritableComparable key types. (harsh)
143+
141144
Release 2.0.3-alpha - Unreleased
142145

143146
INCOMPATIBLE CHANGES

hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/TotalOrderPartitioner.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
*/
3232
@InterfaceAudience.Public
3333
@InterfaceStability.Stable
34-
public class TotalOrderPartitioner<K extends WritableComparable<?>,V>
34+
public class TotalOrderPartitioner<K ,V>
3535
extends org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner<K, V>
3636
implements Partitioner<K,V> {
3737

hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/partition/TotalOrderPartitioner.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@
4747
*/
4848
@InterfaceAudience.Public
4949
@InterfaceStability.Stable
50-
public class TotalOrderPartitioner<K extends WritableComparable<?>,V>
50+
public class TotalOrderPartitioner<K,V>
5151
extends Partitioner<K,V> implements Configurable {
5252

5353
private Node partitions;
@@ -298,12 +298,13 @@ public int findPartition(BinaryComparable key) {
298298
@SuppressWarnings("unchecked") // map output key class
299299
private K[] readPartitions(FileSystem fs, Path p, Class<K> keyClass,
300300
Configuration conf) throws IOException {
301-
SequenceFile.Reader reader = new SequenceFile.Reader(fs, p, conf);
301+
SequenceFile.Reader reader = new SequenceFile.Reader(
302+
conf,
303+
SequenceFile.Reader.file(p));
302304
ArrayList<K> parts = new ArrayList<K>();
303305
K key = ReflectionUtils.newInstance(keyClass, conf);
304-
NullWritable value = NullWritable.get();
305306
try {
306-
while (reader.next(key, value)) {
307+
while ((key = (K) reader.next(key)) != null) {
307308
parts.add(key);
308309
key = ReflectionUtils.newInstance(keyClass, conf);
309310
}

hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/partition/TestTotalOrderPartitioner.java

Lines changed: 72 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,19 +21,25 @@
2121
import java.io.IOException;
2222
import java.util.ArrayList;
2323
import java.util.Arrays;
24+
import java.util.Comparator;
2425

2526
import junit.framework.TestCase;
2627

2728
import org.apache.hadoop.conf.Configuration;
29+
import org.apache.hadoop.fs.CommonConfigurationKeys;
2830
import org.apache.hadoop.fs.FileSystem;
2931
import org.apache.hadoop.fs.Path;
3032
import org.apache.hadoop.io.NullWritable;
3133
import org.apache.hadoop.io.RawComparator;
3234
import org.apache.hadoop.io.SequenceFile;
3335
import org.apache.hadoop.io.Text;
34-
import org.apache.hadoop.io.WritableComparable;
3536
import org.apache.hadoop.io.WritableComparator;
3637
import org.apache.hadoop.io.WritableUtils;
38+
import org.apache.hadoop.io.SequenceFile.CompressionType;
39+
import org.apache.hadoop.io.serializer.JavaSerialization;
40+
import org.apache.hadoop.io.serializer.JavaSerializationComparator;
41+
import org.apache.hadoop.io.serializer.Serialization;
42+
import org.apache.hadoop.io.serializer.WritableSerialization;
3743
import org.apache.hadoop.mapreduce.MRJobConfig;
3844

3945
public class TestTotalOrderPartitioner extends TestCase {
@@ -51,6 +57,19 @@ public class TestTotalOrderPartitioner extends TestCase {
5157
new Text("yak"), // 9
5258
};
5359

60+
private static final String[] splitJavaStrings = new String[] {
61+
// -inf // 0
62+
new String("aabbb"), // 1
63+
new String("babbb"), // 2
64+
new String("daddd"), // 3
65+
new String("dddee"), // 4
66+
new String("ddhee"), // 5
67+
new String("dingo"), // 6
68+
new String("hijjj"), // 7
69+
new String("n"), // 8
70+
new String("yak"), // 9
71+
};
72+
5473
static class Check<T> {
5574
T data;
5675
int part;
@@ -76,19 +95,41 @@ static class Check<T> {
7695
testStrings.add(new Check<Text>(new Text("hi"), 6));
7796
};
7897

79-
private static <T extends WritableComparable<?>> Path writePartitionFile(
98+
private static final ArrayList<Check<String>> testJavaStrings =
99+
new ArrayList<Check<String>>();
100+
static {
101+
testJavaStrings.add(new Check<String>(new String("aaaaa"), 0));
102+
testJavaStrings.add(new Check<String>(new String("aaabb"), 0));
103+
testJavaStrings.add(new Check<String>(new String("aabbb"), 1));
104+
testJavaStrings.add(new Check<String>(new String("aaaaa"), 0));
105+
testJavaStrings.add(new Check<String>(new String("babbb"), 2));
106+
testJavaStrings.add(new Check<String>(new String("baabb"), 1));
107+
testJavaStrings.add(new Check<String>(new String("yai"), 8));
108+
testJavaStrings.add(new Check<String>(new String("yak"), 9));
109+
testJavaStrings.add(new Check<String>(new String("z"), 9));
110+
testJavaStrings.add(new Check<String>(new String("ddngo"), 5));
111+
testJavaStrings.add(new Check<String>(new String("hi"), 6));
112+
};
113+
114+
115+
private static <T> Path writePartitionFile(
80116
String testname, Configuration conf, T[] splits) throws IOException {
81117
final FileSystem fs = FileSystem.getLocal(conf);
82118
final Path testdir = new Path(System.getProperty("test.build.data", "/tmp")
83-
).makeQualified(fs);
119+
).makeQualified(
120+
fs.getUri(),
121+
fs.getWorkingDirectory());
84122
Path p = new Path(testdir, testname + "/_partition.lst");
85123
TotalOrderPartitioner.setPartitionFile(conf, p);
86124
conf.setInt(MRJobConfig.NUM_REDUCES, splits.length + 1);
87125
SequenceFile.Writer w = null;
88126
try {
89-
w = SequenceFile.createWriter(fs, conf, p,
90-
splits[0].getClass(), NullWritable.class,
91-
SequenceFile.CompressionType.NONE);
127+
w = SequenceFile.createWriter(
128+
conf,
129+
SequenceFile.Writer.file(p),
130+
SequenceFile.Writer.keyClass(splits[0].getClass()),
131+
SequenceFile.Writer.valueClass(NullWritable.class),
132+
SequenceFile.Writer.compression(CompressionType.NONE));
92133
for (int i = 0; i < splits.length; ++i) {
93134
w.append(splits[i], NullWritable.get());
94135
}
@@ -99,6 +140,31 @@ private static <T extends WritableComparable<?>> Path writePartitionFile(
99140
return p;
100141
}
101142

143+
public void testTotalOrderWithCustomSerialization() throws Exception {
144+
TotalOrderPartitioner<String, NullWritable> partitioner =
145+
new TotalOrderPartitioner<String, NullWritable>();
146+
Configuration conf = new Configuration();
147+
conf.setStrings(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY,
148+
JavaSerialization.class.getName(),
149+
WritableSerialization.class.getName());
150+
conf.setClass(MRJobConfig.KEY_COMPARATOR,
151+
JavaSerializationComparator.class,
152+
Comparator.class);
153+
Path p = TestTotalOrderPartitioner.<String>writePartitionFile(
154+
"totalordercustomserialization", conf, splitJavaStrings);
155+
conf.setClass(MRJobConfig.MAP_OUTPUT_KEY_CLASS, String.class, Object.class);
156+
try {
157+
partitioner.setConf(conf);
158+
NullWritable nw = NullWritable.get();
159+
for (Check<String> chk : testJavaStrings) {
160+
assertEquals(chk.data.toString(), chk.part,
161+
partitioner.getPartition(chk.data, nw, splitJavaStrings.length + 1));
162+
}
163+
} finally {
164+
p.getFileSystem(conf).delete(p, true);
165+
}
166+
}
167+
102168
public void testTotalOrderMemCmp() throws Exception {
103169
TotalOrderPartitioner<Text,NullWritable> partitioner =
104170
new TotalOrderPartitioner<Text,NullWritable>();

0 commit comments

Comments
 (0)