diff --git a/bin/config.py b/bin/config.py index c02fd784..8c1b5438 100644 --- a/bin/config.py +++ b/bin/config.py @@ -125,14 +125,18 @@ # fmt: on -def set_default_args() -> None: +def set_default_args() -> list[str]: # Set default argument values. + missing = [] for arg in ARGS_LIST: if not hasattr(args, arg): setattr(args, arg, None) + missing.append(arg) for arg, value in DEFAULT_ARGS.items(): if not hasattr(args, arg): setattr(args, arg, value) + missing.append(arg) + return missing level: Optional[Literal["problem", "problemset"]] = None diff --git a/bin/contest.py b/bin/contest.py index 4801d3f5..87e9d178 100644 --- a/bin/contest.py +++ b/bin/contest.py @@ -38,6 +38,11 @@ def problems_yaml() -> Optional[list[dict[str, Any]]]: _problems_yaml = False return None _problems_yaml = read_yaml(problemsyaml_path) + if _problems_yaml is None: + _problems_yaml = False + return None + if not isinstance(_problems_yaml, list): + fatal("problems.yaml must contain a list of problems") return cast(list[dict[str, Any]], _problems_yaml) diff --git a/bin/stats.py b/bin/stats.py index 0ce1681b..a65187fd 100644 --- a/bin/stats.py +++ b/bin/stats.py @@ -380,7 +380,7 @@ def get_submissions_row( values = [] for problem in problems: directory = ( - Path().cwd() / "submissions" / problem.name + Path.cwd() / "submissions" / problem.name if team_submissions else problem.path / "submissions" ) diff --git a/bin/tools.py b/bin/tools.py index d22e7e30..08633a84 100755 --- a/bin/tools.py +++ b/bin/tools.py @@ -27,7 +27,7 @@ from collections import Counter from colorama import Style from pathlib import Path -from typing import cast, Literal, Optional +from typing import Any, cast, Optional # Local imports import config @@ -75,47 +75,37 @@ fatal("BAPCtools requires at least Python 3.10.") -# Get the list of relevant problems. -# Either use the problems.yaml, or check the existence of problem.yaml and sort -# by shortname. -def get_problems(): - def is_problem_directory(path): - return (path / "problem.yaml").is_file() - - contest: Optional[Path] = None - problem: Optional[Path] = None - level: Optional[Literal["problem", "problemset"]] = None +# Changes the working directory to the root of the contest. +# sets the "level" of the current command (either 'problem' or 'problemset') +# and, if `level == 'problem'` returns the directory of the problem. +def change_directory() -> Optional[Path]: + problem_dir: Optional[Path] = None + config.level = "problemset" if config.args.contest: # TODO #102: replace cast with typed Namespace field - contest = cast(Path, config.args.contest).absolute() - os.chdir(contest) - level = "problemset" + contest_dir = cast(Path, config.args.contest).absolute() + os.chdir(contest_dir) if config.args.problem: # TODO #102: replace cast with typed Namespace field - problem = cast(Path, config.args.problem).absolute() - level = "problem" - os.chdir(problem.parent) - elif is_problem_directory(Path(".")): - problem = Path().cwd() - level = "problem" - os.chdir("..") - else: - level = "problemset" + problem_dir = cast(Path, config.args.problem).absolute() + elif is_problem_directory(Path.cwd()): + problem_dir = Path.cwd().absolute() + if problem_dir is not None: + config.level = "problem" + os.chdir(problem_dir.parent) + return problem_dir + +# Get the list of relevant problems. +# Either use the problems.yaml, +# or check the existence of problem.yaml and sort by shortname. +def get_problems(problem_dir: Optional[Path]) -> tuple[list[Problem], Path]: # We create one tmpdir per contest. - h = hashlib.sha256(bytes(Path().cwd())).hexdigest()[-6:] + h = hashlib.sha256(bytes(Path.cwd())).hexdigest()[-6:] tmpdir = Path(tempfile.gettempdir()) / ("bapctools_" + h) tmpdir.mkdir(parents=True, exist_ok=True) - def parse_problems_yaml(problemlist): - if problemlist is None: - fatal(f"Did not find any problem in {problemsyaml}.") - problemlist = problemlist - if problemlist is None: - problemlist = [] - if not isinstance(problemlist, list): - fatal("problems.yaml must contain a problems: list.") - + def parse_problems_yaml(problemlist: list[dict[str, Any]]) -> list[tuple[str, str]]: labels = dict[str, str]() # label -> shortname problems = [] for p in problemlist: @@ -136,7 +126,7 @@ def parse_problems_yaml(problemlist): error(f"No directory found for problem {shortname} mentioned in problems.yaml.") return problems - def fallback_problems(): + def fallback_problems() -> list[tuple[Path, str]]: problem_paths = list(filter(is_problem_directory, glob(Path("."), "*/"))) label = ( chr(ord("Z") - len(problem_paths) + 1) if contest_yaml().get("test_session") else "A" @@ -148,25 +138,25 @@ def fallback_problems(): return problems problems = [] - if level == "problem": - assert problem + if config.level == "problem": + assert problem_dir # If the problem is mentioned in problems.yaml, use that ID. problemsyaml = problems_yaml() if problemsyaml: problem_labels = parse_problems_yaml(problemsyaml) for shortname, label in problem_labels: - if shortname == problem.name: - problems = [Problem(Path(problem.name), tmpdir, label)] + if shortname == problem_dir.name: + problems = [Problem(Path(problem_dir.name), tmpdir, label)] break if len(problems) == 0: found_label = None for path, label in fallback_problems(): - if path.name == problem.name: + if path.name == problem_dir.name: found_label = label - problems = [Problem(Path(problem.name), tmpdir, found_label)] + problems = [Problem(Path(problem_dir.name), tmpdir, found_label)] else: - level = "problemset" + assert config.level == "problemset" # If problems.yaml is available, use it. problemsyaml = problems_yaml() if problemsyaml: @@ -199,7 +189,7 @@ def fallback_problems(): warn(f"{p.label} does not appear in 'order'") # Sort by position of id in order - def get_pos(id): + def get_pos(id: Optional[str]) -> int: if id in order: return order.index(id) else: @@ -217,7 +207,7 @@ def __init__(self): self.teams_submitted = 0 self.teams_pending = 0 - def update(self, team_stats: dict[str, Any]): + def update(self, team_stats: dict[str, Any]) -> None: if team_stats["solved"]: self.solved += 1 if team_stats["num_judged"]: @@ -264,14 +254,12 @@ def key(self) -> tuple[int, int]: else: error("ruamel.yaml library not found. Update the order manually.") - contest_name = Path().cwd().name - # Filter problems by submissions/testcases, if given. - if level == "problemset" and (config.args.submissions or config.args.testcases): + if config.level == "problemset" and (config.args.submissions or config.args.testcases): submissions = config.args.submissions or [] testcases = config.args.testcases or [] - def keep_problem(problem): + def keep_problem(problem: Problem) -> bool: for s in submissions: x = resolve_path_argument(problem, s, "submissions") if x: @@ -286,28 +274,29 @@ def keep_problem(problem): problems = [p for p in problems if keep_problem(p)] - config.level = level - return problems, level, contest_name, tmpdir + return problems, tmpdir # NOTE: This is one of the few places that prints to stdout instead of stderr. -def print_sorted(problems): +def print_sorted(problems: list[Problem]) -> None: for problem in problems: print(f"{problem.label:<2}: {problem.path}") -def split_submissions_and_testcases(s): - # Everything containing data/, .in, or .ans goes into testcases. +def split_submissions_and_testcases(s: list[Path]) -> tuple[list[Path], list[Path]]: + # We try to identify testcases by common directory names and common suffixes submissions = [] testcases = [] for p in s: - ps = str(p) - if "data" in ps or "sample" in ps or "secret" in ps or ".in" in ps or ".ans" in ps: - # Strip potential .ans and .in - if p.suffix in [".ans", ".in"]: - testcases.append(p.with_suffix("")) - else: - testcases.append(p) + testcase_dirs = ["data", "sample", "secret", "fuzz"] + if ( + any(part in testcase_dirs for part in p.parts) + or p.suffix in config.KNOWN_DATA_EXTENSIONS + ): + # Strip potential suffix + if p.suffix in config.KNOWN_DATA_EXTENSIONS: + p = p.with_suffix("") + testcases.append(p) else: submissions.append(p) return (submissions, testcases) @@ -322,7 +311,7 @@ def __init__(self, **kwargs): super(SuppressingParser, self).__init__(**kwargs, argument_default=argparse.SUPPRESS) -def build_parser(): +def build_parser() -> SuppressingParser: parser = SuppressingParser( description=""" Tools for ICPC style problem sets. @@ -1014,39 +1003,90 @@ def build_parser(): return parser -# Takes a Namespace object returned by argparse.parse_args(). -def run_parsed_arguments(args): +def find_personal_config() -> Optional[Path]: + if is_windows(): + app_data = os.getenv("AppData") + return Path(app_data) if app_data else None + else: + home = os.getenv("HOME") + xdg_config_home = os.getenv("XDG_CONFIG_HOME") + return ( + Path(xdg_config_home) if xdg_config_home else Path(home) / ".config" if home else None + ) + + +def read_personal_config(problem_dir: Optional[Path]) -> dict[str, Any]: + args = {} + home_config = find_personal_config() + # possible config files, sorted by priority + config_files = [] + if problem_dir: + config_files.append(problem_dir / ".bapctools.yaml") + config_files.append(Path.cwd() / ".bapctools.yaml") + if home_config: + config_files.append(home_config / "bapctools" / "config.yaml") + + for config_file in config_files: + if not config_file.is_file(): + continue + config_data = read_yaml(config_file) or {} + for arg, value in config_data.items(): + if arg not in args: + args[arg] = value + + return args + + +def run_parsed_arguments(args: argparse.Namespace, personal_config: bool = True) -> None: + # Don't zero newly allocated memory for this and any subprocess + # Will likely only have an effect on linux + os.environ["MALLOC_PERTURB_"] = str(0b01011001) + # Process arguments config.args = args - config.set_default_args() + missing_args = config.set_default_args() - action = config.args.action + # cd to contest directory + call_cwd = Path.cwd().absolute() + problem_dir = change_directory() + level = config.level + contest_name = Path.cwd().name - # Split submissions and testcases when needed. - if action in ["run", "fuzz", "time_limit"]: - if config.args.submissions: - config.args.submissions, config.args.testcases = split_submissions_and_testcases( - config.args.submissions - ) - else: - config.args.testcases = [] + if personal_config: + personal_args = read_personal_config(problem_dir) + for arg in missing_args: + if arg in personal_args: + setattr(config.args, arg, personal_args[arg]) + + action = config.args.action # upgrade commands. if action == "upgrade": - upgrade.upgrade() + upgrade.upgrade(problem_dir) return # Skel commands. if action == "new_contest": + os.chdir(call_cwd) skel.new_contest() return if action == "new_problem": + os.chdir(call_cwd) skel.new_problem() return - # Get problem_paths and cd to contest - problems, level, contest, tmpdir = get_problems() + # get problems list + problems, tmpdir = get_problems(problem_dir) + + # Split submissions and testcases when needed. + if action in ["run", "fuzz", "time_limit"]: + if config.args.submissions: + config.args.submissions, config.args.testcases = split_submissions_and_testcases( + config.args.submissions + ) + else: + config.args.testcases = [] # Check non unique uuid # TODO: check this even more globally? @@ -1136,15 +1176,15 @@ def run_parsed_arguments(args): return if action == "gitlabci": - skel.create_gitlab_jobs(contest, problems) + skel.create_gitlab_jobs(contest_name, problems) return if action == "forgejo_actions": - skel.create_forgejo_actions(contest, problems) + skel.create_forgejo_actions(contest_name, problems) return if action == "github_actions": - skel.create_github_actions(contest, problems) + skel.create_github_actions(contest_name, problems) return if action == "skel": @@ -1293,15 +1333,15 @@ def run_parsed_arguments(args): export.export_contest_and_problems(problems, languages) if level == "problemset": - print(f"{Style.BRIGHT}CONTEST {contest}{Style.RESET_ALL}", file=sys.stderr) + print(f"{Style.BRIGHT}CONTEST {contest_name}{Style.RESET_ALL}", file=sys.stderr) # build pdf for the entire contest if action in ["pdf"]: - success &= latex.build_contest_pdfs(contest, problems, tmpdir, web=config.args.web) + success &= latex.build_contest_pdfs(contest_name, problems, tmpdir, web=config.args.web) if action in ["solutions"]: success &= latex.build_contest_pdfs( - contest, + contest_name, problems, tmpdir, build_type=latex.PdfType.SOLUTION, @@ -1310,7 +1350,7 @@ def run_parsed_arguments(args): if action in ["problem_slides"]: success &= latex.build_contest_pdfs( - contest, + contest_name, problems, tmpdir, build_type=latex.PdfType.PROBLEM_SLIDE, @@ -1329,20 +1369,20 @@ def run_parsed_arguments(args): ) for language in languages: - success &= latex.build_contest_pdfs(contest, problems, tmpdir, language) + success &= latex.build_contest_pdfs(contest_name, problems, tmpdir, language) success &= latex.build_contest_pdfs( - contest, problems, tmpdir, language, web=True + contest_name, problems, tmpdir, language, web=True ) if not config.args.no_solutions: success &= latex.build_contest_pdf( - contest, + contest_name, problems, tmpdir, language, build_type=latex.PdfType.SOLUTION, ) success &= latex.build_contest_pdf( - contest, + contest_name, problems, tmpdir, language, @@ -1351,7 +1391,7 @@ def run_parsed_arguments(args): ) if build_problem_slides: success &= latex.build_contest_pdf( - contest, + contest_name, problems, tmpdir, language, @@ -1361,9 +1401,9 @@ def run_parsed_arguments(args): if not build_problem_slides: log(f"No problem has {slideglob.name}, skipping problem slides") - outfile = contest + ".zip" + outfile = contest_name + ".zip" if config.args.kattis: - outfile = contest + "-kattis.zip" + outfile = contest_name + "-kattis.zip" export.build_contest_zip(problems, problem_zips, outfile, languages) if action in ["update_problems_yaml"]: @@ -1380,54 +1420,15 @@ def run_parsed_arguments(args): sys.exit(1) -def find_personal_config() -> Optional[Path]: - if is_windows(): - app_data = os.getenv("AppData") - return Path(app_data) if app_data else None - else: - home = os.getenv("HOME") - xdg_config_home = os.getenv("XDG_CONFIG_HOME") - return ( - Path(xdg_config_home) if xdg_config_home else Path(home) / ".config" if home else None - ) - - -def read_personal_config(): - args = {} - home_config = find_personal_config() - - for config_file in [ - # Highest prio: contest directory - Path() / ".bapctools.yaml", - Path() / ".." / ".bapctools.yaml", - ] + ( - # Lowest prio: user config directory - [home_config / "bapctools" / "config.yaml"] if home_config else [] - ): - if not config_file.is_file(): - continue - config_data = read_yaml(config_file) or {} - for arg, value in config_data.items(): - if arg not in args: - args[arg] = value - - return args - - # Takes command line arguments -def main(): - def interrupt_handler(sig, frame): +def main() -> None: + def interrupt_handler(sig: Any, frame: Any) -> None: fatal("Running interrupted") signal.signal(signal.SIGINT, interrupt_handler) - # Don't zero newly allocated memory for this any any subprocess - # Will likely only work on linux - os.environ["MALLOC_PERTURB_"] = str(0b01011001) - try: parser = build_parser() - parser.set_defaults(**read_personal_config()) run_parsed_arguments(parser.parse_args()) except AbortException: fatal("Running interrupted") @@ -1437,19 +1438,19 @@ def interrupt_handler(sig, frame): main() -def test(args): +def test(args: list[str]) -> None: config.RUNNING_TEST = True # Make sure to cd back to the original directory before returning. # Needed to stay in the same directory in tests. - original_directory = Path().cwd() + original_directory = Path.cwd() config.n_warn = 0 config.n_error = 0 contest._contest_yaml = None contest._problems_yaml = None try: parser = build_parser() - run_parsed_arguments(parser.parse_args(args)) + run_parsed_arguments(parser.parse_args(args), personal_config=False) finally: os.chdir(original_directory) ProgressBar.current_bar = None diff --git a/bin/upgrade.py b/bin/upgrade.py index 12aeb4c3..5362e9c4 100644 --- a/bin/upgrade.py +++ b/bin/upgrade.py @@ -609,26 +609,27 @@ def _upgrade(problem_path: Path, bar: ProgressBar) -> None: bar.done() -def upgrade() -> None: +def upgrade(problem_dir: Optional[Path]) -> None: if not has_ryaml: error("upgrade needs the ruamel.yaml python3 library. Install python[3]-ruamel.yaml.") return - cwd = Path().cwd() - def is_problem_directory(path: Path) -> bool: - return (path / "problem.yaml").is_file() - - if is_problem_directory(cwd): - paths = [cwd] + if config.level == "problem": + assert problem_dir + if not is_problem_directory(problem_dir): + fatal(f"{problem_dir} does not contain a problem.yaml") + paths = [problem_dir] + bar = ProgressBar("upgrade", items=paths) else: - paths = [p for p in cwd.iterdir() if is_problem_directory(p)] - - bar = ProgressBar("upgrade", items=["contest.yaml", *paths]) - - bar.start("contest.yaml") - if (cwd / "contest.yaml").is_file(): - upgrade_contest_yaml(cwd / "contest.yaml", bar) - bar.done() + assert config.level == "problemset" + contest_dir = Path.cwd() + paths = [p for p in contest_dir.iterdir() if is_problem_directory(p)] + bar = ProgressBar("upgrade", items=["contest.yaml", *paths]) + + bar.start("contest.yaml") + if (contest_dir / "contest.yaml").is_file(): + upgrade_contest_yaml(contest_dir / "contest.yaml", bar) + bar.done() for path in paths: _upgrade(path, bar) diff --git a/bin/util.py b/bin/util.py index 27f76904..eb584f6c 100644 --- a/bin/util.py +++ b/bin/util.py @@ -1461,6 +1461,11 @@ def inc_label(label: str) -> str: return "A" + label +# A path is a problem directory if it contains a `problem.yaml` file. +def is_problem_directory(path: Path) -> bool: + return (path / "problem.yaml").is_file() + + def combine_hashes(values: Sequence[str]) -> str: hasher = hashlib.sha512(usedforsecurity=False) for item in sorted(values):