wcs/tests/utilities.py

262 lines
8.2 KiB
Python

import cPickle
import email.header
import email.parser
import os
import tempfile
import random
import psycopg2
import pytest
import shutil
import threading
from wcs import sql
from webtest import TestApp
from quixote import cleanup, get_publisher
import wcs
from wcs import publisher
from wcs.qommon.http_request import HTTPRequest
from wcs.users import User
from wcs.tracking_code import TrackingCode
import wcs.qommon.sms
import qommon.sms
class QWIP:
    """WSGI application wrapping a publisher.

    Copy of the original quixote QWIP code, adapted to use our own
    HTTPRequest object and to process after jobs once the response body
    has been produced.
    """

    # Class used to build the request object handed to the publisher.
    request_class = HTTPRequest

    def __init__(self, publisher):
        """Remember the publisher that will process every request."""
        self.publisher = publisher

    def __call__(self, env, start_response):
        """I am called for each request.

        `env` is the WSGI environment and `start_response` the WSGI
        callable receiving the status line and headers.  The list of body
        chunks is returned.
        """
        # Rebuild REQUEST_URI from its two components when it is absent.
        if 'REQUEST_URI' not in env:
            env['REQUEST_URI'] = env['SCRIPT_NAME'] + env['PATH_INFO']

        request = self.request_class(env['wsgi.input'], env)
        response = self.publisher.process_request(request)

        status_line = "%03d %s" % (response.status_code, response.reason_phrase)
        start_response(status_line, response.generate_headers())

        # Consume the whole body before running the after jobs, so that they
        # are executed only once the content has been generated.
        body_chunks = list(response.generate_body_chunks())
        response.process_after_jobs()
        return body_chunks
class KnownElements(object):
    """Record of the temporary resources created for the tests.

    Every attribute starts as None, meaning "nothing created yet".
    """

    # pickle_app_dir: application directory used when sql_mode is False.
    # sql_app_dir: application directory used when sql_mode is True.
    # sql_db_name: name of the PostgreSQL database created for the tests.
    pickle_app_dir = sql_app_dir = sql_db_name = None
# Module-level record shared by create_temporary_pub() (which fills and reuses
# it) and clean_temporary_pub() (which removes what it references).
known_elements = KnownElements()
def create_temporary_pub(sql_mode=False):
    """Create (or reuse) a publisher working in a temporary directory.

    The application directory is created with tempfile.mkdtemp() the first
    time and remembered in `known_elements`, one directory per mode, so that
    later calls reuse it.  When the 'example.net' site directory already
    exists, the configuration is reset and the publisher returned right
    away; otherwise the directory is created and, in SQL mode, a new
    PostgreSQL database is created and configured.

    sql_mode -- when true, use the SQL user and tracking code classes and a
                PostgreSQL database; the test is skipped if the
                'without_postgresql_tests' option is set.

    Returns the publisher object.
    """
    # NOTE(review): the `pytest.config` global was removed in pytest 5.0, so
    # this helper needs an older pytest -- confirm before upgrading.
    if sql_mode is True and pytest.config.getoption('without_postgresql_tests'):
        # pytest.skip() raises, nothing below runs for a skipped test.
        pytest.skip("unsupported configuration")

    # Get rid of any publisher left over by a previous test.
    if get_publisher():
        get_publisher().cleanup()
        cleanup()

    if sql_mode is False and known_elements.pickle_app_dir:
        APP_DIR = known_elements.pickle_app_dir
    elif sql_mode is True and known_elements.sql_app_dir:
        APP_DIR = known_elements.sql_app_dir
    else:
        APP_DIR = tempfile.mkdtemp()
        if sql_mode is True:
            known_elements.sql_app_dir = APP_DIR
        elif sql_mode is False:
            known_elements.pickle_app_dir = APP_DIR

    publisher.WcsPublisher.APP_DIR = APP_DIR
    publisher.WcsPublisher.DATA_DIR = os.path.abspath(
        os.path.join(os.path.dirname(wcs.__file__), '..', 'data'))
    pub = publisher.WcsPublisher.create_publisher()
    # allow saving the user
    pub.app_dir = os.path.join(APP_DIR, 'example.net')

    if sql_mode:
        pub.user_class = sql.SqlUser
        pub.tracking_code_class = sql.TrackingCode
        pub.is_using_postgresql = lambda: True
    else:
        pub.user_class = User
        pub.tracking_code_class = TrackingCode
        pub.is_using_postgresql = lambda: False

    if os.path.exists(pub.app_dir):
        # Site directory left by an earlier call: start again from an empty
        # configuration, pointing to the database already created if any.
        pub.cfg = {}
        if sql_mode:
            pub.cfg['postgresql'] = {'database': known_elements.sql_db_name, 'user': os.environ['USER']}
        pub.write_cfg()
        return pub

    os.mkdir(pub.app_dir)

    if sql_mode:
        options_path = os.path.join(pub.app_dir, 'site-options.cfg')
        # The context manager closes the file even if the write fails.
        with open(options_path, 'w') as fd:
            fd.write('[options]\npostgresql = true\n')

        conn = psycopg2.connect(user=os.environ['USER'])
        try:
            # CREATE DATABASE cannot run inside a transaction block.
            conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
            cur = conn.cursor()
            dbname = 'wcstests%d' % random.randint(0, 100000)
            # Recorded before the creation so clean_temporary_pub() will
            # attempt to drop it whatever happens next.
            known_elements.sql_db_name = dbname
            cur.execute('CREATE DATABASE %s' % dbname)
            cur.close()

            pub.cfg['postgresql'] = {'database': dbname, 'user': os.environ['USER']}
            pub.write_cfg()

            sql.do_user_table()
            sql.do_tracking_code_table()
        finally:
            # Do not leak the administrative connection when a step fails.
            conn.close()

    return pub
