Source code for runway.cfngin.hooks.keypair

"""AWS EC2 keypair hook."""
# pylint: disable=unused-argument
import logging
import os
import sys

from botocore.exceptions import ClientError

from ..session_cache import get_session
from ..ui import get_raw_input
from . import utils

# Module-level logger used by every helper in this hook.
LOGGER = logging.getLogger(__name__)

# Lazy %-style log template. The call sites in this module fill it with the
# key pair name, the key pair fingerprint, and a status word
# ("exists", "imported" or "created").
KEYPAIR_LOG_MESSAGE = "keypair %s (%s) %s"


def get_existing_key_pair(ec2, keypair_name):
    """Look up a key pair by name among those returned by EC2.

    Args:
        ec2: EC2 client; only ``describe_key_pairs`` is called on it.
        keypair_name (str): Name of the key pair to search for.

    Returns:
        Optional[Dict[str, str]]: For the first key pair whose ``KeyName``
        equals ``keypair_name``, a dict with ``status`` (always ``exists``),
        ``key_name`` and ``fingerprint``. ``None`` when nothing matches.

    """
    for candidate in ec2.describe_key_pairs()["KeyPairs"]:
        if candidate["KeyName"] != keypair_name:
            continue
        # First match wins; report it and stop scanning.
        LOGGER.info(
            KEYPAIR_LOG_MESSAGE,
            candidate["KeyName"],
            candidate["KeyFingerprint"],
            "exists",
        )
        return {
            "status": "exists",
            "key_name": candidate["KeyName"],
            "fingerprint": candidate["KeyFingerprint"],
        }

    LOGGER.info('keypair "%s" not found', keypair_name)
    return None
def import_key_pair(ec2, keypair_name, public_key_data):
    """Upload public key material to EC2 under the given name.

    Args:
        ec2: EC2 client; only ``import_key_pair`` is called on it.
        keypair_name (str): Name to register the key pair under.
        public_key_data: Public key material; surrounding whitespace is
            stripped before it is sent.

    Returns:
        The response of ``ec2.import_key_pair``, unmodified.

    """
    request = {
        "KeyName": keypair_name,
        "PublicKeyMaterial": public_key_data.strip(),
        "DryRun": False,
    }
    imported = ec2.import_key_pair(**request)
    LOGGER.info(
        KEYPAIR_LOG_MESSAGE,
        imported["KeyName"],
        imported["KeyFingerprint"],
        "imported",
    )
    return imported
def read_public_key_file(path):
    """Read an RSA public key in SSH authorized-keys format from a file.

    Args:
        path (str): Location of the public key file. It is expanded with
            ``utils.full_path`` before being opened.

    Returns:
        Optional[bytes]: The file contents with surrounding whitespace
        stripped, or ``None`` if the file could not be read or its contents
        do not begin with ``ssh-rsa``. Failures are logged, not raised.

    """
    try:
        with open(utils.full_path(path), "rb") as file_:
            data = file_.read()

        if not data.startswith(b"ssh-rsa"):
            raise ValueError(
                "Bad public key data, must be an RSA key in SSH authorized "
                "keys format (beginning with `ssh-rsa`)"
            )

        return data.strip()
    except (ValueError, IOError, OSError) as err:
        # The path is wrapped in a matching pair of quotes; the previous
        # template (`:%s"`) emitted an unbalanced quote in the log output.
        LOGGER.error('failed to read public key file "%s": %s', path, str(err))
        return None
def create_key_pair_from_public_key_file(ec2, keypair_name, public_key_path):
    """Import a key pair into EC2 from a public key file on disk.

    Args:
        ec2: EC2 client, forwarded to ``import_key_pair``.
        keypair_name (str): Name to register the key pair under.
        public_key_path (str): Path handed to ``read_public_key_file``.

    Returns:
        Optional[Dict[str, str]]: A dict with ``status`` (always
        ``imported``), ``key_name`` and ``fingerprint``, or ``None`` when
        the public key file yielded no usable data.

    """
    key_material = read_public_key_file(public_key_path)
    if key_material:
        imported = import_key_pair(ec2, keypair_name, key_material)
        return {
            "status": "imported",
            "key_name": imported["KeyName"],
            "fingerprint": imported["KeyFingerprint"],
        }

    # Nothing usable was read, so there is nothing to import.
    return None
