"""Start, watch and terminate distributed containers.
Most of launch functionalities involve network communications, so they are run
concurrently in multi-threads.
"""
import concurrent.futures
import time
from typing import Any, Dict, Hashable, List, Tuple
import docker
from docker_launch import logger
from . import utils
from .config_parser import LaunchConfiguration, parse
from .exceptions import LaunchError
from .typing import PathLike
[docs]class Containers:
def __init__(self, config_path: PathLike) -> None:
self.config_path = config_path
self.containers_list = []
self.last_ping = int(time.time())
@property
def config(self) -> Dict[Hashable, List[LaunchConfiguration]]:
return parse(self.config_path)
@staticmethod
def _flatten(dict_of_lists: Dict[Any, List]) -> List:
ret = []
_ = [ret.extend(elem) for elem in dict_of_lists.values()]
return ret
[docs] def start(
self, **docker_run_kwargs
) -> Dict[str, List[docker.client.ContainerCollection]]:
if len(self.containers_list) > 0:
raise LaunchError("This process is already running a launch group.")
def _start(
client: docker.DockerClient, image: str, command: str, **kwargs
) -> docker.client.ContainerCollection:
container = client.containers.run(image, command, detach=True, **kwargs)
_base_url = client.api.base_url.split("//")[-1]
logger.info(
f"Container '{container.name}' ({container.short_id}) started "
f"on '{_base_url}'"
)
return container
with concurrent.futures.ThreadPoolExecutor(max_workers=None) as executor:
futures = []
for machine, conf in self.config.items():
daemon_url = utils.resolve_base_url(machine)
client = docker.DockerClient(base_url=daemon_url)
img_and_cmd = list(map(lambda x: (x["image"], x["cmd"]), conf))
_futures = [
executor.submit(_start, client, img, cmd, **docker_run_kwargs)
for img, cmd in img_and_cmd
]
futures.extend(_futures)
_futures = concurrent.futures.as_completed(futures, timeout=60)
_futures = map(lambda x: x.result(), _futures)
self.containers_list.extend(_futures)
return self.containers_list
[docs] def stop(self) -> None:
def _stop(container: docker.client.ContainerCollection) -> None:
try:
container.stop(timeout=3) # Escalate to SIGKILL after 3 sec.
logger.info(f"Container {container} has stopped.")
except Exception as e:
logger.warning(str(e))
logger.info("Gracefully stopping containers, may take time.")
with concurrent.futures.ThreadPoolExecutor(max_workers=None) as executor:
futures = [executor.submit(_stop, c) for c in self.containers_list]
_ = concurrent.futures.as_completed(futures, timeout=30)
[docs] def remove(self) -> None:
def _remove(container: docker.client.ContainerCollection) -> None:
try:
container.remove()
logger.info(f"Container {container} successfully removed.")
except Exception as e:
logger.warning(str(e))
with concurrent.futures.ThreadPoolExecutor(max_workers=None) as executor:
futures = [executor.submit(_remove, c) for c in self.containers_list]
_ = concurrent.futures.as_completed(futures, timeout=30)
[docs] def ping(self) -> Dict[str, List[docker.client.ContainerCollection]]:
now = int(time.time())
def _ping(
container: docker.client.ContainerCollection,
) -> Tuple[docker.client.ContainerCollection, str]:
try:
container.reload()
logs = container.logs(timestamps=True, since=self.last_ping, until=now)
if logs:
base_url = container.client.api.base_url
info = f"{container.short_id}@{base_url} : {logs.decode('utf-8')}"
logger.info(info)
return container, container.status
except docker.errors.APIError:
return container, "not found"
with concurrent.futures.ThreadPoolExecutor(max_workers=None) as executor:
futures = [executor.submit(_ping, c) for c in self.containers_list]
futures = concurrent.futures.as_completed(futures, timeout=30)
self.last_ping = now
result = [f.result() for f in futures]
info = [{"container": c, "status": s} for c, s in result if s != "running"]
return utils.groupby(info, "status")
[docs] def watch(self):
try:
while True:
not_running = self.ping()
if not_running:
logger.info(str(not_running))
time.sleep(1)
except Exception as e:
logger.error(e)
self.stop()
[docs] @classmethod
def launch(cls, config_path: PathLike, **kwargs) -> None:
"""Launch containers described in config_path.
.. warning::
To stop all the containers, press Ctrl+C. Killing the process will leave the
launched containers unmanaged.
"""
c = cls(config_path)
c.start(**kwargs)
try:
c.watch()
finally:
c.stop()
launch_containers = Containers.launch