combo/combo/apps/newsletters/models.py

171 lines
6.4 KiB
Python

# combo - content management system
# Copyright (C) 2015 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import requests
import requests.exceptions
import logging
import json
import urlparse
from django.db import models
from django.utils.translation import ugettext_lazy as _
from django.template.defaultfilters import slugify
from django.forms import models as model_forms
from django.conf import settings
from django.utils.http import urlencode
from combo.data.models import CellBase
from combo.data.library import register_cell_class
from combo.utils import sign_url, get_templated_url
from .forms import NewslettersManageForm
def get_signed_url(target_url, source_url, **kwargs):
    """Append query parameters to ``target_url`` and return it signed.

    The service used for signing is looked up in ``settings.KNOWN_SERVICES``
    by comparing the host part (netloc without the port) of each service's
    ``url`` with the host part of ``source_url``; the first match is used.

    :param target_url: URL of the endpoint that will be called.
    :param source_url: URL used to identify the matching known service.
    :param kwargs: extra parameters added to the query string; they are
        applied after ``orig`` and may therefore override it.
    :returns: the result of ``sign_url`` called with the completed URL and
        the ``secret`` of the matching service.
    :raises RuntimeError: if no known service matches ``source_url``.
    """
    source_server = urlparse.urlparse(source_url).netloc.split(':')[0]
    service_dict = None
    for services in settings.KNOWN_SERVICES.values():
        for service in services.values():
            url = service.get('url')
            if not url:
                # A service declared without a URL can never match and must
                # not abort the lookup of the remaining services.
                continue
            if urlparse.urlparse(url).netloc.split(':')[0] == source_server:
                service_dict = service
                break
        if service_dict is not None:
            break
    if service_dict is None:
        raise RuntimeError('failed to find data for server')

    params = {'orig': service_dict.get('orig')}
    params.update(kwargs)
    # Extend an existing query string instead of starting a second one.
    separator = '&' if '?' in target_url else '?'
    target_url = target_url + separator + urlencode(params)
    return sign_url(target_url, key=service_dict.get('secret'))
class SubscriptionsSaveError(Exception):
    """Raised when subscriptions could not be saved on the remote service."""
