This repository has been archived on 2023-02-21. You can view files and clone it, but cannot push or open issues or pull requests.
univnautes-old/virtualenv/pfidp/users_admin/pfusers.py

215 lines
6.7 KiB
Python

import xml.etree.ElementTree as ET
try:
from django.conf import settings
PF_CONFIG_XML = settings.PF_CONFIG_XML
CLEAR_PASSWORD_DIR = settings.CLEAR_PASSWORD_DIR
except ImportError:
PF_CONFIG_XML = '/conf/config.xml'
CLEAR_PASSWORD_DIR = '/var/db/univnautes/pfidp/passwords'
import datetime
import subprocess
import syslog
import htmlentitydefs
import re
import random
import fnmatch
import os
# Matches a named HTML entity such as "&eacute;"; group 1 is the entity name.
# Numeric references ("&#233;") are not matched because "#" is not a \w
# character. A raw string keeps "\w" from being read as a string escape.
pattern = re.compile(r"&(\w+?);")
def html_entity_decode_char(m, defs=htmlentitydefs.entitydefs):
    """Replacement callback that maps one matched entity to its definition.

    ``m`` is a regex match whose group 1 holds the entity name. The value
    stored in ``defs`` under that name is returned; when the name is not a
    key of ``defs`` the whole matched text is returned unchanged.
    """
    try:
        replacement = defs[m.group(1)]
    except KeyError:
        # Unknown entity name: keep the original "&name;" text.
        replacement = m.group(0)
    return replacement
def html_entity_decode(string):
    """Return ``string`` with every ``&name;`` entity substituted.

    Each match of the module-level ``pattern`` is passed to
    ``html_entity_decode_char``, which supplies the replacement text.
    """
    decoded = pattern.sub(html_entity_decode_char, string)
    return decoded
def configxml():
    """Parse the file at ``PF_CONFIG_XML`` and return its root element.

    Exceptions raised by ``open`` or by the XML parser propagate to the
    caller.
    """
    # The context manager closes the file even when reading or parsing fails.
    with open(PF_CONFIG_XML, 'r') as f:
        return ET.fromstring(f.read())
def create_password(username):
    """Generate a random password for ``username`` and store it in clear text.

    The password is 6 to 9 characters long, drawn from a fixed alphabet of
    digits and letters, and written to ``user-<username>`` inside
    ``CLEAR_PASSWORD_DIR`` (the directory is created when missing).

    Returns the generated password.
    """
    if not os.path.exists(CLEAR_PASSWORD_DIR):
        os.makedirs(CLEAR_PASSWORD_DIR)
    # A password is a secret: draw it from the operating system's entropy
    # source instead of the default, predictable pseudo-random generator.
    rng = random.SystemRandom()
    # Presumably chosen to leave out look-alike characters (0/O, 1/l/I, ...).
    alphabet = '23456789ABCDEFGHJLMNPQRSTUVWXZabcdefghjkmnpqrstuvwxyz'
    password = ''.join([rng.choice(alphabet)
                        for _ in range(rng.randint(6, 9))])
    filename = os.path.join(CLEAR_PASSWORD_DIR, 'user-%s' % username)
    # NOTE(review): the file is created with the process default permissions;
    # confirm that CLEAR_PASSWORD_DIR is not readable by other accounts.
    with open(filename, 'wb') as f:
        f.write(password)
    return password
def read_password(username):
    """Return the clear-text password stored for ``username``.

    Reads ``user-<username>`` from ``CLEAR_PASSWORD_DIR``. Returns ``None``
    when the file cannot be opened or read (for example because it does not
    exist).
    """
    filename = os.path.join(CLEAR_PASSWORD_DIR, 'user-%s' % username)
    try:
        with open(filename, 'rb') as f:
            return f.read()
    except (IOError, OSError):
        # Only I/O failures mean "no stored password"; anything else is a
        # programming error and should surface.
        return None
def delete_password(username):
    """Remove the clear-text password file of ``username``, if any.

    Deletion is best-effort: a missing file or any other ``OSError`` from
    ``os.unlink`` is ignored.
    """
    filename = os.path.join(CLEAR_PASSWORD_DIR, 'user-%s' % username)
    try:
        os.unlink(filename)
    except OSError:
        # Nothing to delete (or not deletable): deliberately ignored.
        pass
