remove contrib.agoraplus connector (#39671)

This commit is contained in:
Thomas NOËL 2020-02-10 10:18:02 +01:00
parent 049198cb33
commit 570f7bff3c
24 changed files with 0 additions and 3436 deletions

View File

@ -1,24 +0,0 @@
Connect Publik with Agora+ softwares
====================================
How to use
----------
Add to your settings.py
# local_settings.py:
INSTALLED_APPS += ('passerelle.contrib.agoraplus',)
Provisioning
------------
Each time a Agora+ family is linked to a Publik account, address and phones of
the family can be sent to Authentic. To activate this, add:
AUTHENTIC_AUTH_TUPLE = ('login', 'password')
If you also want add/delete roles on link/unlink family, get UUID roles in
Authentic and add them in:
PASSERELLE_APP_AGORAPLUS_ROLES_UUIDS = ['4839125218905', '4839125218905']

View File

@ -1,152 +0,0 @@
# -*- encoding: utf-8 -*-
#
# passerelle - uniform access to multiple data sources and services
# Copyright (C) 2015 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from datetime import datetime
def agoraize_date(date):
try:
return '%s' % datetime.strptime(date, '%Y-%m-%d').strftime('%d/%m/%Y')
except ValueError:
return ''
def dict_with_id_or_text(ref, textkey='nom'):
if isinstance(ref, dict):
if ref.get('id'):
return {'id': ref.get('id')}
return {'id': '', textkey: ref.get('text') or ''}
return {'id': '', textkey: ''}
def agoraize_address(address):
commune = dict_with_id_or_text(address.get('city'))
commune['cp'] = address.get('post_code') or ''
address_id = address.get('id') or ''
if address_id.startswith('sas_'):
address_id = ''
return {
'id': address_id,
'numVoie': address.get('number') or '',
'btq': dict_with_id_or_text(address.get('btq'), 'libelle'),
'typeVoie': dict_with_id_or_text(address.get('type_of_street')),
'voie': dict_with_id_or_text(address.get('street')),
'complementAdresse': address.get('complements') or '',
'commune': commune,
}
def agoraize_adult(adult):
adult_id = adult.get('id') or ''
if adult_id.startswith('sas_'):
adult_id = ''
return {
'id': adult_id,
'email': adult.get('email') or '',
'position': '%s' % adult.get('index') or '',
'civilite': dict_with_id_or_text(adult.get('title')),
'nom': adult.get('last_name') or '',
'prenom': adult.get('first_name') or '',
'dateNaissance': agoraize_date(adult.get('birthdate') or ''),
'villeNaissance': adult.get('birth_city') or '',
'paysNaissance': adult.get('birth_country') or '',
'mobilePerso': adult.get('cellphone') or '',
'fixPerso': adult.get('phone') or '',
'adresse': agoraize_address(adult.get('address') or {}),
'profession': adult.get('profession') or '',
'pcs': dict_with_id_or_text(adult.get('pcs')),
'mobilePro': adult.get('work_cellphone') or '',
'fixPro': adult.get('work_phone') or '',
'employeur': adult.get('employer') or '',
'telEmployeur': adult.get('employer_phone') or '',
'adresseEmployeur': agoraize_address(adult.get('employer_address') or {}),
'secu': '', # mandatory
}
def agoraize_family(family):
family_id = family.get('id') or ''
if family_id.startswith('sas_'):
family_id = ''
res = {
'id': family_id,
'Representants': [agoraize_adult(adult) for adult in family.get('adults') or []],
'Enfants': [],
'Contacts': [],
'Documents': [],
}
informations = family.get('informations', {})
res['Informations complementaires'] = {
'situationFamiliale': {'id': informations.get('situation', {}).get('id') or ''},
'nbEnfants': informations.get('number_of_children') or '',
'regime': {'id': ''},
'numAlloc': '',
'A.E.E.H': '0',
'communicationAdresse': '0',
'allocataire': '',
}
return res
def agoraize_child(child, representant_ids={}):
child_id = child.get('id') or ''
if child_id.startswith('sas_'):
child_id = ''
agora_child = {
'id': child_id,
'position': '%s' % child.get('index') or '',
'nom': child.get('last_name') or '',
'prenom': child.get('first_name') or '',
'dateNaissance': agoraize_date(child.get('birthdate') or ''),
'villeNaissance': child.get('birth_city') or '',
'paysNaissance': child.get('birth_country') or '',
'mobile': child.get('cellphone') or '',
'contacts': [],
'etablissementScolaire': {'id': '', 'designation': ''},
'niveauInscription': {'id': '', 'designation': ''},
'autoriteParentale': [],
}
for pid in child['parental_autorithy']:
representant_id = representant_ids.get(pid['id']) or pid['id']
agora_child['autoriteParentale'].append({'id': representant_id,
'type': 'representant'})
if child.get('sex') == 'M':
agora_child['sexe'] = u'Masculin'
else:
agora_child['sexe'] = u'Féminin'
return agora_child
def agoraize_contact(contact, child_id_map={}):
contact_id = contact.get('id') or ''
if contact_id.startswith('sas_'):
contact_id = ''
agora_contact = {
'id': contact_id,
'email': contact.get('email', ''),
'position': '%s' % (contact.get('index') or ''),
'civilite': dict_with_id_or_text(contact.get('title')),
'nom': contact.get('last_name') or '',
'prenom': contact.get('first_name') or '',
'adresse': agoraize_address(contact.get('address') or {}),
'mobilePerso': contact.get('cellphone') or '',
'fixPerso': contact.get('phone') or '',
'enfants': []
}
for child in contact.get('children', []):
agora_contact['enfants'].append({
'rang': '1', # XXX does it matter ?
'id': child_id_map.get(child['id']) or child['id'],
'lien': {'id': child['link']['id'], 'nom': ''},
'interdiction': child.get('banned') and '1' or '0',
'autoriteParentale': child.get('parental_authority') and '1' or '0',
})
return agora_contact

View File

@ -1,51 +0,0 @@
# passerelle - uniform access to multiple data sources and services
# Copyright (C) 2015 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from datetime import datetime
from .keywordize import keywordize_family
def finalize_address(address):
address['text'] = u'{number}{btq[text]} {type_of_street[text]} {street[text]}'.format(**address).strip()
address['full'] = address['text']
end = u'{post_code} {city[text]}'.format(**address).strip()
if end:
address['full'] += u', ' + end
return address
def finalize_person(person):
person['text'] = u'%(first_name)s %(last_name)s' % person
birthdate = person.get('birthdate')
if birthdate:
born = datetime.strptime(birthdate, '%Y-%m-%d')
today = datetime.today()
person['age_in_days'] = (today - born).days
person['age'] = today.year - born.year - \
int((today.month, today.day) < (born.month, born.day))
if person.get('address'):
person['address'] = finalize_address(person['address'])
if person.get('employer_address'):
person['employer_address'] = finalize_address(person['employer_address'])
return person
def finalize_family(family):
family['adults'] = [finalize_person(p) for p in family.get('adults') or []]
family['children'] = [finalize_person(p) for p in family.get('children') or []]
family['contacts'] = [finalize_person(p) for p in family.get('contacts') or []]
if not family.get('address') and len(family['adults']) > 0:
family['address'] = family['adults'][0]['address']
keywordize_family(family)
return family

View File

@ -1,50 +0,0 @@
# passerelle - uniform access to multiple data sources and services
# Copyright (C) 2015 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
AGE_LIMIT = 20
from datetime import datetime
def keywordize_child(child):
child['keywords'] = child.get('keywords') or []
if child.get('birthdate'):
child['keywords'].append('enfant-annee-%s' % child['birthdate'][:4])
if child.get('age'):
age = int(child.get('age'))
child['keywords'].append('enfant-age-%s' % age)
for younger in range(0, age):
child['keywords'].append('enfant-age-plus-de-%s' % younger)
for older in range(age, AGE_LIMIT):
child['keywords'].append('enfant-age-moins-de-%s' % (older+1))
else:
child['keywords'].append('enfant-age-inconnu')
if child.get('school'):
child['keywords'].append('enfant-inscrit')
grade = child['school'].get('grade', {}).get('id', 'inconnu') or 'inconnu'
child['keywords'].append('enfant-inscrit-niveau-%s' % grade)
school = child['school'].get('id', 'inconnu') or 'inconnu'
child['keywords'].append('enfant-inscrit-etablissement-%s' % school)
else:
child['keywords'].append('enfant-non-inscrit')
return child
def keywordize_family(family):
family['keywords'] = family.get('keywords') or []
family['keywords'].append('famille-%d-adultes' % len(family['adults']))
family['keywords'].append('famille-%d-enfants' % len(family['children']))
family['keywords'].append('famille-%d-contacts' % len(family['contacts']))
family['children'] = [keywordize_child(p) for p in family.get('children') or []]
return family

View File

@ -1,48 +0,0 @@
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
from django.db import models, migrations
class Migration(migrations.Migration):
dependencies = [
('base', '0001_initial'),
]
operations = [
migrations.CreateModel(
name='AgoraPlus',
fields=[
('id', models.AutoField(verbose_name='ID', serialize=False, auto_created=True, primary_key=True)),
('title', models.CharField(verbose_name='Title', max_length=50)),
('slug', models.SlugField(verbose_name='Identifier', unique=True)),
('description', models.TextField(verbose_name='Description')),
('url', models.CharField(max_length=128, verbose_name='Webservices Base URL')),
('login', models.CharField(max_length=128, verbose_name='Auth login')),
('oauth_consumer_key', models.CharField(max_length=128, verbose_name='Auth oauth_consumer_key')),
('verify_cert', models.BooleanField(default=True, verbose_name='Check HTTPS Certificate validity')),
('username', models.CharField(max_length=128, verbose_name='HTTP Basic Auth username', blank=True)),
('password', models.CharField(max_length=128, verbose_name='HTTP Basic Auth password', blank=True)),
('keystore', models.FileField(help_text='Certificate and private key in PEM format', upload_to=b'agoraplus', null=True, verbose_name='Keystore', blank=True)),
('users', models.ManyToManyField(to='base.ApiUser', related_name='_agoraplus_users_+', related_query_name='+', blank=True)),
],
options={
'verbose_name': 'Agora+',
},
bases=(models.Model,),
),
migrations.CreateModel(
name='AgoraPlusLink',
fields=[
('id', models.AutoField(verbose_name='ID', serialize=False, auto_created=True, primary_key=True)),
('name_id', models.CharField(max_length=256)),
('login', models.CharField(max_length=128)),
('password', models.CharField(max_length=128)),
('resource', models.ForeignKey(to='agoraplus.AgoraPlus', on_delete=models.CASCADE)),
],
options={
},
bases=(models.Model,),
),
]

View File

@ -1,29 +0,0 @@
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
from django.db import models, migrations
import jsonfield.fields
class Migration(migrations.Migration):
dependencies = [
('agoraplus', '0001_initial'),
]
operations = [
migrations.CreateModel(
name='AgoraPlusSas',
fields=[
('id', models.AutoField(verbose_name='ID', serialize=False, auto_created=True, primary_key=True)),
('name_id', models.CharField(max_length=256)),
('kind', models.CharField(max_length=16, choices=[(b'ADULT', b'ADULT'), (b'CHILD', b'CHILD'), (b'CONTACT', b'CONTACT')])),
('agoraplus_id', models.CharField(max_length=16, blank=True)),
('value', jsonfield.fields.JSONField(default=dict, blank=True)),
('resource', models.ForeignKey(to='agoraplus.AgoraPlus', on_delete=models.CASCADE)),
],
options={
},
bases=(models.Model,),
),
]

View File

@ -1,20 +0,0 @@
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
from django.db import models, migrations
class Migration(migrations.Migration):
dependencies = [
('agoraplus', '0002_agoraplussas'),
]
operations = [
migrations.AlterField(
model_name='agoraplussas',
name='kind',
field=models.CharField(max_length=16, choices=[(b'FAMILY', b'FAMILY'), (b'CHILD', b'CHILD'), (b'CONTACT', b'CONTACT')]),
preserve_default=True,
),
]

View File

@ -1,20 +0,0 @@
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
from django.db import models, migrations
class Migration(migrations.Migration):
dependencies = [
('agoraplus', '0003_auto_20151102_1737'),
]
operations = [
migrations.AlterField(
model_name='agoraplussas',
name='kind',
field=models.CharField(max_length=16, choices=[(b'FAMILY', b'FAMILY'), (b'ADULT', b'ADULT'), (b'CHILD', b'CHILD'), (b'CONTACT', b'CONTACT')]),
preserve_default=True,
),
]

View File

@ -1,20 +0,0 @@
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
from django.db import models, migrations
class Migration(migrations.Migration):
dependencies = [
('agoraplus', '0004_auto_20151111_1809'),
]
operations = [
migrations.AddField(
model_name='agoraplus',
name='log_level',
field=models.CharField(default=b'NOTSET', max_length=10, verbose_name='Log Level', choices=[(b'NOTSET', b'NOTSET'), (b'DEBUG', b'DEBUG'), (b'INFO', b'INFO'), (b'WARNING', b'WARNING'), (b'ERROR', b'ERROR'), (b'CRITICAL', b'CRITICAL')]),
preserve_default=True,
),
]

View File

@ -1,19 +0,0 @@
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('agoraplus', '0005_agoraplus_log_level'),
]
operations = [
migrations.AddField(
model_name='agoraplus',
name='frontoffice_url',
field=models.CharField(max_length=128, verbose_name='Frontoffice URL (SSO)', blank=True),
),
]

