journal: add upload view

This commit is contained in:
Christophe Siraut 2020-08-28 16:20:05 +02:00
parent 2568b29dad
commit c78bc66bcb
4 changed files with 258 additions and 2 deletions

View File

@ -0,0 +1,72 @@
#!/usr/bin/env python3
# logtracker
# Copyright (c) 2020 Entr'ouvert
import re
# A single-line journal field, ``KEY=value``. The key is made of upper-case
# ASCII letters, digits and underscores; the value is the rest of the line.
field_pattern = re.compile(r'^([A-Z0-9_]+)=(.*)$')
# A multi-line field as accumulated by get_journal_entries(): the field name
# alone on the first line, then everything that followed it (newlines included).
field_multiline_pattern = re.compile(r'^([A-Z_][A-Z0-9_]+)\n([\w\W]*)$')


def handle_journal_upload_stream(journal_stream, debug=False):
    """Yield the text lines of an uploaded journal stream, framing removed.

    Each line is read with ``readline()``, decoded as UTF-8 (undecodable
    bytes are replaced) and stripped of its trailing newline.  A line that
    ends with a carriage return is treated as transfer framing rather than
    journal content (presumably HTTP chunked-encoding boundaries -- confirm
    against the uploading client):

    * an empty one is skipped;
    * ``0`` marks the end of the stream;
    * a short one (fewer than 6 characters) starting with a lower-case
      letter or a digit, e.g. ``fff4``, is dropped;
    * anything else is the head of a line that was cut in two; it is kept
      and glued in front of the next ordinary line.

    Ordinary lines, including empty ones, are yielded as ``str``.

    :param journal_stream: binary file-like object providing ``readline()``.
    :param debug: when true, print the framing lines that are dropped.
    """
    tail = ''
    while True:
        raw = journal_stream.readline()
        if not raw:
            # EOF without the terminating '0' line.  Stop here instead of
            # yielding '' forever; hand over a pending fragment first.
            if tail:
                yield tail
            return
        line = raw.decode('utf-8', errors='replace').rstrip('\n')
        if not line.endswith('\r'):
            if tail:
                line = tail + line
                tail = ''
            yield line
            continue
        line = line.rstrip('\r')
        if not line:
            continue
        if line == '0':
            return
        if (line[0].islower() or line[0].isdigit()) and len(line) < 6:
            if debug:
                print('ignore ff7c fff4 3d9a etc.: %s' % line)
        else:
            tail = tail + line


def _parse_multiline_field(raw_field, debug=False):
    """Return ``(key, value)`` for an accumulated multi-line field, or None."""
    match = field_multiline_pattern.match(raw_field)
    if match:
        return match.groups()
    if debug:
        print('content dropped: %s' % raw_field)
    return None


def _coerce_value(value):
    """Return *value* as an int when it is a plain decimal number."""
    # isdecimal() rather than isdigit(): the latter is also true for
    # characters such as superscript two, which int() refuses.
    if value.isdecimal():
        try:
            return int(value)
        except ValueError:
            # e.g. more digits than the interpreter is willing to convert
            return value
    return value


def get_journal_entries(journal_stream, debug=False):
    """Parse an uploaded journal stream into entries.

    Every entry is yielded as a list of ``(key, value)`` tuples in stream
    order.  Values made only of decimal digits are converted to ``int``; NUL
    characters are removed from every line.  Lines that are not of the form
    ``KEY=value`` are accumulated as a multi-line field, which is closed by
    the next empty line, by the next ``__CURSOR`` line or by the end of the
    stream.  An empty line with no multi-line field pending closes the
    current entry.  Parsing stops after three empty lines seen while no entry
    is being built.

    NOTE(review): multi-line fields are recognised heuristically, line by
    line; content that itself contains an empty line is cut there.

    :param journal_stream: binary file-like object providing ``readline()``.
    :param debug: when true, print content that had to be dropped.
    """
    store = []
    multiline_field = ''
    emptylines_count = 0
    for line in handle_journal_upload_stream(journal_stream, debug=debug):
        if not line:
            if multiline_field:
                field = _parse_multiline_field(multiline_field, debug)
                if field:
                    store.append(field)
                multiline_field = ''
            elif store:
                yield store
                store = []
                emptylines_count = 0
            else:
                emptylines_count += 1
                if emptylines_count >= 3:
                    # disconnect broken stream
                    break
            continue
        if line.startswith('__CURSOR'):
            # sometimes a newline ends a multiline field + ends an entry;
            # close whatever is still pending before starting the new entry
            if multiline_field:
                field = _parse_multiline_field(multiline_field, debug)
                if field:
                    store.append(field)
                multiline_field = ''
            if store:
                yield store
                store = []
        # jsonb rejects u0000
        line = line.replace('\u0000', '')
        match = field_pattern.match(line)
        if match:
            key, value = match.groups()
            store.append((key, _coerce_value(value)))
        else:
            multiline_field = multiline_field + line + '\n'
    # End of stream: do not lose an entry that was not followed by a blank line.
    if multiline_field:
        field = _parse_multiline_field(multiline_field, debug)
        if field:
            store.append(field)
    if store:
        yield store

