wcs/tests/utilities.py

420 lines
15 KiB
Python

import email.header
import email.parser
import json
import os
import tempfile
import random
import psycopg2
import shutil
import sys
from wcs import sql, sessions, custom_views
from webtest import TestApp
from quixote import cleanup, get_publisher
from django.conf import settings
from django.utils.encoding import force_bytes, force_text
from django.utils.six.moves.urllib import parse as urlparse
from wcs.qommon import force_str
import wcs
import wcs.wsgi
from wcs import compat
from wcs.users import User
from wcs.tracking_code import TrackingCode
import wcs.qommon.emails
import wcs.qommon.sms
from wcs.qommon.errors import ConnectionError
import wcs.middleware
# NOTE(review): switches off the ASYNC flag of AfterJobsMiddleware as soon as
# this module is imported; presumably this makes after-jobs run inline during
# tests instead of in the background -- confirm against wcs.middleware.
wcs.middleware.AfterJobsMiddleware.ASYNC = False
class KnownElements(object):
    """Record of the temporary resources the test helpers have created.

    Every attribute defaults to ``None``, meaning "nothing created yet".
    """

    # temporary application directories, one per publisher mode
    pickle_app_dir = None
    lazy_app_dir = None
    templates_app_dir = None
    sql_app_dir = None

    # name of the PostgreSQL database created for SQL mode
    sql_db_name = None