def clean_temporary_pub():
    """Remove everything recorded in `known_elements`.

    The current publisher (if any) is cleaned up, the temporary application
    directories are deleted and the test database is dropped.  Each entry of
    `known_elements` is reset to None once handled.
    """
    if get_publisher():
        get_publisher().cleanup()

    if known_elements.pickle_app_dir:
        shutil.rmtree(known_elements.pickle_app_dir)
        known_elements.pickle_app_dir = None

    if known_elements.sql_app_dir:
        shutil.rmtree(known_elements.sql_app_dir)
        known_elements.sql_app_dir = None

    if known_elements.sql_db_name:
        conn = psycopg2.connect(user=os.environ['USER'])
        try:
            # DROP DATABASE cannot run inside a transaction block.
            conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
            try:
                cur = conn.cursor()
                cur.execute('DROP DATABASE %s' % known_elements.sql_db_name)
                cur.close()
            except psycopg2.Error:
                # Dropping the database is deliberately best effort: a
                # failure here must not make the test session fail.
                pass
        finally:
            # Always release the administrative connection.
            conn.close()
        known_elements.sql_db_name = None
def get_app(pub):
    """Return a webtest TestApp serving `pub` through the QWIP wrapper.

    Every request is sent with HTTP_HOST 'example.net' and REMOTE_ADDR
    '127.0.0.1' in its environment.
    """
    wsgi_app = QWIP(pub)
    default_environ = {'HTTP_HOST': 'example.net', 'REMOTE_ADDR': '127.0.0.1'}
    return TestApp(wsgi_app, extra_environ=default_environ)
def login(app, username='admin', password='admin'):
    """Log in through the '/login/' page and return `app`.

    The form named 'login-form' is filled with the given credentials and
    submitted; the submission is expected to answer with a 302 redirect,
    otherwise an AssertionError is raised.
    """
    form = app.get('/login/').forms['login-form']
    for field_name, value in (('username', username), ('password', password)):
        form[field_name] = value
    response = form.submit()
    assert response.status_int == 302
    return app
class EmailsMocking(object):
    """Capture emails instead of sending them.

    Creating an instance replaces `create_smtp_server` in both
    wcs.qommon.emails and qommon.emails; the messages handed to the fake
    server are stored in `self.emails`, indexed by subject (a later message
    with the same subject replaces the earlier one).
    """

    def __init__(self):
        self.emails = {}
        import wcs.qommon.emails
        import qommon.emails
        # The module may be loaded under both names, patch the two of them.
        for patched_module in (wcs.qommon.emails, qommon.emails):
            patched_module.create_smtp_server = self.create_smtp_server

    def create_smtp_server(self, *args, **kwargs):
        """Return a fake SMTP server recording messages in `self.emails`.

        Arguments are accepted and ignored.
        """

        class MockSmtplibSMTP(object):
            def __init__(self, emails):
                self.emails = emails

            def sendmail(self, msg_from, rcpts, msg):
                parsed = email.parser.Parser().parsestr(msg)
                subject = email.header.decode_header(parsed['Subject'])[0][0]
                # For multipart messages only the first part is kept.
                if parsed.is_multipart():
                    body_part = parsed.get_payload()[0]
                else:
                    body_part = parsed
                payload = body_part.get_payload(decode=True)
                recipient = email.header.decode_header(parsed['To'])[0][0]
                self.emails[subject] = {
                    'from': msg_from,
                    'to': recipient,
                    'payload': payload,
                    'email_rcpt': rcpts,
                }

            def close(self):
                pass

        return MockSmtplibSMTP(self.emails)

    def get(self, subject):
        """Return the email recorded for `subject`, or None."""
        return self.emails.get(subject)

    def empty(self):
        """Forget all recorded emails (the dictionary is emptied in place)."""
        self.emails.clear()

    def count(self):
        """Return the number of recorded emails."""
        return len(self.emails)
