necst.utils.ros_interface

General utilities around ROS interfaces (msg / srv / action).

interface_type_path(interface, sep='/')[source]

Get interface type identifier.

Examples

>>> necst.utils.get_console_path(std_msgs.msg.Float64)
'std_msgs/msg/Float64'
Parameters
  • interface (Any) –

  • sep (str) –

Return type

str

get_absolute_name(name, namespace=None)[source]

Absolute name for node, topic, service, etc.

Examples

>>> node = rclpy.create_node("node_name", namespace="/test")
>>> necst.utils.get_absolute_name(node.get_name(), node.get_namespace())
'/test/node_name'
Parameters
  • name (S) –

  • namespace (Optional[str]) –

Return type

S

wait_for_server_to_pick_up(client, timeout_sec=None)[source]

Wait until ROS Service server is available to client.

Examples

>>> node = rclpy.create_node(...)
>>> cli = node.create_client(...)
>>> assert necst.utils.wait_for_server_to_pick_up(cli)
>>> cli.call_async(...)
Parameters
  • client (Client) –

  • timeout_sec (Optional[float]) –

Return type

bool

class Topic(msg_type: Any, topic: str, qos_profile: Union[int, rclpy.qos.QoSProfile], namespace: Optional[str] = None, support_index: bool = False)[source]

Bases: Generic[T]

Parameters
  • msg_type (Any) –

  • topic (str) –

  • qos_profile (Union[int, QoSProfile]) –

  • namespace (Optional[str]) –

  • support_index (bool) –

msg_type: Any
topic: str
qos_profile: Union[int, QoSProfile]
namespace: Optional[str] = None
support_index: bool = False
subscription(node, callback, **kwargs)[source]
Parameters
  • node (Node) –

  • callback (Callable[[Any], None]) –

Return type

Subscription

publisher(node, **kwargs)[source]
Parameters

node (Node) –

Return type

Publisher

get_children(node)[source]
Parameters

node (Node) –

Return type

Dict[str, Topic]

class Service(srv_type: Any, srv_name: str, namespace: Optional[str] = None, qos_profile: Union[int, rclpy.qos.QoSProfile, NoneType] = None)[source]

Bases: Generic[T]

Parameters
  • srv_type (Any) –

  • srv_name (str) –

  • namespace (Optional[str]) –

  • qos_profile (Optional[Union[int, QoSProfile]]) –

srv_type: Any
srv_name: str
namespace: Optional[str] = None
qos_profile: Optional[Union[int, QoSProfile]] = None
service(node, callback, **kwargs)[source]
Parameters
  • node (Node) –

  • callback (Callable[[Any, Any], Any]) –

Return type

Service

client(node, **kwargs)[source]
Parameters

node (Node) –

Return type

Client

import_msg(path)[source]
Parameters

path (str) –

Return type

Any

serialize(msg)[source]
Parameters

msg (Any) –

Return type

Generator[Dict[str, Any], None, None]