Skip to content

Commit b8ceb3e

Browse files
author
Dhruba Borthakur
committed
HADOOP-6433. Introduce asychronous deletion of files via a pool of
threads. This can be used to delete files in the Distributed Cache. (Zheng Shao via dhruba) git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@890502 13f79535-47bb-0310-9956-ffa450edef68
1 parent 0023e06 commit b8ceb3e

File tree

3 files changed

+243
-0
lines changed

3 files changed

+243
-0
lines changed

CHANGES.txt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,10 @@ Trunk (unreleased changes)
2222
HADOOP-6323. Add comparators to the serialization API.
2323
(Aaron Kimball via cutting)
2424

25+
HADOOP-6433. Introduce asychronous deletion of files via a pool of
26+
threads. This can be used to delete files in the Distributed
27+
Cache. (Zheng Shao via dhruba)
28+
2529
IMPROVEMENTS
2630

2731
HADOOP-6283. Improve the exception messages thrown by
Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.util;
19+
20+
import java.io.IOException;
21+
import java.util.ArrayList;
22+
import java.util.HashMap;
23+
import java.util.List;
24+
import java.util.Map;
25+
import java.util.concurrent.LinkedBlockingQueue;
26+
import java.util.concurrent.ThreadFactory;
27+
import java.util.concurrent.ThreadPoolExecutor;
28+
import java.util.concurrent.TimeUnit;
29+
import org.apache.commons.logging.Log;
30+
import org.apache.commons.logging.LogFactory;
31+
32+
/*
33+
* This class is a container of multiple thread pools, each for a volume,
34+
* so that we can schedule async disk operations easily.
35+
*
36+
* Examples of async disk operations are deletion of files.
37+
* We can move the files to a "TO_BE_DELETED" folder before asychronously
38+
* deleting it, to make sure the caller can run it faster.
39+
*/
40+
public class AsyncDiskService {
41+
42+
public static final Log LOG = LogFactory.getLog(AsyncDiskService.class);
43+
44+
// ThreadPool core pool size
45+
private static final int CORE_THREADS_PER_VOLUME = 1;
46+
// ThreadPool maximum pool size
47+
private static final int MAXIMUM_THREADS_PER_VOLUME = 4;
48+
// ThreadPool keep-alive time for threads over core pool size
49+
private static final long THREADS_KEEP_ALIVE_SECONDS = 60;
50+
51+
private final ThreadGroup threadGroup = new ThreadGroup("async disk service");
52+
53+
private ThreadFactory threadFactory;
54+
55+
private HashMap<String, ThreadPoolExecutor> executors
56+
= new HashMap<String, ThreadPoolExecutor>();
57+
58+
/**
59+
* Create a AsyncDiskServices with a set of volumes (specified by their
60+
* root directories).
61+
*
62+
* The AsyncDiskServices uses one ThreadPool per volume to do the async
63+
* disk operations.
64+
*
65+
* @param volumes The roots of the file system volumes.
66+
*/
67+
public AsyncDiskService(String[] volumes) throws IOException {
68+
69+
threadFactory = new ThreadFactory() {
70+
public Thread newThread(Runnable r) {
71+
return new Thread(threadGroup, r);
72+
}
73+
};
74+
75+
// Create one ThreadPool per volume
76+
for (int v = 0 ; v < volumes.length; v++) {
77+
ThreadPoolExecutor executor = new ThreadPoolExecutor(
78+
CORE_THREADS_PER_VOLUME, MAXIMUM_THREADS_PER_VOLUME,
79+
THREADS_KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
80+
new LinkedBlockingQueue<Runnable>(), threadFactory);
81+
82+
// This can reduce the number of running threads
83+
executor.allowCoreThreadTimeOut(true);
84+
executors.put(volumes[v], executor);
85+
}
86+
87+
}
88+
89+
/**
90+
* Execute the task sometime in the future, using ThreadPools.
91+
*/
92+
public synchronized void execute(String root, Runnable task) {
93+
ThreadPoolExecutor executor = executors.get(root);
94+
if (executor == null) {
95+
throw new RuntimeException("Cannot find root " + root
96+
+ " for execution of task " + task);
97+
} else {
98+
executor.execute(task);
99+
}
100+
}
101+
102+
/**
103+
* Gracefully start the shut down of all ThreadPools.
104+
*/
105+
public synchronized void shutdown() {
106+
107+
LOG.info("Shutting down all AsyncDiskService threads...");
108+
109+
for (Map.Entry<String, ThreadPoolExecutor> e
110+
: executors.entrySet()) {
111+
e.getValue().shutdown();
112+
}
113+
}
114+
115+
/**
116+
* Wait for the termination of the thread pools.
117+
*
118+
* @param milliseconds The number of milliseconds to wait
119+
* @return true if all thread pools are terminated without time limit
120+
* @throws InterruptedException
121+
*/
122+
public synchronized boolean awaitTermination(long milliseconds)
123+
throws InterruptedException {
124+
125+
long end = System.currentTimeMillis() + milliseconds;
126+
for (Map.Entry<String, ThreadPoolExecutor> e:
127+
executors.entrySet()) {
128+
ThreadPoolExecutor executor = e.getValue();
129+
if (!executor.awaitTermination(
130+
Math.max(end - System.currentTimeMillis(), 0),
131+
TimeUnit.MILLISECONDS)) {
132+
LOG.warn("AsyncDiskService awaitTermination timeout.");
133+
return false;
134+
}
135+
}
136+
LOG.info("All AsyncDiskService threads are terminated.");
137+
return true;
138+
}
139+
140+
/**
141+
* Shut down all ThreadPools immediately.
142+
*/
143+
public synchronized List<Runnable> shutdownNow() {
144+
145+
LOG.info("Shutting down all AsyncDiskService threads immediately...");
146+
147+
List<Runnable> list = new ArrayList<Runnable>();
148+
for (Map.Entry<String, ThreadPoolExecutor> e
149+
: executors.entrySet()) {
150+
list.addAll(e.getValue().shutdownNow());
151+
}
152+
return list;
153+
}
154+
155+
}
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.util;
19+
20+
import junit.framework.TestCase;
21+
22+
import org.apache.commons.logging.Log;
23+
import org.apache.commons.logging.LogFactory;
24+
import org.apache.hadoop.util.AsyncDiskService;
25+
import org.junit.Test;
26+
27+
/**
28+
* A test for AsyncDiskService.
29+
*/
30+
public class TestAsyncDiskService extends TestCase {
31+
32+
public static final Log LOG = LogFactory.getLog(TestAsyncDiskService.class);
33+
34+
// Access by multiple threads from the ThreadPools in AsyncDiskService.
35+
volatile int count;
36+
37+
/** An example task for incrementing a counter.
38+
*/
39+
class ExampleTask implements Runnable {
40+
41+
ExampleTask() {
42+
}
43+
44+
@Override
45+
public void run() {
46+
synchronized (TestAsyncDiskService.this) {
47+
count ++;
48+
}
49+
}
50+
};
51+
52+
53+
/**
54+
* This test creates some ExampleTasks and runs them.
55+
*/
56+
@Test
57+
public void testAsyncDiskService() throws Throwable {
58+
59+
String[] vols = new String[]{"/0", "/1"};
60+
AsyncDiskService service = new AsyncDiskService(vols);
61+
62+
int total = 100;
63+
64+
for (int i = 0; i < total; i++) {
65+
service.execute(vols[i%2], new ExampleTask());
66+
}
67+
68+
Exception e = null;
69+
try {
70+
service.execute("no_such_volume", new ExampleTask());
71+
} catch (RuntimeException ex) {
72+
e = ex;
73+
}
74+
assertNotNull("Executing a task on a non-existing volume should throw an "
75+
+ "Exception.", e);
76+
77+
service.shutdown();
78+
if (!service.awaitTermination(5000)) {
79+
fail("AsyncDiskService didn't shutdown in 5 seconds.");
80+
}
81+
82+
assertEquals(total, count);
83+
}
84+
}

0 commit comments

Comments
 (0)