diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index be97d5f9b..1ac5baa00 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -29,7 +29,7 @@ repos: hooks: - id: shellcheck args: [--color] - exclude: ^git/ext/ + exclude: ^test/fixtures/polyglot$|^git/ext/ - repo: https://github.com/pre-commit/pre-commit-hooks rev: v4.4.0 diff --git a/git/cmd.py b/git/cmd.py index 9c94748c5..4413182e0 100644 --- a/git/cmd.py +++ b/git/cmd.py @@ -46,6 +46,7 @@ Iterator, List, Mapping, + Optional, Sequence, TYPE_CHECKING, TextIO, @@ -102,7 +103,7 @@ def handle_process_output( Callable[[bytes, "Repo", "DiffIndex"], None], ], stderr_handler: Union[None, Callable[[AnyStr], None], Callable[[List[AnyStr]], None]], - finalizer: Union[None, Callable[[Union[subprocess.Popen, "Git.AutoInterrupt"]], None]] = None, + finalizer: Union[None, Callable[[Union[Popen, "Git.AutoInterrupt"]], None]] = None, decode_streams: bool = True, kill_after_timeout: Union[None, float] = None, ) -> None: @@ -207,6 +208,68 @@ def pump_stream( finalizer(process) +def _safer_popen_windows( + command: Union[str, Sequence[Any]], + *, + shell: bool = False, + env: Optional[Mapping[str, str]] = None, + **kwargs: Any, +) -> Popen: + """Call :class:`subprocess.Popen` on Windows but don't include a CWD in the search. + + This avoids an untrusted search path condition where a file like ``git.exe`` in a + malicious repository would be run when GitPython operates on the repository. The + process using GitPython may have an untrusted repository's working tree as its + current working directory. Some operations may temporarily change to that directory + before running a subprocess. In addition, while by default GitPython does not run + external commands with a shell, it can be made to do so, in which case the CWD of + the subprocess, which GitPython usually sets to a repository working tree, can + itself be searched automatically by the shell. This wrapper covers all those cases. + + :note: This currently works by setting the ``NoDefaultCurrentDirectoryInExePath`` + environment variable during subprocess creation. It also takes care of passing + Windows-specific process creation flags, but that is unrelated to path search. + + :note: The current implementation contains a race condition on :attr:`os.environ`. + GitPython isn't thread-safe, but a program using it on one thread should ideally + be able to mutate :attr:`os.environ` on another, without unpredictable results. + See comments in https://github.com/gitpython-developers/GitPython/pull/1650. + """ + # CREATE_NEW_PROCESS_GROUP is needed for some ways of killing it afterwards. See: + # https://docs.python.org/3/library/subprocess.html#subprocess.Popen.send_signal + # https://docs.python.org/3/library/subprocess.html#subprocess.CREATE_NEW_PROCESS_GROUP + creationflags = subprocess.CREATE_NO_WINDOW | subprocess.CREATE_NEW_PROCESS_GROUP + + # When using a shell, the shell is the direct subprocess, so the variable must be + # set in its environment, to affect its search behavior. (The "1" can be any value.) + if shell: + safer_env = {} if env is None else dict(env) + safer_env["NoDefaultCurrentDirectoryInExePath"] = "1" + else: + safer_env = env + + # When not using a shell, the current process does the search in a CreateProcessW + # API call, so the variable must be set in our environment. With a shell, this is + # unnecessary, in versions where https://github.com/python/cpython/issues/101283 is + # patched. If not, in the rare case the ComSpec environment variable is unset, the + # shell is searched for unsafely. Setting NoDefaultCurrentDirectoryInExePath in all + # cases, as here, is simpler and protects against that. (The "1" can be any value.) + with patch_env("NoDefaultCurrentDirectoryInExePath", "1"): + return Popen( + command, + shell=shell, + env=safer_env, + creationflags=creationflags, + **kwargs, + ) + + +if os.name == "nt": + safer_popen = _safer_popen_windows +else: + safer_popen = Popen + + def dashify(string: str) -> str: return string.replace("_", "-") @@ -225,14 +288,6 @@ def dict_to_slots_and__excluded_are_none(self: object, d: Mapping[str, Any], exc ## -- End Utilities -- @} -if os.name == "nt": - # CREATE_NEW_PROCESS_GROUP is needed to allow killing it afterwards. See: - # https://docs.python.org/3/library/subprocess.html#subprocess.Popen.send_signal - PROC_CREATIONFLAGS = subprocess.CREATE_NO_WINDOW | subprocess.CREATE_NEW_PROCESS_GROUP -else: - PROC_CREATIONFLAGS = 0 - - class Git(LazyMixin): """The Git class manages communication with the Git binary. @@ -992,11 +1047,8 @@ def execute( redacted_command, '"kill_after_timeout" feature is not supported on Windows.', ) - # Only search PATH, not CWD. This must be in the *caller* environment. The "1" can be any value. - maybe_patch_caller_env = patch_env("NoDefaultCurrentDirectoryInExePath", "1") else: cmd_not_found_exception = FileNotFoundError - maybe_patch_caller_env = contextlib.nullcontext() # END handle stdout_sink = PIPE if with_stdout else getattr(subprocess, "DEVNULL", None) or open(os.devnull, "wb") @@ -1011,20 +1063,18 @@ def execute( universal_newlines, ) try: - with maybe_patch_caller_env: - proc = Popen( - command, - env=env, - cwd=cwd, - bufsize=-1, - stdin=(istream or DEVNULL), - stderr=PIPE, - stdout=stdout_sink, - shell=shell, - universal_newlines=universal_newlines, - creationflags=PROC_CREATIONFLAGS, - **subprocess_kwargs, - ) + proc = safer_popen( + command, + env=env, + cwd=cwd, + bufsize=-1, + stdin=(istream or DEVNULL), + stderr=PIPE, + stdout=stdout_sink, + shell=shell, + universal_newlines=universal_newlines, + **subprocess_kwargs, + ) except cmd_not_found_exception as err: raise GitCommandNotFound(redacted_command, err) from err else: diff --git a/git/index/fun.py b/git/index/fun.py index 402f85d2b..580493f6d 100644 --- a/git/index/fun.py +++ b/git/index/fun.py @@ -18,7 +18,7 @@ ) import subprocess -from git.cmd import PROC_CREATIONFLAGS, handle_process_output +from git.cmd import handle_process_output, safer_popen from git.compat import defenc, force_bytes, force_text, safe_decode from git.exc import HookExecutionError, UnmergedEntriesError from git.objects.fun import ( @@ -98,13 +98,12 @@ def run_commit_hook(name: str, index: "IndexFile", *args: str) -> None: relative_hp = Path(hp).relative_to(index.repo.working_dir).as_posix() cmd = ["bash.exe", relative_hp] - process = subprocess.Popen( + process = safer_popen( cmd + list(args), env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=index.repo.working_dir, - creationflags=PROC_CREATIONFLAGS, ) except Exception as ex: raise HookExecutionError(hp, ex) from ex diff --git a/git/util.py b/git/util.py index c5afea241..5ccd2b04c 100644 --- a/git/util.py +++ b/git/util.py @@ -327,6 +327,17 @@ def _get_exe_extensions() -> Sequence[str]: def py_where(program: str, path: Optional[PathLike] = None) -> List[str]: + """Perform a path search to assist :func:`is_cygwin_git`. + + This is not robust for general use. It is an implementation detail of + :func:`is_cygwin_git`. When a search following all shell rules is needed, + :func:`shutil.which` can be used instead. + + :note: Neither this function nor :func:`shutil.which` will predict the effect of an + executable search on a native Windows system due to a :class:`subprocess.Popen` + call without ``shell=True``, because shell and non-shell executable search on + Windows differ considerably. + """ # From: http://stackoverflow.com/a/377028/548792 winprog_exts = _get_exe_extensions() diff --git a/test/fixtures/polyglot b/test/fixtures/polyglot new file mode 100755 index 000000000..f1dd56b26 --- /dev/null +++ b/test/fixtures/polyglot @@ -0,0 +1,8 @@ +#!/usr/bin/env sh +# Valid script in both Bash and Python, but with different behavior. +""":" +echo 'Ran intended hook.' >output.txt +exit +" """ +from pathlib import Path +Path('payload.txt').write_text('Ran impostor hook!', encoding='utf-8') diff --git a/test/lib/helper.py b/test/lib/helper.py index d662b632d..1fdc56fd1 100644 --- a/test/lib/helper.py +++ b/test/lib/helper.py @@ -14,6 +14,7 @@ import textwrap import time import unittest +import venv import gitdb @@ -36,6 +37,7 @@ "with_rw_repo", "with_rw_and_rw_remote_repo", "TestBase", + "VirtualEnvironment", "TestCase", "SkipTest", "skipIf", @@ -88,11 +90,11 @@ def with_rw_directory(func): test succeeds, but leave it otherwise to aid additional debugging.""" @wraps(func) - def wrapper(self): + def wrapper(self, *args, **kwargs): path = tempfile.mkdtemp(prefix=func.__name__) keep = False try: - return func(self, path) + return func(self, path, *args, **kwargs) except Exception: log.info( "Test %s.%s failed, output is at %r\n", @@ -390,3 +392,46 @@ def _make_file(self, rela_path, data, repo=None): with open(abs_path, "w") as fp: fp.write(data) return abs_path + + +class VirtualEnvironment: + """A newly created Python virtual environment for use in a test.""" + + __slots__ = ("_env_dir",) + + def __init__(self, env_dir, *, with_pip): + if os.name == "nt": + self._env_dir = osp.realpath(env_dir) + venv.create(self.env_dir, symlinks=False, with_pip=with_pip) + else: + self._env_dir = env_dir + venv.create(self.env_dir, symlinks=True, with_pip=with_pip) + + @property + def env_dir(self): + """The top-level directory of the environment.""" + return self._env_dir + + @property + def python(self): + """Path to the Python executable in the environment.""" + return self._executable("python") + + @property + def pip(self): + """Path to the pip executable in the environment, or RuntimeError if absent.""" + return self._executable("pip") + + @property + def sources(self): + """Path to a src directory in the environment, which may not exist yet.""" + return os.path.join(self.env_dir, "src") + + def _executable(self, basename): + if os.name == "nt": + path = osp.join(self.env_dir, "Scripts", basename + ".exe") + else: + path = osp.join(self.env_dir, "bin", basename) + if osp.isfile(path) or osp.islink(path): + return path + raise RuntimeError(f"no regular file or symlink {path!r}") diff --git a/test/test_git.py b/test/test_git.py index 6d0ae82d0..3b9abc712 100644 --- a/test/test_git.py +++ b/test/test_git.py @@ -3,16 +3,18 @@ # This module is part of GitPython and is released under the # 3-Clause BSD License: https://opensource.org/license/bsd-3-clause/ +import contextlib import gc import inspect import logging import os import os.path as osp +from pathlib import Path import re import shutil import subprocess import sys -from tempfile import TemporaryDirectory, TemporaryFile +from tempfile import TemporaryFile from unittest import skipUnless if sys.version_info >= (3, 8): @@ -27,6 +29,21 @@ from test.lib import TestBase, fixture_path, with_rw_directory +@contextlib.contextmanager +def _patch_out_env(name): + try: + old_value = os.environ[name] + except KeyError: + old_value = None + else: + del os.environ[name] + try: + yield + finally: + if old_value is not None: + os.environ[name] = old_value + + @ddt.ddt class TestGit(TestBase): @classmethod @@ -97,29 +114,28 @@ def test_it_transforms_kwargs_into_git_command_arguments(self): def _do_shell_combo(self, value_in_call, value_from_class): with mock.patch.object(Git, "USE_SHELL", value_from_class): - # git.cmd gets Popen via a "from" import, so patch it there. - with mock.patch.object(cmd, "Popen", wraps=cmd.Popen) as mock_popen: + with mock.patch.object(cmd, "safer_popen", wraps=cmd.safer_popen) as mock_safer_popen: # Use a command with no arguments (besides the program name), so it runs # with or without a shell, on all OSes, with the same effect. self.git.execute(["git"], with_exceptions=False, shell=value_in_call) - return mock_popen + return mock_safer_popen @ddt.idata(_shell_cases) def test_it_uses_shell_or_not_as_specified(self, case): """A bool passed as ``shell=`` takes precedence over `Git.USE_SHELL`.""" value_in_call, value_from_class, expected_popen_arg = case - mock_popen = self._do_shell_combo(value_in_call, value_from_class) - mock_popen.assert_called_once() - self.assertIs(mock_popen.call_args.kwargs["shell"], expected_popen_arg) + mock_safer_popen = self._do_shell_combo(value_in_call, value_from_class) + mock_safer_popen.assert_called_once() + self.assertIs(mock_safer_popen.call_args.kwargs["shell"], expected_popen_arg) @ddt.idata(full_case[:2] for full_case in _shell_cases) def test_it_logs_if_it_uses_a_shell(self, case): """``shell=`` in the log message agrees with what is passed to `Popen`.""" value_in_call, value_from_class = case with self.assertLogs(cmd.log, level=logging.DEBUG) as log_watcher: - mock_popen = self._do_shell_combo(value_in_call, value_from_class) - self._assert_logged_for_popen(log_watcher, "shell", mock_popen.call_args.kwargs["shell"]) + mock_safer_popen = self._do_shell_combo(value_in_call, value_from_class) + self._assert_logged_for_popen(log_watcher, "shell", mock_safer_popen.call_args.kwargs["shell"]) @ddt.data( ("None", None), @@ -134,22 +150,49 @@ def test_it_logs_istream_summary_for_stdin(self, case): def test_it_executes_git_and_returns_result(self): self.assertRegex(self.git.execute(["git", "version"]), r"^git version [\d\.]{2}.*$") - def test_it_executes_git_not_from_cwd(self): - with TemporaryDirectory() as tmpdir: - if os.name == "nt": - # Copy an actual binary executable that is not git. - other_exe_path = os.path.join(os.getenv("WINDIR"), "system32", "hostname.exe") - impostor_path = os.path.join(tmpdir, "git.exe") - shutil.copy(other_exe_path, impostor_path) - else: - # Create a shell script that doesn't do anything. - impostor_path = os.path.join(tmpdir, "git") - with open(impostor_path, mode="w", encoding="utf-8") as file: - print("#!/bin/sh", file=file) - os.chmod(impostor_path, 0o755) - - with cwd(tmpdir): - self.assertRegex(self.git.execute(["git", "version"]), r"^git version\b") + @ddt.data( + # chdir_to_repo, shell, command, use_shell_impostor + (False, False, ["git", "version"], False), + (False, True, "git version", False), + (False, True, "git version", True), + (True, False, ["git", "version"], False), + (True, True, "git version", False), + (True, True, "git version", True), + ) + @with_rw_directory + def test_it_executes_git_not_from_cwd(self, rw_dir, case): + chdir_to_repo, shell, command, use_shell_impostor = case + + repo = Repo.init(rw_dir) + + if os.name == "nt": + # Copy an actual binary executable that is not git. (On Windows, running + # "hostname" only displays the hostname, it never tries to change it.) + other_exe_path = Path(os.environ["SystemRoot"], "system32", "hostname.exe") + impostor_path = Path(rw_dir, "git.exe") + shutil.copy(other_exe_path, impostor_path) + else: + # Create a shell script that doesn't do anything. + impostor_path = Path(rw_dir, "git") + impostor_path.write_text("#!/bin/sh\n", encoding="utf-8") + os.chmod(impostor_path, 0o755) + + if use_shell_impostor: + shell_name = "cmd.exe" if os.name == "nt" else "sh" + shutil.copy(impostor_path, Path(rw_dir, shell_name)) + + with contextlib.ExitStack() as stack: + if chdir_to_repo: + stack.enter_context(cwd(rw_dir)) + if use_shell_impostor: + stack.enter_context(_patch_out_env("ComSpec")) + + # Run the command without raising an exception on failure, as the exception + # message is currently misleading when the command is a string rather than a + # sequence of strings (it really runs "git", but then wrongly reports "g"). + output = repo.git.execute(command, with_exceptions=False, shell=shell) + + self.assertRegex(output, r"^git version\b") @skipUnless( os.name == "nt", @@ -345,7 +388,7 @@ def test_environment(self, rw_dir): self.assertIn("FOO", str(err)) def test_handle_process_output(self): - from git.cmd import handle_process_output + from git.cmd import handle_process_output, safer_popen line_count = 5002 count = [None, 0, 0] @@ -361,13 +404,12 @@ def counter_stderr(line): fixture_path("cat_file.py"), str(fixture_path("issue-301_stderr")), ] - proc = subprocess.Popen( + proc = safer_popen( cmdline, stdin=None, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=False, - creationflags=cmd.PROC_CREATIONFLAGS, ) handle_process_output(proc, counter_stdout, counter_stderr, finalize_process) diff --git a/test/test_index.py b/test/test_index.py index 50d941e83..8a64e2293 100644 --- a/test/test_index.py +++ b/test/test_index.py @@ -3,16 +3,19 @@ # This module is part of GitPython and is released under the # 3-Clause BSD License: https://opensource.org/license/bsd-3-clause/ +import contextlib from io import BytesIO import logging import os import os.path as osp from pathlib import Path import re +import shutil from stat import S_ISLNK, ST_MODE import subprocess import tempfile +import ddt import pytest from sumtypes import constructor, sumtype @@ -36,9 +39,16 @@ from git.index.typ import BaseIndexEntry, IndexEntry from git.index.util import TemporaryFileSwap from git.objects import Blob -from git.util import Actor, hex_to_bin, rmtree +from git.util import Actor, cwd, hex_to_bin, rmtree from gitdb.base import IStream -from test.lib import TestBase, fixture, fixture_path, with_rw_directory, with_rw_repo +from test.lib import ( + TestBase, + VirtualEnvironment, + fixture, + fixture_path, + with_rw_directory, + with_rw_repo, +) HOOKS_SHEBANG = "#!/usr/bin/env sh\n" @@ -172,6 +182,7 @@ def _make_hook(git_dir, name, content, make_exec=True): return hp +@ddt.ddt class TestIndex(TestBase): def __init__(self, *args): super().__init__(*args) @@ -1012,6 +1023,47 @@ def test_run_commit_hook(self, rw_repo): output = Path(rw_repo.git_dir, "output.txt").read_text(encoding="utf-8") self.assertEqual(output, "ran fake hook\n") + @ddt.data((False,), (True,)) + @with_rw_directory + def test_hook_uses_shell_not_from_cwd(self, rw_dir, case): + (chdir_to_repo,) = case + + shell_name = "bash.exe" if os.name == "nt" else "sh" + maybe_chdir = cwd(rw_dir) if chdir_to_repo else contextlib.nullcontext() + repo = Repo.init(rw_dir) + + # We need an impostor shell that works on Windows and that the test can + # distinguish from the real bash.exe. But even if the real bash.exe is absent or + # unusable, we should verify the impostor is not run. So the impostor needs a + # clear side effect (unlike in TestGit.test_it_executes_git_not_from_cwd). Popen + # on Windows uses CreateProcessW, which disregards PATHEXT; the impostor may + # need to be a binary executable to ensure the vulnerability is found if + # present. No compiler need exist, shipping a binary in the test suite may + # target the wrong architecture, and generating one in a bespoke way may trigger + # false positive virus scans. So we use a Bash/Python polyglot for the hook and + # use the Python interpreter itself as the bash.exe impostor. But an interpreter + # from a venv may not run when copied outside of it, and a global interpreter + # won't run when copied to a different location if it was installed from the + # Microsoft Store. So we make a new venv in rw_dir and use its interpreter. + venv = VirtualEnvironment(rw_dir, with_pip=False) + shutil.copy(venv.python, Path(rw_dir, shell_name)) + shutil.copy(fixture_path("polyglot"), hook_path("polyglot", repo.git_dir)) + payload = Path(rw_dir, "payload.txt") + + if type(_win_bash_status) in {WinBashStatus.Absent, WinBashStatus.WslNoDistro}: + # The real shell can't run, but the impostor should still not be used. + with self.assertRaises(HookExecutionError): + with maybe_chdir: + run_commit_hook("polyglot", repo.index) + self.assertFalse(payload.exists()) + else: + # The real shell should run, and not the impostor. + with maybe_chdir: + run_commit_hook("polyglot", repo.index) + self.assertFalse(payload.exists()) + output = Path(rw_dir, "output.txt").read_text(encoding="utf-8") + self.assertEqual(output, "Ran intended hook.\n") + @pytest.mark.xfail( type(_win_bash_status) is WinBashStatus.Absent, reason="Can't run a hook on Windows without bash.exe.", diff --git a/test/test_installation.py b/test/test_installation.py index 0a200415b..15ed5b13b 100644 --- a/test/test_installation.py +++ b/test/test_installation.py @@ -4,31 +4,19 @@ import ast import os import subprocess -import sys -from test.lib import TestBase -from test.lib.helper import with_rw_directory +from test.lib import TestBase, VirtualEnvironment, with_rw_directory class TestInstallation(TestBase): - def setUp_venv(self, rw_dir): - self.venv = rw_dir - subprocess.run([sys.executable, "-m", "venv", self.venv], stdout=subprocess.PIPE) - bin_name = "Scripts" if os.name == "nt" else "bin" - self.python = os.path.join(self.venv, bin_name, "python") - self.pip = os.path.join(self.venv, bin_name, "pip") - self.sources = os.path.join(self.venv, "src") - self.cwd = os.path.dirname(os.path.dirname(__file__)) - os.symlink(self.cwd, self.sources, target_is_directory=True) - @with_rw_directory def test_installation(self, rw_dir): - self.setUp_venv(rw_dir) + venv = self._set_up_venv(rw_dir) result = subprocess.run( - [self.pip, "install", "."], + [venv.pip, "install", "."], stdout=subprocess.PIPE, - cwd=self.sources, + cwd=venv.sources, ) self.assertEqual( 0, @@ -37,9 +25,9 @@ def test_installation(self, rw_dir): ) result = subprocess.run( - [self.python, "-c", "import git"], + [venv.python, "-c", "import git"], stdout=subprocess.PIPE, - cwd=self.sources, + cwd=venv.sources, ) self.assertEqual( 0, @@ -48,9 +36,9 @@ def test_installation(self, rw_dir): ) result = subprocess.run( - [self.python, "-c", "import gitdb; import smmap"], + [venv.python, "-c", "import gitdb; import smmap"], stdout=subprocess.PIPE, - cwd=self.sources, + cwd=venv.sources, ) self.assertEqual( 0, @@ -62,9 +50,9 @@ def test_installation(self, rw_dir): # by inserting its location into PYTHONPATH or otherwise patched into # sys.path, make sure it is not wrongly inserted as the *first* entry. result = subprocess.run( - [self.python, "-c", "import sys; import git; print(sys.path)"], + [venv.python, "-c", "import sys; import git; print(sys.path)"], stdout=subprocess.PIPE, - cwd=self.sources, + cwd=venv.sources, ) syspath = result.stdout.decode("utf-8").splitlines()[0] syspath = ast.literal_eval(syspath) @@ -73,3 +61,13 @@ def test_installation(self, rw_dir): syspath[0], msg="Failed to follow the conventions for https://docs.python.org/3/library/sys.html#sys.path", ) + + @staticmethod + def _set_up_venv(rw_dir): + venv = VirtualEnvironment(rw_dir, with_pip=True) + os.symlink( + os.path.dirname(os.path.dirname(__file__)), + venv.sources, + target_is_directory=True, + ) + return venv