Source code for runway.env_mgr.tfenv

"""Terraform version management."""
from __future__ import annotations

import hashlib
import json
import locale
import logging
import os
import platform
import re
import shutil
import subprocess
import sys
import tempfile
import zipfile
from typing import (
from urllib.error import URLError
from urllib.request import urlretrieve

import hcl
import hcl2
import requests
from packaging.version import InvalidVersion
from typing_extensions import Final

from ..compat import cached_property
from ..exceptions import HclParserError
from ..utils import FileHash, Version, get_hash_for_filename, merge_dicts
from . import EnvManager, handle_bin_download_error

    from pathlib import Path
    from types import ModuleType

    from .._logging import RunwayLogger

LOGGER = cast("RunwayLogger", logging.getLogger(__name__))
TF_VERSION_FILENAME = ".terraform-version"

[docs]def download_tf_release( version: str, versions_dir: Path, command_suffix: str, tf_platform: Optional[str] = None, arch: Optional[str] = None, ) -> None: """Download Terraform archive and return path to it.""" version_dir = versions_dir / version if arch is None: arch = os.getenv("TFENV_ARCH", "amd64") if tf_platform: tfver_os = tf_platform + "_" + arch else: if platform.system().startswith("Darwin"): tfver_os = f"darwin_{arch}" elif platform.system().startswith("Windows") or ( platform.system().startswith("MINGW64") or ( platform.system().startswith("MSYS_NT") or platform.system().startswith("CYGWIN_NT") ) ): tfver_os = f"windows_{arch}" else: tfver_os = f"linux_{arch}" download_dir = tempfile.mkdtemp() filename = f"terraform_{version}_{tfver_os}.zip" shasums_name = f"terraform_{version}_SHA256SUMS" tf_url = "" + version try: LOGGER.verbose("downloading Terraform from %s...", tf_url) for i in [filename, shasums_name]: urlretrieve(tf_url + "/" + i, os.path.join(download_dir, i)) except URLError as exc: handle_bin_download_error(exc, "Terraform") tf_hash = get_hash_for_filename(filename, os.path.join(download_dir, shasums_name)) checksum = FileHash(hashlib.sha256()) checksum.add_file(os.path.join(download_dir, filename)) if tf_hash != checksum.hexdigest: LOGGER.error( "downloaded Terraform %s does not match sha256 %s", filename, tf_hash ) sys.exit(1) with zipfile.ZipFile(os.path.join(download_dir, filename)) as tf_zipfile: version_dir.mkdir(parents=True, exist_ok=True) tf_zipfile.extractall(str(version_dir)) shutil.rmtree(download_dir) result = version_dir / ("terraform" + command_suffix) result.chmod(result.stat().st_mode | 0o0111) # ensure it is executable
[docs]def get_available_tf_versions(include_prerelease: bool = False) -> List[str]: """Return available Terraform versions.""" tf_releases = json.loads( requests.get("").text )["terraform"] # Remove versions that don't align with # PEP440 ( for k, _v in list(tf_releases["versions"].items()): try: Version(k) except InvalidVersion: LOGGER.debug("InvalidVersion found, skipping %s. ", k) del tf_releases["versions"][k] tf_versions = sorted( [k for k, _v in tf_releases["versions"].items()], # descending key=Version, reverse=True, ) if include_prerelease: return [i for i in tf_versions if i] return [i for i in tf_versions if i and "-" not in i]
[docs]def get_latest_tf_version(include_prerelease: bool = False) -> str: """Return latest Terraform version.""" return get_available_tf_versions(include_prerelease)[0]
[docs]def load_terraform_module(parser: ModuleType, path: Path) -> Dict[str, Any]: """Load all Terraform files in a module into one dict. Args: parser (Union[hcl, hcl2]): Parser to use when loading files. path: Terraform module path. All Terraform files in the path will be loaded. """ result: Dict[str, Any] = {} LOGGER.debug("using %s parser to load module: %s", parser.__name__.upper(), path) for tf_file in path.glob("*.tf"): try: tf_config = parser.loads(tf_file.read_text()) # type: ignore result = merge_dicts(result, cast(Dict[str, Any], tf_config)) except Exception as exc: raise HclParserError(exc, tf_file, parser) from None return result
[docs]class TFEnvManager(EnvManager): """Terraform version management. Designed to be compatible with """ VERSION_REGEX: Final[str] = r"^(Terraform v)?(?P<version>[0-9]+\.[0-9]+\.[0-9]+\S*)" VERSION_OUTPUT_REGEX: Final[ str ] = r"^Terraform v(?P<version>[0-9]*\.[0-9]*\.[0-9]*)(?P<suffix>-.*)?"
[docs] def __init__(self, path: Optional[Path] = None) -> None: """Initialize class.""" super().__init__("terraform", "tfenv", path)
@cached_property def backend(self) -> Dict[str, Any]: """Backend config of the Terraform module.""" # Terraform can only have one backend configured; this formats the # data to make it easier to work with return [ {"type": k, "config": v} for k, v in self.terraform_block.get( "backend", {None: cast(Dict[str, str], {})} ).items() ][0] @cached_property def terraform_block(self) -> Dict[str, Any]: """Collect Terraform configuration blocks from a Terraform module.""" @overload def _flatten_lists(data: Dict[str, Any]) -> Dict[str, Any]: ... @overload def _flatten_lists(data: List[Any]) -> List[Any]: ... @overload def _flatten_lists(data: str) -> str: ... def _flatten_lists( data: Union[Dict[str, Any], List[Any], Any] ) -> Union[Dict[str, Any], Any]: """Flatten HCL2 list attributes until its fixed. python-hcl2 incorrectly turns all attributes into lists so we need to flatten them so they are more similar to HCL. Args: data: Dict with lists to flatten. """ if not isinstance(data, dict): return data copy_data = cast(Dict[str, Any], data.copy()) for attr, val in copy_data.items(): if isinstance(val, list): if len(cast(List[Any], val)) == 1: # pull single values out of lists data[attr] = _flatten_lists(cast(Any, val[0])) else: data[attr] = [_flatten_lists(v) for v in cast(List[Any], val)] elif isinstance(val, dict): data[attr] = _flatten_lists(cast(Dict[str, Any], val)) return data try: result: Union[Dict[str, Any], List[Dict[str, Any]]] = load_terraform_module( hcl2, self.path ).get("terraform", cast(Dict[str, Any], {})) except HclParserError as exc: LOGGER.warning(exc) LOGGER.warning("failed to parse as HCL2; trying HCL...") try: result = load_terraform_module(hcl, self.path).get( "terraform", cast(Dict[str, Any], {}) ) except HclParserError as exc2: LOGGER.warning(exc2) # return an empty dict if we can't parse HCL # let Terraform decide if it's actually valid result = {} # python-hcl2 turns all blocks into lists in v0.3.0. this flattens it. if isinstance(result, list): return _flatten_lists({k: v for i in result for k, v in i.items()}) return _flatten_lists(result) @cached_property def version(self) -> Optional[Version]: """Terraform version.""" version_requested = self.current_version or self.get_version_from_file() if not version_requested: return None if re.match(r"^min-required$", version_requested): LOGGER.debug("tfenv: detecting minimal required version") version_requested = self.get_min_required() if re.match(r"^latest:.*$", version_requested): regex ="latest:(.*)", version_requested).group( # type: ignore 1 ) include_prerelease_versions = False elif re.match(r"^latest$", version_requested): regex = r"^[0-9]+\.[0-9]+\.[0-9]+$" include_prerelease_versions = False else: regex = f"^{version_requested}$" include_prerelease_versions = True # Return early (i.e before reaching out to the internet) if the # matching version is already installed if (self.versions_dir / version_requested).is_dir(): self.current_version = version_requested return self.parse_version_string(self.current_version) try: version = next( i for i in get_available_tf_versions(include_prerelease_versions) if re.match(regex, i) ) except StopIteration: LOGGER.error("unable to find a Terraform version matching regex: %s", regex) sys.exit(1) self.current_version = version return self.parse_version_string(self.current_version) @cached_property def version_file(self) -> Optional[Path]: """Find and return a ".terraform-version" file if one is present. Returns: Path to the Terraform version file. """ for path in [self.path, self.path.parent]: test_path = path / TF_VERSION_FILENAME if test_path.is_file(): LOGGER.debug("using version file: %s", test_path) return test_path return None
[docs] def get_min_required(self) -> str: """Get the defined minimum required version of Terraform. Returns: The minimum required version as defined in the module. """ version = self.terraform_block.get("required_version") if version: if re.match(r"^!=.+", version): LOGGER.error( "min required Terraform version is a negation (%s) " "- unable to determine required version", version, ) sys.exit(1) else: version ="[0-9]*\.[0-9]*(?:\.[0-9]*)?", version) if version: LOGGER.debug("detected minimum Terraform version is %s", version) return LOGGER.error( "Terraform version specified as min-required, but unable to " "find a specified version requirement in this module's tf files" ) sys.exit(1)
[docs] def get_version_from_file(self, file_path: Optional[Path] = None) -> Optional[str]: """Get Terraform version from a file. Args: file_path: Path to file that will be read. """ file_path = file_path or self.version_file if file_path and file_path.is_file(): return file_path.read_text( encoding=locale.getpreferredencoding(do_setlocale=False) ).strip() LOGGER.debug("file path not provided and version file could not be found") return None
[docs] def install(self, version_requested: Optional[str] = None) -> str: """Ensure Terraform is available.""" if version_requested: self.set_version(version_requested) if not self.version: raise ValueError( f"version not provided and unable to find a {TF_VERSION_FILENAME} file" ) # Now that a version has been selected, skip downloading if it's # already been downloaded if (self.versions_dir / str(self.version)).is_dir(): LOGGER.verbose( "Terraform version %s already installed; using it...", self.version ) return str(self.bin)"downloading and using Terraform version %s ...", self.version) download_tf_release(str(self.version), self.versions_dir, self.command_suffix) LOGGER.verbose("downloaded Terraform %s successfully", self.version) return str(self.bin)
[docs] def list_installed(self) -> Generator[Path, None, None]: """List installed versions of Terraform. Only lists versions of Terraform that have been installed by an instance if this class or by tfenv. """ LOGGER.verbose("checking %s for Terraform versions...", self.versions_dir) return self.versions_dir.rglob("*.*.*")
[docs] def set_version(self, version: str) -> None: """Set current version. Clears cached values as needed. Args: version: Version string. Must be in the format of ``<major>.<minor>.<patch>`` with an optional ``-<prerelease>``. """ if self.current_version == version: return self.current_version = version self._del_cached_property("version")
[docs] @classmethod def get_version_from_executable( cls, bin_path: Union[Path, str], *, cwd: Optional[Union[Path, str]] = None, env: Optional[Dict[str, str]] = None, ) -> Optional[Version]: """Get Terraform version from an executable. Args: bin_path: Path to the Terraform binary to retrieve the version from. cwd: Current working directory to use when calling the executable. env: Environment variable overrides. """ output = subprocess.check_output( [str(bin_path), "-version"], cwd=cwd, env=env ).decode() match =, output) if not match: return None return cls.parse_version_string(output)
[docs] @classmethod def parse_version_string(cls, version: str) -> Version: """Parse version string into a :class:`Version`. Args: version: Version string to parse. Must be in the format of ``<major>.<minor>.<patch>`` with an optional ``-<prerelease>``. """ match =, version) if not match: raise ValueError( f"provided version doesn't conform to regex: {cls.VERSION_REGEX}" ) return Version("version"))