From 6d21e35f61225feabda54a2530b492b772465241 Mon Sep 17 00:00:00 2001 From: Doug Hellmann Date: Wed, 17 Jun 2026 16:39:21 -0400 Subject: [PATCH] feat(bootstrap): parallelize RESOLVE and PREPARE_SOURCE I/O with background threads Submit version resolution and source download/unpack to a thread pool as items are pushed onto the bootstrap stack, overlapping I/O with the main thread's serial processing. Key design decisions: - Background callables are module-level functions (no self capture) so background threads cannot accidentally access mutable Bootstrapper state (self.why, self._seen_requirements, etc.) - _push_items() helper guarantees every RESOLVE and PREPARE_SOURCE item gets a bg_future, including the initial item in bootstrap() - _drain_background_pool() provides an exclusive-build barrier - BootstrapRequirementResolver cache protected by threading.Lock - --bg-threads CLI option (min 1, default: cpu_count // 2) - finalize() uses cancel_futures=True to avoid blocking on abort paths Co-Authored-By: Claude Sonnet 4.6 Signed-off-by: Doug Hellmann --- e2e/test_bootstrap_build_tags.sh | 4 +- e2e/test_bootstrap_cache.sh | 12 +- e2e/test_bootstrap_cooldown_prebuilt.sh | 2 +- e2e/test_bootstrap_prerelease.sh | 4 +- pyproject.toml | 1 + .../bootstrap_requirement_resolver.py | 35 + src/fromager/bootstrapper.py | 666 ++++++++++++------ src/fromager/commands/bootstrap.py | 82 ++- src/fromager/commands/lint_requirements.py | 91 ++- tests/test_bootstrap.py | 8 + tests/test_bootstrap_test_mode.py | 14 +- tests/test_bootstrapper.py | 118 +++- tests/test_bootstrapper_iterative.py | 249 ++++--- 13 files changed, 835 insertions(+), 451 deletions(-) diff --git a/e2e/test_bootstrap_build_tags.sh b/e2e/test_bootstrap_build_tags.sh index 6a4dce195..a92523b8e 100755 --- a/e2e/test_bootstrap_build_tags.sh +++ b/e2e/test_bootstrap_build_tags.sh @@ -55,7 +55,7 @@ fromager \ --settings-file="$SCRIPTDIR/bootstrap_settings.yaml" \ bootstrap --cache-wheel-server-url=$WHEEL_SERVER_URL 'stevedore==5.2.0' -if ! grep -q "stevedore: found built wheel on cache server" "$LOGFILE"; then +if ! grep -q "stevedore-5.2.0: found built wheel on cache server" "$LOGFILE"; then echo "FAIL: Did not find log message found built wheel on cache server in $LOGFILE" 1>&2 pass=false fi @@ -77,7 +77,7 @@ fromager \ --settings-file="$SCRIPTDIR/bootstrap_settings.yaml" \ bootstrap --cache-wheel-server-url=$WHEEL_SERVER_URL 'stevedore==5.2.0' -if ! grep -q "stevedore: found existing wheel " "$LOGFILE"; then +if ! grep -q "stevedore-5.2.0: found existing wheel " "$LOGFILE"; then echo "FAIL: Did not find log message found existing wheel in $LOGFILE" 1>&2 pass=false fi diff --git a/e2e/test_bootstrap_cache.sh b/e2e/test_bootstrap_cache.sh index a1c6773b9..feabfd1c1 100755 --- a/e2e/test_bootstrap_cache.sh +++ b/e2e/test_bootstrap_cache.sh @@ -42,8 +42,8 @@ fromager \ bootstrap --cache-wheel-server-url="https://pypi.org/simple" "$DIST==$VER" EXPECTED_LOG_MESSAGES=( -"$DIST: looking for existing wheel for version $VER with build tag () in" -"$DIST: found existing wheel" +"$DIST-$VER: looking for existing wheel for version $VER with build tag () in" +"$DIST-$VER: found existing wheel" ) for pattern in "${EXPECTED_LOG_MESSAGES[@]}"; do if ! grep -q "$pattern" "$OUTDIR/bootstrap.log"; then @@ -54,7 +54,7 @@ done $pass UNEXPECTED_LOG_MESSAGES=( -"$DIST: checking if wheel was already uploaded to https://pypi.org/simple" +"$DIST-$VER: checking if wheel was already uploaded to https://pypi.org/simple" ) for pattern in "${UNEXPECTED_LOG_MESSAGES[@]}"; do @@ -105,9 +105,9 @@ done $pass UNEXPECTED_LOG_MESSAGES=( -"$DIST: loading build sdist dependencies from build-sdist-requirements.txt" -"$DIST: loading build backend dependencies from build-backend-requirements.txt" -"$DIST: loading build system dependencies from build-system-requirements.txt" +"$DIST-$VER: loading build sdist dependencies from build-sdist-requirements.txt" +"$DIST-$VER: loading build backend dependencies from build-backend-requirements.txt" +"$DIST-$VER: loading build system dependencies from build-system-requirements.txt" ) for pattern in "${UNEXPECTED_LOG_MESSAGES[@]}"; do diff --git a/e2e/test_bootstrap_cooldown_prebuilt.sh b/e2e/test_bootstrap_cooldown_prebuilt.sh index 21cb72839..faf080478 100755 --- a/e2e/test_bootstrap_cooldown_prebuilt.sh +++ b/e2e/test_bootstrap_cooldown_prebuilt.sh @@ -45,7 +45,7 @@ if ! grep -q "new toplevel dependency stevedore resolves to 5.3.0" "$OUTDIR/boot fi # The wheel must have been downloaded as a pre-built (not built from source). -if ! grep -q "uses a pre-built wheel" "$OUTDIR/bootstrap.log"; then +if ! grep -q "using pre-built wheel" "$OUTDIR/bootstrap.log"; then echo "FAIL: stevedore was not downloaded as a pre-built wheel" 1>&2 pass=false fi diff --git a/e2e/test_bootstrap_prerelease.sh b/e2e/test_bootstrap_prerelease.sh index c650db0a4..04c4b3723 100755 --- a/e2e/test_bootstrap_prerelease.sh +++ b/e2e/test_bootstrap_prerelease.sh @@ -20,7 +20,7 @@ fromager \ pass=true # Check for log message that the override is loaded -if ! grep -q "flit_core: new toplevel dependency flit_core<2.0.1 resolves to 2.0" "$OUTDIR/bootstrap.log"; then +if ! grep -q "flit_core-2.0: new toplevel dependency flit_core<2.0.1 resolves to 2.0" "$OUTDIR/bootstrap.log"; then echo "FAIL: flit_core did not resolve to 2.0 $OUTDIR/bootstrap.log" 1>&2 pass=false fi @@ -41,7 +41,7 @@ DEBUG_RESOLVER=true fromager \ # Check for log message that the override is loaded -if ! grep -q "flit_core: new toplevel dependency flit_core<2.0.1 resolves to 2.0rc3" "$OUTDIR/bootstrap.log"; then +if ! grep -q "flit_core-2.0rc3: new toplevel dependency flit_core<2.0.1 resolves to 2.0rc3" "$OUTDIR/bootstrap.log"; then echo "FAIL: flit_core did not resolve to 2.0rc3 $OUTDIR/bootstrap.log" 1>&2 pass=false fi diff --git a/pyproject.toml b/pyproject.toml index adf93290e..0add06187 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -293,6 +293,7 @@ features = ["docs"] build = "sphinx-build -M html docs docs/_build -j auto --keep-going {args:--fail-on-warning --fresh-env -n}" [tool.pytest.ini_options] +testpaths = ["tests"] markers = [ "network: test that need network access", ] diff --git a/src/fromager/bootstrap_requirement_resolver.py b/src/fromager/bootstrap_requirement_resolver.py index 3cc2abdb2..1f12a8a0f 100644 --- a/src/fromager/bootstrap_requirement_resolver.py +++ b/src/fromager/bootstrap_requirement_resolver.py @@ -118,6 +118,15 @@ def resolve( pbi = self.ctx.package_build_info(req) pre_built = pbi.pre_built + # Check session cache BEFORE the git URL guard so that background + # threads can retrieve pre-cached git URL resolutions (populated by + # Bootstrapper.resolve_versions() on the main thread before bootstrap() + # is called) without hitting the ValueError. + cached_result = self.get_cached_resolution(req, pre_built) + if cached_result is not None: + logger.debug(f"resolved {req} from cache") + return list(cached_result) if return_all_versions else [cached_result[0]] + # Git URL source resolution must be handled by Bootstrapper. # But git URL prebuilt resolution is allowed - we look for wheels on PyPI # (test mode fallback uses this path). @@ -146,6 +155,32 @@ def resolve( return matching return [matching[0]] + def get_cached_resolution( + self, + req: Requirement, + pre_built: bool, + ) -> list[tuple[str, Version]] | None: + """Return cached matching versions if this requirement was already resolved. + + Returns ``None`` if the requirement has not been resolved yet, allowing + callers to distinguish between "no matching versions" and "not yet resolved". + + Used by background threads to retrieve pre-cached git URL resolutions + populated by the main thread before entering the parallel section. + + Args: + req: Package requirement + pre_built: Whether looking for prebuilt or source resolution + + Returns: + List of (url, version) tuples if previously resolved, None otherwise. + """ + rule_key = (str(req), pre_built) + with self._lock: + if rule_key in self._resolved_rules: + return self._get_matching_versions(req, pre_built) + return None + def _resolve_and_extend( self, req: Requirement, diff --git a/src/fromager/bootstrapper.py b/src/fromager/bootstrapper.py index c5ec9e729..165018370 100644 --- a/src/fromager/bootstrapper.py +++ b/src/fromager/bootstrapper.py @@ -1,5 +1,6 @@ from __future__ import annotations +import concurrent.futures import contextlib import dataclasses import datetime @@ -62,6 +63,27 @@ class SourceBuildResult: source_type: SourceType +@dataclasses.dataclass +class PreparedSourceData: + """Result of background I/O pre-fetching returned to the main thread. + + Fields are set in one of three combinations depending on the result type: + + - Source (no cache hit): only ``sdist_root_dir`` is set. + - Source (cache hit): both ``sdist_root_dir`` and ``cached_wheel_filename`` are set. + - Prebuilt wheel: both ``wheel_filename`` and ``unpack_dir`` are set. + """ + + # Source path: set after download+unpack OR cache hit + sdist_root_dir: pathlib.Path | None = None + # Source path: set when the result came from the wheel cache + cached_wheel_filename: pathlib.Path | None = None + # Prebuilt path: downloaded wheel file + wheel_filename: pathlib.Path | None = None + # Prebuilt path: unpack directory (created by mkdir) + unpack_dir: pathlib.Path | None = None + + # Valid failure types for test mode error recording FailureType = typing.Literal["resolution", "bootstrap", "hook", "dependency_extraction"] @@ -143,6 +165,248 @@ class WorkItem: build_backend_deps: set[Requirement] = dataclasses.field(default_factory=set) build_sdist_deps: set[Requirement] = dataclasses.field(default_factory=set) + # Background future: set by _push_items for RESOLVE and PREPARE_SOURCE items. + # None for all other phases or before _push_items has been called. + bg_future: concurrent.futures.Future[typing.Any] | None = dataclasses.field( + default=None, compare=False, repr=False + ) + + +def _create_unpack_dir( + work_dir: pathlib.Path, + req: Requirement, + resolved_version: Version, +) -> pathlib.Path: + unpack_dir = work_dir / f"{req.name}-{resolved_version}" + unpack_dir.mkdir(parents=True, exist_ok=True) + return unpack_dir + + +def _unpack_metadata_from_wheel( + work_dir: pathlib.Path, + req: Requirement, + resolved_version: Version, + wheel_filename: pathlib.Path, +) -> pathlib.Path | None: + dist_name, dist_version, _, _ = wheels.extract_info_from_wheel_file( + req, wheel_filename + ) + unpack_dir = _create_unpack_dir(work_dir, req, resolved_version) + dist_filename = f"{dist_name}-{dist_version}" + metadata_dir = pathlib.Path(f"{dist_filename}.dist-info") + req_filenames: list[str] = [ + dependencies.BUILD_BACKEND_REQ_FILE_NAME, + dependencies.BUILD_SDIST_REQ_FILE_NAME, + dependencies.BUILD_SYSTEM_REQ_FILE_NAME, + ] + try: + with zipfile.ZipFile(wheel_filename) as archive: + for filename in req_filenames: + zipinfo = archive.getinfo( + str(metadata_dir / f"{wheels.FROMAGER_BUILD_REQ_PREFIX}-{filename}") + ) + if os.path.isabs(zipinfo.filename) or ".." in zipinfo.filename: + raise ValueError(f"Unsafe path in wheel: {zipinfo.filename}") + zipinfo.filename = filename + output_file = archive.extract(zipinfo, unpack_dir) + logger.info(f"extracted {output_file}") + logger.info(f"extracted build requirements from wheel into {unpack_dir}") + return unpack_dir + except Exception as e: + logger.info(f"could not extract build requirements from wheel: {e}") + for filename in req_filenames: + unpack_dir.joinpath(filename).unlink(missing_ok=True) + return None + + +def _look_for_existing_wheel( + ctx: context.WorkContext, + req: Requirement, + resolved_version: Version, + search_in: pathlib.Path, +) -> tuple[pathlib.Path | None, pathlib.Path | None]: + pbi = ctx.package_build_info(req) + expected_build_tag = pbi.build_tag(resolved_version) + logger.info( + f"looking for existing wheel for version {resolved_version} with build tag {expected_build_tag} in {search_in}" + ) + wheel_filename = finders.find_wheel( + downloads_dir=search_in, + req=req, + dist_version=str(resolved_version), + build_tag=expected_build_tag, + ) + if not wheel_filename: + return None, None + _, _, build_tag, _ = wheels.extract_info_from_wheel_file(req, wheel_filename) + if expected_build_tag and expected_build_tag != build_tag: + logger.info( + f"found wheel for {resolved_version} in {wheel_filename} but build tag does not match. Got {build_tag} but expected {expected_build_tag}" + ) + return None, None + logger.info(f"found existing wheel {wheel_filename}") + metadata_dir = _unpack_metadata_from_wheel( + ctx.work_dir, req, resolved_version, wheel_filename + ) + return wheel_filename, metadata_dir + + +def _download_wheel_from_cache( + ctx: context.WorkContext, + cache_wheel_server_url: str | None, + req: Requirement, + resolved_version: Version, +) -> tuple[pathlib.Path | None, pathlib.Path | None]: + if not cache_wheel_server_url: + return None, None + logger.info(f"checking if wheel was already uploaded to {cache_wheel_server_url}") + try: + pinned_req = Requirement(f"{req.name}=={resolved_version}") + provider = finders.PyPICacheProvider( + cache_server_url=cache_wheel_server_url, + constraints=ctx.constraints, + ) + results = resolver.find_all_matching_from_provider(provider, pinned_req) + wheel_url, _ = results[0] + wheelfile_name = pathlib.Path(urlparse(wheel_url).path) + pbi = ctx.package_build_info(req) + expected_build_tag = pbi.build_tag(resolved_version) + logger.info(f"has expected build tag {expected_build_tag}") + changelogs = pbi.get_changelog(resolved_version) + logger.debug(f"has change logs {changelogs}") + + _, _, build_tag, _ = wheels.extract_info_from_wheel_file(req, wheelfile_name) + if expected_build_tag and expected_build_tag != build_tag: + logger.info( + f"found wheel for {resolved_version} in cache but build tag does not match. Got {build_tag} but expected {expected_build_tag}" + ) + return None, None + + cached_wheel = wheels.download_wheel( + req=req, wheel_url=wheel_url, output_directory=ctx.wheels_downloads + ) + if cache_wheel_server_url != ctx.wheel_server_url: + server.update_wheel_mirror(ctx) + logger.info("found built wheel on cache server") + unpack_dir = _unpack_metadata_from_wheel( + ctx.work_dir, req, resolved_version, cached_wheel + ) + return cached_wheel, unpack_dir + except ResolverException: + logger.info( + f"did not find wheel for {resolved_version} in {cache_wheel_server_url}" + ) + return None, None + except requests.exceptions.RequestException as err: + logger.warning( + f"network error checking wheel cache for {resolved_version} " + f"at {cache_wheel_server_url}: {err}" + ) + return None, None + except Exception as err: + logger.warning( + f"unexpected error checking wheel cache for {resolved_version} " + f"at {cache_wheel_server_url}: {err}" + ) + return None, None + + +def _find_cached_wheel( + ctx: context.WorkContext, + cache_wheel_server_url: str | None, + req: Requirement, + resolved_version: Version, +) -> tuple[pathlib.Path | None, pathlib.Path | None]: + """Look for cached wheel in 3 locations (thread-safe, no Bootstrapper state). + + Checks for cached wheels in order: + 1. wheels_build directory (previously built) + 2. wheels_downloads directory (previously downloaded) + 3. Cache server (remote cache) + + Returns: + Tuple of (cached_wheel_filename, unpacked_cached_wheel). + Both None if no cache hit. + """ + cached_wheel, unpacked = _look_for_existing_wheel( + ctx, req, resolved_version, ctx.wheels_build + ) + if cached_wheel: + return cached_wheel, unpacked + + cached_wheel, unpacked = _look_for_existing_wheel( + ctx, req, resolved_version, ctx.wheels_downloads + ) + if cached_wheel: + return cached_wheel, unpacked + + cached_wheel, unpacked = _download_wheel_from_cache( + ctx, cache_wheel_server_url, req, resolved_version + ) + if cached_wheel: + return cached_wheel, unpacked + + return None, None + + +def _bg_resolve( + bg_resolver: bootstrap_requirement_resolver.BootstrapRequirementResolver, + req: Requirement, + req_type: RequirementType, + parent_req: Requirement | None, + return_all_versions: bool, +) -> list[tuple[str, Version]]: + """Background-safe resolution: no Bootstrapper state accessed.""" + logger.info(f"{BootstrapPhase.RESOLVE} for {req_type} requirement") + return bg_resolver.resolve( + req=req, + req_type=req_type, + parent_req=parent_req, + return_all_versions=return_all_versions, + ) + + +def _bg_prepare_source( + ctx: context.WorkContext, + cache_wheel_server_url: str | None, + req: Requirement, + resolved_version: Version, + source_url: str, +) -> PreparedSourceData: + """Background-safe source download+unpack: no Bootstrapper state accessed.""" + logger.info("preparing source") + cached_wheel, unpacked = _find_cached_wheel( + ctx, cache_wheel_server_url, req, resolved_version + ) + if unpacked is not None: + return PreparedSourceData( + sdist_root_dir=unpacked / unpacked.stem, + cached_wheel_filename=cached_wheel, + ) + source_filename = sources.download_source( + ctx=ctx, req=req, version=resolved_version, download_url=source_url + ) + sdist_root_dir = sources.prepare_source( + ctx=ctx, req=req, source_filename=source_filename, version=resolved_version + ) + return PreparedSourceData(sdist_root_dir=sdist_root_dir) + + +def _bg_prepare_prebuilt( + ctx: context.WorkContext, + req: Requirement, + req_type: RequirementType, + resolved_version: Version, + wheel_url: str, +) -> PreparedSourceData: + """Background-safe prebuilt download: no Bootstrapper state accessed.""" + logger.info(f"using pre-built wheel for {req_type} requirement") + wheel_filename = wheels.download_wheel(req, wheel_url, ctx.wheels_prebuilt) + unpack_dir = ctx.work_dir / f"{req.name}-{resolved_version}" + unpack_dir.mkdir(parents=True, exist_ok=True) + server.update_wheel_mirror(ctx) + return PreparedSourceData(wheel_filename=wheel_filename, unpack_dir=unpack_dir) + class Bootstrapper: def __init__( @@ -154,6 +418,7 @@ def __init__( sdist_only: bool = False, test_mode: bool = False, multiple_versions: bool = False, + num_bg_threads: int = max(1, (os.cpu_count() or 2) // 2), ) -> None: if test_mode and sdist_only: raise ValueError( @@ -168,6 +433,12 @@ def __init__( self.test_mode = test_mode self.multiple_versions = multiple_versions self.why: list[tuple[RequirementType, Requirement, Version]] = [] + self._num_bg_threads = max(1, num_bg_threads) + self._bg_pool: concurrent.futures.ThreadPoolExecutor | None = ( + concurrent.futures.ThreadPoolExecutor( + max_workers=self._num_bg_threads, thread_name_prefix="fromager-bg" + ) + ) # Delegate resolution to BootstrapRequirementResolver self._resolver = bootstrap_requirement_resolver.BootstrapRequirementResolver( @@ -228,6 +499,7 @@ def resolve_and_add_top_level( results = self.resolve_versions( req=req, req_type=RequirementType.TOP_LEVEL, + parent_req=None, return_all_versions=self.multiple_versions, ) if self.multiple_versions: @@ -275,6 +547,7 @@ def resolve_versions( self, req: Requirement, req_type: RequirementType, + parent_req: Requirement | None = None, return_all_versions: bool = False, ) -> list[tuple[str, Version]]: """Resolve version(s) of a requirement. @@ -288,6 +561,8 @@ def resolve_versions( Args: req: Package requirement to resolve req_type: Type of requirement + parent_req: Explicit parent requirement from dependency chain. + Callers must pass this explicitly; do not read ``self.why`` here. return_all_versions: If True, return all matching versions. If False, return only highest version. @@ -317,7 +592,6 @@ def resolve_versions( return result # Git URLs always return single version # Delegate to RequirementResolver - parent_req = self.why[-1][1] if self.why else None return self._resolver.resolve( req=req, req_type=req_type, @@ -377,15 +651,15 @@ def bootstrap(self, req: Requirement, req_type: RequirementType) -> None: # Single RESOLVE item — resolution, version expansion, and error # handling all happen inside the loop via _phase_resolve. - stack: list[WorkItem] = [ - WorkItem( - req=req, - req_type=req_type, - phase=BootstrapPhase.RESOLVE, - why_snapshot=list(self.why), - parent=parent, - ) - ] + initial_item = WorkItem( + req=req, + req_type=req_type, + phase=BootstrapPhase.RESOLVE, + why_snapshot=list(self.why), + parent=parent, + ) + stack: list[WorkItem] = [] + self._push_items(stack, [initial_item]) # Main iterative DFS loop while stack: @@ -393,7 +667,10 @@ def bootstrap(self, req: Requirement, req_type: RequirementType) -> None: item = stack.pop() self.why = list(item.why_snapshot) - with req_ctxvar_context(item.req), self._track_why(item): + with ( + req_ctxvar_context(item.req, item.resolved_version), + self._track_why(item), + ): try: new_items = self._dispatch_phase(item) except Exception as err: @@ -408,9 +685,10 @@ def bootstrap(self, req: Requirement, req_type: RequirementType) -> None: if not new_items: self.progressbar.update() - # Phase handlers return [continuation, *new_deps] so extend() - # naturally puts new deps on top of the stack (processed first). - stack.extend(new_items) + # Phase handlers return [continuation, *new_deps] so _push_items + # naturally puts new deps on top of the stack (processed first) + # and submits background tasks for them. + self._push_items(stack, new_items) # Restore why stack for the caller self.why = saved_why @@ -637,54 +915,17 @@ def _download_prebuilt( resolved_version: Version, wheel_url: str, ) -> tuple[pathlib.Path, pathlib.Path]: - logger.info(f"{req_type} requirement {req} uses a pre-built wheel") + logger.info( + f"{BootstrapPhase.PREPARE_SOURCE} using pre-built wheel for {req_type} requirement" + ) wheel_filename = wheels.download_wheel(req, wheel_url, self.ctx.wheels_prebuilt) - unpack_dir = self._create_unpack_dir(req, resolved_version) + unpack_dir = _create_unpack_dir(self.ctx.work_dir, req, resolved_version) # Update the wheel mirror so pre-built wheels are indexed # and available to subsequent builds that need them as dependencies server.update_wheel_mirror(self.ctx) return (wheel_filename, unpack_dir) - def _find_cached_wheel( - self, - req: Requirement, - resolved_version: Version, - ) -> tuple[pathlib.Path | None, pathlib.Path | None]: - """Look for cached wheel in 3 locations. - - Checks for cached wheels in order: - 1. wheels_build directory (previously built) - 2. wheels_downloads directory (previously downloaded) - 3. Cache server (remote cache) - - Returns: - Tuple of (cached_wheel_filename, unpacked_cached_wheel). - Both None if no cache hit. - """ - # Check if we have previously built a wheel and still have it on the - # local filesystem. - cached_wheel, unpacked = self._look_for_existing_wheel( - req, resolved_version, self.ctx.wheels_build - ) - if cached_wheel: - return cached_wheel, unpacked - - # Check if we have previously downloaded a wheel and still have it - # on the local filesystem. - cached_wheel, unpacked = self._look_for_existing_wheel( - req, resolved_version, self.ctx.wheels_downloads - ) - if cached_wheel: - return cached_wheel, unpacked - - # Look for a wheel on the cache server and download it if there is one. - cached_wheel, unpacked = self._download_wheel_from_cache(req, resolved_version) - if cached_wheel: - return cached_wheel, unpacked - - return None, None - def _get_install_dependencies( self, req: Requirement, @@ -881,143 +1122,6 @@ def _handle_test_mode_failure( # Return None to signal failure; bootstrap() will record via re-raised exception return None - def _look_for_existing_wheel( - self, - req: Requirement, - resolved_version: Version, - search_in: pathlib.Path, - ) -> tuple[pathlib.Path | None, pathlib.Path | None]: - pbi = self.ctx.package_build_info(req) - expected_build_tag = pbi.build_tag(resolved_version) - logger.info( - f"looking for existing wheel for version {resolved_version} with build tag {expected_build_tag} in {search_in}" - ) - wheel_filename = finders.find_wheel( - downloads_dir=search_in, - req=req, - dist_version=str(resolved_version), - build_tag=expected_build_tag, - ) - if not wheel_filename: - return None, None - - # Re-validate build tag from the actual wheel metadata because - # finders.find_wheel matches by filename prefix, which may not - # enforce an exact build tag match. - _, _, build_tag, _ = wheels.extract_info_from_wheel_file(req, wheel_filename) - if expected_build_tag and expected_build_tag != build_tag: - logger.info( - f"found wheel for {resolved_version} in {wheel_filename} but build tag does not match. Got {build_tag} but expected {expected_build_tag}" - ) - return None, None - - logger.info(f"found existing wheel {wheel_filename}") - metadata_dir = self._unpack_metadata_from_wheel( - req, resolved_version, wheel_filename - ) - return wheel_filename, metadata_dir - - def _download_wheel_from_cache( - self, req: Requirement, resolved_version: Version - ) -> tuple[pathlib.Path | None, pathlib.Path | None]: - if not self.cache_wheel_server_url: - return None, None - logger.info( - f"checking if wheel was already uploaded to {self.cache_wheel_server_url}" - ) - try: - pinned_req = Requirement(f"{req.name}=={resolved_version}") - provider = finders.PyPICacheProvider( - cache_server_url=self.cache_wheel_server_url, - constraints=self.ctx.constraints, - ) - results = resolver.find_all_matching_from_provider(provider, pinned_req) - wheel_url, _ = results[0] - wheelfile_name = pathlib.Path(urlparse(wheel_url).path) - pbi = self.ctx.package_build_info(req) - expected_build_tag = pbi.build_tag(resolved_version) - # Log the expected build tag for debugging - logger.info(f"has expected build tag {expected_build_tag}") - # Get changelogs for debug info - changelogs = pbi.get_changelog(resolved_version) - logger.debug(f"has change logs {changelogs}") - - _, _, build_tag, _ = wheels.extract_info_from_wheel_file( - req, wheelfile_name - ) - if expected_build_tag and expected_build_tag != build_tag: - logger.info( - f"found wheel for {resolved_version} in cache but build tag does not match. Got {build_tag} but expected {expected_build_tag}" - ) - return None, None - - cached_wheel = wheels.download_wheel( - req=req, wheel_url=wheel_url, output_directory=self.ctx.wheels_downloads - ) - if self.cache_wheel_server_url != self.ctx.wheel_server_url: - # Only update the local server if we actually downloaded - # something from a different server. - server.update_wheel_mirror(self.ctx) - logger.info("found built wheel on cache server") - unpack_dir = self._unpack_metadata_from_wheel( - req, resolved_version, cached_wheel - ) - return cached_wheel, unpack_dir - except ResolverException: - logger.info( - f"did not find wheel for {resolved_version} in {self.cache_wheel_server_url}" - ) - return None, None - except requests.exceptions.RequestException as err: - logger.warning( - f"network error checking wheel cache for {resolved_version} " - f"at {self.cache_wheel_server_url}: {err}" - ) - return None, None - except Exception as err: - logger.warning( - f"unexpected error checking wheel cache for {resolved_version} " - f"at {self.cache_wheel_server_url}: {err}" - ) - return None, None - - def _unpack_metadata_from_wheel( - self, req: Requirement, resolved_version: Version, wheel_filename: pathlib.Path - ) -> pathlib.Path | None: - dist_name, dist_version, _, _ = wheels.extract_info_from_wheel_file( - req, - wheel_filename, - ) - unpack_dir = self._create_unpack_dir(req, resolved_version) - dist_filename = f"{dist_name}-{dist_version}" - metadata_dir = pathlib.Path(f"{dist_filename}.dist-info") - req_filenames: list[str] = [ - dependencies.BUILD_BACKEND_REQ_FILE_NAME, - dependencies.BUILD_SDIST_REQ_FILE_NAME, - dependencies.BUILD_SYSTEM_REQ_FILE_NAME, - ] - try: - archive = zipfile.ZipFile(wheel_filename) - for filename in req_filenames: - zipinfo = archive.getinfo( - str(metadata_dir / f"{wheels.FROMAGER_BUILD_REQ_PREFIX}-{filename}") - ) - # Check for path traversal attempts - if os.path.isabs(zipinfo.filename) or ".." in zipinfo.filename: - raise ValueError(f"Unsafe path in wheel: {zipinfo.filename}") - zipinfo.filename = filename - output_file = archive.extract(zipinfo, unpack_dir) - logger.info(f"extracted {output_file}") - - logger.info(f"extracted build requirements from wheel into {unpack_dir}") - return unpack_dir - except Exception as e: - # implies that the wheel server hosted non-fromager built wheels - logger.info(f"could not extract build requirements from wheel: {e}") - for filename in req_filenames: - unpack_dir.joinpath(filename).unlink(missing_ok=True) - return None - def _resolve_version_from_git_url(self, req: Requirement) -> tuple[str, Version]: """Resolve source path and version from a ``git+`` URL. @@ -1175,13 +1279,6 @@ def _get_version_from_package_metadata( metadata = dependencies.parse_metadata(metadata_filename, validate=False) return metadata.version - def _create_unpack_dir( - self, req: Requirement, resolved_version: Version - ) -> pathlib.Path: - unpack_dir = self.ctx.work_dir / f"{req.name}-{resolved_version}" - unpack_dir.mkdir(parents=True, exist_ok=True) - return unpack_dir - def _add_to_graph( self, req: Requirement, @@ -1325,6 +1422,88 @@ def serialize(item: WorkItem) -> dict[str, typing.Any]: # ---- Iterative bootstrap: phase handlers and helpers ---- + def _push_items(self, stack: list[WorkItem], items: list[WorkItem]) -> None: + """Push items onto the stack and submit background tasks in LIFO order. + + Submits the item that will be processed first (top of stack) to the + background pool first, maximising overlap between background I/O and + main-thread serial work. + """ + stack.extend(items) + if self._bg_pool is not None: + for item in reversed(items): + bg_work = self._get_background_work(item) + if bg_work is not None: + item.bg_future = self._bg_pool.submit(bg_work) + + def _get_background_work( + self, item: WorkItem + ) -> typing.Callable[[], typing.Any] | None: + """Return a zero-argument callable for background submission, or None. + + Uses closures that capture local variables rather than ``self``, so + the returned callable cannot access mutable ``Bootstrapper`` state. + Each closure sets up ``req_ctxvar_context`` so log messages include + the package name (and version when known). + """ + if item.phase == BootstrapPhase.RESOLVE: + bg_resolver = self._resolver + req = item.req + req_type = item.req_type + parent_req = item.why_snapshot[-1][1] if item.why_snapshot else None + return_all = self.multiple_versions + + def do_resolve() -> list[tuple[str, Version]]: + with req_ctxvar_context(req): + return _bg_resolve( + bg_resolver, req, req_type, parent_req, return_all + ) + + return do_resolve + + if item.phase == BootstrapPhase.PREPARE_SOURCE: + assert item.resolved_version is not None + assert item.source_url is not None + ctx = self.ctx + cache_wheel_server_url = self.cache_wheel_server_url + req = item.req + req_type = item.req_type + resolved_version = item.resolved_version + source_url = item.source_url + + if item.pbi_pre_built: + + def do_prepare_prebuilt() -> PreparedSourceData: + with req_ctxvar_context(req, resolved_version): + return _bg_prepare_prebuilt( + ctx, req, req_type, resolved_version, source_url + ) + + return do_prepare_prebuilt + + def do_prepare_source() -> PreparedSourceData: + with req_ctxvar_context(req, resolved_version): + return _bg_prepare_source( + ctx, cache_wheel_server_url, req, resolved_version, source_url + ) + + return do_prepare_source + + return None + + def _drain_background_pool(self) -> None: + """Drain all in-flight background tasks and recreate the pool. + + Used as an exclusive-build barrier: ensures all background I/O completes + before an exclusive build starts. ``cancel_futures=False`` guarantees + every submitted task runs to completion. + """ + if self._bg_pool is not None: + self._bg_pool.shutdown(wait=True, cancel_futures=False) + self._bg_pool = concurrent.futures.ThreadPoolExecutor( + max_workers=self._num_bg_threads, thread_name_prefix="fromager-bg" + ) + def _create_unresolved_work_items( self, deps: typing.Iterable[Requirement], @@ -1361,11 +1540,10 @@ def _phase_resolve(self, item: WorkItem) -> list[WorkItem]: One START-phase item per resolved version that needs building. Empty list if all versions are already cached. """ - resolved_versions = self.resolve_versions( - item.req, - item.req_type, - return_all_versions=self.multiple_versions, - ) + assert item.bg_future is not None + # bg_future.result() blocks until the background resolution completes, + # then returns the result or re-raises any exception from the background. + resolved_versions = item.bg_future.result() if not resolved_versions: raise RuntimeError(f"Could not resolve any versions for {item.req}") @@ -1373,7 +1551,9 @@ def _phase_resolve(self, item: WorkItem) -> list[WorkItem]: logger.info(f"resolved {len(resolved_versions)} version(s) for {item.req}") filtered: list[tuple[str, Version]] = [] for source_url, version in resolved_versions: - cached_wheel, _ = self._find_cached_wheel(item.req, version) + cached_wheel, _ = _find_cached_wheel( + self.ctx, self.cache_wheel_server_url, item.req, version + ) if cached_wheel: logger.info( f"{item.req.name}=={version}: wheel already cached " @@ -1459,6 +1639,9 @@ def _phase_start(self, item: WorkItem) -> list[WorkItem]: def _phase_prepare_source(self, item: WorkItem) -> list[WorkItem]: """PREPARE_SOURCE phase: download source or prebuilt, get build system deps. + Uses background I/O result from ``item.bg_future`` when available, + falling back to inline I/O otherwise. + Returns: Prebuilt: [item] advanced to PROCESS_INSTALL_DEPS (skip build phases). Source: [item advanced to PREPARE_BUILD, *build_system_dep_items]. @@ -1466,6 +1649,11 @@ def _phase_prepare_source(self, item: WorkItem) -> list[WorkItem]: assert item.resolved_version is not None assert item.source_url is not None + # bg_future is always set for PREPARE_SOURCE items (see _push_items / _get_background_work). + # bg_future.result() blocks until done and re-raises any background exception. + assert item.bg_future is not None + prepared: PreparedSourceData = item.bg_future.result() + constraint = self.ctx.constraints.get_constraint(item.req.name) if constraint: logger.info( @@ -1474,16 +1662,13 @@ def _phase_prepare_source(self, item: WorkItem) -> list[WorkItem]: ) if item.pbi_pre_built: - wheel_filename, unpack_dir = self._download_prebuilt( - req=item.req, - req_type=item.req_type, - resolved_version=item.resolved_version, - wheel_url=item.source_url, - ) + # Background task already downloaded the prebuilt wheel + assert prepared.wheel_filename is not None + assert prepared.unpack_dir is not None item.build_result = SourceBuildResult( - wheel_filename=wheel_filename, + wheel_filename=prepared.wheel_filename, sdist_filename=None, - unpack_dir=unpack_dir, + unpack_dir=prepared.unpack_dir, sdist_root_dir=None, build_env=None, source_type=SourceType.PREBUILT, @@ -1491,27 +1676,10 @@ def _phase_prepare_source(self, item: WorkItem) -> list[WorkItem]: item.phase = BootstrapPhase.PROCESS_INSTALL_DEPS return [item] - # Source build path - cached_wheel, unpacked = self._find_cached_wheel( - item.req, item.resolved_version - ) - item.cached_wheel_filename = cached_wheel - - if not unpacked: - logger.debug("no cached wheel, downloading sources") - source_filename = self._download_source( - req=item.req, - resolved_version=item.resolved_version, - source_url=item.source_url, - ) - sdist_root_dir = self._prepare_source( - req=item.req, - resolved_version=item.resolved_version, - source_filename=source_filename, - ) - else: - logger.debug(f"have cached wheel in {unpacked}") - sdist_root_dir = unpacked / unpacked.stem + # Source build path: background task already downloaded and prepared the source + assert prepared.sdist_root_dir is not None + sdist_root_dir = prepared.sdist_root_dir + item.cached_wheel_filename = prepared.cached_wheel_filename assert sdist_root_dir is not None @@ -1602,6 +1770,16 @@ def _phase_build(self, item: WorkItem) -> list[WorkItem]: assert item.build_env is not None assert item.sdist_root_dir is not None + # Drain all in-flight background I/O before an exclusive build starts. + # This ensures no parallel background tasks interfere with a build that + # must run without resource contention. + pbi = self.ctx.package_build_info(item.req) + if pbi.exclusive_build: + logger.info( + "%s requires exclusive build, draining background pool", item.req + ) + self._drain_background_pool() + # Install backend+sdist deps if disjoint from system deps remaining_deps = item.build_backend_deps | item.build_sdist_deps if remaining_deps.isdisjoint(item.build_system_deps): @@ -1857,6 +2035,13 @@ def finalize(self) -> int: 0 if all packages built successfully (or not in test/multiple versions mode) 1 if any packages failed in test mode """ + if self._bg_pool is not None: + # cancel_futures=True cancels pending (not-yet-started) futures immediately + # so we don't block waiting for work whose result will never be used. + # Already-running futures still complete naturally. + self._bg_pool.shutdown(wait=True, cancel_futures=True) + self._bg_pool = None + if self.multiple_versions and self._failed_versions: self._log_failed_versions_table() @@ -1882,3 +2067,16 @@ def finalize(self) -> int: ", ".join(failed_names), ) return 1 + + def __enter__(self) -> Bootstrapper: + return self + + def __exit__( + self, + exc_type: type | None, + exc_val: BaseException | None, + exc_tb: object, + ) -> None: + if self._bg_pool is not None: + self._bg_pool.shutdown(wait=False, cancel_futures=True) + self._bg_pool = None diff --git a/src/fromager/commands/bootstrap.py b/src/fromager/commands/bootstrap.py index 9baeb36ce..89ffea436 100644 --- a/src/fromager/commands/bootstrap.py +++ b/src/fromager/commands/bootstrap.py @@ -1,4 +1,5 @@ import logging +import os import time import typing from datetime import timedelta @@ -117,6 +118,14 @@ def _get_requirements_from_args( default=None, help="Reject package versions published more than this many days ago.", ) +@click.option( + "--bg-threads", + "num_bg_threads", + type=click.IntRange(min=1), + default=max(1, (os.cpu_count() or 2) // 2), + show_default=True, + help="Number of background threads for parallel I/O pre-fetching (min 1).", +) @click.argument("toplevel", nargs=-1) @click.pass_obj def bootstrap( @@ -129,6 +138,7 @@ def bootstrap( test_mode: bool, multiple_versions: bool, max_release_age: int | None, + num_bg_threads: int, toplevel: list[str], ) -> None: """Compute and build the dependencies of a set of requirements recursively @@ -136,6 +146,8 @@ def bootstrap( TOPLEVEL is a requirements specification, including a package name and optional version constraints. + .. versionadded:: 0.89.0 + ``--bg-threads`` option for parallel I/O pre-fetching. """ logger.info(f"cache wheel server url: {cache_wheel_server_url}") @@ -190,7 +202,7 @@ def bootstrap( server.start_wheel_server(wkctx) with progress.progress_context(total=len(to_build * 2)) as progressbar: - bt = bootstrapper.Bootstrapper( + with bootstrapper.Bootstrapper( wkctx, progressbar, prev_graph, @@ -198,35 +210,35 @@ def bootstrap( sdist_only=sdist_only, test_mode=test_mode, multiple_versions=multiple_versions, - ) - - # Pre-resolution phase: Resolve all top-level dependencies before recursive - # bootstrapping begins. Test-mode error handling is in Bootstrapper. - # Note: We don't use try/finally here because: - # - In test-mode: exceptions are caught inside resolve_and_add_top_level() - # - In normal mode: exceptions should propagate with context preserved for logging - logger.info("resolving top-level dependencies before building") - resolved_reqs: list[Requirement] = [] - for req in to_build: - token = requirement_ctxvar.set(req) - result = bt.resolve_and_add_top_level(req) - if result is not None: - resolved_reqs.append(req) - # If result is None, test_mode or multiple_versions recorded the failure - requirement_ctxvar.reset(token) - - # Bootstrap only packages that were successfully resolved - # Note: Same pattern - no try/finally to preserve context for error logging - for req in resolved_reqs: - token = requirement_ctxvar.set(req) - bt.bootstrap(req, requirements_file.RequirementType.TOP_LEVEL) - progressbar.update() - requirement_ctxvar.reset(token) - - # Finalize test mode and check for failures - exit_code = bt.finalize() - if exit_code != 0: - raise SystemExit(exit_code) + num_bg_threads=num_bg_threads, + ) as bt: + # Pre-resolution phase: Resolve all top-level dependencies before recursive + # bootstrapping begins. Test-mode error handling is in Bootstrapper. + # Note: We don't use try/finally here because: + # - In test-mode: exceptions are caught inside resolve_and_add_top_level() + # - In normal mode: exceptions should propagate with context preserved for logging + logger.info("resolving top-level dependencies before building") + resolved_reqs: list[Requirement] = [] + for req in to_build: + token = requirement_ctxvar.set(req) + result = bt.resolve_and_add_top_level(req) + if result is not None: + resolved_reqs.append(req) + # If result is None, test_mode or multiple_versions recorded the failure + requirement_ctxvar.reset(token) + + # Bootstrap only packages that were successfully resolved + # Note: Same pattern - no try/finally to preserve context for error logging + for req in resolved_reqs: + token = requirement_ctxvar.set(req) + bt.bootstrap(req, requirements_file.RequirementType.TOP_LEVEL) + progressbar.update() + requirement_ctxvar.reset(token) + + # Finalize test mode and check for failures + exit_code = bt.finalize() + if exit_code != 0: + raise SystemExit(exit_code) constraints_filename = wkctx.work_dir / "constraints.txt" if skip_constraints: @@ -514,6 +526,14 @@ def write_constraints_file( default=None, help="Reject package versions published more than this many days ago.", ) +@click.option( + "--bg-threads", + "num_bg_threads", + type=click.IntRange(min=1), + default=max(1, (os.cpu_count() or 2) // 2), + show_default=True, + help="Number of background threads for parallel I/O pre-fetching (min 1).", +) @click.argument("toplevel", nargs=-1) @click.pass_obj @click.pass_context @@ -529,6 +549,7 @@ def bootstrap_parallel( max_workers: int | None, multiple_versions: bool, max_release_age: int | None, + num_bg_threads: int, toplevel: list[str], ) -> None: """Bootstrap and build-parallel @@ -557,6 +578,7 @@ def bootstrap_parallel( skip_constraints=skip_constraints, multiple_versions=multiple_versions, max_release_age=max_release_age, + num_bg_threads=num_bg_threads, toplevel=toplevel, ) diff --git a/src/fromager/commands/lint_requirements.py b/src/fromager/commands/lint_requirements.py index 776f759cc..8052874d5 100644 --- a/src/fromager/commands/lint_requirements.py +++ b/src/fromager/commands/lint_requirements.py @@ -47,61 +47,60 @@ def lint_requirements( failures: list[str] = [] # Create bootstrapper for requirement resolution - bt = bootstrapper.Bootstrapper( + with bootstrapper.Bootstrapper( ctx=wkctx, progressbar=progress.Progressbar(None), prev_graph=None, cache_wheel_server_url=None, sdist_only=True, - ) + ) as bt: + for path in input_files_path: + is_constraints: bool = path.name.endswith("constraints.txt") + parsed_lines = requirements_file.parse_requirements_file(path) + unique_entries: dict[tuple[str, str], Requirement] = {} + for line in parsed_lines: + try: + requirement = Requirement(line) + marker_key = str(requirement.marker) if requirement.marker else "" + unique_key = (requirement.name, marker_key) - for path in input_files_path: - is_constraints: bool = path.name.endswith("constraints.txt") - parsed_lines = requirements_file.parse_requirements_file(path) - unique_entries: dict[tuple[str, str], Requirement] = {} - for line in parsed_lines: - try: - requirement = Requirement(line) - marker_key = str(requirement.marker) if requirement.marker else "" - unique_key = (requirement.name, marker_key) + if is_constraints: + if unique_key in unique_entries: + raise InvalidRequirement( + f"Duplicate entry, first found: {unique_entries[unique_key]}" + ) + unique_entries[unique_key] = requirement + if requirement.extras: + raise InvalidRequirement( + f"{requirement.name}: Constraints files cannot contain extra dependencies" + ) + if not requirement.specifier: + raise InvalidRequirement( + f"{requirement.name}: Constraints must have a version specifier" + ) + except InvalidRequirement as err: + msg = f"{path}: {line}: {err}" + logger.error(msg) + failures.append(msg) - if is_constraints: - if unique_key in unique_entries: - raise InvalidRequirement( - f"Duplicate entry, first found: {unique_entries[unique_key]}" - ) - unique_entries[unique_key] = requirement - if requirement.extras: - raise InvalidRequirement( - f"{requirement.name}: Constraints files cannot contain extra dependencies" + # Resolve the requirement to ensure it can be found + # Skip resolution for constraints files as they should only specify versions + if resolve_requirements and not is_constraints: + token = requirement_ctxvar.set(requirement) + try: + results = bt.resolve_versions( + req=requirement, + req_type=RequirementType.TOP_LEVEL, ) - if not requirement.specifier: - raise InvalidRequirement( - f"{requirement.name}: Constraints must have a version specifier" + _, version = results[0] + logger.info(f"{requirement} resolves to {version}") + except Exception as err: + logger.error( + f"{path}: {line}: Failed to resolve requirement: {err}" ) - except InvalidRequirement as err: - msg = f"{path}: {line}: {err}" - logger.error(msg) - failures.append(msg) - - # Resolve the requirement to ensure it can be found - # Skip resolution for constraints files as they should only specify versions - if resolve_requirements and not is_constraints: - token = requirement_ctxvar.set(requirement) - try: - results = bt.resolve_versions( - req=requirement, - req_type=RequirementType.TOP_LEVEL, - ) - _, version = results[0] - logger.info(f"{requirement} resolves to {version}") - except Exception as err: - logger.error( - f"{path}: {line}: Failed to resolve requirement: {err}" - ) - failures.append(f"{path}: {line}: {err}") - finally: - requirement_ctxvar.reset(token) + failures.append(f"{path}: {line}: {err}") + finally: + requirement_ctxvar.reset(token) if failures: click.echo("Validation error:", err=True) diff --git a/tests/test_bootstrap.py b/tests/test_bootstrap.py index 944926492..9985b08ee 100644 --- a/tests/test_bootstrap.py +++ b/tests/test_bootstrap.py @@ -560,6 +560,8 @@ def test_multiple_versions_auto_disables_constraints( mock_progress.return_value.__enter__.return_value = Mock() mock_progress.return_value.__exit__.return_value = None mock_bt_instance = Mock() + mock_bt_instance.__enter__ = Mock(return_value=mock_bt_instance) + mock_bt_instance.__exit__ = Mock(return_value=None) mock_bt_instance.resolve_and_add_top_level.return_value = ("url", Version("1.0")) mock_bt_instance.finalize.return_value = 0 mock_bootstrapper.return_value = mock_bt_instance @@ -608,6 +610,8 @@ def test_multiple_versions_with_skip_constraints_no_duplicate_log( mock_progress.return_value.__enter__.return_value = Mock() mock_progress.return_value.__exit__.return_value = None mock_bt_instance = Mock() + mock_bt_instance.__enter__ = Mock(return_value=mock_bt_instance) + mock_bt_instance.__exit__ = Mock(return_value=None) mock_bt_instance.resolve_and_add_top_level.return_value = ("url", Version("1.0")) mock_bt_instance.finalize.return_value = 0 mock_bootstrapper.return_value = mock_bt_instance @@ -657,6 +661,8 @@ def test_without_multiple_versions_constraints_not_disabled( mock_progress.return_value.__enter__.return_value = Mock() mock_progress.return_value.__exit__.return_value = None mock_bt_instance = Mock() + mock_bt_instance.__enter__ = Mock(return_value=mock_bt_instance) + mock_bt_instance.__exit__ = Mock(return_value=None) mock_bt_instance.resolve_and_add_top_level.return_value = ("url", Version("1.0")) mock_bt_instance.finalize.return_value = 0 mock_bootstrapper.return_value = mock_bt_instance @@ -720,6 +726,8 @@ def test_max_release_age_sets_context( mock_progress.return_value.__enter__.return_value = Mock() mock_progress.return_value.__exit__.return_value = None mock_bt_instance = Mock() + mock_bt_instance.__enter__ = Mock(return_value=mock_bt_instance) + mock_bt_instance.__exit__ = Mock(return_value=None) mock_bt_instance.resolve_and_add_top_level.return_value = ("url", Version("1.0")) mock_bt_instance.finalize.return_value = 0 mock_bootstrapper.return_value = mock_bt_instance diff --git a/tests/test_bootstrap_test_mode.py b/tests/test_bootstrap_test_mode.py index 150261f56..ba1126110 100644 --- a/tests/test_bootstrap_test_mode.py +++ b/tests/test_bootstrap_test_mode.py @@ -260,10 +260,11 @@ def test_resolution_failure_recorded_in_test_mode( bt = bootstrapper.Bootstrapper(ctx=tmp_context, test_mode=True) req = Requirement("nonexistent-package>=1.0") - # Mock resolve_versions to raise an exception + # Mock _resolver.resolve to raise an exception (background tasks call + # _resolver.resolve directly, not resolve_versions) with mock.patch.object( - bt, - "resolve_versions", + bt._resolver, + "resolve", side_effect=RuntimeError("Version resolution failed"), ): # Should not raise in test mode @@ -286,10 +287,11 @@ def test_resolution_failure_raises_in_normal_mode( bt = bootstrapper.Bootstrapper(ctx=tmp_context, test_mode=False) req = Requirement("nonexistent-package>=1.0") - # Mock resolve_versions to raise an exception + # Mock _resolver.resolve to raise an exception (background tasks call + # _resolver.resolve directly, not resolve_versions) with mock.patch.object( - bt, - "resolve_versions", + bt._resolver, + "resolve", side_effect=RuntimeError("Version resolution failed"), ): with pytest.raises(RuntimeError, match="Version resolution failed"): diff --git a/tests/test_bootstrapper.py b/tests/test_bootstrapper.py index efde0adaa..2d16d926a 100644 --- a/tests/test_bootstrapper.py +++ b/tests/test_bootstrapper.py @@ -11,7 +11,7 @@ from packaging.version import Version from resolvelib.resolvers import ResolverException -from fromager import bootstrapper +from fromager import bootstrapper, log from fromager.context import WorkContext from fromager.requirements_file import RequirementType, SourceType @@ -225,7 +225,9 @@ def test_find_cached_wheel_returns_tuple(tmp_context: WorkContext) -> None: bt = bootstrapper.Bootstrapper(tmp_context) # Call method (will return None, None since no wheels exist) - result = bt._find_cached_wheel( + result = bootstrapper._find_cached_wheel( + bt.ctx, + bt.cache_wheel_server_url, req=Requirement("test-package"), resolved_version=Version("1.0.0"), ) @@ -395,7 +397,9 @@ def test_download_wheel_from_cache_bypasses_hooks( mock_find_all.side_effect = RuntimeError("no match") with patch("fromager.overrides.find_and_invoke") as mock_override: - result = bt._download_wheel_from_cache( + result = bootstrapper._download_wheel_from_cache( + bt.ctx, + bt.cache_wheel_server_url, req=Requirement("test-pkg"), resolved_version=Version("1.0.0"), ) @@ -435,7 +439,9 @@ def test_cache_lookup_resolver_exception_logs_info( side_effect=ResolverException("no matching version"), ), ): - result = bt._download_wheel_from_cache( + result = bootstrapper._download_wheel_from_cache( + bt.ctx, + bt.cache_wheel_server_url, req=Requirement("test-package"), resolved_version=Version("1.0.0"), ) @@ -465,7 +471,9 @@ def test_cache_lookup_request_exception_logs_warning( "fromager.resolver.find_all_matching_from_provider", side_effect=exc_class(exc_msg), ): - result = bt._download_wheel_from_cache( + result = bootstrapper._download_wheel_from_cache( + bt.ctx, + bt.cache_wheel_server_url, req=Requirement("test-package"), resolved_version=Version("1.0.0"), ) @@ -485,7 +493,9 @@ def test_cache_lookup_unexpected_exception_logs_warning( "fromager.resolver.find_all_matching_from_provider", side_effect=ValueError("unexpected parsing error"), ): - result = bt._download_wheel_from_cache( + result = bootstrapper._download_wheel_from_cache( + bt.ctx, + bt.cache_wheel_server_url, req=Requirement("test-package"), resolved_version=Version("1.0.0"), ) @@ -534,7 +544,9 @@ def test_cache_lookup_download_wheel_error_logs_warning( side_effect=exc_class(exc_msg), ), ): - result = bt._download_wheel_from_cache( + result = bootstrapper._download_wheel_from_cache( + bt.ctx, + bt.cache_wheel_server_url, req=Requirement("test-package"), resolved_version=Version("1.0.0"), ) @@ -548,7 +560,9 @@ def test_cache_lookup_no_cache_url_returns_none(tmp_context: WorkContext) -> Non bt = bootstrapper.Bootstrapper(tmp_context) bt.cache_wheel_server_url = "" - result = bt._download_wheel_from_cache( + result = bootstrapper._download_wheel_from_cache( + bt.ctx, + bt.cache_wheel_server_url, req=Requirement("test-package"), resolved_version=Version("1.0.0"), ) @@ -721,3 +735,91 @@ def counting_record(stack: list[bootstrapper.WorkItem]) -> None: bt.bootstrap(req=req, req_type=RequirementType.TOP_LEVEL) assert call_count["n"] >= 1 + + +def test_bg_prepare_source_log_prefix_includes_version( + tmp_context: WorkContext, + caplog: pytest.LogCaptureFixture, +) -> None: + """_bg_prepare_source log messages include name-version prefix when called with context.""" + old_factory = logging.getLogRecordFactory() + logging.setLogRecordFactory(log.FromagerLogRecord) + req = Requirement("mypkg==1.2.3") + version = Version("1.2.3") + + messages: list[str] = [] + try: + with ( + caplog.at_level(logging.INFO, logger="fromager.bootstrapper"), + patch( + "fromager.bootstrapper._find_cached_wheel", + return_value=(None, None), + ), + patch( + "fromager.sources.download_source", + return_value=pathlib.Path("mypkg-1.2.3.tar.gz"), + ), + patch( + "fromager.sources.prepare_source", + return_value=pathlib.Path(tmp_context.work_dir / "mypkg-1.2.3"), + ), + log.req_ctxvar_context(req, version), + ): + bootstrapper._bg_prepare_source( + ctx=tmp_context, + cache_wheel_server_url=None, + req=req, + resolved_version=version, + source_url="https://pkg.test/simple/mypkg/mypkg-1.2.3.tar.gz", + ) + # Collect messages while context vars are still set so getMessage() + # returns the prefixed form. + messages = [r.getMessage() for r in caplog.records] + finally: + logging.setLogRecordFactory(old_factory) + + for msg in messages: + assert msg.startswith("mypkg-1.2.3: "), ( + f"Expected 'mypkg-1.2.3: ' prefix, got: {msg!r}" + ) + + +def test_bg_prepare_prebuilt_log_prefix_includes_version( + tmp_context: WorkContext, + caplog: pytest.LogCaptureFixture, +) -> None: + """_bg_prepare_prebuilt log message includes name-version prefix when called with context.""" + old_factory = logging.getLogRecordFactory() + logging.setLogRecordFactory(log.FromagerLogRecord) + req = Requirement("mypkg==1.2.3") + version = Version("1.2.3") + + messages: list[str] = [] + try: + with ( + caplog.at_level(logging.INFO, logger="fromager.bootstrapper"), + patch( + "fromager.wheels.download_wheel", + return_value=pathlib.Path("mypkg-1.2.3-py3-none-any.whl"), + ), + patch("fromager.server.update_wheel_mirror"), + log.req_ctxvar_context(req, version), + ): + bootstrapper._bg_prepare_prebuilt( + ctx=tmp_context, + req=req, + req_type=RequirementType.INSTALL, + resolved_version=version, + wheel_url="https://pkg.test/simple/mypkg/mypkg-1.2.3-py3-none-any.whl", + ) + # Collect messages while context vars are still set so getMessage() + # returns the prefixed form. + messages = [r.getMessage() for r in caplog.records] + finally: + logging.setLogRecordFactory(old_factory) + + assert len(messages) >= 1 + for msg in messages: + assert msg.startswith("mypkg-1.2.3: "), ( + f"Expected 'mypkg-1.2.3: ' prefix, got: {msg!r}" + ) diff --git a/tests/test_bootstrapper_iterative.py b/tests/test_bootstrapper_iterative.py index 27692dc95..cfe598d8e 100644 --- a/tests/test_bootstrapper_iterative.py +++ b/tests/test_bootstrapper_iterative.py @@ -19,7 +19,9 @@ from __future__ import annotations +import concurrent.futures import pathlib +import typing from unittest.mock import Mock, call, patch import pytest @@ -33,6 +35,15 @@ from fromager.requirements_file import RequirementType, SourceType +def _make_resolved_future( + result: typing.Any, +) -> concurrent.futures.Future[typing.Any]: + """Return an already-completed Future carrying *result*.""" + future: concurrent.futures.Future[typing.Any] = concurrent.futures.Future() + future.set_result(result) + return future + + def _make_resolve_item( req: str = "testpkg", req_type: RequirementType = RequirementType.INSTALL, @@ -252,13 +263,11 @@ def test_single_version(self, tmp_context: WorkContext) -> None: item = _make_resolve_item() parent = (Requirement("parent"), Version("2.0")) item.parent = parent + item.bg_future = _make_resolved_future( + [("https://pypi.org/testpkg-1.0.tar.gz", Version("1.0"))] + ) - with patch.object( - bt, - "resolve_versions", - return_value=[("https://pypi.org/testpkg-1.0.tar.gz", Version("1.0"))], - ): - result = bt._phase_resolve(item) + result = bt._phase_resolve(item) assert len(result) == 1 assert result[0].phase == BootstrapPhase.START @@ -269,16 +278,14 @@ def test_single_version(self, tmp_context: WorkContext) -> None: def test_multiple_versions(self, tmp_context: WorkContext) -> None: bt = bootstrapper.Bootstrapper(tmp_context, multiple_versions=True) item = _make_resolve_item() - - with patch.object( - bt, - "resolve_versions", - return_value=[ + item.bg_future = _make_resolved_future( + [ ("https://pypi.org/testpkg-2.0.tar.gz", Version("2.0")), ("https://pypi.org/testpkg-1.0.tar.gz", Version("1.0")), - ], - ): - result = bt._phase_resolve(item) + ] + ) + + result = bt._phase_resolve(item) assert len(result) == 2 # Reversed so highest version ends up on top of stack (last element) @@ -288,22 +295,18 @@ def test_multiple_versions(self, tmp_context: WorkContext) -> None: def test_empty_resolution_raises(self, tmp_context: WorkContext) -> None: bt = bootstrapper.Bootstrapper(tmp_context) item = _make_resolve_item() + item.bg_future = _make_resolved_future([]) - with patch.object(bt, "resolve_versions", return_value=[]): - with pytest.raises(RuntimeError, match="Could not resolve"): - bt._phase_resolve(item) + with pytest.raises(RuntimeError, match="Could not resolve"): + bt._phase_resolve(item) def test_preserves_why_snapshot(self, tmp_context: WorkContext) -> None: bt = bootstrapper.Bootstrapper(tmp_context) snapshot = [(RequirementType.TOP_LEVEL, Requirement("root"), Version("1.0"))] item = _make_resolve_item(why_snapshot=list(snapshot)) + item.bg_future = _make_resolved_future([("url", Version("1.0"))]) - with patch.object( - bt, - "resolve_versions", - return_value=[("url", Version("1.0"))], - ): - result = bt._phase_resolve(item) + result = bt._phase_resolve(item) assert result[0].why_snapshot == snapshot @@ -313,24 +316,22 @@ def test_filters_cached_versions_in_multiple_versions_mode( """Cached versions are filtered out before creating START items.""" bt = bootstrapper.Bootstrapper(tmp_context, multiple_versions=True) item = _make_resolve_item() + item.bg_future = _make_resolved_future( + [ + ("url-3.0", Version("3.0")), + ("url-2.0", Version("2.0")), + ("url-1.0", Version("1.0")), + ] + ) - def mock_cache(req: Requirement, version: Version) -> tuple: + def mock_cache( + ctx: object, cache_url: object, req: Requirement, version: Version + ) -> tuple: if str(version) == "2.0": return (tmp_context.work_dir / "pkg-2.0-py3-none-any.whl", None) return (None, None) - with ( - patch.object( - bt, - "resolve_versions", - return_value=[ - ("url-3.0", Version("3.0")), - ("url-2.0", Version("2.0")), - ("url-1.0", Version("1.0")), - ], - ), - patch.object(bt, "_find_cached_wheel", side_effect=mock_cache), - ): + with patch("fromager.bootstrapper._find_cached_wheel", side_effect=mock_cache): result = bt._phase_resolve(item) assert len(result) == 2 @@ -341,22 +342,17 @@ def test_all_cached_keeps_highest_version(self, tmp_context: WorkContext) -> Non """If all versions are cached, keeps the highest for dependency discovery.""" bt = bootstrapper.Bootstrapper(tmp_context, multiple_versions=True) item = _make_resolve_item() + item.bg_future = _make_resolved_future( + [ + ("url-3.0", Version("3.0")), + ("url-2.0", Version("2.0")), + ("url-1.0", Version("1.0")), + ] + ) - with ( - patch.object( - bt, - "resolve_versions", - return_value=[ - ("url-3.0", Version("3.0")), - ("url-2.0", Version("2.0")), - ("url-1.0", Version("1.0")), - ], - ), - patch.object( - bt, - "_find_cached_wheel", - return_value=(tmp_context.work_dir / "cached.whl", None), - ), + with patch( + "fromager.bootstrapper._find_cached_wheel", + return_value=(tmp_context.work_dir / "cached.whl", None), ): result = bt._phase_resolve(item) @@ -369,15 +365,9 @@ def test_no_filtering_in_single_version_mode( """Cache filtering does not apply in single version mode.""" bt = bootstrapper.Bootstrapper(tmp_context, multiple_versions=False) item = _make_resolve_item() + item.bg_future = _make_resolved_future([("url-1.0", Version("1.0"))]) - with ( - patch.object( - bt, - "resolve_versions", - return_value=[("url-1.0", Version("1.0"))], - ), - patch.object(bt, "_find_cached_wheel") as mock_cache, - ): + with patch("fromager.bootstrapper._find_cached_wheel") as mock_cache: result = bt._phase_resolve(item) assert len(result) == 1 @@ -390,13 +380,24 @@ def test_empty_resolution_raises_runtime_error( for multi in (False, True): bt = bootstrapper.Bootstrapper(tmp_context, multiple_versions=multi) item = _make_resolve_item() + item.bg_future = _make_resolved_future([]) - with ( - patch.object(bt, "resolve_versions", return_value=[]), - pytest.raises(RuntimeError, match="Could not resolve"), - ): + with pytest.raises(RuntimeError, match="Could not resolve"): bt._phase_resolve(item) + def test_bg_future_exception_propagates(self, tmp_context: WorkContext) -> None: + """Exceptions from the background resolver thread are surfaced by _phase_resolve.""" + bt = bootstrapper.Bootstrapper(tmp_context) + item = _make_resolve_item() + future: concurrent.futures.Future[list[tuple[str, Version]]] = ( + concurrent.futures.Future() + ) + future.set_exception(ValueError("resolver exploded")) + item.bg_future = future + + with pytest.raises(ValueError, match="resolver exploded"): + bt._phase_resolve(item) + class TestPhaseStart: def test_new_item_advances_to_prepare_source( @@ -759,8 +760,8 @@ def tracking_dispatch(item: WorkItem) -> list[WorkItem]: with ( patch.object(bt, "_dispatch_phase", side_effect=tracking_dispatch), patch.object( - bt, - "resolve_versions", + bt._resolver, + "resolve", return_value=[("https://pypi.org/pkg-1.0.tar.gz", Version("1.0"))], ), ): @@ -824,8 +825,8 @@ def tracking_dispatch(item: WorkItem) -> list[WorkItem]: with ( patch.object(bt, "_dispatch_phase", side_effect=tracking_dispatch), patch.object( - bt, - "resolve_versions", + bt._resolver, + "resolve", return_value=[("https://pypi.org/pkg-1.0.tar.gz", Version("1.0"))], ), ): @@ -939,8 +940,8 @@ def mock_dispatch(item: WorkItem) -> list[WorkItem]: with ( patch.object(bt, "_dispatch_phase", side_effect=mock_dispatch), patch.object( - bt, - "resolve_versions", + bt._resolver, + "resolve", return_value=[("url", Version("1.0"))], ), ): @@ -957,10 +958,8 @@ def mock_dispatch(item: WorkItem) -> list[WorkItem]: class TestPhasePrepareSource: """Tests for _phase_prepare_source: prebuilt, source, cache, and error paths.""" - def test_prebuilt_downloads_and_skips_to_process_install_deps( - self, tmp_context: WorkContext - ) -> None: - """Prebuilt package downloads wheel and advances to PROCESS_INSTALL_DEPS.""" + def test_prebuilt_uses_background_result(self, tmp_context: WorkContext) -> None: + """Prebuilt package uses bg_future result and advances to PROCESS_INSTALL_DEPS.""" bt = bootstrapper.Bootstrapper(tmp_context) item = _make_build_item( phase=BootstrapPhase.PREPARE_SOURCE, @@ -970,13 +969,13 @@ def test_prebuilt_downloads_and_skips_to_process_install_deps( mock_wheel = tmp_context.work_dir / "testpkg-1.0-py3-none-any.whl" mock_unpack = tmp_context.work_dir / "testpkg-1.0" + item.bg_future = _make_resolved_future( + bootstrapper.PreparedSourceData( + wheel_filename=mock_wheel, unpack_dir=mock_unpack + ) + ) - with ( - patch.object( - bt, "_download_prebuilt", return_value=(mock_wheel, mock_unpack) - ) as mock_dl, - patch.object(tmp_context.constraints, "get_constraint", return_value=None), - ): + with patch.object(tmp_context.constraints, "get_constraint", return_value=None): result = bt._phase_prepare_source(item) assert len(result) == 1 @@ -987,27 +986,23 @@ def test_prebuilt_downloads_and_skips_to_process_install_deps( assert item.build_result.wheel_filename == mock_wheel assert item.build_result.sdist_filename is None assert item.build_result.build_env is None - mock_dl.assert_called_once() - def test_source_no_cache_downloads_and_prepares( + def test_source_no_cache_uses_background_result( self, tmp_context: WorkContext ) -> None: - """Source build with no cached wheel downloads and prepares source.""" + """Source build with no cached wheel uses bg_future result.""" bt = bootstrapper.Bootstrapper(tmp_context) item = _make_build_item(phase=BootstrapPhase.PREPARE_SOURCE) sdist_root = tmp_context.work_dir / "testpkg-1.0" / "testpkg-1.0" - source_filename = tmp_context.work_dir / "testpkg-1.0.tar.gz" mock_env = Mock() mock_dep_item = _make_resolve_item(req="setuptools") + item.bg_future = _make_resolved_future( + bootstrapper.PreparedSourceData(sdist_root_dir=sdist_root) + ) with ( patch.object(tmp_context.constraints, "get_constraint", return_value=None), - patch.object(bt, "_find_cached_wheel", return_value=(None, None)), - patch.object( - bt, "_download_source", return_value=source_filename - ) as mock_dl_src, - patch.object(bt, "_prepare_source", return_value=sdist_root) as mock_prep, patch.object( bt, "_create_build_env", return_value=mock_env ) as mock_create_env, @@ -1025,10 +1020,9 @@ def test_source_no_cache_downloads_and_prepares( assert item.build_env is mock_env assert item.sdist_root_dir == sdist_root assert item.unpack_dir == sdist_root.parent + assert item.cached_wheel_filename is None assert result[0] is item assert result[1] is mock_dep_item - mock_dl_src.assert_called_once() - mock_prep.assert_called_once() mock_create_env.assert_called_once() mock_create_items.assert_called_once_with( item.build_system_deps, @@ -1037,8 +1031,10 @@ def test_source_no_cache_downloads_and_prepares( item.resolved_version, ) - def test_source_cached_wheel_skips_download(self, tmp_context: WorkContext) -> None: - """Cached wheel skips download/prepare, uses unpacked dir for sdist root.""" + def test_source_cached_wheel_uses_background_result( + self, tmp_context: WorkContext + ) -> None: + """Cached wheel background result sets cached_wheel_filename and sdist_root.""" bt = bootstrapper.Bootstrapper(tmp_context) item = _make_build_item(phase=BootstrapPhase.PREPARE_SOURCE) @@ -1046,14 +1042,15 @@ def test_source_cached_wheel_skips_download(self, tmp_context: WorkContext) -> N unpacked.mkdir(parents=True) cached_wheel = tmp_context.work_dir / "testpkg-1.0-py3-none-any.whl" mock_env = Mock() + item.bg_future = _make_resolved_future( + bootstrapper.PreparedSourceData( + sdist_root_dir=unpacked / unpacked.stem, + cached_wheel_filename=cached_wheel, + ) + ) with ( patch.object(tmp_context.constraints, "get_constraint", return_value=None), - patch.object( - bt, "_find_cached_wheel", return_value=(cached_wheel, unpacked) - ), - patch.object(bt, "_download_source") as mock_dl_src, - patch.object(bt, "_prepare_source") as mock_prep, patch.object(bt, "_create_build_env", return_value=mock_env), patch( "fromager.dependencies.get_build_system_dependencies", @@ -1066,8 +1063,6 @@ def test_source_cached_wheel_skips_download(self, tmp_context: WorkContext) -> N assert item.cached_wheel_filename == cached_wheel assert item.sdist_root_dir == unpacked / unpacked.stem assert item.phase == BootstrapPhase.PREPARE_BUILD - mock_dl_src.assert_not_called() - mock_prep.assert_not_called() assert len(result) == 1 def test_bad_sdist_root_raises_valueerror(self, tmp_context: WorkContext) -> None: @@ -1076,17 +1071,11 @@ def test_bad_sdist_root_raises_valueerror(self, tmp_context: WorkContext) -> Non item = _make_build_item(phase=BootstrapPhase.PREPARE_SOURCE) bad_root = tmp_context.work_dir / "a" / "b" / "c" + item.bg_future = _make_resolved_future( + bootstrapper.PreparedSourceData(sdist_root_dir=bad_root) + ) - with ( - patch.object(tmp_context.constraints, "get_constraint", return_value=None), - patch.object(bt, "_find_cached_wheel", return_value=(None, None)), - patch.object( - bt, - "_download_source", - return_value=tmp_context.work_dir / "src.tar.gz", - ), - patch.object(bt, "_prepare_source", return_value=bad_root), - ): + with patch.object(tmp_context.constraints, "get_constraint", return_value=None): with pytest.raises(ValueError, match="should be"): bt._phase_prepare_source(item) @@ -1099,6 +1088,9 @@ def test_constraint_logged_when_present( sdist_root = tmp_context.work_dir / "testpkg-1.0" / "testpkg-1.0" mock_env = Mock() + item.bg_future = _make_resolved_future( + bootstrapper.PreparedSourceData(sdist_root_dir=sdist_root) + ) with ( patch.object( @@ -1106,13 +1098,6 @@ def test_constraint_logged_when_present( "get_constraint", return_value=Requirement("testpkg>=1.0"), ), - patch.object(bt, "_find_cached_wheel", return_value=(None, None)), - patch.object( - bt, - "_download_source", - return_value=tmp_context.work_dir / "src.tar.gz", - ), - patch.object(bt, "_prepare_source", return_value=sdist_root), patch.object(bt, "_create_build_env", return_value=mock_env), patch( "fromager.dependencies.get_build_system_dependencies", @@ -1672,3 +1657,35 @@ def test_build_order_called_with_correct_args( prebuilt=True, constraint=constraint, ) + + +class TestThreadPoolSubmission: + """Tests that exercise the real ThreadPoolExecutor submission path.""" + + def test_push_items_submits_resolve_to_real_thread_pool( + self, tmp_context: WorkContext + ) -> None: + """_push_items submits RESOLVE work to a real thread pool and result() returns correctly.""" + expected = [("https://pkg.test/testpkg-1.0.tar.gz", Version("1.0"))] + + with bootstrapper.Bootstrapper(tmp_context, num_bg_threads=1) as bt: + with patch.object(bt._resolver, "resolve", return_value=expected): + item = _make_resolve_item(req="testpkg") + stack: list[WorkItem] = [] + + bt._push_items(stack, [item]) + + # bg_future must be a real Future, not pre-set + assert item.bg_future is not None + assert isinstance(item.bg_future, concurrent.futures.Future) + + # Blocking on result() exercises the actual thread submission path + result = item.bg_future.result(timeout=10) + assert result == expected + + # _phase_resolve processes the future correctly + new_items = bt._phase_resolve(item) + + assert len(new_items) == 1 + assert new_items[0].phase == BootstrapPhase.START + assert new_items[0].resolved_version == Version("1.0")