engine: close connections after usage (#27482)

This commit is contained in:
Benjamin Dauvergne 2018-11-20 13:18:34 +01:00
parent 4f139f93e2
commit 5628a22dcb
1 changed files with 100 additions and 100 deletions

View File

@ -14,6 +14,7 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import contextlib
import logging
import itertools
@ -59,25 +60,25 @@ class EngineDimension(object):
@property
def members(self):
assert self.type != 'date'
cursor = self.engine.get_cursor()
sql = self.members_query
if not sql:
if self.dimension.join:
join = self.engine_cube.get_join(self.dimension.join[-1])
sql = ('SELECT %s AS value, %s::text AS label FROM %s AS "%s" '
'GROUP BY %s, %s ORDER BY %s' % (
self.value, self.value_label or self.value, join.table, join.name,
self.value, self.value_label or self.value, self.order_by or self.value))
else:
sql = ('SELECT %s AS value, %s::text AS label FROM {fact_table} '
'GROUP BY %s, %s ORDER BY %s' % (
self.value, self.value_label or self.value, self.value,
self.value_label or self.value, self.self.order_by or self.value))
sql = sql.format(fact_table=self.engine_cube.fact_table)
self.engine.log.debug('SQL: %s', sql)
cursor.execute(sql)
for row in cursor.fetchall():
yield Member(*row)
with self.engine.get_cursor() as cursor:
sql = self.members_query
if not sql:
if self.dimension.join:
join = self.engine_cube.get_join(self.dimension.join[-1])
sql = ('SELECT %s AS value, %s::text AS label FROM %s AS "%s" '
'GROUP BY %s, %s ORDER BY %s' % (
self.value, self.value_label or self.value, join.table, join.name,
self.value, self.value_label or self.value, self.order_by or self.value))
else:
sql = ('SELECT %s AS value, %s::text AS label FROM {fact_table} '
'GROUP BY %s, %s ORDER BY %s' % (
self.value, self.value_label or self.value, self.value,
self.value_label or self.value, self.self.order_by or self.value))
sql = sql.format(fact_table=self.engine_cube.fact_table)
self.engine.log.debug('SQL: %s', sql)
cursor.execute(sql)
for row in cursor.fetchall():
yield Member(*row)
class SchemaJSONDimension(schemas.Dimension):
@ -169,10 +170,10 @@ class JSONDimensions(object):
if not self.engine_cube.json_field:
return []
if not self.__cache:
cursor = self.engine.get_cursor()
sql = 'select distinct jsonb_object_keys(json_data) as a from formdata order by a'
cursor.execute(sql)
self.__cache = [row[0] for row in cursor.fetchall()]
with self.engine.get_cursor() as cursor:
sql = 'select distinct jsonb_object_keys(json_data) as a from formdata order by a'
cursor.execute(sql)
self.__cache = [row[0] for row in cursor.fetchall()]
return self.__cache
def __iter__(self):
@ -249,9 +250,9 @@ class EngineCube(object):
return getattr(self.cube, name)
def count(self):
cursor = self.engine.get_cursor()
cursor.execute('SELECT count(%s) FROM %s' % (self.key, self.fact_table))
return cursor.fetchone()[0]
with self.engine.get_cursor() as cursor:
cursor.execute('SELECT count(%s) FROM %s' % (self.key, self.fact_table))
return cursor.fetchone()[0]
def get_join(self, name):
if name.startswith('json_'):
@ -266,67 +267,66 @@ class EngineCube(object):
return self.cube.get_join(name)
def sql_query(self, filters, drilldown, measures, **kwargs):
cursor = self.engine.get_cursor()
with self.engine.get_cursor() as cursor:
projections = []
joins = set()
where = []
group_by = []
order_by = []
join_conditions = []
projections = []
joins = set()
where = []
group_by = []
order_by = []
join_conditions = []
for dimension_name, values in filters:
dimension = self.dimensions[dimension_name]
# assert dimension.filter
condition, values = dimension.build_filter(values)
condition = cursor.mogrify(condition, values)
if dimension.filter_needs_join:
joins.update(dimension.join)
if dimension.filter_in_join:
join_conditions.append(condition)
else:
where.append(condition)
for dimension_name, values in filters:
dimension = self.dimensions[dimension_name]
# assert dimension.filter
condition, values = dimension.build_filter(values)
condition = cursor.mogrify(condition, values)
if dimension.filter_needs_join:
for dimension_name in drilldown:
dimension = self.dimensions[dimension_name]
joins.update(dimension.join)
if dimension.filter_in_join:
join_conditions.append(condition)
else:
where.append(condition)
projections.append('%s AS %s' % (dimension.value_label or dimension.value,
dimension.name))
group_by.append(dimension.group_by or dimension.value)
order_by.append(dimension.order_by or dimension.value)
for dimension_name in drilldown:
dimension = self.dimensions[dimension_name]
joins.update(dimension.join)
projections.append('%s AS %s' % (dimension.value_label or dimension.value,
dimension.name))
group_by.append(dimension.group_by or dimension.value)
order_by.append(dimension.order_by or dimension.value)
for measure_name in measures:
measure = self.get_measure(measure_name)
if measure.expression not in projections:
projections.append(measure.expression + ' AS ' + measure.name)
sql = 'SELECT ' + ', '.join(projections)
table_expression = ' %s' % self.cube.fact_table
if joins:
join_tree = {}
# Build join tree
for join_name in joins:
join = self.get_join(join_name)
master_table = join.master_table or self.fact_table
join_tree.setdefault(master_table, {}).setdefault(join.kind, {})[join.name] = join
table_expression = build_table_expression(join_tree,
self.fact_table,
other_conditions=join_conditions)
sql += ' FROM %s' % table_expression
where_conditions = 'true'
if where:
where_conditions = ' AND '.join(where)
sql += ' WHERE %s' % where_conditions
if group_by:
sql += ' GROUP BY %s' % ', '.join(group_by)
if order_by:
sql += ' ORDER BY %s' % ', '.join(order_by)
sql = sql.format(fact_table=self.cube.fact_table,
table_expression=table_expression,
where_conditions=where_conditions)
sql = sql.format(fact_table=self.cube.fact_table,
table_expression=table_expression,
where_conditions=where_conditions)
return sql
for measure_name in measures:
measure = self.get_measure(measure_name)
if measure.expression not in projections:
projections.append(measure.expression + ' AS ' + measure.name)
sql = 'SELECT ' + ', '.join(projections)
table_expression = ' %s' % self.cube.fact_table
if joins:
join_tree = {}
# Build join tree
for join_name in joins:
join = self.get_join(join_name)
master_table = join.master_table or self.fact_table
join_tree.setdefault(master_table, {}).setdefault(join.kind, {})[join.name] = join
table_expression = build_table_expression(join_tree,
self.fact_table,
other_conditions=join_conditions)
sql += ' FROM %s' % table_expression
where_conditions = 'true'
if where:
where_conditions = ' AND '.join(where)
sql += ' WHERE %s' % where_conditions
if group_by:
sql += ' GROUP BY %s' % ', '.join(group_by)
if order_by:
sql += ' ORDER BY %s' % ', '.join(order_by)
sql = sql.format(fact_table=self.cube.fact_table,
table_expression=table_expression,
where_conditions=where_conditions)
sql = sql.format(fact_table=self.cube.fact_table,
table_expression=table_expression,
where_conditions=where_conditions)
return sql
def query(self, filters, drilldown, measures, **kwargs):
self.engine.log.debug('%s.%s query filters=%s drilldown=%s measures=%s',
@ -337,17 +337,17 @@ class EngineCube(object):
cells.append(self.dimensions[dimension_name])
for measure_name in measures:
cells.append(self.measures[measure_name])
cursor = self.engine.get_cursor()
sql = self.sql_query(filters=filters, drilldown=drilldown, measures=measures, **kwargs)
self.engine.log.debug('SQL: %s', sql)
cursor.execute(sql)
for row in cursor.fetchall():
yield [{
'name': cell.name,
'label': cell.label,
'type': cell.type,
'value': value,
} for cell, value in zip(cells, row)]
with self.engine.get_cursor() as cursor:
sql = self.sql_query(filters=filters, drilldown=drilldown, measures=measures, **kwargs)
self.engine.log.debug('SQL: %s', sql)
cursor.execute(sql)
for row in cursor.fetchall():
yield [{
'name': cell.name,
'label': cell.label,
'type': cell.type,
'value': value,
} for cell, value in zip(cells, row)]
class Engine(object):
@ -366,10 +366,10 @@ class Engine(object):
def __getattr__(self, name):
return getattr(self.warehouse, name)
@contextlib.contextmanager
def get_cursor(self):
connection = psycopg2.connect(
self.warehouse.pg_dsn)
cursor = connection.cursor()
search_path = ', '.join(['"%s"' % namespace for namespace in self.warehouse.search_path])
cursor.execute('SET SEARCH_PATH = %s' % search_path)
return cursor
with contextlib.closing(psycopg2.connect(self.warehouse.pg_dsn)) as connection:
with connection.cursor() as cursor:
search_path = ', '.join(['"%s"' % namespace for namespace in self.warehouse.search_path])
cursor.execute('SET SEARCH_PATH = %s' % search_path)
yield cursor