def create_key_pair_in_ssm(ec2, ssm, keypair_name, parameter_name, kms_key_id=None):
    """Generate a key pair and store its private key in an SSM parameter.

    Args:
        ec2: EC2 client used to create (and, on failure, delete) the key pair.
        ssm: SSM client; only ``put_parameter`` is called on it.
        keypair_name (str): Name of the key pair to create.
        parameter_name (str): Name of the SSM parameter that receives the
            private key as a ``SecureString``.
        kms_key_id (Optional[str]): When truthy, passed to ``put_parameter``
            as ``KeyId``; otherwise no ``KeyId`` is sent.

    Returns:
        Optional[Dict[str, str]]: A dict with ``status`` (always
        ``created``), ``key_name`` and ``fingerprint``. ``None`` if storing
        the parameter raised ``ClientError``, in which case the freshly
        created key pair is deleted again.

    """
    generated = create_key_pair(ec2, keypair_name)

    try:
        encryption_args = {"KeyId": kms_key_id} if kms_key_id else {}
        LOGGER.info(
            'storing generated key in SSM parameter "%s" using KMS key "%s"',
            parameter_name,
            kms_key_id if kms_key_id else "default",
        )
        ssm.put_parameter(
            Name=parameter_name,
            Description=(
                'SSH private key for KeyPair "{}" '
                "(generated by Runway)"
            ).format(keypair_name),
            Value=generated["KeyMaterial"],
            Type="SecureString",
            Overwrite=False,
            **encryption_args
        )
    except ClientError:
        # The private key only exists in memory at this point; if it cannot
        # be persisted the key pair is useless, so remove it from EC2.
        LOGGER.exception(
            "failed to store generated key in SSM; deleting "
            "created key pair as private key will be lost"
        )
        ec2.delete_key_pair(KeyName=keypair_name, DryRun=False)
        return None
    else:
        return {
            "status": "created",
            "key_name": generated["KeyName"],
            "fingerprint": generated["KeyFingerprint"],
        }
def create_key_pair(ec2, keypair_name):
    """Ask EC2 to generate a new key pair and log the result.

    Args:
        ec2: EC2 client; only ``create_key_pair`` is called on it.
        keypair_name (str): Name of the key pair to create.

    Returns:
        The response of ``ec2.create_key_pair``, unmodified.

    """
    created = ec2.create_key_pair(KeyName=keypair_name, DryRun=False)
    name, fingerprint = created["KeyName"], created["KeyFingerprint"]
    LOGGER.info(KEYPAIR_LOG_MESSAGE, name, fingerprint, "created")
    return created
def create_key_pair_local(ec2, keypair_name, dest_dir):
    """Generate a key pair and save its private key to a local ``.pem`` file.

    Args:
        ec2: EC2 client, forwarded to ``create_key_pair``.
        keypair_name (str): Name of the key pair; also used as the file
            name stem (``<keypair_name>.pem``).
        dest_dir (str): Directory to write the private key into. It is
            expanded with ``utils.full_path``.

    Returns:
        Optional[Dict[str, str]]: A dict with ``status`` (always
        ``created``), ``key_name``, ``fingerprint`` and ``file_path``.
        ``None`` if ``dest_dir`` is not a directory or the target file
        already exists.

    Raises:
        OSError: If the key file cannot be created.

    """
    dest_dir = utils.full_path(dest_dir)

    if not os.path.isdir(dest_dir):
        LOGGER.error('"%s" is not a valid directory', dest_dir)
        return None

    file_name = "{0}.pem".format(keypair_name)
    key_path = os.path.join(dest_dir, file_name)
    if os.path.isfile(key_path):
        # This mimics the old boto2 keypair.save error
        LOGGER.error('"%s" already exists in directory "%s"', file_name, dest_dir)
        return None

    # Open the file before creating the key pair to catch errors early.
    # The file holds a private key, so it is created owner-read/write only
    # instead of inheriting the process umask. O_EXCL guarantees this call
    # created the file, which makes the cleanup below safe.
    file_desc = os.open(key_path, os.O_WRONLY | os.O_CREAT | os.O_EXCL, 0o600)
    saved = False
    try:
        with os.fdopen(file_desc, "wb") as file_:
            keypair = create_key_pair(ec2, keypair_name)
            file_.write(keypair["KeyMaterial"].encode("ascii"))
        saved = True
    finally:
        if not saved:
            # Do not leave an empty or partial .pem behind: it would make
            # every retry fail with the "already exists" error above.
            try:
                os.remove(key_path)
            except OSError:
                pass

    return {
        "status": "created",
        "key_name": keypair["KeyName"],
        "fingerprint": keypair["KeyFingerprint"],
        "file_path": key_path,
    }
