command.py 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619
  1. #!/usr/bin/env python3
  2. # Copyright 2023 The ChromiumOS Authors
  3. # Use of this source code is governed by a BSD-style license that can be
  4. # found in the LICENSE file.
  5. """
  6. Provides helpers for writing shell-like scripts in Python.
  7. It provides tools to execute commands with similar flexibility as shell scripts.
  8. """
  9. import contextlib
  10. import json
  11. import os
  12. import re
  13. import shlex
  14. import subprocess
  15. import sys
  16. from copy import deepcopy
  17. from math import ceil
  18. from multiprocessing.pool import ThreadPool
  19. from pathlib import Path
  20. from subprocess import DEVNULL, PIPE, STDOUT # type: ignore
  21. from typing import (
  22. Any,
  23. Callable,
  24. Dict,
  25. Iterable,
  26. List,
  27. NamedTuple,
  28. Optional,
  29. TypeVar,
  30. Union,
  31. )
  32. from .util import verbose, very_verbose, color_enabled
  33. PathLike = Union[Path, str]
  34. # Regex that matches ANSI escape sequences
  35. ANSI_ESCAPE = re.compile(r"\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])")
  36. class CommandResult(NamedTuple):
  37. """Results of a command execution as returned by Command.run()"""
  38. stdout: str
  39. stderr: str
  40. returncode: int
  41. class Command(object):
  42. """
  43. Simplified subprocess handling for shell-like scripts.
  44. ## Example Usage
  45. To run a program on behalf of the user:
  46. >> cmd("cargo build").fg()
  47. This will run the program with stdio passed to the user. Developer tools usually run a set of
  48. actions on behalf of the user. These should be executed with fg().
  49. To make calls in the background to gather information use success/stdout/lines:
  50. >> cmd("git branch").lines()
  51. >> cmd("git rev-parse foo").success()
  52. These will capture all program output. Try to avoid using these to run mutating commands,
  53. as they will remain hidden to the user even when using --verbose.
  54. ## Arguments
  55. Arguments are provided as a list similar to subprocess.run():
  56. >>> Command('cargo', 'build', '--workspace')
  57. Command('cargo', 'build', '--workspace')
  58. In contrast to subprocess.run, all strings are split by whitespaces similar to bash:
  59. >>> Command('cargo build --workspace', '--features foo')
  60. Command('cargo', 'build', '--workspace', '--features', 'foo')
  61. In contrast to bash, globs are *not* evaluated, but can easily be provided using Path:
  62. >>> Command('ls -l', *Path(CROSVM_ROOT).glob('*.toml'))
  63. Command('ls', '-l', ...)
  64. None or False are ignored to make it easy to include conditional arguments:
  65. >>> all = False
  66. >>> Command('cargo build', '--workspace' if all else None)
  67. Command('cargo', 'build')
  68. ## Nesting
  69. Commands can be nested, similar to $() subshells in bash. The sub-commands will be executed
  70. right away and their output will undergo the usual splitting:
  71. >>> Command('printf "(%s)"', Command('echo foo bar')).stdout()
  72. '(foo)(bar)'
  73. Arguments can be explicitly quoted to prevent splitting, it applies to both sub-commands
  74. as well as strings:
  75. >>> Command('printf "(%s)"', quoted(Command('echo foo bar'))).stdout()
  76. '(foo bar)'
  77. Commands can also be piped into one another:
  78. >>> wc = Command('wc')
  79. >>> Command('echo "abcd"').pipe(wc('-c')).stdout()
  80. '5'
  81. ## Verbosity
  82. The --verbose flag is intended for users and will show all command lines executed in the
  83. foreground with fg(), it'll also include output of programs run with fg(quiet=True). Commands
  84. executed in the background are not shown.
  85. For script developers, the --very-verbose flag will print full details and output of all
  86. executed command lines, including those run hidden from the user.
  87. """
  88. def __init__(
  89. self,
  90. *args: Any,
  91. stdin_cmd: Optional["Command"] = None,
  92. env_vars: Dict[str, str] = {},
  93. cwd: Optional[Path] = None,
  94. ):
  95. self.args = Command.__parse_cmd(args)
  96. self.stdin_cmd = stdin_cmd
  97. self.env_vars = env_vars
  98. self.cwd = cwd
  99. ### Builder API to construct commands
  100. def with_args(self, *args: Any):
  101. """Returns a new Command with added arguments.
  102. >>> cargo = Command('cargo')
  103. >>> cargo.with_args('clippy')
  104. Command('cargo', 'clippy')
  105. """
  106. cmd = deepcopy(self)
  107. cmd.args = [*self.args, *Command.__parse_cmd(args)]
  108. return cmd
  109. def with_cwd(self, cwd: Optional[Path]):
  110. """Changes the working directory the command is executed in.
  111. >>> cargo = Command('pwd')
  112. >>> cargo.with_cwd('/tmp').stdout()
  113. '/tmp'
  114. """
  115. cmd = deepcopy(self)
  116. cmd.cwd = cwd
  117. return cmd
  118. def __call__(self, *args: Any):
  119. """Shorthand for Command.with_args"""
  120. return self.with_args(*args)
  121. def with_env(self, key: str, value: Optional[str]):
  122. """
  123. Returns a command with an added env variable.
  124. The variable is removed if value is None.
  125. """
  126. return self.with_envs({key: value})
  127. def with_envs(self, envs: Union[Dict[str, Optional[str]], Dict[str, str]]):
  128. """
  129. Returns a command with an added env variable.
  130. The variable is removed if value is None.
  131. """
  132. cmd = deepcopy(self)
  133. for key, value in envs.items():
  134. if value is not None:
  135. cmd.env_vars[key] = value
  136. else:
  137. if key in cmd.env_vars:
  138. del cmd.env_vars[key]
  139. return cmd
  140. def with_path_env(self, new_path: str):
  141. """Returns a command with a path added to the PATH variable."""
  142. path_var = self.env_vars.get("PATH", os.environ.get("PATH", ""))
  143. return self.with_env("PATH", f"{path_var}:{new_path}")
  144. def with_color_arg(
  145. self,
  146. always: Optional[str] = None,
  147. never: Optional[str] = None,
  148. ):
  149. """Returns a command with an argument added to pass through enabled/disabled colors."""
  150. new_cmd = self
  151. if color_enabled():
  152. if always:
  153. new_cmd = new_cmd(always)
  154. else:
  155. if never:
  156. new_cmd = new_cmd(never)
  157. return new_cmd
  158. def with_color_env(self, var_name: str):
  159. """Returns a command with an env var added to pass through enabled/disabled colors."""
  160. return self.with_env(var_name, "1" if color_enabled() else "0")
  161. def with_color_flag(self, flag: str = "--color"):
  162. """Returns a command with an added --color=always/never/auto flag."""
  163. return self.with_color_arg(always=f"{flag}=always", never=f"{flag}=never")
  164. def foreach(self, arguments: Iterable[Any], batch_size: int = 1):
  165. """
  166. Yields a new command for each entry in `arguments`.
  167. The argument is appended to each command and is intended to be used in
  168. conjunction with `parallel()` to execute a command on a list of arguments in
  169. parallel.
  170. >>> parallel(*cmd('echo').foreach((1, 2, 3))).stdout()
  171. ['1', '2', '3']
  172. Arguments can also be batched by setting batch_size > 1, which will append multiple
  173. arguments to each command.
  174. >>> parallel(*cmd('echo').foreach((1, 2, 3), batch_size=2)).stdout()
  175. ['1 2', '3']
  176. """
  177. for batch in batched(arguments, batch_size):
  178. yield self(*batch)
  179. def pipe(self, *args: Any):
  180. """
  181. Pipes the output of this command into another process.
  182. The target can either be another Command or the argument list to build a new command.
  183. """
  184. if len(args) == 1 and isinstance(args[0], Command):
  185. cmd = Command(stdin_cmd=self)
  186. cmd.args = args[0].args
  187. cmd.env_vars = self.env_vars.copy()
  188. return cmd
  189. else:
  190. return Command(*args, stdin_cmd=self, env_vars=self.env_vars)
  191. ### Executing programs in the foreground
  192. def run_foreground(
  193. self,
  194. quiet: bool = False,
  195. check: bool = True,
  196. dry_run: bool = False,
  197. style: Optional[Callable[["subprocess.Popen[str]"], None]] = None,
  198. ):
  199. """
  200. Runs a program in the foreground with output streamed to the user.
  201. >>> Command('true').fg()
  202. 0
  203. Non-zero exit codes will trigger an Exception
  204. >>> Command('false').fg()
  205. Traceback (most recent call last):
  206. ...
  207. subprocess.CalledProcessError...
  208. But can be disabled:
  209. >>> Command('false').fg(check=False)
  210. 1
  211. Output can be hidden by setting quiet=True:
  212. >>> Command("echo foo").fg(quiet=True)
  213. 0
  214. This will hide the programs stdout and stderr unless the program fails.
  215. More sophisticated means of outputting stdout/err are available via `Styles`:
  216. >>> Command("echo foo").fg(style=Styles.live_truncated())
  217. foo
  218. 0
  219. Will output the results of the command but truncate output after a few lines. See `Styles`
  220. for more options.
  221. Arguments:
  222. quiet: Do not show stdout/stderr unless the program failed.
  223. check: Raise an exception if the program returned an error code.
  224. style: A function to present the output of the program. See `Styles`
  225. Returns: The return code of the program.
  226. """
  227. if dry_run:
  228. print(f"Not running: {self}")
  229. return 0
  230. if quiet:
  231. def quiet_style(process: "subprocess.Popen[str]"):
  232. "Won't print anything unless the command failed."
  233. assert process.stdout
  234. stdout = process.stdout.read()
  235. if process.wait() != 0:
  236. print(stdout, end="")
  237. style = quiet_style
  238. if verbose():
  239. print(f"$ {self}")
  240. if style is None or verbose():
  241. return self.__run(stdout=None, stderr=None, check=check).returncode
  242. else:
  243. process = self.popen(stdout=PIPE, stderr=STDOUT)
  244. style(process)
  245. returncode = process.wait()
  246. if returncode != 0 and check:
  247. assert process.stdout
  248. raise subprocess.CalledProcessError(returncode, process.args)
  249. return returncode
  250. def fg(
  251. self,
  252. quiet: bool = False,
  253. check: bool = True,
  254. dry_run: bool = False,
  255. style: Optional[Callable[["subprocess.Popen[str]"], None]] = None,
  256. ):
  257. """
  258. Shorthand for Command.run_foreground()
  259. """
  260. return self.run_foreground(quiet, check, dry_run, style)
  261. def write_to(self, filename: Path):
  262. """
  263. Writes stdout to the provided file.
  264. """
  265. if verbose():
  266. print(f"$ {self} > {filename}")
  267. with open(filename, "w") as file:
  268. file.write(self.__run(stdout=PIPE, stderr=PIPE).stdout)
  269. def append_to(self, filename: Path):
  270. """
  271. Appends stdout to the provided file.
  272. """
  273. if verbose():
  274. print(f"$ {self} >> {filename}")
  275. with open(filename, "a") as file:
  276. file.write(self.__run(stdout=PIPE, stderr=PIPE).stdout)
  277. ### API for executing commands hidden from the user
  278. def success(self):
  279. """
  280. Returns True if the program succeeded (i.e. returned 0).
  281. The program will not be visible to the user unless --very-verbose is specified.
  282. """
  283. if very_verbose():
  284. print(f"$ {self}")
  285. return self.__run(stdout=PIPE, stderr=PIPE, check=False).returncode == 0
  286. def stdout(self, check: bool = True, stderr: int = PIPE):
  287. """
  288. Runs a program and returns stdout.
  289. The program will not be visible to the user unless --very-verbose is specified.
  290. """
  291. if very_verbose():
  292. print(f"$ {self}")
  293. return self.__run(stdout=PIPE, stderr=stderr, check=check).stdout.strip()
  294. def json(self, check: bool = True) -> Any:
  295. """
  296. Runs a program and returns stdout parsed as json.
  297. The program will not be visible to the user unless --very-verbose is specified.
  298. """
  299. stdout = self.stdout(check=check)
  300. if stdout:
  301. return json.loads(stdout)
  302. else:
  303. return None
  304. def lines(self, check: bool = True, stderr: int = PIPE):
  305. """
  306. Runs a program and returns stdout line by line.
  307. The program will not be visible to the user unless --very-verbose is specified.
  308. """
  309. return self.stdout(check=check, stderr=stderr).splitlines()
  310. ### Utilities
  311. def __str__(self):
  312. stdin = ""
  313. if self.stdin_cmd:
  314. stdin = str(self.stdin_cmd) + " | "
  315. return stdin + shlex.join(self.args)
  316. def __repr__(self):
  317. stdin = ""
  318. if self.stdin_cmd:
  319. stdin = ", stdin_cmd=" + repr(self.stdin_cmd)
  320. return f"Command({', '.join(repr(a) for a in self.args)}{stdin})"
  321. ### Private implementation details
  322. def __run(
  323. self,
  324. stdout: Optional[int],
  325. stderr: Optional[int],
  326. check: bool = True,
  327. ) -> CommandResult:
  328. "Run this command in subprocess.run()"
  329. if very_verbose():
  330. print(f"cwd: {Path().resolve()}")
  331. for k, v in self.env_vars.items():
  332. print(f"env: {k}={v}")
  333. result = subprocess.run(
  334. self.args,
  335. cwd=self.cwd,
  336. stdout=stdout,
  337. stderr=stderr,
  338. stdin=self.__stdin_stream(),
  339. env={**os.environ, **self.env_vars},
  340. check=check,
  341. text=True,
  342. )
  343. if very_verbose():
  344. if result.stdout:
  345. for line in result.stdout.splitlines():
  346. print("stdout:", line)
  347. if result.stderr:
  348. for line in result.stderr.splitlines():
  349. print("stderr:", line)
  350. print("returncode:", result.returncode)
  351. if check and result.returncode != 0:
  352. raise subprocess.CalledProcessError(result.returncode, str(self), result.stdout)
  353. return CommandResult(result.stdout, result.stderr, result.returncode)
  354. def __stdin_stream(self):
  355. if self.stdin_cmd:
  356. return self.stdin_cmd.popen(stdout=PIPE, stderr=PIPE).stdout
  357. return None
  358. def popen(self, **kwargs: Any) -> "subprocess.Popen[str]":
  359. """
  360. Runs a program and returns the Popen object of the running process.
  361. """
  362. return subprocess.Popen(
  363. self.args,
  364. cwd=self.cwd,
  365. stdin=self.__stdin_stream(),
  366. env={**os.environ, **self.env_vars},
  367. text=True,
  368. **kwargs,
  369. )
  370. @staticmethod
  371. def __parse_cmd(args: Iterable[Any]) -> List[str]:
  372. """Parses command line arguments for Command."""
  373. res = [parsed for arg in args for parsed in Command.__parse_cmd_args(arg)]
  374. return res
  375. @staticmethod
  376. def __parse_cmd_args(arg: Any) -> List[str]:
  377. """Parses a mixed type command line argument into a list of strings."""
  378. def escape_backslash_if_necessary(input: str) -> str:
  379. if os.name == "nt":
  380. return input.replace("\\", "\\\\")
  381. else:
  382. return input
  383. if isinstance(arg, Path):
  384. return [escape_backslash_if_necessary(str(arg))]
  385. elif isinstance(arg, QuotedString):
  386. return [arg.value]
  387. elif isinstance(arg, Command):
  388. return [*shlex.split(escape_backslash_if_necessary(arg.stdout()))]
  389. elif arg is None or arg is False:
  390. return []
  391. else:
  392. return [*shlex.split(escape_backslash_if_necessary(str(arg)))]
  393. class ParallelCommands(object):
  394. """
  395. Allows commands to be run in parallel.
  396. >>> parallel(cmd('true'), cmd('false')).fg(check=False)
  397. [0, 1]
  398. >>> parallel(cmd('echo a'), cmd('echo b')).stdout()
  399. ['a', 'b']
  400. """
  401. def __init__(self, *commands: Command):
  402. self.commands = commands
  403. def fg(self, quiet: bool = False, check: bool = True):
  404. with ThreadPool(1 if very_verbose() else os.cpu_count()) as pool:
  405. return pool.map(lambda command: command.fg(quiet=quiet, check=check), self.commands)
  406. def stdout(self):
  407. with ThreadPool(1 if very_verbose() else os.cpu_count()) as pool:
  408. return pool.map(lambda command: command.stdout(), self.commands)
  409. def success(self):
  410. results = self.fg(check=False, quiet=True)
  411. return all(result == 0 for result in results)
  412. class Remote(object):
  413. """
  414. Wrapper around the cmd() API and allow execution of commands via SSH."
  415. """
  416. def __init__(self, host: str, opts: Dict[str, str]):
  417. self.host = host
  418. ssh_opts = [f"-o{k}={v}" for k, v in opts.items()]
  419. self.ssh_cmd = cmd("ssh", host, "-T", *ssh_opts)
  420. self.scp_cmd = cmd("scp", *ssh_opts)
  421. def ssh(self, cmd: Command, remote_cwd: Optional[Path] = None):
  422. # Use huponexit to ensure the process is killed if the connection is lost.
  423. # Use shlex to properly quote the command.
  424. wrapped_cmd = f"bash -O huponexit -c {shlex.quote(str(cmd))}"
  425. if remote_cwd is not None:
  426. wrapped_cmd = f"cd {remote_cwd} && {wrapped_cmd}"
  427. # The whole command to pass it to SSH for remote execution.
  428. return self.ssh_cmd.with_args(quoted(wrapped_cmd))
  429. def scp(self, sources: List[Path], target: str, quiet: bool = False):
  430. return self.scp_cmd.with_args(*sources, f"{self.host}:{target}").fg(quiet=quiet)
  431. @contextlib.contextmanager
  432. def cwd_context(path: PathLike):
  433. """Context for temporarily changing the cwd.
  434. >>> with cwd('/tmp'):
  435. ... os.getcwd()
  436. '/tmp'
  437. """
  438. cwd = os.getcwd()
  439. try:
  440. chdir(path)
  441. yield
  442. finally:
  443. chdir(cwd)
  444. def chdir(path: PathLike):
  445. if very_verbose():
  446. print("cd", path)
  447. os.chdir(path)
  448. class QuotedString(object):
  449. """
  450. Prevents the provided string from being split.
  451. Commands will be executed and their stdout is quoted.
  452. """
  453. def __init__(self, value: Any):
  454. if isinstance(value, Command):
  455. self.value = value.stdout()
  456. else:
  457. self.value = str(value)
  458. def __str__(self):
  459. return f'"{self.value}"'
  460. T = TypeVar("T")
  461. def batched(source: Iterable[T], max_batch_size: int) -> Iterable[List[T]]:
  462. """
  463. Returns an iterator over batches of elements from source_list.
  464. >>> list(batched([1, 2, 3, 4, 5], 2))
  465. [[1, 2], [3, 4], [5]]
  466. """
  467. source_list = list(source)
  468. # Calculate batch size that spreads elements evenly across all batches
  469. batch_count = ceil(len(source_list) / max_batch_size)
  470. batch_size = ceil(len(source_list) / batch_count)
  471. for index in range(0, len(source_list), batch_size):
  472. yield source_list[index : min(index + batch_size, len(source_list))]
  473. # Shorthands
  474. quoted = QuotedString
  475. cmd = Command
  476. cwd = cwd_context
  477. parallel = ParallelCommands
  478. if __name__ == "__main__":
  479. import doctest
  480. (failures, num_tests) = doctest.testmod(optionflags=doctest.ELLIPSIS)
  481. sys.exit(1 if failures > 0 else 0)