Commit a96e415

tejasapatil authored and rxin committed
[SPARK-14400][SQL] ScriptTransformation does not fail the job for bad user command
## What changes were proposed in this pull request?

- Refer to the Jira for the problem: https://issues.apache.org/jira/browse/SPARK-14400
- The fix is to check in `hasNext()` whether the process has exited with a non-zero exit code. This check, along with the existing check for the writer thread's exception, is moved into a separate method.

## How was this patch tested?

- Ran a job with an incorrect transform script command and saw that the job fails.
- Existing unit tests for `ScriptTransformationSuite`, plus a new unit test.

Author: Tejas Patil <[email protected]>

Closes apache#12194 from tejasapatil/script_transform.
1 parent b376a4e commit a96e415
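The heart of the fix is the pre-Java 8 liveness check used in `checkFailureAndPropagate()` below: `Process#exitValue()` throws `IllegalThreadStateException` while the child is still running, so catching that exception stands in for Java 8's `Process#isAlive()`. Here is a minimal, self-contained sketch of the pattern; the object and method names are illustrative, not from the patch (which throws `SparkException` and logs a stderr circular buffer):

```scala
// Sketch of the Java 7-compatible "did the subprocess fail?" check.
object ExitCodeCheckSketch {
  // Throws if the process has terminated with a non-zero status;
  // does nothing if it is still running or exited cleanly.
  def failIfBadExit(proc: Process, stderr: => String): Unit = {
    try {
      val exitCode = proc.exitValue() // throws IllegalThreadStateException while alive
      if (exitCode != 0) {
        throw new RuntimeException(
          s"Subprocess exited with status $exitCode. Error: $stderr")
      }
    } catch {
      case _: IllegalThreadStateException =>
        // Process is still alive; nothing to report yet.
    }
  }

  def main(args: Array[String]): Unit = {
    val proc = new ProcessBuilder("sh", "-c", "exit 3").start()
    proc.waitFor() // let the child finish so exitValue() is available
    failIfBadExit(proc, "simulated stderr") // throws: status 3
  }
}
```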

File tree: 3 files changed (+81, −34 lines)

- sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanTest.scala
- sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
- sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala

sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanTest.scala

Lines changed: 6 additions & 1 deletion
@@ -231,7 +231,12 @@ object SparkPlanTest {
     }
   }
 
-  private def executePlan(outputPlan: SparkPlan, spark: SQLContext): Seq[Row] = {
+  /**
+   * Runs the plan
+   * @param outputPlan SparkPlan to be executed
+   * @param spark SqlContext used for execution of the plan
+   */
+  def executePlan(outputPlan: SparkPlan, spark: SQLContext): Seq[Row] = {
     val execution = new QueryExecution(spark.sparkSession, null) {
       override lazy val sparkPlan: SparkPlan = outputPlan transform {
         case plan: SparkPlan =>

sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala

Lines changed: 58 additions & 32 deletions
@@ -32,7 +32,7 @@ import org.apache.hadoop.hive.serde2.AbstractSerDe
 import org.apache.hadoop.hive.serde2.objectinspector._
 import org.apache.hadoop.io.Writable
 
-import org.apache.spark.TaskContext
+import org.apache.spark.{SparkException, TaskContext}
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
@@ -127,45 +127,71 @@ case class ScriptTransformation(
       }
       val mutableRow = new SpecificMutableRow(output.map(_.dataType))
 
+      private def checkFailureAndPropagate(cause: Throwable = null): Unit = {
+        if (writerThread.exception.isDefined) {
+          throw writerThread.exception.get
+        }
+
+        // Checks if the proc is still alive (incase the command ran was bad)
+        // The ideal way to do this is to use Java 8's Process#isAlive()
+        // but it cannot be used because Spark still supports Java 7.
+        // Following is a workaround used to check if a process is alive in Java 7
+        // TODO: Once builds are switched to Java 8, this can be changed
+        try {
+          val exitCode = proc.exitValue()
+          if (exitCode != 0) {
+            logError(stderrBuffer.toString) // log the stderr circular buffer
+            throw new SparkException(s"Subprocess exited with status $exitCode. " +
+              s"Error: ${stderrBuffer.toString}", cause)
+          }
+        } catch {
+          case _: IllegalThreadStateException =>
+            // This means that the process is still alive. Move ahead
+        }
+      }
+
       override def hasNext: Boolean = {
-        if (outputSerde == null) {
-          if (curLine == null) {
-            curLine = reader.readLine()
+        try {
+          if (outputSerde == null) {
             if (curLine == null) {
-              if (writerThread.exception.isDefined) {
-                throw writerThread.exception.get
+              curLine = reader.readLine()
+              if (curLine == null) {
+                checkFailureAndPropagate()
+                return false
               }
-              false
-            } else {
-              true
             }
-          } else {
-            true
-          }
-        } else if (scriptOutputWritable == null) {
-          scriptOutputWritable = reusedWritableObject
+          } else if (scriptOutputWritable == null) {
+            scriptOutputWritable = reusedWritableObject
 
-          if (scriptOutputReader != null) {
-            if (scriptOutputReader.next(scriptOutputWritable) <= 0) {
-              writerThread.exception.foreach(throw _)
-              false
+            if (scriptOutputReader != null) {
+              if (scriptOutputReader.next(scriptOutputWritable) <= 0) {
+                checkFailureAndPropagate()
+                return false
+              }
             } else {
-              true
-            }
-          } else {
-            try {
-              scriptOutputWritable.readFields(scriptOutputStream)
-              true
-            } catch {
-              case _: EOFException =>
-                if (writerThread.exception.isDefined) {
-                  throw writerThread.exception.get
-                }
-                false
+              try {
+                scriptOutputWritable.readFields(scriptOutputStream)
+              } catch {
+                case _: EOFException =>
+                  // This means that the stdout of `proc` (ie. TRANSFORM process) has exhausted.
+                  // Ideally the proc should *not* be alive at this point but
+                  // there can be a lag between EOF being written out and the process
+                  // being terminated. So explicitly waiting for the process to be done.
+                  proc.waitFor()
+                  checkFailureAndPropagate()
+                  return false
+              }
             }
           }
-        } else {
+
           true
+        } catch {
+          case NonFatal(e) =>
+            // If this exception is due to abrupt / unclean termination of `proc`,
+            // then detect it and propagate a better exception message for end users
+            checkFailureAndPropagate(e)
+
+            throw e
         }
       }
 
@@ -284,7 +310,6 @@ private class ScriptTransformationWriterThread(
           }
         }
       }
-      outputStream.close()
       threwException = false
     } catch {
       case NonFatal(e) =>
@@ -295,6 +320,7 @@ private class ScriptTransformationWriterThread(
         throw e
     } finally {
       try {
+        outputStream.close()
        if (proc.waitFor() != 0) {
          logError(stderrBuffer.toString) // log the stderr circular buffer
        }
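A note on the last two hunks: `outputStream.close()` moves from the happy path into the `finally` block, ahead of `proc.waitFor()`. That stream feeds the child process's stdin, so if the writer thread dies without closing it, the child may never see EOF and `waitFor()` can block indefinitely. A minimal sketch of the hazard, using `cat` as an assumed stand-in child rather than the patch's own code:

```scala
// Close-before-wait: `cat` exits only after its stdin reaches EOF,
// so the stream must be closed before waitFor() is called.
object CloseBeforeWaitSketch {
  def main(args: Array[String]): Unit = {
    val proc = new ProcessBuilder("cat").start()
    val stdin = proc.getOutputStream
    try {
      stdin.write("some input\n".getBytes("UTF-8"))
      // An exception thrown here would previously have skipped close(),
      // leaving `cat` blocked on stdin and waitFor() hanging.
    } finally {
      stdin.close()               // child sees EOF and can terminate
      val status = proc.waitFor() // safe now that stdin is closed
      println(s"child exited with status $status")
    }
  }
}
```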

sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala

Lines changed: 17 additions & 1 deletion
@@ -20,7 +20,7 @@ package org.apache.spark.sql.hive.execution
 import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe
 import org.scalatest.exceptions.TestFailedException
 
-import org.apache.spark.TaskContext
+import org.apache.spark.{SparkException, TaskContext}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
@@ -109,6 +109,22 @@ class ScriptTransformationSuite extends SparkPlanTest with TestHiveSingleton {
     }
     assert(e.getMessage().contains("intentional exception"))
   }
+
+  test("SPARK-14400 script transformation should fail for bad script command") {
+    val rowsDf = Seq("a", "b", "c").map(Tuple1.apply).toDF("a")
+
+    val e = intercept[SparkException] {
+      val plan =
+        new ScriptTransformation(
+          input = Seq(rowsDf.col("a").expr),
+          script = "some_non_existent_command",
+          output = Seq(AttributeReference("a", StringType)()),
+          child = rowsDf.queryExecution.sparkPlan,
+          ioschema = serdeIOSchema)
+      SparkPlanTest.executePlan(plan, hiveContext)
+    }
+    assert(e.getMessage.contains("Subprocess exited with status"))
+  }
 }
 
 private case class ExceptionInjectingOperator(child: SparkPlan) extends UnaryExecNode {
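For context, the user-facing scenario from the Jira is a HiveQL `TRANSFORM` query whose script command is bad; with this patch such a query fails with "Subprocess exited with status ..." instead of completing without reporting the failure. An illustrative sketch of triggering it follows; the table data, view name, and session setup are assumptions for the example, not part of the patch:

```scala
import org.apache.spark.sql.SparkSession

object BadTransformDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .enableHiveSupport() // script transformation requires Hive support
      .getOrCreate()
    import spark.implicits._

    Seq(("1", "a"), ("2", "b")).toDF("key", "value").createOrReplaceTempView("src")

    // Expected after this patch: SparkException("Subprocess exited with status ...")
    spark.sql(
      """SELECT TRANSFORM (key, value)
        |USING 'some_non_existent_command'
        |AS (k, v)
        |FROM src
      """.stripMargin).collect()
  }
}
```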
