Source code for controller.app

"""
Main Application and Configuration module
"""

import enum
import json
import os
import shutil
import sys
import time
import warnings
from collections.abc import Iterable
from pathlib import Path
from typing import Any, Optional, TypedDict, Union, cast

import click
import requests
import typer
from git import Repo as GitRepo
from packaging.version import Version
from python_on_whales import docker
from python_on_whales.utils import DockerException
from zxcvbn import zxcvbn  # type: ignore

from controller import (
    COMPOSE_ENVIRONMENT_FILE,
    CONFS_DIR,
    CONTAINERS_YAML_DIRNAME,
    DATA_DIR,
    DATAFILE,
    EXTENDED_PROJECT_DISABLED,
    PLACEHOLDER,
    PROJECT_DIR,
    PROJECTRC,
    RED,
    REGISTRY,
    SUBMODULES_DIR,
    ComposeServices,
    EnvType,
    __version__,
    log,
    print_and_exit,
)
from controller.commands import load_commands
from controller.packages import Packages
from controller.project import ANGULAR, NO_FRONTEND, Project
from controller.templating import Templating
from controller.utilities import configuration, git, services, system
from controller.utilities.tables import print_table

ROOT_UID = 0
BASE_UID = 1000


CommandParameter = Union[int, str, bool, None, Path, Iterable[str], enum.Enum]


