tests: remove wcs service from tests (#78935)
gitea/passerelle/pipeline/head This commit looks good Details

This commit is contained in:
Nicolas Roche 2023-06-26 11:51:58 +02:00 committed by Nicolas Roche
parent f498f8f32a
commit 0e07b8fca7
4 changed files with 0 additions and 556 deletions

View File

@ -1,4 +0,0 @@
#!/bin/sh -ue
test -d wcs || git clone https://git.entrouvert.org/entrouvert/wcs.git
(cd wcs && git pull)

View File

@ -1,471 +0,0 @@
# passerelle - uniform access to multiple data sources and services
# Copyright (C) 2019 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import configparser
import contextlib
import os
import pickle
import random
import shutil
import socket
import sys
import tempfile
import time
import httmock
import psycopg2
import pytest
def find_free_tcp_port():
with contextlib.closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s:
s.bind(('', 0))
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
return s.getsockname()[1]
@contextlib.contextmanager
def postgres_db_factory():
database = 'db%s' % random.getrandbits(20)
with contextlib.closing(psycopg2.connect('')) as conn:
conn.set_isolation_level(0)
with conn.cursor() as cursor:
cursor.execute('CREATE DATABASE %s' % database)
try:
yield PostgresDB(database)
finally:
with contextlib.closing(psycopg2.connect('')) as conn:
conn.set_isolation_level(0)
with conn.cursor() as cursor:
cursor.execute('DROP DATABASE IF EXISTS %s' % database)
class PostgresDB:
def __init__(self, database):
self.database = database
@property
def dsn(self):
return f'dbname={self.database}'
@contextlib.contextmanager
def conn(self):
with contextlib.closing(psycopg2.connect(self.dsn)) as conn:
yield conn
def __repr__(self):
return '<Postgres Database %r>' % self.database
@pytest.fixture
def postgres_db():
with postgres_db_factory() as pg_db:
yield pg_db
class WcsRunInContextError(Exception):
def __init__(self, msg, exception, tb):
self.msg = msg
self.exception = exception
self.tb = tb
super().__init__(msg)
def __str__(self):
return '%s\n%s' % (self.msg, self.tb)
class WcsHost:
def __init__(self, wcs, hostname, database=None):
self.wcs = wcs
self.hostname = hostname
self.app_dir = os.path.join(wcs.app_dir, hostname)
with self.config_pck as config:
config['misc'] = {'charset': 'utf-8'}
config['language'] = {'language': 'en'}
config['branding'] = {'theme': 'django'}
if database:
self.set_postgresql(database)
self.__wcs_init()
@property
def url(self):
return 'http://{self.hostname}:{self.wcs.port}'.format(self=self)
def run_in_context(self, func):
from multiprocessing import Pipe
WCSCTL = os.environ.get('WCSCTL')
pipe_out, pipe_in = Pipe()
pid = os.fork()
if pid:
pid, exit_code = os.waitpid(pid, 0)
try:
if pid and exit_code != 0:
try:
e, formatted_tb = pipe_out.recv()
except EOFError:
e, formatted_tb = None, None
raise WcsRunInContextError('%s failed' % func, e, formatted_tb)
finally:
pipe_out.close()
else:
sys.path.append(os.path.dirname(WCSCTL))
try:
import wcs.publisher
wcs.publisher.WcsPublisher.APP_DIR = self.wcs.app_dir
publisher = wcs.publisher.WcsPublisher.create_publisher(register_tld_names=False)
publisher.app_dir = self.app_dir
publisher.set_config()
func()
except Exception as e:
import traceback
pipe_in.send((e, traceback.format_exc()))
pipe_in.close()
# FIXME: send exception to parent
os._exit(1)
finally:
pipe_in.close()
os._exit(0)
def __wcs_init(self):
for name in sorted(dir(self)):
if not name.startswith('wcs_init_'):
continue
method = getattr(self, name)
if not hasattr(method, '__call__'):
continue
self.run_in_context(method)
@property
@contextlib.contextmanager
def site_options(self):
config = configparser.ConfigParser()
site_options_path = os.path.join(self.app_dir, 'site-options.cfg')
if os.path.exists(site_options_path):
with open(site_options_path) as fd:
config.readfp(fd)
yield config
with open(site_options_path, 'w') as fd:
fd.seek(0)
config.write(fd)
@property
@contextlib.contextmanager
def config_pck(self):
config_pck_path = os.path.join(self.app_dir, 'config.pck')
if os.path.exists(config_pck_path):
with open(config_pck_path, 'rb') as fd:
config = pickle.load(fd)
else:
config = {}
yield config
with open(config_pck_path, 'wb') as fd:
pickle.dump(config, fd)
def add_api_secret(self, orig, secret):
with self.site_options as config:
if not config.has_section('api-secrets'):
config.add_section('api-secrets')
config.set('api-secrets', orig, secret)
def set_postgresql(self, database):
with self.site_options as config:
if not config.has_section('options'):
config.add_section('options')
config.set('options', 'postgresql', 'true')
with self.config_pck as config:
config['postgresql'] = {
'database': database,
}
self.run_in_context(self._wcs_init_sql)
def _wcs_init_sql(self):
from quixote import get_publisher
get_publisher().initialize_sql()
@property
def api(self):
from passerelle.utils import wcs
self.add_api_secret('test', 'test')
return wcs.WcsApi(self.url, name_id='xxx', orig='test', key='test')
@property
def anonym_api(self):
from passerelle.utils import wcs
self.add_api_secret('test', 'test')
return wcs.WcsApi(self.url, orig='test', key='test')
class Wcs:
def __init__(self, app_dir, port, wcs_host_class=None, **kwargs):
self.app_dir = app_dir
self.port = port
self.wcs_host_class = wcs_host_class or WcsHost
self.wcs_host_class_kwargs = kwargs
@contextlib.contextmanager
def host(self, hostname='127.0.0.1', wcs_host_class=None, **kwargs):
wcs_host_class = wcs_host_class or self.wcs_host_class
app_dir = os.path.join(self.app_dir, hostname)
os.mkdir(app_dir)
try:
init_kwargs = self.wcs_host_class_kwargs.copy()
init_kwargs.update(kwargs)
yield wcs_host_class(self, hostname, **init_kwargs)
finally:
shutil.rmtree(app_dir)
@contextlib.contextmanager
def wcs_factory(base_dir, wcs_class=Wcs, **kwargs):
WCSCTL = os.environ.get('WCSCTL')
if not WCSCTL:
raise Exception('WCSCTL is not defined')
tmp_app_dir = tempfile.mkdtemp(dir=base_dir)
wcs_cfg_path = os.path.join(base_dir, 'wcs.cfg')
with open(wcs_cfg_path, 'w') as fd:
fd.write(
'''[main]
app_dir = %s\n'''
% tmp_app_dir
)
local_settings_path = os.path.join(base_dir, 'local_settings.py')
with open(local_settings_path, 'w') as fd:
fd.write(
'''
WCS_LEGACY_CONFIG_FILE = '{base_dir}/wcs.cfg'
THEMES_DIRECTORY = '/'
ALLOWED_HOSTS = ['*']
SILENCED_SYSTEM_CHECKS = ['*']
DEBUG = False
LOGGING = {{
'version': 1,
'disable_existing_loggers': False,
'handlers': {{
'console': {{
'class': 'logging.StreamHandler',
}},
}},
'loggers': {{
'django': {{
'handlers': ['console'],
'level': os.getenv('DJANGO_LOG_LEVEL', 'INFO'),
}},
}},
}}
'''.format(
base_dir=base_dir
)
)
address = '0.0.0.0'
port = find_free_tcp_port()
wcs_pid = os.fork()
try:
# launch a Django worker for running w.c.s.
if not wcs_pid:
os.chdir(os.path.dirname(WCSCTL))
os.environ['DJANGO_SETTINGS_MODULE'] = 'wcs.settings'
os.environ['WCS_SETTINGS_FILE'] = local_settings_path
os.execvp(
'python',
['python', 'manage.py', 'runserver', '--noreload', '%s:%s' % (address, port)],
)
os._exit(0)
# verify w.c.s. is launched
s = socket.socket()
i = 0
while True:
i += 1
try:
s.connect((address, port))
except Exception:
time.sleep(0.1)
else:
s.close()
break
assert i < 50, 'no connection found after 5 seconds'
# verify w.c.s. is still running
pid, exit_code = os.waitpid(wcs_pid, os.WNOHANG)
if pid:
raise Exception('w.c.s. stopped with exit-code %s' % exit_code)
yield wcs_class(tmp_app_dir, port=port, **kwargs)
finally:
os.kill(wcs_pid, 9)
shutil.rmtree(tmp_app_dir)
class DefaultWcsHost(WcsHost):
def wcs_init_01_setup_auth(self):
from quixote import get_publisher
get_publisher().cfg['identification'] = {'methods': ['password']}
get_publisher().cfg['debug'] = {'display_exceptions': 'text'}
get_publisher().write_cfg()
def wcs_init_02_create_user(self):
from qommon.ident.password_accounts import PasswordAccount
from quixote import get_publisher
pub = get_publisher()
Role = pub.role_class
User = pub.user_class
user = User()
user.name = 'foo bar'
user.email = 'foo@example.net'
user.store()
account = PasswordAccount(id='user')
account.set_password('user')
account.user_id = user.id
account.store()
role = Role(name='role')
role.store()
user = User()
user.name = 'admin'
user.email = 'admin@example.net'
user.name_identifiers = ['xxx']
user.is_admin = True
user.roles = [str(role.id)]
user.store()
account = PasswordAccount(id='admin')
account.set_password('admin')
account.user_id = user.id
account.store()
def wcs_init_03_create_data(self):
import datetime
import random
from quixote import get_publisher
from wcs import fields
from wcs.categories import Category
from wcs.formdef import FormDef
pub = get_publisher()
Role = pub.role_class
User = pub.user_class
cat = Category()
cat.name = 'Catégorie'
cat.description = ''
cat.store()
role = Role.select()[0]
formdef = FormDef()
formdef.name = 'Demande'
formdef.category_id = cat.id
formdef.workflow_roles = {'_receiver': role.id}
formdef.fields = [
fields.StringField(id='1', label='1st field', type='string', anonymise=False, varname='string'),
fields.ItemField(
id='2', label='2nd field', type='item', items=['foo', 'bar', 'baz'], varname='item'
),
fields.BoolField(id='3', label='3rd field', type='bool', varname='bool'),
fields.ItemField(id='4', label='4rth field', type='item', varname='item_open'),
fields.ItemField(
id='5',
label='5th field',
type='item',
varname='item_datasource',
data_source={'type': 'json', 'value': 'http://datasource.com/'},
),
]
formdef.store()
user = User.select()[0]
for i in range(10):
formdata = formdef.data_class()()
formdata.just_created()
formdata.receipt_time = datetime.datetime(
2018, random.randrange(1, 13), random.randrange(1, 29)
).timetuple()
formdata.data = {'1': 'FOO BAR %d' % i}
if i % 4 == 0:
formdata.data['2'] = 'foo'
formdata.data['2_display'] = 'foo'
formdata.data['4'] = 'open_one'
formdata.data['4_display'] = 'open_one'
elif i % 4 == 1:
formdata.data['2'] = 'bar'
formdata.data['2_display'] = 'bar'
formdata.data['4'] = 'open_two'
formdata.data['4_display'] = 'open_two'
else:
formdata.data['2'] = 'baz'
formdata.data['2_display'] = 'baz'
formdata.data['4'] = "open'three"
formdata.data['4_display'] = "open'three"
formdata.data['3'] = bool(i % 2)
if i % 3 == 0:
formdata.jump_status('new')
else:
formdata.jump_status('finished')
if i % 7 == 0:
formdata.user_id = user.id
formdata.store()
@pytest.fixture
def datasource():
@httmock.urlmatch(netloc='datasource.com')
def handler(url, request):
return {
'status_code': 200,
'content': {
'err': 0,
'data': [
{'id': '1', 'text': 'hello'},
{'id': '2', 'text': 'world'},
],
},
'content-type': 'application/json',
}
with httmock.HTTMock(handler):
yield
@pytest.fixture(scope='session')
def wcs(tmp_path_factory):
base_dir = tmp_path_factory.mktemp('wcs')
with wcs_factory(str(base_dir), wcs_host_class=DefaultWcsHost) as wcs:
yield wcs
@pytest.fixture
def wcs_host(wcs, postgres_db, datasource):
with wcs.host('127.0.0.1', database=postgres_db.database) as wcs_host:
yield wcs_host

View File

@ -1,77 +0,0 @@
# passerelle - uniform access to multiple data sources and services
# Copyright (C) 2019 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from urllib import parse as urlparse
import pytest
import requests
def test_wcs_fixture(wcs_host):
assert wcs_host.url.startswith('http://127.0.0.1:')
requests.get(wcs_host.url)
response = requests.get(urlparse.urljoin(wcs_host.url, '/api/categories/'))
assert response.json()['data'][0]['title'] == 'Catégorie'
def test_wcs_api(wcs_host):
from passerelle.utils.wcs import WcsApiError
api = wcs_host.api
assert len(api.categories) == 1
assert len(api.formdefs) == 1
assert len(api.roles) == 1
formdef = api.formdefs['demande']
assert formdef.schema.fields[4].label == '5th field'
assert len(formdef.formdatas) == 10
assert len(formdef.formdatas.full) == 10
formdata = next(iter(formdef.formdatas))
assert formdata is not formdata.full
assert formdata.full is formdata.full
assert formdata.full.full is formdata.full
assert formdata.full.anonymized is not formdata.full
with formdef.submit() as submitter:
with pytest.raises(ValueError):
submitter.set('zob', '1')
submitter.draft = True
submitter.submission_channel = 'mdel'
submitter.submission_context = {
'mdel_ref': 'ABCD',
}
submitter.set('string', 'hello')
submitter.set('item', 'foo')
submitter.set('item_open', {'id': '1', 'text': 'world', 'foo': 'bar'})
submitter.set(
'item_datasource',
{
'id': '2',
'text': 'world',
},
)
formdata = formdef.formdatas[submitter.result.id]
api = wcs_host.anonym_api
assert len(api.categories) == 1
assert len(api.formdefs) == 1
assert len(api.roles) == 1
formdef = api.formdefs['demande']
assert formdef.schema.fields[4].label == '5th field'
with pytest.raises(WcsApiError):
assert len(formdef.formdatas) == 10
assert len(formdef.formdatas.anonymized) == 10

View File

@ -8,7 +8,6 @@ setenv =
DJANGO_SETTINGS_MODULE=passerelle.settings
PASSERELLE_SETTINGS_FILE=tests/settings.py
RAND_TEST={env:RAND_TEST:}
WCSCTL=wcs/wcsctl.py
SETUPTOOLS_USE_DISTUTILS=stdlib
NUMPROCESSES={env:NUMPROCESSES:1}
fast: FAST=--nomigrations
@ -50,7 +49,6 @@ deps =
cryptography<39
git+https://git.entrouvert.org/publik-django-templatetags.git
commands =
./get_wcs.sh
py.test {posargs: --numprocesses {env:NUMPROCESSES:1} --dist loadfile {env:FAST:} {env:COVERAGE:} {env:JUNIT:} tests/}
codestyle: pre-commit run --all-files --show-diff-on-failure
@ -61,8 +59,6 @@ filterwarnings = default
# pyproj warning about deprecation of '+init=authority:code which comes from using Proj(init='EPSG:4326') instead of Proj('EPSG:4326')
# I tried the new syntax but it broke some opengis tests
module:.*init.*authority.*code.*syntax is deprecated:DeprecationWarning:pyproj
# wcs root directory must be renamed to fix this one
ignore:Not importing directory.*/wcs':ImportWarning
[testenv:pylint]
setenv =