This repository has been archived on 2023-02-21. You can view files and clone it, but cannot push or open issues or pull requests.
veridic/acs/attribute_aggregator/core.py

192 lines
6.3 KiB
Python

'''
VERIDIC Project - Towards a centralized access control system
Copyright (C) 2011 Mikael Ates
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as
published by the Free Software Foundation, either version 3 of the
License, or (at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
'''
import time
import datetime
import logging
import re
from django.db import transaction
from django.core.exceptions import ObjectDoesNotExist
from attribute_aggregator.xacml_constants import *
from attribute_aggregator.mapping import ATTRIBUTE_MAPPING
logger = logging.getLogger('attribute_aggregator')
def iso8601_to_datetime(date_string):
'''
Convert a string formatted as an ISO8601 date into a time_t value.
This function ignores the sub-second resolution.
'''
m = re.match(r'(\d+-\d+-\d+T\d+:\d+:\d+)(?:\.\d+)?Z?$', date_string)
if not m:
raise ValueError('Invalid ISO8601 date')
tm = time.strptime(m.group(1)+'Z', "%Y-%m-%dT%H:%M:%SZ")
return datetime.datetime.fromtimestamp(time.mktime(tm))
def get_def_name_from_name_and_ns_of_attribute(name, namespace):
if not name or not namespace:
return None
for def_name, content in ATTRIBUTE_MAPPING.items():
if namespace in content["definitions"].keys():
if name in content["definitions"][namespace]["identifiers"]:
return def_name
if name in content["definitions"][namespace]["friendly_name"]:
return def_name
return None
def get_attribute_name_in_namespace(definition, namespace):
if not definition or not namespace:
return None
if definition in ATTRIBUTE_MAPPING:
if namespace in ATTRIBUTE_MAPPING[definition]["definitions"]:
return ATTRIBUTE_MAPPING[definition]\
["definitions"][namespace]["identifiers"][0]
return None
def convert_from_string(definition_name, value):
if not definition_name in ATTRIBUTE_MAPPING:
return None
type_ = ATTRIBUTE_MAPPING[definition_name]['type']
if type_ == ACS_XACML_DATATYPE_STRING:
return value
elif type_ == ACS_XACML_DATATYPE_BOOLEAN:
if value in ('True', 'true', 'Vrai', 'vrai'):
return True
return False
elif type_ == ACS_XACML_DATATYPE_INTEGER:
try:
return int(value)
except:
return None
elif type_ == ACS_XACML_DATATYPE_DOUBLE:
try:
return float(value)
except:
return None
elif type_ == ACS_XACML_DATATYPE_TIME:
try:
return time.strptime(value,"%h:%m:%s") #12:15:00
except:
return None
elif type_ == ACS_XACML_DATATYPE_DATE:
try:
return time.strptime(value,"%d/%b/%Y") #28/01/1982
except:
return None
elif type_ == ACS_XACML_DATATYPE_DATETIME:
try:
return iso8601_to_datetime(value)
except:
return None
elif type_ == ACS_XACML_DATATYPE_RFC822NAME: # email
r = re.compile(\
'[a-zA-Z0-9+_\-\.]+@[0-9a-zA-Z][.-0-9a-zA-Z]*.[a-zA-Z]+')
if r.search(value):
return value
else:
return None
elif type_ == ACS_XACML_DATATYPE_IPADDRESS: # x.x.x.x
r = re.compile(\
'(?:[\d]{1,3})\.(?:[\d]{1,3})\.(?:[\d]{1,3})\.(?:[\d]{1,3})')
if r.search(value):
return value
else:
return None
return None
@transaction.commit_manually
def load_or_create_user_profile(user=None, no_cleanup=False):
'''
If the user has a profile, remove assertions outdated and return
profile, else create a new one.
If cleanup: expiration_date < now() remove assertion data from profile
If no_cleanup: return profile if any without removing outdated
assertions
'''
from attribute_aggregator.models import UserAttributeProfile
profile = None
try:
if user:
try:
logger.debug('load_or_create_user_profile: \
search profile for %s' \
% user)
profile = UserAttributeProfile.objects.get(user=user)
except ObjectDoesNotExist:
profile = UserAttributeProfile(user=user)
profile.save()
logger.info('load_or_create_user_profile: \
profile with id %s for user %s created' \
% (str(profile.id), user))
else:
if no_cleanup:
logger.debug('load_or_create_user_profile: Existing user \
profile %s for %s - No cleanup' % (profile, user))
else:
profile.cleanup()
profile.save()
elif not user:
profile = UserAttributeProfile()
profile.save()
logger.debug('load_or_create_user_profile: \
profile with id %s for an anonymous user created' \
% str(profile.id))
except Exception, err:
transaction.rollback()
logger.error('load_or_create_user_profile: An error occured %s' \
% str(err))
return None
else:
transaction.commit()
logger.debug('load_or_create_user_profile: everything successfully \
performed')
return profile
def get_user_alias_in_source(user, source):
from attribute_aggregator.models import UserAliasInSource
try:
alias = UserAliasInSource.objects.get(user=user, source=source)
return alias.name
except:
return None
def set_user_alias_in_source(user, source, name):
from attribute_aggregator.models import UserAliasInSource
try:
alias, c = UserAliasInSource.objects.get_or_create(user=user,
source=source, name=name)
alias.save()
return alias
except:
return None