146 lines
6.1 KiB
Python
146 lines
6.1 KiB
Python
import sys
|
|
import locale
|
|
import logging
|
|
import json
|
|
|
|
from django.contrib.auth.models import User, Group
|
|
from django.contrib.auth import get_user_model
|
|
from django.core.management.base import BaseCommand
|
|
from django.db import transaction
|
|
from django.db.models import FieldDoesNotExist, CharField
|
|
|
|
from django.core import serializers
|
|
|
|
from authentic2.attribute_aggregator.models import (AttributeItem,
|
|
AttributeList, AttributeSource)
|
|
from authentic2.idp.models import AttributePolicy
|
|
|
|
from authentic2.saml.models import (SPOptionsIdPPolicy, LibertyProviderPolicy,
|
|
LibertyProvider, LibertyServiceProvider, LibertyFederation)
|
|
from authentic2.models import Attribute, AttributeValue
|
|
from authentic2_idp_oauth2.models import A2Client
|
|
|
|
|
|
class MockManager(object):
    """Minimal stand-in for a model manager backed by a fixed sequence.

    Only ``iterator()`` is provided; it hands back the wrapped sequence.
    """

    def __init__(self, seq):
        # The caller's sequence is stored by reference, not copied.
        self.seq = seq

    def iterator(self):
        """Return the sequence given at construction time, unchanged."""
        wrapped = self.seq
        return wrapped
|
|
class MockManagerDescriptorEmpty(object):
    """Descriptor whose attribute access always yields an empty MockManager."""

    def __get__(self, instance, xxx):
        # A new manager wrapping a new empty list is built on every access;
        # neither ``instance`` nor the owner argument is consulted.
        nothing = []
        return MockManager(nothing)
|
|
|
|
class MockManagerDescriptorGroups(object):
    """Descriptor returning the ``'groups'`` entry of the instance ``__dict__``."""

    def __get__(self, instance, xxx):
        # The value must have been stored under 'groups' in the instance
        # dict beforehand; a missing entry raises KeyError.
        instance_state = instance.__dict__
        return instance_state['groups']
|
|
|
|
class MockObject(object):
    """Bare object whose attributes are the keyword arguments it was built with."""

    def __init__(self, **kwargs):
        # Write each keyword straight into the instance dict.
        for key, value in kwargs.items():
            self.__dict__[key] = value