[docs] class ProjectRCType(TypedDict, total=False): project: str stack: str hostname: str production: bool swarm: bool project_configuration: configuration.Configuration
[docs] class Configuration: projectrc: ProjectRCType = {} host_configuration: configuration.Configuration = {} swarm_mode: bool = False # This is the final configuration (defaults + project + projectrc) specs: configuration.Configuration = {} services_list: Optional[str] environment: dict[str, str] action: Optional[str] = None parameters: list[str] = [] production: bool = False testing: bool = False project: str = "" frontend: Optional[str] = None hostname: str = "" remote_engine: Optional[str] = None stack: str = "" load_backend: bool = False load_frontend: bool = False load_commons: bool = False version: str = "" rapydo_version: str = "" project_title: Optional[str] = None project_description: Optional[str] = None project_keywords: Optional[str] = None initialize: bool = False update: bool = False check: bool = False install: bool = False print_version: bool = False create: bool = False # It will be replaced with PROJECT_DIR/project ABS_PROJECT_PATH: Path = PROJECT_DIR # used by single-container commands (i.e. run) in swarm mode FORCE_COMPOSE_ENGINE: bool = False
[docs] @staticmethod def set_action(action: Optional[str], params: dict[str, Any]) -> None: Configuration.action = action Configuration.initialize = Configuration.action == "init" Configuration.update = Configuration.action == "update" Configuration.check = Configuration.action == "check" Configuration.install = Configuration.action == "install" Configuration.print_version = Configuration.action == "version" Configuration.create = Configuration.action == "create" params.pop("version") # This will start to fail when this parameter will be dropped params.pop("services_list") Configuration.parameters = [] project = params.pop("project") if project and project != Configuration.projectrc.get("project"): Configuration.parameters.append(f"--project {project}") hostname = params.pop("hostname") if hostname != "localhost" and hostname != Configuration.projectrc.get( "hostname" ): Configuration.parameters.append(f"--hostname {hostname}") stack = params.pop("stack") if stack and stack != Configuration.projectrc.get("stack"): Configuration.parameters.append(f"--stack {stack}") if params.pop("production"): Configuration.parameters.append("--production") if params.pop("testing"): Configuration.parameters.append("--testing") environment = params.pop("environment") if environment: for e in environment: Configuration.parameters.append(f"--env {e}") remote_engine = params.pop("remote_engine") if remote_engine and stack: Configuration.parameters.append(f"--remote {remote_engine}") if params.pop("no_backend"): Configuration.parameters.append("--no-backend") if params.pop("no_frontend"): Configuration.parameters.append("--no-frontend") if params.pop("no_commons"): Configuration.parameters.append("--no-commons") if params: log.warning("Found unknown parameters: {}", params)
[docs] def projectrc_values( ctx: typer.Context, param: typer.CallbackParam, value: str ) -> Optional[str]: if ctx.resilient_parsing: # pragma: no cover return None if not param.name or value != param.get_default(ctx): return value # This cast is not correct... but enough for this callback.. from_projectrc = cast(Optional[str], Configuration.projectrc.get(param.name)) if from_projectrc is not None: return from_projectrc return value
[docs] def version_callback(value: bool) -> None: if value: typer.echo(f"rapydo version: {__version__}") raise typer.Exit()
[docs] def controller_cli_options( ctx: typer.Context, project: str = typer.Option( None, "--project", "-p", help="Name of the project", callback=projectrc_values, ), # Deprecated since 2.1 services_list: Optional[str] = typer.Option( None, "-s", help="Comma separated list of services to be included", callback=projectrc_values, ), hostname: str = typer.Option( "localhost", "--hostname", "-H", help="Hostname of the current machine", callback=projectrc_values, show_default=False, ), stack: str = typer.Option( None, "--stack", help="Docker-compose stack to be loaded", callback=projectrc_values, ), production: bool = typer.Option( False, "--production", "--prod", help="Enable production mode", callback=projectrc_values, show_default=False, ), testing: bool = typer.Option( False, "--testing", "--test", help="Enable test mode", callback=projectrc_values, envvar="TESTING", show_default=False, ), environment: list[str] = typer.Option( [], "--env", "-e", help="Temporary change the value of an environment variable", ), remote_engine: Optional[str] = typer.Option( None, "--remote", help="Execute commands on a remote host", ), no_backend: bool = typer.Option( False, "--no-backend", help="Exclude backend configuration", callback=projectrc_values, show_default=False, ), no_frontend: bool = typer.Option( False, "--no-frontend", help="Exclude frontend configuration", callback=projectrc_values, show_default=False, ), no_commons: bool = typer.Option( False, "--no-commons", help="Exclude project common configuration", callback=projectrc_values, show_default=False, ), version: bool = typer.Option( False, "--version", "-v", help="Print version information and quit", show_default=False, callback=version_callback, is_eager=True, ), ) -> None: # This is needed because during tests both Application and Configuration # are persistent and after a run (i.e. after starting the registry) # FORCE_COMPOSE_ENGINE remains enabled # It would not be needed during the normal use of the controller Configuration.FORCE_COMPOSE_ENGINE = False Configuration.set_action(ctx.invoked_subcommand, ctx.params) # Deprecated since 2.1 if services_list: warnings.warn("-s is replaced by rapydo <command> service") time.sleep(1) Configuration.services_list = services_list Configuration.production = production Configuration.testing = testing Configuration.project = project Configuration.hostname = hostname Configuration.remote_engine = remote_engine Configuration.environment = {} for e in environment: if "=" not in e: print_and_exit("Invalid enviroment, missing value in {}", e) key, value = e.split("=") Configuration.environment[key] = value if stack: Configuration.stack = stack else: Configuration.stack = "production" if production else "development" Configuration.load_backend = not no_backend Configuration.load_frontend = not no_frontend Configuration.load_commons = not no_commons
# Temporary fix to ease migration to typer
[docs] class CommandsData:
[docs] def __init__( self, files: list[Path], base_files: list[Path], services: list[str], active_services: list[str], base_services: ComposeServices, compose_config: ComposeServices, ): self.files = files self.base_files = base_files self.services = services self.active_services = active_services or [] self.base_services = base_services self.compose_config = compose_config core_services = list(base_services.keys()) all_services = list(compose_config.keys()) self.custom_services: list[str] = [ s for s in all_services if s not in core_services ]
[docs] class Application: # Typer app # Register callback with CLI options and basic initialization/checks app = typer.Typer( callback=controller_cli_options, context_settings={"help_option_names": ["--help", "-h"]}, ) # controller app controller: Optional["Application"] = None project_scaffold = Project() data: CommandsData gits: dict[str, GitRepo] = {} env: dict[str, EnvType] = {} base_services: ComposeServices compose_config: ComposeServices
[docs] def __init__(self) -> None: Application.controller = self self.active_services: list[str] = [] self.files: list[Path] = [] self.base_files: list[Path] = [] self.services = None self.enabled_services: list[str] = [] if not PROJECT_DIR.is_dir(): project_dir = None else: project_dir = Application.project_scaffold.get_project( Configuration.projectrc.get("project"), ignore_multiples=True ) load_commands(project_dir) Application.load_projectrc()
[docs] @staticmethod def serialize_parameter( param: str, value: CommandParameter, IF: CommandParameter = True, ) -> Optional[str]: if isinstance(value, enum.Enum): value = value.value if IF and value is not None: if isinstance(value, bool): return f"{param}" if isinstance(value, tuple) or isinstance(value, list): return " ".join([f"{param} {v}" for v in value]) # Options => (--param value) if param: return f"{param} {value}" # Arguments ( => no param, only a value) return str(value) return None
[docs] @staticmethod def print_command(*parameters: Optional[str]) -> None: pre_params = " ".join( [p for p in Configuration.parameters if p is not None] ).strip() post_params = " ".join([p for p in parameters if p is not None]).strip() if pre_params: pre_params = f"{pre_params} " if post_params: post_params = f" {post_params}" log.debug( "Command: rapydo {}{}{}", pre_params, Configuration.action, post_params, log_to_file=True, )
[docs] @staticmethod def get_controller() -> "Application": if not Application.controller: # pragma: no cover raise AttributeError("Application.controller not initialized") return Application.controller
[docs] def controller_init(self, services: Optional[Iterable[str]] = None) -> None: if Configuration.create: Application.check_installed_software() return None main_folder_error = Application.project_scaffold.check_main_folder() if main_folder_error: print_and_exit(main_folder_error) if not Configuration.print_version: Application.check_installed_software() # if project is None, it is retrieve by project folder Configuration.project = Application.project_scaffold.get_project( Configuration.project ) Configuration.ABS_PROJECT_PATH = PROJECT_DIR.joinpath(Configuration.project) if Configuration.print_version: self.read_specs(read_extended=True) return None log.debug("You are using RAPyDo version {}", __version__) if Configuration.check: log.info("Selected project: {}", Configuration.project) else: log.debug("Selected project: {}", Configuration.project) if ( Configuration.initialize or Configuration.update or Configuration.check or Configuration.install ): Application.check_internet_connection() if Configuration.install: self.read_specs(read_extended=False) return None # Auth is not available yet, will be read by read_specs Application.project_scaffold.load_project_scaffold( Configuration.project, auth=None ) Application.preliminary_version_check() # read project configuration self.read_specs(read_extended=True) # from read_specs Application.project_scaffold.load_frontend_scaffold(Configuration.frontend) Application.verify_rapydo_version() Application.project_scaffold.inspect_project_folder() self.current_uid = system.get_current_uid() self.current_gid = system.get_current_gid() # Cannot be tested if self.current_uid == ROOT_UID: # pragma: no cover self.current_uid = BASE_UID log.warning("Current user is 'root'") else: os_user = system.get_username(self.current_uid) log.debug("Current UID: {} ({})", self.current_uid, os_user) log.debug("Current GID: {}", self.current_gid) if Configuration.initialize: return None Application.git_submodules() if Configuration.update: return None self.make_env() # Compose services and variables base_services, compose_config = self.get_compose_configuration(services) if Configuration.action != "password": self.check_placeholders_and_passwords(compose_config, self.enabled_services) Application.data = CommandsData( files=self.files, base_files=self.base_files, services=self.enabled_services, active_services=self.active_services, base_services=base_services, compose_config=compose_config, ) return None
[docs] @staticmethod def load_projectrc() -> None: projectrc_yaml = cast( ProjectRCType, configuration.load_yaml_file(file=PROJECTRC, is_optional=True), ) Configuration.host_configuration = projectrc_yaml.pop( "project_configuration", {} ) Configuration.projectrc = projectrc_yaml Configuration.swarm_mode = ( Configuration.projectrc.get("swarm", False) or os.environ.get("SWARM_MODE", "0") == "1" )
[docs] @staticmethod def check_installed_software() -> None: log.debug( "python version: {}.{}.{}", sys.version_info.major, sys.version_info.minor, sys.version_info.micro, ) # 17.05 added support for multi-stage builds # https://docs.docker.com/compose/compose-file/compose-file-v3/#compose-and-docker-compatibility-matrix # 18.09.2 fixed the CVE-2019-5736 vulnerability # 20.10.0 introduced copy --chmod and improved logging Packages.check_program( "docker", min_version="20.10.0", min_recommended_version="20.10.0" ) if docker.compose.is_installed(): # too slow to verify the version on every commands... near half a seconds # Sometimes a couple of seconds! # v = docker.compose.version() # log.debug("docker compose is installed: {}", v) log.debug("docker compose is installed") else: # pragma: no cover print_and_exit( "A mandatory dependency is missing: docker compose not found" "\nInstallation guide: " "https://docs.docker.com/compose/cli-command/#installing-compose-v2" "\nor try the automated installation with {command}", command=RED("rapydo install compose"), )
# no need to check the git executable, because alredy verified by GitPython # in case of missing git GitPython will fail and this check will never executed # Packages.check_program("git")
[docs] def read_specs(self, read_extended: bool = True) -> None: """Read project configuration""" try: confs = configuration.read_configuration( default_file_path=CONFS_DIR, base_project_path=Configuration.ABS_PROJECT_PATH, projects_path=PROJECT_DIR, submodules_path=SUBMODULES_DIR, read_extended=read_extended, production=Configuration.production, ) # confs 3 is the core config, extra fields are allowd configuration.validate_configuration(confs[3], core=True) # confs 0 is the merged conf core + custom, extra fields are allowd configuration.validate_configuration(confs[0], core=False) log.info("Project configuration is valid") Configuration.specs = configuration.mix_configuration( confs[0], Configuration.host_configuration ) configuration.validate_configuration(Configuration.specs, core=False) log.info("Host configuration is valid") self.extended_project = confs[1] self.extended_project_path = confs[2] except AttributeError as e: # pragma: no cover print_and_exit(str(e)) Configuration.frontend = cast( str, ( Configuration.specs.get("variables", {}) .get("env", {}) .get("FRONTEND_FRAMEWORK", NO_FRONTEND) ), ) if Configuration.frontend == NO_FRONTEND: Configuration.frontend = None project = Configuration.specs.get("project", {}) Configuration.project_title = project.get("title", "Unknown title") Configuration.version = project.get("version", "") Configuration.rapydo_version = project.get("rapydo", "") Configuration.project_description = project.get( "description", "Unknown description" ) Configuration.project_keywords = project.get("keywords", "") if not Configuration.rapydo_version: # pragma: no cover print_and_exit( "RAPyDo version not found in your project_configuration file" ) Configuration.rapydo_version = str(Configuration.rapydo_version)
[docs] @staticmethod def preliminary_version_check() -> None: specs = configuration.load_yaml_file( file=Configuration.ABS_PROJECT_PATH.joinpath( configuration.PROJECT_CONF_FILENAME ) ) Application.verify_rapydo_version( rapydo_version=specs.get("project", {}).get("rapydo", "") )
[docs] @staticmethod def verify_rapydo_version(rapydo_version: str = "") -> bool: """ Verify if the installed controller matches the current project requirement """ if not rapydo_version: rapydo_version = Configuration.rapydo_version if not rapydo_version: # pragma: no cover return True r = Version(rapydo_version) c = Version(__version__) if r == c: return True else: # pragma: no cover if r > c: ac = f"Upgrade your controller to version {r}" else: ac = f"Downgrade your controller to version {r} or upgrade your project" msg = f"""RAPyDo version is not compatible. This project requires RAPyDo {r} but you are using version {c}. {ac} You can use of one: - rapydo install (install in editable from submodules/do) - rapydo install --no-editable (install from pypi) """ print_and_exit(msg)
[docs] @staticmethod def check_internet_connection() -> None: """Check if connected to internet""" try: requests.get("https://www.google.com", timeout=2) if Configuration.check: log.info("Internet connection is available") except requests.ConnectionError: # pragma: no cover print_and_exit("Internet connection is unavailable")
[docs] @staticmethod def working_clone( name: str, repo: configuration.Submodule, from_path: Optional[Path] = None ) -> Optional[GitRepo]: # substitute values starting with '$$' myvars = { ANGULAR: Configuration.frontend == ANGULAR, } condition = repo.get("_if", "") if condition.startswith("$$"): # Is this repo enabled? if not myvars.get(condition.lstrip("$"), None): return None default_version = ( Configuration.rapydo_version if Configuration.rapydo_version else __version__ ) if from_path is not None: local_path = from_path.joinpath(name) if not local_path.exists(): print_and_exit("Submodule {} not found in {}", name, local_path) submodule_path = Path(SUBMODULES_DIR, name) if submodule_path.exists(): log.info("Path {} already exists, removing", submodule_path) if submodule_path.is_dir() and not submodule_path.is_symlink(): shutil.rmtree(submodule_path) else: submodule_path.unlink() os.symlink(local_path, submodule_path) url = repo.get("online_url") if not url: # pragma: no cover print_and_exit("Submodule misconfiguration, online url not found: {}", name) return git.clone( url=url, path=Path(name), branch=repo.get("branch") or default_version, do=Configuration.initialize, check=not Configuration.install, )
[docs] @staticmethod def git_submodules(from_path: Optional[Path] = None) -> None: """Check and/or clone git projects""" submodules = ( Configuration.specs.get("variables", {}).get("submodules", {}).copy() ) main_repo = git.get_repo(".") # This is to reassure mypy, but this is check is already done # in preliminary checks, so it can never happen if not main_repo: # pragma: no cover print_and_exit("Current folder is not a git main_repository") Application.gits["main"] = main_repo for name, submodule in submodules.items(): repo = Application.working_clone(name, submodule, from_path=from_path) if repo: Application.gits[name] = repo
[docs] def get_compose_configuration( self, enabled_services: Optional[Iterable[str]] = None ) -> tuple[ComposeServices, ComposeServices]: compose_files: list[Path] = [] MODE = f"{Configuration.stack}.yml" customconf = Configuration.ABS_PROJECT_PATH.joinpath(CONTAINERS_YAML_DIRNAME) angular_loaded = False def add(p: Path, f: str) -> None: compose_files.append(p.joinpath(f)) if Configuration.load_backend: add(CONFS_DIR, "backend.yml") if Configuration.load_frontend: if Configuration.frontend == ANGULAR: add(CONFS_DIR, "angular.yml") angular_loaded = True if ( Configuration.swarm_mode and Configuration.production and not Configuration.FORCE_COMPOSE_ENGINE ): add(CONFS_DIR, "swarm_angular_prod_options.yml") if Configuration.swarm_mode and not Configuration.FORCE_COMPOSE_ENGINE: add(CONFS_DIR, "swarm_options.yml") if Application.env.get("NFS_HOST"): log.info("NFS Server is enabled") add(CONFS_DIR, "volumes_nfs.yml") else: add(CONFS_DIR, "volumes_local.yml") if Configuration.production: add(CONFS_DIR, "production.yml") else: add(CONFS_DIR, "development.yml") if angular_loaded: add(CONFS_DIR, "angular-development.yml") if self.extended_project and self.extended_project_path: extendedconf = self.extended_project_path.joinpath(CONTAINERS_YAML_DIRNAME) # Only added if exists, this is the only non mandatory conf file extended_mode_conf = extendedconf.joinpath(MODE) if extended_mode_conf.exists(): compose_files.append(extended_mode_conf) if Configuration.load_commons: add(extendedconf, "commons.yml") if Configuration.load_commons: add(customconf, "commons.yml") add(customconf, MODE) # Read necessary files self.files, self.base_files = configuration.read_composer_yamls(compose_files) # to build the config with files and variables from controller.deploy.docker import Docker docker = Docker( compose_files=self.base_files, verify_swarm=not Configuration.initialize ) base_services = docker.compose.get_config().services if base_services is None: # pragma: no cover log.error("Got invalid compose base config") base_services = {} docker = Docker( compose_files=self.files, verify_swarm=not Configuration.initialize ) compose_config = docker.compose.get_config().services if compose_config is None: # pragma: no cover log.error("Got invalid compose config") compose_config = {} self.active_services = services.find_active(compose_config) self.enabled_services = services.get_services( Configuration.services_list or enabled_services, default=self.active_services, ) for service in self.enabled_services: if service not in self.active_services: print_and_exit("No such service: {}", service) log.debug("Enabled services: {}", ", ".join(self.enabled_services)) self.create_datafile(list(compose_config.keys()), self.active_services) return base_services, compose_config
[docs] def create_projectrc(self) -> None: templating = Templating() t = templating.get_template( "projectrc", { "project": Configuration.project, "hostname": Configuration.hostname, "swarm": Configuration.swarm_mode, "production": Configuration.production, "testing": Configuration.testing, "services": self.active_services, "env_variables": Configuration.environment, }, ) templating.save_template(PROJECTRC, t, force=True) Application.load_projectrc() if not self.files: log.debug("Created temporary default {} file", PROJECTRC) PROJECTRC.unlink() else: log.info("Created default {} file", PROJECTRC)
[docs] def make_env(self) -> None: try: COMPOSE_ENVIRONMENT_FILE.unlink() except FileNotFoundError: pass Application.env = Configuration.specs.get("variables", {}).get("env", {}) Application.env["PROJECT_DOMAIN"] = Configuration.hostname Application.env["COMPOSE_PROJECT_NAME"] = Configuration.project Application.env["DATA_DIR"] = str(DATA_DIR.resolve()) Application.env["SUBMODULE_DIR"] = str(SUBMODULES_DIR.resolve()) Application.env["PROJECT_DIR"] = str( PROJECT_DIR.joinpath(Configuration.project).resolve() ) if self.extended_project_path is None: Application.env["BASE_PROJECT_DIR"] = Application.env["PROJECT_DIR"] else: Application.env["BASE_PROJECT_DIR"] = str( self.extended_project_path.resolve() ) if self.extended_project is None: Application.env["EXTENDED_PROJECT"] = EXTENDED_PROJECT_DISABLED Application.env["BASE_PROJECT"] = Application.env["COMPOSE_PROJECT_NAME"] else: Application.env["EXTENDED_PROJECT"] = str(self.extended_project) Application.env["BASE_PROJECT"] = Application.env["EXTENDED_PROJECT"] Application.env["RAPYDO_VERSION"] = __version__ Application.env["BUILD"] = git.get_last_commit(Application.gits["main"]) Application.env["PROJECT_VERSION"] = Configuration.version Application.env["CURRENT_UID"] = str(self.current_uid) Application.env["CURRENT_GID"] = str(self.current_gid) Application.env["PROJECT_TITLE"] = ( Configuration.project_title or "Unknown title" ) Application.env["PROJECT_DESCRIPTION"] = ( Configuration.project_description or "Unknown description" ) Application.env["PROJECT_KEYWORDS"] = Configuration.project_keywords or "" roles_dict = Configuration.specs.get("variables", {}).get("roles", {}) roles = ",".join( [k for k, v in roles_dict.items() if v != "disabled" and k != "default"] ) Application.env["AUTH_ROLES"] = f",{roles}," if Configuration.testing and not Configuration.production: Application.env["APP_MODE"] = "test" Application.env["PYTHONMALLOC"] = "debug" Application.env["PYTHONASYNCIODEBUG"] = "1" Application.env["PYTHONFAULTHANDLER"] = "1" Application.env["CELERYBEAT_SCHEDULER"] = services.get_celerybeat_scheduler( Application.env ) if Configuration.load_frontend: if Configuration.frontend == ANGULAR: Application.env["ACTIVATE_ANGULAR"] = "1" services.check_rabbit_password(Application.env.get("RABBITMQ_PASSWORD")) services.check_redis_password(Application.env.get("REDIS_PASSWORD")) for e in Application.env: env_value = os.environ.get(e) if env_value is None: continue Application.env[e] = env_value Application.env.update(Configuration.environment) if Configuration.swarm_mode: if not Application.env.get("SWARM_MANAGER_ADDRESS"): Application.env["SWARM_MANAGER_ADDRESS"] = system.get_local_ip( Configuration.production ) if not Application.env.get("REGISTRY_HOST"): Application.env["REGISTRY_HOST"] = Application.env[ "SWARM_MANAGER_ADDRESS" ] # is None ensure empty string as a valid address # if Application.env.get("SYSLOG_ADDRESS") is None: # manager_addr = Application.env["SWARM_MANAGER_ADDRESS"] # Application.env["SYSLOG_ADDRESS"] = f"tcp://{manager_addr}:514" if Configuration.FORCE_COMPOSE_ENGINE or not Configuration.swarm_mode: DEPLOY_ENGINE = "compose" else: DEPLOY_ENGINE = "swarm" Application.env["DEPLOY_ENGINE"] = DEPLOY_ENGINE # Unfortunately this will only work after the creation of the network # i.e. will be fallen back to 127.0.0.1 the first time try: docker_network = docker.network.inspect( f"{Configuration.project}_{DEPLOY_ENGINE}_default" ) if docker_network.ipam.config: DOCKER_SUBNET = docker_network.ipam.config[0]["Subnet"] else: DOCKER_SUBNET = "127.0.0.1" # The first execution will fail and fallen back to localhost except DockerException: DOCKER_SUBNET = "127.0.0.1" Application.env["DOCKER_SUBNET"] = DOCKER_SUBNET FAIL2BAN_IPTABLES = "legacy" if str(Application.env["ACTIVATE_FAIL2BAN"]) == "1": iptables_version = Packages.get_bin_version("iptables", clean_output=False) nf_tables = iptables_version and "nf_tables" in iptables_version if nf_tables: FAIL2BAN_IPTABLES = "nf_tables" Application.env["FAIL2BAN_IPTABLES"] = FAIL2BAN_IPTABLES # Set Backend Python version py_version = Application.env.get("BACKEND_PYTHON_VERSION", "v3.12") py_values = configuration.BACKEND_PYTHON_VERSION_VALUES if py_version == py_values.py39.value: build_mode = "backend-legacy39" elif py_version == py_values.py310.value: build_mode = "backend-legacy310" elif py_version == py_values.py311.value: build_mode = "backend-legacy311" else: build_mode = "backend" Application.env["BACKEND_BUILD_MODE"] = build_mode # TODO: replace with removeprefix py_version = str(py_version).replace("v", "") PYTHON_PATH = f"/usr/local/lib/python{py_version}/dist-packages" Application.env["PYTHON_PATH"] = PYTHON_PATH configuration.validate_env(Application.env) log.info("Environment configuration is valid") with open(COMPOSE_ENVIRONMENT_FILE, "w+") as whandle: for key, value in sorted(Application.env.items()): if value is None: value = "" else: value = str(value) if " " in value: value = f"'{value}'" whandle.write(f"{key}={value}\n")
[docs] @staticmethod def create_datafile(services: list[str], active_services: list[str]) -> None: try: DATAFILE.unlink() except FileNotFoundError: pass data = { "submodules": [k for k, v in Application.gits.items() if v is not None], "services": active_services, "allservices": services, } with open(DATAFILE, "w+") as outfile: json.dump(data, outfile)
[docs] @staticmethod def parse_datafile(key: str) -> list[str]: try: with open(DATAFILE) as json_file: datafile = json.load(json_file) return cast(list[str], datafile.get(key, [])) except FileNotFoundError: return []
[docs] @staticmethod def autocomplete_service( ctx: click.core.Context, param: click.Parameter, incomplete: str ) -> list[str]: values = Application.parse_datafile("services") if not incomplete: return values return [x for x in values if x.startswith(incomplete)]
[docs] @staticmethod def autocomplete_allservice( ctx: click.core.Context, param: click.Parameter, incomplete: str ) -> list[str]: values = Application.parse_datafile("allservices") if not incomplete: return values return [x for x in values if x.startswith(incomplete)]
[docs] @staticmethod def autocomplete_submodule( ctx: click.core.Context, param: click.Parameter, incomplete: str ) -> list[str]: values = Application.parse_datafile("submodules") if not incomplete: return values return [x for x in values if x.startswith(incomplete)]
[docs] @staticmethod def check_placeholders_and_passwords( compose_services: ComposeServices, active_services: list[str] ) -> None: if not active_services: # pragma: no cover print_and_exit( """You have no active service \nSuggestion: to activate a top-level service edit your project_configuration and add the variable "ACTIVATE_DESIREDSERVICE: 1" """ ) elif Configuration.check: log.info( "Active services: {}", ", ".join(active_services), log_to_file=True ) extra_services: list[str] = [] if Configuration.swarm_mode and REGISTRY not in active_services: extra_services.append(REGISTRY) all_services = active_services + extra_services missing: dict[str, set[str]] = {} passwords: dict[str, str] = {} passwords_services: dict[str, set[str]] = {} for service_name in all_services: # This can happens with `rapydo run swagger` because in case of run # the controller_init method is executed without passing the service # This is because interfaces are not enabled on the base stack and the # controller_init([service]) would fail # As side effect, non-existing services are not blocked if service_name not in compose_services: continue service = compose_services[service_name] if service: if service.environment is None: log.error( "Invalid env format, cannot check placeholders and passwords" ) continue if isinstance(service.environment, list): # pragma: no cover log.error( "Unsupported env format, cannot check placeholders and passwords" ) continue for key, value in service.environment.items(): if str(value) == PLACEHOLDER: key = services.normalize_placeholder_variable(key) missing.setdefault(key, set()) missing[key].add(service_name) elif key.endswith("_PASSWORD") and value: key = services.normalize_placeholder_variable(key) passwords.setdefault(key, cast(str, value)) passwords_services.setdefault(key, set()) passwords_services[key].add(service_name) placeholders = [] for variable, raw_services in missing.items(): if variable in services.vars_to_services_mapping: serv = {services.vars_to_services_mapping[variable]} else: serv = raw_services active_serv = [s for s in serv if s in all_services] if active_serv: placeholders.append([variable, ", ".join(active_serv)]) MIN_PASSWORD_SCORE = int( Application.env.get("MIN_PASSWORD_SCORE", 2) # type: ignore ) for variable, raw_services in passwords_services.items(): if variable in services.vars_to_services_mapping: serv = {services.vars_to_services_mapping[variable]} else: serv = raw_services active_serv = [s for s in serv if s in all_services] if active_serv: password = passwords.get(variable) result = zxcvbn(password) score = result["score"] if score < MIN_PASSWORD_SCORE: if score == MIN_PASSWORD_SCORE - 1: log.warning("The password used in {} is weak", variable) elif score == MIN_PASSWORD_SCORE - 2: log.error("The password used in {} is very weak", variable) else: log.critical( "The password used in {} is extremely weak", variable ) if placeholders: log.critical("The following variables are missing in your configuration:") print_table( ["VARIABLE", "SERVICE(S)"], placeholders, table_title="Missing variables", ) log.info("You can fix this error by updating your .projectrc file") sys.exit(1) return None
[docs] @staticmethod def git_update(ignore_submodule: list[str]) -> None: for name, gitobj in Application.gits.items(): if name in ignore_submodule: log.debug("Skipping update on {}", name) continue if gitobj and not git.can_be_updated(name, gitobj): print_and_exit("Can't continue with updates") controller_is_updated = False for name, gitobj in Application.gits.items(): if name in ignore_submodule: continue if name == "do": controller_is_updated = True if gitobj: git.update(name, gitobj) if controller_is_updated: installation_path = Packages.get_installation_path("rapydo") # Can't be tested on GA since rapydo is alway installed from a folder if not installation_path: # pragma: no cover log.warning( "Controller is not installed in editable mode, " "rapydo is unable to update it" ) elif Application.gits["do"].working_dir: if installation_path.is_symlink(): installation_path = installation_path.resolve() do_dir = Path(Application.gits["do"].working_dir) if do_dir.is_symlink(): do_dir = do_dir.readlink() if do_dir == installation_path: log.info( "Controller installed from {} and updated", installation_path ) else: log.warning( "Controller not updated because it is installed outside this " "project. Installation path is {}, the current folder is {}", installation_path, do_dir, ) else: # pragma: no cover log.warning("Controller submodule folder can't be found")
[docs] @staticmethod def git_checks(ignore_submodule: list[str]) -> None: for name, gitobj in Application.gits.items(): if name in ignore_submodule: log.debug("Skipping checks on {}", name) continue if gitobj: git.check_updates(name, gitobj) git.check_unstaged(name, gitobj)