"""AWS EC2 keypair hook."""
# pylint: disable=unused-argument
import logging
import os
import sys
from botocore.exceptions import ClientError
from ..session_cache import get_session
from ..ui import get_raw_input
from . import utils
# Module-level logger, named after this module.
LOGGER = logging.getLogger(__name__)
# Shared log format for key pair events; the three arguments passed by the
# functions below are the key name, the key fingerprint and a status word
# ("exists", "imported" or "created").
KEYPAIR_LOG_MESSAGE = "keypair %s (%s) %s"
def get_existing_key_pair(ec2, keypair_name):
    """Look up a key pair by name among the key pairs EC2 reports.

    Args:
        ec2: EC2 client; only ``describe_key_pairs`` is called on it.
        keypair_name (str): Name of the key pair to look for.

    Returns:
        Optional[Dict[str, str]]: A dict with ``status`` (always ``exists``),
        ``key_name`` and ``fingerprint`` for the first key pair whose name
        matches, or ``None`` when no key pair matches.

    """
    for candidate in ec2.describe_key_pairs()["KeyPairs"]:
        if candidate["KeyName"] != keypair_name:
            continue

        name = candidate["KeyName"]
        fingerprint = candidate["KeyFingerprint"]
        LOGGER.info(KEYPAIR_LOG_MESSAGE, name, fingerprint, "exists")
        return {"status": "exists", "key_name": name, "fingerprint": fingerprint}

    LOGGER.info('keypair "%s" not found', keypair_name)
    return None
def import_key_pair(ec2, keypair_name, public_key_data):
    """Upload public key material to EC2 under the given key pair name.

    Args:
        ec2: EC2 client; only ``import_key_pair`` is called on it.
        keypair_name (str): Name to register the key pair under.
        public_key_data: Public key material; surrounding whitespace is
            stripped before it is sent.

    Returns:
        The response returned by ``ec2.import_key_pair``. Its ``KeyName``
        and ``KeyFingerprint`` entries are logged.

    """
    request = {
        "KeyName": keypair_name,
        "PublicKeyMaterial": public_key_data.strip(),
        "DryRun": False,
    }
    response = ec2.import_key_pair(**request)

    LOGGER.info(
        KEYPAIR_LOG_MESSAGE,
        response["KeyName"],
        response["KeyFingerprint"],
        "imported",
    )
    return response
def read_public_key_file(path):
    """Read an RSA public key in SSH authorized keys format from a file.

    Failures are logged and reported through the return value rather than
    raised.

    Args:
        path (str): Path to the public key file. It is passed through
            ``utils.full_path`` before being opened.

    Returns:
        Optional[bytes]: The key data with surrounding whitespace removed,
        or ``None`` if the file could not be read or its content does not
        begin with ``ssh-rsa``.

    """
    try:
        with open(utils.full_path(path), "rb") as file_:
            # Strip before validating so that a leading blank line or stray
            # whitespace does not cause an otherwise valid key to be rejected.
            data = file_.read().strip()

        if not data.startswith(b"ssh-rsa"):
            raise ValueError(
                "Bad public key data, must be an RSA key in SSH authorized "
                "keys format (beginning with `ssh-rsa`)"
            )

        return data
    except (ValueError, IOError, OSError) as err:
        LOGGER.error('failed to read public key file "%s": %s', path, str(err))
        return None
def create_key_pair_from_public_key_file(ec2, keypair_name, public_key_path):
    """Import a key pair into EC2 from a public key stored on disk.

    Args:
        ec2: EC2 client, forwarded to ``import_key_pair``.
        keypair_name (str): Name to register the key pair under.
        public_key_path (str): Path of the public key file to read.

    Returns:
        Optional[Dict[str, str]]: A dict with ``status`` (always
        ``imported``), ``key_name`` and ``fingerprint``, or ``None`` when
        the public key file yields no usable data.

    """
    key_material = read_public_key_file(public_key_path)
    if key_material:
        imported = import_key_pair(ec2, keypair_name, key_material)
        return dict(
            status="imported",
            key_name=imported["KeyName"],
            fingerprint=imported["KeyFingerprint"],
        )
    return None
