"""AWS EC2 keypair hook."""
from __future__ import annotations
import logging
import sys
from pathlib import Path
from typing import TYPE_CHECKING, Any, Dict, Optional, Tuple
from botocore.exceptions import ClientError
from typing_extensions import Literal, TypedDict
from ...utils import BaseModel
from ..ui import get_raw_input
if TYPE_CHECKING:
from mypy_boto3_ec2.client import EC2Client
from mypy_boto3_ec2.type_defs import ImportKeyPairResultTypeDef, KeyPairTypeDef
from mypy_boto3_ssm.client import SSMClient
from ...context import CfnginContext
LOGGER = logging.getLogger(__name__)
KEYPAIR_LOG_MESSAGE = "keypair %s (%s) %s"
class EnsureKeypairExistsHookArgs(BaseModel):
    """Arguments accepted by the ``ensure_keypair_exists`` hook."""

    keypair: str
    """Name of the key pair that must exist."""

    public_key_path: Optional[str] = None
    """Path to a public key file to import rather than generating a new key.

    Cannot be combined with the SSM options because, when importing, there is
    no private key available to store.

    """

    ssm_key_id: Optional[str] = None
    """ID of the KMS key used to encrypt the SSM parameter.

    The default key is used when this is omitted.

    """

    ssm_parameter_name: Optional[str] = None
    """Path to the SSM store parameter that receives the generated private key.

    When set, the key is stored in SSM instead of being imported or saved
    locally.

    """
class KeyPairInfo(TypedDict, total=False):
    """Key pair details returned by the helper functions in this module.

    Declared with ``total=False``, so every key is optional.

    """

    # Location of the locally written private key file, when one was saved.
    file_path: Path
    # Fingerprint of the key pair as reported by EC2.
    fingerprint: str
    # Name of the key pair.
    key_name: str
    # How the key pair came to be available.
    status: Literal["created", "exists", "imported"]
def get_existing_key_pair(ec2: EC2Client, keypair_name: str) -> Optional[KeyPairInfo]:
    """Look up an EC2 key pair by name.

    Args:
        ec2: EC2 client used to describe key pairs.
        keypair_name: Name of the key pair to look for.

    Returns:
        Details of the first key pair whose name matches, with ``status`` set
        to ``"exists"``, or ``None`` when no key pair matches.

    """
    response = ec2.describe_key_pairs()
    matches = [
        candidate
        for candidate in response.get("KeyPairs", [])
        if candidate.get("KeyName") == keypair_name
    ]
    found = matches[0] if matches else None
    if not found:
        LOGGER.info('keypair "%s" not found', keypair_name)
        return None

    LOGGER.info(
        KEYPAIR_LOG_MESSAGE,
        found.get("KeyName"),
        found.get("KeyFingerprint"),
        "exists",
    )
    return {
        "status": "exists",
        "key_name": found.get("KeyName", ""),
        "fingerprint": found.get("KeyFingerprint", ""),
    }
def import_key_pair(
    ec2: EC2Client, keypair_name: str, public_key_data: bytes
) -> ImportKeyPairResultTypeDef:
    """Import public key material into EC2 under the given key pair name.

    Args:
        ec2: EC2 client used to perform the import.
        keypair_name: Name to give the imported key pair.
        public_key_data: Public key material; surrounding whitespace is
            stripped before it is sent.

    Returns:
        The response from the EC2 ``import_key_pair`` call.

    """
    material = public_key_data.strip()
    result = ec2.import_key_pair(
        KeyName=keypair_name, PublicKeyMaterial=material, DryRun=False
    )
    name = result.get("KeyName")
    fingerprint = result.get("KeyFingerprint")
    LOGGER.info(KEYPAIR_LOG_MESSAGE, name, fingerprint, "imported")
    return result
def read_public_key_file(path: Path) -> Optional[bytes]:
    """Read and validate an SSH public key file.

    Args:
        path: Location of the public key file.

    Returns:
        The key data with surrounding whitespace removed, or ``None`` when the
        file cannot be read or its content does not begin with ``ssh-rsa``.

    """
    try:
        # Strip before validating so that leading blank lines or spaces do not
        # cause an otherwise valid key to be rejected.
        data = path.read_bytes().strip()
        if not data.startswith(b"ssh-rsa"):
            raise ValueError(
                "Bad public key data, must be an RSA key in SSH authorized "
                "keys format (beginning with `ssh-rsa`)"
            )
        return data
    except (ValueError, OSError) as err:
        LOGGER.error('failed to read public key file "%s": %s', path, str(err))
        return None
