Add support for storing form data in a PostgreSQL database
This commit is contained in:
parent
9cfdec8603
commit
598fcbd020
|
@ -210,7 +210,7 @@ class SettingsDirectory(QommonSettingsDirectory):
|
|||
'template', 'misc', 'emails', 'debug_options', 'language',
|
||||
('import', 'p_import'), 'export', 'identification', 'sitename',
|
||||
'sms', 'certificates', 'texts', 'utf8switch', 'upload_theme',
|
||||
'session', 'download_theme', 'smstest']
|
||||
'session', 'download_theme', 'smstest', 'postgresql']
|
||||
|
||||
certificates = CertificatesDirectory()
|
||||
emails = EmailsDirectory()
|
||||
|
@ -223,6 +223,14 @@ class SettingsDirectory(QommonSettingsDirectory):
|
|||
|
||||
'<div class="splitcontent-left">'
|
||||
|
||||
if get_publisher().has_site_option('postgresql'):
|
||||
'<div class="bo-block">'
|
||||
'<h2>%s</h2>' % _('Storage')
|
||||
'<dl> <dt><a href="postgresql">%s</a></dt> <dd>%s</dd> </dl>' % (
|
||||
_('PostgreSQL Settings'),
|
||||
_('Configure access to PostgreSQL database'))
|
||||
'</div>'
|
||||
|
||||
'<div class="bo-block">'
|
||||
'<h2>%s</h2>' % _('Security')
|
||||
|
||||
|
@ -856,3 +864,42 @@ class SettingsDirectory(QommonSettingsDirectory):
|
|||
|
||||
redirect('.')
|
||||
|
||||
def postgresql [html] (self):
|
||||
if not get_publisher().has_site_option('postgresql'):
|
||||
raise errors.TraversalError()
|
||||
postgresql_cfg = get_cfg('postgresql', {})
|
||||
form = Form(enctype='multipart/form-data')
|
||||
form.add(StringWidget, 'dbname',
|
||||
title=_('Database Name'), required=True,
|
||||
value=postgresql_cfg.get('dbname'))
|
||||
form.add(StringWidget, 'user',
|
||||
title=_('User'), required=True,
|
||||
value=postgresql_cfg.get('user'),
|
||||
hint=_('User name used to authenticate'))
|
||||
form.add(PasswordWidget, 'password',
|
||||
title=_('Password'), required=True,
|
||||
value=postgresql_cfg.get('password'),
|
||||
hint=_('Password used to authenticate'))
|
||||
form.add(StringWidget, 'host',
|
||||
title=_('Host'), required=True,
|
||||
value=postgresql_cfg.get('host', 'localhost'),
|
||||
hint=_('Database host address'))
|
||||
form.add(IntWidget, 'port',
|
||||
title=_('Port'), required=True,
|
||||
value=int(postgresql_cfg.get('port', 5432)),
|
||||
hint=_('Connection port number'))
|
||||
form.add_submit('submit', _('Submit'))
|
||||
form.add_submit('cancel', _('Cancel'))
|
||||
|
||||
if form.get_widget('cancel').parse():
|
||||
return redirect('.')
|
||||
|
||||
if not form.is_submitted() or form.has_errors():
|
||||
get_response().breadcrumb.append(('postgresql', _('PostgreSQL Settings')))
|
||||
html_top('settings', title=_('PostgreSQL Settings'))
|
||||
'<h2>%s</h2>' % _('PostgreSQL Settings')
|
||||
form.render()
|
||||
else:
|
||||
cfg_submit(form, 'postgresql', ['dbname', 'user', 'password',
|
||||
'host', 'port'])
|
||||
redirect('.')
|
||||
|
|
|
@ -48,6 +48,7 @@ from categories import Category
|
|||
from wcs.workflows import Workflow, CommentableWorkflowStatusItem, \
|
||||
SendmailWorkflowStatusItem, ChoiceWorkflowStatusItem
|
||||
import fields
|
||||
import sql
|
||||
|
||||
class FormField:
|
||||
### only used to unpickle form fields from older (<200603) versions
|
||||
|
@ -139,7 +140,13 @@ class FormDef(StorableObject):
|
|||
def data_class(self):
|
||||
if hasattr(sys.modules['formdef'], self.url_name.title()):
|
||||
return getattr(sys.modules['formdef'], self.url_name.title())
|
||||
cls = new.classobj(self.url_name.title(), (FormData,),
|
||||
if get_cfg('postgresql', {}):
|
||||
table_name = 'formdata_' + self.url_name.replace('-', '_')
|
||||
cls = new.classobj(self.url_name.title(), (sql.SqlFormData,),
|
||||
{'_formdef': self,
|
||||
'_table_name': table_name})
|
||||
else:
|
||||
cls = new.classobj(self.url_name.title(), (FormData,),
|
||||
{'_names': 'form-%s' % self.url_name})
|
||||
setattr(sys.modules['formdef'], self.url_name.title(), cls)
|
||||
setattr(sys.modules['wcs.formdef'], self.url_name.title(), cls)
|
||||
|
@ -149,7 +156,9 @@ class FormDef(StorableObject):
|
|||
new_url_name = simplify(self.name)
|
||||
if not self.url_name:
|
||||
self.url_name = new_url_name
|
||||
if new_url_name != self.url_name:
|
||||
if get_cfg('postgresql', {}):
|
||||
sql.do_formdef_tables(self)
|
||||
elif new_url_name != self.url_name:
|
||||
# title changed, url will be changed only if there are not yet any
|
||||
# submitted forms
|
||||
data_class = self.data_class()
|
||||
|
|
|
@ -0,0 +1,355 @@
|
|||
# w.c.s. - web application for online forms
|
||||
# Copyright (C) 2005-2012 Entr'ouvert
|
||||
#
|
||||
# This program is free software; you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published by
|
||||
# the Free Software Foundation; either version 2 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with this program; if not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
import psycopg2
|
||||
import datetime
|
||||
import time
|
||||
import cPickle
|
||||
|
||||
from quixote import get_request, get_session, get_publisher
|
||||
from qommon.storage import _take
|
||||
from qommon import get_cfg
|
||||
|
||||
import wcs.formdata
|
||||
|
||||
SQL_TYPE_MAPPING = {
|
||||
'title': None,
|
||||
'subtitle': None,
|
||||
'comment': None,
|
||||
'page': None,
|
||||
'text': 'text',
|
||||
'bool': 'boolean',
|
||||
'file': 'bytea',
|
||||
'date': 'date',
|
||||
'items': 'text[]',
|
||||
'table': 'text[][]',
|
||||
'table-select': 'text[][]',
|
||||
'tablerows': 'text[][]',
|
||||
'ranked-items': 'text[][]',
|
||||
}
|
||||
|
||||
|
||||
|
||||
def get_connection():
|
||||
if not hasattr(get_publisher(), 'pgconn'):
|
||||
postgresql_cfg = get_cfg('postgresql', {})
|
||||
get_publisher().pgconn = psycopg2.connect(**postgresql_cfg)
|
||||
return get_publisher().pgconn
|
||||
|
||||
def do_formdef_tables(formdef):
|
||||
conn = get_connection()
|
||||
cur = conn.cursor()
|
||||
table_name = 'formdata_' + formdef.url_name.replace('-', '_')
|
||||
|
||||
cur.execute('''SELECT COUNT(*) FROM information_schema.tables
|
||||
WHERE table_name = %s''', (table_name,))
|
||||
if cur.fetchone()[0] == 0:
|
||||
cur.execute('''CREATE TABLE %s (id serial PRIMARY KEY,
|
||||
user_id varchar,
|
||||
user_hash varchar,
|
||||
receipt_time timestamp,
|
||||
status varchar);''' % table_name)
|
||||
cur.execute('''CREATE TABLE %s_evolutions (id serial PRIMARY KEY,
|
||||
who varchar,
|
||||
status varchar,
|
||||
time timestamp,
|
||||
comment text,
|
||||
parts bytea,
|
||||
formdata_id integer REFERENCES %s (id));''' % (
|
||||
table_name, table_name))
|
||||
cur.execute('''SELECT column_name FROM information_schema.columns
|
||||
WHERE table_name = %s''', (table_name,))
|
||||
existing_fields = [x[0] for x in cur.fetchall()]
|
||||
|
||||
for field in formdef.fields:
|
||||
if 'f%s' % field.id not in existing_fields:
|
||||
sql_type = SQL_TYPE_MAPPING.get(field.type, 'varchar')
|
||||
if sql_type is None:
|
||||
continue
|
||||
cur.execute('''ALTER TABLE %s ADD COLUMN %s %s''' % (
|
||||
table_name,
|
||||
'f%s' % field.id,
|
||||
sql_type))
|
||||
conn.commit()
|
||||
cur.close()
|
||||
|
||||
|
||||
class SqlFormData(wcs.formdata.FormData):
|
||||
_names = None # make sure StorableObject methods fail
|
||||
_formdef = None
|
||||
|
||||
def __init__(self, id=None):
|
||||
self.id = id
|
||||
|
||||
|
||||
_evolution = None
|
||||
def get_evolution(self):
|
||||
if self._evolution:
|
||||
return self._evolution
|
||||
if not self.id:
|
||||
self._evolution = []
|
||||
return self._evolution
|
||||
conn = get_connection()
|
||||
cur = conn.cursor()
|
||||
sql_statement = '''SELECT id, who, status, time, comment, parts FROM %s_evolutions
|
||||
WHERE formdata_id = %%(id)s''' % self._table_name
|
||||
cur.execute(sql_statement, {'id': self.id})
|
||||
self._evolution = []
|
||||
while True:
|
||||
row = cur.fetchone()
|
||||
if row is None:
|
||||
break
|
||||
self._evolution.append(self._row2evo(row))
|
||||
conn.commit()
|
||||
cur.close()
|
||||
return self._evolution
|
||||
|
||||
def _row2evo(cls, row):
|
||||
o = wcs.formdata.Evolution()
|
||||
o._sql_id, o.who, o.status, o.time, o.comment = tuple(row[:5])
|
||||
if row[5]:
|
||||
o.parts = cPickle.loads(str(row[5]))
|
||||
return o
|
||||
_row2evo = classmethod(_row2evo)
|
||||
|
||||
def set_evolution(self, value):
|
||||
self._evolution = value
|
||||
|
||||
evolution = property(get_evolution, set_evolution)
|
||||
|
||||
def get_with_indexed_value(cls, index, value, ignore_errors = False):
|
||||
conn = get_connection()
|
||||
cur = conn.cursor()
|
||||
sql_statement = '''SELECT id, user_id, user_hash, receipt_time, status, %s
|
||||
FROM %s
|
||||
WHERE %s = %%(value)s''' % (
|
||||
', '.join(['f%s' % x.id for x in cls._formdef.fields]),
|
||||
cls._table_name,
|
||||
index)
|
||||
cur.execute(sql_statement, {'value': str(value)})
|
||||
objects = []
|
||||
while True:
|
||||
row = cur.fetchone()
|
||||
if row is None:
|
||||
break
|
||||
objects.append(cls._row2ob(row))
|
||||
|
||||
conn.commit()
|
||||
cur.close()
|
||||
|
||||
if ignore_errors:
|
||||
objects = (x for x in objects if x is not None)
|
||||
|
||||
return list(objects)
|
||||
get_with_indexed_value = classmethod(get_with_indexed_value)
|
||||
|
||||
def store(self):
|
||||
sql_dict = {
|
||||
'user_id': self.user_id,
|
||||
'user_hash': self.user_hash,
|
||||
'receipt_time': datetime.datetime.fromtimestamp(time.mktime(self.receipt_time)),
|
||||
'status': self.status
|
||||
}
|
||||
columns = self.data.keys()
|
||||
for field in self._formdef.fields:
|
||||
sql_type = SQL_TYPE_MAPPING.get(field.type, 'varchar')
|
||||
if sql_type is None:
|
||||
continue
|
||||
value = self.data.get(field.id)
|
||||
if value is not None:
|
||||
if field.type == 'ranked-items':
|
||||
# turn {'poire': 2, 'abricot': 1, 'pomme': 3} into an array
|
||||
value = [[x, str(y)] for x, y in value.items()]
|
||||
elif sql_type == 'varchar':
|
||||
pass
|
||||
elif sql_type == 'date':
|
||||
value = datetime.datetime.fromtimestamp(time.mktime(value))
|
||||
elif sql_type == 'bytea':
|
||||
value = bytearray(cPickle.dumps(value))
|
||||
elif sql_type == 'boolean':
|
||||
pass
|
||||
sql_dict['f%s' % field.id] = value
|
||||
conn = get_connection()
|
||||
cur = conn.cursor()
|
||||
if not self.id:
|
||||
sql_statement = '''INSERT INTO %s (id, user_id, user_hash, receipt_time, status, %s)
|
||||
VALUES (DEFAULT, %%(user_id)s, %%(user_hash)s, %%(receipt_time)s, %%(status)s, %s)
|
||||
RETURNING id''' % (
|
||||
self._table_name,
|
||||
', '.join(['f%s' % x for x in columns]),
|
||||
', '.join(['%%(f%s)s' % x for x in columns]))
|
||||
cur.execute(sql_statement, sql_dict)
|
||||
self.id = cur.fetchone()[0]
|
||||
else:
|
||||
sql_dict['id'] = self.id
|
||||
sql_statement = '''UPDATE %s SET user_id = %%(user_id)s,
|
||||
user_hash = %%(user_hash)s,
|
||||
receipt_time = %%(receipt_time)s,
|
||||
status = %%(status)s,
|
||||
%s''' % (
|
||||
self._table_name,
|
||||
', '.join(['f%s = %%(f%s)s' % (x,x) for x in columns]))
|
||||
cur.execute(sql_statement, sql_dict)
|
||||
|
||||
if self._evolution:
|
||||
for evo in self._evolution:
|
||||
if hasattr(evo, '_sql_id'):
|
||||
continue
|
||||
sql_statement = '''INSERT INTO %s_evolutions (
|
||||
id, who, status,
|
||||
time, comment, parts,
|
||||
formdata_id)
|
||||
VALUES (DEFAULT, %%(who)s, %%(status)s,
|
||||
%%(time)s, %%(comment)s,
|
||||
%%(parts)s, %%(formdata_id)s)
|
||||
RETURNING id''' % self._table_name
|
||||
sql_dict = {
|
||||
'who': evo.who,
|
||||
'status': evo.status,
|
||||
'time': datetime.datetime.fromtimestamp(time.mktime(evo.time)),
|
||||
'comment': evo.comment,
|
||||
'formdata_id': self.id,
|
||||
}
|
||||
if evo.parts:
|
||||
sql_dict['parts'] = bytearray(cPickle.dumps(evo.parts))
|
||||
else:
|
||||
sql_dict['parts'] = None
|
||||
cur.execute(sql_statement, sql_dict)
|
||||
evo.sql_id = cur.fetchone()[0]
|
||||
conn.commit()
|
||||
cur.close()
|
||||
|
||||
def keys(cls):
|
||||
conn = get_connection()
|
||||
cur = conn.cursor()
|
||||
sql_statement = 'SELECT id FROM %s' % cls._table_name
|
||||
cur.execute(sql_statement)
|
||||
ids = [x[0] for x in cur.fetchall()]
|
||||
conn.commit()
|
||||
cur.close()
|
||||
return ids
|
||||
keys = classmethod(keys)
|
||||
|
||||
def count(cls):
|
||||
conn = get_connection()
|
||||
cur = conn.cursor()
|
||||
sql_statement = 'SELECT count(*) FROM %s' % cls._table_name
|
||||
cur.execute(sql_statement)
|
||||
count = cur.fetchone()[0]
|
||||
conn.commit()
|
||||
cur.close()
|
||||
return count
|
||||
count = classmethod(count)
|
||||
|
||||
def _row2ob(cls, row):
|
||||
o = cls()
|
||||
o.id, o.user_id, o.user_hash, o.receipt_time, o.status = tuple(row[:5])
|
||||
if o.receipt_time:
|
||||
o.receipt_time = o.receipt_time.timetuple()
|
||||
o.data = {}
|
||||
i = 5
|
||||
for field in cls._formdef.fields:
|
||||
sql_type = SQL_TYPE_MAPPING.get(field.type, 'varchar')
|
||||
if sql_type is None:
|
||||
continue
|
||||
value = row[i]
|
||||
if value:
|
||||
if field.type == 'ranked-items':
|
||||
d = {}
|
||||
for data, rank in value:
|
||||
d[data] = int(rank)
|
||||
value = d
|
||||
if sql_type == 'date':
|
||||
value = value.timetuple()
|
||||
elif sql_type == 'bytea':
|
||||
value = cPickle.loads(str(value))
|
||||
o.data[field.id] = value
|
||||
i += 1
|
||||
return o
|
||||
_row2ob = classmethod(_row2ob)
|
||||
|
||||
def get_data_fields(cls):
|
||||
data_fields = []
|
||||
for field in cls._formdef.fields:
|
||||
sql_type = SQL_TYPE_MAPPING.get(field.type, 'varchar')
|
||||
if sql_type is None:
|
||||
continue
|
||||
data_fields.append('f%s' % field.id)
|
||||
return data_fields
|
||||
get_data_fields = classmethod(get_data_fields)
|
||||
|
||||
def get(cls, id, ignore_errors=False, ignore_migration=False):
|
||||
if id is None:
|
||||
if ignore_errors:
|
||||
return None
|
||||
else:
|
||||
raise KeyError()
|
||||
conn = get_connection()
|
||||
cur = conn.cursor()
|
||||
|
||||
sql_statement = '''SELECT id, user_id, user_hash, receipt_time, status, %s
|
||||
FROM %s
|
||||
WHERE id = %%(id)s''' % (
|
||||
', '.join(cls.get_data_fields()),
|
||||
cls._table_name)
|
||||
cur.execute(sql_statement, {'id': str(id)})
|
||||
row = cur.fetchone()
|
||||
if row is None:
|
||||
cur.close()
|
||||
raise KeyError()
|
||||
cur.close()
|
||||
return cls._row2ob(row)
|
||||
get = classmethod(get)
|
||||
|
||||
|
||||
def select(cls, clause = None, order_by = None, ignore_errors = False, limit = None):
|
||||
conn = get_connection()
|
||||
cur = conn.cursor()
|
||||
sql_statement = '''SELECT id, user_id, user_hash, receipt_time, status, %s
|
||||
FROM %s''' % (
|
||||
', '.join(cls.get_data_fields()),
|
||||
cls._table_name)
|
||||
cur.execute(sql_statement)
|
||||
objects = []
|
||||
while True:
|
||||
row = cur.fetchone()
|
||||
if row is None:
|
||||
break
|
||||
objects.append(cls._row2ob(row))
|
||||
conn.commit()
|
||||
cur.close()
|
||||
|
||||
if ignore_errors:
|
||||
objects = (x for x in objects if x is not None)
|
||||
if clause:
|
||||
objects = (x for x in objects if clause(x))
|
||||
if order_by:
|
||||
order_by = str(order_by)
|
||||
if order_by[0] == '-':
|
||||
reverse = True
|
||||
order_by = order_by[1:]
|
||||
else:
|
||||
reverse = False
|
||||
# only list can be sorted
|
||||
objects = list(objects)
|
||||
objects.sort(lambda x,y: cmp(getattr(x, order_by), getattr(y, order_by)))
|
||||
if reverse:
|
||||
objects.reverse()
|
||||
if limit:
|
||||
objects = _take(objects, limit)
|
||||
return list(objects)
|
||||
select = classmethod(select)
|
||||
|
Loading…
Reference in New Issue