374 lines
13 KiB
Python
374 lines
13 KiB
Python
# w.c.s. - web application for online forms
|
|
# Copyright (C) 2005-2010 Entr'ouvert
|
|
#
|
|
# This program is free software; you can redistribute it and/or modify
|
|
# it under the terms of the GNU General Public License as published by
|
|
# the Free Software Foundation; either version 2 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License
|
|
# along with this program; if not, see <http://www.gnu.org/licenses/>.
|
|
|
|
import copy
|
|
import datetime
|
|
import json
|
|
import urllib.parse
|
|
|
|
from quixote import get_publisher
|
|
|
|
import wcs.qommon.storage as st
|
|
from wcs.api_utils import get_secret_and_orig, sign_url
|
|
|
|
from .qommon import _, get_cfg
|
|
from .qommon.misc import get_formatted_phone, http_post_request, simplify
|
|
from .qommon.storage import StorableObject
|
|
from .qommon.substitution import Substitutions, invalidate_substitution_cache
|
|
from .qommon.template import Template, TemplateError
|
|
|
|
|
|
class User(StorableObject):
|
|
_names = 'users'
|
|
|
|
name = None
|
|
email = None
|
|
roles = None
|
|
is_active = True
|
|
is_admin = False
|
|
form_data = None # dumping ground for custom fields
|
|
|
|
verified_fields = None
|
|
name_identifiers = None
|
|
lasso_dump = None
|
|
deleted_timestamp = None
|
|
|
|
last_seen = None
|
|
is_api_user = False
|
|
|
|
default_search_result_template = """{{ user_email|default:"" }}
|
|
{% if user_var_phone %} 📞 {{ user_var_phone }}{% endif %}
|
|
{% if user_var_mobile %} 📱 {{ user_var_mobile }}{% endif %}
|
|
{% if user_var_address or user_var_zipcode or user_var_city %} 📨{% endif %}
|
|
{% if user_var_address %} {{ user_var_address }}{% endif %}
|
|
{% if user_var_zipcode %} {{ user_var_zipcode }}{% endif %}
|
|
{% if user_var_city %} {{ user_var_city }}{% endif %}"""
|
|
|
|
def __init__(self, name=None):
|
|
StorableObject.__init__(self)
|
|
self.name = name
|
|
self.name_identifiers = []
|
|
self.verified_fields = []
|
|
self.roles = []
|
|
|
|
def get_formatted_phone(self, country_code=None):
|
|
users_cfg = get_cfg('users', {})
|
|
field_phone = users_cfg.get('field_phone')
|
|
if not field_phone:
|
|
return
|
|
phone = self.form_data.get(field_phone)
|
|
return get_formatted_phone(phone, country_code)
|
|
|
|
@invalidate_substitution_cache
|
|
def store(self, *args, **kwargs):
|
|
return super().store(*args, **kwargs)
|
|
|
|
@classmethod
|
|
def get_formdef(cls):
|
|
from .admin.settings import UserFieldsFormDef
|
|
|
|
return UserFieldsFormDef.singleton()
|
|
|
|
@classmethod
|
|
def get_fields(cls):
|
|
formdef = cls.get_formdef()
|
|
return formdef.fields or []
|
|
|
|
@property
|
|
def ascii_name(self):
|
|
return simplify(self.get_display_name(), space=' ')
|
|
|
|
def get_display_name(self):
|
|
if self.name:
|
|
return self.name
|
|
if self.email:
|
|
return self.email
|
|
return str(_('Unknown User'))
|
|
|
|
display_name = property(get_display_name)
|
|
|
|
def __str__(self):
|
|
return self.display_name
|
|
|
|
@property
|
|
def nameid(self):
|
|
return self.name_identifiers[0] if self.name_identifiers else None
|
|
|
|
def get_roles(self):
|
|
return (self.roles or []) + ['_user:%s' % self.id]
|
|
|
|
def set_attributes_from_formdata(self, formdata):
|
|
users_cfg = get_cfg('users', {})
|
|
|
|
if formdata.get('email'):
|
|
self.email = formdata.get('email')
|
|
|
|
field_email = users_cfg.get('field_email')
|
|
if field_email:
|
|
self.email = formdata.get(field_email)
|
|
|
|
if users_cfg.get('fullname_template'):
|
|
# apply template
|
|
user_ctx = self.get_substitution_variables(prefix='')
|
|
template = users_cfg.get('fullname_template')
|
|
try:
|
|
self.name = Template(template, autoescape=False, raises=True).render(user_ctx).strip()
|
|
except TemplateError:
|
|
self.name = '!template error! (%s)' % self.id
|
|
else:
|
|
# legacy mode, list of field IDs
|
|
field_name_values = users_cfg.get('field_name')
|
|
if isinstance(field_name_values, str): # it was a string in previous versions
|
|
field_name_values = [field_name_values]
|
|
|
|
if field_name_values:
|
|
self.name = ' '.join([formdata.get(x) for x in field_name_values if formdata.get(x)])
|
|
|
|
def can_go_in_admin(self):
|
|
return self.is_admin
|
|
|
|
def can_go_in_backoffice(self):
|
|
if self.is_admin:
|
|
return True
|
|
|
|
for role_id in self.roles or []:
|
|
try:
|
|
role = get_publisher().role_class.get(role_id)
|
|
if role.allows_backoffice_access:
|
|
return True
|
|
except KeyError: # role has been deleted
|
|
pass
|
|
return False
|
|
|
|
def can_go_in_backoffice_section(self, section):
|
|
authorised_roles = set(get_cfg('admin-permissions', {}).get(section) or [])
|
|
if authorised_roles:
|
|
return bool(set(self.roles or []).intersection(authorised_roles))
|
|
return self.can_go_in_admin()
|
|
|
|
def can_go_in_backoffice_forms(self):
|
|
return self.can_go_in_backoffice_section('forms')
|
|
|
|
def can_go_in_backoffice_cards(self):
|
|
return self.can_go_in_backoffice_section('cards')
|
|
|
|
def can_go_in_backoffice_workflows(self):
|
|
return self.can_go_in_backoffice_section('workflows')
|
|
|
|
def add_roles(self, roles):
|
|
if not self.roles:
|
|
self.roles = []
|
|
self.roles.extend(roles)
|
|
|
|
@classmethod
|
|
def get_users_with_role(cls, role_id):
|
|
# this will be slow with the pickle backend as there is no index
|
|
# suitable for Intersects()
|
|
return cls.select([st.Null('deleted_timestamp'), st.Intersects('roles', [str(role_id)])])
|
|
|
|
@classmethod
|
|
def get_users_with_roles(cls, included_roles=None, excluded_roles=None, order_by=None):
|
|
from wcs import sql
|
|
|
|
criterias = [sql.Null('deleted_timestamp')]
|
|
if included_roles:
|
|
criterias.append(sql.ArrayContains('roles', [str(r) for r in included_roles]))
|
|
if excluded_roles:
|
|
criterias.append(sql.Not(sql.Intersects('roles', [str(r) for r in excluded_roles])))
|
|
return cls.select(criterias, order_by=order_by)
|
|
|
|
@classmethod
|
|
def get_users_with_name_identifier(cls, name_identifier):
|
|
return cls.select(
|
|
[st.Null('deleted_timestamp'), st.Intersects('name_identifiers', [name_identifier])]
|
|
)
|
|
|
|
@classmethod
|
|
def get_users_with_email(cls, email):
|
|
return cls.select([st.Null('deleted_timestamp'), st.Equal('email', email)])
|
|
|
|
@classmethod
|
|
def get_users_with_name(cls, name):
|
|
return cls.select([st.Null('deleted_timestamp'), st.Equal('name', name)])
|
|
|
|
def get_substitution_variables(self, prefix='session_'):
|
|
d = {
|
|
prefix + 'user': self,
|
|
prefix + 'user_display_name': self.display_name,
|
|
prefix + 'user_email': self.email,
|
|
}
|
|
formdef = self.get_formdef()
|
|
if formdef:
|
|
from .formdata import get_dict_with_varnames
|
|
|
|
data = get_dict_with_varnames(formdef.fields, self.form_data)
|
|
for k, v in data.items():
|
|
d[prefix + 'user_' + k] = v
|
|
|
|
d[prefix + 'user_admin_access'] = self.can_go_in_admin()
|
|
d[prefix + 'user_backoffice_access'] = self.can_go_in_backoffice()
|
|
for i, name_identifier in enumerate(self.name_identifiers):
|
|
if i == 0:
|
|
d[prefix + 'user_nameid'] = name_identifier
|
|
d[prefix + 'user_name_identifier_%d' % i] = name_identifier
|
|
return d
|
|
|
|
@classmethod
|
|
def lookup_by_string(cls, user_value):
|
|
# lookup a single user using some string identifier, it can either be
|
|
# an email address or a uuid (name id).
|
|
if not user_value:
|
|
return None
|
|
if '@' in user_value:
|
|
users = cls.get_users_with_email(user_value)
|
|
else:
|
|
users = cls.get_users_with_name_identifier(user_value)
|
|
if not users:
|
|
return None
|
|
# hopefully the list has a single item but sort it on id to get a
|
|
# stable value in case there are multiple items.
|
|
users.sort(key=lambda x: x.id)
|
|
return users[0]
|
|
|
|
@classmethod
|
|
def get_substitution_variables_list(cls, prefix='session_'):
|
|
formdef = cls.get_formdef()
|
|
if not formdef:
|
|
return []
|
|
variables = []
|
|
for field in formdef.fields:
|
|
# we only advertise fields with a varname, as they can be
|
|
# considered stable
|
|
if field.varname:
|
|
variables.append(
|
|
(
|
|
_('User'),
|
|
prefix + 'user_var_' + field.varname,
|
|
_('Session User Field: %s') % field.label,
|
|
)
|
|
)
|
|
return variables
|
|
|
|
def __getattr__(self, attr):
|
|
if attr in self.__dict__:
|
|
return self.__dict__[attr]
|
|
|
|
formdef = self.get_formdef()
|
|
if formdef:
|
|
# lookup based on field varname
|
|
field = [x for x in formdef.fields if x.varname == attr]
|
|
if not field: # lookup on field id
|
|
field = [x for x in formdef.fields if str(x.id) == '_' + attr]
|
|
if field:
|
|
if 'form_data' not in self.__dict__:
|
|
return None
|
|
return self.__dict__['form_data'].get(field[0].id)
|
|
|
|
if attr[0] == 'f' and (self.__dict__['form_data'] and attr[0] == 'f' and self.__dict__['form_data']):
|
|
return self.__dict__['form_data'].get(attr[1:])
|
|
|
|
raise AttributeError()
|
|
|
|
def get_json_export_dict(self, full=False):
|
|
data = {
|
|
'id': self.id,
|
|
'name': self.display_name,
|
|
}
|
|
if self.email:
|
|
data['email'] = self.email
|
|
if self.name_identifiers:
|
|
data['NameID'] = self.name_identifiers
|
|
if full:
|
|
formdef = self.get_formdef()
|
|
if formdef:
|
|
data.update({f.varname: self.form_data.get(f.id) for f in formdef.fields if f.varname})
|
|
return data
|
|
|
|
def set_deleted(self):
|
|
self.deleted_timestamp = datetime.datetime.now()
|
|
self.store()
|
|
|
|
# django-compatibility properties and methods, useful in shared code/templates
|
|
@property
|
|
def is_anonymous(self):
|
|
return False
|
|
|
|
@property
|
|
def is_authenticated(self):
|
|
return True
|
|
|
|
@property
|
|
def is_superuser(self):
|
|
return self.is_admin
|
|
|
|
def get_full_name(self):
|
|
return self.display_name
|
|
|
|
@classmethod
|
|
def update_attributes_from_formdata(cls, job=None):
|
|
for user in cls.select([st.Null('deleted_timestamp')], iterator=True):
|
|
orig_dict = copy.copy(user.__dict__)
|
|
user.set_attributes_from_formdata(user.form_data or {})
|
|
if orig_dict != user.__dict__:
|
|
user.store()
|
|
|
|
@classmethod
|
|
def keepalive_users(cls, *args, **kwargs):
|
|
# get API URL
|
|
idps = get_cfg('idp', {})
|
|
if not idps:
|
|
return
|
|
entity_id = list(idps.values())[0].get('metadata_url')
|
|
if not entity_id:
|
|
return
|
|
idp_url = entity_id.split('idp/saml2/metadata')[0]
|
|
url = urllib.parse.urljoin(idp_url, '/api/users/synchronization/')
|
|
|
|
# get list of active UUIDs
|
|
from wcs.carddef import CardDef
|
|
|
|
user_uuids = get_publisher().user_class.get_formdef_keepalive_user_uuids()
|
|
user_ids = set()
|
|
for carddef in CardDef.select(ignore_errors=True):
|
|
if not (carddef and carddef.user_support):
|
|
continue
|
|
for carddata in carddef.data_class().select(ignore_errors=True):
|
|
if carddata.user_id:
|
|
user_ids.add(carddata.user_id)
|
|
for user in get_publisher().user_class().get_ids(user_ids, ignore_errors=True):
|
|
user_uuids.extend(user.name_identifiers)
|
|
|
|
secret, orig = get_secret_and_orig(url)
|
|
url = sign_url(url + '?orig=%s' % orig, secret)
|
|
|
|
# call API
|
|
status = http_post_request(
|
|
url,
|
|
body=json.dumps({'known_uuids': list(set(user_uuids)), 'keepalive': True}),
|
|
headers={'Accept': 'application/json', 'Content-type': 'application/json'},
|
|
)[1]
|
|
if status != 200:
|
|
get_publisher().record_error(_('Failed to call keepalive API (status: %s)') % status)
|
|
|
|
|
|
Substitutions.register(
|
|
'session_user_display_name',
|
|
category=_('User'),
|
|
comment=_('Session User Display Name'),
|
|
)
|
|
Substitutions.register('session_user_email', category=_('User'), comment=_('Session User Email'))
|
|
Substitutions.register_dynamic_source(User)
|