287 lines
9.9 KiB
Python
287 lines
9.9 KiB
Python
#!/usr/bin/env python
|
|
# -*- coding: utf-8 -*-
|
|
|
|
# This file is part of passerelle-atreal-openads - a Publik connector to openADS
|
|
#
|
|
# Copyright (C) 2019 Atreal
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify it
|
|
# under the terms of the GNU Affero General Public License as published
|
|
# by the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU Affero General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU Affero General Public License
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
"""Utilities functions."""
|
|
|
|
import json
|
|
import base64
|
|
import datetime
|
|
import re
|
|
import hashlib
|
|
import copy
|
|
import mimetypes
|
|
import sys
|
|
|
|
from django.urls import reverse_lazy
|
|
from django.utils import six
|
|
from django.utils.encoding import force_text
|
|
from django.utils.six.moves.html_parser import HTMLParser
|
|
|
|
|
|
def to_dash_case(camel_str):
    """Turn a camel case identifier into its lower-cased, dash-separated form.

    Two substitutions are applied in sequence, then the result is lower-cased:

    1. a dash is inserted before every capital letter that is followed by
       lower-case letters, provided some character precedes it;
    2. a dash is inserted between a lower-case letter or digit and the
       capital letter that directly follows it.
    """
    # Pass 1: split in front of capitalised words ("ForwardFile" -> "Forward-File").
    with_word_breaks = re.sub('(.)([A-Z][a-z]+)', r'\1-\2', camel_str)
    # Pass 2: split the remaining lower/digit -> upper boundaries ("getHTTP" -> "get-HTTP").
    fully_split = re.sub('([a-z0-9])([A-Z])', r'\1-\2', with_word_breaks)
    return fully_split.lower()
|
|
|
|
|
|
# from: https://stackoverflow.com/a/13848698
|
|
def force_encoded_string_output(func, default_enc='utf-8'):
    """Decorator making a text-returning function return an encoded ``str`` on Python 2.

    Arguments:
     - func         the function (typically ``__repr__`` or ``__str__``) to wrap
     - default_enc  the encoding used when ``sys.stdout.encoding`` is not set

    On Python 3 (or later) ``func`` is returned untouched. On Python 2 a
    wrapper is returned that calls ``func`` and encodes its result with
    ``sys.stdout.encoding``, falling back to ``default_enc``.
    """
    if sys.version_info.major >= 3:
        return func

    # Local import: only the Python 2 code path needs it.
    import functools

    # functools.wraps keeps the name and docstring of the wrapped function,
    # so introspection and tracebacks still point at the real method.
    @functools.wraps(func)
    def _func(*args, **kwargs):
        return func(*args, **kwargs).encode(sys.stdout.encoding or default_enc)
    return _func
|
|
|
|
|
|
class MLStripper(HTMLParser, object):
    """HTML parser that keeps the text it is given and leaves the markup out."""

    def __init__(self):
        super(MLStripper, self).__init__()
        # Text fragments, stored in the order the parser reported them.
        self.fed = []

    def handle_data(self, data):
        """Record one text fragment reported by the parser."""
        self.fed += [data]

    def get_data(self):
        """Return all the recorded text fragments joined into a single string."""
        separator = ''
        return separator.join(self.fed)
|
|
|
|
|
|
def strip_tags(html):
    """Remove html tags from a string.

    Arguments:
     - html  the markup to strip

    Returns the text collected by an ``MLStripper`` fed with ``html``.
    """
    stripper = MLStripper()
    stripper.feed(html)
    # feed() buffers data it considers incomplete (for instance a trailing
    # '&' that could be the start of a character reference) until close() is
    # called. Without close() that trailing text never reaches handle_data()
    # and is missing from the result.
    # NOTE(review): limited to Python 3 on purpose - Python 2's parser is
    # believed to raise on an unterminated trailing entity at close(); confirm
    # before enabling it there.
    if sys.version_info.major >= 3:
        stripper.close()
    return stripper.get_data()
|
|
|
|
|
|
def clean_spaces(text):
    """Collapse the whitespace of a string down to single spaces.

    Line feeds, carriage returns and tabs are turned into spaces, and so are
    their escaped two-character spellings (a backslash followed by ``n``,
    ``r`` or ``t``). Runs of spaces are then squeezed into a single one and
    the result is stripped at both ends.
    """
    # Real control characters first, then their literal escaped spellings.
    unwanted_tokens = ('\n', '\r', '\t', '\\n', '\\r', '\\t')
    for token in unwanted_tokens:
        text = text.replace(token, ' ')
    single_spaced = re.sub(r' +', ' ', text)
    return single_spaced.strip()
|
|
|
|
|
|
def normalize(value):
    """Prepare a value before it is sent to the openADS API.

    ``None`` becomes an empty string. Anything that is not already a text
    string is converted with ``force_text()``. The text is then passed
    through ``clean_spaces()``.
    """
    if value is None:
        return ''
    text = value if isinstance(value, six.text_type) else force_text(value)
    return clean_spaces(text)
