import sys
from pathlib import Path
from typing import Any, Optional, Union
import yaml
from python_on_whales.components.compose.models import ComposeConfig
from python_on_whales.utils import DockerException
from controller import (
COMPOSE_FILE,
COMPOSE_FILE_VERSION,
RED,
REGISTRY,
log,
print_and_exit,
)
from controller.app import Configuration
from controller.deploy.docker import Docker
from controller.utilities import system
from controller.utilities.tables import print_table
Port = Union[str, int]
PortMapping = tuple[Port, Port]
PortRangeMapping = tuple[Port, Port, str]
# Starting from v2.0.0 _ is replaced by -
COMPOSE_SEP = "-"
[docs]
class Compose:
[docs]
def __init__(self, docker: Docker) -> None:
self.docker_wrapper = docker
self.docker = self.docker_wrapper.client
[docs]
def get_config(self) -> ComposeConfig:
# return type is Union[ComposeConfig, dict[str, Any]] based on return_json
return self.docker.compose.config(return_json=False) # type: ignore
[docs]
def get_config_json(self) -> dict[str, Any]:
# return type is Union[ComposeConfig, dict[str, Any]] based on return_json
return self.docker.compose.config(return_json=True) # type: ignore
[docs]
@staticmethod
def create_local_path(path: Path, label: str) -> None:
try:
path.mkdir(parents=True)
log.warning(
"A {} was missing and was automatically created: {}", label, path
)
except PermissionError:
uid = system.get_current_uid()
gid = system.get_current_gid()
command = f"sudo mkdir -p {path} && sudo chown {uid}:{gid} {path}"
hint = f"\nSuggested command: {RED(command)}"
print_and_exit(
"A {} is missing and can't be automatically created: {}{}",
label,
path,
hint,
)
[docs]
def dump_config(
self,
services: list[str],
set_registry: bool = True,
v1_compatibility: bool = False,
) -> None:
compose_config = self.get_config_json()
clean_config: dict[str, Any] = {
"version": compose_config.get("version", COMPOSE_FILE_VERSION),
"networks": {},
"volumes": {},
"services": {},
}
networks = set()
volumes = set()
binds: set[Path] = set()
registry = self.docker_wrapper.registry.get_host()
# Remove unused services, networks and volumes from compose configuration
for key, value in compose_config.get("services", {}).items():
if key not in services:
continue
if Configuration.swarm_mode and set_registry and key != REGISTRY:
value["image"] = f"{registry}/{value['image']}"
for non_null_key in (
"command",
"entrypoint",
):
if non_null_key in value and value[non_null_key] is None:
value.pop(non_null_key)
if "healthcheck" in value and "test" in value["healthcheck"]:
# healtcheck commands can contain env variables double-escaped ($$)
# When dumped to docker-compose.yml the double escape is removed
# and when started the single escaped variable is not resolved
# and breaks the command. Let's double all the $ to restore the
# expected behavior and counteract the consumed $
value["healthcheck"]["test"] = [
t.replace("$", "$$") for t in value["healthcheck"]["test"]
]
for k, v in value.get("environment", {}).items():
# Empty variables are converted to None...
# and None variables are not passed to the container
# This check can be removed when will be no longer covered
if v is None:
value["environment"][k] = ""
# Ports are forced to be int to prevent failures with compose
for idx, port in enumerate(value.get("ports", [])):
target_port = system.to_int(port["target"])
published_port = system.to_int(port["published"])
if target_port is None or published_port is None:
print_and_exit( # pragma: no cover
"Can't convert service ports to integers: {}-{}",
port["target"],
port["published"],
)
port["target"] = target_port
port["published"] = published_port
value["ports"][idx] = port
clean_config["services"][key] = value
for k in value.get("networks", {}).keys():
networks.add(k)
for k in value.get("volumes", []):
source = k.get("source", "")
volume_type = k.get("type", "")
if source and volume_type == "volume":
volumes.add(source.split(":")[0])
elif source and volume_type == "bind":
# Remove unsupported option: 'create_host_path'
if v1_compatibility:
k.get("bind", {}).pop("create_host_path", None)
binds.add(Path(source.split(":")[0]))
# Remove replicas if both replicas and global mode are set
if "deploy" in value: # pragma: no cover
if "replicas" in value["deploy"] and "mode" in value["deploy"]:
if value["deploy"]["mode"] == "global":
value["deploy"].pop("replicas")
# Missing folders are then automatically created by the docker engine
# the runs with root privileges and so create folders as root
# and this can often lead to issues with permissions.
for b in binds:
if not b.exists():
self.create_local_path(b, "bind folder")
for net in networks:
clean_config["networks"][net] = compose_config["networks"].get(net)
for vol in volumes:
volume_config = compose_config["volumes"].get(vol)
if "driver_opts" in volume_config:
device_type = volume_config["driver_opts"].get("type", "local")
device = volume_config["driver_opts"].get("device", "")
if device_type == "nfs" and device:
device = device.removeprefix(":")
d = Path(device)
if not d.exists():
self.create_local_path(d, "volume path")
clean_config["volumes"][vol] = volume_config
with open(COMPOSE_FILE, "w") as fh:
fh.write(yaml.dump(clean_config, default_flow_style=False))
log.debug("Compose configuration dumped on {}", COMPOSE_FILE)
[docs]
def start_containers(
self,
services: list[str],
force: bool = False,
scales: Optional[dict[str, int]] = None,
) -> None:
if scales:
# Based on rapydo scale implementation services is always a 1-length list
service = services[0]
nreplicas = scales.get(service, 0)
services_list = f"{service}={nreplicas}"
log.info("Scaling services: {}...", services_list)
else:
services_list = ", ".join(services)
scales = {}
log.info("Starting services: {}...", services_list)
self.docker.compose.up(
services=services,
build=False,
detach=True,
abort_on_container_exit=False,
force_recreate=force,
scales=scales,
)
if scales:
log.info("Services scaled: {}", services_list)
else:
log.info("Services started: {}", services_list)
[docs]
def create_volatile_container(
self,
service: str,
command: Optional[str] = None,
publish: Optional[list[Union[PortMapping, PortRangeMapping]]] = None,
# used by interfaces
detach: bool = False,
user: Optional[str] = None,
) -> bool:
compose_engine_forced = False
if Configuration.swarm_mode:
# import here to prevent circular imports
from controller.app import Application
if not Configuration.FORCE_COMPOSE_ENGINE:
compose_engine_forced = True
Configuration.FORCE_COMPOSE_ENGINE = True
# init is needed to reload the configuration to force compose engine
Application.get_controller().controller_init()
tty = sys.stdout.isatty()
try:
output = self.docker.compose.run(
service=service,
name=service,
command=Docker.split_command(command),
user=user,
detach=detach,
tty=tty and not detach,
stream=not tty and not detach,
dependencies=False,
remove=True,
service_ports=False,
publish=publish or [],
use_aliases=True,
)
if not detach:
for out_line in output: # type: ignore
# 'stdout' or 'stderr'
# Both out and err are collapsed in stdout
# Maybe in the future would be useful to keep them separated?
# stdstream = out_line[0]
line = out_line[1]
if isinstance(line, bytes):
line = line.decode("UTF-8")
print(line.strip())
if compose_engine_forced:
Configuration.FORCE_COMPOSE_ENGINE = False
# init is needed to reload the configuration to undo compose engine
Application.get_controller().controller_init()
return True
except DockerException as e:
log.critical(e)
return False
[docs]
def get_running_services(self) -> set[str]:
prefix = f"{Configuration.project}{COMPOSE_SEP}"
containers = set()
try:
for container in self.docker.compose.ps():
name = container.name
if not name.startswith(prefix):
continue
status = container.state.status
if status != "running" and status != "starting" and status != "ready":
continue
# to be replaced with removeprefix
name = name[len(prefix) :]
# Remove the _instancenumber (i.e. _1 or _n in case of scaled services)
name = name[0 : name.index(COMPOSE_SEP)]
containers.add(name)
return containers
# An exception is raised when no service is running.
# The same happens with:
# `docker compose ps`
# that fails with a "not found" and it seems to be a bug of compose-cli.
# In case it is a feature a specific exception would be helpful here
except DockerException:
return containers
[docs]
def get_services_status(self, prefix: str) -> dict[str, str]:
prefix += COMPOSE_SEP
services_status: dict[str, str] = dict()
try:
for container in self.docker.compose.ps():
name = container.name
if not name.startswith(prefix):
continue
status = container.state.status or "N/A"
# to be replaced with removeprefix
name = name[len(prefix) :]
# Remove the _instancenumber (i.e. _1 or _n in case of scaled services)
name = name[0 : name.index(COMPOSE_SEP)]
services_status[name] = status
return services_status
# An exception is raised when no service is running.
# The same happens with:
# `docker compose ps`
# that fails with a "not found" and it seems to be a bug of compose-cli.
# In case it is a feature a specific exception would be helpful here
except DockerException:
return services_status
[docs]
def status(self, services: list[str]) -> None:
print("")
prefix = f"{Configuration.project}{COMPOSE_SEP}"
table: list[list[str]] = []
for container in self.docker.compose.ps():
name = container.name
if not name.startswith(prefix):
continue
# to be replaced with removeprefix
name = name[len(prefix) :]
if COMPOSE_SEP in name:
name = name[0 : name.index(COMPOSE_SEP)]
if name not in services:
continue
status = container.state.status or "N/A"
if status == "shutdown" or status == "complete":
OPEN_COLOR = "[bold blue]"
CLOSE_COLOR = "[/bold blue]"
elif status == "running":
OPEN_COLOR = "[bold green]"
CLOSE_COLOR = "[/bold green]"
elif status == "starting" or status == "ready":
OPEN_COLOR = "[bold yellow]"
CLOSE_COLOR = "[/bold yellow]"
elif status == "failed":
OPEN_COLOR = "[bold red]"
CLOSE_COLOR = "[/bold red]"
else:
OPEN_COLOR = ""
CLOSE_COLOR = ""
ports_list = []
if container.network_settings.ports:
for (
container_port,
host_port,
) in container.network_settings.ports.items():
if host_port:
container_port = container_port.split("/")[0]
ports_list.append(
f"{container_port}->{host_port[0]['HostPort']}"
)
container_image = container.config.image or "N/A"
table.append(
[
container.id[0:12],
f"{OPEN_COLOR}{container.name}{CLOSE_COLOR}",
status,
container.created.strftime("%d-%m-%Y %H:%M:%S"),
container_image,
",".join(ports_list),
],
)
if not table:
log.info("No container is running")
else:
print_table(
["ID", "NAME", "STATUS", "CREATED", "IMAGE", "PORTS"],
table,
table_title="List of containers",
)
[docs]
def logs(self, services: list[str], follow: bool = False, tail: int = 500) -> None:
if len(services) > 1:
timestamps = False
log_prefix = True
elif services[0] in "frontend":
timestamps = True
log_prefix = False
else:
timestamps = False
log_prefix = False
lines = self.docker.compose.logs(
services,
follow=follow,
tail=str(tail),
timestamps=timestamps,
no_log_prefix=not log_prefix,
stream=True,
)
for log_line in lines:
# 'stdout' or 'stderr'
# Both out and err are collapsed in stdout
# Maybe in the future would be useful to keep them separated?
# stdstream = log_line[0]
line = log_line[1]
if isinstance(line, bytes):
line = line.decode("UTF-8")
print(line.strip())