Skip to content

Make httpclient thread safe #136

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Make httpclient thread safe
  • Loading branch information
Alex Toker committed May 19, 2025
commit 895d26c0dd23e392ff0229791c09692141659ee8
194 changes: 140 additions & 54 deletions v3io/dataplane/transport/httpclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,13 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
import contextlib
import http.client
import queue
import socket
import ssl
import threading
import time

import v3io.dataplane.request
import v3io.dataplane.response
Expand All @@ -31,7 +34,9 @@ class Transport(abstract.Transport):
def __init__(self, logger, endpoint=None, max_connections=None, timeout=None, verbosity=None):
super(Transport, self).__init__(logger, endpoint, max_connections, timeout, verbosity)

self._free_connections = queue.Queue()
self._free_connections = queue.Queue(self.max_connections)
self._lock = threading.RLock() # Reentrant lock for thread safety
self._closed = False

# based on scheme, create a host and context for _create_connection
self._host, self._ssl_context = self._parse_endpoint(self._endpoint)
Expand All @@ -52,6 +57,10 @@ def get_connection_timeout(cls):
global _connection_timeout_seconds
return _connection_timeout_seconds

@classmethod
def get_connection_acquire_timeout(cls):
return cls.get_connection_timeout() * 10

@classmethod
def set_connection_timeout(cls, timeout):
global _connection_timeout_seconds
Expand All @@ -67,39 +76,85 @@ def get_request_max_retries(cls):
global _request_max_retries
return _request_max_retries

def _put_connection(self, connection):
with self._lock:
if self._closed:
with contextlib.suppress(Exception):
connection.close()
return
try:
self._free_connections.put(connection, block=False)
except Exception as conn_error:
self._logger.error(f"Failed to return connection to the pool: {conn_error}")
with contextlib.suppress(Exception):
connection.close()
raise conn_error

def _get_connection(self):
start_time = time.time()
while True:
# Check if we've exceeded the total timeout
if time.time() - start_time > Transport.get_connection_acquire_timeout():
raise TimeoutError(
f"Could not get a connection within {Transport.get_connection_acquire_timeout()} seconds"
)
# First, check state under lock and decide what to do
with self._lock:
if self._closed:
raise RuntimeError("Cannot send request on a closed client")

# Try non-blocking get first
if not self._free_connections.empty():
with contextlib.suppress(queue.Empty):
return self._free_connections.get_nowait()

# Wait outside the lock
try:
connection = self._free_connections.get(block=True, timeout=0.01)
except queue.Empty:
continue # Go back to the start of the loop
except Exception as e:
raise RuntimeError(f"Cannot get connection , {e}") from e

# We got a connection, verify client is still open
if self._closed:
with contextlib.suppress(Exception):
connection.close()
raise RuntimeError("Cannot send request on a closed client")
return connection

def close(self):
# Ignore redundant calls to close
if not self._free_connections:
return

connections = []
while not self._free_connections.empty():
conn = self._free_connections.get()
connections.append(conn)
# In case anyone tries to reuse this object, we want them to get an error and not hang
self._free_connections = None
with self._lock:
if self._closed:
return
# Mark as closed before draining the queue to prevent race conditions
self._closed = True
connections = []
# Move free connections to local variable to release the lock as soon as possible
with contextlib.suppress(queue.Empty):
while not self._free_connections.empty():
conn = self._free_connections.get_nowait()
connections.append(conn)

self._logger.debug(f"Closing all {len(connections)} v3io transport connections")
for conn in connections:
conn.close()
try:
conn.close()
except Exception as e:
self._logger.debug(f"Error closing connection: {e}")

def requires_access_key(self):
return True

def send_request(self, request):
if not self._free_connections:
raise RuntimeError("Cannot send request on a closed client")

# TODO: consider getting param of whether we should block or
# not (wait for connection to be free or raise exception)
connection = self._free_connections.get(block=True, timeout=None)

connection = self._get_connection()
try:
return self._send_request_on_connection(request, connection)
except BaseException as e:
request.transport.connection_used.close()
connection = self._create_connection(self._host, self._ssl_context)
self._free_connections.put(connection, block=True)
raise e
except BaseException:
new_connection = self._create_connection(self._host, self._ssl_context)
self._put_connection(new_connection)
with contextlib.suppress(Exception):
connection.close()