def create_key_pair_from_public_key_file(
    ec2: EC2Client, keypair_name: str, public_key_path: Path
) -> Optional[KeyPairInfo]:
    """Import a key pair into EC2 from a public key file on disk.

    Args:
        ec2: EC2 client used to import the key.
        keypair_name: Name to give the imported key pair.
        public_key_path: Path to the public key file to read.

    Returns:
        Details of the imported key pair with ``status`` set to
        ``"imported"``, or ``None`` when no usable key data could be read.

    """
    key_material = read_public_key_file(public_key_path)
    if key_material:
        imported = import_key_pair(ec2, keypair_name, key_material)
        return {
            "status": "imported",
            "key_name": imported.get("KeyName", ""),
            "fingerprint": imported.get("KeyFingerprint", ""),
        }
    return None
def create_key_pair_in_ssm(
    ec2: EC2Client,
    ssm: SSMClient,
    keypair_name: str,
    parameter_name: str,
    kms_key_id: Optional[str] = None,
) -> Optional[KeyPairInfo]:
    """Create a key pair in EC2 and store its private key in an SSM parameter.

    Args:
        ec2: EC2 client used to create (and, on failure, delete) the key pair.
        ssm: SSM client used to store the private key.
        keypair_name: Name of the key pair to create.
        parameter_name: Name of the SSM parameter that receives the private key.
        kms_key_id: Optional KMS key ID passed to SSM as ``KeyId``; when not
            provided no ``KeyId`` is sent.

    Returns:
        Details of the created key pair with ``status`` set to ``"created"``,
        or ``None`` when storing the private key in SSM raised ``ClientError``
        (in which case the new key pair is deleted again).

    """
    keypair = create_key_pair(ec2, keypair_name)

    kms_args: Dict[str, Any] = {"KeyId": kms_key_id} if kms_key_id else {}
    kms_key_label = kms_key_id or "default"
    LOGGER.info(
        'storing generated key in SSM parameter "%s" using KMS key "%s"',
        parameter_name,
        kms_key_label,
    )

    try:
        ssm.put_parameter(
            Name=parameter_name,
            Description=f'SSH private key for KeyPair "{keypair_name}" (generated by Runway)',
            Value=keypair["KeyMaterial"],
            Type="SecureString",
            Overwrite=False,
            **kms_args,
        )
    except ClientError:
        # The private key only exists in the create response; if it cannot be
        # stored the key pair is useless, so remove it from EC2 again.
        LOGGER.exception(
            "failed to store generated key in SSM; deleting "
            "created key pair as private key will be lost"
        )
        ec2.delete_key_pair(KeyName=keypair_name, DryRun=False)
        return None

    return {
        "status": "created",
        "key_name": keypair.get("KeyName", ""),
        "fingerprint": keypair.get("KeyFingerprint", ""),
    }
def create_key_pair(ec2: EC2Client, keypair_name: str) -> KeyPairTypeDef:
    """Create a new key pair in EC2 and log the result.

    Args:
        ec2: EC2 client used to create the key pair.
        keypair_name: Name of the key pair to create.

    Returns:
        The response from the EC2 ``create_key_pair`` call.

    """
    created = ec2.create_key_pair(KeyName=keypair_name, DryRun=False)
    name = created.get("KeyName")
    fingerprint = created.get("KeyFingerprint")
    LOGGER.info(KEYPAIR_LOG_MESSAGE, name, fingerprint, "created")
    return created