|
|
|
|
|
|
def get_file_data(path, b64=True):
    """Read a file in binary mode and return its content.

    Arguments:
     - path  the location of the file to read
     - b64   when true (the default) the content is returned base64-encoded,
             otherwise it is returned as read
    """
    with open(path, 'rb') as file_pt:
        raw_content = file_pt.read()
    return base64.b64encode(raw_content) if b64 else raw_content
|
|
|
|
|
|
# copy-pasted from 'wcs/qommon/misc.py'
|
|
def get_file_digest(content, chunk_size=100000):
    """Compute the SHA-256 hexadecimal digest of a seekable binary stream.

    Arguments:
     - content     a file-like object providing ``seek()`` and ``read()``
     - chunk_size  how much data is requested from ``content`` per read

    The stream is rewound to its start, then consumed chunk by chunk until a
    read returns an empty bytes string.
    """
    hasher = hashlib.sha256()
    content.seek(0)
    while True:
        block = content.read(chunk_size)
        if block == b'':
            break
        hasher.update(block)
    return hasher.hexdigest()
|
|
|
|
|
|
def get_upload_path(instance, filename=None):
    """Build the relative path under which a file is uploaded.

    Arguments:
     - instance  the object holding the file; its ``orig_filename``,
                 ``file_hash`` and ``content_type`` attributes are read
     - filename  the name used when ``instance.orig_filename`` is empty
    """
    reference_name = instance.orig_filename or filename

    # 'file_hash' and 'content_type' are set when the file is saved, so they
    # may still be undefined for a file not saved yet: fill them in now, but
    # only when they are empty.
    instance.update_file_hash(only_if_empty=True)
    instance.update_content_type(only_if_empty=True)

    # be careful:
    #  * openADS only accepts file names shorter than 50 characters
    #  * the name has to be unique, even when the content is the same
    timestamp = datetime.datetime.now().strftime('%Y-%m-%d_%Hh%Mm%Ss%f')
    hash_prefix = instance.file_hash[:4]
    extension = get_file_extension(reference_name, instance.content_type)[:5]
    return 'to_openADS__%s__%s%s' % (timestamp, hash_prefix, extension)
|
|
|
|
|
|
def get_file_extension(filename, mimetype=None):
    """Work out a file extension from a file name or, failing that, a mimetype.

    Arguments:
     - filename  the file name; when it contains a dot, everything up to its
                 last dot is replaced by a single dot
     - mimetype  only used when the file name is empty or has no dot; the
                 extension is then guessed with ``mimetypes.guess_extension()``

    Returns the extension with its leading dot, or an empty string when none
    could be determined.
    """
    if filename and '.' in filename:
        return re.sub(r'^.*\.', '.', filename) or ''
    if mimetype:
        return mimetypes.guess_extension(mimetype) or ''
    return ''
|
|
|
|
|
|
# pylint: disable=invalid-encoded-data
|
|
def trunc_str_values(value, limit, visited=None, truncate_text=u'…'):
    """Truncate a string value (not dict keys) and append a truncate text.

    Arguments:
     - value          a string, or a dict/list/tuple whose values are processed
                      recursively; any other type is returned unchanged
     - limit          the maximum length kept for each string
     - visited        the containers already processed (internal, used to stop
                      on circular references)
     - truncate_text  the text appended to every string that was cut

    Dicts and lists are modified in place and returned. Tuples cannot be
    modified, so a new tuple holding the truncated items is returned instead.
    """
    if visited is None:
        visited = []

    if isinstance(value, six.string_types):
        if len(value) > limit:
            return value[:limit] + truncate_text
        return value

    if not isinstance(value, (dict, list, tuple)):
        return value

    # Compare by identity: the goal is to detect a container that was already
    # walked (circular reference), not one that merely compares equal. This
    # also avoids deep '==' comparisons between nested containers.
    if any(seen is value for seen in visited):
        return value
    visited.append(value)

    if isinstance(value, tuple):
        # Item assignment on a tuple raises TypeError, so build a new one.
        return tuple(
            trunc_str_values(item, limit, visited, truncate_text) for item in value
        )

    # Only existing keys/indexes are reassigned, so the container size does
    # not change while it is being iterated.
    iterator = value.items() if isinstance(value, dict) else enumerate(value)
    for _key, _value in iterator:
        value[_key] = trunc_str_values(_value, limit, visited, truncate_text)
    return value