View File

@ -1,19 +0,0 @@
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('agoraplus', '0006_agoraplus_frontoffice_url'),
]
operations = [
migrations.AlterField(
model_name='agoraplus',
name='slug',
field=models.SlugField(verbose_name='Identifier', unique=True),
),
]

View File

@ -1,19 +0,0 @@
# -*- coding: utf-8 -*-
# Generated by Django 1.11.12 on 2018-11-19 13:42
from __future__ import unicode_literals
from django.db import migrations
class Migration(migrations.Migration):
dependencies = [
('agoraplus', '0007_auto_20170920_0951'),
]
operations = [
migrations.RemoveField(
model_name='agoraplus',
name='log_level',
),
]

File diff suppressed because it is too large Load Diff

View File

@ -1,387 +0,0 @@
# passerelle - uniform access to multiple data sources and services
# Copyright (C) 2015 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import unicodedata
import re
from datetime import datetime, timedelta
from decimal import Decimal
import logging
# handle years before 1900
from django.utils import datetime_safe, six
# NNNN@mailprov.no emails are not real ones: we will ignore them.
# (They are used by Agora+ to create "fake" login for each user id=NNNN)
PROVISIONING_DOMAIN = 'mailprov.no'
DATE_FORMAT = '%Y-%m-%d'
DATETIME_FORMAT = DATE_FORMAT + 'T%H:%M:%S'
# invoices possible status
PAID_INVOICE = 1
UNPAID_INVOICE = 2
LITIGATION_INVOICE = 3
logger = logging.getLogger('passerelle.contrib.agoraplus')
def slugify(s, space='-'):
if s is None:
return ''
if not isinstance(s, six.text_type):
s = s.decode('utf-8', errors='ignore')
s = unicodedata.normalize('NFKD', s).encode('ascii', 'ignore')
s = re.sub(r'[^\w\s\'%s]' % space, '', s).strip().lower()
s = re.sub(r'[\s\'%s]+' % space, space, s)
return s
def normalize_date(date):
return u'%s' % datetime_safe.datetime.strptime(date, '%d/%m/%Y').strftime('%Y-%m-%d')
def denormalize_date(date):
return u'%s' % datetime_safe.datetime.strptime(date, '%Y-%m-%d').strftime('%d/%m/%Y')
def normalize_reference(reference):
new_reference = {}
new_reference['id'] = u'%s' % reference['ID']
new_reference['text'] = reference.get('NAME') or reference.get('NOM') or u''
if 'CP' in reference:
new_reference['post_code'] = reference['CP']
if 'CODE_POSTAL' in reference:
new_reference['post_code'] = reference['CODE_POSTAL']
if 'NAME_VILLE' in reference and 'ID_VILLE' in reference:
new_reference['city'] = {
'id': u'%s' % reference['ID_VILLE'],
'text': reference['NAME_VILLE'],
}
return new_reference
def normalize_address(address):
new_address = {
'number': u'%s' % address['numVoie'],
'btq': {
'text': address['btq']['libelle'],
'id': u'%s' % address['btq']['id'],
},
'type_of_street': {
'text': address['typeVoie']['nom'],
'id': u'%s' % address['typeVoie']['id'],
},
'street': {
'text': address['voie']['nom'],
'id': u'%s' % address['voie']['id'],
},
'complements': address['complementAdresse'],
'post_code': address['commune']['cp'],
'city': {
'text': address['commune']['nom'],
'id': u'%s' % address['commune']['id'],
},
}
if 'id' in address:
new_address['id'] = u'%s' % address['id']
new_address['text'] = u'{number}{btq[text]} {type_of_street[text]} {street[text]}'.format(**new_address).strip()
new_address['full'] = new_address['text']
end = u'{post_code} {city[text]}'.format(**new_address).strip()
if end:
new_address['full'] += u', ' + end
return new_address
def normalize_person(person):
new_person = {}
if 'id' in person:
new_person['id'] = u'%s' % person['id']
new_person['first_name'] = person['prenom']
new_person['last_name'] = person['nom']
new_person['text'] = u'%(first_name)s %(last_name)s' % new_person
if 'position' in person:
new_person['index'] = int(person['position'])
if person.get('dateNaissance'):
new_person['birthdate'] = normalize_date(person['dateNaissance'])
if 'villeNaissance' in person:
new_person['birth_city'] = person['villeNaissance']
if 'paysNaissance' in person:
new_person['birth_country'] = person['paysNaissance']
if 'sexe' in person:
new_person['sex'] = person['sexe'][0] # u'M' or u'F'
if 'civilite' in person:
new_person['title'] = {
'id': u'%s' % person['civilite']['id'],
'text': person['civilite']['nom'],
}
if 'email' in person:
new_person['login'] = person['email']
if not person['email'].endswith('@' + PROVISIONING_DOMAIN):
# real email
new_person['email'] = person['email']
# child
if 'mobile' in person:
new_person['cellphone'] = person['mobile']
if 'etablissementScolaire' in person:
new_person['school'] = {
'id': u'%s' % person['etablissementScolaire']['id'],
'text': person['etablissementScolaire']['designation'],
}
if 'niveauInscription' in person:
new_person['school']['grade'] = {
'id': u'%s' % person['niveauInscription']['id'],
'text': person['niveauInscription']['designation'],
}
if 'autoriteParentale' in person:
new_person['parental_authority'] = [
{ 'id': u'%s' % adult['id'] }
for adult in person['autoriteParentale']
if adult['type'] == 'representant'
]
# adult
if 'adresse' in person:
new_person['address'] = normalize_address(person['adresse'])
if 'fixPerso' in person:
new_person['phone'] = person['fixPerso']
if 'mobilePerso' in person:
new_person['cellphone'] = person['mobilePerso']
if 'profession' in person:
new_person['profession'] = person['profession']
if 'employeur' in person:
new_person['employer'] = person['employeur']
if 'telEmployeur' in person:
new_person['employer_phone'] = person['telEmployeur']
if 'adresseEmployeur' in person:
new_person['employer_address'] = normalize_address(person['adresseEmployeur'])
# contact
if 'enfants' in person:
new_person['children'] = [
{
'child_id': u'%s' % item['id'],
'link': {
'id': u'%s' % item['lien']['id'],
'text': u'%s' % item['lien']['nom'],
},
'parental_authority': bool(int(item['autoriteParentale'])),
'index': item['rang'],
'banned': bool(int(item['interdiction'])),
}
for item in person['enfants']]
return new_person
def normalize_invoice(invoice):
"""
Input invoice data to format
{u'dateEmission': u'2015-06-23T00:00:00',
u'dateLimitePayment': u'2015-07-17T00:00:00',
u'datePrelevement': u'2015-06-30T00:00:00',
u'id': u'9122',
u'montantDu': u'0',
u'nomFacturation': u'ACTIVITES PERISCOLAIRES MAI 2015',
u'paye': u'54.58',
u'prelevement': False,
u'publication': False,
u'statutEmission': {u'id': 1, u'name': u'pay\xe9e'},
u'total': u'54.58'}
"""
if not invoice.get('publication'):
return None
invoice['id'] = str(invoice['id'])
invoice['display_id'] = invoice['id']
invoice['total_amount'] = Decimal(invoice['total'])
invoice['amount'] = Decimal(invoice['montantDu'])
invoice['has_pdf'] = True
invoice['created'] = datetime.strptime(invoice['dateEmission'], DATETIME_FORMAT).date()
invoice['label'] = invoice['nomFacturation']
invoice['pay_limit_date'] = datetime.strptime(invoice['dateLimitePayment'], DATETIME_FORMAT).date()
status = invoice['statutEmission'].get('id')
autobilling = invoice['prelevement']
invoice['online_payment'] = False
if autobilling:
invoice['no_online_payment_reason'] = 'autobilling'
invoice['online_payment'] = False
if status == PAID_INVOICE:
invoice['paid'] = True
elif status == UNPAID_INVOICE:
invoice['paid'] = False
if not autobilling:
invoice['online_payment'] = True
elif status == LITIGATION_INVOICE:
invoice['paid'] = False
invoice['no_online_payment_reason'] = 'litigation'
else:
logger.warning('unknown invoice status: %s' % status)
return None
return invoice
def normalize_family(family):
new_family = {}
if 'id' in family:
new_family['id'] = u'%s' % family['id']
new_family['adults'] = [normalize_person(p) for p in family['Representants']]
if len(new_family['adults']) > 0:
new_family['address'] = new_family['adults'][0]['address']
informations = {}
for k,v in family.get('Informations complementaires', {}).items():
if k == 'situationFamiliale':
informations['situation'] = {
'id': u'%s' % v['id'],
'text': u'%s' % v['libelle'],
}
elif k == 'allocataire':
adult_id = u'%s' % v
for adult in new_family['adults']:
if adult['id'] == adult_id:
informations['allowance_adult'] = {
'id': adult.get('index'),
'text': adult['text'],
}
break
elif k == 'numAlloc':
informations['allowance_number'] = u'%s' % v
elif k == 'nbEnfants':
informations['number_of_children'] = u'%s' % v
elif k == 'regime':
informations['regime'] = {
'id': u'%s' % v['id'],
'text': v['name'],
}
elif k == 'A.E.E.H':
informations['allowance_aeeh'] = u'%s' % v
new_family['informations'] = informations
# build links between children and contacts
children = {}
for child in family['Enfants']:
nchild = normalize_person(child)
children[nchild['id']] = nchild
contacts = []
for contact in family['Contacts']:
contact = normalize_person(contact)
for child in contact['children']:
child['text'] = children.get(child['child_id'], {}).get('text')
contacts.append(contact)
new_family['children'] = children.values()
new_family['contacts'] = contacts
return new_family
def normalize_school(school):
new_school = {}
new_school['id'] = u'%s' % school['ID']
new_school['text'] = school.get('NAME') or u''
address = ''
if 'NUMERO_1' in school:
address += u'%s ' % school['NUMERO_1']
if 'TYPE_VOIE' in school:
address += u'%s ' % school['TYPE_VOIE']
if 'NOM_VOIE' in school:
address += u'%s' % school['NOM_VOIE']
new_school['address'] = address
if 'VILLE' in school:
new_school['city'] = school['VILLE']
if 'CODE_POSTAL':
new_school['post_code'] = school['CODE_POSTAL']
return new_school
def normalize_planning(planning):
new_planning = {}
new_planning['date'] = planning['date'][:10]
new_planning['year'] = planning['date'][0:4]
new_planning['month'] = planning['date'][5:7]
new_planning['expected_in'] = planning['arriveePrev']
new_planning['in'] = planning['arriveeReelle']
new_planning['expected_out'] = planning['departPrev']
new_planning['out'] = planning['departReel']
new_planning['message'] = planning['typeAbsence'].get('name', '').strip()
new_planning['absent'] = (new_planning['in'] == '00:00' and
new_planning['out'] == '00:00') or (not new_planning['in'] and
not new_planning['out']) or planning['typeAbsence'].get('id')
new_planning['deductible'] = planning['typeAbsence'].get('deductible') == 'TRUE'
new_planning['absence_name'] = planning['typeAbsence'].get('name')
new_planning['differ'] = (new_planning['in'] != new_planning['expected_in'] and
new_planning['out'] != new_planning['expected_out'])
return new_planning
def normalize_nursery_enroll_results(results):
"""
return only last month result, if exists
"""
if not results:
return {'decision': ''}
result = results.pop()
try:
date = datetime.strptime(result['dateCommission'], '%d/%m/%Y').date()
except ValueError:
date = datetime.strptime(result['dateCommission'], '%d/%m/%y').date()
if (datetime.today().date() - date).days > 30:
return {'decision': ''}
new_result = {}
new_result['date'] = date
new_result['date_fr'] = date.strftime('%d/%m/%Y')
new_result['decision'] = slugify(result['decision'])
new_result['proposed_structure'] = result['strctProposee']
if result.get('dateDecisionFammille'):
try:
date = datetime.strptime(result['dateDecisionFammille'], '%d/%m/%Y').date()
except ValueError:
date = datetime.strptime(result['dateDecisionFammille'], '%d/%m/%y').date()
new_result['family_decision_date'] = date
new_result['family_decision_date_fr'] = date.strftime('%d/%m/%Y')
else:
new_result['family_decision_date'] = ''
new_result['family_decision_date_fr'] = ''
new_result['family_decision'] = result.get('decisionFamille') or ''
new_result['family_decision_reason'] = result.get('MotifRefusFamille') or ''
new_result['comment'] = result.get('commentaire') or ''
if result.get('dateMaintien'):
try:
date = datetime.strptime(result['dateMaintien'], '%d/%m/%Y').date()
except ValueError:
date = datetime.strptime(result['dateMaintien'], '%d/%m/%y').date()
new_result['maintain_date'] = date
new_result['maintain_date_fr'] = date.strftime('%d/%m/%Y')
else:
new_result['maintain_date'] = ''
new_result['maintain_date_fr'] = ''
return new_result
def normalize_school_enrollment(enrollment):
child = enrollment['enfant']
year = enrollment['anneeRef']
child_fullname = '%s %s' % (child['prenom'], child['nom'])
school = enrollment['etablissementAffectation']
enrollment_level = enrollment['niveau']
return {'id': enrollment['idInscScol'], 'child_id': child['id'],
'year_id': year['id'],
'text': '%s, %s / %s, %s' % (child_fullname, school['name'],
enrollment_level['name'], year['name']),
}
def normalize_periscol_enrollment(enrollment):
return {'id': u'%s' % enrollment['ID'],
'text': enrollment['ACTIVITY']['DESCRIPTION'],
'url': '/periscol/enrollment/%s/planning/' % enrollment['ID'],
'start_date': enrollment['START_DATE'],
'end_date': enrollment['END_DATE']}
def normalize_periscol_enrollment_planning(planning):
# remove hour from planning date to be able to format it
date, hour = planning['DAY'].split('T')
text = '%s (%s-%s)' % (denormalize_date(date), planning['START_TIME'], planning['END_TIME'])
return {'id': planning['DAY'], 'text': text}
def normalize_periscol_enrollment_reservation(reservation):
# remove hour from planning date to be able to format it
date, hour = reservation['DAY'].split('T')
return {'id': str(reservation['ID']), 'text': denormalize_date(date)}

