#!/usr/bin/env python # -*- coding: utf-8 -*- # pip install --user python-redmine click from __future__ import print_function import os import re import git import unidecode import tempfile import glob import subprocess from requests.adapters import HTTPAdapter from redminelib import Redmine import click MARKER = '# Everything below is ignored\n' def slugify(title): title = unidecode.unidecode(title) title = re.sub('[^a-zA-Z]+', '-', title) return title def get_repo(): return git.Repo(search_parent_directories=True) def get_config(name, default=Ellipsis): repo = get_repo() reader = repo.config_reader() if not reader.has_section('redmine'): raise click.UsageError('Please add a redmine section to your git configuration') if not reader.has_option('redmine', name): if default is Ellipsis: raise click.UsageError('Please add redmine\'s %s' % name) else: return default return reader.get('redmine', name) def get_redmine_api(): url = get_config('url') key = get_config('key', None) username = get_config('username', None) password = get_config('password', None) if not key and (not username or not password): raise click.UsageError('Please add a redmine\'s key or username/password') if key: kwargs = dict(key=key) else: kwargs = dict(username=username, password=password) redmine = Redmine(url, **kwargs) redmine.engine.session.mount('http://', HTTPAdapter(max_retries=3)) redmine.engine.session.mount('https://', HTTPAdapter(max_retries=3)) redmine.rustine = [cf for cf in redmine.custom_field.all() if cf.name == u'Rustine proposée'][0] redmine.solution = [st for st in redmine.issue_status.all() if st.name == u'Solution proposée'][0] redmine.resolu_a_deployer = [st for st in redmine.issue_status.all() if st.name == u'Résolu (à déployer)'][0] return redmine def set_redmine(section, option, value): repo = get_repo() config_writer = repo.config_writer() if not config_writer.has_section(section): config_writer.add_section(section) config_writer.set(section, option, value) config_writer.write() def set_branch_option(branch, option, value): set_redmine('branch "%s"' % branch.name, option, value) def get_issue(issue_number=None): if not issue_number: issue_number = get_current_issue() api = get_redmine_api() try: issue = api.issue.get(issue_number) except Exception: raise click.UsageError( 'Cannot find issue %s' % issue_number) return issue def get_current_issue(): repo = get_repo() branch_name = repo.head.reference.name splitted = branch_name.rsplit('/', 1) issue_number = splitted[-1].split('-')[0] try: issue_number = int(issue_number) except Exception: raise click.UsageError( 'Cannot find an issue number in current branch name %s' % branch_name) return issue_number def get_current_project(): project_id = get_config('project', None) if not project_id: raise click.UsageError('No default project is set') api = get_redmine_api() return api.project.get(project_id) def get_patches(number_of_commits=0, ref=None): repo = get_repo() tempdir = tempfile.mkdtemp() if number_of_commits: ref = 'HEAD' + '~' * number_of_commits elif not ref: ref = '@{upstream}' repo.git.format_patch(ref, o=tempdir) def helper(): for path in glob.glob(os.path.join(tempdir, '*.patch')): yield { 'path': path, 'filename': os.path.basename(path), } return list(helper()) @click.group() def redmine(): '''Integrate git branch with redmine, you must configure your .config/git/config file with a [redmine] section and keys: url, key or username/password. ''' pass @redmine.command() def shell(): import IPython api = get_redmine_api() repo = get_repo() IPython.embed() @redmine.group() def issue(): pass @redmine.group(invoke_without_command=True) @click.pass_context def project(ctx): if ctx.invoked_subcommand is None: project = get_current_project() click.echo('Current project %s' % project) def apply_attachments(repo, issue): if not issue.attachments.total_count: return print('Currently attached patches') attachments = sorted(issue.attachments, key=lambda a: a.id) for i, attachment in enumerate(attachments): print(i, attachment.created_on, '%6d bytes' % attachment.filesize, attachment.filename) while True: indexes = click.prompt('Which patch would you like to apply (id separated by spaces) ?', type=str, default='') try: indexes = indexes.strip() if not indexes: break indexes = map(int, indexes.split()) if not all(i < len(attachments) for i in indexes): raise ValueError('invalid values', indexes) except Exception as e: print('error:', e) continue else: break for index in indexes: attachment = attachments[index] content = attachment.download().content try: p = repo.git.execute(['git', 'am'], istream=subprocess.PIPE, as_process=True) p.communicate(content) except Exception as e: print(e) print('Applying patch', index, attachment.filename, 'failed, please fix it.') break @issue.command() @click.argument('issue_number') def take(issue_number): '''Create or switch to a branch to fix an issue''' api = get_redmine_api() issue = api.issue.get(issue_number, include='attachments') repo = get_repo() new = False for head in repo.heads: if '/%s-' % issue_number in head.name: branch_name = head.name branch = head break else: new = True default_branch_name = 'wip/%s-%s' % (issue_number, slugify(issue.subject)[:32]) click.confirm('Do you want to create a branch tracking master ?', abort=True) branch_name = click.prompt('Branch name', default=default_branch_name) branch = repo.create_head(branch_name) set_branch_option(branch, 'merge', 'refs/heads/master') set_branch_option(branch, 'remote', '.') if repo.head.reference == branch: click.echo('Already on branch %s' % branch_name) else: branch.checkout() click.echo('Moved to branch %s' % branch_name) current_user = api.user.get('current') if ((not hasattr(issue, 'assigned_to') or issue.assigned_to.id != current_user.id) and click.confirm('Do you want to assign the issue to yourself ?')): issue.assigned_to_id = current_user.id issue.save() if new: apply_attachments(repo, issue) @issue.command() @click.option('--issue', default=None, type=int) def apply(issue): issue = get_issue(issue) repo = get_repo() apply_attachments(repo, issue) @issue.command() @click.option('--issue', default=None, type=int) def show(issue): issue = get_issue(issue) click.echo('URL: %s' % issue.url) click.echo('Subject: %s' % issue.subject) click.echo('Description: %s' % issue.description) click.echo('') journals = list(issue.journals) if journals: click.echo('Last note by %s: ' % journals[-1].user) click.echo('%s' % journals[-1].notes) @issue.command() @click.option('--issue', default=None, type=int) @click.argument('number_of_commits', default=0) @click.pass_context def submit(ctx, issue, number_of_commits): '''Submit current patch from this issue branch to Redmine''' ctx.invoke(rebase) issue = get_issue(issue) patches = get_patches(number_of_commits) message = '\n\n' + MARKER for patch in patches: message += '\n%s' % patch['filename'] message = click.edit(message) if message is not None: message = message.split(MARKER, 1)[0].rstrip('\n') api = get_redmine_api() kwargs = {} if click.confirm('Push -f this branch to origin ?', default=True): repo = get_repo() origin = repo.remote() for pi in origin.push(f=True): if pi.flags & pi.ERROR: click.echo(click.style(u'Push from « %s » to « %s » failed.' % ( pi.local_ref.name, pi.remote_ref.name, pi.summary), fg='red')) if click.confirm('Propose this patch as a solution ?'): current_user = api.user.get('current') if not hasattr(issue, 'assigned_to'): issue.assigned_to_id = current_user.id issue.save() elif issue.assigned_to.id != current_user.id: if click.confirm('Issue is currently assigned to %s, do you want ' 'to assign the issue to yourself ?' % issue.assigned_to.name): issue.assigned_to_id = current_user.id issue.save() kwargs['status_id'] = api.solution.id api.issue.update(issue.id, notes=message, uploads=patches, custom_fields=[{'id': api.rustine.id, 'value': u'1'}], **kwargs) @issue.command() @click.argument('issue', default=0, type=int) def comment(issue): '''Add a comment to the current issue or a chosen one''' issue = get_issue(issue or None) message = click.edit('') api = get_redmine_api() api.issue.update(issue.id, notes=message) @issue.command() @click.pass_context def new(ctx): '''Create a new issue in the default project of this repository''' project = get_current_project() api = get_redmine_api() subject_and_description = click.edit('Enter subject on first line\n\nand notes after.') \ .splitlines() subject, description = subject_and_description[0], '\n'.join(subject_and_description[1:]) subject = subject.strip() description = description.strip() current_user = api.user.get('current') click.echo('Project: %s' % project) click.echo('Subject: %s' % subject) click.echo('Description: %s' % description) click.echo('Assigned to: %s' % current_user) if click.confirm('Create issue ?'): issue = api.issue.create( project_id=project.id, subject=subject, description=description, assigned_to_id=current_user.id) click.echo('Created issue %s' % issue.url) ctx.invoke(take, issue_number=issue.id) class MyProgressPrinter(git.RemoteProgress): def update(self, op_code, cur_count, max_count=None, message=''): print(op_code, cur_count, max_count, cur_count / (max_count or 100.0), message or "NO MESSAGE") def get_commits(repo, ref): for commit in git.Commit.iter_items(repo, '%s..' % ref): yield commit @redmine.command(name='merge-and-push') @click.option('--issue', default=None, type=int) @click.argument('target_branch', default='master') def merge_and_push(issue, target_branch): issue = get_issue(issue or None) repo = get_repo() origin = repo.remote() if repo.head.is_detached: raise click.UsageError('Your cannot merge from a detached HEAD.') if repo.is_dirty(): raise click.UsageError('Your cannot merge, your repo is dirty.') current_head = repo.head.ref.name if current_head == target_branch: raise click.UsageError('Your cannot merge on %s as your are already on it.') try: repo.branches[target_branch] except IndexError: raise click.UsageError('%r is not a local branch.' % target_branch) notes = '
%s
' % repo.git.log('%s..' % target_branch) click.echo(notes) for commit in get_commits(repo, target_branch): if 'rebase' in commit.message: click.echo() click.echo('Commit %s contains the word "rebase" :' % commit.hexsha[:8]) click.echo() click.echo(''.join([' ' + line for line in commit.message.splitlines()])) click.echo() click.confirm('Continue ?', abort=True) try: click.echo(u'Checking-out branch « %s » ... ' % target_branch, nl=False) repo.branches[target_branch].checkout() click.echo(click.style('Done.', fg='green')) click.echo(u'Pull-rebasing from remote « %s » onto branch « %s » ... ' % (origin.name, target_branch), nl=False) failure = False for pi in origin.pull(rebase=True): if pi.flags & pi.ERROR: failure = True click.echo(click.style(u'Pull-rebase from « %s » failed: %s.' % ( pi.ref.name, pi.note), fg='red')) click.echo(click.style('Done.', fg='green')) if failure: raise click.ClickException('Pull rebase failed.') finally: click.echo(u'Checking-out branch « %s »... ' % current_head, nl=False) repo.branches[current_head].checkout() click.echo(click.style('Done.', fg='green')) try: click.echo(u'Rebasing branch « %s » onto branch « %s » ... ' % (current_head, target_branch), nl=False) repo.git.rebase(target_branch) except git.GitCommandError as e: click.echo(click.style('command %r failed, aborting.' % e.command, fg='red')) try: repo.git.rebase(abort=True) except git.GitCommandError as e: click.echo(click.style('rebase abort failed, %s\n%s.' % (e.stdout, e.stderr), fg='red')) raise click.Abort() click.echo(click.style('Done.', fg='green')) try: click.echo('Checking-out to %s... ' % target_branch, nl=False) repo.branches[target_branch].checkout() click.echo(click.style('Done.', fg='green')) click.echo(u'Merging branch « %s » into « %s » ... ' % (current_head, target_branch), nl=False) try: repo.git.merge(current_head, ff=True) except git.GitCommandError as e: click.echo(click.style('command %r failed, aborting.' % e.command, fg='red')) try: repo.git.merge(abort=True) except git.GitCommandError as e: click.echo(click.style('merge abort failed, %s\n%s.' % (e.stdout, e.stderr), fg='red')) raise click.Abort() click.echo(click.style('Done.', fg='green')) try: origin.refs[current_head] except IndexError: pass else: if click.confirm(u'Do you want to delete feature branch « %s » on remote « %s » ?' % ( current_head, origin.name)): for pi in origin.push(refspec=':%s' % current_head): if pi.flags & pi.ERROR: click.echo(click.style(u'Push from « %s » to « %s » failed.' % ( pi.local_ref.name, pi.remote_ref.name, pi.summary), fg='red')) if click.confirm(u'Do you want to push « %s » on remote « %s » ?' % (target_branch, repo.remote().name)): for pi in origin.push(): if pi.flags & pi.ERROR: click.echo(click.style(u'Push from « %s » to « %s » failed: %s.' % ( pi.local_ref.name, pi.remote_ref.name, pi.summary), fg='red')) if click.confirm(u'Do you want to delete feature branch « %s » ?' % current_head): repo.delete_head(repo.branches[current_head]) else: repo.branches[current_head].checkout() except Exception: click.echo(click.style(u'\nFailure going back to branch « %s ».' % current_head, fg='red')) repo.branches[current_head].checkout() raise if click.confirm('Set issue status to solved ?', default=True): api = get_redmine_api() current_user = api.user.get('current') kwargs = {} if not hasattr(issue, 'assigned_to'): issue.assigned_to_id = current_user.id issue.save() elif issue.assigned_to.id != current_user.id: if click.confirm('Issue is currently assigned to %s, do you want ' 'to assign the issue to yourself ?' % issue.assigned_to.name): issue.assigned_to_id = current_user.id issue.save() kwargs['status_id'] = api.resolu_a_deployer.id api.issue.update(issue.id, notes=notes, **kwargs) @redmine.command(name='rebase') @click.argument('target_branch', default='master') def rebase(target_branch): repo = get_repo() origin = repo.remote() if repo.head.is_detached: raise click.UsageError('Your cannot rebase from a detached HEAD.') if repo.is_dirty(): raise click.UsageError('Your cannot rebase, your repo is dirty.') current_head = repo.head.ref.name if current_head == target_branch: raise click.UsageError(u'Your cannot rebase on « %s » as your are already on it.' % target_branch) try: repo.branches[target_branch] except IndexError: raise click.UsageError('%r is not a local branch.' % target_branch) try: click.echo(u'Checking-out branch « %s » ... ' % target_branch, nl=False) repo.branches[target_branch].checkout() click.echo(click.style('Done.', fg='green')) click.echo(u'Pull-rebasing from remote « %s » onto branch « %s » ... ' % (origin.name, target_branch), nl=False) failure = False for pi in origin.pull(rebase=True): if pi.flags & pi.ERROR: failure = True click.echo(click.style(u'Pull-rebase from « %s » failed: %s.' % ( pi.ref.name, pi.note), fg='red')) click.echo(click.style('Done.', fg='green')) if failure: raise click.ClickException('Pull rebase failed.') finally: click.echo(u'Checking-out branch « %s »... ' % current_head, nl=False) repo.branches[current_head].checkout() click.echo(click.style('Done.', fg='green')) try: click.echo(u'Rebasing branch « %s » onto branch « %s » ... ' % (current_head, target_branch), nl=False) repo.git.rebase(target_branch) except git.GitCommandError as e: click.echo(click.style('command %r failed, aborting.' % e.command, fg='red')) try: repo.git.rebase(abort=True) except git.GitCommandError as e: click.echo(click.style('rebase abort failed, %s\n%s.' % (e.stdout, e.stderr), fg='red')) raise click.Abort() click.echo(click.style('Done.', fg='green')) @issue.command(name='open') @click.option('--issue', default=None, type=int) def _open(issue): issue = get_issue(issue) subprocess.call(['xdg-open', issue.url]) @project.command() @click.argument('project_id') def set(project_id): '''Set default redmine project for this git repository''' api = get_redmine_api() try: api.project.get(project_id) except Exception: raise click.UsageError('Project %s is unknown' % project_id) repo = get_repo() config_writer = repo.config_writer() if not config_writer.has_section('redmine'): config_writer.add_section('redmine') config_writer.set('redmine', 'project', project_id) config_writer.write() if __name__ == '__main__': redmine()