Source code for necst.utils.ros_thread

__all__ = ["spinning", "ros2context"]

import threading
from contextlib import contextmanager
from typing import Optional, Sequence, Union

import rclpy
from neclib import get_logger
from rclpy.executors import Executor, MultiThreadedExecutor, SingleThreadedExecutor
from rclpy.node import Node


[docs]class spinning: """Run tests with spinning some nodes. The implementation is inspired by: https://github.com/ros2/system_tests/blob/rolling/test_cli/test/utils.py#L77 Examples -------- >>> with spinning(node) as e: ... assert my_node.subscribed_parameter is not None """ def __init__( self, node: Union[Node, Sequence[Node]] = [], *, executor: Optional[Executor] = None, n_thread: int = 1, ) -> None: # Default executor should be new instance, since the use of # `get_global_executor` can cause deadlock. default_private_executor = ( SingleThreadedExecutor() if n_thread == 1 else MultiThreadedExecutor(n_thread) ) self.executor = executor or default_private_executor self.using_private_executor = executor is None nodes = [node] if isinstance(node, Node) else node executor_already_attached = [n for n in nodes if n.executor is not None] if len(executor_already_attached) > 0: get_logger(self.__class__.__name__).warning( f"Cannot spin {executor_already_attached}; executor already attached." ) self.nodes = list(filter(lambda n: n.executor is None, nodes)) [self.executor.add_node(n) for n in self.nodes if n.executor is None] def __enter__(self) -> None: self._stop = threading.Event() self._thread = threading.Thread(target=self._run, daemon=True) self._thread.start() def _run(self) -> None: while not self._stop.is_set(): self.executor.spin_once(timeout_sec=0.25) def __exit__(self, exc_type, exc_value, traceback) -> None: self._stop.set() self._thread.join() _ = [self.executor.remove_node(n) for n in self.nodes] if self.using_private_executor: self.executor.shutdown()
[docs]@contextmanager def ros2context(args=None): logger = get_logger(__name__) rclpy.init(args=args) try: yield except Exception as e: logger.error(str(e)) finally: rclpy.shutdown()