# Instantiated at import time: from then on create_smtp_server is replaced in
# wcs.qommon.emails and qommon.emails, and messages are recorded here.
emails = EmailsMocking()
class MockSubstitutionVariables(object):
    """Fixed source of substitution variables for tests."""

    def get_substitution_variables(self):
        """Return a new dictionary with the four canned variables."""
        return dict(
            bar='Foobar',
            foo='1 < 3',
            email='sub@localhost',
            empty='',
        )
class HttpRequestsMocking(object):
    """Record HTTP requests and answer them with canned responses.

    Creating an instance replaces `_http_request` in both wcs.qommon.misc
    and qommon.misc by the `http_request` method; every call is appended to
    `self.requests`.
    """

    def __init__(self):
        self.requests = []
        import wcs.qommon.misc
        import qommon.misc
        wcs.qommon.misc._http_request = self.http_request
        qommon.misc._http_request = self.http_request

    def http_request(self, url, method='GET', body=None, headers=None, timeout=None):
        """Record the request and return a canned response for `url`.

        Returns a 4-tuple (response object, status, data, None); URLs that
        are not in the table below get a 200 status with empty data.
        """
        # A new dictionary for each call: a shared default would be stored in
        # every recorded request, and changing it through one record would
        # silently alter all the other ones.
        if headers is None:
            headers = {}
        self.requests.append(
            {'url': url,
             'method': method,
             'body': body,
             'headers': headers,
             'timeout': timeout})

        status, data = {
            'http://remote.example.net/404': (404, 'page not found'),
            'http://remote.example.net/500': (500, 'internal server error'),
            'http://remote.example.net/json': (200, '{"foo": "bar"}'),
            'http://remote.example.net/xml': (200, '<?xml version="1.0"><foo/>'),
        }.get(url, (200, ''))

        class FakeResponse(object):
            def __init__(self, status, data):
                self.status = status
                self.reason = 'whatever'
                self.data = data

        return FakeResponse(status, data), status, data, None

    def get_last(self, attribute):
        """Return `attribute` ('url', 'method', 'body', 'headers' or
        'timeout') of the last recorded request.

        Raises IndexError when no request has been recorded.
        """
        return self.requests[-1][attribute]

    def empty(self):
        """Forget all recorded requests."""
        self.requests = []
# Instantiated at import time: from then on _http_request is replaced in
# wcs.qommon.misc and qommon.misc, and requests are recorded here.
http_requests = HttpRequestsMocking()
class SMSMocking(wcs.qommon.sms.MobytSMS):
    """Record SMS instead of sending them.

    Creating an instance replaces `get_sms_class` on both
    wcs.qommon.sms.SMS and qommon.sms.SMS so that it returns this very
    instance; messages passed to send() are stored in `self.sms`.
    """

    def __init__(self):
        def _this_instance(x, y):
            # Whatever the class and argument received, hand back the mock.
            return self

        for sms_base in (wcs.qommon.sms.SMS, qommon.sms.SMS):
            sms_base.get_sms_class = classmethod(_this_instance)
        self.sms = []

    def send(self, sender, destinations, text, quality=None):
        """Record the message; `quality` is accepted and ignored."""
        record = {'sender': sender, 'destinations': destinations, 'text': text}
        self.sms.append(record)

    def empty(self):
        """Forget all recorded messages."""
        self.sms = []

    def get_sms_left(self, type='standard'):
        """Not available on the mock."""
        raise NotImplementedError

    def get_money_left(self):
        """Not available on the mock."""
        raise NotImplementedError
# Instantiated at import time: from then on get_sms_class of
# wcs.qommon.sms.SMS and qommon.sms.SMS returns this object.
sms_mocking = SMSMocking()