
[SPARK-51272][CORE] Aborting instead of continuing partially completed indeterminate result stage at ResubmitFailedStages #50630


Closed
wants to merge 12 commits

Conversation


@attilapiros attilapiros commented Apr 17, 2025

What changes were proposed in this pull request?

This PR aborts the indeterminate partially completed result stage instead of resubmitting it.

Why are the changes needed?

Compared to a shuffle map stage, a result stage has more output and more intermediate state:

  • It can use a FileOutputCommitter where each task does a Hadoop task commit. A re-submit would re-commit that Hadoop task (possibly with different content).
  • In case of a JDBC write, it may have already inserted all rows of a partition into the target schema.

Ignoring the resubmit when a recalculation is needed would cause data corruption, as the partial result is based on the previous indeterminate computation; continuing, on the other hand, would finish the stage with a mix of old and newly recomputed data.

As long as rollback of a result stage is not supported (https://issues.apache.org/jira/browse/SPARK-25342), the best we can do when a recalculation is needed is to abort the stage.

The existing code before this PR already tried to address a similar situation in the handling of FetchFailed when the fetch comes from an indeterminate shuffle map stage: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L2178-L2182

But this is not enough, as a FetchFailed from a determinate stage can lead to an executor loss and a re-compute of the indeterminate parent of the result stage, as shown in the attached unit test.

Moreover, the ResubmitFailedStages event can race with a successful CompletionEvent. This is why this PR detects the partial execution at the re-submit of the indeterminate result stage.
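To make the condition concrete, here is a minimal hypothetical sketch of the check in plain Scala — `ResultStage` and `onResubmit` are illustrative stand-ins, not the actual DAGScheduler API:

```scala
// Simplified stand-in for the state the scheduler tracks per result stage.
final case class ResultStage(
    numPartitions: Int,
    completedPartitions: Set[Int],
    isIndeterminate: Boolean)

// At ResubmitFailedStages: abort instead of resubmitting when an
// indeterminate result stage has already completed some (but not all)
// of its partitions, since those partitions cannot be rolled back.
def onResubmit(stage: ResultStage): Either[String, ResultStage] = {
  val partiallyCompleted =
    stage.completedPartitions.nonEmpty &&
      stage.completedPartitions.size < stage.numPartitions
  if (stage.isIndeterminate && partiallyCompleted) {
    Left("aborting: partially completed indeterminate result stage " +
      "cannot be rolled back (SPARK-25342)")
  } else {
    Right(stage) // safe to recompute only the missing partitions
  }
}
```

For example, `onResubmit(ResultStage(4, Set(0, 1), isIndeterminate = true))` yields a `Left` (an abort), while a determinate stage, or one with no completed partitions yet, is resubmitted as usual.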

Does this PR introduce any user-facing change?

No.

How was this patch tested?

New unit tests were added to illustrate the situations above.

Was this patch authored or co-authored using generative AI tooling?

No.

@github-actions github-actions bot added the CORE label Apr 17, 2025
@attilapiros

cc @mridulm

@attilapiros attilapiros requested a review from Ngone51 April 18, 2025 00:59
@attilapiros

The "java.lang.OutOfMemoryError: Java heap space" in the pyspark-pandas-connect-part2 is unrelated.

@attilapiros

After the test was restarted, the error was resolved.

@mridulm

mridulm commented Apr 18, 2025

It can use a FileOutputCommitter where each task does a Hadoop task commit. In case of a re-submit this will lead to re-commit that Hadoop task (possibly with different content)

Only unsuccessful (and so uncommitted) tasks are candidates for (re)execution (and so commit) - not completed tasks.
So if a partition has a completed task commit, it won't be re-executed - Spark ensures this w.r.t. the use of FileOutputCommitter
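The guarantee described here can be illustrated with a small self-contained model. In Spark the real mechanism is the output commit coordination between driver and tasks; the class below is a simplified stand-in, not the actual API:

```scala
// Illustrative model of a commit-per-task protocol: at most one attempt
// per partition is authorized to commit, and a partition that already
// has a committed attempt is never re-executed.
final class CommitCoordinator(numPartitions: Int) {
  private val committed = new Array[Boolean](numPartitions)

  // A task attempt asks for permission right before its Hadoop task commit.
  def canCommit(partition: Int): Boolean = synchronized {
    if (committed(partition)) false
    else { committed(partition) = true; true }
  }

  // The scheduler only resubmits partitions that have not committed.
  def partitionsToResubmit: Seq[Int] =
    synchronized((0 until numPartitions).filterNot(i => committed(i)))
}
```

Under this protocol a re-submit of the stage only touches the uncommitted partitions; the point of contention in this thread is that for an indeterminate stage those committed partitions were produced by a different computation than the resubmitted ones.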

In case of a JDBC write, it may have already inserted all rows of a partition into the target schema.

As discussed here, this is a bug in the JDBC implementation - the txn commit should be done in a task commit, not as part of foreachPartition(savePartition).
It is not expected to work correctly in all scenarios, and in the case observed, it did end up failing.

But this is not enough, as a FetchFailed from a determinate stage can lead to an executor loss and a re-compute of the indeterminate parent of the result stage, as shown in the attached unit test.

The fix for this is to handle something similar to this.

I have sketched a rough impl here for reference (it is just illustrative! meant to convey what I was talking about).
Assuming I am not missing anything, it should help you with fixing this issue.

  • Option 1 handles indeterminate impact when processing shuffle data loss.
  • Option 2 does this when computing an indeterminate stage.

Option 1 is much more aggressive with cleanup, but might spuriously kill jobs far more often than required.
If option 2 is correct (and we would need to rigorously analyze it for correctness and completeness), I would prefer it - as it is much more conservative about aborting stages/failing jobs.

(I have adapted the tests you included in this PR for both - and they both pass)

@attilapiros

@mridulm

Only unsuccessful (and so uncommitted) tasks are candidates for (re)execution (and so commit) - not completed tasks.
So if a partition has a completed task commit, it won't be re-executed - Spark ensures this w.r.t. the use of FileOutputCommitter

But that's also bad for an indeterminate stage, as the data is inconsistent: the committed partitions come from a previous computation, while the resubmitted ones come from the new one.

To illustrate it:

scala> import org.apache.spark.sql.functions.udf
scala> val myudf = udf(() => { val rnd = new java.util.Random(); rnd.nextInt(10)}).asNondeterministic()
scala> spark.udf.register("myudf", myudf)
scala> val df = sql("SELECT rand, count(rand) as cnt from (SELECT myudf() as rand from explode(sequence(1, 1000))) GROUP BY rand")
scala> df.show
+----+---+
|rand|cnt|
+----+---+
|   1|122|
|   6|110|
|   3|111|
|   5| 85|
|   9| 99|
|   4| 94|
|   8| 93|
|   7| 88|
|   2| 98|
|   0|100|
+----+---+
scala> df.selectExpr("sum(cnt)").show
+--------+
|sum(cnt)|
+--------+
|    1000|
+--------+

So if we write the df to a table and some but not all tasks were successful before a resubmit, we might end up with an inconsistent result where sum(cnt) is no longer 1000 when we load the data back: the resubmit might rerun the shuffle map stage, which regenerates the random values with a different distribution of the values from 0 to 9. The shuffle map stage is completely re-executed, but the result stage is not.
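The same inconsistency can be shown without Spark at all. In this toy Scala model the two seeds merely stand in for two independent runs of the nondeterministic UDF (one before and one after the failure); the names are illustrative only:

```scala
import scala.util.Random

// Toy stand-in for the indeterminate computation: 1000 rows grouped by a
// random key in 0..9. A different seed models a re-execution that draws
// a different distribution of keys.
def indeterminateCounts(seed: Long): Map[Int, Int] = {
  val rnd = new Random(seed)
  Seq.fill(1000)(rnd.nextInt(10)).groupBy(identity).map { case (k, v) => k -> v.size }
}

val firstRun  = indeterminateCounts(seed = 1L)
val secondRun = indeterminateCounts(seed = 2L) // recomputed after the failure

// Each complete run is internally consistent: its counts sum to 1000.
val firstTotal  = firstRun.values.sum
val secondTotal = secondRun.values.sum

// But a result stage that committed keys 0..4 from the first run and
// wrote keys 5..9 from the second run generally does not sum to 1000:
val mixedTotal = (0 to 4).map(firstRun.getOrElse(_, 0)).sum +
  (5 to 9).map(secondRun.getOrElse(_, 0)).sum
```

Each full run sums to 1000, while the mixed output is consistent only by coincidence — which is exactly the corruption the PR description argues against.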

@mridulm

mridulm commented Apr 18, 2025

But that's also bad for an indeterminate stage, as the data is inconsistent: the committed partitions come from a previous computation, while the resubmitted ones come from the new one.

If the parent map stage was indeterminate, existing Spark code would have already aborted the stage - if there was a fetch failure for that parent stage.
If the parent is determinate - why does it matter which attempt the data came from? The input to a partition of the result stage will always be the same (except perhaps for ordering).

As you have pointed out in the test in this PR, there is a gap in the existing impl: when there is a shuffle loss due to executor/host failure (and not detected through a fetch failure), the check for determinism was not being performed before recomputing the lost data. So if shuffle files are lost for an indeterminate stage but this never resulted in a fetch failure (in the test, submitMissingParents for the result stage would have recomputed the indeterminate stage after it recomputed the determinate parent), the check to abort its child stages was not being done.
This is indeed a bug which needs to be addressed - and I have proposed two options for it. Would be great if you could take a look!

But that does not require failing the result stage - even if it is indeterminate - if no indeterminate parent has lost any shuffle outputs.

So if we write the df to a table and some but not all tasks were successful before a resubmit, we might end up with an inconsistent result where sum(cnt) is no longer 1000 when we load the data back: the resubmit might rerun the shuffle map stage, which regenerates the random values with a different distribution of the values from 0 to 9. The shuffle map stage is completely re-executed, but the result stage is not.

This will not happen - please see above.

"some but not all tasks were successful and a resubmit happened" -> if it results in re-execution of the parent (indeterminate) stage through a fetch failure, the job will be aborted.
If it does not result in re-execution of the parent stage, the computation is deterministic - it is essentially: "WITH foo AS (SELECT key, count(key) as cnt FROM constant_table GROUP BY key) SELECT SUM(cnt) FROM foo" - which will always give the same result.

Please do let me know if I am missing some nuance.
If there is a test case to illustrate it, that would be great!

(Edited to hopefully improve clarity!)


@mridulm

mridulm commented Apr 24, 2025

@mridulm regarding option 2, why is a return enough here (and not an abortStage):

It should result in the same behavior (in that scenario, all jobs this stage was part of have been aborted - and we have not yet added the stage to runningStages).
Having said that, it is indeed better to explicitly abort it - even if it does not do much right now - so that we are more robust to code evolution in the future.

Why do we need to check whether all jobs should be aborted, and not just one, here:

A stage can be part of multiple concurrent jobs, and not all of them might be getting aborted: some of them might not have started a result stage, and so are recoverable.
Only if all of them have been aborted can we abort the stage.
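As a minimal sketch of that rule (illustrative names only — `ActiveJob` and `canAbortStage` are not the real scheduler types):

```scala
// A stage can belong to several concurrent jobs; a job that has not yet
// started its result stage can still recover from the recomputation.
final case class ActiveJob(id: Int, aborted: Boolean)

// The shared stage itself may only be aborted once every job it belongs
// to has itself been aborted.
def canAbortStage(jobsForStage: Seq[ActiveJob]): Boolean =
  jobsForStage.nonEmpty && jobsForStage.forall(_.aborted)
```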

@attilapiros attilapiros marked this pull request as draft April 30, 2025 20:35
@attilapiros

@mridulm I am adding your option 2 solution, but I would like to make some changes to it

@attilapiros attilapiros force-pushed the SPARK-51272_attila_3 branch from d8e7efe to 9301803 Compare April 30, 2025 20:37
@mridulm

mridulm commented May 1, 2025

I am adding your option 2 solution, but I would like to make some changes to it

Please do! It was just a rough sketch :-)

I will circle back to this PR later this week, thanks for updating it.


@mridulm mridulm left a comment


Thanks for thinking through this @attilapiros !

Just one comment; the rest looks fine to me otherwise.
But then, I am too close to this :-)

Would love to have additional eyes on this.
+CC @Ngone51

@attilapiros attilapiros marked this pull request as ready for review May 3, 2025 08:12
@attilapiros

@mridulm

But then, I am too close to this :-)

You should definitely be credited here. Is there a way to list you as a co-author?

@mridulm

mridulm commented May 3, 2025

You should definitely be credited here. Is there a way to list you as a co-author?

I meant in terms of objectivity!
If there is a blind spot, both of us might not be seeing it.

@attilapiros

@mridulm yes, you are right; still, I would like to give you credit too, if there is a way


@peter-toth peter-toth left a comment


I have just minor suggestions in attilapiros#9, but it looks good otherwise.

@attilapiros

attilapiros commented May 8, 2025

cc @cloud-fan do you have any objections? If not, we would like to merge this early next week.

@attilapiros attilapiros changed the title [SPARK-51272][CORE] Aborting instead of re-submitting of partially completed indeterminate result stage [SPARK-51272][CORE] Aborting instead of continuing partially completed indeterminate result stage at ResubmitFailedStages May 9, 2025
@attilapiros attilapiros requested a review from cloud-fan May 16, 2025 21:47
@cloud-fan

cloud-fan commented May 19, 2025

thanks, merging to master/4.0!

@cloud-fan cloud-fan closed this in 7604f67 May 19, 2025
cloud-fan pushed a commit that referenced this pull request May 19, 2025
…d indeterminate result stage at ResubmitFailedStages


Closes #50630 from attilapiros/SPARK-51272_attila_3.

Lead-authored-by: attilapiros <[email protected]>
Co-authored-by: Mridul Muralidharan <mridulatgmail.com>
Co-authored-by: Peter Toth <[email protected]>
Co-authored-by: Attila Zsolt Piros <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit 7604f67)
Signed-off-by: Wenchen Fan <[email protected]>
@cloud-fan

@attilapiros can you open a 3.5 backport PR? thanks!

attilapiros added a commit to attilapiros/spark that referenced this pull request May 20, 2025
…d indeterminate result stage at ResubmitFailedStages

yhuang-db pushed a commit to yhuang-db/spark that referenced this pull request Jun 9, 2025
…d indeterminate result stage at ResubmitFailedStages
