Sophie

Sophie

distrib > Fedora > 19 > i386 > media > updates-src > by-pkgid > ba59a791bd2083edb232ab94febba914 > files > 4

python-keystoneclient-0.2.3-7.fc19.src.rpm

From 34d9cd1795002f4e26d5c603e3dc1a08d02dfa1b Mon Sep 17 00:00:00 2001
From: Steven Hardy <shardy@redhat.com>
Date: Wed, 3 Apr 2013 17:14:30 +0100
Subject: [PATCH] Ec2Signer: Initial support for v4 signature verification

Adds initial support for verifying AWS v4 signatures, tested with
the latest boto trunk (which now uses v4 signatures by default)

Change-Id: Id163363e259cf08aa251a7a00ff4293b742cbef6
blueprint: ec2signer-v4signatures
(cherry picked from commit 5c37d85944d9eed73ec6dd6254842108386bcc4f)
---
 keystoneclient/contrib/ec2/utils.py | 196 ++++++++++++++++++++++++++++++++----
 tests/test_ec2utils.py              |  72 ++++++++++++-
 2 files changed, 249 insertions(+), 19 deletions(-)

diff --git a/keystoneclient/contrib/ec2/utils.py b/keystoneclient/contrib/ec2/utils.py
index fcd8ee3..ca5afa7 100644
--- a/keystoneclient/contrib/ec2/utils.py
+++ b/keystoneclient/contrib/ec2/utils.py
@@ -32,24 +32,69 @@ class Ec2Signer(object):
     """
 
     def __init__(self, secret_key):
-        secret_key = secret_key.encode()
-        self.hmac = hmac.new(secret_key, digestmod=hashlib.sha1)
+        self.secret_key = secret_key.encode()
+        self.hmac = hmac.new(self.secret_key, digestmod=hashlib.sha1)
         if hashlib.sha256:
-            self.hmac_256 = hmac.new(secret_key, digestmod=hashlib.sha256)
+            self.hmac_256 = hmac.new(self.secret_key, digestmod=hashlib.sha256)
+
+    def _v4_creds(self, credentials):
+        """
+        Detect if the credentials are for a v4 signed request, since AWS
+        removed the SignatureVersion field from the v4 request spec...
+        This expects a dict of the request headers to be passed in the
+        credentials dict, since the recommended way to pass v4 creds is
+        via the 'Authorization' header
+        see http://docs.aws.amazon.com/general/latest/gr/
+            sigv4-signed-request-examples.html
+
+        Alternatively X-Amz-Algorithm can be specified as a query parameter,
+        and the authentication data can also be passed as query parameters.
+
+        Note a hash of the request body is also required in the credentials
+        for v4 auth to work in the body_hash key, calculated via:
+        hashlib.sha256(req.body).hexdigest()
+        """
+        try:
+            auth_str = credentials['headers']['Authorization']
+            if auth_str.startswith('AWS4-HMAC-SHA256'):
+                return True
+        except KeyError:
+            # Alternatively the Authorization data can be passed via
+            # the query params list, check X-Amz-Algorithm=AWS4-HMAC-SHA256
+            try:
+                if (credentials['params']['X-Amz-Algorithm'] ==
+                    'AWS4-HMAC-SHA256'):
+                    return True
+            except KeyError:
+                pass
+
+        return False
 
     def generate(self, credentials):
         """Generate auth string according to what SignatureVersion is given."""
-        if credentials['params']['SignatureVersion'] == '0':
+        signature_version = credentials['params'].get('SignatureVersion')
+        if signature_version == '0':
             return self._calc_signature_0(credentials['params'])
-        if credentials['params']['SignatureVersion'] == '1':
+        if signature_version == '1':
             return self._calc_signature_1(credentials['params'])
-        if credentials['params']['SignatureVersion'] == '2':
+        if signature_version == '2':
             return self._calc_signature_2(credentials['params'],
                                           credentials['verb'],
                                           credentials['host'],
                                           credentials['path'])
-        raise Exception('Unknown Signature Version: %s' %
-                        credentials['params']['SignatureVersion'])
+        if self._v4_creds(credentials):
+            return self._calc_signature_4(credentials['params'],
+                                          credentials['verb'],
+                                          credentials['host'],
+                                          credentials['path'],
+                                          credentials['headers'],
+                                          credentials['body_hash'])
+
+        if signature_version is not None:
+            raise Exception('Unknown signature version: %s' %
+                            signature_version)
+        else:
+            raise Exception('Unexpected signature format')
 
     @staticmethod
     def _get_utf8_value(value):
@@ -77,6 +122,22 @@ class Ec2Signer(object):
             self.hmac.update(val)
         return base64.b64encode(self.hmac.digest())
 
+    @staticmethod
+    def _canonical_qs(params):
+        """
+        Construct a sorted, correctly encoded query string as required for
+        _calc_signature_2 and _calc_signature_4
+        """
+        keys = params.keys()
+        keys.sort()
+        pairs = []
+        for key in keys:
+            val = Ec2Signer._get_utf8_value(params[key])
+            val = urllib.quote(val, safe='-_~')
+            pairs.append(urllib.quote(key, safe='') + '=' + val)
+        qs = '&'.join(pairs)
+        return qs
+
     def _calc_signature_2(self, params, verb, server_string, path):
         """Generate AWS signature version 2 string."""
         string_to_sign = '%s\n%s\n%s\n' % (verb, server_string, path)
@@ -86,15 +147,116 @@ class Ec2Signer(object):
         else:
             current_hmac = self.hmac
             params['SignatureMethod'] = 'HmacSHA1'
-        keys = params.keys()
-        keys.sort()
-        pairs = []
-        for key in keys:
-            val = self._get_utf8_value(params[key])
-            val = urllib.quote(val, safe='-_~')
-            pairs.append(urllib.quote(key, safe='') + '=' + val)
-        qs = '&'.join(pairs)
-        string_to_sign += qs
+        string_to_sign += self._canonical_qs(params)
         current_hmac.update(string_to_sign)
         b64 = base64.b64encode(current_hmac.digest())
         return b64
+
+    def _calc_signature_4(self, params, verb, server_string, path, headers,
+                          body_hash):
+        """Generate AWS signature version 4 string."""
+
+        def sign(key, msg):
+            return hmac.new(key, self._get_utf8_value(msg),
+                            hashlib.sha256).digest()
+
+        def signature_key(datestamp, region_name, service_name):
+            """
+            Signature key derivation, see
+            http://docs.aws.amazon.com/general/latest/gr/
+            signature-v4-examples.html#signature-v4-examples-python
+            """
+            k_date = sign(self._get_utf8_value("AWS4" + self.secret_key),
+                          datestamp)
+            k_region = sign(k_date, region_name)
+            k_service = sign(k_region, service_name)
+            k_signing = sign(k_service, "aws4_request")
+            return k_signing
+
+        def auth_param(param_name):
+            """
+            Get specified auth parameter, provided via one of:
+            - the Authorization header
+            - the X-Amz-* query parameters
+            """
+            try:
+                auth_str = headers['Authorization']
+                param_str = auth_str.partition(
+                                '%s=' % param_name)[2].split(',')[0]
+            except KeyError:
+                param_str = params.get('X-Amz-%s' % param_name)
+            return param_str
+
+        def date_param():
+            """
+            Get the X-Amz-Date value, which can be either a header or parameter
+
+            Note AWS supports parsing the Date header also, but this is not
+            currently supported here as it will require some format mangling.
+            So the X-Amz-Date value must be YYYYMMDDTHHMMSSZ format, then it
+            can be used to match against the YYYYMMDD format provided in the
+            credential scope.
+            see:
+            http://docs.aws.amazon.com/general/latest/gr/
+            sigv4-date-handling.html
+            """
+            try:
+                return headers['X-Amz-Date']
+            except KeyError:
+                return params.get('X-Amz-Date')
+
+        def canonical_header_str():
+            # Get the list of headers to include, from either
+            # - the Authorization header (SignedHeaders key)
+            # - the X-Amz-SignedHeaders query parameter
+            headers_lower = dict((k.lower().strip(), v.strip())
+                                 for (k, v) in headers.iteritems())
+            header_list = []
+            sh_str = auth_param('SignedHeaders')
+            for h in sh_str.split(';'):
+                if h not in headers_lower:
+                    continue
+                if h == 'host':
+                    # Note we discard any port suffix
+                    header_list.append('%s:%s' %
+                                       (h, headers_lower[h].split(':')[0]))
+                else:
+                    header_list.append('%s:%s' % (h, headers_lower[h]))
+            return '\n'.join(header_list) + '\n'
+
+        # Create canonical request:
+        # http://docs.aws.amazon.com/general/latest/gr/
+        # sigv4-create-canonical-request.html
+        # Get parameters and headers in expected string format
+        cr = "\n".join((verb.upper(), path,
+                        self._canonical_qs(params),
+                        canonical_header_str(),
+                        auth_param('SignedHeaders'),
+                        body_hash))
+
+        # Check the date, reject any request where the X-Amz-Date doesn't
+        # match the credential scope
+        credential = auth_param('Credential')
+        credential_split = credential.split('/')
+        credential_scope = '/'.join(credential_split[1:])
+        credential_date = credential_split[1]
+        param_date = date_param()
+        if not param_date.startswith(credential_date):
+            raise Exception('Request date mismatch error')
+
+        # Create the string to sign
+        # http://docs.aws.amazon.com/general/latest/gr/
+        # sigv4-create-string-to-sign.html
+        string_to_sign = '\n'.join(('AWS4-HMAC-SHA256',
+                                    param_date,
+                                    credential_scope,
+                                    hashlib.sha256(cr).hexdigest()))
+
+        # Calculate the derived key, this requires a datestamp, region
+        # and service, which can be extracted from the credential scope
+        (req_region, req_service) = credential_split[2:4]
+        s_key = signature_key(credential_date, req_region, req_service)
+        # Finally calculate the signature!
+        signature = hmac.new(s_key, self._get_utf8_value(string_to_sign),
+                             hashlib.sha256).hexdigest()
+        return signature
diff --git a/tests/test_ec2utils.py b/tests/test_ec2utils.py
index b0bd4df..a3c36fa 100644
--- a/tests/test_ec2utils.py
+++ b/tests/test_ec2utils.py
@@ -27,6 +27,36 @@ class Ec2SignerTest(testtools.TestCase):
         self.secret = '89cdf9e94e2643cab35b8b8ac5a51f83'
         self.signer = Ec2Signer(self.secret)
 
+    def tearDown(self):
+        super(Ec2SignerTest, self).tearDown()
+
+    def test_v4_creds_header(self):
+        auth_str = 'AWS4-HMAC-SHA256 blah'
+        credentials = {'host': '127.0.0.1',
+                       'verb': 'GET',
+                       'path': '/v1/',
+                       'params': {},
+                       'headers': {'Authorization': auth_str}}
+        self.assertTrue(self.signer._v4_creds(credentials))
+
+    def test_v4_creds_param(self):
+        credentials = {'host': '127.0.0.1',
+                       'verb': 'GET',
+                       'path': '/v1/',
+                       'params': {'X-Amz-Algorithm': 'AWS4-HMAC-SHA256'},
+                       'headers': {}}
+        self.assertTrue(self.signer._v4_creds(credentials))
+
+    def test_v4_creds_false(self):
+        credentials = {'host': '127.0.0.1',
+                       'verb': 'GET',
+                       'path': '/v1/',
+                       'params': {'SignatureVersion': '0',
+                                  'AWSAccessKeyId': self.access,
+                                  'Timestamp': '2012-11-27T11:47:02Z',
+                                  'Action': 'Foo'}}
+        self.assertFalse(self.signer._v4_creds(credentials))
+
     def test_generate_0(self):
         """Test generate function for v0 signature"""
         credentials = {'host': '127.0.0.1',
@@ -40,8 +70,6 @@ class Ec2SignerTest(testtools.TestCase):
         expected = 'SmXQEZAUdQw5glv5mX8mmixBtas='
         self.assertEqual(signature, expected)
 
-        pass
-
     def test_generate_1(self):
         """Test generate function for v1 signature"""
         credentials = {'host': '127.0.0.1',
@@ -75,3 +103,43 @@ class Ec2SignerTest(testtools.TestCase):
         signature = self.signer.generate(credentials)
         expected = 'ZqCxMI4ZtTXWI175743mJ0hy/Gc='
         self.assertEqual(signature, expected)
+
+    def test_generate_v4(self):
+        """
+        Test v4 generator with data from AWS docs example, see:
+        http://docs.aws.amazon.com/general/latest/gr/
+        sigv4-create-canonical-request.html
+        and
+        http://docs.aws.amazon.com/general/latest/gr/
+        sigv4-signed-request-examples.html
+        """
+        # Create a new signer object with the AWS example key
+        secret = 'wJalrXUtnFEMI/K7MDENG+bPxRfiCYEXAMPLEKEY'
+        signer = Ec2Signer(secret)
+
+        body_hash = ('b6359072c78d70ebee1e81adcbab4f0'
+                     '1bf2c23245fa365ef83fe8f1f955085e2')
+        auth_str = ('AWS4-HMAC-SHA256 '
+                    'Credential=AKIAIOSFODNN7EXAMPLE/20110909/'
+                    'us-east-1/iam/aws4_request,'
+                    'SignedHeaders=content-type;host;x-amz-date,')
+        headers = {'Content-type':
+                   'application/x-www-form-urlencoded; charset=utf-8',
+                   'X-Amz-Date': '20110909T233600Z',
+                   'Host': 'iam.amazonaws.com',
+                   'Authorization': auth_str}
+        # Note the example in the AWS docs is inconsistent, previous
+        # examples specify no query string, but the final POST example
+        # does, apparently incorrectly since an empty parameter list
+        # aligns all steps and the final signature with the examples
+        params = {}
+        credentials = {'host': 'iam.amazonaws.com',
+                       'verb': 'POST',
+                       'path': '/',
+                       'params': params,
+                       'headers': headers,
+                       'body_hash': body_hash}
+        signature = signer.generate(credentials)
+        expected = ('ced6826de92d2bdeed8f846f0bf508e8'
+                    '559e98e4b0199114b84c54174deb456c')
+        self.assertEqual(signature, expected)