# Module-level instance shared by create_temporary_pub(), clean_temporary_pub()
# and force_connections_close() to remember what has been created so far.
known_elements = KnownElements()
def create_temporary_pub(sql_mode=False, templates_mode=False, lazy_mode=False):
    """Set up and return a publisher rooted in a temporary directory.

    The temporary directory for a mode is created once and remembered in
    ``known_elements`` so that later calls reuse it.  On reuse the
    configuration files are rewritten, but the ``templates`` symlink and (in
    SQL mode) the database and its tables are not created again.

    :param sql_mode: use the ``sql`` storage classes and a PostgreSQL
        database named ``wcstests<random number>``.
    :param templates_mode: select the ``django`` branding theme instead of
        ``default``.
    :param lazy_mode: write ``force-lazy-mode = true`` to site-options.cfg.
    :returns: the publisher returned by
        ``compat.CompatWcsPublisher.create_publisher()``.
    :raises psycopg2.Error: when the database still cannot be created after
        six attempts.
    """
    if get_publisher():
        get_publisher().cleanup()
        cleanup()

    # Pick the directory remembered for the requested mode.  The order of the
    # checks is significant when several mode flags are passed together, so
    # the cascade is kept exactly as is.
    if templates_mode and known_elements.templates_app_dir:
        APP_DIR = known_elements.templates_app_dir
    elif lazy_mode and known_elements.lazy_app_dir:
        APP_DIR = known_elements.lazy_app_dir
    elif sql_mode and known_elements.sql_app_dir:
        APP_DIR = known_elements.sql_app_dir
    elif not (templates_mode or lazy_mode or sql_mode) and known_elements.pickle_app_dir:
        APP_DIR = known_elements.pickle_app_dir
    else:
        APP_DIR = tempfile.mkdtemp()
        if templates_mode:
            known_elements.templates_app_dir = APP_DIR
        elif lazy_mode:
            known_elements.lazy_app_dir = APP_DIR
        elif sql_mode:
            known_elements.sql_app_dir = APP_DIR
        else:
            known_elements.pickle_app_dir = APP_DIR

    compat.CompatWcsPublisher.APP_DIR = APP_DIR
    compat.CompatWcsPublisher.DATA_DIR = os.path.abspath(
        os.path.join(os.path.dirname(wcs.__file__), '..', 'data'))
    compat.CompatWcsPublisher.cronjobs = None
    pub = compat.CompatWcsPublisher.create_publisher()
    # allow saving the user
    pub.app_dir = os.path.join(APP_DIR, 'example.net')
    pub.site_charset = 'utf-8'

    if sql_mode:
        pub.user_class = sql.SqlUser
        pub.tracking_code_class = sql.TrackingCode
        pub.session_class = sql.Session
        pub.custom_view_class = sql.CustomView
        pub.snapshot_class = sql.Snapshot
        pub.loggederror_class = sql.LoggedError
        pub.is_using_postgresql = lambda: True
    else:
        pub.user_class = User
        pub.tracking_code_class = TrackingCode
        pub.session_class = sessions.BasicSession
        pub.custom_view_class = custom_views.CustomView
        pub.is_using_postgresql = lambda: False

    pub.session_manager_class = sessions.StorageSessionManager
    pub.session_manager = pub.session_manager_class(session_class=pub.session_class)

    # drop any "scripts" directory left at either level
    for parent_dir in (pub.APP_DIR, pub.app_dir):
        scripts_dir = os.path.join(parent_dir, 'scripts')
        if os.path.exists(scripts_dir):
            shutil.rmtree(scripts_dir)

    created = not os.path.exists(pub.app_dir)
    if created:
        os.mkdir(pub.app_dir)

    # always reset site-options.cfg; the context manager guarantees the file
    # is closed even if one of the writes fails
    with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
        fd.write('[wscall-secrets]\n')
        fd.write('idp.example.net = BAR\n')
        fd.write('\n')
        fd.write('[options]\n')
        fd.write('formdef-captcha-option = true\n')
        fd.write('formdef-appearance-keywords = true\n')
        fd.write('workflow-resubmit-action = true\n')
        if lazy_mode:
            fd.write('force-lazy-mode = true\n')
        if sql_mode:
            fd.write('postgresql = true\n')

    # make sure site options are not cached
    pub.site_options = None

    pub.cfg = {}
    pub.cfg['misc'] = {
        'charset': 'utf-8',
        'backoffice-url': 'http://example.net/backoffice',
        'frontoffice-url': 'http://example.net',
    }
    pub.cfg['language'] = {'language': 'en'}
    if templates_mode:
        pub.cfg['branding'] = {'theme': 'django'}
    else:
        pub.cfg['branding'] = {'theme': 'default'}
    pub.write_cfg()

    if not created:
        # The directory already existed: point the configuration at the
        # database recorded earlier and stop here.
        if sql_mode:
            pub.cfg['postgresql'] = {'database': known_elements.sql_db_name, 'user': os.environ['USER']}
            pub.write_cfg()
        return pub

    os.symlink(os.path.join(os.path.dirname(__file__), 'templates'),
               os.path.join(pub.app_dir, 'templates'))

    if sql_mode:
        conn = psycopg2.connect(user=os.environ['USER'])
        try:
            conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
            retries = 0
            while True:
                dbname = 'wcstests%d' % random.randint(0, 100000)
                # the cursor is created outside the try block so the finally
                # clause never refers to an unbound name
                cur = conn.cursor()
                try:
                    cur.execute('CREATE DATABASE %s' % dbname)
                    break
                except psycopg2.Error:
                    # try another random name a few times before giving up
                    if retries < 5:
                        retries += 1
                        continue
                    raise
                finally:
                    cur.close()

            # only remember the name once the database really exists, so that
            # cleanup never drops a database this process did not create
            known_elements.sql_db_name = dbname

            pub.cfg['postgresql'] = {'database': dbname, 'user': os.environ['USER']}
            pub.write_cfg()

            sql.do_user_table()
            sql.do_tracking_code_table()
            sql.do_session_table()
            sql.do_custom_views_table()
            sql.do_snapshots_table()
            sql.do_loggederrors_table()
            sql.do_meta_table()
        finally:
            # release the administrative connection on failure as well
            conn.close()

    return pub
