#!/usr/bin/env python
# -*- coding: utf-8 -*-
# File: authentication.py
#
# Copyright 2020 Costas Tyfoxylos
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to
# deal in the Software without restriction, including without limitation the
# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
# sell copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
# DEALINGS IN THE SOFTWARE.
#
"""
Main code for authentication.
.. _Google Python Style Guide:
http://google.github.io/styleguide/pyguide.html
"""
import json
import logging
import urllib
from copy import deepcopy
from dataclasses import dataclass
import boto3
import botocore
import requests
from boto3_type_annotations.sts import Client as StsClient
from bs4 import BeautifulSoup as Bfs
from .authenticationexceptions import NoSigninTokenReceived, InvalidCredentials, ExpiredCredentials
__author__ = '''Costas Tyfoxylos <ctyfoxylos@schubergphilis.com>'''
__docformat__ = '''google'''
__date__ = '''11-03-2020'''
__copyright__ = '''Copyright 2020, Costas Tyfoxylos'''
__credits__ = ["Costas Tyfoxylos"]
__license__ = '''MIT'''
__maintainer__ = '''Costas Tyfoxylos'''
__email__ = '''<ctyfoxylos@schubergphilis.com>'''
__status__ = '''Development''' # "Prototype", "Development", "Production".
# This is the main prefix used for logging
LOGGER_BASENAME = '''authentication'''
LOGGER = logging.getLogger(LOGGER_BASENAME)
LOGGER.addHandler(logging.NullHandler())
DEFAULT_REGION = 'eu-west-1'
[docs]@dataclass
class FilterCookie:
"""Object modeling a cookie for filtering."""
name: str
domain: str = ''
exact_match: bool = False
[docs]@dataclass
class CsrfTokenData:
"""Object modeling the data required for csrf token filtering."""
entity_type: str
attributes: dict
attribute_value: str
headers_name: str
[docs]@dataclass
class Domains:
"""Dataclass holding the domains required for authenticating."""
region: str
root: str = 'aws.amazon.com'
sign_in: str = f'signin.{root}'
console: str = f'console.{root}'
@property
def regional_console(self):
"""The domain of the regional console.
Returns:
regional_console (str): The regional console domain.
"""
return f'{self.region}.console.{self.root}'
[docs]@dataclass
class Urls:
"""Dataclass holding the urls required for authenticating."""
region: str
scheme: str = 'https://'
root_domain: str = 'aws.amazon.com'
root: str = f'{scheme}{root_domain}'
sign_in: str = f'{scheme}signin.{root_domain}'
console: str = f'{scheme}console.{root_domain}'
console_home: str = f'{scheme}console.{root_domain}/console/home'
billing_home: str = f'{scheme}console.{root_domain}/billing/home'
billing_rest: str = f'{scheme}console.{root_domain}/billing/rest'
iam_home: str = f'{scheme}console.{root_domain}/iam/home'
iam_api: str = f'{scheme}console.{root_domain}/iam/api'
federation: str = f'{sign_in}/federation'
@property
def regional_console(self):
"""The url of the regional console.
Returns:
regional_console (str): The regional console url.
"""
return f'{self.scheme}{self.region}.console.{self.root_domain}'
@property
def regional_console_home(self):
"""The url of the regional console home page.
Returns:
regional_console (str): The regional console home page url.
"""
return f'{self.scheme}{self.region}.console.{self.root_domain}/console/home'
@property
def regional_single_sign_on(self):
"""The url of the regional single sign on.
Returns:
regional_single_sign_on (str): The regional single sign on url.
"""
return f'{self.scheme}{self.region}.console.{self.root_domain}/singlesignon'
@property
def regional_control_tower(self):
"""The url of the regional control tower service.
Returns:
regional_control_tower (str): The regional control tower on url.
"""
return f'{self.scheme}{self.region}.console.{self.root_domain}/controltower'
@property
def regional_relay_state(self):
"""The regional relay state url.
Returns:
relay_state (str): The regional relay state url.
"""
return f'{self.regional_console}home?region={self.region}#'
@property
def global_billing_home(self):
"""The url of the global billing console.
Returns:
global_billing (str): The url of the global billing console.
"""
return f'{self.scheme}us-east-1.console.{self.root_domain}/billing/home'
@property
def global_iam_home(self):
"""The url of the global IAM console.
Returns:
global_iam_home (str): The url of the global IAM console.
"""
return f'{self.scheme}us-east-1.console.{self.root_domain}/iam/home'
[docs]class LoggerMixin:
"""Logger."""
@property
def logger(self):
"""Exposes the logger to be used by objects using the Mixin.
Returns:
logger (logger): The properly named logger.
"""
return logging.getLogger(f'{LOGGER_BASENAME}.{self.__class__.__name__}')
[docs]class BaseAuthenticator(LoggerMixin):
"""Interfaces with aws authentication mechanisms, providing pre signed urls, or authenticated sessions."""
def __init__(self, region=None):
self._session = requests.Session()
self.region = region
self.urls = Urls(self.region)
self.domains = Domains(self.region)
@staticmethod
def _filter_cookies(cookies, filters=None):
result_cookies = []
for filter_ in filters:
for cookie in cookies:
conditions = [cookie.name == filter_.name]
if filter_.exact_match:
conditions.extend([filter_.domain == f'{cookie.domain}{cookie.path}'])
elif filter_.domain:
conditions.extend([filter_.domain in f'{cookie.domain}{cookie.path}'])
if all(conditions):
result_cookies.append(cookie)
return result_cookies
@staticmethod
def _cookies_to_dict(cookies):
return {cookie.name: cookie.value for cookie in cookies}
@staticmethod
def _header_cookie_from_cookies(cookies):
return '; '.join([f'{key}={value}'
for key, value in BaseAuthenticator._cookies_to_dict(cookies).items()])
@property
def _default_headers(self):
return deepcopy({'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
'Accept-Encoding': 'gzip, deflate, br',
'Accept-Language': 'en-US,en;q=0.5',
'Connection': 'keep-alive',
'Upgrade-Insecure-Requests': '1',
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10.14; rv:73.0) '
'Gecko/20100101 Firefox/73.0'})
@property
def _standard_cookies(self):
return [FilterCookie('aws-account-data'),
FilterCookie('aws-ubid-main'),
FilterCookie('aws-userInfo'),
FilterCookie('awsc-actm'),
FilterCookie('awsm-vid')]
def _get_response(self, url, params=None, extra_cookies=None, headers=None):
extra_cookies = extra_cookies or []
headers = headers or {}
cookies_to_filter = self._standard_cookies + extra_cookies
headers.update(self._default_headers)
cookies = self._filter_cookies(self._session.cookies, cookies_to_filter)
headers['Cookie'] = self._header_cookie_from_cookies(cookies)
arguments = {'url': url,
'headers': headers,
'cookies': self._cookies_to_dict(cookies),
'allow_redirects': False,
'timeout': 5}
if params:
arguments.update({'params': params})
self.logger.debug('Getting url :%s with arguments : %s', url, arguments)
response = requests.get(**arguments)
if not response.ok:
try:
error_response = Bfs(response.text, features='html.parser')
error_title = error_response.title.string.strip()
err_msg = error_response.find('div', {'id': 'content'}).find('p').string
except AttributeError:
raise ValueError(f'Response received: {response.text}') from None
if all([response.status_code == 400, error_title == 'Credentials expired']):
raise ExpiredCredentials(response.status_code, err_msg)
raise ValueError(f'Response received: {response.text}') from None
self._debug_response(response, cookies)
self._session.cookies.update(response.cookies)
return response
@staticmethod
def _query_to_params(query):
query_lines = query.split('&') if query else []
return {line.split('=')[0]: line.split('=')[1] for line in query_lines}
def _debug_response(self, response, cookies):
params = self._query_to_params(urllib.parse.urlparse(response.request.url)[4])
self.logger.debug('URL : %s', response.request.url)
if params:
self.logger.debug('Params : ')
for key, value in params.items():
self.logger.debug('\t%s : %s', key, value)
self.logger.debug('Response status : %s', response.status_code)
self.logger.debug('\tRequest Headers :')
for name, value in dict(sorted(response.request.headers.items(), key=lambda x: x[0].lower())).items():
self.logger.debug('\t\t%s : %s', name, value)
self.logger.debug('\tRequest Cookies :')
for cookie in sorted(cookies, key=lambda x: x.name.lower()):
self.logger.debug('\t\t%s (domain:%s) : %s', cookie.name, cookie.domain, cookie.value)
self.logger.debug('\tResponse Headers :')
for name, value in dict(sorted(response.headers.items(), key=lambda x: x[0].lower())).items():
self.logger.debug('\t\t%s : %s', name, value)
self.logger.debug('\tResponse Cookies :')
for name, value in dict(sorted(response.cookies.items(), key=lambda x: x[0].lower())).items():
self.logger.debug('\t\t%s : %s', name, value)
self.logger.debug('Session Cookies :')
for cookie in sorted(self._session.cookies, key=lambda x: x.name.lower()):
self.logger.debug('\t%s (domain:%s%s) : %s', cookie.name, cookie.domain, cookie.path, cookie.value)
def _get_session_from_console(self,
console_page_response,
csrf_token_data,
extra_cookies=None,
token_transform=lambda x: x):
soup = Bfs(console_page_response.text, features='html.parser')
try:
csrf_token = soup.find(csrf_token_data.entity_type,
csrf_token_data.attributes).attrs.get(csrf_token_data.attribute_value)
except AttributeError:
raise ValueError(f'Response received: {console_page_response.text}') from None
if not csrf_token:
raise NoSigninTokenReceived('Unable to retrieve csrf token.')
session = requests.Session()
cookies_to_filter = self._standard_cookies + extra_cookies if extra_cookies else []
cookies = self._filter_cookies(self._session.cookies, cookies_to_filter)
session.headers.update(self._default_headers)
session.headers.update({'Cookie': self._header_cookie_from_cookies(cookies),
csrf_token_data.headers_name: token_transform(csrf_token)})
for cookie in cookies:
session.cookies.set_cookie(cookie)
return session
[docs]class Authenticator(BaseAuthenticator):
"""Interfaces with aws authentication mechanisms, providing pre signed urls, or authenticated sessions."""
def __init__(self, arn, session_duration=3600, region=None):
super().__init__(region=region)
self.arn = arn
self.session_duration = session_duration
self._session = requests.Session()
self._sts_connection: StsClient = boto3.client('sts', region_name=region)
self.region = region or self._get_region()
self._assumed_role = self._get_assumed_role(arn)
self.urls = Urls(self.region)
self.domains = Domains(self.region)
def _get_region(self):
region = self._sts_connection._client_config.region_name # pylint: disable=protected-access
return region if not region == 'aws-global' else DEFAULT_REGION
def _get_assumed_role(self, arn):
self.logger.debug('Trying to assume role "%s".', arn)
try:
return self._sts_connection.assume_role(RoleArn=arn,
RoleSessionName="AssumeRoleSession",
DurationSeconds=self.session_duration)
except botocore.exceptions.ParamValidationError as error:
raise ValueError(f'The arn you provided is incorrect: {error}') from None
except (botocore.exceptions.NoCredentialsError, botocore.exceptions.ClientError) as error:
raise InvalidCredentials(error) from None
@property
def session_credentials(self):
"""Valid credentials for a session.
Returns:
credentials (dict): A properly structured dictionary of session credentials.
"""
payload = {'sessionId': 'AccessKeyId',
'sessionKey': 'SecretAccessKey',
'sessionToken': 'SessionToken'}
return self._get_credentials(payload)
@property
def assumed_role_credentials(self):
"""Valid credentials for an assumed session.
Returns:
credentials (dict): A properly structured dictionary of an assumed session credentials.
"""
payload = {'aws_access_key_id': 'AccessKeyId',
'aws_secret_access_key': 'SecretAccessKey',
'aws_session_token': 'SessionToken'}
return self._get_credentials(payload)
def _get_credentials(self, payload):
self.logger.debug('Getting credentials from assumed role object.')
credentials = self._assumed_role.get('Credentials')
self.logger.debug('Building payload.')
payload_ = {key: credentials.get(value)
for key, value in payload.items()}
return payload_
def _get_signin_token(self): # we can pass a duration here.
self.logger.debug('Trying to get signin token.')
params = {'Action': 'getSigninToken',
# 'SessionDuration': str(duration),
'Session': json.dumps(self.session_credentials)}
response = requests.get(self.urls.federation, params=params, timeout=5)
if all([response.status_code == 401, response.text == 'Token Expired']):
try:
self._assumed_role = self._get_assumed_role(self.arn)
return self._get_signin_token()
except InvalidCredentials:
self.logger.error('The credentials on the environment do not provide access for session refresh.')
raise
if response.ok:
return response.json().get('SigninToken')
raise NoSigninTokenReceived(response.status_code, response.text)
[docs] def get_signed_url(self, domain='Example.com', destination=None):
"""Returns a pre signed url that is authenticated.
Args:
domain (str): The domain to request the session as.
destination (str): The service to redirect to after successful redirection.
Returns:
url (str): An authenticated pre signed url.
"""
params = {'Action': 'login',
'Issuer': domain,
'Destination': destination or self.urls.console,
'SigninToken': self._get_signin_token()}
return f'{self.urls.federation}?{urllib.parse.urlencode(params)}'
[docs] def get_sso_authenticated_session(self):
"""Authenticates to Single Sign On and returns an authenticated session.
Returns:
session (requests.Session): An authenticated session with headers and cookies set.
"""
service = 'singlesignon'
url = f'{self.urls.regional_console}/{service}/home?region={self.region}#/dashboard'
self._get_response(self.get_signed_url())
host = urllib.parse.urlparse(url)[1]
self.logger.debug('Setting host to: %s', host)
self._get_response(url, extra_cookies=[FilterCookie('JSESSIONID', f'/{service}')])
hash_args = self._get_response(url,
params={'state': 'hashArgs#'},
extra_cookies=[FilterCookie('JSESSIONID', f'/{service}'),
FilterCookie('aws-userInfo-signed', )])
oauth = self._get_response(hash_args.headers.get('Location'),
extra_cookies=[FilterCookie('aws-creds', self.domains.sign_in),
FilterCookie('aws-userInfo-signed', )])
oauth_challenge = self._get_response(oauth.headers.get('Location'),
extra_cookies=[FilterCookie('JSESSIONID', self.urls.regional_console),
FilterCookie('aws-userInfo-signed', ),
FilterCookie('aws-creds-code-verifier', f'/{service}')
])
dashboard = self._get_response(oauth_challenge.headers.get('Location'),
extra_cookies=[FilterCookie('aws-creds', f'/{service}'),
FilterCookie('JSESSIONID', host)])
csrf_token_data = CsrfTokenData(entity_type='meta',
attributes={'name': 'awsc-csrf-token'},
attribute_value='content',
headers_name='X-CSRF-TOKEN')
extra_cookies = [FilterCookie('JSESSIONID', self.domains.regional_console),
FilterCookie('aws-creds', f'{self.domains.regional_console}/{service}')]
return self._get_session_from_console(dashboard, csrf_token_data, extra_cookies)
[docs] def get_billing_authenticated_session(self):
"""Authenticates to billing and returns an authenticated session.
Returns:
session (requests.Session): An authenticated session with headers and cookies set.
"""
service = 'billing'
self._get_response(self.get_signed_url())
billing_response = self._get_response(self.urls.global_billing_home,
params={
'state': 'hashArgs#',
'skipRegion': 'true',
'region': 'us-east-1'
},
extra_cookies=[FilterCookie('aws-userInfo-signed', ),
FilterCookie('aws-creds-code-verifier', f'/{service}')
])
oauth = self._get_response(billing_response.headers.get('Location'),
extra_cookies=[FilterCookie('aws-creds', self.domains.sign_in),
FilterCookie('aws-userInfo-signed', ),
FilterCookie('aws-signin-account-info', )
])
oauth_challenge = self._get_response(oauth.headers.get('Location'),
extra_cookies=[FilterCookie('aws-userInfo-signed', ),
FilterCookie('aws-creds-code-verifier', f'/{service}')
])
dashboard = self._get_response(oauth_challenge.headers.get('Location'),
extra_cookies=[FilterCookie('aws-creds', f'/{service}')])
csrf_token_data = CsrfTokenData(entity_type='input',
attributes={'id': 'xsrfToken'},
attribute_value='value',
headers_name='x-awsbc-xsrf-token')
extra_cookies = [FilterCookie('aws-creds', f'/{service}'),
FilterCookie('aws-signin-csrf', '/signin'),
]
return self._get_session_from_console(dashboard, csrf_token_data, extra_cookies)