"""
Main Application and Configuration module
"""
import enum
import json
import os
import shutil
import sys
import time
import warnings
from collections.abc import Iterable
from pathlib import Path
from typing import Any, Optional, TypedDict, Union, cast
import click
import requests
import typer
from git import Repo as GitRepo
from packaging.version import Version
from python_on_whales import docker
from python_on_whales.utils import DockerException
from zxcvbn import zxcvbn # type: ignore
from controller import (
COMPOSE_ENVIRONMENT_FILE,
CONFS_DIR,
CONTAINERS_YAML_DIRNAME,
DATA_DIR,
DATAFILE,
EXTENDED_PROJECT_DISABLED,
PLACEHOLDER,
PROJECT_DIR,
PROJECTRC,
RED,
REGISTRY,
SUBMODULES_DIR,
ComposeServices,
EnvType,
__version__,
log,
print_and_exit,
)
from controller.commands import load_commands
from controller.packages import Packages
from controller.project import ANGULAR, NO_FRONTEND, Project
from controller.templating import Templating
from controller.utilities import configuration, git, services, system
from controller.utilities.tables import print_table
ROOT_UID = 0
BASE_UID = 1000
CommandParameter = Union[int, str, bool, None, Path, Iterable[str], enum.Enum]
[docs]
class ProjectRCType(TypedDict, total=False):
project: str
stack: str
hostname: str
production: bool
swarm: bool
project_configuration: configuration.Configuration
[docs]
class Configuration:
projectrc: ProjectRCType = {}
host_configuration: configuration.Configuration = {}
swarm_mode: bool = False
# This is the final configuration (defaults + project + projectrc)
specs: configuration.Configuration = {}
services_list: Optional[str]
environment: dict[str, str]
action: Optional[str] = None
parameters: list[str] = []
production: bool = False
testing: bool = False
project: str = ""
frontend: Optional[str] = None
hostname: str = ""
remote_engine: Optional[str] = None
stack: str = ""
load_backend: bool = False
load_frontend: bool = False
load_commons: bool = False
version: str = ""
rapydo_version: str = ""
project_title: Optional[str] = None
project_description: Optional[str] = None
project_keywords: Optional[str] = None
initialize: bool = False
update: bool = False
check: bool = False
install: bool = False
print_version: bool = False
create: bool = False
# It will be replaced with PROJECT_DIR/project
ABS_PROJECT_PATH: Path = PROJECT_DIR
# used by single-container commands (i.e. run) in swarm mode
FORCE_COMPOSE_ENGINE: bool = False
[docs]
@staticmethod
def set_action(action: Optional[str], params: dict[str, Any]) -> None:
Configuration.action = action
Configuration.initialize = Configuration.action == "init"
Configuration.update = Configuration.action == "update"
Configuration.check = Configuration.action == "check"
Configuration.install = Configuration.action == "install"
Configuration.print_version = Configuration.action == "version"
Configuration.create = Configuration.action == "create"
params.pop("version")
# This will start to fail when this parameter will be dropped
params.pop("services_list")
Configuration.parameters = []
project = params.pop("project")
if project and project != Configuration.projectrc.get("project"):
Configuration.parameters.append(f"--project {project}")
hostname = params.pop("hostname")
if hostname != "localhost" and hostname != Configuration.projectrc.get(
"hostname"
):
Configuration.parameters.append(f"--hostname {hostname}")
stack = params.pop("stack")
if stack and stack != Configuration.projectrc.get("stack"):
Configuration.parameters.append(f"--stack {stack}")
if params.pop("production"):
Configuration.parameters.append("--production")
if params.pop("testing"):
Configuration.parameters.append("--testing")
environment = params.pop("environment")
if environment:
for e in environment:
Configuration.parameters.append(f"--env {e}")
remote_engine = params.pop("remote_engine")
if remote_engine and stack:
Configuration.parameters.append(f"--remote {remote_engine}")
if params.pop("no_backend"):
Configuration.parameters.append("--no-backend")
if params.pop("no_frontend"):
Configuration.parameters.append("--no-frontend")
if params.pop("no_commons"):
Configuration.parameters.append("--no-commons")
if params:
log.warning("Found unknown parameters: {}", params)
[docs]
def projectrc_values(
ctx: typer.Context, param: typer.CallbackParam, value: str
) -> Optional[str]:
if ctx.resilient_parsing: # pragma: no cover
return None
if not param.name or value != param.get_default(ctx):
return value
# This cast is not correct... but enough for this callback..
from_projectrc = cast(Optional[str], Configuration.projectrc.get(param.name))
if from_projectrc is not None:
return from_projectrc
return value
[docs]
def version_callback(value: bool) -> None:
if value:
typer.echo(f"rapydo version: {__version__}")
raise typer.Exit()
[docs]
def controller_cli_options(
ctx: typer.Context,
project: str = typer.Option(
None,
"--project",
"-p",
help="Name of the project",
callback=projectrc_values,
),
# Deprecated since 2.1
services_list: Optional[str] = typer.Option(
None,
"-s",
help="Comma separated list of services to be included",
callback=projectrc_values,
),
hostname: str = typer.Option(
"localhost",
"--hostname",
"-H",
help="Hostname of the current machine",
callback=projectrc_values,
show_default=False,
),
stack: str = typer.Option(
None,
"--stack",
help="Docker-compose stack to be loaded",
callback=projectrc_values,
),
production: bool = typer.Option(
False,
"--production",
"--prod",
help="Enable production mode",
callback=projectrc_values,
show_default=False,
),
testing: bool = typer.Option(
False,
"--testing",
"--test",
help="Enable test mode",
callback=projectrc_values,
envvar="TESTING",
show_default=False,
),
environment: list[str] = typer.Option(
[],
"--env",
"-e",
help="Temporary change the value of an environment variable",
),
remote_engine: Optional[str] = typer.Option(
None,
"--remote",
help="Execute commands on a remote host",
),
no_backend: bool = typer.Option(
False,
"--no-backend",
help="Exclude backend configuration",
callback=projectrc_values,
show_default=False,
),
no_frontend: bool = typer.Option(
False,
"--no-frontend",
help="Exclude frontend configuration",
callback=projectrc_values,
show_default=False,
),
no_commons: bool = typer.Option(
False,
"--no-commons",
help="Exclude project common configuration",
callback=projectrc_values,
show_default=False,
),
version: bool = typer.Option(
False,
"--version",
"-v",
help="Print version information and quit",
show_default=False,
callback=version_callback,
is_eager=True,
),
) -> None:
# This is needed because during tests both Application and Configuration
# are persistent and after a run (i.e. after starting the registry)
# FORCE_COMPOSE_ENGINE remains enabled
# It would not be needed during the normal use of the controller
Configuration.FORCE_COMPOSE_ENGINE = False
Configuration.set_action(ctx.invoked_subcommand, ctx.params)
# Deprecated since 2.1
if services_list:
warnings.warn("-s is replaced by rapydo <command> service")
time.sleep(1)
Configuration.services_list = services_list
Configuration.production = production
Configuration.testing = testing
Configuration.project = project
Configuration.hostname = hostname
Configuration.remote_engine = remote_engine
Configuration.environment = {}
for e in environment:
if "=" not in e:
print_and_exit("Invalid enviroment, missing value in {}", e)
key, value = e.split("=")
Configuration.environment[key] = value
if stack:
Configuration.stack = stack
else:
Configuration.stack = "production" if production else "development"
Configuration.load_backend = not no_backend
Configuration.load_frontend = not no_frontend
Configuration.load_commons = not no_commons
# Temporary fix to ease migration to typer
[docs]
class CommandsData:
[docs]
def __init__(
self,
files: list[Path],
base_files: list[Path],
services: list[str],
active_services: list[str],
base_services: ComposeServices,
compose_config: ComposeServices,
):
self.files = files
self.base_files = base_files
self.services = services
self.active_services = active_services or []
self.base_services = base_services
self.compose_config = compose_config
core_services = list(base_services.keys())
all_services = list(compose_config.keys())
self.custom_services: list[str] = [
s for s in all_services if s not in core_services
]
[docs]
class Application:
# Typer app
# Register callback with CLI options and basic initialization/checks
app = typer.Typer(
callback=controller_cli_options,
context_settings={"help_option_names": ["--help", "-h"]},
)
# controller app
controller: Optional["Application"] = None
project_scaffold = Project()
data: CommandsData
gits: dict[str, GitRepo] = {}
env: dict[str, EnvType] = {}
base_services: ComposeServices
compose_config: ComposeServices
[docs]
def __init__(self) -> None:
Application.controller = self
self.active_services: list[str] = []
self.files: list[Path] = []
self.base_files: list[Path] = []
self.services = None
self.enabled_services: list[str] = []
if not PROJECT_DIR.is_dir():
project_dir = None
else:
project_dir = Application.project_scaffold.get_project(
Configuration.projectrc.get("project"), ignore_multiples=True
)
load_commands(project_dir)
Application.load_projectrc()
[docs]
@staticmethod
def serialize_parameter(
param: str,
value: CommandParameter,
IF: CommandParameter = True,
) -> Optional[str]:
if isinstance(value, enum.Enum):
value = value.value
if IF and value is not None:
if isinstance(value, bool):
return f"{param}"
if isinstance(value, tuple) or isinstance(value, list):
return " ".join([f"{param} {v}" for v in value])
# Options => (--param value)
if param:
return f"{param} {value}"
# Arguments ( => no param, only a value)
return str(value)
return None
[docs]
@staticmethod
def print_command(*parameters: Optional[str]) -> None:
pre_params = " ".join(
[p for p in Configuration.parameters if p is not None]
).strip()
post_params = " ".join([p for p in parameters if p is not None]).strip()
if pre_params:
pre_params = f"{pre_params} "
if post_params:
post_params = f" {post_params}"
log.debug(
"Command: rapydo {}{}{}",
pre_params,
Configuration.action,
post_params,
log_to_file=True,
)
[docs]
@staticmethod
def get_controller() -> "Application":
if not Application.controller: # pragma: no cover
raise AttributeError("Application.controller not initialized")
return Application.controller
[docs]
def controller_init(self, services: Optional[Iterable[str]] = None) -> None:
if Configuration.create:
Application.check_installed_software()
return None
main_folder_error = Application.project_scaffold.check_main_folder()
if main_folder_error:
print_and_exit(main_folder_error)
if not Configuration.print_version:
Application.check_installed_software()
# if project is None, it is retrieve by project folder
Configuration.project = Application.project_scaffold.get_project(
Configuration.project
)
Configuration.ABS_PROJECT_PATH = PROJECT_DIR.joinpath(Configuration.project)
if Configuration.print_version:
self.read_specs(read_extended=True)
return None
log.debug("You are using RAPyDo version {}", __version__)
if Configuration.check:
log.info("Selected project: {}", Configuration.project)
else:
log.debug("Selected project: {}", Configuration.project)
if (
Configuration.initialize
or Configuration.update
or Configuration.check
or Configuration.install
):
Application.check_internet_connection()
if Configuration.install:
self.read_specs(read_extended=False)
return None
# Auth is not available yet, will be read by read_specs
Application.project_scaffold.load_project_scaffold(
Configuration.project, auth=None
)
Application.preliminary_version_check()
# read project configuration
self.read_specs(read_extended=True)
# from read_specs
Application.project_scaffold.load_frontend_scaffold(Configuration.frontend)
Application.verify_rapydo_version()
Application.project_scaffold.inspect_project_folder()
self.current_uid = system.get_current_uid()
self.current_gid = system.get_current_gid()
# Cannot be tested
if self.current_uid == ROOT_UID: # pragma: no cover
self.current_uid = BASE_UID
log.warning("Current user is 'root'")
else:
os_user = system.get_username(self.current_uid)
log.debug("Current UID: {} ({})", self.current_uid, os_user)
log.debug("Current GID: {}", self.current_gid)
if Configuration.initialize:
return None
Application.git_submodules()
if Configuration.update:
return None
self.make_env()
# Compose services and variables
base_services, compose_config = self.get_compose_configuration(services)
if Configuration.action != "password":
self.check_placeholders_and_passwords(compose_config, self.enabled_services)
Application.data = CommandsData(
files=self.files,
base_files=self.base_files,
services=self.enabled_services,
active_services=self.active_services,
base_services=base_services,
compose_config=compose_config,
)
return None
[docs]
@staticmethod
def load_projectrc() -> None:
projectrc_yaml = cast(
ProjectRCType,
configuration.load_yaml_file(file=PROJECTRC, is_optional=True),
)
Configuration.host_configuration = projectrc_yaml.pop(
"project_configuration", {}
)
Configuration.projectrc = projectrc_yaml
Configuration.swarm_mode = (
Configuration.projectrc.get("swarm", False)
or os.environ.get("SWARM_MODE", "0") == "1"
)
[docs]
@staticmethod
def check_installed_software() -> None:
log.debug(
"python version: {}.{}.{}",
sys.version_info.major,
sys.version_info.minor,
sys.version_info.micro,
)
# 17.05 added support for multi-stage builds
# https://docs.docker.com/compose/compose-file/compose-file-v3/#compose-and-docker-compatibility-matrix
# 18.09.2 fixed the CVE-2019-5736 vulnerability
# 20.10.0 introduced copy --chmod and improved logging
Packages.check_program(
"docker", min_version="20.10.0", min_recommended_version="20.10.0"
)
if docker.compose.is_installed():
# too slow to verify the version on every commands... near half a seconds
# Sometimes a couple of seconds!
# v = docker.compose.version()
# log.debug("docker compose is installed: {}", v)
log.debug("docker compose is installed")
else: # pragma: no cover
print_and_exit(
"A mandatory dependency is missing: docker compose not found"
"\nInstallation guide: "
"https://docs.docker.com/compose/cli-command/#installing-compose-v2"
"\nor try the automated installation with {command}",
command=RED("rapydo install compose"),
)
# no need to check the git executable, because alredy verified by GitPython
# in case of missing git GitPython will fail and this check will never executed
# Packages.check_program("git")
[docs]
def read_specs(self, read_extended: bool = True) -> None:
"""Read project configuration"""
try:
confs = configuration.read_configuration(
default_file_path=CONFS_DIR,
base_project_path=Configuration.ABS_PROJECT_PATH,
projects_path=PROJECT_DIR,
submodules_path=SUBMODULES_DIR,
read_extended=read_extended,
production=Configuration.production,
)
# confs 3 is the core config, extra fields are allowd
configuration.validate_configuration(confs[3], core=True)
# confs 0 is the merged conf core + custom, extra fields are allowd
configuration.validate_configuration(confs[0], core=False)
log.info("Project configuration is valid")
Configuration.specs = configuration.mix_configuration(
confs[0], Configuration.host_configuration
)
configuration.validate_configuration(Configuration.specs, core=False)
log.info("Host configuration is valid")
self.extended_project = confs[1]
self.extended_project_path = confs[2]
except AttributeError as e: # pragma: no cover
print_and_exit(str(e))
Configuration.frontend = cast(
str,
(
Configuration.specs.get("variables", {})
.get("env", {})
.get("FRONTEND_FRAMEWORK", NO_FRONTEND)
),
)
if Configuration.frontend == NO_FRONTEND:
Configuration.frontend = None
project = Configuration.specs.get("project", {})
Configuration.project_title = project.get("title", "Unknown title")
Configuration.version = project.get("version", "")
Configuration.rapydo_version = project.get("rapydo", "")
Configuration.project_description = project.get(
"description", "Unknown description"
)
Configuration.project_keywords = project.get("keywords", "")
if not Configuration.rapydo_version: # pragma: no cover
print_and_exit(
"RAPyDo version not found in your project_configuration file"
)
Configuration.rapydo_version = str(Configuration.rapydo_version)
[docs]
@staticmethod
def preliminary_version_check() -> None:
specs = configuration.load_yaml_file(
file=Configuration.ABS_PROJECT_PATH.joinpath(
configuration.PROJECT_CONF_FILENAME
)
)
Application.verify_rapydo_version(
rapydo_version=specs.get("project", {}).get("rapydo", "")
)
[docs]
@staticmethod
def verify_rapydo_version(rapydo_version: str = "") -> bool:
"""
Verify if the installed controller matches the current project requirement
"""
if not rapydo_version:
rapydo_version = Configuration.rapydo_version
if not rapydo_version: # pragma: no cover
return True
r = Version(rapydo_version)
c = Version(__version__)
if r == c:
return True
else: # pragma: no cover
if r > c:
ac = f"Upgrade your controller to version {r}"
else:
ac = f"Downgrade your controller to version {r} or upgrade your project"
msg = f"""RAPyDo version is not compatible.
This project requires RAPyDo {r} but you are using version {c}. {ac}
You can use of one:
- rapydo install (install in editable from submodules/do)
- rapydo install --no-editable (install from pypi)
"""
print_and_exit(msg)
[docs]
@staticmethod
def check_internet_connection() -> None:
"""Check if connected to internet"""
try:
requests.get("https://www.google.com", timeout=2)
if Configuration.check:
log.info("Internet connection is available")
except requests.ConnectionError: # pragma: no cover
print_and_exit("Internet connection is unavailable")
[docs]
@staticmethod
def working_clone(
name: str, repo: configuration.Submodule, from_path: Optional[Path] = None
) -> Optional[GitRepo]:
# substitute values starting with '$$'
myvars = {
ANGULAR: Configuration.frontend == ANGULAR,
}
condition = repo.get("_if", "")
if condition.startswith("$$"):
# Is this repo enabled?
if not myvars.get(condition.lstrip("$"), None):
return None
default_version = (
Configuration.rapydo_version
if Configuration.rapydo_version
else __version__
)
if from_path is not None:
local_path = from_path.joinpath(name)
if not local_path.exists():
print_and_exit("Submodule {} not found in {}", name, local_path)
submodule_path = Path(SUBMODULES_DIR, name)
if submodule_path.exists():
log.info("Path {} already exists, removing", submodule_path)
if submodule_path.is_dir() and not submodule_path.is_symlink():
shutil.rmtree(submodule_path)
else:
submodule_path.unlink()
os.symlink(local_path, submodule_path)
url = repo.get("online_url")
if not url: # pragma: no cover
print_and_exit("Submodule misconfiguration, online url not found: {}", name)
return git.clone(
url=url,
path=Path(name),
branch=repo.get("branch") or default_version,
do=Configuration.initialize,
check=not Configuration.install,
)
[docs]
@staticmethod
def git_submodules(from_path: Optional[Path] = None) -> None:
"""Check and/or clone git projects"""
submodules = (
Configuration.specs.get("variables", {}).get("submodules", {}).copy()
)
main_repo = git.get_repo(".")
# This is to reassure mypy, but this is check is already done
# in preliminary checks, so it can never happen
if not main_repo: # pragma: no cover
print_and_exit("Current folder is not a git main_repository")
Application.gits["main"] = main_repo
for name, submodule in submodules.items():
repo = Application.working_clone(name, submodule, from_path=from_path)
if repo:
Application.gits[name] = repo
[docs]
def get_compose_configuration(
self, enabled_services: Optional[Iterable[str]] = None
) -> tuple[ComposeServices, ComposeServices]:
compose_files: list[Path] = []
MODE = f"{Configuration.stack}.yml"
customconf = Configuration.ABS_PROJECT_PATH.joinpath(CONTAINERS_YAML_DIRNAME)
angular_loaded = False
def add(p: Path, f: str) -> None:
compose_files.append(p.joinpath(f))
if Configuration.load_backend:
add(CONFS_DIR, "backend.yml")
if Configuration.load_frontend:
if Configuration.frontend == ANGULAR:
add(CONFS_DIR, "angular.yml")
angular_loaded = True
if (
Configuration.swarm_mode
and Configuration.production
and not Configuration.FORCE_COMPOSE_ENGINE
):
add(CONFS_DIR, "swarm_angular_prod_options.yml")
if Configuration.swarm_mode and not Configuration.FORCE_COMPOSE_ENGINE:
add(CONFS_DIR, "swarm_options.yml")
if Application.env.get("NFS_HOST"):
log.info("NFS Server is enabled")
add(CONFS_DIR, "volumes_nfs.yml")
else:
add(CONFS_DIR, "volumes_local.yml")
if Configuration.production:
add(CONFS_DIR, "production.yml")
else:
add(CONFS_DIR, "development.yml")
if angular_loaded:
add(CONFS_DIR, "angular-development.yml")
if self.extended_project and self.extended_project_path:
extendedconf = self.extended_project_path.joinpath(CONTAINERS_YAML_DIRNAME)
# Only added if exists, this is the only non mandatory conf file
extended_mode_conf = extendedconf.joinpath(MODE)
if extended_mode_conf.exists():
compose_files.append(extended_mode_conf)
if Configuration.load_commons:
add(extendedconf, "commons.yml")
if Configuration.load_commons:
add(customconf, "commons.yml")
add(customconf, MODE)
# Read necessary files
self.files, self.base_files = configuration.read_composer_yamls(compose_files)
# to build the config with files and variables
from controller.deploy.docker import Docker
docker = Docker(
compose_files=self.base_files, verify_swarm=not Configuration.initialize
)
base_services = docker.compose.get_config().services
if base_services is None: # pragma: no cover
log.error("Got invalid compose base config")
base_services = {}
docker = Docker(
compose_files=self.files, verify_swarm=not Configuration.initialize
)
compose_config = docker.compose.get_config().services
if compose_config is None: # pragma: no cover
log.error("Got invalid compose config")
compose_config = {}
self.active_services = services.find_active(compose_config)
self.enabled_services = services.get_services(
Configuration.services_list or enabled_services,
default=self.active_services,
)
for service in self.enabled_services:
if service not in self.active_services:
print_and_exit("No such service: {}", service)
log.debug("Enabled services: {}", ", ".join(self.enabled_services))
self.create_datafile(list(compose_config.keys()), self.active_services)
return base_services, compose_config
[docs]
def create_projectrc(self) -> None:
templating = Templating()
t = templating.get_template(
"projectrc",
{
"project": Configuration.project,
"hostname": Configuration.hostname,
"swarm": Configuration.swarm_mode,
"production": Configuration.production,
"testing": Configuration.testing,
"services": self.active_services,
"env_variables": Configuration.environment,
},
)
templating.save_template(PROJECTRC, t, force=True)
Application.load_projectrc()
if not self.files:
log.debug("Created temporary default {} file", PROJECTRC)
PROJECTRC.unlink()
else:
log.info("Created default {} file", PROJECTRC)
[docs]
def make_env(self) -> None:
try:
COMPOSE_ENVIRONMENT_FILE.unlink()
except FileNotFoundError:
pass
Application.env = Configuration.specs.get("variables", {}).get("env", {})
Application.env["PROJECT_DOMAIN"] = Configuration.hostname
Application.env["COMPOSE_PROJECT_NAME"] = Configuration.project
Application.env["DATA_DIR"] = str(DATA_DIR.resolve())
Application.env["SUBMODULE_DIR"] = str(SUBMODULES_DIR.resolve())
Application.env["PROJECT_DIR"] = str(
PROJECT_DIR.joinpath(Configuration.project).resolve()
)
if self.extended_project_path is None:
Application.env["BASE_PROJECT_DIR"] = Application.env["PROJECT_DIR"]
else:
Application.env["BASE_PROJECT_DIR"] = str(
self.extended_project_path.resolve()
)
if self.extended_project is None:
Application.env["EXTENDED_PROJECT"] = EXTENDED_PROJECT_DISABLED
Application.env["BASE_PROJECT"] = Application.env["COMPOSE_PROJECT_NAME"]
else:
Application.env["EXTENDED_PROJECT"] = str(self.extended_project)
Application.env["BASE_PROJECT"] = Application.env["EXTENDED_PROJECT"]
Application.env["RAPYDO_VERSION"] = __version__
Application.env["BUILD"] = git.get_last_commit(Application.gits["main"])
Application.env["PROJECT_VERSION"] = Configuration.version
Application.env["CURRENT_UID"] = str(self.current_uid)
Application.env["CURRENT_GID"] = str(self.current_gid)
Application.env["PROJECT_TITLE"] = (
Configuration.project_title or "Unknown title"
)
Application.env["PROJECT_DESCRIPTION"] = (
Configuration.project_description or "Unknown description"
)
Application.env["PROJECT_KEYWORDS"] = Configuration.project_keywords or ""
roles_dict = Configuration.specs.get("variables", {}).get("roles", {})
roles = ",".join(
[k for k, v in roles_dict.items() if v != "disabled" and k != "default"]
)
Application.env["AUTH_ROLES"] = f",{roles},"
if Configuration.testing and not Configuration.production:
Application.env["APP_MODE"] = "test"
Application.env["PYTHONMALLOC"] = "debug"
Application.env["PYTHONASYNCIODEBUG"] = "1"
Application.env["PYTHONFAULTHANDLER"] = "1"
Application.env["CELERYBEAT_SCHEDULER"] = services.get_celerybeat_scheduler(
Application.env
)
if Configuration.load_frontend:
if Configuration.frontend == ANGULAR:
Application.env["ACTIVATE_ANGULAR"] = "1"
services.check_rabbit_password(Application.env.get("RABBITMQ_PASSWORD"))
services.check_redis_password(Application.env.get("REDIS_PASSWORD"))
for e in Application.env:
env_value = os.environ.get(e)
if env_value is None:
continue
Application.env[e] = env_value
Application.env.update(Configuration.environment)
if Configuration.swarm_mode:
if not Application.env.get("SWARM_MANAGER_ADDRESS"):
Application.env["SWARM_MANAGER_ADDRESS"] = system.get_local_ip(
Configuration.production
)
if not Application.env.get("REGISTRY_HOST"):
Application.env["REGISTRY_HOST"] = Application.env[
"SWARM_MANAGER_ADDRESS"
]
# is None ensure empty string as a valid address
# if Application.env.get("SYSLOG_ADDRESS") is None:
# manager_addr = Application.env["SWARM_MANAGER_ADDRESS"]
# Application.env["SYSLOG_ADDRESS"] = f"tcp://{manager_addr}:514"
if Configuration.FORCE_COMPOSE_ENGINE or not Configuration.swarm_mode:
DEPLOY_ENGINE = "compose"
else:
DEPLOY_ENGINE = "swarm"
Application.env["DEPLOY_ENGINE"] = DEPLOY_ENGINE
# Unfortunately this will only work after the creation of the network
# i.e. will be fallen back to 127.0.0.1 the first time
try:
docker_network = docker.network.inspect(
f"{Configuration.project}_{DEPLOY_ENGINE}_default"
)
if docker_network.ipam.config:
DOCKER_SUBNET = docker_network.ipam.config[0]["Subnet"]
else:
DOCKER_SUBNET = "127.0.0.1"
# The first execution will fail and fallen back to localhost
except DockerException:
DOCKER_SUBNET = "127.0.0.1"
Application.env["DOCKER_SUBNET"] = DOCKER_SUBNET
FAIL2BAN_IPTABLES = "legacy"
if str(Application.env["ACTIVATE_FAIL2BAN"]) == "1":
iptables_version = Packages.get_bin_version("iptables", clean_output=False)
nf_tables = iptables_version and "nf_tables" in iptables_version
if nf_tables:
FAIL2BAN_IPTABLES = "nf_tables"
Application.env["FAIL2BAN_IPTABLES"] = FAIL2BAN_IPTABLES
# Set Backend Python version
py_version = Application.env.get("BACKEND_PYTHON_VERSION", "v3.12")
py_values = configuration.BACKEND_PYTHON_VERSION_VALUES
if py_version == py_values.py39.value:
build_mode = "backend-legacy39"
elif py_version == py_values.py310.value:
build_mode = "backend-legacy310"
elif py_version == py_values.py311.value:
build_mode = "backend-legacy311"
else:
build_mode = "backend"
Application.env["BACKEND_BUILD_MODE"] = build_mode
# TODO: replace with removeprefix
py_version = str(py_version).replace("v", "")
PYTHON_PATH = f"/usr/local/lib/python{py_version}/dist-packages"
Application.env["PYTHON_PATH"] = PYTHON_PATH
configuration.validate_env(Application.env)
log.info("Environment configuration is valid")
with open(COMPOSE_ENVIRONMENT_FILE, "w+") as whandle:
for key, value in sorted(Application.env.items()):
if value is None:
value = ""
else:
value = str(value)
if " " in value:
value = f"'{value}'"
whandle.write(f"{key}={value}\n")
[docs]
@staticmethod
def create_datafile(services: list[str], active_services: list[str]) -> None:
try:
DATAFILE.unlink()
except FileNotFoundError:
pass
data = {
"submodules": [k for k, v in Application.gits.items() if v is not None],
"services": active_services,
"allservices": services,
}
with open(DATAFILE, "w+") as outfile:
json.dump(data, outfile)
[docs]
@staticmethod
def parse_datafile(key: str) -> list[str]:
try:
with open(DATAFILE) as json_file:
datafile = json.load(json_file)
return cast(list[str], datafile.get(key, []))
except FileNotFoundError:
return []
[docs]
@staticmethod
def autocomplete_service(
ctx: click.core.Context, param: click.Parameter, incomplete: str
) -> list[str]:
values = Application.parse_datafile("services")
if not incomplete:
return values
return [x for x in values if x.startswith(incomplete)]
[docs]
@staticmethod
def autocomplete_allservice(
ctx: click.core.Context, param: click.Parameter, incomplete: str
) -> list[str]:
values = Application.parse_datafile("allservices")
if not incomplete:
return values
return [x for x in values if x.startswith(incomplete)]
[docs]
@staticmethod
def autocomplete_submodule(
ctx: click.core.Context, param: click.Parameter, incomplete: str
) -> list[str]:
values = Application.parse_datafile("submodules")
if not incomplete:
return values
return [x for x in values if x.startswith(incomplete)]
[docs]
@staticmethod
def check_placeholders_and_passwords(
compose_services: ComposeServices, active_services: list[str]
) -> None:
if not active_services: # pragma: no cover
print_and_exit(
"""You have no active service
\nSuggestion: to activate a top-level service edit your project_configuration
and add the variable "ACTIVATE_DESIREDSERVICE: 1"
"""
)
elif Configuration.check:
log.info(
"Active services: {}", ", ".join(active_services), log_to_file=True
)
extra_services: list[str] = []
if Configuration.swarm_mode and REGISTRY not in active_services:
extra_services.append(REGISTRY)
all_services = active_services + extra_services
missing: dict[str, set[str]] = {}
passwords: dict[str, str] = {}
passwords_services: dict[str, set[str]] = {}
for service_name in all_services:
# This can happens with `rapydo run swagger` because in case of run
# the controller_init method is executed without passing the service
# This is because interfaces are not enabled on the base stack and the
# controller_init([service]) would fail
# As side effect, non-existing services are not blocked
if service_name not in compose_services:
continue
service = compose_services[service_name]
if service:
if service.environment is None:
log.error(
"Invalid env format, cannot check placeholders and passwords"
)
continue
if isinstance(service.environment, list): # pragma: no cover
log.error(
"Unsupported env format, cannot check placeholders and passwords"
)
continue
for key, value in service.environment.items():
if str(value) == PLACEHOLDER:
key = services.normalize_placeholder_variable(key)
missing.setdefault(key, set())
missing[key].add(service_name)
elif key.endswith("_PASSWORD") and value:
key = services.normalize_placeholder_variable(key)
passwords.setdefault(key, cast(str, value))
passwords_services.setdefault(key, set())
passwords_services[key].add(service_name)
placeholders = []
for variable, raw_services in missing.items():
if variable in services.vars_to_services_mapping:
serv = {services.vars_to_services_mapping[variable]}
else:
serv = raw_services
active_serv = [s for s in serv if s in all_services]
if active_serv:
placeholders.append([variable, ", ".join(active_serv)])
MIN_PASSWORD_SCORE = int(
Application.env.get("MIN_PASSWORD_SCORE", 2) # type: ignore
)
for variable, raw_services in passwords_services.items():
if variable in services.vars_to_services_mapping:
serv = {services.vars_to_services_mapping[variable]}
else:
serv = raw_services
active_serv = [s for s in serv if s in all_services]
if active_serv:
password = passwords.get(variable)
result = zxcvbn(password)
score = result["score"]
if score < MIN_PASSWORD_SCORE:
if score == MIN_PASSWORD_SCORE - 1:
log.warning("The password used in {} is weak", variable)
elif score == MIN_PASSWORD_SCORE - 2:
log.error("The password used in {} is very weak", variable)
else:
log.critical(
"The password used in {} is extremely weak", variable
)
if placeholders:
log.critical("The following variables are missing in your configuration:")
print_table(
["VARIABLE", "SERVICE(S)"],
placeholders,
table_title="Missing variables",
)
log.info("You can fix this error by updating your .projectrc file")
sys.exit(1)
return None
[docs]
@staticmethod
def git_update(ignore_submodule: list[str]) -> None:
for name, gitobj in Application.gits.items():
if name in ignore_submodule:
log.debug("Skipping update on {}", name)
continue
if gitobj and not git.can_be_updated(name, gitobj):
print_and_exit("Can't continue with updates")
controller_is_updated = False
for name, gitobj in Application.gits.items():
if name in ignore_submodule:
continue
if name == "do":
controller_is_updated = True
if gitobj:
git.update(name, gitobj)
if controller_is_updated:
installation_path = Packages.get_installation_path("rapydo")
# Can't be tested on GA since rapydo is alway installed from a folder
if not installation_path: # pragma: no cover
log.warning(
"Controller is not installed in editable mode, "
"rapydo is unable to update it"
)
elif Application.gits["do"].working_dir:
if installation_path.is_symlink():
installation_path = installation_path.resolve()
do_dir = Path(Application.gits["do"].working_dir)
if do_dir.is_symlink():
do_dir = do_dir.readlink()
if do_dir == installation_path:
log.info(
"Controller installed from {} and updated", installation_path
)
else:
log.warning(
"Controller not updated because it is installed outside this "
"project. Installation path is {}, the current folder is {}",
installation_path,
do_dir,
)
else: # pragma: no cover
log.warning("Controller submodule folder can't be found")
[docs]
@staticmethod
def git_checks(ignore_submodule: list[str]) -> None:
for name, gitobj in Application.gits.items():
if name in ignore_submodule:
log.debug("Skipping checks on {}", name)
continue
if gitobj:
git.check_updates(name, gitobj)
git.check_unstaged(name, gitobj)