scrutiny/scrutiny/projects/models.py

178 lines
6.1 KiB
Python

import os
import re
import subprocess
from django.conf import settings
from django.db import models
class Project(models.Model):
    """A project, identified by a human-readable title and a slug."""

    title = models.CharField(max_length=50)
    slug = models.SlugField()

    def __unicode__(self):
        """Return the project title as the text representation."""
        label = self.title
        return label
class Platform(models.Model):
    """A platform belonging to a project; rows sort by ``order`` by default."""

    project = models.ForeignKey('Project')
    title = models.CharField(max_length=50)
    slug = models.SlugField()
    order = models.PositiveIntegerField()

    class Meta:
        # Default queryset ordering follows the explicit ``order`` column.
        ordering = ['order']

    def __unicode__(self):
        """Return ``"<project title> / <platform title>"``."""
        parts = (self.project.title, self.title)
        return '%s / %s' % parts
class Service(models.Model):
    """A service belonging to a project.

    A service is tied to a platform through an ``InstalledService`` row;
    the version history of that row determines which modules the service
    currently has installed.
    """

    project = models.ForeignKey('Project')
    title = models.CharField(max_length=50)
    slug = models.SlugField()

    def __unicode__(self):
        """Return ``"<project title> / <service title>"``."""
        return '%s / %s' % (self.project.title, self.title)

    def get_modules(self, platforms=None):
        """Return the modules installed for this service, sorted by name.

        For every platform on which this service is installed, the most
        recent ``InstalledVersion`` of each module decides the outcome: the
        module is included when that record's version string is non-empty
        and skipped when it is empty.  A module installed on several
        platforms appears once in the result.

        :param platforms: iterable of ``Platform`` objects to inspect;
            defaults to all platforms of this service's project.
        :returns: list of ``Module`` objects ordered by ``name``.
        """
        if platforms is None:
            platforms = self.project.platform_set.all()

        modules = {}
        for platform in platforms:
            installed_service = self.get_installed_service(platform)
            if installed_service is None:
                continue

            # Newest first, so the first record met for a module is its
            # current state; older records for that module are ignored.
            # select_related avoids one extra query per row for the
            # version and its module.
            history = InstalledVersion.objects.filter(
                service=installed_service,
            ).select_related('version__module').order_by('-timestamp')

            seen_module_ids = set()
            for installed_version in history:
                version = installed_version.version
                if version.module_id in seen_module_ids:
                    continue
                seen_module_ids.add(version.module_id)
                if not version.version:
                    # Latest record carries an empty version string:
                    # the module is not reported for this platform.
                    continue
                modules[version.module.name] = version.module

        return sorted(modules.values(), key=lambda x: x.name)

    def get_installed_service(self, platform):
        """Return the ``InstalledService`` of this service on *platform*.

        :returns: the matching ``InstalledService``, or ``None`` when this
            service is not installed on *platform*.
        """
        try:
            return InstalledService.objects.get(service=self,
                                                platform=platform)
        except InstalledService.DoesNotExist:
            return None
class InstalledService(models.Model):
    """Links a service to a platform and records a URL for that pairing."""

    platform = models.ForeignKey('Platform')
    service = models.ForeignKey('Service')
    url = models.URLField()

    def __unicode__(self):
        """Return ``"<project> / <platform> / <service> (@ <url>)"``."""
        platform = self.platform
        fields = (
            platform.project.title,
            platform.title,
            self.service.title,
            self.url,
        )
        return '%s / %s / %s (@ %s)' % fields
