476 lines
17 KiB
Python
476 lines
17 KiB
Python
from datetime import datetime
|
|
|
|
from django.conf import settings
|
|
from django.contrib.auth.models import User
|
|
from django.core import management
|
|
import pytest
|
|
from sqlalchemy import create_engine
|
|
from sqlalchemy.sql import func
|
|
from sqlalchemy_utils import database_exists, create_database, drop_database
|
|
|
|
from docbow_project.docbow.models import DocbowProfile, FileType, MailingList
|
|
from docbow_project.pfwb.models import TabellioDocType
|
|
from docbow_project.pfwb.tabellio import (
|
|
Base,
|
|
DBSession,
|
|
TAdresse,
|
|
TCom,
|
|
TComppol,
|
|
TPer,
|
|
TPershistoline,
|
|
TTypehistoper,
|
|
TTypedoc,
|
|
)
|
|
|
|
|
|
@pytest.fixture(scope='session')
def engine():
    """Yield a SQLAlchemy engine bound to a freshly created Tabellio test database.

    The database name comes from ``settings.TABELLIO_DBNAME``. A database that
    already exists under that name is dropped before creation, and the database
    is dropped again once the fixture is torn down.
    """
    db_engine = create_engine('postgresql:///%s' % settings.TABELLIO_DBNAME)
    db_url = db_engine.url

    # Start from a clean slate: remove any leftover database first.
    if database_exists(db_url):
        drop_database(db_url)
    create_database(db_url)

    yield db_engine

    # Teardown: remove the test database.
    drop_database(db_url)
|
|
|
|
|
|
@pytest.fixture
def session(engine):
    """Yield a ``DBSession`` on a fresh schema, then tear everything down.

    All tables declared on ``Base`` are created before the test and dropped
    after it, so every test starts with empty tables.
    """
    metadata = Base.metadata
    metadata.create_all(engine)

    db_session = DBSession(bind=engine.connect())
    yield db_session

    # Teardown order matters: release the session and its connections before
    # dropping the tables.
    db_session.close()
    db_session.bind.engine.dispose()
    metadata.drop_all(engine)
|
|
|
|
|
|
@pytest.fixture
def ministres_list(db, settings):
    """Create the 'Ministres' mailing list and store its pk in ``settings.MINISTRES_MAILING_ID``."""
    mailing_list = MailingList.objects.create(name='Ministres')
    settings.MINISTRES_MAILING_ID = mailing_list.pk
    return mailing_list
|
|
|
|
|
|
@pytest.fixture
def parl_list(db, settings):
    """Create the 'Parlementaires' mailing list and store its pk in ``settings.PARLEMENTAIRES_MAILING_ID``."""
    mailing_list = MailingList.objects.create(name='Parlementaires')
    settings.PARLEMENTAIRES_MAILING_ID = mailing_list.pk
    return mailing_list
|
|
|
|
|
|
def get_new_id(session, model):
    """Return the next free id for ``model`` as a string.

    The ids of every existing row are converted to ``int`` and the largest one
    is incremented. Comparing them numerically in Python (rather than asking
    the database for ``max(id)``) avoids a lexicographic comparison of
    string ids, where ``'9'`` sorts after ``'10'`` and the same id would be
    handed out twice once a table holds ten rows.

    :param session: SQLAlchemy session used to read the existing ids.
    :param model: mapped class exposing an ``id`` column with numeric content.
    :returns: ``str(max_id + 1)``; ``'2'`` when the table is empty (the
        historical starting point of this helper, kept for compatibility).
    """
    existing_ids = [int(row[0]) for row in session.query(model.id)]
    # An empty table behaves as if the last id were 1, as it always did.
    last_id = max(existing_ids, default=1)
    return '%s' % (last_id + 1,)
|
|
|
|
|
|
def create_adresse(session, email):
    """Insert and commit a ``TAdresse`` row carrying ``email``, and return it.

    The row gets a fresh id from ``get_new_id`` and is flagged
    ``courriel=True`` / ``courrier=False``.
    """
    address = TAdresse(
        id=get_new_id(session, TAdresse),
        email=email,
        courriel=True,
        courrier=False,
    )
    session.add(address)
    session.commit()
    return address