|
|
|
|
class Command(BaseCommand):
    """Management command that dumps migration data as JSON on stdout.

    ``get_objects`` generates the model instances to export and ``handle``
    feeds them to Django's JSON serializer.
    """

    args = ''
    help = '''Migrate portail citoyen to authentic'''

    def get_objects(self):
        """Yield every object to serialize, in the order it is dumped.

        Order of emission:

        1. all ``Group`` rows;
        2. ``AttributeItem``, ``AttributeList`` and ``AttributeSource`` rows;
        3. ``AttributePolicy`` rows;
        4. ``LibertyProviderPolicy``, ``SPOptionsIdPPolicy``,
           ``LibertyProvider`` and ``LibertyServiceProvider`` rows;
        5. one unsaved stock ``User`` per row of the configured user model;
        6. the ``Attribute`` objects, then the ``AttributeValue`` objects,
           built from the non-base fields of the configured user model;
        7. ``LibertyFederation`` rows;
        8. one hard-coded ``A2Client``.

        Side effects: ``Group.permissions``, ``User.groups`` and
        ``User.user_permissions`` are replaced by mock descriptors, and
        ``rel.through`` of every many-to-many field of ``User`` is
        overwritten with a mock object.

        Raises:
            CommandError: if the configured user model is the stock
                ``User`` (raised after the objects of steps 1-4 have been
                yielded), or if a non-base, non-``id`` field of the user
                model is not a ``CharField``.
        """
        # Local import: the module header only imports BaseCommand.
        from django.core.management.base import CommandError

        user_model = get_user_model()

        # serialize groups; permissions are dumped as an empty list
        Group.permissions = MockManagerDescriptorEmpty()
        for group in Group.objects.all():
            yield group

        # serialize attribute items, lists, and sources
        for at_item in AttributeItem.objects.select_related():
            yield at_item
        for at_list in AttributeList.objects.select_related():
            yield at_list
        for at_source in AttributeSource.objects.select_related():
            yield at_source

        # serialize attribute policies
        for at_policy in AttributePolicy.objects.select_related():
            yield at_policy

        # serialize policies & providers
        for ppolicy in LibertyProviderPolicy.objects.select_related():
            yield ppolicy
        for sppolicy in SPOptionsIdPPolicy.objects.select_related():
            yield sppolicy
        for provider in LibertyProvider.objects.select_related():
            yield provider
        for sp in LibertyServiceProvider.objects.select_related():
            yield sp

        # Explicit exception instead of ``assert`` so the guard is not
        # stripped when Python runs with -O.
        if user_model is User:
            raise CommandError(
                "You don't need this script, user model is "
                "django.contrib.auth.models.User")

        # serialize user and user attributes
        BASE_FIELDS = ('username', 'email', 'first_name', 'last_name',
                       'last_login', 'password', 'date_joined',
                       'is_superuser', 'is_staff', 'is_active')
        attributes = {}
        attribute_values = []

        # Workaround (per the original author): the classical user model is
        # not initialized, so the many2many fields and their "through"
        # models have to be simulated to keep the JSON serializer happy.
        for m2m_field in User._meta.many_to_many:
            m2m_field.rel.through = MockObject(
                _meta=MockObject(auto_created=True))
        User.groups = MockManagerDescriptorGroups()
        User.user_permissions = MockManagerDescriptorEmpty()
        # end of workaround

        # Hoisted out of the loop: these do not depend on the user row.
        required_fields = getattr(user_model, 'REQUIRED_FIELDS', ())
        register_fields = getattr(user_model, 'REGISTER_FIELDS', ())

        users = user_model.objects.all().select_related() \
            .prefetch_related('groups')
        for user in users:
            new_user = User()
            for field in BASE_FIELDS:
                # Copy only the base fields the custom model really has.
                try:
                    user_model._meta.get_field(field)
                except FieldDoesNotExist:
                    continue
                setattr(new_user, field, getattr(user, field))
            # simulate the m2m field; MockManagerDescriptorGroups reads this
            # entry back from the instance dict
            new_user.__dict__['groups'] = user.groups.all()
            yield new_user

            # Every remaining non-empty field becomes an AttributeValue.
            for field in user_model._meta.fields:
                if field.attname in BASE_FIELDS:
                    continue
                if field.name == 'id':
                    continue
                if not isinstance(field, CharField):
                    raise CommandError(
                        'only CharField is supported: %s' % (field,))
                value = getattr(user, field.attname, None)
                if not value:
                    continue
                if field.attname not in attributes:
                    # One Attribute per field name, created on first use.
                    attributes[field.attname] = Attribute(
                        name=field.attname,
                        label=field.attname,
                        kind='string',
                        asked_on_registration=(
                            field.attname in register_fields),
                        required=field.attname in required_fields)
                av = AttributeValue(
                    attribute=attributes[field.attname],
                    content=json.dumps(value))
                av.owner = new_user
                attribute_values.append(av)

        # Attributes are emitted before the values that reference them.
        for attribute in attributes.values():
            yield attribute
        for attribute_value in attribute_values:
            yield attribute_value

        for federation in LibertyFederation.objects.select_related():
            yield federation

        # NOTE(review): client_id and client_secret are hard-coded
        # placeholder-looking values; presumably they are meant to be
        # changed after import -- confirm before using this dump in
        # production.
        oauth2_client = A2Client(
            authorized_scopes='read write read+write',
            name='portail citoyen',
            url='https://portail-citoyen',
            client_type=0,
            client_id='1',
            client_secret='1234')
        yield oauth2_client

    @transaction.commit_on_success
    def handle(self, *args, **options):
        """Serialize the objects from ``get_objects`` as JSON to stdout.

        Attaches a stderr stream handler to the root logger, sets the
        process locale from the environment, and writes the dump with an
        indent of 2 using natural keys.

        Args:
            *args: unused positional command arguments.
            **options: command options; ``verbosity`` above 2 sets the
                stderr handler level to DEBUG.
        """
        self.logger = logging.getLogger()
        locale.setlocale(locale.LC_ALL, '')
        handler = logging.StreamHandler(stream=sys.stderr)
        self.logger.addHandler(handler)
        # verbosity may arrive as a string such as '3' depending on the
        # Django version; coerce it so the comparison is numeric (on
        # Python 2 a str-vs-int ordering comparison is always true).
        if int(options.get('verbosity', 1)) > 2:
            handler.setLevel(level=logging.DEBUG)
        json_serializer = serializers.get_serializer('json')()
        json_serializer.serialize(self.get_objects(), stream=sys.stdout,
                                  indent=2,
                                  use_natural_keys=True)
|