736 lines
29 KiB
Python
736 lines
29 KiB
Python
import os.path
|
|
import hmac
|
|
import datetime
|
|
import hashlib
|
|
import logging
|
|
import collections
|
|
import urllib.parse
|
|
|
|
|
|
from django.forms import (
|
|
ModelForm,
|
|
Form,
|
|
Textarea,
|
|
EmailField,
|
|
CharField,
|
|
ModelChoiceField,
|
|
ModelMultipleChoiceField,
|
|
)
|
|
from django import forms
|
|
from django.contrib.auth.models import User
|
|
from django.utils.translation import ugettext as _
|
|
from django.contrib.admin.widgets import FilteredSelectMultiple as AdminFilteredSelectMultiple
|
|
from django.forms import ValidationError
|
|
from django.conf import settings
|
|
from django.db.models.query import Q
|
|
from django.contrib.auth.forms import PasswordResetForm, PasswordChangeForm
|
|
from django.utils.encoding import force_text
|
|
|
|
from django_journal import journal as django_journal
|
|
|
|
|
|
from docbow_project.docbow.models import (
|
|
Document,
|
|
username,
|
|
MailingList,
|
|
Content,
|
|
AttachedFile,
|
|
AutomaticForwarding,
|
|
DocbowProfile,
|
|
non_guest_users,
|
|
is_guest,
|
|
FileTypeAttachedFileKind,
|
|
)
|
|
from docbow_project.docbow.widgets import TextInpuWithPredefinedValues, JqueryFileUploadInput
|
|
from docbow_project.docbow.fields import RecipientField
|
|
from docbow_project.docbow.validators import phone_normalize, validate_fr_be_phone
|
|
from docbow_project.docbow.middleware import get_extra
|
|
from docbow_project.docbow.utils import mime_types_to_extensions, truncate_filename, a2_wscall
|
|
from docbow_project.docbow import fields, app_settings, models, widgets
|
|
from docbow_project.docbow import notification, pyuca
|
|
from docbow_project.docbow.widgets import FilteredSelectMultiple
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class RecipientForm(object):
    """
    Mixin for every form that carries a ``recipients`` RecipientField, i.e.
    all the forms used to send documents.

    Three optional keyword arguments are consumed here and never reach the
    next ``__init__`` in the MRO:

    * ``user``: stored on the form and on the recipients field;
    * ``user_qs``: falls back to ``non_guest_users()`` when absent or ``None``;
    * ``list_qs``: falls back to ``MailingList.objects.active()`` when absent.
    """

    def __init__(self, *args, **kwargs):
        user = kwargs.pop('user', None)
        self.user = user

        user_qs = kwargs.pop('user_qs', None)
        if user_qs is None:
            user_qs = non_guest_users()
        list_qs = kwargs.pop('list_qs', MailingList.objects.active())

        super(RecipientForm, self).__init__(*args, **kwargs)

        # The field only exists once the form machinery has run, hence the
        # configuration happening after the parent constructor.
        recipients = self.fields['recipients']
        recipients.user = user
        recipients.user_qs = user_qs
        recipients.list_qs = list_qs
        recipients.reset_choices()
|
|
|
|
|
|
class ForwardingForm(RecipientForm, Form):
    """Form for forwarding documents.

    Requires a ``user`` keyword argument. The ``sender`` field is only kept
    when the user can send on behalf of more than one account; otherwise it
    is removed and ``clean()`` falls back to ``default_sender``.
    """

    recipients = RecipientField(label=_('Recipients'), required=True)
    sender = ModelChoiceField(label=_('Sender'), queryset=User.objects.all())

    class Media:
        js = ('js/askdirtyform.js',)

    def __init__(self, *args, **kwargs):
        # Read ``user`` without popping it: RecipientForm.__init__ pops the
        # very same key, and when it was already removed here the mixin reset
        # ``self.user`` to None and configured the recipients field without
        # any user.
        self.user = kwargs.get('user')
        assert self.user
        if is_guest(self.user):
            # A guest acts for the account that delegated to them.
            # NOTE(review): raises IndexError if the guest has no delegation
            # left - confirm callers guarantee at least one.
            self.default_sender = self.user.delegations_by.all()[0].by
            delegations = User.objects.none()
        else:
            self.default_sender = self.user
            # The user themselves plus every account that delegated to them.
            delegations = (
                non_guest_users()
                .filter(Q(id=self.user.id) | Q(delegations_to__to=self.user))
                .order_by('last_name', 'first_name', 'username')
                .distinct()
            )
        super(ForwardingForm, self).__init__(*args, **kwargs)
        if len(delegations) > 1:
            self.fields['sender'].queryset = delegations
            self.fields['sender'].label_from_instance = lambda y: username(y)
        else:
            # Nothing to choose from: the sender is implied.
            del self.fields['sender']

    def clean(self):
        """Fill in ``sender`` with the default sender when none was submitted."""
        cleaned_data = super(ForwardingForm, self).clean()
        if not cleaned_data.get('sender'):
            cleaned_data['sender'] = self.default_sender
        return cleaned_data