|
|
|
|
|
|
def create_pers(session, first_name, last_name, adresse, st='XX'):
    """Insert and commit a ``TPer`` row linked to ``adresse``, and return it.

    :param session: SQLAlchemy session used for the insert.
    :param first_name: stored in ``prenom``.
    :param last_name: stored in ``nom``.
    :param adresse: object whose ``id`` is stored in ``addrpriv``.
    :param st: value for the ``st`` column (defaults to the placeholder 'XX').
    """
    person = TPer(
        id=get_new_id(session, TPer),
        prenom=first_name,
        nom=last_name,
        addrpriv=adresse.id,
        st=st,
        initiales='XX',
    )
    session.add(person)
    session.commit()
    return person
|
|
|
|
|
|
def create_pershisto(session, pers, typehisto, desc_obj, fin=None):
    """Insert and commit a ``TPershistoline`` row starting now, and return it.

    :param session: SQLAlchemy session used for the insert.
    :param pers: object whose ``id`` is stored in ``pers``.
    :param typehisto: object whose ``id`` is stored in ``type``.
    :param desc_obj: object whose ``id`` is stored in ``description``.
    :param fin: optional end date; ``None`` leaves the line without an end.
    """
    fields = {'id': get_new_id(session, TPershistoline)}
    fields.update(
        pers=pers.id,
        type=typehisto.id,
        description=desc_obj.id,
        st='XX',
        debut=datetime.now(),
        fin=fin,
    )
    histo_line = TPershistoline(**fields)
    session.add(histo_line)
    session.commit()
    return histo_line
|
|
|
|
|
|
def get_or_create_compol(session, name):
    """Return the ``TComppol`` whose ``abbr`` equals ``name``, creating it if needed."""
    existing = session.query(TComppol).filter_by(abbr=name).first()
    if existing:
        return existing

    # Nothing found: insert a new row filled with placeholder values.
    compol = TComppol(
        id=get_new_id(session, TComppol),
        abbr=name,
        st='XX',
        membres=1,
        groupe=True,
        nom='XX',
    )
    session.add(compol)
    session.commit()
    return compol
|
|
|
|
|
|
def get_or_create_typehisto(session, id_):
    """Return the ``TTypehistoper`` with id ``id_``, creating it if needed."""
    existing = session.query(TTypehistoper).filter_by(id=id_).first()
    if existing:
        return existing

    # Nothing found: insert a new row filled with placeholder values.
    histo_type = TTypehistoper(id=id_, st='XX', descr='XX')
    session.add(histo_type)
    session.commit()
    return histo_type
|
|
|
|
|
|
def create_deputy(session, first_name, last_name, email, compol_name):
    """Create a person with an open-ended 'P_CMPL' history line and return the ``TPer``.

    An address, the person, the political group ``compol_name`` (fetched or
    created) and the 'P_CMPL' history type (fetched or created) are set up,
    then tied together by a history line without an end date.
    """
    address = create_adresse(session, email=email)
    person = create_pers(session, first_name, last_name, address)
    political_group = get_or_create_compol(session, compol_name)
    histo_type = get_or_create_typehisto(session, 'P_CMPL')
    create_pershisto(session, person, histo_type, political_group, fin=None)
    return person
|
|
|
|
|
|
def create_ministre(session, first_name, last_name, email, compol_name):
    """Create a person with ``st='S_MINISTRE'`` and an open-ended 'M_MINT' history line.

    An address, the person, the political group ``compol_name`` (fetched or
    created) and the 'M_MINT' history type (fetched or created) are set up,
    then tied together by a history line without an end date. Returns the ``TPer``.
    """
    address = create_adresse(session, email=email)
    person = create_pers(session, first_name, last_name, address, st='S_MINISTRE')
    political_group = get_or_create_compol(session, compol_name)
    histo_type = get_or_create_typehisto(session, 'M_MINT')
    create_pershisto(session, person, histo_type, political_group, fin=None)
    return person
|
|
|
|
|
|
def test_empty_tabellio(session, db, ministres_list, parl_list):
    """Syncing against empty Tabellio tables must not create any user."""
    users = User.objects
    assert users.count() == 0

    management.call_command('sync-tabellio', '--verbosity', '2')

    assert users.count() == 0

    session.close()
    session.bind.engine.dispose()
