Source code for awsapilib.sso.sso

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# File: sso.py
#
# Copyright 2020 Sayantan Khanra, Costas Tyfoxylos
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
#  of this software and associated documentation files (the "Software"), to
#  deal in the Software without restriction, including without limitation the
#  rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
#  sell copies of the Software, and to permit persons to whom the Software is
#  furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
#  all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
#  IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
#  FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
#  AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
#  LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
#  FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
#  DEALINGS IN THE SOFTWARE.
#

"""
Main code for sso.

.. _Google Python Style Guide:
   http://google.github.io/styleguide/pyguide.html

"""

import copy
import logging
import json
from awsapilib.authentication import (Authenticator,
                                      LoggerMixin,
                                      Urls)
from .ssoexceptions import (UnsupportedTarget,
                            NoPermissionSet,
                            NoAccount,
                            NoGroup,
                            NoProfileID,
                            NoUser)
from .entities import (Group,
                       User,
                       Account,
                       PermissionSet)

__author__ = '''Sayantan Khanra <skhanra@schubergphilis.com>, Costas Tyfoxylos <ctyfoxylos@schubergphilis.com>'''
__docformat__ = '''google'''
__date__ = '''18-05-2020'''
__copyright__ = '''Copyright 2020, Sayantan Khanra, Costas Tyfoxylos'''
__credits__ = ["Sayantan Khanra", "Costas Tyfoxylos"]
__license__ = '''MIT'''
__maintainer__ = '''Sayantan Khanra, Costas Tyfoxylos'''
__email__ = '''<skhanra@schubergphilis.com>, <ctyfoxylos@schubergphilis.com>'''
__status__ = '''Development'''  # "Prototype", "Development", "Production".

# This is the main prefix used for logging
LOGGER_BASENAME = '''sso'''
LOGGER = logging.getLogger(LOGGER_BASENAME)
LOGGER.addHandler(logging.NullHandler())

SUPPORTED_TARGETS = ['GetUserPoolInfo',
                     'SearchGroups',
                     'ProvisionApplicationInstanceForAWSAccount',
                     'ListPermissionSets',
                     'GetApplicationInstanceForAWSAccount',
                     'ProvisionApplicationProfileForAWSAccountInstance',
                     'AssociateProfile',
                     'ListAWSAccountProfiles',
                     'DisassociateProfile',
                     'SearchUsers',
                     'ListMembersInGroup',
                     'ListGroupsForUser',
                     'CreatePermissionSet',
                     'PutPermissionsPolicy',
                     'GetPermissionsPolicy',
                     'ListAccountsWithProvisionedPermissionSet',
                     'UpdatePermissionSet',
                     'listAccounts',
                     'DeletePermissionSet',
                     'DeletePermissionsPolicy',
                     'ProvisionApplicationInstanceForAWSAccount',
                     'ProvisionSAMLProvider']


