trivial: adjust for pylint (#79712)
gitea/hobo/pipeline/head This commit looks good Details

This commit is contained in:
Frédéric Péters 2023-07-15 20:46:19 +02:00
parent 9988a356ff
commit 35179bd7d4
106 changed files with 284 additions and 363 deletions

View File

@ -58,7 +58,7 @@ class Command(hobo_deploy.Command):
user_dict['is_staff'] = True
user_dict['is_superuser'] = True
user, created = User.objects.get_or_create(defaults=user_dict, username=user_dict['username'])
User.objects.get_or_create(defaults=user_dict, username=user_dict['username'])
# create/update user attributes
fields = []
@ -70,7 +70,7 @@ class Command(hobo_deploy.Command):
# so it gets shared as SAML attribute.
fields.append(attribute['name'])
continue
attr, created = Attribute.all_objects.update_or_create(
attr, dummy = Attribute.all_objects.update_or_create(
name=attribute['name'], defaults={'kind': attribute['kind']}
)
for key in (
@ -94,7 +94,7 @@ class Command(hobo_deploy.Command):
attr.save()
# creation of IdpPolicy
policy, created = SPOptionsIdPPolicy.objects.get_or_create(name='Default')
policy, dummy = SPOptionsIdPPolicy.objects.get_or_create(name='Default')
policy.enabled = True
policy.authn_request_signed = False
policy.accepted_name_id_format = ['uuid']
@ -108,7 +108,7 @@ class Command(hobo_deploy.Command):
# create SAML default policy attributes
names = ['username', 'is_superuser'] + fields + disabled_fields
for name in names:
attribute, created = SAMLAttribute.objects.get_or_create(
attribute, dummy = SAMLAttribute.objects.get_or_create(
name=name,
name_format='basic',
attribute_name='django_user_%s' % name,
@ -177,7 +177,7 @@ class Command(hobo_deploy.Command):
provider.metadata_url = service['saml-sp-metadata-url']
variables = service.get('variables', {})
if variables.get('ou-slug'):
ou, created = OrganizationalUnit.objects.get_or_create(
ou, dummy = OrganizationalUnit.objects.get_or_create(
slug=service['variables']['ou-slug']
)
ou.name = service['variables']['ou-label']
@ -200,7 +200,7 @@ class Command(hobo_deploy.Command):
create_ou = True
break
if create_ou:
ou, created = OrganizationalUnit.objects.get_or_create(name=service['title'])
ou, dummy = OrganizationalUnit.objects.get_or_create(name=service['title'])
if service_created or not provider.ou:
provider.ou = ou
provision_target_ous[provider.ou.id] = provider.ou
@ -219,7 +219,7 @@ class Command(hobo_deploy.Command):
# add a superuser role for the service
name = _('Superuser of %s') % service['title']
su_role, created = Role.objects.get_or_create(
su_role, dummy = Role.objects.get_or_create(
service=provider, slug='_a2-hobo-superuser', defaults={'name': name}
)
if su_role.name != name:
@ -286,20 +286,21 @@ class Command(hobo_deploy.Command):
'''Load default roles based on a template'''
roles_filename = os.path.join(skeleton_dir, 'roles.json')
if not os.path.exists(roles_filename):
self.logger.debug('no skeleton roles: roles file %r does not ' 'exist', roles_filename)
self.logger.debug('no skeleton roles: roles file %r does not exist', roles_filename)
return
if Role.objects.filter(ou=provider.ou).exclude(slug__startswith='_a2-').exists():
return
roles = []
for role in serializers.deserialize('json', open(roles_filename)):
assert isinstance(role.object, Role)
# reset id and natural key
role.object.pk = None
role.object.uuid = Role._meta.get_field('uuid').default()
# same ou as provider
role.object.ou = provider.ou
# XXX: attach to service or not ?
roles.append(role.object)
with open(roles_filename) as fd:
for role in serializers.deserialize('json', fd):
assert isinstance(role.object, Role)
# reset id and natural key
role.object.pk = None
role.object.uuid = Role._meta.get_field('uuid').default()
# same ou as provider
role.object.ou = provider.ou
# XXX: attach to service or not ?
roles.append(role.object)
if roles:
Role.objects.bulk_create(roles)
Role.objects.get(uuid=roles[-1].uuid).save()

View File

@ -258,7 +258,7 @@ class Provisionning(threading.local):
sync=sync,
)
elif users:
audience = [audience for ou in ous.keys() for s, audience in self.get_audience(ou)]
audience = [audience for ou in ous for s, audience in self.get_audience(ou)]
logger.info(
'deprovisionning users %s from %s', ', '.join(map(force_str, users)), ', '.join(audience)
)
@ -509,7 +509,7 @@ class Provisionning(threading.local):
if sync:
url += '&sync=1'
try:
response = requests.put(sign_url(url, service['secret']), json=data)
response = requests.put(sign_url(url, service['secret']), json=data, timeout=10)
response.raise_for_status()
except requests.RequestException as e:
logger.error('error provisionning to %s (%s)', audience, e)

View File

@ -28,12 +28,12 @@ class ProvisionSerializer(serializers.Serializer):
service_type = serializers.CharField(required=False)
service_url = serializers.CharField(required=False)
def validate(self, data):
if not (data.get('user_uuid') or data.get('role_uuid')):
def validate(self, attrs):
if not (attrs.get('user_uuid') or attrs.get('role_uuid')):
raise serializers.ValidationError('must provide user_uuid or role_uuid')
if data.get('user_uuid') and data.get('role_uuid'):
if attrs.get('user_uuid') and attrs.get('role_uuid'):
raise serializers.ValidationError('cannot provision both user & role')
return data
return attrs
class ProvisionView(GenericAPIView):

View File

@ -63,7 +63,8 @@ class Command(BaseCommand):
if json_filename == '-':
hobo_environment = json.load(sys.stdin)
else:
hobo_environment = json.load(open(json_filename))
with open(json_filename) as fd:
hobo_environment = json.load(fd)
self.deploy(base_url, hobo_environment, ignore_timestamp)
def deploy(self, base_url, hobo_environment, ignore_timestamp):
@ -92,8 +93,9 @@ class Command(BaseCommand):
timestamp = hobo_environment.get('timestamp')
tenant_hobo_json = os.path.join(tenant.get_directory(), 'hobo.json')
if os.path.exists(tenant_hobo_json):
if not ignore_timestamp and json.load(open(tenant_hobo_json)).get('timestamp') == timestamp:
return
with open(tenant_hobo_json) as fd:
if not ignore_timestamp and json.load(fd).get('timestamp') == timestamp:
return
# add an attribute to current tenant for easier retrieval
self.me['this'] = True
@ -148,7 +150,7 @@ class Command(BaseCommand):
if not idp_url:
continue
try:
response = requests.get(idp_url, verify=False)
response = requests.get(idp_url, verify=False, timeout=10)
except requests.exceptions.RequestException:
continue
if response.status_code != 200:

View File

@ -65,7 +65,7 @@ class Command(BaseCommand, NotificationProcessing):
object_type = notification['objects']['@type']
msg = 'received request for %sing %%d %%s objects (Celery)' % action
logger.info(msg, len(notification['objects']['data']), object_type)
for i in range(20):
for _ in range(20):
try:
getattr(cls, 'provision_' + object_type)(
issuer, action, notification['objects']['data'], full=full

View File

@ -38,5 +38,4 @@ class Command(BaseCommand):
if 'import_site' in get_commands():
if not os.path.exists(template):
raise UnknownTemplateError('unknown template (%r)' % template)
else:
call_command('import_site', template)
call_command('import_site', template)

View File

@ -6,7 +6,6 @@ from hobo.agent.common.management.commands import hobo_deploy
from hobo.deploy.signals import notify_agents
from hobo.environment.models import AVAILABLE_SERVICES, Hobo, Variable
from hobo.environment.utils import get_or_create_local_hobo
from hobo.multitenant.middleware import TenantMiddleware, TenantNotFound
from hobo.profile.models import AttributeDefinition
@ -63,9 +62,6 @@ class Command(hobo_deploy.Command):
# this is the primary hobo, receiving a notification
# because something changed in secondary hobo environment.
# mark services with ou-label/ou-slug
secondary_hobo = [
x for x in services if x['service-id'] == 'hobo' and not x.get('secondary')
][0]
slug_prefix = '_%s_' % hobo_environment['variables']['ou-slug']
service_slug = '%s%s' % (slug_prefix, service_dict['slug'])
@ -148,7 +144,7 @@ class Command(hobo_deploy.Command):
attribute.order = i + 1
attribute.save()
seen.append(attribute.name)
removed, details_ = AttributeDefinition.objects.exclude(name__in=seen).delete()
removed, _ = AttributeDefinition.objects.exclude(name__in=seen).delete()
if removed:
changes = True
if changes:

View File

@ -79,12 +79,12 @@ class BaseService:
if not os.path.exists(self.service_manage_try_cmd):
return
cmd = self.service_manage_cmd + ' hobo_deploy ' + self.base_url + ' -'
cmd_process = subprocess.Popen(
with subprocess.Popen(
cmd, shell=True, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE
)
stdout, stderr = cmd_process.communicate(input=force_bytes(json.dumps(environment)))
if cmd_process.returncode != 0:
raise RuntimeError('command "%s" failed: %r %r' % (cmd, stdout, stderr))
) as cmd_process:
stdout, stderr = cmd_process.communicate(input=force_bytes(json.dumps(environment)))
if cmd_process.returncode != 0:
raise RuntimeError('command "%s" failed: %r %r' % (cmd, stdout, stderr))
@classmethod
def notify(cls, data):
@ -101,14 +101,14 @@ class BaseService:
return
cmd = cls.service_manage_cmd + ' hobo_notify -'
try:
cmd_process = subprocess.Popen(
with subprocess.Popen(
cmd, shell=True, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE
)
) as cmd_process:
stdout, stderr = cmd_process.communicate(input=force_bytes(json.dumps(data)))
if cmd_process.returncode != 0:
raise RuntimeError('command "%s" failed: %r %r' % (cmd, stdout, stderr))
except OSError:
return
stdout, stderr = cmd_process.communicate(input=force_bytes(json.dumps(data)))
if cmd_process.returncode != 0:
raise RuntimeError('command "%s" failed: %r %r' % (cmd, stdout, stderr))
class Passerelle(BaseService):
@ -195,7 +195,7 @@ def deploy(environment):
service_classes = {}
for klassname, service in globals().items():
for service in globals().values():
if not hasattr(service, 'service_id'):
continue
service_classes[service.service_id] = service
@ -222,7 +222,7 @@ def deploy(environment):
def notify(data):
services = []
for klassname, service in globals().items():
for service in globals().values():
if not hasattr(service, 'service_id'):
continue
services.append(service)

View File

@ -16,10 +16,8 @@
import urllib
import requests
from django.conf import settings
from django.utils.http import urlencode
from requests import Response
from requests import Session as RequestsSession
from requests.auth import AuthBase
@ -59,7 +57,7 @@ class Requests(RequestsSession):
query_params['orig'] = remote_service.get('orig')
remote_service_base_url = remote_service.get('url')
scheme, netloc, dummy, params, old_query, fragment = urllib.parse.urlparse(remote_service_base_url)
scheme, netloc, dummy, params, _, fragment = urllib.parse.urlparse(remote_service_base_url)
query = urlencode(query_params)
url = urllib.parse.urlunparse((scheme, netloc, path, params, query, fragment))

View File

@ -24,7 +24,7 @@ from django.core.files.base import ContentFile
from django.db.models import Prefetch
from django.http import HttpResponse, HttpResponseRedirect
from django.shortcuts import get_object_or_404
from django.urls import reverse, reverse_lazy
from django.urls import reverse
from django.utils.text import slugify
from django.utils.timezone import now
from django.utils.translation import gettext_lazy as _
@ -192,13 +192,13 @@ class AppAddElementView(TemplateView):
app = context['app']
element_infos = {x['id']: x for c in context['categories'] for x in c.elements}
for element_slug in request.POST.getlist('elements'):
element, created = Element.objects.get_or_create(
element, dummy = Element.objects.get_or_create(
type=type, slug=element_slug, defaults={'name': element_infos[element_slug]['text']}
)
element.name = element_infos[element_slug]['text']
element.cache = element_infos[element_slug]
element.save()
relation, created = Relation.objects.get_or_create(application=app, element=element)
relation, dummy = Relation.objects.get_or_create(application=app, element=element)
relation.auto_dependency = False
relation.save()
return HttpResponseRedirect(reverse('application-manifest', kwargs={'app_slug': app_slug}))
@ -306,32 +306,32 @@ class Install(FormView):
def form_valid(self, form):
tar_io = io.BytesIO(self.request.FILES['bundle'].read())
tar = tarfile.open(fileobj=tar_io)
manifest = json.loads(tar.extractfile('manifest.json').read().decode())
if self.application and self.application.slug != manifest.get('slug'):
form.add_error(
'bundle', _('Can not update this application, wrong slug (%s).') % manifest.get('slug')
with tarfile.open(fileobj=tar_io) as tar:
manifest = json.loads(tar.extractfile('manifest.json').read().decode())
if self.application and self.application.slug != manifest.get('slug'):
form.add_error(
'bundle', _('Can not update this application, wrong slug (%s).') % manifest.get('slug')
)
return self.form_invalid(form)
app, created = Application.objects.get_or_create(
slug=manifest.get('slug'), defaults={'name': manifest.get('application')}
)
return self.form_invalid(form)
app, created = Application.objects.get_or_create(
slug=manifest.get('slug'), defaults={'name': manifest.get('application')}
)
self.application = app
app.name = manifest.get('application')
app.description = manifest.get('description')
app.documentation_url = manifest.get('documentation_url') or ''
if created:
# mark as non-editable only newly deployed applications, this allows
# overwriting a local application and keep on developing it.
app.editable = False
else:
app.relation_set.all().delete()
app.save()
icon = manifest.get('icon')
if icon:
app.icon.save(icon, tar.extractfile(icon), save=True)
else:
app.icon.delete()
self.application = app
app.name = manifest.get('application')
app.description = manifest.get('description')
app.documentation_url = manifest.get('documentation_url') or ''
if created:
# mark as non-editable only newly deployed applications, this allows
# overwriting a local application and keep on developing it.
app.editable = False
else:
app.relation_set.all().delete()
app.save()
icon = manifest.get('icon')
if icon:
app.icon.save(icon, tar.extractfile(icon), save=True)
else:
app.icon.delete()
for element_dict in manifest.get('elements'):
element, created = Element.objects.get_or_create(

View File

@ -60,7 +60,7 @@ class RemoteTemplate:
# match.
page_cache, expiry_time = item
selected_cache_page = None
for page_redirect_url in sorted(page_cache.keys(), key=lambda x: len(x)):
for page_redirect_url in sorted(page_cache.keys(), key=len):
if selected_cache_page is None:
selected_cache_page = page_redirect_url
continue
@ -117,6 +117,7 @@ class RemoteTemplate:
self.theme_skeleton_url,
params={'source': self.source},
headers={'Accept-Language': self.language_code},
timeout=10,
)
if r.status_code != 200:
logger.error('failed to retrieve theme (status code: %s)', r.status_code)
@ -137,15 +138,16 @@ class RemoteTemplate:
self.set_language(lang_code)
# always cache root
root_url = urllib.parse.urlunparse(urllib.parse.urlparse(self.source)[:2] + ('/', '', '', ''))
if not root_url in self.combo_skeleton_pages.values():
if root_url not in self.combo_skeleton_pages.values():
self.combo_skeleton_pages['__root'] = root_url
page_cache = {}
for page_id, page_redirect_url in self.combo_skeleton_pages.items():
for page_redirect_url in self.combo_skeleton_pages.values():
r = requests.get(
self.theme_skeleton_url,
params={'source': page_redirect_url},
headers={'Accept-Language': lang_code},
timeout=10,
)
if r.status_code != 200:
# abort
@ -223,6 +225,7 @@ def _authentic2_get_next_url(request):
def user_urls(request):
# ugly, but necessary..
if 'wcs.qommon' in settings.INSTALLED_APPS:
# noqa pylint: disable=import-error
from quixote import get_publisher
pub = get_publisher()

View File

@ -21,4 +21,5 @@ class AppConfig(django.apps.AppConfig):
name = 'hobo.deploy'
def ready(self):
# noqa pylint: disable=unused-import
from . import signals

View File

@ -15,7 +15,6 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import smtplib
import socket
import urllib.parse
import dns.resolver
@ -87,6 +86,6 @@ def validate_email_domain(value):
return
for service in get_operational_services():
fqdn = urllib.parse.urlparse(service.base_url).netloc.split(':')[0]
if fqdn == domain or fqdn == 'www.' + domain:
if fqdn in (domain, 'www.' + domain):
return
raise ValidationError(_('Domain %s is not allowed') % domain)

View File

@ -14,7 +14,6 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from django.conf import settings
from django.utils.translation import gettext as _
from django.views.generic import TemplateView
@ -34,7 +33,7 @@ class HomeView(VariablesFormMixin, TemplateView):
'email_abuse_report_url',
]
form_class = EmailsForm
success_message = _('Emails settings have been updated. ' 'It will take a few seconds to be effective.')
success_message = _('Emails settings have been updated. It will take a few seconds to be effective.')
home = HomeView.as_view()

View File

@ -1,3 +1,2 @@
# noqa pylint: disable=unused-import
from django.contrib import admin
# Register your models here.

View File

@ -1,5 +1,3 @@
from optparse import make_option
from django.core.management.base import BaseCommand
from hobo.environment import models

View File

@ -89,13 +89,13 @@ class Command(BaseCommand):
print('All steps executed successfully. Your environment should now be ready.')
def run_cook(self, filename):
recipe = json.load(open(filename))
with open(filename) as fd:
recipe = json.load(fd)
variables = {}
steps = []
if 'load-variables-from' in recipe:
variables.update(
json.load(open(os.path.join(os.path.dirname(filename), recipe['load-variables-from'])))
)
with open(os.path.join(os.path.dirname(filename), recipe['load-variables-from'])) as fd:
variables.update(json.load(fd))
variables.update(recipe.get('variables', {}))
for step in recipe.get('steps', []):
action, action_args = list(step.items())[0]
@ -155,9 +155,8 @@ class Command(BaseCommand):
base_url_filename = os.path.join(tenant.get_directory(), 'base_url')
if not os.path.exists(base_url_filename):
fd = open(base_url_filename, 'w')
fd.write(url.rstrip('/'))
fd.close()
with open(base_url_filename, 'w') as fd:
fd.write(url.rstrip('/'))
connection.set_tenant(tenant)
@ -210,7 +209,7 @@ class Command(BaseCommand):
obj_type = ContentType.objects.get_for_model(klass)
for variable_name in variables.keys():
label = variables[variable_name].get('label')
variable, created = Variable.objects.get_or_create(
variable, dummy = Variable.objects.get_or_create(
name=variable_name,
service_type=obj_type,
service_pk=obj.id,
@ -219,7 +218,7 @@ class Command(BaseCommand):
if label:
variable.label = label
value = variables[variable_name].get('value')
if isinstance(value, dict) or isinstance(value, list):
if isinstance(value, (dict, list)):
value = json.dumps(value)
variable.value = value
variable.save()
@ -272,7 +271,7 @@ class Command(BaseCommand):
variable, created = Variable.objects.get_or_create(
name=name, defaults={'label': label or name, 'auto': auto}
)
if isinstance(value, dict) or isinstance(value, list):
if isinstance(value, (dict, list)):
value = json.dumps(value)
if variable.label != label or variable.value != value or variable.auto != auto or created:
if label:

View File

@ -1,4 +1,4 @@
from django.db import migrations, models
from django.db import migrations
class Migration(migrations.Migration):

View File

@ -1,4 +1,4 @@
from django.db import migrations, models
from django.db import migrations
class Migration(migrations.Migration):

View File

@ -1,7 +1,6 @@
# Generated by Django 1.11.29 on 2021-02-28 09:57
import django.core.validators
from django.db import migrations, models
from django.db import migrations
class Migration(migrations.Migration):

View File

@ -235,7 +235,7 @@ class ServiceBase(models.Model):
if is_new and settings.SERVICE_EXTRA_VARIABLES:
for variable in settings.SERVICE_EXTRA_VARIABLES.get(self.Extra.service_id, []):
v = Variable()
if type(variable) is dict:
if isinstance(variable, dict):
v.name = variable.get('name')
v.label = variable.get('label')
else:

View File

@ -139,7 +139,7 @@ def get_variable(name):
def set_variable(name, value):
from .models import Variable
variable, created = Variable.objects.get_or_create(name=name, defaults={'auto': True})
variable, dummy = Variable.objects.get_or_create(name=name, defaults={'auto': True})
variable.value = value
variable.save()
@ -197,13 +197,13 @@ def import_parameters(parameters):
with transaction.atomic():
for variables in parameters.get('variables', []):
obj, created = Variable.objects.get_or_create(name=variables['name'])
obj, dummy = Variable.objects.get_or_create(name=variables['name'])
for key, value in variables.items():
setattr(obj, key, value)
obj.save()
for fields in parameters.get('profile', {}).get('fields', []):
obj, created = AttributeDefinition.objects.get_or_create(name=fields['name'])
obj, dummy = AttributeDefinition.objects.get_or_create(name=fields['name'])
for key, value in fields.items():
setattr(obj, key, value)
obj.save()

View File

@ -16,9 +16,7 @@
import datetime
import json
import string
from django.conf import settings
from django.contrib.contenttypes.models import ContentType
from django.http import Http404, HttpResponse, HttpResponseRedirect, JsonResponse
from django.shortcuts import get_object_or_404

View File

@ -22,6 +22,7 @@ import sys as _sys
import traceback as _traceback
from syslog import LOG_ALERT, LOG_CRIT, LOG_DEBUG, LOG_ERR, LOG_INFO, LOG_WARNING
# noqa pylint: disable=import-error
from systemd._journal import sendv
_IDENT_CHARACTER = set('ABCDEFGHIJKLMNOPQRTSUVWXYZ_0123456789')

View File

@ -23,7 +23,6 @@ from random import choice, randint
import requests
from django.conf import settings
from django.core import exceptions
from django.db import connection
from django.utils.encoding import force_bytes
from lxml import etree
@ -51,7 +50,7 @@ def get_variable(name, default=''):
"""get hobo variables from DB
set it to '' into DB if not already created
"""
variable, created = Variable.objects.get_or_create(name=name, defaults={'auto': True, 'value': default})
variable, dummy = Variable.objects.get_or_create(name=name, defaults={'auto': True, 'value': default})
return variable
@ -164,7 +163,7 @@ class MatomoWS:
data['module'] = 'API'
data['token_auth'] = self.token_auth
data['language'] = 'en'
resp = requests.post(self.url_ws_base, data=data)
resp = requests.post(self.url_ws_base, data=data, timeout=30)
if resp.status_code != 200:
raise MatomoException('unexpected status code: %s' % resp.status_code)
tree = self.parse_response(resp.content)
@ -237,7 +236,7 @@ class MatomoWS:
"""this function use a different matomo's webservice API"""
url = "%s/matomo.php" % self.url_ws_base
data = {'requests': ['?idsite=%s&action_name=ping&rec=1' % id_site]}
resp = requests.post(url, json=data)
resp = requests.post(url, json=data, timeout=30)
if resp.status_code != 200:
raise MatomoException('unexpected status code: %s' % resp.status_code)
try:

View File

@ -14,10 +14,9 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from django.conf import settings
from django.contrib import messages
from django.urls import reverse_lazy
from django.views.generic import FormView, RedirectView
from django.views.generic import FormView
from .forms import EnableForm, SettingsForm
from .utils import (

View File

@ -1,4 +1,3 @@
import django
from django.http import JsonResponse
from django.middleware.common import CommonMiddleware
from django.utils.translation import gettext_lazy as _

View File

@ -1,5 +1,3 @@
import os.path
from django.conf import settings
from django.http import HttpResponse
from django.utils.deprecation import MiddlewareMixin

View File

@ -29,7 +29,6 @@ class InternalIPMiddleware:
settings.DEBUG = True
except TypeError:
pass
return None
def process_response(self, request, response):
if hasattr(self, 'old_value'):

View File

@ -8,7 +8,7 @@ from hobo.scrutiny.wsgi import middleware
class VersionMiddleware(MiddlewareMixin):
def process_request(self, request):
if request.method == 'GET' and (request.path == '/__version__' or request.path == '/__version__/'):
if request.method == 'GET' and request.path in ('/__version__', '/__version__/'):
packages_version = middleware.VersionMiddleware.get_packages_version()
return HttpResponse(json.dumps(packages_version), content_type='application/json')
return None

View File

@ -21,4 +21,3 @@ class XForwardedForMiddleware(MiddlewareMixin):
if ip:
request.META['REMOTE_ADDR'] = ip
break
return None

View File

@ -1,4 +1,4 @@
from django.apps import AppConfig, apps
from django.apps import AppConfig
from . import settings, threads
@ -15,11 +15,10 @@ class MultitenantAppConfig(AppConfig):
verbose_name = 'Multitenant'
def ready(self):
# noqa pylint: disable=global-statement
global _tenant_settings_wrapper
from django import conf
from django.db import migrations
from django.db.migrations import operations
# Install tenant aware settings
if not isinstance(conf.settings._wrapped, settings.TenantSettingsWrapper):

View File

@ -19,8 +19,8 @@ import os
import shutil
import time
# noqa pylint: disable=import-error
import haystack.backends.whoosh_backend
from django.conf import settings
from django.db import connection
from tenant_schemas.postgresql_backend.base import FakeTenant
@ -28,7 +28,6 @@ from tenant_schemas.postgresql_backend.base import FakeTenant
class WhooshSearchBackend(haystack.backends.whoosh_backend.WhooshSearchBackend):
@property
def use_file_storage(self):
tenant = connection.tenant
return not (isinstance(connection.tenant, FakeTenant))
@use_file_storage.setter

View File

@ -20,8 +20,6 @@ from django.db import connection
class AdminEmailHandler(django.utils.log.AdminEmailHandler):
def format_subject(self, subject):
from .models import Tenant
subject = super().format_subject(subject)
try:
subject = '[%s] %s' % (connection.tenant.domain_url, subject)

View File

@ -3,14 +3,11 @@
# Email: carneiro.be@gmail.com
# License: MIT license
# Home-page: http://github.com/bcarneiro/django-tenant-schemas
from optparse import make_option
import django
from django.conf import settings
from django.core.management import call_command, get_commands, load_command_class
from django.core.management.base import BaseCommand, CommandError
from django.db import connection
from tenant_schemas.utils import get_public_schema_name
from hobo.multitenant.middleware import TenantMiddleware
@ -205,6 +202,7 @@ def disable_global_logging():
# try to disable sentry
if 'sentry_sdk' in sys.modules:
# noqa pylint: disable=import-error
import sentry_sdk
sentry_sdk.init()

View File

@ -1,5 +1,3 @@
import os
import subprocess
import sys
from django.core.management.base import CommandError

View File

@ -1,5 +1,4 @@
import sys
from optparse import make_option
from django.core.management.base import BaseCommand, CommandError

View File

@ -3,8 +3,6 @@
# Email: carneiro.be@gmail.com
# License: MIT license
# Home-page: http://github.com/bcarneiro/django-tenant-schemas
import django
from django.conf import settings
from django.core.management.base import BaseCommand, CommandError
from tenant_schemas.utils import django_is_in_test_mode
@ -12,16 +10,11 @@ from tenant_schemas.utils import django_is_in_test_mode
class Command(BaseCommand):
def handle(self, *args, **options):
database = options.get('database', 'default')
if (
settings.DATABASES[database]['ENGINE'] == 'tenant_schemas.postgresql_backend'
or MigrateCommand is BaseCommand
):
raise CommandError(
"migrate has been disabled, for database '{}'. Use migrate_schemas "
"instead. Please read the documentation if you don't know why you "
"shouldn't call migrate directly!".format(database)
)
super().handle(*args, **options)
raise CommandError(
"migrate has been disabled, for database '{}'. Use migrate_schemas "
"instead. Please read the documentation if you don't know why you "
"shouldn't call migrate directly!".format(database)
)
if django_is_in_test_mode():

View File

@ -14,16 +14,12 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import django
from django.apps import apps
from django.conf import settings
from django.core.management.commands.migrate import Command as MigrateCommand
from django.db import connection
from django.db.migrations.loader import MigrationLoader
from django.db.migrations.recorder import MigrationRecorder
from django.utils.timezone import localtime
from tenant_schemas.postgresql_backend.base import FakeTenant
from tenant_schemas.utils import get_public_schema_name, schema_exists
from hobo.multitenant.management.commands import SyncCommon
from hobo.multitenant.middleware import TenantMiddleware, TenantNotFound

View File

@ -1,11 +1,9 @@
import os
from optparse import make_option
from django.conf import settings
from django.contrib.staticfiles.handlers import StaticFilesHandler
from django.contrib.staticfiles.management.commands.runserver import Command as StaticRunserverCommand
from django.contrib.staticfiles.views import serve
from django.core.management.commands.runserver import Command as RunserverCommand
from django.views import static
from hobo.multitenant.middleware import TenantMiddleware, TenantNotFound

View File

@ -1,29 +0,0 @@
# this file derive from django-tenant-schemas
# Author: Bernardo Pires Carneiro
# Email: carneiro.be@gmail.com
# License: MIT license
# Home-page: http://github.com/bcarneiro/django-tenant-schemas
from django.conf import settings
from django.core.management.base import CommandError
from tenant_schemas.utils import django_is_in_test_mode
if 'south' in settings.INSTALLED_APPS:
from south.management.commands import syncdb
else:
from django.core.management.commands import syncdb
class Command(syncdb.Command):
def handle(self, *args, **options):
database = options.get('database', 'default')
if (
settings.DATABASES[database]['ENGINE'] == 'tenant_schemas.postgresql_backend'
and not django_is_in_test_mode()
):
raise CommandError(
"syncdb has been disabled, for database '{}'. "
"Use sync_schemas instead. Please read the "
"documentation if you don't know why "
"you shouldn't call syncdb directly!".format(database)
)
super().handle(*args, **options)

View File

@ -7,7 +7,6 @@
import argparse
import sys
import django
from django.conf import settings
from django.core.exceptions import ImproperlyConfigured
from django.core.management import call_command, get_commands, load_command_class

View File

@ -8,7 +8,7 @@ from django.db import connection
from django.utils import timezone
from tenant_schemas.models import TenantMixin
from tenant_schemas.postgresql_backend.base import _check_schema_name
from tenant_schemas.utils import django_is_in_test_mode, get_public_schema_name, schema_exists
from tenant_schemas.utils import get_public_schema_name, schema_exists
class Tenant(TenantMixin):

View File

@ -50,11 +50,10 @@ class TenantSettingsWrapper:
new = False
for loader in self.loaders:
new_time = loader.get_new_time(tenant)
if (
(not new_time and last_time)
or (new_time and not last_time)
or (new_time and new_time > last_time)
):
if not new_time and last_time:
new = True
break
if new_time and (not last_time or new_time > last_time):
new = True
break

View File

@ -5,7 +5,6 @@ import urllib.parse
from django.conf import settings
from django.utils.encoding import force_bytes
from django.utils.http import urlencode
from hobo.theme.utils import get_theme
@ -33,7 +32,7 @@ class FileBaseSettingsLoader:
self.update_settings_from_path(tenant_settings, path)
def update_settings_from_path(self, tenant_settings, path):
raise NotImplemented
raise NotImplementedError
class SettingsDictUpdateMixin:
@ -133,8 +132,6 @@ class TemplateVars(FileBaseSettingsLoader):
variables['is_portal_agent'] = False
variables['is_portal_user'] = False
authentic_service = None
for service in hobo_json.get('services'):
if not service.get('slug'):
continue
@ -150,7 +147,6 @@ class TemplateVars(FileBaseSettingsLoader):
variables['portal_user_slug'] = service.get('slug')
if service.get('service-id') == 'authentic':
authentic_service = service
variables['idp_url'] = service.get('base_url')
variables['idp_api_url'] = service.get('base_url') + 'api/'
variables['idp_account_url'] = service.get('base_url') + 'accounts/'

View File

@ -1,8 +1,6 @@
import os
from django.conf import settings
from django.core.exceptions import SuspiciousOperation
from django.core.files import File
from django.core.files.storage import FileSystemStorage
from django.db import connection
from django.utils._os import safe_join
@ -14,6 +12,7 @@ __all__ = ('TenantFileSystemStorage',)
class TenantFileSystemStorage(FileSystemStorage, TenantStorageMixin):
'''Lookup files in $TENANT_BASE/<tenant.schema>/media/'''
# noqa pylint: disable=invalid-overridden-method
@property
def location(self):
if connection.tenant:

View File

@ -3,23 +3,21 @@ Wrapper class that takes a list of template loaders as an argument and attempts
to load templates from them in order, caching the result.
"""
import hashlib
import itertools
from django.conf import settings
from django.core.exceptions import ImproperlyConfigured
from django.db import connection
from django.template import Origin, TemplateDoesNotExist, engines
from django.template import Origin
from django.template.loaders.cached import Loader as DjangoCachedLoader
from django.template.loaders.filesystem import Loader as DjangoFilesystemLoader
from django.utils._os import safe_join
from django.utils.encoding import force_bytes
from tenant_schemas.utils import get_public_schema_name
class CachedLoader(DjangoCachedLoader):
def cache_key(self, template_name, template_dirs, skip=None):
key = super().cache_key(template_name, template_dirs, skip=skip)
def cache_key(self, template_name, skip=None):
key = super().cache_key(template_name, skip=skip)
if connection.tenant:
return connection.tenant.domain_url + '-' + key
return key

View File

@ -96,7 +96,7 @@ if uwsgi:
with close_db(), tenant_context(tenant):
function(*args, **kwargs)
except Exception:
logger.exception('spooler: function "%s" raised' % name)
logger.exception('spooler: function "%s" raised', name)
return uwsgi.SPOOL_OK
uwsgi.spooler = spooler_function

View File

@ -5,6 +5,7 @@ import logging
from django.db import close_old_connections, connection
from uwsgidecorators import spool
# noqa pylint: disable=unused-import
import hobo.multitenant.settings_loaders # this will get imported via importlib but fail for some reason in a uwsgi job
from hobo.provisionning.utils import NotificationProcessing
@ -41,7 +42,7 @@ def provision(args):
issuer=args['issuer'],
action=args['action'],
data=json.loads(args['body']),
full=True if args['full'] == 'true' else False,
full=bool(args['full'] == 'true'),
)
except Exception:
# we need to catch every exceptions otherwise the task will be re scheduled for ever and ever

View File

@ -124,16 +124,16 @@ class NotificationProcessing:
)
excluded_attrs = ['roles', 'password']
extra_attributes = UserExtraAttributes.objects.update_or_create(
UserExtraAttributes.objects.update_or_create(
user=user,
defaults={'data': {k: v for k, v in o.items() if k not in excluded_attrs}},
)
if new:
logger.info('provisionned new user %s', user_str(user))
else:
for key in attributes:
if getattr(user, key) != attributes[key]:
setattr(user, key, attributes[key])
for key, value in attributes.items():
if getattr(user, key) != value:
setattr(user, key, value)
updated.add(key)
if updated:
user.save()
@ -172,7 +172,7 @@ class NotificationProcessing:
@classmethod
def provision_role(cls, issuer, action, data, full=False):
uuids = set()
roles_by_uuid = dict()
roles_by_uuid = {}
if action == 'provision':
# first pass to gather existing roles by uuid
@ -263,7 +263,7 @@ class NotificationProcessing:
@classmethod
def provision(cls, object_type, issuer, action, data, full):
for i in range(20):
for _ in range(20):
try:
getattr(cls, 'provision_' + object_type)(issuer=issuer, action=action, data=data, full=full)
except TryAgain:

View File

@ -16,10 +16,8 @@
import urllib
import requests
from django.conf import settings
from django.utils.http import urlencode
from requests import Response
from requests import Session as RequestsSession
from requests.auth import AuthBase
@ -58,7 +56,7 @@ class Requests(RequestsSession):
query_params = {'orig': remote_service.get('orig')}
remote_service_base_url = remote_service.get('url')
scheme, netloc, dummy, params, old_query, fragment = urllib.parse.urlparse(remote_service_base_url)
scheme, netloc, dummy, params, dummy, fragment = urllib.parse.urlparse(remote_service_base_url)
query = urlencode(query_params)
url = urllib.parse.urlunparse((scheme, netloc, path, params, query, fragment))

View File

@ -132,8 +132,8 @@ class PublikAuthentication(authentication.BaseAuthentication):
if not hasattr(settings, 'KNOWN_SERVICES'):
self.logger.warning('no known services')
raise PublikAuthenticationFailed('no-known-services-setting')
for service_id in settings.KNOWN_SERVICES:
for slug, service in settings.KNOWN_SERVICES[service_id].items():
for services in settings.KNOWN_SERVICES.values():
for service in services.values():
if service.get('verif_orig') == orig and service.get('secret'):
return service['secret']
self.logger.warning('no secret found for origin %r', orig)
@ -165,7 +165,7 @@ class APIClientAuthenticationUnavailable(exceptions.APIException):
class APIClientAuthentication(authentication.BasicAuthentication):
def authenticate_credentials(self, identifier, password, request=None):
def authenticate_credentials(self, userid, password, request=None):
idp_services = list(getattr(settings, 'KNOWN_SERVICES', {}).get('authentic', {}).values())
if not idp_services:
return None
@ -173,10 +173,10 @@ class APIClientAuthentication(authentication.BasicAuthentication):
url = authentic['url'] + 'api/check-api-client/'
try:
response = Requests().post(url, json={'identifier': identifier, 'password': password})
response = Requests().post(url, json={'identifier': userid, 'password': password})
except requests.Timeout:
raise APIClientAuthenticationUnavailable()
except requests.RequestException as err:
except requests.RequestException:
raise APIClientAuthenticationUnavailable()
try:

View File

@ -52,7 +52,7 @@ class VersionMiddleware(MiddlewareMixin):
path += urllib.parse.quote(environ.get('SCRIPT_NAME', ''))
path += urllib.parse.quote(environ.get('PATH_INFO', ''))
method = environ.get('REQUEST_METHOD', 'GET')
if method == 'GET' and (path == '/__version__' or path == '/__version__/'):
if method == 'GET' and (path in '/__version__', '/__version__/'):
packages_version = self.get_packages_version()
start_response('200 Ok', [('content-type', 'application/json')])
return [json.dumps(packages_version)]

View File

@ -16,7 +16,7 @@
from django.http import HttpResponseRedirect
from django.urls import reverse_lazy
from django.views.generic import FormView, TemplateView
from django.views.generic import FormView
from hobo.environment.utils import get_variable, set_variable

View File

@ -9,8 +9,6 @@ import urllib.parse
from django.utils.encoding import smart_bytes
from django.utils.http import quote, urlencode
'''Simple signature scheme for query strings'''
class SignatureError(Exception):
pass

View File

@ -14,7 +14,6 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from django.conf import settings
from django.utils.translation import gettext as _
from django.views.generic import TemplateView

View File

@ -61,12 +61,12 @@ def get_selected_theme():
def set_theme(theme_id):
from hobo.environment.models import Variable
selected_theme, created = Variable.objects.get_or_create(name='theme', service_pk__isnull=True)
selected_theme, dummy = Variable.objects.get_or_create(name='theme', service_pk__isnull=True)
old_theme = get_theme(selected_theme.value) or {}
old_variables = old_theme.get('variables', {})
theme = get_theme(theme_id)
for variable in theme.get('variables', {}).keys():
theme_variable, created = Variable.objects.get_or_create(name=variable, service_pk__isnull=True)
theme_variable, dummy = Variable.objects.get_or_create(name=variable, service_pk__isnull=True)
theme_variable.auto = True
if isinstance(theme['variables'][variable], str):
theme_variable.value = theme['variables'][variable]

View File

@ -1,5 +1,4 @@
import json
import urllib
from django.conf import settings
from django.contrib.auth import logout as auth_logout
@ -11,12 +10,10 @@ from django.shortcuts import resolve_url
from django.urls import reverse
from django.utils.encoding import force_str
from django.utils.translation import gettext as _
from django.views.generic import edit
from django.views.generic.base import TemplateView
from .environment.models import Authentic, Variable
from .environment.utils import Zone, get_installed_services
from .forms import HoboForm, HoboUpdateForm, get_tenant_model
from .environment.utils import get_installed_services
def is_superuser(u):

View File

@ -9,8 +9,8 @@ https://docs.djangoproject.com/en/1.6/howto/deployment/wsgi/
import os
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "hobo.settings")
from django.core.wsgi import get_wsgi_application
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "hobo.settings")
application = get_wsgi_application()

View File

@ -6,12 +6,16 @@ LANGUAGE_CODE = 'en-us'
BROKER_URL = 'memory://'
LANGUAGES = [('en', 'English')]
# noqa pylint: disable=undefined-variable
INSTALLED_APPS += ('hobo.agent.common', 'hobo.user_name.apps.UserNameConfig')
# noqa pylint: disable=undefined-variable
ALLOWED_HOSTS.append('localhost')
# noqa pylint: disable=undefined-variable
TEMPLATES[0]['OPTIONS']['debug'] = True
# noqa pylint: disable=used-before-assignment
MIDDLEWARE = MIDDLEWARE + (
'hobo.middleware.RobotsTxtMiddleware',
'hobo.provisionning.middleware.ProvisionningMiddleware',

View File

@ -521,7 +521,7 @@ def test_scandeps_on_unknown_element(app, admin_user, settings):
assert relation.error is True
assert relation.error_status == 'notfound'
def response_content(url, request):
def response_content(url, request): # noqa pylint: disable=function-redefined
if url.path == '/api/export-import/forms/unknown/dependencies/':
return {'status_code': 500}
return mocked_http(url, request)
@ -648,7 +648,7 @@ def test_scandeps_on_renamed_element(app, admin_user, settings):
},
}
def response_content(url, request):
def response_content(url, request): # noqa pylint: disable=function-redefined
if url.path == '/api/export-import/forms/':
return {'status_code': 404}
return mocked_http(url, request)
@ -661,7 +661,7 @@ def test_scandeps_on_renamed_element(app, admin_user, settings):
assert job.status == 'failed'
assert job.exception == 'Failed to get elements of type forms (404)'
def response_content(url, request):
def response_content(url, request): # noqa pylint: disable=function-redefined
if url.path == '/api/export-import/forms/':
return {'status_code': 500}
return mocked_http(url, request)
@ -949,7 +949,7 @@ def test_deploy_application(app, admin_user, settings, app_bundle, app_bundle_wi
},
}
def response_content(url, request):
def response_content(url, request): # noqa pylint: disable=function-redefined
if url.path == '/api/export-import/forms/':
return {
'content': {"data": [form_def]},
@ -973,7 +973,7 @@ def test_deploy_application(app, admin_user, settings, app_bundle, app_bundle_wi
assert elements[2].cache == form_def
assert elements[3].cache == {}
def response_content(url, request):
def response_content(url, request): # noqa pylint: disable=function-redefined
if url.path == '/api/export-import/forms/':
return {'status_code': 500}
return mocked_http(url, request)
@ -1069,7 +1069,7 @@ def test_refresh_application(app, admin_user, settings):
assert element.cache == form_def
assert resp.location.endswith('/applications/manifest/test/')
def response_content(url, request):
def response_content(url, request): # noqa pylint: disable=function-redefined
if url.path == '/api/export-import/forms/':
return {'status_code': 500}
return mocked_http(url, request)
@ -1194,7 +1194,7 @@ def test_deploy_application_roles(app, admin_user, settings, app_bundle_roles):
assert job.status == 'failed'
assert job.exception == 'Failed to create role test-role (500)'
def response_content(url, request):
def response_content(url, request): # noqa pylint: disable=function-redefined
if url.path == '/api/provision/':
return {'status_code': 500}
return mocked_http(url, request)
@ -1271,7 +1271,7 @@ def test_job_status_page(app, admin_user, settings):
assert 'window.location.reload' in resp
assert 'window.location = "/applications/manifest/test/"' not in resp
def response_content(url, request):
def response_content(url, request): # noqa pylint: disable=function-redefined
if url.path.startswith('/api/jobs/'):
return {
'content': json.dumps(
@ -1301,7 +1301,7 @@ def test_job_status_page(app, admin_user, settings):
job.exception = ''
job.save()
def response_content(url, request):
def response_content(url, request): # noqa pylint: disable=function-redefined
if url.path.startswith('/api/jobs/'):
return {
'content': json.dumps({'err': 0, 'data': {'status': 'failed', 'completion_status': '42%'}}),

View File

@ -45,7 +45,7 @@ def test_theme_base(settings, rf):
context = theme_base(rf.get('/'))
check(context, TEMPLATE)
assert seen_urls[0] == 'http://combo.example.com/_skeleton_/?source=http%3A%2F%2Ftestserver%2F'
for i in range(10):
for dummy in range(10):
# wait for the other requests, made from a thread, to happen
time.sleep(0.1)
if len(seen_urls) == 4:
@ -96,7 +96,7 @@ def test_theme_base_language(settings, rf):
):
context = theme_base(rf.get('/'))
assert context['theme_base']().source == 'Skeleton for English'
for i in range(10):
for dummy in range(10):
# wait for the other requests, made from a thread, to happen
time.sleep(0.1)
if len(seen_urls) == 5:

View File

@ -1,6 +1,5 @@
import json
import os
from io import StringIO
from unittest.mock import Mock, call, mock_open, patch
import pytest
@ -20,7 +19,6 @@ from hobo.environment.models import (
Hobo,
Lingo,
Passerelle,
ServiceBase,
Variable,
Wcs,
Welco,
@ -332,11 +330,7 @@ def test_create_hobo_primary(mocked_TenantMiddleware, mocked_call_command, mocke
assert command.create_site.mock_calls == []
assert mocked_call_command.mock_calls == [call('create_hobo_tenant', 'entrouvert.org')]
assert len(mocked_connection.set_tenant.mock_calls) == 1
assert mocked_open.mock_calls == [
call('/foo/base_url', 'w'),
call().write('http://entrouvert.org/and_much'),
call().close(),
]
assert call().write('http://entrouvert.org/and_much') in mocked_open.mock_calls
@patch('hobo.environment.management.commands.cook.connection')
@ -398,8 +392,8 @@ def test_set_idp(command, db):
command.set_idp()
# objects sorted on title: [obj1, obj2]
obj1, ignored = Authentic.objects.get_or_create(slug='slug1', defaults={'title': 'bar'})
obj2, ignored = Authentic.objects.get_or_create(
obj1, dummy = Authentic.objects.get_or_create(slug='slug1', defaults={'title': 'bar'})
obj2, dummy = Authentic.objects.get_or_create(
slug='slug2', defaults={'title': 'foo', 'base_url': 'http://example.org'}
)
assert obj1.use_as_idp_for_self is False

View File

@ -1,7 +1,5 @@
import asyncore
import random
import smtpd
import smtplib
import asyncore # noqa pylint: disable=deprecated-module
import smtpd # noqa pylint: disable=deprecated-module
import socket
import threading
from unittest import mock
@ -15,7 +13,7 @@ from dns.rdtypes.ANY import MX, TXT
from test_manager import login
from hobo.emails.validators import validate_email_address
from hobo.environment.models import Combo, ServiceBase, Variable, Wcs
from hobo.environment.models import Combo, ServiceBase, Variable
from hobo.test_utils import find_free_port
@ -71,7 +69,7 @@ def smtp_server(monkeypatch, port_available):
class MailServer(smtpd.SMTPServer):
def handle_accept(self):
conn, addr = self.accept()
channel = RecipientValidatingSMTPChannel(self, conn, addr)
RecipientValidatingSMTPChannel(self, conn, addr)
server = MailServer(('localhost', port_available), None)
thread = threading.Thread(target=asyncore.loop, kwargs={'timeout': 1})
@ -122,7 +120,7 @@ def test_validate_email_address_socket_error(dns_resolver, monkeypatch):
def test_validate_email_address_bypass(settings):
settings.HOBO_VALIDATE_EMAIL_WITH_SMTP = False
assert validate_email_address('foo') == None
assert validate_email_address('foo') is None
def test_invalid_address(client, admin_user):

View File

@ -343,11 +343,11 @@ def test_check_operational_view(app, admin_user, monkeypatch):
base_url='https://combo.agglo.love', template_name='...portal-user...', slug='portal'
)
response = app.get('/sites/check_operational/combo/portal')
assert response.json['operational'] == False
assert response.json['operational'] is False
monkeypatch.setattr(ServiceBase, 'is_operational', lambda x: True)
response = app.get('/sites/check_operational/combo/portal')
assert response.json['operational'] == True
assert response.json['operational'] is True
response = app.get('/sites/check_operational/foo/bar', status=404)

View File

@ -72,7 +72,7 @@ def test_is_running(app, admin_user, services, monkeypatch):
def blues_mock(url, request):
return {'status_code': 404}
with HTTMock(blues_mock, jazz_mock) as mock:
with HTTMock(blues_mock, jazz_mock):
response = app.get('/api/health/')
content = json.loads(response.text)
blues = content['data']['blues']
@ -115,7 +115,7 @@ def test_security_data(app, admin_user, services, monkeypatch):
def blues_mock(url, request):
return {'status_code': 200}
with HTTMock(blues_mock, jazz_mock) as mock:
with HTTMock(blues_mock, jazz_mock):
response = app.get('/api/health/')
content = json.loads(response.text)
blues = content['data']['blues']

View File

@ -277,14 +277,16 @@ def test_configure_service_provider(mocked_get, tmpdir):
os.remove(tenant_idp_metadata)
command.configure_service_provider(env, tenant)
with pytest.raises(IOError, match='No such file or directory'):
open(tenant_idp_metadata)
with open(tenant_idp_metadata):
pass
# idp not available
response1.status_code = 500
mocked_get.side_effect = [exceptions.RequestException, response1]
command.configure_service_provider(ENVIRONMENT, tenant)
with pytest.raises(IOError, match='No such file or directory'):
open(tenant_idp_metadata)
with open(tenant_idp_metadata):
pass
# case when idp is becoming available
mocked_get.side_effect = [response1, response2]

View File

@ -1,9 +1,7 @@
from unittest import mock
import pytest
from django.conf import settings
from django.core.management import load_command_class
from django.core.management.base import CommandError
from django.test import override_settings
from hobo.agent.common.management.commands.import_template import Command, UnknownTemplateError

View File

@ -1,5 +1,3 @@
from unittest import mock
from test_manager import login
from hobo.environment.models import Variable

View File

@ -135,7 +135,7 @@ def test_reorder_view(logged_app):
def test_profile_attribute_option_view(logged_app):
assert AttributeDefinition.objects.filter(name='first_name')[0].required
resp = logged_app.get('/profile/first_name/options', status=200)
assert resp.form['required'].checked == True
assert resp.form['required'].checked is True
resp.form['required'].checked = False
resp = resp.form.submit()
assert not AttributeDefinition.objects.filter(name='first_name')[0].required

View File

@ -3,7 +3,6 @@ import re
from unittest import mock
import pytest
from django.contrib.auth.models import User
from django.test import override_settings
from requests import Response
from test_manager import login

View File

@ -75,9 +75,9 @@ def test_provisionning(app, db, settings):
}
User = get_user_model()
assert User.objects.count() == 0
resp = app.put_json('/__provision__/', notification, status=403)
app.put_json('/__provision__/', notification, status=403)
assert User.objects.count() == 0
resp = app.put_json(
app.put_json(
sign_url('/__provision__/?orig=%s' % 'hobo.example.invalid', 'xxx'), notification, status=200
)
assert User.objects.count() == 1

View File

@ -1,9 +1,6 @@
import base64
import pytest
import requests
import responses
from django.test import RequestFactory
@pytest.fixture
@ -30,19 +27,19 @@ def app_with_auth(app):
def test_no_known_services(app_with_auth, db, settings):
settings.ROOT_URLCONF = 'hobo.test_urls'
resp = app_with_auth.get('/authenticated-testview/', status=403)
app_with_auth.get('/authenticated-testview/', status=403)
def test_no_idp_in_known_services(app_with_auth, db, settings):
settings.ROOT_URLCONF = 'hobo.test_urls'
settings.KNOWN_SERVICES = {}
resp = app_with_auth.get('/authenticated-testview/', status=403)
app_with_auth.get('/authenticated-testview/', status=403)
def test_idp_connection_error(app_with_auth, db, settings_with_idp):
with responses.RequestsMock() as rsps:
rsps.post('https://idp.example.invalid/api/check-api-client/', status=403)
resp = app_with_auth.get('/authenticated-testview/', status=403)
app_with_auth.get('/authenticated-testview/', status=403)
def test_idp_timeout(app_with_auth, db, settings_with_idp):
@ -52,7 +49,7 @@ def test_idp_timeout(app_with_auth, db, settings_with_idp):
assert resp.json == {'err': 1, 'err_desc': 'IDP temporarily unavailable, try again later.'}
def test_idp_connection_error(app_with_auth, db, settings_with_idp):
def test_idp_unavailable_error(app_with_auth, db, settings_with_idp):
with responses.RequestsMock() as rsps:
rsps.post('https://idp.example.invalid/api/check-api-client/', body=requests.RequestException('...'))
resp = app_with_auth.get('/authenticated-testview/', status=503)
@ -66,7 +63,7 @@ def test_idp_no_err_key(app_with_auth, db, settings_with_idp):
json={'foo': 'bar'},
status=200,
)
resp = app_with_auth.get('/authenticated-testview/', status=403)
app_with_auth.get('/authenticated-testview/', status=403)
def test_idp_app_error(app_with_auth, db, settings_with_idp):
@ -76,7 +73,7 @@ def test_idp_app_error(app_with_auth, db, settings_with_idp):
json={'err': 1},
status=200,
)
resp = app_with_auth.get('/authenticated-testview/', status=403)
app_with_auth.get('/authenticated-testview/', status=403)
def test_idp_wrong_serialization(app_with_auth, db, settings_with_idp):
@ -86,13 +83,13 @@ def test_idp_wrong_serialization(app_with_auth, db, settings_with_idp):
json={'err': 0, 'data': {'foo': 'bar'}},
status=200,
)
resp = app_with_auth.get('/authenticated-testview/', status=403)
app_with_auth.get('/authenticated-testview/', status=403)
def test_no_credentials(app, db, settings_with_idp):
# test that the '/authenticated-testview/' really requires authentication,
# otherwise all the others tests are meaningless.
resp = app.get('/authenticated-testview/', status=403)
app.get('/authenticated-testview/', status=403)
def test_access_granted(app_with_auth, db, settings_with_idp):
@ -112,4 +109,4 @@ def test_access_granted(app_with_auth, db, settings_with_idp):
},
status=200,
)
resp = app_with_auth.get('/authenticated-testview/')
app_with_auth.get('/authenticated-testview/')

View File

@ -1,13 +1,12 @@
import json
import os
import urllib.parse
import pytest
from django.conf import UserSettingsHolder
from django.test import override_settings
from hobo.deploy.utils import get_hobo_json
from hobo.environment.models import Authentic, Combo, Variable
from hobo.environment.models import Authentic, Combo
from hobo.multitenant.settings_loaders import Authentic as AuthenticLoader
from hobo.multitenant.settings_loaders import BackofficeLoginHint, TemplateVars
from hobo.profile.models import AttributeDefinition
@ -70,7 +69,8 @@ def test_authentic_update_settings_from_path(tmpdir):
# serialize hobo.json
path = os.path.join(str(tmpdir), 'hobo.json')
json.dump(env, open(path, 'w'))
with open(path, 'w') as fd:
json.dump(env, fd)
# call settings loaders
tenant_settings = UserSettingsHolder({})
@ -93,7 +93,8 @@ def test_mellon_backoffice_login_hint_setting_from_path(tmpdir):
env['services'][0]['this'] = True
path = os.path.join(str(tmpdir), 'hobo.json')
json.dump(env, open(path, 'w'))
with open(path, 'w') as fd:
json.dump(env, fd)
# call settings loaders
tenant_settings = UserSettingsHolder({})
@ -116,7 +117,8 @@ def test_mellon_always_backoffice_login_hint_setting_from_path(tmpdir):
env['services'][0]['this'] = True
path = os.path.join(str(tmpdir), 'hobo.json')
json.dump(env, open(path, 'w'))
with open(path, 'w') as fd:
json.dump(env, fd)
# call settings loaders
tenant_settings = UserSettingsHolder({})
@ -133,7 +135,8 @@ def test_email_update_settings_from_path(tmpdir):
def update_settings(env):
path = os.path.join(str(tmpdir), 'hobo.json')
json.dump(env, open(path, 'w'))
with open(path, 'w') as fd:
json.dump(env, fd)
loader.update_settings_from_path(tenant_settings, path)
tenant_settings.DEFAULT_FROM_EMAIL = 'webmaster@hobo.example.org'
@ -174,7 +177,8 @@ def test_sms_update_settings_from_path(tmpdir):
}
env = {'services': [], 'variables': variables}
path = os.path.join(str(tmpdir), 'hobo.json')
json.dump(env, open(path, 'w'))
with open(path, 'w') as fd:
json.dump(env, fd)
loader.update_settings_from_path(tenant_settings, path)
assert tenant_settings.SMS_URL == 'https://example.com/send/'
assert tenant_settings.SMS_SENDER == 'Sender'
@ -193,7 +197,8 @@ def test_sms_update_settings_from_path(tmpdir):
with override_settings(PHONE_COUNTRY_CODES=PHONE_COUNTRY_CODES):
env = {'services': [], 'variables': variables}
path = os.path.join(str(tmpdir), 'hobo.json')
json.dump(env, open(path, 'w'))
with open(path, 'w') as fd:
json.dump(env, fd)
loader.update_settings_from_path(tenant_settings, path)
assert tenant_settings.PHONE_COUNTRY_CODES == {
'32': {'region': 'BE', 'region_desc': 'Belgium'},

View File

@ -1,7 +1,6 @@
from unittest.mock import patch
import pytest
from django.core.exceptions import ValidationError
from test_manager import login
from hobo.environment.models import Variable

View File

@ -1,5 +1,3 @@
import json
import pytest
import hobo.scrutiny.wsgi.middleware

View File

View File

@ -1,12 +1,10 @@
import json
import os
import django.db.utils
import pytest
from django.core.management import call_command
from django.db import connection, transaction
from django_webtest import DjangoTestApp, WebTestMixin
from tenant_schemas.postgresql_backend.base import FakeTenant
from tenant_schemas.utils import tenant_context
from hobo.multitenant.models import Tenant

View File

@ -12,8 +12,10 @@ PROJECT_NAME = 'authentic2-multitenant'
open_backup = open
with patch.object(builtins, 'open', mock_open(read_data=b'xxx')):
exec(open_backup(os.environ['DEBIAN_CONFIG_COMMON']).read())
with open_backup(os.environ['DEBIAN_CONFIG_COMMON']) as fd:
exec(fd.read())
# noqa pylint: disable=undefined-variable
DATABASES['default']['NAME'] = hobo.test_utils.get_safe_db_name()
# Avoid conflic with real tenants
@ -22,12 +24,15 @@ TENANT_BASE = '/that/path/does/not/exist'
# Add the XForwardedForMiddleware
# noqa pylint: disable=used-before-assignment
MIDDLEWARE = ('authentic2.middleware.XForwardedForMiddleware',) + MIDDLEWARE
# Add authentic settings loader
# noqa pylint: disable=used-before-assignment
TENANT_SETTINGS_LOADERS = ('hobo.multitenant.settings_loaders.Authentic',) + TENANT_SETTINGS_LOADERS
# Add authentic2 hobo agent
# noqa pylint: disable=used-before-assignment
INSTALLED_APPS = ('hobo.agent.authentic2',) + INSTALLED_APPS
CACHES = {
@ -44,4 +49,5 @@ CSRF_COOKIE_SECURE = False
LANGUAGE_CODE = 'en'
# noqa pylint: disable=undefined-variable
LOGGING['handlers']['debug']['filename'] = 'debug.log'

View File

@ -15,7 +15,6 @@ from requests import RequestException
from hobo.multitenant.middleware import TenantMiddleware
os.sys.path.append('%s/tests' % os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from utils import byteify
pytestmark = pytest.mark.django_db
@ -32,9 +31,6 @@ def skeleton_dir(request, settings):
def test_hobo_deploy(monkeypatch, tenant_base, mocker, skeleton_dir, tmp_path):
from django.conf import settings
from django.core.management import call_command
from hobo.agent.authentic2.management.commands.hobo_deploy import Command as HoboDeployCommand
# Create skeleton roles.json
@ -366,8 +362,6 @@ def test_hobo_deploy(monkeypatch, tenant_base, mocker, skeleton_dir, tmp_path):
assert [arg[0][0]['objects']['@type'] for arg in mock_notify.call_args_list] == ['role', 'role']
assert [arg[0][0]['full'] for arg in mock_notify.call_args_list] == [True, True]
from hobo.multitenant.middleware import TenantMiddleware
tenants = list(TenantMiddleware.get_tenants())
assert len(tenants) == 1
tenant = tenants[0]
@ -444,7 +438,6 @@ def test_hobo_deploy(monkeypatch, tenant_base, mocker, skeleton_dir, tmp_path):
)
assert LibertyProvider.objects.count() == 4
services = tenant.get_hobo_json()['services']
from authentic2.a2_rbac.models import Role
from authentic2.a2_rbac.utils import get_default_ou
other_services = [service for service in services if not service.get('this')]
@ -555,8 +548,6 @@ def test_import_template(db, tenant_base):
def test_hobo_deploy_with_legacy_urls(monkeypatch, tenant_base, mocker, skeleton_dir, tmp_path):
from django.core.management import call_command
from hobo.agent.authentic2.management.commands.hobo_deploy import Command as HoboDeployCommand
requests_get = mocker.patch('requests.get')
@ -664,8 +655,6 @@ def test_hobo_deploy_with_legacy_urls(monkeypatch, tenant_base, mocker, skeleton
with mock.patch('hobo.agent.authentic2.provisionning.notify_agents'):
call_command('hobo_deploy', 'http://sso.example.net', hobo_json(env))
from hobo.multitenant.middleware import TenantMiddleware
tenants = list(TenantMiddleware.get_tenants())
assert len(tenants) == 1
tenant = tenants[0]

View File

@ -1,4 +1,3 @@
import json
import os
from unittest.mock import ANY, call, patch
@ -461,7 +460,7 @@ def test_provision_user(transactional_db, tenant, caplog):
for r in o['roles']:
r1 = {'uuid': role.uuid, 'name': role.name, 'slug': role.slug}
r2 = {'uuid': child_role.uuid, 'name': child_role.name, 'slug': child_role.slug}
assert r == r1 or r == r2
assert r in (r1, r2)
assert len({r['uuid'] for r in o['roles']}) == 2
assert o['is_superuser'] is True
@ -856,7 +855,7 @@ def test_provision_debug(transactional_db, tenant, caplog, settings, tmpdir):
protocol_conformance=lasso.PROTOCOL_SAML_2_0,
)
with provisionning:
role = Role.objects.create(name='coin', ou=get_default_ou())
Role.objects.create(name='coin', ou=get_default_ou())
assert notify_agents.call_count == 1
assert os.path.exists(log_path)

View File

View File

@ -1,4 +1,3 @@
import json
import os
import shutil
import tempfile

View File

@ -3,7 +3,7 @@ import os.path
from unittest.mock import mock_open, patch
import hobo.test_utils
from hobo.settings import *
from hobo.settings import * # noqa pylint: disable=wildcard-import,unused-wildcard-import
LANGUAGE_CODE = 'en-us'
@ -11,8 +11,10 @@ PROJECT_NAME = 'multipublik'
open_backup = open
with patch.object(builtins, 'open', mock_open(read_data=b'xxx')):
exec(open_backup(os.path.join(os.path.dirname(__file__), '../debian/debian_config_common.py')).read())
with open_backup(os.path.join(os.path.dirname(__file__), '../debian/debian_config_common.py')) as fd:
exec(fd.read())
# noqa pylint: disable=undefined-variable
DATABASES['default']['NAME'] = hobo.test_utils.get_safe_db_name()
CACHES = {

View File

@ -1,17 +1,13 @@
import json
import os
import random
import shutil
import tempfile
import pytest
from django.conf import settings
from tenant_schemas.utils import tenant_context
from hobo.agent.hobo.management.commands.hobo_deploy import Command as HoboDeployCommand
from hobo.deploy.utils import get_hobo_json
from hobo.environment.models import Combo, Hobo
from hobo.multitenant.management.commands.create_hobo_tenant import Command as CreateHoboTenant
from hobo.multitenant.middleware import TenantMiddleware

View File

View File

@ -49,8 +49,10 @@ TEMPLATES = [
open_backup = open
with patch.object(builtins, 'open', mock_open(read_data=b'xxx')):
exec(open_backup(os.path.join(os.path.dirname(__file__), '../debian/debian_config_common.py')).read())
with open_backup(os.path.join(os.path.dirname(__file__), '../debian/debian_config_common.py')) as fd:
exec(fd.read())
# noqa pylint: disable=undefined-variable
DATABASES['default']['NAME'] = hobo.test_utils.get_safe_db_name()
TENANT_APPS = (

View File

@ -82,7 +82,8 @@ NOTIFICATION = {
@pytest.fixture
def local_settings(tmpdir):
path = os.path.join(str(tmpdir), 'local_settings.py')
open(path, 'w').write(LOCAL_SETTINGS)
with open(path, 'w') as fd:
fd.write(LOCAL_SETTINGS)
os.environ['HOBO_AGENT_SETTINGS_FILE'] = path
@ -90,7 +91,12 @@ def local_settings(tmpdir):
@mock.patch('hobo.agent.worker.services.subprocess')
def test_deploy(mocked_subprocess, mocked_exists):
mocked_communicate = mock.Mock(return_value=('', ''))
mocked_opened = mock.Mock(communicate=mocked_communicate, returncode=0)
mocked_opened = mock.Mock(
communicate=mocked_communicate,
__enter__=lambda x, *args: x,
__exit__=lambda x, *args: None,
returncode=0,
)
mocked_subprocess.Popen = mock.Mock(return_value=mocked_opened)
deploy(ENVIRONMENT)
@ -105,7 +111,7 @@ def test_deploy(mocked_subprocess, mocked_exists):
# environment sent
mock_calls = mocked_communicate.mock_calls
assert len(mock_calls) == 4
for i in range(0, len(mock_calls)):
for dummy in range(0, len(mock_calls)):
assert json.loads(force_str(mock_calls[0][2]['input'])) == ENVIRONMENT
mocked_opened.returncode = 1
@ -117,7 +123,12 @@ def test_deploy(mocked_subprocess, mocked_exists):
@mock.patch('hobo.agent.worker.services.subprocess')
def test_deploy_host_with_agent_patterns(mocked_subprocess, mocked_exists, local_settings):
mocked_communicate = mock.Mock(return_value=('', ''))
mocked_opened = mock.Mock(communicate=mocked_communicate, returncode=0)
mocked_opened = mock.Mock(
communicate=mocked_communicate,
__enter__=lambda x, *args: x,
__exit__=lambda x, *args: None,
returncode=0,
)
mocked_subprocess.Popen = mock.Mock(return_value=mocked_opened)
# load AGENT_HOST_PATTERNS from local_settings fixture
@ -140,7 +151,12 @@ def test_notify(mocked_subprocess, mocked_listdir, mocked_exists):
mocked_listdir.side_effect = listdir
mocked_communicate = mock.Mock(return_value=('', ''))
mocked_opened = mock.Mock(communicate=mocked_communicate, returncode=0)
mocked_opened = mock.Mock(
communicate=mocked_communicate,
__enter__=lambda x, *args: x,
__exit__=lambda x, *args: None,
returncode=0,
)
mocked_subprocess.Popen = mock.Mock(return_value=mocked_opened)
notify(NOTIFICATION)
@ -167,7 +183,12 @@ def test_notify(mocked_subprocess, mocked_listdir, mocked_exists):
@mock.patch('hobo.agent.worker.services.subprocess')
def test_notify_no_tenant_path(mocked_subprocess, mocked_exists):
mocked_communicate = mock.Mock(return_value=('', ''))
mocked_opened = mock.Mock(communicate=mocked_communicate, returncode=0)
mocked_opened = mock.Mock(
communicate=mocked_communicate,
__enter__=lambda x, *args: x,
__exit__=lambda x, *args: None,
returncode=0,
)
mocked_subprocess.Popen = mock.Mock(return_value=mocked_opened)
notify(NOTIFICATION)

View File

@ -1,5 +1,3 @@
import pytest
from hobo.multitenant.middleware import TenantMiddleware

View File

@ -35,7 +35,7 @@ def test_request_context_filter(caplog, settings, tenants, client):
)
user.set_password('john.doe')
user.save()
issuer, created = Issuer.objects.get_or_create(entity_id='https://idp.example.com')
issuer, dummy = Issuer.objects.get_or_create(entity_id='https://idp.example.com')
user.saml_identifiers.create(name_id='ab' * 16, issuer=issuer)
for tenant in tenants:
@ -99,7 +99,7 @@ def test_systemd(settings, tenants, client, journald_handler, sender):
)
user.set_password('john.doe')
user.save()
issuer, created = Issuer.objects.get_or_create(entity_id='https://idp.example.com')
issuer, dummy = Issuer.objects.get_or_create(entity_id='https://idp.example.com')
user.saml_identifiers.create(name_id='ab' * 16, issuer=issuer)
for tenant in tenants:

View File

@ -22,9 +22,10 @@ import random
import django.conf
import django.db
import pytest
import utilities
from tenant_schemas.utils import tenant_context
from . import utilities
def test_tenant_middleware(tenants, client, settings):
settings.ALLOWED_HOSTS.append('invalid.example.net')
@ -44,7 +45,7 @@ def test_tenant_json_settings(tenants, settings):
):
# check the setting is not defined
with pytest.raises(AttributeError):
settings.HOBO_TEST_VARIABLE
settings.HOBO_TEST_VARIABLE # noqa pylint: disable=pointless-statement
assert django.conf.settings.UPDATE_ME == {'x': 1}
assert django.conf.settings.EXTEND_ME == [1]
@ -58,7 +59,7 @@ def test_tenant_json_settings(tenants, settings):
# check it's no longer defined after going back to the public schema
with pytest.raises(AttributeError):
settings.HOBO_TEST_VARIABLE
settings.HOBO_TEST_VARIABLE # noqa pylint: disable=pointless-statement
assert django.conf.settings.UPDATE_ME == {'x': 1}
assert django.conf.settings.EXTEND_ME == [1]
@ -69,7 +70,7 @@ def test_tenant_template_vars(tenants, settings, client):
):
# check the setting is not defined
with pytest.raises(AttributeError):
django.conf.settings.TEMPLATE_VARS
django.conf.settings.TEMPLATE_VARS # noqa pylint: disable=pointless-statement
for tenant in tenants:
with tenant_context(tenant):
@ -90,7 +91,7 @@ def test_tenant_template_vars(tenants, settings, client):
# check it's no longer defined after going back to the public schema
with pytest.raises(AttributeError):
django.conf.settings.TEMPLATE_VARS
django.conf.settings.TEMPLATE_VARS # noqa pylint: disable=pointless-statement
def test_tenant_settings_vars(tenants, settings, client):
@ -99,11 +100,11 @@ def test_tenant_settings_vars(tenants, settings, client):
):
# check the setting is not defined
with pytest.raises(AttributeError):
django.conf.settings.LOCAL1
django.conf.settings.LOCAL1 # noqa pylint: disable=pointless-statement
with pytest.raises(AttributeError):
django.conf.settings.LOCAL2
django.conf.settings.LOCAL2 # noqa pylint: disable=pointless-statement
with pytest.raises(AttributeError):
django.conf.settings.LOCAL3
django.conf.settings.LOCAL3 # noqa pylint: disable=pointless-statement
for tenant in tenants:
with tenant_context(tenant):
@ -125,7 +126,7 @@ def test_tenant_cors_settings(tenants, settings, client):
):
# check the setting is not defined
with pytest.raises(AttributeError):
settings.CORS_ORIGIN_WHITELIST
settings.CORS_ORIGIN_WHITELIST # noqa pylint: disable=pointless-statement
for tenant in tenants:
with tenant_context(tenant):
@ -135,7 +136,7 @@ def test_tenant_cors_settings(tenants, settings, client):
assert 'http://other.example.net' in django.conf.settings.CORS_ORIGIN_WHITELIST
with pytest.raises(AttributeError):
django.conf.settings.CORS_ORIGIN_WHITELIST
django.conf.settings.CORS_ORIGIN_WHITELIST # noqa pylint: disable=pointless-statement
def test_tenant_theme_settings(tenants, settings, client):
@ -160,7 +161,7 @@ def test_shared_secret():
from hobo.multitenant.settings_loaders import KnownServices
secrets = set()
for i in range(100):
for dummy in range(100):
a = str(random.getrandbits(160))
b = str(random.getrandbits(160))
assert KnownServices.shared_secret(a, b) == KnownServices.shared_secret(b, a)
@ -215,7 +216,7 @@ def test_legacy_urls_mapping(tenants, settings):
for tenant in tenants:
with tenant_context(tenant):
hobo_json = tenant.get_hobo_json()
tenant.get_hobo_json()
assert hasattr(settings, 'LEGACY_URLS_MAPPING')
assert settings.LEGACY_URLS_MAPPING['olda2.example.net'] == 'other.example.net'

View File

@ -1,6 +1,7 @@
import pytest
from django.core.management import call_command
from test_create_tenant import schema_exists
from .test_create_tenant import schema_exists
@pytest.fixture(autouse=True)

View File

@ -1,6 +1,5 @@
import json
import os
import sys
from unittest import mock
import pytest

View File

@ -17,10 +17,11 @@
import threading
import django.conf
import utilities
from django.core.cache import cache, caches
from tenant_schemas.utils import tenant_context
from . import utilities
def test_thread(tenants, settings, client):
with utilities.patch_default_settings(
@ -70,6 +71,7 @@ def test_cache(tenants, client):
with tenant_context(tenant):
def f():
# noqa pylint: disable=cell-var-from-loop
assert cache.get('coin') == tenant.domain_url
t1 = threading.Thread(target=f)

View File

@ -39,7 +39,7 @@ def test_mocked_uwsgi(uwsgi):
function.spool(1, 2)
assert set(uwsgi.spool.call_args[1].keys()) == {'body', 'tenant', 'name'}
assert pickle.loads(uwsgi.spool.call_args[1]['body']) == {'args': (1, 2), 'kwargs': {}}
assert uwsgi.spool.call_args[1]['name'] == b'test_uwsgidecorators.function'
assert uwsgi.spool.call_args[1]['name'] == b'tests_multitenant.test_uwsgidecorators.function'
assert uwsgi.spool.call_args[1]['tenant'] == b''
@ -55,5 +55,5 @@ def test_mocked_uwsgi_tenant(uwsgi, tenant):
assert set(uwsgi.spool.call_args[1].keys()) == {'body', 'tenant', 'name'}
assert pickle.loads(uwsgi.spool.call_args[1]['body']) == {'args': (1, 2), 'kwargs': {}}
assert uwsgi.spool.call_args[1]['name'] == b'test_uwsgidecorators.function'
assert uwsgi.spool.call_args[1]['name'] == b'tests_multitenant.test_uwsgidecorators.function'
assert uwsgi.spool.call_args[1]['tenant'] == b'tenant.example.net'

View File

View File

@ -1,5 +1,3 @@
import json
import os
import shutil
import tempfile

View File

@ -14,9 +14,11 @@ PROJECT_NAME = 'passerelle'
#
open_backup = open
with patch.object(builtins, 'open', mock_open(read_data=b'xxx')):
exec(open_backup(os.environ['DEBIAN_CONFIG_COMMON']).read())
with open_backup(os.environ['DEBIAN_CONFIG_COMMON']) as fd:
exec(fd.read())
BRANCH_NAME = os.environ.get("BRANCH_NAME", "").replace('/', '-')[:15]
# noqa pylint: disable=undefined-variable
DATABASES['default']['NAME'] = hobo.test_utils.get_safe_db_name()
# Avoid conflic with real tenants
@ -24,6 +26,7 @@ DATABASES['default']['NAME'] = hobo.test_utils.get_safe_db_name()
TENANT_BASE = '/that/path/does/not/exist'
# suds logs are buggy
# noqa pylint: disable=undefined-variable
LOGGING['loggers']['suds'] = {
'level': 'ERROR',
'handlers': ['mail_admins'],

View File

@ -5,12 +5,11 @@ from io import StringIO
from django.core.management import call_command
from django.db import connection
from passerelle.utils import export_site
from passerelle.utils import export_site # noqa pylint: disable=import-error
from hobo.multitenant.middleware import TenantMiddleware
os.sys.path.append('%s/tests' % os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from utils import byteify
def test_deploy_specifics(db, tenant_base):
@ -58,5 +57,6 @@ def test_import_template(db, tenant_base):
call_command(
'import_template', '--basepath=%s' % os.path.dirname(__file__), 'data_passerelle_export_site'
)
content = open('%s/data_passerelle_export_site.json' % os.path.dirname(__file__)).read()
with open('%s/data_passerelle_export_site.json' % os.path.dirname(__file__)) as fd:
content = fd.read()
assert export_site() == json.loads(content)

View File

Some files were not shown because too many files have changed in this diff Show More