Skip to content

Commit

Permalink
New steps are added. Also added remote git repository functionality.
Browse files Browse the repository at this point in the history
  • Loading branch information
eerkunt committed Jul 10, 2018
1 parent b4e4dd4 commit c9ea49e
Show file tree
Hide file tree
Showing 4 changed files with 113 additions and 77 deletions.
68 changes: 44 additions & 24 deletions terraform_compliance/main.py
Original file line number Diff line number Diff line change
@@ -1,58 +1,75 @@
import sys
import os
from argparse import ArgumentParser, ArgumentTypeError, Action
from argparse import ArgumentParser, Action
from terraform_compliance import Validator
from radish.main import main as call_radish
from tempfile import mkdtemp
from shutil import rmtree
from git import Repo

__app_name__ = "terraform-compliance"
__version__ = "0.1.0"
__version__ = "0.1.1"


class ArgHandling(object):
def __init__(self):
self.features = None
self.steps = None
self.tf_dir = None
pass

#TODO: Handle all directory/protocol handling via a better class structure here.
#TODO: Extend git: (on features or tf files argument) into native URLs instead of using a prefix here.

class readable_dir(Action):
def __call__(self, parser, namespace, values, option_string=None):
prospective_dir = values

# Check if the given directory is actually a git repo
if prospective_dir.startswith('git:'):
print('Using remote git repository: {}'.format(prospective_dir[4:]))
setattr(namespace, self.dest, prospective_dir[4:])
return True

# Check if the given path is a directory really
if not os.path.isdir(prospective_dir):
print('Invalid use or arguments: {}'.format(prospective_dir))
print('ERROR: {} is not a directory.'.format(prospective_dir))
sys.exit(1)

# Check if we have access to that directory
if os.access(prospective_dir, os.R_OK):
setattr(namespace, self.dest, prospective_dir)
else:
print('Invalid use or arguments: {}'.format(prospective_dir))
sys.exit(1)
return True

print('ERROR: Can not read {}'.format(prospective_dir))
sys.exit(1)


def cli():
print('{} v{} initiated'.format(__app_name__, __version__))