def create_key_pair_local(
    ec2: EC2Client, keypair_name: str, dest_dir: Path
) -> Optional[KeyPairInfo]:
    """Create a key pair in EC2 and save its private key to a local file.

    The private key is written to ``<dest_dir>/<keypair_name>.pem``. The file
    is created with owner-only permissions before the key material is written.

    Args:
        ec2: EC2 client used to create (and, on failure, delete) the key pair.
        keypair_name: Name of the key pair; also used as the file name stem.
        dest_dir: Existing directory in which to write the private key file.

    Returns:
        Details of the created key pair including ``file_path``, or ``None``
        when ``dest_dir`` is not a directory, the target file already exists,
        or the private key could not be written to disk.

    """
    dest_dir = dest_dir.resolve()
    if not dest_dir.is_dir():
        LOGGER.error('"%s" is not a valid directory', dest_dir)
        return None
    key_path = dest_dir / f"{keypair_name}.pem"
    if key_path.is_file():
        # This mimics the old boto2 keypair.save error
        LOGGER.error('"%s" already exists in directory "%s"', key_path.name, dest_dir)
        return None
    keypair = create_key_pair(ec2, keypair_name)
    try:
        # Create the file with owner-only permissions *before* writing so the
        # private key is never readable by other users; exist_ok=False also
        # refuses to overwrite a file that appeared since the check above.
        key_path.touch(mode=0o600, exist_ok=False)
        key_path.write_text(keypair.get("KeyMaterial", ""), encoding="ascii")
    except OSError:
        # The private key only exists in the create response; if it cannot be
        # saved the key pair is useless, so remove it from EC2 again.
        LOGGER.exception(
            'failed to write private key to "%s"; deleting '
            "created key pair as private key will be lost",
            key_path,
        )
        ec2.delete_key_pair(KeyName=keypair_name, DryRun=False)
        return None
    return {
        "status": "created",
        "key_name": keypair.get("KeyName", ""),
        "fingerprint": keypair.get("KeyFingerprint", ""),
        "file_path": key_path,
    }
def interactive_prompt(
    keypair_name: str,
) -> Tuple[Optional[Literal["create", "import"]], Optional[str]]:
    """Ask the user whether to import or create the key pair.

    The prompt is only shown when stdin is a TTY. It repeats until the user
    answers ``import`` (or ``i``), ``create``, or ``cancel``; answers are
    compared case-insensitively with surrounding whitespace ignored.

    Args:
        keypair_name: Name of the key pair, shown in the prompt.

    Returns:
        ``("import", path)`` or ``("create", path)`` with the whitespace-
        stripped path the user entered, or ``(None, None)`` when stdin is not
        a TTY, the user cancels, or input ends (EOF / Ctrl-C).

    """
    if not sys.stdin.isatty():
        return None, None
    try:
        while True:
            # Normalise once so stray spaces or capitals do not cause a
            # valid answer to be rejected and the prompt repeated.
            action = (
                get_raw_input(
                    f'import or create keypair "{keypair_name}"? (import/create/cancel) '
                )
                .strip()
                .lower()
            )
            if action == "cancel":
                break
            if action in ("i", "import"):
                path = get_raw_input("path to keypair file: ")
                return "import", path.strip()
            if action == "create":
                path = get_raw_input("directory to save keyfile: ")
                return "create", path.strip()
    except (EOFError, KeyboardInterrupt):
        return None, None
    return None, None
def ensure_keypair_exists(
    context: CfnginContext, *__args: Any, **kwargs: Any
) -> KeyPairInfo:
    """Ensure a specific keypair exists within AWS.

    If the key pair already exists its details are returned. Otherwise it is
    imported from ``public_key_path``, generated and stored in SSM when
    ``ssm_parameter_name`` is given, or the user is prompted interactively to
    import or create it.

    Args:
        context: CFNgin context used to obtain the AWS session.
        **kwargs: Hook arguments, parsed as ``EnsureKeypairExistsHookArgs``.

    Returns:
        Details of the key pair, or an empty dict when ``public_key_path`` and
        ``ssm_parameter_name`` are both given or no key pair could be
        found, imported or created.

    """
    args = EnsureKeypairExistsHookArgs.parse_obj(kwargs)
    if args.public_key_path and args.ssm_parameter_name:
        LOGGER.error(
            "public_key_path and ssm_parameter_name cannot be "
            "specified at the same time"
        )
        return {}

    session = context.get_session()
    ec2 = session.client("ec2")

    existing = get_existing_key_pair(ec2, args.keypair)
    if existing:
        return existing

    result: Optional[KeyPairInfo] = None
    if args.public_key_path:
        result = create_key_pair_from_public_key_file(
            ec2, args.keypair, Path(args.public_key_path)
        )
    elif args.ssm_parameter_name:
        ssm = session.client("ssm")
        result = create_key_pair_in_ssm(
            ec2, ssm, args.keypair, args.ssm_parameter_name, args.ssm_key_id
        )
    else:
        # Neither source was configured; fall back to asking the user.
        action, path = interactive_prompt(args.keypair)
        if path and action == "import":
            result = create_key_pair_from_public_key_file(
                ec2, args.keypair, Path(path)
            )
        elif path and action == "create":
            result = create_key_pair_local(ec2, args.keypair, Path(path))
        else:
            LOGGER.error("no action to find keypair or path not provided")

    return result or {}