#!/usr/bin/env python3
# /// script
# requires-python = ">=3.9"
# dependencies = [
#   "aiohttp<4.0.0,>=3.9.5",
#   "PyYAML<7.0.0,>=6.0.2",
#   "tomlkit>=0.13.3,<1.0"
# ]
# ///
"""Generate a list of flatpak-style source entries from a Cargo.lock file.

Registry crates become ``archive`` entries pointing at ``CRATES_IO``; packages
whose source starts with ``git+`` are cloned locally so their ``Cargo.toml``
(with workspace inheritance resolved) can be inlined next to the vendored copy.
The result is written as JSON, or as YAML when ``--yaml`` is given and PyYAML
is importable.
"""

__license__ = "MIT"

import argparse
import asyncio
import contextlib
import copy
import hashlib
import json
import logging
import os
import subprocess
from typing import (
    TYPE_CHECKING,
    Any,
    Dict,
    Iterator,
    List,
    NamedTuple,
    Optional,
    Tuple,
    TypedDict,
)
from urllib.parse import ParseResult, parse_qs, urlparse

import aiohttp
import tomlkit

try:
    import yaml

    YAML_AVAIL = True
except ImportError:
    YAML_AVAIL = False

if TYPE_CHECKING and not YAML_AVAIL:
    import yaml

CRATES_IO = "https://static.crates.io/crates"
CARGO_HOME = "cargo"
CARGO_CRATES = f"{CARGO_HOME}/vendor"
VENDORED_SOURCES = "vendored-sources"
GIT_CACHE = "flatpak-cargo/git"
# Number of leading commit characters used in directory names and when
# comparing a clone's HEAD against the requested commit.
COMMIT_LEN = 7


@contextlib.contextmanager
def workdir(path: str) -> Iterator[None]:
    """Temporarily change the process working directory to *path*.

    The previous working directory is restored on exit, including when the
    body raises.
    """
    oldpath = os.getcwd()
    os.chdir(path)
    try:
        yield
    finally:
        os.chdir(oldpath)


def canonical_url(url: str) -> ParseResult:
    """Convert a string to a Cargo canonical URL.

    As per
    https://github.com/rust-lang/cargo/blob/35c55a93200c84a4de4627f1770f76a8ad268a39/src/cargo/util/canonical_url.rs#L19

    The query and fragment are dropped, trailing slashes and a trailing
    ``.git`` are removed from the path, and for ``github.com`` the scheme is
    forced to ``https`` and the path is lower-cased.
    """
    # Hrm. The upstream cargo does not replace those URLs, but if we don't
    # then it doesn't work too well :(
    url = url.replace("git+https://", "https://")
    u = urlparse(url)
    # It seems cargo drops query and fragment
    u = ParseResult(u.scheme, u.netloc, u.path, "", "", "")
    u = u._replace(path=u.path.rstrip("/"))

    if u.netloc == "github.com":
        u = u._replace(scheme="https")
        u = u._replace(path=u.path.lower())

    if u.path.endswith(".git"):
        u = u._replace(path=u.path[: -len(".git")])

    return u


def get_git_tarball(repo_url: str, commit: str) -> str:
    """Return the URL of a ``.tar.gz`` snapshot of *repo_url* at *commit*.

    Supports ``github.com``, hosts whose first label is ``gitlab`` and
    ``bitbucket.org``. The canonical URL path must have exactly two segments
    (``/<owner>/<repo>``).

    Raises:
        ValueError: if the host is not one of the supported ones (including
            URLs that have no hostname at all).
    """
    url = canonical_url(repo_url)
    path = url.path.split("/")[1:]

    assert len(path) == 2
    owner, repo = path
    # Strip only a trailing ".git"; str.replace() would also remove the
    # substring from the middle of a repository name.
    if repo.endswith(".git"):
        repo = repo[: -len(".git")]

    # hostname is None for URLs without a network location; treat it as an
    # unsupported host instead of failing on None.split().
    hostname = url.hostname or ""
    if hostname == "github.com":
        return f"https://codeload.{hostname}/{owner}/{repo}/tar.gz/{commit}"
    elif hostname.split(".")[0] == "gitlab":
        return f"https://{hostname}/{owner}/{repo}/-/archive/{commit}/{repo}-{commit}.tar.gz"
    elif hostname == "bitbucket.org":
        return f"https://{hostname}/{owner}/{repo}/get/{commit}.tar.gz"
    else:
        raise ValueError(f"Don't know how to get tarball for {repo_url}")


