123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403 |
- #!/usr/bin/env python3
- # Copyright 2022 The ChromiumOS Authors
- # Use of this source code is governed by a BSD-style license that can be
- # found in the LICENSE file.
- import os
- import subprocess
- import sys
- import traceback
- from concurrent.futures import ThreadPoolExecutor
- from dataclasses import dataclass
- from datetime import datetime, timedelta
- from fnmatch import fnmatch
- from pathlib import Path
- from time import sleep
- from typing import Callable, List, Sequence, NamedTuple, Optional, Union
- from impl.common import (
- Command,
- ParallelCommands,
- all_tracked_files,
- cmd,
- console,
- rich,
- strip_ansi_escape_sequences,
- verbose,
- )
- git = cmd("git")
- @dataclass
- class CheckContext(object):
- "Information passed to each check when it's called."
- # Whether or not --fix was set and checks should attempt to fix problems they encounter.
- fix: bool
- # All files that this check should cover (e.g. all python files on a python check).
- all_files: List[Path]
- # Those files of all_files that were modified locally.
- modified_files: List[Path]
- # Files that do not exist upstream and have been added locally.
- new_files: List[Path]
- class Check(NamedTuple):
- "Metadata for each check, definining on which files it should run."
- # Function to call for this check
- check_function: Callable[[CheckContext], Union[Command, None, List[Command]]]
- custom_name: Optional[str] = None
- # List of globs that this check should be triggered on
- files: List[str] = []
- python_tools: bool = False
- # List of globs to exclude from this check
- exclude: List[str] = []
- # Whether or not this check can fix issues.
- can_fix: bool = False
- # Which groups this check belongs to.
- groups: List[str] = []
- # Priority tasks usually take lonkger and are started first, and will show preliminary output.
- priority: bool = False
- @property
- def name(self):
- if self.custom_name:
- return self.custom_name
- name = self.check_function.__name__
- if name.startswith("check_"):
- return name[len("check_") :]
- return name
- @property
- def doc(self):
- if self.check_function.__doc__:
- return self.check_function.__doc__.strip()
- else:
- return None
- class Group(NamedTuple):
- "Metadata for a group of checks"
- name: str
- doc: str
- checks: List[str]
- def list_file_diff():
- """
- Lists files there were modified compared to the upstream branch.
- Falls back to all files tracked by git if there is no upstream branch.
- """
- upstream = git("rev-parse @{u}").stdout(check=False)
- if upstream:
- for line in git("diff --name-status", upstream).lines():
- parts = line.split("\t", 1)
- file = Path(parts[1].strip())
- if file.is_file():
- yield (parts[0].strip(), file)
- else:
- print("WARNING: Not tracking a branch. Checking all files.")
- for file in all_tracked_files():
- yield ("M", file)
- def should_run_check_on_file(check: Check, file: Path):
- "Returns true if `file` should be run on `check`."
- # Skip third_party except vmm_vhost.
- if str(file).startswith("third_party") and not str(file).startswith("third_party/vmm_vhost"):
- return False
- # Skip excluded files
- for glob in check.exclude:
- if fnmatch(str(file), glob):
- return False
- # Match python tools (no file-extension, but with a python shebang line)
- if check.python_tools:
- if fnmatch(str(file), "tools/*") and file.suffix == "" and file.is_file():
- if file.open(errors="ignore").read(32).startswith("#!/usr/bin/env python3"):
- return True
- # If no constraint is specified, match all files.
- if not check.files and not check.python_tools:
- return True
- # Otherwise, match only those specified by `files`.
- for glob in check.files:
- if fnmatch(str(file), glob):
- return True
- return False
- class Task(object):
- """
- Represents a task that needs to be executed to perform a `Check`.
- The task can be executed via `Task.execute`, which will update the state variables with
- status and progress information.
- This information can then be rendered from a separate thread via `Task.status_widget()`
- """
- def __init__(self, title: str, commands: Sequence[Command], priority: bool):
- "Display title."
- self.title = title
- "Commands to execute."
- self.commands = commands
- "Task is a priority check."
- self.priority = priority
- "List of log lines (stdout+stderr) produced by the task."
- self.log_lines: List[str] = []
- "Task was compleded, but may or not have been successful."
- self.done = False
- "True if the task completed successfully."
- self.success = False
- "Time the task was started."
- self.start_time = datetime.min
- "Duration the task took to execute. Only filled after completion."
- self.duration = timedelta.max
- "Spinner object for status_widget UI."
- self.spinner = rich.spinner.Spinner("point", title)
- def status_widget(self):
- "Returns a rich console object showing the currrent status of the task."
- duration = self.duration if self.done else datetime.now() - self.start_time
- title = f"[{duration.total_seconds():6.2f}s] [bold]{self.title}[/bold]"
- if self.done:
- status: str = "[green]OK [/green]" if self.success else "[red]ERR[/red]"
- title_widget = rich.text.Text.from_markup(f"{status} {title}")
- else:
- self.spinner.text = rich.text.Text.from_markup(title)
- title_widget = self.spinner
- if not self.priority:
- return title_widget
- last_lines = [
- self.log_lines[-3] if len(self.log_lines) >= 3 else "",
- self.log_lines[-2] if len(self.log_lines) >= 2 else "",
- self.log_lines[-1] if len(self.log_lines) >= 1 else "",
- ]
- return rich.console.Group(
- *(
- # Print last log lines without it's original colors
- rich.text.Text(
- "│ " + strip_ansi_escape_sequences(log_line),
- style="light_slate_grey",
- overflow="ellipsis",
- no_wrap=True,
- )
- for log_line in last_lines
- ),
- rich.text.Text("└ ", end="", style="light_slate_grey"),
- title_widget,
- rich.text.Text(),
- )
- def execute(self):
- "Execute the task while updating the status variables."
- try:
- self.start_time = datetime.now()
- success = True
- if verbose():
- for command in self.commands:
- self.log_lines.append(f"$ {command}")
- # Spawn all commands as separate processes
- processes = [
- command.popen(stdout=subprocess.PIPE, stderr=subprocess.STDOUT, errors="replace")
- for command in self.commands
- ]
- # The stdout is collected before we wait for the processes to exit so that the UI is
- # at least real-time for the first process. Note that in this way, the output for
- # other processes other than the first process are not real-time. In addition, we
- # can't proactively kill other processes in the same task if any process fails.
- for process in processes:
- assert process.stdout
- for line in iter(process.stdout.readline, ""):
- self.log_lines.append(line.strip())
- # Wait for all processes to finish and check return code
- for process in processes:
- if process.wait() != 0:
- success = False
- self.duration = datetime.now() - self.start_time
- self.success = success
- self.done = True
- except Exception:
- self.log_lines.append(traceback.format_exc())
- def print_logs(tasks: List[Task]):
- "Prints logs of all failed or unfinished tasks."
- for task in tasks:
- if not task.done:
- print()
- console.rule(f"{task.title} did not finish", style="yellow")
- for line in task.log_lines:
- print(line)
- if not task.log_lines:
- print(f"{task.title} did not output any logs")
- for task in tasks:
- if task.done and not task.success:
- console.rule(f"{task.title} failed", style="red")
- for line in task.log_lines:
- print(line)
- if not task.log_lines:
- print(f"{task.title} did not output any logs")
- def print_summary(tasks: List[Task]):
- "Prints a summary of all task results."
- console.rule("Summary")
- tasks.sort(key=lambda t: t.duration)
- for task in tasks:
- title = f"[{task.duration.total_seconds():6.2f}s] [bold]{task.title}[/bold]"
- status: str = "[green]OK [/green]" if task.success else "[red]ERR[/red]"
- console.print(f"{status} {title}")
- def execute_tasks_parallel(tasks: List[Task]):
- "Executes the list of tasks in parallel, while rendering live status updates."
- with ThreadPoolExecutor() as executor:
- try:
- # Since tasks are executed in subprocesses, we can use a thread pool to parallelize
- # despite the GIL.
- task_futures = [executor.submit(lambda: t.execute()) for t in tasks]
- # Render task updates while they are executing in the background.
- with rich.live.Live(refresh_per_second=30) as live:
- while True:
- live.update(
- rich.console.Group(
- *(t.status_widget() for t in tasks),
- rich.text.Text(),
- rich.text.Text.from_markup(
- "[green]Tip:[/green] Press CTRL-C to abort execution and see all logs."
- ),
- )
- )
- if all(future.done() for future in task_futures):
- break
- sleep(0.1)
- except KeyboardInterrupt:
- print_logs(tasks)
- # Force exit to skip waiting for the executor to shutdown. This will kill all
- # running subprocesses.
- os._exit(1) # type: ignore
- # Render error logs and summary after execution
- print_logs(tasks)
- print_summary(tasks)
- if any(not t.success for t in tasks):
- raise Exception("Some checks failed")
- def execute_tasks_serial(tasks: List[Task]):
- "Executes the list of tasks one-by-one"
- for task in tasks:
- console.rule(task.title)
- for command in task.commands:
- command.fg()
- console.print()
- def generate_plan(
- checks_list: List[Check],
- fix: bool,
- run_on_all_files: bool,
- ):
- "Generates a list of `Task`s to execute the checks provided in `checks_list`"
- all_files = [*all_tracked_files()]
- file_diff = [*list_file_diff()]
- new_files = [f for (s, f) in file_diff if s == "A"]
- if run_on_all_files:
- modified_files = all_files
- else:
- modified_files = [f for (s, f) in file_diff if s in ("M", "A")]
- tasks: List[Task] = []
- unsupported_checks: List[str] = []
- for check in checks_list:
- if fix and not check.can_fix:
- continue
- context = CheckContext(
- fix=fix,
- all_files=[f for f in all_files if should_run_check_on_file(check, f)],
- modified_files=[f for f in modified_files if should_run_check_on_file(check, f)],
- new_files=[f for f in new_files if should_run_check_on_file(check, f)],
- )
- if context.modified_files:
- maybe_commands = check.check_function(context)
- if maybe_commands is None:
- unsupported_checks.append(check.name)
- continue
- commands_list = maybe_commands if isinstance(maybe_commands, list) else [maybe_commands]
- title = f"fixing {check.name}" if fix else check.name
- tasks.append(Task(title, commands_list, check.priority))
- if unsupported_checks:
- console.print("[yellow]Warning:[/yellow] The following checks cannot be run:")
- for unsupported_check in unsupported_checks:
- console.print(f" - {unsupported_check}")
- console.print()
- console.print("[green]Tip:[/green] Use the dev container to run presubmits:")
- console.print()
- console.print(
- f" [blue] $ tools/dev_container tools/presubmit {' '.join(sys.argv[1:])}[/blue]"
- )
- console.print()
- if not os.access("/dev/kvm", os.W_OK):
- console.print("[yellow]Warning:[/yellow] Cannot access KVM. Integration tests are not run.")
- # Sort so that priority tasks are launched (and rendered) first
- tasks.sort(key=lambda t: (t.priority, t.title), reverse=True)
- return tasks
- def run_checks(
- checks_list: List[Check],
- fix: bool,
- run_on_all_files: bool,
- parallel: bool,
- ):
- """
- Runs all checks in checks_list.
- Arguments:
- fix: Run fixes instead of checks on `Check`s that support it.
- run_on_all_files: Do not use git delta, but run on all files.
- nightly_fmt: Use nightly version of rust tooling.
- parallel: Run tasks in parallel.
- """
- tasks = generate_plan(checks_list, fix, run_on_all_files)
- if len(tasks) == 1:
- parallel = False
- if parallel:
- execute_tasks_parallel(list(tasks))
- else:
- execute_tasks_serial(list(tasks))
|