necst.procedures.observations.progress

Lightweight observation-progress reporting utilities.

This module is intentionally independent of ROS imports. The observation runner may attach a Commander instance; if that Commander has an observation_progress publisher, JSON snapshots are published as std_msgs/String. File and topic publish failures are swallowed so progress reporting can never stop an observation.

json_safe(value)[source]

Convert common observation objects into JSON-serializable values.

Parameters:

value (Any) –

Return type:

Any

deep_update(base, update)[source]
Parameters:
  • base (Dict[str, Any]) –

  • update (Mapping[str, Any]) –

Return type:

Dict[str, Any]

class NullProgressReporter[source]

Bases: object

No-op reporter used before a real reporter is attached.

enabled = False
attach_commander(commander)[source]
Parameters:

commander (Any) –

Return type:

None

set_lifecycle(state, **fields)[source]
Parameters:
  • state (str) –

  • fields (Any) –

Return type:

None

update(**sections)[source]
Parameters:

sections (Any) –

Return type:

None

event(event, **fields)[source]
Parameters:
  • event (str) –

  • fields (Any) –

Return type:

None

set_plan(items, **fields)[source]
Parameters:
  • items (Iterable[Mapping[str, Any]]) –

  • fields (Any) –

Return type:

None

item(**fields)[source]
Parameters:

fields (Any) –

drive(**fields)[source]
Parameters:

fields (Any) –

integration(**fields)[source]
Parameters:

fields (Any) –

record_sidecars()[source]
Return type:

None

close(state='finished', **fields)[source]
Parameters:
  • state (str) –

  • fields (Any) –

Return type:

None

class ObservationProgressReporter(*, observation_type, record_name, obs_file=None, target=None, started_at_unix=None, logger=None, root=None)[source]

Bases: object

Best-effort structured progress reporter.

The reporter writes three files below NECST_PROGRESS_ROOT (default: /tmp/necst_progress):

  • current_observation_progress.json: latest snapshot for CLI/GUI.

  • <record_name>/observation_progress.json: latest snapshot for this run.

  • <record_name>/observation_events.jsonl: state-change event stream.

  • <record_name>/observation_plan.json: static-ish plan geometry.

None of the reporting operations raise to the caller.

Parameters:
  • observation_type (str) –

  • record_name (str) –

  • obs_file (Optional[Any]) –

  • target (Optional[str]) –

  • started_at_unix (Optional[float]) –

  • logger (Optional[Any]) –

  • root (Optional[Any]) –

attach_commander(commander)[source]
Parameters:

commander (Any) –

Return type:

None

update(_replace_sections=None, **sections)[source]

Best-effort snapshot update.

By default, nested dictionaries are merged so callers can update only a few fields. Some sections, however, must be replaced wholesale at state-boundaries. In particular plan and geometry may contain keys such as index0_end, lines, start or stop that are meaningful for one item but wrong for the next one. _replace_sections prevents stale keys from leaking across observation items and confusing necst progress --watch or a future GUI.

Parameters:
  • _replace_sections (Optional[Iterable[str]]) –

  • sections (Any) –

Return type:

None

set_lifecycle(state, **fields)[source]
Parameters:
  • state (str) –

  • fields (Any) –

Return type:

None

event(event, **fields)[source]
Parameters:
  • event (str) –

  • fields (Any) –

Return type:

None

set_plan(items, **fields)[source]
Parameters:
  • items (Iterable[Mapping[str, Any]]) –

  • fields (Any) –

Return type:

None

item(**fields)[source]
Parameters:

fields (Any) –

Return type:

Iterator[None]

drive(*, kind, stage, geometry=None, **fields)[source]
Parameters:
  • kind (str) –

  • stage (str) –

  • geometry (Optional[Mapping[str, Any]]) –

  • fields (Any) –

Return type:

Iterator[None]

integration(*, phase, metadata_position, id, line_index=None, location_context=None, geometry=None, **fields)[source]
Parameters:
  • phase (str) –

  • metadata_position (str) –

  • id (Any) –

  • line_index (Optional[int]) –

  • location_context (Optional[str]) –

  • geometry (Optional[Mapping[str, Any]]) –

  • fields (Any) –

Return type:

Iterator[None]

record_sidecars()[source]

Best-effort save of progress sidecars through the NECST recorder.

The recorder service can only accept files while the recorder is active, and the standard FileWriter prepends a comment header to saved files. This method is still useful during cleanup while recording is active, but copy_sidecars_to_local_record_dir() is used after the final lifecycle update to leave parseable JSON/JSONL sidecars in the local record directory when that directory is visible from this process.

Return type:

None

copy_sidecars_to_local_record_dir(*, record_name=None, record_root=None)[source]

Best-effort direct copy of final sidecars into a local record dir.

RecorderController.write_file rejects files after record('stop') and FileWriter adds comment headers to files saved through the service. Therefore the final parseable snapshot is copied directly when the NECST record directory is locally visible. This is deliberately best-effort: remote recorder deployments or different NECST_RECORD_ROOT values simply fall back to the live progress directory.

Parameters:
  • record_name (Optional[str]) –

  • record_root (Optional[Any]) –

Return type:

None

close(state='finished', **fields)[source]
Parameters:
  • state (str) –

  • fields (Any) –

Return type:

None