Skip to content

Commit

Permalink
feat: move cli functionality to client
Browse files Browse the repository at this point in the history
  • Loading branch information
johnson2427 committed May 2, 2024
1 parent 6e508bc commit b738c57
Show file tree
Hide file tree
Showing 3 changed files with 175 additions and 51 deletions.
69 changes: 67 additions & 2 deletions ape_aws/accounts.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from typing import Any
from pydantic import BaseModel, Field

from typing import Iterator, Optional
from typing import Any, Iterator, Optional

from eth_account.messages import _hash_eip191_message, encode_defunct
from eth_account._utils.legacy_transactions import serializable_unsigned_transaction_from_dict
Expand All @@ -14,6 +14,71 @@
from .client import kms_client


class KeyBaseModel(BaseModel):
alias: str = Field(required=True)


class CreateKeyModel(KeyBaseModel):
description: str | None = Field(default=None, required=False)
policy: str | None = Field(default=None, required=False)
key_usage: str | None = Field(default='SIGN_VERIFY', required=False)
key_spec: str | None = Field(default='ECC_SECG_P256K1', required=False)
admins: list[str] | None = Field(required=False)
users: list[str] | None = Field(required=False)
tags: list[dict[str, str]] | None = Field(required=False)
multi_region: bool | None = Field(default=False, required=False)
ADMIN_KEY_POLICY: str | None = """{
"Version": "2012-10-17",
"Id": "key-default-1",
"Statement": [{
"Sid": "Enable IAM User Permissions",
"Effect": "Allow",
"Principal": {"AWS": "{arn}"},
"Action": "kms:*",
"Resource": "*"
}]
}"""
USER_KEY_POLICY: str | None = """{
"Version": "2012-10-17",
"Id": "key-default-1",
"Statement": [{
"Sid": "Allow use of the key",
"Effect": "Allow",
"Principal": {"AWS": "{arn}"},
"Action": ["kms:Sign", "kms:Verify"],
"Resource": "*"
}]
}"""


class CreateKey(CreateKeyModel):
origin: str = 'AWS_KMS'


class ImportKey(CreateKeyModel):
origin: str = 'EXTERNAL'


class DeleteKey(KeyBaseModel):
days: int | None = Field(default=30, required=False)

@property
def kms_account(self):
if "alias" not in self.alias:
alias_name = f"alias/{self.alias}"
aws_accounts = AwsAccountContainer(data_folder='./', account_type=KmsAccount)
kms_account = None
for account in aws_accounts.accounts:
if account.key_alias == alias_name:
kms_account = account

return kms_account

@property
def key_id(self):
return self.kms_account.key_id


class AwsAccountContainer(AccountContainerAPI):

@property
Expand Down
110 changes: 96 additions & 14 deletions ape_aws/client.py
Original file line number Diff line number Diff line change
@@ -1,32 +1,73 @@
from typing import List

import boto3

from ape.utils import cached_property
from pydantic import BaseModel, Field

from .utils import AliasResponse


class KeyBaseModel(BaseModel):
alias: str = Field(required=True)


class CreateKeyModel(KeyBaseModel):
description: str | None = Field(default=None, required=False)
policy: str | None = Field(default=None, required=False)
key_usage: str | None = Field(default='SIGN_VERIFY', required=False)
key_spec: str | None = Field(default='ECC_SECG_P256K1', required=False)
admins: list[str] | None = Field(required=False)
users: list[str] | None = Field(required=False)
tags: list[dict[str, str]] | None = Field(required=False)
multi_region: bool | None = Field(default=False, required=False)
ADMIN_KEY_POLICY: str | None = """{
"Version": "2012-10-17",
"Id": "key-default-1",
"Statement": [{
"Sid": "Enable IAM User Permissions",
"Effect": "Allow",
"Principal": {"AWS": "{arn}"},
"Action": "kms:*",
"Resource": "*"
}]
}"""
USER_KEY_POLICY: str | None = """{
"Version": "2012-10-17",
"Id": "key-default-1",
"Statement": [{
"Sid": "Allow use of the key",
"Effect": "Allow",
"Principal": {"AWS": "{arn}"},
"Action": ["kms:Sign", "kms:Verify"],
"Resource": "*"
}]
}"""


class CreateKey(CreateKeyModel):
origin: str = 'AWS_KMS'


class ImportKey(CreateKeyModel):
origin: str = 'EXTERNAL'


class DeleteKey(KeyBaseModel):
key_id: str
days: int | None = Field(default=30, required=False)


class Client:

def __init__(self, client_name: str):
self.client_name = client_name
self._client = None

