wcs/tests/utilities.py

409 lines
14 KiB
Python

import cPickle
import email.header
import email.parser
import os
import tempfile
import random
import psycopg2
import pytest
import shutil
import sys
import threading
import urlparse
from wcs import sql, sessions
from webtest import TestApp
from quixote import cleanup, get_publisher
from django.conf import settings
import wcs
import wcs.wsgi
from wcs import publisher, compat
from wcs.qommon.http_request import HTTPRequest
from wcs.users import User
from wcs.tracking_code import TrackingCode
import wcs.qommon.emails
import wcs.qommon.sms
import qommon.sms
from qommon.errors import ConnectionError
import wcs.middleware
# Switch off the ASYNC flag of the after-jobs middleware as soon as this
# module is imported.
# NOTE(review): presumably this makes after-jobs run inline so tests can
# observe their effects deterministically -- confirm against wcs.middleware.
wcs.middleware.AfterJobsMiddleware.ASYNC = False
class KnownElements(object):
    """Bookkeeping for the temporary resources created by the test helpers.

    Every attribute starts out as None and is filled in the first time the
    matching kind of publisher is created, so later calls can reuse it.
    """

    # Application directories, one per publisher flavour.
    pickle_app_dir = sql_app_dir = templates_app_dir = lazy_app_dir = None

    # Name of the PostgreSQL database created for SQL-mode publishers.
    sql_db_name = None
# Module-level registry instance; create_temporary_pub() records what it
# created here, and clean_temporary_pub() / force_connections_close() read it.
known_elements = KnownElements()
def create_temporary_pub(sql_mode=False, templates_mode=False, lazy_mode=False):
    """Create, or reuse, a publisher rooted in a temporary application directory.

    One application directory is kept per flavour (templates, lazy, sql,
    plain pickle) in ``known_elements``; the first call for a flavour creates
    it and later calls reuse it.  ``site-options.cfg`` and the publisher
    configuration are rewritten on every call.

    :param sql_mode: use the SQL-backed user/tracking code/session classes;
        on first creation a PostgreSQL database is created and recorded in
        ``known_elements.sql_db_name``.
    :param templates_mode: select the 'django' branding theme instead of
        'default'.
    :param lazy_mode: write ``force-lazy-mode = true`` in the site options.
    :returns: the configured publisher.
    """
    if sql_mode is True:
        if pytest.config.getoption('without_postgresql_tests'):
            # pytest.skip() raises, so nothing after it would ever run.
            pytest.skip("unsupported configuration")

    if get_publisher():
        get_publisher().cleanup()
        cleanup()

    if templates_mode and known_elements.templates_app_dir:
        APP_DIR = known_elements.templates_app_dir
    elif lazy_mode and known_elements.lazy_app_dir:
        APP_DIR = known_elements.lazy_app_dir
    elif sql_mode and known_elements.sql_app_dir:
        APP_DIR = known_elements.sql_app_dir
    elif not (templates_mode or lazy_mode or sql_mode) and known_elements.pickle_app_dir:
        APP_DIR = known_elements.pickle_app_dir
    else:
        APP_DIR = tempfile.mkdtemp()
        if templates_mode:
            known_elements.templates_app_dir = APP_DIR
        elif lazy_mode:
            known_elements.lazy_app_dir = APP_DIR
        elif sql_mode:
            known_elements.sql_app_dir = APP_DIR
        else:
            known_elements.pickle_app_dir = APP_DIR

    compat.CompatWcsPublisher.APP_DIR = APP_DIR
    compat.CompatWcsPublisher.DATA_DIR = os.path.abspath(
            os.path.join(os.path.dirname(wcs.__file__), '..', 'data'))
    compat.CompatWcsPublisher.cronjobs = None
    pub = compat.CompatWcsPublisher.create_publisher()
    # allow saving the user
    pub.app_dir = os.path.join(APP_DIR, 'example.net')
    pub.site_charset = 'utf-8'

    if sql_mode:
        pub.user_class = sql.SqlUser
        pub.tracking_code_class = sql.TrackingCode
        pub.session_class = sql.Session
        pub.is_using_postgresql = lambda: True
    else:
        pub.user_class = User
        pub.tracking_code_class = TrackingCode
        pub.session_class = sessions.BasicSession
        pub.is_using_postgresql = lambda: False

    pub.session_manager_class = sessions.StorageSessionManager
    pub.session_manager = pub.session_manager_class(session_class=pub.session_class)

    # remove scripts left over by a previous use of the directory
    if os.path.exists(os.path.join(pub.APP_DIR, 'scripts')):
        shutil.rmtree(os.path.join(pub.APP_DIR, 'scripts'))
    if os.path.exists(os.path.join(pub.app_dir, 'scripts')):
        shutil.rmtree(os.path.join(pub.app_dir, 'scripts'))

    created = False
    if not os.path.exists(pub.app_dir):
        os.mkdir(pub.app_dir)
        created = True

    # always reset site-options.cfg; the file is closed (hence flushed) right
    # away so that it is complete on disk before anything reloads the site
    # options, and so the handle is not leaked on the early return below.
    with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
        fd.write('[wscall-secrets]\n')
        fd.write('idp.example.net = BAR\n')
        fd.write('\n')
        fd.write('[options]\n')
        fd.write('formdef-captcha-option = true\n')
        fd.write('formdef-appearance-keywords = true\n')
        fd.write('workflow-resubmit-action = true\n')
        if lazy_mode:
            fd.write('force-lazy-mode = true\n')
        if sql_mode:
            fd.write('postgresql = true\n')

    # make sure site options are not cached
    pub.site_options = None

    pub.cfg = {}
    pub.cfg['misc'] = {
        'charset': 'utf-8',
        'backoffice-url': 'http://example.net/backoffice',
        'frontoffice-url': 'http://example.net',
    }
    pub.cfg['language'] = {'language': 'en'}
    if templates_mode:
        pub.cfg['branding'] = {'theme': 'django'}
    else:
        pub.cfg['branding'] = {'theme': 'default'}
    pub.write_cfg()

    if not created:
        # reused directory: the database (if any) already exists, only the
        # configuration pointing at it has to be restored.
        if sql_mode:
            pub.cfg['postgresql'] = {'database': known_elements.sql_db_name, 'user': os.environ['USER']}
            pub.write_cfg()
        return pub

    os.symlink(os.path.join(os.path.dirname(__file__), 'templates'),
            os.path.join(pub.app_dir, 'templates'))

    if sql_mode:
        conn = psycopg2.connect(user=os.environ['USER'])
        try:
            # CREATE DATABASE cannot run inside a transaction block
            conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
            cur = conn.cursor()
            dbname = 'wcstests%d' % random.randint(0, 100000)
            known_elements.sql_db_name = dbname
            cur.execute('CREATE DATABASE %s' % dbname)
            cur.close()

            pub.cfg['postgresql'] = {'database': dbname, 'user': os.environ['USER']}
            pub.write_cfg()

            sql.do_user_table()
            sql.do_tracking_code_table()
            sql.do_session_table()
            sql.do_meta_table()
        finally:
            # release the administrative connection even if setup failed
            conn.close()

    return pub
