# w.c.s. - web application for online forms
# Copyright (C) 2005-2010  Entr'ouvert
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, see <http://www.gnu.org/licenses/>.

import fnmatch
import io
import json
import os
import pickle
import re
import shutil
import sys
import traceback
import zipfile
import zoneinfo
from contextlib import ExitStack, contextmanager

from django.utils import timezone
from django.utils.timezone import localtime

from . import custom_views, data_sources, formdef, sessions
from .admin import RootDirectory as AdminRootDirectory
from .backoffice import RootDirectory as BackofficeRootDirectory
from .Defaults import *  # noqa pylint: disable=wildcard-import
from .qommon import errors
from .qommon.cron import CronJob
from .qommon.publisher import QommonPublisher, get_request, set_publisher_class
from .qommon.tokens import Token
from .roles import Role
from .root import RootDirectory
from .tracking_code import TrackingCode
from .users import User

try:
    from .wcs_cfg import *  # noqa pylint: disable=wildcard-import
except ImportError:
    pass


class UnpicklerClass(pickle.Unpickler):
    """Unpickler that maps legacy (module, class name) pairs to their current location."""

    # (module, class name) -> new module, or a placeholder class name for
    # things that were removed.  Built once at class creation instead of on
    # every find_class() call.
    MODULE_MOVES = {
        # workflow classes moved to their own module
        ('wcs.workflows', 'ChoiceWorkflowStatusItem'): 'wcs.wf.choice',
        ('wcs.workflows', 'CommentableWorkflowStatusItem'): 'wcs.wf.comment',
        ('wcs.workflows', 'DisplayMessageWorkflowStatusItem'): 'wcs.wf.display_message',
        ('wcs.workflows', 'EditableWorkflowStatusItem'): 'wcs.wf.editable',
        ('wcs.workflows', 'ExportToModel'): 'wcs.wf.export_to_model',
        ('wcs.workflows', 'JumpOnSubmitWorkflowStatusItem'): 'wcs.wf.jump_on_submit',
        ('wcs.workflows', 'SendmailWorkflowStatusItem'): 'wcs.wf.sendmail',
        ('wcs.workflows', 'SendSMSWorkflowStatusItem'): 'wcs.wf.sms',
        ('wcs.workflows', 'WorkflowCommentPart'): 'wcs.wf.comment',
        # criteria classes moved to be sql only (2023-05-15)
        ('wcs.qommon.storage', 'GreaterOrEqual'): 'wcs.sql',
        ('wcs.qommon.storage', 'NotEqual'): 'wcs.sql',
        ('wcs.qommon.storage', 'StrictNotEqual'): 'wcs.sql',
        ('wcs.qommon.storage', 'LessOrEqual'): 'wcs.sql',
        ('wcs.qommon.storage', 'Between'): 'wcs.sql',
        ('wcs.qommon.storage', 'NotContains'): 'wcs.sql',
        ('wcs.qommon.storage', 'ILike'): 'wcs.sql',
        ('wcs.qommon.storage', 'FtsMatch'): 'wcs.sql',
        ('wcs.qommon.storage', 'NotNull'): 'wcs.sql',
        ('wcs.qommon.storage', 'Null'): 'wcs.sql',
        ('wcs.qommon.storage', 'ElementEqual'): 'wcs.sql',
        ('wcs.qommon.storage', 'ElementILike'): 'wcs.sql',
        ('wcs.qommon.storage', 'ElementIntersects'): 'wcs.sql',
        ('wcs.qommon.storage', 'Nothing'): 'wcs.sql',
        ('wcs.qommon.storage', 'Distance'): 'wcs.sql',
        # filter field classes moved to their own file (2024-04-12)
        ('wcs.backoffice.management', 'RelatedField'): 'wcs.backoffice.filter_fields',
        ('wcs.backoffice.management', 'UserRelatedField'): 'wcs.backoffice.filter_fields',
        ('wcs.backoffice.management', 'UserLabelRelatedField'): 'wcs.backoffice.filter_fields',
        # removed actions
        ('wcs.wf.redirect_to_status', 'RedirectToStatusWorkflowStatusItem'): 'NoLongerAvailableAction',
        ('wcs.workflows', 'RedirectToStatusWorkflowStatusItem'): 'NoLongerAvailableAction',
        # removed actions from auquotidien
        (
            'modules.abelium_domino_workflow',
            'AbeliumDominoRegisterFamilyWorkflowStatusItem',
        ): 'NoLongerAvailableAction',
        ('modules.payments', 'PaymentWorkflowStatusItem'): 'NoLongerAvailableAction',
        ('modules.payments', 'PaymentCancelWorkflowStatusItem'): 'NoLongerAvailableAction',
        ('modules.payments', 'PaymentValidationWorkflowStatusItem'): 'NoLongerAvailableAction',
        # removed workflow part class from auquotidien
        ('modules.payments', 'InvoiceEvolutionPart'): 'NoLongerAvailablePart',
    }

    def find_class(self, module, name):
        """Return the class to instantiate for a pickled (module, name) reference.

        Legacy top-level module names get the ``wcs.`` prefix, moved classes
        are looked up in their new module, and removed ones are replaced by
        the ``NoLongerAvailableAction`` / ``NoLongerAvailablePart`` classes of
        ``wcs.workflows``.
        """
        if module == 'qommon.form':
            module = 'wcs.qommon.form'
        elif module in ('formdata', 'formdef', 'roles', 'users', 'workflows'):
            module = 'wcs.%s' % module

        module = self.MODULE_MOVES.get((module, name), module)
        # NOTE(review): no entry of MODULE_MOVES maps to `object`; kept as in
        # the original code.
        if module is object:
            return object
        if module in ('NoLongerAvailableAction', 'NoLongerAvailablePart'):
            # the placeholder value is the name of the replacement class
            module, name = 'wcs.workflows', module

        __import__(module)
        mod = sys.modules[module]

        # classes of wcs.formdef / wcs.carddef other than the ones listed here
        # are looked up under a `_wcs_` prefixed name.
        if (
            module == 'wcs.formdef'
            and name not in ('FormDef', 'UpdateDigestAfterJob', 'UpdateStatisticsDataAfterJob')
            and not name.startswith('_wcs_')
        ):
            name = '_wcs_%s' % name
        elif module == 'wcs.carddef' and name != 'CardDef' and not name.startswith('_wcs_'):
            name = '_wcs_%s' % name

        return getattr(mod, name)


