journal: add upload view
This commit is contained in:
parent
2568b29dad
commit
c78bc66bcb
|
@ -0,0 +1,72 @@
|
|||
#!/usr/bin/env python3
|
||||
# logtracker
|
||||
# Copyright (c) 2020 Entr'ouvert
|
||||
import re
|
||||
|
||||
field_pattern = re.compile(r'^([A-Z0-9_]+)=(.*)$')
|
||||
field_multiline_pattern = re.compile(r'^([A-Z_][A-Z0-9_]+)\n([\w\W]*)$')
|
||||
|
||||
|
||||
def handle_journal_upload_stream(journal_stream, debug=False):
|
||||
tail = ''
|
||||
while True:
|
||||
line = journal_stream.readline().decode('utf-8', errors='replace').rstrip('\n')
|
||||
if line.endswith('\r'):
|
||||
line = line.rstrip('\r')
|
||||
if not line:
|
||||
continue
|
||||
elif line == '0':
|
||||
break
|
||||
else:
|
||||
if (line[0].islower() or line[0].isdigit()) and len(line) < 6:
|
||||
if debug:
|
||||
print('ignore ff7c fff4 3d9a etc.: %s' % line)
|
||||
else:
|
||||
tail = tail + line
|
||||
else:
|
||||
if tail:
|
||||
line = tail + line
|
||||
tail = ''
|
||||
yield line
|
||||
|
||||
|
||||
def get_journal_entries(journal_stream, debug=False):
|
||||
store = []
|
||||
multiline_field = ''
|
||||
emptylines_count = 0
|
||||
for line in handle_journal_upload_stream(journal_stream):
|
||||
if not line:
|
||||
if multiline_field:
|
||||
match = field_multiline_pattern.match(multiline_field)
|
||||
if match:
|
||||
k, v = match.groups()
|
||||
store.append((k, v))
|
||||
else:
|
||||
if debug:
|
||||
print('content dropped: %s' % multiline_field)
|
||||
multiline_field = ''
|
||||
else:
|
||||
if store:
|
||||
yield store
|
||||
store = []
|
||||
emptylines_count = 0
|
||||
else:
|
||||
emptylines_count += 1
|
||||
if emptylines_count >= 3:
|
||||
# disconnect broken stream
|
||||
break
|
||||
continue
|
||||
if line.startswith('__CURSOR') and store:
|
||||
# sometimes a newline ends a multiline field + ends an entry
|
||||
yield store
|
||||
store = []
|
||||
# jsonb rejects u0000
|
||||
line = line.replace('\u0000', '')
|
||||
match = field_pattern.match(line)
|
||||
if match:
|
||||
k, v = match.groups()
|
||||
if v.isdigit():
|
||||
v = int(v)
|
||||
store.append((k, v))
|
||||
else:
|
||||
multiline_field = multiline_field + line + '\n'
|
|
@ -1,12 +1,18 @@
|
|||
import datetime
|
||||
from functools import wraps
|
||||
|
||||
from django.contrib.auth.mixins import LoginRequiredMixin
|
||||
from django.conf import settings
|
||||
from django.core import serializers
|
||||
from django.core.exceptions import PermissionDenied
|
||||
from django.http import HttpResponse, HttpResponseRedirect
|
||||
from django.shortcuts import get_object_or_404
|
||||
from django.urls import reverse
|
||||
from django.views.decorators.csrf import csrf_exempt
|
||||
from django.views.generic import TemplateView
|
||||
from django.views.generic.list import ListView
|
||||
|
||||
from logtracker.journal.models import Entry
|
||||
from logtracker.journal.journalstream import get_journal_entries
|
||||
|
||||
|
||||
class APIEntriesList(LoginRequiredMixin, ListView):
|
||||
|
@ -32,3 +38,68 @@ class HomeView(LoginRequiredMixin, TemplateView):
|
|||
def post(self, request, *args, **kwargs):
|
||||
url = '%s?%s' % (reverse('emails'), '%s=%s' % (request.POST['field'], request.POST['address']))
|
||||
return HttpResponseRedirect(url)
|
||||
|
||||
|
||||
def get_chunks(gen, n):
|
||||
chunk = []
|
||||
ended = False
|
||||
while True:
|
||||
for i in range(n):
|
||||
try:
|
||||
chunk.append(next(gen))
|
||||
except StopIteration:
|
||||
ended = True
|
||||
yield chunk
|
||||
chunk = []
|
||||
if ended:
|
||||
break
|
||||
|
||||
|
||||
def ssl_client_verify(view):
|
||||
@wraps(view)
|
||||
def wrapper(request, *args, **kwargs):
|
||||
headers = request.META
|
||||
if headers.get('X-SSL') == 1 and headers.get('X-SSL-Client-Verify') == 0:
|
||||
request.host_verified = headers.get('X-SSL-Client-CN')
|
||||
else:
|
||||
if settings.DEBUG:
|
||||
request.host_verified = 'test_host'
|
||||
else:
|
||||
raise PermissionDenied
|
||||
return view(request, *args, **kwargs)
|
||||
return wrapper
|
||||
|
||||
|
||||
@ssl_client_verify
|
||||
@csrf_exempt
|
||||
def UploadView(request, debug=False):
|
||||
if request.method == "POST":
|
||||
count = 0
|
||||
data = {}
|
||||
new_entries = []
|
||||
start_timestamp = datetime.datetime.now()
|
||||
timestamp = start_timestamp
|
||||
journal_stream = request.META.get('wsgi.input')
|
||||
for chunk in get_chunks(get_journal_entries(journal_stream), 20):
|
||||
for el in chunk:
|
||||
data = {k: v for k, v in el}
|
||||
try:
|
||||
timestamp = datetime.datetime.fromtimestamp(data['__REALTIME_TIMESTAMP'] / 1000000)
|
||||
except (KeyError, TypeError, ValueError):
|
||||
continue
|
||||
entry = Entry(timestamp=timestamp, host=request.host_verified, data=data)
|
||||
new_entries.append(entry)
|
||||
count += 1
|
||||
if debug and count % 1000 == 0:
|
||||
print(count, timestamp)
|
||||
try:
|
||||
Entry.objects.bulk_create(new_entries)
|
||||
new_entries = []
|
||||
except:
|
||||
# todo: log errors or raise?
|
||||
continue
|
||||
if debug:
|
||||
elapsed = datetime.datetime.now() - start_timestamp
|
||||
print('elapsed: %s' % elapsed)
|
||||
print('count: %s' % count)
|
||||
return HttpResponse('')
|
||||
|
|
|
@ -2,7 +2,7 @@ from django.conf.urls import include, url
|
|||
from django.contrib import admin
|
||||
from django.conf import settings
|
||||
|
||||
from logtracker.journal.views import APIEntriesList, EntriesList, HomeView
|
||||
from logtracker.journal.views import APIEntriesList, EntriesList, HomeView, UploadView
|
||||
from logtracker.mail.views import EmailsList, SendersList, MailHome
|
||||
|
||||
urlpatterns = [
|
||||
|
@ -13,6 +13,7 @@ urlpatterns = [
|
|||
url(r'^api/mail/$', EmailsList.as_view(), name='emails'),
|
||||
url(r'^mail/$', MailHome.as_view(), name='mail'),
|
||||
url(r'^mail/senders/$', SendersList.as_view(), name='senders'),
|
||||
url(r'^upload$', UploadView, name='upload'),
|
||||
]
|
||||
|
||||
if 'mellon' in settings.INSTALLED_APPS:
|
||||
|
|
|
@ -0,0 +1,112 @@
|
|||
import io
|
||||
from django.test.utils import override_settings
|
||||
from logtracker.journal.journalstream import get_journal_entries
|
||||
|
||||
|
||||
raw_entries = b'''__CURSOR=s=6f1be3fd9e534674878d6ce7394e035f;i=f3b60;b=f8595485f17145e89bf6d8068dfbf140;m=cb448036;t=5ab93a50ad4e5;x=99d88bd273a65ed7
|
||||
__REALTIME_TIMESTAMP=1596025501045989
|
||||
__MONOTONIC_TIMESTAMP=3410264118
|
||||
_BOOT_ID=f8595485f17145e89bf6d8068dfbf140
|
||||
PRIORITY=6
|
||||
_TRANSPO\r
|
||||
fff4\r
|
||||
RT=journal
|
||||
_SELINUX_CONTEXT
|
||||
^K^@^@^@^@^@^@^@unconfined
|
||||
|
||||
__CURSOR=s=92043378025640f3989a22d75fc7422d;i=f5ce1;b=c2a67631459e4eb4bef62a31641efe64;m=cdec3b6f;t=5abf656e3cfe2;x=b34e4efef33912b1
|
||||
__REALTIME_TIMESTAMP=1596449391628258
|
||||
__MONOTONIC_TIMESTAMP=3454810991
|
||||
_BOOT_ID=c2a67631459e4eb4bef62a31641efe64
|
||||
_UID=1000
|
||||
_GID=1000
|
||||
_CAP_EFFECTIVE=0
|
||||
_SELINUX_CONTEXT
|
||||
^K^@^@^@^@^@^@^@unconfined
|
||||
|
||||
_MACHINE_ID=332e6893061344b3bf9e76f4f066f008
|
||||
_HOSTNAME=pad
|
||||
_TRANSPORT=journal
|
||||
_AUDIT_LOGINUID=1000
|
||||
_SYSTEMD_OWNER_UID=1000
|
||||
_SYSTEMD_SLICE=user-1000.slice
|
||||
_SYSTEMD_USER_SLICE=-.slice
|
||||
_AUDIT_SESSION=2
|
||||
_SYSTEMD_CGROUP=/user.slice/user-1000.slice/session-2.scope
|
||||
_SYSTEMD_SESSION=2
|
||||
_SYSTEMD_UNIT=session-2.scope
|
||||
_SYSTEMD_INVOCATION_ID=56dcd19554ef4153b7a409267bac34d7
|
||||
PRIORITY=4
|
||||
GLIB_OLD_LOG_API=1
|
||||
_PID=1886
|
||||
_COMM=gnome-shell
|
||||
_EXE=/usr/bin/gnome-shell
|
||||
_CMDLINE=/usr/bin/gnome-shell
|
||||
GLIB_DOMAIN=Gjs
|
||||
MESSAGE
|
||||
^@^@^@^@^@^@^@JS ERROR: TypeError: this._trackedWindows.get(...) is undefined
|
||||
_onWindowActorRemoved@resource:///org/gnome/shell/ui/panel.js:843:9
|
||||
wrapper@resource:///org/gnome/gjs/modules/_legacy.js:82:22
|
||||
|
||||
_SOURCE_REALTIME_TIMESTAMP=1596449391615304
|
||||
|
||||
|
||||
__CURSOR=s=92043378025640f3989a22d75fc7422d;i=f5ce2;b=c2a67631459e4eb4bef62a31641efe64;m=cdec3da9;t=5abf656e3d21b;x=43f26d5c857dddba
|
||||
__REALTIME_TIMESTAMP=1596449391628827
|
||||
__MONOTONIC_TIMESTAMP=3454811561
|
||||
_BOOT_ID=c2a67631459e4eb4bef62a31641efe64
|
||||
_UID=1000
|
||||
_GID=1000
|
||||
_CAP_EFFECTIVE=0
|
||||
_SELINUX_CONTEXT
|
||||
^K^@^@^@^@^@^@^@unconfined
|
||||
|
||||
_MACHINE_ID=332e6893061344b3bf9e76f4f066f008
|
||||
_HOSTNAME=pad
|
||||
PRIORITY=6
|
||||
SYSLOG_FACILITY=3
|
||||
SYSLOG_IDENTIFIER=systemd
|
||||
_TRANSPORT=journal
|
||||
_PID=1827
|
||||
_COMM=systemd
|
||||
_EXE=/lib/systemd/systemd
|
||||
_CMDLINE=/lib/systemd/systemd --user
|
||||
_AUDIT_SESSION=3
|
||||
_AUDIT_LOGINUID=1000
|
||||
_SYSTEMD_CGROUP=/user.slice/user-1000.slice/user@1000.service/init.scope
|
||||
_SYSTEMD_OWNER_UID=1000
|
||||
_SYSTEMD_UNIT=user@1000.service
|
||||
_SYSTEMD_USER_UNIT=init.scope
|
||||
_SYSTEMD_SLICE=user-1000.slice
|
||||
_SYSTEMD_USER_SLICE=-.slice
|
||||
_SYSTEMD_INVOCATION_ID=8ce5033623b845dfae8ea4666504d836
|
||||
USER_UNIT=at-spi-dbus-bus.service
|
||||
USER_INVOCATION_ID=cab95ef28cc14d17a509f096120f7d19
|
||||
CODE_FILE=src/core/unit.c
|
||||
CODE_LINE=5745
|
||||
CODE_FUNC=unit_log_success
|
||||
MESSAGE_ID=7ad2d189f7e94e70a38c781354912448
|
||||
MESSAGE=at-spi-dbus-bus.service: Succeeded.
|
||||
_SOURCE_REALTIME_TIMESTAMP=1596449391625441
|
||||
|
||||
0\r
|
||||
'''
|
||||
|
||||
|
||||
def test_journal_stream_auth(client):
|
||||
page = client.get('/upload')
|
||||
assert page.status_code == 403
|
||||
|
||||
|
||||
@override_settings(DEBUG=True)
|
||||
def test_journal_stream_auth_debug(client):
|
||||
page = client.get('/upload')
|
||||
assert page.status_code == 200
|
||||
|
||||
|
||||
def test_journal_stream():
|
||||
bf = io.BytesIO(raw_entries)
|
||||
entries = list(get_journal_entries(bf))
|
||||
assert len(entries) == 3
|
||||
assert ('_TRANSPORT', 'journal') in entries[0]
|
||||
assert '_legacy.js:82' in [v for k, v in entries[1] if k == 'MESSAGE'][0]
|
Reference in New Issue