# Source code for docker_launch.launch

"""Start, watch and terminate distributed containers.

Most launch operations involve network communication, so they are run
concurrently in multiple threads.

"""

import concurrent.futures
import time
from typing import Any, Dict, Hashable, List, Tuple

import docker

from docker_launch import logger
from . import utils
from .config_parser import LaunchConfiguration, parse
from .exceptions import LaunchError
from .typing import PathLike


class Containers:
    """Start, watch and terminate the containers of one launch group.

    The launch configuration is read from ``config_path``; every container
    started through :meth:`start` is recorded in ``containers_list`` so that
    :meth:`stop`, :meth:`remove` and :meth:`ping` can act on it later.
    """

    def __init__(self, config_path: PathLike) -> None:
        """Remember ``config_path``; nothing is parsed or started yet."""
        self.config_path = config_path
        # Containers started by this instance (filled by ``start``).
        self.containers_list = []
        # Unix time (whole seconds) marking the start of the next log window
        # fetched by ``ping``.
        self.last_ping = int(time.time())

    @property
    def config(self) -> Dict[Hashable, List[LaunchConfiguration]]:
        """Parse ``config_path`` and return the result.

        The file is parsed again on every access.
        """
        return parse(self.config_path)

    @staticmethod
    def _flatten(dict_of_lists: Dict[Any, List]) -> List:
        """Concatenate all values of ``dict_of_lists`` into a single list."""
        return [item for sublist in dict_of_lists.values() for item in sublist]

    def _for_each_container(self, action, timeout: float = 30) -> None:
        """Run ``action(container)`` concurrently for every tracked container.

        A warning is logged when some calls are still running after
        ``timeout`` seconds. Leaving the executor's ``with`` block always
        waits for the remaining calls, so this method returns only once every
        call has finished.
        """
        with concurrent.futures.ThreadPoolExecutor(max_workers=None) as executor:
            futures = [executor.submit(action, c) for c in self.containers_list]
            _, pending = concurrent.futures.wait(futures, timeout=timeout)
            if pending:
                logger.warning(
                    f"{len(pending)} (of {len(futures)}) container operations "
                    f"still running after {timeout} seconds; waiting for them."
                )

    def start(
        self, **docker_run_kwargs
    ) -> List[docker.models.containers.Container]:
        """Start every container described in the configuration.

        Each container is started detached, in its own worker thread, on the
        Docker daemon resolved from its machine entry.

        Args:
            **docker_run_kwargs: Extra keyword arguments forwarded to
                ``client.containers.run`` for every container.

        Returns:
            ``self.containers_list``, holding the started containers.

        Raises:
            LaunchError: If this instance already tracks containers.
            concurrent.futures.TimeoutError: If some containers had not
                started within 60 seconds.
            Exception: The first error raised while starting a container.

        Containers that did start are recorded in ``containers_list`` even
        when an error is raised, so they can still be stopped or removed.
        """
        if self.containers_list:
            raise LaunchError("This process is already running a launch group.")

        def _start(
            client: docker.DockerClient, image: str, command: str, **kwargs
        ) -> docker.models.containers.Container:
            container = client.containers.run(image, command, detach=True, **kwargs)
            # Drop the URL scheme ("tcp://", "ssh://", ...) for the log line.
            _base_url = client.api.base_url.split("//")[-1]
            logger.info(
                f"Container '{container.name}' ({container.short_id}) started "
                f"on '{_base_url}'"
            )
            return container

        futures = []
        unfinished = set()
        try:
            with concurrent.futures.ThreadPoolExecutor(max_workers=None) as executor:
                for machine, conf in self.config.items():
                    daemon_url = utils.resolve_base_url(machine)
                    client = docker.DockerClient(base_url=daemon_url)
                    jobs = [(item["image"], item["cmd"]) for item in conf]
                    futures.extend(
                        executor.submit(_start, client, img, cmd, **docker_run_kwargs)
                        for img, cmd in jobs
                    )
                _, unfinished = concurrent.futures.wait(futures, timeout=60)
        finally:
            # Leaving the ``with`` block waits for the workers, so finished
            # futures are final here. Track every container that really
            # started, even if another start failed or the deadline passed,
            # so none is left running unmanaged.
            for future in futures:
                if (
                    future.done()
                    and not future.cancelled()
                    and future.exception() is None
                ):
                    self.containers_list.append(future.result())

        for future in futures:
            future.result()  # Re-raise the first start failure, if any.
        if unfinished:
            raise concurrent.futures.TimeoutError(
                f"{len(unfinished)} (of {len(futures)}) containers did not "
                "start within 60 seconds"
            )
        return self.containers_list

    def stop(self) -> None:
        """Stop all tracked containers concurrently.

        Failures are logged as warnings and never raised, so one failing
        container does not prevent the others from being stopped.
        """

        def _stop(container: docker.models.containers.Container) -> None:
            try:
                container.stop(timeout=3)  # Escalate to SIGKILL after 3 sec.
                logger.info(f"Container {container} has stopped.")
            except Exception as e:
                logger.warning(str(e))

        logger.info("Gracefully stopping containers, may take time.")
        self._for_each_container(_stop, timeout=30)

    def remove(self) -> None:
        """Remove all tracked containers concurrently.

        Failures are logged as warnings and never raised.
        """

        def _remove(container: docker.models.containers.Container) -> None:
            try:
                container.remove()
                logger.info(f"Container {container} successfully removed.")
            except Exception as e:
                logger.warning(str(e))

        self._for_each_container(_remove, timeout=30)

    def ping(self) -> Dict[str, List[Any]]:
        """Refresh every tracked container and log its new output.

        Logs produced between the previous ping and now are fetched and
        written to the logger.

        Returns:
            The result of ``utils.groupby`` over one
            ``{"container": ..., "status": ...}`` entry per container whose
            status is not ``"running"`` (presumably keyed by status; see
            ``utils.groupby``). Containers for which the Docker API call
            fails are reported with status ``"not found"``.

        Raises:
            concurrent.futures.TimeoutError: If the containers could not all
                be queried within 30 seconds.
        """
        # Snapshot the window *before* scheduling the workers: they run
        # concurrently with the ``self.last_ping`` update below and must not
        # observe the new value, or the window would be empty.
        since = self.last_ping
        now = int(time.time())

        def _ping(
            container: docker.models.containers.Container,
        ) -> Tuple[docker.models.containers.Container, str]:
            try:
                container.reload()
                logs = container.logs(timestamps=True, since=since, until=now)
                if logs:
                    base_url = container.client.api.base_url
                    info = f"{container.short_id}@{base_url} : {logs.decode('utf-8')}"
                    logger.info(info)
                return container, container.status
            except docker.errors.APIError:
                return container, "not found"

        with concurrent.futures.ThreadPoolExecutor(max_workers=None) as executor:
            futures = [executor.submit(_ping, c) for c in self.containers_list]
            completed = concurrent.futures.as_completed(futures, timeout=30)
            self.last_ping = now
            result = [f.result() for f in completed]

        info = [{"container": c, "status": s} for c, s in result if s != "running"]
        return utils.groupby(info, "status")

    def watch(self):
        """Ping the containers once per second until an error occurs.

        Containers that are not running are reported through the logger on
        every iteration. When an ``Exception`` is raised it is logged and all
        containers are stopped. ``KeyboardInterrupt`` is not an ``Exception``
        subclass, so it propagates to the caller.
        """
        try:
            while True:
                not_running = self.ping()
                if not_running:
                    logger.info(str(not_running))
                time.sleep(1)
        except Exception as e:
            logger.error(e)
            self.stop()

    @classmethod
    def launch(cls, config_path: PathLike, **kwargs) -> None:
        """Launch containers described in config_path.

        .. warning::

            To stop all the containers, press Ctrl+C. Killing the process
            will leave the launched containers unmanaged.

        """
        c = cls(config_path)
        try:
            # ``start`` is inside the ``try`` so that containers which came
            # up before a start failure are stopped as well.
            c.start(**kwargs)
            c.watch()
        finally:
            c.stop()
# Module-level alias for the ``Containers.launch`` classmethod.
launch_containers = Containers.launch