Commit 14532dc

Merge -r 705419:705420 from trunk to branch-0.17 to fix HADOOP-3217.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/branches/branch-0.17@705426 13f79535-47bb-0310-9956-ffa450edef68
1 parent effd81e

File tree (4 files changed: +62, -30 lines)

  src/contrib/hod/CHANGES.txt
  src/contrib/hod/bin/hod
  src/contrib/hod/hodlib/Hod/hadoop.py
  src/contrib/hod/hodlib/NodePools/torque.py

src/contrib/hod/CHANGES.txt

Lines changed: 7 additions & 0 deletions

@@ -1,6 +1,13 @@
 HOD Change Log
 
 
+Release 0.17.3 - Unreleased
+
+  BUG FIXES
+
+    HADOOP-3217. Decrease the rate at which the hod queries the resource
+    manager for job status. (Hemanth Yamijala via acmurthy)
+
 Release 0.17.0 - 2008-05-18
 
 INCOMPATIBLE CHANGES

src/contrib/hod/bin/hod

Lines changed: 14 additions & 1 deletion

@@ -152,7 +152,20 @@ defList = { 'hod' : (
              True, None, False, True, 'l'),
 
             ('script-wait-time', 'pos_int', 'Specifies the time to wait before running the script. Used with the hod.script option.',
-             True, 10, False, True, 'W')),
+             True, 10, False, True, 'W'),
+
+            ('job-status-query-interval', 'pos_int', 'Specifies the time between checking for job status',
+             False, 30, False, True),
+
+            ('job-command-failure-interval', 'pos_int', 'Specifies the time between checking for failed job status or submission commands',
+             False, 10, False, True),
+
+            ('job-status-query-failure-retries', 'pos_int', 'Specifies the number of times job status failure queries are retried',
+             False, 3, False, True),
+
+            ('job-submission-failure-retries', 'pos_int', 'Specifies the number of times job submission failure queries are retried',
+             False, 3, False, True)),
+
 
            'resource_manager' : (
             ('id', 'string', 'Batch scheduler ID: torque|condor.',
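The four new options work together: the two intervals set how long HOD sleeps between status queries and between retries of failed commands, and the two retry counts bound how many consecutive failures are tolerated. A minimal sketch of overriding them in HOD's INI-style configuration file, using the option names from the defList entries above (the values are illustrative, not recommendations):

```ini
[hod]
# seconds between successive job-status queries to the resource manager
job-status-query-interval = 60
# seconds to wait before retrying a failed status query or submission command
job-command-failure-interval = 20
# consecutive status-query failures tolerated before giving up
job-status-query-failure-retries = 5
# failed job submissions tolerated before giving up
job-submission-failure-retries = 5
```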

src/contrib/hod/hodlib/Hod/hadoop.py

Lines changed: 37 additions & 14 deletions

@@ -182,22 +182,26 @@ def __get_service_data(self):
     return serviceData
 
   def __check_job_status(self):
-    initWaitCount = 20
-    count = 0
+    failureCount = 0
     status = False
     state = 'Q'
-    while state == 'Q':
+    while (state=='Q') or (state==False):
       if hodInterrupt.isSet():
         raise HodInterruptException()
 
       state = self.__nodePool.getJobState()
-      if (state==False) or (state!='Q'):
+      self.__log.debug('job state %s' % state)
+      if state == False:
+        failureCount += 1
+        if (failureCount >= self.__cfg['hod']['job-status-query-failure-retries']):
+          self.__log.debug('Number of retries reached max limit while querying job status')
+          break
+        time.sleep(self.__cfg['hod']['job-command-failure-interval'])
+      elif state!='Q':
         break
-      count = count + 1
-      if count < initWaitCount:
-        time.sleep(0.5)
       else:
-        time.sleep(10)
+        self.__log.debug('querying for job status after job-status-query-interval')
+        time.sleep(self.__cfg['hod']['job-status-query-interval'])
 
     if state and state != 'C':
       status = True

@@ -237,7 +241,7 @@ def __get_ringmaster_client(self):
         time.sleep(1)
         count = count + 1
         # check to see if the job exited by any chance in that time:
-        if (count % 10 == 0):
+        if (count % self.__cfg['hod']['job-status-query-interval'] == 0):
           if not self.__check_job_status():
             break
 
@@ -256,9 +260,9 @@ def __init_hadoop_service(self, serviceName, xmlrpcClient):
       serviceAddress = xmlrpcClient.getServiceAddr(serviceName)
       if serviceAddress:
         if serviceAddress == 'not found':
-          time.sleep(.5)
+          time.sleep(1)
           # check to see if the job exited by any chance in that time:
-          if (i % 10 == 0):
+          if ((i+1) % self.__cfg['hod']['job-status-query-interval'] == 0):
             if not self.__check_job_status():
               break
         else:

@@ -420,6 +424,7 @@ def delete_job(self, jobId):
 
   def allocate(self, clusterDir, min, max=None):
     status = 0
+    failureCount = 0
     self.__svcrgyClient = self.__get_svcrgy_client()
 
     self.__log.debug("allocate %s %s %s" % (clusterDir, min, max))

@@ -432,7 +437,25 @@ def allocate(self, clusterDir, min, max=None):
       walltime = None
       if self.__cfg['hod'].has_key('walltime'):
         walltime = self.__cfg['hod']['walltime']
+
       self.jobId, exitCode = self.__nodePool.submitNodeSet(nodeSet, walltime)
+      # if the job submission returned an error other than no resources
+      # retry a couple of times
+      while (self.jobId is False) and (exitCode != 188):
+        if hodInterrupt.isSet():
+          raise HodInterruptException()
+
+        failureCount += 1
+        if (failureCount >= self.__cfg['hod']['job-status-query-failure-retries']):
+          self.__log.debug("failed submitting job more than the retries. exiting")
+          break
+        else:
+          # wait a bit before retrying
+          time.sleep(self.__cfg['hod']['job-command-failure-interval'])
+          if hodInterrupt.isSet():
+            raise HodInterruptException()
+          self.jobId, exitCode = self.__nodePool.submitNodeSet(nodeSet, walltime)
+
       if self.jobId:
         try:
           jobStatus = self.__check_job_status()

@@ -558,12 +581,12 @@ def allocate(self, clusterDir, min, max=None):
       if exitCode == 188:
         self.__log.critical("Request execeeded maximum resource allocation.")
       else:
-        self.__log.critical("Insufficient resources available.")
+        self.__log.critical("Job submission failed with exit code %s" % exitCode)
       status = 4
-    else:
+    else:
       self.__log.critical("Scheduler failure, allocation failed.\n\n")
       status = 4
-    
+
     return status
 
   def __isRingMasterAlive(self, rmAddr):
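Both new loops above share one bounded-retry shape: query, count consecutive failures, sleep for the configured failure interval, and give up after the configured number of retries. The sketch below distills that shape into a standalone function; `cfg` and the `get_job_state` callable are simplified stand-ins for `self.__cfg['hod']` and `self.__nodePool.getJobState`, and the `hodInterrupt` checks are omitted, so this illustrates the pattern rather than the exact HOD code:

```python
import time

# Illustrative defaults mirroring the new [hod] options.
cfg = {
    'job-status-query-interval': 30,
    'job-command-failure-interval': 10,
    'job-status-query-failure-retries': 3,
}

def poll_job_state(get_job_state, cfg):
    """Poll until the job leaves Torque's queued ('Q') state.

    get_job_state returns a state string, or False when the query itself
    failed (e.g. a transient qstat error). Returns True if the job is alive.
    """
    failureCount = 0
    state = 'Q'
    while (state == 'Q') or (state is False):
        state = get_job_state()
        if state is False:
            failureCount += 1
            if failureCount >= cfg['job-status-query-failure-retries']:
                break  # too many consecutive failed queries; give up
            time.sleep(cfg['job-command-failure-interval'])
        elif state != 'Q':
            break  # job left the queue (running, exiting, or completed)
        else:
            time.sleep(cfg['job-status-query-interval'])
    # 'C' is Torque's completed state; any other truthy state counts as alive.
    return bool(state) and state != 'C'

# Example run with zero-length sleeps and a scripted sequence of states:
fast = dict(cfg, **{'job-status-query-interval': 0,
                    'job-command-failure-interval': 0})
states = iter(['Q', False, 'R'])
print(poll_job_state(lambda: next(states), fast))  # -> True
```

The same bounded counter guards the submission path in allocate(), except that exit code 188 (the "exceeded maximum resource allocation" case logged above) is treated as permanent and is never retried.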

src/contrib/hod/hodlib/NodePools/torque.py

Lines changed: 4 additions & 15 deletions

@@ -265,23 +265,12 @@ def getServiceId(self):
     return id
 
   def getJobState(self):
-    #torque error code when credentials fail, a temporary condition sometimes.
-    credFailureErrorCode = 171
-    credFailureRetries = 10
-    i = 0
     jobState = False
 
-    while i < credFailureRetries:
-      qstatInfo, exitCode = self.__torque.qstat(self.getServiceId())
-      if exitCode == 0:
-        jobState = qstatInfo['job_state']
-        break
-      else:
-        if exitCode == credFailureErrorCode:
-          time.sleep(1)
-          i = i+1
-        else:
-          break
+    qstatInfo, exitCode = self.__torque.qstat(self.getServiceId())
+    if exitCode == 0:
+      jobState = qstatInfo['job_state']
+
     return jobState
 
   def deleteJob(self, jobId):
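With the in-method retry loop removed, getJobState now makes a single qstat call and reports any non-zero exit code, including the formerly special-cased credential failure (Torque exit code 171), as False; the bounded retry policy lives in __check_job_status in hadoop.py instead. A minimal sketch of the new contract, with a lambda standing in for the real self.__torque.qstat wrapper:

```python
def get_job_state(qstat, job_id):
    """Single-shot query: job-state string on success, False otherwise."""
    qstatInfo, exitCode = qstat(job_id)
    if exitCode == 0:
        return qstatInfo['job_state']
    return False  # includes transient errors such as Torque's exit code 171

# A failing qstat (simulating the credential error) now simply yields False;
# the caller decides whether and how often to retry.
print(get_job_state(lambda jid: ({}, 171), '42.torque-server'))               # -> False
print(get_job_state(lambda jid: ({'job_state': 'R'}, 0), '42.torque-server'))  # -> 'R'
```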
