This repository has been archived on 2023-02-21. You can view files and clone it, but cannot push or open issues or pull requests.
authentic2-auth-fc/src/authentic2_auth_fc/models.py

95 lines
3.1 KiB
Python

import requests
import json
import logging
import urlparse
from requests_oauthlib import OAuth2Session
from authentic2.compat import user_model_label
from django.db import models
from django.db.models.query import Q
from django.db.models.signals import post_save
from django.utils.translation import ugettext_lazy as _
from django.dispatch import receiver
from . import app_settings
class FcAccountManager(models.Manager):
    """Manager adding a cleanup helper for stale account links."""

    def cleanup(self):
        """Delete every account whose user is missing or marked as deleted.

        An account is selected when its ``user`` is NULL or when its user has
        a related ``deleteduser`` row.  Matching rows are removed one by one
        through the instance ``delete()`` method rather than with a bulk
        queryset delete.
        """
        without_user = Q(user__isnull=True)
        with_deleted_user = Q(user__deleteduser__isnull=False)
        stale_accounts = self.filter(without_user | with_deleted_user)
        for stale_account in stale_accounts:
            stale_account.delete()
class FcAccount(models.Model):
    """Link between a local user and a remote account.

    Stores the access grant code (``agc``) and the serialized OAuth2 token
    used to call the remote API on behalf of the user.
    """

    user = models.OneToOneField(user_model_label,
                                verbose_name=_('user'),
                                default=None,
                                null=True,
                                on_delete=models.SET_NULL)
    agc = models.CharField(max_length=64, verbose_name=_('access grant code'))
    # JSON-serialized token dict; it is parsed with json.loads() before use.
    token = models.TextField(verbose_name=_('access token'))

    objects = FcAccountManager()

    def delete(self, *args, **kwargs):
        """Remove the remote link (best effort), then delete the local row.

        A failure of the remote call never prevents the local deletion: both
        network errors and an unparsable stored token are logged and ignored.
        Extra arguments are passed through to ``Model.delete()``.
        """
        logger = logging.getLogger(__name__)
        try:
            self.api_call('app/rest/agc', method='delete')
        except (requests.RequestException, ValueError):
            # ValueError is raised by json.loads() in api_call() when the
            # stored token is empty or malformed; the remote link cannot be
            # removed in that case but the local row must still go away.
            logger.warning('msk failed to delete link for %r, deleting locally '
                           'anyway', unicode(self.user))
        else:
            logger.info('fc link deleted for %r', unicode(self.user))
        super(FcAccount, self).delete(*args, **kwargs)

    def api_call(self, api_path, method='get', **kwargs):
        """Call the remote API with this account's OAuth2 token.

        :param api_path: path joined onto ``app_settings.api_url``.
        :param method: name of the HTTP method to invoke on the session
            (``'get'``, ``'delete'``, ...).
        :param kwargs: extra keyword arguments forwarded to the session call.
        :returns: whatever the session method returns.
        :raises ValueError: if the stored token is not valid JSON.
        """
        url = urlparse.urljoin(app_settings.api_url, api_path)
        session = OAuth2Session(app_settings.client_id,
                                token=json.loads(self.token))
        http_method = getattr(session, method)
        return http_method(url,
                           verify=app_settings.verify_certificate,
                           **kwargs)

    def refresh_token(self):
        """Exchange the stored refresh token for a new token.

        :returns: ``False`` when the token endpoint answers ``invalid_grant``
            (the account is then deleted); ``True`` in every other case: no
            stored token, successful refresh (new token saved), or any other
            error reported by the endpoint (stored token left untouched).
        """
        logger = logging.getLogger(__name__)
        if not self.token:
            return True

        token = json.loads(self.token)
        data = {
            'grant_type': 'refresh_token',
            'refresh_token': token['refresh_token'],
            'client_id': app_settings.client_id,
            'client_secret': app_settings.client_secret,
        }
        response = requests.post(app_settings.token_url,
                                 data=data,
                                 verify=app_settings.verify_certificate)
        new_token = response.json()

        if 'error' not in new_token:
            self.token = json.dumps(new_token)
            self.save()
            return True

        if new_token['error'] == 'invalid_grant':
            # NOTE(review): this writes the full serialized token to the log;
            # confirm that is acceptable for an obsolete token.
            logger.warning('obsolete token %r, deleting FcAccount %r', self.token,
                           self.agc)
            self.delete()
            return False

        # Any other error: keep the current token and report success.
        return True
@receiver(post_save)
def delete_fc_account_on_account_deletion(sender, instance, created, *args, **kwargs):
    """Delete the FcAccount rows of a user when a DeletedUser is created.

    The receiver is connected to ``post_save`` for every model and filters on
    the sender itself; saves of other models and updates of existing
    DeletedUser rows are ignored.  Each deletion failure is logged and does
    not stop the remaining accounts from being processed.
    """
    # Imported at call time rather than at module level
    # (presumably to avoid a circular import -- TODO confirm).
    from authentic2.models import DeletedUser

    if sender is not DeletedUser or not created:
        return

    logger = logging.getLogger(__name__)
    for fc_account in FcAccount.objects.filter(user=instance.user):
        try:
            fc_account.delete()
        except Exception:
            # Catch Exception rather than using a bare except so that
            # SystemExit and KeyboardInterrupt are not swallowed.
            logger.exception('unable to delete fc accounts')