|
|
|
|
|
|
def max_filename_length():
    """Return the longest file name that fits in ``AttachedFile.content``.

    The value is the ``max_length`` of the ``content`` field minus the length
    of what ``generate_filename`` produces for an empty file name.
    """
    content_field = AttachedFile._meta.get_field('content')
    upload_prefix = content_field.generate_filename(None, '')
    return content_field.max_length - len(upload_prefix)
|
|
|
|
|
|
class FileForm(RecipientForm, ModelForm):
    """Form for creating a new mailing.

    One upload field is generated per attached-file kind declared on the file
    type (``content-<kind id>``); when the file type declares none, a single
    catch-all ``content`` field is used instead.
    """

    user = None
    recipients = RecipientField(label=_('Recipients'))

    class Meta:
        model = Document
        exclude = ('filetype', 'date', 'to_user', 'to_list', '_timestamp', 'real_sender', 'reply_to')
        widgets = {'extra_senders': FilteredSelectMultiple(_('Extra Senders'), False)}

    class Media:
        css = {'all': ('docbow/css/send-file.css', 'docbow/css/send_file_form.css')}
        js = ('js/askdirtyform.js', 'js/url-preload.js', 'js/foldable.js')

    def __init__(self, *args, **kwargs):
        """Initialize the form.

        Keyword arguments consumed here:

        * ``file_type`` (required): file type of the document being sent;
        * ``reply_to``: document being answered, used to prefill the form;
        * ``default_sender``: sender used by ``save()`` when none was chosen;
        * ``delegations``: accounts the user may send on behalf of.

        ``user``, ``user_qs`` and ``list_qs`` are left for RecipientForm.
        """
        self.file_type = kwargs.pop('file_type')
        self.attached_file_kinds = self.file_type.filetypeattachedfilekind_set.all()
        self.reply_to = kwargs.pop('reply_to', None)
        self.default_sender = kwargs.pop('default_sender', None)
        self.delegations = kwargs.pop('delegations', [])
        initial = kwargs.setdefault('initial', {})
        if self.reply_to:
            doc = self.reply_to
            initial['sender'] = kwargs.get('user', None)
            # Answer the original sender and all of its co-senders.
            initial['recipients'] = ['user-%s' % doc.sender.id]
            initial['recipients'] += ['user-%s' % sender.pk for sender in doc.extra_senders.all()]
            initial['comment'] = u'Re: ' + doc.comment

        super(FileForm, self).__init__(*args, **kwargs)

        # Same limit for every upload widget: compute it once.
        filename_limit = max_filename_length()
        self.content_fields = []
        if self.attached_file_kinds:
            for attached_file_kind in self.attached_file_kinds:
                key = 'content-%s' % attached_file_kind.id
                self.content_fields.append((key, attached_file_kind))
                mime_types = attached_file_kind.get_mime_types()
                widget = JqueryFileUploadInput(
                    max_filename_length=filename_limit,
                    extensions=mime_types_to_extensions(mime_types),
                    attached_file_kind=attached_file_kind,
                )
                self.fields[key] = forms.Field(label=attached_file_kind.name, widget=widget)
        else:
            # Unsaved placeholder kind (it has no id) accepting any MIME type.
            attached_file_kind = FileTypeAttachedFileKind(mime_types='*/*')
            self.content_fields = [('content', attached_file_kind)]
            widget = JqueryFileUploadInput(
                max_filename_length=filename_limit, attached_file_kind=attached_file_kind
            )
            self.fields['content'] = forms.Field(label=_('Attached files'), widget=widget)

        old_widget = self.fields['comment'].widget
        self.fields['comment'].widget = TextInpuWithPredefinedValues(
            attrs=old_widget.attrs, choices=self.get_predefined_comments()
        )
        if len(self.delegations) > 1:
            self.fields['sender'].queryset = self.delegations
            self.fields['sender'].label_from_instance = lambda y: username(y)
            fields.order_field_choices(self.fields['sender'])
        else:
            # No choice to offer; save() falls back to default_sender.
            del self.fields['sender']

        self.fields['private'].widget.attrs['class'] = 'checkboxinput'
        if not app_settings.PRIVATE_DOCUMENTS:
            del self.fields['private']

        if self.reply_to or not settings.EXTRA_SENDERS or not self.file_type.extra_senders:
            del self.fields['extra_senders']
        else:
            self.fields['extra_senders'].required = False
            extra_senders_qs = User.objects.filter(is_active=True).exclude(docbowprofile__is_guest=True)
            if self.user:
                extra_senders_qs = extra_senders_qs.exclude(pk=self.user.pk)
            extra_senders = [(user.pk, username(user)) for user in extra_senders_qs]
            # Sort on the displayed name using the pyuca collation key.
            extra_senders = sorted(extra_senders, key=lambda x: pyuca.collator.sort_key(x[1]))
            self.fields['extra_senders'].choices = extra_senders

    def template_content_fields(self):
        """Return the bound upload fields, in declaration order."""
        return [self[name] for name, _kind in self.content_fields]

    def get_predefined_comments(self):
        """Return the predefined comments as a choice list for a form field,
        preceded by a ``'---'`` placeholder entry.
        """
        choices = [(content.description,) * 2 for content in Content.objects.all()]
        choices.insert(0, ('---', '---'))
        return choices

    def clean(self):
        """Check each upload field against the limits of its attached-file
        kind (maximum and minimum number of files, accepted file names), and
        the number of extra senders against the file type limit.
        """
        cleaned_data = super(FileForm, self).clean()
        for field, attached_file_kind in self.content_fields:
            # The cleaned value is a pair whose second item is the file list;
            # a field that failed its own validation counts as having no file.
            upload_files = self.cleaned_data.get(field, (None, []))[1]
            max_files = attached_file_kind.cardinality
            min_files = attached_file_kind.minimum
            errors = []
            if max_files and len(upload_files) > max_files:
                errors.append(_(u'You must attach at most %d file(s).') % max_files)
            if min_files and len(upload_files) < min_files:
                errors.append(_(u'You must attach at least %d file(s).') % min_files)
            for upload_file in upload_files:
                if not attached_file_kind.match_file(upload_file):
                    mime_types = attached_file_kind.get_mime_types()
                    file_name = os.path.basename(upload_file.name)
                    msg = _(
                        u'The file name "{file_name}" does not match the patterns "{extensions}".'
                    ).format(file_name=file_name, extensions=mime_types_to_extensions(mime_types))
                    errors.append(msg)
            if errors:
                self._errors[field] = self.error_class(errors)

        if 'extra_senders' in cleaned_data:
            if len(cleaned_data['extra_senders']) > self.file_type.extra_senders:
                self._errors['extra_senders'] = self.error_class(
                    [_('No more than %s additional senders allowed') % self.file_type.extra_senders]
                )

        return cleaned_data

    def save(self, commit=False):
        """Complete the document with the fields excluded from the form
        (file type, default sender, replied-to document, real sender) and
        save it. Note that ``commit`` defaults to ``False`` here.
        """
        self.instance.filetype = self.file_type
        if not self.instance.sender_id:
            assert self.default_sender
            self.instance.sender = self.default_sender
        if self.reply_to:
            self.instance.reply_to = self.reply_to
        if self.user != self.instance.sender:
            # Sent on behalf of someone else: keep track of who really did it.
            self.instance.real_sender = username(self.user)
        return super(FileForm, self).save(commit=commit)

    def save_attachments(self):
        """Create a new AttachedFile object for each uploaded file and attach
        it to the Document instance of this form."""
        instance = self.instance
        for field, attached_file_kind in self.content_fields:
            uploaded_files = self.cleaned_data.get(field, (None, []))[1]
            for uploaded_file in uploaded_files:
                # Drop any directory part of the uploaded name.
                uploaded_file.name = os.path.basename(uploaded_file.name)
                AttachedFile(
                    document=instance,
                    # The catch-all placeholder kind is unsaved: store no kind.
                    kind=attached_file_kind if attached_file_kind.id else None,
                    name=truncate_filename(uploaded_file.name),
                    content=uploaded_file,
                ).save()