class Module(models.Model):
    """A named module with a repository URL.

    The git helpers below run ``git`` inside
    ``<MEDIA_ROOT>/src/<module name>``; that directory is presumably a
    checkout of ``repository_url`` maintained elsewhere (not visible from
    this class).
    """

    name = models.CharField(max_length=50)
    repository_url = models.URLField()

    def __unicode__(self):
        """Return the module name."""
        return self.name

    # ------------------------------------------------------------------
    # private helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _embedded_hash(version_number):
        """Return the 7-hex-digit hash embedded in *version_number*, if any.

        A hash introduced by ``.g`` takes precedence over one introduced by
        ``+g``.  Returns ``None`` when neither form is present.
        """
        for pattern in (r'\.g([0-9a-f]{7})', r'\+g([0-9a-f]{7})'):
            match = re.search(pattern, version_number)
            if match:
                return match.group(1)
        return None

    def _git_ref(self, version_number):
        """Return the git revision naming *version_number*.

        This is the embedded commit hash when there is one, otherwise the
        tag name ``v<version_number>``.
        """
        embedded = self._embedded_hash(version_number)
        if embedded is not None:
            return embedded
        return 'v' + version_number

    def _run_git(self, args, merge_stderr=False):
        """Run ``git <args>`` in this module's source directory.

        :param args: list of arguments following ``git``.
        :param merge_stderr: when true, stderr is interleaved into the
            returned output; otherwise stderr is captured and discarded so
            that warnings and errors cannot be mistaken for results.
        :returns: ``(returncode, stdout)`` with stdout as read from the pipe.
        """
        process = subprocess.Popen(
            ['git'] + list(args),
            cwd=os.path.join(settings.MEDIA_ROOT, 'src', self.name),
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT if merge_stderr else subprocess.PIPE,
        )
        # communicate() waits for the process to exit, so returncode is set.
        output = process.communicate()[0]
        return process.returncode, output

    def _latest_installed_version(self, installed_service):
        """Return this module's newest ``InstalledVersion`` on
        *installed_service*, or ``None`` when there is none."""
        history = InstalledVersion.objects.filter(
            service=installed_service, version__module=self,
        ).order_by('-timestamp')
        try:
            return history[0]
        except IndexError:
            return None

    # ------------------------------------------------------------------
    # public API
    # ------------------------------------------------------------------

    def get_installed_version(self, platform, service):
        """Return the newest ``InstalledVersion`` of this module for
        *service* on *platform*.

        :returns: an ``InstalledVersion``, or ``None`` when the service is
            not installed on the platform or has no record for this module.
        """
        try:
            installed_service = InstalledService.objects.get(
                platform=platform, service=service)
        except InstalledService.DoesNotExist:
            return None
        return self._latest_installed_version(installed_service)

    def get_git_log(self):
        """Return the git log as a list of ``[commit_hash, subject]`` pairs.

        Each ``git log --format=oneline`` line is split at its first space.
        An empty list is returned when git exits with an error, so that an
        error message is never reported as a commit.
        """
        returncode, output = self._run_git(
            ['log', '--cherry-pick', '--format=oneline'])
        if returncode != 0:
            return []
        return [line.split(' ', 1) for line in output.splitlines()]

    def get_version_hash(self, version_number):
        """Return the 7-character commit hash for *version_number*.

        When the version string embeds a hash (``.g<hash>`` or
        ``+g<hash>``) that hash is returned directly.  Otherwise the tag
        ``v<version_number>`` is resolved with ``git rev-list``.

        :returns: the abbreviated hash, or an empty string when the tag
            cannot be resolved (git exits with an error).
        """
        embedded = self._embedded_hash(version_number)
        if embedded is not None:
            return embedded
        returncode, output = self._run_git(
            ['rev-list', '-1', 'v' + version_number])
        if returncode != 0:
            # Without this check the first 7 characters of git's error
            # message would be returned as if they were a hash.
            return ''
        return output[:7]

    def get_diff_log(self, v1, v2, format='oneline', grep=None):
        """Return the ``git log`` text for the symmetric difference of two
        versions.

        :param v1: first version string.
        :param v2: second version string.
        :param format: value passed to git's ``--format=`` option.
        :param grep: optional pattern passed to git's ``--grep`` option.
        :returns: git's output decoded as UTF-8.  stderr is merged into the
            output, so a git error message is returned as the log text.
        """
        args = ['log', '--left-right', '--cherry-pick',
                '--format=%s' % format]
        if grep:
            args.extend(['--grep', grep])
        args.append('%s...%s' % (self._git_ref(v1), self._git_ref(v2)))
        _returncode, output = self._run_git(args, merge_stderr=True)
        return output.decode('utf-8')

    def get_all_installed_versions(self):
        """Return the set of version strings of this module that are the
        newest record on at least one installed service.

        Records with an empty version string are not included.
        """
        versions = set()
        for installed_service in InstalledService.objects.all():
            # Query by the InstalledService row itself rather than looking
            # it up again through its (platform, service) pair.
            installed = self._latest_installed_version(installed_service)
            if installed is not None and installed.version.version:
                versions.add(installed.version.version)
        return versions
class Version(models.Model):
    """A version string of a module; the string may be left blank."""

    module = models.ForeignKey('Module')
    version = models.CharField(max_length=100, blank=True)

    def __unicode__(self):
        """Return ``"<module name> <version>"``."""
        parts = (self.module.name, self.version)
        return '%s %s' % parts
class InstalledVersion(models.Model):
    """Timestamped record of a ``Version`` on an ``InstalledService``."""

    service = models.ForeignKey('InstalledService')
    version = models.ForeignKey('Version')
    timestamp = models.DateTimeField()

    def get_previous_version(self):
        """Return the record preceding this one for the same module.

        Looks at records on the same installed service and for the same
        module with a strictly earlier timestamp, and returns the most
        recent of them, or ``None`` when there is no earlier record.
        """
        earlier = InstalledVersion.objects.filter(
            service=self.service,
            version__module=self.version.module,
            timestamp__lt=self.timestamp,
        ).order_by('-timestamp')
        try:
            return earlier[0]
        except IndexError:
            return None