
Commit 20a6013

Marcelo Vanzin authored and Andrew Or committed
[SPARK-2996] Implement userClassPathFirst for driver, yarn.
Yarn's config option `spark.yarn.user.classpath.first` does not work the same way as `spark.files.userClassPathFirst`; Yarn's version is a lot more dangerous, in that it modifies the system classpath instead of restricting the changes to the user's class loader. So this change implements the behavior of the latter for Yarn, and deprecates the more dangerous choice.

To achieve feature parity, I also implemented the option for drivers (the existing option only applies to executors). So there are now two options, each controlling whether to apply userClassPathFirst to the driver or to executors. The old option was deprecated and aliased to the new one (`spark.executor.userClassPathFirst`).

The existing "child-first" class loader also had to be fixed: it didn't handle resources, and it was doing some things that caused JVM errors depending on how it was called.

Author: Marcelo Vanzin <[email protected]>

Closes apache#3233 from vanzin/SPARK-2996 and squashes the following commits:

9cf9cf1 [Marcelo Vanzin] Merge branch 'master' into SPARK-2996
a1499e2 [Marcelo Vanzin] Remove SPARK_HOME propagation.
fa7df88 [Marcelo Vanzin] Remove 'test.resource' file, create it dynamically.
a8c69f1 [Marcelo Vanzin] Review feedback.
cabf962 [Marcelo Vanzin] Merge branch 'master' into SPARK-2996
a1b8d7e [Marcelo Vanzin] Merge branch 'master' into SPARK-2996
3f768e3 [Marcelo Vanzin] Merge branch 'master' into SPARK-2996
2ce3c7a [Marcelo Vanzin] Merge branch 'master' into SPARK-2996
0e6d6be [Marcelo Vanzin] Merge branch 'master' into SPARK-2996
70d4044 [Marcelo Vanzin] Fix pyspark/yarn-cluster test.
0fe7777 [Marcelo Vanzin] Merge branch 'master' into SPARK-2996
0e6ef19 [Marcelo Vanzin] Move class loaders around and make names more meaningful.
fe970a7 [Marcelo Vanzin] Review feedback.
25d4fed [Marcelo Vanzin] Merge branch 'master' into SPARK-2996
3cb6498 [Marcelo Vanzin] Call the right loadClass() method on the parent.
fbb8ab5 [Marcelo Vanzin] Add locking in loadClass() to avoid deadlocks.
2e6c4b7 [Marcelo Vanzin] Mention new setting in documentation.
b6497f9 [Marcelo Vanzin] Merge branch 'master' into SPARK-2996
a10f379 [Marcelo Vanzin] Some feedback.
3730151 [Marcelo Vanzin] Merge branch 'master' into SPARK-2996
f513871 [Marcelo Vanzin] Merge branch 'master' into SPARK-2996
44010b6 [Marcelo Vanzin] Merge branch 'master' into SPARK-2996
7b57cba [Marcelo Vanzin] Remove now outdated message.
5304d64 [Marcelo Vanzin] Merge branch 'master' into SPARK-2996
35949c8 [Marcelo Vanzin] Merge branch 'master' into SPARK-2996
54e1a98 [Marcelo Vanzin] Merge branch 'master' into SPARK-2996
d1273b2 [Marcelo Vanzin] Add test file to rat exclude.
fa1aafa [Marcelo Vanzin] Remove write check on user jars.
89d8072 [Marcelo Vanzin] Cleanups.
a963ea3 [Marcelo Vanzin] Implement spark.driver.userClassPathFirst for standalone cluster mode.
50afa5f [Marcelo Vanzin] Fix Yarn executor command line.
7d14397 [Marcelo Vanzin] Register user jars in executor up front.
7f8603c [Marcelo Vanzin] Fix yarn-cluster mode without userClassPathFirst.
20373f5 [Marcelo Vanzin] Fix ClientBaseSuite.
55c88fa [Marcelo Vanzin] Run all Yarn integration tests via spark-submit.
0b64d92 [Marcelo Vanzin] Add deprecation warning to yarn option.
4a84d87 [Marcelo Vanzin] Fix the child-first class loader.
d0394b8 [Marcelo Vanzin] Add "deprecated configs" to SparkConf.
46d8cf2 [Marcelo Vanzin] Update doc with new option, change name to "userClassPathFirst".
a314f2d [Marcelo Vanzin] Enable driver class path isolation in SparkSubmit.
91f7e54 [Marcelo Vanzin] [yarn] Enable executor class path isolation.
a853e74 [Marcelo Vanzin] Re-work CoarseGrainedExecutorBackend command line arguments.
89522ef [Marcelo Vanzin] Add class path isolation support for Yarn cluster mode.
1 parent 36c4e1d commit 20a6013
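For context, the options this commit introduces are set like any other Spark configuration. A minimal sketch (the app name is hypothetical; the option names come from this commit):

    import org.apache.spark.SparkConf

    // Prefer classes from the user's jars over Spark's copies.
    val conf = new SparkConf()
      .setAppName("example-app")                        // hypothetical
      .set("spark.driver.userClassPathFirst", "true")   // new in this commit
      .set("spark.executor.userClassPathFirst", "true") // replaces spark.files.userClassPathFirst

    // The deprecated spark.files.userClassPathFirst still works: it is aliased to
    // spark.executor.userClassPathFirst and logs a one-time deprecation warning.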

27 files changed (+736, -348 lines)

core/src/main/scala/org/apache/spark/SparkConf.scala

Lines changed: 77 additions & 6 deletions
@@ -18,6 +18,7 @@
 package org.apache.spark
 
 import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.atomic.AtomicBoolean
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.LinkedHashSet
@@ -67,7 +68,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
     if (value == null) {
       throw new NullPointerException("null value for " + key)
     }
-    settings.put(key, value)
+    settings.put(translateConfKey(key, warn = true), value)
     this
   }
 
@@ -139,7 +140,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
 
   /** Set a parameter if it isn't already configured */
   def setIfMissing(key: String, value: String): SparkConf = {
-    settings.putIfAbsent(key, value)
+    settings.putIfAbsent(translateConfKey(key, warn = true), value)
     this
   }
 
@@ -175,7 +176,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
 
   /** Get a parameter as an Option */
   def getOption(key: String): Option[String] = {
-    Option(settings.get(key))
+    Option(settings.get(translateConfKey(key)))
   }
 
   /** Get all parameters as a list of pairs */
@@ -228,7 +229,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
   def getAppId: String = get("spark.app.id")
 
   /** Does the configuration contain a given parameter? */
-  def contains(key: String): Boolean = settings.containsKey(key)
+  def contains(key: String): Boolean = settings.containsKey(translateConfKey(key))
 
   /** Copy this object */
   override def clone: SparkConf = {
@@ -285,7 +286,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
     // Validate memory fractions
     val memoryKeys = Seq(
       "spark.storage.memoryFraction",
-      "spark.shuffle.memoryFraction", 
+      "spark.shuffle.memoryFraction",
       "spark.shuffle.safetyFraction",
       "spark.storage.unrollFraction",
       "spark.storage.safetyFraction")
@@ -351,9 +352,20 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
   def toDebugString: String = {
     getAll.sorted.map{case (k, v) => k + "=" + v}.mkString("\n")
   }
+
 }
 
-private[spark] object SparkConf {
+private[spark] object SparkConf extends Logging {
+
+  private val deprecatedConfigs: Map[String, DeprecatedConfig] = {
+    val configs = Seq(
+      DeprecatedConfig("spark.files.userClassPathFirst", "spark.executor.userClassPathFirst",
+        "1.3"),
+      DeprecatedConfig("spark.yarn.user.classpath.first", null, "1.3",
+        "Use spark.{driver,executor}.userClassPathFirst instead."))
+    configs.map { x => (x.oldName, x) }.toMap
+  }
+
   /**
    * Return whether the given config is an akka config (e.g. akka.actor.provider).
    * Note that this does not include spark-specific akka configs (e.g. spark.akka.timeout).
@@ -380,4 +392,63 @@ private[spark] object SparkConf {
   def isSparkPortConf(name: String): Boolean = {
     (name.startsWith("spark.") && name.endsWith(".port")) || name.startsWith("spark.port.")
   }
+
+  /**
+   * Translate the configuration key if it is deprecated and has a replacement, otherwise just
+   * returns the provided key.
+   *
+   * @param userKey Configuration key from the user / caller.
+   * @param warn Whether to print a warning if the key is deprecated. Warnings will be printed
+   *             only once for each key.
+   */
+  def translateConfKey(userKey: String, warn: Boolean = false): String = {
+    deprecatedConfigs.get(userKey)
+      .map { deprecatedKey =>
+        if (warn) {
+          deprecatedKey.warn()
+        }
+        deprecatedKey.newName.getOrElse(userKey)
+      }.getOrElse(userKey)
+  }
+
+  /**
+   * Holds information about keys that have been deprecated or renamed.
+   *
+   * @param oldName Old configuration key.
+   * @param newName New configuration key, or `null` if key has no replacement, in which case the
+   *                deprecated key will be used (but the warning message will still be printed).
+   * @param version Version of Spark where key was deprecated.
+   * @param deprecationMessage Message to include in the deprecation warning; mandatory when
+   *                           `newName` is not provided.
+   */
+  private case class DeprecatedConfig(
+      oldName: String,
+      _newName: String,
+      version: String,
+      deprecationMessage: String = null) {
+
+    private val warned = new AtomicBoolean(false)
+    val newName = Option(_newName)
+
+    if (newName == null && (deprecationMessage == null || deprecationMessage.isEmpty())) {
+      throw new IllegalArgumentException("Need new config name or deprecation message.")
+    }
+
+    def warn(): Unit = {
+      if (warned.compareAndSet(false, true)) {
+        if (newName != null) {
+          val message = Option(deprecationMessage).getOrElse(
+            s"Please use the alternative '$newName' instead.")
+          logWarning(
+            s"The configuration option '$oldName' has been replaced as of Spark $version and " +
+            s"may be removed in the future. $message")
+        } else {
+          logWarning(
+            s"The configuration option '$oldName' has been deprecated as of Spark $version and " +
+            s"may be removed in the future. $deprecationMessage")
+        }
+      }
+    }
+  }
 }
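To illustrate the aliasing this hunk introduces (a sketch based only on the code above):

    val conf = new SparkConf()
    // set() translates the deprecated key, so the value is stored under the new
    // name and a deprecation warning is logged once per key.
    conf.set("spark.files.userClassPathFirst", "true")

    // Reads translate too, so both names resolve to the same entry:
    assert(conf.get("spark.executor.userClassPathFirst") == "true")
    assert(conf.get("spark.files.userClassPathFirst") == "true")

    // spark.yarn.user.classpath.first has no replacement (newName is null), so it
    // keeps its old name but still warns when set.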

core/src/main/scala/org/apache/spark/TestUtils.scala

Lines changed: 18 additions & 1 deletion
@@ -17,12 +17,13 @@
 
 package org.apache.spark
 
-import java.io.{File, FileInputStream, FileOutputStream}
+import java.io.{ByteArrayInputStream, File, FileInputStream, FileOutputStream}
 import java.net.{URI, URL}
 import java.util.jar.{JarEntry, JarOutputStream}
 
 import scala.collection.JavaConversions._
 
+import com.google.common.base.Charsets.UTF_8
 import com.google.common.io.{ByteStreams, Files}
 import javax.tools.{JavaFileObject, SimpleJavaFileObject, ToolProvider}
 
@@ -59,6 +60,22 @@ private[spark] object TestUtils {
     createJar(files1 ++ files2, jarFile)
   }
 
+  /**
+   * Create a jar file containing multiple files. The `files` map contains a mapping of
+   * file names in the jar file to their contents.
+   */
+  def createJarWithFiles(files: Map[String, String], dir: File = null): URL = {
+    val tempDir = Option(dir).getOrElse(Utils.createTempDir())
+    val jarFile = File.createTempFile("testJar", ".jar", tempDir)
+    val jarStream = new JarOutputStream(new FileOutputStream(jarFile))
+    files.foreach { case (k, v) =>
+      val entry = new JarEntry(k)
+      jarStream.putNextEntry(entry)
+      ByteStreams.copy(new ByteArrayInputStream(v.getBytes(UTF_8)), jarStream)
+    }
+    jarStream.close()
+    jarFile.toURI.toURL
+  }
 
   /**
    * Create a jar file that contains this set of files. All files will be located at the root
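A hypothetical use of the new helper (entry names below are illustrative; per the commit message, the tests use it to create the 'test.resource' file dynamically):

    // Returns the URL of a temp jar whose entries map names to string contents.
    val jarUrl = TestUtils.createJarWithFiles(Map(
      "test.resource" -> "OVERRIDDEN",  // resource a child-first loader should win on
      "extra/data.txt" -> "hello"))     // hypothetical second entry
    // The returned URL can then be placed on a user class path in tests.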

core/src/main/scala/org/apache/spark/deploy/Client.scala

Lines changed: 3 additions & 2 deletions
@@ -68,8 +68,9 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf)
       .map(Utils.splitCommandString).getOrElse(Seq.empty)
     val sparkJavaOpts = Utils.sparkJavaOpts(conf)
     val javaOpts = sparkJavaOpts ++ extraJavaOpts
-    val command = new Command(mainClass, Seq("{{WORKER_URL}}", driverArgs.mainClass) ++
-      driverArgs.driverOptions, sys.env, classPathEntries, libraryPathEntries, javaOpts)
+    val command = new Command(mainClass,
+      Seq("{{WORKER_URL}}", "{{USER_JAR}}", driverArgs.mainClass) ++ driverArgs.driverOptions,
+      sys.env, classPathEntries, libraryPathEntries, javaOpts)
 
     val driverDescription = new DriverDescription(
       driverArgs.jarUrl,

core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Lines changed: 4 additions & 4 deletions
@@ -37,7 +37,7 @@ import org.apache.ivy.plugins.resolver.{ChainResolver, IBiblioResolver}
 
 import org.apache.spark.deploy.rest._
 import org.apache.spark.executor._
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{ChildFirstURLClassLoader, MutableURLClassLoader, Utils}
 
 /**
  * Whether to submit, kill, or request the status of an application.
@@ -467,11 +467,11 @@ object SparkSubmit {
     }
 
     val loader =
-      if (sysProps.getOrElse("spark.files.userClassPathFirst", "false").toBoolean) {
-        new ChildExecutorURLClassLoader(new Array[URL](0),
+      if (sysProps.getOrElse("spark.driver.userClassPathFirst", "false").toBoolean) {
+        new ChildFirstURLClassLoader(new Array[URL](0),
           Thread.currentThread.getContextClassLoader)
       } else {
-        new ExecutorURLClassLoader(new Array[URL](0),
+        new MutableURLClassLoader(new Array[URL](0),
           Thread.currentThread.getContextClassLoader)
       }
     Thread.currentThread.setContextClassLoader(loader)
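ChildFirstURLClassLoader itself lives in another file of this commit; below is a minimal sketch of the child-first delegation pattern the name refers to, not Spark's exact implementation (the class name and details are illustrative):

    import java.net.{URL, URLClassLoader}

    // Sketch: consult the child's URLs first, fall back to the parent on failure.
    class ChildFirstSketch(urls: Array[URL], parent: ClassLoader)
      extends URLClassLoader(urls, null) {

      override def loadClass(name: String, resolve: Boolean): Class[_] = {
        getClassLoadingLock(name).synchronized { // locking, echoing the deadlock fix above
          try {
            super.loadClass(name, resolve)       // user class path first
          } catch {
            case _: ClassNotFoundException =>
              parent.loadClass(name)             // then delegate to the parent
          }
        }
      }
    }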

core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala

Lines changed: 1 addition & 1 deletion
@@ -196,7 +196,7 @@ private[spark] class MasterPage(parent: MasterWebUI) extends WebUIPage("") {
       <td sorttable_customkey={driver.desc.mem.toString}>
         {Utils.megabytesToString(driver.desc.mem.toLong)}
       </td>
-      <td>{driver.desc.command.arguments(1)}</td>
+      <td>{driver.desc.command.arguments(2)}</td>
     </tr>
   }
 }

core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala

Lines changed: 1 addition & 1 deletion
@@ -392,7 +392,7 @@ private class SubmitRequestServlet(
     val javaOpts = sparkJavaOpts ++ extraJavaOpts
     val command = new Command(
       "org.apache.spark.deploy.worker.DriverWrapper",
-      Seq("{{WORKER_URL}}", mainClass) ++ appArgs, // args to the DriverWrapper
+      Seq("{{WORKER_URL}}", "{{USER_JAR}}", mainClass) ++ appArgs, // args to the DriverWrapper
       environmentVariables, extraClassPath, extraLibraryPath, javaOpts)
     val actualDriverMemory = driverMemory.map(Utils.memoryStringToMb).getOrElse(DEFAULT_MEMORY)
     val actualDriverCores = driverCores.map(_.toInt).getOrElse(DEFAULT_CORES)

core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala

Lines changed: 7 additions & 8 deletions
@@ -74,10 +74,15 @@ private[spark] class DriverRunner(
       val driverDir = createWorkingDirectory()
       val localJarFilename = downloadUserJar(driverDir)
 
-      // Make sure user application jar is on the classpath
+      def substituteVariables(argument: String): String = argument match {
+        case "{{WORKER_URL}}" => workerUrl
+        case "{{USER_JAR}}" => localJarFilename
+        case other => other
+      }
+
       // TODO: If we add ability to submit multiple jars they should also be added here
       val builder = CommandUtils.buildProcessBuilder(driverDesc.command, driverDesc.mem,
-        sparkHome.getAbsolutePath, substituteVariables, Seq(localJarFilename))
+        sparkHome.getAbsolutePath, substituteVariables)
       launchDriver(builder, driverDir, driverDesc.supervise)
     }
     catch {
@@ -111,12 +116,6 @@ private[spark] class DriverRunner(
       }
     }
 
-  /** Replace variables in a command argument passed to us */
-  private def substituteVariables(argument: String): String = argument match {
-    case "{{WORKER_URL}}" => workerUrl
-    case other => other
-  }
-
   /**
    * Creates the working directory for this driver.
    * Will throw an exception if there are errors preparing the directory.
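The effect of the move: substitution now happens where the downloaded jar's path is in scope, so a stored argument list is rewritten at launch time. Illustratively (the main class and argument are hypothetical):

    val args = Seq("{{WORKER_URL}}", "{{USER_JAR}}", "com.example.Main", "arg1")
    args.map(substituteVariables)
    // => Seq(workerUrl, localJarFilename, "com.example.Main", "arg1")
    //    i.e. the worker's URL and the jar's local path, both known only at launch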

core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala

Lines changed: 16 additions & 4 deletions
@@ -17,32 +17,44 @@
 
 package org.apache.spark.deploy.worker
 
+import java.io.File
+
 import akka.actor._
 
 import org.apache.spark.{SecurityManager, SparkConf}
-import org.apache.spark.util.{AkkaUtils, Utils}
+import org.apache.spark.util.{AkkaUtils, ChildFirstURLClassLoader, MutableURLClassLoader, Utils}
 
 /**
  * Utility object for launching driver programs such that they share fate with the Worker process.
  */
 object DriverWrapper {
   def main(args: Array[String]) {
     args.toList match {
-      case workerUrl :: mainClass :: extraArgs =>
+      case workerUrl :: userJar :: mainClass :: extraArgs =>
         val conf = new SparkConf()
         val (actorSystem, _) = AkkaUtils.createActorSystem("Driver",
           Utils.localHostName(), 0, conf, new SecurityManager(conf))
         actorSystem.actorOf(Props(classOf[WorkerWatcher], workerUrl), name = "workerWatcher")
 
+        val currentLoader = Thread.currentThread.getContextClassLoader
+        val userJarUrl = new File(userJar).toURI().toURL()
+        val loader =
+          if (sys.props.getOrElse("spark.driver.userClassPathFirst", "false").toBoolean) {
+            new ChildFirstURLClassLoader(Array(userJarUrl), currentLoader)
+          } else {
+            new MutableURLClassLoader(Array(userJarUrl), currentLoader)
+          }
+        Thread.currentThread.setContextClassLoader(loader)
+
         // Delegate to supplied main class
-        val clazz = Class.forName(args(1))
+        val clazz = Class.forName(mainClass, true, loader)
        val mainMethod = clazz.getMethod("main", classOf[Array[String]])
         mainMethod.invoke(null, extraArgs.toArray[String])
 
         actorSystem.shutdown()
 
       case _ =>
-        System.err.println("Usage: DriverWrapper <workerUrl> <driverMainClass> [options]")
+        System.err.println("Usage: DriverWrapper <workerUrl> <userJar> <driverMainClass> [options]")
         System.exit(-1)
     }
   }
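Per the updated usage string, the wrapper now receives the user jar between the worker URL and the main class; a hypothetical invocation (path and class name illustrative):

    java ... org.apache.spark.deploy.worker.DriverWrapper \
      <workerUrl> /work/driver-0/app.jar com.example.Main --someOption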
