[SPARK-52582][SQL] Improve the memory usage of XML parser #51287


Open · wants to merge 22 commits into base: master

Conversation

@xiaonanyang-db (Contributor) commented Jun 26, 2025

What changes were proposed in this pull request?

Today, the XML parser is not memory efficient: it loads each XML record fully into memory before parsing, which causes an OOM when an input record is large. This PR changes the parser to consume an XML record token by token, avoiding copying the entire record into memory ahead of time.

The optimization also makes the handling of malformed XML files deterministic. Currently, the XML parser doesn't salvage valid records from a malformed file deterministically. After this change, the parser fails at the first corrupt record but returns all valid records that precede it.
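
For context, a minimal sketch of the pull-based StAX pattern this refers to (illustrative only, not the PR's actual code; the inline record is a stand-in):

import java.io.StringReader
import javax.xml.stream.XMLInputFactory

// Pull parsing: the reader yields one event at a time, so only a single token
// needs to be held in memory rather than the whole serialized record.
val factory = XMLInputFactory.newInstance()
val reader = factory.createXMLEventReader(new StringReader("<ROW><b>1</b></ROW>"))
while (reader.hasNext) {
  val event = reader.nextEvent()
  if (event.isStartElement) {
    println(event.asStartElement().getName.getLocalPart) // prints: ROW, b
  }
}
reader.close()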

Why are the changes needed?

Solve the OOM issue in XML ingestion.

Does this PR introduce any user-facing change?

No. The new behavior is disabled by default for now.

How was this patch tested?

New unit tests.

Was this patch authored or co-authored using generative AI tooling?

No.

@xiaonanyang-db xiaonanyang-db marked this pull request as ready for review June 26, 2025 01:46
@github-actions github-actions bot added the SQL label Jun 26, 2025
@HyukjinKwon HyukjinKwon changed the title [SPARK-52582] Improve the memory usage of XML parser [SPARK-52582][SQL] Improve the memory usage of XML parser Jun 26, 2025
Utils.tryWithResource(
CodecStreams.createInputStreamWithCloseResource(conf, file.toPath)
) { is =>
UTF8String.fromBytes(ByteStreams.toByteArray(is))
Contributor:

This may hit java byte array limit that this PR is trying to address. Limit it to 1GB.
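
A hedged sketch of the suggested cap, assuming Guava's ByteStreams.limit (already on Spark's classpath); the 1 GB constant and the wiring are taken from this comment, not the final code:

Utils.tryWithResource(
  CodecStreams.createInputStreamWithCloseResource(conf, file.toPath)
) { is =>
  // Cap the copy at 1 GB so a huge corrupt record cannot blow past the JVM
  // byte-array limit while being stored in the corrupt-record column.
  val oneGB = 1L << 30
  UTF8String.fromBytes(ByteStreams.toByteArray(ByteStreams.limit(is, oneGB)))
}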

Contributor Author:

Right, but it will only be called for corrupted files.

Contributor:

It shouldn't throw an exception when ignoreCorruptFiles is set.

Comment on lines 51 to 58
// Malformed record handling is slightly different in the optimized XML parser
"DSL test for parsing a malformed XML file",
"DSL test for permissive mode for corrupt records",
"DSL test with malformed attributes",
"DSL test for dropping malformed rows",
"DSL: handle malformed record in singleVariantColumn mode",
// No valid row will be found in `unclosed_tag.xml` by the OptimizedXMLTokenizer
"test FAILFAST with unclosed tag",
Contributor:

Let's update some of these tests to have some valid rowTags at the beginning and a corrupt one towards the end.

Contributor Author (@xiaonanyang-db, Jul 7, 2025):

For this specific test case, the record now fails at the tokenizing stage and never enters the parsing stage; thus it will not fail in FAILFAST mode as originally expected.

@LuciferYang (Contributor):

> The optimization is governed by an SQL conf and disabled by default because it comes with two consequences:
>
>   1. XSD validation is not supported in the optimized parser. The current XSD validation works only on a full XML record string, which defeats the purpose of the optimization here.
>   2. Behavior change in corrupted record handling. Currently, good records in an XML file can be parsed correctly even if there is a corrupted XML record in between, but they won't be in the optimized parser. For example, given the following XML file:
>
> <ROWS>
>   <ROW><b>1</b></ROW>
>   <ROW><b></ROW>
>   <ROW><b>2</b></ROW>
> </ROWS>
>
> In the current parser, both the 1st and 3rd records are parsed correctly, and the second one is moved to the corrupted data column. In the new parser, only the 1st record is parsed, and the rest of the document is moved to the corrupted data column as the second record.

From the description in the PR, enabling the optimization will change some existing behaviors. Have there been similar cases in the past? cc @cloud-fan

@xiaonanyang-db (Contributor Author), replying to the comment above:

Updated the PR to eliminate the behavior gaps between the optimized version and the existing version.

@@ -292,7 +283,9 @@ class XmlSuite
)
}

test("test FAILFAST with unclosed tag") {
// The record in the test XML file doesn't have a valid start row tag, thus no record is tokenized
// and parsed.
Contributor:

What was the behavior before?

@cloud-fan (Contributor) commented Jul 8, 2025:

> No. The new behavior is disabled by default for now.

@xiaonanyang-db can we explain the behavior difference in detail if the conf is enabled?

-convertStream(xmlTokenizer) { tokens =>
-  safeParser.parse(tokens)
+convertStream(xmlTokenizer) { parser =>
+  safeParser.parse(parser)
Contributor:

when will this parser close?

Contributor Author:

It will be closed in two cases (see the sketch below):

  1. In XMLTokenizer, when there are no more records
  2. In doParseColumn, when we run into a corrupted record
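
A minimal sketch of the corrupt-record close path (case 2), with XMLTokenizer reduced to an assumed trait; the names follow the PR but the bodies are illustrative:

import javax.xml.stream.{XMLEventReader, XMLStreamException}

// Assumed contract: next() returns a reader positioned at the next record, or
// None after closing the underlying stream when input is exhausted (case 1).
trait XMLTokenizer {
  def next(): Option[XMLEventReader]
}

def parseAll[T](tokenizer: XMLTokenizer)(parseOne: XMLEventReader => T): Iterator[T] =
  Iterator
    .continually(tokenizer.next())
    .takeWhile(_.isDefined)
    .collect { case Some(reader) =>
      try parseOne(reader)
      catch {
        case e: XMLStreamException =>
          reader.close() // case 2: a corrupt record stops consumption of the file
          throw e
      }
    }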

result
}
} catch {
case e: SparkUpgradeException => throw e
case e@(_: RuntimeException | _: XMLStreamException | _: MalformedInputException
| _: SAXException) =>
// Skip rest of the content in the parser and put the whole XML file in the
// BadRecordException.
parser.close()
Contributor:

When will the non-exception path close the parser?

Contributor Author (@xiaonanyang-db, Jul 8, 2025):

In XMLTokenizer.next(), when there are no more records.
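
A sketch of that non-exception close path, with an assumed body for next(); advanceToNextRowStart is a hypothetical helper, not the PR's actual method:

import javax.xml.stream.XMLEventReader

class XMLTokenizer(reader: XMLEventReader) {
  // When no further row start tag exists, close the reader before returning
  // None, so the normal end-of-file path never leaks the stream.
  def next(): Option[XMLEventReader] =
    if (advanceToNextRowStart()) {
      Some(reader)
    } else {
      reader.close()
      None
    }

  // Hypothetical helper: skip forward until the next start element (a real
  // implementation would also match the configured rowTag).
  private def advanceToNextRowStart(): Boolean = {
    while (reader.hasNext && !reader.peek().isStartElement) reader.nextEvent()
    reader.hasNext
  }
}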

@@ -35,29 +35,39 @@ object StaxXmlParserUtils {
factory
}

val filter = new EventFilter {
Contributor:

Can this filter be reused across multiple readers? If not, define a function that returns the filter.

Contributor Author:

I think it can be reused, as it's stateless.
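
For illustration: a filter whose accept() depends only on its argument carries no state, so one instance can be shared across readers. The events dropped here are an assumption, not necessarily the PR's exact filter:

import javax.xml.stream.{EventFilter, XMLStreamConstants}
import javax.xml.stream.events.XMLEvent

// Stateless: no fields, no mutation, so a single shared instance is safe to
// pass to every XMLInputFactory.createFilteredReader call.
val filter: EventFilter = new EventFilter {
  override def accept(event: XMLEvent): Boolean =
    !event.isProcessingInstruction &&
      event.getEventType != XMLStreamConstants.COMMENT
}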

Comment on lines 67 to 70
case _: RuntimeException | _: IOException if options.ignoreCorruptFiles =>
logWarning("Skipping the rest of the content in the corrupted file", e)
case _: XMLStreamException =>
logWarning("Skipping the rest of the content in the corrupted file", e)
Contributor:

Why is XMLStreamException a separate case? Shouldn't there be a call to close?

Contributor Author:

Oops, those two cases can be combined.

Contributor Author:

Actually, we shouldn't need separate close calls in each case; the parser will be closed in the finally block.
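
A hedged sketch of that consolidated shape; whether the ignoreCorruptFiles guard should also cover XMLStreamException is an assumption here, and readNextRecord, options, parser, and logWarning stand in for the PR's actual members:

import java.io.IOException
import javax.xml.stream.XMLStreamException

try {
  readNextRecord()
} catch {
  // One combined case replaces the two duplicated ones above.
  case e @ (_: RuntimeException | _: IOException | _: XMLStreamException)
      if options.ignoreCorruptFiles =>
    logWarning("Skipping the rest of the content in the corrupted file", e)
} finally {
  parser.close() // single close point; no per-case close calls needed
}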

Comment on lines -2976 to +2965
-val schema = buildSchema(field("a1", LongType), field("a2", LongType))
+val schema = buildSchema(
+  field("_corrupted_record", LongType), field("a1", LongType), field("a2", LongType))
Contributor:

Why was the _corrupted_record field required when writing data?

Comment on lines -395 to -398
assert(
singleVariantColumn.isDefined || schemaDDL.isDefined,
"Either singleVariantColumn or schema must be defined to ingest XML files as variants via DSL"
)
Contributor:

Why is this assert no longer valid?

Contributor Author:

Because singleVariantColumn mode now supports a user-specified schema covering both the single variant column and the corrupted record column.
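
A hypothetical DSL usage that this relaxation enables (the column names and path are made up; spark is an active SparkSession):

// User-specified schema carrying both the single variant column and a
// corrupt-record column, alongside singleVariantColumn mode.
val df = spark.read
  .format("xml")
  .option("rowTag", "ROW")
  .option("singleVariantColumn", "var")
  .schema("var variant, _corrupt_record string")
  .load("/path/to/records.xml")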

@xiaonanyang-db xiaonanyang-db requested a review from sandip-db July 8, 2025 21:56