vcs.py 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199
  1. #!/usr/bin/env python3
  2. # Copyright 2023 The ChromiumOS Authors
  3. # Use of this source code is governed by a BSD-style license that can be
  4. # found in the LICENSE file.
  5. """
  6. Provides helpers for accessing gerrit and listing files under version control.
  7. """
import functools
import getpass
import json
import shutil
import sys
from pathlib import Path
from tempfile import gettempdir
from typing import (
    Any,
    Dict,
    List,
    Optional,
    cast,
)

from .command import quoted, cmd
from .util import very_verbose
  23. # File where to store http headers for gcloud authentication
  24. AUTH_HEADERS_FILE = Path(gettempdir()) / f"crosvm_gcloud_auth_headers_{getpass.getuser()}"
  25. "Url of crosvm's gerrit review host"
  26. GERRIT_URL = "https://chromium-review.googlesource.com"
  27. def all_tracked_files():
  28. for line in cmd("git ls-files").lines():
  29. file = Path(line)
  30. if file.is_file():
  31. yield file
  32. def find_source_files(extension: str, ignore: List[str] = []):
  33. for file in all_tracked_files():
  34. if file.suffix != f".{extension}":
  35. continue
  36. if file.is_relative_to("third_party"):
  37. continue
  38. if str(file) in ignore:
  39. continue
  40. yield file
  41. def find_scripts(path: Path, shebang: str):
  42. for file in path.glob("*"):
  43. if file.is_file() and file.open(errors="ignore").read(512).startswith(f"#!{shebang}"):
  44. yield file
  45. def get_cookie_file():
  46. path = cmd("git config http.cookiefile").stdout(check=False)
  47. return Path(path) if path else None
  48. def get_gcloud_access_token():
  49. if not shutil.which("gcloud"):
  50. return None
  51. return cmd("gcloud auth print-access-token").stdout(check=False)
  52. @functools.lru_cache(maxsize=None)
  53. def curl_with_git_auth():
  54. """
  55. Returns a curl `Command` instance set up to use the same HTTP credentials as git.
  56. This currently supports two methods:
  57. - git cookies (the default)
  58. - gcloud
  59. Most developers will use git cookies, which are passed to curl.
  60. glloud for authorization can be enabled in git via `git config credential.helper gcloud.sh`.
  61. If enabled in git, this command will also return a curl command using a gloud access token.
  62. """
  63. helper = cmd("git config credential.helper").stdout(check=False)
  64. if not helper:
  65. cookie_file = get_cookie_file()
  66. if not cookie_file or not cookie_file.is_file():
  67. raise Exception("git http cookiefile is not available.")
  68. return cmd("curl --cookie", cookie_file)
  69. if helper.endswith("gcloud.sh"):
  70. token = get_gcloud_access_token()
  71. if not token:
  72. raise Exception("Cannot get gcloud access token.")
  73. # File where to store http headers for gcloud authentication
  74. AUTH_HEADERS_FILE = Path(gettempdir()) / f"crosvm_gcloud_auth_headers_{getpass.getuser()}"
  75. # Write token to a header file so it will not appear in logs or error messages.
  76. AUTH_HEADERS_FILE.write_text(f"Authorization: Bearer {token}")
  77. return cmd(f"curl -H @{AUTH_HEADERS_FILE}")
  78. raise Exception(f"Unsupported git credentials.helper: {helper}")
  79. def strip_xssi(response: str):
  80. # See https://gerrit-review.googlesource.com/Documentation/rest-api.html#output
  81. assert response.startswith(")]}'\n")
  82. return response[5:]
  83. def gerrit_api_get(path: str):
  84. response = cmd(f"curl --silent --fail {GERRIT_URL}/{path}").stdout()
  85. return json.loads(strip_xssi(response))
  86. def gerrit_api_post(path: str, body: Any):
  87. response = curl_with_git_auth()(
  88. "--silent --fail",
  89. "-X POST",
  90. "-H",
  91. quoted("Content-Type: application/json"),
  92. "-d",
  93. quoted(json.dumps(body)),
  94. f"{GERRIT_URL}/a/{path}",
  95. ).stdout()
  96. if very_verbose():
  97. print("Response:", response)
  98. return json.loads(strip_xssi(response))
  99. class GerritChange(object):
  100. """
  101. Class to interact with the gerrit /changes/ API.
  102. For information on the data format returned by the API, see:
  103. https://gerrit-review.googlesource.com/Documentation/rest-api-changes.html#change-info
  104. """
  105. id: str
  106. _data: Any
  107. def __init__(self, data: Any):
  108. self._data = data
  109. self.id = data["id"]
  110. @functools.cached_property
  111. def _details(self) -> Any:
  112. return gerrit_api_get(f"changes/{self.id}/detail")
  113. @functools.cached_property
  114. def _messages(self) -> List[Any]:
  115. return gerrit_api_get(f"changes/{self.id}/messages")
  116. @property
  117. def status(self):
  118. return cast(str, self._data["status"])
  119. def get_votes(self, label_name: str) -> List[int]:
  120. "Returns the list of votes on `label_name`"
  121. label_info = self._details.get("labels", {}).get(label_name)
  122. votes = label_info.get("all", [])
  123. return [cast(int, v.get("value")) for v in votes]
  124. def get_messages_by(self, email: str) -> List[str]:
  125. "Returns all messages posted by the user with the specified `email`."
  126. return [m["message"] for m in self._messages if m["author"].get("email") == email]
  127. def review(self, message: str, labels: Dict[str, int]):
  128. "Post review `message` and set the specified review `labels`"
  129. print("Posting on", self, ":", message, labels)
  130. gerrit_api_post(
  131. f"changes/{self.id}/revisions/current/review",
  132. {"message": message, "labels": labels},
  133. )
  134. def abandon(self, message: str):
  135. print("Abandoning", self, ":", message)
  136. gerrit_api_post(f"changes/{self.id}/abandon", {"message": message})
  137. @classmethod
  138. def query(cls, *queries: str):
  139. "Returns a list of gerrit changes matching the provided list of queries."
  140. return [cls(c) for c in gerrit_api_get(f"changes/?q={'+'.join(queries)}")]
  141. def short_url(self):
  142. return f"http://crrev.com/c/{self._data['_number']}"
  143. def __str__(self):
  144. return self.short_url()
  145. def pretty_info(self):
  146. return f"{self} - {self._data['subject']}"
  147. if __name__ == "__main__":
  148. import doctest
  149. (failures, num_tests) = doctest.testmod(optionflags=doctest.ELLIPSIS)
  150. sys.exit(1 if failures > 0 else 0)