general: add tracking code object (#656)

This commit is contained in:
Frédéric Péters 2015-01-02 10:26:47 +01:00
parent 0f17fa2172
commit 402e6f65fb
7 changed files with 271 additions and 3 deletions

View File

@ -54,6 +54,7 @@ def setup_module(module):
formdef.store()
sql.do_user_table()
sql.do_tracking_code_table()
conn.close()
@ -667,3 +668,13 @@ def test_sql_criteria_ilike():
assert [x.id for x in data_class.select([st.ILike('f3', 'bar')], order_by='id')] == range(21, 51)
assert [x.id for x in data_class.select([st.ILike('f3', 'BAR')], order_by='id')] == range(21, 51)
@postgresql
def test_tracking_code():
from test_tracking_code import generic_tracking_code
generic_tracking_code(sql.TrackingCode)
@postgresql
def test_tracking_code_duplicate():
from test_tracking_code import generic_tracking_code_duplicate
generic_tracking_code_duplicate(sql.TrackingCode)

View File

@ -0,0 +1,79 @@
import shutil
from quixote import cleanup
from wcs.formdef import FormDef
from wcs.tracking_code import TrackingCode
from utilities import create_temporary_pub
def setup_module(module):
cleanup()
global pub
pub = create_temporary_pub()
def teardown_module(module):
shutil.rmtree(pub.APP_DIR)
def generic_tracking_code(klass):
klass.wipe()
code = klass()
code.store()
code = klass()
code.store()
assert klass.count() == 2
FormDef.wipe()
formdef = FormDef()
formdef.name = 'foobar'
formdef.fields = []
formdef.store()
formdata = formdef.data_class()()
formdata.store()
code = klass.get(code.id)
code.formdef = formdef
code.formdata = formdata
assert code.formdef_id == str(formdef.id)
assert code.formdata_id == str(formdata.id)
code.store()
assert klass.count() == 2
assert klass.get(code.id).formdef_id == code.formdef_id
assert klass.get(code.id).formdata_id == code.formdata_id
assert klass.get(code.id).formdata.tracking_code == code.id
def generic_tracking_code_duplicate(klass):
klass.wipe()
code = klass()
code.store()
code_id = code.id
code = klass()
real_get_new_id = klass.get_new_id
marker = {}
def fake_get_new_id(cls):
if not hasattr(cls, 'cnt'):
cls.cnt = 0
cls.cnt += 1
if cls.cnt < 5:
return code_id
else:
marker['done'] = True
return real_get_new_id(cls)
klass.get_new_id = fake_get_new_id
code.store()
klass.get_new_id = real_get_new_id
assert marker.get('done') # makes sure we got to the real new id code
assert klass.count() == 2
def test_tracking_code():
generic_tracking_code(TrackingCode)
def test_tracking_code_duplicate():
generic_tracking_code_duplicate(TrackingCode)

View File

@ -1119,6 +1119,7 @@ class SettingsDirectory(QommonSettingsDirectory):
try:
sql.get_connection(new=True)
sql.do_user_table()
sql.do_tracking_code_table()
except Exception, e:
postgresql_cfg[str('postgresql')] = postgresql_cfg
form.set_error('database', str(e))

View File

@ -122,6 +122,8 @@ class CmdConvertToSql(Command):
print 'done'.ljust(num_columns-1)
sql.do_tracking_code_table()
if errors:
error_log = file('error.log', 'w')
for formdata, trace in errors:

View File

@ -145,6 +145,7 @@ class FormData(StorableObject):
evolution = None
data = None
editable_by = None
tracking_code = None
workflow_data = None
workflow_roles = None

View File

@ -26,6 +26,7 @@ from qommon import get_cfg
import wcs.categories
import wcs.formdata
import wcs.tracking_code
import wcs.users
SQL_TYPE_MAPPING = {
@ -288,7 +289,7 @@ def do_formdef_tables(formdef, rebuild_views=False):
needed_fields = set(['id', 'user_id', 'user_hash', 'receipt_time',
'status', 'workflow_data', 'id_display', 'fts', 'page_no',
'anonymised', 'workflow_roles', 'workflow_roles_array',
'concerned_roles_array'])
'concerned_roles_array', 'tracking_code'])
# migrations
if not 'fts' in existing_fields:
@ -309,6 +310,9 @@ def do_formdef_tables(formdef, rebuild_views=False):
if not 'anonymised' in existing_fields:
cur.execute('''ALTER TABLE %s ADD COLUMN anonymised timestamptz''' % table_name)
if not 'tracking_code' in existing_fields:
cur.execute('''ALTER TABLE %s ADD COLUMN tracking_code varchar''' % table_name)
# add new fields
for field in formdef.fields:
assert field.id is not None
@ -339,10 +343,15 @@ def do_formdef_tables(formdef, rebuild_views=False):
# them even if not asked to.
redo_views(formdef)
actions = []
if not 'concerned_roles_array' in existing_fields:
return ['rebuild_security']
actions.append('rebuild_security')
if not 'tracking_code' in existing_fields:
# if tracking code has just been added to the table we need to make
# sure the tracking code table does exist.
actions.append('do_tracking_code_table')
return []
return actions
@guard_postgres
@ -404,6 +413,29 @@ def do_user_table():
cur.close()
def do_tracking_code_table():
conn, cur = get_connection_and_cursor()
table_name = 'tracking_codes'
cur.execute('''SELECT COUNT(*) FROM information_schema.tables
WHERE table_name = %s''', (table_name,))
if cur.fetchone()[0] == 0:
cur.execute('''CREATE TABLE %s (id varchar PRIMARY KEY,
formdef_id varchar,
formdata_id varchar)''' % table_name)
cur.execute('''SELECT column_name FROM information_schema.columns
WHERE table_name = %s''', (table_name,))
existing_fields = set([x[0] for x in cur.fetchall()])
needed_fields = set(['id', 'formdef_id', 'formdata_id'])
# delete obsolete fields
for field in (existing_fields - needed_fields):
cur.execute('''ALTER TABLE %s DROP COLUMN %s''' % (table_name, field))
conn.commit()
cur.close()
@guard_postgres
def redo_views(formdef):
if get_publisher().get_site_option('postgresql_views') == 'false':
@ -793,6 +825,7 @@ class SqlFormData(SqlMixin, wcs.formdata.FormData):
('workflow_roles', 'bytea'),
('workflow_roles_array', 'text[]'),
('concerned_roles_array', 'text[]'),
('tracking_code', 'varchar'),
]
def __init__(self, id=None):
@ -847,6 +880,7 @@ class SqlFormData(SqlMixin, wcs.formdata.FormData):
'workflow_data': bytearray(cPickle.dumps(self.workflow_data)),
'id_display': self.id_display,
'anonymised': self.anonymised,
'tracking_code': self.tracking_code,
}
if self.receipt_time:
sql_dict['receipt_time'] = datetime.datetime.fromtimestamp(time.mktime(self.receipt_time)),
@ -923,6 +957,8 @@ class SqlFormData(SqlMixin, wcs.formdata.FormData):
evo._sql_id = cur.fetchone()[0]
fts_strings = [str(self.id)]
if self.tracking_code:
fts_strings.append(self.tracking_code)
for field in self._formdef.fields:
if not self.data.get(field.id):
continue
@ -1075,6 +1111,10 @@ class SqlFormData(SqlMixin, wcs.formdata.FormData):
cur.close()
rebuild_security = classmethod(rebuild_security)
def do_tracking_code_table(cls):
do_tracking_code_table()
do_tracking_code_table = classmethod(do_tracking_code_table)
class SqlUser(SqlMixin, wcs.users.User):
_table_name = 'users'
@ -1242,6 +1282,67 @@ class SqlUser(SqlMixin, wcs.users.User):
get_users_with_role = classmethod(get_users_with_role)
class TrackingCode(SqlMixin, wcs.tracking_code.TrackingCode):
_table_name = 'tracking_codes'
_table_static_fields = [
('id', 'varchar'),
('formdef_id', 'varchar'),
('formdata_id', 'varchar'),
]
id = None
@guard_postgres
def store(self):
sql_dict = {
'id': self.id,
'formdef_id': self.formdef_id,
'formdata_id': self.formdata_id
}
conn, cur = get_connection_and_cursor()
if not self.id:
column_names = sql_dict.keys()
sql_dict['id'] = self.get_new_id()
sql_statement = '''INSERT INTO %s (%s)
VALUES (%s)
RETURNING id''' % (
self._table_name,
', '.join(column_names),
', '.join(['%%(%s)s' % x for x in column_names]))
while True:
try:
cur.execute(sql_statement, sql_dict)
except psycopg2.IntegrityError:
conn.rollback()
sql_dict['id'] = self.get_new_id()
else:
break
self.id = cur.fetchone()[0]
else:
column_names = sql_dict.keys()
sql_dict['id'] = self.id
sql_statement = '''UPDATE %s SET %s WHERE id = %%(id)s RETURNING id''' % (
self._table_name,
', '.join(['%s = %%(%s)s' % (x,x) for x in column_names]))
cur.execute(sql_statement, sql_dict)
if cur.fetchone() is None:
raise AssertionError()
conn.commit()
cur.close()
def _row2ob(cls, row):
o = cls()
(o.id, o.formdata_id, o.formdef_id) = tuple(row[:3])
return o
_row2ob = classmethod(_row2ob)
def get_data_fields(cls):
return []
get_data_fields = classmethod(get_data_fields)
@guard_postgres
def get_weekday_totals(period_start=None, period_end=None):
conn, cur = get_connection_and_cursor()

