[SPARK-51272][CORE] Aborting instead of continuing partially completed indeterminate result stage at ResubmitFailedStages #50630
Conversation
cc @mridulm
The "java.lang.OutOfMemoryError: Java heap space" in pyspark-pandas-connect-part2 is unrelated.
After the test was restarted, the error was resolved.
Only unsuccessful (and so uncommitted) tasks are candidates for (re)execution (and so commit), not completed tasks.
As discussed here, this is a bug in the JDBC implementation: the txn commit should be done in a task commit, not as part of
The fix for this is to handle something similar to this. I have sketched a rough impl here for reference (it is just illustrative, and meant to convey what I was talking about).
Option 1 is much more aggressive with cleanup, but might spuriously kill jobs far more often than required. (I have adapted the tests you included in this PR for both, and they both pass.)
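The task-commit idea mentioned above can be sketched roughly as follows. The trait and class names (`TaskCommitProtocol`, `JdbcBatchWriter`) are illustrative only, not Spark's actual commit-protocol API: the point is that the JDBC transaction commit belongs in a commit step the scheduler invokes for at most one successful attempt per partition, not in the task body itself.

```scala
// Hedged sketch, not Spark code: separate running a task from committing it.
trait TaskCommitProtocol {
  def runTask(partition: Int): Unit    // stage/buffer rows, no txn commit here
  def commitTask(partition: Int): Unit // commit the txn exactly once, on success
  def abortTask(partition: Int): Unit  // roll back on failure or resubmit
}

final class JdbcBatchWriter extends TaskCommitProtocol {
  private val staged = scala.collection.mutable.Map.empty[Int, Vector[String]]
  private val committed = scala.collection.mutable.Set.empty[Int]

  override def runTask(partition: Int): Unit =
    // in a real writer this would batch the INSERTs inside an open transaction
    staged(partition) = Vector(s"INSERT ... /* rows of partition $partition */")

  override def commitTask(partition: Int): Unit =
    // txn.commit() would happen here, only for a task that actually ran
    if (staged.contains(partition)) committed += partition

  override def abortTask(partition: Int): Unit =
    // txn.rollback() would happen here
    staged.remove(partition)

  def isCommitted(partition: Int): Boolean = committed(partition)
}
```

Under this shape, a resubmitted attempt that is aborted (or loses the commit race) leaves no rows behind, which is exactly what the task body alone cannot guarantee.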
But that's also bad for an indeterminate stage, as the data is inconsistent: the committed partitions come from a previous (old) computation, while the resubmitted ones come from the new one. To illustrate it:
So if we write the
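A minimal, self-contained illustration of that inconsistency (hypothetical data, not code from this PR): an indeterminate computation may assign rows to partitions differently on each attempt, so stitching the committed partitions of attempt 1 together with recomputed partitions of attempt 2 yields a result that matches neither attempt.

```scala
import scala.util.Random

// Which rows land in which partition; a different seed stands in for the
// nondeterminism between two attempts of an indeterminate stage.
def partitionAssignment(seed: Long, numRows: Int, numParts: Int): Map[Int, Vector[Int]] = {
  val rng = new Random(seed)
  (0 until numRows).toVector
    .map(row => rng.nextInt(numParts) -> row)
    .groupBy(_._1)
    .map { case (p, rows) => p -> rows.map(_._2) }
}

// Attempt 1 commits some partitions, then the stage is recomputed: the
// remaining partitions come from attempt 2's (different) assignment.
def mixedResult(seed1: Long, seed2: Long, committed: Set[Int]): Vector[Int] = {
  val a1 = partitionAssignment(seed1, numRows = 100, numParts = 4)
  val a2 = partitionAssignment(seed2, numRows = 100, numParts = 4)
  (a1.filter { case (p, _) => committed(p) } ++
    a2.filterNot { case (p, _) => committed(p) }).values.flatten.toVector.sorted
}

// mixedResult(1L, 2L, Set(0)) almost surely drops some rows and duplicates
// others, while mixedResult(1L, 1L, Set(0)) - i.e. a determinate recompute -
// returns every row exactly once.
```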
If the parent map stage was indeterminate, the existing Spark code would have already aborted the stage, if there was a fetch failure for that parent stage. As you have pointed out in the test in this PR, there is a gap in the existing impl: when there is a shuffle loss due to executor/host failure (and not detected through a fetch failure), the check for determinism was not being performed before recomputing the lost data; so shuffle files can be lost for an indeterminate stage without ever resulting in a fetch failure (as in the test). But that does not require failing the result stage, even if it is indeterminate, if no indeterminate parent has lost any shuffle outputs.
This will not happen, please see above. "some but not all tasks was successful and a resubmit happened": if it results in re-execution of the parent (indeterminate) stage through a fetch failure, the job will be aborted. Please do let me know if I am missing some nuance. (Edited to hopefully improve clarity!)
@mridulm regarding option 2: why is a return enough here (and not an abort), while an exception at task creation leads to an abort? And why do we need to check whether all jobs should be aborted, and not just one, here:
It should result in the same behavior (all jobs this stage was part of have been aborted in that scenario, and we have not added the stage to runningStages yet).
A stage can be part of multiple concurrent jobs, and not all of them might be getting aborted: some of them might not have started a result stage, and so are recoverable.
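The point about concurrent jobs can be sketched as follows (illustrative types, not the DAGScheduler's real structures): among the jobs sharing a failed stage, only those that have already started an unrecoverable result stage must be aborted; the rest can simply recompute everything from scratch.

```scala
// Illustrative model: a job sharing the failed stage either has or has not
// started an (indeterminate, partially committed) result stage.
final case class JobState(jobId: Int, startedIndeterminateResultStage: Boolean)

// Abort only the unrecoverable jobs; the others keep running and can
// recompute the shared stage from scratch.
def jobsToAbort(jobsSharingStage: Seq[JobState]): Seq[Int] =
  jobsSharingStage.collect {
    case JobState(id, started) if started => id
  }
```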
@mridulm I am adding your option 2 solution, but I would like to make some changes based on it
Please do! It was just a rough sketch :-) I will circle back to this PR later this week, thanks for updating it.
Thanks for thinking through this @attilapiros!
Just one comment, the rest looks fine to me.
But then, I am too close to this :-)
Would love to have additional eyes on this.
+CC @Ngone51
Definitely you should be credited here. Is there a way to list you as co-author?
I meant in terms of objectivity!
@mridulm yes, you are right; still, I would like to give you the credit too if there is a way
I've just got minor suggestions in attilapiros#9, but it looks good otherwise.
cc @cloud-fan do you have any objections? If not, we would like to merge this early next week.
thanks, merging to master/4.0!
[SPARK-51272][CORE] Aborting instead of continuing partially completed indeterminate result stage at ResubmitFailedStages

### What changes were proposed in this pull request?
This PR aborts the indeterminate, partially completed result stage instead of resubmitting it.

### Why are the changes needed?
A result stage, compared to a shuffle map stage, has more output and more intermediate state:
- It can use a `FileOutputCommitter`, where each task does a Hadoop task commit. In case of a re-submit this will lead to re-committing that Hadoop task (possibly with different content).
- In case of a JDBC write it can have already inserted all rows of a partition into the target schema.

Ignoring the resubmit when a recalculation is needed would cause data corruption, as the partial result is based on the previous indeterminate computation, but continuing means finishing the stage with the newly recomputed data. As long as rollback of a result stage is not supported (https://issues.apache.org/jira/browse/SPARK-25342), the best we can do when a recalculation is needed is aborting the stage.

The existing code before this PR already tried to address a similar situation in the handling of `FetchFailed` when the fetch is coming from an indeterminate shuffle map stage: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L2178-L2182

But this is not enough, as a `FetchFailed` from a determinate stage can lead to an executor loss and a re-compute of the indeterminate parent of the result stage, as shown in the attached unit test. Moreover, `ResubmitFailedStages` can race with a successful `CompletionEvent`. This is why this PR detects the partial execution at the re-submit of the indeterminate result stage.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
New unit tests are created to illustrate the situation above.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #50630 from attilapiros/SPARK-51272_attila_3.

Lead-authored-by: attilapiros <[email protected]>
Co-authored-by: Mridul Muralidharan <mridulatgmail.com>
Co-authored-by: Peter Toth <[email protected]>
Co-authored-by: Attila Zsolt Piros <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit 7604f67)
Signed-off-by: Wenchen Fan <[email protected]>
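The decision added at `ResubmitFailedStages` handling can be sketched as follows. This is a simplified model with illustrative names (`StageSnapshot`, `onResubmit`), not the actual DAGScheduler code.

```scala
sealed trait Decision
case object Resubmit extends Decision
case object Abort extends Decision

// A minimal view of the state the decision needs (illustrative, not Spark's
// Stage class).
final case class StageSnapshot(
    isResultStage: Boolean,
    isIndeterminate: Boolean,
    completedPartitions: Int,
    numPartitions: Int)

def onResubmit(stage: StageSnapshot): Decision = {
  val partiallyCompleted =
    stage.completedPartitions > 0 && stage.completedPartitions < stage.numPartitions
  if (stage.isResultStage && stage.isIndeterminate && partiallyCompleted) {
    // The committed partitions reflect the previous indeterminate computation;
    // finishing the rest from a recompute would mix two inconsistent results,
    // and result stages cannot be rolled back (SPARK-25342), so abort.
    Abort
  } else {
    // Shuffle map stages, and result stages that are determinate or have not
    // committed anything yet, can simply be resubmitted as before.
    Resubmit
  }
}
```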
@attilapiros can you open a 3.5 backport PR? thanks!