Source code for runway.cfngin.actions.init

"""CFNgin init action."""
from __future__ import annotations

import logging
from typing import TYPE_CHECKING, Any, Optional, Union, cast

from ...compat import cached_property
from ...config.models.cfngin import CfnginStackDefinitionModel
from import Bucket
from ..exceptions import CfnginBucketAccessDenied
from . import deploy
from .base import BaseAction

    import threading

    from ..._logging import RunwayLogger
    from ...context import CfnginContext
    from import ProviderBuilder

LOGGER = cast("RunwayLogger", logging.getLogger(__name__))

[docs]class Action(BaseAction): """Initialize environment.""" NAME = "init" DESCRIPTION = "Initialize environment"
[docs] def __init__( self, context: CfnginContext, provider_builder: Optional[ProviderBuilder] = None, cancel: Optional[threading.Event] = None, ): """Instantiate class. This class creates a copy of the context object prior to initialization as some of it can perform destructive actions on the context object. Args: context: The context for the current run. provider_builder: An object that will build a provider that will be interacted with in order to perform the necessary actions. cancel: Cancel handler. """ super().__init__( context=context.copy(), provider_builder=provider_builder, cancel=cancel )
@property def _stack_action(self) -> Any: """Run against a step.""" return None @cached_property def cfngin_bucket(self) -> Optional[Bucket]: """CFNgin bucket. Raises: CfnginBucketRequired: cfngin_bucket not defined. """ if not self.context.bucket_name: return None return Bucket( self.context, name=self.context.bucket_name, region=self.context.bucket_region, ) @cached_property def default_cfngin_bucket_stack(self) -> CfnginStackDefinitionModel: """CFNgin bucket stack.""" return CfnginStackDefinitionModel( class_path="runway.cfngin.blueprints.cfngin_bucket.CfnginBucket", in_progress_behavior="wait", name="cfngin-bucket", termination_protection=True, variables={"BucketName": self.context.bucket_name}, )
[docs] def run( self, *, concurrency: int = 0, dump: Union[bool, str] = False, # pylint: disable=unused-argument force: bool = False, # pylint: disable=unused-argument outline: bool = False, # pylint: disable=unused-argument tail: bool = False, upload_disabled: bool = True, # pylint: disable=unused-argument **_kwargs: Any, ) -> None: """Run the action. Args: concurrency: The maximum number of concurrent deployments. dump: Not used by this action force: Not used by this action. outline: Not used by this action. tail: Tail the stack's events. upload_disabled: Not used by this action. Raises: CfnginBucketAccessDenied: Could not head cfngin_bucket. """ # attr can be falsy but still be Bucket so need to check type if not isinstance(self.cfngin_bucket, Bucket):"skipped; cfngin_bucket not defined") return if self.cfngin_bucket.forbidden: raise CfnginBucketAccessDenied( if self.cfngin_bucket.exists:"cfngin_bucket %s already exists", return if self.context.get_stack("cfngin-bucket"): LOGGER.verbose( "found stack for creating cfngin_bucket: cfngin-bucket", ) self.context.stack_names = ["cfngin-bucket"] else: LOGGER.notice("using default blueprint to create cfngin_bucket...") self.context.config.stacks = [self.default_cfngin_bucket_stack] # clear cached values that were populated by checking the previous condition self.context._del_cached_property( # pylint: disable=protected-access "stacks", "stacks_dict" ) if self.provider_builder: self.provider_builder.region = self.context.bucket_region deploy.Action( context=self.context, provider_builder=self.provider_builder, cancel=self.cancel, ).run( concurrency=concurrency, tail=tail, upload_disabled=True, ) return
[docs] def pre_run( self, *, dump: Union[bool, str] = False, outline: bool = False, **__kwargs: Any, ) -> None: """Do nothing."""
[docs] def post_run( self, *, dump: Union[bool, str] = False, outline: bool = False, **__kwargs: Any, ) -> None: """Do nothing."""