diff --git a/git/cmd.py b/git/cmd.py index 642ef9ed6..8fb10742f 100644 --- a/git/cmd.py +++ b/git/cmd.py @@ -79,7 +79,7 @@ def handle_process_output(process: 'Git.AutoInterrupt' | Popen, finalizer: Union[None, Callable[[Union[subprocess.Popen, 'Git.AutoInterrupt']], None]] = None, decode_streams: bool = True, - timeout: float = 10.0) -> None: + kill_after_timeout: Union[None, float] = None) -> None: """Registers for notifications to learn that process output is ready to read, and dispatches lines to the respective line handlers. This function returns once the finalizer returns @@ -94,7 +94,10 @@ def handle_process_output(process: 'Git.AutoInterrupt' | Popen, their contents to handlers. Set it to False if `universal_newline == True` (then streams are in text-mode) or if decoding must happen later (i.e. for Diffs). - :param timeout: float, timeout to pass to t.join() in case it hangs. Default = 10.0 seconds + :param kill_after_timeout: + float or None, Default = None + To specify a timeout in seconds for the git command, after which the process + should be killed. """ # Use 2 "pump" threads and wait for both to finish. def pump_stream(cmdline: List[str], name: str, stream: Union[BinaryIO, TextIO], is_decode: bool, @@ -108,9 +111,12 @@ def pump_stream(cmdline: List[str], name: str, stream: Union[BinaryIO, TextIO], handler(line_str) else: handler(line) + except Exception as ex: log.error(f"Pumping {name!r} of cmd({remove_password_if_present(cmdline)}) failed due to: {ex!r}") - raise CommandError([f'<{name}-pump>'] + remove_password_if_present(cmdline), ex) from ex + if "I/O operation on closed file" not in str(ex): + # Only reraise if the error was not due to the stream closing + raise CommandError([f'<{name}-pump>'] + remove_password_if_present(cmdline), ex) from ex finally: stream.close() @@ -146,9 +152,24 @@ def pump_stream(cmdline: List[str], name: str, stream: Union[BinaryIO, TextIO], ## FIXME: Why Join?? Will block if `stdin` needs feeding... # for t in threads: - t.join(timeout=timeout) + t.join(timeout=kill_after_timeout) if t.is_alive(): - raise RuntimeError(f"Thread join() timed out in cmd.handle_process_output(). Timeout={timeout} seconds") + if isinstance(process, Git.AutoInterrupt): + process._terminate() + else: # Don't want to deal with the other case + raise RuntimeError("Thread join() timed out in cmd.handle_process_output()." + f" kill_after_timeout={kill_after_timeout} seconds") + if stderr_handler: + error_str: Union[str, bytes] = ( + "error: process killed because it timed out." + f" kill_after_timeout={kill_after_timeout} seconds") + if not decode_streams and isinstance(p_stderr, BinaryIO): + # Assume stderr_handler needs binary input + error_str = cast(str, error_str) + error_str = error_str.encode() + # We ignore typing on the next line because mypy does not like + # the way we inferred that stderr takes str or bytes + stderr_handler(error_str) # type: ignore if finalizer: return finalizer(process) @@ -386,13 +407,19 @@ class AutoInterrupt(object): The wait method was overridden to perform automatic status code checking and possibly raise.""" - __slots__ = ("proc", "args") + __slots__ = ("proc", "args", "status") + + # If this is non-zero it will override any status code during + # _terminate, used to prevent race conditions in testing + _status_code_if_terminate: int = 0 def __init__(self, proc: Union[None, subprocess.Popen], args: Any) -> None: self.proc = proc self.args = args + self.status: Union[int, None] = None - def __del__(self) -> None: + def _terminate(self) -> None: + """Terminate the underlying process""" if self.proc is None: return @@ -404,10 +431,10 @@ def __del__(self) -> None: proc.stdout.close() if proc.stderr: proc.stderr.close() - # did the process finish already so we have a return code ? try: if proc.poll() is not None: + self.status = self._status_code_if_terminate or proc.poll() return None except OSError as ex: log.info("Ignored error after process had died: %r", ex) @@ -419,7 +446,9 @@ def __del__(self) -> None: # try to kill it try: proc.terminate() - proc.wait() # ensure process goes away + status = proc.wait() # ensure process goes away + + self.status = self._status_code_if_terminate or status except OSError as ex: log.info("Ignored error after process had died: %r", ex) except AttributeError: @@ -431,6 +460,9 @@ def __del__(self) -> None: call(("TASKKILL /F /T /PID %s 2>nul 1>nul" % str(proc.pid)), shell=True) # END exception handling + def __del__(self) -> None: + self._terminate() + def __getattr__(self, attr: str) -> Any: return getattr(self.proc, attr) @@ -444,24 +476,29 @@ def wait(self, stderr: Union[None, str, bytes] = b'') -> int: if stderr is None: stderr_b = b'' stderr_b = force_bytes(data=stderr, encoding='utf-8') - + status: Union[int, None] if self.proc is not None: status = self.proc.wait() + p_stderr = self.proc.stderr + else: # Assume the underlying proc was killed earlier or never existed + status = self.status + p_stderr = None - def read_all_from_possibly_closed_stream(stream: Union[IO[bytes], None]) -> bytes: - if stream: - try: - return stderr_b + force_bytes(stream.read()) - except ValueError: - return stderr_b or b'' - else: + def read_all_from_possibly_closed_stream(stream: Union[IO[bytes], None]) -> bytes: + if stream: + try: + return stderr_b + force_bytes(stream.read()) + except ValueError: return stderr_b or b'' + else: + return stderr_b or b'' - if status != 0: - errstr = read_all_from_possibly_closed_stream(self.proc.stderr) - log.debug('AutoInterrupt wait stderr: %r' % (errstr,)) - raise GitCommandError(remove_password_if_present(self.args), status, errstr) # END status handling + + if status != 0: + errstr = read_all_from_possibly_closed_stream(p_stderr) + log.debug('AutoInterrupt wait stderr: %r' % (errstr,)) + raise GitCommandError(remove_password_if_present(self.args), status, errstr) return status # END auto interrupt @@ -694,7 +731,7 @@ def execute(self, as_process: bool = False, output_stream: Union[None, BinaryIO] = None, stdout_as_string: bool = True, - kill_after_timeout: Union[None, int] = None, + kill_after_timeout: Union[None, float] = None, with_stdout: bool = True, universal_newlines: bool = False, shell: Union[None, bool] = None, @@ -817,7 +854,7 @@ def execute(self, if is_win: cmd_not_found_exception = OSError - if kill_after_timeout: + if kill_after_timeout is not None: raise GitCommandError(redacted_command, '"kill_after_timeout" feature is not supported on Windows.') else: cmd_not_found_exception = FileNotFoundError # NOQA # exists, flake8 unknown @UndefinedVariable @@ -884,7 +921,7 @@ def _kill_process(pid: int) -> None: return # end - if kill_after_timeout: + if kill_after_timeout is not None: kill_check = threading.Event() watchdog = threading.Timer(kill_after_timeout, _kill_process, args=(proc.pid,)) @@ -895,10 +932,10 @@ def _kill_process(pid: int) -> None: newline = "\n" if universal_newlines else b"\n" try: if output_stream is None: - if kill_after_timeout: + if kill_after_timeout is not None: watchdog.start() stdout_value, stderr_value = proc.communicate() - if kill_after_timeout: + if kill_after_timeout is not None: watchdog.cancel() if kill_check.is_set(): stderr_value = ('Timeout: the command "%s" did not complete in %d ' diff --git a/git/remote.py b/git/remote.py index 55772f4a0..9917c4310 100644 --- a/git/remote.py +++ b/git/remote.py @@ -707,7 +707,8 @@ def update(self, **kwargs: Any) -> 'Remote': return self def _get_fetch_info_from_stderr(self, proc: 'Git.AutoInterrupt', - progress: Union[Callable[..., Any], RemoteProgress, None] + progress: Union[Callable[..., Any], RemoteProgress, None], + kill_after_timeout: Union[None, float] = None, ) -> IterableList['FetchInfo']: progress = to_progress_instance(progress) @@ -724,7 +725,8 @@ def _get_fetch_info_from_stderr(self, proc: 'Git.AutoInterrupt', cmds = set(FetchInfo._flag_map.keys()) progress_handler = progress.new_message_handler() - handle_process_output(proc, None, progress_handler, finalizer=None, decode_streams=False) + handle_process_output(proc, None, progress_handler, finalizer=None, decode_streams=False, + kill_after_timeout=kill_after_timeout) stderr_text = progress.error_lines and '\n'.join(progress.error_lines) or '' proc.wait(stderr=stderr_text) @@ -769,7 +771,8 @@ def _get_fetch_info_from_stderr(self, proc: 'Git.AutoInterrupt', return output def _get_push_info(self, proc: 'Git.AutoInterrupt', - progress: Union[Callable[..., Any], RemoteProgress, None]) -> IterableList[PushInfo]: + progress: Union[Callable[..., Any], RemoteProgress, None], + kill_after_timeout: Union[None, float] = None) -> IterableList[PushInfo]: progress = to_progress_instance(progress) # read progress information from stderr @@ -786,11 +789,14 @@ def stdout_handler(line: str) -> None: # If an error happens, additional info is given which we parse below. pass - handle_process_output(proc, stdout_handler, progress_handler, finalizer=None, decode_streams=False) + handle_process_output(proc, stdout_handler, progress_handler, finalizer=None, decode_streams=False, + kill_after_timeout=kill_after_timeout) stderr_text = progress.error_lines and '\n'.join(progress.error_lines) or '' try: proc.wait(stderr=stderr_text) except Exception: + # This is different than fetch (which fails if there is any std_err + # even if there is an output) if not output: raise elif stderr_text: @@ -813,7 +819,9 @@ def _assert_refspec(self) -> None: def fetch(self, refspec: Union[str, List[str], None] = None, progress: Union[RemoteProgress, None, 'UpdateProgress'] = None, - verbose: bool = True, **kwargs: Any) -> IterableList[FetchInfo]: + verbose: bool = True, + kill_after_timeout: Union[None, float] = None, + **kwargs: Any) -> IterableList[FetchInfo]: """Fetch the latest changes for this remote :param refspec: @@ -833,6 +841,9 @@ def fetch(self, refspec: Union[str, List[str], None] = None, for 'refspec' will make use of this facility. :param progress: See 'push' method :param verbose: Boolean for verbose output + :param kill_after_timeout: + To specify a timeout in seconds for the git command, after which the process + should be killed. It is set to None by default. :param kwargs: Additional arguments to be passed to git-fetch :return: IterableList(FetchInfo, ...) list of FetchInfo instances providing detailed @@ -853,19 +864,22 @@ def fetch(self, refspec: Union[str, List[str], None] = None, proc = self.repo.git.fetch(self, *args, as_process=True, with_stdout=False, universal_newlines=True, v=verbose, **kwargs) - res = self._get_fetch_info_from_stderr(proc, progress) + res = self._get_fetch_info_from_stderr(proc, progress, + kill_after_timeout=kill_after_timeout) if hasattr(self.repo.odb, 'update_cache'): self.repo.odb.update_cache() return res def pull(self, refspec: Union[str, List[str], None] = None, progress: Union[RemoteProgress, 'UpdateProgress', None] = None, + kill_after_timeout: Union[None, float] = None, **kwargs: Any) -> IterableList[FetchInfo]: """Pull changes from the given branch, being the same as a fetch followed by a merge of branch with your local branch. :param refspec: see 'fetch' method :param progress: see 'push' method + :param kill_after_timeout: see 'fetch' method :param kwargs: Additional arguments to be passed to git-pull :return: Please see 'fetch' method """ if refspec is None: @@ -874,13 +888,15 @@ def pull(self, refspec: Union[str, List[str], None] = None, kwargs = add_progress(kwargs, self.repo.git, progress) proc = self.repo.git.pull(self, refspec, with_stdout=False, as_process=True, universal_newlines=True, v=True, **kwargs) - res = self._get_fetch_info_from_stderr(proc, progress) + res = self._get_fetch_info_from_stderr(proc, progress, + kill_after_timeout=kill_after_timeout) if hasattr(self.repo.odb, 'update_cache'): self.repo.odb.update_cache() return res def push(self, refspec: Union[str, List[str], None] = None, progress: Union[RemoteProgress, 'UpdateProgress', Callable[..., RemoteProgress], None] = None, + kill_after_timeout: Union[None, float] = None, **kwargs: Any) -> IterableList[PushInfo]: """Push changes from source branch in refspec to target branch in refspec. @@ -897,6 +913,9 @@ def push(self, refspec: Union[str, List[str], None] = None, overrides the ``update()`` function. :note: No further progress information is returned after push returns. + :param kill_after_timeout: + To specify a timeout in seconds for the git command, after which the process + should be killed. It is set to None by default. :param kwargs: Additional arguments to be passed to git-push :return: list(PushInfo, ...) list of PushInfo instances, each @@ -908,8 +927,11 @@ def push(self, refspec: Union[str, List[str], None] = None, be 0.""" kwargs = add_progress(kwargs, self.repo.git, progress) proc = self.repo.git.push(self, refspec, porcelain=True, as_process=True, - universal_newlines=True, **kwargs) - return self._get_push_info(proc, progress) + universal_newlines=True, + kill_after_timeout=kill_after_timeout, + **kwargs) + return self._get_push_info(proc, progress, + kill_after_timeout=kill_after_timeout) @ property def config_reader(self) -> SectionConstraint[GitConfigParser]: diff --git a/test/test_remote.py b/test/test_remote.py index c29fac65c..088fdad55 100644 --- a/test/test_remote.py +++ b/test/test_remote.py @@ -6,6 +6,7 @@ import random import tempfile +import pytest from unittest import skipIf from git import ( @@ -164,7 +165,7 @@ def _commit_random_file(self, repo): index.commit("Committing %s" % new_file) return new_file - def _do_test_fetch(self, remote, rw_repo, remote_repo): + def _do_test_fetch(self, remote, rw_repo, remote_repo, **kwargs): # specialized fetch testing to de-clutter the main test self._do_test_fetch_info(rw_repo) @@ -183,7 +184,7 @@ def get_info(res, remote, name): # put remote head to master as it is guaranteed to exist remote_repo.head.reference = remote_repo.heads.master - res = fetch_and_test(remote) + res = fetch_and_test(remote, **kwargs) # all up to date for info in res: self.assertTrue(info.flags & info.HEAD_UPTODATE) @@ -401,12 +402,12 @@ def _assert_push_and_pull(self, remote, rw_repo, remote_repo): res = remote.push(all=True) self._do_test_push_result(res, remote) - remote.pull('master') + remote.pull('master', kill_after_timeout=10.0) # cleanup - delete created tags and branches as we are in an innerloop on # the same repository TagReference.delete(rw_repo, new_tag, other_tag) - remote.push(":%s" % other_tag.path) + remote.push(":%s" % other_tag.path, kill_after_timeout=10.0) @skipIf(HIDE_WINDOWS_FREEZE_ERRORS, "FIXME: Freezes!") @with_rw_and_rw_remote_repo('0.1.6') @@ -467,7 +468,8 @@ def test_base(self, rw_repo, remote_repo): # Only for remotes - local cases are the same or less complicated # as additional progress information will never be emitted if remote.name == "daemon_origin": - self._do_test_fetch(remote, rw_repo, remote_repo) + self._do_test_fetch(remote, rw_repo, remote_repo, + kill_after_timeout=10.0) ran_fetch_test = True # END fetch test @@ -651,3 +653,21 @@ def test_push_error(self, repo): rem = repo.remote('origin') with self.assertRaisesRegex(GitCommandError, "src refspec __BAD_REF__ does not match any"): rem.push('__BAD_REF__') + + +class TestTimeouts(TestBase): + @with_rw_repo('HEAD', bare=False) + def test_timeout_funcs(self, repo): + # Force error code to prevent a race condition if the python thread is + # slow + default = Git.AutoInterrupt._status_code_if_terminate + Git.AutoInterrupt._status_code_if_terminate = -15 + for function in ["pull", "fetch"]: # can't get push to timeout + f = getattr(repo.remotes.origin, function) + assert f is not None # Make sure these functions exist + _ = f() # Make sure the function runs + with pytest.raises(GitCommandError, + match="kill_after_timeout=0 s"): + f(kill_after_timeout=0) + + Git.AutoInterrupt._status_code_if_terminate = default