Skip to content
This repository was archived by the owner on Jun 10, 2025. It is now read-only.

Methods to check API status #134

Merged
merged 2 commits into from
Mar 2, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 65 additions & 0 deletions overpass/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import csv
import geojson
import logging
import re
from datetime import datetime
from shapely.geometry import Polygon, Point
from io import StringIO
Expand Down Expand Up @@ -138,6 +139,70 @@ def get(self, query, responseformat="geojson", verbosity="body", build=True, dat
# construct geojson
return self._as_geojson(response["elements"])

@staticmethod
def _api_status() -> dict:
"""
:returns: dict describing the client's status with the API
"""
endpoint = "https://overpass-api.de/api/status"

r = requests.get(endpoint)
lines = tuple(r.text.splitlines())

available_re = re.compile(r'\d(?= slots? available)')
available_slots = int(
available_re.search(lines[3]).group()
if available_re.search(lines[3])
else 0
)

waiting_re = re.compile(r'(?<=Slot available after: )[\d\-TZ:]{20}')
waiting_slots = tuple(
datetime.strptime(
waiting_re.search(line).group(), "%Y-%m-%dT%H:%M:%S%z"
)
for line in lines if waiting_re.search(line)
)

current_idx = next(
i for i, word in enumerate(lines)
if word.startswith('Currently running queries')
)
running_slots = tuple(tuple(line.split()) for line in lines[current_idx + 1:])
running_slots_datetimes = tuple(
datetime.strptime(
slot[3], "%Y-%m-%dT%H:%M:%S%z"
)
for slot in running_slots
)

return {
"available_slots": available_slots,
"waiting_slots": waiting_slots,
"running_slots": running_slots_datetimes,
}

@property
def slots_available(self) -> int:
"""
:returns: count of open slots the client has on the server
"""
return self._api_status()["available_slots"]

@property
def slots_waiting(self) -> tuple:
"""
:returns: tuple of datetimes representing waiting slots and when they will be available
"""
return self._api_status()["waiting_slots"]

@property
def slots_running(self) -> tuple:
"""
:returns: tuple of datetimes representing running slots and when they will be freed
"""
return self._api_status()["running_slots"]

def search(self, feature_type, regex=False):
"""Search for something."""
raise NotImplementedError()
Expand Down
27 changes: 27 additions & 0 deletions tests/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,3 +53,30 @@ def _get_from_overpass(self, query):
osm_geo = api.get("rel(6518385);out body geom;way(10322303);out body geom;node(4927326183);", verbosity='body geom')
ref_geo = geojson.load(open(os.path.join(os.path.dirname(__file__), "example.json"), "r"))
assert osm_geo==ref_geo


def test_slots_available():
api = overpass.API(debug=True)

map_query = overpass.MapQuery(37.86517, -122.31851, 37.86687, -122.31635)
api.get(map_query)

assert api.slots_available <= 2 and api.slots_available >= 0


def test_slots_running():
api = overpass.API(debug=True)

map_query = overpass.MapQuery(37.86517, -122.31851, 37.86687, -122.31635)
api.get(map_query)

assert isinstance(api.slots_running, tuple)


def test_slots_waiting():
api = overpass.API(debug=True)

map_query = overpass.MapQuery(37.86517, -122.31851, 37.86687, -122.31635)
api.get(map_query)

assert isinstance(api.slots_waiting, tuple)