necst.core.auth.privileged_node

class PrivilegedNode(node_name, **kwargs)[source]

Bases: ServerNode

Manage privilege for conflict-unsafe operations.

Using this privilege client requires communication with running Authorizer node.

Variables
  • identity (str) – Unique identity of this privilege client instance.

  • has_privilege (bool) – Whether this client has privilege or not.

  • request_cli (rclpy.client.Client) – ROS 2 service client, to send privilege request.

  • signal_handler (function) – Function to finalize the node releasing the privilege.

  • ping_srv (rclpy.service.Service or None) – ROS 2 service server, to respond to status check from Authorizer.

Parameters

node_name (str) –

Examples

>>> class MyNode(necst.core.PrivilegedNode):
...     def __init__(self):
...         super().__init__("my_node")
...
...     @necst.core.require_privilege
...     def some_operation(self):
...         ...
...
... def main(args=None):
...     rclpy.init(args=args)
...     node = MyNode()
...     logger = node.get_logger()
...     try:
...         assert node.get_privilege(), "Privilege isn't granted"
...         # `Authorizer` should be running somewhere, to get privilege.
...         rclpy.spin(node)
...     except KeyboardInterrupt:
...         pass
...     except AssertionError:
...         import traceback
...         logger.warning(f"Error :\n{traceback.format_exc()}")
...     finally:
...         node.destroy_node()
...         rclpy.try_shutdown()
...
... if __name__ == "__main__":
...     main()
Namespace: str = '/necst/OMU1P85M/core/auth'
property has_privilege: bool

True if privilege is granted for this node.

get_privilege()[source]

Request for privilege.

Returns

privilegedTrue if successfully acquired privilege.

Return type

bool

quit_privilege()[source]

Give up privilege.

Returns

privilegedFalse if successfully released the privilege.

Return type

bool

static require_privilege(callable_obj=None, *, escape_cmd=[])[source]

Decorator to mark conflict-unsafe functions.

Parameters
  • escape_cmds – List of commands allowed to execute ignoring the privilege. Command should passsed as the first argument of the decorated function.

  • callable_obj (Optional[Callable[[...], Any]]) –

  • escape_cmd (List[Any]) –

Return type

Callable[[…], Any]

Notes

This decorator assumes the function is defined and called as method of arbitrary class, not a standalone function object.

Examples

>>> @necst.core.require_privilege
... def some_unsafe_operation(self, *args):
...     ...
>>> @necst.core.require_privilege(escape_cmds=["?"])
... def operation_or_query(self, command, *args):
...     ...
destroy_node()[source]

Add minimal privilege removal procedure.

More “formal” privilege removal i.e. quit_privilege cannot be invoked, when Ctrl-C is detected. This method doesn’t interfere with the behavior, instead make intra-node privilege check (require_privilege) can detect the removal.

The server will recognize the state change via its own _ping method.

Return type

None

require_privilege(callable_obj=None, *, escape_cmd=[])

Decorator to mark conflict-unsafe functions.

Parameters
  • escape_cmds – List of commands allowed to execute ignoring the privilege. Command should passsed as the first argument of the decorated function.

  • callable_obj (Optional[Callable[[...], Any]]) –

  • escape_cmd (List[Any]) –

Return type

Callable[[…], Any]

Notes

This decorator assumes the function is defined and called as method of arbitrary class, not a standalone function object.

Examples

>>> @necst.core.require_privilege
... def some_unsafe_operation(self, *args):
...     ...
>>> @necst.core.require_privilege(escape_cmds=["?"])
... def operation_or_query(self, command, *args):
...     ...