docbow/docbow_project/docbow/utils.py

137 lines
3.7 KiB
Python

import os.path
import mimetypes
from itertools import count
import datetime
from django.contrib.auth.models import User
from django.contrib.auth.tokens import default_token_generator
from django.urls import reverse
from django.utils import timezone
from django.utils.encoding import force_bytes
from django.utils.http import urlsafe_base64_encode
import requests
from docbow_project.docbow import app_settings
def truncate_filename(file_name):
'''Truncate filename (without extension) and append original extension'''
base_name = os.path.basename(file_name)
base, ext = os.path.splitext(base_name)
base = base[: app_settings.TRUNCATE_FILENAME]
return base + ext
def clean_ldap_user(user):
'''Clean references to module in django-auth-ldap User model, allowing
pickling of the instances.'''
user.ldap_user = None
return user
def match_mime_type(t1, t2):
if t1.endswith('/*') or t2.endswith('/*'):
t1 = t1.split('/')[0]
t2 = t2.split('/')[0]
if '*' in (t1, t2):
return True
return t1 == t2
def match_mime_types(mime_type, mime_types):
for other_mime_type in mime_types:
if match_mime_type(other_mime_type, mime_type):
return True
return False
def file_match_mime_types(file_like, mime_types, buffer_size):
import magic
if not mime_types:
return True
if hasattr(magic, 'from_buffer'):
mime_type = magic.from_buffer(file_like.read(buffer_size), mime=True)
else:
mime_type = magic.detect_from_content(file_like.read(buffer_size)).mime_type
return match_mime_types(mime_type, mime_types)
def mime_types_to_extensions(mime_types):
extensions = set()
for mime_type in mime_types:
extension = mimetypes.guess_extension(mime_type, strict=False)
if extension is None:
continue
extensions.add(extension)
return sorted(extensions)
def get_free_delegation_number(user):
'''Allocate a new delegation username'''
for i in count(1):
delegate_username = user.username + '-%d' % i
try:
User.objects.get(username=delegate_username)
except User.DoesNotExist:
return delegate_username
def make_password_reset_url(request, user):
uid = urlsafe_base64_encode(force_bytes(user.id))
token = default_token_generator.make_token(user)
return request.build_absolute_uri(
reverse('auth_password_reset_confirm', kwargs={'uidb64': uid, 'token': token})
)
def date_to_aware_datetime(date):
return timezone.make_aware(
datetime.datetime(date.year, date.month, date.day), timezone.get_current_timezone()
)
def queryset_fixpoint(initial, qs_function):
initial = set(initial)
count = len(initial)
while True:
initial |= set(qs_function(initial))
if len(initial) == count:
break
count = len(initial)
return initial
def a2_wscall(url, method, json=None):
kwargs = {
'url': url,
'auth': requests.auth.HTTPBasicAuth(
app_settings.settings.AUTHENTIC_USER, app_settings.settings.AUTHENTIC_PASSWORD
),
}
if json:
kwargs['json'] = json
resp = getattr(requests, method)(**kwargs)
err = False
err_desc = ''
json_data = None
try:
resp.raise_for_status()
except requests.RequestException as e:
err = True
err_desc = str(e)
try:
json_data = resp.json()
except Exception as e:
pass
if err and json_data and 'errors' in json_data:
errors = json_data['errors']
if hasattr(errors, 'values'):
errors = list(errors.values())
err_desc = errors[0][0]
return err, json_data, err_desc