def clean_temporary_pub():
    """Remove everything create_temporary_pub() left behind.

    Cleans up the current publisher (if any), deletes every application
    directory recorded in ``known_elements`` (including the lazy-mode one)
    and drops the test PostgreSQL database when one was created.  Errors
    raised by PostgreSQL while dropping the database are printed, not raised.
    """
    if get_publisher():
        get_publisher().cleanup()

    for attribute in ('templates_app_dir', 'pickle_app_dir',
                      'sql_app_dir', 'lazy_app_dir'):
        app_dir = getattr(known_elements, attribute)
        if app_dir and os.path.exists(app_dir):
            shutil.rmtree(app_dir)
            setattr(known_elements, attribute, None)

    if known_elements.sql_db_name:
        conn = psycopg2.connect(user=os.environ['USER'])
        try:
            # DROP DATABASE cannot run inside a transaction block
            conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
            try:
                cur = conn.cursor()
                cur.execute('DROP DATABASE %s' % known_elements.sql_db_name)
                cur.close()
            except psycopg2.Error as e:
                # best effort: report the failure but carry on with cleanup
                print(e)
        finally:
            conn.close()
        known_elements.sql_db_name = None
def force_connections_close():
    """Terminate every other backend connected to the test database.

    Does nothing when no test database has been recorded in
    ``known_elements``.  ``psycopg2.ProgrammingError`` is deliberately
    ignored; the connection used to issue the request is always closed.
    """
    # workaround for threading issue
    # https://code.djangoproject.com/ticket/22420
    if not known_elements.sql_db_name:
        return
    try:
        conn = psycopg2.connect(user=os.environ['USER'])
        try:
            cur = conn.cursor()
            cur.execute('''SELECT pg_terminate_backend(pg_stat_activity.pid)
                             FROM pg_stat_activity
                            WHERE pg_stat_activity.datname = %s
                              AND pid <> pg_backend_pid();''', (known_elements.sql_db_name,))
            cur.close()
        finally:
            # do not leave our own connection lingering on the server
            conn.close()
    except psycopg2.ProgrammingError:
        pass
