feeder: prevent situation of half-dropped schema (#54658)

To prevent losing the currently loaded data when wcs-olap fails to rename
the temporary schema to its final name, wcs-olap will:

- first, inside a transaction, rename the current schema instead of
  dropping it, then rename the new schema to the current schema's name;
  in case of failure it will retry 33 times sleeping 1 second between
  each attempt;

- if successful, drop the renamed old schema, again in a retry loop; if
  it fails to drop it, it logs an error without aborting the current
  feeding.
This commit is contained in:
Benjamin Dauvergne 2021-08-07 18:43:29 +02:00
parent e0a28c2f85
commit 05c4031776
1 changed files with 63 additions and 5 deletions

View File

@ -14,6 +14,7 @@ import reprlib
from .utils import Whatever
import psycopg2
import psycopg2.errorcodes
import time
from cached_property import cached_property
from wcs_olap.wcs_api import WcsApiError
@ -330,6 +331,10 @@ class WcsOlapFeeder(object):
def default_ctx(self):
return self.ctx.as_dict()
def schema_exists(self, name):
    """Tell whether a schema called *name* is listed in information_schema.

    The lookup goes through ``self.ex`` (so it is logged and reported like
    any other statement) and reads the result from ``self.cur``.

    :param name: exact schema name to look for, passed as a bound query
        parameter rather than interpolated into the SQL text.
    :returns: ``True`` if the query returned a row, ``False`` otherwise.
    """
    query = 'SELECT schema_name FROM information_schema.schemata WHERE schema_name = %s'
    self.ex(query, vars=[name])
    row = self.cur.fetchone()
    # fetchone() yields no row when nothing matched.
    return True if row else False
def ex(self, query, ctx=None, vars=None):
ctx = ctx or {}
ctx.update(self.default_ctx)
@ -341,6 +346,27 @@ class WcsOlapFeeder(object):
self.logger.warning('Failed to execute %r with vars %s, raised %s', sql, reprlib.repr(vars or []), e)
raise
@contextlib.contextmanager
def atomic(self):
    """Run the body of the ``with`` statement inside a SQL transaction.

    ``BEGIN`` is issued on entry and ``COMMIT`` when the body finishes
    normally. If the body, or the ``COMMIT`` itself, raises an
    ``Exception``, a ``ROLLBACK`` is issued and the exception is re-raised
    to the caller.
    """
    self.ex('BEGIN')
    try:
        yield
        # COMMIT deliberately stays inside the try block: a failing COMMIT
        # must also go through the ROLLBACK path below.
        self.ex('COMMIT')
    except Exception:
        self.ex('ROLLBACK')
        raise
def retry_on_db_error(self, function, times, sleep_time):
    """Call *function* until it succeeds, retrying on database errors.

    Only ``psycopg2.Error`` triggers a retry; any other exception
    propagates immediately from the first attempt that raises it.

    :param function: zero-argument callable to run.
    :param times: maximum number of attempts. With a value of zero or less
        the callable is never invoked and ``None`` is returned.
    :param sleep_time: seconds to sleep between two attempts; there is no
        sleep after the final failed attempt.
    :returns: whatever *function* returned on its first successful call.
    :raises psycopg2.Error: the error of the last attempt, once all
        *times* attempts have failed.
    """
    final_attempt = times - 1
    for attempt in range(times):
        try:
            # Hand the result back instead of dropping it, so the helper
            # can also wrap callables that compute a value.
            return function()
        except psycopg2.Error:
            if attempt == final_attempt:
                raise
            self.logger.warning('failure during retry', exc_info=True)
            time.sleep(sleep_time)
    return None
def do_schema(self):
self.ex('SET search_path = public')
self.logger.debug('dropping schema %s', self.schema_temp)
@ -624,11 +650,7 @@ class WcsOlapFeeder(object):
if self.do_feed:
self.ex('SET synchronous_commit = on')
if not self.fake:
self.logger.debug('dropping schema %s', self.schema)
self.drop_tables_sequencially(self.schema)
self.ex('DROP SCHEMA IF EXISTS {schema} CASCADE')
self.logger.debug('renaming schema %s to %s', self.schema + '_temp', self.schema)
self.ex('ALTER SCHEMA {schema_temp} RENAME TO {schema}')
self.switch_schemas()
if 'cubes_model_dirs' in self.config:
model_path = os.path.join(self.config['cubes_model_dirs'], '%s.model' % self.schema)
@ -642,6 +664,42 @@ class WcsOlapFeeder(object):
self.cur.close()
self.connection.close()
def switch_schemas(self):
    """Promote the temporary schema to the live name, then drop the old one.

    Steps:

    1. Inside one transaction, rename the live schema (if it exists) to a
       timestamped ``<schema>_old_<YYYYmmddHHMM>`` name and rename the
       temporary schema to the live name. The transaction is retried on
       database errors; if it still fails, a warning is logged and the
       exception is re-raised.
    2. Drop the renamed old schema, also with retries. A failure here is
       only logged and does not propagate to the caller.

    Side effects: sets ``self.old_schema`` and pushes ``old_schema`` onto
    ``self.ctx``. The ``{old_schema}`` placeholder in the SQL below is
    presumably filled from that context by ``self.ex`` (NOTE(review):
    confirm against ``ex``).
    """
    # One pair of constants drives both the retry loops and the log
    # message, so the reported numbers cannot drift from the real ones.
    attempts = 33
    delay = 1  # seconds slept between two attempts

    timestamp = datetime.datetime.now().strftime('%Y%m%d%H%M')
    self.old_schema = truncate_pg_identifier(self.schema + '_old_' + timestamp)
    self.ctx.push({'old_schema': self.old_schema})

    def switch():
        # Both renames share a transaction: either the new schema becomes
        # live, or the previous one keeps its name.
        with self.atomic():
            self.logger.info('renaming schema %s to %s and schema %s to %s',
                             self.schema, self.old_schema,
                             self.schema_temp, self.schema)
            if self.schema_exists(self.schema):
                self.ex('ALTER SCHEMA {schema} RENAME TO {old_schema}')
            self.ex('ALTER SCHEMA {schema_temp} RENAME TO {schema}')

    def drop_old_schema():
        self.logger.info('dropping schema %s', self.old_schema)
        self.drop_tables_sequencially(self.old_schema)
        self.ex('DROP SCHEMA IF EXISTS {old_schema} CASCADE')

    try:
        self.retry_on_db_error(switch, times=attempts, sleep_time=delay)
    except Exception:
        self.logger.warning('failed to switch schemas %d-times in %d seconds',
                            attempts, attempts * delay)
        raise

    try:
        self.retry_on_db_error(drop_old_schema, times=attempts, sleep_time=delay)
    except Exception:
        # Best effort: the new data is already live, so a leftover old
        # schema must not abort the feeding.
        self.logger.exception('could not drop schema %s', self.old_schema)
def insert_agent(self, name):
self.ex('SELECT id FROM {agent_table} WHERE label = %s', vars=(name,))
res = self.cur.fetchone()