|
|
|
|
|
|
@six.python_2_unicode_compatible
class DictDumper(object):
    """Lazily render a dictionary as text.

    Nothing is computed until the object is converted to a string. At that
    point:
     - the dict is deep-copied and the long strings of the copy are truncated
     - (optionally) the result is serialized with json.dumps() instead of
       being converted to text directly.
    """

    def __init__(self, dic, max_str_len=255, use_json_dumps=True):
        """ arguments:
        - dic             dict     the dict to dump
        - max_str_len     integer  the maximum length of string values
        - use_json_dumps  boolean  True to serialize with json.dumps(),
                                   False to convert the dict to text directly
        """
        self.dic = dic
        self.max_str_len = max_str_len
        self.use_json_dumps = use_json_dumps

    @force_encoded_string_output
    def __repr__(self):
        details = (self.dic, self.max_str_len, self.use_json_dumps)
        return u'DictDumper(dic=%r,max_str_len=%r,use_json_dumps=%r)' % details

    def __str__(self):
        # Work on a deep copy so that the original dict is never truncated.
        shortened = trunc_str_values(copy.deepcopy(self.dic), self.max_str_len)
        if self.use_json_dumps:
            return force_text(json.dumps(shortened))
        return force_text(shortened)
|
|
|
|
|
|
class BaseModel(object):
    """A class that provides basic useful functions.

    Intended to be extended by all models: most helpers read the ``_meta``
    and ``id`` attributes, which this class does not define itself.
    """

    @classmethod
    def get_verbose_name(cls):
        """Return the verbose name of the class (helper for META option)."""
        # pylint: disable=no-member
        return cls._meta.verbose_name

    @classmethod
    def get_verbose_name_plural(cls):
        """Return the plural form of the verbose name of the class (helper for META option)."""
        # pylint: disable=no-member
        return cls._meta.verbose_name_plural

    @classmethod
    def get_class_name(cls):
        """Return the object class name."""
        return cls.__name__

    @classmethod
    def get_class_name_plural(cls):
        """Return the plural form of the object class name (the name followed by 's')."""
        return cls.get_class_name() + 's'

    @classmethod
    def get_class_name_dash_case(cls):
        """Return the object class name formatted to dash case."""
        return to_dash_case(cls.get_class_name())

    @classmethod
    def get_class_name_plural_dash_case(cls):
        """Return the plural form of the object class name
        formatted to dash case.
        """
        return to_dash_case(cls.get_class_name_plural())

    @classmethod
    def get_class_name_title(cls):
        """Return the object class name formatted to 'title' case."""
        return cls.get_class_name_dash_case().replace('-', ' ').title()

    @classmethod
    def get_class_name_plural_title(cls):
        """Return the plural form of the object class name
        formatted to 'title' case.
        """
        return cls.get_class_name_plural_dash_case().replace('-', ' ').title()

    @classmethod
    def get_fields(cls):
        """Return the fields of the class (helper for META option).

        Fields of parent classes are included, hidden fields are not.
        """
        # pylint: disable=no-member
        return cls._meta.get_fields(include_parents=True, include_hidden=False)

    @force_encoded_string_output
    def __str__(self):
        """Return the text representation of the object.

        Uses the object's ``__unicode__()`` method when it has one, otherwise
        the next ``__str__()`` implementation in the class hierarchy.
        """
        # Calling force_text(self) here would convert the object with
        # str()/unicode(), which comes straight back to this method whenever
        # no '__unicode__' is defined (always the case on Python 3) and
        # recurses until the interpreter gives up.
        to_unicode = getattr(self, '__unicode__', None)
        if callable(to_unicode):
            return force_text(to_unicode())
        return force_text(super(BaseModel, self).__str__())

    # mainly for the view
    def get_fields_kv(self):
        """Return the model's fields as a list of (field, value) pairs.

        The value is ``None`` when the object has no attribute named after
        the field.
        """
        # pylint: disable=no-member
        return [(field, getattr(self, field.name, None)) for field in self._meta.get_fields()]

    def get_url_name(self, prefix='', plural=False):
        """Return a base name for url for this object.

        Arguments:
         - prefix  when not empty, it is prepended, followed by a dash
         - plural  True to use the plural form of the class name
        """
        class_name_dash_case = self.__class__.get_class_name_dash_case()
        if plural:
            class_name_dash_case = self.__class__.get_class_name_plural_dash_case()
        return '%s%s' % (prefix + '-' if prefix else '', class_name_dash_case)

    def get_url_params(self, primary_key=True):
        """Return the parameters for 'reverse()' to build url for this object.

        The object's id is provided as 'pk', unless ``primary_key`` is false,
        in which case no parameter is returned.
        """
        # pylint: disable=no-member
        return {'pk': self.id} if primary_key else {}

    def get_absolute_url(self):
        """Return the 'absolute' url for this object (the url named with the 'view' prefix)."""
        return reverse_lazy(self.get_url_name('view'), kwargs=self.get_url_params())

    def get_edit_url(self):
        """Return the 'edit' url for this object."""
        return reverse_lazy(self.get_url_name('edit'), kwargs=self.get_url_params())

    def get_delete_url(self):
        """Return the 'delete' url for this object."""
        return reverse_lazy(self.get_url_name('delete'), kwargs=self.get_url_params())

    def get_list_url(self):
        """Return the 'list' url for this object (plural url name, no parameters)."""
        return reverse_lazy(self.get_url_name('list', True), kwargs=self.get_url_params(False))
|