
fix: read_gbq supports extreme DATETIME values such as 0001-01-01 00:00:00 #444


Merged
merged 40 commits into from
Jan 5, 2022

Commits (40)
933d470
fix: read out-of-bounds DATETIME values such as `0001-01-01 00:00:00`
tswast Dec 6, 2021
9a9d3fd
feat: accepts a table ID, which downloads the table without a query
tswast Dec 6, 2021
2a76982
revert tests for read_gbq fix which isn't yet resolved
tswast Dec 6, 2021
4695c5f
Revert "revert tests for read_gbq fix which isn't yet resolved"
tswast Dec 6, 2021
6adf233
add todo for next steps
tswast Dec 6, 2021
73a791a
Merge remote-tracking branch 'upstream/main' into issue266-read_gbq-n…
tswast Dec 9, 2021
9b1eb0d
add unit test for table ID read_gbq
tswast Dec 9, 2021
ec9ddaf
add helper for is_query
tswast Dec 9, 2021
9cc7c74
implement read_gbq with table id
tswast Dec 10, 2021
dd51ad8
fix remaining tests, don't localize out-of-bounds timestamp columns
tswast Dec 10, 2021
e1ad679
Update pandas_gbq/gbq.py
tswast Dec 10, 2021
d29bc2a
fix 3.7 unit tests
tswast Dec 10, 2021
cb8f24f
correct coverage
tswast Dec 10, 2021
56b73b2
skip coverage for optional test skip
tswast Dec 10, 2021
8a61e97
fix docs build
tswast Dec 10, 2021
3f7900b
improve test coverage for error case
tswast Dec 10, 2021
ae3e044
Merge branch 'issue266-read_gbq-no-query' into issue365-extreme-datet…
tswast Dec 10, 2021
3c53f1f
as of google-cloud-bigquery 1.11.0, get_table before list_rows is unn…
tswast Dec 13, 2021
c98982d
Merge branch 'issue266-read_gbq-no-query' into issue365-extreme-datet…
tswast Dec 13, 2021
f0acde6
refactor tests
tswast Dec 13, 2021
362a26d
add more scalars
tswast Dec 14, 2021
752d67c
add more types
tswast Dec 14, 2021
5b46127
add failing time test
tswast Dec 15, 2021
254f6a0
add test for bignumeric
tswast Dec 15, 2021
c0780b6
add test for null values
tswast Dec 15, 2021
9aaedc6
add epoch timestamps to tests
tswast Dec 15, 2021
b03443b
add post-download dtype conversions
tswast Dec 16, 2021
cd6ae70
Merge remote-tracking branch 'upstream/main' into issue365-extreme-da…
tswast Dec 29, 2021
11126a6
add failing test for desired fix
tswast Dec 29, 2021
14e6070
fix the issue with extreme datetimes
tswast Dec 29, 2021
8f92d9b
fix constraints
tswast Dec 29, 2021
9985d15
fix tests for empty dataframe
tswast Dec 30, 2021
6fb73a2
fix tests for older google-cloud-bigquery
tswast Dec 30, 2021
8cc4524
ignore index on empty dataframe
tswast Dec 30, 2021
a0d6cad
add db-dtypes to runtime import checks
tswast Dec 30, 2021
dfa6942
Merge remote-tracking branch 'upstream/main' into issue365-extreme-da…
tswast Jan 4, 2022
82c5362
document dependencies
tswast Jan 4, 2022
de4a06e
remove TODO, since done
tswast Jan 4, 2022
9fc8c08
remove unnecessary special case for empty dataframe
tswast Jan 4, 2022
c5c0e85
remove redundant 'deprecated' from comment
tswast Jan 5, 2022
2 changes: 1 addition & 1 deletion noxfile.py
@@ -259,7 +259,7 @@ def cover(session):
test runs (not system test runs), and then erases coverage data.
"""
session.install("coverage", "pytest-cov")
session.run("coverage", "report", "--show-missing", "--fail-under=88")
session.run("coverage", "report", "--show-missing", "--fail-under=89")

session.run("coverage", "erase")

138 changes: 86 additions & 52 deletions pandas_gbq/gbq.py
@@ -3,6 +3,7 @@
# license that can be found in the LICENSE file.

import logging
import re
import time
import warnings
from datetime import datetime
@@ -64,6 +65,10 @@ def _test_google_api_imports():
raise ImportError("pandas-gbq requires google-cloud-bigquery") from ex


def _is_query(query_or_table: str) -> bool:
return re.search(r"\s", query_or_table.strip(), re.MULTILINE) is not None


class DatasetCreationError(ValueError):
"""
Raised when the create dataset method fails
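For context, a standalone sketch (not part of this diff) showing how the new _is_query heuristic above behaves: any whitespace in the trimmed string marks it as SQL, otherwise it is treated as a table ID.

import re

def is_query(query_or_table: str) -> bool:
    # Same heuristic as the helper above: whitespace anywhere => SQL query.
    return re.search(r"\s", query_or_table.strip(), re.MULTILINE) is not None

assert is_query("SELECT 1")                             # SQL statement
assert is_query("SELECT *\nFROM `proj.dataset.table`")  # multi-line SQL
assert not is_query("proj.dataset.table")               # bare table ID
assert not is_query("  dataset.table  ")                # surrounding whitespace is stripped first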
@@ -374,6 +379,28 @@ def process_http_error(ex):

raise GenericGBQException("Reason: {0}".format(ex))

def download_table(
self, table_id, max_results=None, progress_bar_type=None, dtypes=None
):
self._start_timer()

try:
# Get the table schema, so that we can list rows.
table_ref = bigquery.TableReference.from_string(
table_id, default_project=self.project_id
)
destination = self.client.get_table(table_ref)
rows_iter = self.client.list_rows(destination, max_results=max_results)
except self.http_error as ex:
self.process_http_error(ex)

return self._download_results(
rows_iter,
max_results=max_results,
progress_bar_type=progress_bar_type,
user_dtypes=dtypes,
)

def run_query(self, query, max_results=None, progress_bar_type=None, **kwargs):
from concurrent.futures import TimeoutError
from google.auth.exceptions import RefreshError
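For reference, a rough standalone sketch of the table-download flow used by download_table, written against google-cloud-bigquery directly (project and table names are placeholders; assumes credentials are available in the environment):

from google.cloud import bigquery

client = bigquery.Client(project="my-project")

# A partial ID such as "my_dataset.my_table" is expanded using default_project.
table_ref = bigquery.TableReference.from_string(
    "my_dataset.my_table", default_project=client.project
)
table = client.get_table(table_ref)  # fetch schema/metadata
rows = client.list_rows(table, max_results=1000)  # streams rows, no query job
df = rows.to_dataframe()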
@@ -390,15 +417,6 @@ def run_query(query, max_results=None, progress_bar_type=None, **kwargs):
if config is not None:
job_config.update(config)

if "query" in config and "query" in config["query"]:
if query is not None:
raise ValueError(
"Query statement can't be specified "
"inside config while it is specified "
"as parameter"
)
query = config["query"].pop("query")

self._start_timer()

try:
@@ -464,15 +482,25 @@ def run_query(query, max_results=None, progress_bar_type=None, **kwargs):
)

dtypes = kwargs.get("dtypes")

# Ensure destination is populated.
try:
query_reply.result()
except self.http_error as ex:
self.process_http_error(ex)

# Get the table schema, so that we can list rows.
destination = self.client.get_table(query_reply.destination)
rows_iter = self.client.list_rows(destination, max_results=max_results)
return self._download_results(
query_reply,
rows_iter,
max_results=max_results,
progress_bar_type=progress_bar_type,
user_dtypes=dtypes,
)

def _download_results(
self, query_job, max_results=None, progress_bar_type=None, user_dtypes=None,
self, rows_iter, max_results=None, progress_bar_type=None, user_dtypes=None,
):
# No results are desired, so don't bother downloading anything.
if max_results == 0:
@@ -504,11 +532,6 @@ def _download_results(
to_dataframe_kwargs["create_bqstorage_client"] = create_bqstorage_client

try:
query_job.result()
# Get the table schema, so that we can list rows.
destination = self.client.get_table(query_job.destination)
rows_iter = self.client.list_rows(destination, max_results=max_results)

schema_fields = [field.to_api_repr() for field in rows_iter.schema]
conversion_dtypes = _bqschema_to_nullsafe_dtypes(schema_fields)
conversion_dtypes.update(user_dtypes)
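The dtype handling above is central to the fix. As an illustration only (this is not the library's actual mapping), a "null-safe" conversion map might look like the following, with user-supplied dtypes taking precedence and datetime-like columns deliberately left for post-download conversion so extreme values are not forced into datetime64[ns]:

def nullsafe_dtypes_sketch(schema_fields, user_dtypes=None):
    # Pick pandas dtypes that can represent NULL; anything not listed is left
    # for to_dataframe() / db-dtypes to decide.
    overrides = {
        "INTEGER": "Int64",    # nullable integer extension dtype
        "FLOAT": "float64",    # NaN already encodes NULL
        "BOOLEAN": "boolean",  # nullable boolean extension dtype
    }
    dtypes = {}
    for field in schema_fields:
        if field.get("mode", "").upper() == "REPEATED":
            continue  # repeated columns stay as Python lists/objects
        dtype = overrides.get(field["type"].upper())
        if dtype is not None:
            dtypes[field["name"]] = dtype
    dtypes.update(user_dtypes or {})
    return dtypes

print(nullsafe_dtypes_sketch([{"name": "n", "type": "INTEGER"}, {"name": "ts", "type": "DATETIME"}]))
# {'n': 'Int64'}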
@@ -644,7 +667,7 @@ def _cast_empty_df_dtypes(schema_fields, df):


def read_gbq(
query,
query_or_table,
project_id=None,
index_col=None,
col_order=None,
@@ -668,17 +691,18 @@

This method uses the Google Cloud client library to make requests to
Google BigQuery, documented `here
<https://google-cloud-python.readthedocs.io/en/latest/bigquery/usage.html>`__.
<https://googleapis.dev/python/bigquery/latest/index.html>`__.

See the :ref:`How to authenticate with Google BigQuery <authentication>`
guide for authentication instructions.

Parameters
----------
query : str
SQL-Like Query to return data values.
query_or_table : str
SQL query to return data values. If the string is a table ID, fetch the
rows directly from the table without running a query.
project_id : str, optional
Google BigQuery Account project ID. Optional when available from
Google Cloud Platform project ID. Optional when available from
the environment.
index_col : str, optional
Name of result column to use for index in results DataFrame.
@@ -688,14 +712,14 @@ def read_gbq(
reauth : boolean, default False
Force Google BigQuery to re-authenticate the user. This is useful
if multiple accounts are used.
auth_local_webserver : boolean, default False
Use the `local webserver flow`_ instead of the `console flow`_
when getting user credentials.

.. _local webserver flow:
http://google-auth-oauthlib.readthedocs.io/en/latest/reference/google_auth_oauthlib.flow.html#google_auth_oauthlib.flow.InstalledAppFlow.run_local_server
.. _console flow:
http://google-auth-oauthlib.readthedocs.io/en/latest/reference/google_auth_oauthlib.flow.html#google_auth_oauthlib.flow.InstalledAppFlow.run_console
auth_local_webserver : bool, default False
Use the `local webserver flow
<https://googleapis.dev/python/google-auth-oauthlib/latest/reference/google_auth_oauthlib.flow.html#google_auth_oauthlib.flow.InstalledAppFlow.run_local_server>`_
instead of the `console flow
<https://googleapis.dev/python/google-auth-oauthlib/latest/reference/google_auth_oauthlib.flow.html#google_auth_oauthlib.flow.InstalledAppFlow.run_console>`_
when getting user credentials. Your code must run on the same machine
as your web browser and your web browser can access your application
via ``localhost:808X``.

.. versionadded:: 0.2.0
dialect : str, default 'standard'
@@ -745,13 +769,6 @@
<https://cloud.google.com/bigquery/docs/access-control#roles>`__
permission on the project you are billing queries to.

**Note:** Due to a `known issue in the ``google-cloud-bigquery``
package
<https://github.com/googleapis/google-cloud-python/pull/7633>`__
(fixed in version 1.11.0), you must write your query results to a
destination table. To do this with ``read_gbq``, supply a
``configuration`` dictionary.

This feature requires the ``google-cloud-bigquery-storage`` and
``pyarrow`` packages.

@@ -823,6 +840,15 @@ def read_gbq(
if dialect not in ("legacy", "standard"):
raise ValueError("'{0}' is not valid for dialect".format(dialect))

if configuration and "query" in configuration and "query" in configuration["query"]:
if query_or_table is not None:
raise ValueError(
"Query statement can't be specified "
"inside config while it is specified "
"as parameter"
)
query_or_table = configuration["query"].pop("query")

connector = GbqConnector(
project_id,
reauth=reauth,
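A hedged usage sketch of the configuration-supplied query path handled by the new check above (the project ID is a placeholder):

import pandas_gbq

config = {"query": {"query": "SELECT 1 AS x", "useQueryCache": False}}

# The SQL comes from configuration["query"]["query"]; the first argument must
# then be None, otherwise read_gbq raises ValueError.
df = pandas_gbq.read_gbq(None, project_id="my-project", configuration=config)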
@@ -834,13 +860,21 @@
use_bqstorage_api=use_bqstorage_api,
)

final_df = connector.run_query(
query,
configuration=configuration,
max_results=max_results,
progress_bar_type=progress_bar_type,
dtypes=dtypes,
)
if _is_query(query_or_table):
final_df = connector.run_query(
query_or_table,
configuration=configuration,
max_results=max_results,
progress_bar_type=progress_bar_type,
dtypes=dtypes,
)
else:
final_df = connector.download_table(
query_or_table,
max_results=max_results,
progress_bar_type=progress_bar_type,
dtypes=dtypes,
)

# Reindex the DataFrame on the provided column
if index_col is not None:
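The branch above gives read_gbq two call styles; a sketch of both follows (the project ID is a placeholder, the public dataset is real):

import pandas_gbq

# Contains whitespace -> treated as SQL and run as a query job.
df_query = pandas_gbq.read_gbq(
    "SELECT name FROM `bigquery-public-data.usa_names.usa_1910_2013` LIMIT 10",
    project_id="my-project",
)

# No whitespace -> treated as a table ID and downloaded with list_rows,
# skipping the query job entirely.
df_table = pandas_gbq.read_gbq(
    "bigquery-public-data.usa_names.usa_1910_2013",
    project_id="my-project",
    max_results=10,
)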
Expand Down Expand Up @@ -889,7 +923,7 @@ def to_gbq(

This method uses the Google Cloud client library to make requests to
Google BigQuery, documented `here
<https://google-cloud-python.readthedocs.io/en/latest/bigquery/usage.html>`__.
<https://googleapis.dev/python/bigquery/latest/index.html>`__.

See the :ref:`How to authenticate with Google BigQuery <authentication>`
guide for authentication instructions.
@@ -902,7 +936,7 @@
Name of table to be written, in the form ``dataset.tablename`` or
``project.dataset.tablename``.
project_id : str, optional
Google BigQuery Account project ID. Optional when available from
Google Cloud Platform project ID. Optional when available from
the environment.
chunksize : int, optional
Number of rows to be inserted in each chunk from the dataframe.
@@ -920,13 +954,13 @@ def to_gbq(
``'append'``
If table exists, insert data. Create if does not exist.
auth_local_webserver : bool, default False
Use the `local webserver flow`_ instead of the `console flow`_
when getting user credentials.

.. _local webserver flow:
http://google-auth-oauthlib.readthedocs.io/en/latest/reference/google_auth_oauthlib.flow.html#google_auth_oauthlib.flow.InstalledAppFlow.run_local_server
.. _console flow:
http://google-auth-oauthlib.readthedocs.io/en/latest/reference/google_auth_oauthlib.flow.html#google_auth_oauthlib.flow.InstalledAppFlow.run_console
Use the `local webserver flow
<https://googleapis.dev/python/google-auth-oauthlib/latest/reference/google_auth_oauthlib.flow.html#google_auth_oauthlib.flow.InstalledAppFlow.run_local_server>`_
instead of the `console flow
<https://googleapis.dev/python/google-auth-oauthlib/latest/reference/google_auth_oauthlib.flow.html#google_auth_oauthlib.flow.InstalledAppFlow.run_console>`_
when getting user credentials. Your code must run on the same machine
as your web browser and your web browser can access your application
via ``localhost:808X``.

.. versionadded:: 0.2.0
table_schema : list of dicts, optional
4 changes: 0 additions & 4 deletions pandas_gbq/load.py
@@ -90,10 +90,6 @@ def cast_dataframe_for_parquet(
# Use extension dtype first so that it uses the correct equality operator.
and db_dtypes.DateDtype() != dataframe[column_name].dtype
):
# Construct converted column manually, because I can't use
# .astype() with DateDtype. With .astype(), I get the error:
#
# TypeError: Cannot interpret '<db_dtypes.DateDtype ...>' as a data type
cast_column = dataframe[column_name].astype(
dtype=db_dtypes.DateDtype(),
# Return the original column if there was an error converting
8 changes: 7 additions & 1 deletion pandas_gbq/timestamp.py
@@ -7,6 +7,8 @@
Private module.
"""

import pandas.api.types


def localize_df(df, schema_fields):
"""Localize any TIMESTAMP columns to tz-aware type.
@@ -38,7 +40,11 @@ def localize_df(df, schema_fields):
if "mode" in field and field["mode"].upper() == "REPEATED":
continue

if field["type"].upper() == "TIMESTAMP" and df[column].dt.tz is None:
if (
field["type"].upper() == "TIMESTAMP"
and pandas.api.types.is_datetime64_ns_dtype(df.dtypes[column])
and df[column].dt.tz is None
):
df[column] = df[column].dt.tz_localize("UTC")

return df
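A standalone illustration of why the new dtype guard matters: the .dt accessor only exists for datetime-like columns, so a TIMESTAMP column that came back as object dtype (because it holds out-of-range values such as 0001-01-01) must be skipped rather than localized.

import datetime

import pandas
import pandas.api.types

in_range = pandas.Series(pandas.to_datetime(["2022-01-05 13:00:00"]))
out_of_range = pandas.Series([datetime.datetime(1, 1, 1)], dtype="object")

print(pandas.api.types.is_datetime64_ns_dtype(in_range))      # True -> safe to tz_localize
print(pandas.api.types.is_datetime64_ns_dtype(out_of_range))  # False -> leave untouched

in_range.dt.tz_localize("UTC")  # works for tz-naive datetime64[ns]
# out_of_range.dt.tz_localize("UTC") would raise: "Can only use .dt accessor
# with datetimelike values"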
2 changes: 1 addition & 1 deletion setup.py
@@ -32,7 +32,7 @@
"google-auth-oauthlib",
# 2.4.* has a bug where waiting for the query can hang indefinitely.
# https://github.com/pydata/pandas-gbq/issues/343
"google-cloud-bigquery >=1.26.1,<3.0.0dev,!=2.4.*",
"google-cloud-bigquery >=1.26.1,<4.0.0dev,!=2.4.*",
"google-cloud-bigquery-storage >=1.1.0,<3.0.0dev",
]
extras = {
19 changes: 19 additions & 0 deletions tests/system/conftest.py
@@ -3,6 +3,7 @@
# license that can be found in the LICENSE file.

import os
import functools
import pathlib

from google.cloud import bigquery
@@ -56,6 +57,24 @@ def project(project_id):
return project_id


@pytest.fixture
def to_gbq(credentials, project_id):
import pandas_gbq

return functools.partial(
pandas_gbq.to_gbq, project_id=project_id, credentials=credentials
)


@pytest.fixture
def read_gbq(credentials, project_id):
import pandas_gbq

return functools.partial(
pandas_gbq.read_gbq, project_id=project_id, credentials=credentials
)


@pytest.fixture()
def random_dataset_id(bigquery_client: bigquery.Client, project_id: str):
dataset_id = prefixer.create_prefix()
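A hypothetical system test showing how the new partially-bound fixtures would be used (table name and assertion are illustrative only):

import pandas

def test_roundtrip_by_table_id(read_gbq, to_gbq, random_dataset_id):
    # project_id and credentials are already bound by the fixtures above.
    table_id = f"{random_dataset_id}.roundtrip"  # assumes the fixture yields a dataset ID string
    to_gbq(pandas.DataFrame({"x": [1, 2, 3]}), table_id)
    result = read_gbq(table_id)  # bare table ID, so no query job is run
    assert len(result.index) == 3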