This repository has been archived on 2023-02-21. You can view files and clone it, but cannot push or open issues or pull requests.
cmsplugin-blurp/src/cmsplugin_blurp/renderers/data_source.py

396 lines
15 KiB
Python

import logging
import hashlib
from xml.etree import ElementTree as ET
import time
import threading
import pprint
import feedparser
import requests
from requests.exceptions import RequestException, HTTPError, Timeout
from django.core.cache import cache
from django.conf import settings
from . import signature, template
log = logging.getLogger(__name__)
class Renderer(template.TemplateRenderer):
'''Data source renderer the expected configuration looks like
this:
{
'name': u'Datas from xyz',
'class': 'cmsplugin_blurp.renderers.data_source.Renderer',
'sources': [
{
'slug': 'slug', # mandatory
'url': 'https://...', # mandatory
'parser_type': 'raw', # optional, possible values are json, xml, css, csv or raw, default value is raw
'content_type': 'application/octet-stream', # optional, default value is compute from the parser_type
'auth_mech': None, # optional, possible values are hmac-sha1, hmac-sha256,oauth2', default is None
'signature_key': None, # mandatory if auth_mech is not None
'verify_certificate': False, # optional, default is False
'allow_redirects': True, # optional default is True
'timeout': 10, # optional default is 1, it cannot be less than 1
'refresh': 3600, # optional, default is taken from the renderer level
'limit': 0, # optional, default is taken from the renderer level
'accepted_http_status': [200,400], # optional, default it taken from the renderer level
},
]
'template_name': 'data_from_xyz.html'
# default template if the template file cannot be found
'template': '{{ slug|pprint }}'
# time between refresh of the case
# use 0 for no cache
# cache is also update if updatecache key is in the query string
# you can also override it in each source
'refresh': 3600, # optional default is 3600 seconds
# limit to the number of elements to return,
# not limited if it is 0
# you can also override it in each source
'limit': 0, # optional default is 0
'accepted_http_status': [200,400], # optional, default is [200]
}
'''
__sources = None
@classmethod
def check_config(cls, config):
if not 'sources' in config \
or not isinstance(config['sources'], (tuple, list)) \
or len(config['sources']) == 0:
yield 'sources must be a list or a tuple containing at least one element'
for source in config['sources']:
if not 'slug' in source:
yield 'each source must have a slug key'
if not 'url' in source:
yield 'each source must have an url key'
if 'parser_type' in source \
and source['parser_type'] not in ('raw', 'csv', 'json', 'xml', 'rss'):
yield 'unknown parser_type {0!r}'.format(source['parser_type'])
if 'auth_mech' in source:
if source['auth_mech'] not in ('hmac-sha1', 'hmac-sha256', 'oauth2'):
yield 'unknown auth_mech {0!r}'.format(source['auth_mech'])
if source['auth_mech'].startswith('hmac-') \
and ('signature_key' not in source
or not isinstance(source['signature_key'], basestring)):
yield 'missing signature_key string'
def get_sources(self, context):
if self.__sources is None:
sources = []
for source in self.config['sources']:
slug = '{0}.{1}'.format(self.slug, source['slug'])
data = Data(slug, self.config, source, context)
sources.append((source['slug'], data))
if settings.TEMPLATE_DEBUG:
sources.append(('blurp_debug__',
'\n'.join(self.debug_content(context))))
self.__sources = sources
return self.__sources
def debug_content(self, context):
try:
yield u'config: {0}'.format(pprint.pformat(self.config))
except Exception, e:
yield u'config: pformat failed {0!r}'.format(e)
for source in self.config.get('sources', []):
slug = source.get('slug')
if not slug:
continue
try:
yield u'slug {0!r}: {1}'.format(slug,
pprint.pformat(context.get(slug)))
except Exception, e:
yield u'slug {0!r}: pformat failed {1!r}'.format(slug, e)
def use_ajax(self, context):
'''Only use ajax if some content is not cached'''
if super(Renderer, self).use_ajax(context):
for source in self.get_sources():
if not source.content_is_cached():
return False
else:
return False
def render(self, context):
for slug, source in self.get_sources(context):
context[slug] = source
return super(Renderer, self).render(context)
class Data(object):
    '''Encapsulate data from a source'''
    # sentinel distinguishing "nothing cached yet" from a cached None value
    __CACHE_SENTINEL = object()
    # MIME types sent in the Accept header, one per parser type
    JSON = 'application/json'
    RSS = 'application/rss+xml'
    XML = 'text/xml'
    CSV = 'text/csv'
    OCTET_STREAM = 'application/octet-stream'
    # parser_type -> default content_type
    MAPPING = {
        'json': JSON,
        'rss': RSS,
        'xml': XML,
        'csv': CSV,
        'raw': OCTET_STREAM,
    }

    def __init__(self, slug, config, source, context):
        # slug is already namespaced by the renderer ('renderer.source')
        self.slug = slug
        self.context = context
        # may be None when the template context carries no request
        self.request = context.get('request')
        self.source = source
        # source-level settings fall back to the renderer-level config
        self.limit = source.get('limit', config.get('limit', 0))
        self.refresh = source.get('refresh', config.get('refresh', 0))
        self.url = source['url']
        # NOTE(review): these two defaults contradict the Renderer class
        # docstring, which documents verify_certificate=False and
        # allow_redirects=True — confirm which is intended
        self.verify = source.get('verify_certificate', True)
        self.redirects = source.get('allow_redirects', False)
        # when true, a stale-cache refresh does not block the response
        self.async = source.get('async', False)
        # NOTE(review): docstring says the default timeout is 1 second
        self.timeout = source.get('timeout', 10)
        self.auth_mech = source.get('auth_mech')
        self.signature_key = source.get('signature_key')
        self.parser_type = source.get('parser_type', 'raw')
        self.content_type = source.get('content_type', self.MAPPING[self.parser_type])
        self.accepted_http_status = source.get('accepted_http_status',
                config.get('accepted_http_status', [200]))
        # per-user content (and thus per-user cache keys) is implied by
        # oauth2 authentication unless explicitly configured
        self.user_context = source.get('user_context',
                config.get('user_context', self.auth_mech == 'oauth2'))
        # cache key covers every parameter that affects the fetched content
        pre_hash = 'datasource-{self.slug}-{self.url}-{self.limit}-' \
                '{self.refresh}-{self.auth_mech}-{self.signature_key}' \
                .format(self=self)
        # If authentication is used
        if self.user_context:
            pre_hash += '-%s' % unicode(self.request.user).encode('utf-8')
        log.debug('key pre hash value %r', pre_hash)
        self.key = hashlib.md5(pre_hash).hexdigest()
        # timestamp of instantiation, used for all staleness comparisons
        self.now = time.time()
        self.__content = self.__CACHE_SENTINEL

    def get_oauth2_access_token(self):
        '''Query django-allauth models to find an access token for this user'''
        from allauth.socialaccount.models import SocialToken
        user = self.request.user
        if user.is_authenticated():
            try:
                token = SocialToken.objects.get(
                        account__provider='authentic2',
                        account__user=user)
                log.debug('found access token: %r', token)
                return token.token
            except SocialToken.DoesNotExist:
                log.warning('unable to find a social token for user: %r', user)
        # anonymous user or no token found: send an empty bearer token
        return ''

    def resolve_http_url(self):
        # Returns a (stream, error) pair: the raw response stream on
        # success, or (None, message) on failure.
        try:
            self.final_url = self.url
            if self.source.get('auth_mech', '').startswith('hmac'):
                # remove the hmac- prefix
                hash_algo = self.auth_mech[5:]
                self.final_url = signature.sign_url(
                        self.final_url,
                        self.signature_key,
                        algo=hash_algo)
            log.debug('getting data source from url %r for renderer %s',
                    self.final_url, self.slug)
            headers = {
                'Accept': self.content_type,
            }
            if self.auth_mech == 'oauth2':
                headers['Authorization'] = 'Bearer %s' % self.get_oauth2_access_token()
            log.debug('with headers %r', headers)
            # stream=True so parsers can read request.raw incrementally
            request = requests.get(
                    self.final_url,
                    headers=headers,
                    verify=self.verify,
                    allow_redirects=self.redirects,
                    timeout=self.timeout,
                    stream=True)
            if request.status_code not in self.accepted_http_status:
                request.raise_for_status()
            return request.raw, None
        except HTTPError:
            error = 'HTTP Error %s when loading URL %s for renderer %r' % (
                    request.status_code,
                    self.final_url,
                    self.slug)
            log.warning(error)
        except Timeout:
            error = 'HTTP Request timeout(%s s) when loading URL ' \
                    '%s for renderer %s' % (
                            self.timeout,
                            self.final_url,
                            self.slug)
            log.warning(error)
        except RequestException, e:
            error = 'HTTP Request failed when loading URL ' \
                    '%s for renderer %r: %s' % (
                            self.final_url,
                            self.slug, e)
            log.warning(error)
        return None, error

    def resolve_file_url(self):
        # strip the 'file://' prefix to get the filesystem path
        path = self.url[7:]
        try:
            return file(path), None
        except Exception:
            error = 'unable to resolve file URL: %r' % self.url
            log.warning(error)
            return None, error

    def update_content(self):
        # top-level guard: also runs as a Thread target, where an
        # uncaught exception would only kill the worker silently
        try:
            return self.update_content_real()
        except:
            log.exception('exception while updating content')

    def update_content_real(self):
        '''Fetch, parse and cache the source content; returns the parsed
        data, or an error message when settings.TEMPLATE_DEBUG is set.'''
        if self.url.startswith('http'):
            stream, error = self.resolve_http_url()
        elif self.url.startswith('file:'):
            stream, error = self.resolve_file_url()
        else:
            msg = 'unknown scheme: %r' % self.url
            log.error(msg)
            if settings.TEMPLATE_DEBUG:
                return msg
            return
        if stream is None:
            log.error(error)
            if settings.TEMPLATE_DEBUG:
                return error
            return
        try:
            # dispatch to parse_raw / parse_csv / parse_json / parse_xml / parse_rss
            data = getattr(self, 'parse_'+self.parser_type)(stream)
        except Exception:
            msg = 'error parsing %s content on %s' % (self.parser_type, self.url)
            log.exception(msg)
            if settings.TEMPLATE_DEBUG:
                return msg
            return None
        if self.refresh and data is not None:
            log.debug('set cache for url %r with key %r', self.url, self.key)
            # entry stays readable for ~12 days, well past its 'until'
            # staleness deadline, so stale data can still be served while
            # a background refresh runs
            cache.set(self.key, (data, self.now+self.refresh), 86400*12)
        # if we were spawned as a refresh thread, deregister ourselves
        # under the condition so waiters in get_content see a consistent state
        if self.key in self.UPDATE_THREADS:
            c = self.CONDITIONS.setdefault(self.key, threading.Condition())
            with c:
                self.UPDATE_THREADS.pop(self.key)
                self.CONDITIONS.pop(self.key)
        return data

    # class-level registries shared by all instances: at most one refresh
    # thread (and its Condition) per cache key
    UPDATE_THREADS = {}
    CONDITIONS = {}

    def content_is_cached(self, ignore_stale_content=True):
        '''Test if some content is in cache'''
        if self.__content is self.__CACHE_SENTINEL:
            content, until = cache.get(self.key, (self.__CACHE_SENTINEL, None))
            if not ignore_stale_content:
                self.__content = content
            if until is not None and ignore_stale_content and until < self.now:
                return False
            if self.refresh <= 0:
                return False
            if self.request and 'updatecache' in self.request.GET:
                return False
            # NOTE(review): on a cache miss content is still the sentinel
            # here, yet True is returned when refresh > 0 — confirm this
            # is intended
            self.__content = content
            return True
        else:
            # content already loaded on this instance
            return True

    def get_content(self):
        '''Return the parsed content, serving from cache when possible and
        refreshing stale entries synchronously or in a background thread.'''
        if self.__content is not self.__CACHE_SENTINEL:
            return self.__content
        self.__content, until = cache.get(self.key, (self.__CACHE_SENTINEL, None))
        use_cache = self.__content is not self.__CACHE_SENTINEL
        if not use_cache:
            # NOTE(review): message is inverted — this branch means the
            # content was NOT found in cache
            log.debug('found content in cache for url %r', self.url)
        # do not use cache if refresh timeout is 0
        use_cache = use_cache and self.refresh > 0
        if self.refresh == 0:
            log.debug('self refresh is 0, ignoring cache')
        # do not use cache if updatecache is present in the query string
        use_cache = use_cache and (not self.request or 'updatecache' not in self.request.GET)
        if self.request and 'updatecache' in self.request.GET:
            log.debug('updatecache in query string, ignoring cache')
        if use_cache:
            if until < self.now:
                # reload cache content asynchronously in a thread
                # and return the current content
                log.debug('asynchronous update for url %r until: %s < now: %s', self.url, until, self.now)
                c = self.CONDITIONS.setdefault(self.key, threading.Condition())
                t = threading.Thread(target=self.update_content)
                # setdefault is atomic: only one thread per key wins
                t2 = self.UPDATE_THREADS.setdefault(self.key, t)
                if t2 is t: # yeah we are the first to run
                    with c:
                        t.start()
                        c.notify_all() # notify other updating thread that we started
                if not self.async:
                    if not t2 is t:
                        # another request owns the refresh thread; wait
                        # until it has actually been started before joining
                        with c:
                            while not t2.ident:
                                c.wait()
                    t2.join()
        else:
            log.debug('synchronous update for url %r', self.url)
            self.__content = self.update_content()
        return self.__content
    content = property(get_content)

    def parse_json(self, stream):
        import json
        return json.load(stream)

    def parse_rss(self, stream):
        result = feedparser.parse(stream.read())
        entries = result.entries
        # oldest first, by feed entry update time
        entries = sorted(result.entries, key=lambda e: e['updated_parsed'])
        # NOTE(review): with the documented default limit of 0 this slice
        # yields an empty list, not "no limit" — confirm intended behavior
        result.entries = entries[:self.limit]
        return result

    def parse_raw(self, stream):
        # return the body verbatim as a byte string
        return stream.read()

    def parse_xml(self, stream):
        return ET.fromstring(stream.read())

    def parse_csv(self, stream):
        import csv
        # csv_params is passed straight to the csv reader constructor
        params = self.source.get('csv_params', {})
        encoding = self.source.get('csv_encoding', 'utf-8')
        # decode every cell from the configured encoding to unicode
        def list_decode(l):
            return map(lambda s: s.decode(encoding), l)
        def dict_decode(d):
            return dict((a, b.decode(encoding)) for a, b in d.iteritems())
        # requests' raw streams expose iter_lines(); use it so the csv
        # module sees an iterable of lines
        if hasattr(stream, 'iter_lines'):
            stream = stream.iter_lines()
        if 'fieldnames' in params:
            reader = csv.DictReader(stream, **params)
            decoder = dict_decode
        else:
            reader = csv.reader(stream, **params)
            decoder = list_decode
        return list(decoder(e) for e in reader)

    def __call__(self):
        # allow templates to use {{ source }} as a callable
        return self.get_content()

    def __iter__(self):
        # iterating a Data object iterates its parsed content
        return iter(self())