necst.core.auth.privileged_node
- class PrivilegedNode(node_name, **kwargs)
Bases: ServerNode
Manage privilege for conflict-unsafe operations.
Using this privilege client requires communication with a running Authorizer node.
- Variables
identity (str) – Unique identity of this privilege client instance.
has_privilege (bool) – Whether this client has privilege or not.
request_cli (rclpy.client.Client) – ROS 2 service client, used to send privilege requests.
signal_handler (function) – Function that finalizes the node, releasing the privilege.
ping_srv (rclpy.service.Service or None) – ROS 2 service server, used to respond to status checks from Authorizer.
- Parameters
node_name (str) –
Examples
>>> import necst.core
>>> import rclpy
>>> class MyNode(necst.core.PrivilegedNode):
...     def __init__(self):
...         super().__init__("my_node")
...
...     @necst.core.require_privilege
...     def some_operation(self):
...         ...
...
... def main(args=None):
...     rclpy.init(args=args)
...     node = MyNode()
...     logger = node.get_logger()
...     try:
...         assert node.get_privilege(), "Privilege isn't granted"
...         # `Authorizer` should be running somewhere, to get privilege.
...         rclpy.spin(node)
...     except KeyboardInterrupt:
...         pass
...     except AssertionError:
...         import traceback
...         logger.warning(f"Error :\n{traceback.format_exc()}")
...     finally:
...         node.destroy_node()
...         rclpy.try_shutdown()
...
... if __name__ == "__main__":
...     main()
- Namespace: str = '/necst/OMU1P85M/core/auth'
- property has_privilege: bool
True if privilege is granted for this node.
- get_privilege()
Request privilege.
- Returns
privileged – True if the privilege was successfully acquired.
- Return type
bool
- quit_privilege()
Give up privilege.
- Returns
privileged – False if the privilege was successfully released.
- Return type
bool
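The acquire/release pair above can be wrapped so that the privilege is always given back. The following is a minimal sketch that assumes only the documented return values of get_privilege and quit_privilege; StubNode and privileged are hypothetical names used for illustration and are not part of necst.

```python
from contextlib import contextmanager


class StubNode:
    """Hypothetical stand-in mimicking the documented return values.

    A real PrivilegedNode talks to a running Authorizer; this stub does not.
    """

    def __init__(self, grant=True):
        self._grant = grant  # whether the pretend Authorizer grants requests
        self.has_privilege = False

    def get_privilege(self):
        # Documented contract: True if the privilege was acquired.
        self.has_privilege = self._grant
        return self.has_privilege

    def quit_privilege(self):
        # Documented contract: False if the privilege was released.
        self.has_privilege = False
        return self.has_privilege


@contextmanager
def privileged(node):
    """Hypothetical helper: hold the privilege for the duration of a block."""
    if not node.get_privilege():
        raise PermissionError("Privilege isn't granted")
    try:
        yield node
    finally:
        # Release the privilege even if the block raised.
        node.quit_privilege()


node = StubNode()
with privileged(node) as n:
    held_inside = n.has_privilege
held_after = node.has_privilege
```

With a real node the same helper would presumably work unchanged, since it relies only on the two documented methods, but that has not been verified against a running Authorizer.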
- static require_privilege(callable_obj=None, *, escape_cmd=[])
Decorator to mark conflict-unsafe functions.
- Parameters
callable_obj (Optional[Callable[[...], Any]]) –
escape_cmd (List[Any]) – List of commands allowed to execute ignoring the privilege. The command should be passed as the first argument of the decorated function.
- Return type
Callable[[…], Any]
Notes
This decorator assumes the function is defined and called as a method of some class, not as a standalone function object.
Examples
>>> @necst.core.require_privilege
... def some_unsafe_operation(self, *args):
...     ...
>>> @necst.core.require_privilege(escape_cmd=["?"])
... def operation_or_query(self, command, *args):
...     ...
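To illustrate how a decorator can accept both the bare form and the parenthesized form shown above, here is a minimal sketch. It is not necst's implementation: require_privilege_sketch and Device are hypothetical names, the keyword follows the escape_cmd name from the signature, and returning None for an unprivileged call is an assumption made for this example.

```python
import functools


def require_privilege_sketch(callable_obj=None, *, escape_cmd=()):
    """Hypothetical re-creation of the calling convention, not necst's code."""

    def decorator(func):
        @functools.wraps(func)
        def wrapper(self, *args, **kwargs):
            # An escape command is matched against the first positional argument.
            escaped = bool(args) and args[0] in escape_cmd
            if self.has_privilege or escaped:
                return func(self, *args, **kwargs)
            # Assumption: an unprivileged call is skipped and returns None.
            return None

        return wrapper

    if callable_obj is None:
        # Used as @require_privilege_sketch(escape_cmd=[...])
        return decorator
    # Used as bare @require_privilege_sketch
    return decorator(callable_obj)


class Device:
    """Hypothetical class exposing only the attribute the decorator checks."""

    def __init__(self, has_privilege):
        self.has_privilege = has_privilege

    @require_privilege_sketch
    def move(self, target):
        return f"moved to {target}"

    @require_privilege_sketch(escape_cmd=["?"])
    def command(self, cmd, *args):
        return f"ran {cmd}"
```

Because the wrapper reads has_privilege from self, the sketch only works on methods, which matches the note above about the decorator expecting a method rather than a standalone function.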
- destroy_node()
Add a minimal privilege removal procedure.
The more “formal” privilege removal, i.e. quit_privilege, cannot be invoked when Ctrl-C is detected. This method doesn’t interfere with that behavior; instead, it ensures that the intra-node privilege check (require_privilege) can detect the removal.
The server will recognize the state change via its own _ping method.
- Return type
None
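The idea of a local-only removal can be sketched as follows. This is an illustration under assumptions, not the library's code: BaseNode and PrivilegedSketch are hypothetical stand-ins, and the private _has_privilege flag is an invented name for whatever state backs has_privilege.

```python
class BaseNode:
    """Hypothetical stand-in for the parent node class."""

    def __init__(self):
        self.destroyed = False

    def destroy_node(self):
        self.destroyed = True


class PrivilegedSketch(BaseNode):
    """Hypothetical node that drops its local privilege flag on destruction."""

    def __init__(self):
        super().__init__()
        self._has_privilege = True  # pretend the privilege was already granted

    @property
    def has_privilege(self):
        return self._has_privilege

    def destroy_node(self):
        # Minimal, local-only removal: no request is sent to the server here.
        self._has_privilege = False
        super().destroy_node()


sketch_node = PrivilegedSketch()
try:
    pass  # the node would normally spin here until interrupted
finally:
    sketch_node.destroy_node()
```

In this sketch, any check that reads has_privilege after destroy_node sees False, while the server side is left to notice the change on its own.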
- require_privilege(callable_obj=None, *, escape_cmd=[])
Decorator to mark conflict-unsafe functions.
- Parameters
callable_obj (Optional[Callable[[...], Any]]) –
escape_cmd (List[Any]) – List of commands allowed to execute ignoring the privilege. The command should be passed as the first argument of the decorated function.
- Return type
Callable[[…], Any]
Notes
This decorator assumes the function is defined and called as a method of some class, not as a standalone function object.
Examples
>>> @necst.core.require_privilege
... def some_unsafe_operation(self, *args):
...     ...
>>> @necst.core.require_privilege(escape_cmd=["?"])
... def operation_or_query(self, command, *args):
...     ...