#!/usr/bin/env python3
# logtracker
# Copyright (c) 2020 Entr'ouvert
#
# File: logtracker/journal/journalstream.py
import re

# Single-line field: NAME=value
field_pattern = re.compile(r'^([A-Z0-9_]+)=(.*)$')
# Field sent as a bare NAME line followed by one or more value lines
field_multiline_pattern = re.compile(r'^([A-Z_][A-Z0-9_]+)\n([\w\W]*)$')


def handle_journal_upload_stream(journal_stream, debug=False):
    """Yield the decoded text lines of an uploaded journal stream.

    Lines ending with CR are treated as transport framing rather than
    journal content (presumably HTTP chunked-transfer markers -- confirm
    against the uploading client):

    * an empty CR line is skipped;
    * a CR line equal to ``'0'`` terminates the stream;
    * a short CR line (fewer than 6 characters, starting with a lowercase
      letter or a digit) is ignored;
    * any other CR line is the first half of a line cut by the framing;
      it is kept and prepended to the next regular line.

    :param journal_stream: binary file-like object exposing ``readline()``.
    :param debug: when true, print the framing lines that are ignored.
    :return: generator of ``str`` lines without their trailing newline;
        empty strings are yielded for empty lines.
    """
    tail = ''
    while True:
        raw = journal_stream.readline()
        if not raw:
            # readline() returns b'' only at end of stream.  Without this
            # check the generator would yield '' forever.
            if tail:
                yield tail
            return
        line = raw.decode('utf-8', errors='replace').rstrip('\n')
        if not line.endswith('\r'):
            if tail:
                line = tail + line
                tail = ''
            yield line
            continue
        line = line.rstrip('\r')
        if not line:
            continue
        if line == '0':
            return
        if (line[0].islower() or line[0].isdigit()) and len(line) < 6:
            if debug:
                print('ignore ff7c fff4 3d9a etc.: %s' % line)
        else:
            tail = tail + line


def _store_multiline_field(multiline_field, store, debug):
    """Append the buffered multi-line field to *store*, or drop it if malformed."""
    match = field_multiline_pattern.match(multiline_field)
    if match:
        store.append(match.groups())
    elif debug:
        print('content dropped: %s' % multiline_field)


def get_journal_entries(journal_stream, debug=False):
    """Group the lines of an uploaded journal stream into entries.

    An entry is a list of ``(name, value)`` tuples.  Values made only of
    digits are converted to ``int``; other values stay ``str``.  An entry
    ends on an empty line, or when a ``__CURSOR`` field shows up while
    fields are already collected.  Whatever is still pending when the
    stream ends is yielded as a last entry.

    :param journal_stream: binary file-like object exposing ``readline()``.
    :param debug: when true, print dropped content and ignored framing.
    :return: generator of lists of ``(name, value)`` tuples.
    """
    store = []
    multiline_field = ''
    emptylines_count = 0
    for line in handle_journal_upload_stream(journal_stream, debug=debug):
        if not line:
            if multiline_field:
                # first empty line after a multi-line value closes the field
                _store_multiline_field(multiline_field, store, debug)
                multiline_field = ''
            elif store:
                yield store
                store = []
                emptylines_count = 0
            else:
                emptylines_count += 1
                if emptylines_count >= 3:
                    # disconnect broken stream
                    break
            continue
        if line.startswith('__CURSOR') and store:
            # sometimes a newline ends a multiline field + ends an entry
            yield store
            store = []
        # jsonb rejects u0000
        line = line.replace('\u0000', '')
        match = field_pattern.match(line)
        if match:
            k, v = match.groups()
            if v.isdigit():
                try:
                    v = int(v)
                except ValueError:
                    # str.isdigit() accepts characters such as superscript
                    # digits that int() refuses; keep those as text
                    pass
            store.append((k, v))
        else:
            multiline_field = multiline_field + line + '\n'
    # end of stream: do not lose what was collected so far
    if multiline_field:
        _store_multiline_field(multiline_field, store, debug)
    if store:
        yield store
# File: logtracker/journal/views.py
import datetime
import logging
from functools import wraps

from django.contrib.auth.mixins import LoginRequiredMixin
from django.conf import settings
from django.core import serializers
from django.core.exceptions import PermissionDenied
from django.http import HttpResponse, HttpResponseRedirect
from django.urls import reverse
from django.views.decorators.csrf import csrf_exempt
from django.views.generic import TemplateView
from django.views.generic.list import ListView

from logtracker.journal.models import Entry
from logtracker.journal.journalstream import get_journal_entries

logger = logging.getLogger(__name__)

# NOTE: the views already present in this module (APIEntriesList, HomeView,
# ...) are only partially visible in the change set and are left untouched;
# they are not reproduced here.


