authentic/src/authentic2/plugins.py

176 lines
6.0 KiB
Python

# authentic2 - versatile identity manager
# Copyright (C) 2010-2019 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
Use setuptools entry points to find plugins.
Provide helper functions to load URLs from plugins or modify INSTALLED_APPS.
"""
import logging
import pkg_resources
from django.apps import apps
from django.urls import include, path
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# NOTE(review): this name is spelled in upper case, so Python does not treat
# it as the module's `__all__` export list; it is an ordinary attribute.
# Confirm whether `__all__` was intended before renaming, since that would
# change what `from ... import *` exposes.
__ALL__ = ['get_plugins']
# Plugin lists already built by get_plugins(), keyed by entry point group name.
PLUGIN_CACHE = {}
class PluginError(Exception):
    """Error raised by get_plugins() when a plugin entry point cannot be loaded."""
# Entry point group used by the helpers of this module when the caller does
# not pass an explicit group_name.
DEFAULT_GROUP_NAME = 'authentic2.plugin'
def get_plugins(group_name=DEFAULT_GROUP_NAME, use_cache=True, *args, **kwargs):
    """Traverse all entry points for group_name and instantiate them using
    args and kwargs.

    Once the Django application registry is ready, the object returned by
    get_a2_plugin() of every app config providing that method is appended to
    the list.

    The resulting list is stored in PLUGIN_CACHE under group_name, but only
    when the application registry is ready: a list built earlier lacks the
    app config plugins, and caching it would hide them from every later
    caller. The cache key is group_name alone; args and kwargs are not part
    of it.

    If use_cache is true and a list is already cached for group_name, that
    list is returned as is.

    Raise PluginError if an entry point cannot be loaded; the original
    exception is passed as second argument and set as the cause.
    """
    # XXX: necessary with tox under python3 for py2 env, don't know why...
    pkg_resources.get_distribution('authentic2')
    if use_cache and group_name in PLUGIN_CACHE:
        return PLUGIN_CACHE[group_name]
    plugins = []
    for entrypoint in pkg_resources.iter_entry_points(group_name):
        try:
            plugin_callable = entrypoint.load()
        except Exception as e:
            logger.exception('unable to load entrypoint %s', entrypoint)
            raise PluginError('unable to load entrypoint %s' % entrypoint, e) from e
        plugins.append(plugin_callable(*args, **kwargs))
    if apps.ready:
        for app_config in apps.get_app_configs():
            if hasattr(app_config, 'get_a2_plugin'):
                plugins.append(app_config.get_a2_plugin())
        # Only a complete list (entry points + app configs) may be cached.
        PLUGIN_CACHE[group_name] = plugins
    return plugins
def register_plugins_urls(urlpatterns, group_name=DEFAULT_GROUP_NAME):
    """Build a new URL pattern list surrounding urlpatterns with plugin URLs.

    For each plugin of group_name, the URLs returned by get_before_urls()
    (when the plugin provides it) are included ahead of urlpatterns and the
    ones returned by get_after_urls() (when provided) are included after
    urlpatterns. The given urlpatterns list is not modified.
    """
    head, tail = [], []
    hooks = (('get_before_urls', head), ('get_after_urls', tail))
    for plugin in get_plugins(group_name):
        for hook_name, bucket in hooks:
            if not hasattr(plugin, hook_name):
                continue
            plugin_urls = getattr(plugin, hook_name)()
            bucket.append(path('', include(plugin_urls)))
    return head + urlpatterns + tail
def register_plugins_installed_apps(installed_apps, group_name=DEFAULT_GROUP_NAME):
    """Return a list made of installed_apps followed by the applications
    declared by the plugins of group_name through get_apps().

    An application already in the list is not added a second time.
    """
    merged = list(installed_apps)
    for plugin in get_plugins(group_name):
        if not hasattr(plugin, 'get_apps'):
            continue
        for app_path in plugin.get_apps():
            if app_path in merged:
                continue
            merged.append(app_path)
    return merged
def register_plugins_middleware(middleware_classes, group_name=DEFAULT_GROUP_NAME):
    """Return a tuple made of middleware_classes extended with the middleware
    declared by the plugins of group_name.

    Entries returned by get_before_middleware() are inserted at the head and
    entries returned by get_after_middleware() are appended at the tail.
    Entries already present are left where they are.
    """
    result = list(middleware_classes)
    for plugin in get_plugins(group_name):
        if hasattr(plugin, 'get_before_middleware'):
            # Walk backwards: inserting each new entry at index 0 then keeps
            # the new entries in the relative order given by the plugin.
            for middleware in reversed(plugin.get_before_middleware()):
                if middleware in result:
                    continue
                result.insert(0, middleware)
        if hasattr(plugin, 'get_after_middleware'):
            for middleware in plugin.get_after_middleware():
                if middleware in result:
                    continue
                result.append(middleware)
    return tuple(result)
def register_plugins_authentication_backends(authentication_backends, group_name=DEFAULT_GROUP_NAME):
    """Return a tuple made of authentication_backends followed by the
    backends declared by the plugins of group_name through
    get_authentication_backends().

    A backend already in the sequence is not added a second time.
    """
    merged = list(authentication_backends)
    for plugin in get_plugins(group_name):
        if not hasattr(plugin, 'get_authentication_backends'):
            continue
        for backend in plugin.get_authentication_backends():
            if backend in merged:
                continue
            merged.append(backend)
    return tuple(merged)
def register_plugins_authenticators(authenticators=(), group_name=DEFAULT_GROUP_NAME):
    """Return a tuple made of authenticators followed by the authenticators
    declared by the plugins of group_name through get_authenticators().

    An authenticator already in the sequence is not added a second time.
    """
    merged = list(authenticators)
    for plugin in get_plugins(group_name):
        if not hasattr(plugin, 'get_authenticators'):
            continue
        for authenticator in plugin.get_authenticators():
            if authenticator in merged:
                continue
            merged.append(authenticator)
    return tuple(merged)
def register_plugins_idp_backends(idp_backends, group_name=DEFAULT_GROUP_NAME):
    """Return a tuple made of idp_backends followed by the backends declared
    by the plugins of group_name through get_idp_backends().

    A backend already in the sequence is not added a second time.
    """
    merged = list(idp_backends)
    for plugin in get_plugins(group_name):
        if not hasattr(plugin, 'get_idp_backends'):
            continue
        for backend in plugin.get_idp_backends():
            if backend in merged:
                continue
            merged.append(backend)
    return tuple(merged)
def collect_from_plugins(name, *args, **kwargs):
    """
    Gather the attribute called name from every plugin of the default group.

    Plugins lacking the attribute are skipped. When the attribute has a
    __call__ attribute, it is called with args and kwargs and the result is
    collected; otherwise the attribute value itself is collected.
    """
    collected = []
    for plugin in get_plugins():
        if hasattr(plugin, name):
            member = getattr(plugin, name)
            value = member(*args, **kwargs) if hasattr(member, '__call__') else member
            collected.append(value)
    return collected
def init():
    """Call init() on every plugin of the default group that provides it."""
    for plugin in get_plugins():
        if not hasattr(plugin, 'init'):
            continue
        plugin.init()