Source code for awsapilib.authentication.authentication

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# File: authentication.py
#
# Copyright 2020 Costas Tyfoxylos
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
#  of this software and associated documentation files (the "Software"), to
#  deal in the Software without restriction, including without limitation the
#  rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
#  sell copies of the Software, and to permit persons to whom the Software is
#  furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
#  all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
#  IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
#  FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
#  AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
#  LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
#  FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
#  DEALINGS IN THE SOFTWARE.
#

"""
Main code for authentication.

.. _Google Python Style Guide:
   http://google.github.io/styleguide/pyguide.html

"""
import json
import logging
import urllib

from copy import deepcopy
from dataclasses import dataclass

import boto3
import botocore
import requests

from boto3_type_annotations.sts import Client as StsClient
from bs4 import BeautifulSoup as Bfs

from .authenticationexceptions import NoSigninTokenReceived, InvalidCredentials, ExpiredCredentials

__author__ = '''Costas Tyfoxylos <ctyfoxylos@schubergphilis.com>'''
__docformat__ = '''google'''
__date__ = '''11-03-2020'''
__copyright__ = '''Copyright 2020, Costas Tyfoxylos'''
__credits__ = ["Costas Tyfoxylos"]
__license__ = '''MIT'''
__maintainer__ = '''Costas Tyfoxylos'''
__email__ = '''<ctyfoxylos@schubergphilis.com>'''
__status__ = '''Development'''  # "Prototype", "Development", "Production".

# This is the main prefix used for logging; LoggerMixin appends the class name
# of the object doing the logging to it.
LOGGER_BASENAME = '''authentication'''
LOGGER = logging.getLogger(LOGGER_BASENAME)
# A NullHandler keeps the library silent unless the application configures logging.
LOGGER.addHandler(logging.NullHandler())

# Region that Authenticator falls back to when no region is passed in and the
# STS client reports 'aws-global' as its region.
DEFAULT_REGION = 'eu-west-1'


@dataclass
class FilterCookie:
    """Describes a cookie that should be selected out of the session cookies.

    Attributes:
        name: The exact name the cookie must have.
        domain: Optional text compared against the cookie's domain immediately
            followed by its path. An empty value disables this comparison
            unless ``exact_match`` is set.
        exact_match: When true ``domain`` must be equal to the cookie's
            domain and path, otherwise it only needs to be contained in it.

    """

    name: str
    domain: str = ''
    exact_match: bool = False


@dataclass
class CsrfTokenData:
    """Describes where a csrf token lives in a console page and how to send it.

    Attributes:
        entity_type: The name of the html tag that carries the token.
        attributes: The attributes used to locate that tag in the page.
        attribute_value: The name of the tag attribute whose value is the token.
        headers_name: The name of the request header the token is placed in.

    """

    entity_type: str
    attributes: dict
    attribute_value: str
    headers_name: str


@dataclass
class Domains:
    """Holds the domain names that are needed while authenticating.

    Attributes:
        region: The region used to compose the regional console domain.
        root: The root aws domain.
        sign_in: The domain of the sign in service.
        console: The domain of the global console.

    """

    region: str
    root: str = 'aws.amazon.com'
    # The two defaults below are composed once, while the class body executes,
    # from the default value of ``root``. Passing another ``root`` to the
    # constructor does not change them.
    sign_in: str = f'signin.{root}'
    console: str = f'console.{root}'

    @property
    def regional_console(self):
        """The domain of the console for the configured region.

        Returns:
            regional_console (str): The regional console domain.

        """
        return '{}.console.{}'.format(self.region, self.root)


