Skip to content

Commit 7731f2e

Browse files
Add binding result streaming to client (terminusdb#430)
* Add binding result streaming to client * Remove spaces around assignment * Satisfy the linting gods * Fix streaming field * Rename? Or copy... * Put back original test
1 parent f738c46 commit 7731f2e

File tree

4 files changed

+59
-17
lines changed

4 files changed

+59
-17
lines changed

terminusdb_client/client/Client.py

+38-1
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,36 @@
3131
# summary Python module for accessing the Terminus DB API
3232

3333

34+
class WoqlResult:
35+
"""Iterator for streaming WOQL results."""
36+
def __init__(self, lines):
37+
preface = json.loads(next(lines))
38+
if not ('@type' in preface and preface['@type'] == 'PrefaceRecord'):
39+
raise DatabaseError(response=preface)
40+
self.preface = preface
41+
self.postscript = {}
42+
self.lines = lines
43+
44+
def _check_error(self, document):
45+
if ('@type' in document):
46+
if document['@type'] == 'Binding':
47+
return document
48+
if document['@type'] == 'PostscriptRecord':
49+
self.postscript = document
50+
raise StopIteration()
51+
52+
raise DatabaseError(response=document)
53+
54+
def variable_names(self):
55+
return self.preface['names']
56+
57+
def __iter__(self):
58+
return self
59+
60+
def __next__(self):
61+
return self._check_error(json.loads(next(self.lines)))
62+
63+
3464
class JWTAuth(requests.auth.AuthBase):
3565
"""Class for JWT Authentication in requests"""
3666

@@ -1500,8 +1530,9 @@ def query(
15001530
commit_msg: Optional[str] = None,
15011531
get_data_version: bool = False,
15021532
last_data_version: Optional[str] = None,
1533+
streaming: bool = False,
15031534
# file_dict: Optional[dict] = None,
1504-
) -> Union[dict, str]:
1535+
) -> Union[dict, str, WoqlResult]:
15051536
"""Updates the contents of the specified graph with the triples encoded in turtle format Replaces the entire graph contents
15061537
15071538
Parameters
@@ -1537,6 +1568,7 @@ def query(
15371568
else:
15381569
request_woql_query = woql_query
15391570
query_obj["query"] = request_woql_query
1571+
query_obj["streaming"] = streaming
15401572

15411573
headers = self._default_headers.copy()
15421574
if last_data_version is not None:
@@ -1547,7 +1579,12 @@ def query(
15471579
headers=headers,
15481580
json=query_obj,
15491581
auth=self._auth(),
1582+
stream=streaming
15501583
)
1584+
1585+
if streaming:
1586+
return WoqlResult(lines=_finish_response(result, streaming=True))
1587+
15511588
if get_data_version:
15521589
result, version = _finish_response(result, get_data_version)
15531590
result = json.loads(result)

terminusdb_client/tests/test_Client.py

+1
Original file line numberDiff line numberDiff line change
@@ -234,6 +234,7 @@ def test_query(mocked_requests, mocked_requests2, mocked_requests3):
234234
"query": WoqlStar,
235235
},
236236
headers={"user-agent": f"terminusdb-client-python/{__version__}"},
237+
stream=False
237238
)
238239

239240

terminusdb_client/woql_utils.py

+6-3
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ def _args_as_payload(args: dict) -> dict:
2424
return {k: v for k, v in args.items() if v}
2525

2626

27-
def _finish_response(request_response, get_version=False):
27+
def _finish_response(request_response, get_version=False, streaming=False):
2828
"""Get the response text
2929
3030
Parameters
@@ -43,11 +43,14 @@ def _finish_response(request_response, get_version=False):
4343
4444
"""
4545
if request_response.status_code == 200:
46-
if get_version:
46+
if get_version and not streaming:
4747
return request_response.text, request_response.headers.get(
4848
"Terminusdb-Data-Version"
4949
)
50-
return request_response.text # if not a json it raises an error
50+
if streaming:
51+
return request_response.iter_lines()
52+
else:
53+
return request_response.text # if not a json it raises an error
5154
elif request_response.status_code > 399 and request_response.status_code < 599:
5255
raise DatabaseError(request_response)
5356

terminusdb_client/woqlquery/woql_query.py

+14-13
Original file line numberDiff line numberDiff line change
@@ -367,21 +367,22 @@ def _clean_subject(self, obj):
367367
return self._expand_node_variable(obj)
368368
raise ValueError("Subject must be a URI string")
369369

370-
def _clean_predicate(self, predicate):
370+
def _clean_predicate(self, obj):
371371
"""Transforms whatever is passed in as the predicate (id or variable) into the appropriate json-ld form"""
372372
pred = False
373-
if isinstance(predicate, dict):
374-
return predicate
375-
if not isinstance(predicate, str):
376-
raise ValueError("Predicate must be a URI string")
377-
return str(predicate)
378-
if ":" in predicate:
379-
pred = predicate
380-
elif self._vocab and (predicate in self._vocab):
381-
pred = self._vocab[predicate]
382-
else:
383-
pred = predicate
384-
return self._expand_node_variable(pred)
373+
if type(obj) is dict:
374+
return obj
375+
elif type(obj) is str:
376+
if ":" in obj:
377+
pred = obj
378+
elif self._vocab and (obj in self._vocab):
379+
pred = self._vocab[obj]
380+
else:
381+
pred = obj
382+
return self._expand_node_variable(pred)
383+
elif isinstance(obj, Var):
384+
return self._expand_node_variable(obj)
385+
raise ValueError("Predicate must be a URI string")
385386

386387
def _clean_path_predicate(self, predicate=None):
387388
pred = False

0 commit comments

Comments
 (0)