Source code for runway.cfngin.context

"""CFNgin context."""
import collections
import json
import logging

from runway._logging import PrefixAdaptor

from .config import Config
from .exceptions import (
    PersistentGraphCannotLock,
    PersistentGraphCannotUnlock,
    PersistentGraphLockCodeMissmatch,
    PersistentGraphLocked,
    PersistentGraphUnlocked,
)
from .plan import Graph
from .session_cache import get_session
from .stack import Stack
from .target import Target
from .util import ensure_s3_bucket

LOGGER = logging.getLogger(__name__)


DEFAULT_NAMESPACE_DELIMITER = "-"
DEFAULT_TEMPLATE_INDENT = 4


[docs]def get_fqn(base_fqn, delimiter, name=None): """Return the fully qualified name of an object within this context. If the name passed already appears to be a fully qualified name, it will be returned with no further processing. """ if name and name.startswith("%s%s" % (base_fqn, delimiter)): return name return delimiter.join([_f for _f in [base_fqn, name] if _f])
[docs]class Context(object): """The context under which the current stacks are being executed. The CFNgin Context is responsible for translating the values passed in via the command line and specified in the config to `Stack` objects. """ def __init__( self, environment=None, boto3_credentials=None, stack_names=None, config=None, config_path=None, region=None, force_stacks=None, ): """Instantiate class. Args: boto3_credentials (Optional[Dict[str, str]]): Credentials to use when creating a boto3 session from context. environment (dict): A dictionary used to pass in information about the environment. Useful for templating. stack_names (list): A list of stack_names to operate on. If not passed, usually all stacks defined in the config will be operated on. config (:class:`runway.cfngin.config.Config`): The CFNgin configuration being operated on. config_path (str): Path to the config file that was provided. region (str): Name of an AWS region if provided as a CLI argument. force_stacks (list): A list of stacks to force work on. Used to work on locked stacks. """ self.__boto3_credentials = boto3_credentials self._bucket_name = None self._persistent_graph = None self._persistent_graph_lock_code = None self._persistent_graph_lock_tag = "cfngin_lock_code" self._s3_bucket_verified = None self._stacks = None self._targets = None self._upload_to_s3 = None # TODO load the config from context instead of taking it as an arg self.config = config or Config() # TODO set this value when provisioning a Config object in context # set to a fake location for the time being but this should be set # by all runtime entry points. the only time the fake value should be # used is during tests. self.config_path = config_path or "./" self.bucket_region = self.config.cfngin_bucket_region or region self.environment = environment self.force_stacks = force_stacks or [] self.hook_data = {} # TODO change to MutableMap in next major release self.logger = PrefixAdaptor(config_path, LOGGER) self.region = region self.s3_conn = self.get_session(region=self.bucket_region).client("s3") self.stack_names = stack_names or [] @property def _base_fqn(self): """Return ``namespace`` sanitized for use as an S3 Bucket name.""" return self.namespace.replace(".", "-").lower() @property def _persistent_graph_tags(self): """Cache of tags on the persistent graph object. Returns: Dict[str, str] """ try: return { t["Key"]: t["Value"] for t in self.s3_conn.get_object_tagging( **self.persistent_graph_location ).get("TagSet", {}) } except self.s3_conn.exceptions.NoSuchKey: self.logger.debug( "persistant graph object does not exist in S3; could not get tags" ) return {} @property def bucket_name(self): """Return ``cfngin_bucket`` from config, calculated name, or None.""" if not self.upload_to_s3: return None return self.config.cfngin_bucket or "stacker-%s" % (self.get_fqn(),) @property def mappings(self): """Return ``mappings`` from config.""" return self.config.mappings or {} @property def namespace(self): """Return ``namespace`` from config.""" return self.config.namespace @property def namespace_delimiter(self): """Return ``namespace_delimiter`` from config or default.""" delimiter = self.config.namespace_delimiter if delimiter is not None: return delimiter return DEFAULT_NAMESPACE_DELIMITER @property def persistent_graph(self): """Graph if a persistent graph is being used. Will create an "empty" object in S3 if one is not found. Returns: :class:`runway.cfngin.plan.Graph` """ if not self._persistent_graph: if not self.persistent_graph_location: return None content = "{}" if self.s3_bucket_verified: try: self.logger.debug( "getting persistent graph from s3:\n%s", json.dumps(self.persistent_graph_location, indent=4), ) content = ( self.s3_conn.get_object( ResponseContentType="application/json", **self.persistent_graph_location )["Body"] .read() .decode("utf-8") ) except self.s3_conn.exceptions.NoSuchKey: self.logger.info( "persistant graph object does not exist in s3; " "creating one now..." ) self.s3_conn.put_object( Body=content, ServerSideEncryption="AES256", ACL="bucket-owner-full-control", ContentType="application/json", **self.persistent_graph_location ) self.persistent_graph = json.loads(content) return self._persistent_graph @persistent_graph.setter def persistent_graph(self, graph_dict): """Load a persistent graph dict as a :class:`runway.cfngin.plan.Graph`.""" self._persistent_graph = Graph.from_dict(graph_dict, self) @property def persistent_graph_location(self): """Location of the persistent graph in s3. Returns: Dict[str, str] Bucket and Key for the object in S3. """ if not self.upload_to_s3 or not self.config.persistent_graph_key: return {} return { "Bucket": self.bucket_name, "Key": "persistent_graphs/{namespace}/{key}".format( namespace=self.config.namespace, key=( self.config.persistent_graph_key + ".json" if not self.config.persistent_graph_key.endswith(".json") else self.config.persistent_graph_key ), ), } @property def persistent_graph_lock_code(self): """Code used to lock the persistent graph S3 object. Returns: Optional[str] """ if not self._persistent_graph_lock_code and self.persistent_graph_location: self._persistent_graph_lock_code = self._persistent_graph_tags.get( self._persistent_graph_lock_tag ) return self._persistent_graph_lock_code @property def persistent_graph_locked(self): """Check if persistent graph is locked. Returns: bool """ if not self.persistent_graph: return False if not self.persistent_graph_lock_code: return False return True @property def s3_bucket_verified(self): """Check CFNgin bucket exists and you have access. If the CFNgin bucket does not exist, will try to create one. Returns: bool """ if not self._s3_bucket_verified and self.bucket_name: ensure_s3_bucket( self.s3_conn, self.bucket_name, self.bucket_region, persist_graph=bool(self.persistent_graph_location), ) self._s3_bucket_verified = True return self._s3_bucket_verified @property def tags(self): """Return ``tags`` from config.""" tags = self.config.tags if tags is not None: return tags if self.namespace: return {"cfngin_namespace": self.namespace} return {} @property def template_indent(self): """Return ``template_indent`` from config or default.""" indent = self.config.template_indent if indent is not None: return int(indent) return DEFAULT_TEMPLATE_INDENT @property def upload_to_s3(self): """Check if S3 should be used for caching/persistent graph. Returns: (bool) """ if not self._upload_to_s3: # Don't upload stack templates to S3 if `cfngin_bucket` is # explicitly set to an empty string. if self.config.cfngin_bucket == "": self.logger.debug( "not uploading to s3; cfngin_bucket " "is explicitly set to an empty string" ) return False # If no namespace is specificied, and there's no explicit # cfngin bucket specified, don't upload to s3. This makes # sense because we can't realistically auto generate a cfngin # bucket name in this case. if not self.namespace and not self.config.cfngin_bucket: self.logger.debug( "not uploading to s3; namespace & cfngin_bucket not provided" ) return False return True def _get_stack_definitions(self): """Return ``stacks`` from config.""" return self.config.stacks
[docs] def get_targets(self): """Return the named targets that are specified in the config. Returns: list: a list of :class:`runway.cfngin.target.Target` objects """ if not self._targets: targets = [] for target_def in self.config.targets or []: target = Target(target_def) targets.append(target) self._targets = targets return self._targets
[docs] def get_session(self, profile=None, region=None): """Create a thread-safe boto3 session. Args: profile (Optional[str]): The profile for the session. region (Optional[str]): The region for the session. Returns: :class:`boto3.session.Session`: A thread-safe boto3 session. """ kwargs = {} if profile: kwargs["profile"] = profile elif self.__boto3_credentials: kwargs.update( { "access_key": self.__boto3_credentials.get("aws_access_key_id"), "secret_key": self.__boto3_credentials.get("aws_secret_access_key"), "session_token": self.__boto3_credentials.get("aws_session_token"), } ) return get_session(region=region or self.region, **kwargs)
[docs] def get_stack(self, name): """Get a stack by name. Args: name (str): Name of a stack to retrieve. """ for stack in self.get_stacks(): if stack.name == name: return stack return None
[docs] def get_stacks(self): """Get the stacks for the current action. Handles configuring the :class:`runway.cfngin.stack.Stack` objects that will be used in the current action. Returns: list: a list of :class:`runway.cfngin.stack.Stack` objects """ if not self._stacks: stacks = [] definitions = self._get_stack_definitions() for stack_def in definitions: stack = Stack( definition=stack_def, context=self, mappings=self.mappings, force=stack_def.name in self.force_stacks, locked=stack_def.locked, enabled=stack_def.enabled, protected=stack_def.protected, ) stacks.append(stack) self._stacks = stacks return self._stacks
[docs] def get_stacks_dict(self): """Construct a dict of {stack.fqn: stack} for easy access to stacks.""" return dict((stack.fqn, stack) for stack in self.get_stacks())
[docs] def get_fqn(self, name=None): """Return the fully qualified name of an object within this context. If the name passed already appears to be a fully qualified name, it will be returned with no further processing. """ return get_fqn(self._base_fqn, self.namespace_delimiter, name)
[docs] def lock_persistent_graph(self, lock_code): """Locks the persistent graph in s3. Args: lock_code (str): The code that will be used to lock the S3 object. Raises: :class:`runway.cfngin.exceptions.PersistentGraphLocked` :class:`runway.cfngin.exceptions.PersistentGraphCannotLock` """ if not self.persistent_graph: return if self.persistent_graph_locked: raise PersistentGraphLocked try: self.s3_conn.put_object_tagging( Tagging={ "TagSet": [ {"Key": self._persistent_graph_lock_tag, "Value": lock_code} ] }, **self.persistent_graph_location ) self.logger.info( 'locked persistent graph "%s" with lock ID "%s"', "/".join( [ self.persistent_graph_location["Bucket"], self.persistent_graph_location["Key"], ] ), lock_code, ) except self.s3_conn.exceptions.NoSuchKey: raise PersistentGraphCannotLock("s3 object does not exist")
[docs] def put_persistent_graph(self, lock_code): """Upload persistent graph to s3. Args: lock_code (str): The code that will be used to lock the S3 object. Raises: :class:`runway.cfngin.exceptions.PersistentGraphUnlocked` :class:`runway.cfngin.exceptions.PersistentGraphLockCodeMissmatch` """ if not self.persistent_graph: return if not self.persistent_graph.to_dict(): self.s3_conn.delete_object(**self.persistent_graph_location) self.logger.debug("removed empty persistent graph object from S3") return if not self.persistent_graph_locked: raise PersistentGraphUnlocked( reason="It must be locked by the current session to be updated." ) if self.persistent_graph_lock_code != lock_code: raise PersistentGraphLockCodeMissmatch( lock_code, self.persistent_graph_lock_code ) self.s3_conn.put_object( Body=self.persistent_graph.dumps(4), ServerSideEncryption="AES256", ACL="bucket-owner-full-control", ContentType="application/json", Tagging="{}={}".format(self._persistent_graph_lock_tag, lock_code), **self.persistent_graph_location ) self.logger.debug( "persistent graph updated:\n%s", self.persistent_graph.dumps(indent=4) )
[docs] def set_hook_data(self, key, data): """Set hook data for the given key. Args: key(str): The key to store the hook data in. data(:class:`collections.Mapping`): A dictionary of data to store, as returned from a hook. """ if not isinstance(data, collections.Mapping): raise ValueError( "Hook (key: %s) data must be an instance of " "collections.Mapping (a dictionary for example)." % key ) if key in self.hook_data: raise KeyError( "Hook data for key %s already exists, each hook " "must have a unique data_key." % key ) self.hook_data[key] = data
[docs] def unlock_persistent_graph(self, lock_code): """Unlocks the persistent graph in s3. Args: lock_code (str): The code that will be used to lock the S3 object. Raises: :class:`runway.cfngin.exceptions.PersistentGraphCannotUnlock` """ if not self.persistent_graph: return True if not self.persistent_graph.to_dict(): try: self.s3_conn.get_object( ResponseContentType="application/json", **self.persistent_graph_location ) except self.s3_conn.exceptions.NoSuchKey: self.logger.info( "persistent graph deleted; does not need to be unlocked" ) return True self.logger.verbose( 'unlocking persistent graph "%s"...', self.persistent_graph_location ) if not self.persistent_graph_locked: raise PersistentGraphCannotUnlock( PersistentGraphUnlocked( reason="It must be locked by the current session to be unlocked." ) ) if self.persistent_graph_lock_code == lock_code: try: self.s3_conn.delete_object_tagging(**self.persistent_graph_location) except self.s3_conn.exceptions.NoSuchKey: pass self._persistent_graph_lock_code = None self.logger.info( 'unlocked persistent graph "%s"', "/".join( [ self.persistent_graph_location["Bucket"], self.persistent_graph_location["Key"], ] ), ) return True raise PersistentGraphCannotUnlock( PersistentGraphLockCodeMissmatch(lock_code, self.persistent_graph_lock_code) )