# This repository was archived on 2023-02-21. You can view files and clone it, but you cannot push or open issues or pull requests.
# asec/extra/modules/bodiffusion.py
#
# 1025 lines
# 39 KiB
# Python

# w.c.s. (asec) - w.c.s. extension for poll & survey service
# Copyright (C) 2010-2011 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
# License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import urllib
import csv
import re
from sets import Set
from quixote import get_publisher, get_request, get_response, get_session, redirect
from quixote.directory import Directory
from quixote.html import TemplateIO, htmltext
from qommon.backoffice.menu import html_top
from qommon.admin.emails import EmailsDirectory
from qommon.admin.menu import command_icon
from qommon.admin.cfg import cfg_submit
from qommon.form import *
from qommon import errors, get_cfg, get_logger
from qommon.afterjobs import AfterJob
import qommon.storage
from qommon import emails
from wcs.formdef import FormDef, FormField
from wcs.roles import Role
from wcs.users import User
from qommon.ident.password import make_password
from qommon.ident.password_accounts import PasswordAccount
from wcs.workflows import Workflow
from wcs.admin.fields import FieldWidget, FieldDefPage, FieldsDirectory
from participanttokens import ParticipantToken
from boresults import FormResultDirectory
import quota
# Translatable labels for questionnaire access modes.  The 'disabled',
# 'public' and 'private' keys are the values get_form_access_mode() returns;
# 'unknown' is not produced anywhere in this module.
access_mode_labels = {
    'disabled': N_('Disabled'),
    'public': N_('Public'),
    'private': N_('Private'),
    'unknown': N_('Unknown')
}
def get_form_access_mode(formdef):
    """Return the access mode key of *formdef*.

    The result is 'disabled' when the form is disabled (this takes
    precedence over the private flag), 'private' when it is only marked
    private, and 'public' otherwise.
    """
    if formdef.disabled:
        return 'disabled'
    if formdef.private:
        return 'private'
    return 'public'
def get_form_url(formdef):
    """Return the address participants should use to reach *formdef*.

    A public form simply reports its own URL.  A private form gets an
    address built from the current request (scheme, server and quoted
    SCRIPT_NAME) followed by ``/p/<private_urlname>/``.
    """
    if not formdef.private:
        return formdef.get_url()

    request = get_request()
    scheme = request.get_scheme()
    server = request.get_server()
    script_name = urllib.quote(request.environ.get('SCRIPT_NAME'))
    site_root = '%s://%s%s' % (scheme, server, script_name)
    return '%s/p/%s/' % (site_root, formdef.private_urlname)
class AccessCodeMailer:
    """Callable (registered as an after job) mailing access codes.

    For every user holding one of the questionnaire roles a new participant
    token is generated, stored and sent by email.  With the 'skip' behaviour,
    users who already have a stored token are left alone.
    """

    def __init__(self, objectdef, mail_from, mail_subject, mail_body, behaviour):
        self.objectdef = objectdef
        self.mail_from = mail_from
        self.mail_subject = mail_subject
        self.mail_body = mail_body
        self.behaviour = behaviour
        # computed right away, from the request that registers the job
        self.questionnaire_url = get_form_url(objectdef)

    def _is_participant(self, user):
        # true when the user holds at least one of the questionnaire roles
        held_roles = user.roles or []
        for wanted in (self.objectdef.roles or []):
            if wanted in held_roles:
                return True
        return False

    def __call__(self, job=None):
        mailed = 0
        for participant in User.select():
            if not self._is_participant(participant):
                continue

            token_key = '%s-%s' % (self.objectdef.id, participant.id)
            if ParticipantToken.has_key(token_key) and self.behaviour == 'skip':
                continue

            participant_token = ParticipantToken(self.objectdef.id, participant.id)
            raw_code = participant_token.generate_token()
            # display the code in dash separated groups of four characters
            access_code = re.sub(r'(....)', r'\1-', raw_code).strip('-')
            participant_token.store()

            variables = {
                'user_name': participant.display_name,
                'user_email': participant.email,
                'access_code': access_code,
                'questionnaire_name': self.objectdef.name,
                'questionnaire_url': self.questionnaire_url,
            }
            emails.ezt_email(self.mail_subject, self.mail_body, variables,
                    email_rcpt=participant.email, fire_and_forget=False,
                    want_html=False, email_from=self.mail_from)
            mailed += 1

        get_logger().info('form %s - done mailing access codes (%s)' % (
                self.objectdef.id, mailed))