View File

@ -1,84 +0,0 @@
# passerelle - uniform access to multiple data sources and services
# Copyright (C) 2015 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import json
import requests
import logging
from django.conf import settings
logger = logging.getLogger('passerelle.contrib.agoraplus')
def get_authentic_url():
if hasattr(settings, 'KNOWN_SERVICES'):
return list(settings.KNOWN_SERVICES.get('authentic').items())[0][1]['url']
return None
def provision_roles(uuid, method='POST'):
roles_uuids = getattr(settings, 'PASSERELLE_APP_AGORAPLUS_ROLES_UUIDS', None)
if not roles_uuids:
return
authentic_url = get_authentic_url()
credentials = getattr(settings, 'AUTHENTIC_AUTH_TUPLE', None)
if not authentic_url or not credentials:
return
# if credentials is it settings.json, it's a list, cast it:
credentials = tuple(credentials)
for role_uuid in roles_uuids:
logger.info('%s role %s to user NameID:%s', method, role_uuid, uuid)
requests.request(method, authentic_url + 'api/roles/%s/members/%s/' % (role_uuid, uuid),
headers={'Content-type': 'application/json'}, auth=credentials)
def patch_authentic_user(uuid, user):
authentic_url = get_authentic_url()
credentials = getattr(settings, 'AUTHENTIC_AUTH_TUPLE', None)
if not authentic_url or not credentials:
return
# if credentials is it settings.json, it's a list, cast it:
credentials = tuple(credentials)
data = json.dumps(user)
logger.debug('patch Authentic user NameID:%s with data:%s', uuid, data)
authentic_responses = requests.patch(
authentic_url + 'api/users/' + uuid + '/',
data=data,
headers={'Content-type': 'application/json'},
auth=credentials)
if authentic_responses.status_code != 200:
logger.warning('patch Authentic user NameID:%r returns status_code %r', uuid,
authentic_responses.status_code)
def provision_attributes(family, login, name_id):
# get adult from the family
for adult in family.get('adults', []):
if adult['login'] == login:
break
else:
return
# build Authentic api user
user = {}
if adult.get('address'):
user['address'] = adult['address']['text']
user['zipcode'] = adult['address']['post_code']
user['city'] = adult['address']['city']['text']
user['country'] = adult['address'].get('country', '')
if adult.get('cellphone'):
user['mobile'] = adult['cellphone']
if adult.get('phone'):
user['phone'] = adult['phone']
# send it to Authentic IdP
if user:
patch_authentic_user(uuid=name_id, user=user)

View File

