Source code for docker_launch.ssh

import subprocess
from pathlib import Path
from typing import List, Optional, Type

import paramiko

from . import utils
from .connection import _get_ssh_client
from .typing import PathLike

SSH_DIR = Path.home() / ".ssh"


def _get_existent_default_private_key_path(
    include_incomplete: bool = False,
) -> List[Path]:
    def _exists(private_key_path: Path, pair_only: bool) -> bool:
        public_key_path = private_key_path.parent / (private_key_path.name + ".pub")
        return (
            (private_key_path.exists() and public_key_path.exists())
            if pair_only
            else (private_key_path.exists() or public_key_path.exists())
        )

    # Check if default keys exist or not. Paramiko searches for them by default.
    # https://docs.paramiko.org/en/stable/api/client.html#paramiko.client.SSHClient.connect
    default_keys = ["id_rsa", "id_dsa", "id_ecdsa"]
    return [
        SSH_DIR / key
        for key in default_keys
        if _exists(SSH_DIR / key, not include_incomplete)
    ]


[docs]def get_default_key_path(allow_locked: bool = False) -> Optional[Path]: default_keys = _get_existent_default_private_key_path() for key in default_keys: if allow_locked or (not is_locked(key)): return key
[docs]def is_locked(key_path: Path) -> bool: for key_handler in [paramiko.RSAKey, paramiko.DSSKey, paramiko.ECDSAKey]: try: key_handler.from_private_key_file(key_path) return False except paramiko.PasswordRequiredException: return True except Exception: pass raise ValueError(f"Key type for '{key_path}' unknown.")
[docs]def ssh_copy_id(pubkey_path: PathLike, address: str, *, username: str = None) -> int: ipaddr, username = utils.parse_address(address, username) command = ["ssh-copy-id", "-i", str(pubkey_path), f"{username}@{ipaddr}"] with subprocess.Popen(command) as p: try: return_code = p.wait() except KeyboardInterrupt: return_code = p.wait() return return_code
[docs]def generate_default_key(comment: str = "generated-by-docker-launch") -> Path: key_generation_config = { "id_rsa": (paramiko.RSAKey, (4096,)), "id_dsa": (paramiko.DSSKey, (3072,)), "id_ecdsa": (paramiko.ECDSAKey, ()), } existent = _get_existent_default_private_key_path(include_incomplete=True) existent_types = [path.name for path in existent] new_key_type = set(key_generation_config.keys()) - set(existent_types) if len(new_key_type) < 1: raise FileExistsError( f"Default keys {list(key_generation_config.keys())} already exist. " "Consider using/unlocking them or remove unused one." ) new_key_name = new_key_type.pop() key_generator, args = key_generation_config[new_key_name] new_key = key_generator.generate(*args) public_key = f"{new_key.get_name()} {new_key.get_base64()} {comment}" private_key_path = SSH_DIR / new_key_name public_key_path = SSH_DIR / f"{new_key_name}.pub" new_key.write_private_key_file(private_key_path) public_key_path.write_text(public_key) public_key_path.chmod(0o644) return private_key_path
[docs]def get_ssh_error( address: str, *, username: str = None, port: int = 22, timeout: float = 3.0 ) -> Optional[Type[Exception]]: ipaddr, username = utils.parse_address(address, username) client = _get_ssh_client() try: client.connect(ipaddr, port=port, username=username, timeout=timeout) client.close() return except Exception as e: return e.__class__