300 lines
9.5 KiB
Python
300 lines
9.5 KiB
Python
from decimal import Decimal
|
|
from datetime import time, datetime, date
|
|
|
|
from django.db import models
|
|
from django.contrib.auth.models import User
|
|
from django.forms import (IntegerField, CharField, DateField, DateTimeField,
|
|
TimeField, DecimalField, TypedChoiceField, FloatField)
|
|
from django.core.exceptions import ValidationError
|
|
from django.utils.translation import gettext_lazy as _
|
|
from django.db.models.signals import post_save, post_delete
|
|
|
|
from .fields import UUIDField
|
|
|
|
|
|
class decimal(Decimal):
|
|
pass
|
|
|
|
def parse_iso8601_datetime(iso_time):
|
|
return datetime.strptime(iso_time.replace("-", ""), "%Y%m%dT%H:%M:%S")
|
|
|
|
def parse_iso8601_date(iso_time):
|
|
return datetime.strptime(iso_time.replace("-", ""), "%Y%m%d").date()
|
|
|
|
def parse_iso8601_time(iso_time):
|
|
return datetime.strptime(iso_time, "%Y%m%d").time()
|
|
|
|
ATTRIBUTE_TYPES = {
|
|
str: dict(field=CharField),
|
|
int: dict(field=IntegerField),
|
|
float: dict(field=FloatField),
|
|
decimal: dict(field=DecimalField),
|
|
date: dict(field=DateField, coerce=parse_iso8601_date),
|
|
time: dict(field=TimeField, coerce=parse_iso8601_time),
|
|
datetime: dict(field=DateTimeField, coerce=parse_iso8601_datetime),
|
|
}
|
|
ATTRIBUTE_TYPES_MAP = dict(((t.__name__, t) for t in ATTRIBUTE_TYPES))
|
|
ATTRIBUTE_TYPE_CHOICES = [(t.__name__, t.__name__) for t in ATTRIBUTE_TYPES]
|
|
|
|
class Schema(models.Model):
|
|
name = models.CharField(max_length=128, unique=True)
|
|
display_name = models.CharField(max_length=256)
|
|
views = models.ManyToManyField('django_directory.View', blank=True)
|
|
object_of = models.ManyToManyField('django_directory.Predicate',
|
|
related_name='object_schemas', blank=True)
|
|
value_of = models.ManyToManyField('django_directory.Predicate',
|
|
related_name='value_schemas', blank=True)
|
|
|
|
def __unicode__(self):
|
|
return self.display_name or self.name
|
|
|
|
class Principal(models.Model):
|
|
uuid = UUIDField()
|
|
schema = models.ForeignKey(Schema)
|
|
|
|
@property
|
|
def attributes(self):
|
|
return Attributes(self)
|
|
|
|
@property
|
|
def user(self):
|
|
'''Return the user linked to this principal, if none exists, create
|
|
one.
|
|
'''
|
|
try:
|
|
return self.principaluser.user
|
|
except PrincipalUser.DoesNotExist:
|
|
if not self.id:
|
|
raise ValueError('Principal must have been saved to have an user')
|
|
user = User.get_or_create(username='principal#%u' % self.uuid)
|
|
principal_user = PrincipalUser(user=user, principal_ptr_id=self.pk)
|
|
principal_user.__dict__.update(self.__dict__)
|
|
principal_user.save()
|
|
return user
|
|
|
|
def __unicode__(self):
|
|
try:
|
|
user = self.principaluser.user
|
|
return self.uuid + " " + user
|
|
except PrincipalUser.DoesNotExist:
|
|
return self.uuid
|
|
|
|
class PrincipalUser(Principal):
|
|
user = models.OneToOneField(User, related_name='principal')
|
|
|
|
class Attribute(models.Model):
|
|
'''An attribute MUST have a globally unique name,
|
|
you can use URI for example.
|
|
'''
|
|
name = models.CharField(max_length=128, unique=True)
|
|
display_name = models.CharField(max_length=256)
|
|
multivalued = models.BooleanField(blank=True)
|
|
type_name = models.CharField(max_length=10, choices=ATTRIBUTE_TYPE_CHOICES)
|
|
possible_choices = models.TextField(blank=True, null=True)
|
|
required = models.BooleanField(blank=True)
|
|
|
|
@property
|
|
def type(self):
|
|
return ATTRIBUTE_TYPES_MAP.get(self.type_name)
|
|
|
|
@property
|
|
def coerce(self):
|
|
t = ATTRIBUTE_TYPES_MAP.get(self.type_name)
|
|
return ATTRIBUTE_TYPES[t].get('coerce', t)
|
|
|
|
@property
|
|
def choices(self):
|
|
result = []
|
|
t = self.type()
|
|
if self.possible_choices:
|
|
for choice in self.possible_choices.strip(','):
|
|
result.append(t(choice.strip()))
|
|
return result
|
|
|
|
|
|
|
|
@property
|
|
def field(self, *args, **kwargs):
|
|
t = self.type
|
|
if self.choices:
|
|
coerce = ATTRIBUTE_TYPES[t].get('coerce', t)
|
|
return TypedChoiceField( choices=self.choices, coerce=coerce)
|
|
else:
|
|
return ATTRIBUTE_TYPES[t]['field'](*args, required=self.required,
|
|
**kwargs)
|
|
|
|
|
|
|
|
|
|
def __unicode__(self):
|
|
return self.display_name or self.name
|
|
|
|
class Meta:
|
|
ordering = ('display_name',)
|
|
|
|
class ViewMember(models.Model):
|
|
'''Try to put some order upon all those attributes.
|
|
|
|
Attribute should be displayed in UI with this ordering, attribute with
|
|
the same order can be displayed on the same line for example.
|
|
'''
|
|
attribute = models.ForeignKey(Attribute)
|
|
view = models.ForeignKey('django_directory.View')
|
|
order = models.IntegerField(default=0)
|
|
|
|
def __unicode__(self):
|
|
return u'%s.%s %s' % (self.view, self.attribute, self.order)
|
|
|
|
class Meta:
|
|
ordering = ('order','attribute',)
|
|
|
|
class View(models.Model):
|
|
'''Group attributes together.
|
|
|
|
Attributes without a group should not be displayed or editable.
|
|
'''
|
|
name = models.CharField(max_length=128, primary_key=True)
|
|
display_name = models.CharField(max_length=128)
|
|
order = models.IntegerField(default=0)
|
|
attributes = models.ManyToManyField(Attribute,
|
|
blank=True, null=True, related_name='views',
|
|
through=ViewMember)
|
|
|
|
def __unicode__(self):
|
|
return self.display_name or self.name
|
|
|
|
def natural_key(self):
|
|
return (self.name,)
|
|
|
|
class Meta:
|
|
ordering = ('order', 'display_name', 'name')
|
|
|
|
class AttributeValue(models.Model):
|
|
'''Value for an attribute of a principal'''
|
|
user = models.ForeignKey(Principal, related_name='attributes')
|
|
attribute = models.ForeignKey(Attribute, related_name='values')
|
|
text_value = models.TextField()
|
|
|
|
@property
|
|
def value(self):
|
|
'''Coerce the stored text value into a Python value'''
|
|
return self.attribute.coerce(self.text_value)
|
|
|
|
def natural_key(self):
|
|
return (self.value,) + self.attribute.natural_key() + self.principal.natural_key()
|
|
|
|
def clean(self):
|
|
if self.attribute.multivalued and self.objects.filter(user=self.user, attribute=self.attribute) \
|
|
.exclude(pk=self.pk):
|
|
raise ValidationError(_(u'Attribute %u is not multi-valued') % self.attribute)
|
|
|
|
class Meta:
|
|
unique_together = (('user', 'attribute', 'text_value'),)
|
|
|
|
|
|
class AttributeValuesAsSet(object):
|
|
def __init__(self, attribute, **kwargs):
|
|
self.attribute = attribute
|
|
self.kwargs = kwargs
|
|
|
|
def qs(self, value=None):
|
|
qs = AttributeValue.objects.filter(*self.kwargs)
|
|
if value:
|
|
qs = qs.filter(text_value=unicode(value))
|
|
return qs
|
|
|
|
def add(self, value):
|
|
value = unicode(value)
|
|
if value not in self:
|
|
Attribute.get_or_create(text_value=value, **self.kwargs)
|
|
|
|
def update(self, values):
|
|
values = map(unicode, values)
|
|
values = set(values) - set(iter(self))
|
|
# since Django 1.4
|
|
if hasattr(AttributeValue.objects, 'bulk_create'):
|
|
AttributeValue.objects.bulk_create((
|
|
AttributeValue(text_value=value, **self.kwargs) for value in values))
|
|
else:
|
|
for value in values:
|
|
self.add(value)
|
|
|
|
def remove(self, value):
|
|
if value not in self:
|
|
raise KeyError()
|
|
self.discard(value)
|
|
|
|
def discard(self, value):
|
|
self.qs(value).delete()
|
|
|
|
def clear(self):
|
|
self.qs().delete()
|
|
|
|
def __contains__(self, value):
|
|
return bool(self.qs(value))
|
|
|
|
def __len__(self):
|
|
return self.qs().count()
|
|
|
|
def __iter__(self):
|
|
return iter(self.qs().values_list('text_value', flat=True))
|
|
|
|
|
|
class Attributes(object):
|
|
def __init__(self, principal):
|
|
self.principal = principal
|
|
|
|
def keys(self):
|
|
return Attribute.objects.filter(values__principal=self.principal)
|
|
|
|
def __getitem__(self, attribute):
|
|
if not hasattr(attribute, '_meta'):
|
|
attribute = Attribute.objects.get(id=attribute)
|
|
return AttributeValuesAsSet(attribute, principal=self.principal)
|
|
|
|
def __setitem__(self, attribute, values):
|
|
s = self[attribute]
|
|
s.update(values)
|
|
|
|
def __contains__(self, attribute):
|
|
if not hasattr(attribute, '_meta'):
|
|
attribute = Attribute.objects.get(id=attribute)
|
|
return bool(AttributeValue.filter(attribute=attribute))
|
|
|
|
class DirectoryUser(User):
|
|
@property
|
|
def principal(self):
|
|
return PrincipalUser.get_or_create(user=self)
|
|
|
|
@property
|
|
def attributes(self):
|
|
self.principal.attributes
|
|
|
|
class Meta:
|
|
proxy = True
|
|
|
|
class Predicate(models.Model):
|
|
name = models.CharField(max_length=128, unique=True)
|
|
display_name = models.CharField(max_length=256)
|
|
|
|
def __unicode__(self):
|
|
return self.display_name or self.name
|
|
|
|
class Relation(models.Model):
|
|
'''State a relation between two principals'''
|
|
object = models.ForeignKey(Principal, related_name='child_relations')
|
|
predicate = models.ForeignKey(Predicate, related_name='relations')
|
|
value = models.ForeignKey(Principal, related_name='parent_relations')
|
|
|
|
def update_schema(sender, **kwargs):
|
|
from . import schema
|
|
print 'update schema', kwargs
|
|
schema.get_schemas(True)
|
|
|
|
for model in (Schema, View, ViewMember, Attribute, Predicate, Relation):
|
|
post_save.connect(update_schema, sender=model,
|
|
dispatch_uid='schema-rebuild')
|
|
post_delete.connect(update_schema, sender=model,
|
|
dispatch_uid='schema-rebuild')
|
|
|