@dataclass
class Urls:
    """Holds the urls that are needed while authenticating.

    The class level defaults are composed once, while the class body executes,
    from the default ``scheme`` and ``root_domain``. The properties are composed
    on every access from the values of the instance.

    Attributes:
        region: The region used to compose the regional urls.
        scheme: The scheme prefix of every url.
        root_domain: The root aws domain.
        root: The url of the root aws domain.
        sign_in: The url of the sign in service.
        console: The url of the global console.
        console_home: The url of the home page of the global console.
        billing_home: The url of the billing home page.
        billing_rest: The url of the billing rest endpoint.
        iam_home: The url of the iam home page.
        iam_api: The url of the iam api endpoint.
        federation: The url of the federation endpoint of the sign in service.

    """

    region: str
    scheme: str = 'https://'
    root_domain: str = 'aws.amazon.com'
    root: str = f'{scheme}{root_domain}'
    sign_in: str = f'{scheme}signin.{root_domain}'
    console: str = f'{scheme}console.{root_domain}'
    console_home: str = f'{console}/console/home'
    billing_home: str = f'{console}/billing/home'
    billing_rest: str = f'{console}/billing/rest'
    iam_home: str = f'{console}/iam/home'
    iam_api: str = f'{console}/iam/api'
    federation: str = f'{sign_in}/federation'

    @property
    def regional_console(self):
        """The url of the regional console.

        Returns:
            regional_console (str): The regional console url, without a trailing slash.

        """
        return f'{self.scheme}{self.region}.console.{self.root_domain}'

    @property
    def regional_console_home(self):
        """The url of the regional console home page.

        Returns:
            regional_console (str): The regional console home page url.

        """
        return f'{self.regional_console}/console/home'

    @property
    def regional_single_sign_on(self):
        """The url of the regional single sign on.

        Returns:
            regional_single_sign_on (str): The regional single sign on url.

        """
        return f'{self.regional_console}/singlesignon'

    @property
    def regional_control_tower(self):
        """The url of the regional control tower service.

        Returns:
            regional_control_tower (str): The regional control tower url.

        """
        return f'{self.regional_console}/controltower'

    @property
    def regional_relay_state(self):
        """The regional relay state url.

        Returns:
            relay_state (str): The regional relay state url.

        """
        # NOTE(review): the regional console url carries no trailing slash, so
        # the value produced here reads "...amazon.comhome?region=...". That
        # looks unintended but the expected path cannot be told from this file,
        # so the value is kept as is - confirm against the consumers.
        console = self.regional_console
        return f'{console}home?region={self.region}#'

    @property
    def global_billing_home(self):
        """The url of the global billing console.

        Returns:
            global_billing (str): The url of the global billing console.

        """
        return '{}us-east-1.console.{}/billing/home'.format(self.scheme, self.root_domain)

    @property
    def global_iam_home(self):
        """The url of the global IAM console.

        Returns:
            global_iam_home (str): The url of the global IAM console.

        """
        return '{}us-east-1.console.{}/iam/home'.format(self.scheme, self.root_domain)


class LoggerMixin:
    """Mixin that gives every object a logger named after its own class."""

    @property
    def logger(self):
        """Exposes the logger to be used by objects using the Mixin.

        The logger name is the module logging prefix followed by a dot and the
        name of the class of the object.

        Returns:
            logger (logger): The properly named logger.

        """
        class_name = self.__class__.__name__
        logger_name = f'{LOGGER_BASENAME}.{class_name}'
        return logging.getLogger(logger_name)


