wcs/wcs/formdata.py

201 lines
6.2 KiB
Python

import time
import urllib
from sets import Set
from quixote import get_request
from qommon.storage import StorableObject
import qommon.misc
from anonylink import AnonymityLink
from users import User
class Evolution:
    """Plain record with a few descriptive fields and an optional list of parts.

    All attributes default to None at class level; ``parts`` only becomes a
    list once add_part() has been called.
    (Presumably one entry of a form's ``evolution`` history -- confirm
    against callers.)
    """

    who = None
    status = None
    time = None
    comment = None
    parts = None

    def add_part(self, part):
        """Attach ``part`` to this object, creating the list on first use."""
        if self.parts:
            self.parts.append(part)
        else:
            # None (or an empty list): start a fresh per-instance list.
            self.parts = [part]

    def display_parts(self):
        """Return the ``view()`` result of every part that has a ``view``.

        Parts without a ``view`` attribute are skipped; with no parts at
        all an empty list is returned.
        """
        return [item.view() for item in (self.parts or [])
                if hasattr(item, 'view')]
class FormData(StorableObject):
    """Stored data of one submitted form, with its status and access rules.

    The owning form definition is reached through the ``formdef`` property
    and the submitting user through the ``user`` property.
    """

    # get_formdef() expects '<type>-<formdef url name>' here, so the 'XX'
    # placeholder is presumably overridden per form -- TODO confirm.
    _names = 'XX'
    _hashed_indexes = ['user_id']

    user_id = None
    receipt_time = None
    status = None
    evolution = None
    data = None
    editable_by = None

    # Lazily filled cache for get_formdef(); excluded from pickles.
    _formdef = None

    def get_formdef(self):
        """Return the FormDef matching ``_names``, or None if lookup fails.

        The URL name is whatever follows the first dash of ``_names``.  A
        successful lookup is cached on the instance.
        """
        if self._formdef:
            return self._formdef
        # Function-scope import, as in the original (presumably avoids a
        # circular import between formdef and formdata).
        from formdef import FormDef
        # Unpacking (rather than indexing) keeps the ValueError raised when
        # ``_names`` contains no dash.
        _kind, url_name = self._names.split('-', 1)
        try:
            self._formdef = FormDef.get_by_urlname(url_name)
        except KeyError:
            self._formdef = None
        return self._formdef
    formdef = property(get_formdef)

    def get_user(self):
        """Return the User for ``user_id``; None if unset or 'ultra-user'."""
        if self.user_id and self.user_id != 'ultra-user':
            return User.get(self.user_id)
        return None

    def set_user(self, user):
        """Store the id of ``user``, or clear ``user_id`` when it is falsy."""
        if user:
            self.user_id = user.id
        else:
            self.user_id = None
    user = property(get_user, set_user)

    def just_created(self):
        """Stamp the receipt time and set the initial status.

        With a workflow the status is 'wf-<id of the first possible
        status>'; otherwise it is 'new'.
        """
        self.receipt_time = time.localtime()
        if self.formdef.workflow_id:
            self.status = 'wf-%s' % self.formdef.workflow.possible_status[0].id
        else:
            self.status = 'new'

    def perform_workflow(self):
        """Run the actions attached to the current status.

        For a workflow status, returns whatever the status' perform_items()
        returns.  For the legacy 'new' status, notifies receiver and user
        and returns None.  Any other status does nothing and returns None.
        """
        url = None
        if self.status.startswith('wf-'):
            wf_status = self.get_workflow_status()
            url = wf_status.perform_items(self)
        elif self.status == 'new':
            self.formdef.notify_new_receiver(self)
            self.formdef.notify_new_user(self)
        return url

    def display_workflow_message(self):
        """Return the message of the first status item providing one.

        Returns '' when the status is not a workflow status or when no item
        has a ``get_message`` attribute.
        """
        if self.status.startswith('wf-'):
            wf_status = self.get_workflow_status()
            for item in wf_status.items:
                if hasattr(item, 'get_message'):
                    return item.get_message(self)
        return ''

    def get_status_label(self, status=None):
        """Return the display label for ``status`` (default: own status).

        With a workflow, this is the name of the matching workflow status,
        or the translated "unknown" label when the status string carries no
        id or the id is not part of the workflow.  Without a workflow the
        label comes from formdef.status_labels.
        """
        if not status:
            status = self.status
        if self.formdef.workflow:
            try:
                # Statuses are built as 'wf-%s' % id, so split only once to
                # keep ids that themselves contain a dash.
                status_id = status.split('-', 1)[1]
                wf_status = [x for x in self.formdef.workflow.possible_status
                        if x.id == status_id][0]
            except IndexError:
                # NOTE(review): 'Unknownn' looks like a typo for 'Unknown';
                # left byte-identical because it is presumably a translation
                # msgid -- fix together with the catalogs.
                return _('Unknownn')
            return wf_status.name
        else:
            # COMPAT (behaviour when no workflow)
            from formdef import status_labels
            return _(status_labels[status])

    def get_workflow_form(self, user):
        """Return the action form of the current workflow status for ``user``."""
        wf_status = self.get_workflow_status()
        return wf_status.get_action_form(self, user)

    def handle_workflow_form(self, user, form):
        """Delegate handling of ``form`` to the current workflow status."""
        wf_status = self.get_workflow_status()
        return wf_status.handle_form(form, self, user)

    def get_url(self, backoffice=False):
        """Return the absolute URL of this form data for the current request.

        '/backoffice' is inserted after the script name when ``backoffice``
        is true.
        """
        req = get_request()
        base_url = '%s://%s%s' % (req.get_scheme(), req.get_server(),
                urllib.quote(req.environ.get('SCRIPT_NAME')))
        if backoffice:
            base_url += '/backoffice'
        return '%s/%s/%s/' % (base_url, self.formdef.url_name, self.id)

    def get_workflow_status(self):
        """Return the workflow status object matching ``self.status``.

        Raises Exception when the status is not a 'wf-' status, and
        IndexError when the id is not among the workflow's possible
        statuses.
        """
        if not self.status.startswith('wf-'):
            # A real exception instead of a string: raising a string is
            # itself an error on Python 2.6 and later.
            raise Exception(
                'Error, this codepath is only for workflows (current status: %s)'
                % self.status)
        # Split once so ids containing a dash survive intact.
        status_id = self.status.split('-', 1)[1]
        wf_status = [x for x in self.formdef.workflow.possible_status
                if x.id == status_id][0]
        return wf_status

    def get_field_value(self, field):
        """Return the stored value of the field labelled ``field``.

        Returns None when no field of the formdef has that label.
        """
        try:
            matching = [x for x in self.formdef.fields if x.label == field][0]
        except IndexError:
            return None
        return self.data.get(matching.id)

    def get_as_dict(self, prefix=''):
        """Return the field values as a dict keyed two ways.

        Each field appears as 'f<field id>' and again under ``prefix`` plus
        its simplified label.  Without a prefix, a simplified label that
        does not start with a letter gets a leading underscore.
        """
        data = {}
        for field in self.formdef.fields:
            data['f%s' % field.id] = self.data.get(field.id)
            identifier_name = qommon.misc.simplify(field.label, space='_')
            # [:1] instead of [0]: an empty simplified label must not raise.
            if not prefix and not identifier_name[:1].isalpha():
                identifier_name = '_' + identifier_name
            data[prefix + identifier_name] = data['f%s' % field.id]
        return data

    def is_user_allowed_read(self, user):
        """Tell whether ``user`` may read this form data.

        Checks, in order: the 'all' ACL, absence of a user, admin rights,
        the 'roles' or 'owner' ACL, and finally either the roles allowed to
        act on the current workflow status or, without a workflow,
        membership of the formdef receiver role.
        """
        if self.formdef.acl_read == 'all':
            return True
        if not user:
            return False
        if user.is_admin:
            return True
        elif self.formdef.acl_read == 'roles':
            # 'or []' mirrors the guard already used in the no-workflow
            # branch below.
            user_roles = Set(user.roles or [])
            # Work on a copy: appending to formdef.roles itself would grow
            # the shared list on every call.
            form_roles = list(self.formdef.roles or [])
            if self.formdef.receiver:
                form_roles.append(self.formdef.receiver.id)
            if user_roles.intersection(form_roles):
                return True
        elif self.formdef.acl_read == 'owner':
            if self.user_id == user.id:
                return True

        if self.formdef.workflow_id:
            # formdef has workflow, get roles allowed some actions relative to
            # current status
            wf_status = self.get_workflow_status()
            status_action_roles = []
            for item in wf_status.items or []:
                if not hasattr(item, 'by'):
                    continue
                # Skip string entries starting with '_' (presumably
                # pseudo-roles rather than real role ids -- confirm).
                status_action_roles.extend(
                    [x for x in item.by
                        if not (type(x) is str and x.startswith('_'))])
            user_roles = Set(user.roles or [])
            if user_roles.intersection(status_action_roles):
                return True
        else:
            # in default workflow, access is allowed for formdef.receiver_id
            return self.formdef.receiver_id in (user.roles or [])

        return False

    # don't pickle _formdef cache
    def __getstate__(self):
        """Return the instance state without the ``_formdef`` cache."""
        # Copy first so that pickling does not drop the cache from the live
        # instance.
        odict = self.__dict__.copy()
        if '_formdef' in odict:
            del odict['_formdef']
        return odict

    def __setstate__(self, dict):
        """Restore the instance state and reset the ``_formdef`` cache."""
        self.__dict__ = dict
        self._formdef = None