git-redmine/git_redmine.py

761 lines
27 KiB
Python

#!/usr/bin/env python3
# pip install --user python-redmine click
import configparser
import glob
import os
import re
import subprocess
import tempfile
from datetime import datetime, timedelta
import click
import git
import unidecode
from redminelib import Redmine
from requests.adapters import HTTPAdapter
MARKER = '# Everything below is ignored\n'
def slugify(title):
title = unidecode.unidecode(title)
title = re.sub('[^a-zA-Z]+', '-', title)
return title
def get_repo():
return git.Repo(search_parent_directories=True)
def get_config(name, default=Ellipsis):
repo = get_repo()
reader = repo.config_reader()
if not reader.has_section('redmine'):
raise click.UsageError('Please add a redmine section to your git configuration')
if default is Ellipsis:
return reader.get('redmine', name)
else:
return reader.get('redmine', name, fallback=default)
def get_main_branch_name():
# try to get local default
try:
return get_config('main')
except configparser.NoOptionError:
pass
# or guess...
repo = get_repo()
reader = repo.config_reader()
default = reader.get('init', 'defaultBranch', fallback='main')
for branch_name in [default, 'master', 'main']:
if branch_name in repo.branches:
return branch_name
def get_redmine_api():
url = get_config('url')
key = get_config('key', None)
username = get_config('username', None)
password = get_config('password', None)
if not key and (not username or not password):
raise click.UsageError('Please add a redmine\'s key or username/password')
if key:
kwargs = dict(key=key)
else:
kwargs = dict(username=username, password=password)
redmine = Redmine(url, **kwargs)
redmine.engine.session.mount('http://', HTTPAdapter(max_retries=3))
redmine.engine.session.mount('https://', HTTPAdapter(max_retries=3))
redmine.statuses = {status.name: status for status in redmine.issue_status.all()}
redmine.nouveau = redmine.statuses['Nouveau']
redmine.solution = redmine.statuses['Solution proposée']
redmine.solution_validee = redmine.statuses['Solution validée']
redmine.resolu_a_deployer = redmine.statuses['Résolu (à déployer)']
redmine.developpement = [tr for tr in redmine.tracker.all() if tr.name == 'Développement'][0]
redmine.rustine = [cf for cf in redmine.custom_field.all() if cf.name == 'Rustine proposée'][0]
return redmine
def set_redmine(repo, section, option, value):
with repo.config_writer() as config_writer:
if not config_writer.has_section(section):
config_writer.add_section(section)
config_writer.set(section, option, value)
def set_branch_option(repo, branch, option, value):
set_redmine(repo, 'branch "%s"' % branch.name, option, value)
def get_issue(issue_number=None):
if not issue_number:
issue_number = get_current_issue()
api = get_redmine_api()
try:
issue = api.issue.get(issue_number)
except Exception:
raise click.UsageError('Cannot find issue %s' % issue_number)
return issue
def get_current_issue():
repo = get_repo()
branch_name = repo.head.reference.name
splitted = branch_name.rsplit('/', 1)
issue_number = splitted[-1].split('-')[0]
try:
issue_number = int(issue_number)
except Exception:
raise click.UsageError('Cannot find an issue number in current branch name %s' % branch_name)
return issue_number
def get_current_project():
project_id = get_config('project', None)
if not project_id:
raise click.UsageError('No default project is set')
api = get_redmine_api()
return api.project.get(project_id)
def get_patches(number_of_commits=0, ref=None):
repo = get_repo()
tempdir = tempfile.mkdtemp()
if number_of_commits:
ref = 'HEAD' + '~' * number_of_commits
elif not ref:
ref = '@{upstream}'
repo.git.format_patch(ref, o=tempdir)
def helper():
for path in glob.glob(os.path.join(tempdir, '*.patch')):
yield {
'path': path,
'filename': os.path.basename(path),
}
return list(helper())
@click.group()
def redmine():
"""Integrate git branch with redmine, you must configure your .config/git/config file
with a [redmine] section and keys: url, key or username/password.
"""
pass
@redmine.command()
def main_branch():
print(get_main_branch_name())
@redmine.command()
def shell():
import IPython
api = get_redmine_api()
repo = get_repo()
IPython.embed()
@redmine.group()
def issue():
pass
@redmine.group(invoke_without_command=True)
@click.pass_context
def project(ctx):
if ctx.invoked_subcommand is None:
project = get_current_project()
click.echo('Current project %s' % project)
def apply_attachments(repo, issue):
if not issue.attachments.total_count:
return
print('Currently attached patches')
attachments = sorted(issue.attachments, key=lambda a: a.id)
for i, attachment in enumerate(attachments):
print(i, attachment.created_on, '%6d bytes' % attachment.filesize, attachment.filename)
while True:
indexes = click.prompt(
'Which patch would you like to apply (id separated by spaces) ?', type=str, default=''
)
try:
indexes = indexes.strip()
if not indexes:
break
indexes = list(map(int, indexes.split()))
if not all(i < len(attachments) for i in indexes):
raise ValueError('invalid values', indexes)
except Exception as e:
print('error:', e)
continue
else:
break
for index in indexes:
attachment = attachments[index]
content = attachment.download().content
try:
p = repo.git.execute(['git', 'am', '-3'], istream=subprocess.PIPE, as_process=True)
p.communicate(content)
except Exception as e:
print(e)
print('Applying patch', index, attachment.filename, 'failed, please fix it.')
break
@issue.command()
@click.argument('issue_number')
@click.option('--reference', default=None)
def take(issue_number, reference):
'''Create or switch to a branch to fix an issue'''
api = get_redmine_api()
issue = api.issue.get(issue_number, include='attachments')
repo = get_repo()
new = False
for head in repo.heads:
if '/%s-' % issue_number in head.name:
branch_name = head.name
branch = head
if click.confirm('Do you want to move to branch « %s » ?' % branch_name, default=True):
break
else:
new = True
cloned = False
default_branch_name = 'wip/%s-%s' % (issue_number, slugify(issue.subject)[:32].strip('-'))
repo.remotes.origin.fetch()
for ref in repo.remotes.origin.refs:
if 'wip/%s-' % issue_number not in ref.name:
continue
if not click.confirm(
'Do you want to clone remote branch "%s" ?' % ref.name,
default=False,
):
continue
cloned = True
branch_name = click.prompt('Branch name', default=default_branch_name)
branch = repo.create_head(branch_name, ref)
set_branch_option(repo, branch, 'merge', 'refs/heads/%s' % get_main_branch_name())
set_branch_option(repo, branch, 'remote', '.')
break
else:
click.confirm(
'Do you want to create a branch tracking %s ?'
% (reference or 'origin/%s' % get_main_branch_name()),
default=True,
abort=True,
)
branch_name = click.prompt('Branch name', default=default_branch_name)
branch = repo.create_head(branch_name, commit=reference or 'origin/%s' % get_main_branch_name())
set_branch_option(repo, branch, 'merge', 'refs/heads/%s' % get_main_branch_name())
set_branch_option(repo, branch, 'remote', '.')
if repo.head.reference == branch:
click.echo('Already on branch %s' % branch_name)
else:
branch.checkout()
click.echo('Moved to branch %s' % branch_name)
current_user = api.user.get('current')
if (not hasattr(issue, 'assigned_to') or issue.assigned_to.id != current_user.id) and click.confirm(
'Do you want to assign the issue to yourself ?',
default=not hasattr(issue, 'assigned_to') and issue.status.id == api.nouveau.id,
):
issue.assigned_to_id = current_user.id
issue.save()
if new and not cloned:
apply_attachments(repo, issue)
@issue.command()
@click.option('--issue', default=None, type=int)
def apply(issue):
issue = get_issue(issue)
repo = get_repo()
apply_attachments(repo, issue)
def get_comment(message, default=False):
if not click.confirm(message, default=default):
return
message = '\n' + MARKER
message = click.edit(message)
if message is not None:
message = message.split(MARKER, 1)[0].rstrip('\n')
return message
@issue.command()
@click.option('--issue', default=None, type=int)
def validate(issue):
api = get_redmine_api()
issue = get_issue(issue)
if issue.status.id != api.solution.id:
click.secho('Issue %s has no solution proposed.' % issue.id, fg='red')
return False
comment = get_comment('Do you want to add a comment to your validation?')
kwargs = {}
if comment:
kwargs['notes'] = comment
kwargs['status_id'] = api.solution_validee.id
api.issue.update(issue.id, **kwargs)
@issue.command()
@click.option('--issue', default=None, type=int)
def show(issue):
issue = get_issue(issue)
click.echo('URL: %s' % issue.url)
click.echo('Subject: %s' % issue.subject)
click.echo('Description: %s' % issue.description)
click.echo('')
journals = list(issue.journals)
if journals:
click.echo('Last note by %s: ' % journals[-1].user)
click.echo('%s' % journals[-1].notes)
@issue.command()
@click.option('--issue', default=None, type=int)
@click.option('--no-rebase', default=False, type=bool)
@click.argument('number_of_commits', default=0)
@click.pass_context
def submit(ctx, issue, no_rebase, number_of_commits):
'''Submit current patch from this issue branch to Redmine'''
if not no_rebase:
ctx.invoke(rebase)
issue = get_issue(issue)
patches = get_patches(number_of_commits)
message = '\n\n' + MARKER
for patch in patches:
message += '\n%s' % patch['filename']
message = click.edit(message)
if message is not None:
message = message.split(MARKER, 1)[0].rstrip('\n')
api = get_redmine_api()
kwargs = {}
repo = get_repo()
for commit in list(get_commits(repo, get_main_branch_name()))[:number_of_commits]:
if ('#%s' % get_current_issue()) not in commit.message:
click.echo(click.style('Missing commit number in commit message', fg='red'))
click.echo()
click.echo(commit.message)
raise click.Abort()
if click.confirm('Push -f this branch to origin ?', default=True):
origin = repo.remote()
for pi in origin.push(f=True):
if pi.flags & pi.ERROR:
click.echo(
click.style(
'Push from « %s » to « %s » failed.'
% (pi.local_ref.name, pi.remote_ref.name, pi.summary),
fg='red',
)
)
if click.confirm('Propose this patch as a solution ?', default=True):
current_user = api.user.get('current')
if not hasattr(issue, 'assigned_to'):
issue.assigned_to_id = current_user.id
issue.save()
elif issue.assigned_to.id != current_user.id:
if click.confirm(
'Issue is currently assigned to %s, do you want '
'to assign the issue to yourself ?' % issue.assigned_to.name
):
issue.assigned_to_id = current_user.id
issue.save()
kwargs['tracker_id'] = api.developpement.id
kwargs['status_id'] = api.solution.id
api.issue.update(
issue.id,
notes=message,
uploads=patches,
custom_fields=[{'id': api.rustine.id, 'value': '1'}],
**kwargs,
)
@issue.command()
@click.argument('issue', default=0, type=int)
def comment(issue):
'''Add a comment to the current issue or a chosen one'''
issue = get_issue(issue or None)
message = click.edit('')
api = get_redmine_api()
api.issue.update(issue.id, notes=message)
@issue.command()
@click.option('--reference', default=None)
@click.pass_context
def new(ctx, reference):
'''Create a new issue in the default project of this repository'''
project = get_current_project()
api = get_redmine_api()
subject_and_description = click.edit('Enter subject on first line\n\nand notes after.')
if not subject_and_description:
raise click.UsageError('No subject and description')
subject_and_description = subject_and_description.splitlines()
subject, description = subject_and_description[0], '\n'.join(subject_and_description[1:])
subject = subject.strip()
if not subject:
raise click.UsageError('No subject')
description = description.strip()
current_user = api.user.get('current')
click.echo('Project: %s' % project)
click.echo('Subject: %s' % subject)
click.echo('Description: %s' % description)
click.echo('Assigned to: %s' % current_user)
if click.confirm('Create issue ?', default=True):
issue = api.issue.create(
project_id=project.id,
subject=subject,
tracker_id=api.developpement.id,
description=description,
assigned_to_id=current_user.id,
)
click.echo('Created issue %s' % issue.url)
ctx.invoke(take, issue_number=issue.id, reference=reference)
@issue.command()
@click.argument('issue', type=int)
def link(issue):
'''Create link with another issue'''
api = get_redmine_api()
current_issue = get_issue()
try:
target_issue = api.issue.get(issue)
except Exception:
raise click.UsageError('Cannot find issue %s' % issue)
if len(api.issue_relation.filter(issue_id=current_issue.id, issue_to_id=target_issue.id)):
click.echo(click.style('Already linked.', fg='green'))
return
if click.confirm(
f'Link issue {current_issue.id } - « {current_issue.subject} » to {target_issue.id} - « {target_issue.subject} »',
default=False,
abort=True,
):
api.issue_relation.create(
issue_id=current_issue.id, issue_to_id=target_issue.id, relation_type='relates'
)
click.echo(click.style('Linked.', fg='green'))
class MyProgressPrinter(git.RemoteProgress):
def update(self, op_code, cur_count, max_count=None, message=''):
print(op_code, cur_count, max_count, cur_count / (max_count or 100.0), message or 'NO MESSAGE')
def get_commits(repo, ref):
yield from git.Commit.iter_items(repo, '%s..' % ref)
@redmine.command(name='merge-and-push')
@click.option('--issue', default=None, type=int)
@click.option('--validate-msg/--no-validate-msg', default=True)
@click.argument('target_branch', default=get_main_branch_name())
def merge_and_push(issue, validate_msg, target_branch):
issue = get_issue(issue or None)
repo = get_repo()
origin = repo.remote()
if repo.head.is_detached:
raise click.UsageError('Your cannot merge from a detached HEAD.')
if repo.is_dirty():
raise click.UsageError('Your cannot merge, your repo is dirty.')
current_head = repo.head.ref.name
if current_head == target_branch:
raise click.UsageError('Your cannot merge on %s as your are already on it.')
try:
repo.branches[target_branch]
except IndexError:
raise click.UsageError('%r is not a local branch.' % target_branch)
notes = '<pre>\n%s\n</pre>' % repo.git.log('%s..' % target_branch)
if validate_msg:
for commit in get_commits(repo, target_branch):
if ('#%s' % get_current_issue()) not in commit.message:
click.echo(click.style('Missing commit number in commit message', fg='red'))
click.echo()
click.echo(commit.message)
raise click.Abort()
if 'rebase' in commit.message:
click.echo()
click.echo('Commit %s contains the word "rebase" :' % commit.hexsha[:8])
click.echo()
click.echo(''.join([' ' + line for line in commit.message.splitlines()]))
click.echo()
click.confirm('Continue ?', abort=True)
try:
click.echo('Checking-out branch « %s » ... ' % target_branch, nl=False)
repo.branches[target_branch].checkout()
click.echo(click.style('Done.', fg='green'))
click.echo(
'Pull-rebasing from remote « %s » onto branch « %s » ... ' % (origin.name, target_branch),
nl=False,
)
failure = False
for pi in origin.pull(rebase=True):
if pi.flags & pi.ERROR:
failure = True
click.echo(
click.style('Pull-rebase from « %s » failed: %s.' % (pi.ref.name, pi.note), fg='red')
)
click.echo(click.style('Done.', fg='green'))
if failure:
raise click.ClickException('Pull rebase failed.')
finally:
click.echo('Checking-out branch « %s »... ' % current_head, nl=False)
repo.branches[current_head].checkout()
click.echo(click.style('Done.', fg='green'))
try:
click.echo('Rebasing branch « %s » onto branch « %s » ... ' % (current_head, target_branch), nl=False)
repo.git.rebase(target_branch)
except git.GitCommandError as e:
click.echo(click.style('command %r failed, aborting.' % e.command, fg='red'))
try:
repo.git.rebase(abort=True)
except git.GitCommandError as e:
click.echo(click.style('rebase abort failed, %s\n%s.' % (e.stdout, e.stderr), fg='red'))
raise click.Abort()
click.echo(click.style('Done.', fg='green'))
try:
click.echo('Checking-out to %s... ' % target_branch, nl=False)
repo.branches[target_branch].checkout()
click.echo(click.style('Done.', fg='green'))
click.echo('Merging branch « %s » into « %s » ... ' % (current_head, target_branch), nl=False)
try:
repo.git.merge(current_head, ff=True)
except git.GitCommandError as e:
click.echo(click.style('command %r failed, aborting.' % e.command, fg='red'))
try:
repo.git.merge(abort=True)
except git.GitCommandError as e:
click.echo(click.style('merge abort failed, %s\n%s.' % (e.stdout, e.stderr), fg='red'))
raise click.Abort()
click.echo(click.style('Done.', fg='green'))
try:
origin.refs[current_head]
except IndexError:
pass
else:
if click.confirm(
'Do you want to delete feature branch « %s » on remote « %s » ?'
% (current_head, origin.name),
default=True,
):
for pi in origin.push(refspec=':%s' % current_head):
if pi.flags & pi.ERROR:
click.echo(
click.style(
'Push from « %s » to « %s » failed.'
% (pi.local_ref.name, pi.remote_ref.name, pi.summary),
fg='red',
)
)
if click.confirm(
'Do you want to push « %s » on remote « %s » ?' % (target_branch, repo.remote().name),
default=True,
):
for pi in origin.push():
if pi.flags & pi.ERROR:
click.echo(
click.style(
'Push from « %s » to « %s » failed: %s.'
% (pi.local_ref.name, pi.remote_ref.name, pi.summary),
fg='red',
)
)
if click.confirm('Do you want to delete feature branch « %s » ?' % current_head, default=True):
repo.delete_head(repo.branches[current_head])
else:
repo.branches[current_head].checkout()
except Exception:
click.echo(click.style('\nFailure going back to branch « %s ».' % current_head, fg='red'))
repo.branches[current_head].checkout()
raise
if click.confirm('Set issue status to solved ?', default=True):
api = get_redmine_api()
current_user = api.user.get('current')
kwargs = {}
if not hasattr(issue, 'assigned_to'):
issue.assigned_to_id = current_user.id
issue.save()
elif issue.assigned_to.id != current_user.id:
if click.confirm(
'Issue is currently assigned to %s, do you want '
'to assign the issue to yourself ?' % issue.assigned_to.name
):
issue.assigned_to_id = current_user.id
issue.save()
kwargs['status_id'] = api.resolu_a_deployer.id
api.issue.update(issue.id, notes=notes, **kwargs)
@redmine.command(name='rebase')
@click.argument('target_branch', default=get_main_branch_name())
def rebase(target_branch):
repo = get_repo()
origin = repo.remote()
if repo.head.is_detached:
raise click.UsageError('Your cannot rebase from a detached HEAD.')
if repo.is_dirty():
raise click.UsageError('Your cannot rebase, your repo is dirty.')
current_head = repo.head.ref.name
if current_head == target_branch:
raise click.UsageError('Your cannot rebase on « %s » as your are already on it.' % target_branch)
try:
repo.branches[target_branch]
except IndexError:
raise click.UsageError('%r is not a local branch.' % target_branch)
try:
click.echo('Checking-out branch « %s » ... ' % target_branch, nl=False)
repo.branches[target_branch].checkout()
click.echo(click.style('Done.', fg='green'))
click.echo(
'Pull-rebasing from remote « %s » onto branch « %s » ... ' % (origin.name, target_branch),
nl=False,
)
failure = False
for pi in origin.pull(rebase=True):
if pi.flags & pi.ERROR:
failure = True
click.echo(
click.style('Pull-rebase from « %s » failed: %s.' % (pi.ref.name, pi.note), fg='red')
)
click.echo(click.style('Done.', fg='green'))
if failure:
raise click.ClickException('Pull rebase failed.')
finally:
click.echo('Checking-out branch « %s »... ' % current_head, nl=False)
repo.branches[current_head].checkout()
click.echo(click.style('Done.', fg='green'))
try:
click.echo('Rebasing branch « %s » onto branch « %s » ... ' % (current_head, target_branch), nl=False)
repo.git.rebase(target_branch)
except git.GitCommandError as e:
click.echo(click.style('command %r failed, aborting.' % e.command, fg='red'))
try:
repo.git.rebase(abort=True)
except git.GitCommandError as e:
click.echo(click.style('rebase abort failed, %s\n%s.' % (e.stdout, e.stderr), fg='red'))
raise click.Abort()
click.echo(click.style('Done.', fg='green'))
@issue.command(name='open')
@click.option('--issue', default=None, type=int)
def _open(issue):
issue = get_issue(issue)
subprocess.call(['xdg-open', issue.url])
@project.command()
@click.argument('project_id')
def set(project_id):
'''Set default redmine project for this git repository'''
api = get_redmine_api()
try:
api.project.get(project_id)
except Exception:
raise click.UsageError('Project %s is unknown' % project_id)
repo = get_repo()
config_writer = repo.config_writer()
if not config_writer.has_section('redmine'):
config_writer.add_section('redmine')
config_writer.set('redmine', 'project', project_id)
config_writer.write()
@redmine.command()
@click.option('--local/--no-local', default=False)
def clean(local):
repo = get_repo()
repo.git.fetch(prune=True)
api = get_redmine_api()
api.engine.chunk = 10
issue_re = re.compile(r'wip/([0-9]+)')
issues_to_branch = {}
origin = repo.remote()
for ref in origin.refs:
m = issue_re.search(ref.name)
if not m:
continue
issues_to_branch[m.group(1)] = ref.name.lstrip('origin/')
if not issues_to_branch:
return
issues_to_delete = []
temp_issues_to_branch = list(issues_to_branch)
while temp_issues_to_branch:
issue_ids = ','.join(temp_issues_to_branch[:30])
issues = api.issue.filter(issue_id=issue_ids, status_id='closed', include=['journals'])
for issue in issues:
journals = list(issue.journals)
last_journal = len(journals) and journals[len(journals) - 1]
if last_journal.created_on < datetime.now() - timedelta(days=21):
issues_to_delete.append((issue.id, issue.subject))
temp_issues_to_branch = temp_issues_to_branch[30:]
if not local:
for _id, _subject in issues_to_delete:
print('%5s | %60s | %s' % (_id, issues_to_branch[str(_id)][:60], _subject))
if issues_to_delete and click.confirm('Remove theses remote branches ?'):
for _id, _subject in issues_to_delete:
ref = issues_to_branch[str(_id)]
click.echo('Deleting origin/%s... ' % ref, nl=False)
try:
origin.push(refspec=':%s' % ref)
except git.GitCommandError as e:
click.echo(click.style('Failed(%s).' % e, fg='red'))
else:
click.echo(click.style('Done.', fg='green'))
issues_to_branch = {}
for ref in repo.branches:
m = issue_re.search(ref.name)
if not m:
continue
issues_to_branch[m.group(1)] = ref.name
issues = api.issue.filter(issue_id=','.join(issues_to_branch), status_id='closed')
issues_to_delete = list(issues.values_list('id', 'subject'))
for _id, _subject in issues_to_delete:
print('%5s | %60s | %s' % (_id, issues_to_branch[str(_id)][:60], _subject))
if issues_to_delete and click.confirm('Remove theses branches ?'):
for _id, _subject in issues_to_delete:
ref = issues_to_branch[str(_id)]
click.echo('Deleting %s... ' % ref, nl=False)
repo.delete_head(ref, force=True)
click.echo(click.style('Done.', fg='green'))
if __name__ == '__main__':
redmine()