|
|
|
|
|
|
class ContactForm(Form):
    """Contact form shown to logged-in users to reach the platform administrators."""

    subject = forms.CharField(label=_('Subject'), required=True, max_length=100)
    message = forms.CharField(
        label=_('Message'),
        required=True,
        widget=Textarea(attrs={'rows': 25, 'cols': 80}),
    )

    class Media:
        js = ('js/askdirtyform.js',)
|
|
|
|
|
|
class AnonymousContactForm(ContactForm):
    """Contact form for anonymous visitors: the ContactForm fields plus a
    mandatory name and optional e-mail and phone number."""

    name = forms.CharField(label=_('Name'), required=True, max_length=100)
    email = forms.EmailField(label=_('Email'), required=False, max_length=100)
    phone_number = forms.CharField(label=_('Phone number'), required=False, max_length=100)
|
|
|
|
|
|
class MailingListForm(ModelForm):
    """Admin form used to edit MailingList objects."""

    class Meta:
        model = MailingList
        fields = ('name', 'is_active', 'members', 'mailing_list_members')
        widgets = {
            'members': AdminFilteredSelectMultiple(_('Persons'), False),
        }

    def __init__(self, *args, **kwargs):
        """Restrict members to non-guest users sorted by username, and label
        each of them with ``username()``."""
        ModelForm.__init__(self, *args, **kwargs)
        members = self.fields['members']
        members.queryset = non_guest_users().order_by('username')
        members.label_from_instance = lambda member: username(member)
