# w.c.s. - web application for online forms
# Copyright (C) 2005-2020  Entr'ouvert
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, see <http://www.gnu.org/licenses/>.

import json
import urllib.parse

from quixote import get_publisher

from wcs.qommon.xml_storage import XmlStorableObject

from .qommon.errors import ConnectionError
from .qommon.misc import http_post_request


class ApiAccess(XmlStorableObject):
    """API client credentials together with the roles granted to the client.

    An entry is either defined locally (identifier + access key) or is a
    local copy of a client validated by the identity provider; the latter
    has ``idp_api_client`` set and an identifier prefixed with ``_idp_``.
    """

    _names = 'apiaccess'
    xml_root_node = 'apiaccess'

    name = None
    access_identifier = None
    access_key = None
    description = None
    restrict_to_anonymised_data = False
    idp_api_client = False

    # backing store of the ``roles`` property: a list of roles, or a callable
    # returning one
    _roles = None
    # memoised result of get_role_ids(); Ellipsis means "not computed yet"
    _role_ids = Ellipsis

    # declarations for serialization
    XML_NODES = [
        ('name', 'str'),
        ('description', 'str'),
        ('access_identifier', 'str'),
        ('access_key', 'str'),
        ('restrict_to_anonymised_data', 'bool'),
        ('roles', 'roles'),
        ('idp_api_client', 'bool'),
    ]

    @classmethod
    def get_by_identifier(cls, access_identifier):
        """Return the stored object with this access identifier, or None."""
        for api_access in cls.select():
            if api_access.access_identifier == access_identifier:
                return api_access
        return None

    @classmethod
    def get_access_key(cls, access_identifier):
        """Return the access key stored for this identifier, or None if unknown."""
        api_access = cls.get_by_identifier(access_identifier)
        if api_access:
            return api_access.access_key
        return None

    def get_roles(self):
        """Return the roles of this API access (an empty list when unset)."""
        return self.roles or []

    def get_role_ids(self):
        """Return the ids of the roles, computed once and then memoised."""
        if self._role_ids is not Ellipsis:
            return self._role_ids
        self._role_ids = [x.id for x in self.get_roles()]
        return self._role_ids

    def get_as_api_user(self):
        """Return a minimal user-like object carrying this access' role ids."""

        class RestrictedApiUser:
            # kept as inner class so cannot be pickled
            id = Ellipsis  # make sure it fails all over the place if used
            is_admin = False
            is_api_user = True

            def __init__(self, api_access):
                self.api_access = api_access

            def can_go_in_admin(self):
                return False

            def can_go_in_backoffice(self):
                return False

            def get_roles(self):
                return self.roles

            def get_substitution_variables(self):
                return {}

        user = RestrictedApiUser(self)
        user.roles = self.get_role_ids()
        return user

    @classmethod
    def get_with_credentials(cls, username, password):
        """Authenticate an API client and return it as a restricted API user.

        Local credentials are checked first; when there is no local match, the
        key differs, or the local entry is only a copy of an identity provider
        client, the identity provider is asked to validate the credentials.

        Raises KeyError when the credentials cannot be validated.
        """
        api_access = cls.get_by_identifier(username)
        if not api_access or api_access.access_key != password or api_access.idp_api_client:
            api_access = cls.get_from_idp(username, password)
        if not api_access:
            raise KeyError
        return api_access.get_as_api_user()

    @property
    def roles(self):
        """Roles of this API access; a callable backing value is evaluated on access."""
        return self._roles() if callable(self._roles) else self._roles

    @roles.setter
    def roles(self, value):
        self._roles = value
        # drop the memoised ids, otherwise get_role_ids() would keep returning
        # the ids of the previous roles.
        self._role_ids = Ellipsis

    @classmethod
    def get_from_idp(cls, username, password):
        """Validate credentials against the identity provider.

        Posts the credentials to the ``check-api-client/`` endpoint below the
        ``idp_api_url`` site option.  On success the client is stored locally
        under the identifier ``_idp_<username>`` and returned.

        Returns None when no identity provider URL is configured, when the
        request fails, or when the reply is not a successful, well-formed
        JSON answer.
        """
        from wcs.api_utils import get_secret_and_orig, sign_url

        idp_api_url = get_publisher().get_site_option('idp_api_url', 'variables') or ''
        if not idp_api_url:
            return None

        url = urllib.parse.urljoin(idp_api_url, 'check-api-client/')
        secret, orig = get_secret_and_orig(url)
        url += '?orig=%s' % orig
        headers = {
            'Accept': 'application/json',
            'Content-type': 'application/json',
        }
        try:
            response, status, _, _ = http_post_request(
                sign_url(url, secret),
                body=json.dumps({'identifier': username, 'password': password}),
                headers=headers,
            )
        except ConnectionError:
            return None
        if status != 200:
            return None

        # a reply that is not valid JSON, or not a JSON object, is handled
        # like any other failed check instead of raising.
        try:
            data = response.json()
        except ValueError:
            return None
        if not isinstance(data, dict) or data.get('err', 1) != 0:
            return None

        # validate the whole payload before touching any stored object, so a
        # malformed reply never leaves an existing entry half-updated.
        try:
            client_data = data['data']
            restrict_to_anonymised_data = client_data['restrict_to_anonymised_data']
            role_ids = list(client_data['roles'])
        except (KeyError, TypeError):
            return None

        role_class = get_publisher().role_class
        try:
            roles = [role_class.get(x, ignore_errors=True) for x in role_ids]
        except KeyError:
            return None

        # cache api client locally, it is necessary for serialization for afterjobs
        # in uwsgi spooler.
        access_identifier = f'_idp_{username}'
        api_access = cls.get_by_identifier(access_identifier) or cls()
        api_access.idp_api_client = True
        api_access.access_identifier = access_identifier
        api_access.restrict_to_anonymised_data = restrict_to_anonymised_data
        # roles that could not be found locally are skipped
        api_access.roles = [x for x in roles if x is not None]
        api_access.store()
        return api_access