class ParticipantsMailer:
    """Callable (registered as an after job) mailing all participants.

    Every user holding one of the questionnaire roles receives the message;
    with the 'skip' behaviour, users who already answered are left out.
    """

    def __init__(self, objectdef, mail_from, mail_subject, mail_body, behaviour):
        self.objectdef = objectdef
        self.mail_from = mail_from
        self.mail_subject = mail_subject
        self.mail_body = mail_body
        self.behaviour = behaviour
        # computed right away, from the request that registers the job
        self.questionnaire_url = get_form_url(objectdef)

    def _is_participant(self, user):
        # true when the user holds at least one of the questionnaire roles
        held_roles = user.roles or []
        for wanted in (self.objectdef.roles or []):
            if wanted in held_roles:
                return True
        return False

    def _get_answered_user_ids(self):
        # The index file can't be used here, it is disabled in asec for the
        # anonymous vote support, so
        #   self.objectdef.data_class().get_with_indexed_value('user_id', ...)
        # is not an option.  Walking over everything is not performant at
        # all, but this runs in an after job so it's not that important.
        answered = set()
        if str(self.objectdef.workflow_id).endswith(str('+anonymous')):
            import anonymity
            for voter_id in anonymity.get_voters(self.objectdef):
                answered.add(voter_id)
        else:
            for formdata in self.objectdef.data_class().select():
                answered.add(formdata.user_id)
        return answered

    def __call__(self, job=None):
        skip_answered = (self.behaviour == 'skip')
        answered = set()
        if skip_answered:
            answered = self._get_answered_user_ids()

        mailed = 0
        for participant in User.select():
            if not self._is_participant(participant):
                continue
            if skip_answered and participant.id in answered:
                continue

            variables = {
                'user_name': participant.display_name,
                'user_email': participant.email,
                'questionnaire_name': self.objectdef.name,
                'questionnaire_url': self.questionnaire_url,
            }
            emails.ezt_email(self.mail_subject, self.mail_body, variables,
                    email_rcpt=participant.email, fire_and_forget=False,
                    want_html=False, email_from=self.mail_from)
            mailed += 1

        get_logger().info('form %s - done mailing participants (%s)' % (
                self.objectdef.id, mailed))