argument = ArgHandling()
parser = ArgumentParser(prog=__app_name__,
description="BDD Test Framework for Hashicorp terraform")
try:
parser.add_argument("--features", "-f", dest="features", metavar='feature_directory', action=readable_dir,
parser.add_argument("--features", "-f", dest="features", metavar='feature_directory', action=readable_dir,
help="Directory consists of BDD features", required=True)
parser.add_argument("--tfdir", "-t", dest="tf_dir", metavar='terraform_directory', action=readable_dir,
help="Directory consists of Terraform Files", required=True)
except argparse.ArgumentTypeError:
print('Invalid use or arguments: {}'.format(sys.exc_info()[1]))
sys.exit(1)
parser.add_argument("--tfdir", "-t", dest="tf_dir", metavar='terraform_directory', action=readable_dir,
help="Directory (or git repository with 'git:' prefix) consists of Terraform Files", required=True)

# parser.parse_args(namespace=argument)
_, radish_arguments = parser.parse_known_args(namespace=argument)

print('{} v{} initiated'.format(__app_name__, __version__))
steps_directory = os.path.join(os.path.split(os.path.abspath(__file__))[0], 'steps')
print('Steps : {}'.format(steps_directory))

if argument.features.startswith('http'):
features_git_repo = argument.features
argument.features = mkdtemp()
Repo.clone_from(features_git_repo, argument.features)
features_directory = os.path.join(os.path.abspath(argument.features))
steps_directory = os.path.join(os.path.split(os.path.abspath(__file__))[0], 'steps')
tf_directory = os.path.join(os.path.abspath(argument.tf_dir))
print('Features : {}{}'.format(features_directory, (' ({})'.format(features_git_repo) if 'features_git_repo' in locals() else '')))

print('Features : {}'.format(features_directory))
print('Steps : {}'.format(steps_directory))
print('TF Files : {}'.format(tf_directory))
if argument.tf_dir.startswith('http'):
tf_git_repo = argument.tf_dir
argument.tf_dir = mkdtemp()
Repo.clone_from(tf_git_repo, argument.tf_dir)
tf_directory = os.path.join(os.path.abspath(argument.tf_dir))
print('TF Files : {}{}'.format(tf_directory, (' ({})'.format(tf_git_repo) if 'tf_git_repo' in locals() else '')))

commands = ['radish',
features_directory,
Expand All @@ -72,3 +89,6 @@ def cli():

print('Running tests.')
return call_radish(args=commands[1:])


#TODO: Implement a cleanup for temporary directories since they are not deleted.
43 changes: 43 additions & 0 deletions terraform_compliance/steps/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@


untaggable_resources = [
"aws_route_table",
"aws_elastic_beanstalk",
"aws_security_group_rule",
"aws_eip",
"aws_nat_gateway",
"aws_key_pair",
"aws_lambda",
"aws_iam",
"aws_s3_bucket_notification",
"aws_api_gateway",
"aws_cloudfront_origin_access_identity",
"aws_cloudwatch",
"aws_server_certificate",
"aws_route53_record",
"aws_directory_service_directory",
"azurerm_resource_group"
]

encryption_property = {
"aws_db_instance": "storage_encrypted",
"ebs_block_device": "encrypted",
"aws_ebs_volume": "encrypted",
"azurerm_storage_account": "enable_blob_encryption",
"azurerm_sql_database": "encryption"
}

resource_name = {
"AWS RDS instance": "aws_db_instance",
"AWS EC2 instance": "aws_instance",
"AWS EBS volume": "aws_ebs_volume",
"AWS Security Group": "aws_security_group",
"AWS Subnet": "aws_subnet",
"Azure Storage Account": "azurerm_storage_account",
"Azure SQL Database": "azurerm_sql_database",
"resource that supports tags": "(?!{0}).*".format("|".join(untaggable_resources))
}

regex = {
"Name": "^\${var.platform}_\${var.environment}_.*"
}
75 changes: 23 additions & 52 deletions terraform_compliance/steps/steps.py
Original file line number Diff line number Diff line change
@@ -1,64 +1,28 @@
# -*- coding: utf-8 -*-

from radish import step, world, custom_type

untaggable_resources = [
"aws_route_table",
"aws_elastic_beanstalk",
"aws_security_group_rule",
"aws_eip",
"aws_nat_gateway",
"aws_key_pair",
"aws_lambda",
"aws_iam",
"aws_s3_bucket_notification",
"aws_api_gateway",
"aws_cloudfront_origin_access_identity",
"aws_cloudwatch",
"aws_server_certificate",
"aws_route53_record",
"aws_directory_service_directory",
"azurerm_resource_group"
]

encryption_property = {
"aws_db_instance": "storage_encrypted",
"ebs_block_device": "encrypted",
"aws_ebs_volume": "encrypted",
"azurerm_storage_account": "enable_blob_encryption",
"azurerm_sql_database": "encryption"
}

resource_name = {
"AWS RDS instance": "aws_db_instance",
"AWS EC2 instance": "aws_instance",
"AWS EBS volume": "aws_ebs_volume",
"AWS Security Group": "aws_security_group",
"AWS Subnet": "aws_subnet",
"Azure Storage Account": "azurerm_storage_account",
"Azure SQL Database": "azurerm_sql_database",
"resource that supports tags": "(?!{0}).*".format("|".join(untaggable_resources))
}

regex = {
"Name": "^\${var.platform}_\${var.environment}_.*"
}
from radish import step, world, custom_type, then, when, given
from terraform_compliance.steps import untaggable_resources, regex, resource_name, encryption_property


# New Arguments
@custom_type("ANY", r"[\.\/_\-A-Za-z0-9\s]+")
def arg_exp_for_secure_text(text):
return text

@step(r'Given I define {resource}')

@step(u'I define an {resource:ANY}')
@step(u'I define a {resource:ANY}')
@step(u'I define {resource:ANY}')
def define_a_resource(step, resource):
world.config.terraform.error_if_property_missing()
if (resource in resource_name.keys()):
resource = resource_name[resource]

step.context.resource_type = resource
step.context.stash = step.context.resources = world.config.terraform.resources(resource)


@step(r'When I {action_type} them')
@step(u'I {action_type:ANY} them')
def i_action_them(step, action_type):
if action_type == "count":
step.context.stash = len(step.context.stash.resource_list)
Expand All @@ -68,7 +32,7 @@ def i_action_them(step, action_type):
AssertionError("Invalid action_type in the scenario: {}".format(action))


@step(r'Then I expect the result is {operator} than {number:d}')
@step(u'I expect the result is {operator:ANY} than {number:d}')
def func(step, operator, number):
value = int(step.context.stash)

Expand All @@ -84,7 +48,7 @@ def func(step, operator, number):
AssertionError("Invalid operator: " + str(operator))


@step('it must contain {something}')
@step(u'it must contain {something:ANY}')
def func(step, something):
world.config.terraform.error_if_property_missing()

Expand All @@ -94,35 +58,42 @@ def func(step, something):
step.context.resource_type = something
step.context.resources = step.context.resources.property(something)

@step(u'it contains {something:ANY}')
def func(step, something):
if something in resource_name.keys():
something = resource_name[something]

step.context.resource_type = something
step.context.resources = step.context.resources.property(something)

@step('encryption must be enabled')
@step(u'encryption must be enabled')
def func(step):
world.config.terraform.error_if_property_missing()
prop = encryption_property[step.context.resource_type]
step.context.resources.property(prop).should_equal(True)


@step(u'it must have the "([^"]*)" tag')
@step(u'it must have the "{tag:ANY}" tag')
def func(step, tag):
world.config.terraform.error_if_property_missing()
step.context.tag = tag
step.context.properties = step.context.resources.property('tags')
step.context.properties.should_have_properties(tag)


@step(u'And its value must match the "([^"]*)" regex')
@step(u'its value must match the "{regex_type}" regex')
def func(step, regex_type):
world.config.terraform.error_if_property_missing()
step.context.regex = regex[regex_type]
step.context.properties.property(regex_type).should_match_regex(step.context.regex)


@step(u'And its value must be set by a variable')
@step(u'its value must be set by a variable')
def func(step):
step.context.resources.property('tags').property(step.context.tag).should_match_regex('\${var.(.*)}')


@step(r'with {proto} protocol and not port {port} for {cidr}')
@step(u'with {proto} protocol and not port {port:d} for {cidr:ANY}')
def func(step, proto, port, cidr):
proto = str(proto)
port = int(port)
Expand Down
4 changes: 3 additions & 1 deletion terraform_compliance/steps/terrain.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,6 @@

@before.each_feature
def load_terraform_data(feature):
world.config.terraform = Validator(world.config.user_data['tf_dir'])
v = Validator(world.config.user_data['tf_dir'])
v.enable_variable_expansion()
world.config.terraform = v

0 comments on commit c9ea49e

Please sign in to comment.