@ -1,260 +0,0 @@
{% extends "passerelle/manage/service_view.html" %}
{% load i18n passerelle %}
{% block description %}
{% endblock %}
{% block endpoints %}
<ul>
<li>{% trans 'Check WS availability:' %} <a href="{% url 'agoraplus-ping' slug=object.slug %}"
>{{ site_base_uri }}{% url 'agoraplus-ping' slug=object.slug %}</a>[?debug]</li>
</ul>
<h4>{% trans "Family" %}</h4>
<ul>
<li>{% trans 'Link:' %} <a href="{% url 'agoraplus-family-link' slug=object.slug %}"
>{{ site_base_uri }}{% url 'agoraplus-family-link' slug=object.slug %}?NameID=…&login=…&password=…</a></li>
<li>{% trans 'Unlink:' %} <a href="{% url 'agoraplus-family-unlink' slug=object.slug %}"
>{{ site_base_uri }}{% url 'agoraplus-family-unlink' slug=object.slug %}?NameID=…</a></li>
<li>{% trans 'Get all family informations:' %} <a href="{% url 'agoraplus-family' slug=object.slug %}"
>{{ site_base_uri }}{% url 'agoraplus-family' slug=object.slug %}?NameID=…</a></li>
<li>{% trans 'Get specific family items (list of adults, children, etc.):' %}
<a href="{% url 'agoraplus-family-items' slug=object.slug key='key' %}"
>{{ site_base_uri }}{% url 'agoraplus-family-items' slug=object.slug key='key' %}?NameID=...</a>
<em>key</em>: <em>adults</em>, <em>children</em>, <em>contacts</em>, etc.</li>
<li>{% trans 'Get specific family item:' %}
<a href="{% url 'agoraplus-family-item' slug=object.slug key='key' item_id='id' %}"
>{{ site_base_uri }}{% url 'agoraplus-family-item' slug=object.slug key='key' item_id='id' %}?NameID=...</a></li>
</ul>
<h4>{% trans "User" %}</h4>
<ul>
<li>{% trans 'User:' %} <a href="{% url 'agoraplus-user' slug=object.slug %}"
>{{ site_base_uri }}{% url 'agoraplus-user' slug=object.slug %}</a>[?login=…]</li>
<li>{% trans 'Authentication:' %} <a href="{% url 'agoraplus-auth' slug=object.slug %}"
>{{ site_base_uri }}{% url 'agoraplus-auth' slug=object.slug %}?login=…&password=…</a></li>
<li>{% trans 'Update phone number:' %} POST <a href="{% url 'agoraplus-phone-update' slug=object.slug %}"
>{{ site_base_uri }}{% url 'agoraplus-phone-update' slug=object.slug %}?NameID=…</a></li>
— {% trans 'JSON payload: wcs Formdata with Agora+ fields' %}
<li>{% trans 'Update profession:' %} POST <a href="{% url 'agoraplus-profession-update' slug=object.slug %}"
>{{ site_base_uri }}{% url 'agoraplus-profession-update' slug=object.slug %}?NameID=…</a></li>
— {% trans 'JSON payload: wcs Formdata with Agora+ fields' %}
</ul>
<h4>{% trans "Address" %}</h4>
<ul>
<li>{% trans 'Communes:' %}
<a href="{% url 'agoraplus-commune' slug=object.slug %}"
>{{ site_base_uri }}{% url 'agoraplus-commune' slug=object.slug %}</a>[id/|?q=…]</li>
<li>{% trans 'Type of streets:' %}
<a href="{% url 'agoraplus-type-of-street' slug=object.slug %}"
>{{ site_base_uri }}{% url 'agoraplus-type-of-street' slug=object.slug %}</a>[id/]</li>
<li>{% trans 'Streets:' %}
<a href="{% url 'agoraplus-street' slug=object.slug %}"
>{{ site_base_uri }}{% url 'agoraplus-street' slug=object.slug %}</a>[id/|?q=…]</li>
<li>{% trans 'List of streets of a commune, by type:' %}
<a href="{% url 'agoraplus-street-search' slug=object.slug commune_id='commune-id' type_of_street_id='tos-id' %}"
>{{ site_base_uri }}{% url 'agoraplus-street-search' slug=object.slug commune_id='commune-id' type_of_street_id='tos-id' %}</a>[id/|?q=…]</li>
<li>{% trans 'Update address:' %} POST
<a href="{% url 'agoraplus-address-update' slug=object.slug %}"
>{{ site_base_uri }}{% url 'agoraplus-address-update' slug=object.slug %}?NameID=…</a>
— {% trans 'JSON payload: wcs Formdata with Agora+ fields' %}
</li>
</ul>
<h4>{% trans "Other references" %}</h4>
<ul>
<li>{% trans 'Generic reference:' %} <a href="{% url 'agoraplus-reference' slug=object.slug name="name" %}"
>{{ site_base_uri }}{% url 'agoraplus-reference' slug=object.slug name="name" %}</a>[id/|?q=…]
<em>key</em>: <em>civilites</em> <em>annees_refs</em>, <em>modes_accueils</em> (see Agora+ documentation)</li>
<li>{% trans 'School:' %} <a href="{% url 'agoraplus-school' slug=object.slug %}"
>{{ site_base_uri }}{% url 'agoraplus-school' slug=object.slug %}</a>[id/]</li>
<li>{% trans 'Schools for a child, based on the family adress:' %}
<a href="{% url 'agoraplus-school' slug=object.slug %}"
>{{ site_base_uri }}{% url 'agoraplus-school' slug=object.slug %}?NameID=…&amp;(child_id=…|birthdate=yyyy-mm-dd)[&year_id=NN]</a>
<em>year_id</em> from {% url 'agoraplus-reference' slug=object.slug name="annees_refs" %}</li>
<li>{% trans 'All schools for a birthdate:' %}
<a href="{% url 'agoraplus-school' slug=object.slug %}"
>{{ site_base_uri }}{% url 'agoraplus-school' slug=object.slug %}?birthdate=yyyy-mm-dd</a></li>
<li>{% trans 'Educational Stages:' %}
<a href="{% url 'agoraplus-educational-stage' slug=object.slug %}"
>{{ site_base_uri }}{% url 'agoraplus-educational-stage' slug=object.slug %}</a>[id/]</li>
<li>{% trans 'Educational Stages for a child:' %}
<a href="{% url 'agoraplus-educational-stage' slug=object.slug %}"
>{{ site_base_uri }}{% url 'agoraplus-educational-stage' slug=object.slug %}</a>?(NameID=…&amp;child_id=…|birthdate=yyyy-mm-dd)[&year_id=NN]
<em>year_id</em> from {% url 'agoraplus-reference' slug=object.slug name="annees_refs" %}</li>
</ul>
<h4>{% trans 'Verifications before injection' %}</h4>
<ul>
<li>{% trans 'Email:' %} <a href="{% url 'agoraplus-exist-email' slug=object.slug %}"
>{{ site_base_uri }}{% url 'agoraplus-exist-email' slug=object.slug %}?email=test@example.net</a></li>
<li>{% trans 'Child:' %} <a href="{% url 'agoraplus-exist-child' slug=object.slug %}"
>{{ site_base_uri }}{% url 'agoraplus-exist-child' slug=object.slug %}?first_name=xxx&last_name=xxx&birthdate=yyyy-mm-dd</a></li>
<li>{% trans 'Adult (representant):' %} <a href="{% url 'agoraplus-exist-representant' slug=object.slug %}"
>{{ site_base_uri }}{% url 'agoraplus-exist-representant' slug=object.slug %}?first_name=xxx&last_name=xxx[&birthdate=yyyy-mm-dd]</a></li>
<li>{% trans 'List all existing persons:' %} <a href="{% url 'agoraplus-exist-list' slug=object.slug %}"
>{{ site_base_uri }}{% url 'agoraplus-exist-list' slug=object.slug %}?first_name=xxx&last_name=xxx[&birthdate=yyyy-mm-dd]</a></li>
</ul>
<h4>{% trans 'Insert objects in sas' %}</h4>
<ul>
<li>{% trans 'Add a family:' %} POST <a href="{% url 'agoraplus-sas-family' slug=object.slug %}"
>{{ site_base_uri }}{% url 'agoraplus-sas-family' slug=object.slug %}</a>
— {% trans 'JSON payload: wcs Formdata with Agora+ fields and extra' %}
</li>
<li>{% trans 'Add a child:' %} POST <a href="{% url 'agoraplus-sas-child' slug=object.slug %}"
>{{ site_base_uri }}{% url 'agoraplus-sas-child' slug=object.slug %}</a>
— {% trans 'JSON payload: wcs Formdata with Agora+ fields and extra' %}
</li>
<li>{% trans 'Add a contact:' %} POST <a href="{% url 'agoraplus-sas-contact' slug=object.slug %}"
>{{ site_base_uri }}{% url 'agoraplus-sas-contact' slug=object.slug %}</a>
— {% trans 'JSON payload: wcs Formdata with Agora+ fields and extra' %}
</li>
<li>{% trans 'Check duplicates in sas objects:' %} <a href="{% url 'agoraplus-sas-check-duplicates' slug=object.slug %}"
>{{ site_base_uri }}{% url 'agoraplus-sas-check-duplicates' slug=object.slug %}?NameID=…</a>
</li>
</ul>
<h4>{% trans 'Push sas objects to Agora+' %}</h4>
<ul>
<li>{% trans 'Push sas family (adults):' %} <a href="{% url 'agoraplus-sas-family-push' slug=object.slug %}"
>{{ site_base_uri }}{% url 'agoraplus-sas-family-push' slug=object.slug %}?NameID=…[&validation]</a></li>
<li>{% trans 'Push sas child:' %} <a href="{% url 'agoraplus-sas-child-push' slug=object.slug sas_child_id='NNN' %}"
>{{ site_base_uri }}{% url 'agoraplus-sas-child-push' slug=object.slug sas_child_id='NNN' %}?NameID=…</a></li>
<li>{% trans 'Push sas contact:' %} <a href="{% url 'agoraplus-sas-contact-push' slug=object.slug sas_contact_id='NNN' %}"
>{{ site_base_uri }}{% url 'agoraplus-sas-contact-push' slug=object.slug sas_contact_id='NNN' %}?NameID=…</a></li>
</ul>
<h4>{% trans 'Remove sas objects' %}</h4>
<ul>
<li>{% trans 'Remove a child in sas:' %} <a href="{% url 'agoraplus-sas-child-delete' slug=object.slug sas_child_id='NNN' %}"
>{{ site_base_uri }}{% url 'agoraplus-sas-child-delete' slug=object.slug sas_child_id='NNN' %}?NameID=…</a></li>
<li>{% trans 'Remove a contact in sas:' %} <a href="{% url 'agoraplus-sas-contact-delete' slug=object.slug sas_contact_id='NNN' %}"
>{{ site_base_uri }}{% url 'agoraplus-sas-contact-delete' slug=object.slug sas_contact_id='NNN' %}?NameID=…</a></li>
<li>{% trans 'Remove a family (all related objects):' %} <a href="{% url 'agoraplus-sas-family-delete' slug=object.slug %}"
>{{ site_base_uri }}{% url 'agoraplus-sas-family-delete' slug=object.slug %}?NameID=…</a>
</li>
</ul>
<h4>{% trans 'Incomes' %}</h4>
<ul>
<li>{% trans 'Incomes declaration:' %} POST <a href="{% url 'agoraplus-incomes-declaration' slug=object.slug %}"
>{{ site_base_uri }}{% url 'agoraplus-incomes-declaration' slug=object.slug %}?NameID=…</a>
— {% trans 'JSON payload: wcs Formdata with Agora+ fields' %}
</li>
</ul>
<h4>{% trans 'Enrollments' %}</h4>
<ul>
<li>{% trans 'School Enrollment:' %} POST <a href="{% url 'agoraplus-school-enrollment' slug=object.slug %}"
>{{ site_base_uri }}{% url 'agoraplus-school-enrollment' slug=object.slug %}</a>
— {% trans 'JSON payload: wcs Formdata with Agora+ fields' %}
</li>
<li>{% trans 'Children school enrollments:' %} GET <a href="{% url 'agoraplus-school-enrollments' slug=object.slug %}"
>{{ site_base_uri }}{% url 'agoraplus-school-enrollments' slug=object.slug %}?NameID=…</a>
</li>
<li>{% trans 'Nursery Enrollment:' %} POST <a href="{% url 'agoraplus-nursery-enrollment' slug=object.slug %}"
>{{ site_base_uri }}{% url 'agoraplus-nursery-enrollment' slug=object.slug %}</a>
— {% trans 'JSON payload: wcs Formdata with Agora+ fields' %}
</li>
<li>{% trans 'Nursery Enrollment Result:' %} <a href="{% url 'agoraplus-nursery-enrollment-result' enroll_id='NNN' slug=object.slug %}"
>{{ site_base_uri }}{% url 'agoraplus-nursery-enrollment-result' enroll_id='NNN' slug=object.slug %}</a>
</li>
</ul>
<h4>{% trans 'Invoices' %}</h4>
<ul>
<li>
{% url 'agoraplus-invoices' slug=object.slug as payable_invoices_url %}
{% trans 'Get payable invoices:' %} <a href="{{ payable_invoices_url }}">{{ site_base_uri }}{{ payable_invoices_url }}?NameID=…</a>
</li>
<li>
{% url 'agoraplus-invoices-history' slug=object.slug as invoices_history_url %}
{% trans 'Get invoices history:' %} <a href="{{ invoices_history_url }}">{{ site_base_uri }}{{ invoices_history_url }}?NameID=…</a>
</li>
<li>
{% url 'agoraplus-invoice' slug=object.slug invoice_id='invoice_id' as invoice_url %}
{% trans 'Get invoice details:' %} <a href="{{ invoice_url }}">{{ site_base_uri }}{{ invoice_url }}?NameID=…</a>
</li>
<li>
{% url 'agoraplus-invoice-pdf' slug=object.slug invoice_id='invoice_id' as invoice_pdf_url %}
{% trans 'Get invoice PDF:' %} <a href="{{ invoice_pdf_url }}">{{ site_base_uri }}{{ invoice_pdf_url }}?NameID=…</a>
</li>
<li>
{% url 'agoraplus-invoice-payment' slug=object.slug invoice_id='invoice_id' as invoice_payment_url %}
{% trans 'Pay invoice(POST):' %} <a href="{{ invoice_payment_url }}">{{ site_base_uri }}{{ invoice_payment_url }}?NameID=…</a>
{% trans 'JSON payload data:' %}
<ul>
<li>transaction_number</li>
<li>transaction_date</li>
</ul>
</li>
</ul>
<h4>{% trans 'Documents' %}</h4>
<ul>
<li>{% trans 'Document (pass-through):' %} POST <a href="{% url 'agoraplus-document' slug=object.slug %}"
>{{ site_base_uri }}{% url 'agoraplus-document' slug=object.slug %}</a>
{% trans 'JSON payload:' %}
<ul>
<li><em>uri</em> {% trans 'uri, with query-string' %}</li>
<li><em>content_type</em> {% trans 'document must have this content-type' %}</li>
</ul>
</li>
</ul>
<h4>{% trans 'Periscol enrollments' %}</h4>
<ul>
<li>
{% url 'agoraplus-child-periscol-enrollments' slug=object.slug child_id='child_id' as periscol_enrollments_url %}
{% trans 'Get child periscol enrollments:' %} <a href="{{ periscol_enrollments_url }}">{{ site_base_uri }}{{ periscol_enrollments_url }}?service_id=7&NameID=…&start_days=35</a>
</li>
<li>
{% url 'agoraplus-enrollment-planning' slug=object.slug enrollment_id='enrollment_id' as periscol_enrollment_planning_url %}
{% trans 'Get enrollment planning:' %} <a href="{{ periscol_enrollment_planning_url }}">{{ site_base_uri }}{{ periscol_enrollment_planning_url }}</a>
</li>
<li>
{% url 'agoraplus-periscol-child-enrollment-planning' slug=object.slug child_id='child_id' enrollment_id='enrollment_id' as periscol_child_enrollment_planning_url %}
{% trans 'Get child enrollment planning:' %} <a href="{{ periscol_child_enrollment_planning_url }}">{{ site_base_uri }}{{ periscol_child_enrollment_planning_url }}?NameID=…&reserved_day=-1&start_days=2&end_days=365</a>
{% trans 'days:' %}
<ul>
<li><em>-1</em>: {% trans "all days" %}</li>
<li><em>0</em>: {% trans "not reserved days" %}</li>
<li><em>1</em>: {% trans "reserved days" %}</li>
</ul>
</li>
<li>
{% url 'agoraplus-periscol-add-reservation' slug=object.slug as periscol_add_reservation_url %}
{% trans 'Add periscol reservation:' %} POST <a href="{{ periscol_add_reservation_url }}">{{ site_base_uri }}{{ periscol_add_reservation_url }}</a>
{% trans 'JSON payload:' %}
<ul>
<li><em>enrollment_id</em> {% trans 'enrollment id for a child' %}</li>
<li><em>dates</em> {% trans 'a list of dates of enrollment planning' %}</li>
<li><em>tarif_id</em> {% trans 'tarif identifier for this reservation' %}</li>
<li><em>majored</em> {% trans 'boolean specifying if the tarif is majored' %}</li>
</ul>
</li>
<li>
{% url 'agoraplus-periscol-reservations-delete' slug=object.slug as periscol_enrollment_reservations_delete_url %}
{% trans 'Delete enrollment reservations:' %} POST <a href="{{ periscol_enrollment_reservations_delete_url }}">{{ site_base_uri }}{{ periscol_enrollment_reservations_delete_url }}</a>
{% trans 'JSON payload:' %}
<ul>
<li><em>activity_id</em> {% trans 'activity identifier' %}</li>
<li><em>dates</em> {% trans 'list of dates to delete' %}</li>
</ul>
</li>
</ul>
{% endblock %}
{% block security %}
<p>
{% trans 'Access is limited to the following API users:' %}
</p>
{% access_rights_table resource=object permission='can_access' %}
<p>
{% trans 'Sas access is limited to the following API users:' %}
</p>
{% access_rights_table resource=object permission='can_access_sas' %}
{% endblock %}

