Skip to content
Permalink

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also . Learn more about diff comparisons here.
base repository: influxdata/influxdb-client-python
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: master
Choose a base ref
...
head repository: influxdata/influxdb-client-python
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: iox-sql
Choose a head ref
  • 6 commits
  • 8 files changed
  • 2 contributors

Commits on Jan 13, 2023

  1. feat: adds the ability to query with SQL

    This creates a new SQL Query API that allows for the use of SQL queries
    along with InfluxDB that has SQL enabled. Without further testing and
    usage it should be expected that this could break easily.
    
    The current model will take the user's URL so they do not need to
    provide two different URLs and create two different clients and will
    parse out the hostname. This is for use with the gRPC client.
    
    There are no tests as this largely wraps the FlightSQLClient library.
    powersj committed Jan 13, 2023
    Copy the full SHA
    52bbaf8 View commit details
  2. Major refactor post-review

    * Rename to "sql_client" remove API from class/examples
    * Constructor now sets up gRPC client for re-use
    * Constructor now takes a bucket, one client per bucket
    * Add close function
    * Add schemas fucntion
    * Add tables function
    powersj committed Jan 13, 2023
    Copy the full SHA
    600c3ed View commit details
  3. fix pydoc linter

    powersj committed Jan 13, 2023
    Copy the full SHA
    3db88ee View commit details
  4. fix twine/rst linter

    powersj committed Jan 13, 2023
    Copy the full SHA
    99d55f2 View commit details

Commits on Jan 17, 2023

  1. Copy the full SHA
    0f69180 View commit details

Commits on Mar 16, 2023

  1. Copy the full SHA
    3915a47 View commit details
Showing with 231 additions and 7 deletions.
  1. +34 −2 README.rst
  2. +5 −4 examples/README.md
  3. +33 −0 examples/sql_client.py
  4. +1 −0 influxdb_client/__init__.py
  5. +6 −0 influxdb_client/client/_base.py
  6. +9 −0 influxdb_client/client/influxdb_client.py
  7. +131 −0 influxdb_client/client/sql_client.py
  8. +12 −1 setup.py
36 changes: 34 additions & 2 deletions README.rst
Original file line number Diff line number Diff line change
@@ -84,6 +84,7 @@ InfluxDB 2.0 client features
- `Nanosecond precision`_
- `Delete data`_
- `Handling Errors`_
- `SQL client support`_
- `Logging`_

