Skip to content

feat: support read_gbq wildcard table path #377

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Feb 13, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
feat: support read_gbq wildcard table path
  • Loading branch information
GarrettWu committed Feb 9, 2024
commit 6377c51d4106642b6bb631a320853f96c5b07cec
58 changes: 38 additions & 20 deletions bigframes/session/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
Iterable,
List,
Literal,
Mapping,
MutableSequence,
Optional,
Sequence,
Expand Down Expand Up @@ -115,6 +116,11 @@ def _is_query(query_or_table: str) -> bool:
return re.search(r"\s", query_or_table.strip(), re.MULTILINE) is not None


def _is_table_with_wildcard_suffix(query_or_table: str) -> bool:
    """Determine if `query_or_table` is a table and contains a wildcard suffix."""
    # Queries never take a wildcard; only plain table paths ending in "*" qualify.
    if _is_query(query_or_table):
        return False
    return query_or_table.endswith("*")


class Session(
third_party_pandas_gbq.GBQIOMixin,
third_party_pandas_parquet.ParquetIOMixin,
Expand Down Expand Up @@ -248,7 +254,9 @@ def read_gbq(
elif col_order:
columns = col_order

query_or_table = self._filters_to_query(query_or_table, columns, filters)
filters = list(filters)
if len(filters) != 0 or _is_table_with_wildcard_suffix(query_or_table):
query_or_table = self._to_query(query_or_table, columns, filters)

if _is_query(query_or_table):
return self._read_gbq_query(
Expand All @@ -272,13 +280,18 @@ def read_gbq(
use_cache=use_cache,
)

def _filters_to_query(self, query_or_table, columns, filters):
"""Convert filters to query"""
if len(filters) == 0:
return query_or_table

def _to_query(
self,
query_or_table: str,
columns: Iterable[str],
filters: third_party_pandas_gbq.FiltersType,
) -> str:
"""Compile query_or_table with conditions(filters, wildcards) to query."""
filters = list(filters)
sub_query = (
f"({query_or_table})" if _is_query(query_or_table) else query_or_table
f"({query_or_table})"
if _is_query(query_or_table)
else f"`{query_or_table}`"
)

select_clause = "SELECT " + (
Expand All @@ -287,7 +300,7 @@ def _filters_to_query(self, query_or_table, columns, filters):

where_clause = ""
if filters:
valid_operators = {
valid_operators: Mapping[third_party_pandas_gbq.FilterOps, str] = {
"in": "IN",
"not in": "NOT IN",
"==": "=",
Expand All @@ -298,19 +311,10 @@ def _filters_to_query(self, query_or_table, columns, filters):
"!=": "!=",
}

if (
isinstance(filters, Iterable)
and isinstance(filters[0], Tuple)
and (len(filters[0]) == 0 or not isinstance(filters[0][0], Tuple))
):
filters = [filters]

or_expressions = []
for group in filters:
if not isinstance(group, Iterable):
raise ValueError(
f"Filter group should be a iterable, {group} is not valid."
)
group = [group]

and_expressions = []
for filter_item in group:
Expand All @@ -329,9 +333,9 @@ def _filters_to_query(self, query_or_table, columns, filters):
if operator not in valid_operators:
raise ValueError(f"Operator {operator} is not valid.")

operator = valid_operators[operator]
operator_str = valid_operators[operator]

if operator in ["IN", "NOT IN"]:
if operator_str in ["IN", "NOT IN"]:
value_list = ", ".join([repr(v) for v in value])
expression = f"`{column}` {operator} ({value_list})"
else:
Expand Down Expand Up @@ -521,6 +525,7 @@ def read_gbq_table(
index_col: Iterable[str] | str = (),
columns: Iterable[str] = (),
max_results: Optional[int] = None,
filters: third_party_pandas_gbq.FiltersType = (),
use_cache: bool = True,
col_order: Iterable[str] = (),
) -> dataframe.DataFrame:
Expand All @@ -546,6 +551,19 @@ def read_gbq_table(
elif col_order:
columns = col_order

filters = list(filters)
if len(filters) != 0 or _is_table_with_wildcard_suffix(query):
query = self._to_query(query, columns, filters)

return self._read_gbq_query(
query,
index_col=index_col,
columns=columns,
max_results=max_results,
api_name="read_gbq_table",
use_cache=use_cache,
)

return self._read_gbq_table(
query=query,
index_col=index_col,
Expand Down
26 changes: 26 additions & 0 deletions tests/system/small/test_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,32 @@ def test_read_gbq_twice_with_same_timestamp(session, penguins_table_id):
assert df3 is not None


def test_read_gbq_wildcard(session: bigframes.Session):
    """read_gbq with a wildcard table path reads all matching tables as one DataFrame."""
    result = session.read_gbq("bigquery-public-data.noaa_gsod.gsod193*")
    # Shape of the union of all gsod193* tables in the public NOAA dataset.
    assert result.shape == (348485, 32)


def test_read_gbq_wildcard_with_filter(session: bigframes.Session):
    """read_gbq with a wildcard path plus _table_suffix filters limits which tables are read."""
    result = session.read_gbq(
        "bigquery-public-data.noaa_gsod.gsod19*",
        filters=[("_table_suffix", ">=", "30"), ("_table_suffix", "<=", "39")],
    )
    # Suffix range 30..39 selects the same tables as the gsod193* wildcard above.
    assert result.shape == (348485, 32)


def test_read_gbq_table_wildcard(session: bigframes.Session):
    """read_gbq_table with a wildcard table path reads all matching tables as one DataFrame."""
    result = session.read_gbq_table("bigquery-public-data.noaa_gsod.gsod193*")
    # Shape of the union of all gsod193* tables in the public NOAA dataset.
    assert result.shape == (348485, 32)


def test_read_gbq_table_wildcard_with_filter(session: bigframes.Session):
    """read_gbq_table with a wildcard path plus _table_suffix filters limits which tables are read."""
    result = session.read_gbq_table(
        "bigquery-public-data.noaa_gsod.gsod19*",
        filters=[("_table_suffix", ">=", "30"), ("_table_suffix", "<=", "39")],
    )
    # Suffix range 30..39 selects the same tables as the gsod193* wildcard above.
    assert result.shape == (348485, 32)


def test_read_gbq_model(session, penguins_linear_model_name):
    """read_gbq_model wraps an existing BQML linear model as a LinearRegression estimator."""
    loaded_model = session.read_gbq_model(penguins_linear_model_name)
    assert isinstance(loaded_model, bigframes.ml.linear_model.LinearRegression)
Expand Down
27 changes: 26 additions & 1 deletion tests/unit/session/test_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,5 +156,30 @@ def test_session_init_fails_with_no_project():
)
def test_read_gbq_with_filters(query_or_table, columns, filters, expected_output):
    """Compile `query_or_table` + columns + filters into SQL and compare to the expected text.

    Parametrized by the (truncated-from-view) decorator above; each case supplies
    the inputs and the exact SQL string `_to_query` should produce.
    """
    session = resources.create_bigquery_session()
    query = session._to_query(query_or_table, columns, filters)
    assert query == expected_output


@pytest.mark.parametrize(
    ("query_or_table", "columns", "filters", "expected_output"),
    [
        pytest.param(
            "test_table*",
            [],
            [],
            # _to_query wraps non-query table paths in backticks, so the
            # compiled SQL must quote the wildcard path as `test_table*`.
            "SELECT * FROM `test_table*` AS sub",
            id="wildcard_table_input",
        ),
        pytest.param(
            "test_table*",
            [],
            [("_TABLE_SUFFIX", ">", "2022-10-20")],
            "SELECT * FROM `test_table*` AS sub WHERE `_TABLE_SUFFIX` > '2022-10-20'",
            id="wildcard_table_input_with_filter",
        ),
    ],
)
def test_read_gbq_wildcard(query_or_table, columns, filters, expected_output):
    """Wildcard table paths (with or without filters) compile to the expected SQL."""
    session = resources.create_bigquery_session()
    query = session._to_query(query_or_table, columns, filters)
    assert query == expected_output
11 changes: 10 additions & 1 deletion third_party/bigframes_vendored/pandas/io/gbq.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@

from bigframes import constants

FilterType = Tuple[str, Literal["in", "not in", "<", "<=", "==", "!=", ">=", ">"], Any]
# Comparison operators accepted in a filter predicate.
FilterOps = Literal["in", "not in", "<", "<=", "==", "!=", ">=", ">"]
# A single filter predicate: (column_name, operator, value).
FilterType = Tuple[str, FilterOps, Any]
# Either a flat iterable of predicates (AND-ed together) or an iterable of
# predicate groups (groups OR-ed, predicates within a group AND-ed).
FiltersType = Iterable[Union[FilterType, Iterable[FilterType]]]


Expand Down Expand Up @@ -52,6 +53,9 @@ def read_gbq(

>>> df = bpd.read_gbq("bigquery-public-data.ml_datasets.penguins")

Read a table path with a wildcard suffix and filters:

>>> df = bpd.read_gbq("bigquery-public-data.noaa_gsod.gsod19*", filters=[("_table_suffix", ">=", "30"), ("_table_suffix", "<=", "39")])

Preserve ordering in a query input.

>>> df = bpd.read_gbq('''
Expand Down Expand Up @@ -96,6 +100,8 @@ def read_gbq(
A SQL string to be executed or a BigQuery table to be read. The
table must be specified in the format of
`project.dataset.tablename` or `dataset.tablename`.
Can also take a wildcard table name, such as `project.dataset.table_prefix*`.
In that case, all matching tables will be read as one DataFrame.
index_col (Iterable[str] or str):
Name of result column(s) to use for index in results DataFrame.
columns (Iterable[str]):
Expand All @@ -112,6 +118,9 @@ def read_gbq(
through an OR operation. A single Iterable of tuples can also
be used, meaning that no OR operation between set of filters
is to be conducted.
If using a wildcard table suffix in query_or_table, the
'_table_suffix' pseudo column can be specified to filter which
tables are read into the DataFrame.
use_cache (bool, default True):
Whether to cache the query inputs. Default to True.
col_order (Iterable[str]):
Expand Down