View File

@ -1,132 +0,0 @@
# passerelle - uniform access to multiple data sources and services
# Copyright (C) 2015 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from django.conf.urls import include, url
from django.contrib.auth.decorators import login_required
from .views import *
urlpatterns = [
url(r'^(?P<slug>[\w-]+)/$', AgoraPlusDetailView.as_view(),
name='agoraplus-view'),
url(r'^(?P<slug>[\w-]+)/ping/$', PingView.as_view(),
name='agoraplus-ping'),
url(r'^(?P<slug>[\w-]+)/family/link/$', LinkView.as_view(),
name='agoraplus-family-link'),
url(r'^(?P<slug>[\w-]+)/family/unlink/$', UnlinkView.as_view(),
name='agoraplus-family-unlink'),
url(r'^(?P<slug>[\w-]+)/family/$', FamilyView.as_view(),
name='agoraplus-family'),
url(r'^(?P<slug>[\w-]+)/family/(?P<key>[\w-]+)/$', FamilyItemView.as_view(),
name='agoraplus-family-items'),
url(r'^(?P<slug>[\w-]+)/family/(?P<key>[\w-]+)/(?P<item_id>[\w-]+)/$', FamilyItemView.as_view(),
name='agoraplus-family-item'),
url(r'^(?P<slug>[\w-]+)/sso/$', login_required(RedirectSSOView.as_view()),
name='agoraplus-sso'),
url(r'^(?P<slug>[\w-]+)/users/$', UserView.as_view(),
name='agoraplus-user'),
url(r'^(?P<slug>[\w-]+)/auth/$', AuthView.as_view(),
name='agoraplus-auth'),
url(r'^(?P<slug>[\w-]+)/phone/$', PhoneUpdateView.as_view(),
name='agoraplus-phone-update'),
url(r'^(?P<slug>[\w-]+)/profession/$', ProfessionUpdateView.as_view(),
name='agoraplus-profession-update'),
url(r'^(?P<slug>[\w-]+)/reference/(?P<name>[\w-]+)/((?P<reference_id>[\w-]+)/)?$',
ReferenceView.as_view(), name='agoraplus-reference'),
url(r'^(?P<slug>[\w-]+)/schools/((?P<school_id>[\w-]+)/)?$', SchoolView.as_view(),
name='agoraplus-school'),
url(r'^(?P<slug>[\w-]+)/educational-stages/((?P<edustage_id>[\w-]+)/)?$', EducationalStageView.as_view(),
name='agoraplus-educational-stage'),
url(r'^(?P<slug>[\w-]+)/address/communes/((?P<commune_id>[\w-]+)/)?$', CommuneView.as_view(),
name='agoraplus-commune'),
url(r'^(?P<slug>[\w-]+)/address/types-of-streets/((?P<type_of_street_id>[\w-]+)/)?$', TypeOfStreetView.as_view(),
name='agoraplus-type-of-street'),
url(r'^(?P<slug>[\w-]+)/address/communes/(?P<commune_id>[\w-]+)/types-of-streets/(?P<type_of_street_id>[\w-]+)/$', StreetView.as_view(),
name='agoraplus-street-search'),
url(r'^(?P<slug>[\w-]+)/address/streets/((?P<street_id>[\w-]+)/)?$', StreetView.as_view(),
name='agoraplus-street'),
url(r'^(?P<slug>[\w-]+)/address/?$', AddressUpdateView.as_view(),
name='agoraplus-address-update'),
url(r'^(?P<slug>[\w-]+)/exist/email/$', ExistEmailView.as_view(),
name='agoraplus-exist-email'),
url(r'^(?P<slug>[\w-]+)/exist/child/$', ExistChildView.as_view(),
name='agoraplus-exist-child'),
url(r'^(?P<slug>[\w-]+)/exist/representant/$', ExistRepresentantView.as_view(),
name='agoraplus-exist-representant'),
url(r'^(?P<slug>[\w-]+)/exist/list/$', ExistListView.as_view(),
name='agoraplus-exist-list'),
url(r'^(?P<slug>[\w-]+)/sas/purge/$', SasPurgeView.as_view(),
name='agoraplus-sas-purge'),
url(r'^(?P<slug>[\w-]+)/sas/family/$', SasFamilyView.as_view(),
name='agoraplus-sas-family'),
url(r'^(?P<slug>[\w-]+)/sas/check-duplicates/$', SasCheckDuplicatesView.as_view(),
name='agoraplus-sas-check-duplicates'),
url(r'^(?P<slug>[\w-]+)/sas/family/push/$', SasFamilyPushView.as_view(),
name='agoraplus-sas-family-push'),
url(r'^(?P<slug>[\w-]+)/sas/family/delete/$', SasPurgeView.as_view(),
name='agoraplus-sas-family-delete'),
url(r'^(?P<slug>[\w-]+)/sas/child/$', SasChildView.as_view(),
name='agoraplus-sas-child'),
url(r'^(?P<slug>[\w-]+)/sas/child/push/sas_(?P<sas_child_id>[\w-]+)/$', SasChildPushView.as_view(),
name='agoraplus-sas-child-push'),
url(r'^(?P<slug>[\w-]+)/sas/child/delete/sas_(?P<sas_child_id>[\w-]+)/$', SasChildDeleteView.as_view(),
name='agoraplus-sas-child-delete'),
url(r'^(?P<slug>[\w-]+)/sas/contact/push/sas_(?P<sas_contact_id>[\w-]+)/$', SasContactPushView.as_view(),
name='agoraplus-sas-contact-push'),
url(r'^(?P<slug>[\w-]+)/sas/contact/delete/sas_(?P<sas_contact_id>[\w-]+)/$', SasContactDeleteView.as_view(),
name='agoraplus-sas-contact-delete'),
url(r'^(?P<slug>[\w-]+)/sas/contact/$', SasContactView.as_view(),
name='agoraplus-sas-contact'),
url(r'^(?P<slug>[\w-]+)/incomes-declaration/?$', IncomesDeclarationView.as_view(),
name='agoraplus-incomes-declaration'),
url(r'^(?P<slug>[\w-]+)/school-enrollment/?$', SchoolEnrollmentView.as_view(),
name='agoraplus-school-enrollment'),
url(r'^(?P<slug>[\w-]+)/school-enrollments/?$', SchoolEnrollmentsView.as_view(),
name='agoraplus-school-enrollments'),
url(r'^(?P<slug>[\w-]+)/nursery-enrollment/?$', NurseryEnrollmentView.as_view(),
name='agoraplus-nursery-enrollment'),
url(r'^(?P<slug>[\w-]+)/nursery-enrollment-result/(?P<enroll_id>[\w-]+)/?$', NurseryEnrollmentResultView.as_view(),
name='agoraplus-nursery-enrollment-result'),
url(r'^(?P<slug>[\w-]+)/regie/invoices/$', InvoicesView.as_view(), name='agoraplus-invoices'),
url(r'^(?P<slug>[\w-]+)/regie/invoices/history/$', HistoryInvoicesView.as_view(), name='agoraplus-invoices-history'),
url(r'^(?P<slug>[\w-]+)/regie/invoice/(?P<invoice_id>[\w,-]+)/pay/$', InvoicePayView.as_view(), name='agoraplus-invoice-payment'),
url(r'^(?P<slug>[\w-]+)/regie/invoice/(?P<invoice_id>[\w,-]+)/$', InvoiceView.as_view(), name='agoraplus-invoice'),
url(r'^(?P<slug>[\w-]+)/regie/invoice/(?P<invoice_id>[\w,-]+)/pdf/$', InvoicePDFView.as_view(), name='agoraplus-invoice-pdf'),
url(r'^(?P<slug>[\w-]+)/document/?$', DocumentView.as_view(), name='agoraplus-document'),
url(r'^(?P<slug>[\w-]+)/periscol/enrollments/(?P<child_id>[\w-]+)/$',
PeriscolChildEnrollmentsView.as_view(),
name='agoraplus-child-periscol-enrollments'),
url(r'^(?P<slug>[\w-]+)/periscol/enrollment/(?P<enrollment_id>[\w-]+)/planning/$',
PeriscolEnrollmentPlanningView.as_view(),
name='agoraplus-enrollment-planning'),
url(r'^(?P<slug>[\w-]+)/periscol/child/(?P<child_id>[\w-]+)/enrollment/(?P<enrollment_id>[\w-]+)/planning/$',
PeriscolChildEnrollmentPlanningView.as_view(),
name='agoraplus-periscol-child-enrollment-planning'),
url(r'^(?P<slug>[\w-]+)/periscol/reservation/add/$',
PeriscolAddReservationView.as_view(),
name='agoraplus-periscol-add-reservation'),
url(r'^(?P<slug>[\w-]+)/periscol/reservations/delete/?$',
PeriscolDeleteReservationsView.as_view(),
name='agoraplus-periscol-reservations-delete'),
]

View File