async def get_remote_sha256(url: str) -> str:
    """Download *url* and return the hex SHA-256 digest of the response body.

    The body is read in 4096-byte chunks. The session is created with
    ``raise_for_status=True``.
    """
    logging.info("started sha256(%s)", url)
    sha256 = hashlib.sha256()
    async with aiohttp.ClientSession(raise_for_status=True) as http_session:
        async with http_session.get(url) as response:
            while data := await response.content.read(4096):
                sha256.update(data)
    logging.info("done sha256(%s)", url)
    return sha256.hexdigest()


_TomlType = Dict[str, Any]


def load_toml(tomlfile: str = "Cargo.lock") -> _TomlType:
    """Parse the TOML file at *tomlfile* and return it as plain Python data."""
    with open(tomlfile, "r", encoding="utf-8") as f:
        toml_data = tomlkit.parse(f.read()).unwrap()
    return toml_data


def git_repo_name(git_url: str, commit: str) -> str:
    """Return ``<last path segment of the canonical URL>-<short commit>``."""
    name = canonical_url(git_url).path.split("/")[-1]
    return f"{name}-{commit[:COMMIT_LEN]}"


def fetch_git_repo(git_url: str, commit: str) -> str:
    """Ensure a local clone of *git_url* is checked out at *commit*.

    The clone lives under ``$XDG_CACHE_HOME/flatpak-cargo`` (``~/.cache`` when
    the variable is unset) and is reused if it already contains a ``.git``
    directory. Submodules are initialised recursively.

    Returns:
        The path of the clone directory.

    Raises:
        subprocess.CalledProcessError: if any ``git`` invocation fails (other
            than the first checkout attempt, which is retried with ``-f``).
    """
    repo_dir = git_url.replace("://", "_").replace("/", "_")
    cache_dir = os.environ.get("XDG_CACHE_HOME", os.path.expanduser("~/.cache"))
    clone_dir = os.path.join(cache_dir, "flatpak-cargo", repo_dir)

    if not os.path.isdir(os.path.join(clone_dir, ".git")):
        subprocess.run(["git", "clone", "--depth=1", git_url, clone_dir], check=True)

    rev_parse_proc = subprocess.run(
        ["git", "rev-parse", "HEAD"], cwd=clone_dir, check=True, stdout=subprocess.PIPE
    )
    head = rev_parse_proc.stdout.decode().strip()

    if head[:COMMIT_LEN] != commit[:COMMIT_LEN]:
        subprocess.run(["git", "fetch", "origin", commit], cwd=clone_dir, check=True)
        try:
            subprocess.run(["git", "checkout", commit], cwd=clone_dir, check=True)
        except subprocess.CalledProcessError:
            logging.info(
                "Checking out commit %s failed for %s. Trying to force checkout the requested commit",
                commit,
                git_url,
            )
            subprocess.run(["git", "checkout", "-f", commit], cwd=clone_dir, check=True)

    # Get the submodules as they might contain dependencies. This is a noop if
    # there are no submodules in the repository
    subprocess.run(
        ["git", "submodule", "update", "--init", "--recursive"],
        cwd=clone_dir,
        check=True,
    )
    return clone_dir