View File

@ -1,12 +1,18 @@
import datetime
from functools import wraps
from django.contrib.auth.mixins import LoginRequiredMixin
from django.conf import settings
from django.core import serializers
from django.core.exceptions import PermissionDenied
from django.http import HttpResponse, HttpResponseRedirect
from django.shortcuts import get_object_or_404
from django.urls import reverse
from django.views.decorators.csrf import csrf_exempt
from django.views.generic import TemplateView
from django.views.generic.list import ListView
from logtracker.journal.models import Entry
from logtracker.journal.journalstream import get_journal_entries
class APIEntriesList(LoginRequiredMixin, ListView):
@ -32,3 +38,68 @@ class HomeView(LoginRequiredMixin, TemplateView):
def post(self, request, *args, **kwargs):
    """Redirect the submitted form to the 'emails' view with a query string.

    The query string has a single parameter whose name is the posted
    ``field`` and whose value is the posted ``address``.  Both are
    URL-encoded so that characters such as ``+``, ``&`` or ``=`` in an
    address reach the target view intact.

    A missing ``field`` or ``address`` raises the KeyError produced by
    ``request.POST[...]``, as before.
    """
    from urllib.parse import urlencode

    query = urlencode({request.POST['field']: request.POST['address']})
    url = '%s?%s' % (reverse('emails'), query)
    return HttpResponseRedirect(url)
def get_chunks(gen, n):
    """Yield successive lists of at most *n* items taken from *gen*.

    Every yielded list is non-empty; only the last one may be shorter than
    *n*.  Nothing is yielded for an empty input.

    :param gen: any iterable (generator, iterator, list, ...).
    :param n: maximum chunk size, at least 1.
    :raises ValueError: if *n* is smaller than 1 (raised on first iteration).
    """
    from itertools import islice

    if n < 1:
        # a non-positive size can never consume the input
        raise ValueError('chunk size must be at least 1')
    iterator = iter(gen)
    while True:
        chunk = list(islice(iterator, n))
        if not chunk:
            return
        yield chunk
def _ssl_header(meta, name):
    """Return request header *name* from *meta* as a stripped string ('' if absent).

    Django stores request headers in ``request.META`` under
    ``HTTP_<NAME_IN_UPPER_CASE_WITH_UNDERSCORES>``; the literal *name* is
    still looked up as a fallback, as the previous implementation did.
    """
    value = meta.get('HTTP_' + name.upper().replace('-', '_'))
    if value is None:
        value = meta.get(name)
    return '' if value is None else str(value).strip()


def ssl_client_verify(view):
    """Decorator: only let requests with a verified client certificate through.

    A request is accepted when ``X-SSL`` is ``1``, ``X-SSL-Client-Verify`` is
    ``0`` and ``X-SSL-Client-CN`` is not empty; the CN is then stored in
    ``request.host_verified``.  Otherwise, when ``settings.DEBUG`` is set the
    request is accepted with ``host_verified = 'test_host'``; else
    ``PermissionDenied`` is raised.

    NOTE(review): these headers are presumably set by a TLS-terminating
    reverse proxy, which must also strip them from incoming client requests
    -- confirm against the deployment.
    """
    @wraps(view)
    def wrapper(request, *args, **kwargs):
        meta = request.META
        # header values arrive as strings, so compare against '1' / '0'
        verified = (_ssl_header(meta, 'X-SSL') == '1'
                    and _ssl_header(meta, 'X-SSL-Client-Verify') == '0')
        client_cn = _ssl_header(meta, 'X-SSL-Client-CN')
        if verified and client_cn:
            request.host_verified = client_cn
        elif settings.DEBUG:
            request.host_verified = 'test_host'
        else:
            raise PermissionDenied
        return view(request, *args, **kwargs)
    return wrapper
