necst.rx.spectral_recording_runtime¶
Runtime state and helpers for spectral-recording active setup.
This module is intentionally ROS-free. necst.rx.spectrometer.SpectralData
uses these helpers for PR4/PR5, while the unit tests exercise them with simple
Python objects.
Implemented PR scope¶
- PR4:
active snapshot setup state;
setup hash validation;
setup gate closed-by-default;
legacy channel_binning / tp_mode conflict policy.
- PR5:
spectrum full/slice branch;
no double slicing;
explicit skip for TP streams until PR6.
- PR6:
TP direct append chunk builder;
all-NaN / zero-valid-channel semantics;
TP stream path and metadata helpers.
- exception SpectralRecordingRuntimeError[source]¶
Bases: RuntimeError
Base error for runtime spectral-recording setup failures.
- exception SpectralRecordingGateError[source]¶
Bases: SpectralRecordingRuntimeError
Raised when setup gate cannot be opened.
- canonical_snapshot_sha256_from_text(snapshot_toml)[source]¶
- Parameters:
snapshot_toml (str) –
- Return type:
str
- normalize_snapshot_hash(snapshot, raw_text='')[source]¶
- Parameters:
snapshot (Mapping[str, Any]) –
raw_text (str) –
- Return type:
str
- load_snapshot_from_toml_text(snapshot_toml)[source]¶
- Parameters:
snapshot_toml (str) –
- Return type:
Dict[str, Any]
- class ActiveSpectralRecordingSetup(snapshot: 'Dict[str, Any]', setup_id: 'str', setup_hash: 'str', gate_allow_save: 'bool' = False, legacy_tp_policy: 'str' = 'error', warnings: 'List[str]' = <factory>, runtime_errors: 'List[str]' = <factory>, fatal_error: 'str' = '')[source]¶
Bases: object
- Parameters:
snapshot (Dict[str, Any]) –
setup_id (str) –
setup_hash (str) –
gate_allow_save (bool) –
legacy_tp_policy (str) –
warnings (List[str]) –
runtime_errors (List[str]) –
fatal_error (str) –
- snapshot: Dict[str, Any]¶
- setup_id: str¶
- setup_hash: str¶
- gate_allow_save: bool = False¶
- legacy_tp_policy: str = 'error'¶
- warnings: List[str]¶
- runtime_errors: List[str]¶
- fatal_error: str = ''¶
- classmethod from_snapshot_toml(*, snapshot_toml, snapshot_sha256='', setup_id='', strict=True)[source]¶
- Parameters:
snapshot_toml (str) –
snapshot_sha256 (str) –
setup_id (str) –
strict (bool) –
- Return type:
- property streams: Dict[str, Dict[str, Any]]¶
- property enabled_stream_ids: List[str]¶
- property spectrum_stream_ids: List[str]¶
- property tp_stream_ids: List[str]¶
- property active_mode_summary: str¶
- latch_fatal_error(message)[source]¶
Record a fatal active-mode error and close the setup gate.
Active snapshot mode must not silently skip rows. Once a schema/setup error is detected, later save attempts remain blocked until the setup is explicitly cleared and a new setup is applied.
- Parameters:
message (str) –
- Return type:
None
- set_gate(*, setup_id, setup_hash, allow_save)[source]¶
- Parameters:
setup_id (str) –
setup_hash (str) –
allow_save (bool) –
- Return type:
None
- streams_for_raw(spectrometer_key, board_id)[source]¶
Return all saved products produced from one raw spectrometer input.
In normal full/channel/envelope modes this list has length one. In saved_window_policy=multi_window it can contain multiple recorded products that share the same raw input key but have different saved channel ranges and DB output paths.
The stream dictionaries are setup-lifetime cached objects. Callers must treat them as read-only. Returning the cached references avoids copying every stream dictionary in the high-rate spectral callback.
- Parameters:
- Parameters:
spectrometer_key (str) –
board_id (int) –
- Return type:
Tuple[Mapping[str, Any], …]
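The caching behaviour described for streams_for_raw can be pictured with a minimal sketch. It assumes each stream dictionary carries `spectrometer_key` and `board_id` entries. Those field names, the `saved_ch_start` / `saved_ch_stop` keys, and the `build_raw_index` helper are illustrative assumptions, not the module's actual layout.

```python
from collections import defaultdict
from types import MappingProxyType
from typing import Any, Dict, Mapping, Tuple

RawKey = Tuple[str, int]


def build_raw_index(
    streams: Mapping[str, Mapping[str, Any]],
) -> Dict[RawKey, Tuple[Mapping[str, Any], ...]]:
    """Group streams by raw input once, at setup time (hypothetical helper)."""
    grouped = defaultdict(list)
    for stream in streams.values():
        # Assumed field names; the real snapshot schema may differ.
        key = (str(stream["spectrometer_key"]), int(stream["board_id"]))
        # A read-only view makes accidental mutation by callers fail loudly.
        grouped[key].append(MappingProxyType(dict(stream)))
    return {key: tuple(items) for key, items in grouped.items()}


streams = {
    "win_a": {"spectrometer_key": "xffts", "board_id": 1,
              "saved_ch_start": 0, "saved_ch_stop": 1024},
    "win_b": {"spectrometer_key": "xffts", "board_id": 1,
              "saved_ch_start": 2048, "saved_ch_stop": 4096},
    "full_2": {"spectrometer_key": "xffts", "board_id": 2},
}
index = build_raw_index(streams)

# The per-row lookup is a single dictionary access; nothing is copied.
products = index.get(("xffts", 1), ())
print(len(products))
```

In this sketch the two windows on board 1 come back as one tuple, which mirrors the multi-window case where several saved products share one raw input.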
- class SpectralRecordingRuntimeState[source]¶
Bases: object
Mutable holder used by SpectralData.
- property active: bool¶
- apply(*, snapshot_toml, snapshot_sha256, setup_id, strict=True)[source]¶
- Parameters:
snapshot_toml (str) –
snapshot_sha256 (str) –
setup_id (str) –
strict (bool) –
- Return type:
- set_gate(*, setup_id, setup_hash, allow_save)[source]¶
- Parameters:
setup_id (str) –
setup_hash (str) –
allow_save (bool) –
- Return type:
None
- clear(*, setup_id='', setup_hash='', strict=True)[source]¶
- Parameters:
setup_id (str) –
setup_hash (str) –
strict (bool) –
- Return type:
List[str]
- latch_fatal_error(message)[source]¶
Latch a fatal active-mode error without raising from async callbacks.
ROS subscription callbacks such as legacy TPMode/Binning conflict handlers should make the active setup fail-closed, but they should not depend on an unhandled exception escaping the callback. Recording paths observe the latched state via assert_no_fatal_error() before any append.
- Parameters:
message (str) –
- Return type:
None
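The latch-then-assert pattern described above can be sketched as follows. `FatalLatch` is a hypothetical stand-in, not the real state class. It raises a plain `RuntimeError` where the module presumably uses its own exception types, and keeping only the first latched message is an assumption of this sketch.

```python
class FatalLatch:
    """Hypothetical stand-in for the fail-closed part of the runtime state."""

    def __init__(self) -> None:
        self.gate_allow_save = False  # the gate starts closed
        self.fatal_error = ""

    def open_gate(self) -> None:
        # A latched error keeps the gate closed until clear() is called.
        self.assert_no_fatal_error()
        self.gate_allow_save = True

    def latch_fatal_error(self, message: str) -> None:
        # Safe to call from a callback: records the error and never raises.
        if not self.fatal_error:
            self.fatal_error = str(message)  # assumption: keep the first cause
        self.gate_allow_save = False

    def assert_no_fatal_error(self) -> None:
        # Recording paths call this before every append.
        if self.fatal_error:
            raise RuntimeError(f"active setup blocked: {self.fatal_error}")

    def clear(self) -> None:
        self.fatal_error = ""
        self.gate_allow_save = False


latch = FatalLatch()
latch.open_gate()
latch.latch_fatal_error("legacy tp_mode conflicts with active snapshot")
try:
    latch.assert_no_fatal_error()
except RuntimeError as exc:
    print(exc)
```

The callback side only records state, while the recording side decides when to raise, so a conflict detected asynchronously still blocks the next save attempt.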
- slice_spectrum_for_stream(stream, spectral_data)[source]¶
Return full/sliced spectrum for one stream.
The runtime input is expected to be the full XFFTS spectrum. The slice is applied exactly once, using full-channel indices with an inclusive start and an exclusive stop.
- Parameters:
stream (Mapping[str, Any]) –
spectral_data (Any) –
- Return type:
Any
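A minimal sketch of the full/slice branch, under two assumptions that the page does not confirm: the window is stored under the hypothetical keys `saved_ch_start` and `saved_ch_stop`, and the range is half-open (start included, stop excluded).

```python
from typing import Any, Mapping, Sequence


def slice_once(stream: Mapping[str, Any], full_spectrum: Sequence[float]) -> Sequence[float]:
    """Return the full spectrum or one half-open slice of it (illustrative)."""
    start = stream.get("saved_ch_start")
    stop = stream.get("saved_ch_stop")
    if start is None or stop is None:
        return full_spectrum  # full-spectrum stream: nothing to cut
    if not 0 <= start < stop <= len(full_spectrum):
        # Indices refer to the full spectrum; an already-sliced input
        # would usually fail this bounds check.
        raise ValueError(f"window [{start}, {stop}) outside 0..{len(full_spectrum)}")
    return full_spectrum[start:stop]


full = list(range(8))
print(slice_once({"saved_ch_start": 2, "saved_ch_stop": 5}, full))
print(slice_once({}, full) is full)
```

Because the indices are expressed in full-channel coordinates, applying the same window to data that was already cut would select the wrong channels, which is why the slice must happen in exactly one place.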
- spectrum_extra_chunk(stream, setup)[source]¶
Extra metadata fields appended to the normal Spectral.msg chunk.
- Parameters:
stream (Mapping[str, Any]) –
setup (ActiveSpectralRecordingSetup) –
- Return type:
List[Dict[str, Any]]
- legacy_spectral_string_field(key, value, length)[source]¶
Return a string field byte-for-byte compatible with Spectral.msg.
SpectralData._spectral_chunk() historically kept string<=N fields as Python strings padded with spaces before the NECSTDB writer converted them to Ns columns. The active-mode fast path must preserve that base-column convention; otherwise existing readers may see NUL-padded values for position, id, line_label or time_spectrometer.
- Parameters:
key (str) –
value (Any) –
length (int) –
- Return type:
Dict[str, Any]
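The difference between the two padding conventions is easy to see in a small sketch. Both helper names are hypothetical, and the truncation to `length` is an assumption made here for brevity rather than documented behaviour of this function.

```python
def space_padded(value: object, length: int) -> str:
    """Legacy-style field: a Python string padded with spaces (illustrative)."""
    # Truncation is an assumption of this sketch.
    return str(value)[:length].ljust(length)


def nul_padded(value: object, length: int) -> bytes:
    """Fixed-byte field: UTF-8 bytes padded with NUL (illustrative)."""
    return str(value).encode("utf-8")[:length].ljust(length, b"\x00")


legacy = space_padded("ON", 8)
fixed = nul_padded("ON", 8)

print(repr(legacy))
print(repr(fixed))

# A reader that strips whitespace recovers the legacy value but not the
# NUL-padded one, because NUL is not whitespace in Python.
print(legacy.rstrip() == "ON")
print(fixed.decode("utf-8").rstrip() == "ON")
```

Both values occupy eight bytes on disk, so the column type is unchanged; only the padding byte differs, and that is exactly what an existing reader would notice.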
- spectrum_chunk_for_stream(stream, setup, *, time, time_spectrometer, position, obs_id, line_index, line_label, spectral_data)[source]¶
Build the active-mode spectrum chunk without constructing a ROS message.
This is the high-rate counterpart of the legacy Spectral.msg path. The field order and base field names intentionally follow necst_msgs/Spectral so existing NECSTDB readers see the same dynamic columns, followed by the active setup provenance columns from spectrum_extra_chunk.
The base string columns intentionally use the same space-padded string<=N representation as SpectralData._spectral_chunk(). Static active-setup provenance columns keep the fixed-byte runtime helper used by PR8/PR13.
- Parameters:
stream (Mapping[str, Any]) –
setup (ActiveSpectralRecordingSetup) –
time (float) –
time_spectrometer (str) –
position (str) –
obs_id (str) –
line_index (int) –
line_label (str) –
spectral_data (Any) –
- Return type:
List[Dict[str, Any]]
- namespace_db_path(namespace_root, db_table_path)[source]¶
Convert canonical snapshot data/... path to current NECSTDB append path.
- Parameters:
namespace_root (str) –
db_table_path (str) –
- Return type:
str
- fixed_string_bytes(value, length, *, key='')[source]¶
Encode and NUL-pad a value to exactly length bytes.
The check is byte-based, not Python-character-based, because NECSTDB uses struct byte string formats. Oversized values are rejected rather than silently truncated; truncating tp_windows_json would make the JSON invalid and truncating identifiers would make provenance ambiguous.
- Parameters:
value (Any) –
length (int) –
key (str) –
- Return type:
bytes
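The byte-based check can be illustrated with a short sketch. `fixed_bytes` is a hypothetical re-implementation, UTF-8 is an assumed encoding, and `ValueError` stands in for whatever exception the module actually raises.

```python
import struct


def fixed_bytes(value: object, length: int, *, key: str = "") -> bytes:
    """Encode to UTF-8 and NUL-pad to exactly `length` bytes (sketch)."""
    raw = value if isinstance(value, bytes) else str(value).encode("utf-8")
    if len(raw) > length:
        # Reject instead of truncating: a cut JSON string or identifier
        # would be worse than an early error.
        raise ValueError(f"{key or 'value'}: {len(raw)} bytes exceeds fixed width {length}")
    return raw.ljust(length, b"\x00")


# The padded value fits a struct "8s" column exactly.
packed = struct.pack("8s", fixed_bytes("abc", 8, key="setup_id"))
print(packed)

# Three characters, but six bytes in UTF-8: a character count would pass,
# the byte count does not.
try:
    fixed_bytes("αβγ", 4, key="line_label")
except ValueError as exc:
    print(exc)

# Decoding for tests or debugging: drop the NUL padding, then decode.
print(packed.rstrip(b"\x00").decode("utf-8"))
```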
- runtime_chunk_field(key, type_name, value)[source]¶
Build one NECSTDB chunk field with fixed-string schema hardening.
- Parameters:
key (str) –
type_name (str) –
value (Any) –
- Return type:
Dict[str, Any]
- decode_fixed_string_value(value)[source]¶
Testing/debug helper: decode a fixed string value emitted by this module.
- Parameters:
value (Any) –
- Return type:
str
- tp_window_for_stream(stream)[source]¶
Return the PR6 contiguous TP window in full-channel indices.
- Parameters:
stream (Mapping[str, Any]) –
- Return type:
Tuple[int, int]
- compute_tp_statistics_for_stream(stream, spectral_data)[source]¶
Compute TP sum/mean with explicit all-NaN/zero-valid semantics.
- Parameters:
stream (Mapping[str, Any]) –
spectral_data (Any) –
- Return type:
Dict[str, Any]
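Why the all-NaN case needs explicit handling can be shown with a small sketch. The output keys `tp_sum`, `tp_mean` and `tp_valid_channels`, and the choice to report NaN when no channel is valid, are assumptions; the page does not state the convention the module uses.

```python
import math
from typing import Any, Dict, Sequence


def tp_statistics(spectrum: Sequence[float], start: int, stop: int) -> Dict[str, Any]:
    """Sum and mean over [start, stop), ignoring NaN channels (sketch)."""
    window = spectrum[start:stop]
    valid = [float(v) for v in window if not math.isnan(float(v))]
    if not valid:
        # Assumed convention: report NaN rather than a misleading 0.0.
        return {"tp_sum": math.nan, "tp_mean": math.nan, "tp_valid_channels": 0}
    total = math.fsum(valid)
    return {"tp_sum": total, "tp_mean": total / len(valid), "tp_valid_channels": len(valid)}


nan = math.nan
print(tp_statistics([1.0, nan, 3.0, 5.0], 0, 3))
print(tp_statistics([nan, nan], 0, 2))
```

Without the explicit branch, summing an empty list of valid channels would give 0.0, which is indistinguishable from a genuine zero-power reading, and the mean would divide by zero.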
- tp_chunk_for_stream(stream, setup, *, time, time_spectrometer, position, obs_id, line_index, line_label, spectral_data)[source]¶
Build the fixed-schema TP chunk for direct NECSTDB append.
- Parameters:
stream (Mapping[str, Any]) –
setup (ActiveSpectralRecordingSetup) –
time (float) –
time_spectrometer (str) –
position (str) –
obs_id (str) –
line_index (int) –
line_label (str) –
spectral_data (Any) –
- Return type:
List[Dict[str, Any]]
- validate_setup_chunk_schema(setup)[source]¶
Dry-run fixed-schema metadata chunks before a setup gate can be opened.
This catches snapshot-derived fixed-string overflows at setup-apply time instead of during the first TP/spectrum row. Dynamic observation metadata fields that are truncated by SpectralData are represented by legal maximum-length dummy values.
- Parameters:
setup (ActiveSpectralRecordingSetup) –
- Return type:
None
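The dry-run idea can be sketched as below. The field widths, field names and the `dry_run_schema` helper are all hypothetical, and the sketch returns a list of problems where the real function returns None and presumably signals failure by raising.

```python
from typing import Dict, List, Mapping

# Hypothetical fixed widths in bytes; the real schema is defined by the module.
FIELD_WIDTHS: Dict[str, int] = {"setup_id": 16, "stream_id": 16, "obs_id": 8}


def dry_run_schema(static_values: Mapping[str, str], dynamic_keys: List[str]) -> List[str]:
    """Return overflow messages found before any row is written (sketch)."""
    problems = []
    candidate = dict(static_values)
    for key in dynamic_keys:
        # Dynamic fields are unknown at setup time, so stand in a legal
        # maximum-length dummy value.
        candidate[key] = "x" * FIELD_WIDTHS[key]
    for key, value in candidate.items():
        size = len(value.encode("utf-8"))
        if size > FIELD_WIDTHS[key]:
            problems.append(f"{key}: {size} bytes > {FIELD_WIDTHS[key]}")
    return problems


print(dry_run_schema({"setup_id": "s1", "stream_id": "a" * 20}, ["obs_id"]))
print(dry_run_schema({"setup_id": "s1", "stream_id": "win_a"}, ["obs_id"]))
```

Only snapshot-derived values can fail in this sketch, because the dummy values for dynamic fields are legal by construction. That matches the stated goal of surfacing setup mistakes before the gate opens.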
- response_lists(setup)[source]¶
- Parameters:
setup (ActiveSpectralRecordingSetup) –
- Return type:
Dict[str, List[str]]