@ -1,666 +0,0 @@
# passerelle - uniform access to multiple data sources and services
# Copyright (C) 2015 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import unicodedata
import logging
from django.core.exceptions import ObjectDoesNotExist
from django.views.generic import DetailView as GenericDetailView
from django.utils.decorators import method_decorator
from django.views.decorators.csrf import csrf_exempt
from django.http import HttpResponse, HttpResponseBadRequest, Http404, HttpResponseRedirect
from django.utils.translation import ugettext_lazy as _
from django.utils.http import urlencode
from passerelle.compat import json_loads
import passerelle.utils as utils
from .models import AgoraPlus, AgoraPlusLink, AgoraAPIError
from .wcs import Formdata
from .provisioning import provision_attributes, provision_roles
logger = logging.getLogger('passerelle.contrib.agoraplus')
def normalize_lower(s):
return unicodedata.normalize('NFKD', s).encode('ascii', 'ignore').lower()
class AgoraPlusDetailView(GenericDetailView):
model = AgoraPlus
template_name = 'passerelle/contrib/agoraplus/detail.html'
class RedirectSSOView(GenericDetailView):
model = AgoraPlus
def get(self, request, *args, **kwargs):
resource = self.get_object()
if not resource.frontoffice_url:
raise Http404('no frontoffice defined')
url = resource.frontoffice_url + '/!portail_external.go_to_agora?'
if (request.user.is_authenticated() and hasattr(request.user, 'saml_identifiers')
and request.user.saml_identifiers.exists()):
name_id = request.user.saml_identifiers.first().name_id
link = AgoraPlusLink.objects.filter(resource=resource, name_id=name_id).first()
if link:
try:
login = resource.auth(link.login, link.password)
if login:
token = resource.get_token_raw(login)
url += urlencode((('p_login', login), ('oauth_token', token)))
except AgoraAPIError as e:
pass
return HttpResponseRedirect(url)
class DetailView(GenericDetailView):
model = AgoraPlus
name_id = None
login = None
family = None
def set_user_from_request(self, request):
'''
Get Agora+ user from name_id, if exists, and check it.
'''
self.name_id = None
self.login = None
if 'NameID' in request.GET:
self.name_id = request.GET['NameID']
links = AgoraPlusLink.objects.filter(resource=self.object, name_id=self.name_id)
if len(links) > 1:
raise AgoraAPIError('multiple links')
if links:
self.login = self.object.auth(links[0].login, links[0].password)
if not self.login:
raise AgoraAPIError('unable to find an Agora+ user for NameID %r' %
self.name_id)
elif 'login' in request.GET: # fallback ?login= (only for tests)
user = self.object.get_user(request.GET['login'])
if not user:
raise ObjectDoesNotExist('unknown login')
self.login = user['login']
def get_family(self):
if not self.family:
self.family = self.object.get_family(name_id=self.name_id, login=self.login)
return self.family
def get_child(self, child_id):
family = self.get_family()
if not family:
raise AgoraAPIError('empty family')
childrens = self.get_family().get('children', [])
for child in childrens:
if child['id'] == child_id:
return child
raise AgoraAPIError('unknown child')
def get_data(self, request, *args, **kwargs):
raise NotImplementedError
@utils.to_json()
@utils.protected_api('can_access')
def get(self, request, *args, **kwargs):
self.object = self.get_object()
self.set_user_from_request(request)
self.family = None
data = self.get_data(request, *args, **kwargs)
if isinstance(data, HttpResponse):
return data
return {'data': data}
class PingView(DetailView):
def get_data(self, request, *args, **kwargs):
# simple check Agora+ WS availability
token = self.object.get_token()
response = {'ping': 'pong'}
if 'debug' in request.GET:
response['token'] = token
return response
class ReferenceView(DetailView):
def get_data(self, request, name, reference_id=None, *args, **kwargs):
results = self.object.get_reference(name, reference_id)
if not reference_id and 'q' in request.GET:
q = normalize_lower(request.GET['q'])
results = [result for result in results if q in normalize_lower(result['text'])]
return results
class SchoolView(DetailView):
def get_data(self, request, school_id, *args, **kwargs):
if school_id:
return self.object.get_school(school_id=school_id) # a specific school
if 'birthdate' not in request.GET and 'child_id' not in request.GET:
return self.object.get_school() # all schools
birthdate = request.GET.get('birthdate') or self.get_child(request.GET['child_id'])['birthdate']
year_id = request.GET.get('year_id') or None
return self.object.get_school(family=self.get_family(), birthdate=birthdate, year_id=year_id)
class CommuneView(DetailView):
def get_data(self, request, commune_id=None, *args, **kwargs):
results = self.object.get_commune(commune_id)
if not commune_id and 'q' in request.GET:
q = normalize_lower(request.GET['q'])
results = [result for result in results if q in normalize_lower(result['text'])]
return results
class TypeOfStreetView(DetailView):
def get_data(self, request, type_of_street_id=None, *args, **kwargs):
return self.object.get_type_of_street(type_of_street_id)
class StreetView(DetailView):
def get_data(self, request, commune_id=None, type_of_street_id=None, street_id=None, *args, **kwargs):
results = self.object.get_street(commune_id, type_of_street_id, street_id)
if not street_id and 'q' in request.GET:
q = normalize_lower(request.GET['q'])
results = [result for result in results if q in normalize_lower(result['text'])]
return results
class EducationalStageView(DetailView):
def get_data(self, request, edustage_id=None, *args, **kwargs):
if edustage_id: # a specific stage
return self.object.get_educational_stage(edustage_id=edustage_id)
if 'birthdate' not in request.GET and 'child_id' not in request.GET:
return self.object.get_educational_stage() # all stages
if 'birthdate' in request.GET:
birthdate = request.GET['birthdate']
else:
birthdate = self.get_child(request.GET['child_id'])['birthdate']
return self.object.get_educational_stage(birthdate=birthdate,
year_id=request.GET.get('year_id'))
class UserView(DetailView):
def get_data(self, request, *args, **kwargs):
if self.login:
return self.object.get_user(self.login)
else:
return self.object.get_user() # list all users
class AuthView(DetailView):
def get_data(self, request, *args, **kwargs):
if 'login' not in request.GET:
raise AgoraAPIError('missing login parameter', http_error=400)
if 'password' not in request.GET:
raise AgoraAPIError('missing password parameter', http_error=400)
login = self.object.auth(request.GET['login'], request.GET['password'])
return bool(login)
class LinkView(GenericDetailView):
model = AgoraPlus
@utils.to_json()
@utils.protected_api('can_access')
def get(self, request, *args, **kwargs):
agoraplus = self.get_object()
if 'NameID' not in request.GET:
raise AgoraAPIError('missing NameID parameter')
if 'login' not in request.GET:
raise AgoraAPIError('missing login parameter')
if 'password' not in request.GET:
raise AgoraAPIError('missing password parameter')
login = request.GET['login']
password = request.GET['password']
login = agoraplus.auth(login, password)
if not login:
raise AgoraAPIError('authentication of login %r failed' % login)
family = agoraplus.get_family(login=login)
if not family:
raise AgoraAPIError('no family for login %r' % login)
name_id = request.GET['NameID']
provision_roles(name_id)
provision_attributes(family, login, name_id)
AgoraPlusLink.objects.update_or_create(resource=agoraplus,
name_id=name_id,
defaults={'login': login, 'password': password})
logger.info('link created: NameID:%r with Agora login:%r', name_id, login)
return {'data': True}
class UnlinkView(GenericDetailView):
model = AgoraPlus
@utils.to_json()
@utils.protected_api('can_access')
def get(self, request, *args, **kwargs):
agoraplus = self.get_object()
if 'NameID' not in request.GET:
raise AgoraAPIError('missing NameID parameter')
name_id = request.GET['NameID']
provision_roles(name_id, 'DELETE')
AgoraPlusLink.objects.filter(resource=agoraplus, name_id=name_id).delete()
return {'data': True}
class FamilyView(DetailView):
def get_data(self, request, *args, **kwargs):
family = self.get_family()
if family and family.get('children'):
self.object.add_children_plannings(family)
return family
class FamilyItemView(DetailView):
def get_data(self, request, key, item_id=None, *args, **kwargs):
family = self.get_family()
if not family:
raise AgoraAPIError('empty family')
items = family.get(key, [])
if not item_id:
return items
if item_id:
for item in items:
if item['id'] == item_id:
return item
class ExistEmailView(DetailView):
def get_data(self, request, *args, **kwargs):
email = request.GET.get('email')
if email:
return self.object.exist_email(email)
return False
class ExistChildView(DetailView):
def get_data(self, request, *args, **kwargs):
if 'first_name' not in request.GET:
raise AgoraAPIError('missing first_name parameter')
if 'last_name' not in request.GET:
raise AgoraAPIError('missing last_name parameter')
if 'birthdate' not in request.GET:
raise AgoraAPIError('missing birthdate parameter')
first_name = request.GET.get('first_name')
last_name = request.GET.get('last_name')
birthdate = request.GET.get('birthdate')
return self.object.exist_child(first_name, last_name, birthdate)
class ExistRepresentantView(DetailView):
def get_data(self, request, *args, **kwargs):
if 'first_name' not in request.GET:
raise AgoraAPIError('missing first_name parameter')
if 'last_name' not in request.GET:
raise AgoraAPIError('missing last_name parameter')
first_name = request.GET.get('first_name')
last_name = request.GET.get('last_name')
birthdate = request.GET.get('birthdate')
return self.object.exist_representant(first_name, last_name, birthdate)
class ExistListView(DetailView):
'''
Returns all persons with same first_name, last_name and birthdate (opt.)
'''
def get_data(self, request, *args, **kwargs):
if 'first_name' not in request.GET:
raise AgoraAPIError('missing first_name parameter')
if 'last_name' not in request.GET:
raise AgoraAPIError('missing last_name parameter')
first_name = request.GET.get('first_name')
last_name = request.GET.get('last_name')
birthdate = request.GET.get('birthdate')
first_names = [first_name]
if ' ' in first_name:
first_names.extend(first_name.split(' '))
last_names = [last_name]
if ' ' in last_name:
last_names.extend(last_name.split(' '))
duplicates = []
for first_name in first_names:
for last_name in last_names:
if self.object.exist_person(first_name, last_name, birthdate):
if birthdate:
duplicates.append('%s %s - %s' % (first_name, last_name, birthdate))
else:
duplicates.append('%s %s' % (first_name, last_name))
return duplicates
class SasView(DetailView):
@method_decorator(csrf_exempt)
def dispatch(self, *args, **kwargs):
return super(SasView, self).dispatch(*args, **kwargs)
def post_data(self, name_id, formdata):
raise NotImplementedError
@utils.to_json()
@utils.protected_api('can_access_sas')
def post(self, request, *args, **kwargs):
self.object = self.get_object()
self.set_user_from_request(request)
data = json_loads(request.body)
formdata = Formdata(data)
name_id = formdata.get('NameID')
if not name_id:
raise AgoraAPIError('missing NameID parameter')
return {'data': self.post_data(name_id, formdata)}
class SasFamilyView(SasView):
def post_data(self, name_id, formdata):
provision_roles(name_id)
family, adult1, adult2 = formdata.get_family()
sas_family = self.object.sas_upsert(name_id, 'FAMILY', family)
sas_adult1 = self.object.sas_upsert(name_id, 'ADULT', adult1)
if adult2.keys() != ['sas_id']: # not empty adult
sas_adult2 = self.object.sas_upsert(name_id, 'ADULT', adult2)
else:
self.object.sas_delete(name_id, 'ADULT', adult2)
sas_adult2 = None
return {
'family': sas_family,
'adult1': sas_adult1,
'adult2': sas_adult2,
}
class SasFamilyPushView(DetailView):
def get_data(self, request, *args, **kwargs):
if not self.name_id:
raise AgoraAPIError('missing NameID parameter')
if self.login:
raise AgoraAPIError('already linked account, cannot push')
return self.object.push_family(self.name_id, validation='validation' in request.GET)
class SasChildView(SasView):
def post_data(self, name_id, formdata):
child = formdata.get_child()
return {
'child': self.object.sas_upsert(name_id, 'CHILD', child),
}
class SasChildPushView(DetailView):
def get_data(self, request, sas_child_id, *args, **kwargs):
if not self.name_id:
raise AgoraAPIError('missing NameID parameter')
if not self.login:
raise AgoraAPIError('no family linked, cannot push child')
return self.object.push_child(self.name_id, self.login, sas_child_id)
class SasChildDeleteView(DetailView):
def get_data(self, request, sas_child_id, *args, **kwargs):
if not self.name_id:
raise AgoraAPIError('missing NameID parameter')
fake_sas_child = {'sas_id': 'sas_%s' % sas_child_id}
return self.object.sas_delete(self.name_id, 'CHILD', fake_sas_child)
class SasContactView(SasView):
def post_data(self, name_id, formdata):
data = formdata.get_contact()
return {
'contact': self.object.sas_upsert(name_id, 'CONTACT', data),
}
class SasContactPushView(DetailView):
def get_data(self, request, sas_contact_id, *args, **kwargs):
if not self.name_id:
raise AgoraAPIError('missing NameID parameter')
if not self.login:
raise AgoraAPIError('no family linked, cannot push contact')
return self.object.push_contact(self.name_id, self.login, sas_contact_id)
class SasContactDeleteView(DetailView):
def get_data(self, request, sas_contact_id, *args, **kwargs):
if not self.name_id:
raise AgoraAPIError('missing NameID parameter')
fake_sas_contact = {'sas_id': 'sas_%s' % sas_contact_id}
return self.object.sas_delete(self.name_id, 'CONTACT', fake_sas_contact)
class SasCheckDuplicatesView(DetailView):
def get_data(self, request, *args, **kwargs):
if not self.name_id:
raise AgoraAPIError('missing NameID parameter')
return self.object.sas_check_duplicates(self.name_id)
class SasPurgeView(DetailView):
def get_data(self, request, *args, **kwargs):
if not self.name_id:
raise AgoraAPIError('missing NameID parameter')
provision_roles(self.name_id, method='DELETE')
return self.object.sas_purge(self.name_id)
class PostFormdataView(DetailView):
@method_decorator(csrf_exempt)
def dispatch(self, *args, **kwargs):
return super(PostFormdataView, self).dispatch(*args, **kwargs)
def post_data(self, formdata):
raise NotImplementedError
@utils.to_json()
@utils.protected_api('can_access_sas')
def post(self, request, *args, **kwargs):
self.object = self.get_object()
self.set_user_from_request(request)
data = json_loads(request.body)
formdata = Formdata(data)
return {'data': self.post_data(formdata)}
class IncomesDeclarationView(PostFormdataView):
def post_data(self, formdata):
if not self.name_id:
raise AgoraAPIError('missing NameID parameter')
if not self.login:
raise AgoraAPIError('no family linked, cannot add incomes declaration')
data = formdata.get_incomes_declaration()
return self.object.incomes_declaration(self.login, data)
class SchoolEnrollmentView(PostFormdataView):
def post_data(self, formdata):
data = formdata.get_school_enrollment()
return self.object.school_enrollment(data)
class SchoolEnrollmentsView(DetailView):
def get_data(self, request, *args, **kwargs):
family = self.object.get_agoraplus_family(login=self.login, raise_error=True)
enrollments = []
for child in family.get('children') or []:
enrollment = self.object.get_school_enrollment(child)
enrollments.extend(enrollment)
return enrollments
class NurseryEnrollmentView(PostFormdataView):
def post_data(self, formdata):
data = formdata.get_nursery_enrollment()
return self.object.nursery_enrollment(data)
class NurseryEnrollmentResultView(DetailView):
def get_data(self, request, enroll_id, *args, **kwargs):
return self.object.get_nursery_enrollment_result(enroll_id)
class PeriscolChildEnrollmentsView(DetailView):
def get_data(self, request, child_id, *args, **kwargs):
return self.object.get_periscol_enrollments(child_id, request.GET.get('service_id'),
request.GET.get('NameID'), request.GET.get('start_days', 35))
class PeriscolEnrollmentPlanningView(DetailView):
def get_data(self, request, enrollment_id, *args, **kwargs):
return self.object.get_periscol_enrollment_planning(enrollment_id)
class PeriscolChildEnrollmentPlanningView(DetailView):
def get_data(self, request, child_id, enrollment_id, *args, **kwargs):
return self.object.get_periscol_child_enrollment_planning(child_id, enrollment_id,
request.GET.get('reserved_day'), request.GET.get('NameID'),
request.GET.get('start_days', 2), request.GET.get('end_days', 365))
class PeriscolAddReservationView(DetailView):
@method_decorator(csrf_exempt)
def dispatch(self, *args, **kwargs):
return super(PeriscolAddReservationView, self).dispatch(*args, **kwargs)
@utils.to_json()
@utils.protected_api('can_access')
def post(self, request, *args, **kwargs):
self.object = self.get_object()
try:
data = json_loads(request.body)
enrollment_id = int(data['enrollment_id'])
dates = data['dates']
tarif_id = int(data['tarif_id'])
majored = data['majored']
except (ValueError, TypeError, KeyError) as e:
return HttpResponseBadRequest()
return self.object.create_periscol_reservation(enrollment_id, dates, tarif_id, majored)
class PeriscolDeleteReservationsView(DetailView):
@method_decorator(csrf_exempt)
def dispatch(self, *args, **kwargs):
return super(PeriscolDeleteReservationsView, self).dispatch(*args, **kwargs)
@utils.to_json()
@utils.protected_api('can_access')
def post(self, request, *args, **kwargs):
self.object = self.get_object()
data = json_loads(request.body)
return self.object.delete_periscol_enrollment_reservations(data['activity_id'], data['dates'])
class InvoicesView(DetailView):
def get_data(self, request, **kwargs):
if not self.login: # unlinked account: no invoice
return []
return self.object.get_payable_invoices(self.login)
class HistoryInvoicesView(DetailView):
def get_data(self, request, **kwargs):
if not self.login: # unlinked account: no invoice
return []
return self.object.get_historical_invoices(self.login)
class InvoicePayView(DetailView):
@method_decorator(csrf_exempt)
def dispatch(self, *args, **kwargs):
return super(InvoicePayView, self).dispatch(*args, **kwargs)
@utils.to_json()
@utils.protected_api('can_access')
def post(self, request, *args, **kwargs):
self.object = self.get_object()
data = json_loads(request.body)
return {'data': self.object.pay_invoice(kwargs['invoice_id'], data['transaction_id'],
data['transaction_date'])}
class InvoiceView(DetailView):
def get_data(self, request, invoice_id, **kwargs):
if not self.login: # unlinked account: no invoice
return None
return self.object.get_invoice(self.login, invoice_id)
class InvoicePDFView(DetailView):
def get_data(self, request, invoice_id, **kwargs):
pdf = self.object.get_invoice_pdf(self.login, invoice_id)
response = HttpResponse(content_type='application/pdf')
response['Content-Disposition'] = 'attachment; filename="%s.pdf"' % invoice_id
response.write(pdf)
return response
class DocumentView(DetailView):
@method_decorator(csrf_exempt)
def dispatch(self, *args, **kwargs):
return super(DocumentView, self).dispatch(*args, **kwargs)
@utils.protected_api('can_access')
def post(self, request, *args, **kwargs):
self.object = self.get_object()
try:
data = json_loads(request.body)
uri = data['uri']
content_type = data['content_type']
except (ValueError, TypeError, KeyError) as e:
return HttpResponseBadRequest()
document = self.object.get_document(uri, content_type)
response = HttpResponse(content_type=content_type)
if document is not None:
response.write(document)
else:
response.status_code = 404
return response
class AddressUpdateView(PostFormdataView):
def post_data(self, formdata):
new_address = formdata.get_address()
leaving_date = formdata.get('leaving_date')
family = self.object.get_agoraplus_family(login=self.login)
if not family:
raise Http404(_('no family in Agora+'))
return self.object.update_address(family, new_address, leaving_date)
class PhoneUpdateView(PostFormdataView):
def post_data(self, formdata):
adult_id = formdata.get('adult').get('id')
new_phone_number = formdata.get('phone')
new_cellphone_number = formdata.get('cellphone')
try:
return self.object.update_phone_numbers(self.login, self.name_id, adult_id,
new_phone_number, new_cellphone_number)
except ObjectDoesNotExist as e:
raise Http404(str(e))
class ProfessionUpdateView(PostFormdataView):
def post_data(self, formdata):
adult_id = formdata.get('adult').get('id')
new_profession = formdata.get('profession')
new_pcs = formdata.get('pcs')
new_employer_name = formdata.get('employer')
new_employer_city = formdata.get('employer_city')
new_employer_phone = formdata.get('employer_phone')
try:
return self.object.update_profession(self.login, self.name_id, adult_id,
new_profession, new_pcs, new_employer_name,
new_employer_city, new_employer_phone)
except ObjectDoesNotExist as e:
raise Http404(str(e))