@ssl_client_verify
@csrf_exempt
def UploadView(request, debug=False):
    """Store the journal entries POSTed in the request body.

    The body is read from ``request.META['wsgi.input']``, parsed with
    ``get_journal_entries`` and inserted in batches of 20 ``Entry`` rows
    tagged with ``request.host_verified``.  Entries without a usable
    ``__REALTIME_TIMESTAMP`` (microseconds since the epoch) are skipped.
    A batch that cannot be inserted is logged and dropped so that it cannot
    block the batches that follow.

    Any request, POST or not, is answered with an empty 200 response.

    :param debug: when true, print progress and timing information.
    """
    import logging

    if request.method != "POST":
        return HttpResponse('')

    logger = logging.getLogger(__name__)
    count = 0
    start_timestamp = datetime.datetime.now()
    journal_stream = request.META.get('wsgi.input')
    for chunk in get_chunks(get_journal_entries(journal_stream, debug=debug), 20):
        new_entries = []
        for el in chunk:
            data = dict(el)
            try:
                # NOTE(review): this yields a naive datetime in the server's
                # local timezone -- confirm that is what Entry expects.
                timestamp = datetime.datetime.fromtimestamp(data['__REALTIME_TIMESTAMP'] / 1000000)
            except (KeyError, TypeError, ValueError, OverflowError, OSError):
                # missing, non-numeric or out-of-range timestamp
                continue
            new_entries.append(Entry(timestamp=timestamp, host=request.host_verified, data=data))
            count += 1
            if debug and count % 1000 == 0:
                print(count, timestamp)
        if not new_entries:
            continue
        try:
            Entry.objects.bulk_create(new_entries)
        except Exception:
            # best effort: keep accepting the rest of the stream, but leave a trace
            logger.exception('journal upload: dropping a batch of %d entries from %s',
                             len(new_entries), request.host_verified)
    if debug:
        elapsed = datetime.datetime.now() - start_timestamp
        print('elapsed: %s' % elapsed)
        print('count: %s' % count)
    return HttpResponse('')

View File

