necst.rx.spectral_recording_runtime

Runtime state and helpers for spectral-recording active setup.

This module is intentionally ROS-free. necst.rx.spectrometer.SpectralData uses these helpers for PR4/PR5, while the unit tests exercise them with simple Python objects.

Implemented PR scope

PR4:
  • active snapshot setup state;

  • setup hash validation;

  • setup gate closed by default;

  • legacy channel_binning / tp_mode conflict policy.

PR5:
  • spectrum full/slice branch;

  • no double slicing;

  • explicit skip for TP streams until PR6.

PR6:
  • TP direct append chunk builder;

  • all-NaN / zero-valid-channel semantics;

  • TP stream path and metadata helpers.

exception SpectralRecordingRuntimeError[source]

Bases: RuntimeError

Base error for runtime spectral-recording setup failures.

exception SpectralRecordingGateError[source]

Bases: SpectralRecordingRuntimeError

Raised when the setup gate cannot be opened.

canonical_snapshot_sha256_from_text(snapshot_toml)[source]
Parameters:

snapshot_toml (str) –

Return type:

str

normalize_snapshot_hash(snapshot, raw_text='')[source]
Parameters:
  • snapshot (Mapping[str, Any]) –

  • raw_text (str) –

Return type:

str

load_snapshot_from_toml_text(snapshot_toml)[source]
Parameters:

snapshot_toml (str) –

Return type:

Dict[str, Any]

class ActiveSpectralRecordingSetup(snapshot: Dict[str, Any], setup_id: str, setup_hash: str, gate_allow_save: bool = False, legacy_tp_policy: str = 'error', warnings: List[str] = <factory>, runtime_errors: List[str] = <factory>, fatal_error: str = '')[source]

Bases: object

Parameters:
  • snapshot (Dict[str, Any]) –

  • setup_id (str) –

  • setup_hash (str) –

  • gate_allow_save (bool) –

  • legacy_tp_policy (str) –

  • warnings (List[str]) –

  • runtime_errors (List[str]) –

  • fatal_error (str) –

snapshot: Dict[str, Any]
setup_id: str
setup_hash: str
gate_allow_save: bool = False
legacy_tp_policy: str = 'error'
warnings: List[str]
runtime_errors: List[str]
fatal_error: str = ''
classmethod from_snapshot_toml(*, snapshot_toml, snapshot_sha256='', setup_id='', strict=True)[source]
Parameters:
  • snapshot_toml (str) –

  • snapshot_sha256 (str) –

  • setup_id (str) –

  • strict (bool) –

Return type:

ActiveSpectralRecordingSetup

property streams: Dict[str, Dict[str, Any]]
property enabled_stream_ids: List[str]
property spectrum_stream_ids: List[str]
property tp_stream_ids: List[str]
property active_mode_summary: str
latch_fatal_error(message)[source]

Record a fatal active-mode error and close the setup gate.

Active snapshot mode must not silently skip rows. Once a schema/setup error is detected, later save attempts remain blocked until the setup is explicitly cleared and a new setup is applied.

Parameters:

message (str) –

Return type:

None
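The fail-closed behaviour described above can be sketched as follows. This is a minimal illustration, not the module's code: SetupSketch and GateError are hypothetical stand-ins, and both "keep the first fatal message" and "a latched setup refuses to reopen its gate" are assumptions made for the example.

```python
from dataclasses import dataclass


class GateError(RuntimeError):
    """Hypothetical stand-in for SpectralRecordingGateError."""


@dataclass
class SetupSketch:
    """Hypothetical, reduced stand-in for ActiveSpectralRecordingSetup."""

    gate_allow_save: bool = False  # gate is closed by default
    fatal_error: str = ""

    def latch_fatal_error(self, message: str) -> None:
        # Assumption: the first fatal message wins; later ones are ignored.
        if not self.fatal_error:
            self.fatal_error = message
        self.gate_allow_save = False  # latching always closes the gate

    def assert_no_fatal_error(self) -> None:
        if self.fatal_error:
            raise GateError(self.fatal_error)

    def set_gate(self, *, allow_save: bool) -> None:
        # Assumption: a latched setup cannot be reopened, only replaced.
        if allow_save:
            self.assert_no_fatal_error()
        self.gate_allow_save = allow_save


setup = SetupSketch()
setup.set_gate(allow_save=True)
setup.latch_fatal_error("schema mismatch in stream A")
try:
    setup.set_gate(allow_save=True)
    reopened = True
except GateError:
    reopened = False
```

In this sketch the only way back to a writable state is to discard the object and build a new one, which mirrors the "explicitly cleared and a new setup is applied" rule.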

assert_no_fatal_error()[source]
Return type:

None

set_gate(*, setup_id, setup_hash, allow_save)[source]
Parameters:
  • setup_id (str) –

  • setup_hash (str) –

  • allow_save (bool) –

Return type:

None

streams_for_raw(spectrometer_key, board_id)[source]

Return all saved products produced from one raw spectrometer input.

In the normal full/channel/envelope modes this list has length one. With saved_window_policy=multi_window, it can contain multiple recorded products that share the same raw input key but have different saved channel ranges and DB output paths.

The stream dictionaries are setup-lifetime cached objects. Callers must treat them as read-only. Returning the cached references avoids copying every stream dictionary in the high-rate spectral callback.

Parameters:
  • spectrometer_key (str) –

  • board_id (int) –

Return type:

Tuple[Mapping[str, Any], …]
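One way to picture the cached lookup is an index built once per setup and returned by reference. The sketch below is an assumption-laden illustration: the stream field names are hypothetical, and wrapping each stream in types.MappingProxyType is a choice made here to enforce read-only access; the source only states that callers must treat the dictionaries as read-only.

```python
from types import MappingProxyType
from typing import Any, Dict, List, Mapping, Tuple

RawKey = Tuple[str, int]

# Hypothetical stream table; all field names are illustrative only.
streams: Dict[str, Dict[str, Any]] = {
    "win_a": {"spectrometer_key": "xffts", "board_id": 1, "ch_start": 0, "ch_stop": 1024},
    "win_b": {"spectrometer_key": "xffts", "board_id": 1, "ch_start": 2048, "ch_stop": 4096},
    "full": {"spectrometer_key": "xffts", "board_id": 2},
}


def build_raw_index(
    table: Mapping[str, Dict[str, Any]]
) -> Dict[RawKey, Tuple[Mapping[str, Any], ...]]:
    """Group streams by raw input key once, at setup time."""
    grouped: Dict[RawKey, List[Mapping[str, Any]]] = {}
    for stream in table.values():
        key = (stream["spectrometer_key"], stream["board_id"])
        grouped.setdefault(key, []).append(MappingProxyType(stream))
    return {key: tuple(items) for key, items in grouped.items()}


raw_index = build_raw_index(streams)


def streams_for_raw_sketch(spectrometer_key: str, board_id: int) -> Tuple[Mapping[str, Any], ...]:
    # No copying on the hot path: the same cached tuple is returned every call.
    return raw_index.get((spectrometer_key, board_id), ())
```

Board 1 maps to two products here (the multi-window case) and board 2 to one; an unknown key yields an empty tuple.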

stream_for_raw(spectrometer_key, board_id)[source]
Parameters:
  • spectrometer_key (str) –

  • board_id (int) –

Return type:

Optional[Dict[str, Any]]

check_save_allowed()[source]
Return type:

bool

reject_legacy_channel_binning()[source]
Return type:

None

reject_legacy_tp_mode()[source]
Return type:

None

class SpectralRecordingRuntimeState[source]

Bases: object

Mutable holder used by SpectralData.

property active: bool
apply(*, snapshot_toml, snapshot_sha256, setup_id, strict=True)[source]
Parameters:
  • snapshot_toml (str) –

  • snapshot_sha256 (str) –

  • setup_id (str) –

  • strict (bool) –

Return type:

ActiveSpectralRecordingSetup

set_gate(*, setup_id, setup_hash, allow_save)[source]
Parameters:
  • setup_id (str) –

  • setup_hash (str) –

  • allow_save (bool) –

Return type:

None

clear(*, setup_id='', setup_hash='', strict=True)[source]
Parameters:
  • setup_id (str) –

  • setup_hash (str) –

  • strict (bool) –

Return type:

List[str]

latch_fatal_error(message)[source]

Latch a fatal active-mode error without raising from async callbacks.

ROS subscription callbacks such as legacy TPMode/Binning conflict handlers should make the active setup fail-closed, but they should not depend on an unhandled exception escaping the callback. Recording paths observe the latched state via assert_no_fatal_error() before any append.

Parameters:

message (str) –

Return type:

None
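The split between "latch in the callback" and "check before append" might look like the sketch below. StateSketch, on_legacy_tp_mode and append_row are hypothetical names, and the in-memory list stands in for a NECSTDB table; none of this is the module's actual code.

```python
from typing import Any, Dict, List


class StateSketch:
    """Hypothetical, reduced stand-in for SpectralRecordingRuntimeState."""

    def __init__(self) -> None:
        self.fatal_error = ""

    def latch_fatal_error(self, message: str) -> None:
        # Only records the problem; never raises.
        if not self.fatal_error:
            self.fatal_error = message

    def assert_no_fatal_error(self) -> None:
        if self.fatal_error:
            raise RuntimeError(self.fatal_error)


def on_legacy_tp_mode(state: StateSketch, requested: bool) -> None:
    """Callback side: returns normally even when a conflict is detected."""
    if requested:
        state.latch_fatal_error("legacy tp_mode conflicts with the active setup")


def append_row(state: StateSketch, table: List[Dict[str, Any]], row: Dict[str, Any]) -> None:
    """Recording side: observes the latched state before any append."""
    state.assert_no_fatal_error()
    table.append(row)


state = StateSketch()
table: List[Dict[str, Any]] = []
append_row(state, table, {"time": 0.0})   # accepted
on_legacy_tp_mode(state, True)            # latches, does not raise
try:
    append_row(state, table, {"time": 1.0})
    blocked = False
except RuntimeError:
    blocked = True
```

The callback never lets an exception escape, yet no row is written after the conflict.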

assert_no_fatal_error()[source]
Return type:

None

streams_for_raw(spectrometer_key, board_id)[source]
Parameters:
  • spectrometer_key (str) –

  • board_id (int) –

Return type:

Tuple[Mapping[str, Any], …]

stream_for_raw(spectrometer_key, board_id)[source]
Parameters:
  • spectrometer_key (str) –

  • board_id (int) –

Return type:

Optional[Dict[str, Any]]

slice_spectrum_for_stream(stream, spectral_data)[source]

Return full/sliced spectrum for one stream.

The runtime input is expected to be the full XFFTS spectrum. The slice is applied exactly once, using full-channel indices with an inclusive start and an exclusive stop.

Parameters:
  • stream (Mapping[str, Any]) –

  • spectral_data (Any) –

Return type:

Any
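A minimal sketch of the full/slice branch, assuming the stream carries a half-open channel window. The key names saved_ch_start and saved_ch_stop are hypothetical, and the range check is an addition made here to show why slicing twice is an error.

```python
from typing import Any, List, Mapping, Sequence


def slice_once(stream: Mapping[str, Any], full_spectrum: Sequence[float]) -> List[float]:
    """Return the full spectrum or one [start, stop) slice of it."""
    # "saved_ch_start" / "saved_ch_stop" are hypothetical key names.
    start = stream.get("saved_ch_start")
    stop = stream.get("saved_ch_stop")
    if start is None or stop is None:
        return list(full_spectrum)  # full branch: nothing to cut
    if not 0 <= start < stop <= len(full_spectrum):
        raise ValueError(f"window [{start}, {stop}) outside 0..{len(full_spectrum)}")
    return list(full_spectrum[start:stop])


full = [float(ch) for ch in range(8)]
window = {"saved_ch_start": 2, "saved_ch_stop": 5}
saved = slice_once(window, full)   # channels 2, 3 and 4
unsliced = slice_once({}, full)    # all 8 channels
```

Because the indices refer to the full spectrum, feeding `saved` back into `slice_once` with the same window raises ValueError in this sketch: the three-channel result is shorter than the requested stop index.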

spectrum_extra_chunk(stream, setup)[source]

Extra metadata fields appended to the normal Spectral.msg chunk.

Parameters:
  • stream –

  • setup –

Return type:

List[Dict[str, Any]]

legacy_spectral_string_field(key, value, length)[source]

Return a string field byte-for-byte compatible with Spectral.msg.

SpectralData._spectral_chunk() historically kept string<=N fields as Python strings padded with spaces before the NECSTDB writer converted them to Ns columns. The active-mode fast path must preserve that base-column convention; otherwise existing readers may see NUL-padded values for position, id, line_label or time_spectrometer.

Parameters:
  • key (str) –

  • value (Any) –

  • length (int) –

Return type:

Dict[str, Any]
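The difference between the two padding conventions can be illustrated as follows. The returned dictionary layout (key/type/value) and the truncation of over-long values are assumptions for this sketch, not the documented behaviour of legacy_spectral_string_field.

```python
from typing import Any, Dict


def space_padded_field(key: str, value: Any, length: int) -> Dict[str, Any]:
    """Base-column style: a Python str padded with spaces to `length`."""
    text = str(value)[:length].ljust(length)  # truncation is an assumption
    # The dict layout below is hypothetical.
    return {"key": key, "type": f"string<={length}", "value": text}


def nul_padded_bytes(value: Any, length: int) -> bytes:
    """Fixed-byte style, shown only for contrast."""
    return str(value).encode("utf-8").ljust(length, b"\x00")


field = space_padded_field("position", "ON", 8)
contrast = nul_padded_bytes("ON", 8)
```

Here `field["value"]` is "ON" followed by six spaces, while `contrast` is b"ON" followed by six NUL bytes. A reader that strips trailing spaces handles only the first form cleanly.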

spectrum_chunk_for_stream(stream, setup, *, time, time_spectrometer, position, obs_id, line_index, line_label, spectral_data)[source]

Build the active-mode spectrum chunk without constructing a ROS message.

This is the high-rate counterpart of the legacy Spectral.msg path. The field order and base field names intentionally follow necst_msgs/Spectral so existing NECSTDB readers see the same dynamic columns, followed by the active setup provenance columns from spectrum_extra_chunk.

The base string columns intentionally use the same space-padded string<=N representation as SpectralData._spectral_chunk(). Static active-setup provenance columns keep the fixed-byte runtime helper used by PR8/PR13.

Parameters:
  • stream (Mapping[str, Any]) –

  • setup (ActiveSpectralRecordingSetup) –

  • time (float) –

  • time_spectrometer (str) –

  • position (str) –

  • obs_id (str) –

  • line_index (int) –

  • line_label (str) –

  • spectral_data (Any) –

Return type:

List[Dict[str, Any]]

namespace_db_path(namespace_root, db_table_path)[source]

Convert a canonical snapshot data/... path to the current NECSTDB append path.

Parameters:
  • namespace_root (str) –

  • db_table_path (str) –

Return type:

str

fixed_string_bytes(value, length, *, key='')[source]

Encode and NUL-pad a value to exactly length bytes.

The check is byte-based, not Python-character-based, because NECSTDB uses struct byte string formats. Oversized values are rejected rather than silently truncated; truncating tp_windows_json would make the JSON invalid and truncating identifiers would make provenance ambiguous.

Parameters:
  • value (Any) –

  • length (int) –

  • key (str) –

Return type:

bytes
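The byte-based check matters for non-ASCII text, where one character can occupy several bytes. A minimal sketch, assuming UTF-8 encoding and a ValueError on overflow (the real helper's encoding and exception type are not stated here):

```python
from typing import Any


def fixed_bytes_sketch(value: Any, length: int, *, key: str = "") -> bytes:
    """Encode `value` and NUL-pad it to exactly `length` bytes."""
    # Assumption: text is encoded as UTF-8; bytes pass through unchanged.
    raw = value if isinstance(value, bytes) else str(value).encode("utf-8")
    if len(raw) > length:
        # Reject instead of truncating: a cut value could be invalid or ambiguous.
        label = key or "value"
        raise ValueError(f"{label} needs {len(raw)} bytes, limit is {length}")
    return raw.ljust(length, b"\x00")


padded = fixed_bytes_sketch("naïve", 8, key="line_label")
try:
    fixed_bytes_sketch("naïve", 5, key="line_label")
    overflowed = False
except ValueError:
    overflowed = True
```

"naïve" has five characters but six UTF-8 bytes, so it fits in 8 bytes and is rejected for a 5-byte field even though a character count would have allowed it.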

runtime_chunk_field(key, type_name, value)[source]

Build one NECSTDB chunk field with fixed-string schema hardening.

Parameters:
  • key (str) –

  • type_name (str) –

  • value (Any) –

Return type:

Dict[str, Any]

decode_fixed_string_value(value)[source]

Testing/debug helper: decode a fixed string value emitted by this module.

Parameters:

value (Any) –

Return type:

str

tp_window_for_stream(stream)[source]

Return the PR6 contiguous TP window in full-channel indices.

Parameters:

stream (Mapping[str, Any]) –

Return type:

Tuple[int, int]

compute_tp_statistics_for_stream(stream, spectral_data)[source]

Compute TP sum/mean with explicit all-NaN/zero-valid semantics.

Parameters:
  • stream (Mapping[str, Any]) –

  • spectral_data (Any) –

Return type:

Dict[str, Any]
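One plausible reading of the all-NaN / zero-valid-channel semantics is sketched below. The assumptions are loud: NaN channels are ignored, a window with no valid channel yields NaN for both statistics, and the result keys tp_sum, tp_mean and n_valid are hypothetical names. The actual helper may define these cases differently.

```python
import math
from typing import Any, Dict, Sequence


def tp_stats_sketch(spectrum: Sequence[float], start: int, stop: int) -> Dict[str, Any]:
    """Sum and mean over the half-open window [start, stop), skipping NaN."""
    window = spectrum[start:stop]
    valid = [value for value in window if not math.isnan(value)]
    if not valid:
        # Assumption: zero valid channels is reported explicitly, not as 0.0.
        return {"tp_sum": math.nan, "tp_mean": math.nan, "n_valid": 0}
    total = math.fsum(valid)
    return {"tp_sum": total, "tp_mean": total / len(valid), "n_valid": len(valid)}


mixed = tp_stats_sketch([1.0, math.nan, 3.0, 5.0], 0, 3)
empty = tp_stats_sketch([math.nan, math.nan], 0, 2)
```

For `mixed`, two of the three windowed channels are valid, giving a sum of 4.0 and a mean of 2.0. For `empty`, `n_valid` is 0 and both statistics are NaN, so a downstream reader can tell "no data" apart from "zero power".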

tp_chunk_for_stream(stream, setup, *, time, time_spectrometer, position, obs_id, line_index, line_label, spectral_data)[source]

Build the fixed-schema TP chunk for direct NECSTDB append.

Parameters:
  • stream (Mapping[str, Any]) –

  • setup (ActiveSpectralRecordingSetup) –

  • time (float) –

  • time_spectrometer (str) –

  • position (str) –

  • obs_id (str) –

  • line_index (int) –

  • line_label (str) –

  • spectral_data (Any) –

Return type:

List[Dict[str, Any]]

validate_setup_chunk_schema(setup)[source]

Dry-run fixed-schema metadata chunks before a setup gate can be opened.

This catches snapshot-derived fixed-string overflows at setup-apply time instead of during the first TP/spectrum row. Dynamic observation metadata fields that are truncated by SpectralData are represented by legal maximum-length dummy values.

Parameters:

setup (ActiveSpectralRecordingSetup) –

Return type:

None
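The dry-run idea can be sketched as a loop that sizes every fixed-width field before any row exists. Everything concrete here is hypothetical: the field names, the widths, the example paths, and the choice to return a list of messages (the documented function returns None, so it presumably signals failure another way).

```python
from typing import Any, Dict, List, Mapping

# Hypothetical fixed-width schema: field name -> byte width.
FIELD_WIDTHS: Dict[str, int] = {"stream_id": 16, "db_table_path": 32, "obs_id": 8}


def dry_run_errors(streams: Mapping[str, Mapping[str, Any]]) -> List[str]:
    """Collect fixed-string overflows that would otherwise surface on the first row."""
    errors: List[str] = []
    for stream_id, stream in streams.items():
        row = {
            # Snapshot-derived values are checked as they are.
            "stream_id": stream_id,
            "db_table_path": stream["db_table_path"],
            # Dynamic value: stand in a legal maximum-length dummy.
            "obs_id": "x" * FIELD_WIDTHS["obs_id"],
        }
        for key, value in row.items():
            size = len(str(value).encode("utf-8"))
            if size > FIELD_WIDTHS[key]:
                errors.append(f"{stream_id}.{key}: {size} bytes > {FIELD_WIDTHS[key]}")
    return errors


good = dry_run_errors({"win_a": {"db_table_path": "tables/a"}})
bad = dry_run_errors({"win_b": {"db_table_path": "tables/" + "a" * 40}})
```

In this sketch a gate would be opened only when the returned list is empty.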

is_tp_stream(stream)[source]
Parameters:

stream (Mapping[str, Any]) –

Return type:

bool

response_lists(setup)[source]
Parameters:

setup (ActiveSpectralRecordingSetup) –

Return type:

Dict[str, List[str]]