|
|
|
|
|
|
class UserChoiceField(ModelChoiceField):
    """ModelChoiceField over users, labelled with their full name."""

    def label_from_instance(self, user):
        """Return "<first name> <last name>", or the username when both
        names are empty.

        The joined name is stripped so that a user with only one of the two
        names set does not get a label with a stray leading or trailing
        space.
        """
        full_name = ('%s %s' % (user.first_name, user.last_name)).strip()
        return full_name or user.username
|
|
|
|
|
|
class DelegationForm(Form):
    """Form to manage the delegations of a user.

    A delegation targets either a new user (first name, last name and email
    all given) or an existing one (``existing_user``), never both.
    """

    first_name = CharField(label=_('Firstname'), max_length=30, required=False)
    last_name = CharField(label=_('Lastname'), max_length=30, required=False)
    email = EmailField(label=_('Email'), required=False)
    existing_user = UserChoiceField(label=_('User'), required=False, queryset=User.objects.all())

    def __init__(self, *args, **kwargs):
        """Consume the ``user``, ``delegatees`` and (required) ``request``
        keyword arguments, then restrict ``existing_user`` to non-guest users
        other than the user and the current delegatees."""
        self.user = kwargs.pop('user', None)
        self.delegatees = kwargs.pop('delegatees', [])
        self._request = kwargs.pop('request')
        super(DelegationForm, self).__init__(*args, **kwargs)
        qs = non_guest_users()
        if self.user:
            qs = qs.exclude(id=self.user.id)
        if self.delegatees:
            qs = qs.exclude(id__in=[u.id for u in self.delegatees])
        self.fields['existing_user'].queryset = qs.order_by('first_name', 'last_name')
        if not app_settings.DELEGATE_TO_EXISTING_USER:
            del self.fields['existing_user']

    def clean(self):
        """Validate the new-user / existing-user alternative.

        For a new user, the email must not be the user's own, must not match
        an existing delegatee nor an existing non-guest account. When the
        ``mellon`` application is installed, the account is created through
        the identity provider web service during validation and its uuid is
        stored in ``cleaned_data['name_id']``.
        """
        cleaned_data = super(DelegationForm, self).clean()
        ok1 = bool(cleaned_data.get('first_name'))
        ok2 = bool(cleaned_data.get('last_name'))
        ok3 = bool(cleaned_data.get('email'))
        any_new_field = ok1 or ok2 or ok3
        new = ok1 and ok2 and ok3
        ok4 = bool(cleaned_data.get('existing_user'))
        # Exactly one of "some new-user field" / "existing user" must be set.
        if not (any_new_field ^ ok4):
            raise ValidationError(
                _(
                    'You must choose between creating a new '
                    'user or delegating to an existing one; the two are mutually '
                    'exclusive.'
                )
            )
        if not new and any_new_field:
            raise ValidationError(
                _('To create a new user you must give a first name, a last name and a valid email')
            )
        if new:
            email = cleaned_data.get('email')
            # ``user`` is optional at construction time: do not dereference
            # it when it was not provided.
            if self.user and email == self.user.email:
                raise ValidationError(_('Email is yours, you cannot delegate to yourself'))
            if any(delegate.email == email for delegate in self.delegatees):
                raise ValidationError(_('A delegation with the same email already exists'))
            qs = non_guest_users().filter(email=email)
            if qs.exists():
                list_of_names = u', '.join([user.get_full_name() for user in qs])
                # Unbind the form so that it is displayed again empty.
                self.data = {}
                self.is_bound = False
                raise ValidationError(
                    _('This email belong to existing user(s) {0}, look in the list of existing users').format(
                        list_of_names
                    )
                )

            if 'mellon' in app_settings.settings.INSTALLED_APPS:
                self._create_remote_user(cleaned_data, email)

        return cleaned_data

    def _create_remote_user(self, cleaned_data, email):
        """Create the user through the ``api/users/`` web service, store its
        uuid in ``cleaned_data['name_id']`` and, when ``AUTHENTIC_ROLE`` is
        set, add the user to that role.

        Raises ValidationError with the web service error description when a
        call fails.
        """
        # Create user
        url = urllib.parse.urljoin(app_settings.settings.AUTHENTIC_URL, 'api/users/')
        payload = {
            'first_name': cleaned_data['first_name'],
            'last_name': cleaned_data['last_name'],
            'email': email,
            'send_registration_email': True,
            'send_registration_email_next_url': self._request.build_absolute_uri('/'),
        }
        err, json_data, err_desc = a2_wscall(url, 'post', payload)
        if err:
            raise ValidationError(err_desc)
        user_uuid = json_data['uuid']
        cleaned_data['name_id'] = user_uuid

        # Give created user a role
        role_uuid = getattr(app_settings.settings, 'AUTHENTIC_ROLE', None)
        if role_uuid:
            url = urllib.parse.urljoin(
                app_settings.settings.AUTHENTIC_URL,
                'api/roles/%s/members/%s/' % (role_uuid, user_uuid),
            )
            err, json_data, err_desc = a2_wscall(url, 'post')
            if err:
                raise ValidationError(err_desc)
