import time
from typing import Dict
from neclib.devices import LocalAttenuator
from necst_msgs.msg import LocalAttenuatorMsg
from rclpy.publisher import Publisher
from .. import namespace, topic
from ..core import DeviceNode
[docs]class LocalAttenuatorController(DeviceNode):
NodeName = "local_attenuator"
Namespace = namespace.rx
def __init__(self) -> None:
try:
super().__init__(self.NodeName, namespace=self.Namespace)
self.logger = self.get_logger()
self.io = LocalAttenuator()
self.publisher: Dict[str, Publisher] = {}
self.create_safe_subscription(
topic.local_attenuator_cmd, self.output_current
)
self.create_safe_timer(1, self.stream)
self.create_safe_timer(1, self.check_publisher)
self.logger.info(f"Started {self.NodeName} Node...")
except Exception as e:
self.logger.error(f"{self.NodeName} Node is shutdown due to Exception: {e}")
self.destroy_node()
[docs] def output_current(self, msg: LocalAttenuatorMsg) -> None:
if msg.finalize:
self.io.finalize()
return
for id in msg.id:
self.io.set_current(id=id, mA=msg.current)
self.io.apply_current()
self.logger.info(f"Set current {msg.current} mA for ch {msg.id}")
time.sleep(0.01)
[docs] def check_publisher(self) -> None:
for name in self.io.Config.channel.keys():
if name not in self.publisher:
self.publisher[name] = topic.local_attenuator.publisher(self)
[docs] def stream(self) -> None:
for name, publisher in self.publisher.items():
for id in self.io.Config.local_attenuator.channel.keys():
outputrange = self.io.get_outputrange(id)
msg = LocalAttenuatorMsg(
id=[id], outputrange=str(outputrange), time=time.time()
)
publisher.publish(msg)
time.sleep(0.01)
[docs]def main(args=None):
import rclpy
rclpy.init(args=args)
node = LocalAttenuatorController()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.io.close()
node.destroy_node()
rclpy.try_shutdown()
if __name__ == "__main__":
main()