def get_app(pub, https=False):
    """Return a webtest application wired to the w.c.s. WSGI entry point.

    Requests are issued for host ``example.net`` from ``127.0.0.1``; the
    ``HTTPS`` environment key is 'on' or 'off' according to ``https``.  When
    ``https`` is set, Django's SECURE_PROXY_SSL_HEADER setting is also
    pointed at that key.  ``pub`` is accepted but not used here.
    """
    if https:
        settings.SECURE_PROXY_SSL_HEADER = ('HTTPS', 'on')
    environ = {
        'HTTP_HOST': 'example.net',
        'REMOTE_ADDR': '127.0.0.1',
        'HTTPS': 'on' if https else 'off',
    }
    return TestApp(wcs.wsgi.application, extra_environ=environ)
def login(app, username='admin', password='admin'):
    """Log in through the /login/ form and return the same ``app``.

    Fills the 'login-form' form with the given credentials, submits it and
    asserts that the answer is a redirect (HTTP 302).
    """
    form = app.get('/login/').forms['login-form']
    form['username'] = username
    form['password'] = password
    response = form.submit()
    assert response.status_int == 302
    return app
class EmailsMocking(object):
    """Context manager that captures outgoing emails instead of sending them.

    While active, ``create_smtp_server`` of both 'wcs.qommon.emails' and
    'qommon.emails' is replaced by a factory returning a fake SMTP object;
    messages handed to it are stored in ``self.emails`` keyed by subject.
    """

    def create_smtp_server(self, *args, **kwargs):
        """Return a fake SMTP connection recording into ``self.emails``."""

        class MockSmtplibSMTP(object):
            def __init__(self, emails):
                self.emails = emails

            def sendmail(self, msg_from, rcpts, msg):
                # parse the raw message to expose subject, recipient, parts
                parsed = email.parser.Parser().parsestr(msg)
                subject = email.header.decode_header(parsed['Subject'])[0][0]
                if parsed.is_multipart():
                    payloads = [part.get_payload(decode=True)
                                for part in parsed.get_payload()]
                else:
                    payloads = [parsed.get_payload(decode=True)]
                first_payload = payloads[0]
                # a later message with the same subject replaces the earlier one
                self.emails[subject] = {
                    'from': msg_from,
                    'to': email.header.decode_header(parsed['To'])[0][0],
                    'payload': first_payload,
                    'payloads': payloads,
                    'msg': parsed,
                    'email_rcpt': rcpts,
                }

            def close(self):
                pass

        return MockSmtplibSMTP(self.emails)

    def get(self, subject):
        """Return the recorded email with that subject, or None."""
        return self.emails.get(subject)

    def empty(self):
        """Forget every recorded email."""
        self.emails.clear()

    def count(self):
        """Return the number of recorded emails (one per distinct subject)."""
        return len(self.emails)

    def __enter__(self):
        # remember the real factories, then install ours in both modules
        wcs_emails = sys.modules['wcs.qommon.emails']
        self.wcs_create_smtp_server = wcs_emails.create_smtp_server
        qommon_emails = sys.modules['qommon.emails']
        self.qommon_create_smtp_server = qommon_emails.create_smtp_server
        wcs_emails.create_smtp_server = self.create_smtp_server
        qommon_emails.create_smtp_server = self.create_smtp_server
        self.emails = {}
        return self

    def __exit__(self, exc_type, exc_value, tb):
        del self.emails
        # put the real factories back
        wcs_emails = sys.modules['wcs.qommon.emails']
        wcs_emails.create_smtp_server = self.wcs_create_smtp_server
        qommon_emails = sys.modules['qommon.emails']
        qommon_emails.create_smtp_server = self.qommon_create_smtp_server
class MockSubstitutionVariables(object):
    """Minimal substitution variables source with a fixed set of values."""

    def get_substitution_variables(self):
        """Return a fresh dictionary of the four canned variables."""
        return dict(
            bar='Foobar',
            foo='1 < 3',
            email='sub@localhost',
            empty='',
        )
