This repository has been archived on 2023-02-21. You can view files and clone it, but cannot push or open issues or pull requests.
portail-citoyen2/portail_citoyen2/apps/data_source_plugin/cms_plugins.py

176 lines
6.9 KiB
Python

import logging
import hashlib
from xml.etree import ElementTree as ET
import time
import threading
from django.utils.translation import ugettext_lazy as _
from django.conf import settings
from django.core.cache import cache
from django.template import Template
import feedparser
import requests
from requests.exceptions import RequestException, HTTPError, Timeout
from cms.plugin_base import CMSPluginBase
from cms.plugin_pool import plugin_pool
from models import DataSourcePlugin as DataSourcePluginModel, DataSource, RawInlineTemplatePlugin as RawInlineTemplatePluginModel
import signature
from allauth.socialaccount.models import SocialToken
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Unique marker object meaning "no content loaded yet / nothing found in the
# cache". It is compared by identity, so None can still be stored as content.
CACHE_SENTINEL = object()
class Data(object):
    '''Encapsulate data from a source.

    Wraps a data source together with the template context used to render
    its URL, and exposes the decoded remote content through the ``content``
    property. Decoded content is kept in the Django cache under a key
    derived from the data source id, the rendered URL, ``limit`` and
    ``refresh``.
    '''

    # Maps the data source mime type constants to the suffix of the
    # ``get_content_<kind>`` method used to decode the response.
    MAPPING = {
        DataSource.JSON: 'json',
        DataSource.RSS: 'rss',
        DataSource.HTML: 'html',
        DataSource.XML: 'xml',
    }

    def __init__(self, data_source, context, limit, refresh, request=None):
        '''Prepare the wrapper; no HTTP request is made here.

        data_source -- object providing url, mime_type, id and the
                       authentication/transport settings read in
                       update_content()
        context -- template context used to render the URL; get_content()
                   also reads context['request']
        limit -- maximum number of entries kept by get_content_rss()
        refresh -- number of seconds after which cached content is
                   considered stale; a falsy value disables caching
        request -- current HTTP request, used to look up the OAuth2 token
        '''
        self.data_source = data_source
        self.kind = self.MAPPING.get(data_source.mime_type)
        self.context = context
        # The URL is itself a Django template rendered with the context.
        self.url = Template(self.data_source.url).render(self.context)
        self.limit = limit
        self.refresh = refresh
        key_material = (u'datasource-{self.data_source.id}-{self.url}'
                        u'-{self.limit}-{self.refresh}').format(self=self)
        # Encode explicitly: hashlib digests bytes, and an implicit ASCII
        # conversion would fail on a URL containing non-ASCII characters.
        # For ASCII-only keys the digest is unchanged.
        self.key = hashlib.md5(key_material.encode('utf-8')).hexdigest()
        self.now = time.time()
        self.__content = CACHE_SENTINEL
        self.request = request

    def get_access_token(self):
        '''Return the 'authentic2' social token of the request user.

        Returns an empty string (and logs a warning) when the user has no
        such token.
        '''
        user = self.request.user
        try:
            token = SocialToken.objects.get(account__provider='authentic2',
                                            account__user=user)
            logger.debug('retrieved access token: %r', token)
            return token.token
        except SocialToken.DoesNotExist:
            logger.warning('unable to find a social token for user: %r', user)
            return ''

    def update_content(self):
        '''Fetch the data source over HTTP, decode it and cache the result.

        Returns the decoded content, or None when the request or the
        decoding failed; failures are logged, not raised. The content is
        stored in the cache only when ``refresh`` is truthy and decoding
        produced a value other than None.
        '''
        content = None
        try:
            self.final_url = self.url
            if self.data_source.signature_key:
                # remove the hmac- prefix: keep what FOLLOWS the first five
                # characters of auth_mech (the hash algorithm name), not the
                # prefix itself.
                hash_algo = self.data_source.auth_mech[5:]
                self.final_url = signature.sign_url(
                    self.final_url.encode('ascii'),
                    self.data_source.signature_key.encode('utf-8'),
                    algo=hash_algo)
            logger.debug('getting data source %r from url %r',
                         self.data_source.name, self.final_url)
            headers = {
                'Accept': self.data_source.mime_type,
            }
            if self.data_source.auth_mech == 'oauth2':
                headers['Authorization'] = 'Bearer %s' % self.get_access_token()
            response = requests.get(
                self.final_url, headers=headers,
                verify=self.data_source.verify_certificate,
                allow_redirects=self.data_source.allow_redirects,
                timeout=self.data_source.timeout)
            response.raise_for_status()
        except HTTPError as e:
            # Read the status from the exception: the local response variable
            # is unbound if the error was raised before requests.get returned.
            status_code = getattr(e.response, 'status_code', None)
            logger.warning('HTTP Error %s when loading datasource %s from'
                           ' URL %s', status_code, self.data_source.id,
                           self.final_url)
        except Timeout:
            logger.warning('HTTP Request timeout(%s s) when loading datasource'
                           ' %s from URL %s', self.data_source.timeout,
                           self.data_source.id, self.final_url)
        except RequestException:
            logger.warning('HTTP Request failed when loading datasource'
                           ' %s from URL %s', self.data_source.id,
                           self.final_url)
        else:
            try:
                # Dispatch to get_content_json/rss/html/xml. An unmapped mime
                # type leaves self.kind at None; the resulting error is
                # caught and logged below.
                content = getattr(self, 'get_content_'+self.kind)(response)
            except Exception:
                logger.exception('decoding of content from %s failed',
                                 self.final_url)
            else:
                logger.debug('getting data source %r from url %r finished',
                             self.data_source.id, self.final_url)
        if self.refresh and content is not None:
            # Store the content with the time at which it becomes stale. The
            # cache entry itself lives for one hour.
            cache.set(self.key, (content, self.now+self.refresh), 3600)
        return content

    def get_content(self):
        '''Return the content, from memory, from the cache or from the source.

        A value already loaded on this instance is returned as is. Fresh
        cached content is returned directly. Stale cached content is also
        returned directly while a background thread reloads it. When the
        cache cannot be used, the source is fetched synchronously.
        '''
        if self.__content is not CACHE_SENTINEL:
            return self.__content
        self.__content, until = cache.get(self.key, (CACHE_SENTINEL, None))
        use_cache = self.__content is not CACHE_SENTINEL
        # do not use cache if refresh timeout is 0
        use_cache = use_cache and self.refresh > 0
        # do not use cache if updatecache is present in the query string
        use_cache = use_cache and 'updatecache' not in self.context['request'].GET
        if use_cache:
            if until < self.now:
                # reload cache content asynchronously in a thread
                # and return the current content
                logger.debug('content from %r is stale launching reloading', self.url)
                threading.Thread(target=self.update_content).start()
        else:
            self.__content = self.update_content()
        return self.__content

    content = property(get_content)

    def get_content_json(self, request):
        '''Decode the response body as JSON; return None if it is invalid.'''
        try:
            return request.json()
        except ValueError:
            logger.warning('unable to decode json content from %s: %r',
                           self.final_url, request.content[:20])
            return None

    def get_content_rss(self, request):
        '''Parse the response body as a feed, keeping at most limit entries.'''
        result = feedparser.parse(request.content)
        # NOTE(review): entries are sorted by ascending 'updated_parsed'
        # before truncation, so the entries with the smallest values are the
        # ones kept -- confirm this is the intended order.
        result.entries = sorted(result.entries,
                                key=lambda e: e['updated_parsed'])[:self.limit]
        return result

    def get_content_html(self, request):
        '''Return the response body as text.'''
        return request.text

    def get_content_xml(self, request):
        '''Parse the response body as XML; return None if it is malformed.'''
        try:
            return ET.fromstring(request.content)
        except ET.ParseError:
            logger.error('unable to parse the XML content %r', request.content)
            return None