def get_chunks(gen, n):
    """Yield the items of *gen* grouped in lists of at most *n* items.

    Only non-empty chunks are yielded, so an exhausted or empty iterable
    produces no trailing empty list.

    :param gen: any iterable (typically a generator).
    :param n: maximum number of items per chunk.
    :return: generator of lists.
    """
    chunk = []
    for item in gen:
        chunk.append(item)
        if len(chunk) >= n:
            yield chunk
            chunk = []
    if chunk:
        yield chunk


def _meta_header(meta, name):
    """Return HTTP header *name* from a WSGI environ as a stripped string, or None."""
    # Django stores the header "X-Foo-Bar" under META['HTTP_X_FOO_BAR'];
    # the literal name is kept as a fallback for servers injecting it as-is.
    value = meta.get('HTTP_' + name.upper().replace('-', '_'))
    if value is None:
        value = meta.get(name)
    if value is None:
        return None
    return str(value).strip()


def ssl_client_verify(view):
    """Decorator restricting *view* to requests carrying a verified client certificate.

    The request is accepted when the ``X-SSL`` header is ``1`` and the
    ``X-SSL-Client-Verify`` header is ``0``; ``request.host_verified`` is
    then set to the ``X-SSL-Client-CN`` header.  Otherwise, when
    ``settings.DEBUG`` is on, the request is accepted with
    ``request.host_verified = 'test_host'``.

    NOTE(review): these headers are presumably set by a TLS-terminating
    reverse proxy; it must strip any client-supplied copies -- confirm.

    :raises PermissionDenied: when the client is not verified and DEBUG is off.
    """
    @wraps(view)
    def wrapper(request, *args, **kwargs):
        meta = request.META
        verified = (
            _meta_header(meta, 'X-SSL') == '1'
            and _meta_header(meta, 'X-SSL-Client-Verify') == '0'
        )
        if verified:
            request.host_verified = _meta_header(meta, 'X-SSL-Client-CN')
        elif settings.DEBUG:
            request.host_verified = 'test_host'
        else:
            raise PermissionDenied
        return view(request, *args, **kwargs)
    return wrapper


@ssl_client_verify
@csrf_exempt
def UploadView(request, debug=False):
    """Store the journal entries uploaded in the body of a POST request.

    Entries are read from ``request.META['wsgi.input']``, turned into
    ``Entry`` objects and saved in batches of 20.  Entries lacking a usable
    ``__REALTIME_TIMESTAMP`` (microseconds since the epoch) are skipped.
    A batch that cannot be saved is logged and dropped so the following
    batches are still stored.  Requests with another method do nothing.

    :param request: the Django request; ``host_verified`` is set by
        ``ssl_client_verify``.
    :param debug: when true, print progress and timing information.
    :return: an empty ``HttpResponse``.
    """
    if request.method == "POST":
        count = 0
        start_timestamp = datetime.datetime.now()
        timestamp = start_timestamp
        journal_stream = request.META.get('wsgi.input')
        for chunk in get_chunks(get_journal_entries(journal_stream, debug=debug), 20):
            # fresh list per batch: a failed batch must not be re-submitted
            # together with the next ones
            new_entries = []
            for el in chunk:
                data = dict(el)
                try:
                    timestamp = datetime.datetime.fromtimestamp(data['__REALTIME_TIMESTAMP'] / 1000000)
                except (KeyError, TypeError, ValueError, OverflowError, OSError):
                    continue
                new_entries.append(Entry(timestamp=timestamp, host=request.host_verified, data=data))
                count += 1
                if debug and count % 1000 == 0:
                    print(count, timestamp)
            try:
                Entry.objects.bulk_create(new_entries)
            except Exception:
                # best effort: keep consuming the stream, but leave a trace
                logger.exception('journal upload: could not store a batch of %d entries', len(new_entries))
        if debug:
            elapsed = datetime.datetime.now() - start_timestamp
            print('elapsed: %s' % elapsed)
            print('count: %s' % count)
    return HttpResponse('')


# Companion change in logtracker/urls.py (same change set): UploadView is
# imported from logtracker.journal.views and registered in urlpatterns as
#     url(r'^upload$', UploadView, name='upload'),
# File: tests/test_journalstream.py
import io
from django.test.utils import override_settings
from logtracker.journal.journalstream import get_journal_entries


