eoptasks/eoptasks.py

487 lines
18 KiB
Python
Executable File
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#! /usr/bin/python3
#
# eoptasks - run commands on servers
# Copyright (C) 2018-2021 Entrouvert
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import argparse
import configparser
import copy
import curses
import json
import os
import random
import re
import socket
import subprocess
import sys
import time
import libtmux
import yaml
class Server:
def __init__(self, servername, tags=[], display_name=''):
self.name = servername
self.display_name = display_name or self.name
self.keywords = set(re.split(r'[-_ \.]', servername))
for tag in tags:
self.keywords.add(tag)
# add all possible hostname parts as keywords,
# ex: node1.dev.entrouvert.org will add:
# node1.dev, node1.dev.entrouvert, node1.dev.entrouvert.org,
# dev.entrouvert, dev.entrouvert.org, entrouvert.org
parts = servername.split('.')
for i in range(len(parts) - 1):
for j in range(i, len(parts)):
if i != j:
self.keywords.add('.'.join(parts[i : j + 1]))
if i == 0:
# add first component without trailing digits, this allows
# matching db1.prod.saas.entrouvert.org with the db
# keyword.
self.keywords.add(re.sub(r'\d+$', '', parts[0]))
def __repr__(self):
return '<Server %s %r>' % (self.name, self.keywords)
def get_config():
config = configparser.ConfigParser()
config.read(os.path.join(os.path.expanduser('~/.config/eoptasks.ini')))
return config
def get_servers():
servers = []
config = get_config()
serversfile = config.get('config', 'servers', fallback=None)
if serversfile is None:
print(
'You need to create ~/.config/eoptasks.ini with such a content:\n'
'\n'
' [config]\n'
' servers = /home/user/src/puppet/data/servers.yaml\n'
)
sys.exit(1)
ignorelist = [x.strip() for x in config.get('config', 'ignore', fallback='').split(',')]
stripsuffixes = [x.strip() for x in config.get('config', 'stripsuffix', fallback='').split(',')]
def get_display_name(x):
for stripsuffix in stripsuffixes:
if stripsuffix and x.endswith(stripsuffix):
return x[: -len(stripsuffix)]
return x
# load servers from servers.yaml
for s in yaml.safe_load(open(serversfile))['servers']:
servername, tags = s.get('name'), s.get('tags', [])
if servername in ignorelist:
continue
servers.append(Server(servername, tags, display_name=get_display_name(servername)))
# load additional servers from eoptasks.ini
if config.has_section('servers'):
for server in config.options('servers'):
servername = server
tags = [x.strip() for x in config.get('servers', server).split(',')]
servers.append(Server(servername, tags, display_name=get_display_name(servername)))
for server in servers:
if config.has_section('server:%s' % server.name):
# server.key_automation will be a list
# [content to expect, list of keys to send]
server.key_automation = []
for key, value in sorted(config.items('server:%s' % server.name)):
if key.startswith('expect'):
server.key_automation.append([value, []])
elif key.startswith('send'):
server.key_automation[-1][1].append(value)
return servers
def parse_args():
parser = argparse.ArgumentParser()
parser.add_argument('-l', '--list-servers', action='store_true')
parser.add_argument('--session-name', dest='session_name', type=str)
parser.add_argument('--status-window', action='store_true')
parser.add_argument('--command-window', action='store_true')
parser.add_argument('--command-server-name', dest='command_server_name', type=str)
parser.add_argument('--noinput', dest='noinput', action='store_true')
parser.add_argument('--list-commands', dest='list_commands', action='store_true')
parser.add_argument('-k', dest='keywords', type=str)
parser.add_argument('-x', dest='exclude', type=str)
parser.add_argument('cmd', type=str, nargs='?', default=None)
parser.add_argument('args', nargs=argparse.REMAINDER)
args = parser.parse_args()
return args
def filter_servers(servers, args):
selected_servers = []
if args.keywords:
config = get_config()
cmd_keywords = args.keywords
if config.has_section('keywords'):
cmd_keywords = config['keywords'].get(args.keywords, args.keywords)
for keyword in cmd_keywords.split(','):
keywords = set(keyword.split('/'))
selected_servers.extend(
[x for x in servers if keywords.issubset(x.keywords) and not x in selected_servers]
)
for keyword in cmd_keywords.split(','):
if keyword.startswith('!') or keyword.startswith('-'):
selected_servers = [x for x in selected_servers if keyword[1:] not in x.keywords]
else:
selected_servers = servers
if args.exclude:
for exclude in args.exclude.split(','):
regex = re.compile(exclude)
selected_servers = [x for x in selected_servers if not regex.match(x.name)]
return selected_servers
def status_window(args):
session_name = args.session_name
curses.setupterm()
window = curses.initscr()
window.addstr(0, 0, 'eoptasks', curses.A_STANDOUT)
window.addstr(0, 10, '🙂')
curses.curs_set(0)
window.refresh()
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
server_address = '/tmp/.eoptasks.%s' % session_name
sock.bind(server_address)
sock.listen(1)
e = None
servers_results = {}
while True:
connection, client_address = sock.accept()
try:
json_msg = b''
while True:
data = connection.recv(5000)
if not data:
break
json_msg += data
msg = json.loads(json_msg.decode('utf-8'))
finally:
connection.close()
if msg.get('@type') == 'servers-info':
servers_info = msg['info']
elif msg.get('@type') == 'server-result':
servers_results.update(msg['info'])
try:
height, width = window.getmaxyx()
max_length = max([len(x['display_name']) for x in servers_info.values()]) + 4
nb_columns = (width - 4) // max_length
for i, server_name in enumerate(servers_info):
y = 2 + (i // nb_columns)
x = 1 + (width // nb_columns) * (i % nb_columns)
window.addstr(y, x + 3, servers_info[server_name]['display_name'])
status_icon = {
'running': '',
'done': '🆗',
}.get(servers_info[server_name]['status'], '💤')
if servers_results.get(server_name) == 'error':
status_icon = ''
window.addstr(y, x, status_icon)
if y > height - 4:
break
window.refresh()
total_servers = len(servers_info.keys())
running_servers = len([x for x in servers_info.values() if x['status'] == 'running'])
done_servers = len([x for x in servers_info.values() if x['status'] == 'done'])
if total_servers == done_servers:
break
except Exception as e:
window.addstr(0, 10, '😡 %r' % e)
window.refresh()
os.unlink(server_address)
window.addstr(0, 10, '😎')
window.refresh()
time.sleep(5)
def send_status_message(tmux_session_name, msg):
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
server_address = '/tmp/.eoptasks.%s' % tmux_session_name
try:
sock.connect(server_address)
except OSError as e:
return
sock.sendall(json.dumps(msg).encode('utf-8'))
sock.close()
def get_commands():
commands = {
'apt.update': 'sudo apt update',
'apt.upgrade': 'sudo apt update && sudo apt full-upgrade -y -o Dpkg::Options::="--force-confold"',
# collectstatic is useful after an upgrade of gadjo.
'collectstatic': '''sudo -u authentic-multitenant authentic2-multitenant-manage collectstatic --noinput;
sudo -u bijoe bijoe-manage collectstatic --noinput;
sudo -u chrono chrono-manage collectstatic --noinput;
sudo -u combo combo-manage collectstatic --noinput;
sudo -u fargo fargo-manage collectstatic --noinput;
sudo -u hobo hobo-manage collectstatic --noinput;
sudo -u lingo lingo-manage collectstatic --noinput;
sudo -u passerelle passerelle-manage collectstatic --noinput;
sudo -u welco welco-manage collectstatic --noinput;
sudo -u wcs wcs-manage collectstatic;
/bin/true'''.replace(
'\n', ''
),
# combo.reload is useful to get a new {% start_timestamp %} after an
# upgrade of publik-base-theme.
'combo.reload': '''sudo service combo reload; /bin/true''',
# hobo-agent.restart is the fastest way to get the number of threads
# used by celery under control :/
'hobo-agent.restart': '''test -e /etc/hobo-agent/settings.py && sudo supervisorctl reread && sudo supervisorctl restart hobo-agent''',
# memcached.restart is useful to force shared theme to be updated.
'memcached.restart': '''sudo service memcached restart; /bin/true''',
'restart.all': '''sudo systemctl restart authentic2-multitenant bijoe chrono combo docbow fargo hobo lingo passerelle wcs welco; /bin/true''',
'reload.all': '''sudo systemctl reload authentic2-multitenant bijoe chrono combo docbow fargo hobo lingo passerelle wcs welco; /bin/true''',
'passerelle.restart': '''sudo systemctl restart passerelle; /bin/true''',
'wcs.restart': '''sudo systemctl restart wcs; /bin/true''',
# puppet.update, unfortunately without proper error checking.
'puppet.update': '''sudo puppet agent -t || true''',
}
config = get_config()
for section in config.sections():
if section.startswith('command:'):
commands[section[len('command:') :]] = config.get(section, 'cmd')
return commands
def command_window(args):
tmux_session_name = args.session_name
commands = get_commands()
if args.cmd in commands:
cmd = commands[args.cmd]
else:
cmd = args.cmd
if args.args:
cmd += ' ' + ' '.join(['"%s"' % x for x in args.args])
orig_cmd = cmd
config = get_config()
environ = {}
if config.has_section('server:%s' % args.command_server_name):
new_environ = {}
for key, value in config.items('server:%s' % args.command_server_name):
if key.startswith('env'):
new_environ[value.split('=', 1)[0]] = value.split('=', 1)[1]
if new_environ:
environ = copy.copy(os.environ)
environ.update(new_environ)
call_kwargs = {}
if environ:
call_kwargs['env'] = environ
while True:
# -t: force a tty for interactive commands.
rc = subprocess.call(['ssh', '-t', args.command_server_name] + [cmd], **call_kwargs)
if rc == 0:
send_status_message(
tmux_session_name, {'@type': 'server-result', 'info': {args.command_server_name: 'success'}}
)
break
send_status_message(
tmux_session_name, {'@type': 'server-result', 'info': {args.command_server_name: 'error'}}
)
if args.noinput:
break
choice = None
while choice not in ['r', 's', 'q']:
choice = input('[R]etry, [S]hell, [Q]uit --> ').lower()
if choice == 'r':
cmd = orig_cmd
elif choice == 's':
cmd = 'bash'
elif choice == 'q':
break
args = parse_args()
if args.list_commands:
commands = get_commands()
for command in sorted(commands):
print(command)
sys.exit(0)
if args.status_window:
status_window(args)
sys.exit(0)
if args.command_window:
command_window(args)
sys.exit(0)
servers = get_servers()
selected_servers = filter_servers(servers, args)
selected_servers = sorted(selected_servers, key=lambda x: x.name)
if args.list_servers:
for server in selected_servers:
print(server.name)
sys.exit(0)
if not selected_servers:
sys.stderr.write('No matching servers\n')
sys.exit(1)
if not args.cmd:
sys.stderr.write('Missing command\n')
sys.exit(1)
def init_tmux_session():
if os.environ.get('TMUX'): # already in a tmux
sys.stderr.write('Cannot run embedded in tmux\n')
sys.exit(1)
tmux_session_name = 's%s' % random.randrange(1000)
server_address = '/tmp/.eoptasks.%s' % tmux_session_name
try:
os.unlink(server_address)
except OSError:
pass
os.environ['SHELL'] = '/bin/sh'
os.system(
'tmux new-session -s %s -n 🌑 -d %s --status-window --session-name %s'
% (tmux_session_name, sys.argv[0], tmux_session_name)
)
return tmux_session_name
tmux_session_name = init_tmux_session()
pid = os.fork()
if pid:
os.system('tmux attach-session -t %s' % tmux_session_name)
else:
def cluster_name(server_name):
cluster_name = re.sub(r'\d', '', server_name)
for location in ('rbx', 'gra', 'sbg'):
cluster_name = cluster_name.replace('.%s.' % location, '.loc')
return cluster_name
tmux = libtmux.Server()
try:
session = tmux.sessions.get(session_name=tmux_session_name)
except AttributeError:
session = tmux.find_where({'session_name': tmux_session_name})
status_window = session.attached_window
all_servers = selected_servers[:]
total_number = len(selected_servers)
servers_info = {}
for server in selected_servers:
servers_info[server.name] = {'status': '', 'display_name': server.display_name}
def send_status():
current_windows = [x.name for x in session.windows]
for server in all_servers:
server_info = servers_info[server.name]
if server.name in current_windows:
server_info['status'] = 'running'
elif server_info['status'] == 'running':
server_info['status'] = 'done'
send_status_message(tmux_session_name, {'@type': 'servers-info', 'info': servers_info})
def handle_expects():
for win in session.windows:
try:
server = [x for x in all_servers if x.name == win.name][0]
except IndexError:
continue
if not getattr(server, 'key_automation', None):
continue
try:
current_content = win.cmd('capture-pane', '-p').stdout[-1]
except IndexError:
pass
else:
if current_content.endswith(server.key_automation[0][0]):
for keys in server.key_automation[0][1]:
# pane.send_keys sends a leading space character and
# that makes it unusuable to send a password (for
# example).
try:
win.panes[0].cmd('send-keys', keys + '\n')
except IndexError:
continue
time.sleep(0.1)
# remove played automation
server.key_automation = server.key_automation[1:]
while selected_servers:
current_clusters = [cluster_name(x.name) for x in session.windows]
for server in selected_servers[:]:
if cluster_name(server.name) in current_clusters:
continue
selected_servers.remove(server)
window_cmd = '%s --session-name %s --command-window --command-server-name %s %s "%s" %s' % (
sys.argv[0],
tmux_session_name,
server.name,
'--noinput' if args.noinput else '',
args.cmd,
' '.join(['"%s"' % x for x in args.args]),
)
session.new_window(attach=False, window_name=server.name, window_shell=window_cmd)
break
else:
time.sleep(0.1)
while len(session.windows) > 10:
send_status()
handle_expects()
time.sleep(0.1)
send_status()
handle_expects()
percentage = (total_number - len(selected_servers)) / total_number
if percentage == 1:
status_window.rename_window('🌕')
elif percentage >= 0.75:
status_window.rename_window('🌔')
elif percentage >= 0.5:
status_window.rename_window('🌓')
elif percentage >= 0.25:
status_window.rename_window('🌒')
while len(session.windows) > 1:
send_status()
handle_expects()
time.sleep(0.1)
status_window.rename_window('🌕')
send_status()