Skip to content

Commit ddb0f77

Browse files
authored
fix: exception propagation for asynchronous QueryApi (influxdata#512)
1 parent 42432f9 commit ddb0f77

File tree

3 files changed

+31
-17
lines changed

3 files changed

+31
-17
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,9 @@
33
### Features
44
1. [#510](https://github.com/influxdata/influxdb-client-python/pull/510): Allow to use client's optional configs for initialization from file or environment properties
55

6+
### Bug Fixes
7+
1. [#512](https://github.com/influxdata/influxdb-client-python/pull/512): Exception propagation for asynchronous `QueryApi` [async/await]
8+
69
## 1.33.0 [2022-09-29]
710

811
### Features

influxdb_client/client/query_api_async.py

Lines changed: 19 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,8 @@
88
from influxdb_client.client._base import _BaseQueryApi
99
from influxdb_client.client.flux_table import FluxRecord, TableList
1010
from influxdb_client.client.query_api import QueryOptions
11-
from influxdb_client.rest import _UTF_8_encoding
11+
from influxdb_client.rest import _UTF_8_encoding, ApiException
12+
from .._async.rest import RESTResponseAsync
1213

1314

1415
class QueryApiAsync(_BaseQueryApi):
@@ -98,10 +99,7 @@ async def query(self, query: str, org=None, params: dict = None) -> TableList:
9899
""" # noqa: E501
99100
org = self._org_param(org)
100101

101-
response = await self._query_api.post_query_async(org=org,
102-
query=self._create_query(query, self.default_dialect, params),
103-
async_req=False, _preload_content=False,
104-
_return_http_data_only=True)
102+
response = await self._post_query(org=org, query=self._create_query(query, self.default_dialect, params))
105103

106104
return await self._to_tables_async(response, query_options=self._get_query_options())
107105

@@ -118,10 +116,7 @@ async def query_stream(self, query: str, org=None, params: dict = None) -> Async
118116
""" # noqa: E501
119117
org = self._org_param(org)
120118

121-
response = await self._query_api.post_query_async(org=org,
122-
query=self._create_query(query, self.default_dialect, params),
123-
async_req=False, _preload_content=False,
124-
_return_http_data_only=True)
119+
response = await self._post_query(org=org, query=self._create_query(query, self.default_dialect, params))
125120

126121
return await self._to_flux_record_stream_async(response, query_options=self._get_query_options())
127122

@@ -193,11 +188,8 @@ async def query_data_frame_stream(self, query: str, org=None, data_frame_index:
193188
""" # noqa: E501
194189
org = self._org_param(org)
195190

196-
response = await self._query_api.post_query_async(org=org,
197-
query=self._create_query(query, self.default_dialect, params,
198-
dataframe_query=True),
199-
async_req=False, _preload_content=False,
200-
_return_http_data_only=True)
191+
response = await self._post_query(org=org, query=self._create_query(query, self.default_dialect, params,
192+
dataframe_query=True))
201193

202194
return await self._to_data_frame_stream_async(data_frame_index=data_frame_index, response=response,
203195
query_options=self._get_query_options())
@@ -215,8 +207,18 @@ async def query_raw(self, query: str, org=None, dialect=_BaseQueryApi.default_di
215207
:return: :class:`~str`
216208
"""
217209
org = self._org_param(org)
218-
result = await self._query_api.post_query_async(org=org, query=self._create_query(query, dialect, params),
219-
async_req=False, _preload_content=False,
220-
_return_http_data_only=True)
210+
result = await self._post_query(org=org, query=self._create_query(query, dialect, params))
221211
raw_bytes = await result.read()
222212
return raw_bytes.decode(_UTF_8_encoding)
213+
214+
async def _post_query(self, org, query):
215+
response = await self._query_api.post_query_async(org=org,
216+
query=query,
217+
async_req=False,
218+
_preload_content=False,
219+
_return_http_data_only=True)
220+
if not 200 <= response.status <= 299:
221+
data = await response.read()
222+
raise ApiException(http_resp=RESTResponseAsync(response, data))
223+
224+
return response

tests/test_InfluxDBClientAsync.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -374,6 +374,15 @@ async def test_parse_csv_with_new_lines_in_column(self, mocked):
374374

375375
self.assertEqual(4, len(records))
376376

377+
@async_test
378+
async def test_query_exception_propagation(self):
379+
await self.client.close()
380+
self.client = InfluxDBClientAsync(url="http://localhost:8086", token="wrong", org="my-org")
381+
382+
with pytest.raises(InfluxDBError) as e:
383+
await self.client.query_api().query("buckets()", "my-org")
384+
self.assertEqual("unauthorized access", e.value.message)
385+
377386
async def _prepare_data(self, measurement: str):
378387
_point1 = Point(measurement).tag("location", "Prague").field("temperature", 25.3)
379388
_point2 = Point(measurement).tag("location", "New York").field("temperature", 24.3)

0 commit comments

Comments
 (0)