Skip to content

Commit 462910b

Browse files
committed
MAPREDUCE-5493. Cleanup in-memory & on-disk segments to prevent leak on shuffle completion. Contributed by Jason Lowe.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1523660 13f79535-47bb-0310-9956-ffa450edef68
1 parent ef6c21d commit 462910b

File tree

3 files changed

+12
-1
lines changed

3 files changed

+12
-1
lines changed

hadoop-mapreduce-project/CHANGES.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -267,6 +267,9 @@ Release 2.1.1-beta - UNRELEASED
267267
MAPREDUCE-5164. mapred job and queue commands omit HADOOP_CLIENT_OPTS
268268
(Nemon Lou via devaraj)
269269

270+
MAPREDUCE-5493. Cleanup in-memory & on-disk segments to prevent leak on
271+
shuffle completion. (jlowe via acmurthy)
272+
270273
Release 2.1.0-beta - 2013-08-22
271274

272275
INCOMPATIBLE CHANGES

hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -355,8 +355,11 @@ public RawKeyValueIterator close() throws Throwable {
355355

356356
List<InMemoryMapOutput<K, V>> memory =
357357
new ArrayList<InMemoryMapOutput<K, V>>(inMemoryMergedMapOutputs);
358+
inMemoryMergedMapOutputs.clear();
358359
memory.addAll(inMemoryMapOutputs);
360+
inMemoryMapOutputs.clear();
359361
List<CompressAwarePath> disk = new ArrayList<CompressAwarePath>(onDiskMapOutputs);
362+
onDiskMapOutputs.clear();
360363
return finalMerge(jobConf, rfs, memory, disk);
361364
}
362365

hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMerger.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ public void cleanup() throws IOException {
8282
}
8383

8484
@Test
85-
public void testInMemoryMerger() throws IOException {
85+
public void testInMemoryMerger() throws Throwable {
8686
JobID jobId = new JobID("a", 0);
8787
TaskAttemptID reduceId = new TaskAttemptID(
8888
new TaskID(jobId, TaskType.REDUCE, 0), 0);
@@ -132,6 +132,11 @@ public void testInMemoryMerger() throws IOException {
132132
readOnDiskMapOutput(conf, fs, outPath, keys, values);
133133
Assert.assertEquals(keys, Arrays.asList("apple", "banana", "carrot"));
134134
Assert.assertEquals(values, Arrays.asList("disgusting", "pretty good", "delicious"));
135+
136+
mergeManager.close();
137+
Assert.assertEquals(0, mergeManager.inMemoryMapOutputs.size());
138+
Assert.assertEquals(0, mergeManager.inMemoryMergedMapOutputs.size());
139+
Assert.assertEquals(0, mergeManager.onDiskMapOutputs.size());
135140
}
136141

137142
private byte[] writeMapOutput(Configuration conf, Map<String, String> keysToValues)

0 commit comments

Comments
 (0)