diff --git a/testcases/cloud_user/aws_sigv4/autoscaling_tests.py b/testcases/cloud_user/aws_sigv4/autoscaling_tests.py new file mode 100644 index 00000000..d3f3c629 --- /dev/null +++ b/testcases/cloud_user/aws_sigv4/autoscaling_tests.py @@ -0,0 +1,278 @@ +#!/usr/bin/env python +""" +Purpose: Testcase to demonstrate and confirm that AWS + Signature 4 is supported by Eucalyptus. + +Author: Harold Spencer Jr. (https://github.com/hspencer77) +""" +from eucaops import Eucaops +from eutester.eutestcase import EutesterTestCase +import datetime +import hashlib +import hmac +import requests +import xml.dom.minidom + + +class AutoScalingSigV4Test(EutesterTestCase): + def __init__(self, extra_args=None): + """ + Function to initialize testcase + for AWS SigV4 against AutoScaling Service + + param: ---credpath: path to directory + location of Eucalyptus credentials + """ + self.setuptestcase() + self.setup_parser() + self.parser.add_argument('--clean_on_exit', + action='store_true', default=True, + help=('Boolean, used to flag whether to' + + ' run clean up method after ' + + 'running test list')) + if extra_args: + for arg in extra_args: + self.parser.add_argument(arg) + self.get_args() + + self.tester = Eucaops(credpath=self.args.credpath, + config_file=self.args.config, + password=self.args.password) + + # Gather endpoint information for each region + self.regions = [] + for region in self.tester.ec2.get_all_regions(): + endpoint = str(region.endpoint) + as_endpoint = endpoint.replace('compute', + 'autoscaling') + region_info = {'name': str(region.name), + 'endpoint': as_endpoint} + self.regions.append(region_info) + + @classmethod + def assertEquals(cls, x, y, msg): + """ + Function to compare to values. + + param: x: first value to compare + param: y: second value to compare + param msg: additional message to add in case of error + """ + assert x == y, str(x) + ' is not equal to ' + str(y) + ': ' + msg + + def clean_method(self): + # Function to clean up artifacts + for region in self.regions: + del region + + def request_params(self): + """ + Function to return default request + parameters. + """ + method = 'GET' + service = 'autoscaling' + request_parameters = 'Action=DescribeAutoScalingGroups&Version=2011-01-01' + return (method, service, request_parameters) + + def sign(self, key, msg): + """ + Function to help create an signing key (HMAC). + For more information refer to + http://docs.aws.amazon.com/general/latest/ + gr/sigv4-calculate-signature.html + + param: key: key to use for signing + param: msg: sting to sign + """ + return hmac.new(key, msg.encode('utf-8'), hashlib.sha256).digest() + + def getSignatureKey(self, key, datestamp, region, service): + """ + Function to generate signing key. For more information, refer + to http://docs.aws.amazon.com/general/latest/ + gr/sigv4-calculate-signature.html. + + param: key: AWS secret key + param: datestamp: request date for generating signed date element + param: region: region for generating signed region element + param: service: service for generating signed service element + """ + sigDate = self.sign(('AWS4' + key).encode('utf-8'), datestamp) + sigRegion = self.sign(sigDate, region) + sigService = self.sign(sigRegion, service) + sigSigning = self.sign(sigService, 'aws4_request') + return sigSigning + + def createCanonicalRequest(self, method, request_parameters, + host, amzdate): + """ + Function to create canonical request. For more information, + refer to http://docs.aws.amazon.com/general/latest/ + gr/sigv4-create-canonical-request.html + + param: method: HTTP request method + param: request_parameters: parameters for canonical + query string + param: host: service endpoint + param: amzdate: date used to create the signature + """ + canonical_uri = '/' + canonical_querystring = request_parameters + canonical_headers = ('host:' + host + '\n' + + 'x-amz-date:' + + amzdate + '\n') + signed_headers = 'host;x-amz-date' + payload_hash = hashlib.sha256('').hexdigest() + canonical_request = (method + '\n' + + canonical_uri + '\n' + + canonical_querystring + '\n' + + canonical_headers + '\n' + + signed_headers + '\n' + + payload_hash) + return canonical_request + + def createStringtoSign(self, datestamp, region, + service, amzdate, canonical_request): + """ + Function to create string to sign for SigV4. For more + information refer to http://docs.aws.amazon.com/general/ + latest/gr/sigv4-create-string-to-sign.html. + + param: datestamp: request date + param: region: region name + param: service: AWS Service + param: amzdate: x-amz-date header value + param: canonical_request: canonical request to be + added to the meta information in the request + """ + algorithm = 'AWS4-HMAC-SHA256' + credential_scope = (datestamp + '/' + region + + '/' + service + '/' + + 'aws4_request') + string_to_sign = (algorithm + '\n' + + amzdate + '\n' + + credential_scope + '\n' + + hashlib.sha256(canonical_request).hexdigest()) + return string_to_sign + + def createAuthHeader(self, datestamp, region, signature, service): + """ + Function to create Authorization header for the Service request. + For more information refer to http://docs.aws.amazon.com/general/ + latest/gr/sigv4-add-signature-to-request.html. + + param: datestamp: request date + param: region: region name + param: signature: signature to add to header + param: service: AWS Service + """ + algorithm = 'AWS4-HMAC-SHA256' + credential_scope = (datestamp + '/' + + region + '/' + + service + '/' + + 'aws4_request') + signed_headers = 'host;x-amz-date' + authorization_header = (algorithm + ' ' + + 'Credential=' + + self.tester.ec2.aws_access_key_id + '/' + + credential_scope + ', ' + + 'SignedHeaders=' + signed_headers + ', ' + + 'Signature=' + signature) + return authorization_header + + def sigV4Test(self): + """ + Function to execute testcase that deomnstrates + support for AWS signature 4 for AutoScaling Service. + For more information please refer to + http://docs.aws.amazon.com/general/ + latest/gr/signature-version-4.html + """ + for region in self.regions: + # Grab information from request parameters + (method, service, request_parameters) = self.request_params() + # Strip http:// or https:// from endpoint + host = region['endpoint'].strip('http://https://') + + # Define amzdate and datestamp off current time + t = datetime.datetime.utcnow() + amzdate = t.strftime('%Y%m%dT%H%M%SZ') + datestamp = t.strftime('%Y%m%d') + + # Create canonical request + canonical_request = self.createCanonicalRequest(method, + request_parameters, + host, + amzdate) + # Create sting to sign + string_to_sign = self.createStringtoSign(datestamp, + region['name'], + service, + amzdate, + canonical_request) + # Create signing key for signature + signing_key = self.getSignatureKey( + self.tester.ec2.aws_secret_access_key, + datestamp, + region['name'], + service) + # Calculate the signature + signature = hmac.new(signing_key, + (string_to_sign).encode('utf-8'), + hashlib.sha256).hexdigest() + # Add signing information to Authorization Header + authorization_header = self.createAuthHeader(datestamp, + region['name'], + signature, + service) + headers = {'x-amz-date': amzdate, + 'Authorization': authorization_header} + request_url = region['endpoint'] + '?' + request_parameters + self.tester.info("\nAWS SigV4 Compute Test " + + "against " + region['name'] + + " endpoint") + self.tester.info("\nBEGIN REQUEST +++++++++++++++++++++++++++++++") + self.tester.info("Request URL = " + request_url) + # Perform request + try: + sigv4_request = requests.get(request_url, + headers=headers, + timeout=5) + except requests.exceptions.ConnectionError: + self.tester.debug("Connection error occurred using " + + "endpoint " + + region['endpoint']) + except requests.exceptions.Timeout: + self.tester.debug("Connection timeout occurred using " + + "endpoint " + + region['endpoint']) + except requests.exceptions.RequestException as e: + self.tester.debug("Exception occurred " + + "during 'GET' request using " + + request_url + + ": " + e.message) + raise e + # Process response into readable XML + sigv4_xml_resp = xml.dom.minidom.parseString(sigv4_request.text) + sigv4_response = sigv4_xml_resp.toprettyxml() + self.tester.info("\nRESPONSE ++++++++++++++++++++++++++++++++++++") + self.tester.info("Response code: {code}\n{response}".format( + code=sigv4_request.status_code, + response=sigv4_response)) + self.assertEquals(sigv4_request.status_code, + 200, + "AWS SigV4 Request Failed.") + +if __name__ == "__main__": + # Define AutoScalingSigV4Test testcase + testcase = AutoScalingSigV4Test() + list = ['sigV4Test'] + unit_list = [] + for test in list: + unit_list.append(testcase.create_testunit_by_name(test)) + # Execute testcase + result = testcase.run_test_case_list( + unit_list, + clean_on_exit=testcase.args.clean_on_exit) + exit(result) diff --git a/testcases/cloud_user/aws_sigv4/cloudformation_tests.py b/testcases/cloud_user/aws_sigv4/cloudformation_tests.py new file mode 100644 index 00000000..68d07291 --- /dev/null +++ b/testcases/cloud_user/aws_sigv4/cloudformation_tests.py @@ -0,0 +1,278 @@ +#!/usr/bin/env python +""" +Purpose: Testcase to demonstrate and confirm that AWS + Signature 4 is supported by Eucalyptus. + +Author: Harold Spencer Jr. (https://github.com/hspencer77) +""" +from eucaops import Eucaops +from eutester.eutestcase import EutesterTestCase +import datetime +import hashlib +import hmac +import requests +import xml.dom.minidom + + +class CloudformationSigV4Test(EutesterTestCase): + def __init__(self, extra_args=None): + """ + Function to initialize testcase + for AWS SigV4 against Cloudformation Service + + param: ---credpath: path to directory + location of Eucalyptus credentials + """ + self.setuptestcase() + self.setup_parser() + self.parser.add_argument('--clean_on_exit', + action='store_true', default=True, + help=('Boolean, used to flag whether to' + + ' run clean up method after ' + + 'running test list')) + if extra_args: + for arg in extra_args: + self.parser.add_argument(arg) + self.get_args() + + self.tester = Eucaops(credpath=self.args.credpath, + config_file=self.args.config, + password=self.args.password) + + # Gather endpoint information for each region + self.regions = [] + for region in self.tester.ec2.get_all_regions(): + endpoint = str(region.endpoint) + cfn_endpoint = endpoint.replace('compute', + 'cloudformation') + region_info = {'name': str(region.name), + 'endpoint': cfn_endpoint} + self.regions.append(region_info) + + @classmethod + def assertEquals(cls, x, y, msg): + """ + Function to compare to values. + + param: x: first value to compare + param: y: second value to compare + param msg: additional message to add in case of error + """ + assert x == y, str(x) + ' is not equal to ' + str(y) + ': ' + msg + + def clean_method(self): + # Function to clean up artifacts + for region in self.regions: + del region + + def request_params(self): + """ + Function to return default request + parameters. + """ + method = 'GET' + service = 'cloudformation' + request_parameters = 'Action=DescribeStacks&Version=2010-05-15' + return (method, service, request_parameters) + + def sign(self, key, msg): + """ + Function to help create an signing key (HMAC). + For more information refer to + http://docs.aws.amazon.com/general/latest/ + gr/sigv4-calculate-signature.html + + param: key: key to use for signing + param: msg: sting to sign + """ + return hmac.new(key, msg.encode('utf-8'), hashlib.sha256).digest() + + def getSignatureKey(self, key, datestamp, region, service): + """ + Function to generate signing key. For more information, refer + to http://docs.aws.amazon.com/general/latest/ + gr/sigv4-calculate-signature.html. + + param: key: AWS secret key + param: datestamp: request date for generating signed date element + param: region: region for generating signed region element + param: service: service for generating signed service element + """ + sigDate = self.sign(('AWS4' + key).encode('utf-8'), datestamp) + sigRegion = self.sign(sigDate, region) + sigService = self.sign(sigRegion, service) + sigSigning = self.sign(sigService, 'aws4_request') + return sigSigning + + def createCanonicalRequest(self, method, request_parameters, + host, amzdate): + """ + Function to create canonical request. For more information, + refer to http://docs.aws.amazon.com/general/latest/ + gr/sigv4-create-canonical-request.html + + param: method: HTTP request method + param: request_parameters: parameters for canonical + query string + param: host: service endpoint + param: amzdate: date used to create the signature + """ + canonical_uri = '/' + canonical_querystring = request_parameters + canonical_headers = ('host:' + host + '\n' + + 'x-amz-date:' + + amzdate + '\n') + signed_headers = 'host;x-amz-date' + payload_hash = hashlib.sha256('').hexdigest() + canonical_request = (method + '\n' + + canonical_uri + '\n' + + canonical_querystring + '\n' + + canonical_headers + '\n' + + signed_headers + '\n' + + payload_hash) + return canonical_request + + def createStringtoSign(self, datestamp, region, + service, amzdate, canonical_request): + """ + Function to create string to sign for SigV4. For more + information refer to http://docs.aws.amazon.com/general/ + latest/gr/sigv4-create-string-to-sign.html. + + param: datestamp: request date + param: region: region name + param: service: AWS Service + param: amzdate: x-amz-date header value + param: canonical_request: canonical request to be + added to the meta information in the request + """ + algorithm = 'AWS4-HMAC-SHA256' + credential_scope = (datestamp + '/' + region + + '/' + service + '/' + + 'aws4_request') + string_to_sign = (algorithm + '\n' + + amzdate + '\n' + + credential_scope + '\n' + + hashlib.sha256(canonical_request).hexdigest()) + return string_to_sign + + def createAuthHeader(self, datestamp, region, signature, service): + """ + Function to create Authorization header for the Service request. + For more information refer to http://docs.aws.amazon.com/general/ + latest/gr/sigv4-add-signature-to-request.html. + + param: datestamp: request date + param: region: region name + param: signature: signature to add to header + param: service: AWS Service + """ + algorithm = 'AWS4-HMAC-SHA256' + credential_scope = (datestamp + '/' + + region + '/' + + service + '/' + + 'aws4_request') + signed_headers = 'host;x-amz-date' + authorization_header = (algorithm + ' ' + + 'Credential=' + + self.tester.ec2.aws_access_key_id + '/' + + credential_scope + ', ' + + 'SignedHeaders=' + signed_headers + ', ' + + 'Signature=' + signature) + return authorization_header + + def sigV4Test(self): + """ + Function to execute testcase that deomnstrates + support for AWS signature 4 for Cloudformation Service. + For more information please refer to + http://docs.aws.amazon.com/general/ + latest/gr/signature-version-4.html + """ + for region in self.regions: + # Grab information from request parameters + (method, service, request_parameters) = self.request_params() + # Strip http:// or https:// from endpoint + host = region['endpoint'].strip('http://https://') + + # Define amzdate and datestamp off current time + t = datetime.datetime.utcnow() + amzdate = t.strftime('%Y%m%dT%H%M%SZ') + datestamp = t.strftime('%Y%m%d') + + # Create canonical request + canonical_request = self.createCanonicalRequest(method, + request_parameters, + host, + amzdate) + # Create sting to sign + string_to_sign = self.createStringtoSign(datestamp, + region['name'], + service, + amzdate, + canonical_request) + # Create signing key for signature + signing_key = self.getSignatureKey( + self.tester.ec2.aws_secret_access_key, + datestamp, + region['name'], + service) + # Calculate the signature + signature = hmac.new(signing_key, + (string_to_sign).encode('utf-8'), + hashlib.sha256).hexdigest() + # Add signing information to Authorization Header + authorization_header = self.createAuthHeader(datestamp, + region['name'], + signature, + service) + headers = {'x-amz-date': amzdate, + 'Authorization': authorization_header} + request_url = region['endpoint'] + '?' + request_parameters + self.tester.info("\nAWS SigV4 Compute Test " + + "against " + region['name'] + + " endpoint") + self.tester.info("\nBEGIN REQUEST +++++++++++++++++++++++++++++++") + self.tester.info("Request URL = " + request_url) + # Perform request + try: + sigv4_request = requests.get(request_url, + headers=headers, + timeout=5) + except requests.exceptions.ConnectionError: + self.tester.debug("Connection error occurred using " + + "endpoint " + + region['endpoint']) + except requests.exceptions.Timeout: + self.tester.debug("Connection timeout occurred using " + + "endpoint " + + region['endpoint']) + except requests.exceptions.RequestException as e: + self.tester.debug("Exception occurred " + + "during 'GET' request using " + + request_url + + ": " + e.message) + raise e + # Process response into readable XML + sigv4_xml_resp = xml.dom.minidom.parseString(sigv4_request.text) + sigv4_response = sigv4_xml_resp.toprettyxml() + self.tester.info("\nRESPONSE ++++++++++++++++++++++++++++++++++++") + self.tester.info("Response code: {code}\n{response}".format( + code=sigv4_request.status_code, + response=sigv4_response)) + self.assertEquals(sigv4_request.status_code, + 200, + "AWS SigV4 Request Failed.") + +if __name__ == "__main__": + # Define CloudformationSigV4Test testcase + testcase = CloudformationSigV4Test() + list = ['sigV4Test'] + unit_list = [] + for test in list: + unit_list.append(testcase.create_testunit_by_name(test)) + # Execute testcase + result = testcase.run_test_case_list( + unit_list, + clean_on_exit=testcase.args.clean_on_exit) + exit(result) diff --git a/testcases/cloud_user/aws_sigv4/cloudwatch_tests.py b/testcases/cloud_user/aws_sigv4/cloudwatch_tests.py new file mode 100644 index 00000000..7793930f --- /dev/null +++ b/testcases/cloud_user/aws_sigv4/cloudwatch_tests.py @@ -0,0 +1,278 @@ +#!/usr/bin/env python +""" +Purpose: Testcase to demonstrate and confirm that AWS + Signature 4 is supported by Eucalyptus. + +Author: Harold Spencer Jr. (https://github.com/hspencer77) +""" +from eucaops import Eucaops +from eutester.eutestcase import EutesterTestCase +import datetime +import hashlib +import hmac +import requests +import xml.dom.minidom + + +class CloudWatchSigV4Test(EutesterTestCase): + def __init__(self, extra_args=None): + """ + Function to initialize testcase + for AWS SigV4 against CloudWatch Service + + param: ---credpath: path to directory + location of Eucalyptus credentials + """ + self.setuptestcase() + self.setup_parser() + self.parser.add_argument('--clean_on_exit', + action='store_true', default=True, + help=('Boolean, used to flag whether to' + + ' run clean up method after ' + + 'running test list')) + if extra_args: + for arg in extra_args: + self.parser.add_argument(arg) + self.get_args() + + self.tester = Eucaops(credpath=self.args.credpath, + config_file=self.args.config, + password=self.args.password) + + # Gather endpoint information for each region + self.regions = [] + for region in self.tester.ec2.get_all_regions(): + endpoint = str(region.endpoint) + cw_endpoint = endpoint.replace('compute', + 'monitoring') + region_info = {'name': str(region.name), + 'endpoint': cw_endpoint} + self.regions.append(region_info) + + @classmethod + def assertEquals(cls, x, y, msg): + """ + Function to compare to values. + + param: x: first value to compare + param: y: second value to compare + param msg: additional message to add in case of error + """ + assert x == y, str(x) + ' is not equal to ' + str(y) + ': ' + msg + + def clean_method(self): + # Function to clean up artifacts + for region in self.regions: + del region + + def request_params(self): + """ + Function to return default request + parameters. + """ + method = 'GET' + service = 'cloudwatch' + request_parameters = 'Action=ListMetrics&Version=2010-08-01' + return (method, service, request_parameters) + + def sign(self, key, msg): + """ + Function to help create an signing key (HMAC). + For more information refer to + http://docs.aws.amazon.com/general/latest/ + gr/sigv4-calculate-signature.html + + param: key: key to use for signing + param: msg: sting to sign + """ + return hmac.new(key, msg.encode('utf-8'), hashlib.sha256).digest() + + def getSignatureKey(self, key, datestamp, region, service): + """ + Function to generate signing key. For more information, refer + to http://docs.aws.amazon.com/general/latest/ + gr/sigv4-calculate-signature.html. + + param: key: AWS secret key + param: datestamp: request date for generating signed date element + param: region: region for generating signed region element + param: service: service for generating signed service element + """ + sigDate = self.sign(('AWS4' + key).encode('utf-8'), datestamp) + sigRegion = self.sign(sigDate, region) + sigService = self.sign(sigRegion, service) + sigSigning = self.sign(sigService, 'aws4_request') + return sigSigning + + def createCanonicalRequest(self, method, request_parameters, + host, amzdate): + """ + Function to create canonical request. For more information, + refer to http://docs.aws.amazon.com/general/latest/ + gr/sigv4-create-canonical-request.html + + param: method: HTTP request method + param: request_parameters: parameters for canonical + query string + param: host: service endpoint + param: amzdate: date used to create the signature + """ + canonical_uri = '/' + canonical_querystring = request_parameters + canonical_headers = ('host:' + host + '\n' + + 'x-amz-date:' + + amzdate + '\n') + signed_headers = 'host;x-amz-date' + payload_hash = hashlib.sha256('').hexdigest() + canonical_request = (method + '\n' + + canonical_uri + '\n' + + canonical_querystring + '\n' + + canonical_headers + '\n' + + signed_headers + '\n' + + payload_hash) + return canonical_request + + def createStringtoSign(self, datestamp, region, + service, amzdate, canonical_request): + """ + Function to create string to sign for SigV4. For more + information refer to http://docs.aws.amazon.com/general/ + latest/gr/sigv4-create-string-to-sign.html. + + param: datestamp: request date + param: region: region name + param: service: AWS Service + param: amzdate: x-amz-date header value + param: canonical_request: canonical request to be + added to the meta information in the request + """ + algorithm = 'AWS4-HMAC-SHA256' + credential_scope = (datestamp + '/' + region + + '/' + service + '/' + + 'aws4_request') + string_to_sign = (algorithm + '\n' + + amzdate + '\n' + + credential_scope + '\n' + + hashlib.sha256(canonical_request).hexdigest()) + return string_to_sign + + def createAuthHeader(self, datestamp, region, signature, service): + """ + Function to create Authorization header for the Service request. + For more information refer to http://docs.aws.amazon.com/general/ + latest/gr/sigv4-add-signature-to-request.html. + + param: datestamp: request date + param: region: region name + param: signature: signature to add to header + param: service: AWS Service + """ + algorithm = 'AWS4-HMAC-SHA256' + credential_scope = (datestamp + '/' + + region + '/' + + service + '/' + + 'aws4_request') + signed_headers = 'host;x-amz-date' + authorization_header = (algorithm + ' ' + + 'Credential=' + + self.tester.ec2.aws_access_key_id + '/' + + credential_scope + ', ' + + 'SignedHeaders=' + signed_headers + ', ' + + 'Signature=' + signature) + return authorization_header + + def sigV4Test(self): + """ + Function to execute testcase that deomnstrates + support for AWS signature 4 for CloudWatch Service. + For more information please refer to + http://docs.aws.amazon.com/general/ + latest/gr/signature-version-4.html + """ + for region in self.regions: + # Grab information from request parameters + (method, service, request_parameters) = self.request_params() + # Strip http:// or https:// from endpoint + host = region['endpoint'].strip('http://https://') + + # Define amzdate and datestamp off current time + t = datetime.datetime.utcnow() + amzdate = t.strftime('%Y%m%dT%H%M%SZ') + datestamp = t.strftime('%Y%m%d') + + # Create canonical request + canonical_request = self.createCanonicalRequest(method, + request_parameters, + host, + amzdate) + # Create sting to sign + string_to_sign = self.createStringtoSign(datestamp, + region['name'], + service, + amzdate, + canonical_request) + # Create signing key for signature + signing_key = self.getSignatureKey( + self.tester.ec2.aws_secret_access_key, + datestamp, + region['name'], + service) + # Calculate the signature + signature = hmac.new(signing_key, + (string_to_sign).encode('utf-8'), + hashlib.sha256).hexdigest() + # Add signing information to Authorization Header + authorization_header = self.createAuthHeader(datestamp, + region['name'], + signature, + service) + headers = {'x-amz-date': amzdate, + 'Authorization': authorization_header} + request_url = region['endpoint'] + '?' + request_parameters + self.tester.info("\nAWS SigV4 Compute Test " + + "against " + region['name'] + + " endpoint") + self.tester.info("\nBEGIN REQUEST +++++++++++++++++++++++++++++++") + self.tester.info("Request URL = " + request_url) + # Perform request + try: + sigv4_request = requests.get(request_url, + headers=headers, + timeout=5) + except requests.exceptions.ConnectionError: + self.tester.debug("Connection error occurred using " + + "endpoint " + + region['endpoint']) + except requests.exceptions.Timeout: + self.tester.debug("Connection timeout occurred using " + + "endpoint " + + region['endpoint']) + except requests.exceptions.RequestException as e: + self.tester.debug("Exception occurred " + + "during 'GET' request using " + + request_url + + ": " + e.message) + raise e + # Process response into readable XML + sigv4_xml_resp = xml.dom.minidom.parseString(sigv4_request.text) + sigv4_response = sigv4_xml_resp.toprettyxml() + self.tester.info("\nRESPONSE ++++++++++++++++++++++++++++++++++++") + self.tester.info("Response code: {code}\n{response}".format( + code=sigv4_request.status_code, + response=sigv4_response)) + self.assertEquals(sigv4_request.status_code, + 200, + "AWS SigV4 Request Failed.") + +if __name__ == "__main__": + # Define CloudWatchSigV4Test testcase + testcase = CloudWatchSigV4Test() + list = ['sigV4Test'] + unit_list = [] + for test in list: + unit_list.append(testcase.create_testunit_by_name(test)) + # Execute testcase + result = testcase.run_test_case_list( + unit_list, + clean_on_exit=testcase.args.clean_on_exit) + exit(result) diff --git a/testcases/cloud_user/aws_sigv4/compute_tests.py b/testcases/cloud_user/aws_sigv4/compute_tests.py new file mode 100644 index 00000000..417a135f --- /dev/null +++ b/testcases/cloud_user/aws_sigv4/compute_tests.py @@ -0,0 +1,275 @@ +#!/usr/bin/env python +""" +Purpose: Testcase to demonstrate and confirm that AWS + Signature 4 is supported by Eucalyptus. + +Author: Harold Spencer Jr. (https://github.com/hspencer77) +""" +from eucaops import Eucaops +from eutester.eutestcase import EutesterTestCase +import datetime +import hashlib +import hmac +import requests +import xml.dom.minidom + + +class ComputeSigV4Test(EutesterTestCase): + def __init__(self, extra_args=None): + """ + Function to initialize testcase + for AWS SigV4 against Compute (EC2) Service + + param: ---credpath: path to directory + location of Eucalyptus credentials + """ + self.setuptestcase() + self.setup_parser() + self.parser.add_argument('--clean_on_exit', + action='store_true', default=True, + help=('Boolean, used to flag whether to' + + ' run clean up method after ' + + 'running test list')) + if extra_args: + for arg in extra_args: + self.parser.add_argument(arg) + self.get_args() + + self.tester = Eucaops(credpath=self.args.credpath, + config_file=self.args.config, + password=self.args.password) + + # Gather endpoint information for each region + self.regions = [] + for region in self.tester.ec2.get_all_regions(): + region_info = {'name': str(region.name), + 'endpoint': str(region.endpoint)} + self.regions.append(region_info) + + @classmethod + def assertEquals(cls, x, y, msg): + """ + Function to compare to values. + + param: x: first value to compare + param: y: second value to compare + param msg: additional message to add in case of error + """ + assert x == y, str(x) + ' is not equal to ' + str(y) + ': ' + msg + + def clean_method(self): + # Function to clean up artifacts + for region in self.regions: + del region + + def request_params(self): + """ + Function to return default request + parameters. + """ + method = 'GET' + service = 'ec2' + request_parameters = 'Action=DescribeRegions&Version=2013-10-15' + return (method, service, request_parameters) + + def sign(self, key, msg): + """ + Function to help create an signing key (HMAC). + For more information refer to + http://docs.aws.amazon.com/general/latest/ + gr/sigv4-calculate-signature.html + + param: key: key to use for signing + param: msg: sting to sign + """ + return hmac.new(key, msg.encode('utf-8'), hashlib.sha256).digest() + + def getSignatureKey(self, key, datestamp, region, service): + """ + Function to generate signing key. For more information, refer + to http://docs.aws.amazon.com/general/latest/ + gr/sigv4-calculate-signature.html. + + param: key: AWS secret key + param: datestamp: request date for generating signed date element + param: region: region for generating signed region element + param: service: service for generating signed service element + """ + sigDate = self.sign(('AWS4' + key).encode('utf-8'), datestamp) + sigRegion = self.sign(sigDate, region) + sigService = self.sign(sigRegion, service) + sigSigning = self.sign(sigService, 'aws4_request') + return sigSigning + + def createCanonicalRequest(self, method, request_parameters, + host, amzdate): + """ + Function to create canonical request. For more information, + refer to http://docs.aws.amazon.com/general/latest/ + gr/sigv4-create-canonical-request.html + + param: method: HTTP request method + param: request_parameters: parameters for canonical + query string + param: host: service endpoint + param: amzdate: date used to create the signature + """ + canonical_uri = '/' + canonical_querystring = request_parameters + canonical_headers = ('host:' + host + '\n' + + 'x-amz-date:' + + amzdate + '\n') + signed_headers = 'host;x-amz-date' + payload_hash = hashlib.sha256('').hexdigest() + canonical_request = (method + '\n' + + canonical_uri + '\n' + + canonical_querystring + '\n' + + canonical_headers + '\n' + + signed_headers + '\n' + + payload_hash) + return canonical_request + + def createStringtoSign(self, datestamp, region, + service, amzdate, canonical_request): + """ + Function to create string to sign for SigV4. For more + information refer to http://docs.aws.amazon.com/general/ + latest/gr/sigv4-create-string-to-sign.html. + + param: datestamp: request date + param: region: region name + param: service: AWS Service + param: amzdate: x-amz-date header value + param: canonical_request: canonical request to be + added to the meta information in the request + """ + algorithm = 'AWS4-HMAC-SHA256' + credential_scope = (datestamp + '/' + region + + '/' + service + '/' + + 'aws4_request') + string_to_sign = (algorithm + '\n' + + amzdate + '\n' + + credential_scope + '\n' + + hashlib.sha256(canonical_request).hexdigest()) + return string_to_sign + + def createAuthHeader(self, datestamp, region, signature, service): + """ + Function to create Authorization header for the Service request. + For more information refer to http://docs.aws.amazon.com/general/ + latest/gr/sigv4-add-signature-to-request.html. + + param: datestamp: request date + param: region: region name + param: signature: signature to add to header + param: service: AWS Service + """ + algorithm = 'AWS4-HMAC-SHA256' + credential_scope = (datestamp + '/' + + region + '/' + + service + '/' + + 'aws4_request') + signed_headers = 'host;x-amz-date' + authorization_header = (algorithm + ' ' + + 'Credential=' + + self.tester.ec2.aws_access_key_id + '/' + + credential_scope + ', ' + + 'SignedHeaders=' + signed_headers + ', ' + + 'Signature=' + signature) + return authorization_header + + def sigV4Test(self): + """ + Function to execute testcase that deomnstrates + support for AWS signature 4 for Compute (EC2) Service. + For more information please refer to + http://docs.aws.amazon.com/general/ + latest/gr/signature-version-4.html + """ + for region in self.regions: + # Grab information from request parameters + (method, service, request_parameters) = self.request_params() + # Strip http:// from endpoint + host = region['endpoint'].strip('http://https://') + + # Define amzdate and datestamp off current time + t = datetime.datetime.utcnow() + amzdate = t.strftime('%Y%m%dT%H%M%SZ') + datestamp = t.strftime('%Y%m%d') + + # Create canonical request + canonical_request = self.createCanonicalRequest(method, + request_parameters, + host, + amzdate) + # Create sting to sign + string_to_sign = self.createStringtoSign(datestamp, + region['name'], + service, + amzdate, + canonical_request) + # Create signing key for signature + signing_key = self.getSignatureKey( + self.tester.ec2.aws_secret_access_key, + datestamp, + region['name'], + service) + # Calculate the signature + signature = hmac.new(signing_key, + (string_to_sign).encode('utf-8'), + hashlib.sha256).hexdigest() + # Add signing information to Authorization Header + authorization_header = self.createAuthHeader(datestamp, + region['name'], + signature, + service) + headers = {'x-amz-date': amzdate, + 'Authorization': authorization_header} + request_url = region['endpoint'] + '?' + request_parameters + self.tester.info("\nAWS SigV4 Compute Test " + + "against " + region['name'] + + " endpoint") + self.tester.info("\nBEGIN REQUEST +++++++++++++++++++++++++++++++") + self.tester.info("Request URL = " + request_url) + # Perform request + try: + sigv4_request = requests.get(request_url, + headers=headers, + timeout=5) + except requests.exceptions.ConnectionError: + self.tester.debug("Connection error occurred using " + + "endpoint " + + region['endpoint']) + except requests.exceptions.Timeout: + self.tester.debug("Connection timeout occurred using " + + "endpoint " + + region['endpoint']) + except requests.exceptions.RequestException as e: + self.tester.debug("Exception occurred " + + "during 'GET' request using " + + request_url + + ": " + e.message) + raise e + # Process response into readable XML + sigv4_xml_resp = xml.dom.minidom.parseString(sigv4_request.text) + sigv4_response = sigv4_xml_resp.toprettyxml() + self.tester.info("\nRESPONSE ++++++++++++++++++++++++++++++++++++") + self.tester.info("Response code: {code}\n{response}".format( + code=sigv4_request.status_code, + response=sigv4_response)) + self.assertEquals(sigv4_request.status_code, + 200, + "AWS SigV4 Request Failed.") + +if __name__ == "__main__": + # Define ComputeSigV4Test testcase + testcase = ComputeSigV4Test() + list = ['sigV4Test'] + unit_list = [] + for test in list: + unit_list.append(testcase.create_testunit_by_name(test)) + # Execute testcase + result = testcase.run_test_case_list( + unit_list, + clean_on_exit=testcase.args.clean_on_exit) + exit(result) diff --git a/testcases/cloud_user/aws_sigv4/elasticloadbalancing_tests.py b/testcases/cloud_user/aws_sigv4/elasticloadbalancing_tests.py new file mode 100644 index 00000000..b9733608 --- /dev/null +++ b/testcases/cloud_user/aws_sigv4/elasticloadbalancing_tests.py @@ -0,0 +1,278 @@ +#!/usr/bin/env python +""" +Purpose: Testcase to demonstrate and confirm that AWS + Signature 4 is supported by Eucalyptus. + +Author: Harold Spencer Jr. (https://github.com/hspencer77) +""" +from eucaops import Eucaops +from eutester.eutestcase import EutesterTestCase +import datetime +import hashlib +import hmac +import requests +import xml.dom.minidom + + +class ElasticLoadBalancingSigV4Test(EutesterTestCase): + def __init__(self, extra_args=None): + """ + Function to initialize testcase + for AWS SigV4 against ElasticLoadBalancing Service + + param: ---credpath: path to directory + location of Eucalyptus credentials + """ + self.setuptestcase() + self.setup_parser() + self.parser.add_argument('--clean_on_exit', + action='store_true', default=True, + help=('Boolean, used to flag whether to' + + ' run clean up method after ' + + 'running test list')) + if extra_args: + for arg in extra_args: + self.parser.add_argument(arg) + self.get_args() + + self.tester = Eucaops(credpath=self.args.credpath, + config_file=self.args.config, + password=self.args.password) + + # Gather endpoint information for each region + self.regions = [] + for region in self.tester.ec2.get_all_regions(): + endpoint = str(region.endpoint) + elb_endpoint = endpoint.replace('compute', + 'loadbalancing') + region_info = {'name': str(region.name), + 'endpoint': elb_endpoint} + self.regions.append(region_info) + + @classmethod + def assertEquals(cls, x, y, msg): + """ + Function to compare to values. + + param: x: first value to compare + param: y: second value to compare + param msg: additional message to add in case of error + """ + assert x == y, str(x) + ' is not equal to ' + str(y) + ': ' + msg + + def clean_method(self): + # Function to clean up artifacts + for region in self.regions: + del region + + def request_params(self): + """ + Function to return default request + parameters. + """ + method = 'GET' + service = 'elasticloadbalancing' + request_parameters = 'Action=DescribeLoadBalancerPolicyTypes&Version=2012-06-01' + return (method, service, request_parameters) + + def sign(self, key, msg): + """ + Function to help create an signing key (HMAC). + For more information refer to + http://docs.aws.amazon.com/general/latest/ + gr/sigv4-calculate-signature.html + + param: key: key to use for signing + param: msg: sting to sign + """ + return hmac.new(key, msg.encode('utf-8'), hashlib.sha256).digest() + + def getSignatureKey(self, key, datestamp, region, service): + """ + Function to generate signing key. For more information, refer + to http://docs.aws.amazon.com/general/latest/ + gr/sigv4-calculate-signature.html. + + param: key: AWS secret key + param: datestamp: request date for generating signed date element + param: region: region for generating signed region element + param: service: service for generating signed service element + """ + sigDate = self.sign(('AWS4' + key).encode('utf-8'), datestamp) + sigRegion = self.sign(sigDate, region) + sigService = self.sign(sigRegion, service) + sigSigning = self.sign(sigService, 'aws4_request') + return sigSigning + + def createCanonicalRequest(self, method, request_parameters, + host, amzdate): + """ + Function to create canonical request. For more information, + refer to http://docs.aws.amazon.com/general/latest/ + gr/sigv4-create-canonical-request.html + + param: method: HTTP request method + param: request_parameters: parameters for canonical + query string + param: host: service endpoint + param: amzdate: date used to create the signature + """ + canonical_uri = '/' + canonical_querystring = request_parameters + canonical_headers = ('host:' + host + '\n' + + 'x-amz-date:' + + amzdate + '\n') + signed_headers = 'host;x-amz-date' + payload_hash = hashlib.sha256('').hexdigest() + canonical_request = (method + '\n' + + canonical_uri + '\n' + + canonical_querystring + '\n' + + canonical_headers + '\n' + + signed_headers + '\n' + + payload_hash) + return canonical_request + + def createStringtoSign(self, datestamp, region, + service, amzdate, canonical_request): + """ + Function to create string to sign for SigV4. For more + information refer to http://docs.aws.amazon.com/general/ + latest/gr/sigv4-create-string-to-sign.html. + + param: datestamp: request date + param: region: region name + param: service: AWS Service + param: amzdate: x-amz-date header value + param: canonical_request: canonical request to be + added to the meta information in the request + """ + algorithm = 'AWS4-HMAC-SHA256' + credential_scope = (datestamp + '/' + region + + '/' + service + '/' + + 'aws4_request') + string_to_sign = (algorithm + '\n' + + amzdate + '\n' + + credential_scope + '\n' + + hashlib.sha256(canonical_request).hexdigest()) + return string_to_sign + + def createAuthHeader(self, datestamp, region, signature, service): + """ + Function to create Authorization header for the Service request. + For more information refer to http://docs.aws.amazon.com/general/ + latest/gr/sigv4-add-signature-to-request.html. + + param: datestamp: request date + param: region: region name + param: signature: signature to add to header + param: service: AWS Service + """ + algorithm = 'AWS4-HMAC-SHA256' + credential_scope = (datestamp + '/' + + region + '/' + + service + '/' + + 'aws4_request') + signed_headers = 'host;x-amz-date' + authorization_header = (algorithm + ' ' + + 'Credential=' + + self.tester.ec2.aws_access_key_id + '/' + + credential_scope + ', ' + + 'SignedHeaders=' + signed_headers + ', ' + + 'Signature=' + signature) + return authorization_header + + def sigV4Test(self): + """ + Function to execute testcase that deomnstrates + support for AWS signature 4 for ElasticLoadBalancing Service. + For more information please refer to + http://docs.aws.amazon.com/general/ + latest/gr/signature-version-4.html + """ + for region in self.regions: + # Grab information from request parameters + (method, service, request_parameters) = self.request_params() + # Strip http:// or https:// from endpoint + host = region['endpoint'].strip('http://https://') + + # Define amzdate and datestamp off current time + t = datetime.datetime.utcnow() + amzdate = t.strftime('%Y%m%dT%H%M%SZ') + datestamp = t.strftime('%Y%m%d') + + # Create canonical request + canonical_request = self.createCanonicalRequest(method, + request_parameters, + host, + amzdate) + # Create sting to sign + string_to_sign = self.createStringtoSign(datestamp, + region['name'], + service, + amzdate, + canonical_request) + # Create signing key for signature + signing_key = self.getSignatureKey( + self.tester.ec2.aws_secret_access_key, + datestamp, + region['name'], + service) + # Calculate the signature + signature = hmac.new(signing_key, + (string_to_sign).encode('utf-8'), + hashlib.sha256).hexdigest() + # Add signing information to Authorization Header + authorization_header = self.createAuthHeader(datestamp, + region['name'], + signature, + service) + headers = {'x-amz-date': amzdate, + 'Authorization': authorization_header} + request_url = region['endpoint'] + '?' + request_parameters + self.tester.info("\nAWS SigV4 Compute Test " + + "against " + region['name'] + + " endpoint") + self.tester.info("\nBEGIN REQUEST +++++++++++++++++++++++++++++++") + self.tester.info("Request URL = " + request_url) + # Perform request + try: + sigv4_request = requests.get(request_url, + headers=headers, + timeout=5) + except requests.exceptions.ConnectionError: + self.tester.debug("Connection error occurred using " + + "endpoint " + + region['endpoint']) + except requests.exceptions.Timeout: + self.tester.debug("Connection timeout occurred using " + + "endpoint " + + region['endpoint']) + except requests.exceptions.RequestException as e: + self.tester.debug("Exception occurred " + + "during 'GET' request using " + + request_url + + ": " + e.message) + raise e + # Process response into readable XML + sigv4_xml_resp = xml.dom.minidom.parseString(sigv4_request.text) + sigv4_response = sigv4_xml_resp.toprettyxml() + self.tester.info("\nRESPONSE ++++++++++++++++++++++++++++++++++++") + self.tester.info("Response code: {code}\n{response}".format( + code=sigv4_request.status_code, + response=sigv4_response)) + self.assertEquals(sigv4_request.status_code, + 200, + "AWS SigV4 Request Failed.") + +if __name__ == "__main__": + # Define ElasticLoadBalancingSigV4Test testcase + testcase = ElasticLoadBalancingSigV4Test() + list = ['sigV4Test'] + unit_list = [] + for test in list: + unit_list.append(testcase.create_testunit_by_name(test)) + # Execute testcase + result = testcase.run_test_case_list( + unit_list, + clean_on_exit=testcase.args.clean_on_exit) + exit(result) diff --git a/testcases/cloud_user/aws_sigv4/iam_tests.py b/testcases/cloud_user/aws_sigv4/iam_tests.py new file mode 100644 index 00000000..19d1d05d --- /dev/null +++ b/testcases/cloud_user/aws_sigv4/iam_tests.py @@ -0,0 +1,278 @@ +#!/usr/bin/env python +""" +Purpose: Testcase to demonstrate and confirm that AWS + Signature 4 is supported by Eucalyptus. + +Author: Harold Spencer Jr. (https://github.com/hspencer77) +""" +from eucaops import Eucaops +from eutester.eutestcase import EutesterTestCase +import datetime +import hashlib +import hmac +import requests +import xml.dom.minidom + + +class IAMSigV4Test(EutesterTestCase): + def __init__(self, extra_args=None): + """ + Function to initialize testcase + for AWS SigV4 against IAM Service + + param: ---credpath: path to directory + location of Eucalyptus credentials + """ + self.setuptestcase() + self.setup_parser() + self.parser.add_argument('--clean_on_exit', + action='store_true', default=True, + help=('Boolean, used to flag whether to' + + ' run clean up method after ' + + 'running test list')) + if extra_args: + for arg in extra_args: + self.parser.add_argument(arg) + self.get_args() + + self.tester = Eucaops(credpath=self.args.credpath, + config_file=self.args.config, + password=self.args.password) + + # Gather endpoint information for each region + self.regions = [] + for region in self.tester.ec2.get_all_regions(): + endpoint = str(region.endpoint) + iam_endpoint = endpoint.replace('compute', + 'euare') + region_info = {'name': str(region.name), + 'endpoint': iam_endpoint} + self.regions.append(region_info) + + @classmethod + def assertEquals(cls, x, y, msg): + """ + Function to compare to values. + + param: x: first value to compare + param: y: second value to compare + param msg: additional message to add in case of error + """ + assert x == y, str(x) + ' is not equal to ' + str(y) + ': ' + msg + + def clean_method(self): + # Function to clean up artifacts + for region in self.regions: + del region + + def request_params(self): + """ + Function to return default request + parameters. + """ + method = 'GET' + service = 'iam' + request_parameters = 'Action=GetAccountSummary&Version=2010-05-08' + return (method, service, request_parameters) + + def sign(self, key, msg): + """ + Function to help create an signing key (HMAC). + For more information refer to + http://docs.aws.amazon.com/general/latest/ + gr/sigv4-calculate-signature.html + + param: key: key to use for signing + param: msg: sting to sign + """ + return hmac.new(key, msg.encode('utf-8'), hashlib.sha256).digest() + + def getSignatureKey(self, key, datestamp, region, service): + """ + Function to generate signing key. For more information, refer + to http://docs.aws.amazon.com/general/latest/ + gr/sigv4-calculate-signature.html. + + param: key: AWS secret key + param: datestamp: request date for generating signed date element + param: region: region for generating signed region element + param: service: service for generating signed service element + """ + sigDate = self.sign(('AWS4' + key).encode('utf-8'), datestamp) + sigRegion = self.sign(sigDate, region) + sigService = self.sign(sigRegion, service) + sigSigning = self.sign(sigService, 'aws4_request') + return sigSigning + + def createCanonicalRequest(self, method, request_parameters, + host, amzdate): + """ + Function to create canonical request. For more information, + refer to http://docs.aws.amazon.com/general/latest/ + gr/sigv4-create-canonical-request.html + + param: method: HTTP request method + param: request_parameters: parameters for canonical + query string + param: host: service endpoint + param: amzdate: date used to create the signature + """ + canonical_uri = '/' + canonical_querystring = request_parameters + canonical_headers = ('host:' + host + '\n' + + 'x-amz-date:' + + amzdate + '\n') + signed_headers = 'host;x-amz-date' + payload_hash = hashlib.sha256('').hexdigest() + canonical_request = (method + '\n' + + canonical_uri + '\n' + + canonical_querystring + '\n' + + canonical_headers + '\n' + + signed_headers + '\n' + + payload_hash) + return canonical_request + + def createStringtoSign(self, datestamp, region, + service, amzdate, canonical_request): + """ + Function to create string to sign for SigV4. For more + information refer to http://docs.aws.amazon.com/general/ + latest/gr/sigv4-create-string-to-sign.html. + + param: datestamp: request date + param: region: region name + param: service: AWS Service + param: amzdate: x-amz-date header value + param: canonical_request: canonical request to be + added to the meta information in the request + """ + algorithm = 'AWS4-HMAC-SHA256' + credential_scope = (datestamp + '/' + region + + '/' + service + '/' + + 'aws4_request') + string_to_sign = (algorithm + '\n' + + amzdate + '\n' + + credential_scope + '\n' + + hashlib.sha256(canonical_request).hexdigest()) + return string_to_sign + + def createAuthHeader(self, datestamp, region, signature, service): + """ + Function to create Authorization header for the Service request. + For more information refer to http://docs.aws.amazon.com/general/ + latest/gr/sigv4-add-signature-to-request.html. + + param: datestamp: request date + param: region: region name + param: signature: signature to add to header + param: service: AWS Service + """ + algorithm = 'AWS4-HMAC-SHA256' + credential_scope = (datestamp + '/' + + region + '/' + + service + '/' + + 'aws4_request') + signed_headers = 'host;x-amz-date' + authorization_header = (algorithm + ' ' + + 'Credential=' + + self.tester.ec2.aws_access_key_id + '/' + + credential_scope + ', ' + + 'SignedHeaders=' + signed_headers + ', ' + + 'Signature=' + signature) + return authorization_header + + def sigV4Test(self): + """ + Function to execute testcase that deomnstrates + support for AWS signature 4 for IAM Service. + For more information please refer to + http://docs.aws.amazon.com/general/ + latest/gr/signature-version-4.html + """ + for region in self.regions: + # Grab information from request parameters + (method, service, request_parameters) = self.request_params() + # Strip http:// or https:// from endpoint + host = region['endpoint'].strip('http://https://') + + # Define amzdate and datestamp off current time + t = datetime.datetime.utcnow() + amzdate = t.strftime('%Y%m%dT%H%M%SZ') + datestamp = t.strftime('%Y%m%d') + + # Create canonical request + canonical_request = self.createCanonicalRequest(method, + request_parameters, + host, + amzdate) + # Create sting to sign + string_to_sign = self.createStringtoSign(datestamp, + region['name'], + service, + amzdate, + canonical_request) + # Create signing key for signature + signing_key = self.getSignatureKey( + self.tester.ec2.aws_secret_access_key, + datestamp, + region['name'], + service) + # Calculate the signature + signature = hmac.new(signing_key, + (string_to_sign).encode('utf-8'), + hashlib.sha256).hexdigest() + # Add signing information to Authorization Header + authorization_header = self.createAuthHeader(datestamp, + region['name'], + signature, + service) + headers = {'x-amz-date': amzdate, + 'Authorization': authorization_header} + request_url = region['endpoint'] + '?' + request_parameters + self.tester.info("\nAWS SigV4 Compute Test " + + "against " + region['name'] + + " endpoint") + self.tester.info("\nBEGIN REQUEST +++++++++++++++++++++++++++++++") + self.tester.info("Request URL = " + request_url) + # Perform request + try: + sigv4_request = requests.get(request_url, + headers=headers, + timeout=5) + except requests.exceptions.ConnectionError: + self.tester.debug("Connection error occurred using " + + "endpoint " + + region['endpoint']) + except requests.exceptions.Timeout: + self.tester.debug("Connection timeout occurred using " + + "endpoint " + + region['endpoint']) + except requests.exceptions.RequestException as e: + self.tester.debug("Exception occurred " + + "during 'GET' request using " + + request_url + + ": " + e.message) + raise e + # Process response into readable XML + sigv4_xml_resp = xml.dom.minidom.parseString(sigv4_request.text) + sigv4_response = sigv4_xml_resp.toprettyxml() + self.tester.info("\nRESPONSE ++++++++++++++++++++++++++++++++++++") + self.tester.info("Response code: {code}\n{response}".format( + code=sigv4_request.status_code, + response=sigv4_response)) + self.assertEquals(sigv4_request.status_code, + 200, + "AWS SigV4 Request Failed.") + +if __name__ == "__main__": + # Define IAMSigV4Test testcase + testcase = IAMSigV4Test() + list = ['sigV4Test'] + unit_list = [] + for test in list: + unit_list.append(testcase.create_testunit_by_name(test)) + # Execute testcase + result = testcase.run_test_case_list( + unit_list, + clean_on_exit=testcase.args.clean_on_exit) + exit(result)