View File

@ -1,251 +0,0 @@
# passerelle - uniform access to multiple data sources and services
# Copyright (C) 2015 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or,
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import time
import calendar
import datetime
def date_from_iso(s):
if 'T' in s:
utc = time.strptime(s, '%Y-%m-%dT%H:%M:%SZ')
timestamp = calendar.timegm(utc)
return datetime.date.fromtimestamp(timestamp).strftime('%Y-%m-%d')
else:
return datetime.datetime.strptime(s, '%Y-%m-%d').date()
def booleanize(value):
'''
return True/False from a Formdata value
'''
if isinstance(value, dict):
value = value['id']
if isinstance(value, bool):
return value
value = ('%s' % value).lower()
return not value.startswith('n')
class Formdata(object):
def __init__(self, data):
self.data = data
def get_workflow_data(self, key):
return self.data.get('workflow', {}).get('data', {}).get(key) or ''
def get_extra(self, key):
return self.data.get('extra', {}).get(key) or ''
def get_field(self, key):
return self.data.get('fields', {}).get(key) or ''
def get_extra_or_field(self, key):
return self.get_extra(key) or self.get_field(key)
def get(self, key):
value = self.get_extra_or_field(key)
raw = self.get_extra_or_field(key + '_raw')
if raw:
return {'id': raw, 'text': value}
return value
def get_object(self, prefix):
res = {}
fields = self.data.get('fields', {})
extra = self.data.get('extra', {})
keys = [key for key in set(fields) | set(extra)
if key.startswith(prefix + '_') and not key.endswith('_raw')]
cut = len(prefix)+1
for key in keys:
res[key[cut:]] = self.get(key)
return res
def get_sas_id_from_workflow(self, kind):
'''
Guess if formdata had been stored in the sas previously.
'''
if 'sas_response' in self.data['workflow'].get('data', {}):
if self.data['workflow']['data']['sas_response'].get('data', {}).get(kind):
sas_id = self.data['workflow']['data']['sas_response']['data'][kind].get('id')
if sas_id and sas_id.startswith('sas_'):
return sas_id
def get_address(self, prefix='address'):
address = self.get_object(prefix) or {}
if 'city_structured' in address and 'post_code' in address['city_structured']:
address['post_code'] = address['city_structured']['post_code']
del address['city_structured']
for ref in ('btq', 'type_of_street', 'street', 'city'):
if ref not in address or not isinstance(address[ref], dict):
address[ref] = {'id': '', 'text': address.get(ref, '')}
# alternative address (out of references)
for key in ('number', 'post_code', 'complements'):
address[key] = address.get(key) or address.get(key + '_alt') or ''
if (key + '_alt') in address:
del address[key + '_alt']
for key in ('street', 'city'):
if not address.get(key, {}).get('text'):
address[key] = {
'id':'',
'text': address.get(key + '_alt') or ''
}
if (key + '_alt') in address:
del address[key + '_alt']
address['text'] = u'{number}{btq[text]} {type_of_street[text]} {street[text]}'.format(**address).strip()
if address['text']:
address['full'] = u'{text}, {post_code} {city[text]}'.format(**address).strip()
else:
address['full'] = u'{post_code} {city[text]}'.format(**address).strip()
return address
def get_person(self, prefix='adult'):
person = self.get_object(prefix)
if not person or not person.get('last_name'):
return None
person['text'] = u'%(first_name)s %(last_name)s' % person
return person
def get_child(self, prefix='child'):
child = self.get_object(prefix)
if not child or not child.get('last_name'):
return None
child['text'] = u'%(first_name)s %(last_name)s' % child
sex = child.get('sex', {}).get('id')
if sex:
child['sex'] = sex[0]
else:
child['sex'] = None
for key in child:
if key.endswith('_authorization'):
child[key] = booleanize(child[key])
if child.get('parental_autorithy_structured'):
# parental_autorithy_structured => adults already exist, use IDs
child['parental_autorithy'] = [{'id': adult['id']} for adult in
child['parental_autorithy_structured']]
elif 'id' in child.get('parental_autorithy', {}):
# parental_autorithy had a "raw" value, stored in ['id']
child['parental_autorithy'] = [{'id': pid} for pid in
child['parental_autorithy']['id']]
else:
child['parental_autorithy'] = []
# if child was already stored in sas, get its sas_id from workflow.data
child['sas_id'] = self.get_sas_id_from_workflow('child')
return child
def get_contact(self, prefix='contact'):
contact = self.get_object(prefix)
if not contact or not contact.get('last_name'):
return None
contact['text'] = u'%(first_name)s %(last_name)s' % contact
contact['address'] = self.get_address('contact_address')
if 'children_structured' in contact:
contact['children'] = [{
'id': child['id'],
'link': contact.get('link') or {'id':'', 'text': ''},
'text': child['text'],
'banned': contact.get('banned') or False,
'parental_authority': contact.get('parental_authority') or False,
} for child in contact['children_structured']]
del contact['children_structured']
else:
contact['children'] = []
for cleankey in ('link', 'banned', 'parental_authority'):
if cleankey in contact:
del contact[cleankey]
# if contact was already stored in sas, get its sas_id from workflow.data
contact['sas_id'] = self.get_sas_id_from_workflow('contact')
return contact
def get_informations(self):
informations = self.get_object('informations')
informations['number_of_children'] = int(informations.get('number_of_children') or '0')
return informations
def get_family(self):
family = {}
family['address'] = self.get_address()
adult1 = self.get_person('adult1')
adult1['index'] = 1
adult1['address'] = family['address']
adult1['employer_address'] = self.get_address('employer_address_adult1')
adult2 = self.get_person('adult2') or {}
if adult2:
adult2['index'] = 2
adult2['address'] = family['address']
adult2['employer_address'] = self.get_address('employer_address_adult2')
family['informations'] = self.get_informations()
# if objects were already stored in sas, get sas_id from workflow.data
family['sas_id'] = self.get_sas_id_from_workflow('family')
adult1['sas_id'] = self.get_sas_id_from_workflow('adult1')
adult2['sas_id'] = self.get_sas_id_from_workflow('adult2')
return family, adult1, adult2
def get_incomes_declaration(self):
return {
'date': self.data['receipt_time'][:10],
'nb_part': self.get_extra_or_field('nb_part'),
'allowance_adult_id': self.get_extra_or_field('allowance_adult_raw'),
'allowance_number': self.get_extra_or_field('allowance_number'),
'allowance_regime_id': self.get_extra_or_field('allowance_regime_raw'),
'date_start': date_from_iso(self.get_workflow_data('precision_var_date_start_raw')),
'date_stop': date_from_iso(self.get_workflow_data('precision_var_date_stop_raw')),
'incomes_adult1': self.get_extra_or_field('incomes_adult1'),
'incomes_adult2': self.get_extra_or_field('incomes_adult2'),
'incomes_allowance': self.get_extra_or_field('incomes_allowance'),
'rental': self.get_extra_or_field('rental'),
'reimbursement': self.get_extra_or_field('reimbursement'),
'declaration_type_id': self.get_extra_or_field('declaration_type_id'),
}
def get_comments(self):
comments = self.get_field('comments') or ''
for key in [key for key in self.data['fields'] if key.startswith('comments_')]:
title = key[9:].upper().replace('_', ' ')
comments += '[%s]\n%s\n\n' % (title, self.get_field(key))
return comments
def get_school_enrollment(self):
comments = self.get_comments()
return {
'child_id': self.get_field('child_structured')['id'],
'year_id': self.get_extra_or_field('year_raw'),
'school_id': self.get_extra_or_field('school_raw'),
'educational_stage_id': self.get_extra_or_field('educational_stage_raw'),
'comments': comments,
'date_start': self.get_extra_or_field('date_start'),
'status_id': self.get_extra_or_field('status_raw'),
}
def get_nursery_enrollment(self):
comments = self.get_comments()
structures_ids = [] # ordered list of structures id (by preference)
n = 1
while self.get_field('structure_%s_raw' % n):
structures_ids.append(self.get_field('structure_%s_raw' % n))
n += 1
return {
'child_id': self.get_field('child_structured')['id'],
'expected_birthdate': self.get_extra_or_field('expected_birthdate'),
'year_id': self.get_extra_or_field('year_raw'),
'date': self.data['receipt_time'][:10],
'date_start': self.get_extra_or_field('date_start'),
'date_stop': self.get_extra_or_field('date_stop'),
'type_structure_id': self.get_extra_or_field('type_structure_raw'),
'structures_ids': structures_ids,
'admission_mode_id': self.get_extra_or_field('admission_mode_raw'),
'comments': comments,
}

