Commit fc19716

Author: Devaraj Das (committed)
HADOOP-4664. Introduces multiple job initialization threads, where the number of threads is configurable via mapred.jobinit.threads. Contributed by Matei Zaharia and Jothi Padmanabhan.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@752932 13f79535-47bb-0310-9956-ffa450edef68
1 parent 7fedd33 commit fc19716
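
For orientation (not part of the commit itself), here is a minimal sketch of how the new knob is consumed: the patch has EagerTaskInitializationListener read mapred.jobinit.threads from the Configuration it is constructed with, falling back to DEFAULT_NUM_THREADS (4). The value 8 below is purely illustrative, and the class name is invented for this example.

// Minimal sketch, assuming a stock org.apache.hadoop.conf.Configuration.
// The value 8 is an illustrative choice, not a recommendation from the patch.
import org.apache.hadoop.conf.Configuration;

public class JobInitThreadsExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Raise the number of parallel job-initialization threads above the
    // default of 4 (DEFAULT_NUM_THREADS in EagerTaskInitializationListener).
    conf.setInt("mapred.jobinit.threads", 8);
    // The listener introduced by this patch reads the value back like this:
    int numThreads = conf.getInt("mapred.jobinit.threads", 4);
    System.out.println("job init threads = " + numThreads);
  }
}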

7 files changed: +265 -26 lines changed


CHANGES.txt

Lines changed: 4 additions & 0 deletions
@@ -626,6 +626,10 @@ Release 0.20.0 - Unreleased
     HADOOP-5248. A testcase that checks for the existence of job directory
     after the job completes. Fails if it exists. (ddas)
 
+    HADOOP-4664. Introduces multiple job initialization threads, where the
+    number of threads are configurable via mapred.jobinit.threads.
+    (Matei Zaharia and Jothi Padmanabhan via ddas)
+
   OPTIMIZATIONS
 
     HADOOP-3293. Fixes FileInputFormat to do provide locations for splits

src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java

Lines changed: 1 addition & 1 deletion
@@ -105,14 +105,14 @@ public FairScheduler() {
   protected FairScheduler(Clock clock, boolean runBackgroundUpdates) {
     this.clock = clock;
     this.runBackgroundUpdates = runBackgroundUpdates;
-    this.eagerInitListener = new EagerTaskInitializationListener();
     this.jobListener = new JobListener();
   }
 
   @Override
   public void start() {
     try {
       Configuration conf = getConf();
+      this.eagerInitListener = new EagerTaskInitializationListener(conf);
       eagerInitListener.start();
       taskTrackerManager.addJobInProgressListener(eagerInitListener);
       taskTrackerManager.addJobInProgressListener(jobListener);
src/core/org/apache/hadoop/net/CachedDNSToSwitchMapping.java

Lines changed: 2 additions & 2 deletions
@@ -20,7 +20,7 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
-import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
 
 /**
  * A cached implementation of DNSToSwitchMapping that takes an
@@ -30,7 +30,7 @@
  *
  */
 public class CachedDNSToSwitchMapping implements DNSToSwitchMapping {
-  private Map<String, String> cache = new TreeMap<String, String>();
+  private Map<String, String> cache = new ConcurrentHashMap<String, String>();
   protected DNSToSwitchMapping rawMapping;
 
   public CachedDNSToSwitchMapping(DNSToSwitchMapping rawMapping) {

src/mapred/org/apache/hadoop/mapred/EagerTaskInitializationListener.java

Lines changed: 58 additions & 20 deletions
@@ -22,9 +22,12 @@
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.JobStatusChangeEvent.EventType;
 import org.apache.hadoop.util.StringUtils;
 
@@ -34,53 +37,88 @@
  */
 class EagerTaskInitializationListener extends JobInProgressListener {
 
+  private static final int DEFAULT_NUM_THREADS = 4;
   private static final Log LOG = LogFactory.getLog(
       EagerTaskInitializationListener.class.getName());
 
   /////////////////////////////////////////////////////////////////
   // Used to init new jobs that have just been created
   /////////////////////////////////////////////////////////////////
-  class JobInitThread implements Runnable {
+  class JobInitManager implements Runnable {
+
     public void run() {
-      JobInProgress job;
+      JobInProgress job = null;
       while (true) {
-        job = null;
         try {
           synchronized (jobInitQueue) {
-            while (jobInitQueue.isEmpty()) {
+            while (jobInitQueue.isEmpty() && !exitFlag) {
               jobInitQueue.wait();
             }
-            job = jobInitQueue.remove(0);
+            if (exitFlag) {
+              break;
+            }
           }
-          job.initTasks();
+          job = jobInitQueue.remove(0);
+          threadPool.execute(new InitJob(job));
         } catch (InterruptedException t) {
+          LOG.info("JobInitManagerThread interrupted.");
           break;
-        } catch (Throwable t) {
-          LOG.error("Job initialization failed:\n" +
-              StringUtils.stringifyException(t));
-          if (job != null) {
-            job.fail();
-          }
+        }
+      }
+      LOG.info("Shutting down thread pool");
+      threadPool.shutdownNow();
+    }
+  }
+
+  static class InitJob implements Runnable {
+
+    private JobInProgress job;
+
+    public InitJob(JobInProgress job) {
+      this.job = job;
+    }
+
+    public void run() {
+      try {
+        LOG.info("Initializing " + job.getJobID());
+        job.initTasks();
+      } catch (Throwable t) {
+        LOG.error("Job initialization failed:\n" +
+            StringUtils.stringifyException(t));
+        if (job != null) {
+          job.fail();
         }
       }
     }
   }
 
-  private JobInitThread initJobs = new JobInitThread();
-  private Thread initJobsThread;
+  private JobInitManager jobInitManager = new JobInitManager();
+  private Thread jobInitManagerThread;
   private List<JobInProgress> jobInitQueue = new ArrayList<JobInProgress>();
+  private ExecutorService threadPool;
+  private int numThreads;
+  private boolean exitFlag = false;
+
+  public EagerTaskInitializationListener(Configuration conf) {
+    numThreads = conf.getInt("mapred.jobinit.threads", DEFAULT_NUM_THREADS);
+    threadPool = Executors.newFixedThreadPool(numThreads);
+  }
 
   public void start() throws IOException {
-    this.initJobsThread = new Thread(initJobs, "initJobs");
-    this.initJobsThread.start();
+    this.jobInitManagerThread = new Thread(jobInitManager, "jobInitManager");
+    jobInitManagerThread.setDaemon(true);
+    this.jobInitManagerThread.start();
  }
 
  public void terminate() throws IOException {
-    if (this.initJobsThread != null && this.initJobsThread.isAlive()) {
-      LOG.info("Stopping initer");
-      this.initJobsThread.interrupt();
+    if (jobInitManagerThread != null && jobInitManagerThread.isAlive()) {
+      LOG.info("Stopping Job Init Manager thread");
+      synchronized (jobInitQueue) {
+        exitFlag = true;
+        jobInitQueue.notify();
+      }
      try {
-        this.initJobsThread.join();
+        jobInitManagerThread.join();
      } catch (InterruptedException ex) {
        ex.printStackTrace();
      }

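For readers skimming the diff above: the listener now follows a producer/worker pattern. A single manager thread drains jobInitQueue and hands each job to a fixed-size thread pool, and terminate() signals shutdown through exitFlag instead of interrupting a job mid-initialization. Below is a minimal, self-contained sketch of that pattern; every class, field, and method name in it is invented for illustration and is not taken from the patch.

// Minimal standalone sketch (not from the patch): one manager thread drains a
// shared queue and hands each item to a fixed-size ExecutorService; shutdown
// is signalled via an exit flag checked while holding the queue lock.
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class QueueDrainSketch {
  private final List<String> queue = new ArrayList<String>();
  private final ExecutorService pool = Executors.newFixedThreadPool(4);
  private boolean exit = false;

  private final Thread manager = new Thread(new Runnable() {
    public void run() {
      while (true) {
        String item;
        synchronized (queue) {
          while (queue.isEmpty() && !exit) {
            try {
              queue.wait();
            } catch (InterruptedException ie) {
              return;
            }
          }
          if (exit) {
            break;
          }
          item = queue.remove(0);
        }
        final String work = item;
        // Each dequeued item is processed by a pool worker, so a slow item
        // does not block the others (the point of HADOOP-4664).
        pool.execute(new Runnable() {
          public void run() {
            System.out.println("Initializing " + work);
          }
        });
      }
      pool.shutdownNow();
    }
  }, "manager");

  public void start() {
    manager.setDaemon(true);
    manager.start();
  }

  public void submit(String item) {
    synchronized (queue) {
      queue.add(item);
      queue.notifyAll();
    }
  }

  public void terminate() throws InterruptedException {
    synchronized (queue) {
      exit = true;
      queue.notify();
    }
    manager.join();
  }
}
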
src/mapred/org/apache/hadoop/mapred/JobHistory.java

Lines changed: 2 additions & 1 deletion
@@ -32,6 +32,7 @@
 import java.util.HashMap;
 import java.util.Map;
 import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
@@ -92,7 +93,7 @@ public class JobHistory {
   private static String JOBTRACKER_UNIQUE_STRING = null;
   private static String LOG_DIR = null;
   private static Map<String, ArrayList<PrintWriter>> openJobs =
-      new HashMap<String, ArrayList<PrintWriter>>();
+      new ConcurrentHashMap<String, ArrayList<PrintWriter>>();
   private static boolean disableHistory = false;
   private static final String SECONDARY_FILE_SUFFIX = ".recover";
   private static long jobHistoryBlockSize = 0;

src/mapred/org/apache/hadoop/mapred/JobQueueTaskScheduler.java

Lines changed: 2 additions & 2 deletions
@@ -41,8 +41,6 @@ class JobQueueTaskScheduler extends TaskScheduler {
 
   public JobQueueTaskScheduler() {
     this.jobQueueJobInProgressListener = new JobQueueJobInProgressListener();
-    this.eagerTaskInitializationListener =
-      new EagerTaskInitializationListener();
   }
 
   @Override
@@ -74,6 +72,8 @@ public synchronized void setConf(Configuration conf) {
     super.setConf(conf);
     padFraction = conf.getFloat("mapred.jobtracker.taskalloc.capacitypad",
                                 0.01f);
+    this.eagerTaskInitializationListener =
+      new EagerTaskInitializationListener(conf);
   }
 
   @Override
TestParallelInitialization.java (new file)

Lines changed: 196 additions & 0 deletions
@@ -0,0 +1,196 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.mapred;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import junit.framework.TestCase;

import org.apache.hadoop.io.IntWritable;

public class TestParallelInitialization extends TestCase {

  private static int jobCounter;
  private static final int NUM_JOBS = 3;
  IntWritable numJobsCompleted = new IntWritable();

  static void resetCounters() {
    jobCounter = 0;
  }

  class FakeJobInProgress extends JobInProgress {

    public FakeJobInProgress(JobConf jobConf,
        FakeTaskTrackerManager taskTrackerManager) throws IOException {
      super(new JobID("test", ++jobCounter), jobConf);
      this.startTime = System.currentTimeMillis();
      this.status = new JobStatus(getJobID(), 0f, 0f, JobStatus.PREP);
      this.status.setJobPriority(JobPriority.NORMAL);
      this.status.setStartTime(startTime);
    }

    @Override
    public synchronized void initTasks() throws IOException {
      try {
        int jobNumber = this.getJobID().getId();
        synchronized (numJobsCompleted) {
          while (numJobsCompleted.get() != (NUM_JOBS - jobNumber)) {
            numJobsCompleted.wait();
          }
          numJobsCompleted.set(numJobsCompleted.get() + 1);
          numJobsCompleted.notifyAll();
          LOG.info("JobNumber " + jobNumber + " succeeded");
        }
      } catch (InterruptedException ie) {};
      this.status.setRunState(JobStatus.SUCCEEDED);
    }

    @Override
    synchronized void fail() {
      this.status.setRunState(JobStatus.FAILED);
    }
  }

  static class FakeTaskTrackerManager implements TaskTrackerManager {

    int maps = 0;
    int reduces = 0;
    int maxMapTasksPerTracker = 2;
    int maxReduceTasksPerTracker = 2;
    List<JobInProgressListener> listeners =
      new ArrayList<JobInProgressListener>();
    QueueManager queueManager;

    private Map<String, TaskTrackerStatus> trackers =
      new HashMap<String, TaskTrackerStatus>();

    public FakeTaskTrackerManager() {
      JobConf conf = new JobConf();
      queueManager = new QueueManager(conf);
      trackers.put("tt1", new TaskTrackerStatus("tt1", "tt1.host", 1,
          new ArrayList<TaskStatus>(), 0,
          maxMapTasksPerTracker, maxReduceTasksPerTracker));
    }

    public ClusterStatus getClusterStatus() {
      int numTrackers = trackers.size();
      return new ClusterStatus(numTrackers, 0,
          JobTracker.TASKTRACKER_EXPIRY_INTERVAL,
          maps, reduces,
          numTrackers * maxMapTasksPerTracker,
          numTrackers * maxReduceTasksPerTracker,
          JobTracker.State.RUNNING);
    }

    public int getNumberOfUniqueHosts() {
      return 0;
    }

    public Collection<TaskTrackerStatus> taskTrackers() {
      return trackers.values();
    }

    public void addJobInProgressListener(JobInProgressListener listener) {
      listeners.add(listener);
    }

    public void removeJobInProgressListener(JobInProgressListener listener) {
      listeners.remove(listener);
    }

    public QueueManager getQueueManager() {
      return queueManager;
    }

    public int getNextHeartbeatInterval() {
      return MRConstants.HEARTBEAT_INTERVAL_MIN;
    }

    public void killJob(JobID jobid) {
      return;
    }

    public JobInProgress getJob(JobID jobid) {
      return null;
    }

    // Test methods

    public void submitJob(JobInProgress job) throws IOException {
      for (JobInProgressListener listener : listeners) {
        listener.jobAdded(job);
      }
    }
  }

  protected JobConf jobConf;
  protected TaskScheduler scheduler;
  private FakeTaskTrackerManager taskTrackerManager;

  @Override
  protected void setUp() throws Exception {
    resetCounters();
    jobConf = new JobConf();
    taskTrackerManager = new FakeTaskTrackerManager();
    scheduler = createTaskScheduler();
    scheduler.setConf(jobConf);
    scheduler.setTaskTrackerManager(taskTrackerManager);
    scheduler.start();
  }

  @Override
  protected void tearDown() throws Exception {
    if (scheduler != null) {
      scheduler.terminate();
    }
  }

  protected TaskScheduler createTaskScheduler() {
    return new JobQueueTaskScheduler();
  }

  public void testParallelInitJobs() throws IOException {
    FakeJobInProgress[] jobs = new FakeJobInProgress[NUM_JOBS];

    // Submit NUM_JOBS jobs in order. The init code will ensure
    // that the jobs get inited in descending order of Job ids
    // i.e. highest job id first and the smallest last.
    // If we were not doing parallel init, the first submitted job
    // will be inited first and that will hang

    for (int i = 0; i < NUM_JOBS; i++) {
      jobs[i] = new FakeJobInProgress(jobConf, taskTrackerManager);
      jobs[i].getStatus().setRunState(JobStatus.PREP);
      taskTrackerManager.submitJob(jobs[i]);
    }

    try {
      Thread.sleep(1000);
    } catch (InterruptedException ie) {}

    for (int i = 0; i < NUM_JOBS; i++) {
      assertTrue(jobs[i].getStatus().getRunState() == JobStatus.SUCCEEDED);
    }
  }
}
