408 lines
17 KiB
Python
408 lines
17 KiB
Python
# -*- coding: utf-8 -*-
|
|
# authentic2-wallonie-connect - Authentic2 plugin for the Wallonie Connect usecase
|
|
# Copyright (C) 2019 Entr'ouvert
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify it
|
|
# under the terms of the GNU Affero General Public License as published
|
|
# by the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU Affero General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU Affero General Public License
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
from __future__ import unicode_literals
|
|
|
|
import functools
|
|
import json
|
|
|
|
from django.db import transaction
|
|
from django.utils import six
|
|
|
|
from authentic2_idp_oidc.models import OIDCClient, OIDCClaim, generate_uuid
|
|
from authentic2.a2_rbac.models import Role, OrganizationalUnit
|
|
from authentic2.custom_user.models import User
|
|
|
|
|
|
from django.core.management.base import BaseCommand
|
|
|
|
import hashlib
|
|
import unicodedata
|
|
|
|
|
|
class DryRun(Exception):
|
|
pass
|
|
|
|
|
|
DEFAULT_CLAIMS = [
|
|
{"name": "given_name", "value": "first_name", "scopes": ["profile"]},
|
|
{"name": "family_name", "value": "last_name", "scopes": ["profile"]},
|
|
{"name": "email", "value": "email", "scopes": ["email"]},
|
|
]
|
|
|
|
|
|
def dryrun(func):
|
|
@functools.wraps(func)
|
|
def f(*args, **kwargs):
|
|
try:
|
|
with transaction.atomic():
|
|
return func(*args, **kwargs)
|
|
except DryRun:
|
|
pass
|
|
|
|
return f
|
|
|
|
|
|
def create_password(password):
|
|
m = hashlib.md5(password)
|
|
return m.hexdigest()[10]
|
|
|
|
|
|
class Command(BaseCommand):
|
|
help = "Create validation requests"
|
|
|
|
def add_arguments(self, parser):
|
|
parser.add_argument("--no-dry-run")
|
|
parser.add_argument("paths", nargs="+")
|
|
|
|
def handle(self, paths, no_dry_run=False, verbosity=1, **options):
|
|
self.no_dry_run = no_dry_run
|
|
self.verbosity = verbosity
|
|
for path in paths:
|
|
with open(path) as fd:
|
|
contents = json.load(fd)
|
|
if contents.get("data", None):
|
|
contents = contents.get("data")
|
|
if not isinstance(contents, list):
|
|
contents = [contents]
|
|
self.do(contents=contents)
|
|
contents = {"data": contents}
|
|
try:
|
|
with open(path, "w") as fd:
|
|
json.dump(contents, fd, indent=4)
|
|
except PermissionError:
|
|
pass
|
|
|
|
def info(self, *args, **kwargs):
|
|
if self.verbosity >= 1:
|
|
self.stdout.write(*args, **kwargs)
|
|
|
|
@dryrun
|
|
def do(self, contents):
|
|
for content in contents:
|
|
locality = content.get(
|
|
"locality",
|
|
{"name": "Collectivité par défaut", "slug": "default"},
|
|
)
|
|
|
|
self.info("Locality %s" % locality["name"], ending=" ")
|
|
|
|
ou, created = OrganizationalUnit.objects.get_or_create(
|
|
slug=locality["slug"], defaults={"name": locality["name"]}
|
|
)
|
|
|
|
if not created:
|
|
if ou.name != locality["name"]:
|
|
ou.name = locality["name"]
|
|
ou.save()
|
|
self.info(self.style.SUCCESS("UPDATED"))
|
|
else:
|
|
self.info("unchanged")
|
|
else:
|
|
self.info(self.style.SUCCESS("CREATED"))
|
|
|
|
services = {}
|
|
|
|
content_services = content.get("services", [])
|
|
assert isinstance(content_services, list)
|
|
|
|
for service in content_services:
|
|
name = service["name"]
|
|
self.info("Service %s " % name, ending=" ")
|
|
slug = service["slug"]
|
|
client_id = service.get("client_id")
|
|
client_secret = service.get("client_secret")
|
|
frontchannel_logout_uri = service["frontchannel_logout_uri"]
|
|
assert isinstance(frontchannel_logout_uri, six.text_type)
|
|
post_logout_redirect_uris = service.get("post_logout_redirect_uris", [])
|
|
assert isinstance(post_logout_redirect_uris, list)
|
|
open_to_all = service.get("open_to_all", False)
|
|
redirect_uris = service.get("redirect_uris", [])
|
|
assert isinstance(redirect_uris, list)
|
|
has_api_access = service.get("has_api_access", False)
|
|
assert isinstance(has_api_access, bool)
|
|
identifier_policy = service.get(
|
|
"identifier_policy", OIDCClient.POLICY_UUID
|
|
)
|
|
assert isinstance(identifier_policy, int)
|
|
authorization_flow = service.get(
|
|
"authorization_flow", OIDCClient.FLOW_AUTHORIZATION_CODE
|
|
)
|
|
assert isinstance(authorization_flow, int)
|
|
|
|
idtoken_algo = OIDCClient.ALGO_HMAC
|
|
if "idtoken_algo" in service:
|
|
idtoken_algo = getattr(
|
|
OIDCClient, "ALGO_" + service["idtoken_algo"].upper()
|
|
)
|
|
scope = service.get("scope", "")
|
|
verified_source_apps = service.get("verified_source_apps", [])
|
|
oidc_client, created = OIDCClient.objects.get_or_create(
|
|
slug=service["slug"],
|
|
ou=ou,
|
|
defaults={
|
|
"name": name,
|
|
"client_id": client_id or generate_uuid(),
|
|
"client_secret": client_secret or generate_uuid(),
|
|
"frontchannel_logout_uri": frontchannel_logout_uri,
|
|
"post_logout_redirect_uris": "\n".join(
|
|
post_logout_redirect_uris
|
|
),
|
|
"redirect_uris": "\n".join(redirect_uris),
|
|
"has_api_access": has_api_access,
|
|
"identifier_policy": identifier_policy,
|
|
"authorization_flow": authorization_flow,
|
|
"idtoken_algo": idtoken_algo,
|
|
"scope": scope,
|
|
},
|
|
)
|
|
services[slug] = {"oidc_client": oidc_client}
|
|
if not created:
|
|
modified = False
|
|
for key in (
|
|
"name",
|
|
"client_id",
|
|
"client_secret",
|
|
"frontchannel_logout_uri",
|
|
"post_logout_redirect_uris",
|
|
"redirect_uris",
|
|
"has_api_access",
|
|
"idtoken_algo",
|
|
"identifier_policy",
|
|
"authorization_flow",
|
|
"scope",
|
|
):
|
|
value = locals()[key]
|
|
if value is None:
|
|
continue
|
|
if getattr(oidc_client, key) != value:
|
|
if isinstance(value, list):
|
|
if getattr(oidc_client, key) != "\n".join(value):
|
|
setattr(oidc_client, key, "\n".join(value))
|
|
modified = True
|
|
else:
|
|
setattr(oidc_client, key, value)
|
|
modified = True
|
|
if modified:
|
|
oidc_client.save()
|
|
self.info(self.style.SUCCESS("MODIFIED"))
|
|
else:
|
|
self.info("unchanged")
|
|
else:
|
|
self.info(self.style.SUCCESS("CREATED"))
|
|
service["client_id"] = oidc_client.client_id
|
|
service["client_secret"] = oidc_client.client_secret
|
|
|
|
if not open_to_all:
|
|
access_role, created = Role.objects.get_or_create(
|
|
slug=slug, ou=ou, defaults={"name": slug}
|
|
)
|
|
if not created and access_role.slug != slug:
|
|
access_role.name = slug
|
|
access_role.save()
|
|
services[slug]["access_role"] = access_role
|
|
if access_role not in oidc_client.authorized_roles.all():
|
|
oidc_client.add_authorized_role(access_role)
|
|
self.info(self.style.SUCCESS("MODIFIED"))
|
|
appid = "-".join(slug.split("-")[1:])
|
|
access_role_admin_slug = "{0}-admin".format(appid)
|
|
try:
|
|
access_role_admin = Role.objects.get(
|
|
slug=access_role_admin_slug,
|
|
)
|
|
except Role.DoesNotExist:
|
|
access_role_admin = None
|
|
if (
|
|
access_role_admin
|
|
and access_role_admin not in oidc_client.authorized_roles.all()
|
|
):
|
|
oidc_client.add_authorized_role(access_role_admin)
|
|
self.info(self.style.SUCCESS("MODIFIED"))
|
|
else:
|
|
Role.objects.filter(slug=slug, ou=ou).delete()
|
|
|
|
# access role to news, events or directory
|
|
if len(verified_source_apps) > 0:
|
|
for verified_source_app in verified_source_apps:
|
|
verified_source_app_slug = f"{slug}-{verified_source_app}"
|
|
verified_source_app_role, created = Role.objects.get_or_create(
|
|
slug=verified_source_app_slug,
|
|
ou=ou,
|
|
defaults={"name": verified_source_app_slug},
|
|
)
|
|
if (
|
|
not created
|
|
and verified_source_app_role.slug
|
|
!= verified_source_app_slug
|
|
):
|
|
access_role.name = verified_source_app_slug
|
|
access_role.save()
|
|
verified_source_app_oidc_client = OIDCClient.objects.get(
|
|
slug=f"imio-{verified_source_app}"
|
|
)
|
|
if (
|
|
access_role
|
|
not in verified_source_app_oidc_client.authorized_roles.all()
|
|
):
|
|
verified_source_app_oidc_client.add_authorized_role(
|
|
access_role
|
|
)
|
|
self.info(
|
|
self.style.SUCCESS(
|
|
f"{verified_source_app_oidc_client.slug} MODIFIED"
|
|
)
|
|
)
|
|
|
|
claims = service.get("claims", DEFAULT_CLAIMS)
|
|
assert isinstance(claims, list), "claims must be a list of dic"
|
|
claim_set = set()
|
|
|
|
for claim in claims:
|
|
assert isinstance(claim, dict), "claims must be a list of dict"
|
|
name = claim["name"]
|
|
value = claim["value"]
|
|
scopes = claim["scopes"]
|
|
assert name and isinstance(
|
|
name, six.string_types
|
|
), "claim's name must be a non-empty string"
|
|
assert value and isinstance(
|
|
name, six.string_types
|
|
), "claim's value must be a non-empty string"
|
|
assert (
|
|
scopes
|
|
and isinstance(scopes, list)
|
|
and all(isinstance(x, six.string_types) for x in scopes)
|
|
), "claim's scope must be a non-empty list of strings"
|
|
|
|
oidc_claim, created = OIDCClaim.objects.get_or_create(
|
|
client=oidc_client,
|
|
name=claim["name"],
|
|
value=claim["value"],
|
|
defaults={"scopes": " ".join(scopes)},
|
|
)
|
|
if not created:
|
|
if set(oidc_claim.get_scopes()) != set(scopes):
|
|
oidc_claim.scopes = " ".join(scopes)
|
|
oidc_claim.save()
|
|
|
|
content_users = content.get("users", [])
|
|
assert isinstance(content_users, list)
|
|
|
|
password = None
|
|
email = None
|
|
first_name = None
|
|
last_name = None
|
|
for content_user in content_users:
|
|
required = ["email", "username"]
|
|
imported_password = True
|
|
if "password" not in content_user.keys():
|
|
content_user["password"] = create_password(
|
|
"{0}@{1}1".format(
|
|
unicodedata.normalize(
|
|
"NFKD", content_user.get("first_name")
|
|
)
|
|
.encode("ASCII", "ignore")
|
|
.lower(),
|
|
unicodedata.normalize("NFKD", content_user.get("last_name"))
|
|
.encode("ASCII", "ignore")[0:3]
|
|
.capitalize(),
|
|
)
|
|
)
|
|
imported_password = False
|
|
data = {}
|
|
for string_key in (
|
|
"email",
|
|
"first_name",
|
|
"last_name",
|
|
"password",
|
|
"username",
|
|
):
|
|
assert string_key in content_user, "missing key " + string_key
|
|
value = content_user[string_key]
|
|
if imported_password:
|
|
assert isinstance(value, six.text_type), (
|
|
"invalid type for key " + string_key
|
|
)
|
|
if string_key in required:
|
|
assert value, (
|
|
"missing value for key " + string_key + " %s" % content_user
|
|
)
|
|
data[string_key] = content_user[string_key]
|
|
assert "password" in data
|
|
uuid = content_user.get("uuid") or None
|
|
assert uuid is None or (
|
|
isinstance(uuid, six.text_type) and uuid
|
|
), "invalid uuid %s %s" % (uuid, content_user)
|
|
allowed_services = content_user.get("allowed_services", [])
|
|
assert isinstance(allowed_services, list)
|
|
|
|
defaults = data.copy()
|
|
if uuid is not None:
|
|
self.info("User %s-%s" % (data["username"], uuid), ending=" ")
|
|
kwargs = {"uuid": uuid, "ou": ou, "defaults": defaults}
|
|
else:
|
|
self.info("User %s" % data["username"], ending=" ")
|
|
kwargs = {
|
|
"username": defaults.pop("username"),
|
|
"ou": ou,
|
|
"defaults": defaults,
|
|
}
|
|
user, created = User.objects.get_or_create(**kwargs)
|
|
if created:
|
|
user.set_password(defaults["password"])
|
|
user.save()
|
|
self.info(self.style.SUCCESS("CREATED"))
|
|
else:
|
|
modified = False
|
|
for key in defaults:
|
|
if getattr(user, key) != defaults[key] and key != "password":
|
|
setattr(user, key, defaults[key])
|
|
modified = True
|
|
if modified:
|
|
user.save()
|
|
self.info(self.style.SUCCESS("MODIFIED"))
|
|
else:
|
|
self.info("unchanged")
|
|
content_user["uuid"] = user.uuid
|
|
for service_slug in allowed_services:
|
|
if "service_slug" in services.keys():
|
|
role = services[service_slug]["access_role"]
|
|
service = services[service_slug]["oidc_client"]
|
|
else:
|
|
role = Role.objects.get(slug=service_slug)
|
|
service = OIDCClient.objects.get(slug=service_slug)
|
|
|
|
self.info("Access to service %s" % service.name, ending=" ")
|
|
if role.members.filter(pk=user.pk).exists():
|
|
self.info("unchanged")
|
|
else:
|
|
role.members.add(user)
|
|
self.info(self.style.SUCCESS("ADDED"))
|
|
for service_slug in set(services) - set(allowed_services):
|
|
role = services[service_slug]["access_role"]
|
|
service = services[service_slug]["oidc_client"]
|
|
self.info("Access to service %s" % service.name, ending=" ")
|
|
if role.members.filter(pk=user.pk).exists():
|
|
role.members.remove(user)
|
|
self.info(self.style.SUCCESS("REMOVED"))
|
|
else:
|
|
self.info("unchanged")
|
|
|
|
if self.no_dry_run:
|
|
return
|
|
raise DryRun
|