1818package org .apache .hadoop .mapred ;
1919
2020import java .io .BufferedReader ;
21- import java .io .DataOutputStream ;
2221import java .io .IOException ;
2322import java .io .InputStream ;
2423import java .io .InputStreamReader ;
2524import java .io .OutputStream ;
2625import java .io .OutputStreamWriter ;
2726import java .io .Writer ;
27+ import java .util .Iterator ;
28+ import java .util .StringTokenizer ;
2829
29- import org .apache .hadoop .fs .FSDataOutputStream ;
30- import org .apache .hadoop .fs .FileSystem ;
3130import org .apache .hadoop .fs .FileUtil ;
3231import org .apache .hadoop .fs .Path ;
3332import org .apache .hadoop .io .LongWritable ;
3433import org .apache .hadoop .io .Text ;
3534import org .apache .hadoop .io .serializer .JavaSerializationComparator ;
36- import org .apache .hadoop .mapred .lib .IdentityReducer ;
37- import org .apache .hadoop .util .Progressable ;
3835
3936public class TestJavaSerialization extends ClusterMapReduceTestCase {
4037
41- static class TypeConverterMapper extends MapReduceBase implements
42- Mapper <LongWritable , Text , Long , String > {
38+ static class WordCountMapper extends MapReduceBase implements
39+ Mapper <LongWritable , Text , String , Long > {
4340
4441 public void map (LongWritable key , Text value ,
45- OutputCollector <Long , String > output , Reporter reporter )
42+ OutputCollector <String , Long > output , Reporter reporter )
4643 throws IOException {
47- output .collect (key .get (), value .toString ());
44+ StringTokenizer st = new StringTokenizer (value .toString ());
45+ while (st .hasMoreTokens ()) {
46+ output .collect (st .nextToken (), 1L );
47+ }
4848 }
4949
5050 }
5151
52- static class StringOutputFormat <K , V > extends FileOutputFormat <K , V > {
52+ static class SumReducer <K > extends MapReduceBase implements
53+ Reducer <K , Long , K , Long > {
5354
54- static class LineRecordWriter <K , V > implements RecordWriter <K , V > {
55-
56- private DataOutputStream out ;
57-
58- public LineRecordWriter (DataOutputStream out ) {
59- this .out = out ;
60- }
61-
62- public void close (Reporter reporter ) throws IOException {
63- out .close ();
64- }
55+ public void reduce (K key , Iterator <Long > values ,
56+ OutputCollector <K , Long > output , Reporter reporter )
57+ throws IOException {
6558
66- public void write (K key , V value ) throws IOException {
67- print (key );
68- print ("\t " );
69- print (value );
70- print ("\n " );
71- }
72-
73- private void print (Object o ) throws IOException {
74- out .write (o .toString ().getBytes ("UTF-8" ));
59+ long sum = 0 ;
60+ while (values .hasNext ()) {
61+ sum += values .next ();
7562 }
76-
77- }
78-
79- @ Override
80- public RecordWriter <K , V > getRecordWriter (FileSystem ignored , JobConf job ,
81- String name , Progressable progress ) throws IOException {
82-
83- Path dir = getWorkOutputPath (job );
84- FileSystem fs = dir .getFileSystem (job );
85- FSDataOutputStream fileOut = fs .create (new Path (dir , name ), progress );
86- return new LineRecordWriter <K , V >(fileOut );
63+ output .collect (key , sum );
8764 }
8865
8966 }
9067
9168 public void testMapReduceJob () throws Exception {
92- OutputStream os = getFileSystem ().create (new Path (getInputDir (), "text.txt" ));
69+ OutputStream os = getFileSystem ().create (new Path (getInputDir (),
70+ "text.txt" ));
9371 Writer wr = new OutputStreamWriter (os );
94- wr .write ("hello1\n " );
95- wr .write ("hello2\n " );
96- wr .write ("hello3\n " );
97- wr .write ("hello4\n " );
72+ wr .write ("b a\n " );
9873 wr .close ();
9974
10075 JobConf conf = createJobConf ();
@@ -106,16 +81,12 @@ public void testMapReduceJob() throws Exception {
10681
10782 conf .setInputFormat (TextInputFormat .class );
10883
109- conf .setMapOutputKeyClass (Long .class );
110- conf .setMapOutputValueClass (String .class );
111-
112- conf .setOutputFormat (StringOutputFormat .class );
113- conf .setOutputKeyClass (Long .class );
114- conf .setOutputValueClass (String .class );
84+ conf .setOutputKeyClass (String .class );
85+ conf .setOutputValueClass (Long .class );
11586 conf .setOutputKeyComparatorClass (JavaSerializationComparator .class );
11687
117- conf .setMapperClass (TypeConverterMapper .class );
118- conf .setReducerClass (IdentityReducer .class );
88+ conf .setMapperClass (WordCountMapper .class );
89+ conf .setReducerClass (SumReducer .class );
11990
12091 FileInputFormat .setInputPaths (conf , getInputDir ());
12192
@@ -126,19 +97,13 @@ public void testMapReduceJob() throws Exception {
12697 Path [] outputFiles = FileUtil .stat2Paths (
12798 getFileSystem ().listStatus (getOutputDir (),
12899 new OutputLogFilter ()));
129- if (outputFiles .length > 0 ) {
130- InputStream is = getFileSystem ().open (outputFiles [0 ]);
131- BufferedReader reader = new BufferedReader (new InputStreamReader (is ));
132- String line = reader .readLine ();
133- int counter = 0 ;
134- while (line != null ) {
135- counter ++;
136- assertTrue (line .contains ("hello" ));
137- line = reader .readLine ();
138- }
139- reader .close ();
140- assertEquals (4 , counter );
141- }
100+ assertEquals (1 , outputFiles .length );
101+ InputStream is = getFileSystem ().open (outputFiles [0 ]);
102+ BufferedReader reader = new BufferedReader (new InputStreamReader (is ));
103+ assertEquals ("a\t 1" , reader .readLine ());
104+ assertEquals ("b\t 1" , reader .readLine ());
105+ assertNull (reader .readLine ());
106+ reader .close ();
142107 }
143108
144109}
0 commit comments