|
|
|
|
|
|
def test_create_deputy(session, db, ministres_list, parl_list):
    """A deputy present in Tabellio becomes a user and a member of the deputies list."""
    # Precondition: no user, empty list.
    assert User.objects.count() == 0
    assert parl_list.members.count() == 0

    create_deputy(session, 'John', 'Doe', 'john@doe.be', compol_name='foo')
    management.call_command('sync-tabellio', '--verbosity', '2')

    assert User.objects.count() == 1
    assert parl_list.members.count() == 1
    created_user = User.objects.get(first_name='John', last_name='Doe', email='john@doe.be')
    assert parl_list.members.first() == created_user

    session.close()
    session.bind.engine.dispose()
|
|
|
|
|
|
def test_remove_deputy_because_no_fonction(session, db, ministres_list, parl_list):
    """A deputy whose history line has ended is deactivated and leaves the deputies list."""
    sync_args = ('sync-tabellio', '--verbosity', '2')
    identity = {'first_name': 'John', 'last_name': 'Doe', 'email': 'john@doe.be'}

    user = User.objects.create(**identity)
    assert user.is_active
    parl_list.members.add(user)
    assert parl_list.members.count() == 1

    deputy = create_deputy(session, 'John', 'Doe', 'john@doe.be', compol_name='foo')
    # Close his history line so that he is no longer a deputy.
    histo_line = deputy.tperhistolines[0]
    histo_line.fin = datetime.now()
    session.add(histo_line)
    session.commit()
    management.call_command(*sync_args)

    user = User.objects.get(**identity)
    assert not user.is_active
    assert parl_list.members.count() == 0

    # A second run keeps the user inactive.
    management.call_command(*sync_args)
    user = User.objects.get(**identity)
    assert not user.is_active
    assert parl_list.members.count() == 0

    # The sync also copes with the user having been deleted.
    user.delete()
    management.call_command(*sync_args)
    assert parl_list.members.count() == 0

    session.close()
    session.bind.engine.dispose()
|
|
|
|
|
|
def test_remove_user_does_not_crash_if_multiple_active_user_found(session, db, ministres_list, parl_list):
    """Two active users sharing the same name are both deactivated when the deputy's line ends."""
    first_user = User.objects.create(first_name='John', last_name='Doe', email='john@doe.be')
    assert first_user.is_active
    parl_list.members.add(first_user)
    assert parl_list.members.count() == 1

    # Same first/last name, different email and username.
    second_user = User.objects.create(
        first_name='John',
        last_name='Doe',
        email='john+bis@doe.be',
        username='johndoebis',
    )
    assert second_user.is_active

    deputy = create_deputy(session, 'John', 'Doe', 'john@doe.be', compol_name='foo')
    # Close his history line so that he is no longer a deputy.
    histo_line = deputy.tperhistolines[0]
    histo_line.fin = datetime.now()
    session.add(histo_line)
    session.commit()
    management.call_command('sync-tabellio', '--verbosity', '2')

    first_user = User.objects.get(first_name='John', last_name='Doe', email='john@doe.be')
    assert not first_user.is_active
    second_user = User.objects.get(first_name='John', last_name='Doe', email='john+bis@doe.be')
    assert not second_user.is_active
    assert parl_list.members.count() == 0
|
|
|
|
|
|
def test_remove_deputy_because_become_ministre(session, db, ministres_list, parl_list):
    """A deputies-list member found as a minister in Tabellio moves to the ministers list."""
    john = User.objects.create(first_name='John', last_name='Doe', email='john@doe.be')
    assert john.is_active
    parl_list.members.add(john)
    assert parl_list.members.count() == 1

    create_ministre(session, 'John', 'Doe', 'john@doe.be', compol_name='foo')

    management.call_command('sync-tabellio', '--verbosity', '2')

    # Gone from the deputies list...
    assert john not in parl_list.members.all()
    assert parl_list.members.count() == 0
    # ...and now the only member of the ministers list.
    assert john in ministres_list.members.all()
    assert ministres_list.members.count() == 1

    session.close()
    session.bind.engine.dispose()