# Sample upload: three journal entries; the first one has a field name cut
# in two by framing lines, and the stream ends with a '0' framing line.
raw_entries = b'''__CURSOR=s=6f1be3fd9e534674878d6ce7394e035f;i=f3b60;b=f8595485f17145e89bf6d8068dfbf140;m=cb448036;t=5ab93a50ad4e5;x=99d88bd273a65ed7
__REALTIME_TIMESTAMP=1596025501045989
__MONOTONIC_TIMESTAMP=3410264118
_BOOT_ID=f8595485f17145e89bf6d8068dfbf140
PRIORITY=6
_TRANSPO\r
fff4\r
RT=journal
_SELINUX_CONTEXT
^K^@^@^@^@^@^@^@unconfined

__CURSOR=s=92043378025640f3989a22d75fc7422d;i=f5ce1;b=c2a67631459e4eb4bef62a31641efe64;m=cdec3b6f;t=5abf656e3cfe2;x=b34e4efef33912b1
__REALTIME_TIMESTAMP=1596449391628258
__MONOTONIC_TIMESTAMP=3454810991
_BOOT_ID=c2a67631459e4eb4bef62a31641efe64
_UID=1000
_GID=1000
_CAP_EFFECTIVE=0
_SELINUX_CONTEXT
^K^@^@^@^@^@^@^@unconfined

_MACHINE_ID=332e6893061344b3bf9e76f4f066f008
_HOSTNAME=pad
_TRANSPORT=journal
_AUDIT_LOGINUID=1000
_SYSTEMD_OWNER_UID=1000
_SYSTEMD_SLICE=user-1000.slice
_SYSTEMD_USER_SLICE=-.slice
_AUDIT_SESSION=2
_SYSTEMD_CGROUP=/user.slice/user-1000.slice/session-2.scope
_SYSTEMD_SESSION=2
_SYSTEMD_UNIT=session-2.scope
_SYSTEMD_INVOCATION_ID=56dcd19554ef4153b7a409267bac34d7
PRIORITY=4
GLIB_OLD_LOG_API=1
_PID=1886
_COMM=gnome-shell
_EXE=/usr/bin/gnome-shell
_CMDLINE=/usr/bin/gnome-shell
GLIB_DOMAIN=Gjs
MESSAGE
^@^@^@^@^@^@^@JS ERROR: TypeError: this._trackedWindows.get(...) is undefined
_onWindowActorRemoved@resource:///org/gnome/shell/ui/panel.js:843:9
wrapper@resource:///org/gnome/gjs/modules/_legacy.js:82:22

_SOURCE_REALTIME_TIMESTAMP=1596449391615304


__CURSOR=s=92043378025640f3989a22d75fc7422d;i=f5ce2;b=c2a67631459e4eb4bef62a31641efe64;m=cdec3da9;t=5abf656e3d21b;x=43f26d5c857dddba
__REALTIME_TIMESTAMP=1596449391628827
__MONOTONIC_TIMESTAMP=3454811561
_BOOT_ID=c2a67631459e4eb4bef62a31641efe64
_UID=1000
_GID=1000
_CAP_EFFECTIVE=0
_SELINUX_CONTEXT
^K^@^@^@^@^@^@^@unconfined

_MACHINE_ID=332e6893061344b3bf9e76f4f066f008
_HOSTNAME=pad
PRIORITY=6
SYSLOG_FACILITY=3
SYSLOG_IDENTIFIER=systemd
_TRANSPORT=journal
_PID=1827
_COMM=systemd
_EXE=/lib/systemd/systemd
_CMDLINE=/lib/systemd/systemd --user
_AUDIT_SESSION=3
_AUDIT_LOGINUID=1000
_SYSTEMD_CGROUP=/user.slice/user-1000.slice/user@1000.service/init.scope
_SYSTEMD_OWNER_UID=1000
_SYSTEMD_UNIT=user@1000.service
_SYSTEMD_USER_UNIT=init.scope
_SYSTEMD_SLICE=user-1000.slice
_SYSTEMD_USER_SLICE=-.slice
_SYSTEMD_INVOCATION_ID=8ce5033623b845dfae8ea4666504d836
USER_UNIT=at-spi-dbus-bus.service
USER_INVOCATION_ID=cab95ef28cc14d17a509f096120f7d19
CODE_FILE=src/core/unit.c
CODE_LINE=5745
CODE_FUNC=unit_log_success
MESSAGE_ID=7ad2d189f7e94e70a38c781354912448
MESSAGE=at-spi-dbus-bus.service: Succeeded.
_SOURCE_REALTIME_TIMESTAMP=1596449391625441

0\r
'''


def test_journal_stream_auth(client):
    """Without DEBUG, an unauthenticated request to /upload is refused."""
    response = client.get('/upload')
    assert response.status_code == 403


@override_settings(DEBUG=True)
def test_journal_stream_auth_debug(client):
    """With DEBUG on, the same request is accepted."""
    response = client.get('/upload')
    assert response.status_code == 200


def test_journal_stream():
    """The sample stream parses into three entries with reassembled fields."""
    stream = io.BytesIO(raw_entries)
    parsed = list(get_journal_entries(stream))
    assert len(parsed) == 3
    # field name split by framing lines is glued back together
    assert ('_TRANSPORT', 'journal') in parsed[0]
    messages = [value for key, value in parsed[1] if key == 'MESSAGE']
    assert '_legacy.js:82' in messages[0]