class DiffusionDirectory(Directory):
    """Backoffice "diffusion" pages of a questionnaire.

    The pages let an administrator choose who may answer (anybody, or the
    holders of a dedicated "participants" role), switch the questionnaire
    address between visible and hidden, set the sender and notification
    addresses, maintain the list of participants (add, remove, CSV import,
    CSV disabling) and register the mailing after jobs.
    """

    _q_exports = ['', 'add', 'remove', ('import', 'p_import'),
            ('import-disabled', 'import_disabled'),
            ('mail-access-codes', 'mail_access_codes'),
            ('mail-participants', 'mail_participants'),
            ('access-disabled', 'access_disabled'),
            ('access-public', 'access_public'),
            ('access-private', 'access_private'),
            'enable', 'disable', 'emailfrom', 'emailnotify']

    def __init__(self, objectdef):
        """Keep a reference to the questionnaire (*objectdef*) being managed."""
        self.objectdef = objectdef

    # -- private helpers ---------------------------------------------------

    def _is_participant(self, user):
        """Tell whether *user* holds one of the questionnaire roles."""
        held_roles = user.roles or []
        for role_id in (self.objectdef.roles or []):
            if role_id in held_roles:
                return True
        return False

    @staticmethod
    def _grant_roles(user, role_ids):
        """Add the missing *role_ids* to *user* (the caller stores it)."""
        if user.roles is None:
            # other parts of this module guard against a missing roles list
            user.roles = []
        for role_id in (role_ids or []):
            if role_id not in user.roles:
                user.roles.append(role_id)

    @staticmethod
    def _revoke_roles(user, role_ids):
        """Remove *role_ids* from *user*; return True if something changed.

        The user is only written (or deleted, when no role is left) if one
        of the roles was actually removed, so that an account that never
        was a participant is never deleted by accident.
        """
        touched = False
        for role_id in (role_ids or []):
            if role_id in (user.roles or []):
                user.roles.remove(role_id)
                touched = True
        if not touched:
            return False
        if user.roles:
            user.store()
        else:
            user.remove_self()
        return True

    @staticmethod
    def _sniff_csv_dialect(sample):
        """Return the csv dialect to use for *sample*.

        Raises ValueError when the sample is not usable: the sniffer could
        not determine a delimiter and the first line does not look like a
        single email address.
        """
        try:
            return csv.Sniffer().sniff(sample, delimiters=',; \t')
        except csv.Error:
            # perhaps this is just a list of emails, and it raised
            # "Could not determine delimiter", so we check the first line
            first_line = sample.split('\n', 1)[0].strip()
            if first_line.count('@') != 1:
                raise ValueError()
            return 'excel'

    def _spool_upload(self, form):
        """Copy the uploaded 'file' widget content to a temporary file.

        Returns the temporary file, or None (after closing it) when the
        upload is empty or its first chunk is not a usable CSV/email list.
        """
        # local import: this module does not import tempfile by name
        import tempfile
        tmpfile = tempfile.NamedTemporaryFile()
        fp = form.get_widget('file').parse().fp
        checked = False
        while True:
            chunk = fp.read(1024*1024)
            if not chunk:
                break
            if not checked:
                # the format is only validated against the first chunk
                try:
                    self._sniff_csv_dialect(chunk)
                except ValueError:
                    tmpfile.close()
                    return None
                checked = True
            tmpfile.write(chunk)
        if not checked:
            # nothing was uploaded; do not queue a job that can only fail
            tmpfile.close()
            return None
        return tmpfile

    def _job_status_page(self, title):
        """Render the progress page of the after job given in the query.

        Redirects to the index when the job is unknown.
        """
        try:
            job = AfterJob.get(get_request().form.get('job'))
        except KeyError:
            return redirect('.')
        self.html_top(title)
        get_response().add_javascript(['jquery.js', 'interface.js', 'afterjob.js'])
        r = TemplateIO(html=True)
        r += htmltext('<dl class="job-status">')
        r += htmltext('<dt>')
        r += _(job.label)
        r += htmltext('</dt>')
        r += htmltext('<dd>')
        r += htmltext('<span class="afterjob" id="%s">') % job.id
        r += _(job.status)
        r += htmltext('</span>')
        r += htmltext('</dd>')
        r += htmltext('</dl>')
        r += htmltext('<div class="done">')
        r += htmltext('<a href="./">%s</a>') % _('Back')
        r += htmltext('</div>')
        return r.getvalue()

    # -- simple toggles ----------------------------------------------------

    def disable(self):
        """Open the questionnaire to anybody (no participant role)."""
        self.objectdef.roles = []
        self.objectdef.store()
        return redirect('.')

    def enable(self):
        """Restrict the questionnaire to a list of persons."""
        if not self.objectdef.roles:
            self.set_role()
        return redirect('.')

    def access_disabled(self):
        """Mark the questionnaire as disabled."""
        self.objectdef.disabled = True
        self.objectdef.store()
        return redirect('.')

    def access_public(self):
        """Enable the questionnaire with a visible address."""
        self.objectdef.disabled = False
        self.objectdef.private = False
        self.objectdef.store()
        return redirect('.')

    def access_private(self):
        """Enable the questionnaire with a hidden address."""
        self.objectdef.disabled = False
        self.objectdef.private = True
        self.objectdef.store()
        return redirect('.')

    # -- addresses ---------------------------------------------------------

    def emailfrom(self):
        """Form to set the 'from' address of the emails configuration."""
        emails_cfg = get_cfg('emails', {})
        form = Form(enctype='multipart/form-data')
        form.add(EmailWidget, 'from', title=_('Sender Address'),
                value=emails_cfg.get('from', ''), size=30)
        form.add_submit('submit', _('Submit'))
        form.add_submit('cancel', _('Cancel'))

        if form.get_widget('cancel').parse():
            return redirect('.')

        if not form.is_submitted() or form.has_errors():
            get_response().breadcrumb.append(('emailfrom', _('Sender Address')))
            html_top('config', title=_('Sender Address'))
            r = TemplateIO(html=True)
            r += htmltext('<h2>%s</h2>') % _('Sender Address')
            r += form.render()
            return r.getvalue()

        cfg_submit(form, 'emails', ['from'])
        return redirect('.')

    def emailnotify(self):
        """Form to set the address notified when a questionnaire is filled.

        The address is kept in the 'done*mail-on-filled*to' workflow option
        of the questionnaire; an empty value stores None.
        """
        form = Form(enctype='multipart/form-data')
        if not self.objectdef.workflow_options:
            self.objectdef.workflow_options = {}
        form.add(EmailWidget, 'notify', title=_('Address for notifications'),
                value=self.objectdef.workflow_options.get('done*mail-on-filled*to'),
                hint=_('Leave blank to disable notifications'),
                size=30)
        form.add_submit('submit', _('Submit'))
        form.add_submit('cancel', _('Cancel'))

        if form.get_widget('cancel').parse():
            return redirect('.')

        if not form.is_submitted() or form.has_errors():
            get_response().breadcrumb.append(
                    ('emailnotify', _('Address for notifications')))
            html_top('config', title=_('Address for notifications'))
            r = TemplateIO(html=True)
            r += htmltext('<h2>%s</h2>') % _('Address for notifications')
            r += form.render()
            return r.getvalue()

        address = form.get_widget('notify').parse()
        if address:
            address = str(address)
        else:
            address = None
        self.objectdef.workflow_options[str('done*mail-on-filled*to')] = address
        self.objectdef.store()
        return redirect('.')

    def set_role(self):
        """Attach the 'participants-for-<id>' role, creating it if needed."""
        try:
            role = Role.get('participants-for-%s' % self.objectdef.id)
        except KeyError:
            role = Role(_('Participants for "%s"') % self.objectdef.name)
            role.id = 'participants-for-%s' % self.objectdef.id
            role.store()
        self.objectdef.roles = [role.id]
        self.objectdef.store()

    def html_top(self, section, *args, **kwargs):
        """Emit the backoffice page top for the diffusion section.

        NOTE(review): the first argument is not forwarded; callers in this
        class pass the page title there, so it looks like titles are
        dropped -- confirm whether it should be sent as ``title``.
        """
        html_top('forms/%s/diffusion' % self.objectdef.id, *args, **kwargs)

    # -- index -------------------------------------------------------------

    def _q_index (self):
        """Summary page: audience, address, email settings, participants."""
        # XXX: it would be nice to be manage groups, to select an existing group
        self.html_top(_('Diffusion'))
        r = TemplateIO(html=True)
        r += htmltext('<ul id="main-actions">')
        r += htmltext('<li><a href="add" rel="popup">%s</a></li>') % _('Add a participant')
        r += htmltext('</ul>')

        r += htmltext('<h2>%s</h2>') % _('Diffusion')
        r += htmltext('<ul>')
        r += htmltext(' <li>%s ') % _('Participants:')
        if self.objectdef.roles:
            r += htmltext('<a href="disable">%s</a>') % _('Anybody')
            r += htmltext(' - ')
            r += htmltext('<strong>%s</strong>') % _('List of persons')
        else:
            r += htmltext('<strong>%s</strong>') % _('Anybody')
            r += htmltext(' - ')
            r += htmltext(' <a href="enable">%s</a>') % _('List of persons')
        r += htmltext('</li>')

        access_mode = get_form_access_mode(self.objectdef)
        r += htmltext(' <li>%s ') % _('Questionnaire Address:')
        if access_mode == 'public':
            r += htmltext('<strong>%s</strong>') % _('Visible')
        else:
            r += htmltext(' <a href="access-public">%s</a>') % _('Visible')
        r += htmltext(' - ')
        if access_mode == 'private':
            r += htmltext('<strong>%s</strong>') % _('Hidden')
        else:
            r += htmltext(' <a href="access-private">%s</a>') % _('Hidden')
        r += htmltext('<ul><li>')
        url = get_form_url(self.objectdef)
        r += htmltext('<a href="%s">%s</a>') % (url, url)
        r += htmltext('</li></ul>')
        r += htmltext('</li>')

        sender = get_cfg('emails', {}).get('from', '')
        r += htmltext('<li>%s') % _('Sender Address: ')
        if sender:
            # the address has to be appended to the page to be displayed
            r += sender
            r += htmltext(' (<a href="emailfrom" rel="popup">%s</a>)') % _('change address')
        else:
            r += _('None set')
            r += htmltext(' (<a href="emailfrom" rel="popup">%s</a>)') % _('set address')
        r += htmltext('</li>')

        workflow_options = self.objectdef.workflow_options or {}
        notify = workflow_options.get('done*mail-on-filled*to')
        r += htmltext('<li>%s') % _('Notification when a questionnaire is filled: ')
        if notify:
            r += notify
            r += htmltext(' (<a href="emailnotify" rel="popup">%s</a>)') % _('change address')
        else:
            r += _('None set')
            r += htmltext(' (<a href="emailnotify" rel="popup">%s</a>)') % _('set address')
        r += htmltext('</li>')
        r += htmltext('</ul>')

        get_response().filter['sidebar'] = self.get_sidebar()
        # pending session message (set by add/import), as done on the
        # import pages
        r += get_session().display_message()

        if self.objectdef.roles:
            r += self.display_participants()
        return r.getvalue()

    def display_participants(self):
        """Return the HTML listing of participants.

        The listing is restricted to the submitted search term, if any.
        When the set of participants cannot be established, a message says
        so instead.
        """
        search_result = False
        user_ids = set()
        for role_id in (self.objectdef.roles or []):
            try:
                user_ids_with_role = User.get_ids_with_indexed_value(
                        'roles', role_id, auto_fallback=False)
            except Exception:
                # this should be limited to StorageIndexException but it
                # somehow failed to catch them.
                if len(User.keys()) < 500:
                    user_ids_with_role = User.get_ids_with_indexed_value(
                            'roles', role_id, auto_fallback=True)
                else:
                    # failed to get a reliable list of participants, note
                    # it so
                    user_ids = None
                    break
            user_ids.update(user_ids_with_role)

        search_form = Form(enctype='multipart/form-data', use_tokens=False)
        search_form.add(StringWidget, 'email', title=_('Email'))
        term = None
        if search_form.is_submitted():
            term = search_form.get_widget('email').parse()
        if term:
            # case insensitive match, on both the term and the user keys
            term = term.lower()
            keys = [k for k in User.keys() if term in k.lower()]
            search_result = True
            if user_ids is None:
                if len(keys) < 1000:
                    # we do not have a precise list of participants but
                    # the result set is relatively small, so we iterate
                    # over all of them, and we'll get back to the
                    # precise list situation.
                    user_ids = set()
                    for user_key in keys:
                        if self._is_participant(User.get(user_key)):
                            user_ids.add(user_key)
            else:
                # we reduce the current set
                user_ids.intersection_update(keys)

        r = TemplateIO(html=True)
        if user_ids:
            # XXX: paragraph of explanation, and option to get a list of access
            # codes instead of sending them out
            r += htmltext('<p>')
            r += _('%s participants.') % len(user_ids)
            if search_result:
                r += htmltext(' <a href=".">%s</a>') % _('Back to full listing')
            r += htmltext('</p>')

            if len(user_ids) < 500:
                # reasonable number, load all of them, so it's possible to
                # display and sort on user names
                all_users = [User.get(x) for x in user_ids]
                all_users.sort(key=lambda x: x.display_name)
                user_are_objects = True
            else:
                all_users = sorted(user_ids)
                user_are_objects = False

            r += htmltext('<div class="splitcontent-left">')
            r += htmltext('<ul class="biglist">')
            rightcol = False
            for i, user in enumerate(all_users):
                if user_are_objects:
                    user_id = user.id
                    r += htmltext('<li><strong>%s</strong> ') % user.display_name
                else:
                    user_id = user
                    r += htmltext('<li><strong>%s</strong> ') % user_id
                r += htmltext('(<a href="remove?id=%s">%s</a>)') % (user_id, _('remove'))
                r += htmltext('</li>')
                if not rightcol and (i+1) >= len(user_ids)/2.0:
                    # half of the entries are out, switch to the right column
                    rightcol = True
                    r += htmltext('</ul>')
                    r += htmltext('</div>')
                    r += htmltext('<div class="splitcontent-right">')
                    r += htmltext('<ul class="biglist">')
            r += htmltext('</ul>')
            r += htmltext('</div>')
            # XXX: add pagination
        elif user_ids is None: # unknown set
            r += htmltext('<p>')
            r += _('Unable to get a list of participants.')
            r += ' '
            r += _('System overloaded?')
            # XXX: + use search form on the right.
            r += htmltext('</p>')
        else:
            r += htmltext('<p>')
            if search_result:
                r += _('There is currently no participants matching your query.')
            else:
                r += _('There is currently no participants defined.')
            r += htmltext('</p>')
        return r.getvalue()

    def get_sidebar(self):
        """Return the sidebar (actions and search form), '' without roles."""
        if not self.objectdef.roles:
            return ''
        r = TemplateIO(html=True)
        r += htmltext('<ul>')
        r += htmltext('<li><a href="import" rel="popup">%s</a></li>') % _('Import list of participants')
        r += htmltext('<li><a href="import-disabled" rel="popup">%s</a></li>') % _(
                'Import list of participants to disable')
        r += htmltext('<li><a href="mail-access-codes">%s</a></li>') % _('Mail access codes')
        if quota.can_mail():
            r += htmltext('<li><a href="mail-participants">%s</a></li>') % _('Mail participants')
        r += htmltext('</ul>')

        r += htmltext('<h4>%s</h4>') % _('Search a participant')
        search_form = Form(enctype='multipart/form-data', use_tokens=False)
        search_form.add(StringWidget, 'email', title=_('Email'))
        search_form.add_submit('submit', _('Search'))
        r += search_form.render()
        return r.getvalue()

    # -- single participant ------------------------------------------------

    def remove(self):
        """Withdraw the questionnaire roles from the user given as ``id``.

        Unknown or missing ids simply redirect to the index.
        """
        user_id = get_request().form.get('id')
        if not user_id:
            return redirect('.')
        try:
            user = User.get(user_id)
        except KeyError:
            # stale link, the user is already gone
            return redirect('.')
        self._revoke_roles(user, self.objectdef.roles)
        return redirect('.')

    def add(self):
        """Form to add a single participant (optional name, email)."""
        form = Form(enctype='multipart/form-data')
        form.add(StringWidget, 'name', title=_('Name'), size=40, required=False)
        form.add(EmailWidget, 'email', title=_('Email'), required=True)
        form.add_submit('submit', _('Submit'))
        form.add_submit('cancel', _('Cancel'))

        if form.get_widget('cancel').parse():
            return redirect('.')

        if form.is_submitted() and not form.has_errors():
            try:
                return self.add_submit(form)
            except ValueError:
                # fall through and display the form again
                pass

        get_response().breadcrumb.append(('add', _('Add')))
        self.html_top(_('Add a Participant'))
        r = TemplateIO(html=True)
        r += form.render()
        return r.getvalue()

    def add_submit(self, form):
        """Create or update the participant described by *form*.

        The email is used as user id.  An empty name leaves the name of an
        existing participant untouched.
        """
        name = form.get_widget('name').parse()
        email = form.get_widget('email').parse()
        try:
            user = User.get(email)
        except KeyError:
            # this user doesn't exist
            user = User()
            user.id = email
            user.email = email
            get_session().message = ('info', _('New partipant has been created.'))
        else:
            get_session().message = ('info', _('Participant already existed, it has been updated.'))
        if name:
            user.name = name
        self._grant_roles(user, self.objectdef.roles)
        user.store()
        return redirect('.')

    # -- CSV import --------------------------------------------------------

    def p_import(self):
        """Form to upload a CSV list of participants to add.

        With a ``job`` query parameter, shows the progress of that job.
        """
        if get_request().form.get('job'):
            return self.participants_importing()

        form = Form(enctype='multipart/form-data')
        form.add(FileWidget, 'file', title=_('File'), required=True)
        form.add_submit('submit', _('Submit'))
        form.add_submit('cancel', _('Cancel'))

        if form.get_widget('cancel').parse():
            return redirect('.')

        if form.is_submitted() and not form.has_errors():
            try:
                return self.import_submit(form)
            except ValueError:
                # fall through and display the form again
                pass

        get_response().breadcrumb.append(('import', _('Import')))
        self.html_top(_('Import a List of Participants'))
        r = TemplateIO(html=True)
        r += htmltext('<h2>%s</h2>') % _('Importing a List of Participants')
        r += htmltext('<p>')
        r += _('The file should be in the CSV file format. Using your spreadsheet '\
               'program (Calc, Excel...), click "Save as" and select the CSV format.')
        r += htmltext('</p>')
        r += htmltext('<p>')
        r += _('The file should have email addresses in the first column, and, '\
               'optionnaly, names in the second column.')
        r += htmltext('</p>')
        r += get_session().display_message()
        r += form.render()
        return r.getvalue()

    def import_submit(self, form):
        """Spool the uploaded file and register the import after job."""
        class ParticipantsImporter:
            def __init__(self, csvfile, objectdef):
                self.csvfile = csvfile
                self.objectdef = objectdef

            def __call__(self, job=None):
                count = 0
                try:
                    self.csvfile.seek(0)
                    # XXX: no way to pass errors from after job to user.
                    dialect = DiffusionDirectory._sniff_csv_dialect(
                            self.csvfile.read(1024))
                    self.csvfile.seek(0)
                    for line in csv.reader(self.csvfile, dialect):
                        if not line:
                            continue
                        email = line[0].strip()
                        if not email:
                            # no address in the first column, nothing to add
                            continue
                        try:
                            user = User.get(email)
                        except KeyError:
                            # this user doesn't exist
                            user = User()
                            user.id = email
                            user.email = email
                        if len(line) == 2:
                            user.name = line[1]
                        DiffusionDirectory._grant_roles(user, self.objectdef.roles)
                        user.store()
                        count += 1
                finally:
                    # closing also disposes of the temporary file
                    self.csvfile.close()
                get_logger().info('form %s - done importing participants (%s)' % (
                        self.objectdef.id, count))

        tmpfile = self._spool_upload(form)
        if tmpfile is None:
            get_session().message = ('error', _('Failed to use the file, please check its format.'))
            return redirect('import')

        get_logger().info('form %s - importing participants' % self.objectdef.id)
        job = get_response().add_after_job(
                str(N_('Importing list of participants')),
                ParticipantsImporter(tmpfile, self.objectdef))
        return redirect('import?job=%s' % job.id)

    def participants_importing(self):
        """Progress page of a participants import job."""
        return self._job_status_page(_('Importing Participants'))

    # -- access codes ------------------------------------------------------

    def mail_access_codes(self):
        """Form to mail access codes to the participants.

        On submission the subject and body are saved as the new
        'asec-voting-instructions' texts and the mailing is registered as
        an after job.  With a ``job`` query parameter, shows its progress.
        """
        if get_request().form.get('job'):
            return self.access_code_mailing()

        emails_cfg = get_cfg('emails', {})
        default_subject = EmailsDirectory.get_subject('asec-voting-instructions')
        default_body = EmailsDirectory.get_body('asec-voting-instructions')

        form = Form(enctype='multipart/form-data')
        form.add(EmailWidget, 'mail_from', title=_('From'), size=40, required=False,
                value=emails_cfg.get('from', ''))
        form.add(StringWidget, 'mail_subject', title=_('Subject'), size=40, required=True,
                value=default_subject)
        form.add(TextWidget, 'mail_body', title=_('Body'), cols=70, rows=20, required=True,
                value=default_body)
        form.add(SingleSelectWidget, 'behaviour',
                title=_('Behaviour for participants who already have been given an access code'),
                options=[(str('skip'), _('Skip')),
                         (str('new'), _('Generate new access code'))])
        #XXX: send another type of email...
        form.add_submit('submit', _('Submit'))
        form.add_submit('cancel', _('Cancel'))

        if form.get_widget('cancel').parse():
            return redirect('.')

        if form.is_submitted() and not form.has_errors():
            mail_from = form.get_widget('mail_from').parse()
            mail_subject = form.get_widget('mail_subject').parse()
            mail_body = form.get_widget('mail_body').parse()
            behaviour = form.get_widget('behaviour').parse()
            EmailsDirectory.set_subject('asec-voting-instructions', mail_subject)
            EmailsDirectory.set_body('asec-voting-instructions', mail_body)
            access_code_mailer = AccessCodeMailer(self.objectdef, mail_from,
                    mail_subject, mail_body, behaviour)
            get_logger().info('form %s - mailing access codes (behaviour: %s)' % (
                    self.objectdef.id, behaviour))
            job = get_response().add_after_job(
                    str(N_('Mailing access codes')),
                    access_code_mailer)
            return redirect('mail-access-codes?job=%s' % job.id)

        self.html_top(_('Mailing access codes'))
        r = TemplateIO(html=True)
        r += htmltext('<h2>%s</h2>') % _('Mailing access codes')
        r += form.render()
        return r.getvalue()

    def access_code_mailing(self):
        """Progress page of an access codes mailing job."""
        return self._job_status_page(_('Mailing access codes'))

    # -- CSV disabling -----------------------------------------------------

    def import_disabled(self):
        """Form to upload a list of participants to disable.

        With a ``job`` query parameter, shows the progress of that job.
        """
        if get_request().form.get('job'):
            return self.participants_disabling()

        form = Form(enctype='multipart/form-data')
        form.add(FileWidget, 'file', title=_('File'), required=True)
        form.add_submit('submit', _('Submit'))
        form.add_submit('cancel', _('Cancel'))

        if form.get_widget('cancel').parse():
            return redirect('.')

        if form.is_submitted() and not form.has_errors():
            try:
                return self.import_disabled_submit(form)
            except ValueError:
                # fall through and display the form again
                pass

        get_response().breadcrumb.append(('import-disabled', _('Import')))
        self.html_top(_('Import a List of Participants to Disable'))
        r = TemplateIO(html=True)
        r += htmltext('<h2>%s</h2>') % _('Importing a List of Participants to Disable')
        r += htmltext('<p>')
        r += _('The file should be in the CSV file format. Using your spreadsheet '\
               'program (Calc, Excel...), click "Save as" and select the CSV format.')
        r += htmltext('</p>')
        r += htmltext('<p>')
        r += _('The file should consist of email addresses, one per line.')
        r += htmltext('</p>')
        r += get_session().display_message()
        r += form.render()
        return r.getvalue()

    def import_disabled_submit(self, form):
        """Spool the uploaded file and register the disabling after job."""
        class ParticipantsDisabler:
            def __init__(self, csvfile, objectdef):
                self.csvfile = csvfile
                self.objectdef = objectdef

            def __call__(self, job=None):
                try:
                    self.csvfile.seek(0)
                    # XXX: no way to pass errors from after job to user.
                    dialect = DiffusionDirectory._sniff_csv_dialect(
                            self.csvfile.read(1024))
                    self.csvfile.seek(0)
                    for line in csv.reader(self.csvfile, dialect):
                        if not line:
                            continue
                        try:
                            user = User.get(line[0])
                        except KeyError:
                            # this user doesn't exist
                            continue
                        DiffusionDirectory._revoke_roles(user, self.objectdef.roles)
                finally:
                    # closing also disposes of the temporary file
                    self.csvfile.close()

        tmpfile = self._spool_upload(form)
        if tmpfile is None:
            get_session().message = ('error', _('Failed to use the file, please check its format.'))
            return redirect('import-disabled')

        job = get_response().add_after_job(
                str(N_('Disabling list of participants')),
                ParticipantsDisabler(tmpfile, self.objectdef))
        return redirect('import-disabled?job=%s' % job.id)

    def participants_disabling(self):
        """Progress page of a participants disabling job."""
        return self._job_status_page(_('Disabling Participants'))

    # -- mail to participants ----------------------------------------------

    def mail_participants(self):
        """Form to send a free-form email to the participants.

        Raises quota.NotAvailableFeature when mailing is not allowed.  On
        submission the subject and body are saved as the new
        'asec-skeleton-participant-mail' texts and the mailing is
        registered as an after job.  With a ``job`` query parameter, shows
        its progress.
        """
        if get_request().form.get('job'):
            return self.participants_mailing()

        if not quota.can_mail():
            raise quota.NotAvailableFeature()

        emails_cfg = get_cfg('emails', {})
        default_subject = EmailsDirectory.get_subject('asec-skeleton-participant-mail')
        default_body = EmailsDirectory.get_body('asec-skeleton-participant-mail')

        form = Form(enctype='multipart/form-data')
        form.add(EmailWidget, 'mail_from', title=_('From'), size=40, required=False,
                value=emails_cfg.get('from', ''))
        form.add(StringWidget, 'mail_subject', title=_('Subject'), size=40, required=True,
                value=default_subject)
        form.add(TextWidget, 'mail_body', title=_('Body'), cols=70, rows=20, required=True,
                value=default_body)
        form.add(SingleSelectWidget, 'behaviour',
                title=_('Behaviour for participants who already have completed a questionnaire'),
                options=[(str('skip'), _('Skip')),
                         (str('send'), _('Send'))])
        form.add_submit('submit', _('Submit'))
        form.add_submit('cancel', _('Cancel'))

        if form.get_widget('cancel').parse():
            return redirect('.')

        if form.is_submitted() and not form.has_errors():
            mail_from = form.get_widget('mail_from').parse()
            mail_subject = form.get_widget('mail_subject').parse()
            mail_body = form.get_widget('mail_body').parse()
            behaviour = form.get_widget('behaviour').parse()
            EmailsDirectory.set_subject('asec-skeleton-participant-mail', mail_subject)
            EmailsDirectory.set_body('asec-skeleton-participant-mail', mail_body)
            participants_mailer = ParticipantsMailer(self.objectdef,
                    mail_from, mail_subject, mail_body, behaviour)
            get_logger().info('form %s - mailing participants (behaviour: %s)' % (
                    self.objectdef.id, behaviour))
            job = get_response().add_after_job(
                    str(N_('Mailing participants')),
                    participants_mailer)
            return redirect('mail-participants?job=%s' % job.id)

        self.html_top(_('Mailing participants'))
        r = TemplateIO(html=True)
        r += htmltext('<h2>%s</h2>') % _('Mailing participants')
        r += form.render()
        return r.getvalue()

    def participants_mailing(self):
        """Progress page of a participants mailing job."""
        return self._job_status_page(_('Mailing participants'))
