feeder: prevent situation of half-dropped schema (#54658)
To prevent loosing currently loaded data wcs-olap, failing ro rename the temporary schema to its final name, wcs-olap will: - first, inside a transaction, rename the current schema instead of dropping it, then rename the new schema to the current schema's name; in case of failure it will retry 33 times sleeping 1 second between each attempt; - if successfull, drop the renamed old schema, again in a retry loop, if it fails to drop it logs an error, without aborting the current feeding.
This commit is contained in:
parent
e0a28c2f85
commit
05c4031776
|
@ -14,6 +14,7 @@ import reprlib
|
|||
from .utils import Whatever
|
||||
import psycopg2
|
||||
import psycopg2.errorcodes
|
||||
import time
|
||||
|
||||
from cached_property import cached_property
|
||||
from wcs_olap.wcs_api import WcsApiError
|
||||
|
@ -330,6 +331,10 @@ class WcsOlapFeeder(object):
|
|||
def default_ctx(self):
|
||||
return self.ctx.as_dict()
|
||||
|
||||
def schema_exists(self, name):
|
||||
self.ex('SELECT schema_name FROM information_schema.schemata WHERE schema_name = %s', vars=[name])
|
||||
return bool(self.cur.fetchone())
|
||||
|
||||
def ex(self, query, ctx=None, vars=None):
|
||||
ctx = ctx or {}
|
||||
ctx.update(self.default_ctx)
|
||||
|
@ -341,6 +346,27 @@ class WcsOlapFeeder(object):
|
|||
self.logger.warning('Failed to execute %r with vars %s, raised %s', sql, reprlib.repr(vars or []), e)
|
||||
raise
|
||||
|
||||
@contextlib.contextmanager
|
||||
def atomic(self):
|
||||
self.ex('BEGIN')
|
||||
try:
|
||||
yield
|
||||
self.ex('COMMIT')
|
||||
except Exception:
|
||||
self.ex('ROLLBACK')
|
||||
raise
|
||||
|
||||
def retry_on_db_error(self, function, times, sleep_time):
|
||||
for i in range(times):
|
||||
try:
|
||||
function()
|
||||
break
|
||||
except psycopg2.Error:
|
||||
if i == times - 1:
|
||||
raise
|
||||
self.logger.warning('failure during retry', exc_info=True)
|
||||
time.sleep(sleep_time)
|
||||
|
||||
def do_schema(self):
|
||||
self.ex('SET search_path = public')
|
||||
self.logger.debug('dropping schema %s', self.schema_temp)
|
||||
|
@ -624,11 +650,7 @@ class WcsOlapFeeder(object):
|
|||
if self.do_feed:
|
||||
self.ex('SET synchronous_commit = on')
|
||||
if not self.fake:
|
||||
self.logger.debug('dropping schema %s', self.schema)
|
||||
self.drop_tables_sequencially(self.schema)
|
||||
self.ex('DROP SCHEMA IF EXISTS {schema} CASCADE')
|
||||
self.logger.debug('renaming schema %s to %s', self.schema + '_temp', self.schema)
|
||||
self.ex('ALTER SCHEMA {schema_temp} RENAME TO {schema}')
|
||||
self.switch_schemas()
|
||||
|
||||
if 'cubes_model_dirs' in self.config:
|
||||
model_path = os.path.join(self.config['cubes_model_dirs'], '%s.model' % self.schema)
|
||||
|
@ -642,6 +664,42 @@ class WcsOlapFeeder(object):
|
|||
self.cur.close()
|
||||
self.connection.close()
|
||||
|
||||
def switch_schemas(self):
|
||||
self.old_schema = truncate_pg_identifier(
|
||||
self.schema
|
||||
+ '_old_'
|
||||
+ datetime.datetime.now().strftime('%Y%m%d%H%M'))
|
||||
self.ctx.push({'old_schema': self.old_schema})
|
||||
|
||||
try:
|
||||
def switch():
|
||||
with self.atomic():
|
||||
self.logger.info('renaming schema %s to %s and schema %s to %s',
|
||||
self.schema, self.old_schema,
|
||||
self.schema_temp, self.schema)
|
||||
if self.schema_exists(self.schema):
|
||||
self.ex('ALTER SCHEMA {schema} RENAME TO {old_schema}')
|
||||
self.ex('ALTER SCHEMA {schema_temp} RENAME TO {schema}')
|
||||
self.retry_on_db_error(
|
||||
switch,
|
||||
times=33,
|
||||
sleep_time=1)
|
||||
except Exception:
|
||||
self.logger.warning('failed to switch schemas 3-times in 33 seconds')
|
||||
raise
|
||||
|
||||
try:
|
||||
def drop_old_schema():
|
||||
self.logger.info('dropping schema %s', self.old_schema)
|
||||
self.drop_tables_sequencially(self.old_schema)
|
||||
self.ex('DROP SCHEMA IF EXISTS {old_schema} CASCADE')
|
||||
self.retry_on_db_error(
|
||||
drop_old_schema,
|
||||
times=33,
|
||||
sleep_time=1)
|
||||
except Exception:
|
||||
self.logger.exception('could not drop schema %s', self.old_schema)
|
||||
|
||||
def insert_agent(self, name):
|
||||
self.ex('SELECT id FROM {agent_table} WHERE label = %s', vars=(name,))
|
||||
res = self.cur.fetchone()
|
||||
|
|
Loading…
Reference in New Issue