remove all apns references

This commit is contained in:
Frédéric Péters 2018-11-04 14:52:24 +01:00
parent 8e74dfe330
commit 83d96300c0
18 changed files with 5 additions and 703 deletions

View File

@ -2,9 +2,8 @@ from django.apps import apps
from django.contrib import admin, messages
from django.utils.translation import ugettext_lazy as _
from .apns import APNSServerError
from .gcm import GCMError
from .models import APNSDevice, GCMDevice, WebPushDevice, WNSDevice
from .models import GCMDevice, WebPushDevice, WNSDevice
from .settings import PUSH_NOTIFICATIONS_SETTINGS as SETTINGS
from .webpush import WebPushError
@ -41,8 +40,6 @@ class DeviceAdmin(admin.ModelAdmin):
ret.append(r)
except GCMError as e:
errors.append(str(e))
except APNSServerError as e:
errors.append(e.status)
except WebPushError as e:
errors.append(e.message)
@ -114,7 +111,6 @@ class GCMDeviceAdmin(DeviceAdmin):
list_filter = ("active", "cloud_message_type")
admin.site.register(APNSDevice, DeviceAdmin)
admin.site.register(GCMDevice, GCMDeviceAdmin)
admin.site.register(WNSDevice, DeviceAdmin)
admin.site.register(WebPushDevice, DeviceAdmin)

View File

@ -7,7 +7,7 @@ from rest_framework.serializers import ModelSerializer, Serializer, ValidationEr
from rest_framework.viewsets import ModelViewSet
from ..fields import UNSIGNED_64BIT_INT_MAX_VALUE, hex_re
from ..models import APNSDevice, GCMDevice, WebPushDevice, WNSDevice
from ..models import GCMDevice, WebPushDevice, WNSDevice
from ..settings import PUSH_NOTIFICATIONS_SETTINGS as SETTINGS
@ -43,20 +43,6 @@ class DeviceSerializerMixin(ModelSerializer):
extra_kwargs = {"active": {"default": True}}
class APNSDeviceSerializer(ModelSerializer):
class Meta(DeviceSerializerMixin.Meta):
model = APNSDevice
def validate_registration_id(self, value):
# iOS device tokens are 256-bit hexadecimal (64 characters). In 2016 Apple is increasing
# iOS device tokens to 100 bytes hexadecimal (200 characters).
if hex_re.match(value) is None or len(value) not in (64, 200):
raise ValidationError("Registration ID (device token) is invalid")
return value
class UniqueRegistrationSerializerMixin(Serializer):
def validate(self, attrs):
devices = None
@ -179,15 +165,6 @@ class AuthorizedMixin(object):
# ViewSets
class APNSDeviceViewSet(DeviceViewSetMixin, ModelViewSet):
queryset = APNSDevice.objects.all()
serializer_class = APNSDeviceSerializer
class APNSDeviceAuthorizedViewSet(AuthorizedMixin, APNSDeviceViewSet):
pass
class GCMDeviceViewSet(DeviceViewSetMixin, ModelViewSet):
queryset = GCMDevice.objects.all()
serializer_class = GCMDeviceSerializer

View File

