Source code for necst.ctrl.dome.dome_controller

__all__ = ["DomeController"]

import time
from typing import Optional, Tuple

from neclib.coordinates import CoordinateGeneratorManager, DriveLimitChecker, PathFinder
from neclib.coordinates.paths import ControlContext
from necst_msgs.msg import ControlStatus, CoordMsg
from necst_msgs.srv import CoordinateCommand, DomeSync

from ... import config, namespace, service, topic
from ...core import AlertHandlerNode


[docs]class DomeController(AlertHandlerNode): NodeName = "dome_coord" Namespace = namespace.dome def __init__(self) -> None: super().__init__(self.NodeName, namespace=self.Namespace) self.logger = self.get_logger() self.cmd = None self.enc_az = None self.enc_time = 0 self.dome_sync = False self.finder = PathFinder( config.location, config.antenna_pointing_parameter_path ) drive_limit = config.dome_drive self.optimizer = DriveLimitChecker( drive_limit.critical_limit_az, drive_limit.warning_limit_az ) self.publisher = topic.dome_altaz_cmd.publisher(self) topic.dome_encoder.subscription(self, self._update_enc) service.dome_coord.service(self, self._update_cmd) service.dome_sync.service(self, self._update_sync_mode) self.status_publisher = topic.dome_control_status.publisher(self) self.create_timer(1 / config.dome_command_frequency, self.command_realtime) self.create_timer(0.5, self.convert) self.result_queue = [] self.tracking_ok = False self.executing_generator = CoordinateGeneratorManager() self.last_status = None self.direct_mode = True self.gc = self.create_guard_condition(self._clear_cmd) def _clear_cmd(self) -> None: self.cmd = None self.result_queue.clear() def _update_cmd( self, request: CoordinateCommand.Request, response: CoordinateCommand.Response ) -> CoordinateCommand.Response: """Update the target coordinate command. When new command has been received, conversion result will stop for a moment, since coordinate conversion for new coordinate cannot be performed immediately. This suspension won't affect PID control, as it checks the command time. Notes ----- The conversion result will be cleared immediately after receiving the new command. This won't irregular drive, as this command doesn't contain detailed and frequent coordinate information. This is the case for scan command, as the command only contains start/stop position and scan speed. """ if self.dome_sync: response.check = False return response else: self.cmd = request self._parse_cmd(request) self.result_queue.clear() response.id = str(id(self.executing_generator.get())) return response def _update_sync_mode( self, request: DomeSync.Request, response: DomeSync.Response ) -> DomeSync.Response: self.dome_sync = request.dome_sync response.check = self.dome_sync self._clear_cmd() return response def _update_enc(self, msg: CoordMsg) -> None: if (msg.unit != "deg") or (msg.frame != "altaz"): self.logger.debug("Invalid encoder reading detected.") return self.enc_az = msg.lon self.enc_time = msg.time
[docs] def command_realtime(self) -> None: if self.status.critical(): self.logger.warning("Guard condition activated", throttle_duration_sec=1) # Avoid sudden resumption of telescope drive return self.gc.trigger() # No realtime-ness check is performed, just filter outdated commands out now = time.time() cmd = None if len(self.result_queue) > 0: while len(self.result_queue) > 0: cmd = self.result_queue.pop(0) if cmd[2] > now: break if cmd: msg = CoordMsg( lon=cmd[0], lat=cmd[1], time=cmd[2], unit="deg", frame="altaz" ) self.publisher.publish(msg)
def _parse_cmd(self, msg: CoordinateCommand.Request) -> None: if msg.direct_mode: self.direct_mode = True else: self.direct_mode = False self.finder.direct_mode = self.direct_mode self.logger.debug(f"Got POINT-TO-COORD command: {msg}") new_generator = self.finder.track( msg.lon[0], msg.lat[0], msg.frame, unit=msg.unit, ) self.executing_generator.attach(new_generator)
[docs] def convert(self) -> None: if (self.cmd is not None) and (self.enc_time < time.time() - 5): # Don't resume normal operation after communication with encoder lost for 5s self.logger.error( "Lost the communication with the encoder. Command to drive to " f"{self.cmd} has been discarded." ) self.cmd = None if self.cmd is None: return self.telemetry(None) if (len(self.result_queue) > 1) and ( self.result_queue[-1][2] > time.time() + config.dome_command_offset_sec ): # This function will be called twice per 1s, to ensure no run-out of command # but it can cause overloading the data, so judge command update necessity. return try: coord = next(self.executing_generator) self.telemetry(coord.context) except (StopIteration, TypeError): self.cmd = None self.executing_generator.clear() return self.telemetry(None) az = self._validate_drive_range(coord.az) for _az, _t in zip(az, coord.time): if any(x is None for x in [_az, _t]): continue # Remove chronologically duplicated/overlapping commands self.result_queue = list(filter(lambda x: x[2] < _t, self.result_queue)) cmd = (float(_az.to_value("deg")), 45.0, _t) self.result_queue.append(cmd)
def _validate_drive_range(self, az) -> Tuple: # All values are Quantity. enc_az = 180 if self.enc_az is None else self.enc_az _az = self.optimizer.optimize(enc_az, az.to_value("deg"), unit="deg") if _az is not None: return _az return []
[docs] def telemetry(self, status: Optional[ControlContext]) -> None: if status is None: msg = ControlStatus( controlled=False, tight=False, remote=True, id=str(id(self.executing_generator.get())), interrupt_ok=True, time=time.time() + config.antenna_command_offset_sec, ) else: msg = ControlStatus( controlled=True, tight=status.tight, remote=True, id=str(id(self.executing_generator.get())), interrupt_ok=status.infinite and (not status.waypoint), time=status.start, ) self.last_status = msg self.status_publisher.publish(msg)