class WcsPublisher(QommonPublisher):
    """Publisher for the w.c.s. application, registered with set_publisher_class() below."""

    APP_NAME = 'wcs'
    APP_DIR = APP_DIR
    DATA_DIR = DATA_DIR
    ERROR_LOG = ERROR_LOG
    missing_appdir_redirect = REDIRECT_ON_UNKNOWN_VHOST
    supported_languages = ['fr', 'es', 'de']

    root_directory_class = RootDirectory
    backoffice_directory_class = BackofficeRootDirectory
    admin_directory_class = AdminRootDirectory

    sql_application_name = 'wcs'

    session_manager_class = None
    user_class = User
    tracking_code_class = TrackingCode
    unpickler_class = UnpicklerClass

    # mapping set up by the complex_data() context manager, None outside of it
    complex_data_cache = None

    @classmethod
    def configure(cls, config):
        """Override class-level paths/settings from the [main] section of `config`."""
        if config.has_option('main', 'app_dir'):
            cls.APP_DIR = config.get('main', 'app_dir')
        if config.has_option('main', 'data_dir'):
            cls.DATA_DIR = config.get('main', 'data_dir')
        if config.has_option('main', 'error_log'):
            cls.ERROR_LOG = config.get('main', 'error_log')
        if config.has_option('main', 'missing_appdir_redirect'):
            cls.missing_appdir_redirect = config.get('main', 'missing_appdir_redirect')

    @classmethod
    def register_cronjobs(cls):
        """Register the parent class cron jobs plus the w.c.s. specific ones."""
        super().register_cronjobs()
        # every hour: check for global action timeouts
        cls.register_cronjob(
            CronJob(cls.apply_global_action_timeouts, name='evaluate_global_action_timeouts', minutes=[0])
        )
        # once a day: update deprecations report
        cls.register_cronjob(
            CronJob(cls.update_deprecations_report, name='update_deprecations_report', hours=[2], minutes=[0])
        )
        # once a week: send active users to keepalive service
        cls.register_cronjob(
            CronJob(User.keepalive_users, name='keepalive_users', weekdays=[0], hours=[2], minutes=[0])
        )
        # once a day delete unreferenced users
        cls.register_cronjob(
            CronJob(cls.clean_deleted_users, name='clean_deleted_users', hours=[3], minutes=[0])
        )
        # once a day clean old audit entries
        from .audit import Audit

        cls.register_cronjob(CronJob(Audit.clean, name='clean_audit', hours=[3], minutes=[0]))
        # once a day clean old test results
        from .testdef import TestResult

        cls.register_cronjob(CronJob(TestResult.clean, name='clean_test_result', hours=[4], minutes=[0]))

        # other jobs
        data_sources.register_cronjob()
        formdef.register_cronjobs()

    def update_deprecations_report(self, **kwargs):
        """Cron job entry point: run a DeprecationsScan."""
        from .backoffice.deprecations import DeprecationsScan

        DeprecationsScan().execute()

    def has_postgresql_config(self):
        """Return True when the configuration has a non-empty 'postgresql' section."""
        return bool(self.cfg.get('postgresql', {}))

    def has_user_fullname_config(self):
        """Return True when the 'users' section sets 'field_name' or 'fullname_template'."""
        users_cfg = self.cfg.get('users') or {}
        return bool(users_cfg.get('field_name') or users_cfg.get('fullname_template'))

    def set_config(self, request=None, skip_sql=False):
        """Load the site configuration and select the storage classes.

        SQL-backed classes are used when a postgresql configuration exists and
        `skip_sql` is false; otherwise the non-SQL classes are used and the
        snapshot / logged error classes are set to None.  In both cases a
        StorageSessionManager is installed.
        """
        QommonPublisher.set_config(self, request=request)
        if request:
            request.response.charset = self.site_charset

        # make sure permissions are set using strings
        admin_permissions = self.cfg.get('admin-permissions')
        if admin_permissions:
            # only values of existing keys are replaced, the dict size does
            # not change during iteration.
            for key, value in admin_permissions.items():
                if not value:
                    continue
                admin_permissions[key] = [str(x) for x in value]

        import wcs.workflows

        wcs.workflows.load_extra()

        if self.has_postgresql_config() and not skip_sql:
            from . import sql

            self.user_class = sql.SqlUser
            self.test_user_class = sql.TestUser
            self.role_class = sql.Role
            self.token_class = sql.Token
            self.tracking_code_class = sql.TrackingCode
            self.session_class = sql.Session
            self.custom_view_class = sql.CustomView
            self.snapshot_class = sql.Snapshot
            self.loggederror_class = sql.LoggedError
            sql.get_connection(new=True)
        else:
            self.user_class = User
            self.test_user_class = User
            self.role_class = Role
            self.token_class = Token
            self.tracking_code_class = TrackingCode
            self.session_class = sessions.BasicSession
            self.custom_view_class = custom_views.CustomView
            self.snapshot_class = None
            self.loggederror_class = None

        self.session_manager_class = sessions.StorageSessionManager
        self.set_session_manager(self.session_manager_class(session_class=self.session_class))

    def start_request(self):
        """Activate the site timezone, then run the parent start_request()."""
        self.setup_timezone()
        super().start_request()

    def setup_timezone(self):
        """Activate the timezone named by the 'timezone' site option, if it is a known zone."""
        try:
            timezone.activate(zoneinfo.ZoneInfo(self.get_site_option('timezone')))
        except zoneinfo.ZoneInfoNotFoundError:
            timezone.deactivate()  # use value from django settings

    def get_enabled_languages(self):
        """Return the configured list of languages (empty list when unset)."""
        return self.cfg.get('language', {}).get('languages') or []

    def get_phone_local_region_code(self):
        """Return the region code site option, defaulting to 'FR'."""
        # default-country-code is a legacy setting (despite its name it contained a region code)
        return (
            self.get_site_option('local-region-code') or self.get_site_option('default-country-code') or 'FR'
        )

    def _apply_imported_settings(self, d, overwrite_settings):
        """Merge the settings dict `d` read from an archive into self.cfg and write it.

        With `overwrite_settings` the whole configuration is replaced, except
        for the 'sp' section which is always kept from the current
        configuration (and dropped from the imported one if there is no
        current value).  Otherwise only a subset of keys is copied.
        """
        if overwrite_settings:
            current_sp = self.cfg.get('sp')
            self.cfg = d
            if current_sp:
                self.cfg['sp'] = current_sp
            elif 'sp' in self.cfg:
                del self.cfg['sp']
        else:
            # only update a subset of settings, critical system parts such as
            # authentication and database settings are not overwritten.
            for section, section_parts in (
                ('emails', ('email-*',)),
                ('filetypes', ('*',)),
                ('language', ('*',)),
                ('misc', ('default-position', 'default-zoom-level')),
                ('sms', ('*',)),
                ('submission-channels', ('*',)),
                ('backoffice-submission', ('*',)),
                ('texts', ('*',)),
                ('users', ('*_template',)),
            ):
                if section not in d:
                    continue
                if section not in self.cfg:
                    self.cfg[section] = {}
                for key in d[section]:
                    for pattern in section_parts:
                        if fnmatch.fnmatch(str(key), pattern):
                            self.cfg[section][key] = d[section][key]
        self.write_cfg()

    def import_zip(self, fd, overwrite_settings=True):
        """Import a site archive.

        `fd` is a file object (or path) accepted by zipfile.ZipFile.  Existing
        config.pck / config.json files are copied to dated backups first.
        Plain files are written below the application directory, settings are
        merged (see `overwrite_settings`), and XML definitions are imported
        in passes: data sources and webservice calls, blocks, workflows,
        forms and cards, roles.

        Returns a dict counting imported elements per kind.
        Raises zipfile.BadZipFile if a member would be written outside the
        application directory.
        """
        from wcs.blocks import BlockDef
        from wcs.carddef import CardDef
        from wcs.data_sources import NamedDataSource
        from wcs.formdef import FormDef
        from wcs.workflows import Workflow
        from wcs.wscalls import NamedWsCall

        results = {
            'formdefs': 0,
            'carddefs': 0,
            'workflows': 0,
            'categories': 0,
            'carddef_categories': 0,
            'workflow_categories': 0,
            'block_categories': 0,
            'mail_template_categories': 0,
            'comment_template_categories': 0,
            'data_source_categories': 0,
            'roles': 0,
            'settings': 0,
            'datasources': 0,
            'wscalls': 0,
            'mail-templates': 0,
            'comment-templates': 0,
            'blockdefs': 0,
            'apiaccess': 0,
        }

        now = localtime()
        for filename in ('config.pck', 'config.json'):
            filepath = os.path.join(self.app_dir, filename)
            if os.path.exists(filepath):
                shutil.copyfile(filepath, filepath + '.backup-%s' % now.strftime('%Y%m%d'))

        # directories whose content is imported from XML, not copied to disk
        xml_dirs = (
            'formdefs_xml',
            'carddefs_xml',
            'workflows_xml',
            'blockdefs_xml',
            'roles_xml',
            'datasources',
            'wscalls',
        )
        app_root = os.path.realpath(self.app_dir)

        with zipfile.ZipFile(fd) as z:
            names = z.namelist()

            # validate all target paths before writing anything, so a member
            # such as "../../x" or "/abs/path" cannot escape the app directory.
            to_copy = []
            for f in names:
                if f in ('.indexes', '.max_id'):
                    continue
                if os.path.dirname(f) in xml_dirs:
                    continue
                path = os.path.join(self.app_dir, f)
                if os.path.commonpath([app_root, os.path.realpath(path)]) != app_root:
                    raise zipfile.BadZipFile('unsafe path in archive: %s' % f)
                to_copy.append((f, path))

            # first pass, settings and plain files
            for f, path in to_copy:
                os.makedirs(os.path.dirname(path), exist_ok=True)
                if not os.path.basename(f):
                    # skip directories
                    continue
                data = z.read(f)
                if f in ('config.pck', 'config.json'):
                    results['settings'] = 1
                    if f == 'config.pck':
                        # NOTE(security): unpickling can execute arbitrary code,
                        # archives must only come from trusted sources.
                        d = pickle.loads(data)
                    else:
                        d = json.loads(data)
                    self._apply_imported_settings(d, overwrite_settings)
                    continue
                with open(path, 'wb') as out:
                    out.write(data)
                top_dir = os.path.split(f)[0]
                if top_dir in results:
                    results[top_dir] += 1

            # import datasources and wscalls
            for f in names:
                if os.path.dirname(f) == 'datasources' and os.path.basename(f):
                    with z.open(f) as member:
                        data_source = NamedDataSource.import_from_xml(
                            member, include_id=True, check_deprecated=True
                        )
                    data_source.store()
                    results['datasources'] += 1
                if os.path.dirname(f) == 'wscalls' and os.path.basename(f):
                    with z.open(f) as member:
                        wscall = NamedWsCall.import_from_xml(member, include_id=True, check_deprecated=True)
                    wscall.store()
                    results['wscalls'] += 1

            # second pass, fields blocks
            for f in names:
                if os.path.dirname(f) == 'blockdefs_xml' and os.path.basename(f):
                    with z.open(f) as member:
                        blockdef = BlockDef.import_from_xml(member, include_id=True, check_deprecated=True)
                    blockdef.store()
                    results['blockdefs'] += 1

            # third pass, workflows
            for f in names:
                if os.path.dirname(f) == 'workflows_xml' and os.path.basename(f):
                    with z.open(f) as member:
                        workflow = Workflow.import_from_xml(
                            member, include_id=True, check_datasources=False, check_deprecated=True
                        )
                    workflow.store()
                    results['workflows'] += 1

            # fourth pass, forms and cards
            formdefs = []
            carddefs = []
            for f in names:
                if os.path.dirname(f) == 'formdefs_xml' and os.path.basename(f):
                    with z.open(f) as member:
                        imported_formdef = FormDef.import_from_xml(
                            member, include_id=True, check_datasources=False, check_deprecated=True
                        )
                    imported_formdef.store()
                    formdefs.append(imported_formdef)
                    results['formdefs'] += 1
                if os.path.dirname(f) == 'carddefs_xml' and os.path.basename(f):
                    with z.open(f) as member:
                        imported_carddef = CardDef.import_from_xml(
                            member, include_id=True, check_datasources=False, check_deprecated=True
                        )
                    imported_carddef.store()
                    carddefs.append(imported_carddef)
                    results['carddefs'] += 1

            # fifth pass, roles
            roles = []
            for f in names:
                if os.path.dirname(f) == 'roles_xml' and os.path.basename(f):
                    with z.open(f) as member:
                        role = self.role_class.import_from_xml(member, include_id=True)
                    role.store()
                    roles.append(role)
                    results['roles'] += 1

        # rebuild indexes for imported objects
        for k, v in results.items():
            if k == 'settings' or v == 0:
                continue
            klass = None
            if k == 'formdefs':
                klass = FormDef
            elif k == 'carddefs':
                klass = CardDef
            elif k == 'blockdefs':
                klass = BlockDef
            elif k == 'categories':
                from .categories import Category

                klass = Category
            elif k == 'roles':
                klass = self.role_class
            elif k == 'workflows':
                klass = Workflow
            if klass:
                klass.rebuild_indexes()

            if k == 'formdefs':
                # in case of formdefs, we store them anew in case SQL changes
                # are required.
                for stored_formdef in formdefs or FormDef.select():
                    stored_formdef.store()
            elif k == 'carddefs':
                # ditto for cards
                for stored_carddef in carddefs or CardDef.select():
                    stored_carddef.store()

        return results

    def initialize_sql(self):
        """Open a new SQL connection and run the table/view setup functions of the sql module.

        The setup calls run inside sql.atomic(); form and card tables are set
        up for every FormDef and CardDef.
        """
        from . import sql

        sql.get_connection(new=True)
        with sql.atomic():
            sql.do_session_table()
            sql.do_user_table()
            sql.do_role_table()
            sql.do_tracking_code_table()
            sql.do_custom_views_table()
            sql.do_transient_data_table()
            sql.do_snapshots_table()
            sql.do_loggederrors_table()
            sql.do_tokens_table()
            sql.WorkflowTrace.do_table()
            sql.Audit.do_table()
            sql.TestDef.do_table()
            sql.TestResult.do_table()
            sql.Application.do_table()
            sql.ApplicationElement.do_table()
            sql.SearchableFormDef.do_table()
            sql.TranslatableMessage.do_table()
            sql.do_meta_table()
            from .carddef import CardDef
            from .formdef import FormDef

            conn, cur = sql.get_connection_and_cursor()
            sql.drop_views(None, conn, cur)
            for _formdef in FormDef.select() + CardDef.select():
                sql.do_formdef_tables(_formdef)
            sql.migrate_global_views(conn, cur)
            sql.init_search_tokens()
            cur.close()

    def record_deprecated_usage(self, *args, **kwargs):
        """Record an error flagged as deprecated usage, with a '[DEPRECATED]' context prefix."""
        return self.record_error(*args, context='[DEPRECATED]', deprecated_usage=True, **kwargs)

    def record_error(
        self,
        error_summary=None,
        context=None,
        exception=None,
        record=True,
        notify=False,
        deprecated_usage=False,
        *args,
        **kwargs,
    ):
        """Record and/or notify an error.

        error_summary: short description; when missing and `exception` is
            given it is derived from the exception currently being handled.
        context: optional prefix added to the summary.
        exception: when not None, details are taken from sys.exc_info();
            otherwise the caller's stack (and the request dump) is used.
        record: store the error using self.loggederror_class, if any.
        notify: pass the error to self.logger.log_internal_error(), only for
            the first occurrence when the error was recorded.
        deprecated_usage: record the error with kind 'deprecated_usage'.
        Extra positional/keyword arguments are passed to
        loggederror_class.record_error().

        Returns the logged error object, or None when nothing was recorded.
        Raises errors.InspectException when the request is in inspect mode.
        """
        if not record and not notify:
            return

        request = get_request()
        if request and getattr(request, 'inspect_mode', False):
            # do not record anything when trying random things in the inspector
            raise errors.InspectException(error_summary)

        if request and getattr(request, 'disable_error_notifications', None) is True:
            # do not record anything if errors are disabled
            return

        if exception is not None:
            exc_type, exc_value, tb = sys.exc_info()
            if not error_summary:
                error_summary = traceback.format_exception_only(exc_type, exc_value)
                error_summary = error_summary[0][0:-1]  # de-listify and strip newline
            plain_error_msg = str(
                self._generate_plaintext_error(get_request(), self, exc_type, exc_value, tb)
            )
        else:
            error_file = io.StringIO()
            # must stay in this function: f_back is the frame of our caller
            error_file.write(self._format_traceback(sys._getframe().f_back))
            if get_request():
                error_file.write('\n')
                error_file.write(get_request().dump())
                error_file.write('\n')
            plain_error_msg = error_file.getvalue()

        if context:
            error_summary = '%s %s' % (context, error_summary)
        # NOTE(review): when a context is given the summary is never None at
        # this point (a missing summary was formatted as "None" just above).
        if error_summary is None:
            return
        error_summary = str(error_summary).replace('\n', ' ')[:400].strip()

        logged_exception = None
        if record and self.loggederror_class:
            kind = 'deprecated_usage' if deprecated_usage else None
            logged_exception = self.loggederror_class.record_error(
                error_summary,
                plain_error_msg,
                *args,
                publisher=self,
                exception=exception,
                kind=kind,
                **kwargs,
            )

        if not notify or (logged_exception and logged_exception.occurences_count > 1):
            # notify only first occurrence
            return logged_exception
        try:
            self.logger.log_internal_error(
                error_summary, plain_error_msg, logged_exception.tech_id if logged_exception else None
            )
        except OSError:
            # Could happen if there is no mail server available and exceptions
            # were configured to be mailed. (formerly socket.error)
            # Could also happen on file descriptor exhaustion.
            pass
        return logged_exception

    def get_object_class(self, object_type):
        """Return the class whose `xml_root_node` equals `object_type`.

        Raises KeyError when no known class matches.
        """
        from wcs.blocks import BlockDef
        from wcs.carddef import CardDef
        from wcs.categories import (
            BlockCategory,
            CardDefCategory,
            Category,
            CommentTemplateCategory,
            DataSourceCategory,
            MailTemplateCategory,
            WorkflowCategory,
        )
        from wcs.comment_templates import CommentTemplate
        from wcs.data_sources import NamedDataSource
        from wcs.formdef import FormDef
        from wcs.mail_templates import MailTemplate
        from wcs.sql import SqlUser
        from wcs.testdef import TestDef
        from wcs.workflows import Workflow
        from wcs.wscalls import NamedWsCall

        for klass in (
            BlockDef,
            CardDef,
            NamedDataSource,
            FormDef,
            Workflow,
            NamedWsCall,
            MailTemplate,
            CommentTemplate,
            Category,
            CardDefCategory,
            WorkflowCategory,
            BlockCategory,
            MailTemplateCategory,
            CommentTemplateCategory,
            DataSourceCategory,
            TestDef,
            SqlUser,
        ):
            if klass.xml_root_node == object_type:
                return klass
        raise KeyError('no class for object type: %s' % object_type)

    def apply_global_action_timeouts(self, **kwargs):
        """Cron job entry point: apply global action timeout triggers of every workflow.

        When a `job` keyword argument is given, each workflow is processed
        within job.log_long_job().
        """
        from wcs.workflows import Workflow, WorkflowGlobalActionTimeoutTrigger

        job = kwargs.pop('job', None)

        for workflow in Workflow.select():
            with job.log_long_job('workflow %s (%s)' % (workflow.name, workflow.id)) if job else ExitStack():
                WorkflowGlobalActionTimeoutTrigger.apply(workflow)

    def migrate_sql(self):
        """Run sql.migrate()."""
        from . import sql

        sql.migrate()

    def reindex_sql(self):
        """Run sql.reindex()."""
        from . import sql

        sql.reindex()

    def cleanup(self):
        """Reset per-request caches, clean up the SQL connection and deactivate the timezone."""
        self._cached_user_fields_formdef = None
        self._update_related_seen = None
        self._error_context = None
        from . import sql

        sql.cleanup_connection()
        timezone.deactivate()

    @contextmanager
    def complex_data(self):
        """Context manager installing a fresh complex data cache, restoring the previous one on exit."""
        old_complex_data_cache, self.complex_data_cache = self.complex_data_cache, {}
        try:
            yield True
        finally:
            self.complex_data_cache = old_complex_data_cache

    def cache_complex_data(self, value, rendered_value):
        """Cache `value` under a unique string derived from `rendered_value` and return that string.

        Returns `value` unchanged when no cache is active.
        """
        # Keep a temporary cache of associations between a complex data value
        # (value) and a string representation (produced by django with
        # django.template.base.render_value_in_context.
        #
        # It ensures string values are unique by appending a private unicode
        # code point, that will be removed in wcs/qommon/template.py.

        if self.complex_data_cache is None:
            # it doesn't do anything unless initialized.
            return value

        str_value = rendered_value.strip() + chr(0xE000 + len(self.complex_data_cache))
        self.complex_data_cache[str_value] = value
        return str_value

    def has_cached_complex_data(self, value):
        """Return True when `value` is a key of the active complex data cache."""
        return value in (self.complex_data_cache or {})

    def get_cached_complex_data(self, value, loop_context=False):
        """Return the complex value cached for the string `value`.

        Non-string values are returned unchanged.  Strings are stripped; when
        no cache is active the stripped string is returned, and when it is
        not a cache key it is returned with private use code points removed.
        Cached values with `get_iterable_value` (only if `loop_context`) or
        `get_value` are resolved by calling that method.
        """
        if not isinstance(value, str):
            return value

        value = value.strip()
        if self.complex_data_cache is None:
            return value
        if value not in self.complex_data_cache:
            return re.sub(r'[\uE000-\uF8FF]', '', value)

        value_ = self.complex_data_cache.get(value)
        if loop_context and hasattr(value_, 'get_iterable_value'):
            return value_.get_iterable_value()

        if hasattr(value_, 'get_value'):
            # unlazy variable
            return value_.get_value()

        return value_

    @contextmanager
    def inspect_recurse_skip(self, prefixes):
        """Context manager setting `inspect_recurse_skip_prefixes`, reset to None on exit."""
        self.inspect_recurse_skip_prefixes = prefixes or []
        try:
            yield True
        finally:
            self.inspect_recurse_skip_prefixes = None

    # when parsing block widgets we usually want to skip empty rows, however
    # when evaluating live conditions we must keep all lines to get row indices
    # matching what's in the DOM.
    keep_all_block_rows_mode = False

    @contextmanager
    def keep_all_block_rows(self):
        """Context manager enabling `keep_all_block_rows_mode`, set back to False on exit."""
        self.keep_all_block_rows_mode = True
        try:
            yield True
        finally:
            self.keep_all_block_rows_mode = False

    # stacked contexts to include in logged errors
    _error_context = None

    @contextmanager
    def error_context(self, **kwargs):
        """Context manager pushing `kwargs` on the error context stack, popped on exit."""
        if not self._error_context:
            self._error_context = []
        self._error_context.append(kwargs)
        try:
            yield True
        finally:
            self._error_context.pop()

    def get_error_context(self):
        """Return {'stack': [...]} with the stacked error contexts, or None when the stack is empty."""
        return {'stack': self._error_context} if self._error_context else None

    def clean_deleted_users(self, **kwargs):
        """Cron job entry point: remove every user returned by user_class.get_to_delete_ids()."""
        for user_id in self.user_class.get_to_delete_ids():
            self.user_class.remove_object(user_id)


set_publisher_class(WcsPublisher)