@ -1,141 +0,0 @@
"""
Apple Push Notification Service
Documentation is available on the iOS Developer Library:
https://developer.apple.com/library/content/documentation/NetworkingInternet/Conceptual/RemoteNotificationsPG/APNSOverview.html
"""
import time
from apns2 import client as apns2_client
from apns2 import errors as apns2_errors
from apns2 import payload as apns2_payload
from . import models
from .conf import get_manager
from .exceptions import NotificationError
class APNSError(NotificationError):
pass
class APNSUnsupportedPriority(APNSError):
pass
class APNSServerError(APNSError):
def __init__(self, status):
super(APNSServerError, self).__init__(status)
self.status = status
def _apns_create_socket(certfile=None, application_id=None):
certfile = certfile or get_manager().get_apns_certificate(application_id)
client = apns2_client.APNsClient(
certfile,
use_sandbox=get_manager().get_apns_use_sandbox(application_id),
use_alternative_port=get_manager().get_apns_use_alternative_port(application_id)
)
client.connect()
return client
def _apns_prepare(
token, alert, application_id=None, badge=None, sound=None, category=None,
content_available=False, action_loc_key=None, loc_key=None, loc_args=[],
extra={}, mutable_content=False, thread_id=None, url_args=None):
if action_loc_key or loc_key or loc_args:
apns2_alert = apns2_payload.PayloadAlert(
body=alert if alert else {}, body_localized_key=loc_key,
body_localized_args=loc_args, action_localized_key=action_loc_key)
else:
apns2_alert = alert
if callable(badge):
badge = badge(token)
return apns2_payload.Payload(
apns2_alert, badge, sound, content_available, mutable_content, category,
url_args, custom=extra, thread_id=thread_id)
def _apns_send(
registration_id, alert, batch=False, application_id=None, certfile=None, **kwargs
):
client = _apns_create_socket(certfile=certfile, application_id=application_id)
notification_kwargs = {}
# if expiration isn"t specified use 1 month from now
notification_kwargs["expiration"] = kwargs.pop("expiration", None)
if not notification_kwargs["expiration"]:
notification_kwargs["expiration"] = int(time.time()) + 2592000
priority = kwargs.pop("priority", None)
if priority:
try:
notification_kwargs["priority"] = apns2_client.NotificationPriority(str(priority))
except ValueError:
raise APNSUnsupportedPriority("Unsupported priority %d" % (priority))
if batch:
data = [apns2_client.Notification(
token=rid, payload=_apns_prepare(rid, alert, **kwargs)) for rid in registration_id]
return client.send_notification_batch(
data, get_manager().get_apns_topic(application_id=application_id),
**notification_kwargs
)
data = _apns_prepare(registration_id, alert, **kwargs)
client.send_notification(
registration_id, data,
get_manager().get_apns_topic(application_id=application_id),
**notification_kwargs
)
def apns_send_message(registration_id, alert, application_id=None, certfile=None, **kwargs):
"""
Sends an APNS notification to a single registration_id.
This will send the notification as form data.
If sending multiple notifications, it is more efficient to use
apns_send_bulk_message()
Note that if set alert should always be a string. If it is not set,
it won"t be included in the notification. You will need to pass None
to this for silent notifications.
"""
try:
_apns_send(
registration_id, alert, application_id=application_id,
certfile=certfile, **kwargs
)
except apns2_errors.APNsException as apns2_exception:
if isinstance(apns2_exception, apns2_errors.Unregistered):
device = models.APNSDevice.objects.get(registration_id=registration_id)
device.active = False
device.save()
raise APNSServerError(status=apns2_exception.__class__.__name__)
def apns_send_bulk_message(
registration_ids, alert, application_id=None, certfile=None, **kwargs
):
"""
Sends an APNS notification to one or more registration_ids.
The registration_ids argument needs to be a list.
Note that if set alert should always be a string. If it is not set,
it won"t be included in the notification. You will need to pass None
to this for silent notifications.
"""
results = _apns_send(
registration_ids, alert, batch=True, application_id=application_id,
certfile=certfile, **kwargs
)
inactive_tokens = [token for token, result in results.items() if result == "Unregistered"]
models.APNSDevice.objects.filter(registration_id__in=inactive_tokens).update(active=False)
return results

View File

