#!/usr/bin/env python
# -*- coding: utf-8 -*-
# File: entities.py
#
# Copyright 2020 Sayantan Khanra, Costas Tyfoxylos
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to
# deal in the Software without restriction, including without limitation the
# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
# sell copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
# DEALINGS IN THE SOFTWARE.
#
"""
Main code for entities.
.. _Google Python Style Guide:
http://google.github.io/styleguide/pyguide.html
"""
import logging
import json
from awsapilib.authentication import LoggerMixin
__author__ = '''Sayantan Khanra <skhanra@schubergphilis.com>, Costas Tyfoxylos <ctyfoxylos@schubergphilis.com>'''
__docformat__ = '''google'''
__date__ = '''18-05-2020'''
__copyright__ = '''Copyright 2020, Sayantan Khanra, Costas Tyfoxylos'''
__credits__ = ["Sayantan Khanra", "Costas Tyfoxylos"]
__license__ = '''MIT'''
__maintainer__ = '''Sayantan Khanra, Costas Tyfoxylos'''
__email__ = '''<skhanra@schubergphilis.com>, <ctyfoxylos@schubergphilis.com>'''
__status__ = '''Development''' # "Prototype", "Development", "Production".
# This is the main prefix used for logging
LOGGER_BASENAME = '''entities'''
LOGGER = logging.getLogger(LOGGER_BASENAME)
LOGGER.addHandler(logging.NullHandler())
[docs]class Entity(LoggerMixin):
"""The core entity."""
def __init__(self, sso_instance, data):
self._sso = sso_instance
self._data = self._parse_data(data)
def _parse_data(self, data):
if not isinstance(data, dict):
self.logger.error(f'Invalid data received :{data}')
data = {}
return data
[docs]class Group(Entity):
"""Models the group object of AWS SSO."""
def __init__(self, sso_instance, data):
super().__init__(sso_instance, data)
self.url = f'{sso_instance.api_url}/userpool'
@property
def id(self): # pylint: disable=invalid-name
"""The id of the group.
Returns:
id (str) : The id of the group
"""
return self._data.get('GroupId')
@property
def name(self):
"""The name of the group.
Returns:
name (str) : The name of the group
"""
return self._data.get('GroupName', '')
@property
def description(self):
"""The description of the group.
Returns:
description (str) : The description of the group
"""
return self._data.get('Description', '')
@property
def users(self):
"""The users in the group.
Returns:
users (list): The users part of the group
"""
content_payload = {'GroupId': self.id,
'MaxResults': 100}
target = 'com.amazonaws.swbup.service.SWBUPService.ListMembersInGroup'
for user in self._sso._get_paginated_results(content_payload=content_payload, # pylint: disable=protected-access
path='userpool',
target='ListMembersInGroup',
amz_target=target,
object_group='Members',
url=self.url):
yield self._sso.get_user_by_id(user.get('Id'))
[docs]class Account(Entity):
"""Models the Account object of AWS SSO."""
@property
def url(self):
"""Url for the account.
Returns:
url (str): The url of the account
"""
return self._sso.endpoint_url
@property
def name(self):
"""The name of the application.
Returns:
name (str): The name of the application
"""
return self._data.get('Name')
@property
def email(self):
"""The name of the application.
Returns:
email (str) : The name of the application
"""
return self._data.get('Email')
@property
def id(self): # pylint: disable=invalid-name
"""The id of the application.
Returns:
id (str): The id of the application
"""
return self._data.get('Id')
@property
def arn(self):
"""The arn of the application.
Returns:
arn (str): The arn of the application
"""
return self._data.get('Arn')
@property
def status(self):
"""The status of the application.
Returns:
status (str): The status of the application
"""
return self._data.get('Status')
[docs] def provision_saml_provider(self):
"""Creates the SAMl provider.
Returns:
arn (str): The arn of the SAMl provider
"""
target = 'com.amazon.switchboard.service.SWBService.ProvisionSAMLProvider'
payload = self._sso.get_api_payload(content_string={'applicationInstanceId': self.instance_id
},
target='ProvisionSAMLProvider',
path='/control/',
x_amz_target=target)
self.logger.debug('Trying to create saml provider for aws account with payload: %s', payload)
response = self._sso.session.post(self.url, json=payload)
if not response.ok:
self.logger.error(response.text)
return {}
return response.json()
@property
def instance_id(self):
"""The instance id of the Account.
Returns:
instance_id (str): The instance id of the account
"""
instance_id = self._retrieve_instance_id()
if not instance_id:
instance_id = self._provision_application_instance_for_aws_account()
return instance_id
def _provision_application_instance_for_aws_account(self):
target = 'com.amazon.switchboard.service.SWBService.ProvisionApplicationInstanceForAWSAccount'
payload = self._sso.get_api_payload(content_string={'accountId': self.id,
'accountEmail': self.email,
'accountName': self.name
},
target='ProvisionApplicationInstanceForAWSAccount',
path='/control/',
x_amz_target=target)
self.logger.debug('Trying to get instance id for aws account with payload: %s', payload)
response = self._sso.session.post(self.url, json=payload)
if not response.ok:
self.logger.error(response.text)
return None
return response.json().get('applicationInstance', {}).get('instanceId', None)
def _retrieve_instance_id(self):
account_id = self.id
target = 'com.amazon.switchboard.service.SWBService.GetApplicationInstanceForAWSAccount'
payload = self._sso.get_api_payload(content_string={'awsAccountId': account_id},
target='GetApplicationInstanceForAWSAccount',
path='/control/',
x_amz_target=target)
self.logger.debug('Trying to get instance id for aws account with payload: %s', payload)
response = self._sso.session.post(self.url, json=payload)
if not response.ok:
self.logger.error(response.text)
return None
return response.json().get('applicationInstance', {}).get('instanceId', None)
@property
def associated_profiles(self):
"""The associated profiles with the Account.
Returns:
associated_profiles (list): The profiles associated with the Account
"""
target = 'com.amazon.switchboard.service.SWBService.ListAWSAccountProfiles'
payload = self._sso.get_api_payload(content_string={'instanceId': self.instance_id},
target='ListAWSAccountProfiles',
path='/control/',
x_amz_target=target)
self.logger.debug('Trying to provision application profile for aws account with payload: %s', payload)
response = self._sso.session.post(self.url, json=payload)
if not response.ok:
self.logger.error(response.text)
return []
return response.json().get('profileList', [])
[docs]class User(Entity):
"""Models the user object of SSO."""
@property
def url(self):
"""Url for the user.
Returns:
url (str): The url for the user
"""
return f'{self._sso.api_url}/userpool'
@property
def status(self):
"""The status of the user.
Returns:
status (str): The status of the user
"""
return self._data.get('Active')
@property
def created_at(self):
"""The date and time of the users's activation.
Returns:
created_at (datetime): The datetime object of when the user was activated
"""
return self._data.get('Meta', {}).get('CreatedAt')
@property
def updated_at(self):
"""The date and time of the users's status change.
Returns:
updated_at (datetime): The datetime object of when the user had last changed status
"""
return self._data.get('Meta', {}).get('UpdatedAt')
@property
def emails(self):
"""The date and time of the users's last password change.
Returns:
emails (datetime): The datetime object of when the user last changed password
"""
return self._data.get('UserAttributes').get('emails', {}).get('ComplexListValue', '')
@property
def _name(self):
return self._data.get('UserAttributes').get('name', {}).get('ComplexValue', {})
@property
def first_name(self):
"""The first name of the user.
Returns:
first_name (str): The first name of the user
"""
return self._name.get('givenName', {}).get('StringValue', '')
@property
def last_name(self):
"""The last name of the user.
Returns:
last_name (str): The last name of the user
"""
return self._name.get('familyName', {}).get('StringValue', '')
@property
def id(self): # pylint: disable=invalid-name
"""The manager of the user.
Returns:
id (str): The manager of the user
"""
return self._data.get('UserId')
@property
def name(self):
"""The manager of the user.
Returns:
name (str): The manager of the user
"""
return self._data.get('UserName')
@property
def display_name(self):
"""The display name of the user.
Returns:
display_name (str): The display name of the user
"""
return self._data.get('UserAttributes', {}).get('displayName', {}).get('StringValue')
@property
def groups(self):
"""The groups associated with the user.
Returns:
groups (list): The groups associated with the user
"""
content_payload = {'UserId': self.id,
'MaxResults': 100}
target = 'com.amazonaws.swbup.service.SWBUPService.ListGroupsForUser'
for group in self._sso._get_paginated_results(content_payload=content_payload, # pylint: disable=protected-access
path='userpool',
target='ListGroupsForUser',
amz_target=target,
object_group='Groups',
url=self.url):
yield self._sso.get_group_by_id(group.get('GroupId'))
[docs]class PermissionSet(Entity):
"""Models the permission set object of SSO."""
@property
def url(self):
"""Url of the permission set.
Returns:
url (str): The url of the permission set
"""
return self._sso.endpoint_url
@property
def description(self):
"""The description of the permission set.
Returns:
description (str): The description of the permission set
"""
return self._data.get('Description')
@property
def id(self): # pylint: disable=invalid-name
"""The id of the permission set.
Returns:
id (str): The id of the permission set
"""
return self._data.get('Id')
@property
def name(self):
"""The name of the permission set.
Returns:
name (str): The name of the permission set
"""
return self._data.get('Name')
@property
def ttl(self):
"""The ttl of the permission set.
Returns:
ttl (str): The ttl of the permission set
"""
return self._data.get('ttl')
@property
def creation_date(self):
"""The creation date of the permission set.
Returns:
creation_date (str): The creation date of the permission set
"""
return self._data.get('CreationDate')
@property
def relay_state(self):
"""The relay_state of the permission_set.
Returns:
relay_state (str): The relayState of the permission_set
"""
return self._data.get('relayState')
@property
def permission_policy(self):
"""The permission policy of the permission_set.
Returns:
permission_policy (dict): The permission policy of the permission_set
"""
target = 'com.amazon.switchboard.service.SWBService.GetPermissionsPolicy'
content_string = {'permissionSetId': self.id}
payload = self._sso.get_api_payload(content_string=content_string,
target='GetPermissionsPolicy',
path='/control/',
x_amz_target=target)
self.logger.debug('Getting permission policy for permission_set with payload of %s:', payload)
response = self._sso.session.post(self.url, json=payload)
if not response.ok:
self.logger.error(response.text)
return None
return response.json()
@property
def provisioned_accounts(self):
"""The provisioned accounts with the permission set.
Returns:
list: Accounts provisioned with the permission set
"""
content_payload = {'permissionSetId': self.id,
'onlyOutOfSync': 'false'}
target = 'com.amazon.switchboard.service.SWBService.ListAccountsWithProvisionedPermissionSet'
for account_id in self._sso._get_paginated_results(content_payload=content_payload, # pylint: disable=protected-access
path='control',
target='ListAccountsWithProvisionedPermissionSet',
amz_target=target,
object_group='accountIds',
next_token_marker='marker',
url=self._sso.endpoint_url):
yield self._sso.get_account_by_id(account_id)
[docs] def assign_custom_policy_to_permission_set(self, policy_document):
"""Assign Custom policy to a permission_set.
Args:
permission_set_name: The name of the permission_set .
policy_document: The policy for the permission_set
Returns:
Bool: True or False
"""
content_string = {'permissionSetId': self.id,
'policyDocument': json.dumps(policy_document)}
target = 'com.amazon.switchboard.service.SWBService.PutPermissionsPolicy'
payload = self._sso.get_api_payload(content_string=content_string,
target='PutPermissionsPolicy',
path='/control/',
x_amz_target=target)
self.logger.debug('Assigning custom policy to permission set with payload %s:', payload)
response = self._sso.session.post(self.url, json=payload)
if not response.ok:
self.logger.error(response.text)
else:
if self.provisioned_accounts: # pylint: disable=using-constant-test
for account in self.provisioned_accounts:
self.logger.debug('Updating associated account %s', account.name)
self._sso._provision_application_profile_for_aws_account_instance(self.name, account.name) # pylint: disable=protected-access
return response.ok
[docs] def delete_custom_policy_from_permission_set(self):
"""Assign Custom policy to a permission_set.
Returns:
Bool: True or False
"""
content_string = {'permissionSetId': self.id}
target = 'com.amazon.switchboard.service.SWBService.DeletePermissionsPolicy'
payload = self._sso.get_api_payload(content_string=content_string,
target='DeletePermissionsPolicy',
path='/control/',
x_amz_target=target)
response = self._sso.session.post(self.url, json=payload)
if not response.ok:
self.logger.error(response.text)
else:
if self.provisioned_accounts: # pylint: disable=using-constant-test
for account in self.provisioned_accounts:
self.logger.debug('Updating associated account %s', account.name)
self._sso._provision_application_profile_for_aws_account_instance(self.name, account.name) # pylint: disable=protected-access
return response.ok
[docs] def update(self, description=' ', relay_state='', ttl=''):
"""The relayState of the permission_set.
Args:
description: Description for the permission set
relay_state: The relay state for the permission set.
https://docs.aws.amazon.com/singlesignon/latest/userguide/howtopermrelaystate.html
ttl: session duration
Returns:
bool: True or False
"""
content_string = {'permissionSetId': self.id,
'description': description if description != ' ' else self.description,
'ttl': ttl if ttl else self.ttl,
'relayState': relay_state if relay_state else self.relay_state}
target = 'com.amazon.switchboard.service.SWBService.UpdatePermissionSet'
payload = self._sso.get_api_payload(content_string=content_string,
target='UpdatePermissionSet',
path='/control/',
x_amz_target=target)
self.logger.debug('Posting to url %s payload of %s:', self.url, payload)
response = self._sso.session.post(self.url, json=payload)
if not response.ok:
self.logger.error(response.text)
return response.ok