def update_workspace_keys(
    pkg: Dict[str, Any], workspace: Optional[Dict[str, Any]]
) -> None:
    """Resolve ``workspace`` references in *pkg* in place from *workspace*.

    Walks the dict-valued entries of *pkg*. An entry containing a
    ``workspace`` key is replaced by (or merged with) the entry of the same
    name in *workspace*; other dict entries are recursed into together with
    the matching workspace entry.

    Args:
        pkg: manifest table to modify in place.
        workspace: workspace table to read inherited values from; may be
            ``None`` or empty, in which case nothing can be inherited.
    """
    for key, item in list(pkg.items()):
        # There cannot be a 'workspace' key if the item is not a dict.
        if not isinstance(item, dict):
            continue

        # Recurse for keys under target.cfg(..)
        if key == "target":
            for target in item.values():
                # Only tables can be walked; skip scalar values rather than
                # failing on a missing .items().
                if isinstance(target, dict):
                    update_workspace_keys(target, workspace)
            continue
        # dev-dependencies and build-dependencies should reference root
        # dependencies table from workspace
        elif key == "dev-dependencies" or key == "build-dependencies":
            # workspace is None when an outer call found no matching table.
            workspace_deps = workspace.get("dependencies") if workspace else None
            update_workspace_keys(item, workspace_deps)
            continue

        if not workspace or key not in workspace:
            continue

        workspace_item = workspace[key]

        if "workspace" in item:
            if isinstance(workspace_item, dict):
                del item["workspace"]

                for dep_key, workspace_value in workspace_item.items():
                    # features are additive
                    if dep_key == "features" and "features" in item:
                        item["features"] += workspace_value
                    else:
                        item[dep_key] = workspace_value
            elif len(item) > 1:
                # The workspace value is not a table: store it as the version
                # and keep the entry's other keys.
                del item["workspace"]
                item.update({"version": workspace_item})
            else:
                pkg[key] = workspace_item
        else:
            update_workspace_keys(item, workspace_item)


class _GitPackage(NamedTuple):
    """A package found in a git checkout, with the workspace in effect."""

    # Normalised directory of the package, relative to the scan root.
    path: str
    # Parsed Cargo.toml of the package.
    package: _TomlType
    # "workspace" table that applies to the package, if any.
    workspace: Optional[_TomlType]

    @property
    def normalized(self) -> _TomlType:
        """Return a deep copy of the manifest with workspace keys resolved."""
        package = copy.deepcopy(self.package)
        if self.workspace is None:
            return package
        update_workspace_keys(package, self.workspace)
        return package


_GitPackagesType = Dict[str, _GitPackage]


async def get_git_repo_packages(git_url: str, commit: str) -> _GitPackagesType:
    """Fetch *git_url* at *commit* and index every package in the checkout.

    Returns:
        A mapping from package name to its ``_GitPackage``.
    """
    logging.info("Loading packages from %s", git_url)
    git_repo_dir = fetch_git_repo(git_url, commit)
    packages: _GitPackagesType = {}

    def get_cargo_toml_packages(
        root_dir: str, workspace: Optional[_TomlType] = None
    ) -> None:
        """Record the package in *root_dir*, then recurse into its subdirs."""
        assert not os.path.isabs(root_dir) and os.path.isdir(root_dir)

        with workdir(root_dir):
            if os.path.exists("Cargo.toml"):
                cargo_toml = load_toml("Cargo.toml")
                workspace = cargo_toml.get("workspace") or workspace

                if "package" in cargo_toml:
                    packages[cargo_toml["package"]["name"]] = _GitPackage(
                        path=os.path.normpath(root_dir),
                        package=cargo_toml,
                        workspace=workspace,
                    )

        # Scan after leaving workdir() so that root_dir and child.path stay
        # relative to the directory the walk started from.
        for child in os.scandir(root_dir):
            if child.is_dir():
                # the workspace can be referenced by any subdirectory
                get_cargo_toml_packages(child.path, workspace)

    with workdir(git_repo_dir):
        get_cargo_toml_packages(".")

    assert packages, f"No packages found in {git_repo_dir}"
    logging.debug(
        "Packages in %s:\n%s",
        git_url,
        json.dumps(
            {k: v.path for k, v in packages.items()},
            indent=4,
        ),
    )
    return packages


_FlatpakSourceType = Dict[str, Any]


async def get_git_repo_sources(
    url: str,
    commit: str,
    tarball: bool = False,
) -> List[_FlatpakSourceType]:
    """Return the source entry that places repo *url* at *commit* in GIT_CACHE.

    With *tarball* true the entry is an ``archive`` whose checksum is computed
    by downloading the tarball; otherwise it is a ``git`` entry.
    """
    name = git_repo_name(url, commit)
    if tarball:
        tarball_url = get_git_tarball(url, commit)
        git_repo_sources = [
            {
                "type": "archive",
                "archive-type": "tar-gzip",
                "url": tarball_url,
                "sha256": await get_remote_sha256(tarball_url),
                "dest": f"{GIT_CACHE}/{name}",
            }
        ]
    else:
        git_repo_sources = [
            {
                "type": "git",
                "url": url,
                "commit": commit,
                "dest": f"{GIT_CACHE}/{name}",
            }
        ]
    return git_repo_sources