def _expiry_info(expires):
    """Turn a pfSense ``expires`` string into an ``(expires, ttl)`` pair.

    - empty / missing value: ``(None, -1)`` (no expiration);
    - valid ``mm/dd/YYYY`` date: ``(date, days left)``, days clamped at 0;
    - unparsable value: ``(today, 0)``, i.e. treated as already expired.
    """
    if not expires:
        return None, -1  # no expiration
    try:
        # pfSense format is mm/dd/YYYY
        expiry = datetime.datetime.strptime(expires, '%m/%d/%Y').date()
    except ValueError:
        # pfSense xml error ? ok, I suppose the account is expired
        return datetime.date.today(), 0
    return expiry, max((expiry - datetime.date.today()).days, 0)


def get_all_pfusers(filter=None, with_password=False):
    """Return the IdP users declared in the pfSense configuration.

    Only ``system/user`` entries whose ``scope`` is ``user`` are considered.
    Privileges granted through ``system/group`` membership are merged into
    each user's own privileges, then only users holding ``univnautes-idp``
    and not holding ``univnautes-idp-admin`` are kept.

    Parameters:
        filter: optional user-name prefix; ``'*'`` is appended and the result
            is matched with ``fnmatch``, so glob characters are honoured.
        with_password: when true, each user gets a ``password`` entry read
            through ``read_password`` (``None`` when no password is stored).

    Returns a dict indexed by user name. Each value is a dict with the keys
    ``uid``, ``name``, ``descr`` (entities decoded), ``priv`` (set of
    privilege names), ``expires`` (``datetime.date`` or ``None``), ``ttl``
    (days left, ``0`` when expired, ``-1`` when there is no expiration),
    ``disabled`` (bool) and, optionally, ``password``.
    """
    # Parse the configuration once so that users and groups come from the
    # same snapshot of the file.
    root = configxml()

    users = {}
    for xml_user in root.findall('system/user'):
        if xml_user.findtext('scope') != 'user':
            continue
        user = dict((tag, xml_user.findtext(tag))
                    for tag in ('uid', 'name', 'expires', 'descr'))
        if filter and not fnmatch.fnmatch(user['name'] or '', filter + '*'):
            continue
        # A user without a <descr> element yields None: decode '' instead of
        # failing inside the regex substitution.
        user['descr'] = html_entity_decode(user['descr'] or '').decode('iso-8859-1')
        user['priv'] = set(priv.text for priv in xml_user.findall('priv'))
        if with_password:
            user['password'] = read_password(user['name'])
        user['expires'], user['ttl'] = _expiry_info(user['expires'])
        user['disabled'] = xml_user.find('disabled') is not None
        users[user['uid']] = user

    # priv from groups
    for xml_group in root.findall('system/group'):
        privs = set(priv.text for priv in xml_group.findall('priv'))
        if not privs:
            continue
        for xml_member in xml_group.findall('member'):
            user = users.get(xml_member.text)
            if user:
                user['priv'].update(privs)

    # 1) keep only users with the "univnautes-idp" privilege
    # 1bis) and without "univnautes-idp-admin" privilege
    # 2) new index of the dict is the username (instead of the uid)
    # This step always runs, including when the configuration has no group,
    # so the result is consistently filtered and indexed by user name.
    return dict((user['name'], user) for user in users.values()
                if 'univnautes-idp' in user['priv']
                and 'univnautes-idp-admin' not in user['priv'])
def _log_useradm(message):
    """Send ``message`` to syslog (LOCAL4 facility, INFO level, with PID)."""
    syslog.openlog("idpusersadmin/pf_useradmin", syslog.LOG_PID)
    syslog.syslog(syslog.LOG_LOCAL4 | syslog.LOG_INFO, message)


