334 lines
11 KiB
Python
334 lines
11 KiB
Python
# w.c.s. - web application for online forms
|
|
# Copyright (C) 2005-2010 Entr'ouvert
|
|
#
|
|
# This program is free software; you can redistribute it and/or modify
|
|
# it under the terms of the GNU General Public License as published by
|
|
# the Free Software Foundation; either version 2 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License
|
|
# along with this program; if not, see <http://www.gnu.org/licenses/>.
|
|
|
|
import datetime
|
|
|
|
from quixote import get_publisher
|
|
|
|
import wcs.qommon.storage as st
|
|
|
|
from .qommon import _, get_cfg
|
|
from .qommon.misc import simplify
|
|
from .qommon.storage import StorableObject
|
|
from .qommon.substitution import Substitutions, invalidate_substitution_cache
|
|
|
|
|
|
class User(StorableObject):
|
|
_names = 'users'
|
|
|
|
name = None
|
|
email = None
|
|
roles = None
|
|
is_active = True
|
|
is_admin = False
|
|
anonymous = False
|
|
form_data = None # dumping ground for custom fields
|
|
|
|
verified_fields = None
|
|
name_identifiers = None
|
|
lasso_dump = None
|
|
deleted_timestamp = None
|
|
|
|
last_seen = None
|
|
is_api_user = False
|
|
|
|
default_search_result_template = """{{ user_email|default:"" }}
|
|
{% if user_var_phone %} 📞 {{ user_var_phone }}{% endif %}
|
|
{% if user_var_mobile %} 📱 {{ user_var_mobile }}{% endif %}
|
|
{% if user_var_address or user_var_zipcode or user_var_city %} 📨{% endif %}
|
|
{% if user_var_address %} {{ user_var_address }}{% endif %}
|
|
{% if user_var_zipcode %} {{ user_var_zipcode }}{% endif %}
|
|
{% if user_var_city %} {{ user_var_city }}{% endif %}"""
|
|
|
|
def __init__(self, name=None):
|
|
StorableObject.__init__(self)
|
|
self.name = name
|
|
self.name_identifiers = []
|
|
self.verified_fields = []
|
|
self.roles = []
|
|
|
|
def migrate(self):
|
|
changed = False
|
|
|
|
if self.roles and 'site-admin' in self.roles:
|
|
self.is_admin = True
|
|
self.roles = [x for x in self.roles if x != 'site-admin']
|
|
changed = True
|
|
|
|
if self.roles:
|
|
for role in self.roles:
|
|
if isinstance(role, int):
|
|
self.roles = [str(x) for x in self.roles]
|
|
changed = True
|
|
break
|
|
|
|
if not self.verified_fields:
|
|
self.verified_fields = []
|
|
|
|
if changed:
|
|
self.store()
|
|
|
|
@invalidate_substitution_cache
|
|
def store(self, *args, **kwargs):
|
|
return super().store(*args, **kwargs)
|
|
|
|
@classmethod
|
|
def get_formdef(cls):
|
|
from .admin.settings import UserFieldsFormDef
|
|
|
|
return UserFieldsFormDef.singleton()
|
|
|
|
@classmethod
|
|
def get_fields(cls):
|
|
formdef = cls.get_formdef()
|
|
return formdef.fields or []
|
|
|
|
@property
|
|
def ascii_name(self):
|
|
return simplify(self.get_display_name(), space=' ')
|
|
|
|
def get_display_name(self):
|
|
if self.name:
|
|
return self.name
|
|
if self.email:
|
|
return self.email
|
|
return str(_('Unknown User'))
|
|
|
|
display_name = property(get_display_name)
|
|
|
|
def __str__(self):
|
|
return self.display_name
|
|
|
|
@property
|
|
def nameid(self):
|
|
return self.name_identifiers[0] if self.name_identifiers else None
|
|
|
|
def get_roles(self):
|
|
return (self.roles or []) + ['_user:%s' % self.id]
|
|
|
|
def set_attributes_from_formdata(self, formdata):
|
|
users_cfg = get_cfg('users', {})
|
|
|
|
if formdata.get('email'):
|
|
self.email = formdata.get('email')
|
|
|
|
field_email = users_cfg.get('field_email')
|
|
if field_email:
|
|
self.email = formdata.get(field_email)
|
|
|
|
field_name_values = users_cfg.get('field_name')
|
|
if isinstance(field_name_values, str): # it was a string in previous versions
|
|
field_name_values = [field_name_values]
|
|
|
|
if field_name_values:
|
|
self.name = ' '.join([formdata.get(x) for x in field_name_values if formdata.get(x)])
|
|
|
|
def can_go_in_admin(self):
|
|
return self.is_admin
|
|
|
|
def can_go_in_backoffice(self):
|
|
if self.is_admin:
|
|
return True
|
|
if self.anonymous:
|
|
return False
|
|
|
|
for role_id in self.roles or []:
|
|
try:
|
|
role = get_publisher().role_class.get(role_id)
|
|
if role.allows_backoffice_access:
|
|
return True
|
|
except KeyError: # role has been deleted
|
|
pass
|
|
return False
|
|
|
|
def can_go_in_backoffice_section(self, section):
|
|
authorised_roles = set(get_cfg('admin-permissions', {}).get(section) or [])
|
|
if authorised_roles:
|
|
return bool(set(self.roles or []).intersection(authorised_roles))
|
|
return self.can_go_in_admin()
|
|
|
|
def can_go_in_backoffice_forms(self):
|
|
return self.can_go_in_backoffice_section('forms')
|
|
|
|
def can_go_in_backoffice_cards(self):
|
|
return self.can_go_in_backoffice_section('cards')
|
|
|
|
def can_go_in_backoffice_workflows(self):
|
|
return self.can_go_in_backoffice_section('workflows')
|
|
|
|
@classmethod
|
|
def get_available_roles(cls):
|
|
from .roles import get_user_roles
|
|
|
|
return get_user_roles()
|
|
|
|
def add_roles(self, roles):
|
|
if not self.roles:
|
|
self.roles = []
|
|
self.roles.extend(roles)
|
|
|
|
@classmethod
|
|
def get_users_with_role(cls, role_id):
|
|
# this will be slow with the pickle backend as there is no index
|
|
# suitable for Intersects()
|
|
return cls.select([st.Null('deleted_timestamp'), st.Intersects('roles', [str(role_id)])])
|
|
|
|
@classmethod
|
|
def get_users_with_roles(cls, included_roles=None, excluded_roles=None, order_by=None):
|
|
if not get_publisher().is_using_postgresql():
|
|
return []
|
|
|
|
from wcs import sql
|
|
|
|
criterias = [sql.Null('deleted_timestamp')]
|
|
if included_roles:
|
|
criterias.append(sql.ArrayContains('roles', [str(r) for r in included_roles]))
|
|
if excluded_roles:
|
|
criterias.append(sql.Not(sql.Intersects('roles', [str(r) for r in excluded_roles])))
|
|
return cls.select(criterias, order_by=order_by)
|
|
|
|
@classmethod
|
|
def get_users_with_name_identifier(cls, name_identifier):
|
|
return cls.select(
|
|
[st.Null('deleted_timestamp'), st.Intersects('name_identifiers', [name_identifier])]
|
|
)
|
|
|
|
@classmethod
|
|
def get_users_with_email(cls, email):
|
|
return cls.select([st.Null('deleted_timestamp'), st.Equal('email', email)])
|
|
|
|
@classmethod
|
|
def get_users_with_name(cls, name):
|
|
return cls.select([st.Null('deleted_timestamp'), st.Equal('name', name)])
|
|
|
|
def get_substitution_variables(self, prefix='session_'):
|
|
d = {
|
|
prefix + 'user': self,
|
|
prefix + 'user_display_name': self.display_name,
|
|
prefix + 'user_email': self.email,
|
|
}
|
|
formdef = self.get_formdef()
|
|
if formdef:
|
|
from .formdata import get_dict_with_varnames
|
|
|
|
data = get_dict_with_varnames(formdef.fields, self.form_data)
|
|
for k, v in data.items():
|
|
d[prefix + 'user_' + k] = v
|
|
|
|
d[prefix + 'user_admin_access'] = self.can_go_in_admin()
|
|
d[prefix + 'user_backoffice_access'] = self.can_go_in_backoffice()
|
|
for i, name_identifier in enumerate(self.name_identifiers):
|
|
if i == 0:
|
|
d[prefix + 'user_nameid'] = name_identifier
|
|
d[prefix + 'user_name_identifier_%d' % i] = name_identifier
|
|
return d
|
|
|
|
@classmethod
|
|
def lookup_by_string(cls, user_value):
|
|
# lookup a single user using some string identifier, it can either be
|
|
# an email address or a uuid (name id).
|
|
if not user_value:
|
|
return None
|
|
if '@' in user_value:
|
|
users = cls.get_users_with_email(user_value)
|
|
else:
|
|
users = cls.get_users_with_name_identifier(user_value)
|
|
if not users:
|
|
return None
|
|
# hopefully the list has a single item but sort it on id to get a
|
|
# stable value in case there are multiple items.
|
|
users.sort(key=lambda x: x.id)
|
|
return users[0]
|
|
|
|
@classmethod
|
|
def get_substitution_variables_list(cls, prefix='session_'):
|
|
formdef = cls.get_formdef()
|
|
if not formdef:
|
|
return []
|
|
variables = []
|
|
for field in formdef.fields:
|
|
# we only advertise fields with a varname, as they can be
|
|
# considered stable
|
|
if field.varname:
|
|
variables.append(
|
|
(
|
|
_('User'),
|
|
prefix + 'user_var_' + field.varname,
|
|
_('Session User Field: %s') % field.label,
|
|
)
|
|
)
|
|
return variables
|
|
|
|
def __getattr__(self, attr):
|
|
if attr in self.__dict__:
|
|
return self.__dict__[attr]
|
|
|
|
formdef = self.get_formdef()
|
|
if formdef:
|
|
# lookup based on field varname
|
|
field = [x for x in formdef.fields if x.varname == attr]
|
|
if not field: # lookup on field id
|
|
field = [x for x in formdef.fields if str(x.id) == '_' + attr]
|
|
if field:
|
|
if 'form_data' not in self.__dict__:
|
|
return None
|
|
return self.__dict__['form_data'].get(field[0].id)
|
|
|
|
if attr[0] == 'f' and (self.__dict__['form_data'] and attr[0] == 'f' and self.__dict__['form_data']):
|
|
return self.__dict__['form_data'].get(attr[1:])
|
|
|
|
raise AttributeError()
|
|
|
|
def get_json_export_dict(self):
|
|
data = {
|
|
'id': self.id,
|
|
'name': self.display_name,
|
|
}
|
|
if self.email:
|
|
data['email'] = self.email
|
|
if self.name_identifiers:
|
|
data['NameID'] = self.name_identifiers
|
|
return data
|
|
|
|
def set_deleted(self):
|
|
self.deleted_timestamp = datetime.datetime.now()
|
|
self.store()
|
|
|
|
# django-compatibility properties and methods, useful in shared code/templates
|
|
@property
|
|
def is_anonymous(self):
|
|
return self.anonymous
|
|
|
|
@property
|
|
def is_authenticated(self):
|
|
return not (self.anonymous)
|
|
|
|
@property
|
|
def is_superuser(self):
|
|
return self.is_admin
|
|
|
|
def get_full_name(self):
|
|
return self.display_name
|
|
|
|
|
|
Substitutions.register(
|
|
'session_user_display_name',
|
|
category=_('User'),
|
|
comment=_('Session User Display Name'),
|
|
)
|
|
Substitutions.register('session_user_email', category=_('User'), comment=_('Session User Email'))
|
|
Substitutions.register_dynamic_source(User)
|