612 lines
18 KiB
Python
612 lines
18 KiB
Python
import urllib
|
|
|
|
try:
|
|
import elementtree.ElementTree as ET
|
|
except ImportError:
|
|
try:
|
|
import xml.etree.ElementTree as ET
|
|
except ImportError:
|
|
ET = None
|
|
|
|
from qommon.storage import StorableObject
|
|
from quixote import get_request, get_session
|
|
|
|
from qommon import emails
|
|
|
|
from qommon.form import *
|
|
from qommon.misc import simplify, date_format
|
|
from qommon import get_cfg
|
|
|
|
from formdata import FormData
|
|
from roles import Role
|
|
from users import User
|
|
from categories import Category
|
|
from wcs.workflows import Workflow
|
|
import fields
|
|
|
|
status_labels = {
|
|
None: N_('Unknown'),
|
|
'new': N_('New'),
|
|
'rejected': N_('Rejected'),
|
|
'accepted': N_('Accepted'),
|
|
'finished': N_('Finished')
|
|
}
|
|
|
|
class FormField:
|
|
### only used to unpickle form fields from older (<200603) versions
|
|
def __setstate__(self, dict):
|
|
type = dict['type']
|
|
self.real_field = fields.get_field_class_by_type(type)(**dict)
|
|
|
|
|
|
|
|
class FormDef(StorableObject):
|
|
_names = 'formdefs'
|
|
_indexes = ['url_name']
|
|
|
|
name = None
|
|
url_name = None
|
|
fields = None
|
|
receiver_id = None
|
|
category_id = None
|
|
workflow_id = None
|
|
roles = None
|
|
discussion = False
|
|
confirmation = True
|
|
detailed_emails = True
|
|
disabled = False
|
|
only_allow_one = False
|
|
allow_drafts = False
|
|
|
|
acl_read = 'owner' # one of ('owner', 'roles', 'all')
|
|
|
|
def migrate(self):
|
|
changed = False
|
|
|
|
if self.__dict__.has_key('receiver'):
|
|
self.receiver_id = self.__dict__['receiver']
|
|
del self.__dict__['receiver']
|
|
changed = True
|
|
|
|
if self.__dict__.has_key('category'):
|
|
self.category_id = self.__dict__['category']
|
|
del self.__dict__['category']
|
|
changed = True
|
|
|
|
if not self.url_name:
|
|
try:
|
|
int(self.id)
|
|
except ValueError:
|
|
self.url_name = self.id
|
|
else:
|
|
self.url_name = simplify(self.name)
|
|
changed = True
|
|
|
|
if self.fields and type(self.fields[0]) is dict:
|
|
for f in self.fields:
|
|
if f.has_key('name'):
|
|
f['label'] = f['name']
|
|
del f['name']
|
|
self.fields = [FormField(**x) for x in self.fields]
|
|
for i, f in enumerate(self.fields):
|
|
f.id = str(i)
|
|
for formdata in self.data_class().select():
|
|
for f in self.fields:
|
|
if not formdata.data.has_key(f.label):
|
|
continue
|
|
formdata.data[f.id] = formdata.data[f.label]
|
|
del formdata.data[f.label]
|
|
formdata.store()
|
|
changed = True
|
|
|
|
if self.fields and isinstance(self.fields[0], FormField):
|
|
# migration from generic FormField to specific Field classes
|
|
# (200603)
|
|
self.fields = [x.real_field for x in self.fields]
|
|
|
|
if self.__dict__.has_key('public'):
|
|
if self.__dict__.get('public'):
|
|
self.acl_read = 'all'
|
|
del self.__dict__['public']
|
|
changed = True
|
|
|
|
if changed:
|
|
self.store()
|
|
|
|
def data_class(self):
|
|
cls = FormData
|
|
cls._names = 'form-%s' % self.url_name
|
|
return cls
|
|
|
|
def store(self):
|
|
new_url_name = simplify(self.name)
|
|
if new_url_name != self.url_name:
|
|
# title changed, url will be changed only if there are not yet any
|
|
# submitted forms
|
|
data_class = self.data_class()
|
|
if data_class().count() == 0:
|
|
self.url_name = new_url_name
|
|
return StorableObject.store(self)
|
|
|
|
|
|
def get_receiver(self):
|
|
if self.receiver_id:
|
|
try:
|
|
return Role.get(self.receiver_id)
|
|
except KeyError:
|
|
return None
|
|
else:
|
|
return None
|
|
|
|
def set_receiver(self, role):
|
|
if role:
|
|
self.receiver_id = role.id
|
|
elif self.receiver_id:
|
|
self.receiver_id = None
|
|
receiver = property(get_receiver, set_receiver)
|
|
|
|
def get_category(self):
|
|
if self.category_id:
|
|
try:
|
|
return Category.get(self.category_id)
|
|
except KeyError:
|
|
return None
|
|
else:
|
|
return None
|
|
|
|
def set_category(self, category):
|
|
if category:
|
|
self.category_id = category.id
|
|
elif self.category_id:
|
|
self.category_id = None
|
|
category = property(get_category, set_category)
|
|
|
|
|
|
def get_workflow(self):
|
|
if self.workflow_id:
|
|
try:
|
|
return Workflow.get(self.workflow_id)
|
|
except KeyError:
|
|
return None
|
|
else:
|
|
return None
|
|
|
|
def set_workflow(self, workflow):
|
|
if workflow:
|
|
self.workflow_id = workflow.id
|
|
elif self.workflow_id:
|
|
self.workflow_id = None
|
|
workflow = property(get_workflow, set_workflow)
|
|
|
|
def get_by_urlname(cls, url_name):
|
|
return cls.get_on_index(url_name, 'url_name')
|
|
get_by_urlname = classmethod(get_by_urlname)
|
|
|
|
def get_url(self, backoffice = False):
|
|
req = get_request()
|
|
base_url = '%s://%s%s' % (req.get_scheme(), req.get_server(),
|
|
urllib.quote(req.environ.get('SCRIPT_NAME')))
|
|
if backoffice:
|
|
base_url += '/backoffice'
|
|
return '%s/%s/' % (base_url, self.url_name)
|
|
|
|
|
|
def create_form(self, page_no = 0, displayed_fields = None):
|
|
form = Form(enctype = "multipart/form-data", use_tokens = False)
|
|
# had: , use_tokens = not self.confirmation)
|
|
self.add_fields_to_form(form, page_no = page_no, displayed_fields = displayed_fields)
|
|
return form
|
|
|
|
def add_fields_to_form(self, form, page_no = 0, displayed_fields = None, form_data = None):
|
|
current_page = 0
|
|
for field in self.fields:
|
|
if field.type == 'page':
|
|
if field is self.fields[0]:
|
|
continue
|
|
current_page += 1
|
|
if current_page > page_no:
|
|
break
|
|
continue
|
|
if current_page != page_no:
|
|
continue
|
|
if type(displayed_fields) is list:
|
|
displayed_fields.append(field)
|
|
value = None
|
|
if form_data:
|
|
value = form_data.get(field.id)
|
|
field.add_to_form(form, value)
|
|
|
|
def get_page(self, page_no):
|
|
return [x for x in self.fields if x.type == 'page'][page_no]
|
|
|
|
def create_view_form(self, dict = {}, use_tokens = True):
|
|
form = Form(enctype = 'multipart/form-data', use_tokens = use_tokens)
|
|
on_disabled_page = False
|
|
on_page = False
|
|
for field in self.fields:
|
|
if field.type == 'page':
|
|
on_disabled_page = False
|
|
if not field.is_visible(dict):
|
|
on_disabled_page = True
|
|
|
|
form_field = False
|
|
for f in self.fields[self.fields.index(field)+1:]:
|
|
if f.key == 'page':
|
|
break
|
|
if isinstance(f, fields.WidgetField):
|
|
form_field = True
|
|
break
|
|
if form_field is False:
|
|
on_disabled_page = True
|
|
|
|
if on_disabled_page:
|
|
continue
|
|
|
|
if field.type == 'page':
|
|
if on_page:
|
|
form.widgets.append(HtmlWidget(htmltext('</div>')))
|
|
form.widgets.append(HtmlWidget(
|
|
htmltext('<div class="page"><h3>%s</h3>' % field.label)))
|
|
on_page = True
|
|
|
|
value = dict.get(field.id, '')
|
|
field.add_to_view_form(form, value)
|
|
|
|
if on_page:
|
|
form.widgets.append(HtmlWidget(htmltext('</div>')))
|
|
|
|
return form
|
|
|
|
def get_data(self, form):
|
|
d = {}
|
|
for field in self.fields:
|
|
widget = form.get_widget('f%s' % field.id)
|
|
if widget:
|
|
d[field.id] = widget.parse()
|
|
if d.get(field.id) and field.convert_value_from_str:
|
|
d[field.id] = field.convert_value_from_str(d[field.id])
|
|
if widget and widget.cleanup:
|
|
widget.cleanup()
|
|
|
|
return d
|
|
|
|
|
|
def export_to_xml(self):
|
|
charset = get_publisher().site_charset
|
|
root = ET.Element('formdef')
|
|
ET.SubElement(root, 'name').text = unicode(self.name, charset)
|
|
ET.SubElement(root, 'url_name').text = unicode(self.url_name, charset)
|
|
if self.category_id:
|
|
ET.SubElement(root, 'category').text = unicode(self.category.name, charset)
|
|
fields = ET.SubElement(root, 'fields')
|
|
for field in self.fields:
|
|
fields.append(field.export_to_xml(charset = charset))
|
|
|
|
return root
|
|
|
|
def import_from_xml(cls, fd):
|
|
charset = get_publisher().site_charset
|
|
try:
|
|
tree = ET.parse(fd)
|
|
except:
|
|
raise ValueError()
|
|
formdef = cls()
|
|
if tree.find('name') is None or not tree.find('name').text:
|
|
raise ValueError()
|
|
|
|
formdef.name = tree.find('name').text.encode(charset)
|
|
formdef.fields = []
|
|
for i, field in enumerate(tree.find('fields')):
|
|
try:
|
|
field_o = fields.get_field_class_by_type(field.findtext('type'))()
|
|
except KeyError:
|
|
raise ValueError()
|
|
field_o.init_with_xml(field, charset)
|
|
field_o.id = str(i)
|
|
formdef.fields.append(field_o)
|
|
if tree.find('category') is not None:
|
|
category = tree.find('category').text.encode(charset)
|
|
cats = Category.select()
|
|
for c in cats:
|
|
if c.name == category:
|
|
formdef.category_id = c.id
|
|
break
|
|
return formdef
|
|
|
|
import_from_xml = classmethod(import_from_xml)
|
|
|
|
def get_detailed_email_form(self, formdata, url):
|
|
details = []
|
|
if formdata.user_id and formdata.user:
|
|
details.append(_('User name:'))
|
|
details.append(' %s' % formdata.user.name)
|
|
details.append('')
|
|
fields = self.fields
|
|
for field in self.fields:
|
|
data = formdata.data
|
|
if data.get(field.id) is None:
|
|
continue
|
|
details.append(_('%s:') % field.label)
|
|
if field.type == 'bool':
|
|
if eval(data[field.id]):
|
|
details.append(' %s' % _('Yes'))
|
|
else:
|
|
details.append(' %s' % _('No'))
|
|
elif field.type == 'file':
|
|
if 'status' in url:
|
|
file_url = url.replace('status', 'download?f=%s' % field.id)
|
|
else:
|
|
file_url = url + 'download?f=%s' % field.id
|
|
details.append(' %s' % file_url)
|
|
elif field.type == 'text':
|
|
# XXX: howto support preformatted text in a dl in docutils ?
|
|
details.append((' %s' % data[field.id]).replace('\n', '\n '))
|
|
elif field.type == 'date':
|
|
details.append(' %s' % time.strftime(date_format(), data[field.id]))
|
|
else:
|
|
details.append('%s' % field.get_rst_view_value(data[field.id], indent=' '))
|
|
details.append('')
|
|
return '\n'.join(details)
|
|
|
|
def get_submitter_email(self, formdata):
|
|
users_cfg = get_cfg('users', {})
|
|
field_email = users_cfg.get('field_email') or 'email'
|
|
if formdata.user:
|
|
if field_email == 'email' and formdata.user.email:
|
|
return formdata.user.email
|
|
elif formdata.user.form_data and formdata.user.form_data.get(field_email):
|
|
return formdata.user.form_data.get(field_email)
|
|
|
|
# if there is no user, or user has no email address, look
|
|
# up in submitted form for one that would hold the user
|
|
# email (the one set to be prefilled by user email)
|
|
fields = formdata.formdef.fields
|
|
for field in fields:
|
|
if not hasattr(field, 'prefill'):
|
|
continue
|
|
if field.prefill and field.prefill.get('type') == 'user':
|
|
if field.prefill.get('value') == field_email:
|
|
v = formdata.data.get(field.id)
|
|
if v:
|
|
return v
|
|
return None
|
|
|
|
def notify_new_user(self, formdata):
|
|
submitter_email = self.get_submitter_email(formdata)
|
|
if not submitter_email:
|
|
return
|
|
|
|
if self.receiver.emails:
|
|
from_email = self.receiver.emails[0]
|
|
else:
|
|
from_email = None
|
|
|
|
url = formdata.get_url()
|
|
data = {
|
|
'url': url,
|
|
'details': self.get_detailed_email_form(formdata, url),
|
|
'name': self.name,
|
|
'user': formdata.user,
|
|
}
|
|
|
|
emails.custom_ezt_email('new_user',
|
|
data, submitter_email,
|
|
replyto = from_email,
|
|
exclude_current_user = False,
|
|
fire_and_forget = True)
|
|
|
|
|
|
def notify_new_receiver(self, formdata):
|
|
if not self.receiver.emails:
|
|
return
|
|
|
|
mail_body = _("""Hello,
|
|
|
|
A new form has been submitted on the website, you can consult it with this
|
|
link: [url]
|
|
|
|
[if-any details]
|
|
[details]
|
|
[end]
|
|
""")
|
|
|
|
url = formdata.get_url(backoffice = True)
|
|
data = {
|
|
'url': url,
|
|
'details': self.get_detailed_email_form(formdata, url),
|
|
'name': self.name
|
|
}
|
|
|
|
emails.custom_ezt_email('new_receiver',
|
|
data, self.receiver.emails[0],
|
|
bcc = self.receiver.emails[1:],
|
|
exclude_current_user = True,
|
|
fire_and_forget = True)
|
|
|
|
def notify_change_user(self, formdata, old_status):
|
|
submitter_email = self.get_submitter_email(formdata)
|
|
if not submitter_email:
|
|
return
|
|
|
|
if self.receiver.emails:
|
|
from_email = self.receiver.emails[0]
|
|
else:
|
|
from_email = None
|
|
|
|
url = formdata.get_url()
|
|
|
|
if old_status != formdata.status:
|
|
email_key = 'change_user'
|
|
else:
|
|
email_key = 'comment_user'
|
|
|
|
evolution = self.get_detailed_evolution(formdata)
|
|
|
|
data = {
|
|
'url': url,
|
|
'evolution': evolution,
|
|
'before': _(status_labels[old_status]),
|
|
'after': _(status_labels[formdata.status]),
|
|
'user': formdata.user,
|
|
}
|
|
|
|
emails.custom_ezt_email(email_key, data, submitter_email,
|
|
replyto = from_email,
|
|
exclude_current_user = True,
|
|
fire_and_forget = True)
|
|
|
|
def get_detailed_evolution(self, formdata):
|
|
if not formdata.evolution:
|
|
return None
|
|
|
|
details = []
|
|
evo = formdata.evolution[-1]
|
|
if evo.who:
|
|
details.append(_('User name'))
|
|
details.append(' %s' % User.get(evo.who).name)
|
|
if evo.status:
|
|
details.append(_('Status'))
|
|
details.append(' %s' % _(status_labels[evo.status]))
|
|
if evo.comment:
|
|
details.append('\n%s\n' % evo.comment)
|
|
return '\n\n----\n\n' + '\n'.join(details)
|
|
|
|
|
|
def notify_change_receiver(self, formdata, old_status):
|
|
if not self.receiver.emails:
|
|
return
|
|
|
|
emails_cfg = get_cfg('emails', {})
|
|
|
|
url = formdata.get_url(backoffice = True)
|
|
|
|
if old_status != formdata.status:
|
|
email_key = 'change_receiver'
|
|
else:
|
|
email_key = 'comment_receiver'
|
|
|
|
evolution = self.get_detailed_evolution(formdata)
|
|
|
|
data = {
|
|
'name': self.name,
|
|
'url': url,
|
|
'evolution': evolution,
|
|
'before': _(status_labels[old_status]),
|
|
'after': _(status_labels[formdata.status])
|
|
}
|
|
|
|
emails.custom_ezt_email(email_key, data, self.receiver.emails,
|
|
exclude_current_user = True,
|
|
fire_and_forget = True)
|
|
|
|
|
|
from qommon.admin.emails import EmailsDirectory
|
|
|
|
EmailsDirectory.register('new_user', N_('Notification of creation to user'),
|
|
N_('Available variables: user, name, url, details'), enabled = False,
|
|
default_subject = N_('New form ([name])'),
|
|
default_body = N_('''\
|
|
Hello,
|
|
|
|
[if-any user]
|
|
This mail is a reminder about the form you just submitted; you can consult it
|
|
with this link: [url]
|
|
[else]
|
|
This mail is a reminder about the form you just submitted.
|
|
[end]
|
|
|
|
[if-any details]
|
|
For reference, here are the details:
|
|
|
|
[details]
|
|
[end]
|
|
'''))
|
|
|
|
EmailsDirectory.register('change_user', N_('Notification of change to user'),
|
|
N_('Available variables: user, url, before, after, evolution'),
|
|
default_subject = N_('Form status change'),
|
|
default_body = N_('''\
|
|
Hello,
|
|
|
|
Status of the form you submitted just changed (from "[before]" to "[after]").
|
|
|
|
[if-any user]
|
|
You can consult it with this link:
|
|
[url]
|
|
[else]
|
|
[end]
|
|
|
|
[if-any evolution]
|
|
[evolution]
|
|
[end]
|
|
'''))
|
|
|
|
|
|
EmailsDirectory.register('new_receiver', N_('Notification of creation to receiver'),
|
|
N_('Available variables: name, url, details'), enabled = False,
|
|
default_subject = N_('New form ([name])'),
|
|
default_body = N_('''\
|
|
Hello,
|
|
|
|
A new form has been submitted, you can see it with this link:
|
|
[url]
|
|
|
|
[if-any details]
|
|
For reference, here are the details:
|
|
|
|
[details]
|
|
[end]
|
|
'''))
|
|
|
|
|
|
|
|
EmailsDirectory.register('change_receiver', N_('Notification of change to receiver'),
|
|
N_('Available variables: name, url, before, after, evolution'),
|
|
default_subject = N_('Form status change ([name])'),
|
|
default_body = N_('''\
|
|
Hello,
|
|
|
|
A form just changed, you can consult it with this link:
|
|
|
|
[url]
|
|
|
|
[if-any evolution]
|
|
[evolution]
|
|
[end]
|
|
'''))
|
|
|
|
|
|
|
|
EmailsDirectory.register('comment_user', N_('Notification of comment to user'),
|
|
N_('Available variables: url, evolution'),
|
|
default_subject = N_('New comment on form'),
|
|
default_body = N_('''\
|
|
Hello,
|
|
|
|
New information has been submitted on the form you submitted, you can
|
|
consult it with this link:
|
|
|
|
[url]
|
|
|
|
[if-any evolution]
|
|
[evolution]
|
|
[end]
|
|
'''))
|
|
|
|
|
|
EmailsDirectory.register('comment_receiver', N_('Notification of comment to receiver'),
|
|
N_('Available variables: name, url, evolution'),
|
|
default_subject = N_('Form status comment [name]'),
|
|
default_body = N_('''\
|
|
Hello,
|
|
|
|
A form just changed, you can consult it with this link:
|
|
|
|
[url]
|
|
|
|
[if-any evolution]
|
|
[evolution]
|
|
[end]
|
|
'''))
|
|
|