[SPARK-52582][SQL] Improve the memory usage of XML parser #51287
Conversation
Utils.tryWithResource(
  CodecStreams.createInputStreamWithCloseResource(conf, file.toPath)
) { is =>
  UTF8String.fromBytes(ByteStreams.toByteArray(is))
}
This may hit the Java byte array limit that this PR is trying to address. Limit it to 1GB.
Right, but it will only be called for corrupted files.
It shouldn't throw an exception when ignoreCorruptFiles is set.
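Following the 1GB suggestion above, a minimal sketch of a bounded fallback read, assuming Guava's ByteStreams.limit is acceptable here; the constant and surrounding structure are illustrative, not the PR's final change:

import com.google.common.io.ByteStreams

// Cap the fallback read at 1GB so a huge corrupted file cannot blow past the
// JVM byte-array limit; bytes beyond the cap are simply not copied.
val maxBytes = 1024L * 1024 * 1024
val snippet = Utils.tryWithResource(
  CodecStreams.createInputStreamWithCloseResource(conf, file.toPath)) { is =>
  UTF8String.fromBytes(ByteStreams.toByteArray(ByteStreams.limit(is, maxBytes)))
}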
// Malformed record handling is slightly different in the optimized XML parser
"DSL test for parsing a malformed XML file",
"DSL test for permissive mode for corrupt records",
"DSL test with malformed attributes",
"DSL test for dropping malformed rows",
"DSL: handle malformed record in singleVariantColumn mode",
// No valid row will be found in `unclosed_tag.xml` by the OptimizedXMLTokenizer
"test FAILFAST with unclosed tag",
Let's update some of these tests to have some valid rowTags in the beginning and a corrupt one towards the end.
For this specific test case, the record will fail at the tokenizing stage and will not enter the parsing stage; thus, it will not fail in the FAILFAST mode as originally expected.
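For context, a hedged sketch of the kind of read that test performs; the file name, rowTag, and options below are placeholders rather than the suite's exact fixtures:

// With the optimized tokenizer, an unclosed row tag yields no tokenized record at all,
// so FAILFAST has nothing to fail on and the read simply returns zero rows.
val df = spark.read
  .option("rowTag", "ROW")
  .option("mode", "FAILFAST")
  .xml("path/to/unclosed_tag.xml")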
Force-pushed from 006a14f to 74ecf46.
From the description in the PR, enabling the optimization will disrupt some existing behaviors. Have there been similar cases in the past? cc @cloud-fan
Force-pushed from 82d7af0 to 5c7164e.
Updated the PR to eliminate the behavior gaps between the optimized version and the existing version.
@@ -292,7 +283,9 @@ class XmlSuite
    )
  }

  test("test FAILFAST with unclosed tag") {
    // The record in the test XML file doesn't have a valid start row tag, thus no record is
    // tokenized and parsed.
what was the behavior before?
@xiaonanyang-db can we explain the behavior difference in detail if the conf is enabled?
-    convertStream(xmlTokenizer) { tokens =>
-      safeParser.parse(tokens)
+    convertStream(xmlTokenizer) { parser =>
+      safeParser.parse(parser)
When will this parser close?
It will be closed in two cases:
- In XMLTokenizer, when there are no more records
- In doParseColumn, when we run into a corrupted record
      result
    }
  } catch {
    case e: SparkUpgradeException => throw e
    case e @ (_: RuntimeException | _: XMLStreamException | _: MalformedInputException
        | _: SAXException) =>
      // Skip rest of the content in the parser and put the whole XML file in the
      // BadRecordException.
      parser.close()
When will the non-exception path close the parser?
In XMLTokenizer.next(), when there are no more records.
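A rough sketch of the close-on-exhaustion pattern being described (illustrative only, not the PR's XMLTokenizer):

import javax.xml.stream.XMLEventReader
import javax.xml.stream.events.XMLEvent

// Returns the next event, closing the underlying reader once the stream is exhausted.
class ClosingTokenizer(reader: XMLEventReader) {
  def next(): Option[XMLEvent] = {
    if (reader.hasNext) {
      Some(reader.nextEvent())
    } else {
      reader.close() // no more records: release the underlying stream
      None
    }
  }
}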
@@ -35,29 +35,39 @@ object StaxXmlParserUtils {
    factory
  }

  val filter = new EventFilter {
Can this filter be reused across multiple readers? If not, define a function that returns the filter.
I think it can be reused as it's stateless.
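For illustration, a stateless filter like the one under discussion can be defined once and shared across readers; the accepted event types below are assumptions, not the PR's exact filter:

import javax.xml.stream.EventFilter
import javax.xml.stream.events.XMLEvent

// A single shared, stateless filter instance; it can be passed to
// XMLInputFactory.createFilteredReader for every reader that needs it.
val sharedFilter: EventFilter = new EventFilter {
  override def accept(event: XMLEvent): Boolean =
    event.isStartElement || event.isEndElement || event.isCharacters
}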
      case _: RuntimeException | _: IOException if options.ignoreCorruptFiles =>
        logWarning("Skipping the rest of the content in the corrupted file", e)
      case _: XMLStreamException =>
        logWarning("Skipping the rest of the content in the corrupted file", e)
Why is XMLStreamException a separate case? Shouldn't there be a call to close?
Oops, those two cases can be combined.
Actually, we shouldn't need separate closes in each case; the parser will be closed in the finally block.
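A sketch of the structure this suggests, with the two warning cases merged and cleanup left to a single finally block; parseRecords is a placeholder, and whether the ignoreCorruptFiles guard should also cover XMLStreamException is an assumption here:

import java.io.IOException
import javax.xml.stream.XMLStreamException

try {
  parseRecords(parser)
} catch {
  case e @ (_: RuntimeException | _: IOException | _: XMLStreamException)
      if options.ignoreCorruptFiles =>
    logWarning("Skipping the rest of the content in the corrupted file", e)
} finally {
  // Single cleanup point instead of per-case close() calls.
  parser.close()
}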
-    val schema = buildSchema(field("a1", LongType), field("a2", LongType))
+    val schema = buildSchema(
+      field("_corrupted_record", LongType), field("a1", LongType), field("a2", LongType))
Why was the _corrupted_record field required when writing data?
    assert(
      singleVariantColumn.isDefined || schemaDDL.isDefined,
      "Either singleVariantColumn or schema must be defined to ingest XML files as variants via DSL"
    )
Why is this assert no longer valid?
Because we now support a user-specified schema for the single variant column and the corrupted record column in singleVariantColumn mode.
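A hedged usage sketch of what that allows; the option value, column names, and path below are illustrative assumptions rather than the PR's exact API surface:

// In singleVariantColumn mode the user-provided schema may now declare the variant
// column and a corrupted-record column, instead of relying only on the inferred one.
val df = spark.read
  .option("rowTag", "ROW")
  .option("singleVariantColumn", "var")
  .schema("var variant, _corrupt_record string")
  .xml("path/to/records.xml")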
What changes were proposed in this pull request?
Today, the XML parser is not memory efficient. It loads each XML record into memory first before parsing, which causes an OOM if the input XML record is large. This PR improves the parser to parse an XML record token by token to avoid copying the entire XML record into memory ahead of time.
The optimization also introduces deterministic behavior in handling malformed XML files. Currently, the XML parser doesn't salvage all valid records from a malformed file deterministically. After the improvement, the parser will fail at the first corrupt record but return all the valid records before it.
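To illustrate the idea, here is a minimal sketch of token-by-token StAX processing (not the PR's actual parser): the data is consumed directly from the event stream instead of being materialized as a per-record string first.

import java.io.InputStream
import javax.xml.stream.{XMLInputFactory, XMLStreamConstants}

// Walks the StAX event stream straight off the input stream and counts rowTag
// elements, never holding a full record in memory.
def countRows(is: InputStream, rowTag: String): Long = {
  val reader = XMLInputFactory.newInstance().createXMLStreamReader(is)
  var rows = 0L
  try {
    while (reader.hasNext) {
      if (reader.next() == XMLStreamConstants.START_ELEMENT &&
          reader.getLocalName == rowTag) {
        rows += 1
      }
    }
  } finally {
    reader.close()
  }
  rows
}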
Why are the changes needed?
Solve the OOM issue in XML ingestion.
Does this PR introduce any user-facing change?
No. The new behavior is disabled by default for now.
How was this patch tested?
New UTs.
Was this patch authored or co-authored using generative AI tooling?
No.