@register_cell_class
class NewslettersCell(CellBase):
    """Cell giving an authenticated user a form to manage newsletters.

    Newsletters and subscriptions are read from, and written to, a remote
    service located at ``url``.  The optional restrictions limit which
    resources and transports returned by that service are kept.
    """

    title = models.CharField(verbose_name=_('Title'), max_length=128)
    url = models.URLField(verbose_name=_('Newsletters service url'),
                          max_length=128)
    resources_restrictions = models.CharField(
        _('resources restrictions'),
        blank=True, max_length=1024,
        help_text=_('list of resources(themes) separated by commas'))
    transports_restrictions = models.CharField(
        _('transports restrictions'),
        blank=True, max_length=1024,
        help_text=_('list of transports separated by commas'))

    template_name = 'newsletters/newsletters.html'
    user_dependant = True

    @classmethod
    def is_enabled(cls):
        """Report this cell type as always enabled."""
        return True

    class Meta:
        verbose_name = _('Newsletters')

    def get_default_form_class(self):
        """Return a model form class exposing the configurable fields."""
        model_fields = ('title', 'url', 'resources_restrictions',
                        'transports_restrictions')
        return model_forms.modelform_factory(self.__class__, fields=model_fields)

    def simplify(self, name):
        """Return the slugified form of ``name`` with outer spaces removed."""
        return slugify(name.strip())

    def _parse_restrictions(self, raw):
        """Split a comma separated string into a list of non-empty slugs.

        A real list is returned (not a lazy ``filter`` object) so that an
        empty result is falsy on every Python version.
        """
        slugs = (self.simplify(name) for name in raw.strip().split(','))
        return [slug for slug in slugs if slug]

    def get_resources_restrictions(self):
        """Return the configured resources restrictions as a list of slugs."""
        return self._parse_restrictions(self.resources_restrictions)

    def get_transports_restrictions(self):
        """Return the configured transports restrictions as a list of slugs."""
        return self._parse_restrictions(self.transports_restrictions)

    def check_resource(self, resource):
        """Tell whether ``resource`` passes the resources restrictions.

        With no restriction configured every resource is accepted.
        """
        restrictions = self.get_resources_restrictions()
        if restrictions and self.simplify(resource) not in restrictions:
            return False
        return True

    def check_transport(self, transport):
        """Tell whether ``transport`` passes the transports restrictions.

        With no restriction configured every transport is accepted.
        """
        # NOTE(review): unlike check_resource(), the transport is compared
        # as-is (not slugified) against slugified restrictions -- confirm
        # that transport identifiers are always already in slug form.
        restrictions = self.get_transports_restrictions()
        if restrictions and transport not in restrictions:
            return False
        return True

    def filter_data(self, data):
        """Return the items of ``data`` allowed by the restrictions.

        An item is kept when its ``text`` passes the resources restrictions
        and at least one of its ``transports`` passes the transports
        restrictions.  Each item is returned at most once, whatever the
        number of matching transports.
        """
        filtered = []
        for item in data:
            if not self.check_resource(item['text']):
                continue
            if any(self.check_transport(t['id']) for t in item['transports']):
                filtered.append(item)
        return filtered

    def _get_filtered_data(self, path, **kwargs):
        """GET ``path`` below the service URL and filter the returned data.

        :param path: path appended verbatim to the templated service URL.
        :param kwargs: extra query-string parameters for the signed URL.
        :returns: the filtered ``data`` list of the JSON response, or an
            empty list when the response status is not OK.
        """
        base_url = get_templated_url(self.url)
        signed_url = get_signed_url(base_url + path, base_url, **kwargs)
        response = requests.get(signed_url)
        if not response.ok:
            return []
        return self.filter_data(response.json()['data'])

    def get_newsletters(self, **kwargs):
        """Return the newsletters of the service, filtered by restrictions."""
        return self._get_filtered_data('newsletters/', **kwargs)

    def get_subscriptions(self, **kwargs):
        """Return the subscriptions of the service, filtered by restrictions."""
        return self._get_filtered_data('subscriptions/', **kwargs)

    def set_subscriptions(self, subscriptions, **kwargs):
        """POST ``subscriptions`` as JSON to the remote service.

        :param subscriptions: JSON-serializable payload.
        :param kwargs: extra query-string parameters for the signed URL.
        :raises SubscriptionsSaveError: when the request fails, the service
            answers with an HTTP error status, the answer is not the
            expected JSON, or its ``data`` member is empty.
        """
        logger = logging.getLogger(__name__)
        headers = {'Content-type': 'application/json', 'Accept': 'application/json'}
        url = get_templated_url(self.url)
        endpoint = url + 'subscriptions/'
        url = get_signed_url(endpoint, url, **kwargs)
        try:
            response = requests.post(url, data=json.dumps(subscriptions),
                                     headers=headers)
            # Without this call an HTTP error status is never turned into
            # an exception and the HTTPError handler below is unreachable.
            response.raise_for_status()
            saved = response.json()['data']
        except requests.exceptions.HTTPError:
            # Only raise_for_status() raises HTTPError here, so ``response``
            # is always bound in this handler.
            logger.error(u'set subscriptions on %s returned an HTTP error code: %s',
                         response.request.url, response.status_code)
            raise SubscriptionsSaveError
        except requests.exceptions.RequestException as e:
            logger.error(u'set subscriptions on %s failed with exception: %s',
                         url, e)
            raise SubscriptionsSaveError
        except (ValueError, KeyError, TypeError) as e:
            # Body is not JSON, or is JSON without a usable "data" member.
            logger.error(u'set subscriptions on %s returned an invalid response: %s',
                         url, e)
            raise SubscriptionsSaveError
        if not saved:
            raise SubscriptionsSaveError

    def render(self, context):
        """Render the cell, adding the manage form for authenticated users."""
        user = context.get('user')
        if user and user.is_authenticated():
            form = NewslettersManageForm(instance=self, user=user)
            context['form'] = form
        return super(NewslettersCell, self).render(context)

    def is_visible(self, user=None):
        """Hide the cell from anonymous users, else defer to the parent."""
        if user is None or not user.is_authenticated():
            return False
        return super(NewslettersCell, self).is_visible(user)