# Default texts of the 'asec-voting-instructions' email; its subject and body
# are what DiffusionDirectory.mail_access_codes offers as initial values.
EmailsDirectory.register('asec-voting-instructions',
    N_('Instructions and access codes'),
    N_('Available variables: user_name, user_email, access_code, questionnaire_name, questionnaire_url'),
    default_subject=N_('[questionnaire_name] - instructions'),
    default_body=N_('''\
Dear [user_name]
[questionnaire_name] is now open.
To give your answer, please go to [questionnaire_url]
and follow the instructions there.
When instructed to do so, enter the following details:
E-Mail: [user_email]
Access Code: [access_code]
The process has 4 steps:
First, you must identify yourself using the access code above.
Then fill the questionnaire with your choices.
A third step will show you your choice, and ask you to confirm or return
to the previous step.
Finally, after confirming your choice, a unique identifier will be given
to you which will allow you to verify that your response
was counted correctly. To ensure anonymity, no link will be kept between
this token and your identifiers, so please keep this token safe. Once you
have given a response, you will not be able to do it again.
Thank you for your participation!
Regards,
'''))
# Default subject of the 'asec-skeleton-participant-mail' email, the one
# DiffusionDirectory.mail_participants offers as initial value; no default
# body is registered here.
EmailsDirectory.register('asec-skeleton-participant-mail',
    N_('Skeleton for "mail to participants"'),
    N_('Available variables: user_name, user_email, questionnaire_name, questionnaire_url'),
    default_subject=N_('[questionnaire_name]'))