Source code for controller.commands.password

"""
Manage services passwords
"""

import re
from datetime import date, datetime, timedelta
from enum import Enum
from typing import Optional, cast

import typer
from zxcvbn import zxcvbn  # type: ignore

from controller import PLACEHOLDER, PROJECTRC, REGISTRY, log, print_and_exit
from controller.app import Application, Configuration
from controller.commands import PASSWORD_MODULES
from controller.deploy.docker import Docker
from controller.templating import Templating, get_strong_password
from controller.utilities.tables import print_table

UPDATE_LABEL = "updated on"


# Enum() expects a string, tuple, list or dict literal as the second argument
# https://github.com/python/mypy/issues/5317
SupportedServices = Enum(  # type: ignore
    "SupportedServices", {name: name for name in sorted(PASSWORD_MODULES.keys())}
)


# Note: can't directly extract yaml with comments because it is not supported
# https://github.com/yaml/pyyaml/issues/90
[docs] def parse_projectrc() -> dict[str, datetime]: if not PROJECTRC.exists(): return {} updates: dict[str, datetime] = {} with open(PROJECTRC) as f: lines = f.readlines() env_seen = False for line in lines: line = line.strip() # Skip empty lines if not line: continue # Skip everything before the env block if line == "env:": env_seen = True continue elif not env_seen: continue if UPDATE_LABEL in line: variable = line.split(":")[0] m = re.search( # This is is expected to be a date yyyy-mm-dd # this reg exp will start to fail starting from 1st Jan 2100 :-D rf".*{UPDATE_LABEL} (20[0-9][0-9]-[0-1][0-9]-[0-3][0-9])$", line, ) if m: updates[variable] = datetime.strptime(m.group(1), "%Y-%m-%d") return updates
[docs] def get_projectrc_variables_indentation(projectrc: list[str]) -> int: env_indentation = 0 for line in projectrc: # save the indentation level of the env block # it will be used to determine the variables indentation # if no further lines will be found if line.strip().startswith("env:"): env_indentation = line.index("env:") continue # Skip every line before the env block if env_indentation == 0: continue m = re.search(r"^(\s+).*", line) if m: blanks = len(m.group(1)) # Skip any lines after env with an indentation lower than env itself # (for example a blank line or any other wrong-indented line) if blanks < env_indentation: continue return blanks # if reached this point it means that after the env block no further non-empty lines # have been found, so return the env level by adding 1 indentation level # Add 1 indentation level return int(3 * env_indentation / 2)
# Note: can't directly use utilities in app.py because in this case we want to # maintain all values (not only templated variables) and we also want to keep comments
[docs] def update_projectrc(variables: dict[str, str]) -> None: today = date.today().strftime("%Y-%m-%d") annotation = f"# {UPDATE_LABEL} {today}" with open(PROJECTRC) as f: lines = f.readlines() append_additional_lines: list[str] = [] blanks = get_projectrc_variables_indentation(lines) if blanks == 0: # pragma: no cover print_and_exit("Malformed .projectrc file, can't find an env block") pref = " " * blanks for variable, value in variables.items(): for index, line in enumerate(lines): # If the variable is found in .projectrc, let's update it if line.strip().startswith(variable): lines[index] = f'{pref}{variable}: "{value}" {annotation}\n' break # if the variable is not found in .projectrc, let's append as additional else: append_additional_lines.append( f'{pref}{variable}: "{value}" {annotation}\n' ) templating = Templating() templating.make_backup(PROJECTRC) with open(PROJECTRC, "w") as f: last_line = "" for line in lines + append_additional_lines: last_line = line f.write(line) if not line.endswith("\n"): f.write("\n") # If last line is not an empty line, let's add a newline at the end of file if last_line.strip(): f.write("\n") # Write again the .env file Application.get_controller().load_projectrc() Application.get_controller().read_specs(read_extended=True) Application.get_controller().make_env()
[docs] def get_expired_passwords() -> list[tuple[str, datetime]]: expired_passwords: list[tuple[str, datetime]] = [] last_updates = parse_projectrc() now = datetime.now() PASSWORD_EXPIRATION = int( Application.env.get("PASSWORD_EXPIRATION_WARNING") or "180" ) if PASSWORD_EXPIRATION == 0: return expired_passwords for s in PASSWORD_MODULES: # This should never happens and can't be (easily) tested if s not in Application.data.base_services: # pragma: no cover print_and_exit("Command misconfiguration, unknown {} service", s) if s != REGISTRY and s not in Application.data.active_services: continue if s == REGISTRY and not Configuration.swarm_mode: continue module = PASSWORD_MODULES.get(s) if not module: # pragma: no cover print_and_exit(f"{s} misconfiguration, module not found") for variable in module.PASSWORD_VARIABLES: if variable in last_updates: change_date = last_updates.get(variable, datetime.fromtimestamp(0)) expiration_date = change_date + timedelta(days=PASSWORD_EXPIRATION) if now > expiration_date: expired_passwords.append( ( variable, expiration_date, ) ) return expired_passwords
[docs] @Application.app.command(help="Manage services passwords") def password( service: SupportedServices = typer.Argument(None, help="Service name"), show: bool = typer.Option( False, "--show", help="Show the current password(s)", show_default=False, ), random: bool = typer.Option( False, "--random", help="Generate a random password", show_default=False, ), new_password: str = typer.Option( None, "--password", help="Force the given password", show_default=False, ), ) -> None: Application.print_command( Application.serialize_parameter("--show", show, IF=show), Application.serialize_parameter("--random", random, IF=random), Application.serialize_parameter("--password", new_password, IF=new_password), Application.serialize_parameter("", service), ) Application.get_controller().controller_init() # No service specified, only a summary will be reported if not service: if random: print_and_exit("--random flag is not supported without a service") if new_password: print_and_exit("--password option is not supported without a service") MIN_PASSWORD_SCORE = int( Application.env.get("MIN_PASSWORD_SCORE", 2) # type: ignore ) last_updates = parse_projectrc() now = datetime.now() table: list[list[str]] = [] for s in PASSWORD_MODULES: # This should never happens and can't be (easily) tested if s not in Application.data.base_services: # pragma: no cover print_and_exit("Command misconfiguration, unknown {} service", s) if s != REGISTRY and s not in Application.data.active_services: continue if s == REGISTRY and not Configuration.swarm_mode: continue module = PASSWORD_MODULES.get(s) if not module: # pragma: no cover print_and_exit(f"{s} misconfiguration, module not found") PASSWORD_EXPIRATION = int( Application.env.get("PASSWORD_EXPIRATION_WARNING") or "180" ) for variable in module.PASSWORD_VARIABLES: password = Application.env.get(variable) if password == PLACEHOLDER: score = None else: result = zxcvbn(password) score = result["score"] if variable not in last_updates: expired = True last_change = "N/A" else: change_date = last_updates.get(variable, datetime.fromtimestamp(0)) last_change = change_date.strftime("%Y-%m-%d") if PASSWORD_EXPIRATION == 0: expired = False else: expiration_date = change_date + timedelta( days=PASSWORD_EXPIRATION ) expired = now > expiration_date pass_line: list[str] = [] pass_line.append(s) pass_line.append(variable) if expired: pass_line.append(f"[bold red]{last_change}[/bold red]") else: pass_line.append(f"[bold green]{last_change}[/bold green]") if score is None: pass_line.append("[bold red]NOT SET[/bold red]") elif score < MIN_PASSWORD_SCORE: pass_line.append(f"[bold red]{score}[/bold red]") else: pass_line.append(f"[bold green]{score}[/bold green]") if show: pass_line.append(str(password)) table.append(pass_line) headers = ["SERVICE", "VARIABLE", "LAST CHANGE", "STRENGTH"] if show: headers.append("PASSWORD") print_table(headers, table, table_title="Current passwords for active services") # In this case a service is asked to be updated else: module = PASSWORD_MODULES.get(service.value) if not module: # pragma: no cover print_and_exit(f"{service.value} misconfiguration, module not found") if random: new_password = get_strong_password() elif not new_password: print_and_exit("Please specify one between --random and --password options") docker = Docker() variables = module.PASSWORD_VARIABLES old_password = Application.env.get(variables[0]) new_variables = {variable: new_password for variable in variables} # Some services can only be updated if already running, # others can be updated even if offline, # but in every case if the stack is running it has to be restarted if service.value == REGISTRY: is_running = docker.registry.ping(do_exit=False) container: Optional[tuple[str, str]] = ("registry", "") else: container = docker.get_container(service.value) is_running = container is not None is_running_needed = module.IS_RUNNING_NEEDED log.info("Changing password for {}...", service.value) if is_running_needed and (not is_running or not container): print_and_exit( "Can't update {} because it is not running. Please start your stack", service.value, ) update_projectrc(new_variables) if container: module.password(container, old_password, new_password) if is_running: log.info("{} was running, restarting services...", service.value) Application.get_controller().check_placeholders_and_passwords( Application.data.compose_config, Application.data.services ) if service.value == REGISTRY: port = cast(int, Application.env["REGISTRY_PORT"]) docker.client.container.remove(REGISTRY, force=True) docker.compose.create_volatile_container( REGISTRY, detach=True, publish=[(port, port)] ) elif Configuration.swarm_mode: docker.compose.dump_config(Application.data.services) docker.swarm.deploy() else: docker.compose.start_containers(Application.data.services) else: log.info("{} was not running, restart is not needed", service.value) log.info( "The password of {} has been changed. " "Please find the new password into your .projectrc file as {} variable", service.value, variables[0], )