Source code for necst.rx.powermeter

import time
from typing import Dict

from neclib.devices import PowerMeter
from necst_msgs.msg import DeviceReading
from rclpy.publisher import Publisher

from .. import namespace, topic
from ..core import DeviceNode


[docs]class PowermeterController(DeviceNode): NodeName = "powermeter" Namespace = namespace.rx def __init__(self) -> None: try: super().__init__(self.NodeName, namespace=self.Namespace) self.logger = self.get_logger() self.io = PowerMeter() self.publisher: Dict[str, Publisher] = {} self.create_safe_timer(1, self.stream) self.create_safe_timer(1, self.check_publisher) self.logger.info(f"Started {self.NodeName} Node...") except Exception as e: self.logger.error(f"{self.NodeName} Node is shutdown due to Exception: {e}") self.destroy_node()
[docs] def check_publisher(self) -> None: for name in self.io.keys(): if name not in self.publisher: self.publisher[name] = topic.powermeter.publisher(self)
[docs] def stream(self) -> None: for name, publisher in self.publisher.items(): power = self.io.get_power().to_value("dBm").item() msg = DeviceReading(time=time.time(), value=power, id="") publisher.publish(msg)
[docs]def main(args=None): import rclpy rclpy.init(args=args) node = PowermeterController() try: rclpy.spin(node) except KeyboardInterrupt: pass finally: node.io.close() node.destroy_node() rclpy.try_shutdown()
if __name__ == "__main__": main()