necst.core.commander¶
Interface to send commands to any remotely controlled part of the telescope.
- class Commander[source]¶
Bases: PrivilegedNode
Send commands to drive any remotely controlled device.
Some commands require privilege to execute, so that conflicting commands from different users are rejected.
Examples
You can stop the antenna without privilege, for safety reasons
>>> from necst.core import Commander
>>> com = Commander()
>>> com.antenna("stop")
Driving the antenna to a certain position requires privilege
>>> from necst.core import Commander
>>> com = Commander()
>>> com.get_privilege()
>>> com.antenna("point", target=(30, 45, "altaz"))
- NodeName: str = 'commander'¶
- Namespace: str = '/necst/OMU1P85M/core'¶
- wait_mount_point(az_deg, el_deg, *, command_id=None, timeout_sec=None)[source]¶
Wait for a direct mount-Az point command using non-wrapped Az error.
TrackingStatus intentionally treats sky azimuths modulo 360 unless an absolute-encoder unwrap status is active. A manual mount command such as Az=360 deg must not be considered complete at Az=0 deg, so this helper compares the encoder directly against the requested mechanical Az.
If another terminal issues necst stop while this wait is active, the antenna intentionally stops before reaching the target. In that case the old implementation waited forever because the geometric convergence condition could never become true. We therefore also watch the control status and the manual_stop alert and return as soon as the command is externally interrupted or the controller leaves the requested command id.
- Parameters:
az_deg (float) –
el_deg (float) –
command_id (Optional[str]) –
timeout_sec (Optional[Union[int, float]]) –
- Return type:
None
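The difference between the modulo-360 sky azimuth error and the non-wrapped mount azimuth error can be illustrated with a minimal sketch. The function names below are hypothetical and are not part of necst; they only show why a command to Az=360 deg looks complete at Az=0 deg under wrapping but not on the mechanical axis.

```python
def wrapped_az_error(target_deg: float, encoder_deg: float) -> float:
    """Azimuth error folded into [-180, 180), i.e. sky azimuth modulo 360."""
    return (target_deg - encoder_deg + 180.0) % 360.0 - 180.0


def mount_az_error(target_deg: float, encoder_deg: float) -> float:
    """Azimuth error on the continuous mechanical axis, with no wrapping."""
    return target_deg - encoder_deg


# Commanded mount Az = 360 deg while the encoder still reads 0 deg.
sky_err = wrapped_az_error(360.0, 0.0)    # wrapping hides the full turn
mount_err = mount_az_error(360.0, 0.0)    # a full turn is still outstanding
print(sky_err, mount_err)
```

With the wrapped error the command would be judged converged immediately, which is the case this helper is described as avoiding.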
- observation_abort(*, reason='', requester='commander', repeat=3, interval_sec=0.1)[source]¶
Publish an observation abort request.
This does not perform cleanup itself. A running Observation instance subscribes to the request and enters its own cleanup path, preserving recorder/spectral-gate/metadata state as far as possible.
- Parameters:
reason (str) –
requester (str) –
repeat (int) –
interval_sec (float) –
- Return type:
str
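The source does not document repeat and interval_sec. A plausible reading, stated here only as an assumption, is that the request is published repeat times with a pause of interval_sec between publications, so that a subscriber is less likely to miss it. The helper below is a hypothetical stand-alone sketch of that pattern, not the necst implementation.

```python
import time
from typing import Callable, List


def publish_repeatedly(
    publish: Callable[[str], None],
    message: str,
    *,
    repeat: int = 3,
    interval_sec: float = 0.1,
) -> int:
    """Call publish(message) `repeat` times (at least once); return the count."""
    total = max(1, repeat)
    for i in range(total):
        publish(message)
        if i < total - 1:
            # Pause between publications, but not after the last one.
            time.sleep(interval_sec)
    return total


# A list's append method stands in for a real publisher.
received: List[str] = []
count = publish_repeatedly(
    received.append, "abort: bad weather", repeat=3, interval_sec=0.01
)
```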
- get_message(key, *, time=None, timeout_sec=None)[source]¶
Get the latest message on the topic.
- Parameters:
key (str) –
time (Optional[Union[int, float]]) –
timeout_sec (Optional[Union[int, float]]) –
- Return type:
Dict[str, Any]
- antenna(cmd, /, *, target=None, start=None, stop=None, reference=None, offset=None, scan_frame=None, unit=None, name=None, wait=True, speed=None, margin=None, direct_mode=False, cos_correction=False, az_target_mode='auto', timeout_sec=None)[source]¶
Control antenna direction and motion.
- Parameters:
cmd (Literal['stop', 'point', 'scan', 'error', '?']) – Command to be sent to the antenna.
target (Optional[Tuple[Union[int, float], Union[int, float], str]]) – Position (longitude, latitude, coordinate frame) to point the antenna to.
start (Optional[Tuple[Union[int, float], Union[int, float]]]) – Start position (longitude, latitude) of the scan in the scan_frame.
stop (Optional[Tuple[Union[int, float], Union[int, float]]]) – Stop position (longitude, latitude) of the scan in the scan_frame.
reference (Optional[Tuple[Union[int, float], Union[int, float], str]]) – Reference position (longitude, latitude, coordinate frame) of scan drive.
offset (Optional[Tuple[Union[int, float], Union[int, float], str]]) – Offset amount (longitude, latitude, coordinate frame) to add to command coordinate.
scan_frame (str) – Coordinate frame the scan will be performed in.
unit (Optional[str]) – Unit of all the angular values supplied as arguments.
name (Optional[str]) – Name of the target to point the antenna to.
wait (bool) – Whether to wait for the command to be completed.
speed (Optional[Union[int, float]]) – Speed of the scan in unit/s.
az_target_mode (str) – Azimuth target interpretation for raw AltAz point commands. "auto" keeps observation/scan commands as sky coordinates and treats direct raw AltAz point commands as mount coordinates; "sky" means modulo sky azimuth; "mount" means continuous mount azimuth.
margin (Optional[float]) –
direct_mode (bool) –
cos_correction (bool) –
timeout_sec (Optional[Union[int, float]]) –
- Return type:
None
Notes
Allowed argument combinations¶
cmd | arguments
stop | None (all arguments except cmd are ignored)
point | target and unit
point | target, offset and unit
point | name
point | name, offset and unit
scan | start, stop, reference, scan_frame, unit and speed
scan | start, stop, reference, offset, scan_frame, unit and speed
scan | start, stop, name, scan_frame, unit and speed
scan | start, stop, name, offset, scan_frame, unit and speed
error | None (all arguments except cmd are ignored)
Examples
Stop the antenna immediately
>>> com.antenna("stop")
The following also stops the antenna immediately. Arguments are ignored.
>>> com.antenna("stop", wait=False)
Drive to certain position (Az., El.) = (30, 45)deg
>>> com.antenna("point", target=(30, 45, "altaz"), unit="deg")
Drive to certain position and track the target (R.A., Dec.) = (0, 45)deg
>>> com.antenna("point", target=(0, 45, "fk5"), unit="deg")
Send command to drive to (Az., El.) = (30, 45)deg but don’t wait for the command to complete
>>> com.antenna("point", target=(30, 45, "azel"), unit="deg", wait=False)
Point to a position defined as “0.25deg east (negative azimuth) of center of the moon”
>>> com.antenna("point", offset=(-0.25, 0, "altaz"), unit="deg", name="moon")
Scan the moon in the azimuth direction, starting from 0.5deg to the east from the target, at speed of 600 arcsec/s
>>> com.antenna(
...     "scan", start=(-0.5, 0), stop=(0.5, 0), name="moon",
...     scan_frame="altaz", speed=1/6, unit="deg"
... )
Scan the target (RA, Dec.) = (0, 45)deg in constant galactic longitude, at speed 150 arcsec/s
>>> com.antenna(
...     "scan", start=(0, 1), stop=(0, -1), target=(0, 45, "radec"),
...     scan_frame="galactic", speed=1/24, unit="deg"
... )
Get current error between the commanded and actual antenna position
>>> com.antenna("error")
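Because speed is given in unit/s, a scan speed quoted in arcsec/s has to be converted when unit="deg". The helper below is a hypothetical convenience, not part of necst, that performs the conversion.

```python
ARCSEC_PER_DEG = 3600.0


def arcsec_to_deg(value_arcsec: float) -> float:
    """Convert an angle (or angular speed) from arcsec to deg."""
    return value_arcsec / ARCSEC_PER_DEG


# 600 arcsec/s corresponds to 1/6 deg/s, and 150 arcsec/s to 1/24 deg/s.
speed_600 = arcsec_to_deg(600)
speed_150 = arcsec_to_deg(150)
print(speed_600, speed_150)
```

The result can then be passed as speed=... together with unit="deg".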
- scan_block(*, sections, scan_frame, target=None, reference=None, offset=None, unit=None, name=None, wait=True, direct_mode=False, cos_correction=False, obsfreq=None, metadata_position=None, metadata_id='')[source]¶
- Parameters:
scan_frame (str) –
target (Optional[Tuple[Union[int, float], Union[int, float], str]]) –
reference (Optional[Tuple[Union[int, float], Union[int, float], str]]) –
offset (Optional[Tuple[Union[int, float], Union[int, float], str]]) –
unit (Optional[str]) –
name (Optional[str]) –
wait (bool) –
direct_mode (bool) –
cos_correction (bool) –
obsfreq (Optional[Union[int, float]]) –
metadata_position (Optional[str]) –
metadata_id (str) –
- Return type:
str
- dome(cmd, /, *, target=None, unit=None, dome_sync=False, direct_mode=True, wait=True)[source]¶
- Parameters:
cmd (Literal['point', 'sync', 'stop', 'open', 'close', '?']) –
target (float) –
unit (Optional[str]) –
dome_sync (bool) –
direct_mode (bool) –
wait (bool) –
- Return type:
None
- drive(cmd, on)[source]¶
- Parameters:
cmd (Literal['drive', 'contactor', '?']) –
on (Literal['on', 'off']) –
- ccd(cmd, /, *, name='')[source]¶
Control the ccd.
- Parameters:
cmd ({"capture", "?"}) – Command to be sent to the ccd.
name (str) –
- Return type:
None
Examples
Capture the photo
>>> com.ccd("capture", name="/home/pi/data/photo.JPG")
- chopper(cmd, /, *, wait=True)[source]¶
Control the position of ambient temperature radio absorber.
- Parameters:
cmd ({"insert", "remove", "?"}) – Command to be sent to the chopper.
wait (bool, optional) – If True, wait until the chopper has been moved to the target position. The default is True.
Examples
Insert the absorber
>>> com.chopper("insert")
Remove the absorber but don’t wait until it has been removed
>>> com.chopper("remove", wait=False)
- mirror(cmd, /, *, position=None, distance=None, wait=True)[source]¶
Control the position of mirror.
- Parameters:
cmd ({"m2", "m4", "?"}) – Command to select mirror.
position ({"IN", "OUT"}) – Position of M4.
distance (float) – Move distance (mm) of M2.
wait (bool, optional) – If True, wait until the mirror has been moved to the target position. The default is True.
Examples
Move M4 “in”
>>> com.mirror("m4", position = "in")
Move M2 by 2.0 mm
>>> com.mirror("m2", distance=2.0)
- wait(target, /, *, mode='error', id=None, timeout_sec=None)[source]¶
Wait until the motion has converged.
- Parameters:
target (Literal['antenna', 'dome']) –
mode (Literal['control', 'error']) –
id (Optional[str]) –
timeout_sec (Optional[Union[int, float]]) –
- Return type:
None
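Waiting for convergence with an optional timeout is commonly written as a polling loop. The sketch below is generic and hypothetical: the name wait_until, the polling interval, and the boolean return value are assumptions for illustration. The source does not say how Commander.wait reports a timeout.

```python
import time
from typing import Callable, Optional


def wait_until(
    condition: Callable[[], bool],
    *,
    timeout_sec: Optional[float] = None,
    poll_sec: float = 0.01,
) -> bool:
    """Poll `condition` until it is true; return False if the timeout expires."""
    deadline = None if timeout_sec is None else time.monotonic() + timeout_sec
    while not condition():
        if deadline is not None and time.monotonic() >= deadline:
            return False
        time.sleep(poll_sec)
    return True


# Simulated pointing errors that shrink below a 0.1 threshold on the third poll.
errors = iter([1.0, 0.5, 0.05])
converged = wait_until(lambda: next(errors) < 0.1, timeout_sec=1.0)

# A condition that never becomes true gives up after the timeout.
timed_out = wait_until(lambda: False, timeout_sec=0.05)
```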
- wait_oc(target, position=None)[source]¶
- Parameters:
target (Literal['dome', 'membrane', 'chopper']) –
position (Union[Literal['insert', 'remove'], int]) –
- pid_parameter(cmd, /, *, Kp=None, Ki=None, Kd=None, axis=None)[source]¶
Change PID parameters.
- Parameters:
cmd (Literal['set', '?']) – Command to execute.
Kp (Optional[Union[int, float]]) – Proportional gain.
Ki (Optional[Union[int, float]]) – Integral gain.
Kd (Optional[Union[int, float]]) – Derivative gain.
axis (Optional[Literal['az', 'el']]) – Axis to change the parameter.
- Return type:
None
Examples
Change the PID parameters of the azimuth axis to (0.1, 0.1, 0.1)
>>> com.pid_parameter("set", Kp=0.1, Ki=0.1, Kd=0.1, axis="az")
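For readers unfamiliar with the three gains, the textbook discrete PID update shows what Kp, Ki and Kd each multiply. This is a generic sketch with a hypothetical class name; the actual NECST controller may differ in form and units.

```python
from dataclasses import dataclass


@dataclass
class PID:
    Kp: float  # proportional gain
    Ki: float  # integral gain
    Kd: float  # derivative gain
    integral: float = 0.0
    previous_error: float = 0.0

    def step(self, error: float, dt: float) -> float:
        """Return the control output for one time step of length dt."""
        self.integral += error * dt
        derivative = (error - self.previous_error) / dt
        self.previous_error = error
        return self.Kp * error + self.Ki * self.integral + self.Kd * derivative


pid = PID(Kp=0.1, Ki=0.1, Kd=0.1)
output = pid.step(error=1.0, dt=1.0)
print(output)
```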
- metadata(cmd, /, *, position=None, id=None, time=None, intercept=True, optical_data=[])[source]¶
Set metadata of the spectral data.
- Parameters:
cmd (Literal['set', '?']) – Command to execute.
position (Optional[str]) – Type of the current observation, may be “SKY”, “ON”, “OFF”, etc. Should be less than or equal to 8 characters.
id (Optional[str]) – ID of the current observation, for sequential control. Should be less than or equal to 16 characters.
time (Optional[float]) – Time to change the metadata, in UNIX time.
intercept (bool) – If True, the change will be done immediately. Otherwise, the change will be done after the current control section is finished.
optical_data (Optional[list]) –
- Return type:
None
Examples
Set the metadata to (position=”ON”, id=”scan01”) immediately after current control section is finished
>>> com.metadata("set", position="ON", id="scan01")
Set the metadata to (position=”SKY”, id=”calib01”), without waiting for the current control section to finish
>>> com.metadata("set", position="SKY", id="calib01", intercept=False)
Set the metadata to (position=”OFF”, id=”scan02”) at 2021-01-01 00:00:00 UTC
>>> com.metadata(
...     "set", position="OFF", id="scan02", time=1609459200, intercept=False)
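The time argument is a UNIX timestamp, and position and id have documented length limits (8 and 16 characters). A small sketch of preparing these values with the standard library follows; check_metadata is a hypothetical helper, not a necst function.

```python
from datetime import datetime, timezone

MAX_POSITION_LEN = 8   # documented limit for `position`
MAX_ID_LEN = 16        # documented limit for `id`


def check_metadata(position: str, obs_id: str) -> None:
    """Raise ValueError if either string exceeds its documented length."""
    if len(position) > MAX_POSITION_LEN:
        raise ValueError(f"position longer than {MAX_POSITION_LEN} characters")
    if len(obs_id) > MAX_ID_LEN:
        raise ValueError(f"id longer than {MAX_ID_LEN} characters")


# 2021-01-01 00:00:00 UTC expressed as UNIX time.
unix_time = datetime(2021, 1, 1, tzinfo=timezone.utc).timestamp()
check_metadata("OFF", "scan02")
print(unix_time)
```

Using an explicit timezone avoids depending on the local timezone of the machine that computes the timestamp.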
- quick_look(mode, /, *, range, integ=1)[source]¶
Configure the quick look mode.
- Parameters:
mode (Literal['ch', 'rf', 'if', 'vlsr']) – Mode to configure.
range (Tuple[Union[int, float], Union[int, float]]) – Upper and lower limits of mode to show.
integ (Union[float, int]) – Integration time in seconds.
- Return type:
None
Examples
Configure the quick look to show the spectrometer channel of (24000, 25000) with 1 second integration time
>>> com.quick_look("ch", range=(24000, 25000), integ=1)
- record(cmd, /, *, name='', content=None, nth=None, ch=None, spectrometer=None, save=None, savespec=None, tp_mode=None, tp_range=None)[source]¶
Control the recording.
- Parameters:
cmd (Literal['start', 'stop', 'file', 'reduce', 'savespec', 'binning', 'tp_mode', '?']) – Command to execute.
name (str) – Name of the file to record when cmd is file. Otherwise, name of the directory to save the record.
content (Optional[str]) – Content of the file to record.
nth (Optional[int]) – Every n-th spectral data will be recorded.
ch (Optional[int]) – Number of spectral channels.
spectrometer (Optional[str]) –
save (Optional[bool]) –
savespec (Optional[bool]) –
tp_mode (Optional[bool]) –
tp_range (Optional[list[int, int]]) –
- Return type:
None
Examples
Start recording at otf_2021-01-01 (the directory will be created under the path specified by environment variable NECST_RECORD_ROOT)
>>> com.record("start", name="otf_2021-01-01")
Stop recording
>>> com.record("stop")
Record the file test.txt with the content Hello, world!
>>> com.record("file", name="test.txt", content="Hello, world!")
Record the local file /home/user/test.txt
>>> com.record("file", name="/home/user/test.txt")
Record the remote file http://192.168.xx.xx:/files/test.txt
>>> com.record("file", name="http://192.168.xx.xx:/files/test.txt")
Reduce the sampling rate to 1/10
>>> com.record("reduce", nth=10)
Change the number of spectral channels
>>> com.record("binning", ch=8192, spectrometer="xffts")
Change the recording mode to total power mode, using all channels
>>> com.record("tp_mode", tp_mode=True)
Change the recording mode to total power mode, using channels 1000-2000 and 3000-4000
>>> com.record("tp_mode", tp_range=[1000, 2000, 3000, 4000])
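Going by the example description, the flat tp_range list is read as consecutive (start, stop) pairs. The sketch below makes that pairing explicit. pair_ranges is a hypothetical helper, and whether the end channel is inclusive is not stated in the source.

```python
from typing import List, Tuple


def pair_ranges(tp_range: List[int]) -> List[Tuple[int, int]]:
    """Group a flat list [a0, b0, a1, b1, ...] into [(a0, b0), (a1, b1), ...]."""
    if len(tp_range) % 2 != 0:
        raise ValueError("tp_range must hold an even number of channel indices")
    return list(zip(tp_range[0::2], tp_range[1::2]))


pairs = pair_ranges([1000, 2000, 3000, 4000])
print(pairs)
```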
- apply_spectral_recording_setup(*, snapshot_toml, snapshot_sha256, setup_id, strict=True)[source]¶
Apply a resolved spectral_recording_snapshot.toml to SpectralData.
The setup gate remains closed after this call. The caller should save sidecar files first and then call set_spectral_recording_gate().
- Parameters:
snapshot_toml (str) –
snapshot_sha256 (str) –
setup_id (str) –
strict (bool) –
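The source does not specify how snapshot_sha256 is derived. One natural reading, given here only as an assumption, is the hexadecimal SHA-256 digest of the UTF-8 encoded TOML text. Under that assumption it could be computed as follows; the TOML content is a made-up placeholder.

```python
import hashlib


def sha256_hex(text: str) -> str:
    """Return the hex SHA-256 digest of `text` encoded as UTF-8."""
    return hashlib.sha256(text.encode("utf-8")).hexdigest()


# Placeholder snapshot content, for illustration only.
snapshot_toml = 'setup_id = "demo"\n'
digest = sha256_hex(snapshot_toml)
print(digest)
```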
- set_spectral_recording_gate(*, setup_id, setup_hash, allow_save)[source]¶
Open or close the spectral-recording setup gate.
- Parameters:
setup_id (str) –
setup_hash (str) –
allow_save (bool) –
- clear_spectral_recording_setup(*, setup_id='', setup_hash='', strict=True)[source]¶
Deactivate the active spectral-recording setup after closing the gate.
- Parameters:
setup_id (str) –
setup_hash (str) –
strict (bool) –
- wait_device(target, value=None, range=None, id=None)[source]¶
- Parameters:
target (Literal['sis']) –
value (Optional[Union[int, float]]) –
range (Optional[Union[int, float]]) –
id (Optional[Union[str, list[str]]]) –
- sis_bias(cmd, /, *, mV=None, id=None, thresh=0.199, wait=False)[source]¶
Control the SIS bias voltage.
- Parameters:
cmd (Literal['set', 'finalize', '?']) – Command to execute.
mV (Optional[Union[int, float]]) – Bias voltage in mV.
id (Optional[Union[str, list[str]]]) – Device identifier, may be defined in the configuration file.
thresh (Optional[Union[int, float]]) –
wait (bool) –
- Return type:
None
Examples
Set the SIS bias voltage to 100 mV on the device LSB
>>> com.sis_bias("set", mV=100, id="LSB")
Read the SIS bias voltage on the device USB
>>> com.sis_bias("?", id="USB")
Set the SIS bias voltage to 0 mV to finalize
>>> com.sis_bias("finalize")
- hemt_bias(cmd, /, *, id=None)[source]¶
Read the HEMT bias voltage.
- Parameters:
cmd (Literal['?']) – Command to execute.
id (Optional[str]) – Device identifier, may be defined in the configuration file.
- Return type:
None
Examples
Read the HEMT bias voltage on the device USB
>>> com.hemt_bias("?", id="USB")
- attenuator(cmd, /, *, dB=None, id=None)[source]¶
Control the attenuator.
- Parameters:
cmd (Literal['set', '?']) – Command to execute.
dB (Optional[Union[int, float]]) – Loss in dB.
id (Optional[str]) – Device identifier, may be defined in the configuration file.
- Return type:
None
Examples
Set the loss to 10 dB on the device LSB
>>> com.attenuator("set", dB=10, id="LSB")
Read the loss on the device USB
>>> com.attenuator("?", id="USB")
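A loss in dB maps to a linear power ratio through the standard decibel definition. The helper below is a hypothetical illustration of that relation and does not talk to any device.

```python
def loss_db_to_power_ratio(loss_db: float) -> float:
    """Fraction of input power that remains after a loss of `loss_db` dB."""
    return 10.0 ** (-loss_db / 10.0)


# A 10 dB loss passes one tenth of the power; 3 dB passes roughly half.
ratio_10db = loss_db_to_power_ratio(10)
ratio_3db = loss_db_to_power_ratio(3)
print(ratio_10db, ratio_3db)
```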
- local_attenuator(cmd, /, *, id=None, current=0.0)[source]¶
Control the local_attenuator.
- Parameters:
cmd (Literal['pass', 'finalize', '?']) – Command to execute.
id (Optional[Union[str, list[str]]]) – Channel id.
current (float) – Current to output in mA.
- Return type:
None
Examples
Output a current of 10 mA on 100GHz
>>> com.local_attenuator("pass", id="100GHz", current=10.0)
Finalize the local attenuator
>>> com.local_attenuator("finalize")
Read the local attenuator reading
>>> com.local_attenuator("?")
- thermometer(cmd='?', /)[source]¶
Get the thermometer reading.
- Parameters:
cmd (Literal['?']) – Command to execute.
- Return type:
None
Examples
Get the thermometer reading
>>> com.thermometer("?")
- signal_generator(cmd, /, *, GHz=None, dBm=None, id=None)[source]¶
Control the signal generator.
- Parameters:
cmd (Literal['set', 'stop', '?']) – Command to execute.
GHz (Optional[Union[int, float]]) – Frequency in GHz.
dBm (Optional[Union[int, float]]) – Power in dBm.
id (Optional[str]) – Device identifier, may be defined in the configuration file.
- Return type:
None
Examples
Set the frequency to 100 GHz and the power to 10 dBm on the device LSB2nd
>>> com.signal_generator("set", GHz=100, dBm=10, id="LSB2nd")
Stop the signal output on the device LSB2nd
>>> com.signal_generator("stop", id="LSB2nd")
Read the frequency and the power on the device 1st
>>> com.signal_generator("?", id="1st")
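The dBm argument is a power level referenced to 1 mW. For orientation, the standard conversion to milliwatts is sketched below; dbm_to_milliwatt is a hypothetical helper and is not part of necst.

```python
def dbm_to_milliwatt(power_dbm: float) -> float:
    """Convert a power level in dBm to milliwatts."""
    return 10.0 ** (power_dbm / 10.0)


# 0 dBm is 1 mW by definition; 10 dBm is ten times that.
p_0dbm = dbm_to_milliwatt(0)
p_10dbm = dbm_to_milliwatt(10)
print(p_0dbm, p_10dbm)
```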
- vacuum_gauge(cmd='?', /)[source]¶
Get the vacuum_gauge reading.
- Parameters:
cmd (Literal['?']) – Command to execute.
- Return type:
None
Examples
Get the vacuum gauge reading
>>> com.vacuum_gauge("?")
- analog_logger(cmd='?', /)[source]¶
Get the analog_logger reading.
- Parameters:
cmd (Literal['?']) – Command to execute.
- Return type:
None
Examples
Get the analog_logger reading
>>> com.analog_logger("?")
- sg(cmd, /, *, GHz=None, dBm=None, id=None)¶
Alias of signal_generator().
- Parameters:
cmd (Literal['set', 'stop', '?']) –
GHz (Optional[Union[int, float]]) –
dBm (Optional[Union[int, float]]) –
id (Optional[str]) –
- Return type:
None
- patt(cmd, /, *, dB=None, id=None)¶
Alias of attenuator().
- Parameters:
cmd (Literal['set', '?']) –
dB (Optional[Union[int, float]]) –
id (Optional[str]) –
- Return type:
None
- qlook(mode, /, *, range, integ=1)¶
Alias of quick_look().
- Parameters:
mode (Literal['ch', 'rf', 'if', 'vlsr']) –
range (Tuple[Union[int, float], Union[int, float]]) –
integ (Union[float, int]) –
- Return type:
None
- pid(cmd, /, *, Kp=None, Ki=None, Kd=None, axis=None)¶
Alias of pid_parameter().
- Parameters:
cmd (Literal['set', '?']) –
Kp (Optional[Union[int, float]]) –
Ki (Optional[Union[int, float]]) –
Kd (Optional[Union[int, float]]) –
axis (Optional[Literal['az', 'el']]) –
- Return type:
None
- sis(cmd, /, *, mV=None, id=None, thresh=0.199, wait=False)¶
Alias of sis_bias().
- Parameters:
cmd (Literal['set', 'finalize', '?']) –
mV (Optional[Union[int, float]]) –
id (Optional[Union[str, list[str]]]) –
thresh (Optional[Union[int, float]]) –
wait (bool) –
- Return type:
None
- hemt(cmd, /, *, id=None)¶
Alias of hemt_bias().
- Parameters:
cmd (Literal['?']) –
id (Optional[str]) –
- Return type:
None
- loatt(cmd, /, *, id=None, current=0.0)¶
Alias of local_attenuator().
- Parameters:
cmd (Literal['pass', 'finalize', '?']) –
id (Optional[Union[str, list[str]]]) –
current (float) –
- Return type:
None
- vg(cmd='?', /)¶
Alias of vacuum_gauge().
- Parameters:
cmd (Literal['?']) –
- Return type:
None
- al(cmd='?', /)¶
Alias of analog_logger().
- Parameters:
cmd (Literal['?']) –
- Return type:
None
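The short names above behave exactly like the methods they alias. How necst defines them is not shown in this page; one common Python pattern, sketched here with a hypothetical stand-in class, is to bind a second name to the same function inside the class body.

```python
class Device:
    """Stand-in class; it only echoes the request instead of sending it."""

    def signal_generator(self, cmd, /, *, GHz=None, dBm=None, id=None):
        return {"cmd": cmd, "GHz": GHz, "dBm": dBm, "id": id}

    # Binding a second name to the same function object creates an alias.
    sg = signal_generator


dev = Device()
same_function = Device.sg is Device.signal_generator
result = dev.sg("set", GHz=100, dBm=10, id="LSB2nd")
print(same_function, result)
```

Because both names refer to one function object, the signature and behavior cannot drift apart.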