def interactive_prompt(keypair_name):
    """Ask the user whether to import or create the named key pair.

    The prompt is skipped entirely when standard input is not a TTY, and
    is repeated until a recognised answer is given.

    Args:
        keypair_name (str): Name of the key pair, shown in the question.

    Returns:
        Tuple[Optional[str], Optional[str]]: ``("import", path)`` or
        ``("create", path)`` with the stripped path the user entered, or
        ``(None, None)`` when input is not a TTY, the user answers
        ``cancel``, or input ends with EOF / keyboard interrupt.

    """
    if not sys.stdin.isatty():
        return None, None

    try:
        question = 'import or create keypair "%s"? (import/create/cancel) ' % (
            keypair_name,
        )
        while True:
            choice = get_raw_input(question).lower()
            if choice == "cancel":
                return None, None
            if choice in ("i", "import"):
                return "import", get_raw_input("path to keypair file: ").strip()
            if choice == "create":
                return "create", get_raw_input("directory to save keyfile: ").strip()
            # Anything else: ask again.
    except (EOFError, KeyboardInterrupt):
        return None, None
def ensure_keypair_exists(provider, context, **kwargs):
    """Ensure a specific keypair exists within AWS.

    An existing key pair is returned as-is. Otherwise the key pair is
    imported from a public key file, generated and stored in SSM, or
    obtained through an interactive prompt, in that order of preference.

    Args:
        provider (:class:`runway.cfngin.providers.base.BaseProvider`):
            Provider instance; its ``region`` is used to build the session.
            (passed in by CFNgin)
        context (:class:`runway.cfngin.context.Context`): Context instance.
            (passed in by CFNgin)

    Keyword Args:
        keypair (str): Name of the key pair to create. Required.
        ssm_parameter_name (Optional[str]): Path to an SSM store parameter
            to receive the generated private key, instead of importing it
            or storing it locally.
        ssm_key_id (Optional[str]): ID of a KMS key to encrypt the SSM
            parameter with. If omitted, the default key will be used.
        public_key_path (Optional[str]): Path to a public key file to be
            imported instead of generating a new key. Incompatible with the
            SSM options, as the private key will not be available for
            storing.
        profile (Optional[str]): Forwarded to ``get_session`` as the
            ``profile`` argument.

    Returns:
        Union[bool, Dict[str, Optional[str]]]: In case of failure
        ``False``, otherwise a dict containing:

        **status** (str):
            One of ``exists``, ``imported`` or ``created``.
        **key_name** (str):
            Name of the key pair.
        **fingerprint** (str):
            Fingerprint of the key pair.
        **file_path** (Optional[str]):
            If a new key was created locally, the path to the file where
            the private key was stored.

    """
    name = kwargs["keypair"]
    parameter_name = kwargs.get("ssm_parameter_name")
    kms_key_id = kwargs.get("ssm_key_id")
    key_file = kwargs.get("public_key_path")

    # An imported public key has no private half to put into SSM.
    if key_file and parameter_name:
        LOGGER.error(
            "public_key_path and ssm_parameter_name cannot be "
            "specified at the same time"
        )
        return False

    session = get_session(region=provider.region, profile=kwargs.get("profile"))
    ec2 = session.client("ec2")

    existing = get_existing_key_pair(ec2, name)
    if existing:
        return existing

    if key_file:
        result = create_key_pair_from_public_key_file(ec2, name, key_file)
    elif parameter_name:
        ssm = session.client("ssm")
        result = create_key_pair_in_ssm(ec2, ssm, name, parameter_name, kms_key_id)
    else:
        action, path = interactive_prompt(name)
        if action == "import":
            result = create_key_pair_from_public_key_file(ec2, name, path)
        elif action == "create":
            result = create_key_pair_local(ec2, name, path)
        else:
            LOGGER.error("no action to find keypair")
            return False

    # Every helper signals failure with a falsy value; normalise to False.
    return result if result else False