[docs]class Sso(LoggerMixin): """Models AWS SSO.""" API_CONTENT_TYPE = 'application/json; charset=UTF-8' API_CONTENT_ENCODING = 'amz-1.0' DEFAULT_AWS_REGION = 'eu-west-1' def __init__(self, arn, region=None): self.aws_authenticator = Authenticator(arn, region=region) self._urls = Urls(self.aws_region) self.session = self._get_authenticated_session() self._directory_id = None @property def relay_state(self): """The relay state of the SSO. Returns: relay_state (str): The relay state of sso. """ return self._urls.regional_relay_state @property def api_url(self): """The url of the api for sso. Returns: api_url (str): The url of the api for sso. """ return f'{self._urls.regional_single_sign_on}/api' @property def endpoint_url(self): """The url of the api endpoint for sso. Returns: endpoint_url (str): The url of the api endpoint for sso. """ return f'{self.api_url}/peregrine' @property def aws_region(self): """Aws Console Region. Returns: region (str): The region of the console. """ return self.aws_authenticator.region
[docs] def get_api_payload(self, # pylint: disable=too-many-arguments content_string, target, method='POST', params=None, path='/', content_type=None, content_encoding=None, x_amz_target='', region=None): """Generates the payload for calling the AWS SSO APIs. Returns: payload (dict): Returns a deepcopy object of the payload """ target = self._validate_target(target) payload = {'contentString': json.dumps(content_string), 'headers': {'Content-Type': content_type or self.API_CONTENT_TYPE, 'Content-Encoding': content_encoding or self.API_CONTENT_ENCODING, 'X-Amz-Target': x_amz_target}, 'method': method, 'operation': target, 'params': params or {}, 'path': path, 'region': region or self.DEFAULT_AWS_REGION} return copy.deepcopy(payload)
@staticmethod def _validate_target(target): if target not in SUPPORTED_TARGETS: raise UnsupportedTarget(target) return target def _get_authenticated_session(self): return self.aws_authenticator.get_sso_authenticated_session() @property def directory_id(self): """The external/internal directory id configured with aws sso. Returns: str: The id of directory configured in SSO """ if self._directory_id is None: payload = self.get_api_payload(content_string={}, target='GetUserPoolInfo', path='/userpool/', x_amz_target='com.amazonaws.swbup.service.SWBUPService.GetUserPoolInfo', region=self.aws_region) self.logger.debug('Trying to get directory id for sso with payload: %s', payload) response = self.session.post(f'{self.api_url}/userpool', json=payload) if not response.ok: raise ValueError(response.text) self._directory_id = response.json().get('DirectoryId') return self._directory_id @property def accounts(self): """The aws accounts in sso. Returns: accounts (generator): The accounts configured in SSO """ headers = {'Content-Type': 'application/x-amz-json-1.1', 'Content-Encoding': 'amz-1.0', 'X-Amz-Target': 'AWSOrganizationsV20161128.ListAccounts', 'X-Amz-User-Agent': 'aws-sdk-js/2.152.0 promise'} return self._get_paginated_results(content_payload={}, path='', target='listAccounts', amz_target='AWSOrganizationsV20161128.ListAccounts', region='us-east-1', object_type=Account, object_group='Accounts', url=f'{self.api_url}/organizations', headers=headers) @property def users(self): """The users configured in SSO. Returns: users (generator): The users configured in SSO """ content_payload = {'IdentityStoreId': self.directory_id, 'MaxResults': 50} return self._get_paginated_results(content_payload, path='identitystore', target='SearchUsers', amz_target='com.amazonaws.identitystore.AWSIdentityStoreService.SearchUsers', region=self.aws_region, object_type=User, object_group='Users') @property def groups(self): """The groups configured in SSO. Returns: groups (generator): The groups configured in SSO """ content_payload = {'SearchString': '*', 'SearchAttributes': ['GroupName'], 'MaxResults': 100} return self._get_paginated_results(content_payload, path='userpool', target='SearchGroups', amz_target='com.amazonaws.swbup.service.SWBUPService.SearchGroups', region=self.aws_region, object_type=Group, object_group='Groups') @property def permission_sets(self): """The permission_sets configured in SSO. Returns: permission_sets (generator): The permission sets configured in SSO """ return self._get_paginated_results(content_payload={}, path='control', target='ListPermissionSets', amz_target='com.amazon.switchboard.service.SWBService.ListPermissionSets', region=self.aws_region, object_type=PermissionSet, object_group='permissionSets', url=self.endpoint_url, next_token_marker='marker')
[docs] def get_user_by_name(self, user_name): """The user configured in SSO. Returns: user (User): The User object """ return next((user for user in self.users if user.name == user_name), None)
[docs] def get_user_by_id(self, user_id): """The user configured in SSO. Returns: user (User): The User object """ return next((user for user in self.users if user.id == user_id), None)
[docs] def get_group_by_name(self, group_name): """The group configured in SSO. Returns: group (Group): The Group object """ return next((group for group in self.groups if group.name == group_name), None)
[docs] def get_group_by_id(self, group_id): """The group configured in SSO. Returns: group (Group): The Group object """ return next((group for group in self.groups if group.id == group_id), None)
[docs] def get_account_by_name(self, account_name): """The account configured in SSO. Returns: account (Account): The Account object """ return next((account for account in self.accounts if account.name == account_name), None)
[docs] def get_account_by_id(self, account_id): """The account configured in SSO. Returns: account (Account): The Account object """ return next((account for account in self.accounts if account.id == account_id), None)
[docs] def get_permission_set_by_name(self, permission_set_name): """The permission-set configured in SSO. Returns: permission_set (PermissionSet): The PermissionSet object """ return next((permission_set for permission_set in self.permission_sets if permission_set.name == permission_set_name), None)
def _provision_application_profile_for_aws_account_instance(self, permission_set_name, account_name): method = 'ProvisionApplicationProfileForAWSAccountInstance' permission_set = self.get_permission_set_by_name(permission_set_name) if not permission_set: raise NoPermissionSet(permission_set_name) account = self.get_account_by_name(account_name) if not account: raise NoAccount(account_name) payload = self.get_api_payload(content_string={'permissionSetId': permission_set.id, 'instanceId': account.instance_id}, target=method, path='/control/', x_amz_target=f'com.amazon.switchboard.service.SWBService.{method}', region=self.aws_region) self.logger.debug('Trying to provision application profile for aws account with payload: %s', payload) response = self.session.post(self.endpoint_url, json=payload) if not response.ok: raise ValueError(response.text) return response.json().get('applicationProfile', {}).get('profileId', '') def _get_aws_account_profile_for_permission_set(self, account_name, permission_set_name): account = self.get_account_by_name(account_name) if not account: raise NoAccount(account_name) return next((profile for profile in account.associated_profiles if profile.get('name') == permission_set_name), None)
[docs] def associate_group_to_account(self, group_name, account_name, permission_set_name): """Associates a group with an account with proper permissions. Args: group_name: The name of the group to be assigned. account_name: Name of the account to which the group will be assigned permission_set_name: the Permission Set the group will have on the account Returns: bool: True or False """ group = self.get_group_by_name(group_name) if not group: raise NoGroup(group_name) account = self.get_account_by_name(account_name) if not account: raise NoAccount(account_name) profile_id = self._provision_application_profile_for_aws_account_instance(permission_set_name, account_name) directory_id = self.directory_id content_string = {'accessorId': group.id, 'accessorType': 'GROUP', 'accessorDisplay': {'groupName': group_name}, 'instanceId': account.instance_id, 'profileId': profile_id, 'directoryType': 'UserPool', 'directoryId': directory_id} payload = self.get_api_payload(content_string=content_string, target='AssociateProfile', path='/control/', x_amz_target='com.amazon.switchboard.service.SWBService.AssociateProfile', region=self.aws_region) self.logger.debug('Trying to assign groups to aws account with payload: %s', payload) response = self.session.post(self.endpoint_url, json=payload) if not response.ok: self.logger.error('Received :%s', response.text) return response.ok
[docs] def disassociate_group_from_account(self, group_name, account_name, permission_set_name): """Disassociates a group with an account with proper permissions. Args: group_name: The name of the group to be assigned. account_name: Name of the account to which the group will be assigned permission_set_name: the Permission Set the group will have on the account Returns: bool: True or False """ group = self.get_group_by_name(group_name) if not group: raise NoGroup(group_name) account = self.get_account_by_name(account_name) if not account: raise NoAccount(account_name) profile_id = self._get_aws_account_profile_for_permission_set(account_name, permission_set_name).get('profileId') if not profile_id: raise NoProfileID(f'{account_name}:{permission_set_name}') content_string = {'accessorId': group.id, 'accessorType': 'GROUP', 'accessorDisplay': {"groupName": group_name}, 'instanceId': account.instance_id, 'profileId': profile_id, 'directoryType': 'UserPool', 'directoryId': self.directory_id} payload = self.get_api_payload(content_string=content_string, target='DisassociateProfile', path='/control/', x_amz_target='com.amazon.switchboard.service.SWBService.DisassociateProfile', region=self.aws_region) self.logger.debug('Trying to disassociate group from aws account with payload: %s', payload) response = self.session.post(self.endpoint_url, json=payload) if not response.ok: self.logger.error('Received :%s', response.text) return response.ok
[docs] def associate_user_to_account(self, user_name, account_name, permission_set_name): """Associates an user with an account with proper permissions. Args: user_name: The name of the user to be assigned. account_name: Name of the account to which the user will be assigned permission_set_name: the Permission Set the user will have on the account Returns: bool: True or False """ user = self.get_user_by_name(user_name) if not user: raise NoUser(user_name) account = self.get_account_by_name(account_name) if not account: raise NoAccount(account_name) profile_id = self._provision_application_profile_for_aws_account_instance(permission_set_name, account_name) if not profile_id: raise NoProfileID(f'{account_name}:{permission_set_name}') content_string = {'accessorId': user.id, 'accessorType': 'USER', 'accessorDisplay': {'userName': user_name, 'firstName': user.first_name, 'last_name': user.last_name, 'windowsUpn': user_name}, 'instanceId': account.instance_id, 'profileId': profile_id, 'directoryType': 'UserPool', 'directoryId': self.directory_id} payload = self.get_api_payload(content_string=content_string, target='AssociateProfile', path='/control/', x_amz_target='com.amazon.switchboard.service.SWBService.AssociateProfile', region=self.aws_region) self.logger.debug('Trying to assign users to aws account with payload: %s', payload) response = self.session.post(self.endpoint_url, json=payload) if not response.ok: self.logger.error('Received :%s', response.text) return response.ok
[docs] def disassociate_user_from_account(self, user_name, account_name, permission_set_name): """Disassociates an user with an account with proper permissions. Args: user_name: The name of the user to be assigned. account_name: Name of the account to which the user will be assigned permission_set_name: the Permission Set the user will have on the account Returns: bool: True or False """ user = self.get_user_by_name(user_name) if not user: raise NoUser(user_name) account = self.get_account_by_name(account_name) if not account: raise NoAccount(account_name) profile_id = self._get_aws_account_profile_for_permission_set(account_name, permission_set_name).get('profileId') if not profile_id: raise NoProfileID(f'{account_name}:{permission_set_name}') content_string = {'accessorId': user.id, 'accessorType': 'USER', 'accessorDisplay': {'userName': user_name, 'firstName': user.first_name, 'last_name': user.last_name, 'windowsUpn': user_name}, 'instanceId': account.instance_id, 'profileId': profile_id, 'directoryType': 'UserPool', 'directoryId': self.directory_id} payload = self.get_api_payload(content_string=content_string, target='DisassociateProfile', path='/control/', x_amz_target='com.amazon.switchboard.service.SWBService.DisassociateProfile', region=self.aws_region) self.logger.debug('Trying to disassociate users from aws account with payload: %s', payload) response = self.session.post(self.endpoint_url, json=payload) if not response.ok: self.logger.error('Received :%s', response.text) return response.ok
def _get_paginated_results(self, # pylint: disable=too-many-arguments, too-many-locals content_payload, path, target, amz_target, object_group, object_type=None, region=None, next_token_marker='NextToken', url=None, headers=None): payload = self.get_api_payload(content_string=content_payload, target=target, path=f'/{path}/' if path else '/', x_amz_target=amz_target, region=region) if headers: payload.update({'headers': headers}) url = url or f'{self.api_url}/{path}' response, next_token = self._get_partial_response(url, payload, next_token_marker) for data in response.json().get(object_group, []): if object_type: yield object_type(self, data) else: yield data while next_token: content_string = copy.deepcopy(json.loads(payload.get('contentString'))) content_string.update({next_token_marker: next_token}) payload.update({'contentString': json.dumps(content_string)}) response, next_token = self._get_partial_response(url, payload, next_token_marker) for data in response.json().get(object_group, []): if object_type: yield object_type(self, data) else: yield data def _get_partial_response(self, url, payload, next_token_marker): response = self.session.post(url, json=payload) if not response.ok: raise ValueError(response.text) next_token = response.json().get(next_token_marker) return response, next_token
[docs] def create_permission_set(self, name, description=' ', relay_state=None, ttl='PT2H'): """Create a permission_set with a aws defined policy or custom policy. Args: name: The name of the permission_set . description: Description for the permission set relay_state: The relay state for the permission set. https://docs.aws.amazon.com/singlesignon/latest/userguide/howtopermrelaystate.html ttl: session duration Returns: PermissionSet: Permission Set object """ content_string = {'permissionSetName': name, 'description': description, 'relayState': relay_state or self.relay_state, 'ttl': ttl} payload = self.get_api_payload(content_string=content_string, target='CreatePermissionSet', path='/control/', x_amz_target='com.amazon.switchboard.service.SWBService.CreatePermissionSet', region=self.aws_region) self.logger.debug('Trying to create Permission set with payload: %s', payload) response = self.session.post(self.endpoint_url, json=payload) if response.ok: return PermissionSet(self, response.json().get('permissionSet')) return None
[docs] def delete_permission_set(self, name): """Delete a permission_set . Args: name: The name of the permission_set . Returns: Bool: Status of the deletion """ permission_set_id = self.get_permission_set_by_name(name).id content_string = {'permissionSetId': permission_set_id} payload = self.get_api_payload(content_string=content_string, target='DeletePermissionSet', path='/control/', x_amz_target='com.amazon.switchboard.service.SWBService.DeletePermissionSet', region=self.aws_region ) self.logger.debug('Trying to delete Permission set...') response = self.session.post(self.endpoint_url, json=payload) if not response.ok: self.logger.error('Received :%s', response.text) return response.ok