def clean_temporary_pub():
    """Remove everything recorded in ``known_elements``.

    Cleans up the current publisher if there is one, deletes every remembered
    temporary application directory (including the lazy-mode one) and drops
    the remembered PostgreSQL database.  A failure to drop the database is
    printed rather than raised.
    """
    if get_publisher():
        get_publisher().cleanup()

    for attribute in ('templates_app_dir', 'pickle_app_dir', 'sql_app_dir', 'lazy_app_dir'):
        app_dir = getattr(known_elements, attribute)
        if app_dir and os.path.exists(app_dir):
            shutil.rmtree(app_dir)
        # forget the directory even if it had already vanished from disk
        setattr(known_elements, attribute, None)

    if known_elements.sql_db_name:
        conn = psycopg2.connect(user=os.environ['USER'])
        try:
            conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
            try:
                cur = conn.cursor()
                try:
                    cur.execute('DROP DATABASE %s' % known_elements.sql_db_name)
                finally:
                    cur.close()
            except psycopg2.Error as e:
                # best effort: report the problem and carry on
                print(e)
        finally:
            conn.close()
        known_elements.sql_db_name = None
def force_connections_close():
    """Terminate the other PostgreSQL backends connected to the test database.

    Does nothing when no database name is recorded in ``known_elements``.
    ``psycopg2.ProgrammingError`` is deliberately ignored; the connection
    opened for the query is always closed.
    """
    # workaround for threading issue
    # https://code.djangoproject.com/ticket/22420
    if not known_elements.sql_db_name:
        return
    try:
        conn = psycopg2.connect(user=os.environ['USER'])
        try:
            cur = conn.cursor()
            try:
                cur.execute('''SELECT pg_terminate_backend(pg_stat_activity.pid)
                                 FROM pg_stat_activity
                                WHERE pg_stat_activity.datname = %s
                                  AND pid <> pg_backend_pid();''', (known_elements.sql_db_name,))
            finally:
                cur.close()
        finally:
            # do not leave one more connection open on every call
            conn.close()
    except psycopg2.ProgrammingError:
        pass
def get_app(pub, https=False):
    """Return a ``TestApp`` wrapping the w.c.s. WSGI application.

    Requests are issued for host ``example.net`` from ``127.0.0.1``.  When
    *https* is true the ``HTTPS`` environ key is ``'on'`` and
    ``settings.SECURE_PROXY_SSL_HEADER`` is set accordingly; otherwise the
    key is ``'off'``.  *pub* is accepted but not used.
    """
    if https:
        settings.SECURE_PROXY_SSL_HEADER = ('HTTPS', 'on')
    environ = {
        'HTTP_HOST': 'example.net',
        'REMOTE_ADDR': '127.0.0.1',
        'HTTPS': 'on' if https else 'off',
    }
    return TestApp(wcs.wsgi.application, extra_environ=environ)
def login(app, username='admin', password='admin'):
    """Submit the ``login-form`` found on ``/login/`` and return *app*.

    Asserts that submitting the form answers with a 302 redirect.
    """
    form = app.get('/login/').forms['login-form']
    form['username'] = username
    form['password'] = password
    response = form.submit()
    assert response.status_int == 302
    return app
class EmailsMocking(object):
    """Context manager capturing emails instead of sending them.

    While active, ``wcs.qommon.emails.create_smtp_server`` is replaced by
    :meth:`create_smtp_server`; captured messages are stored in
    ``self.emails``, keyed by subject.
    """

    def create_smtp_server(self, *args, **kwargs):
        """Return an SMTP stand-in that records messages in ``self.emails``."""

        class MockSmtplibSMTP(object):
            def __init__(self, emails):
                self.emails = emails

            def send_message(self, msg, msg_from, rcpts):
                return self.sendmail(msg_from, rcpts, msg.as_string())

            def sendmail(self, msg_from, rcpts, msg):
                msg = email.parser.Parser().parsestr(msg)
                subject = email.header.decode_header(msg['Subject'])[0][0]
                # a non-multipart message is handled as a single part
                parts = msg.get_payload() if msg.is_multipart() else [msg]
                payloads = [part.get_payload(decode=True) for part in parts]
                first_payload = payloads[0]
                record = {
                    'from': msg_from,
                    'to': email.header.decode_header(msg['To'])[0][0],
                    'payload': force_str(first_payload or ''),
                    'payloads': payloads,
                    'msg': msg,
                    'email_rcpt': rcpts,
                }
                self.emails[force_text(subject)] = record

            def quit(self):
                pass

        return MockSmtplibSMTP(self.emails)

    def get(self, subject):
        """Return the record captured for *subject*, or ``None``."""
        return self.emails.get(subject)

    def empty(self):
        """Forget every captured email."""
        self.emails.clear()

    def count(self):
        """Return the number of captured emails."""
        return len(self.emails)

    def __enter__(self):
        emails_module = sys.modules['wcs.qommon.emails']
        # keep the real factory so __exit__ can put it back
        self.wcs_create_smtp_server = emails_module.create_smtp_server
        emails_module.create_smtp_server = self.create_smtp_server
        self.emails = {}
        return self

    def __exit__(self, exc_type, exc_value, tb):
        del self.emails
        emails_module = sys.modules['wcs.qommon.emails']
        emails_module.create_smtp_server = self.wcs_create_smtp_server