|
|
|
|
|
|
class AutomaticForwardingForm(ModelForm):
    """Admin form used to edit AutomaticForwarding objects."""

    class Meta:
        model = AutomaticForwarding
        fields = '__all__'

    def clean(self):
        """Reject a forwarding rule that targets neither a person nor a list."""
        cleaned_data = super(AutomaticForwardingForm, self).clean()
        has_recipient = cleaned_data.get('forward_to_user') or cleaned_data.get('forward_to_list')
        if not has_recipient:
            raise ValidationError(_('A forwarding rule must have at least one recipient, person or list.'))
        return cleaned_data
|
|
|
|
|
|
class ProfileForm(ModelForm):
    """User form for editing personal information such as the personal
    e-mail and the mobile phone.

    The exposed fields depend on the ``MOBILE_PHONE`` and ``PERSONAL_EMAIL``
    application settings.
    """

    class Meta:
        model = DocbowProfile
        fields = ()
        if app_settings.MOBILE_PHONE:
            fields += ('accept_notifications', 'mobile_phone')
        if app_settings.PERSONAL_EMAIL:
            fields += ('personal_email',)

    def __init__(self, request, *args, **kwargs):
        """Initialize the form object.

        Keeps the request (used to log SMS failures and to find the user in
        ``save()``) and defines a custom help text for the mobile phone.
        """
        self.request = request

        ModelForm.__init__(self, *args, **kwargs)
        if app_settings.MOBILE_PHONE:
            self.fields['mobile_phone'].help_text = _(
                'Use international phone number '
                'format, i.e +[country code][number]. A challenge SMS will be sent to you to validate it.'
            )

    def clean_mobile_phone(self):
        """Validate the mobile phone number by sending a code by SMS.

        The 6-digit code is derived from an HMAC-SHA1 of today's date and the
        normalized number, keyed with ``SECRET_KEY``; it therefore stops
        matching once the date changes. While the submitted ``code`` value
        does not match, an SMS is sent, a ``code`` field is added to the form
        and a ValidationError is raised.
        """
        if self.cleaned_data.get('mobile_phone'):
            mobile_phone = phone_normalize(self.cleaned_data['mobile_phone'])
            validate_fr_be_phone(mobile_phone)
            self.cleaned_data['mobile_phone'] = mobile_phone
            # Only a new or changed number needs to be verified.
            if not self.instance or self.instance.mobile_phone != mobile_phone:
                date = datetime.date.today()
                # hmac requires bytes for both key and message on Python 3.
                digest = hmac.new(
                    force_bytes(settings.SECRET_KEY),
                    force_bytes(force_text(date) + mobile_phone),
                    hashlib.sha1,
                ).hexdigest()
                code = '%06d' % (int(digest, 16) % 1000000)
                key = '%s-code' % self.prefix if self.prefix else 'code'
                if self.data.get(key, '').strip() != code:
                    if not self._send_sms_code(mobile_phone, code):
                        raise ValidationError(_('Unable to send you the SMS code, try again.'))
                    self.fields['code'] = CharField(label='Code')
                    raise ValidationError(_('Enter code received by SMS'))
        return self.cleaned_data.get('mobile_phone')

    def _send_sms_code(self, mobile_phone, code):
        """Send the verification code by SMS; return whether it succeeded.

        Any failure of the carrier is logged and recorded on the request
        instead of being propagated.
        """
        try:
            sms_carrier = notification.SMSNotifier.get_carrier()
            sms_carrier.send_sms((mobile_phone,), 'code is ' + code, no_stop=False)
            return True
        except Exception:
            logger.exception('unable to send SMS verification code %r to %r', code, mobile_phone)
            self.request.record(
                'error',
                'unable to send SMS verification code {code} to {mobile_phone}',
                mobile_phone=mobile_phone,
                code=code,
            )
            return False

    def save(self, commit=True):
        """Attach the current user to the profile object before saving it."""
        instance = ModelForm.save(self, commit=False)
        instance.user = self.request.user
        if commit:
            instance.save()
        return instance