73
wcs/tracking_code.py Normal file
View File

@ -0,0 +1,73 @@
# w.c.s. - web application for online forms
# Copyright (C) 2005-2015 Entr'ouvert
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, see <http://www.gnu.org/licenses/>.
import hashlib
import random
import string
from quixote import get_publisher
from qommon.storage import StorableObject
from formdef import FormDef
class TrackingCode(StorableObject):
_names = 'tracking-codes'
id = None
formdef_id = None
formdata_id = None
CHARS = 'BCDFGHJKLMNPQRSTVWXZ'
SIZE = 8
def __init__(self):
# do not call to StorableObject.__init__ as we don't want to have
# self.id set at this point.
pass
@classmethod
def get_new_id(cls, create=False):
r = random.SystemRandom()
return ''.join([r.choice(cls.CHARS) for x in range(cls.SIZE)])
def store(self, async=False):
if self.id is None:
while True:
self.id = self.get_new_id()
if not self.has_key(self.id):
break
StorableObject.store(self, async=async)
@property
def formdef(self):
return FormDef.get(self.formdef_id)
@formdef.setter
def formdef(self, value):
self.formdef_id = str(value.id)
@property
def formdata(self):
return self.formdef.data_class().get(self.formdata_id)
@formdata.setter
def formdata(self, value):
self.formdef = value.formdef
self.formdata_id = str(value.id)
if not self.id:
self.store()
value.tracking_code = self.id
value.store()