Source code for runway.cfngin.hooks.iam

"""AWS IAM hook."""
# pylint: disable=unused-argument
import copy
import logging

from awacs import ecs
from awacs.aws import Allow, Policy, Statement
from awacs.helpers.trust import get_ecs_assumerole_policy
from botocore.exceptions import ClientError
from six.moves import input

from ..session_cache import get_session
from . import utils

LOGGER = logging.getLogger(__name__)


[docs]def create_ecs_service_role(provider, context, **kwargs): """Create ecsServieRole, which has to be named exactly that currently. http://docs.aws.amazon.com/AmazonECS/latest/developerguide/IAM_policies.html#service_IAM_role Args: provider (:class:`runway.cfngin.providers.base.BaseProvider`): Provider instance. (passed in by CFNgin) context (:class:`runway.cfngin.context.Context`): Context instance. (passed in by CFNgin) Keyword Args: role_name (str): Name of the role to create. (*default: ecsServiceRole*) Returns: bool: Whether or not the hook succeeded. """ role_name = kwargs.get("role_name", "ecsServiceRole") client = get_session(provider.region).client("iam") try: client.create_role( RoleName=role_name, AssumeRolePolicyDocument=get_ecs_assumerole_policy().to_json(), ) except ClientError as err: if "already exists" in str(err): pass else: raise policy = Policy( Version="2012-10-17", Statement=[ Statement( Effect=Allow, Resource=["*"], Action=[ ecs.CreateCluster, ecs.DeregisterContainerInstance, ecs.DiscoverPollEndpoint, ecs.Poll, ecs.Action("Submit*"), ], ) ], ) client.put_role_policy( RoleName=role_name, PolicyName="AmazonEC2ContainerServiceRolePolicy", PolicyDocument=policy.to_json(), ) return True
def _get_cert_arn_from_response(response): result = copy.deepcopy(response) # GET response returns this extra key if "ServerCertificate" in response: result = response["ServerCertificate"] return result["ServerCertificateMetadata"]["Arn"] def _get_cert_contents(kwargs): """Build parameters with server cert file contents. Args: kwargs (Dict[str, Any]): The keyword args passed to ensure_server_cert_exists, optionally containing the paths to the cert, key and chain files. Returns: Dict[str, Any]: A dictionary containing the appropriate parameters to supply to upload_server_certificate. An empty dictionary if there is a problem. """ paths = { "certificate": kwargs.get("path_to_certificate"), "private_key": kwargs.get("path_to_private_key"), "chain": kwargs.get("path_to_chain"), } for key, value in paths.items(): if value is not None: continue path = input("Path to %s (skip): " % (key,)) if path == "skip" or not path.strip(): continue paths[key] = path parameters = { "ServerCertificateName": kwargs.get("cert_name"), } for key, path in paths.items(): if not path: continue # Allow passing of file like object for tests try: contents = path.read() except AttributeError: with open(utils.full_path(path)) as read_file: contents = read_file.read() if key == "certificate": parameters["CertificateBody"] = contents elif key == "private_key": parameters["PrivateKey"] = contents elif key == "chain": parameters["CertificateChain"] = contents return parameters
[docs]def ensure_server_cert_exists(provider, context, **kwargs): """Ensure server cert exists. Args: provider (:class:`runway.cfngin.providers.base.BaseProvider`): Provider instance. (passed in by CFNgin) context (:class:`runway.cfngin.context.Context`): Context instance. (passed in by CFNgin) Keyword Args: cert_name (str): Name of the certificate that should exist. prompt (bool): Whether to prompt to upload a certificate if one does not exist. (*default:* ``True``) Returns: Dict[str, str]: Dict containing ``status``, ``cert_name``, and ``cert_arn``. """ client = get_session(provider.region).client("iam") cert_name = kwargs["cert_name"] status = "unknown" try: response = client.get_server_certificate(ServerCertificateName=cert_name) cert_arn = _get_cert_arn_from_response(response) status = "exists" LOGGER.info("certificate exists: %s (%s)", cert_name, cert_arn) except ClientError: if kwargs.get("prompt", True): upload = input( "Certificate '%s' wasn't found. Upload it now? (yes/no) " % (cert_name,) ) if upload != "yes": return False parameters = _get_cert_contents(kwargs) if not parameters: return False response = client.upload_server_certificate(**parameters) cert_arn = _get_cert_arn_from_response(response) status = "uploaded" LOGGER.info( "uploaded certificate: %s (%s)", cert_name, cert_arn, ) return { "status": status, "cert_name": cert_name, "cert_arn": cert_arn, }