class BaseAuthenticator(LoggerMixin):
    """Interfaces with aws authentication mechanisms, providing pre signed urls, or authenticated sessions."""

    def __init__(self, region=None):
        """Initializes the authenticator.

        Args:
            region (str): The region used to compose the regional urls and domains.

        """
        self._session = requests.Session()
        self.region = region
        self.urls = Urls(self.region)
        self.domains = Domains(self.region)

    @staticmethod
    def _filter_cookies(cookies, filters=None):
        """Selects the cookies that match any of the provided filters.

        A cookie matches a filter when the names are equal and, if the filter
        carries a domain, that domain equals (exact match) or is contained in
        the cookie's domain followed by its path.

        Args:
            cookies: An iterable of cookie objects exposing name, domain and path.
            filters (list): FilterCookie objects; no filters selects nothing.

        Returns:
            result_cookies (list): The matching cookies, grouped in filter order.
                A cookie matched by more than one filter appears once per match.

        """
        result_cookies = []
        # "filters" defaults to None, which must behave like "no filters".
        for filter_ in filters or []:
            for cookie in cookies:
                if cookie.name != filter_.name:
                    continue
                location = f'{cookie.domain}{cookie.path}'
                if filter_.exact_match:
                    matches = filter_.domain == location
                elif filter_.domain:
                    matches = filter_.domain in location
                else:
                    matches = True
                if matches:
                    result_cookies.append(cookie)
        return result_cookies

    @staticmethod
    def _cookies_to_dict(cookies):
        """Maps cookie names to values; for duplicate names the last cookie wins."""
        return {cookie.name: cookie.value for cookie in cookies}

    @staticmethod
    def _header_cookie_from_cookies(cookies):
        """Renders the cookies as the value of a "Cookie" request header."""
        cookie_values = BaseAuthenticator._cookies_to_dict(cookies)
        return '; '.join(f'{key}={value}' for key, value in cookie_values.items())

    @property
    def _default_headers(self):
        """A fresh dictionary with the browser like headers sent on every request."""
        return {'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
                'Accept-Encoding': 'gzip, deflate, br',
                'Accept-Language': 'en-US,en;q=0.5',
                'Connection': 'keep-alive',
                'Upgrade-Insecure-Requests': '1',
                'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10.14; rv:73.0) '
                              'Gecko/20100101 Firefox/73.0'}

    @property
    def _standard_cookies(self):
        """The filters for the cookies that accompany every request."""
        return [FilterCookie('aws-account-data'),
                FilterCookie('aws-ubid-main'),
                FilterCookie('aws-userInfo'),
                FilterCookie('awsc-actm'),
                FilterCookie('awsm-vid')]

    def _get_response(self, url, params=None, extra_cookies=None, headers=None):
        """Performs a GET without following redirects, sending only selected session cookies.

        The cookies of the response are merged into the internal session so
        that subsequent calls can select them.

        Args:
            url (str): The url to get.
            params (dict): Optional query parameters.
            extra_cookies (list): FilterCookie objects selecting cookies on top of the standard ones.
            headers (dict): Optional headers. The default headers and the
                computed "Cookie" header take precedence over these.

        Returns:
            response (requests.Response): The response of the request.

        Raises:
            ExpiredCredentials: On a 400 response whose page title is "Credentials expired".
            ValueError: On any other unsuccessful response.

        """
        extra_cookies = extra_cookies or []
        # Work on a copy so the dictionary handed in by the caller is never mutated.
        headers = dict(headers or {})
        cookies_to_filter = self._standard_cookies + extra_cookies
        headers.update(self._default_headers)
        cookies = self._filter_cookies(self._session.cookies, cookies_to_filter)
        headers['Cookie'] = self._header_cookie_from_cookies(cookies)
        arguments = {'url': url,
                     'headers': headers,
                     'cookies': self._cookies_to_dict(cookies),
                     'allow_redirects': False,
                     'timeout': 5}
        if params:
            arguments.update({'params': params})
        self.logger.debug('Getting url :%s with arguments : %s', url, arguments)
        # The request deliberately bypasses the session object: only the
        # cookies selected above must be sent, not the whole session jar.
        response = requests.get(**arguments)
        if not response.ok:
            try:
                error_response = Bfs(response.text, features='html.parser')
                error_title = error_response.title.string.strip()
                err_msg = error_response.find('div', {'id': 'content'}).find('p').string
            except AttributeError:
                # The page does not have the structure of a console error page.
                raise ValueError(f'Response received: {response.text}') from None
            if all([response.status_code == 400, error_title == 'Credentials expired']):
                raise ExpiredCredentials(response.status_code, err_msg)
            raise ValueError(f'Response received: {response.text}')
        self._debug_response(response, cookies)
        self._session.cookies.update(response.cookies)
        return response

    @staticmethod
    def _query_to_params(query):
        """Splits a raw query string into a dictionary without decoding it.

        Each segment is split on its first "=" only, so values containing "="
        are kept whole and a segment without "=" maps to an empty string.
        Empty segments are ignored.

        Args:
            query (str): The raw query string, possibly empty or None.

        Returns:
            params (dict): The keys and values of the query string.

        """
        params = {}
        for line in query.split('&') if query else []:
            if not line:
                continue
            key, _, value = line.partition('=')
            params[key] = value
        return params

    def _debug_response(self, response, cookies):
        """Logs, on debug level, the details of a request, its response and the session cookies.

        Args:
            response (requests.Response): The response to report on.
            cookies (list): The cookies that were sent with the request.

        """
        params = self._query_to_params(urllib.parse.urlparse(response.request.url)[4])
        self.logger.debug('URL : %s', response.request.url)
        if params:
            self.logger.debug('Params : ')
            for key, value in params.items():
                self.logger.debug('\t%s : %s', key, value)
        self.logger.debug('Response status : %s', response.status_code)
        self.logger.debug('\tRequest Headers :')
        for name, value in dict(sorted(response.request.headers.items(), key=lambda x: x[0].lower())).items():
            self.logger.debug('\t\t%s : %s', name, value)
        self.logger.debug('\tRequest Cookies :')
        for cookie in sorted(cookies, key=lambda x: x.name.lower()):
            self.logger.debug('\t\t%s (domain:%s) : %s', cookie.name, cookie.domain, cookie.value)
        self.logger.debug('\tResponse Headers :')
        for name, value in dict(sorted(response.headers.items(), key=lambda x: x[0].lower())).items():
            self.logger.debug('\t\t%s : %s', name, value)
        self.logger.debug('\tResponse Cookies :')
        for name, value in dict(sorted(response.cookies.items(), key=lambda x: x[0].lower())).items():
            self.logger.debug('\t\t%s : %s', name, value)
        self.logger.debug('Session Cookies :')
        for cookie in sorted(self._session.cookies, key=lambda x: x.name.lower()):
            self.logger.debug('\t%s (domain:%s%s) : %s', cookie.name, cookie.domain, cookie.path, cookie.value)

    def _get_session_from_console(self, console_page_response, csrf_token_data, extra_cookies=None,
                                  token_transform=lambda x: x):
        """Builds a new session carrying the csrf token and the selected cookies of a console page.

        Args:
            console_page_response (requests.Response): The response holding the console page.
            csrf_token_data (CsrfTokenData): Where to find the token in the page and which header to send it in.
            extra_cookies (list): FilterCookie objects selecting cookies on top of the standard ones.
            token_transform (callable): Applied to the raw token before it is placed in the header.

        Returns:
            session (requests.Session): A session with the headers and cookies set.

        Raises:
            ValueError: If the page does not contain the expected tag.
            NoSigninTokenReceived: If the tag carries no token.

        """
        soup = Bfs(console_page_response.text, features='html.parser')
        try:
            csrf_token = soup.find(csrf_token_data.entity_type,
                                   csrf_token_data.attributes).attrs.get(csrf_token_data.attribute_value)
        except AttributeError:
            raise ValueError(f'Response received: {console_page_response.text}') from None
        if not csrf_token:
            raise NoSigninTokenReceived('Unable to retrieve csrf token.')
        session = requests.Session()
        # The standard cookies are always part of the selection; without the
        # parentheses the conditional applied to the whole sum and dropped them
        # whenever no extra cookies were provided.
        cookies_to_filter = self._standard_cookies + (extra_cookies or [])
        cookies = self._filter_cookies(self._session.cookies, cookies_to_filter)
        session.headers.update(self._default_headers)
        session.headers.update({'Cookie': self._header_cookie_from_cookies(cookies),
                                csrf_token_data.headers_name: token_transform(csrf_token)})
        for cookie in cookies:
            session.cookies.set_cookie(cookie)
        return session