@ -2,7 +2,7 @@ from django.conf.urls import include, url
from django.contrib import admin
from django.conf import settings
from logtracker.journal.views import APIEntriesList, EntriesList, HomeView
from logtracker.journal.views import APIEntriesList, EntriesList, HomeView, UploadView
from logtracker.mail.views import EmailsList, SendersList, MailHome
urlpatterns = [
@ -13,6 +13,7 @@ urlpatterns = [
url(r'^api/mail/$', EmailsList.as_view(), name='emails'),
url(r'^mail/$', MailHome.as_view(), name='mail'),
url(r'^mail/senders/$', SendersList.as_view(), name='senders'),
url(r'^upload$', UploadView, name='upload'),
]
if 'mellon' in settings.INSTALLED_APPS:

112
tests/test_journalstream.py Normal file
View File

@ -0,0 +1,112 @@
import io
from django.test.utils import override_settings
from logtracker.journal.journalstream import get_journal_entries
# Fixture fed to get_journal_entries() by test_journal_stream: three journal
# entries, each starting with a __CURSOR line.  The `_TRANSPO\r` / `fff4\r` /
# `RT=journal` lines in the first entry exercise a field cut in two by a short
# `\r`-terminated line; the final `0\r` line is the end-of-stream marker.
# NOTE(review): no blank separator lines are visible between the entries
# here -- confirm against the committed file.
raw_entries = b'''__CURSOR=s=6f1be3fd9e534674878d6ce7394e035f;i=f3b60;b=f8595485f17145e89bf6d8068dfbf140;m=cb448036;t=5ab93a50ad4e5;x=99d88bd273a65ed7
__REALTIME_TIMESTAMP=1596025501045989
__MONOTONIC_TIMESTAMP=3410264118
_BOOT_ID=f8595485f17145e89bf6d8068dfbf140
PRIORITY=6
_TRANSPO\r
fff4\r
RT=journal
_SELINUX_CONTEXT
^K^@^@^@^@^@^@^@unconfined
__CURSOR=s=92043378025640f3989a22d75fc7422d;i=f5ce1;b=c2a67631459e4eb4bef62a31641efe64;m=cdec3b6f;t=5abf656e3cfe2;x=b34e4efef33912b1
__REALTIME_TIMESTAMP=1596449391628258
__MONOTONIC_TIMESTAMP=3454810991
_BOOT_ID=c2a67631459e4eb4bef62a31641efe64
_UID=1000
_GID=1000
_CAP_EFFECTIVE=0
_SELINUX_CONTEXT
^K^@^@^@^@^@^@^@unconfined
_MACHINE_ID=332e6893061344b3bf9e76f4f066f008
_HOSTNAME=pad
_TRANSPORT=journal
_AUDIT_LOGINUID=1000
_SYSTEMD_OWNER_UID=1000
_SYSTEMD_SLICE=user-1000.slice
_SYSTEMD_USER_SLICE=-.slice
_AUDIT_SESSION=2
_SYSTEMD_CGROUP=/user.slice/user-1000.slice/session-2.scope
_SYSTEMD_SESSION=2
_SYSTEMD_UNIT=session-2.scope
_SYSTEMD_INVOCATION_ID=56dcd19554ef4153b7a409267bac34d7
PRIORITY=4
GLIB_OLD_LOG_API=1
_PID=1886
_COMM=gnome-shell
_EXE=/usr/bin/gnome-shell
_CMDLINE=/usr/bin/gnome-shell
GLIB_DOMAIN=Gjs
MESSAGE
^@^@^@^@^@^@^@JS ERROR: TypeError: this._trackedWindows.get(...) is undefined
_onWindowActorRemoved@resource:///org/gnome/shell/ui/panel.js:843:9
wrapper@resource:///org/gnome/gjs/modules/_legacy.js:82:22
_SOURCE_REALTIME_TIMESTAMP=1596449391615304
__CURSOR=s=92043378025640f3989a22d75fc7422d;i=f5ce2;b=c2a67631459e4eb4bef62a31641efe64;m=cdec3da9;t=5abf656e3d21b;x=43f26d5c857dddba
__REALTIME_TIMESTAMP=1596449391628827
__MONOTONIC_TIMESTAMP=3454811561
_BOOT_ID=c2a67631459e4eb4bef62a31641efe64
_UID=1000
_GID=1000
_CAP_EFFECTIVE=0
_SELINUX_CONTEXT
^K^@^@^@^@^@^@^@unconfined
_MACHINE_ID=332e6893061344b3bf9e76f4f066f008
_HOSTNAME=pad
PRIORITY=6
SYSLOG_FACILITY=3
SYSLOG_IDENTIFIER=systemd
_TRANSPORT=journal
_PID=1827
_COMM=systemd
_EXE=/lib/systemd/systemd
_CMDLINE=/lib/systemd/systemd --user
_AUDIT_SESSION=3
_AUDIT_LOGINUID=1000
_SYSTEMD_CGROUP=/user.slice/user-1000.slice/user@1000.service/init.scope
_SYSTEMD_OWNER_UID=1000
_SYSTEMD_UNIT=user@1000.service
_SYSTEMD_USER_UNIT=init.scope
_SYSTEMD_SLICE=user-1000.slice
_SYSTEMD_USER_SLICE=-.slice
_SYSTEMD_INVOCATION_ID=8ce5033623b845dfae8ea4666504d836
USER_UNIT=at-spi-dbus-bus.service
USER_INVOCATION_ID=cab95ef28cc14d17a509f096120f7d19
CODE_FILE=src/core/unit.c
CODE_LINE=5745
CODE_FUNC=unit_log_success
MESSAGE_ID=7ad2d189f7e94e70a38c781354912448
MESSAGE=at-spi-dbus-bus.service: Succeeded.
_SOURCE_REALTIME_TIMESTAMP=1596449391625441
0\r
'''
def test_journal_stream_auth(client):
    """A plain GET on /upload is answered with 403 under the default test settings."""
    response = client.get('/upload')
    assert response.status_code == 403
@override_settings(DEBUG=True)
def test_journal_stream_auth_debug(client):
    """With DEBUG switched on, the same GET on /upload is answered with 200."""
    response = client.get('/upload')
    assert response.status_code == 200
def test_journal_stream():
    """Parse the raw_entries fixture and check the entries that come out."""
    stream = io.BytesIO(raw_entries)
    parsed = list(get_journal_entries(stream))
    # the fixture holds three entries
    assert len(parsed) == 3
    first, second = parsed[0], parsed[1]
    # the field cut in two in the fixture is reassembled
    assert ('_TRANSPORT', 'journal') in first
    # the multi-line MESSAGE of the second entry keeps its later lines
    messages = [value for key, value in second if key == 'MESSAGE']
    assert '_legacy.js:82' in messages[0]