Skip to content

Commit d44a336

Browse files
Josh Rosen authored and Andrew Or committed
[SPARK-6079] Use index to speed up StatusTracker.getJobIdsForGroup()
`StatusTracker.getJobIdsForGroup()` is implemented via a linear scan over a HashMap rather than using an index, which might be an expensive operation if there are many (e.g. thousands) of retained jobs. This patch adds a new map to `JobProgressListener` in order to speed up these lookups. Author: Josh Rosen <[email protected]> Closes apache#4830 from JoshRosen/statustracker-job-group-indexing and squashes the following commits: e39c5c7 [Josh Rosen] Address review feedback 6709fb2 [Josh Rosen] Merge remote-tracking branch 'origin/master' into statustracker-job-group-indexing 2c49614 [Josh Rosen] getOrElse 97275a7 [Josh Rosen] Add jobGroup to jobId index to JobProgressListener
1 parent 4fc4d03 commit d44a336

File tree

3 files changed

+51
-6
lines changed

3 files changed

+51
-6
lines changed

core/src/main/scala/org/apache/spark/SparkStatusTracker.scala

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,7 @@ class SparkStatusTracker private[spark] (sc: SparkContext) {
4545
*/
4646
def getJobIdsForGroup(jobGroup: String): Array[Int] = {
4747
jobProgressListener.synchronized {
48-
val jobData = jobProgressListener.jobIdToData.valuesIterator
49-
jobData.filter(_.jobGroup.orNull == jobGroup).map(_.jobId).toArray
48+
jobProgressListener.jobGroupToJobIds.getOrElse(jobGroup, Seq.empty).toArray
5049
}
5150
}
5251

core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
4444
// These type aliases are public because they're used in the types of public fields:
4545

4646
type JobId = Int
47+
type JobGroupId = String
4748
type StageId = Int
4849
type StageAttemptId = Int
4950
type PoolName = String
@@ -54,6 +55,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
5455
val completedJobs = ListBuffer[JobUIData]()
5556
val failedJobs = ListBuffer[JobUIData]()
5657
val jobIdToData = new HashMap[JobId, JobUIData]
58+
val jobGroupToJobIds = new HashMap[JobGroupId, HashSet[JobId]]
5759

5860
// Stages:
5961
val pendingStages = new HashMap[StageId, StageInfo]
@@ -119,7 +121,10 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
119121
Map(
120122
"jobIdToData" -> jobIdToData.size,
121123
"stageIdToData" -> stageIdToData.size,
122-
"stageIdToStageInfo" -> stageIdToInfo.size
124+
"stageIdToStageInfo" -> stageIdToInfo.size,
125+
"jobGroupToJobIds" -> jobGroupToJobIds.values.map(_.size).sum,
126+
// Since jobGroupToJobIds is map of sets, check that we don't leak keys with empty values:
127+
"jobGroupToJobIds keySet" -> jobGroupToJobIds.keys.size
123128
)
124129
}
125130

@@ -140,7 +145,19 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
140145
if (jobs.size > retainedJobs) {
141146
val toRemove = math.max(retainedJobs / 10, 1)
142147
jobs.take(toRemove).foreach { job =>
143-
jobIdToData.remove(job.jobId)
148+
// Remove the job's UI data, if it exists
149+
jobIdToData.remove(job.jobId).foreach { removedJob =>
150+
// A null jobGroupId is used for jobs that are run without a job group
151+
val jobGroupId = removedJob.jobGroup.orNull
152+
// Remove the job group -> job mapping entry, if it exists
153+
jobGroupToJobIds.get(jobGroupId).foreach { jobsInGroup =>
154+
jobsInGroup.remove(job.jobId)
155+
// If this was the last job in this job group, remove the map entry for the job group
156+
if (jobsInGroup.isEmpty) {
157+
jobGroupToJobIds.remove(jobGroupId)
158+
}
159+
}
160+
}
144161
}
145162
jobs.trimStart(toRemove)
146163
}
@@ -158,6 +175,8 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
158175
stageIds = jobStart.stageIds,
159176
jobGroup = jobGroup,
160177
status = JobExecutionStatus.RUNNING)
178+
// A null jobGroupId is used for jobs that are run without a job group
179+
jobGroupToJobIds.getOrElseUpdate(jobGroup.orNull, new HashSet[JobId]).add(jobStart.jobId)
161180
jobStart.stageInfos.foreach(x => pendingStages(x.stageId) = x)
162181
// Compute (a potential underestimate of) the number of tasks that will be run by this job.
163182
// This may be an underestimate because the job start event references all of the result

core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
package org.apache.spark.ui.jobs
1919

20+
import java.util.Properties
21+
2022
import org.scalatest.FunSuite
2123
import org.scalatest.Matchers
2224

@@ -44,11 +46,19 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
4446
SparkListenerStageCompleted(stageInfo)
4547
}
4648

47-
private def createJobStartEvent(jobId: Int, stageIds: Seq[Int]) = {
49+
private def createJobStartEvent(
50+
jobId: Int,
51+
stageIds: Seq[Int],
52+
jobGroup: Option[String] = None): SparkListenerJobStart = {
4853
val stageInfos = stageIds.map { stageId =>
4954
new StageInfo(stageId, 0, stageId.toString, 0, null, "")
5055
}
51-
SparkListenerJobStart(jobId, jobSubmissionTime, stageInfos)
56+
val properties: Option[Properties] = jobGroup.map { groupId =>
57+
val props = new Properties()
58+
props.setProperty(SparkContext.SPARK_JOB_GROUP_ID, groupId)
59+
props
60+
}
61+
SparkListenerJobStart(jobId, jobSubmissionTime, stageInfos, properties.orNull)
5262
}
5363

5464
private def createJobEndEvent(jobId: Int, failed: Boolean = false) = {
@@ -110,6 +120,23 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
110120
listener.stageIdToActiveJobIds.size should be (0)
111121
}
112122

123+
test("test clearing of jobGroupToJobIds") {
124+
val conf = new SparkConf()
125+
conf.set("spark.ui.retainedJobs", 5.toString)
126+
val listener = new JobProgressListener(conf)
127+
128+
// Run 50 jobs, each with one stage
129+
for (jobId <- 0 to 50) {
130+
listener.onJobStart(createJobStartEvent(jobId, Seq(0), jobGroup = Some(jobId.toString)))
131+
listener.onStageSubmitted(createStageStartEvent(0))
132+
listener.onStageCompleted(createStageEndEvent(0, failed = false))
133+
listener.onJobEnd(createJobEndEvent(jobId, false))
134+
}
135+
assertActiveJobsStateIsEmpty(listener)
136+
// This collection won't become empty, but it should be bounded by spark.ui.retainedJobs
137+
listener.jobGroupToJobIds.size should be (5)
138+
}
139+
113140
test("test LRU eviction of jobs") {
114141
val conf = new SparkConf()
115142
conf.set("spark.ui.retainedStages", 5.toString)

0 commit comments

Comments (0)