Installation
@@ -113,15 +114,23 @@ The python package is hosted on `PyPI <https://pypi.org/project/influxdb-client/

.. code-block:: sh
pip install 'influxdb-client[ciso]'
pip install 'influxdb-client'
Then import the package:

.. code-block:: python
import influxdb_client
If your application uses async/await in Python you can install with the ``async`` extra::
There are additional package extras that will pull in additional dependencies:

* `async`: async/await support
* `ciso`: faster date and time parsing
* `extra`: Pandas and NumPy support
* `sql`: SQL client support

For example if your application uses async/await in Python you can install the
``async`` extra::

$ pip install influxdb-client[async]

@@ -1579,6 +1588,29 @@ Client automatically follows HTTP redirects. The default redirect policy is to f

.. marker-asyncio-end
SQL Client Support
^^^^^^^^^^^^^^^^^^
.. marker-sql-support-start
The ability to query InfluxDB with SQL was introduced with the IOX backend.
To make use of the SQL support users can create a SQL Client with this library:

.. code-block:: python
from influxdb_client import InfluxDBClient
with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org", debug=False) as client:
sql_client = client.sql_client("my-bucket")
reader = sql_client.query("select * from cpu limit 10")
print(reader.read_all())
.. warning::

The ``SQLClient`` only works with InfluxDB that has SQL support enabled.
This does not apply to all InfluxDB versions.

.. marker-sql-support-end
Logging
^^^^^^^
.. marker-logging-start
9 changes: 5 additions & 4 deletions examples/README.md
Original file line number Diff line number Diff line change
@@ -10,9 +10,9 @@
- [write_api_callbacks.py](write_api_callbacks.py) - How to handle batch events
- [write_structured_data.py](write_structured_data.py) - How to write structured data - [NamedTuple](https://docs.python.org/3/library/collections.html#collections.namedtuple), [Data Classes](https://docs.python.org/3/library/dataclasses.html) - (_requires Python v3.8+_)
- [logging_handler.py](logging_handler.py) - How to set up a python native logging handler that writes to InfluxDB
- [import_parquet.py](import_parquet.py) - How to import [Apache Parquet](https://parquet.apache.org/) data files,
the example requires:
- manually download [NYC TLC Trip Record Data](https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page)
- [import_parquet.py](import_parquet.py) - How to import [Apache Parquet](https://parquet.apache.org/) data files,
the example requires:
- manually download [NYC TLC Trip Record Data](https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page)
- install Apache Arrow `pip install pyarrow` dependency
- [write_batching_by_bytes_count.py](write_batching_by_bytes_count.py) - How to use RxPY to prepare batches by maximum bytes count.

@@ -35,6 +35,7 @@
- [influx_cloud.py](influx_cloud.py) - How to connect to InfluxDB 2 Cloud
- [invokable_scripts.py](invokable_scripts.py) - How to use Invokable scripts Cloud API to create custom endpoints that query data
- [bucket_schemas.py](bucket_schemas.py) - How to manage explicit bucket schemas to enforce column names, tags, fields, and data types for your data
- [query_sql.py](query_sql.py) - How to query buckets with SQL

## Others
- [influxdb_18_example.py](influxdb_18_example.py) - How to connect to InfluxDB 1.8
@@ -46,4 +47,4 @@
- [asynchronous_management.py](asynchronous_management.py) - How to use asynchronous Management API
- [asynchronous_batching.py](asynchronous_batching.py) - How to use [RxPY](https://rxpy.readthedocs.io/en/latest/) to prepare batches
- [asynchronous_retry.py](asynchronous_retry.py) - How to use [aiohttp-retry](https://github.com/inyutin/aiohttp_retry) to configure retries

33 changes: 33 additions & 0 deletions examples/sql_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
#!/usr/bin/env python3
"""Demonstrate how to use the SQL client with InfluxDB."""
from influxdb_client import InfluxDBClient


with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as client:
# Each connection is specific to a bucket
sql_client = client.sql_client("my-bucket")

# To help understand the shape and makeup of a table users can use these
# two helper functions.
tables = sql_client.tables()
print(tables)
schemas = sql_client.schemas()
print(schemas)

# The returned result is a stream of data. For large result-sets users can
# iterate through those one-by-one to avoid using large chunks of memory.
with sql_client.query("select * from cpu") as reader:
for batch in reader:
print(batch)

# For smaller results you might want to read the results at once. You
# can do so by using the `read_all()` method.
with sql_client.query("select * from cpu limit 10") as result:
data = result.read_all()
print(data)

# To get you data into a Pandas DataFrame use the following helper function
df = data.to_pandas()

# Close the connection to this bucket.
sql_client.close()
1 change: 1 addition & 0 deletions influxdb_client/__init__.py
Original file line number Diff line number Diff line change
@@ -384,6 +384,7 @@
from influxdb_client.client.labels_api import LabelsApi
from influxdb_client.client.organizations_api import OrganizationsApi
from influxdb_client.client.query_api import QueryApi
from influxdb_client.client.sql_client import SQLClient
from influxdb_client.client.tasks_api import TasksApi
from influxdb_client.client.users_api import UsersApi
from influxdb_client.client.write_api import WriteApi, WriteOptions
6 changes: 6 additions & 0 deletions influxdb_client/client/_base.py
Original file line number Diff line number Diff line change
@@ -502,6 +502,12 @@ def _prepare_predicate_request(self, start, stop, predicate):
return predicate_request


# noinspection PyMethodMayBeStatic
class _BaseSQLClient(object):
def __init__(self, influxdb_client):
self._influxdb_client = influxdb_client


class _Configuration(Configuration):
def __init__(self):
Configuration.__init__(self)
9 changes: 9 additions & 0 deletions influxdb_client/client/influxdb_client.py
Original file line number Diff line number Diff line change
@@ -14,6 +14,7 @@
from influxdb_client.client.labels_api import LabelsApi
from influxdb_client.client.organizations_api import OrganizationsApi
from influxdb_client.client.query_api import QueryApi, QueryOptions
from influxdb_client.client.sql_client import SQLClient
from influxdb_client.client.tasks_api import TasksApi
from influxdb_client.client.users_api import UsersApi
from influxdb_client.client.write_api import WriteApi, WriteOptions, PointSettings
@@ -301,6 +302,14 @@ def query_api(self, query_options: QueryOptions = QueryOptions()) -> QueryApi:
"""
return QueryApi(self, query_options)

def sql_client(self, bucket: str) -> SQLClient:
"""
Create an SQL client instance.
:return: SQL client instance
"""
return SQLClient(self, bucket)

def invokable_scripts_api(self) -> InvokableScriptsApi:
"""
Create an InvokableScripts API instance.
131 changes: 131 additions & 0 deletions influxdb_client/client/sql_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
"""InfluxDB SQL Client."""
from urllib.parse import urlparse

from influxdb_client.client._base import _BaseSQLClient


class SQLClient(_BaseSQLClient):
"""
Implementation for gRPC+TLS client for SQL.
This class provides basic operations for interacting with InfluxDB via SQL.
"""

def __init__(self, influxdb_client, bucket, **kwargs):
"""
Initialize SQL client.
Unlike the previous APIs, this client is is produced for a specific
bucket to query against. Queries to different buckets require different
clients.
To complete SQL requests, a different client is used. The rest of this
client library utilizes REST requests against the published API.
However, for SQL support connections are handled over gRPC+TLS. As such
this client takes the host and client and creates a new client
connection for SQL operations.
:param influxdb_client: influxdb client
"""
super().__init__(influxdb_client=influxdb_client)

from flightsql import FlightSQLClient

namespace = f'{influxdb_client.org}_{bucket}'
url = urlparse(self._influxdb_client.url)
port = url.port if url.port else 443
self._client = FlightSQLClient(
host=url.hostname,
port=port,
metadata={
"bucket-name": bucket, # for cloud
"iox-namespace-name": namespace, # for local instance
},
**kwargs
)

def __enter__(self):
"""
Enter the runtime context related to this object.
It will bind this method’s return value to the target(s)
specified in the `as` clause of the statement.
return: self instance
"""
return self

def __exit__(self, exc_type, exc_val, exc_tb):
"""Exit the runtime context related to this object and close the SQLClient."""
self.close()

def close(self):
"""Close the client connection."""
self._client.close()

def query(self, query: str):
"""
Execute synchronous SQL query and return result as an Arrow reader.
:param str, query: the SQL query to execute
:return: PyArrow RecordbatchReader
.. code-block:: python
with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as client:
# Each connection is specific to a bucket
sql_client = client.sql_client("my-bucket")
# The returned result is a stream of data. For large result-sets users can
# iterate through those one-by-one to avoid using large chunks of memory.
with sql_client.query("select * from cpu") as result:
for r in result:
print(r)
# For smaller results you might want to read the results at once. You
# can do so by using the `read_all()` method.
with sql_client.query("select * from cpu limit 10") as result:
data = result.read_all()
print(data)
# To get you data into a Pandas DataFrame use the following helper function
df = data.to_pandas()
""" # noqa: E501
return self._get_ticket_info(self._client.execute(query))

def schemas(self):
"""
Return the schema of the specified bucket.
:return: PyArrow Table
.. code-block:: python
with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as client:
sql_client = client.sql_client("my-bucket")
print(sql_client.schemas())
""" # noqa: E501
return self._get_ticket_info(self._client.get_db_schemas()).read_all()

def tables(self):
"""
Return tables available from the specified bucket.
:return: PyArrow Table
.. code-block:: python
with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as client:
sql_client = client.sql_client("my-bucket")
print(sql_client.tables())
""" # noqa: E501
return self._get_ticket_info(self._client.get_table_types()).read_all()

def _get_ticket_info(self, flightInfo):
"""Collect results from FlightInfo."""
if len(flightInfo.endpoints) == 0:
raise ValueError("no endpoints received")
return self._client.do_get(flightInfo.endpoints[0].ticket).to_reader()
13 changes: 12 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
@@ -43,6 +43,11 @@
'aiocsv>=1.2.2'
]

sql_requires = [
'flightsql-dbapi@git+https://github.com/influxdata/flightsql-dbapi.git@fbc9fc1618528cd442a7e22ea11663856b0ecd5d',
'pandas>=0.25.3',
]

with open('README.rst', 'r') as f:
# Remove `class` text role as it's not allowed on PyPI
lines = []
@@ -66,7 +71,13 @@
keywords=["InfluxDB", "InfluxDB Python Client"],
tests_require=test_requires,
install_requires=requires,
extras_require={'extra': extra_requires, 'ciso': ciso_requires, 'async': async_requires, 'test': test_requires},
extras_require={
'async': async_requires,
'ciso': ciso_requires,
'extra': extra_requires,
'sql': sql_requires,
'test': test_requires,
},
long_description_content_type="text/x-rst",
packages=find_packages(exclude=('tests*',)),
test_suite='tests',