|
|
|
|
|
|
def test_create_ministre(session, db, ministres_list, parl_list):
    """A minister present in Tabellio becomes a user and a member of the ministers list."""
    # Precondition: no user, empty list.
    assert User.objects.count() == 0
    assert ministres_list.members.count() == 0

    create_ministre(session, 'John', 'Doe', 'john@doe.be', compol_name='foo')
    management.call_command('sync-tabellio', '--verbosity', '2')

    assert User.objects.count() == 1
    assert ministres_list.members.count() == 1
    created_user = User.objects.get(first_name='John', last_name='Doe', email='john@doe.be')
    assert ministres_list.members.first() == created_user

    session.close()
    session.bind.engine.dispose()
|
|
|
|
|
|
def test_remove_ministre(session, db, ministres_list, parl_list):
    """A minister whose history line has ended is deactivated and leaves the ministers list."""
    sync_args = ('sync-tabellio', '--verbosity', '2')
    identity = {'first_name': 'John', 'last_name': 'Doe', 'email': 'john@doe.be'}

    user = User.objects.create(**identity)
    assert user.is_active
    ministres_list.members.add(user)
    assert ministres_list.members.count() == 1

    ministre = create_ministre(session, 'John', 'Doe', 'john@doe.be', compol_name='foo')
    # Close his history line so that he is no longer a minister.
    histo_line = ministre.tperhistolines[0]
    histo_line.fin = datetime.now()
    session.add(histo_line)
    session.commit()
    management.call_command(*sync_args)

    user = User.objects.get(**identity)
    assert not user.is_active
    assert ministres_list.members.count() == 0

    # A second run keeps the user inactive.
    management.call_command(*sync_args)
    user = User.objects.get(**identity)
    assert not user.is_active
    assert ministres_list.members.count() == 0

    # The sync also copes with the user having been deleted.
    user.delete()
    management.call_command(*sync_args)
    assert ministres_list.members.count() == 0

    session.close()
    session.bind.engine.dispose()
|
|
|
|
|
|
def test_remove_ministre_because_become_deputy(session, db, ministres_list, parl_list):
    """A ministers-list member found as a deputy in Tabellio moves to the deputies list."""
    john = User.objects.create(first_name='John', last_name='Doe', email='john@doe.be')
    assert john.is_active
    ministres_list.members.add(john)
    assert ministres_list.members.count() == 1

    create_deputy(session, 'John', 'Doe', 'john@doe.be', compol_name='foo')

    management.call_command('sync-tabellio', '--verbosity', '2')

    # Gone from the ministers list...
    assert john not in ministres_list.members.all()
    assert ministres_list.members.count() == 0
    # ...and now the only member of the deputies list.
    assert john in parl_list.members.all()
    assert parl_list.members.count() == 1

    session.close()
    session.bind.engine.dispose()
|
|
|
|
|
|
def test_create_commission(session, db, ministres_list, parl_list):
    """Active ``TCom`` rows yield 'Commission - <name>' lists populated from history lines."""
    # Only the two fixture lists exist at first.
    assert MailingList.objects.count() == 2

    commission_foo = TCom(id='foo', nom='foo', code='foo', st='S_ACTIVE')
    commission_bar = TCom(id='bar', nom='bar', code='bar', st='S_ACTIVE')
    session.add_all([commission_foo, commission_bar])
    session.commit()

    # John gets an extra history line pointing at commission 'foo' only.
    deputy = create_deputy(session, 'John', 'Doe', 'john@doe.be', compol_name='bar')
    histo_type = get_or_create_typehisto(session, 'foo')
    create_pershisto(session, deputy, histo_type, commission_foo)

    management.call_command('sync-tabellio', '--verbosity', '2')

    assert MailingList.objects.count() == 5
    # The political list is created too, but it is not the subject of this test.
    assert MailingList.objects.get(name='Appartenance politique - bar')
    foo_list = MailingList.objects.get(name='Commission - foo')
    bar_list = MailingList.objects.get(name='Commission - bar')

    john = User.objects.get(first_name='John', last_name='Doe', email='john@doe.be', username='john.doe')
    assert foo_list.members.count() == 1
    assert john in foo_list.members.all()
    assert bar_list.members.count() == 0

    session.close()
    session.bind.engine.dispose()