|
|
|
|
|
|
import unicodedata
|
|
from django.utils.six import PY3
|
|
from django.contrib.auth.tokens import default_token_generator
|
|
from django.contrib.sites.shortcuts import get_current_site
|
|
from django.utils.http import urlsafe_base64_encode
|
|
from django.utils.encoding import force_bytes
|
|
|
|
|
|
def _unicode_ci_compare(s1, s2):
|
|
"""
|
|
Perform case-insensitive comparison of two identifiers, using the
|
|
recommended algorithm from Unicode Technical Report 36, section
|
|
2.11.2(B)(2).
|
|
"""
|
|
normalized1 = unicodedata.normalize('NFKC', s1)
|
|
normalized2 = unicodedata.normalize('NFKC', s2)
|
|
if PY3:
|
|
return normalized1.casefold() == normalized2.casefold()
|
|
# lower() is the best alternative available on Python 2.
|
|
return normalized1.lower() == normalized2.lower()
|
|
|
|
|
|
class PasswordResetFormWithLogging(PasswordResetForm):
    """Password reset form looking users up by e-mail, username or personal
    e-mail, and journaling every reset link sent.

    The inherited ``email`` field is hidden and always cleaned to ``None``;
    the visible input is ``identifier``.
    """

    email = forms.EmailField(widget=forms.HiddenInput(), required=False)
    identifier = forms.CharField(label=_('E-mail or identifier'), max_length=75)

    def clean_email(self):
        """Ignore whatever was submitted in the hidden ``email`` field."""
        return None

    def clean_identifier(self):
        """
        Validate that at least one active user with a usable e-mail address
        matches the identifier, and cache those users in ``users_cache``.
        """
        identifier = self.cleaned_data["identifier"]
        matches_identifier = (
            Q(email__iexact=identifier)
            | Q(username=identifier)
            | Q(docbowprofile__personal_email=identifier)
        )
        candidates = User.objects.filter(matches_identifier, is_active=True).distinct()
        self.users_cache = candidates

        reachable = []
        for user in candidates:
            try:
                personal_email = user.docbowprofile.personal_email
                # Write to the personal address when it is the only one, or
                # when it is the address the user typed in. The change stays
                # in memory: the user is not saved here.
                if not user.email or _unicode_ci_compare(personal_email, identifier):
                    user.email = personal_email
            except DocbowProfile.DoesNotExist:
                pass
            if user.email:
                reachable.append(user)
        self.users_cache = reachable

        if not reachable:
            raise forms.ValidationError(
                _(
                    "That e-mail address or identifier doesn't have an associated user account. Are you sure you've registered?"
                )
            )
        return identifier

    def get_users(self, *args):
        """Return the users found by ``clean_identifier``; arguments are ignored."""
        return self.users_cache

    def save(
        self,
        domain_override=None,
        subject_template_name='registration/password_reset_subject.txt',
        email_template_name='registration/password_reset_email.html',
        use_https=False,
        token_generator=default_token_generator,
        from_email=None,
        request=None,
        html_email_template_name=None,
        extra_email_context=None,
    ):
        """
        Send each matching user a link for resetting their password, then
        record a ``password-reset`` journal entry per user.
        """
        email = self.cleaned_data["email"]
        protocol = 'https' if use_https else 'http'
        for user in self.get_users(email):
            if domain_override:
                site_name = domain = domain_override
            else:
                current_site = get_current_site(request)
                site_name, domain = current_site.name, current_site.domain
            user_email = user.email
            context = {
                'email': user_email,
                'domain': domain,
                'site_name': site_name,
                'uid': urlsafe_base64_encode(force_bytes(user.pk)),
                'user': user,
                'token': token_generator.make_token(user),
                'protocol': protocol,
            }
            if extra_email_context is not None:
                context.update(extra_email_context)
            self.send_mail(
                subject_template_name,
                email_template_name,
                context,
                from_email,
                user_email,
                html_email_template_name=html_email_template_name,
            )

        for user in self.users_cache:
            django_journal.record(
                'password-reset',
                'password reset link sent to {email}',
                user=user,
                email=user.email,
                ip=get_extra()['ip'],
            )