_GitRepo = TypedDict(
    "_GitRepo", {"lock": asyncio.Lock, "commits": Dict[str, _GitPackagesType]}
)
_GitReposType = Dict[str, _GitRepo]
_VendorEntryType = Dict[str, Dict[str, str]]


async def get_git_package_sources(
    package: _TomlType,
    git_repos: _GitReposType,
) -> Tuple[List[_FlatpakSourceType], _VendorEntryType]:
    """Build the sources and the vendor config entry for a git package.

    Args:
        package: a Cargo.lock package entry; its ``source`` URL must carry the
            commit in the fragment.
        git_repos: cache of scanned repositories, keyed by canonical URL and
            updated in place.

    Returns:
        A tuple of the source entries for the package and the source
        replacement entry for its repository.
    """
    name = package["name"]
    source = package["source"]
    # Parse the source URL once; the commit and the rev/tag/branch selector
    # are all read from it.
    parsed_source = urlparse(source)
    commit = parsed_source.fragment
    assert commit, "The commit needs to be indicated in the fragment part"
    canonical = canonical_url(source)
    repo_url = canonical.geturl()

    git_repo = git_repos.setdefault(
        repo_url,
        {
            "commits": {},
            "lock": asyncio.Lock(),
        },
    )
    # The per-repo lock ensures that concurrent callers scan a given commit
    # only once.
    async with git_repo["lock"]:
        if commit not in git_repo["commits"]:
            git_repo["commits"][commit] = await get_git_repo_packages(repo_url, commit)

    cargo_vendored_entry: _VendorEntryType = {
        repo_url: {
            "git": repo_url,
            "replace-with": VENDORED_SOURCES,
        }
    }
    query = parse_qs(parsed_source.query)
    # Only the first selector present is recorded, in this order of precedence.
    for ref_kind in ("rev", "tag", "branch"):
        ref_values = query.get(ref_kind)
        if ref_values:
            assert len(ref_values) == 1
            cargo_vendored_entry[repo_url][ref_kind] = ref_values[0]
            break

    logging.info("Adding package %s from %s", name, repo_url)
    git_pkg = git_repo["commits"][commit][name]
    pkg_repo_dir = os.path.join(
        GIT_CACHE, git_repo_name(repo_url, commit), git_pkg.path
    )
    # Git packages are vendored under their bare name, without a -{version}
    # suffix.
    git_sources: List[_FlatpakSourceType] = [
        {
            "type": "shell",
            "commands": [
                f'cp -r --reflink=auto "{pkg_repo_dir}" "{CARGO_CRATES}/{name}"'
            ],
        },
        {
            "type": "inline",
            "contents": tomlkit.dumps(git_pkg.normalized),
            "dest": f"{CARGO_CRATES}/{name}",
            "dest-filename": "Cargo.toml",
        },
        {
            "type": "inline",
            "contents": json.dumps({"package": None, "files": {}}),
            "dest": f"{CARGO_CRATES}/{name}",
            "dest-filename": ".cargo-checksum.json",
        },
    ]

    return (git_sources, cargo_vendored_entry)


async def get_package_sources(
    package: _TomlType,
    cargo_lock: _TomlType,
    git_repos: _GitReposType,
) -> Optional[Tuple[List[_FlatpakSourceType], _VendorEntryType]]:
    """Build the sources and the vendor config entry for one lock-file package.

    Returns:
        ``None`` when the package has no ``source`` or no checksum can be
        found; otherwise a tuple of source entries and a source replacement
        entry. Sources starting with ``git+`` are delegated to
        ``get_git_package_sources``.
    """
    metadata = cargo_lock.get("metadata")
    name = package["name"]
    version = package["version"]

    if "source" not in package:
        logging.debug("%s has no source", name)
        return None
    source = package["source"]

    if source.startswith("git+"):
        return await get_git_package_sources(package, git_repos)

    # The checksum is looked up in the lock file's metadata table first and
    # on the package entry itself second.
    key = f"checksum {name} {version} ({source})"
    if metadata is not None and key in metadata:
        checksum = metadata[key]
    elif "checksum" in package:
        checksum = package["checksum"]
    else:
        logging.warning("%s doesn't have checksum", name)
        return None

    crate_sources = [
        {
            "type": "archive",
            "archive-type": "tar-gzip",
            "url": f"{CRATES_IO}/{name}/{name}-{version}.crate",
            "sha256": checksum,
            "dest": f"{CARGO_CRATES}/{name}-{version}",
        },
        {
            "type": "inline",
            "contents": json.dumps({"package": checksum, "files": {}}),
            "dest": f"{CARGO_CRATES}/{name}-{version}",
            "dest-filename": ".cargo-checksum.json",
        },
    ]
    return (crate_sources, {"crates-io": {"replace-with": VENDORED_SOURCES}})


