
Commit 424a86a

[SPARK-6175] Fix standalone executor log links when ephemeral ports or SPARK_PUBLIC_DNS are used
This patch fixes two issues with the executor log viewing links added in Spark 1.3. In standalone mode, the log URLs might include a port value of 0 rather than the actual bound port of the UI, which broke the ability to view logs from workers whose web UIs had been configured to bind to ephemeral ports. In addition, the URLs used workers' local hostnames instead of respecting SPARK_PUBLIC_DNS, which prevented this feature from working properly on Spark EC2 clusters because the links would point to internal DNS names instead of external ones.

I included tests for both of these bugs:

- We now browse to the URLs and verify that they point to the expected pages.
- To test SPARK_PUBLIC_DNS, I changed the code that reads the environment variable to do so via `SparkConf.getenv`, then used a custom SparkConf subclass to mock the environment variable (this pattern is used elsewhere in Spark's tests).

Author: Josh Rosen <[email protected]>

Closes apache#4903 from JoshRosen/SPARK-6175 and squashes the following commits:

5577f41 [Josh Rosen] Remove println
cfec135 [Josh Rosen] Use webUi.boundPort and publicAddress in log links
27918c7 [Josh Rosen] Add failing unit tests for standalone log URL viewing
c250fbe [Josh Rosen] Respect SparkConf in local-cluster Workers.
422a2ef [Josh Rosen] Use conf.getenv to read SPARK_PUBLIC_DNS
1 parent 0bfacd5 commit 424a86a

File tree: 8 files changed (+57, -20 lines)


core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala

Lines changed: 1 addition & 1 deletion

@@ -59,7 +59,7 @@ class LocalSparkCluster(
     /* Start the Workers */
     for (workerNum <- 1 to numWorkers) {
       val (workerSystem, _) = Worker.startSystemAndActor(localHostname, 0, 0, coresPerWorker,
-        memoryPerWorker, masters, null, Some(workerNum))
+        memoryPerWorker, masters, null, Some(workerNum), _conf)
       workerActorSystems += workerSystem
     }

core/src/main/scala/org/apache/spark/deploy/master/Master.scala

Lines changed: 1 addition & 1 deletion

@@ -96,7 +96,7 @@ private[spark] class Master(
   val webUi = new MasterWebUI(this, webUiPort)

   val masterPublicAddress = {
-    val envVar = System.getenv("SPARK_PUBLIC_DNS")
+    val envVar = conf.getenv("SPARK_PUBLIC_DNS")
     if (envVar != null) envVar else host
   }

core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala

Lines changed: 3 additions & 1 deletion

@@ -44,6 +44,7 @@ private[spark] class ExecutorRunner(
     val workerId: String,
     val host: String,
     val webUiPort: Int,
+    val publicAddress: String,
     val sparkHome: File,
     val executorDir: File,
     val workerUrl: String,
@@ -140,7 +141,8 @@ private[spark] class ExecutorRunner(
     builder.environment.put("SPARK_LAUNCH_WITH_SCALA", "0")

     // Add webUI log urls
-    val baseUrl = s"http://$host:$webUiPort/logPage/?appId=$appId&executorId=$execId&logType="
+    val baseUrl =
+      s"http://$publicAddress:$webUiPort/logPage/?appId=$appId&executorId=$execId&logType="
     builder.environment.put("SPARK_LOG_URL_STDERR", s"${baseUrl}stderr")
     builder.environment.put("SPARK_LOG_URL_STDOUT", s"${baseUrl}stdout")
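For a sense of what the new template produces, here is a hedged expansion of the interpolation above; the hostname, port, app ID, and executor ID are all hypothetical values, not ones from this patch:

// Illustration only (all values hypothetical): how the interpolated baseUrl
// expands once publicAddress and the worker UI's bound port are substituted.
val publicAddress = "ec2-54-0-0-1.compute-1.amazonaws.com" // from SPARK_PUBLIC_DNS
val webUiPort = 8081                                       // worker UI's bound port
val appId = "app-20150306000000-0000"
val execId = 3
val baseUrl =
  s"http://$publicAddress:$webUiPort/logPage/?appId=$appId&executorId=$execId&logType="
println(baseUrl + "stderr")
// http://ec2-54-0-0-1.compute-1.amazonaws.com:8081/logPage/?appId=app-20150306000000-0000&executorId=3&logType=stderr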

core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala

Lines changed: 5 additions & 4 deletions

@@ -121,7 +121,7 @@ private[spark] class Worker(
   val shuffleService = new StandaloneWorkerShuffleService(conf, securityMgr)

   val publicAddress = {
-    val envVar = System.getenv("SPARK_PUBLIC_DNS")
+    val envVar = conf.getenv("SPARK_PUBLIC_DNS")
     if (envVar != null) envVar else host
   }
   var webUi: WorkerWebUI = null
@@ -362,7 +362,8 @@
         self,
         workerId,
         host,
-        webUiPort,
+        webUi.boundPort,
+        publicAddress,
         sparkHome,
         executorDir,
         akkaUrl,
@@ -538,10 +539,10 @@ private[spark] object Worker extends Logging {
       memory: Int,
       masterUrls: Array[String],
       workDir: String,
-      workerNumber: Option[Int] = None): (ActorSystem, Int) = {
+      workerNumber: Option[Int] = None,
+      conf: SparkConf = new SparkConf): (ActorSystem, Int) = {

     // The LocalSparkCluster runs multiple local sparkWorkerX actor systems
-    val conf = new SparkConf
     val systemName = "sparkWorker" + workerNumber.map(_.toString).getOrElse("")
     val actorName = "Worker"
     val securityMgr = new SecurityManager(conf)
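The second hunk is the core of the ephemeral-port fix: a worker configured with webUiPort = 0 asks the OS to choose a free port, so the configured value is useless for building links and only webUi.boundPort carries a usable number. A minimal sketch with plain JVM sockets (not Spark code) shows the distinction:

import java.net.ServerSocket

// Not Spark code: a minimal illustration of ephemeral-port binding.
// Requesting port 0 lets the OS pick a free port, so any URL built from
// the configured value would contain ":0"; only the bound port works.
object EphemeralPortDemo extends App {
  val server = new ServerSocket(0) // configured port: 0
  try {
    println(s"configured: 0, actually bound: ${server.getLocalPort}")
  } finally {
    server.close()
  }
}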

core/src/main/scala/org/apache/spark/ui/WebUI.scala

Lines changed: 1 addition & 1 deletion

@@ -47,7 +47,7 @@ private[spark] abstract class WebUI(
   protected val handlers = ArrayBuffer[ServletContextHandler]()
   protected var serverInfo: Option[ServerInfo] = None
   protected val localHostName = Utils.localHostName()
-  protected val publicHostName = Option(System.getenv("SPARK_PUBLIC_DNS")).getOrElse(localHostName)
+  protected val publicHostName = Option(conf.getenv("SPARK_PUBLIC_DNS")).getOrElse(localHostName)
   private val className = Utils.getFormattedClassName(this)

   def getBasePath: String = basePath

core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala

Lines changed: 1 addition & 1 deletion

@@ -119,7 +119,7 @@ class JsonProtocolSuite extends FunSuite {

   def createExecutorRunner(): ExecutorRunner = {
     new ExecutorRunner("appId", 123, createAppDesc(), 4, 1234, null, "workerId", "host", 123,
-      new File("sparkHome"), new File("workDir"), "akka://worker",
+      "publicAddress", new File("sparkHome"), new File("workDir"), "akka://worker",
       new SparkConf, Seq("localDir"), ExecutorState.RUNNING)
   }
125125

core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala

Lines changed: 44 additions & 10 deletions

@@ -17,35 +17,69 @@

 package org.apache.spark.deploy

+import java.net.URL
+
 import scala.collection.mutable
+import scala.io.Source

-import org.scalatest.{BeforeAndAfter, FunSuite}
+import org.scalatest.FunSuite

 import org.apache.spark.scheduler.cluster.ExecutorInfo
 import org.apache.spark.scheduler.{SparkListenerExecutorAdded, SparkListener}
-import org.apache.spark.{SparkContext, LocalSparkContext}
+import org.apache.spark.{SparkConf, SparkContext, LocalSparkContext}

-class LogUrlsStandaloneSuite extends FunSuite with LocalSparkContext with BeforeAndAfter {
+class LogUrlsStandaloneSuite extends FunSuite with LocalSparkContext {

   /** Length of time to wait while draining listener events. */
-  val WAIT_TIMEOUT_MILLIS = 10000
+  private val WAIT_TIMEOUT_MILLIS = 10000

-  before {
+  test("verify that correct log urls get propagated from workers") {
     sc = new SparkContext("local-cluster[2,1,512]", "test")
+
+    val listener = new SaveExecutorInfo
+    sc.addSparkListener(listener)
+
+    // Trigger a job so that executors get added
+    sc.parallelize(1 to 100, 4).map(_.toString).count()
+
+    assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
+    listener.addedExecutorInfos.values.foreach { info =>
+      assert(info.logUrlMap.nonEmpty)
+      // Browse to each URL to check that it's valid
+      info.logUrlMap.foreach { case (logType, logUrl) =>
+        val html = Source.fromURL(logUrl).mkString
+        assert(html.contains(s"$logType log page"))
+      }
+    }
   }

-  test("verify log urls get propagated from workers") {
+  test("verify that log urls reflect SPARK_PUBLIC_DNS (SPARK-6175)") {
+    val SPARK_PUBLIC_DNS = "public_dns"
+    class MySparkConf extends SparkConf(false) {
+      override def getenv(name: String) = {
+        if (name == "SPARK_PUBLIC_DNS") SPARK_PUBLIC_DNS
+        else super.getenv(name)
+      }
+
+      override def clone: SparkConf = {
+        new MySparkConf().setAll(getAll)
+      }
+    }
+    val conf = new MySparkConf()
+    sc = new SparkContext("local-cluster[2,1,512]", "test", conf)
+
     val listener = new SaveExecutorInfo
     sc.addSparkListener(listener)

-    val rdd1 = sc.parallelize(1 to 100, 4)
-    val rdd2 = rdd1.map(_.toString)
-    rdd2.setName("Target RDD")
-    rdd2.count()
+    // Trigger a job so that executors get added
+    sc.parallelize(1 to 100, 4).map(_.toString).count()

     assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
     listener.addedExecutorInfos.values.foreach { info =>
       assert(info.logUrlMap.nonEmpty)
+      info.logUrlMap.values.foreach { logUrl =>
+        assert(new URL(logUrl).getHost === SPARK_PUBLIC_DNS)
+      }
     }
   }
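The MySparkConf subclass above works because the production code now reads SPARK_PUBLIC_DNS through conf.getenv rather than System.getenv, so a test can substitute any value without touching the real process environment. As a sketch, the same pattern generalizes to any environment variable; FakeEnvSparkConf below is a hypothetical helper, not part of this patch, and it assumes (as this suite does) that the subclass lives where SparkConf.getenv is visible:

import org.apache.spark.SparkConf

// Hypothetical helper (not in this patch): inject an arbitrary fake
// environment through the conf instead of the real process environment.
class FakeEnvSparkConf(env: Map[String, String]) extends SparkConf(false) {
  override def getenv(name: String): String =
    env.getOrElse(name, super.getenv(name))

  // SparkContext clones its conf, so the override must survive cloning
  override def clone: SparkConf = new FakeEnvSparkConf(env).setAll(getAll)
}

// Usage sketch:
//   val conf = new FakeEnvSparkConf(Map("SPARK_PUBLIC_DNS" -> "public_dns"))
//   sc = new SparkContext("local-cluster[2,1,512]", "test", conf)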

core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala

Lines changed: 1 addition & 1 deletion

@@ -33,7 +33,7 @@ class ExecutorRunnerTest extends FunSuite {
     val appDesc = new ApplicationDescription("app name", Some(8), 500,
       Command("foo", Seq(appId), Map(), Seq(), Seq(), Seq()), "appUiUrl")
     val er = new ExecutorRunner(appId, 1, appDesc, 8, 500, null, "blah", "worker321", 123,
-      new File(sparkHome), new File("ooga"), "blah", new SparkConf, Seq("localDir"),
+      "publicAddr", new File(sparkHome), new File("ooga"), "blah", new SparkConf, Seq("localDir"),
       ExecutorState.RUNNING)
     val builder = CommandUtils.buildProcessBuilder(appDesc.command, 512, sparkHome, er.substituteVariables)
     assert(builder.command().last === appId)