@property
def client(self):
if not self._client:
self._client = boto3.client(self.client_name)
return self._client
self.client = boto3.client(self.client_name)


class KmsClient(Client):

def __init__(self):
super().__init__(client_name='kms')

@cached_property
def raw_aliases(self) -> List[AliasResponse]:
@property
def raw_aliases(self) -> list[AliasResponse]:
paginator = self.client.get_paginator('list_aliases')
pages = paginator.paginate()
return [
Expand All @@ -48,14 +89,55 @@ def sign(self, key_id, msghash):
)
return response.get('Signature')

def create_key(self, key_spec: CreateKey):
response = self.client.create_key(
Description=key_spec.description,
KeyUsage=key_spec.key_usage,
KeySpec=key_spec.key_spec,
Origin=key_spec.origin,
MultiRegion=key_spec.multi_region,
)
key_id = response['KeyMetadata']['KeyId']
self.client.create_alias(
AliasName=f'alias/{key_spec.alias}',
TargetKeyId=key_id,
)
if key_spec.tags:
self.client.tag_resource(
KeyId=key_id,
Tags=key_spec.tags,
)
if key_spec.admins:
for arn in key_spec.admins:
self.client.put_key_policy(
KeyId=key_id,
PolicyName='default',
Policy=key_spec.ADMIN_KEY_POLICY.format(arn=arn)
)
if key_spec.users:
for arn in key_spec.users:
kms_client.client.put_key_policy(
KeyId=key_id,
PolicyName='default',
Policy=key_spec.USER_KEY_POLICY.format(arn=arn)
)
return key_id

def delete_key(self, key_spec: DeleteKey):
self.client.delete_alias(AliasName=key_spec.alias)
self.client.schedule_key_deletion(
KeyId=key_spec.key_id, PendingWindowInDays=key_spec.days
)
return key_spec.alias


class IamClient(Client):

def __init__(self):
super().__init__(client_name='iam')

def list_users(self):
result = iam_client.client.list_users()
result = self.client.list_users()
return result.get('Users')

def list_admins(self):
Expand Down
47 changes: 12 additions & 35 deletions ape_aws/kms/_cli.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import click

from ape_aws.accounts import KmsAccount, AwsAccountContainer
from ape_aws.client import kms_client
from ape_aws.client import kms_client, CreateKey, DeleteKey
from ape.cli import ape_cli_context


Expand Down Expand Up @@ -79,40 +79,19 @@ def create_key(
alias_name str: The alias of the key you intend to create
description str: The description of the key you intend to create.
"""
response = kms_client.client.create_key(
Description=description,
KeyUsage='SIGN_VERIFY',
KeySpec='ECC_SECG_P256K1',
Origin='AWS_KMS',
MultiRegion=False,
)
key_id = response['KeyMetadata']['KeyId']
kms_client.client.create_alias(
AliasName=f'alias/{alias_name}',
TargetKeyId=key_id,
)
if tags:
tags_list = []
for k_v in tags:
k, v = k_v.split('=')
tags_list.append(dict(k=v))
kms_client.client.tag_resource(
KeyId=key_id,
Tags=tags_list,
)
for arn in administrators:
kms_client.client.put_key_policy(
KeyId=key_id,
PolicyName='default',
Policy=ADMIN_KEY_POLICY.format(arn=arn)
)
for arn in users:
kms_client.client.put_key_policy(
KeyId=key_id,
PolicyName='default',
Policy=USER_KEY_POLICY.format(arn=arn)
)

key_spec = CreateKey(
alias=alias_name,
description=description,
admins=administrators,
users=users,
tags=tags_list if tags else None,
)
key_id = kms_client.create_key(key_spec)
cli_ctx.logger.success(f"Key created successfully with ID: {key_id}")


Expand All @@ -132,8 +111,6 @@ def schedule_delete_key(cli_ctx, alias_name, days):
if not kms_account:
cli_ctx.abort(f"No KMS Key with alias name: {alias_name}")

kms_client.client.delete_alias(AliasName=alias_name)
kms_client.client.schedule_key_deletion(
KeyId=kms_account.key_id, PendingWindowInDays=days
)
cli_ctx.logger.success(f"Key {kms_account.key_alias} scheduled for deletion")
delete_key_spec = DeleteKey(alias=alias_name, key_id=kms_account.key_id, days=days)
key_alias = kms_client.delete_key(delete_key_spec)
cli_ctx.logger.success(f"Key {key_alias} scheduled for deletion")

0 comments on commit b738c57

Please sign in to comment.