def create_key_pair_in_ssm(ec2, ssm, keypair_name, parameter_name, kms_key_id=None):
    """Generate a key pair in EC2 and store its private key in SSM.

    Args:
        ec2: EC2 client used to create (and, on failure, delete) the key pair.
        ssm: SSM client; only ``put_parameter`` is called on it.
        keypair_name (str): Name of the key pair to create.
        parameter_name (str): Name of the SSM parameter that receives the
            key material as a ``SecureString``. An existing parameter is
            not overwritten.
        kms_key_id (Optional[str]): KMS key passed as ``KeyId``. When falsy,
            no ``KeyId`` is sent and the log message reports ``default``.

    Returns:
        Optional[Dict[str, str]]: A dict with ``status`` (always
        ``created``), ``key_name`` and ``fingerprint``, or ``None`` when
        storing the parameter raised ``ClientError`` (the freshly created
        key pair is deleted in that case).

    """
    created = create_key_pair(ec2, keypair_name)

    try:
        encryption_args = {"KeyId": kms_key_id} if kms_key_id else {}

        LOGGER.info(
            'storing generated key in SSM parameter "%s" using KMS key "%s"',
            parameter_name,
            kms_key_id or "default",
        )

        description = (
            'SSH private key for KeyPair "{}" ' "(generated by Runway)"
        ).format(keypair_name)
        ssm.put_parameter(
            Name=parameter_name,
            Description=description,
            Value=created["KeyMaterial"],
            Type="SecureString",
            Overwrite=False,
            **encryption_args
        )
    except ClientError:
        # Without the stored parameter the private key is unrecoverable, so
        # the key pair itself is removed again.
        LOGGER.exception(
            "failed to store generated key in SSM; deleting "
            "created key pair as private key will be lost"
        )
        ec2.delete_key_pair(KeyName=keypair_name, DryRun=False)
        return None

    return {
        "status": "created",
        "key_name": created["KeyName"],
        "fingerprint": created["KeyFingerprint"],
    }
def create_key_pair(ec2, keypair_name):
    """Ask EC2 to generate a new key pair and log the result.

    Args:
        ec2: EC2 client; only ``create_key_pair`` is called on it.
        keypair_name (str): Name of the key pair to create.

    Returns:
        The response returned by ``ec2.create_key_pair``. Its ``KeyName``
        and ``KeyFingerprint`` entries are logged.

    """
    response = ec2.create_key_pair(KeyName=keypair_name, DryRun=False)
    name = response["KeyName"]
    fingerprint = response["KeyFingerprint"]
    LOGGER.info(KEYPAIR_LOG_MESSAGE, name, fingerprint, "created")
    return response
def create_key_pair_local(ec2, keypair_name, dest_dir):
    """Generate a key pair in EC2 and save its private key to a local file.

    The key material is written to ``<keypair_name>.pem`` inside
    ``dest_dir``. The file is created with owner-only permissions (mode
    ``0o600``, where the platform honours it) because it holds a private
    key. If creating the key pair or writing the file fails, the file is
    removed again so that a retry is not blocked by the "already exists"
    check.

    Args:
        ec2: EC2 client, forwarded to ``create_key_pair``.
        keypair_name (str): Name of the key pair to create; also used as
            the base name of the ``.pem`` file.
        dest_dir (str): Directory to write the file into. It is passed
            through ``utils.full_path`` before use.

    Returns:
        Optional[Dict[str, str]]: A dict with ``status`` (always
        ``created``), ``key_name``, ``fingerprint`` and ``file_path``, or
        ``None`` when ``dest_dir`` is not a directory or the target file
        already exists.

    Raises:
        OSError: If the target file cannot be created or written.

    """
    dest_dir = utils.full_path(dest_dir)
    if not os.path.isdir(dest_dir):
        LOGGER.error('"%s" is not a valid directory', dest_dir)
        return None

    file_name = "{0}.pem".format(keypair_name)
    key_path = os.path.join(dest_dir, file_name)
    if os.path.isfile(key_path):
        # This mimics the old boto2 keypair.save error
        LOGGER.error('"%s" already exists in directory "%s"', file_name, dest_dir)
        return None

    # O_BINARY only exists on Windows; it keeps the descriptor in binary mode.
    flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC | getattr(os, "O_BINARY", 0)

    # Open the file before creating the key pair to catch errors early
    file_desc = os.open(key_path, flags, 0o600)
    written = False
    try:
        with os.fdopen(file_desc, "wb") as file_:
            keypair = create_key_pair(ec2, keypair_name)
            file_.write(keypair["KeyMaterial"].encode("ascii"))
        written = True
    finally:
        if not written:
            # Best-effort cleanup of the empty/partial file; the original
            # error keeps propagating.
            try:
                os.remove(key_path)
            except OSError:
                pass

    return {
        "status": "created",
        "key_name": keypair["KeyName"],
        "fingerprint": keypair["KeyFingerprint"],
        "file_path": key_path,
    }
