necst.utils.ros_interface¶
General utilities around ROS interfaces (msg / srv / action).
- interface_type_path(interface, sep='/')[source]¶
Get interface type identifier.
Examples
>>> necst.utils.get_console_path(std_msgs.msg.Float64) 'std_msgs/msg/Float64'
- Parameters
interface (Any) –
sep (str) –
- Return type
str
- get_absolute_name(name, namespace=None)[source]¶
Absolute name for node, topic, service, etc.
Examples
>>> node = rclpy.create_node("node_name", namespace="/test") >>> necst.utils.get_absolute_name(node.get_name(), node.get_namespace()) '/test/node_name'
- Parameters
name (S) –
namespace (Optional[str]) –
- Return type
S
- wait_for_server_to_pick_up(client, timeout_sec=None)[source]¶
Wait until ROS Service server is available to client.
Examples
>>> node = rclpy.create_node(...) >>> cli = node.create_client(...) >>> assert necst.utils.wait_for_server_to_pick_up(cli) >>> cli.call_async(...)
- Parameters
client (Client) –
timeout_sec (Optional[float]) –
- Return type
bool
- class Topic(msg_type: Any, topic: str, qos_profile: Union[int, rclpy.qos.QoSProfile], namespace: Optional[str] = None, support_index: bool = False)[source]¶
Bases:
Generic
[T
]- Parameters
msg_type (Any) –
topic (str) –
qos_profile (Union[int, QoSProfile]) –
namespace (Optional[str]) –
support_index (bool) –
- msg_type: Any¶
- topic: str¶
- qos_profile: Union[int, QoSProfile]¶
- namespace: Optional[str] = None¶
- support_index: bool = False¶
- class Service(srv_type: Any, srv_name: str, namespace: Optional[str] = None, qos_profile: Union[int, rclpy.qos.QoSProfile, NoneType] = None)[source]¶
Bases:
Generic
[T
]- Parameters
srv_type (Any) –
srv_name (str) –
namespace (Optional[str]) –
qos_profile (Optional[Union[int, QoSProfile]]) –
- srv_type: Any¶
- srv_name: str¶
- namespace: Optional[str] = None¶
- qos_profile: Optional[Union[int, QoSProfile]] = None¶