Source code for neclib.devices.device_base

from abc import ABC, abstractmethod
from collections import UserDict
from typing import (
    Any,
    ClassVar,
    Dict,
    Generic,
    List,
    Optional,
    Tuple,
    Type,
    TypeVar,
    Union,
    final,
    overload,
)

from .. import config, utils
from ..core import Parameters
from ..core.configuration import Configuration
from ..core.normalization import partial


def get_device_configuration():
    """Return the ``"._"`` entries of the global ``config``, keyed by prefix.

    Every key of ``config`` that ends with ``"._"`` is selected; the returned
    dict maps that key with the two-character suffix removed to the original
    value. Keys without the suffix are ignored.
    """
    suffix = "._"
    selected = {}
    for key, value in config.items():
        if not key.endswith(suffix):
            continue
        # Drop the trailing "._" so only the prefix remains.
        selected[key[: -len(suffix)]] = value
    return selected
class DeviceBase(ABC):
    """Abstract base class of device controllers.

    Subclassing rules implemented here:

    - A class deriving *directly* from ``DeviceBase`` is recorded as a device
      "kind" (``_kind`` points to that class).
    - Any subclass that defines ``Model`` (and is not auto-generated by
      :meth:`bind`) is registered in ``_implementations``.
    - Instances are singletons per device identity; see :meth:`__new__`.

    Classes without ``Model`` cannot be instantiated.
    """

    # Attributes to be defined by concrete implementations.
    Model: ClassVar[str]
    Manufacturer: ClassVar[str]
    # Name of the configuration attribute that identifies a physical device.
    Identifier: ClassVar[Optional[str]] = None
    is_simulator: ClassVar[bool] = False

    # Configuration attached by `bind`; `None` for unbound classes.
    Config: ClassVar[Union[Configuration, None]] = None

    # identity -> instance; created lazily in `__init_subclass__`.
    _instances: ClassVar[Dict[Any, "DeviceBase"]]
    _initialized: ClassVar[bool] = False
    # Intentionally a single list shared by the whole class hierarchy.
    _implementations: ClassVar[List[Type["DeviceBase"]]] = []
    _is_autogenerated: ClassVar[bool] = False
    _kind: ClassVar[Type["DeviceBase"]]

    def __init_subclass__(cls) -> None:
        """Register the new subclass and prepare its per-kind bookkeeping."""
        if hasattr(cls, "Model") and (not cls._is_autogenerated):
            # `cls.__subclasses__()` returns immediate subclasses only, and
            # those are expected to be device kind definitions rather than
            # per-model controller implementations. `cls._implementations`
            # is similar, but it also includes grandchild classes and excludes
            # abstract classes (those with no `cls.Model` set).
            cls._implementations.append(cls)
        if not hasattr(cls, "_instances"):
            # `hasattr` also sees inherited attributes, so a class that
            # inherits `_instances` keeps sharing its ancestor's dict.
            setattr(cls, "_instances", {})
        if cls.__base__ is DeviceBase:
            cls._kind = cls

    def __new__(cls) -> "DeviceBase":
        """Return the singleton instance for this class's device identity.

        Raises:
            TypeError: If ``cls`` is ``DeviceBase`` itself or defines no
                ``Model``.
        """
        if (cls is DeviceBase) or (not hasattr(cls, "Model")):
            # Ensure no instantiation of ABC, to avoid unintended class
            # variable change.
            raise TypeError(
                f"Cannot instantiate {cls.__name__!r}: it defines no `Model`; "
                "instantiate a concrete device implementation instead."
            )

        # Singleton check.
        identity = cls._identity(cls.Config, cls.Identifier)
        if identity not in cls._instances:
            inst = super().__new__(cls)
            cls._instances[identity] = inst

        # Initialization status check: from the second construction on,
        # `__init__` is replaced by a no-op so the existing instance is not
        # initialized again.
        if cls._initialized:
            cls.__init__ = lambda *args, **kwargs: None
        cls._initialized = True

        return cls._instances[identity]

    @classmethod
    def get_simulator_class(cls) -> Type["DeviceBase"]:
        """Return a simulator class of the same device kind.

        Returns ``cls`` itself when it is a simulator; otherwise the first
        immediate subclass of ``cls._kind`` whose ``is_simulator`` is true.

        Raises:
            NotImplementedError: If no such subclass exists.
        """
        if cls.is_simulator:
            return cls
        for candidate in cls._kind.__subclasses__():
            if candidate.is_simulator:
                return candidate
        raise NotImplementedError(
            f"No simulator implementation for {cls.__name__} "
            f"({cls._kind.__name__}) is found."
        )

    @staticmethod
    def _normalize(key: str, /) -> str:
        """Return ``key`` lower-cased with all underscores removed."""
        return key.replace("_", "").lower()

    @classmethod
    def _find_config(
        cls, name: str, identifier: Optional[str] = None
    ) -> Optional[Configuration]:
        """Collect the configuration of the device(s) matching ``config[name]``.

        Sections are selected when their model (the ``_`` entry, compared
        after :meth:`_normalize`) equals that of ``config[name]`` and, if
        ``identifier`` is given, when their ``identifier`` attribute equals
        the one of ``config[name]``. Selected sections are accumulated with
        ``+=``.

        Returns:
            The accumulated configuration, or ``None`` if nothing is selected.
        """
        this_device_config = config[name]
        model = cls._normalize(this_device_config._)  # type: ignore
        identity = None
        if identifier is not None:
            identity = getattr(this_device_config, identifier, None)

        device_configuration = get_device_configuration()
        model_filtered = [
            k for k, v in device_configuration.items() if cls._normalize(v) == model
        ]
        _cfgs: List[Configuration] = [config[k] for k in model_filtered]  # type: ignore

        cfg = None
        for _cfg in _cfgs:
            if (identifier is None) or (getattr(_cfg, identifier, None) == identity):
                # NOTE(review): the first iteration evaluates `None += _cfg`,
                # which only works if `Configuration` supports being added to
                # `None` (e.g. via `__radd__`) -- confirm against its definition.
                cfg += _cfg
        return cfg

    @staticmethod
    def _identity(
        cfg: Union[Configuration, None], identifier: Optional[str] = None
    ) -> Any:
        """Return the value identifying a device, or ``None`` if there is none.

        ``None`` is returned when ``identifier`` is ``None``, when ``cfg`` has
        no such attribute, or when the attribute is a ``Parameters`` instance.
        """
        if identifier is None:
            return None
        identity = getattr(cfg, identifier, None)
        if isinstance(identity, Parameters):
            # NOTE(review): presumably a nested parameter group rather than a
            # concrete identifier value -- confirm.
            return None
        return identity

    @overload
    @classmethod
    def bind(cls, name: str, model: str) -> Type["DeviceBase"]:
        ...

    @overload
    @classmethod
    def bind(
        cls, name: str, model: Dict[str, str]
    ) -> "Devices[str, Type[DeviceBase]]":
        ...

    @final
    @classmethod
    def bind(
        cls, name: str, model: Union[str, Dict[str, str]]
    ) -> Union[Type["DeviceBase"], "Devices[str, Type[DeviceBase]]"]:
        """Create a device class bound to the configuration named ``name``.

        Args:
            name: Dot-separated configuration name of the device.
            model: Model name, or a dict mapping sub-names to model names. For
                a dict, each entry is bound as ``f"{name}.{key}"`` and the
                results are returned wrapped in ``Devices``.

        Returns:
            For a single model: the class of an already existing instance with
            the same identity if there is one, otherwise a newly generated
            subclass of the implementation with ``Config`` attached. When
            ``config.simulator`` is truthy and a simulator class is available,
            the simulator is used in place of the implementation.

        Raises:
            KeyError: If no registered implementation matches ``model``.
        """
        if isinstance(model, dict):
            bound_devices = {
                key: cls.bind(f"{name}.{key}", _model) for key, _model in model.items()
            }
            return Devices(bound_devices)

        impl = {cls._normalize(_impl.Model): _impl for _impl in cls._implementations}
        try:
            model_impl = impl[cls._normalize(model)]
        except KeyError:
            raise KeyError(
                f"No device implementation for model {model!r}; "
                f"registered models: {sorted(impl)}"
            ) from None

        if config.simulator:
            try:
                simulator = model_impl.get_simulator_class()
            except NotImplementedError:
                # No simulator for this kind; fall back to the real
                # implementation.
                pass
            else:
                # The simulator takes over the identifier of the model it
                # replaces (this rebinds the attribute on the simulator class).
                simulator.Identifier = model_impl.Identifier
                model_impl = simulator

        class_name = ".".join([utils.toCamelCase(n) for n in name.split(".")])
        cfg = cls._find_config(name, model_impl.Identifier)
        identity = cls._identity(cfg, model_impl.Identifier)
        if identity in model_impl._instances:
            # Reuse the class of the instance already created for this device.
            return model_impl._instances[identity].__class__
        return type(
            class_name,
            (model_impl,),
            dict(Config=cfg, __module__=cls.__module__, _is_autogenerated=True),
        )

    def __repr__(self) -> str:
        """Return ``ClassName(model=..., manufacturer=..., <identity>)``."""
        model = f"model={self.Model}"
        manufacturer = f"manufacturer={self.Manufacturer}"
        identity = "<no device identity defined>"
        if self.Identifier:
            _id = getattr(self.Config, self.Identifier, None)
            identity = f"{self.Identifier}={_id!r}"
        metadata = ", ".join([model, manufacturer, identity])
        return f"{self.__class__.__name__}({metadata})"

    __str__ = __repr__

    @abstractmethod
    def finalize(self) -> None:
        """Finalize the device; must be implemented by subclasses."""
        ...