def interactive_prompt(keypair_name):
    """Ask the user whether to import or create the missing key pair.

    Nothing is asked when standard input is not a terminal. The question is
    repeated until a recognised answer is given.

    Args:
        keypair_name (str): Name of the key pair, shown in the question.

    Returns:
        Tuple[Optional[str], Optional[str]]: ``("import", path)`` or
        ``("create", path)`` with the stripped path the user entered, or
        ``(None, None)`` when standard input is not a terminal, the user
        answers ``cancel``, or input ends / is interrupted.

    """
    no_choice = (None, None)
    if not sys.stdin.isatty():
        return no_choice

    question = 'import or create keypair "%s"? (import/create/cancel) ' % (
        keypair_name,
    )
    try:
        while True:
            choice = get_raw_input(question).lower()
            if choice == "cancel":
                break
            if choice in ("i", "import"):
                location = get_raw_input("path to keypair file: ")
                return "import", location.strip()
            if choice == "create":
                location = get_raw_input("directory to save keyfile: ")
                return "create", location.strip()
    except (EOFError, KeyboardInterrupt):
        return no_choice

    return no_choice
def ensure_keypair_exists(provider, context, **kwargs):
    """Ensure a specific keypair exists within AWS.

    If the key pair already exists it is returned as-is. Otherwise it is
    imported from a public key file, generated and stored in SSM, or the
    user is prompted interactively for what to do.

    Args:
        provider (:class:`runway.cfngin.providers.base.BaseProvider`): Provider
            instance. (passed in by CFNgin)
        context (:class:`runway.cfngin.context.Context`): Context instance.
            (passed in by CFNgin)

    Keyword Args:
        keypair (str): Name of the key pair to create.
        ssm_parameter_name (Optional[str]): Path to an SSM store parameter
            to receive the generated private key, instead of importing it or
            storing it locally.
        ssm_key_id (Optional[str]): ID of a KMS key to encrypt the SSM
            parameter with. If omitted, the default key will be used.
        public_key_path (Optional[str]): Path to a public key file to be
            imported instead of generating a new key. Incompatible with the
            SSM options, as the private key will not be available for
            storing.
        profile (Optional[str]): Passed to ``get_session`` as ``profile``.

    Returns:
        Union[bool, Dict[str, Optional[str]]]: In case of failure
        ``False``, otherwise a dict containing:
            **status** (str):
                One of ``exists``, ``imported`` or ``created``.
            **key_name** (str):
                Name of the key pair.
            **fingerprint** (str):
                Fingerprint of the key pair.
            **file_path** (Optional[str]):
                If a new key was created, the path to the file where the
                private key was stored.

    """
    keypair_name = kwargs["keypair"]
    ssm_parameter_name = kwargs.get("ssm_parameter_name")
    ssm_key_id = kwargs.get("ssm_key_id")
    public_key_path = kwargs.get("public_key_path")

    if public_key_path and ssm_parameter_name:
        LOGGER.error(
            "public_key_path and ssm_parameter_name cannot be "
            "specified at the same time"
        )
        return False

    session = get_session(region=provider.region, profile=kwargs.get("profile"))
    ec2 = session.client("ec2")

    existing = get_existing_key_pair(ec2, keypair_name)
    if existing:
        return existing

    result = None
    if public_key_path:
        result = create_key_pair_from_public_key_file(
            ec2, keypair_name, public_key_path
        )
    elif ssm_parameter_name:
        ssm = session.client("ssm")
        result = create_key_pair_in_ssm(
            ec2, ssm, keypair_name, ssm_parameter_name, ssm_key_id
        )
    else:
        # Neither option was configured: fall back to asking the user.
        action, path = interactive_prompt(keypair_name)
        if action == "import":
            result = create_key_pair_from_public_key_file(ec2, keypair_name, path)
        elif action == "create":
            result = create_key_pair_local(ec2, keypair_name, path)
        else:
            LOGGER.error("no action to find keypair")

    return result or False