|
|
|
|
|
|
class PasswordChangeFormWithLogging(PasswordChangeForm):
    """Password change form recording a ``password-change`` journal entry."""

    def save(self, *args, **kwargs):
        """Save the new password, journal the change and return whatever the
        parent ``save()`` returns, so the override keeps the parent contract.
        """
        result = super(PasswordChangeFormWithLogging, self).save(*args, **kwargs)
        # The entry is about the password (the message used to say "email").
        django_journal.record('password-change', 'changed its password', user=self.user, ip=get_extra()['ip'])
        return result
|
|
|
|
|
|
class FilterForm(forms.Form):
    """Date-range and search-terms filter for the document boxes.

    Requires a ``request`` keyword argument; ``outbox`` (default ``False``)
    selects the ``data-boxtype`` attribute set on the search widget.
    """

    not_before = forms.DateField(label=_('From'), required=False, localize=True)
    not_after = forms.DateField(label=_('To'), required=False, localize=True)
    search_terms = forms.CharField(label=_('Search terms'), required=False)

    class Media:
        js = (
            'jquery-ui/js/jquery-ui-1.12.1-autocomplete-datepicker.min.js',
            'docbow/js/filter-form.js',
        )
        css = {
            'all': ('jquery-ui/css/jquery-ui-1.12.1.css',),
        }

    def __init__(self, *args, **kwargs):
        request = kwargs.pop('request')
        outbox = kwargs.pop('outbox', False)
        super(FilterForm, self).__init__(*args, **kwargs)

        boxtype = 'outbox' if outbox else 'inbox'
        self.fields['search_terms'].widget.attrs['data-boxtype'] = boxtype

        # Carry the current sort/page query parameters as hidden fields.
        for name in ('sort', 'page'):
            if name not in request.GET:
                continue
            self.fields[name] = forms.CharField(initial=request.GET.get(name), widget=forms.HiddenInput)

    def clean(self):
        """Reject a range whose start date is after its end date."""
        cleaned_data = super(FilterForm, self).clean()
        start = cleaned_data.get('not_before')
        end = cleaned_data.get('not_after')
        if start and end and start > end:
            raise ValidationError(_('From must be inferior or equal to To'))
        return cleaned_data