async def generate_sources(
    cargo_lock: _TomlType,
    git_tarballs: bool = False,
) -> List[_FlatpakSourceType]:
    """Return all source entries for the packages in *cargo_lock*.

    The list holds the git repository sources first, then the per-package
    sources, and finally an inline cargo ``config`` file that maps every
    collected source to the vendor directory.

    Args:
        cargo_lock: parsed Cargo.lock contents.
        git_tarballs: emit git repositories as tarball archives instead of
            ``git`` sources.
    """
    git_repos: _GitReposType = {}
    sources: List[_FlatpakSourceType] = []
    package_sources = []
    cargo_vendored_sources = {
        VENDORED_SOURCES: {"directory": f"{CARGO_CRATES}"},
    }

    pkg_coros = [
        get_package_sources(p, cargo_lock, git_repos) for p in cargo_lock["package"]
    ]
    for pkg in await asyncio.gather(*pkg_coros):
        if pkg is None:
            continue
        pkg_sources, cargo_vendored_entry = pkg
        package_sources.extend(pkg_sources)
        cargo_vendored_sources.update(cargo_vendored_entry)

    logging.debug(
        "Adding collected git repos:\n%s", json.dumps(list(git_repos), indent=4)
    )
    git_repo_coros = [
        get_git_repo_sources(git_url, git_commit, git_tarballs)
        for git_url, git_repo in git_repos.items()
        for git_commit in git_repo["commits"]
    ]
    # Extend in place instead of sum(lists, []), which copies the growing
    # list once per repository.
    for repo_sources in await asyncio.gather(*git_repo_coros):
        sources.extend(repo_sources)

    sources.extend(package_sources)

    logging.debug("Vendored sources:\n%s", json.dumps(cargo_vendored_sources, indent=4))
    sources.append(
        {
            "type": "inline",
            "contents": tomlkit.dumps(
                {
                    "source": cargo_vendored_sources,
                }
            ),
            "dest": CARGO_HOME,
            "dest-filename": "config",
        }
    )
    return sources


def main() -> None:
    """Parse the command line, generate the sources and write them to a file.

    The output path defaults to ``generated-sources.yml`` for YAML output and
    ``generated-sources.json`` otherwise.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("cargo_lock", help="Path to the Cargo.lock file")
    parser.add_argument(
        "-o", "--output", required=False, help="Where to write generated sources"
    )
    parser.add_argument(
        "--yaml", action="store_true", help="Output as YAML instead of JSON"
    )
    parser.add_argument(
        "-t",
        "--git-tarballs",
        action="store_true",
        help="Download git repos as tarballs",
    )
    parser.add_argument("-d", "--debug", action="store_true")
    args = parser.parse_args()

    # Configure logging first so the fallback warning below is emitted.
    loglevel = logging.DEBUG if args.debug else logging.INFO
    logging.basicConfig(level=loglevel)

    use_yaml = args.yaml and YAML_AVAIL
    if args.output is not None:
        outfile = args.output
    elif use_yaml:
        outfile = "generated-sources.yml"
    else:
        outfile = "generated-sources.json"

    if args.yaml and not YAML_AVAIL:
        # Previously this fallback happened silently.
        logging.warning(
            "PyYAML is not installed; writing JSON to %s instead of YAML", outfile
        )

    generated_sources = asyncio.run(
        generate_sources(load_toml(args.cargo_lock), git_tarballs=args.git_tarballs)
    )

    with open(outfile, "w", encoding="utf-8") as out:
        if use_yaml:
            yaml.dump(generated_sources, out, sort_keys=False)
        else:
            json.dump(generated_sources, out, indent=4, sort_keys=False)


if __name__ == "__main__":
    main()