|
|
|
|
|
|
def test_diable_commission(session, db, ministres_list, parl_list):
    """A 'Commission - …' list with no Tabellio counterpart is emptied and deactivated."""
    list_name = 'Commission - xx'
    stale_list = MailingList.objects.create(name=list_name)
    member = User.objects.create(first_name='John', last_name='Doe', email='john@doe.be')
    stale_list.members.add(member)
    assert stale_list.members.count() == 1
    assert member.is_active

    management.call_command('sync-tabellio', '--verbosity', '2')

    # Reload the list to observe what the sync did to it.
    stale_list = MailingList.objects.get(name=list_name)
    assert stale_list.members.count() == 0
    assert not stale_list.is_active

    session.close()
    session.bind.engine.dispose()
|
|
|
|
|
|
def test_update_commission(session, db, ministres_list, parl_list):
    """An existing commission list keeps only the member backed by a Tabellio history line."""
    list_name = 'Commission - foo'
    commission_list = MailingList.objects.create(name=list_name)
    john = User.objects.create(first_name='John', last_name='Doe', email='john@doe.be', username='john.doe')
    stale_member = User.objects.create(
        first_name='Old', last_name='Man', email='old@man.be', username='old.man'
    )
    commission_list.members.add(john)
    commission_list.members.add(stale_member)
    assert commission_list.members.count() == 2

    tabellio_commission = TCom(id='foo', nom='foo', code='foo', st='S_ACTIVE')
    session.add(tabellio_commission)
    session.commit()
    # Only John has a history line pointing at the commission.
    deputy = create_deputy(session, 'John', 'Doe', 'john@doe.be', compol_name='bar')
    histo_type = get_or_create_typehisto(session, 'foo')
    create_pershisto(session, deputy, histo_type, tabellio_commission)

    management.call_command('sync-tabellio', '--verbosity', '2')

    commission_list = MailingList.objects.get(name=list_name)
    assert commission_list.members.count() == 1
    assert john in commission_list.members.all()

    session.close()
    session.bind.engine.dispose()
|
|
|
|
|
|
def test_create_political(session, db, ministres_list, parl_list):
    """Each political group of a synced deputy gets its 'Appartenance politique - …' list."""
    lists = MailingList.objects
    # Only the two fixture lists exist at first.
    assert lists.count() == 2

    create_deputy(session, 'John', 'Doe', 'john@doe.be', compol_name='green')
    create_deputy(session, 'Igor', 'Avilov', 'igor@avilov.be', compol_name='red')
    management.call_command('sync-tabellio', '--verbosity', '2')

    assert lists.count() == 4
    assert lists.get(name='Appartenance politique - green')
    assert lists.get(name='Appartenance politique - red')

    session.close()
    session.bind.engine.dispose()
|
|
|
|
|
|
def test_desactivate_innactive_political(session, db, ministres_list, parl_list):
    """The list of a political group marked 'S_INACTIVE' is emptied and deactivated."""
    list_name = 'Appartenance politique - xx'
    political_list = MailingList.objects.create(name=list_name)
    stale_member = User.objects.create(
        first_name='Old', last_name='Man', email='old@man.be', username='old.man'
    )
    political_list.members.add(stale_member)
    assert political_list.is_active
    assert political_list.members.count() == 1
    assert MailingList.objects.count() == 3

    # Group 'xx' exists in Tabellio but is flagged inactive.
    inactive_group = get_or_create_compol(session, 'xx')
    inactive_group.st = 'S_INACTIVE'
    session.add(inactive_group)
    session.commit()
    create_deputy(session, 'John', 'Doe', 'john@doe.be', compol_name='green')

    management.call_command('sync-tabellio', '--verbosity', '2')

    assert MailingList.objects.count() == 4
    political_list = MailingList.objects.get(name=list_name)
    assert not political_list.is_active
    assert political_list.members.count() == 0

    session.close()
    session.bind.engine.dispose()
