238 lines
7.6 KiB
Python
238 lines
7.6 KiB
Python
# -*- encoding: utf-8 -*-
|
|
|
|
from functools import wraps
|
|
import datetime
|
|
import time
|
|
import random
|
|
|
|
import django.core.cache
|
|
from django.core.urlresolvers import RegexURLPattern, RegexURLResolver
|
|
from django.db import models, connections
|
|
from django.db.models import query
|
|
import django.forms
|
|
from django.contrib.admin.options import BaseModelAdmin
|
|
|
|
def get_next_workable_day(increment=2, increment_hours=0, increment_days=1, today=None):
|
|
nextday = datetime.date.today()
|
|
if today is not None:
|
|
nextday = today
|
|
i = 0
|
|
while i < increment:
|
|
nextday = nextday + datetime.timedelta(hours=increment_hours, days=increment_days)
|
|
if nextday.weekday() not in (5,6):
|
|
i += 1
|
|
return nextday
|
|
|
|
def profile(func):
|
|
def f(*args, **kwargs):
|
|
t = time.time()
|
|
result = func(*args, **kwargs)
|
|
print 'Profiling call to %s(%s, %s):' % (func, args, kwargs), time.time()-t
|
|
return result
|
|
return f
|
|
|
|
def tuplify(iterable):
|
|
'''Convert all iterable to tuple.
|
|
|
|
Use it when you want to index a dictionary with an iterable.
|
|
'''
|
|
if hasattr(iterable, '__iter__'):
|
|
return tuple(tuplify(x) for x in iterable)
|
|
else:
|
|
return iterable
|
|
|
|
def stringify(iterable):
|
|
if isinstance(iterable, models.Model):
|
|
return unicode(iterable.pk)
|
|
elif hasattr(iterable, '__iter__') and not isinstance(iterable, basestring):
|
|
return unicode(map(stringify, iterable))
|
|
else:
|
|
return unicode(iterable)
|
|
|
|
def normalize_key(key):
|
|
return ''.join(filter(lambda c: c.isalnum(), stringify(tuplify(key))))
|
|
|
|
class CachingDecorator(object):
|
|
'''Mixin class for building caching decorators'''
|
|
def __init__(self, **kwargs):
|
|
self.__dict__.update(kwargs)
|
|
|
|
def cache(self, args, kwargs):
|
|
return None
|
|
|
|
def key(self, args, kwargs):
|
|
return tuple(args) + tuple(sorted(kwargs.iteritems()))
|
|
|
|
def get(self, cache, key, default=None):
|
|
return default
|
|
|
|
def set(self, cache, key, value):
|
|
pass
|
|
|
|
def __call__(self, function):
|
|
function_name = '{0}.{1}_cache'.format(function.__module__,
|
|
function.__name__)
|
|
@wraps(function)
|
|
def f(*args, **kwargs):
|
|
cache = self.cache(args, kwargs)
|
|
key = normalize_key((function_name, self.key(args, kwargs)))
|
|
result = self.get(cache, key)
|
|
if result:
|
|
return result
|
|
result = function(*args, **kwargs)
|
|
self.set(cache, key, result)
|
|
return result
|
|
return f
|
|
|
|
class CacheToSession(CachingDecorator):
|
|
def cache(self, args, kwargs):
|
|
return getattr(kwargs.get('request', None), 'session', None)
|
|
|
|
def get(self, cache, key, default=None):
|
|
if cache is not None:
|
|
return cache.get(key, default)
|
|
return default
|
|
|
|
def set(self, cache, key, value):
|
|
if cache is not None:
|
|
cache[key] = value
|
|
|
|
'''Decorator adding caching to the request.session to any function'''
|
|
cache_to_session = CacheToSession()
|
|
|
|
class CacheToDjangoCache(CachingDecorator):
|
|
'''Add cache using the Django cache framework to a function
|
|
|
|
Parameters:
|
|
cache_name - use a specific named cache backend
|
|
timeout - cache values this amount of time
|
|
'''
|
|
def __init__(self, cache_name=None, timeout=None):
|
|
self.cache_name = cache_name
|
|
self.timeout = timeout
|
|
|
|
def cache(self, args, kwargs):
|
|
if self.cache_name:
|
|
return django.core.cache.get_cache(self.cache_name)
|
|
else:
|
|
return django.core.cache.cache
|
|
|
|
def get(self, cache, key, default=None):
|
|
return cache.get(key, default)
|
|
|
|
def set(self, cache, key, value):
|
|
cache.set(key, value, timeout=self.timeout)
|
|
|
|
'''Decorator adding caching to the default Django cache backend'''
|
|
cache_to_django = CacheToDjangoCache
|
|
|
|
def slice_collection(iterable, length):
|
|
'''Cut a slicable container into sample of maximum size equals to length'''
|
|
if hasattr(iterable, '__getslice__'):
|
|
i = 0
|
|
while True:
|
|
l = iterable[i:i+length]
|
|
yield l
|
|
if len(l) < length:
|
|
break
|
|
i += length
|
|
else:
|
|
l = []
|
|
for x in iterable:
|
|
l.append(x)
|
|
if len(l) == length:
|
|
yield l
|
|
l = []
|
|
if l:
|
|
yield l
|
|
|
|
class DecoratedURLPattern(RegexURLPattern):
|
|
def resolve(self, *args, **kwargs):
|
|
result = super(DecoratedURLPattern, self).resolve(*args, **kwargs)
|
|
if result:
|
|
result.func = self._decorate_with(result.func)
|
|
return result
|
|
|
|
class DecoratedRegexURLResolver(RegexURLResolver):
|
|
def resolve(self, *args, **kwargs):
|
|
result = super(DecoratedRegexURLResolver, self).resolve(*args, **kwargs)
|
|
if result:
|
|
result.func = self._decorate_with(result.func)
|
|
return result
|
|
|
|
def decorated_includes(func, includes, *args, **kwargs):
|
|
urlconf_module, app_name, namespace = includes
|
|
|
|
for item in urlconf_module:
|
|
if isinstance(item, RegexURLPattern):
|
|
item.__class__ = DecoratedURLPattern
|
|
item._decorate_with = func
|
|
|
|
elif isinstance(item, RegexURLResolver):
|
|
item.__class__ = DecoratedRegexURLResolver
|
|
item._decorate_with = func
|
|
|
|
return urlconf_module, app_name, namespace
|
|
|
|
def sort_by_frequency(seq):
|
|
l = list(seq)
|
|
counts = [(a, l.count(a)) for a in set(seq)]
|
|
return map(lambda x: x[0], sorted(counts, key=lambda x: -x[1]))
|
|
|
|
class QuerySetManager(models.Manager):
|
|
# Copied from http://adam.gomaa.us/blog/2009/feb/16/subclassing-django-querysets/
|
|
# http://docs.djangoproject.com/en/dev/topics/db/managers/#using-managers-for-related-object-access
|
|
# Not working cause of:
|
|
# http://code.djangoproject.com/ticket/9643
|
|
use_for_related_fields = True
|
|
def __init__(self, qs_class=models.query.QuerySet):
|
|
self.queryset_class = qs_class
|
|
super(QuerySetManager, self).__init__()
|
|
|
|
def get_query_set(self):
|
|
return self.queryset_class(self.model)
|
|
|
|
def none(self):
|
|
return self.get_query_set().none()
|
|
|
|
def __getattr__(self, attr, *args):
|
|
try:
|
|
return getattr(self.__class__, attr, *args)
|
|
except AttributeError:
|
|
return getattr(self.get_query_set(), attr, *args)
|
|
|
|
class QuerySet(models.query.QuerySet):
|
|
# Copied from http://adam.gomaa.us/blog/2009/feb/16/subclassing-django-querysets/
|
|
"""Base QuerySet class for adding custom methods that are made
|
|
available on both the manager and subsequent cloned QuerySets"""
|
|
|
|
def none(self):
|
|
klass = type("_InstrumentedEmptyQuerySet_%s" % self.__class__.__name__, (query.EmptyQuerySet, self.__class__), {})
|
|
return self._clone(klass=klass)
|
|
|
|
@classmethod
|
|
def as_manager(cls, ManagerClass=QuerySetManager):
|
|
return ManagerClass(cls)
|
|
|
|
def qs_use_postgres(qs):
|
|
return hasattr(connections[qs._db or 'default'], 'pg_version')
|
|
|
|
class LocalizedModelForm(django.forms.ModelForm):
|
|
def __new__(cls, *args, **kwargs):
|
|
new_class = super(LocalizedModelForm, cls).__new__(cls, *args, **kwargs)
|
|
for field in new_class.base_fields.values():
|
|
if isinstance(field, django.forms.DecimalField):
|
|
field.localize = True
|
|
field.widget.is_localized = True
|
|
return new_class
|
|
|
|
class LocalizedModelAdmin(BaseModelAdmin):
|
|
'''Replace default ModelForm by LocalizedModelForm'''
|
|
form = LocalizedModelForm
|
|
|
|
def ellipsize(value, length=25):
|
|
value = unicode(value)
|
|
if len(value) > length:
|
|
value = value[:length-3] + u'\u2026'
|
|
return value
|