View File

@ -127,7 +127,6 @@ li.connector.geographic-information-system a::before {
content: "\f041"; /* map-marker */
}
li.connector.agoraplus a::before,
li.connector.fakefamily a::before,
li.connector.genericfamily a::before,
li.connector.teamnetaxel a::before {

View File

@ -16,7 +16,6 @@ KNOWN_SERVICES = {
# include all contrib apps
INSTALLED_APPS += (
'passerelle.contrib.adict',
'passerelle.contrib.agoraplus',
'passerelle.contrib.dpark',
'passerelle.contrib.fake_family',
'passerelle.contrib.gdema',

View File

@ -1,144 +0,0 @@
# -*- coding: utf-8 -*-
import mock
import pytest
from django.core.urlresolvers import reverse
from django.utils import timezone
from django.utils.http import urlencode
from passerelle.contrib.agoraplus.models import AgoraPlus, AgoraPlusLink
from passerelle.contrib.agoraplus.normalize import DATE_FORMAT
import utils
pytestmark = pytest.mark.django_db
DATETIME_FORMAT_START = DATE_FORMAT + 'T%H:' # test only date+hour, without min and seconds
NAME_ID = 'foobarnameid'
AGORAPLUS_PERISCOL_RESPONSE = """
{"DATA": {"INSCRIPTION_INFO": [
{"ID": 38193,
"ACTIVITY": {"ID": 97,
"DESCRIPTION": "Accueil De Loisirs Vacances Élémentaire Toulon"
},
"START_DATE": "2015-09-01T00:00:00",
"END_DATE": "2016-08-31T00:00:00"
}]
},
"TYPE_RESULT": 1,
"ERROR": {
"ID": "0",
"MESSAGE": ""
}
}
"""
AGORAPLUS_PERISCOL_PLANNING_RESPONSE = """{
"DATA": {
"PLANNING_DAYS": [
{"DAY": "2016-10-20T00:00:00",
"START_TIME": "00:00",
"END_TIME": "00:00"
},
{
"DAY": "2016-10-21T00:00:00",
"START_TIME": "00:00",
"END_TIME": "00:00"
}]
},
"TYPE_RESULT": 1,
"ERROR": {
"ID": "0",
"MESSAGE": ""
}
}
"""
@pytest.fixture
def connector():
conn = AgoraPlus.objects.create(title='test', slug='test',
url='https://agoraplus.com/',
login='test', oauth_consumer_key='secret')
utils.setup_access_rights(conn)
return conn
@pytest.fixture
def link(connector):
return AgoraPlusLink.objects.create(resource=connector, name_id=NAME_ID,
login='foo', password='bar')
class FakedAgoraPlusResponse:
def __init__(self, content):
self.content = content
def __call__(self, method, url, **kwargs):
request = utils.FakedResponse(headers={'Content-Type': 'application/json'})
if url == 'https://agoraplus.com/Auth/':
return utils.FakedResponse(status_code=200, content='{"token": "random"}',
request=request, headers={})
return utils.FakedResponse(status_code=200, content=self.content, headers={},
request=request)
@mock.patch('passerelle.utils.RequestSession.request')
def test_child_periscol_enrollments(mocked_get, app, connector, link):
mocked_get.side_effect = FakedAgoraPlusResponse(AGORAPLUS_PERISCOL_RESPONSE)
r = app.get(reverse('agoraplus-child-periscol-enrollments',
kwargs={'slug': connector.slug, 'child_id': '1'}),
params={'service_id': '7'})
# check remote webservice call args
assert urlencode((('p_id_service', '7'),)) in mocked_get.call_args[0][1]
date = timezone.now() + timezone.timedelta(days=35)
assert urlencode((('p_date', date.strftime(DATETIME_FORMAT_START)),)) in mocked_get.call_args[0][1]
assert r.json['data']
r = app.get(reverse('agoraplus-child-periscol-enrollments',
kwargs={'slug': connector.slug, 'child_id': '1'}),
params={'service_id': '6', 'start_days': '10'})
# check remote webservice call args
assert urlencode((('p_id_service', '6'),)) in mocked_get.call_args[0][1]
date = timezone.now() + timezone.timedelta(days=10)
assert urlencode((('p_date', date.strftime(DATETIME_FORMAT_START)),)) in mocked_get.call_args[0][1]
assert r.json['data']
r = app.get(reverse('agoraplus-child-periscol-enrollments',
kwargs={'slug': connector.slug, 'child_id': '1'}),
params={'service_id': '6'})
date = timezone.now() + timezone.timedelta(days=35)
assert urlencode((('p_date', date.strftime(DATETIME_FORMAT_START)),)) in mocked_get.call_args[0][1]
@mock.patch('passerelle.utils.RequestSession.request')
def test_child_periscol_enrollment_planning(mocked_get, app, connector, link):
mocked_get.side_effect = FakedAgoraPlusResponse(AGORAPLUS_PERISCOL_PLANNING_RESPONSE)
r = app.get(reverse('agoraplus-periscol-child-enrollment-planning',
kwargs={'slug': connector.slug, 'child_id': '1', 'enrollment_id': '232'}),
params={'reserved_day': '1'})
# check remote webservice call args
assert urlencode((('reserved_day', '1'),)) in mocked_get.call_args[0][1]
date = timezone.now() + timezone.timedelta(days=2)
assert urlencode((('start_date', date.strftime(DATETIME_FORMAT_START)),)) in mocked_get.call_args[0][1]
assert r.json['data']
r = app.get(reverse('agoraplus-periscol-child-enrollment-planning',
kwargs={'slug': connector.slug, 'child_id': '1', 'enrollment_id': '232'}),
params={'reserved_day': '-1', 'start_days': '10', 'end_days': '31'})
# check remote webservice call args
assert urlencode((('reserved_day', '-1'),)) in mocked_get.call_args[0][1]
date = timezone.now() + timezone.timedelta(days=10)
assert urlencode((('start_date', date.strftime(DATETIME_FORMAT_START)),)) in mocked_get.call_args[0][1]
date = timezone.now() + timezone.timedelta(days=31)
assert urlencode((('end_date', date.strftime(DATETIME_FORMAT_START)),)) in mocked_get.call_args[0][1]
assert r.json['data']
r = app.get(reverse('agoraplus-periscol-child-enrollment-planning',
kwargs={'slug': connector.slug, 'child_id': '1', 'enrollment_id': '232'}),
params={'reserved_day': '-1'})
date = timezone.now() + timezone.timedelta(days=2)
assert urlencode((('start_date', date.strftime(DATETIME_FORMAT_START)),)) in mocked_get.call_args[0][1]
date = timezone.now() + timezone.timedelta(days=365)
assert urlencode((('end_date', date.strftime(DATETIME_FORMAT_START)),)) in mocked_get.call_args[0][1]