|
|
|
|
|
|
def test_remove_member_from_political(session, db, ministres_list, parl_list):
    """A political list's stale member is replaced by the group's deputy from Tabellio."""
    list_name = 'Appartenance politique - green'
    political_list = MailingList.objects.create(name=list_name)
    old_user = User.objects.create(first_name='Old', last_name='Man', email='old@man.be', username='old.man')
    political_list.members.add(old_user)
    assert political_list.is_active
    assert political_list.members.count() == 1
    assert MailingList.objects.count() == 3

    create_deputy(session, 'John', 'Doe', 'john@doe.be', compol_name='green')

    management.call_command('sync-tabellio', '--verbosity', '2')

    # No new list: the existing one is reused and stays active.
    assert MailingList.objects.count() == 3
    political_list = MailingList.objects.get(name=list_name)
    assert political_list.is_active

    current_members = political_list.members.all()
    assert len(current_members) == 1
    john = User.objects.get(first_name='John', last_name='Doe', email='john@doe.be')
    assert john in current_members
    assert old_user not in current_members

    session.close()
    session.bind.engine.dispose()
|
|
|
|
|
|
def test_document_types(session, db, ministres_list, parl_list):
    """Every ``TTypedoc`` gets a ``TabellioDocType`` whose file type is named after ``descr``, capitalised."""
    existing_filetype = FileType.objects.create(name='xx')
    TabellioDocType.objects.create(filetype=existing_filetype, tabellio_doc_type='doc-x')
    assert TabellioDocType.objects.count() == 1

    # 'doc-x' already has a TabellioDocType; 'doc-0' and 'doc-1' are new.
    doc_types = [
        TTypedoc(id='doc-0', descr='foo', st='st', fno=True, fnodoc=True, listnum=True),
        TTypedoc(id='doc-1', descr='bar', st='st', fno=True, fnodoc=True, listnum=True),
        TTypedoc(id='doc-x', descr='new', st='st', fno=True, fnodoc=True, listnum=True),
    ]
    session.add_all(doc_types)
    session.commit()

    management.call_command('sync-tabellio', '--verbosity', '2')

    assert TabellioDocType.objects.count() == 3
    synced = TabellioDocType.objects.get(tabellio_doc_type='doc-0')
    assert synced.filetype.name == 'Foo'
    synced = TabellioDocType.objects.get(tabellio_doc_type='doc-1')
    assert synced.filetype.name == 'Bar'
    synced = TabellioDocType.objects.get(tabellio_doc_type='doc-x')
    assert synced.filetype.name == 'New'

    session.close()
    session.bind.engine.dispose()
|
|
|
|
|
|
def test_user_creation_external_id_populated(session, db, ministres_list, parl_list):
    """A user created by the sync carries the Tabellio person id as profile ``external_id``."""
    # Precondition: no user, empty list.
    assert User.objects.count() == 0
    assert parl_list.members.count() == 0

    tabellio_person = create_deputy(session, 'John', 'Doe', 'john@doe.be', compol_name='foo')
    management.call_command('sync-tabellio', '--verbosity', '2')

    created_user = User.objects.get(first_name='John', last_name='Doe', email='john@doe.be')
    assert created_user.docbowprofile.external_id == tabellio_person.id

    session.close()
    session.bind.engine.dispose()
|
|
|
|
|
|
def test_match_user_on_external_id(session, db, ministres_list, parl_list):
    """A user whose profile ``external_id`` matches the Tabellio person is reused, not duplicated."""
    # Precondition: no user, empty list.
    assert User.objects.count() == 0
    assert parl_list.members.count() == 0

    tabellio_person = create_deputy(session, 'John', 'Doe', 'john@doe.be', compol_name='foo')
    # Name and email differ from Tabellio; only the external id links the two.
    linked_user = User.objects.create(first_name='foo', last_name='bar', email='foo@bar')
    DocbowProfile.objects.create(user=linked_user, external_id=tabellio_person.id)

    management.call_command('sync-tabellio', '--verbosity', '2')

    assert User.objects.count() == 1
    assert parl_list.members.count() == 1
    linked_user = User.objects.get(first_name='foo', last_name='bar', email='foo@bar')
    assert parl_list.members.first() == linked_user

    session.close()
    session.bind.engine.dispose()
|