cleanup: remove legacy support for bulk importing users (#48558)

This commit is contained in:
Frédéric Péters 2020-11-15 15:44:26 +01:00
parent f03e42545e
commit f329c8b057
2 changed files with 1 additions and 231 deletions

View File

@ -1127,17 +1127,6 @@ def test_settings_auth_password(pub):
resp = resp.click('Passwords')
resp = resp.forms[0].submit()
resp = app.get('/backoffice/settings/identification/password/')
resp = resp.click('Bulk Import')
resp = resp.forms[0].submit()
# same with existing roles
Role(name='blah').store()
resp = app.get('/backoffice/settings/identification/password/')
resp = resp.click('Bulk Import')
resp = resp.forms[0].submit()
def test_settings_filetypes(pub):
create_superuser(pub)

View File

@ -649,7 +649,7 @@ class MethodAdminDirectory(Directory):
title = ADMIN_TITLE
label = N_('Configure username/password identification method')
_q_exports = ['', 'passwords', 'identities', ('import', 'p_import')]
_q_exports = ['', 'passwords', 'identities']
def _q_index(self):
html_top('settings', title = _(ADMIN_TITLE))
@ -665,8 +665,6 @@ class MethodAdminDirectory(Directory):
_('Identities'), _('Configure identities creation'))
r += htmltext('<dt><a href="passwords">%s</a></dt> <dd>%s</dd>') % (
_('Passwords'), _('Configure all password things'))
r += htmltext('<dt><a href="import">%s</a></dt> <dd>%s</dd>') % (
_('Bulk Import'), _('Import accounts from a CSV file'))
r += htmltext('</dl>')
return r.getvalue()
@ -775,223 +773,6 @@ class MethodAdminDirectory(Directory):
('creation', 'email-as-username', 'notify-on-register', 'email-confirmation',
'warn_about_unused_account_delay', 'remove_unused_account_delay'))
def p_import(self):
if get_request().form.get('job'):
return self.sending_notification()
passwords_cfg = get_cfg('passwords', {})
form = Form(enctype='multipart/form-data')
form.add(FileWidget, 'file', title = _('File'), required = True)
form.add(CheckboxWidget, 'send_notifications',
title=_('Send notifications to users'),
required=False,
value=False)
user_class = get_publisher().user_class()
if hasattr(user_class, str('get_available_roles')):
roles = user_class.get_available_roles()
if roles:
roles = [(None, '---', '')] + roles
form.add(WidgetList, 'roles', title=_('Roles'),
element_type=SingleSelectWidget,
add_element_label=_('Add Role'),
element_kwargs={
str('render_br'): False,
str('options'): roles})
form.add_submit('submit', _('Submit'))
form.add_submit('cancel', _('Cancel'))
if form.get_submit() == 'cancel':
return redirect('.')
if form.is_submitted() and not form.has_errors():
t = self.import_submit(form)
if not form.has_errors():
if t:
return t
return redirect('.')
html_top('settings', title = _('Bulk Import'))
r = TemplateIO(html=True)
identities_cfg = get_cfg('identities', {})
users_cfg = get_cfg('users', {})
r += htmltext('<h2>%s</h2>') % _('Bulk Import')
r += htmltext('<p>')
r += _('The CSV file must strictly adhere to the following structure:')
r += htmltext('</p>')
r += htmltext('<ul>')
r += htmltext('<li>%s</li>') % _('Charset: %s') % get_publisher().site_charset
r += htmltext('<li>%s</li>') % _('Column Separator: ;')
r += htmltext('<li>%s') % _('Columns:')
r += htmltext('<ul>')
if not identities_cfg.get('email-as-username', False):
r += htmltext('<li>%s</li>') % _('Username')
formdef = get_publisher().user_class.get_formdef()
if not formdef or not users_cfg.get('field_name'):
r += htmltext('<li>%s</li>') % _('Name')
if not formdef or not users_cfg.get('field_email'):
r += htmltext('<li>%s</li>') % _('Email')
r += htmltext('<li>%s') % _('Password')
if passwords_cfg.get('hashing_algo'):
r += ' '
r += _('(%s hash)') % passwords_cfg.get('hashing_algo').upper()
r += ' '
r += _('(empty to get an automatically generated password)')
r += htmltext('</li>')
if formdef:
for field in formdef.fields:
r += htmltext('<li>%s</li>') % field.label
r += htmltext('</ul>')
r += htmltext('</li>')
r += htmltext('</li>')
r += htmltext('</ul>')
r += form.render()
return r.getvalue()
def import_submit(self, form):
identities_cfg = get_cfg('identities', {})
passwords_cfg = get_cfg('passwords', {})
users_cfg = get_cfg('users', {})
hashing_algo = passwords_cfg.get('hashing_algo')
required_nb_columns = 0
if not identities_cfg.get('email-as-username', False):
required_nb_columns += 1
username_field_number = 0
formdef = get_publisher().user_class.get_formdef()
if not formdef or not users_cfg.get('field_name'):
name_field = required_nb_columns
required_nb_columns += 1
if not formdef or not users_cfg.get('field_email'):
email_field = required_nb_columns
if identities_cfg.get('email-as-username', False):
username_field_number = required_nb_columns
required_nb_columns += 1
password_field_number = required_nb_columns
required_nb_columns += 1
if formdef:
base_formdef_field_no = required_nb_columns
required_nb_columns += len(formdef.fields)
send_notifications = form.get_widget('send_notifications').parse()
created_accounts = []
roles = None
if form.get_widget('roles'):
roles = form.get_widget('roles').parse()
objects = []
reader = csv.reader(form.get_widget('file').parse().fp, delimiter=';')
for i, csv_line in enumerate(reader):
if len(csv_line) != required_nb_columns:
form.set_error('file', _('Incorrect number of columns (line: %s)') % (i+1))
return
u = get_publisher().user_class()
if formdef:
if not users_cfg.get('field_name'):
u.name = csv_line[name_field]
if not users_cfg.get('field_email'):
u.email = csv_line[email_field]
data = {}
for j, field in enumerate(formdef.fields):
if identities_cfg.get('email-as-username', False) and \
users_cfg.get('field_email') == field.id:
username_field_number = base_formdef_field_no+j
data[field.id] = csv_line[base_formdef_field_no+j]
u.set_attributes_from_formdata(data)
u.form_data = data
else:
u.name = csv_line[name_field]
u.email = csv_line[email_field]
if roles:
u.add_roles(roles)
username = csv_line[username_field_number]
if PasswordAccount.has_key(username):
form.set_error('file', _('Duplicate username (line: %s)') % (i+1))
return
if [x for x in objects if x[1].id == username]:
form.set_error('file', _('Duplicate username (line: %s)') % (i+1))
return
password = csv_line[password_field_number]
p = PasswordAccount(id=username)
if hashing_algo and password:
p.hashing_algo = hashing_algo
p.password = password
password = None
elif password:
p.hashing_algo = hashing_algo
p.set_password(password)
objects.append((u, p))
if send_notifications:
if not p.password:
password = make_password()
p.set_password(password)
created_accounts.append({'username': username,
'email_as_username': identities_cfg.get('email-as-username', False),
'hostname': get_request().get_server(),
'email': u.email,
'password': password})
for u, p in objects:
u.id = u.get_new_id()
u.store()
p.user_id = u.id
p.store()
get_session().message = ('info', _('Number of accounts created: %s') % (i+1))
if send_notifications:
class NotificationSender(object):
def __init__(self, accounts):
self.accounts = accounts
def email(self, job=None):
for account in self.accounts:
if not account.get('email'):
continue
emails.custom_template_email('new-account-generated-password',
account, account['email'],
fire_and_forget=False)
job = get_response().add_after_job(
str(N_('Sending subscription emails')),
NotificationSender(created_accounts).email)
return redirect('import?job=%s' % job.id)
def sending_notification(self):
try:
job = AfterJob.get(get_request().form.get('job'))
except KeyError:
return redirect('..')
html_top('settings', title=_('Notifications'))
r = TemplateIO(html=True)
r += get_session().display_message()
get_response().add_javascript(['jquery.js', 'afterjob.js'])
r += htmltext('<dl class="job-status">')
r += htmltext('<dt>')
r += _(job.label)
r += htmltext('</dt>')
r += htmltext('<dd>')
r += htmltext('<span class="afterjob" id="%s">') % job.id
r += _(job.status)
r += htmltext('</span>')
r += htmltext('</dd>')
r += htmltext('</dl>')
r += htmltext('<div class="done">')
r += htmltext('<a href="./">%s</a>') % _('Back')
r += htmltext('</div>')
return r.getvalue()
class UsernamePasswordWidget(CompositeWidget):
def __init__(self, name, value=None, **kwargs):