@ -2,7 +2,7 @@ from django.core.exceptions import ImproperlyConfigured
from django.utils.six import string_types
from ..settings import PUSH_NOTIFICATIONS_SETTINGS as SETTINGS
from .base import BaseConfig, check_apns_certificate
from .base import BaseConfig
SETTING_MISMATCH = (
@ -24,7 +24,6 @@ MISSING_SETTING = (
)
PLATFORMS = [
"APNS",
"FCM",
"GCM",
"WNS",
@ -43,11 +42,6 @@ OPTIONAL_SETTINGS = [
"APPLICATION_GROUP", "APPLICATION_SECRET"
]
APNS_REQUIRED_SETTINGS = ["CERTIFICATE"]
APNS_OPTIONAL_SETTINGS = [
"USE_SANDBOX", "USE_ALTERNATIVE_PORT", "TOPIC"
]
FCM_REQUIRED_SETTINGS = GCM_REQUIRED_SETTINGS = ["API_KEY"]
FCM_OPTIONAL_SETTINGS = GCM_OPTIONAL_SETTINGS = [
"POST_URL", "MAX_RECIPIENTS", "ERROR_TIMEOUT"
@ -117,34 +111,6 @@ class AppConfig(BaseConfig):
)
)
def _validate_apns_config(self, application_id, application_config):
allowed = REQUIRED_SETTINGS + OPTIONAL_SETTINGS + APNS_REQUIRED_SETTINGS + \
APNS_OPTIONAL_SETTINGS
self._validate_allowed_settings(application_id, application_config, allowed)
self._validate_required_settings(
application_id, application_config, APNS_REQUIRED_SETTINGS
)
# determine/set optional values
application_config.setdefault("USE_SANDBOX", False)
application_config.setdefault("USE_ALTERNATIVE_PORT", False)
application_config.setdefault("TOPIC", None)
self._validate_apns_certificate(application_config["CERTIFICATE"])
def _validate_apns_certificate(self, certfile):
"""Validate the APNS certificate at startup."""
try:
with open(certfile, "r") as f:
content = f.read()
check_apns_certificate(content)
except Exception as e:
raise ImproperlyConfigured(
"The APNS certificate file at %r is not readable: %s" % (certfile, e)
)
def _validate_fcm_config(self, application_id, application_config):
allowed = (
REQUIRED_SETTINGS + OPTIONAL_SETTINGS + FCM_REQUIRED_SETTINGS + FCM_OPTIONAL_SETTINGS
@ -278,30 +244,6 @@ class AppConfig(BaseConfig):
def get_max_recipients(self, cloud_type, application_id=None):
return self._get_application_settings(application_id, cloud_type, "MAX_RECIPIENTS")
def get_apns_certificate(self, application_id=None):
r = self._get_application_settings(application_id, "APNS", "CERTIFICATE")
if not isinstance(r, string_types):
# probably the (Django) file, and file path should be got
if hasattr(r, "path"):
return r.path
elif (hasattr(r, "has_key") or hasattr(r, "__contains__")) and "path" in r:
return r["path"]
else:
raise ImproperlyConfigured(
"The APNS certificate settings value should be a string, or "
"should have a 'path' attribute or key"
)
return r
def get_apns_use_sandbox(self, application_id=None):
return self._get_application_settings(application_id, "APNS", "USE_SANDBOX")
def get_apns_use_alternative_port(self, application_id=None):
return self._get_application_settings(application_id, "APNS", "USE_ALTERNATIVE_PORT")
def get_apns_topic(self, application_id=None):
return self._get_application_settings(application_id, "APNS", "TOPIC")
def get_wns_package_security_id(self, application_id=None):
return self._get_application_settings(application_id, "WNS", "PACKAGE_SECURITY_ID")

View File

@ -2,15 +2,6 @@ from django.core.exceptions import ImproperlyConfigured
class BaseConfig(object):
def get_apns_certificate(self, application_id=None):
raise NotImplementedError
def get_apns_use_sandbox(self, application_id=None):
raise NotImplementedError
def get_apns_use_alternative_port(self, application_id=None):
raise NotImplementedError
def get_fcm_api_key(self, application_id=None):
raise NotImplementedError
@ -36,20 +27,3 @@ class BaseConfig(object):
"""Returns a collection containing the configured applications."""
raise NotImplementedError
def check_apns_certificate(ss):
mode = "start"
for s in ss.split("\n"):
if mode == "start":
if "BEGIN RSA PRIVATE KEY" in s or "BEGIN PRIVATE KEY" in s:
mode = "key"
elif mode == "key":
if "END RSA PRIVATE KEY" in s or "END PRIVATE KEY" in s:
mode = "end"
break
elif s.startswith("Proc-Type") and "ENCRYPTED" in s:
raise ImproperlyConfigured("Encrypted APNS private keys are not supported")
if mode != "end":
raise ImproperlyConfigured("The APNS certificate doesn't contain a private key")

View File

@ -69,53 +69,6 @@ class LegacyConfig(BaseConfig):
)
return self._get_application_settings(application_id, key, msg)
def get_apns_certificate(self, application_id=None):
r = self._get_application_settings(
application_id, "APNS_CERTIFICATE",
"You need to setup PUSH_NOTIFICATIONS_SETTINGS properly to send messages"
)
if not isinstance(r, string_types):
# probably the (Django) file, and file path should be got
if hasattr(r, "path"):
return r.path
elif (hasattr(r, "has_key") or hasattr(r, "__contains__")) and "path" in r:
return r["path"]
else:
msg = (
"The APNS certificate settings value should be a string, or "
"should have a 'path' attribute or key"
)
raise ImproperlyConfigured(msg)
return r
def get_apns_use_sandbox(self, application_id=None):
msg = "Setup PUSH_NOTIFICATIONS_SETTINGS properly to send messages"
return self._get_application_settings(application_id, "APNS_USE_SANDBOX", msg)
def get_apns_use_alternative_port(self, application_id=None):
msg = "Setup PUSH_NOTIFICATIONS_SETTINGS properly to send messages"
return self._get_application_settings(application_id, "APNS_USE_ALTERNATIVE_PORT", msg)
def get_apns_topic(self, application_id=None):
msg = "Setup PUSH_NOTIFICATIONS_SETTINGS properly to send messages"
return self._get_application_settings(application_id, "APNS_TOPIC", msg)
def get_apns_host(self, application_id=None):
msg = "Setup PUSH_NOTIFICATIONS_SETTINGS properly to send messages"
return self._get_application_settings(application_id, "APNS_HOST", msg)
def get_apns_port(self, application_id=None):
msg = "Setup PUSH_NOTIFICATIONS_SETTINGS properly to send messages"
return self._get_application_settings(application_id, "APNS_PORT", msg)
def get_apns_feedback_host(self, application_id=None):
msg = "Setup PUSH_NOTIFICATIONS_SETTINGS properly to send messages"
return self._get_application_settings(application_id, "APNS_FEEDBACK_HOST", msg)
def get_apns_feedback_port(self, application_id=None):
msg = "Setup PUSH_NOTIFICATIONS_SETTINGS properly to send messages"
return self._get_application_settings(application_id, "APNS_FEEDBACK_PORT", msg)
def get_wns_package_security_id(self, application_id=None):
msg = "Setup PUSH_NOTIFICATIONS_SETTINGS properly to send messages"
return self._get_application_settings(application_id, "WNS_PACKAGE_SECURITY_ID", msg)

View File

@ -14,22 +14,6 @@ class Migration(migrations.Migration):
]
operations = [
migrations.CreateModel(
name='APNSDevice',
fields=[
('id', models.AutoField(verbose_name='ID', serialize=False, auto_created=True, primary_key=True)),
('name', models.CharField(max_length=255, null=True, verbose_name='Name', blank=True)),
('active', models.BooleanField(default=True, help_text='Inactive devices will not be sent notifications', verbose_name='Is active')),
('date_created', models.DateTimeField(auto_now_add=True, verbose_name='Creation date', null=True)),
('device_id', models.UUIDField(help_text='UDID / UIDevice.identifierForVendor()', max_length=32, null=True, verbose_name='Device ID', blank=True, db_index=True)),
('registration_id', models.CharField(unique=True, max_length=64, verbose_name='Registration ID')),
('user', models.ForeignKey(blank=True, to=settings.AUTH_USER_MODEL, null=True, on_delete=models.CASCADE)),
],
options={
'verbose_name': 'APNS device',
},
bases=(models.Model,),
),
migrations.CreateModel(
name='GCMDevice',
fields=[

View File

@ -12,9 +12,4 @@ class Migration(migrations.Migration):
]
operations = [
migrations.AlterField(
model_name='apnsdevice',
name='registration_id',
field=models.CharField(max_length=200, unique=True, verbose_name='Registration ID'),
),
]

View File

@ -12,12 +12,6 @@ class Migration(migrations.Migration):
]
operations = [
migrations.AddField(
model_name='apnsdevice',
name='application_id',
field=models.CharField(help_text='Opaque application identity, should be filled in for multiple key/certificate access', max_length=64, null=True, verbose_name='Application ID', blank=True),
preserve_default=True,
),
migrations.AddField(
model_name='gcmdevice',
name='application_id',

View File

@ -117,59 +117,6 @@ class GCMDevice(Device):
)
class APNSDeviceManager(models.Manager):
def get_queryset(self):
return APNSDeviceQuerySet(self.model)
class APNSDeviceQuerySet(models.query.QuerySet):
def send_message(self, message, certfile=None, **kwargs):
if self:
from .apns import apns_send_bulk_message
app_ids = self.filter(active=True).order_by("application_id")\
.values_list("application_id", flat=True).distinct()
res = []
for app_id in app_ids:
reg_ids = list(self.filter(active=True, application_id=app_id).values_list(
"registration_id", flat=True)
)
r = apns_send_bulk_message(
registration_ids=reg_ids, alert=message, application_id=app_id,
certfile=certfile, **kwargs
)
if hasattr(r, "keys"):
res += [r]
elif hasattr(r, "__getitem__"):
res += r
return res
class APNSDevice(Device):
device_id = models.UUIDField(
verbose_name=_("Device ID"), blank=True, null=True, db_index=True,
help_text="UDID / UIDevice.identifierForVendor()"
)
registration_id = models.CharField(
verbose_name=_("Registration ID"), max_length=200, unique=True
)
objects = APNSDeviceManager()
class Meta:
verbose_name = _("APNS device")
def send_message(self, message, certfile=None, **kwargs):
from .apns import apns_send_message
return apns_send_message(
registration_id=self.registration_id,
alert=message,
application_id=self.application_id, certfile=certfile,
**kwargs
)
class WNSDeviceManager(models.Manager):
def get_queryset(self):
return WNSDeviceQuerySet(self.model)

View File

@ -21,14 +21,6 @@ PUSH_NOTIFICATIONS_SETTINGS.setdefault(
PUSH_NOTIFICATIONS_SETTINGS.setdefault("FCM_MAX_RECIPIENTS", 1000)
PUSH_NOTIFICATIONS_SETTINGS.setdefault("FCM_ERROR_TIMEOUT", None)
# APNS
if settings.DEBUG:
PUSH_NOTIFICATIONS_SETTINGS.setdefault("APNS_USE_SANDBOX", True)
else:
PUSH_NOTIFICATIONS_SETTINGS.setdefault("APNS_USE_SANDBOX", False)
PUSH_NOTIFICATIONS_SETTINGS.setdefault("APNS_USE_ALTERNATIVE_PORT", False)
PUSH_NOTIFICATIONS_SETTINGS.setdefault("APNS_TOPIC", None)
# WNS
PUSH_NOTIFICATIONS_SETTINGS.setdefault("WNS_PACKAGE_SECURITY_ID", None)
PUSH_NOTIFICATIONS_SETTINGS.setdefault("WNS_SECRET_KEY", None)

View File

@ -26,7 +26,6 @@ classifiers =
[options]
packages = find:
install_requires =
apns2>=0.3.0
pywebpush>=1.3.0
Django>=1.11

View File

@ -1,113 +0,0 @@
from apns2.client import NotificationPriority
from apns2.errors import BadTopic, PayloadTooLarge, Unregistered
from django.conf import settings
from django.test import TestCase, override_settings
from push_notifications.apns import APNSError
from push_notifications.models import APNSDevice
from ._mock import mock
class APNSModelTestCase(TestCase):
def _create_devices(self, devices):
for device in devices:
APNSDevice.objects.create(registration_id=device)
@override_settings()
def test_apns_send_bulk_message(self):
self._create_devices(["abc", "def"])
# legacy conf manager requires a value
settings.PUSH_NOTIFICATIONS_SETTINGS.update({
"APNS_CERTIFICATE": "/path/to/apns/certificate.pem"
})
with mock.patch("apns2.credentials.init_context"):
with mock.patch("apns2.client.APNsClient.connect"):
with mock.patch("apns2.client.APNsClient.send_notification_batch") as s:
APNSDevice.objects.all().send_message("Hello world", expiration=1)
args, kargs = s.call_args
self.assertEqual(args[0][0].token, "abc")
self.assertEqual(args[0][1].token, "def")
self.assertEqual(args[0][0].payload.alert, "Hello world")
self.assertEqual(args[0][1].payload.alert, "Hello world")
self.assertEqual(kargs["expiration"], 1)
def test_apns_send_message_extra(self):
self._create_devices(["abc"])
with mock.patch("apns2.credentials.init_context"):
with mock.patch("apns2.client.APNsClient.connect"):
with mock.patch("apns2.client.APNsClient.send_notification") as s:
APNSDevice.objects.get().send_message(
"Hello world", expiration=2, priority=5, extra={"foo": "bar"})
args, kargs = s.call_args
self.assertEqual(args[0], "abc")
self.assertEqual(args[1].alert, "Hello world")
self.assertEqual(args[1].custom, {"foo": "bar"})
self.assertEqual(kargs["priority"], NotificationPriority.Delayed)
self.assertEqual(kargs["expiration"], 2)
def test_apns_send_message(self):
self._create_devices(["abc"])
with mock.patch("apns2.credentials.init_context"):
with mock.patch("apns2.client.APNsClient.connect"):
with mock.patch("apns2.client.APNsClient.send_notification") as s:
APNSDevice.objects.get().send_message("Hello world", expiration=1)
args, kargs = s.call_args
self.assertEqual(args[0], "abc")
self.assertEqual(args[1].alert, "Hello world")
self.assertEqual(kargs["expiration"], 1)
def test_apns_send_message_to_single_device_with_error(self):
# these errors are device specific, device.active will be set false
devices = ["abc"]
self._create_devices(devices)
with mock.patch("push_notifications.apns._apns_send") as s:
s.side_effect = Unregistered
device = APNSDevice.objects.get(registration_id="abc")
with self.assertRaises(APNSError) as ae:
device.send_message("Hello World!")
self.assertEqual(ae.exception.status, "Unregistered")
self.assertFalse(APNSDevice.objects.get(registration_id="abc").active)
def test_apns_send_message_to_several_devices_with_error(self):
# these errors are device specific, device.active will be set false
devices = ["abc", "def", "ghi"]
expected_exceptions_statuses = ["PayloadTooLarge", "BadTopic", "Unregistered"]
self._create_devices(devices)
with mock.patch("push_notifications.apns._apns_send") as s:
s.side_effect = [PayloadTooLarge, BadTopic, Unregistered]
for idx, token in enumerate(devices):
device = APNSDevice.objects.get(registration_id=token)
with self.assertRaises(APNSError) as ae:
device.send_message("Hello World!")
self.assertEqual(ae.exception.status, expected_exceptions_statuses[idx])
if idx == 2:
self.assertFalse(APNSDevice.objects.get(registration_id=token).active)
else:
self.assertTrue(APNSDevice.objects.get(registration_id=token).active)
def test_apns_send_message_to_bulk_devices_with_error(self):
# these errors are device specific, device.active will be set false
devices = ["abc", "def", "ghi"]
results = {"abc": "PayloadTooLarge", "def": "BadTopic", "ghi": "Unregistered"}
self._create_devices(devices)
with mock.patch("push_notifications.apns._apns_send") as s:
s.return_value = results
results = APNSDevice.objects.all().send_message("Hello World!")
for idx, token in enumerate(devices):
if idx == 2:
self.assertFalse(APNSDevice.objects.get(registration_id=token).active)
else:
self.assertTrue(APNSDevice.objects.get(registration_id=token).active)

View File

@ -1,91 +0,0 @@
from apns2.client import NotificationPriority
from django.test import TestCase
from push_notifications.apns import APNSUnsupportedPriority, _apns_send
from ._mock import mock
class APNSPushPayloadTest(TestCase):
def test_push_payload(self):
with mock.patch("apns2.credentials.init_context"):
with mock.patch("apns2.client.APNsClient.connect"):
with mock.patch("apns2.client.APNsClient.send_notification") as s:
_apns_send(
"123", "Hello world", badge=1, sound="chime",
extra={"custom_data": 12345}, expiration=3
)
self.assertTrue(s.called)
args, kargs = s.call_args
self.assertEqual(args[0], "123")
self.assertEqual(args[1].alert, "Hello world")
self.assertEqual(args[1].badge, 1)
self.assertEqual(args[1].sound, "chime")
self.assertEqual(args[1].custom, {"custom_data": 12345})
self.assertEqual(kargs["expiration"], 3)
def test_push_payload_with_thread_id(self):
with mock.patch("apns2.credentials.init_context"):
with mock.patch("apns2.client.APNsClient.connect"):
with mock.patch("apns2.client.APNsClient.send_notification") as s:
_apns_send(
"123", "Hello world", thread_id="565", sound="chime",
extra={"custom_data": 12345}, expiration=3
)
args, kargs = s.call_args
self.assertEqual(args[0], "123")
self.assertEqual(args[1].alert, "Hello world")
self.assertEqual(args[1].thread_id, "565")
self.assertEqual(args[1].sound, "chime")
self.assertEqual(args[1].custom, {"custom_data": 12345})
self.assertEqual(kargs["expiration"], 3)
def test_push_payload_with_alert_dict(self):
with mock.patch("apns2.credentials.init_context"):
with mock.patch("apns2.client.APNsClient.connect"):
with mock.patch("apns2.client.APNsClient.send_notification") as s:
_apns_send(
"123", alert={"title": "t1", "body": "b1"}, sound="chime",
extra={"custom_data": 12345}, expiration=3
)
args, kargs = s.call_args
self.assertEqual(args[0], "123")
self.assertEqual(args[1].alert["body"], "b1")
self.assertEqual(args[1].alert["title"], "t1")
self.assertEqual(args[1].sound, "chime")
self.assertEqual(args[1].custom, {"custom_data": 12345})
self.assertEqual(kargs["expiration"], 3)
def test_localised_push_with_empty_body(self):
with mock.patch("apns2.credentials.init_context"):
with mock.patch("apns2.client.APNsClient.connect"):
with mock.patch("apns2.client.APNsClient.send_notification") as s:
_apns_send("123", None, loc_key="TEST_LOC_KEY", expiration=3)
args, kargs = s.call_args
self.assertEqual(args[0], "123")
self.assertEqual(args[1].alert.body_localized_key, "TEST_LOC_KEY")
self.assertEqual(kargs["expiration"], 3)
def test_using_extra(self):
with mock.patch("apns2.credentials.init_context"):
with mock.patch("apns2.client.APNsClient.connect"):
with mock.patch("apns2.client.APNsClient.send_notification") as s:
_apns_send(
"123", "sample", extra={"foo": "bar"},
expiration=30, priority=10
)
args, kargs = s.call_args
self.assertEqual(args[0], "123")
self.assertEqual(args[1].alert, "sample")
self.assertEqual(args[1].custom, {"foo": "bar"})
self.assertEqual(kargs["priority"], NotificationPriority.Immediate)
self.assertEqual(kargs["expiration"], 30)
def test_bad_priority(self):
with mock.patch("apns2.credentials.init_context"):
with mock.patch("apns2.client.APNsClient.connect"):
with mock.patch("apns2.client.APNsClient.send_notification") as s:
self.assertRaises(APNSUnsupportedPriority, _apns_send, "123", "_" * 2049, priority=24)
s.assert_has_calls([])

View File

@ -77,9 +77,6 @@ class AppConfigTestCase(TestCase):
manager = AppConfig(PUSH_SETTINGS)
with self.assertRaises(ImproperlyConfigured):
manager._get_application_settings(application_id, "APNS", "CERTIFICATE")
def test_missing_setting(self):
"""
Missing application settings raises ImproperlyConfigured.
@ -97,56 +94,6 @@ class AppConfigTestCase(TestCase):
with self.assertRaises(ImproperlyConfigured):
AppConfig(PUSH_SETTINGS)
def test_validate_apns_config(self):
"""
Verify the settings for APNS platform.
"""
path = os.path.join(os.path.dirname(__file__), "test_data", "good_revoked.pem")
#
# all settings specified, required and optional, does not raise an error.
#
PUSH_SETTINGS = {
"APPLICATIONS": {
"my_apns_app": {
"PLATFORM": "APNS",
"CERTIFICATE": path,
"USE_ALTERNATIVE_PORT": True,
"USE_SANDBOX": True
}
}
}
AppConfig(PUSH_SETTINGS)
# missing required settings
PUSH_SETTINGS = {
"APPLICATIONS": {
"my_apns_app": {
"PLATFORM": "APNS",
}
}
}
with self.assertRaises(ImproperlyConfigured):
AppConfig(PUSH_SETTINGS)
# all optional settings have default values
PUSH_SETTINGS = {
"APPLICATIONS": {
"my_apns_app": {
"PLATFORM": "APNS",
"CERTIFICATE": path,
}
}
}
manager = AppConfig(PUSH_SETTINGS)
app_config = manager._settings["APPLICATIONS"]["my_apns_app"]
assert app_config["USE_SANDBOX"] is False
assert app_config["USE_ALTERNATIVE_PORT"] is False
def test_get_allowed_settings_fcm(self):
"""Verify the settings allowed for FCM platform."""

View File

@ -6,7 +6,7 @@ from django.test import TestCase
from django.utils import timezone
from push_notifications.gcm import GCMError, send_bulk_message
from push_notifications.models import APNSDevice, GCMDevice
from push_notifications.models import GCMDevice
from . import responses
from ._mock import mock
@ -29,12 +29,6 @@ class GCMModelTestCase(TestCase):
assert device.date_created is not None
assert device.date_created.date() == timezone.now().date()
def test_can_create_save_device(self):
device = APNSDevice.objects.create(registration_id="a valid registration id")
assert device.id is not None
assert device.date_created is not None
assert device.date_created.date() == timezone.now().date()
def test_gcm_send_message(self):
device = GCMDevice.objects.create(registration_id="abc", cloud_message_type="GCM")
with mock.patch(

View File

@ -1,7 +1,7 @@
from django.test import TestCase
from push_notifications.api.rest_framework import (
APNSDeviceSerializer, GCMDeviceSerializer, ValidationError
GCMDeviceSerializer, ValidationError
)
@ -9,52 +9,6 @@ GCM_DRF_INVALID_HEX_ERROR = {"device_id": [u"Device ID is not a valid hex number
GCM_DRF_OUT_OF_RANGE_ERROR = {"device_id": [u"Device ID is out of range"]}
class APNSDeviceSerializerTestCase(TestCase):
def test_validation(self):
# valid data - 32 bytes upper case
serializer = APNSDeviceSerializer(data={
"registration_id": "AEAEAEAEAEAEAEAEAEAEAEAEAEAEAEAEAEAEAEAEAEAEAEAEAEAEAEAEAEAEAEAE",
"name": "Apple iPhone 6+",
"device_id": "FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF",
"application_id": "XXXXXXXXXXXXXXXXXXXX",
})
self.assertTrue(serializer.is_valid())
# valid data - 32 bytes lower case
serializer = APNSDeviceSerializer(data={
"registration_id": "aeaeaeaeaeaeaeaeaeaeaeaeaeaeaeaeaeaeaeaeaeaeaeaeaeaeaeaeaeaeaeae",
"name": "Apple iPhone 6+",
"device_id": "ffffffffffffffffffffffffffffffff",
"application_id": "XXXXXXXXXXXXXXXXXXXX",
})
self.assertTrue(serializer.is_valid())
# valid data - 100 bytes upper case
serializer = APNSDeviceSerializer(data={
"registration_id": "AE" * 100,
"name": "Apple iPhone 6+",
"device_id": "FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF",
})
self.assertTrue(serializer.is_valid())
# valid data - 100 bytes lower case
serializer = APNSDeviceSerializer(data={
"registration_id": "ae" * 100,
"name": "Apple iPhone 6+",
"device_id": "ffffffffffffffffffffffffffffffff",
})
self.assertTrue(serializer.is_valid())
# invalid data - device_id, registration_id
serializer = APNSDeviceSerializer(data={
"registration_id": "invalid device token contains no hex",
"name": "Apple iPhone 6+",
"device_id": "ffffffffffffffffffffffffffffake",
"application_id": "XXXXXXXXXXXXXXXXXXXX",
})
self.assertFalse(serializer.is_valid())
class GCMDeviceSerializerTestCase(TestCase):
def test_device_id_validation_pass(self):
serializer = GCMDeviceSerializer(data={

View File

@ -19,7 +19,6 @@ deps =
django111: Django>=1.11,<2.0
django20: Django>=2.0,<2.1
djangomaster: https://github.com/django/django/archive/master.tar.gz
apns2>=0.3.0
[testenv:flake8]
commands = flake8