def wait_response(self, request, raise_for_status=None, num_retries=1):
connection = request.transport.connection_used
Expand All @@ -118,24 +173,25 @@ def wait_response(self, request, raise_for_status=None, num_retries=1):
response_body = response.read()

status_code, headers = self._get_status_and_headers(response)
self.log(
"Rx",
connection=connection,
status_code=status_code,
body=response_body,
)
self._put_connection(connection)
try:
v3io_response = v3io.dataplane.response.Response(
request.output, status_code, headers, response_body
)
v3io_response.raise_for_status(request.raise_for_status or raise_for_status)
return v3io_response
except v3io.dataplane.response.HttpResponseError as response_error:
self._logger.warn_with(f"Response error: {response_error}")
raise response_error

self.log("Rx", connection=connection, status_code=status_code, body=response_body)

response = v3io.dataplane.response.Response(request.output, status_code, headers, response_body)

self._free_connections.put(connection, block=True)

response.raise_for_status(request.raise_for_status or raise_for_status)

return response

except v3io.dataplane.response.HttpResponseError as response_error:
self._logger.warn_with(f"Response error: {response_error}")
raise response_error
except BaseException as e:
connection.close()
connection = self._create_connection(self._host, self._ssl_context)

if num_retries == 0:
self._logger.error_with(
"Error occurred while waiting for response and ran out of retries",
Expand All @@ -145,11 +201,11 @@ def wait_response(self, request, raise_for_status=None, num_retries=1):
status_code=status_code,
headers=headers,
)
self._free_connections.put(connection, block=True)
self._put_connection(connection)
raise e

self._logger.debug_with(
"Error occurred while waiting for response retrying",
"Error occurred while waiting for response - retrying",
retries_left=num_retries,
e=type(e),
e_msg=e,
Expand All @@ -159,25 +215,46 @@ def wait_response(self, request, raise_for_status=None, num_retries=1):
is_retry = True

def _send_request_on_connection(self, request, connection):
request.transport.connection_used = connection
"""Sends a request on the specified connection.

This method attempts to send the given request over the provided connection.
It handles potential connection errors, retries if necessary, and manages
the request body's position for seekable streams. Note!! If the send operation fails,
the connection is closed within this function.

Args:
request (Request): The request object to send.
connection (http.client.HTTPConnection): The connection to use for sending.

Returns:
Request: The original request object.
"""

path = request.encode_path()

self.log(
"Tx", connection=connection, method=request.method, path=path, headers=request.headers, body=request.body
"Tx",
connection=connection,
method=request.method,
path=path,
headers=request.headers,
body=request.body,
)

starting_offset = 0
is_body_seekable = request.body and hasattr(request.body, "seek") and hasattr(request.body, "tell")
if is_body_seekable:
starting_offset = request.body.tell()

starting_offset = request.body.tell() if is_body_seekable else 0
retries_left = Transport.get_request_max_retries()

while True:
try:
request.transport.connection_used = connection
connection.request(request.method, path, request.body, request.headers)
break
return request
except self._send_request_exceptions as e:
# Close failed connection
with contextlib.suppress(Exception):
connection.close()

self._logger.debug_with(
f"Disconnected while attempting to send request – "
f"{retries_left} out of {Transport.get_request_max_retries()} retries left.",
Expand All @@ -186,27 +263,36 @@ def _send_request_on_connection(self, request, connection):
)
if retries_left == 0:
raise

retries_left -= 1
connection.close()

connection = self._create_connection(self._host, self._ssl_context)

if is_body_seekable:
# If the first connection fails, the pointer of the body might move at the size
# of the first connection blocksize.
# We need to reset the position of the pointer in order to send the whole file.
request.body.seek(starting_offset)
connection = self._create_connection(self._host, self._ssl_context)
request.transport.connection_used = connection
with contextlib.suppress(Exception):
request.body.seek(starting_offset)

except BaseException as e:
self._logger.error_with(
"Unhandled exception while sending request", e=type(e), e_msg=e, connection=connection
"Unhandled exception while sending request",
e=type(e),
e_msg=e,
connection=connection,
)
# Close failed connection
with contextlib.suppress(Exception):
connection.close()
raise e

return request

def _create_connections(self, num_connections, host, ssl_context):
for _ in range(num_connections):
connection = self._create_connection(host, ssl_context)
self._free_connections.put(connection, block=True)
self._put_connection(connection)

def _create_connection(self, host, ssl_context):
if ssl_context is None:
Expand Down