526 lines
21 KiB
Python
526 lines
21 KiB
Python
import os.path
|
|
import hmac
|
|
import thread
|
|
import datetime
|
|
import hashlib
|
|
import logging
|
|
import threading
|
|
|
|
|
|
from django.forms import (ModelForm, Form, Textarea, EmailField, CharField, ModelChoiceField)
|
|
from django import forms
|
|
from django.contrib.auth.models import User
|
|
from django.utils.translation import ugettext as _
|
|
from django.contrib.admin.widgets import FilteredSelectMultiple as AdminFilteredSelectMultiple
|
|
from django.forms import ValidationError
|
|
from django.conf import settings
|
|
from django.db.models.query import Q
|
|
from django.contrib.auth.forms import PasswordResetForm, PasswordChangeForm
|
|
|
|
|
|
from crispy_forms.helper import FormHelper
|
|
from crispy_forms.layout import Submit, Layout, ButtonHolder, HTML
|
|
import django_journal
|
|
|
|
|
|
from .models import (Document, username, MailingList, Content,
|
|
AttachedFile, AutomaticForwarding, DocbowProfile, non_guest_users,
|
|
is_guest, FileTypeAttachedFileKind)
|
|
from .widgets import TextInpuWithPredefinedValues, JqueryFileUploadInput
|
|
from .fields import RecipientField
|
|
from . import notification
|
|
from .validators import phone_normalize, validate_fr_be_phone
|
|
from .middleware import get_extra
|
|
from .utils import mime_types_to_extensions
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class RecipientForm(object):
|
|
'''
|
|
Base form mixin for forms containing a RecipienField, i.e. all
|
|
forms for sending documents.
|
|
'''
|
|
|
|
def __init__(self, *args, **kwargs):
|
|
user = kwargs.pop('user', None)
|
|
self.user = user
|
|
user_qs = kwargs.pop('user_qs', None)
|
|
if user_qs is None:
|
|
user_qs = non_guest_users().filter(is_active=True)
|
|
list_qs = kwargs.pop('list_qs', MailingList.objects.active())
|
|
super(RecipientForm, self).__init__(*args, **kwargs)
|
|
self.fields['recipients'].user = user
|
|
self.fields['recipients'].user_qs = user_qs
|
|
self.fields['recipients'].list_qs = list_qs
|
|
self.fields['recipients'].reset_choices()
|
|
|
|
class ForwardingForm(RecipientForm, Form):
|
|
'''Form for forwarding documents'''
|
|
recipients = RecipientField(label=_('Recipients'), required=True)
|
|
sender = ModelChoiceField(label=_('Sender'), queryset=User.objects.all())
|
|
|
|
def __init__(self, *args, **kwargs):
|
|
self.layout = Layout(
|
|
'sender',
|
|
'recipients',
|
|
ButtonHolder(
|
|
Submit('forward', _('forward the document'))),
|
|
)
|
|
|
|
self.helper = FormHelper()
|
|
self.helper.form_method = 'POST'
|
|
self.helper.add_layout(self.layout)
|
|
self.helper.form_action = ''
|
|
self.user = kwargs.pop('user', None)
|
|
assert self.user
|
|
self.default_sender = self.user
|
|
if is_guest(self.default_sender):
|
|
self.default_sender = self.user.delegations_by.all()[0].by
|
|
delegations = User.objects.none()
|
|
else:
|
|
self.default_sender = self.user
|
|
delegations = User.objects.filter(
|
|
Q(id=self.user.id) |
|
|
Q(delegations_to__to=self.user)).distinct()
|
|
super(ForwardingForm, self).__init__(*args, **kwargs)
|
|
if len(delegations) > 1:
|
|
self.fields['sender'].queryset = delegations
|
|
self.fields['sender'].label_from_instance = lambda y: username(y)
|
|
else:
|
|
self.layout.fields.remove('sender')
|
|
del self.fields['sender']
|
|
|
|
def clean(self):
|
|
cleaned_data = super(ForwardingForm, self).clean()
|
|
if not cleaned_data.get('sender'):
|
|
cleaned_data['sender'] = self.default_sender
|
|
return cleaned_data
|
|
|
|
class Media:
|
|
js = ('js/askdirtyform.js',)
|
|
|
|
def max_filename_length():
|
|
'''Compute the maximum filename length from the possible maximum length of
|
|
the AttachedFile model.'''
|
|
field = AttachedFile._meta.get_field_by_name('content')[0]
|
|
prefix = field.generate_filename(None, '')
|
|
max_length = field.max_length
|
|
return max_length - len(prefix)
|
|
|
|
class FileForm(RecipientForm, ModelForm):
|
|
'''Form for creating a new mailing'''
|
|
user = None
|
|
recipients = RecipientField(label=_('Recipients'))
|
|
|
|
|
|
def __init__(self, *args, **kwargs):
|
|
'''Initialize the form.'''
|
|
self.file_type = kwargs.pop('file_type')
|
|
self.attached_file_kinds = self.file_type.filetypeattachedfilekind_set.all()
|
|
self.layout = Layout(
|
|
HTML('''<div id="div_id_filetype" class="ctrlHolder">
|
|
<label>Type de document</label>
|
|
<span>{{ form.file_type.name }}</span>
|
|
</div>'''),
|
|
'sender',
|
|
'recipients',
|
|
'comment',
|
|
ButtonHolder(
|
|
Submit('send', _('send a file'))),
|
|
)
|
|
self.helper = FormHelper()
|
|
self.helper.form_method = 'POST'
|
|
self.helper.add_layout(self.layout)
|
|
self.helper.form_action = ''
|
|
self.reply_to = kwargs.pop('reply_to', None)
|
|
self.default_sender = kwargs.pop('default_sender', None)
|
|
self.delegations = kwargs.pop('delegations', [])
|
|
initial = kwargs.setdefault('initial', {})
|
|
if self.reply_to:
|
|
doc = self.reply_to.document
|
|
initial['sender'] = self.reply_to.owner
|
|
initial['recipients'] = [ 'user-%s' % doc.sender.id ]
|
|
initial['comment'] = u'Re: ' + doc.comment
|
|
|
|
|
|
super(FileForm, self).__init__(*args, **kwargs)
|
|
self.content_fields = []
|
|
if self.attached_file_kinds:
|
|
insert_index = 2
|
|
for attached_file_kind in self.attached_file_kinds:
|
|
key = 'content-%s' % attached_file_kind.id
|
|
self.content_fields.append((key, attached_file_kind))
|
|
label = attached_file_kind.name
|
|
mime_types = attached_file_kind.get_mime_types()
|
|
widget = JqueryFileUploadInput(
|
|
max_filename_length=max_filename_length(),
|
|
extensions=mime_types_to_extensions(mime_types),
|
|
attached_file_kind=attached_file_kind)
|
|
self.fields[key] = forms.Field(label=label, widget=widget)
|
|
self.layout.fields.insert(insert_index, key)
|
|
insert_index += 1
|
|
else:
|
|
attached_file_kind = FileTypeAttachedFileKind(mime_types='*/*')
|
|
self.content_fields = [ ('content', attached_file_kind) ]
|
|
widget = JqueryFileUploadInput(
|
|
max_filename_length=max_filename_length(),
|
|
attached_file_kind=attached_file_kind)
|
|
self.fields['content'] = forms.Field(label=_('Attached files'), widget=widget)
|
|
self.layout.fields.insert(2, 'content')
|
|
old_widget = self.fields['comment'].widget
|
|
self.fields['comment'].widget = TextInpuWithPredefinedValues(attrs=old_widget.attrs,
|
|
choices=self.get_predefined_comments())
|
|
if len(self.delegations) > 1:
|
|
self.fields['sender'].queryset = self.delegations
|
|
self.fields['sender'].label_from_instance = lambda y: username(y)
|
|
else:
|
|
self.layout.fields.remove('sender')
|
|
del self.fields['sender']
|
|
|
|
def get_predefined_comments(self):
|
|
'''Return a list of predefined comments, structured as choice list for
|
|
a form field.
|
|
'''
|
|
choices = [ (content.description,)*2 for content in Content.objects.all() ]
|
|
choices.insert(0, ('---','---'))
|
|
return choices
|
|
|
|
class Meta:
|
|
model = Document
|
|
exclude = ('filetype', 'date', 'to_user', 'to_list', '_timestamp', 'real_sender',
|
|
'reply_to')
|
|
|
|
class Media:
|
|
css = {
|
|
'all': (
|
|
'docbow/css/send-file.css',
|
|
'docbow/css/send_file_form.css'
|
|
)
|
|
}
|
|
js = ('js/askdirtyform.js',)
|
|
|
|
def clean(self):
|
|
'''Validate that there is at least one file attached to this mailing.'''
|
|
cleaned_data = super(FileForm, self).clean()
|
|
for field, attached_file_kind in self.content_fields:
|
|
upload_id, upload_files = self.cleaned_data.get(field, (None, []))
|
|
max_files = attached_file_kind.cardinality
|
|
min_files = attached_file_kind.minimum
|
|
errors = []
|
|
if max_files and len(upload_files) > max_files:
|
|
errors.append(_(u'You must attach at most %d file(s).') % max_files)
|
|
if min_files and len(upload_files) < min_files:
|
|
errors.append(_(u'You must attach at least %d file(s).') % min_files)
|
|
for upload_file in upload_files:
|
|
if not attached_file_kind.match_file(upload_file):
|
|
mime_types = attached_file_kind.get_mime_types()
|
|
file_name = os.path.basename(upload_file.name)
|
|
msg = _(u'The file name "{file_name}" does not match the patterns "{extensions}".').format(
|
|
file_name=file_name,
|
|
extensions=mime_types_to_extensions(mime_types))
|
|
errors.append(msg)
|
|
if errors:
|
|
self._errors[field] = self.error_class(errors)
|
|
return cleaned_data
|
|
|
|
def save(self, commit=False):
|
|
self.instance.filetype = self.file_type
|
|
if not self.instance.sender_id:
|
|
assert self.default_sender
|
|
self.instance.sender = self.default_sender
|
|
if self.reply_to:
|
|
self.instance.reply_to = self.reply_to.document
|
|
if self.user != self.instance.sender:
|
|
self.instance.real_sender = username(self.user)
|
|
return super(FileForm, self).save(commit=commit)
|
|
|
|
def save_attachments(self):
|
|
'''Create a new AttachedFile object for each uploaded file; and attach
|
|
them to the newly created Document object.'''
|
|
instance = self.instance
|
|
for field, attached_file_kind in self.content_fields:
|
|
upload_id, uploaded_files = self.cleaned_data.get(field, (None, []))
|
|
for uploaded_file in uploaded_files:
|
|
uploaded_file.name = os.path.basename(uploaded_file.name)
|
|
AttachedFile(document=instance,
|
|
kind=attached_file_kind if attached_file_kind.id else None,
|
|
name=os.path.basename(uploaded_file.name),
|
|
content=uploaded_file).save()
|
|
|
|
|
|
class ContactForm(Form):
|
|
'''Form to contact administrators of the platform for logged in users.'''
|
|
layout = Layout(
|
|
'subject',
|
|
'message',
|
|
ButtonHolder(
|
|
Submit('send', _('send your message'))),
|
|
)
|
|
helper = FormHelper()
|
|
helper.form_method = 'POST'
|
|
helper.add_layout(layout)
|
|
helper.form_action = ''
|
|
|
|
subject = forms.CharField(max_length=100, label=_('Subject'),
|
|
required=True)
|
|
message = forms.CharField(widget=Textarea(attrs={'rows': 25, 'cols': 80}),
|
|
label=_('Message'), required=True)
|
|
|
|
class Media:
|
|
js = ('js/askdirtyform.js',)
|
|
|
|
|
|
class AnonymousContactForm(ContactForm):
|
|
'''Form to contact administrators of the platform for anonymous users.'''
|
|
layout = Layout(
|
|
'name',
|
|
'email',
|
|
'phone_number',
|
|
'subject',
|
|
'message',
|
|
ButtonHolder(
|
|
Submit('send', _('send your message'))),
|
|
)
|
|
helper = FormHelper()
|
|
helper.form_method = 'POST'
|
|
helper.add_layout(layout)
|
|
helper.form_action = ''
|
|
|
|
name = forms.CharField(max_length=100, label=_('Name'), required=True)
|
|
email = forms.EmailField(max_length=100, label=_('Email'), required=False)
|
|
phone_number = forms.CharField(max_length=100, label=_('Phone number'),
|
|
required=False)
|
|
|
|
|
|
class MailingListForm(ModelForm):
|
|
'''Admin form to edit MailingList objects'''
|
|
def __init__(self, *args, **kwargs):
|
|
'''Orders members by their username, use their username to display them.'''
|
|
ModelForm.__init__(self, *args, **kwargs)
|
|
self.fields['members'].queryset = non_guest_users() \
|
|
.order_by('username')
|
|
self.fields['members'].label_from_instance = lambda y: username(y)
|
|
|
|
class Meta:
|
|
model = MailingList
|
|
fields = ('name', 'is_active', 'members', 'mailing_list_members')
|
|
widgets = {
|
|
'members': AdminFilteredSelectMultiple(_('Persons'), False),
|
|
}
|
|
|
|
|
|
class UserChoiceField(ModelChoiceField):
|
|
def label_from_instance(self, user):
|
|
if user.first_name or user.last_name:
|
|
return user.first_name + ' ' + user.last_name
|
|
return user.username
|
|
|
|
|
|
class DelegationForm(Form):
|
|
'''Form to manager delegations of users'''
|
|
layout = Layout(
|
|
HTML(_('<p>You can delegate to a newly created user, then all following fields are mandatory:</p>')),
|
|
'first_name', 'last_name', 'email',
|
|
HTML(_('<p>or you can delegate to an existing user:</p>')),
|
|
'existing_user',
|
|
ButtonHolder(
|
|
Submit('delegate-create', _('Create a new delegation'))),
|
|
)
|
|
helper = FormHelper()
|
|
helper.form_method = 'POST'
|
|
helper.add_layout(layout)
|
|
helper.form_action = '.'
|
|
|
|
first_name = CharField(label=_('Firstname'), max_length=30, required=False)
|
|
last_name = CharField(label=_('Lastname'), max_length=30, required=False)
|
|
email = EmailField(label=_('Email'), required=False)
|
|
existing_user = UserChoiceField(label=_('User'),
|
|
required=False, queryset=User.objects.all)
|
|
|
|
def __init__(self, *args, **kwargs):
|
|
user = kwargs.pop('user', None)
|
|
delegatees = kwargs.pop('delegatees', [])
|
|
super(DelegationForm, self).__init__(*args, **kwargs)
|
|
qs = non_guest_users()
|
|
if user:
|
|
qs = qs.exclude(id=user.id)
|
|
if delegatees:
|
|
qs = qs.exclude(id__in=[u.id for u in delegatees])
|
|
self.fields['existing_user'].queryset = qs.order_by('first_name', 'last_name')
|
|
|
|
def clean(self):
|
|
cleaned_data = super(DelegationForm, self).clean()
|
|
ok1 = bool(cleaned_data.get('first_name'))
|
|
ok2 = bool(cleaned_data.get('last_name'))
|
|
ok3 = bool(cleaned_data.get('email'))
|
|
new = ok1 and ok2 and ok3
|
|
ok4 = bool(cleaned_data.get('existing_user'))
|
|
if not ((ok1 or ok2 or ok3) ^ ok4):
|
|
raise ValidationError(_('You must choose between creating a new '
|
|
'user or delegating to an existing one; the two are mutually '
|
|
'exclusive.'))
|
|
if not new and (ok1 or ok2 or ok3):
|
|
raise ValidationError(_('To create a new user you must give a first name, a last name and a valid email'))
|
|
return cleaned_data
|
|
|
|
class Media:
|
|
js = ('js/askdirtyform.js',)
|
|
|
|
|
|
class AutomaticForwardingForm(ModelForm):
|
|
'''Admin form for editing AutomaticForwarding objects'''
|
|
|
|
class Meta:
|
|
model = AutomaticForwarding
|
|
|
|
def clean(self):
|
|
'''Validate that the forwarding rule contains at least one recipient.'''
|
|
cleaned_data = super(AutomaticForwardingForm, self).clean()
|
|
if not cleaned_data.get('forward_to_user') and not cleaned_data.get('forward_to_list'):
|
|
raise ValidationError(_('A forwarding rule must have at least one recipient, person or list.'))
|
|
return cleaned_data
|
|
|
|
|
|
class ProfileForm(ModelForm):
|
|
'''User form for editing personal informations like email and mobile
|
|
phone.
|
|
'''
|
|
|
|
def __init__(self, request, *args, **kwargs):
|
|
'''Initialize the form object.
|
|
Define a custom help text.
|
|
'''
|
|
self.request = request
|
|
|
|
self.layout = Layout(
|
|
'personal_email',
|
|
HTML(_(u'<p class="formHint">If You would like to receive a SMS alert each '
|
|
u'time your inbox receives a document, provide your '
|
|
u'mobile phone number. If you do not fill this field '
|
|
u'you won\'t receive any SMS alert</p>')),
|
|
'mobile_phone',
|
|
ButtonHolder(
|
|
Submit('profile-clear_mobile_phone', _('Clear SMS alert')),
|
|
Submit('profile-validate', _('Validate'))),
|
|
)
|
|
self.helper = FormHelper()
|
|
self.helper.form_method = 'POST'
|
|
self.helper.add_layout(self.layout)
|
|
self.helper.form_action = ''
|
|
ModelForm.__init__(self, *args, **kwargs)
|
|
self.fields['mobile_phone'].help_text = _('Use international phone number '
|
|
'format, i.e +[country code][number]. A challenge SMS will be sent to you to validate it.')
|
|
|
|
def clean_mobile_phone(self):
|
|
'''Validate the mobile phone number by sending a HMAC signature as a code by SMS
|
|
to the phone number.
|
|
|
|
The HMAC code is valid for one day.
|
|
'''
|
|
if self.cleaned_data.get('mobile_phone'):
|
|
mobile_phone = phone_normalize(self.cleaned_data['mobile_phone'])
|
|
validate_fr_be_phone(mobile_phone)
|
|
self.cleaned_data['mobile_phone'] = mobile_phone
|
|
if not self.instance or self.instance.mobile_phone != mobile_phone:
|
|
date = datetime.date.today()
|
|
code = hmac.new(settings.SECRET_KEY, unicode(date) +
|
|
mobile_phone, hashlib.sha1).hexdigest()
|
|
code = '%06d' % (int(code, 16) % 1000000)
|
|
key = '%s-code' % self.prefix if self.prefix else 'code'
|
|
if self.data.get(key, '').strip() != code:
|
|
def send_sms(mobile_phone, code):
|
|
try:
|
|
sms_carrier = notification.SMSNotifier.get_carrier()
|
|
sms_carrier.send_sms((mobile_phone,), 'code is ' + code)
|
|
except Exception:
|
|
logger.exception('failure in SMS background thread')
|
|
thread = threading.Thread(target=send_sms, args=(mobile_phone, code))
|
|
thread.start()
|
|
self.fields['code'] = CharField(label='Code')
|
|
i = self.layout.fields.index('mobile_phone')
|
|
self.layout.fields.insert(i+1, 'code')
|
|
raise ValidationError(_('Enter code received by SMS'))
|
|
return self.cleaned_data.get('mobile_phone')
|
|
|
|
def save(self, commit=True):
|
|
'''Attach current user to the newly created profile object.'''
|
|
instance = ModelForm.save(self, commit=False)
|
|
instance.user = self.request.user
|
|
if commit:
|
|
instance.save()
|
|
return instance
|
|
|
|
class Meta:
|
|
model = DocbowProfile
|
|
fields = ('personal_email', 'mobile_phone')
|
|
|
|
class Media:
|
|
js = ('js/askdirtyform.js',)
|
|
|
|
|
|
class PasswordResetFormWithLogging(PasswordResetForm):
|
|
email = forms.EmailField(widget=forms.HiddenInput(), required=False)
|
|
identifier = forms.CharField(label=_('E-mail or identifier'), max_length=75)
|
|
|
|
def clean_email(self):
|
|
return None
|
|
|
|
def clean_identifier(self):
|
|
"""
|
|
Validates that an active user exists with the given email address.
|
|
"""
|
|
identifier = self.cleaned_data["identifier"]
|
|
self.users_cache = User.objects.filter(
|
|
Q(email__iexact=identifier)|
|
|
Q(username=identifier)|
|
|
Q(docbowprofile__personal_email=identifier),
|
|
is_active=True).distinct()
|
|
if not len(self.users_cache):
|
|
raise forms.ValidationError(_("That e-mail address or identifier doesn't have an associated user account. Are you sure you've registered?"))
|
|
return identifier
|
|
|
|
def save(self, *args, **kwargs):
|
|
super(PasswordResetFormWithLogging, self).save(*args, **kwargs)
|
|
for user in self.users_cache:
|
|
django_journal.record('password-reset', 'password reset link sent to {email}', user=user,
|
|
email=user.email, ip=get_extra()['ip'])
|
|
|
|
|
|
class PasswordChangeFormWithLogging(PasswordChangeForm):
|
|
def save(self, *args, **kwargs):
|
|
super(PasswordChangeFormWithLogging, self).save(*args, **kwargs)
|
|
django_journal.record('password-change', 'changed its email', user=self.user,
|
|
ip=get_extra()['ip'])
|
|
|
|
|
|
class FilterForm(forms.Form):
|
|
not_before = forms.DateField(label=_('From'), required=False, localize=True)
|
|
not_after = forms.DateField(label=_('To'), required=False, localize=True)
|
|
|
|
def __init__(self, *args, **kwargs):
|
|
request = kwargs.pop('request')
|
|
super(FilterForm, self).__init__(*args, **kwargs)
|
|
for field in ('sort', 'page'):
|
|
if field in request.GET:
|
|
self.fields[field] = forms.CharField(
|
|
initial=request.GET.get(field),
|
|
widget=forms.HiddenInput)
|
|
|
|
def clean(self):
|
|
cleaned_data = super(FilterForm, self).clean()
|
|
if cleaned_data.get('not_before') and cleaned_data.get('not_after') and \
|
|
cleaned_data['not_before'] > cleaned_data['not_after']:
|
|
raise ValidationError(_('From must be inferior or equal to To'))
|
|
return cleaned_data
|
|
|
|
class Media:
|
|
js = (
|
|
'jquery-ui/js/jquery-ui-1.8.4.min.js',
|
|
'docbow/js/filter-form.js',
|
|
)
|
|
css = {
|
|
'all': ('jquery-ui/css/jquery-ui-1.8.4.css',),
|
|
}
|