# Type variables for the key and value types of the generic `Devices` mapping.
T_key = TypeVar("T_key")
T_value = TypeVar("T_value")
class Devices(UserDict, Generic[T_key, T_value]):
    """Mapping of device controllers (or controller classes) keyed by device ID.

    A single device given positionally is stored under the key ``None``
    ("anonymous device"). Calls and attribute accesses on this object are
    forwarded to the stored devices; an ``id`` keyword of the form
    ``"<device-ID>.<channel-ID>"`` routes a call to one device.
    """

    def __repr__(self) -> str:
        return f"{self.__class__.__name__}({super().__repr__()})"

    __str__ = __repr__

    def __init__(
        self,
        anonymous_device: Optional[
            Union[
                DeviceBase,
                Type[DeviceBase],
                Dict[str, DeviceBase],
                Dict[str, Type[DeviceBase]],
            ]
        ] = None,
        /,
        **named_devices: Union[DeviceBase, Type[DeviceBase]],
    ) -> None:
        """Store either one anonymous device, a dict of devices, or named devices.

        Raises:
            ValueError: If both a positional argument and keyword devices are
                given.
        """
        super().__init__()
        if (anonymous_device is not None) and (len(named_devices) > 0):
            raise ValueError("Cannot specify both anonymous and named devices")

        if anonymous_device is None:
            self.update(named_devices)
        elif isinstance(anonymous_device, dict):
            # Keep compatibility with `dict` constructor's signature;
            # `__init__(__map, /, **kwargs) -> None`
            self.update(anonymous_device)
        else:
            self.update({None: anonymous_device})

    @property
    def _is_single_anonymous_device(self) -> bool:
        """Whether the only stored entry is the one keyed by ``None``."""
        return (len(self) == 1) and (None in self.data)

    def _parse_id_query(
        self, id: Optional[str] = None
    ) -> Tuple[Union[None, str], Union[None, str]]:
        """Split an ``id`` query into ``(device_id, channel_id)``.

        - ``None`` gives ``(None, None)``.
        - With a single anonymous device, the whole ``id`` is the channel ID.
        - ``"a.b.c"`` is split at the last dot into ``("a.b", "c")``.
        - Anything else is treated as a device ID.
        """
        if id is None:
            return (None, None)
        if self._is_single_anonymous_device:
            # Anonymous device doesn't have device-ID by definition
            return (None, id)
        if "." in id:
            # Dot-separated value is parsed to `<device-ID>.<channel-ID>`
            device_id, channel_id = id.rsplit(".", 1)
            return (device_id, channel_id)
        # Device-ID takes precedence when given `id` doesn't match certain pattern
        return (id, None)

    @overload
    def __call__(self, *args, id: str, **kwargs) -> DeviceBase:
        ...

    @overload
    def __call__(self, *args, **kwargs) -> "Devices":
        ...

    def __call__(self, *args, **kwargs) -> Union[DeviceBase, "Devices"]:
        """Emulate initialization of attached device controllers.

        Without a routable ``id`` keyword, every stored value is called with
        the given arguments and the results are returned as a new ``Devices``.
        Otherwise only the addressed entry is called, with ``id`` replaced by
        the channel ID part.
        """
        device_id, channel_id = self._parse_id_query(kwargs.get("id", None))
        if (device_id is None) and (channel_id is None):
            return Devices({k: v(*args, **kwargs) for k, v in self.items()})
        kwargs["id"] = channel_id
        return self[device_id](*args, **kwargs)

    def __getattr__(self, key: str, /) -> Any:
        """Emulate attribute access to attached device controllers.

        If the attribute is callable on every device, a dispatcher function is
        returned: it calls the addressed device when an ``id`` keyword is
        given, the anonymous device when that is the only entry, and otherwise
        all devices (returning a dict of results). Non-callable attributes are
        returned directly for a single anonymous device, or as a dict keyed by
        device ID.
        """
        if key == "data":
            # `data` is the storage attribute set by `UserDict.__init__`. This
            # method is only reached for it when the instance was created
            # without `__init__` (e.g. while unpickling); `self.items()` below
            # would then look up `data` again and recurse without bound.
            raise AttributeError(key)

        targets = {k: getattr(v, key) for k, v in self.items()}
        if all(callable(t) for t in targets.values()):

            def func(*args, **kwargs) -> Union[Any, Dict[Union[str, None], Any]]:
                device_id, channel_id = self._parse_id_query(kwargs.get("id", None))
                if (device_id is None) and (channel_id is None):
                    if self._is_single_anonymous_device:
                        return targets[None](*args, **kwargs)
                    return {k: t(*args, **kwargs) for k, t in targets.items()}
                kwargs["id"] = channel_id
                return targets[device_id](*args, **kwargs)

            return func
        elif self._is_single_anonymous_device:
            return targets[None]
        else:
            return targets

    def __getitem__(self, key: Union[str, None], /) -> Any:
        """Look up a device by exact key, normalized key, or ``device.channel``.

        Lookup order:

        1. Exact key.
        2. For string keys, a match ignoring case and underscores.
        3. For string keys, the normalized key parsed as
           ``<device-ID>.<channel-ID>``; the device is returned after its
           attributes have been rebound with ``id`` fixed to the channel ID.

        Raises:
            KeyError: If no entry matches ``key``.
        """
        try:
            return super().__getitem__(key)
        except KeyError:
            pass

        if isinstance(key, str):
            item_normalized = {
                k.replace("_", "").lower() if isinstance(k, str) else k: v
                for k, v in self.items()
            }
            _key = key.replace("_", "").lower()
            if _key in item_normalized:
                return item_normalized[_key]

            device_id, channel_id = self._parse_id_query(_key)
            if device_id in item_normalized:
                item = item_normalized[device_id]
                # NOTE: this rebinds attributes on the stored device object
                # itself, so the change persists after this lookup.
                for attr_name in dir(item):
                    attr = getattr(item, attr_name, None)
                    if attr is None:
                        continue
                    try:
                        # NOTE(review): `partial` is a project helper;
                        # presumably it pre-binds `id` and raises `TypeError`
                        # for attributes it cannot wrap -- confirm.
                        modified = partial(attr, kwargs={"id": channel_id})
                        object.__setattr__(item, attr_name, modified)
                    except TypeError:
                        # Best effort: attributes that cannot be wrapped or
                        # reassigned are left untouched.
                        continue
                return item

        # Report the key the caller actually asked for.
        raise KeyError(key)