class RawInlineTemplatePlugin(CMSPluginBase):
    '''CMS plugin for RawInlineTemplatePluginModel instances, rendered
    with the data_source_plugin/plugin.html template.'''
    name = _('Raw Inline Template Plugin')
    model = RawInlineTemplatePluginModel
    text_enabled = True
    render_template = "data_source_plugin/plugin.html"

    def icon_src(self, instance):
        '''Return the URL of the plugin icon, built from STATIC_URL.

        The instance argument is not used.
        '''
        icon_path = u"cms/images/plugins/link.png"
        return settings.STATIC_URL + icon_path
class DataSourcePlugin(CMSPluginBase):
    '''CMS plugin exposing the data sources of a DataSourcePluginModel
    instance to the template context under the 'data_sources' key.'''
    name = _('Data Source Plugin')
    model = DataSourcePluginModel
    text_enabled = True
    render_template = None

    def get_sources(self, context, instance):
        '''Yield one Data wrapper per source attached to the instance.

        Each wrapper receives the context, the limit and refresh settings of
        the plugin instance, and the request found in the context.
        '''
        current_request = context['request']
        for link in instance.sources.all():
            wrapped = Data(link.source, context, instance.limit,
                           instance.refresh, request=current_request)
            yield wrapped

    def render(self, context, instance, placeholder):
        '''Store the list of Data wrappers in the context and return it.'''
        logger.debug('getting context of data source plugin %s', instance.id)
        sources = self.get_sources(context, instance)
        context['data_sources'] = list(sources)
        logger.debug('finished getting context of data source plugin %s', instance.id)
        return context
# Register both plugin classes with the CMS plugin pool.
plugin_pool.register_plugin(RawInlineTemplatePlugin)
plugin_pool.register_plugin(DataSourcePlugin)