class MockSubstitutionVariables(object):
    """Source of a fixed set of substitution variables for tests."""

    def get_substitution_variables(self):
        """Return a new dict holding the four canned variables."""
        return dict(
            bar='Foobar',
            foo='1 < 3',
            email='sub@localhost',
            empty='',
        )
class HttpRequestsMocking(object):
    """Context manager replacing ``wcs.qommon.misc._http_request``.

    While active, every request is recorded in ``self.requests`` and answered
    from a fixed table of canned responses instead of going to the network.
    """

    def __init__(self):
        self.requests = []

    def __enter__(self):
        import wcs.qommon.misc
        # keep the real function so __exit__ can restore it
        self.wcs_qommon_misc_http_request = wcs.qommon.misc._http_request
        wcs.qommon.misc._http_request = self.http_request
        return self

    def __exit__(self, exc_type, exc_value, tb):
        import wcs.qommon.misc
        wcs.qommon.misc._http_request = self.wcs_qommon_misc_http_request
        del self.wcs_qommon_misc_http_request

    def http_request(self, url, method='GET', body=None, headers={},
                     cert_file=None, timeout=None, raise_on_http_errors=False):
        """Record the request and return a canned answer for *url*.

        The answer is looked up on the URL stripped of params, query string
        and fragment; unknown URLs get an empty 200 response.  ``file://``
        URLs are answered with the content of the local file, or with status
        404 when it cannot be read.

        :returns: ``(response, status, data, None)`` where *response* exposes
            ``status_code``, ``reason``, ``data``, ``headers`` and ``length``.
        :raises ConnectionError: when the canned status is ``None``, or when
            *raise_on_http_errors* is true and the status is not 2xx.
        """
        # The shared ``headers={}`` default is only stored, never mutated.
        self.requests.append(
            {'url': url,
             'method': method,
             'body': body,
             'headers': headers,
             'timeout': timeout})

        scheme, netloc, path, params, query, fragment = urlparse.urlparse(url)
        base_url = urlparse.urlunparse((scheme, netloc, path, '', '', ''))

        with open(os.path.join(os.path.dirname(__file__), 'idp_metadata.xml')) as fd:
            metadata = fd.read()

        geojson = {
            'features': [
                {'properties': {'id': '1', 'text': 'foo'},
                 'geometry': {'type': 'Point', 'coordinates': [1, 2]}
                },
                {'properties': {'id': '2', 'text': 'bar'},
                 'geometry': {'type': 'Point', 'coordinates': [3, 4]}
                }
            ]
        }

        # The response headers get their own name so they are not confused
        # with the request headers received as a parameter.
        status, data, response_headers = {
            'http://remote.example.net/204': (204, None, None),
            'http://remote.example.net/400': (400, 'bad request', None),
            'http://remote.example.net/400-json': (400, '{"err": 1, "err_desc": ":("}', None),
            'http://remote.example.net/404': (404, 'page not found', None),
            'http://remote.example.net/404-json': (404, '{"err": 1}', None),
            'http://remote.example.net/500': (500, 'internal server error', None),
            'http://remote.example.net/json': (200, '{"foo": "bar"}', None),
            'http://remote.example.net/json-list': (200, '{"data": [{"id": "a", "text": "b"}]}', None),
            'http://remote.example.net/json-list-extra': (200, '{"data": [{"id": "a", "text": "b", "foo": "bar"}]}', None),
            'http://remote.example.net/geojson': (200, json.dumps(geojson), None),
            'http://remote.example.net/json-err0': (200, '{"data": "foo", "err": 0}', None),
            'http://remote.example.net/json-err1': (200, '{"data": "", "err": 1}', None),
            'http://remote.example.net/json-list-err1': (200, '{"data": [{"id": "a", "text": "b"}], "err": 1}', None),
            'http://remote.example.net/json-errstr': (200, '{"data": "", "err": "bug"}', None),
            'http://remote.example.net/json-errheader0': (200, '{"foo": "bar"}',
                                                          {'x-error-code': '0'}),
            'http://remote.example.net/json-errheader1': (200, '{"foo": "bar"}',
                                                          {'x-error-code': '1'}),
            'http://remote.example.net/json-errheaderstr': (200, '{"foo": "bar"}',
                                                            {'x-error-code': 'bug'}),
            'http://remote.example.net/xml': (200, '<?xml version="1.0"><foo/>',
                                              {'content-type': 'text/xml'}),
            'http://remote.example.net/xml-errheader': (200, '<?xml version="1.0"><foo/>',
                                                        {'content-type': 'text/xml', 'x-error-code': '1'}),
            'http://remote.example.net/connection-error': (None, None, None),
            'http://authentic.example.net/idp/saml2/metadata': (200, metadata, None),
        }.get(base_url, (200, '', {}))

        if url.startswith('file://'):
            try:
                # the context manager closes the file once it has been read
                with open(url[7:]) as local_file:
                    status, data = 200, local_file.read()
            except IOError:
                status = 404

        data = force_bytes(data)

        class FakeResponse(object):
            def __init__(self, status, data, headers):
                self.status_code = status
                self.reason = 'whatever'
                self.data = data
                self.headers = headers or {}
                self.length = len(data or '')

        if status is None:
            raise ConnectionError('error')

        if raise_on_http_errors and not (200 <= status < 300):
            raise ConnectionError('error in HTTP request to (status: %s)' % status)

        return FakeResponse(status, data, response_headers), status, data, None

    def get_last(self, attribute):
        """Return *attribute* of the most recently recorded request."""
        return self.requests[-1][attribute]

    def empty(self):
        """Forget every recorded request."""
        self.requests = []

    def count(self):
        """Return the number of recorded requests."""
        return len(self.requests)
class SMSMocking(wcs.qommon.sms.PasserelleSMS):
    """Context manager recording SMS messages instead of sending them.

    While active, ``wcs.qommon.sms.SMS.get_sms_class`` is replaced by
    :meth:`get_sms_class` and sent messages are appended to ``self.sms``.
    """

    def get_sms_class(self):
        """Return this object when both ``sender`` and ``passerelle_url`` are
        set in the publisher's ``sms`` configuration, ``None`` otherwise."""
        config = get_publisher().cfg.get('sms', {})
        configured = config.get('sender') and config.get('passerelle_url')
        return self if configured else None

    def send(self, sender, destinations, text):
        """Record the message in ``self.sms``."""
        self.sms.append(dict(sender=sender, destinations=destinations, text=text))

    def __enter__(self):
        self.sms = []
        sms_factory = wcs.qommon.sms.SMS
        # keep the real lookup so __exit__ can restore it
        self.wcs_get_sms_class = sms_factory.get_sms_class
        sms_factory.get_sms_class = self.get_sms_class
        return self

    def __exit__(self, exc_type, exc_value, tb):
        del self.sms
        wcs.qommon.sms.SMS.get_sms_class = self.wcs_get_sms_class