def call(action, **kwargs):
    """Run ``pf_useradm <action> key=value ...`` and report the outcome.

    Each keyword argument becomes one ``key=value`` command-line argument,
    the value being encoded to ISO-8859-1. The command is executed without a
    shell and its outcome is logged to syslog.

    Returns a ``(success, output)`` tuple:
        - ``(True, stdout)`` when the command exits with status 0;
        - ``(False, stdout + stderr)`` when it exits with another status;
        - ``(False, "ERROR: OSError ...")`` when it could not be started.
    """
    # TODO: add some checks on kwargs...
    cmd = ['/usr/local/univnautes/bin/pf_useradm', action]
    cmd += ['%s=%s' % (k, v.encode('iso-8859-1')) for k, v in kwargs.items()]
    try:
        p = subprocess.Popen(cmd, close_fds=True,
                             stdin=subprocess.PIPE,
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE)
    except OSError as e:
        # The executable is missing or cannot be run.
        _log_useradm("ERROR %s: OSError %s" % (action, e))
        return False, "ERROR: OSError %s" % e
    # communicate() closes stdin and drains both pipes before returning.
    stdout, stderr = p.communicate()
    if p.returncode != 0:
        _log_useradm("ERROR %s: code %s (out:%s) (err:%s)"
                     % (action, p.returncode, stdout, stderr))
        return False, stdout + stderr
    _log_useradm("SUCCESS %s: %s" % (action, stdout))
    return True, stdout
def create(username, password, expires, disabled=False, descr='', multiple=False):
    """Create a user through ``call('create', ...)``.

    ``expires`` is sent as ``mm/dd/YYYY`` when it is a ``datetime.date`` and
    as an empty string otherwise. ``disabled`` is sent as ``'yes'``/``'no'``.
    The user always receives the ``univnautes-idp`` privilege, plus
    ``univnautes-idp-multiple`` when ``multiple`` is true. When ``password``
    is empty, one is generated with ``create_password``.

    Returns whatever ``call`` returns.
    """
    expires_str = expires.strftime('%m/%d/%Y') if isinstance(expires, datetime.date) else ''
    disabled_str = 'yes' if disabled else 'no'
    priv_names = ['univnautes-idp']
    if multiple:
        priv_names.append('univnautes-idp-multiple')
    if not password:
        password = create_password(username)
    return call('create',
                name=username,
                password=password,
                expires=expires_str,
                disabled=disabled_str,
                privs=','.join(priv_names),
                descr=descr)
def desactivate(username):
    """Disable ``username`` by sending ``disabled=yes`` in an update call."""
    outcome = call('update', name=username, disabled='yes')
    return outcome
def activate(username):
    """Enable ``username`` by sending ``disabled=no`` in an update call."""
    outcome = call('update', name=username, disabled='no')
    return outcome
def delete(username):
    """Delete ``username`` through ``call('delete', ...)`` and return its result."""
    outcome = call('delete', name=username)
    return outcome
def update(username, password, expires, disabled, descr, multiple):
    """Update a user through ``call('update', ...)``.

    ``expires`` is sent as ``mm/dd/YYYY`` when it is a ``datetime.date`` and
    as an empty string otherwise. ``disabled`` is sent as ``'yes'``/``'no'``.
    The privileges sent are ``univnautes-idp``, plus
    ``univnautes-idp-multiple`` when ``multiple`` is true.

    When ``password`` is non-empty, the stored clear-text password is removed
    with ``delete_password`` and the new password is included in the call;
    otherwise no ``password`` argument is sent.

    Returns whatever ``call`` returns.
    """
    expires_str = expires.strftime('%m/%d/%Y') if isinstance(expires, datetime.date) else ''
    disabled_str = 'yes' if disabled else 'no'
    priv_names = ['univnautes-idp']
    if multiple:
        priv_names.append('univnautes-idp-multiple')
    privs = ','.join(priv_names)

    if not password:
        # No new password supplied: leave the password untouched.
        return call('update',
                    name=username,
                    expires=expires_str,
                    disabled=disabled_str,
                    privs=privs,
                    descr=descr)

    # The stored clear-text password is dropped before the new one is sent.
    delete_password(username)
    return call('update',
                name=username,
                password=password,
                expires=expires_str,
                disabled=disabled_str,
                privs=privs,
                descr=descr)