class Authenticator(BaseAuthenticator):
    """Interfaces with aws authentication mechanisms, providing pre signed urls, or authenticated sessions."""

    def __init__(self, arn, session_duration=3600, region=None):
        """Assumes the provided role and prepares the urls and domains for its region.

        Args:
            arn (str): The arn of the role to assume.
            session_duration (int): The duration in seconds requested for the assumed role session.
            region (str): The region to use. When not provided it is taken from the sts client.

        Raises:
            ValueError: If the arn fails the parameter validation of the sts client.
            InvalidCredentials: If no credentials are available or the sts call is rejected.

        """
        # The parent already creates the requests session used for cookie tracking.
        super().__init__(region=region)
        self.arn = arn
        self.session_duration = session_duration
        self._sts_connection: StsClient = boto3.client('sts', region_name=region)
        self.region = region or self._get_region()
        self._assumed_role = self._get_assumed_role(arn)
        # Rebuilt because the parent composed them before the region was resolved.
        self.urls = Urls(self.region)
        self.domains = Domains(self.region)

    def _get_region(self):
        """The region of the sts client, or the default region if that is "aws-global"."""
        region = self._sts_connection._client_config.region_name  # pylint: disable=protected-access
        return region if region != 'aws-global' else DEFAULT_REGION

    def _get_assumed_role(self, arn):
        """Assumes the role and returns the response of the sts service.

        Args:
            arn (str): The arn of the role to assume.

        Returns:
            assumed_role (dict): The response of the assume role call.

        Raises:
            ValueError: If the arn fails the parameter validation of the sts client.
            InvalidCredentials: If no credentials are available or the sts call is rejected.

        """
        self.logger.debug('Trying to assume role "%s".', arn)
        try:
            return self._sts_connection.assume_role(RoleArn=arn,
                                                    RoleSessionName="AssumeRoleSession",
                                                    DurationSeconds=self.session_duration)
        except botocore.exceptions.ParamValidationError as error:
            raise ValueError(f'The arn you provided is incorrect: {error}') from None
        except (botocore.exceptions.NoCredentialsError, botocore.exceptions.ClientError) as error:
            raise InvalidCredentials(error) from None

    @property
    def session_credentials(self):
        """Valid credentials for a session.

        Returns:
            credentials (dict): A properly structured dictionary of session credentials.

        """
        payload = {'sessionId': 'AccessKeyId',
                   'sessionKey': 'SecretAccessKey',
                   'sessionToken': 'SessionToken'}
        return self._get_credentials(payload)

    @property
    def assumed_role_credentials(self):
        """Valid credentials for an assumed session.

        Returns:
            credentials (dict): A properly structured dictionary of an assumed session credentials.

        """
        payload = {'aws_access_key_id': 'AccessKeyId',
                   'aws_secret_access_key': 'SecretAccessKey',
                   'aws_session_token': 'SessionToken'}
        return self._get_credentials(payload)

    def _get_credentials(self, payload):
        """Maps the credentials of the assumed role onto the keys of the payload.

        Args:
            payload (dict): Maps the wanted key names to the names used in the assume role response.

        Returns:
            credentials (dict): The wanted key names with the matching credential values.

        """
        self.logger.debug('Getting credentials from assumed role object.')
        credentials = self._assumed_role.get('Credentials')
        self.logger.debug('Building payload.')
        return {key: credentials.get(value) for key, value in payload.items()}

    def _get_signin_token(self):
        """Retrieves a signin token from the federation endpoint.

        When the endpoint reports the session as expired the role is assumed
        again and the request is repeated once. A second "expired" answer is
        reported as a failure instead of being retried without bound.

        Returns:
            signin_token (str): The signin token.

        Raises:
            InvalidCredentials: If the role cannot be assumed again for the refresh.
            NoSigninTokenReceived: If the endpoint does not answer successfully.

        """
        # NOTE: no session duration is sent along with the request.
        for is_retry in (False, True):
            self.logger.debug('Trying to get signin token.')
            params = {'Action': 'getSigninToken',
                      'Session': json.dumps(self.session_credentials)}
            response = requests.get(self.urls.federation, params=params, timeout=5)
            token_expired = response.status_code == 401 and response.text == 'Token Expired'
            if is_retry or not token_expired:
                break
            try:
                self._assumed_role = self._get_assumed_role(self.arn)
            except InvalidCredentials:
                self.logger.error('The credentials on the environment do not provide access for session refresh.')
                raise
        if response.ok:
            return response.json().get('SigninToken')
        raise NoSigninTokenReceived(response.status_code, response.text)

    def get_signed_url(self, domain='Example.com', destination=None):
        """Returns a pre signed url that is authenticated.

        Args:
            domain (str): The domain to request the session as.
            destination (str): The service to redirect to after successful redirection.

        Returns:
            url (str): An authenticated pre signed url.

        """
        params = {'Action': 'login',
                  'Issuer': domain,
                  'Destination': destination or self.urls.console,
                  'SigninToken': self._get_signin_token()}
        return f'{self.urls.federation}?{urllib.parse.urlencode(params)}'

    def get_sso_authenticated_session(self):
        """Authenticates to Single Sign On and returns an authenticated session.

        Returns:
            session (requests.Session): An authenticated session with headers and cookies set.

        """
        service = 'singlesignon'
        url = f'{self.urls.regional_console}/{service}/home?region={self.region}#/dashboard'
        self._get_response(self.get_signed_url())
        host = urllib.parse.urlparse(url)[1]
        self.logger.debug('Setting host to: %s', host)
        self._get_response(url, extra_cookies=[FilterCookie('JSESSIONID', f'/{service}')])
        hash_args = self._get_response(url,
                                       params={'state': 'hashArgs#'},
                                       extra_cookies=[FilterCookie('JSESSIONID', f'/{service}'),
                                                      FilterCookie('aws-userInfo-signed', )])
        oauth = self._get_response(hash_args.headers.get('Location'),
                                   extra_cookies=[FilterCookie('aws-creds', self.domains.sign_in),
                                                  FilterCookie('aws-userInfo-signed', )])
        # NOTE(review): the JSESSIONID filter below is given a url (scheme
        # included) while cookies are matched on domain and path, so it
        # presumably never selects a cookie. The other filters use
        # self.domains.regional_console - confirm before changing the flow.
        oauth_challenge = self._get_response(oauth.headers.get('Location'),
                                             extra_cookies=[FilterCookie('JSESSIONID', self.urls.regional_console),
                                                            FilterCookie('aws-userInfo-signed', ),
                                                            FilterCookie('aws-creds-code-verifier', f'/{service}')])
        dashboard = self._get_response(oauth_challenge.headers.get('Location'),
                                       extra_cookies=[FilterCookie('aws-creds', f'/{service}'),
                                                      FilterCookie('JSESSIONID', host)])
        csrf_token_data = CsrfTokenData(entity_type='meta',
                                        attributes={'name': 'awsc-csrf-token'},
                                        attribute_value='content',
                                        headers_name='X-CSRF-TOKEN')
        extra_cookies = [FilterCookie('JSESSIONID', self.domains.regional_console),
                         FilterCookie('aws-creds', f'{self.domains.regional_console}/{service}')]
        return self._get_session_from_console(dashboard, csrf_token_data, extra_cookies)

    def get_billing_authenticated_session(self):
        """Authenticates to billing and returns an authenticated session.

        Returns:
            session (requests.Session): An authenticated session with headers and cookies set.

        """
        service = 'billing'
        self._get_response(self.get_signed_url())
        billing_response = self._get_response(self.urls.global_billing_home,
                                              params={'state': 'hashArgs#',
                                                      'skipRegion': 'true',
                                                      'region': 'us-east-1'},
                                              extra_cookies=[FilterCookie('aws-userInfo-signed', ),
                                                             FilterCookie('aws-creds-code-verifier', f'/{service}')])
        oauth = self._get_response(billing_response.headers.get('Location'),
                                   extra_cookies=[FilterCookie('aws-creds', self.domains.sign_in),
                                                  FilterCookie('aws-userInfo-signed', ),
                                                  FilterCookie('aws-signin-account-info', )])
        oauth_challenge = self._get_response(oauth.headers.get('Location'),
                                             extra_cookies=[FilterCookie('aws-userInfo-signed', ),
                                                            FilterCookie('aws-creds-code-verifier', f'/{service}')])
        dashboard = self._get_response(oauth_challenge.headers.get('Location'),
                                       extra_cookies=[FilterCookie('aws-creds', f'/{service}')])
        csrf_token_data = CsrfTokenData(entity_type='input',
                                        attributes={'id': 'xsrfToken'},
                                        attribute_value='value',
                                        headers_name='x-awsbc-xsrf-token')
        extra_cookies = [FilterCookie('aws-creds', f'/{service}'),
                         FilterCookie('aws-signin-csrf', '/signin')]
        return self._get_session_from_console(dashboard, csrf_token_data, extra_cookies)

    def get_cloudformation_authenticated_session(self):
        """Authenticates to cloudformation and returns an authenticated session.

        Returns:
            session (requests.Session): An authenticated session with headers and cookies set.

        """
        service = 'cloudformation'
        url = f'{self.urls.regional_console}/{service}/home?region={self.region}'
        self._get_response(self.get_signed_url())
        host = urllib.parse.urlparse(url)[1]
        self.logger.debug('Setting host to: %s', host)
        self._get_response(url, extra_cookies=[FilterCookie('aws-consoleInfo', ),
                                               FilterCookie('cfn_sessId', ),
                                               FilterCookie('aws-csds-token', ),
                                               FilterCookie('aws-creds-code-verifier', )])
        hash_args = self._get_response(url,
                                       params={'state': 'hashArgs#'},
                                       extra_cookies=[FilterCookie('aws-userInfo-signed', ),
                                                      FilterCookie('cfn_sessId', ),
                                                      FilterCookie('aws-csds-token', ),
                                                      FilterCookie('aws-creds-code-verifier', f'/{service}')])
        oauth = self._get_response(hash_args.headers.get('Location'),
                                   extra_cookies=[FilterCookie('aws-creds', self.domains.sign_in),
                                                  FilterCookie('cfn_sessId', ),
                                                  FilterCookie('aws-csds-token', ),
                                                  FilterCookie('aws-userInfo-signed', )])
        oauth_challenge = self._get_response(oauth.headers.get('Location'),
                                             extra_cookies=[FilterCookie('aws-userInfo-signed', ),
                                                            FilterCookie('cfn_sessId', ),
                                                            FilterCookie('aws-csds-token', ),
                                                            FilterCookie('aws-consoleInfo', ),
                                                            FilterCookie('aws-creds', f'/{service}')])
        dashboard = self._get_response(oauth_challenge.headers.get('Location'),
                                       extra_cookies=[FilterCookie('aws-creds', f'/{service}'),
                                                      FilterCookie('aws-csds-token', ),
                                                      FilterCookie('cfn_sessId', )])
        csrf_token_data = CsrfTokenData(entity_type='div',
                                        attributes={'id': 'console-preload-data'},
                                        attribute_value='data-xsrf-token',
                                        headers_name='x-cfn-xsrf-token')
        extra_cookies = [FilterCookie('aws-creds-code-verifier', f'/{service}'),
                         FilterCookie('aws-consoleInfo', f'/{service}'),
                         FilterCookie('cfn_sessId', f'/{service}'),
                         FilterCookie('aws-creds', f'/{service}'),
                         FilterCookie('aws-userInfo-signed', ),
                         FilterCookie('awsccc', )]
        # The attribute holds backslash escaped json; the token is its "token" member.
        return self._get_session_from_console(dashboard,
                                              csrf_token_data,
                                              extra_cookies,
                                              token_transform=lambda x: json.loads(x.replace('\\', '')).get('token'))