
Commit 04125eb

ueshin authored and HyukjinKwon committed
[SPARK-41971][CONNECT][PYTHON][FOLLOWUP] Fix to_pandas to support the older Spark
### What changes were proposed in this pull request?

This is a follow-up of apache#40988. Fix `to_pandas` to support older Spark servers.

For the server:

```sh
% ./sbin/start-connect-server.sh --packages org.apache.spark:spark-connect_2.12:3.4.0
```

with a client that includes this change:

```py
>>> spark.sql("values (1, struct('x' as x)) as t(a, b)").toPandas()
   a           b
0  1  {'x': 'x'}
```

### Why are the changes needed?

The config `spark.sql.execution.pandas.structHandlingMode`, introduced in apache#40988, does not exist in older Spark (`< 3.5`):

```py
>>> spark.sql("values (1, struct('x' as x)) as t(a, b)").toPandas()
Traceback (most recent call last):
...
pyspark.errors.exceptions.connect.SparkConnectGrpcException: (java.util.NoSuchElementException) spark.sql.execution.pandas.structHandlingMode
```

### Does this PR introduce _any_ user-facing change?

The newer Spark Connect client will work with `Spark < 3.5`.

### How was this patch tested?

Manually.

Closes apache#41390 from ueshin/issues/SPARK-41971/config_with_default.

Authored-by: Takuya UESHIN <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
1 parent 11390c5 commit 04125eb
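How the fix works at the RPC level, as a minimal sketch: the old code fetched the key with a plain `Get` config operation, which a Spark < 3.5 server answers with `java.util.NoSuchElementException` for a key it does not know, while the fix sends `GetWithDefault`, which carries a client-side fallback per key. The import path below is an assumption; the message shapes mirror the diff further down.

```py
import pyspark.sql.connect.proto as pb2  # assumed import path for the Connect protos

# Plain Get, the pre-fix behavior: a Spark < 3.5 server raises
# java.util.NoSuchElementException because this key is not defined there.
get_op = pb2.ConfigRequest.Operation(
    get=pb2.ConfigRequest.Get(keys=["spark.sql.execution.pandas.structHandlingMode"])
)

# GetWithDefault, the post-fix behavior: each key travels with a fallback
# value, so an unknown key resolves to the default instead of erroring.
get_with_default_op = pb2.ConfigRequest.Operation(
    get_with_default=pb2.ConfigRequest.GetWithDefault(
        pairs=[
            pb2.KeyValue(
                key="spark.sql.execution.pandas.structHandlingMode",
                value="legacy",
            )
        ]
    )
)
```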

File tree

  • python/pyspark/sql/connect/client

1 file changed: +17 -3 lines


python/pyspark/sql/connect/client/core.py

Lines changed: 17 additions & 3 deletions
```diff
@@ -726,11 +726,14 @@ def to_pandas(self, plan: pb2.Plan) -> "pd.DataFrame":
 
         if len(pdf.columns) > 0:
             timezone: Optional[str] = None
+            if any(_has_type(f.dataType, TimestampType) for f in schema.fields):
+                (timezone,) = self.get_configs("spark.sql.session.timeZone")
+
             struct_in_pandas: Optional[str] = None
             error_on_duplicated_field_names: bool = False
-            if any(_has_type(f.dataType, (StructType, TimestampType)) for f in schema.fields):
-                timezone, struct_in_pandas = self.get_configs(
-                    "spark.sql.session.timeZone", "spark.sql.execution.pandas.structHandlingMode"
+            if any(_has_type(f.dataType, StructType) for f in schema.fields):
+                (struct_in_pandas,) = self.get_config_with_defaults(
+                    ("spark.sql.execution.pandas.structHandlingMode", "legacy"),
                 )
 
             if struct_in_pandas == "legacy":
@@ -1108,6 +1111,17 @@ def get_configs(self, *keys: str) -> Tuple[Optional[str], ...]:
         configs = dict(self.config(op).pairs)
         return tuple(configs.get(key) for key in keys)
 
+    def get_config_with_defaults(
+        self, *pairs: Tuple[str, Optional[str]]
+    ) -> Tuple[Optional[str], ...]:
+        op = pb2.ConfigRequest.Operation(
+            get_with_default=pb2.ConfigRequest.GetWithDefault(
+                pairs=[pb2.KeyValue(key=key, value=default) for key, default in pairs]
+            )
+        )
+        configs = dict(self.config(op).pairs)
+        return tuple(configs.get(key) for key, _ in pairs)
+
     def config(self, operation: pb2.ConfigRequest.Operation) -> ConfigResult:
         """
         Call the config RPC of Spark Connect.
```
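A hedged usage sketch of the new helper, where `client` stands for a `SparkConnectClient` instance (an illustrative name; the signature and fallback semantics come from the diff above):

```py
# Before: both keys were fetched with get_configs, and a Spark < 3.5 server
# raised NoSuchElementException for the structHandlingMode key:
#
#     timezone, struct_in_pandas = client.get_configs(
#         "spark.sql.session.timeZone", "spark.sql.execution.pandas.structHandlingMode"
#     )
#
# After: the key is requested with a client-side default, so an older server
# resolves it to "legacy" instead of raising:
(struct_in_pandas,) = client.get_config_with_defaults(
    ("spark.sql.execution.pandas.structHandlingMode", "legacy"),
)
```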