class HttpRequestsMocking(object):
def __init__(self):
self.requests = []
def __enter__(self):
import wcs.qommon.misc
import qommon.misc
self.wcs_qommon_misc_http_request = wcs.qommon.misc._http_request
self.qommon_misc_http_request = qommon.misc._http_request
wcs.qommon.misc._http_request = self.http_request
qommon.misc._http_request = self.http_request
return self
def __exit__(self, exc_type, exc_value, tb):
import wcs.qommon.misc
import qommon.misc
wcs.qommon.misc._http_request = self.wcs_qommon_misc_http_request
qommon.misc._http_request = self.qommon_misc_http_request
del self.wcs_qommon_misc_http_request
del self.qommon_misc_http_request
def http_request(self, url, method='GET', body=None, headers={},
cert_file=None, timeout=None, raise_on_http_errors=False):
self.requests.append(
{'url': url,
'method': method,
'body': body,
'headers': headers,
'timeout': timeout})
scheme, netloc, path, params, query, fragment = urlparse.urlparse(url)
base_url = urlparse.urlunparse((scheme, netloc, path, '', '', ''))
status, data, headers = {
'http://remote.example.net/204': (204, None, None),
'http://remote.example.net/400-json': (400, '{"err": 1, "err_desc": ":("}', None),
'http://remote.example.net/404': (404, 'page not found', None),
'http://remote.example.net/404-json': (404, '{"err": 1}', None),
'http://remote.example.net/500': (500, 'internal server error', None),
'http://remote.example.net/json': (200, '{"foo": "bar"}', None),
'http://remote.example.net/json-list': (200, '{"data": [{"id": "a", "text": "b"}]}', None),
'http://remote.example.net/json-err0': (200, '{"data": "foo", "err": 0}', None),
'http://remote.example.net/json-err1': (200, '{"data": "", "err": 1}', None),
'http://remote.example.net/json-errstr': (200, '{"data": "", "err": "bug"}', None),
'http://remote.example.net/json-errheader0': (200, '{"foo": "bar"}',
{'x-error-code': '0'}),
'http://remote.example.net/json-errheader1': (200, '{"foo": "bar"}',
{'x-error-code': '1'}),
'http://remote.example.net/json-errheaderstr': (200, '{"foo": "bar"}',
{'x-error-code': 'bug'}),
'http://remote.example.net/xml': (200, '<?xml version="1.0"><foo/>',
{'content-type': 'text/xml'}),
'http://remote.example.net/xml-errheader': (200, '<?xml version="1.0"><foo/>',
{'content-type': 'text/xml', 'x-error-code': '1'}),
'http://remote.example.net/connection-error': (None, None, None),
'http://authentic.example.net/idp/saml2/metadata': (
200, open(os.path.join(os.path.dirname(__file__), 'idp_metadata.xml')).read(), None),
}.get(base_url, (200, '', {}))
if url.startswith('file://'):
try:
status, data = 200, open(url[7:]).read()
except IOError:
status = 404
class FakeResponse(object):
def __init__(self, status, data, headers):
self.status_code = status
self.reason = 'whatever'
self.data = data
self.headers = headers or {}
self.length = len(data or '')
if status is None:
raise ConnectionError('error')
if raise_on_http_errors and not (200 <= status < 300):
raise ConnectionError('error in HTTP request to (status: %s)' % status)
return FakeResponse(status, data, headers), status, data, None
def get_last(self, attribute):
return self.requests[-1][attribute]
def empty(self):
self.requests = []
def count(self):
return len(self.requests)
class SMSMocking(wcs.qommon.sms.MobytSMS):
    """Context manager that records SMS in ``self.sms`` instead of sending them.

    While active, ``SMS.get_sms_class`` of both ``wcs.qommon.sms`` and
    ``qommon.sms`` is replaced so that this object is returned as the SMS
    backend for any mode other than 'none'.
    """

    def get_sms_class(self, mode):
        """Return this object as backend, or None for the 'none' mode."""
        return None if mode == 'none' else self

    def send(self, sender, destinations, text, quality=None):
        """Record the message; ``quality`` is accepted but ignored."""
        record = {'sender': sender, 'destinations': destinations, 'text': text}
        self.sms.append(record)

    def get_sms_left(self, type='standard'):
        raise NotImplementedError

    def get_money_left(self):
        raise NotImplementedError

    def __enter__(self):
        self.sms = []
        # remember the real lookups, then install ours on both SMS classes
        wcs_sms = wcs.qommon.sms.SMS
        self.wcs_get_sms_class = wcs_sms.get_sms_class
        qommon_sms = qommon.sms.SMS
        self.qommon_get_sms_class = qommon_sms.get_sms_class
        wcs_sms.get_sms_class = self.get_sms_class
        qommon_sms.get_sms_class = self.get_sms_class
        return self

    def __exit__(self, exc_type, exc_value, tb):
        del self.sms
        # put the real lookups back
        wcs.qommon.sms.SMS.get_sms_class = self.wcs_get_sms_class
        qommon.sms.SMS.get_sms_class = self.qommon_get_sms_class