Skip to content

Surface fetch/pull/push kill_after_timeout and reset default to None #1340

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 17 commits into from
Sep 18, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 63 additions & 26 deletions git/cmd.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ def handle_process_output(process: 'Git.AutoInterrupt' | Popen,
finalizer: Union[None,
Callable[[Union[subprocess.Popen, 'Git.AutoInterrupt']], None]] = None,
decode_streams: bool = True,
timeout: float = 10.0) -> None:
kill_after_timeout: Union[None, float] = None) -> None:
"""Registers for notifications to learn that process output is ready to read, and dispatches lines to
the respective line handlers.
This function returns once the finalizer returns
Expand All @@ -94,7 +94,10 @@ def handle_process_output(process: 'Git.AutoInterrupt' | Popen,
their contents to handlers.
Set it to False if `universal_newlines == True` (then streams are in text-mode)
or if decoding must happen later (i.e. for Diffs).
:param timeout: float, timeout to pass to t.join() in case it hangs. Default = 10.0 seconds
:param kill_after_timeout:
float or None, Default = None
To specify a timeout in seconds for the git command, after which the process
should be killed.
"""
# Use 2 "pump" threads and wait for both to finish.
def pump_stream(cmdline: List[str], name: str, stream: Union[BinaryIO, TextIO], is_decode: bool,
Expand All @@ -108,9 +111,12 @@ def pump_stream(cmdline: List[str], name: str, stream: Union[BinaryIO, TextIO],
handler(line_str)
else:
handler(line)

except Exception as ex:
log.error(f"Pumping {name!r} of cmd({remove_password_if_present(cmdline)}) failed due to: {ex!r}")
raise CommandError([f'<{name}-pump>'] + remove_password_if_present(cmdline), ex) from ex
if "I/O operation on closed file" not in str(ex):
# Only reraise if the error was not due to the stream closing
raise CommandError([f'<{name}-pump>'] + remove_password_if_present(cmdline), ex) from ex
finally:
stream.close()

Expand Down Expand Up @@ -146,9 +152,24 @@ def pump_stream(cmdline: List[str], name: str, stream: Union[BinaryIO, TextIO],
## FIXME: Why Join?? Will block if `stdin` needs feeding...
#
for t in threads:
t.join(timeout=timeout)
t.join(timeout=kill_after_timeout)
if t.is_alive():
raise RuntimeError(f"Thread join() timed out in cmd.handle_process_output(). Timeout={timeout} seconds")
if isinstance(process, Git.AutoInterrupt):
process._terminate()
else: # Don't want to deal with the other case
raise RuntimeError("Thread join() timed out in cmd.handle_process_output()."
f" kill_after_timeout={kill_after_timeout} seconds")
if stderr_handler:
error_str: Union[str, bytes] = (
"error: process killed because it timed out."
f" kill_after_timeout={kill_after_timeout} seconds")
if not decode_streams and isinstance(p_stderr, BinaryIO):
# Assume stderr_handler needs binary input
error_str = cast(str, error_str)
error_str = error_str.encode()
# We ignore typing on the next line because mypy does not like
# the way we inferred that stderr takes str or bytes
stderr_handler(error_str) # type: ignore

if finalizer:
return finalizer(process)
Expand Down Expand Up @@ -386,13 +407,19 @@ class AutoInterrupt(object):
The wait method was overridden to perform automatic status code checking
and possibly raise."""

__slots__ = ("proc", "args")
__slots__ = ("proc", "args", "status")

# If this is non-zero it will override any status code during
# _terminate, used to prevent race conditions in testing
_status_code_if_terminate: int = 0

def __init__(self, proc: Union[None, subprocess.Popen], args: Any) -> None:
self.proc = proc
self.args = args
self.status: Union[int, None] = None

def __del__(self) -> None:
def _terminate(self) -> None:
"""Terminate the underlying process"""
if self.proc is None:
return

Expand All @@ -404,10 +431,10 @@ def __del__(self) -> None:
proc.stdout.close()
if proc.stderr:
proc.stderr.close()

# Did the process already finish, so that we have a return code?
try:
if proc.poll() is not None:
self.status = self._status_code_if_terminate or proc.poll()
return None
except OSError as ex:
log.info("Ignored error after process had died: %r", ex)
Expand All @@ -419,7 +446,9 @@ def __del__(self) -> None:
# try to kill it
try:
proc.terminate()
proc.wait() # ensure process goes away
status = proc.wait() # ensure process goes away

self.status = self._status_code_if_terminate or status
except OSError as ex:
log.info("Ignored error after process had died: %r", ex)
except AttributeError:
Expand All @@ -431,6 +460,9 @@ def __del__(self) -> None:
call(("TASKKILL /F /T /PID %s 2>nul 1>nul" % str(proc.pid)), shell=True)
# END exception handling

def __del__(self) -> None:
self._terminate()

def __getattr__(self, attr: str) -> Any:
return getattr(self.proc, attr)

Expand All @@ -444,24 +476,29 @@ def wait(self, stderr: Union[None, str, bytes] = b'') -> int:
if stderr is None:
stderr_b = b''
stderr_b = force_bytes(data=stderr, encoding='utf-8')

status: Union[int, None]
if self.proc is not None:
status = self.proc.wait()
p_stderr = self.proc.stderr
else: # Assume the underlying proc was killed earlier or never existed
status = self.status
p_stderr = None

def read_all_from_possibly_closed_stream(stream: Union[IO[bytes], None]) -> bytes:
if stream:
try:
return stderr_b + force_bytes(stream.read())
except ValueError:
return stderr_b or b''
else:
def read_all_from_possibly_closed_stream(stream: Union[IO[bytes], None]) -> bytes:
if stream:
try:
return stderr_b + force_bytes(stream.read())
except ValueError:
return stderr_b or b''
else:
return stderr_b or b''

if status != 0:
errstr = read_all_from_possibly_closed_stream(self.proc.stderr)
log.debug('AutoInterrupt wait stderr: %r' % (errstr,))
raise GitCommandError(remove_password_if_present(self.args), status, errstr)
# END status handling

if status != 0:
errstr = read_all_from_possibly_closed_stream(p_stderr)
log.debug('AutoInterrupt wait stderr: %r' % (errstr,))
raise GitCommandError(remove_password_if_present(self.args), status, errstr)
return status

# END auto interrupt
Expand Down Expand Up @@ -694,7 +731,7 @@ def execute(self,
as_process: bool = False,
output_stream: Union[None, BinaryIO] = None,
stdout_as_string: bool = True,
kill_after_timeout: Union[None, int] = None,
kill_after_timeout: Union[None, float] = None,
with_stdout: bool = True,
universal_newlines: bool = False,
shell: Union[None, bool] = None,
Expand Down Expand Up @@ -817,7 +854,7 @@ def execute(self,

if is_win:
cmd_not_found_exception = OSError
if kill_after_timeout:
if kill_after_timeout is not None:
raise GitCommandError(redacted_command, '"kill_after_timeout" feature is not supported on Windows.')
else:
cmd_not_found_exception = FileNotFoundError # NOQA # exists, flake8 unknown @UndefinedVariable
Expand Down Expand Up @@ -884,7 +921,7 @@ def _kill_process(pid: int) -> None:
return
# end

if kill_after_timeout:
if kill_after_timeout is not None:
kill_check = threading.Event()
watchdog = threading.Timer(kill_after_timeout, _kill_process, args=(proc.pid,))

Expand All @@ -895,10 +932,10 @@ def _kill_process(pid: int) -> None:
newline = "\n" if universal_newlines else b"\n"
try:
if output_stream is None:
if kill_after_timeout:
if kill_after_timeout is not None:
watchdog.start()
stdout_value, stderr_value = proc.communicate()
if kill_after_timeout:
if kill_after_timeout is not None:
watchdog.cancel()
if kill_check.is_set():
stderr_value = ('Timeout: the command "%s" did not complete in %d '
Expand Down
40 changes: 31 additions & 9 deletions git/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -707,7 +707,8 @@ def update(self, **kwargs: Any) -> 'Remote':
return self

def _get_fetch_info_from_stderr(self, proc: 'Git.AutoInterrupt',
progress: Union[Callable[..., Any], RemoteProgress, None]
progress: Union[Callable[..., Any], RemoteProgress, None],
kill_after_timeout: Union[None, float] = None,
) -> IterableList['FetchInfo']:

progress = to_progress_instance(progress)
Expand All @@ -724,7 +725,8 @@ def _get_fetch_info_from_stderr(self, proc: 'Git.AutoInterrupt',
cmds = set(FetchInfo._flag_map.keys())

progress_handler = progress.new_message_handler()
handle_process_output(proc, None, progress_handler, finalizer=None, decode_streams=False)
handle_process_output(proc, None, progress_handler, finalizer=None, decode_streams=False,
kill_after_timeout=kill_after_timeout)

stderr_text = progress.error_lines and '\n'.join(progress.error_lines) or ''
proc.wait(stderr=stderr_text)
Expand Down Expand Up @@ -769,7 +771,8 @@ def _get_fetch_info_from_stderr(self, proc: 'Git.AutoInterrupt',
return output

def _get_push_info(self, proc: 'Git.AutoInterrupt',
progress: Union[Callable[..., Any], RemoteProgress, None]) -> IterableList[PushInfo]:
progress: Union[Callable[..., Any], RemoteProgress, None],
kill_after_timeout: Union[None, float] = None) -> IterableList[PushInfo]:
progress = to_progress_instance(progress)

# read progress information from stderr
Expand All @@ -786,11 +789,14 @@ def stdout_handler(line: str) -> None:
# If an error happens, additional info is given which we parse below.
pass

handle_process_output(proc, stdout_handler, progress_handler, finalizer=None, decode_streams=False)
handle_process_output(proc, stdout_handler, progress_handler, finalizer=None, decode_streams=False,
kill_after_timeout=kill_after_timeout)
stderr_text = progress.error_lines and '\n'.join(progress.error_lines) or ''
try:
proc.wait(stderr=stderr_text)
except Exception:
# This is different from fetch (which fails if there is any stderr output,
# even if there is also regular output)
if not output:
raise
elif stderr_text:
Expand All @@ -813,7 +819,9 @@ def _assert_refspec(self) -> None:

def fetch(self, refspec: Union[str, List[str], None] = None,
progress: Union[RemoteProgress, None, 'UpdateProgress'] = None,
verbose: bool = True, **kwargs: Any) -> IterableList[FetchInfo]:
verbose: bool = True,
kill_after_timeout: Union[None, float] = None,
**kwargs: Any) -> IterableList[FetchInfo]:
"""Fetch the latest changes for this remote

:param refspec:
Expand All @@ -833,6 +841,9 @@ def fetch(self, refspec: Union[str, List[str], None] = None,
for 'refspec' will make use of this facility.
:param progress: See 'push' method
:param verbose: Boolean for verbose output
:param kill_after_timeout:
To specify a timeout in seconds for the git command, after which the process
should be killed. It is set to None by default.
:param kwargs: Additional arguments to be passed to git-fetch
:return:
IterableList(FetchInfo, ...) list of FetchInfo instances providing detailed
Expand All @@ -853,19 +864,22 @@ def fetch(self, refspec: Union[str, List[str], None] = None,

proc = self.repo.git.fetch(self, *args, as_process=True, with_stdout=False,
universal_newlines=True, v=verbose, **kwargs)
res = self._get_fetch_info_from_stderr(proc, progress)
res = self._get_fetch_info_from_stderr(proc, progress,
kill_after_timeout=kill_after_timeout)
if hasattr(self.repo.odb, 'update_cache'):
self.repo.odb.update_cache()
return res

def pull(self, refspec: Union[str, List[str], None] = None,
progress: Union[RemoteProgress, 'UpdateProgress', None] = None,
kill_after_timeout: Union[None, float] = None,
**kwargs: Any) -> IterableList[FetchInfo]:
"""Pull changes from the given branch, being the same as a fetch followed
by a merge of branch with your local branch.

:param refspec: see 'fetch' method
:param progress: see 'push' method
:param kill_after_timeout: see 'fetch' method
:param kwargs: Additional arguments to be passed to git-pull
:return: Please see 'fetch' method """
if refspec is None:
Expand All @@ -874,13 +888,15 @@ def pull(self, refspec: Union[str, List[str], None] = None,
kwargs = add_progress(kwargs, self.repo.git, progress)
proc = self.repo.git.pull(self, refspec, with_stdout=False, as_process=True,
universal_newlines=True, v=True, **kwargs)
res = self._get_fetch_info_from_stderr(proc, progress)
res = self._get_fetch_info_from_stderr(proc, progress,
kill_after_timeout=kill_after_timeout)
if hasattr(self.repo.odb, 'update_cache'):
self.repo.odb.update_cache()
return res

def push(self, refspec: Union[str, List[str], None] = None,
progress: Union[RemoteProgress, 'UpdateProgress', Callable[..., RemoteProgress], None] = None,
kill_after_timeout: Union[None, float] = None,
**kwargs: Any) -> IterableList[PushInfo]:
"""Push changes from source branch in refspec to target branch in refspec.

Expand All @@ -897,6 +913,9 @@ def push(self, refspec: Union[str, List[str], None] = None,
overrides the ``update()`` function.

:note: No further progress information is returned after push returns.
:param kill_after_timeout:
To specify a timeout in seconds for the git command, after which the process
should be killed. It is set to None by default.
:param kwargs: Additional arguments to be passed to git-push
:return:
list(PushInfo, ...) list of PushInfo instances, each
Expand All @@ -908,8 +927,11 @@ def push(self, refspec: Union[str, List[str], None] = None,
be 0."""
kwargs = add_progress(kwargs, self.repo.git, progress)
proc = self.repo.git.push(self, refspec, porcelain=True, as_process=True,
universal_newlines=True, **kwargs)
return self._get_push_info(proc, progress)
universal_newlines=True,
kill_after_timeout=kill_after_timeout,
**kwargs)
return self._get_push_info(proc, progress,
kill_after_timeout=kill_after_timeout)

@ property
def config_reader(self) -> SectionConstraint[GitConfigParser]:
Expand Down
Loading