|
|
|
|
|
|
class EmailForm(ModelForm):
    """Form letting a user change the e-mail of their account: the password
    must be confirmed and the new address typed twice."""

    old_email = forms.EmailField(
        label=_('Old email'),
        required=False,
        widget=forms.TextInput(attrs={'disabled': 'on'}),
    )
    password = forms.CharField(label=_("Password"), widget=forms.PasswordInput())
    email = forms.EmailField(label=_('New email'), required=True, initial='')
    email2 = forms.EmailField(label=_('New email (repeated)'), required=True, widget=forms.TextInput())

    class Meta:
        model = User
        fields = ('email',)

    def __init__(self, *args, **kwargs):
        super(EmailForm, self).__init__(*args, **kwargs)
        # Show the current address as "old" and start with an empty new one.
        self.initial.update({'email': '', 'old_email': self.instance.email})

    def clean_password(self):
        """Check the submitted password against the instance's password."""
        password = self.cleaned_data['password']
        if self.instance.check_password(password):
            return password
        raise ValidationError(_('password incorrect'))

    def clean(self):
        """Flag the ``email`` field when the two new addresses differ."""
        cleaned_data = super(EmailForm, self).clean()
        first = cleaned_data.get('email')
        second = cleaned_data.get('email2')
        if first and second and first != second:
            self._errors['email'] = self.error_class([_('emails are not equal')])
        return cleaned_data
|
|
|
|
|
|
class NotificationPreferencesForm(Form):
    """Form with one multiple-choice field per file type (``filetype-<id>``)
    listing the notifiers enabled for it.

    Every notifier is considered enabled unless a NotificationPreference row
    with a false ``value`` exists for the user, kind and file type.
    """

    class Media:
        js = ('docbow/js/checkall.js',)

    def __init__(self, request, *args, **kwargs):
        self.user = request.user
        self.notifiers = notification.get_notifiers()
        self.filetypes = models.FileType.objects.all()
        self.choices = []
        self.initials = {}
        self.kinds = []
        for notifier in self.notifiers:
            self.choices.append((notifier.key, notifier.description))
            self.kinds.append(notifier.key)
        for filetype in self.filetypes:
            self.initials[filetype.id] = set(self.kinds)
        for np in models.NotificationPreference.objects.filter(user=self.user):
            if np.value:
                continue
            # Tolerate stale rows (file type or notifier kind not offered
            # anymore) and duplicates instead of failing with KeyError.
            enabled = self.initials.get(np.filetype_id)
            if enabled is not None:
                enabled.discard(np.kind)
        super(NotificationPreferencesForm, self).__init__(*args, **kwargs)
        for filetype in self.filetypes:
            key = 'filetype-%s' % filetype.id
            self.fields[key] = forms.MultipleChoiceField(
                label=force_text(filetype),
                choices=self.choices,
                initial=self.initials[filetype.id],
                widget=widgets.CheckboxMultipleSelect,
                required=False,
            )

    def save(self):
        """Store the differences between the initial and submitted choices.

        Newly checked notifiers get their preference rows deleted; newly
        unchecked ones get a row with ``value=False``.
        """
        adds = collections.defaultdict(list)
        removes = collections.defaultdict(list)
        for key, selected in self.cleaned_data.items():
            # Field names are built as 'filetype-<id>' in __init__.
            filetype_id = int(key.split('-')[1])
            new = set(selected)
            old = self.initials[filetype_id]
            for kind in old - new:
                removes[kind].append(filetype_id)
            for kind in new - old:
                adds[kind].append(filetype_id)
        for kind, filetype_ids in adds.items():
            models.NotificationPreference.objects.filter(
                user=self.user, kind=kind, filetype__in=filetype_ids
            ).delete()
        for kind, filetype_ids in removes.items():
            for filetype_id in filetype_ids:
                # Reference the file type by id: no need to load it first.
                models.NotificationPreference.objects.get_or_create(
                    user=self.user, kind=kind, filetype_id=filetype_id, value=False
                )
|