diff --git a/src/security/safe_command/api.py b/src/security/safe_command/api.py index 9713de8..8cca354 100644 --- a/src/security/safe_command/api.py +++ b/src/security/safe_command/api.py @@ -6,7 +6,7 @@ from os.path import expanduser, expandvars from shutil import which from subprocess import CompletedProcess -from typing import Union, Optional, List, Tuple, Set, FrozenSet, Sequence, Callable, Iterator +from typing import Union, Optional, List, FrozenSet, Sequence, Callable, Iterator from security.exceptions import SecurityException ValidRestrictions = Optional[Union[FrozenSet[str], Sequence[str]]] @@ -37,18 +37,19 @@ ("nc", "netcat", "ncat", "curl", "wget", "dpkg", "rpm")) BANNED_PATHTYPES = frozenset( ("mount", "symlink", "block_device", "char_device", "fifo", "socket")) -BANNED_OWNERS = frozenset(("root", "admin", "wheel", "sudo")) -BANNED_GROUPS = frozenset(("root", "admin", "wheel", "sudo")) +BANNED_OWNERS = frozenset(("root", "admin", "wheel", "sudo", "Administrator", "SYSTEM")) +BANNED_GROUPS = frozenset(("root", "admin", "wheel", "sudo", "Administrators", "SYSTEM")) BANNED_COMMAND_CHAINING_SEPARATORS = frozenset(("&", ";", "|", "\n")) BANNED_COMMAND_AND_PROCESS_SUBSTITUTION_OPERATORS = frozenset( ("$(", "`", "<(", ">(")) BANNED_COMMAND_CHAINING_EXECUTABLES = frozenset(( - "eval", "exec", "-exec", "env", "source", "sudo", "su", "gosu", "sudoedit", + "eval", "exec", "env", "source", "sudo", "su", "gosu", "sudoedit", "xargs", "awk", "perl", "python", "ruby", "php", "lua", "sqlplus", "expect", "screen", "tmux", "byobu", "byobu-ugraph", "time", "nohup", "at", "batch", "anacron", "cron", "crontab", "systemctl", "service", "init", "telinit", "systemd", "systemd-run" )) +BANNED_COMMAND_CHAINING_ARGUMENTS = frozenset(("-exec", "-execdir", "-ok", "-okdir", "system(")) COMMON_SHELLS = frozenset(("sh", "bash", "zsh", "csh", "rsh", "tcsh", "tclsh", "ksh", "dash", "ash", "jsh", "jcsh", "mksh", "wsh", "fish", "busybox", "powershell", "pwsh", "pwsh-preview", "pwsh-lts")) @@ -57,9 +58,15 @@ ("!", "*", "@", "#", "%", "/", "^", ",")) -def run(original_func: Callable, command: ValidCommand, *args, restrictions: ValidRestrictions = DEFAULT_CHECKS, **kwargs) -> Union[CompletedProcess, None]: +def run(original_func: Callable, + command: ValidCommand, + *args, + restrictions: ValidRestrictions = DEFAULT_CHECKS, + max_resolved_paths: int = 10000, + rglob_dirs: bool = True, + **kwargs) -> Union[CompletedProcess, None]: # If there is a command and it passes the checks pass it the original function call - check(command, restrictions, **kwargs) + check(command, restrictions, max_resolved_paths, rglob_dirs, **kwargs) return _call_original(original_func, command, *args, **kwargs) @@ -261,7 +268,7 @@ def _shell_expand(command: str, venv: Optional[dict] = None) -> str: """ PARAM_EXPANSION_REGEX = re_compile( - r'(?P\$(?P[a-zA-Z_][a-zA-Z0-9_]*|\{[^{}\$]+?\}))') + r'(?P\$(?P[a-zA-Z_]\w*|\{[^{}\$]+?\}))') BRACE_EXPANSION_REGEX = re_compile( r'(?P\S*(?P\{[^{}\$]+?\})\S*)') @@ -319,8 +326,6 @@ def _shell_expand(command: str, venv: Optional[dict] = None) -> str: value = _get_env_var_value(var, venv, default="") if start_slice is not None: value = value[start_slice:end_slice] - elif not operator or operator == "?": - value = value elif operator in "-=": value = value or default if operator == "=": @@ -328,6 +333,7 @@ def _shell_expand(command: str, venv: Optional[dict] = None) -> str: venv[var] = value elif operator == "+": value = default if value else "" + # value = value when operator is ? or there is no operator command = command.replace(full_expansion, value, 1) @@ -431,10 +437,7 @@ def _recursive_shlex_split(command: str) -> Iterator[str]: yield from _recursive_shlex_split(cmd_part) -def _parse_command(command: ValidCommand, venv: Optional[dict] = None, shell: Optional[bool] = True) -> Tuple[str, List[str]]: - """ - Expands the shell exspansions in the command then parses the expanded command into a list of command parts. - """ +def _validate_and_expand_command(command: ValidCommand, venv: Optional[dict] = None, shell: Optional[bool] = True) -> str: if isinstance(command, str): command_str = command elif isinstance(command, list): @@ -443,18 +446,16 @@ def _parse_command(command: ValidCommand, venv: Optional[dict] = None, shell: Op raise TypeError("Command must be a str or a list") if not command_str: - # No need to expand or parse an empty command - return ("", []) - + # No need to expand an empty command + return command_str + spaced_command = _space_redirection_operators(command_str) - expanded_command = _shell_expand( - spaced_command, venv) if shell else spaced_command - parsed_command = list(_recursive_shlex_split(expanded_command)) - return expanded_command, parsed_command + expanded_command = _shell_expand(spaced_command, venv) if shell else spaced_command + return expanded_command def _path_is_executable(path: Path) -> bool: - return access(path, X_OK) + return access(path, X_OK) and not path.is_dir() def _resolve_executable_path(executable: Optional[str], venv: Optional[dict] = None) -> Optional[Path]: @@ -475,153 +476,184 @@ def _resolve_executable_path(executable: Optional[str], venv: Optional[dict] = N return None -def _resolve_paths_in_parsed_command(parsed_command: List[str], venv: Optional[dict] = None) -> Tuple[Set[Path], Set[str]]: +def _recursive_resolve_symlinks(path: Path, rglob_dirs: bool = True) -> Iterator[Path]: """ - Create Path objects from the parsed commands and resolve symlinks then add to sets of unique Paths - and absolute path strings for comparison with the sensitive files, common exploit executables and group/owner checks. + Recursively resolves symlinks in the path. + When the path is a symlink first the absolute path of the symlink is yielded + then _recursive_resolve_symlinks is called on the resolved path of its target + this is needed to handle nested symlinks and symlinks to directories which may contain symlinks """ + if path.is_symlink(): + yield path.absolute() + yield from _recursive_resolve_symlinks(path.resolve(), rglob_dirs) + elif path.is_dir(): + yield path.absolute() + if rglob_dirs: + for file in path.rglob("*"): + yield from _recursive_resolve_symlinks(file, rglob_dirs) + else: + # a final .resolve is needed to handle files like /etc/passwd on MacOS which behaves + # like a symlink to /private/etc/passwd but is not a symlink according to Path.is_symlink + yield path.resolve() - abs_paths, abs_path_strings = set(), set() - - for cmd_part in parsed_command: - - if "~" in cmd_part: - # Expand ~ and ~user constructions in the cmd_part - cmd_part = expanduser(cmd_part) - - # Check if the cmd_part is an executable and resolve the path - if executable_path := _resolve_executable_path(cmd_part, venv): - abs_paths.add(executable_path) - abs_path_strings.add(str(executable_path)) - - # Handle any globbing characters and repeating slashes from the command and resolve symlinks to get absolute path - for path in iglob(cmd_part, recursive=True): - path = Path(path) - - # When its a symlink both the absolute path of the symlink - # and the resolved path of its target are added to the sets - if path.is_symlink(): - path = path.absolute() - abs_paths.add(path) - abs_path_strings.add(str(path)) - abs_path = Path(path).resolve() - abs_paths.add(abs_path) - abs_path_strings.add(str(abs_path)) +def _resolve_paths_in_command_part(cmd_part: str, venv: Optional[dict] = None, rglob_dirs: bool = True) -> Iterator[Path]: + """ + Create Path objects handling tilde expansion, globbing and symlinks in the command part. + """ - # Check if globbing and/or resolving symlinks returned an executable and add to the sets - if executable_path := _resolve_executable_path(str(path), venv): - abs_paths.add(executable_path) - abs_path_strings.add(str(executable_path)) + if "~" in cmd_part: + # Expand ~ and ~user constructions in the cmd_part + cmd_part = expanduser(cmd_part) - # Check if globbing and/or resolving symlinks returned a directory and add all files in the directory to the sets - if abs_path.is_dir(): - for file in abs_path.rglob("*"): - file = file.resolve() - abs_paths.add(file) - abs_path_strings.add(str(file)) + # Check if the cmd_part is an executable and resolve the path + if executable_path := _resolve_executable_path(cmd_part, venv): + yield executable_path + return # Globbing is redundant when the cmd_part is an executable - return abs_paths, abs_path_strings + # Handle any globbing characters and repeating slashes from the command and resolve symlinks to get absolute paths + for path in map(Path, iglob(cmd_part, recursive=True)): + yield from _recursive_resolve_symlinks(path, rglob_dirs) -def check(command: ValidCommand, restrictions: ValidRestrictions, **kwargs) -> None: +def check(command: ValidCommand, + restrictions: ValidRestrictions, + max_resolved_paths: int = 10000, + rglob_dirs: bool = True, + **popen_kwargs) -> None: + if not restrictions: # No restrictions no checks return None - # venv is a copy to avoid modifying the original Popen kwargs or None to default to using os.environ when env is not set - venv = dict(**Popen_env) if (Popen_env := kwargs.get("env")) is not None else None - - # Check if the executable is set by the Popen kwargs (either executable or shell) + # venv is a copy to avoid modifying the original popen_kwargs or None to default to using os.environ when env is not set + venv = dict(**popen_env) if (popen_env := popen_kwargs.get("env")) is not None else None + + # Check if the executable is set by the popen popen_kwargs (either executable or shell) # Executable takes precedence over shell. see subprocess.py line 1593 - executable_path = _resolve_executable_path(kwargs.get("executable"), venv) - shell = executable_path.name in COMMON_SHELLS if executable_path else kwargs.get("shell") + executable_path = _resolve_executable_path(popen_kwargs.get("executable"), venv) + shell = executable_path.name in COMMON_SHELLS if executable_path else popen_kwargs.get("shell") - expanded_command, parsed_command = _parse_command(command, venv, shell) - if not parsed_command: + if not (expanded_command := _validate_and_expand_command(command, venv, shell)): # Empty commands are safe return None - - # If the executable is not set by the Popen kwargs it is the first command part (args). see subprocess.py line 1596 - if not executable_path: - executable_path = _resolve_executable_path(parsed_command[0], venv) - - abs_paths, abs_path_strings = _resolve_paths_in_parsed_command( - parsed_command, venv) - - if "PREVENT_COMMAND_CHAINING" in restrictions: - check_multiple_commands(expanded_command, parsed_command) - - if "PREVENT_ARGUMENTS_TARGETING_SENSITIVE_FILES" in restrictions: - check_sensitive_files(expanded_command, abs_path_strings) - - if "PREVENT_COMMON_EXPLOIT_EXECUTABLES" in restrictions: - check_banned_executable(expanded_command, abs_path_strings) - + + # First check the expanded command string for the restrictions + if prevent_command_chaining := "PREVENT_COMMAND_CHAINING" in restrictions: + check_newline_in_expanded_command(expanded_command) + if prevent_sensitive_files := "PREVENT_ARGUMENTS_TARGETING_SENSITIVE_FILES" in restrictions: + check_sensitive_files_in_expanded_command(expanded_command) + if prevent_common_exploit_executables := "PREVENT_COMMON_EXPLOIT_EXECUTABLES" in restrictions: + check_banned_executable_in_expanded_command(expanded_command) + + # Create local bools for each restriction to avoid membership check each iteration prevent_uncommon_path_types = "PREVENT_UNCOMMON_PATH_TYPES" in restrictions prevent_admin_owned_files = "PREVENT_ADMIN_OWNED_FILES" in restrictions - - for path in abs_paths: - # to avoid blocking the executable itself since most are symlinks to the actual executable - # and owned by root with group wheel or sudo - if path == executable_path: - continue - - if prevent_uncommon_path_types: - check_path_type(path) - - if prevent_admin_owned_files: - check_file_owner(path) - check_file_group(path) - - -def check_multiple_commands(expanded_command: str, parsed_command: List[str]) -> None: + + # Then check the parsed command cmd_parts for the restrictions if the expanded command passes checks + num_resolved_paths = 0 + for cmd_part in _recursive_shlex_split(expanded_command): + + if prevent_command_chaining: + check_multiple_commands(cmd_part) + + for path in _resolve_paths_in_command_part(cmd_part, venv, rglob_dirs): + num_resolved_paths += 1 + if max_resolved_paths >= 0 and num_resolved_paths > max_resolved_paths: + raise SecurityException( + f"Exceeded maximum number of resolved paths: {max_resolved_paths}") + + if prevent_command_chaining: + check_path_is_chaining_executable(path) + check_path_is_shell(path) + if prevent_sensitive_files: + check_path_is_sensitive_file(path) + if prevent_common_exploit_executables: + check_path_is_banned_executable(path) + + if not executable_path: + # If the executable is not set by the popen kwargs it is the first command part (args). see subprocess.py line 1596 + executable_path = path + # continue to avoid blocking the executable itself since most are symlinks to the actual executable and owned by root with group wheel or sudo + continue + + if prevent_uncommon_path_types: + check_path_type(path) + if prevent_admin_owned_files: + check_path_owner(path) + check_path_group(path) + + +# Expanded Command checks +def check_newline_in_expanded_command(expanded_command: str) -> None: # Since shlex.split removes newlines from the command, it would not be present in the parsed_command and # must be checked for in the expanded command string if '\n' in expanded_command: raise SecurityException( "Multiple commands not allowed. Newline found.") + - for cmd_part in parsed_command: - if any(seperator in cmd_part for seperator in BANNED_COMMAND_CHAINING_SEPARATORS): +def check_sensitive_files_in_expanded_command(expanded_command: str) -> None: + for sensitive_path in SENSITIVE_FILE_PATHS: + # Handles edge cases when a sensitive file is part of a command but the path could not be resolved + if sensitive_path in expanded_command: raise SecurityException( - f"Multiple commands not allowed. Separators found.") + f"Disallowed access to sensitive file: {sensitive_path}") + - if any(substitution_op in cmd_part for substitution_op in BANNED_COMMAND_AND_PROCESS_SUBSTITUTION_OPERATORS): +def check_banned_executable_in_expanded_command(expanded_command: str) -> None: + for banned_executable in BANNED_EXECUTABLES: + # Handles edge cases when a banned executable is part of a command but the path could not be resolved + if (expanded_command.startswith(f"{banned_executable} ") + or f"bin/{banned_executable}" in expanded_command + or f" {banned_executable} " in expanded_command + ): raise SecurityException( - f"Multiple commands not allowed. Process substitution operators found.") + f"Disallowed command: {banned_executable}") - if cmd_part.strip() in BANNED_COMMAND_CHAINING_EXECUTABLES | COMMON_SHELLS: - raise SecurityException( - f"Multiple commands not allowed. Executable {cmd_part} allows command chaining.") +# Parsed Command (cmd_part) checks +def check_multiple_commands(cmd_part: str) -> None: + if any(seperator in cmd_part for seperator in BANNED_COMMAND_CHAINING_SEPARATORS): + raise SecurityException( + "Multiple commands not allowed. Separators found.") -def check_sensitive_files(expanded_command: str, abs_path_strings: Set[str]) -> None: + if any(substitution_op in cmd_part for substitution_op in BANNED_COMMAND_AND_PROCESS_SUBSTITUTION_OPERATORS): + raise SecurityException( + "Multiple commands not allowed. Process substitution operators found.") + + if cmd_part.strip() in BANNED_COMMAND_CHAINING_ARGUMENTS: + raise SecurityException( + f"Multiple commands not allowed. Argument {cmd_part} allows command chaining.") + + +# Path checks +def check_path_is_chaining_executable(path: Path) -> None: + if (banned_chaining_executable := path.name) in BANNED_COMMAND_CHAINING_EXECUTABLES: + raise SecurityException( + f"Multiple commands not allowed. Executable {banned_chaining_executable} allows command chaining.") + + +def check_path_is_shell(path: Path) -> None: + if (shell := path.name) in COMMON_SHELLS: + raise SecurityException( + f"Multiple commands not allowed. Shell {shell} allows command chaining.") + + +def check_path_is_sensitive_file(path: Path) -> None: + # Convert to string and check endswith so /private/etc/passwd on mac and /etc/passwd on linux are handled the same + path_string = str(path) for sensitive_path in SENSITIVE_FILE_PATHS: - # First check the absolute path strings for the sensitive files - # Then handle edge cases when a sensitive file is part of a command but the path could not be resolved - if ( - any(abs_path_string.endswith(sensitive_path) - for abs_path_string in abs_path_strings) - or sensitive_path in expanded_command - ): + # Check if the absolute path is a sensitive file + if path_string.endswith(sensitive_path): raise SecurityException( f"Disallowed access to sensitive file: {sensitive_path}") -def check_banned_executable(expanded_command: str, abs_path_strings: Set[str]) -> None: - for banned_executable in BANNED_EXECUTABLES: - # First check the absolute path strings for the banned executables - # Then handle edge cases when a banned executable is part of a command but the path could not be resolved - if ( - any((abs_path_string.endswith( - f"/{banned_executable}") for abs_path_string in abs_path_strings)) - or expanded_command.startswith(f"{banned_executable} ") - or f"bin/{banned_executable}" in expanded_command - or f" {banned_executable} " in expanded_command - ): - raise SecurityException( - f"Disallowed command: {banned_executable}") +def check_path_is_banned_executable(path: Path) -> None: + if (banned_executable := path.name) in BANNED_EXECUTABLES: + raise SecurityException( + f"Disallowed command: {banned_executable}") def check_path_type(path: Path) -> None: @@ -631,15 +663,13 @@ def check_path_type(path: Path) -> None: f"Disallowed access to path type {pathtype}: {path}") -def check_file_owner(path: Path) -> None: - owner = path.owner() - if owner in BANNED_OWNERS: +def check_path_owner(path: Path) -> None: + if (owner := path.owner()) in BANNED_OWNERS: raise SecurityException( f"Disallowed access to file owned by {owner}: {path}") -def check_file_group(path: Path) -> None: - group = path.group() - if group in BANNED_GROUPS: +def check_path_group(path: Path) -> None: + if (group := path.group()) in BANNED_GROUPS: raise SecurityException( f"Disallowed access to file owned by {group}: {path}") diff --git a/tests/safe_command/test_injection.py b/tests/safe_command/test_injection.py index 2a4cf14..61ac2c9 100644 --- a/tests/safe_command/test_injection.py +++ b/tests/safe_command/test_injection.py @@ -1,11 +1,11 @@ import pytest import subprocess from pathlib import Path -from os import mkfifo, symlink, remove, getenv +from os import mkfifo, symlink, remove, getenv, listdir from shutil import which from security import safe_command -from security.safe_command.api import _parse_command, _resolve_paths_in_parsed_command, _shell_expand +from security.safe_command.api import _validate_and_expand_command, _recursive_shlex_split, _resolve_paths_in_command_part, _shell_expand from security.exceptions import SecurityException with (Path(__file__).parent / "fuzzdb" / "command-injection-template.txt").open() as f: @@ -38,10 +38,29 @@ def setup_teardown(tmpdir): cwd_testfile.touch() fifo_testfile = (wd / "fifo_testfile").resolve() mkfifo(fifo_testfile) - symlink_testfile = (wd / "symlink_testfile").resolve() - symlink(cwd_testfile, symlink_testfile) # Target of symlink_testfile is cwd_testfile.txt + passwd = Path("/etc/passwd").resolve() sudoers = Path("/etc/sudoers").resolve() + + symlink_to_cwd_testfile = (wd / "symlink_to_cwd_testfile").resolve() + symlink(cwd_testfile, symlink_to_cwd_testfile) # Target of symlink_to_cwd_testfile is cwd_testfile.txt + + # Directory with a symlink to passwd + symlink_testdir = wd / "symlink_testdir" + symlink_testdir.mkdir() + symlink_to_passwd = symlink_testdir / "symlink_to_passwd" + symlink(passwd, symlink_to_passwd) + + # Symlink to a directory with a symlink + symlink_to_symlink_testdir = wd / "symlink_to_symlink_testdir" + symlink(symlink_testdir, symlink_to_symlink_testdir) + + # Directory with a symlink to a symlink to symlink passwd + nested_symlink_testdir = wd / "nested_symlink_testdir" + nested_symlink_testdir.mkdir() + symlink_to_symlink_to_passwd = nested_symlink_testdir / "symlink_to_symlink_to_passwd" + symlink(symlink_to_passwd, symlink_to_symlink_to_passwd) + # Get Path objects for the test commands cat, echo, grep, nc, curl, sh = map(lambda cmd: Path(which(cmd) or f"/usr/bin/{cmd}" ), ["cat", "echo", "grep", "nc", "curl", "sh"]) testpaths = { @@ -53,9 +72,14 @@ def setup_teardown(tmpdir): "space_in_name": space_in_name, "cwd_testfile": cwd_testfile, "fifo_testfile": fifo_testfile, - "symlink_testfile": symlink_testfile, + "symlink_to_cwd_testfile": symlink_to_cwd_testfile, "passwd": passwd, "sudoers": sudoers, + "symlink_testdir": symlink_testdir, + "nested_symlink_testdir": nested_symlink_testdir, + "symlink_to_passwd": symlink_to_passwd, + "symlink_to_symlink_to_passwd": symlink_to_symlink_to_passwd, + "symlink_to_symlink_testdir": symlink_to_symlink_testdir, "cat": cat, "echo": echo, "grep": grep, @@ -93,9 +117,12 @@ class TestSafeCommandInternals: ("echo test1 test2 test3 > test.txt", ["echo", "test1", "test2", "test3", ">", "test.txt"], ["echo", "test1", "test2", "test3", ">", "test.txt"]), ] ) - def test_parse_command(self, str_cmd, list_cmd, expected_parsed_command, setup_teardown): - expanded_str_cmd, parsed_str_cmd = _parse_command(str_cmd) - expanded_list_cmd, parsed_list_cmd = _parse_command(list_cmd) + def test_parse_command(self, str_cmd, list_cmd, expected_parsed_command): + expanded_str_cmd = _validate_and_expand_command(str_cmd) + expanded_list_cmd = _validate_and_expand_command(list_cmd) + parsed_str_cmd = list(_recursive_shlex_split(expanded_str_cmd)) + parsed_list_cmd = list(_recursive_shlex_split(expanded_list_cmd)) + assert expanded_str_cmd == expanded_list_cmd assert parsed_str_cmd == parsed_list_cmd == expected_parsed_command @@ -122,7 +149,18 @@ def test_parse_command(self, str_cmd, list_cmd, expected_parsed_command, setup_t # Check fifo and symlink ("cat {fifo_testfile}", {"cat", "fifo_testfile"}), # Symlink should resolve to cwdtest.txt so should get the symlink and the target - ("cat {symlink_testfile}", {"cat", "symlink_testfile", "cwd_testfile"},), + ("cat {symlink_to_cwd_testfile}", {"cat", "symlink_to_cwd_testfile", "cwd_testfile"},), + # symlink_to_passwd should resolve to passwd so should get the symlink and the target + ("cat {symlink_to_passwd}", {"cat", "symlink_to_passwd", "passwd"}), + # Returns the original symlink and the target but NOT the intermediate symlink + ("cat {symlink_to_symlink_to_passwd}", {"cat", "symlink_to_symlink_to_passwd", "passwd"}), + # Returns the dir, the symlink in the dir, and the target + ("grep '.+' -r {symlink_testdir}", {"grep", "symlink_testdir", "symlink_to_passwd", "passwd"}), + # Returns the symlink to the dir, the dir, the symlink in the dir and the target + ("grep '.+' -r {symlink_to_symlink_testdir}", {"grep", "symlink_to_symlink_testdir", "symlink_testdir", "symlink_to_passwd", "passwd"}), + # Returns the dir, the symlink in the dir, and the target but NOT the intermediate symlink + ("grep '.+' -r {nested_symlink_testdir}", {"grep", "nested_symlink_testdir", "symlink_to_symlink_to_passwd", "passwd"}), + # Check a command with binary name as an argument ("echo 'cat' {test.txt}", {"echo", "cat", "test.txt"}), # Command has a directory so should get the dir and all the subfiles and resolved symlink to cwdtest.txt @@ -138,11 +176,14 @@ def test_resolve_paths_in_parsed_command(self, command, expected_paths, setup_te command = insert_testpaths(command, testpaths) expected_paths = {testpaths[p] for p in expected_paths} - expanded_command, parsed_command = _parse_command(command) - abs_paths, abs_path_strings = _resolve_paths_in_parsed_command(parsed_command) + expanded_command = _validate_and_expand_command(command) + abs_paths = set() + for cmd_part in _recursive_shlex_split(expanded_command): + for path in _resolve_paths_in_command_part(cmd_part): + abs_paths.add(path) + assert abs_paths == expected_paths - assert abs_path_strings == {str(p) for p in expected_paths} - + @pytest.mark.parametrize( "string, expanded_str", @@ -326,6 +367,7 @@ def test_banned_shell_expansion(self, string): "PREVENT_COMMON_EXPLOIT_EXECUTABLES": SecurityException("Disallowed command"), "PREVENT_UNCOMMON_PATH_TYPES": SecurityException("Disallowed access to path type"), "PREVENT_ADMIN_OWNED_FILES": SecurityException("Disallowed access to file owned by"), + "MAX_RESOLVED_PATHS": SecurityException("Exceeded maximum number of resolved paths"), "ANY": SecurityException("Any Security exception") } @@ -525,9 +567,9 @@ def test_check_banned_executable(self, command, original_func, setup_teardown): "command", [ "cat {fifo_testfile}", - "cat {symlink_testfile}", + "cat {symlink_to_cwd_testfile}", ["cat", "{fifo_testfile}"], - ["cat", "{symlink_testfile}"], + ["cat", "{symlink_to_cwd_testfile}"], ] ) def test_check_path_type(self, command, original_func, setup_teardown): @@ -603,7 +645,7 @@ def test_valid_commands_not_blocked(self, command, expected_result, original_fun ("echo $HOME", "/Users/TESTHOME", {"env": {"HOME": "/Users/TESTHOME"}, "shell": True}), ("echo $HOME", EXCEPTIONS["PREVENT_ARGUMENTS_TARGETING_SENSITIVE_FILES"], {"env": {"HOME": "/etc/passwd"}, "shell": True}), (["/bin/echo $HOME/somefile/"], f"{str(Path.home())}/somefile/", {"shell": True}), - (["/bin/echo", "$HOME/somefile/"], f"$HOME/somefile/", {"shell": False}), + (["/bin/echo", "$HOME/somefile/"], "$HOME/somefile/", {"shell": False}), # Should only raise exception if shell is True or executable is a shell (["/bin/cat /etc/${BADKEY:-passwd}"], EXCEPTIONS["PREVENT_ARGUMENTS_TARGETING_SENSITIVE_FILES"], {"shell": True}), @@ -643,6 +685,66 @@ def test_popen_kwargs(self, command, expected_result, popen_kwargs, original_fun self._run_test_with_command(command, expected_result, restrictions, original_func, **popen_kwargs) + @pytest.mark.parametrize( + "command, max_resolved_paths, rglob_dirs, expected_result", + [ + ("ls -a / ", 1000, True, EXCEPTIONS["MAX_RESOLVED_PATHS"]), + # Should not raise a MAX_RESOLVED_PATHS exception because the rglob_dirs kwarg is set to False + ("ls -a / ", 1000, False, '\n'.join(sorted([".", ".."] + listdir("/")))), + + # Should raise a MAX_RESOLVED_PATHS max_resolved_paths is 1 which is less than the number of files in /etc regardless of whether rglob_dirs is True or False + ("ls -a /etc ", 1, True, EXCEPTIONS["MAX_RESOLVED_PATHS"]), + ("ls -a /etc ", 1, False, EXCEPTIONS["MAX_RESOLVED_PATHS"]), + + # Should raise a MAX_RESOLVED_PATHS exception since max_resolved_paths is 0 and the command has 1 resolved path even when rglob_dirs is False + ("ls -a /etc/ ", 0, True, EXCEPTIONS["MAX_RESOLVED_PATHS"]), + ("ls -a /etc/ ", 0, False, EXCEPTIONS["MAX_RESOLVED_PATHS"]), + + # Should not raise a MAX_RESOLVED_PATHS exception since there is no max_resolved_paths + # but should raise a PREVENT_ARGUMENTS_TARGETING_SENSITIVE_FILES exception since /etc/ contains sensitive files + ("ls -a /etc ", -1, True, EXCEPTIONS["PREVENT_ARGUMENTS_TARGETING_SENSITIVE_FILES"]), + # Should not raise either exception since there is no max_resolved_paths and etc/passwd is not resolved since rglob_dirs is False + ("ls -a /etc ", -1, False, '\n'.join(sorted([".", ".."] + listdir("/etc")))), + ("ls -a /etc/ ", -1, False, '\n'.join(sorted([".", ".."] + listdir("/etc/")))), + + #Same as above but with globbing + ("ls -a ////e*c ", 1, True, EXCEPTIONS["MAX_RESOLVED_PATHS"]), + ("ls -a ////e*c ", 1, False, EXCEPTIONS["MAX_RESOLVED_PATHS"]), + ("ls -a ////e*c// ", 0, True, EXCEPTIONS["MAX_RESOLVED_PATHS"]), + ("ls -a ////e*c// ", 0, False, EXCEPTIONS["MAX_RESOLVED_PATHS"]), + ("ls -a ////e*c ", -1, True, EXCEPTIONS["PREVENT_ARGUMENTS_TARGETING_SENSITIVE_FILES"]), + ("ls -a ////e*c ", -1, False, '\n'.join(sorted([".", ".."] + listdir("/etc")))), + ("ls -a ////e*c/// ", -1, False, '\n'.join(sorted([".", ".."] + listdir("/etc/")))), + + # Each of these should raise a MAX_RESOLVED_PATHS exception since max_resolved_paths is 1 less than the number of resolved paths + ("cat {symlink_to_cwd_testfile}", 2, True, EXCEPTIONS["MAX_RESOLVED_PATHS"]), + # symlink_to_passwd should resolve to passwd so should get the symlink and the target (3 resolved paths) + ("cat {symlink_to_passwd}", 2, True, EXCEPTIONS["MAX_RESOLVED_PATHS"]), + # Resolves the executable, the original symlink and the target but NOT the intermediate symlink (3 resolved paths) + ("cat {symlink_to_symlink_to_passwd}", 2, True, EXCEPTIONS["MAX_RESOLVED_PATHS"]), + # Resolves the executable, the dir, the symlink in the dir, and the target (4 resolved paths) + ("grep '.+' -r {symlink_testdir}", 3, True, EXCEPTIONS["MAX_RESOLVED_PATHS"]), + # Resolves the executable, the symlink to the dir, the dir, the symlink in the dir and the target (5 resolved paths) + ("grep '.+' -r {symlink_to_symlink_testdir}", 4, True, EXCEPTIONS["MAX_RESOLVED_PATHS"]), + # Resolves the executable, the dir, the symlink in the dir, and the target but NOT the intermediate symlink (4 resolved paths) + ("grep '.+' -r {nested_symlink_testdir}", 3, True, EXCEPTIONS["MAX_RESOLVED_PATHS"]), + ] + ) + def test_max_resolved_paths(self, command, max_resolved_paths, rglob_dirs, expected_result, original_func, setup_teardown): + if original_func.__name__ == "call": + # call doesn't have capture_output kwarg so can't compare result and easier to just return than refactor + return + + testpaths = setup_teardown + command = insert_testpaths(command, testpaths) + expected_result = insert_testpaths(expected_result, testpaths) + + restrictions = [ + "PREVENT_ARGUMENTS_TARGETING_SENSITIVE_FILES", + ] + self._run_test_with_command(command, expected_result, restrictions, original_func, rglob_dirs=rglob_dirs, max_resolved_paths=max_resolved_paths) + + # FUZZDB tests @pytest.mark.parametrize( "command",