necst.core.commander¶
Interface to send command to any remotely controlled part of telescope.
- class Commander[source]¶
Bases:
PrivilegedNode
Send command to drive any remotely controlled devices.
The execution of some commands require privilege, to reject conflicting commands from different users.
Examples
You can stop the antenna without privilege, for safety reason >>> from necst.core import Commander >>> com = Commander() >>> com.antenna(“stop”)
Driving the antenna to certain position requires privilege >>> from necst.core import Commander >>> com = Commander() >>> com.get_privilege() >>> com.antenna(“point”, target=(30, 45, “altaz”))
- NodeName: str = 'commander'¶
- Namespace: str = '/necst/OMU1P85M/core'¶
- get_message(key, *, time=None, timeout_sec=None)[source]¶
Get the latest message on the topic.
- Parameters
key (str) –
time (Optional[Union[int, float]]) –
timeout_sec (Optional[Union[int, float]]) –
- Return type
Dict[str, Any]
- antenna(cmd, /, *, target=None, start=None, stop=None, reference=None, offset=None, scan_frame=None, unit=None, name=None, wait=True, speed=None, margin=None, direct_mode=False)[source]¶
Control antenna direction and motion.
- Parameters
cmd (Literal['stop', 'point', 'scan', 'error', '?']) – Command to be sent to the antenna.
target (Optional[Tuple[Union[int, float], Union[int, float], str]]) – Position (longitude, latitude, coordinate frame) to point the antenna to.
start (Optional[Tuple[Union[int, float], Union[int, float]]]) – Start position (longitude, latitude) of the scan in the
scan_frame
.stop (Optional[Tuple[Union[int, float], Union[int, float]]]) – Stop position (longitude, latitude) of the scan in the
scan_frame
.reference (Optional[Tuple[Union[int, float], Union[int, float], str]]) – Reference position (longitude, latitude, coordinate frame) of scan drive.
offset (Optional[Tuple[Union[int, float], Union[int, float], str]]) – Offset amount (longitude, latitude, coordinate frame) to add to command coordinate.
scan_frame (str) – Coordinate frame the scan will be performed in.
unit (Optional[str]) – Unit of the all the angular value supplied as arguments.
name (Optional[str]) – Name of the target to point the antenna to.
wait (bool) – Whether to wait for the command to be completed.
speed (Optional[Union[int, float]]) – Speed of the scan in
unit
/s.margin (Optional[float]) –
direct_mode (bool) –
- Return type
None
Notes
¶ cmd
arguments
stop
None (all arguments except
cmd
are ignored)point
target
andunit
target
,offset
andunit
name
name
,offset
andunit
scan
start
,stop
,reference
,scan_frame
,unit
andspeed
start
,stop
,reference
,offset
,scan_frame
,unit
andspeed
start
,stop
,name
,scan_frame
,unit
andspeed
start
,stop
,name
,offset
,scan_frame
,unit
andspeed
error
None (all arguments except
cmd
are ignored)Examples
Stop the antenna immediately
>>> com.antenna("stop")
The following also stops the antenna immediately. Arguments are ignored.
>>> com.antenna("stop", wait=False)
Drive to certain position (Az., El.) = (30, 45)deg
>>> com.antenna("point", target=(30, 45, "altaz"), unit="deg")
Drive to certain position and track the target (R.A., Dec.) = (0, 45)deg
>>> com.antenna("point", target=(0, 45, "fk5"), unit="deg")
Send command to drive to (Az., El.) = (30, 45)deg but don’t wait for the command to complete
>>> com.antenna("point", target=(30, 45, "azel"), unit="deg", wait=False)
Point to a position defined as “0.25deg east (negative azimuth) of center of the moon”
>>> com.antenna("point", offset=(-0.25, 0, "altaz"), unit="deg", name="moon")
Scan the moon in the azimuth direction, starting from 0.5deg to the east from the target, at speed of 600 arcsec/s
>>> com.antenna( ... "scan", start=(-0.5, 0), stop=(0.5, 0), name="moon", ... scan_frame="altaz", speed=1/6, unit="deg" ... )
Scan the target (RA, Dec.) = (0, 45)deg in constant galactic longitude, at speed 150 arcsec/s
>>> com.antenna( ... "scan", start=(0, 1), stop=(0, -1), target=(0, 45, "radec"), ... scan_frame="galactic", speed=1/150, unit="deg" ... )
Get current error between the commanded and actual antenna position
>>> com.antenna("error")
- dome(cmd, /, *, target=None, unit=None, dome_sync=False, direct_mode=True, wait=True)[source]¶
- Parameters
cmd (Literal['point', 'sync', 'stop', 'open', 'close', '?']) –
target (float) –
unit (Optional[str]) –
dome_sync (bool) –
direct_mode (bool) –
wait (bool) –
- Return type
None
- drive(cmd, on)[source]¶
- Parameters
cmd (Literal['drive', 'contactor', '?']) –
on (Literal['on', 'off']) –
- ccd(cmd, /, *, name='')[source]¶
Control the ccd.
- Parameters
cmd ({"capture", "?"}) – Command to be sent to the ccd.
name (str) –
- Return type
None
Examples
Capture the photo
>>> com.ccd("capture", name="/home/pi/data/photo.JPG")
- chopper(cmd, /, *, wait=True)[source]¶
Control the position of ambient temperature radio absorber.
- Parameters
cmd ({"insert", "remove", "?"}) – Command to be sent to the chopper.
wait (bool, optional) – If True, wait until the chopper has been moved to the target position. The default is True.
Examples
Insert the absorber
>>> com.chopper("insert")
Remove the absorber but don’t wait until it has been removed
>>> com.chopper("remove", wait=False)
- mirror(cmd, /, *, position=None, distance=None, wait=True)[source]¶
Control the position of mirror.
- Parameters
cmd ({"m2", "m4", "?"}) – Command to select mirror.
position ({"IN", "OUT"}) – Position of M4.
distance (float) – Move distance (mm) of M2.
wait (bool, optional) – If True, wait until the mirror has been moved to the target position. The default is True.
Examples
Move M4 “in”
>>> com.mirror("m4", position = "in")
Move M2 2.0 um >>> com.mirror(“m2”, distance = 2.0)
- wait(target, /, *, mode='error', id=None, timeout_sec=None)[source]¶
Wait until the motion has been converged.
- Parameters
target (Literal['antenna', 'dome']) –
mode (Literal['control', 'error']) –
id (Optional[str]) –
timeout_sec (Optional[Union[int, float]]) –
- Return type
None
- wait_oc(target, position=None)[source]¶
- Parameters
target (Literal['dome', 'membrane', 'chopper']) –
position (Optional[Literal['insert', 'remove']]) –
- pid_parameter(cmd, /, *, Kp=None, Ki=None, Kd=None, axis=None)[source]¶
Change PID parameters.
- Parameters
cmd (Literal['set', '?']) – Command to execute.
Kp (Optional[Union[int, float]]) – Proportional gain.
Ki (Optional[Union[int, float]]) – Integral gain.
Kd (Optional[Union[int, float]]) – Derivative gain.
axis (Optional[Literal['az', 'el']]) – Axis to change the parameter.
- Return type
None
Examples
Change the PID parameters of the azimuth axis to (0.1, 0.1, 0.1)
>>> com.pid_parameter("set", Kp=0.1, Ki=0.1, Kd=0.1, axis="az")
- metadata(cmd, /, *, position=None, id=None, time=None, intercept=True, optical_data=[])[source]¶
Set metadata of the spectral data.
- Parameters
cmd (Literal['set', '?']) – Command to execute.
position (Optional[str]) – Type of the current observation, may be “SKY”, “ON”, “OFF”, etc. Should be less than or equal to 8 characters.
id (Optional[str]) – ID of the current observation, for sequential control. Should be less than or equal to 16 characters.
time (Optional[float]) – Time to change the metadata, in UNIX time.
intercept (bool) – If True, the change will be done immediately. Otherwise, the change will be done after the current control section is finished.
optical_data (Optional[list]) –
- Return type
None
Examples
Set the metadata to (position=”ON”, id=”scan01”) immediately after current control section is finished
>>> com.metadata("set", position="ON", id="scan01")
Set the metadata to (position=”SKY”, id=”calib01”), without waiting for the current control section to finish
>>> com.metadata("set", position="SKY", id="calib01", intercept=False)
Set the metadata to (position=”OFF”, id=”scan02”) at 2021-01-01 00:00:00
>>> com.metadata( ... "set", position="OFF", id="scan02", time=1609459200, intercept=False)
- quick_look(mode, /, *, range, integ=1)[source]¶
Configure the quick look mode.
- Parameters
mode (Literal['ch', 'rf', 'if', 'vlsr']) – Mode to configure.
range (Tuple[Union[int, float], Union[int, float]]) – Upper and lower limit of
mode
to show.integ (Union[float, int]) – Integration time in second.
- Return type
None
Examples
Configure the quick look to show the spectrometer channel of (24000, 25000) with 1 second integration time
>>> com.quick_look("ch", (24000, 25000), integ=1)
- record(cmd, /, *, name='', content=None, nth=None, ch=None)[source]¶
Control the recording.
- Parameters
cmd (Literal['start', 'stop', 'file', 'reduce', 'binning', '?']) – Command to execute.
name (str) – Name of the file to record when
cmd
isfile
. Otherwise, name of the directory to save the record.content (Optional[str]) – Content of the file to record.
nth (Optional[int]) – Every $n$th spectral data will be recorded.
ch (Optional[int]) – Number of spectral channels
- Return type
None
Examples
Start recording at
otf_2021-01-01
(the directory will be created under the path specified by environment variableNECST_RECORD_ROOT
)>>> com.record("start", name="otf_2021-01-01")
Stop recording
>>> com.record("stop")
Record the file
test.txt
with the contentHello, world!
>>> com.record("file", name="test.txt", content="Hello, world!")
Record the local file
/home/user/test.txt
>>> com.record("file", name="/home/user/test.txt")
Record the remote file
http://192.168.xx.xx:/files/test.txt
>>> com.record("file", name="http://192.168.xx.xx:/files/test.txt")
Reduce the sampling rate to 1/10
>>> com.record("reduce", nth=10)
Change the number of spectral channels
>>> com.record("binning", ch=8192)
- sis_bias(cmd, /, *, mV=None, id=None)[source]¶
Control the SIS bias voltage.
- Parameters
cmd (Literal['set', 'finalize', '?']) – Command to execute.
mV (Optional[Union[int, float]]) – Bias voltage in mV.
id (Optional[Union[str, list[str]]]) – Device identifier, may be defined in the configuration file.
- Return type
None
Examples
Set the SIS bias voltage to 100 mV on the device
LSB
>>> com.sis_bias("set", mV=100, id="LSB")
Read the SIS bias voltage on the device
USB
>>> com.sis_bias("?", id="USB")
Set the SIS bias voltage to 0 mV as Finalize.
>>> com.sis_bias("finalize")
- hemt_bias(cmd, /, *, id=None)[source]¶
Read the HEMT bias voltage.
- Parameters
cmd (Literal['?']) – Command to execute.
id (Optional[str]) – Device identifier, may be defined in the configuration file.
- Return type
None
Examples
Read the HEMT bias voltage on the device
USB
>>> com.hemt_bias(“?”, id=”USB”)
- attenuator(cmd, /, *, dB=None, id=None)[source]¶
Control the attenuator.
- Parameters
cmd (Literal['set', '?']) – Command to execute.
dB (Optional[Union[int, float]]) – Loss in dB.
id (Optional[str]) – Device identifier, may be defined in the configuration file.
- Return type
None
Examples
Set the loss to 10 dB on the device
LSB
>>> com.attenuator("set", dB=10, id="LSB")
Read the loss on the device
USB
>>> com.attenuator("?", id="USB")
- local_attenuator(cmd, /, *, id=None, current=0.0)[source]¶
Control the local_attenuator.
- Parameters
cmd (Literal['pass', 'finalize', '?']) – Command to execute.
id (Optional[Union[str, list[str]]]) – Channel id.
current (float) – Current to output in mA.
- Return type
None
Examples
output the current to 10mA on 100GHz
>>> com.local_attenuator("pass", id="100GHz", current=10.0)
If you want to finalize
>>> com,local_attenuator("finalize")
Read the LOattenuator reading
>>> com.attenuator("?")
- thermometer(cmd='?', /)[source]¶
Get the thermometer reading.
- Parameters
cmd (Literal['?']) – Command to execute.
- Return type
None
Examples
Get the thermometer reading
>>> com.thermometer("?")
- signal_generator(cmd, /, *, GHz=None, dBm=None, id=None)[source]¶
Control the signal generator.
- Parameters
cmd (Literal['set', 'stop', '?']) – Command to execute.
GHz (Optional[Union[int, float]]) – Frequency in GHz.
dBm (Optional[Union[int, float]]) – Power in dBm.
id (Optional[str]) – Device identifier, may be defined in the configuration file.
- Return type
None
Examples
Set the frequency to 100 GHz and the power to 10 dBm on the device
LSB2nd
>>> com.signal_generator("set", GHz=100, dBm=10, id="LSB2nd")
Stop the signal output on the device
LSB2nd
>>> com.signal_generator("stop", id = "LSB2nd")
Read the frequency and the power on the device
1st
>>> com.signal_generator("?", id="1st")
- vacuum_gauge(cmd='?', /)[source]¶
Get the vacuum_gauge reading.
- Parameters
cmd (Literal['?']) – Command to execute.
- Return type
None
Examples
Get the vacuum gauge reading
>>> com.vacuum_gauge("?")
- sg(cmd, /, *, GHz=None, dBm=None, id=None)¶
Alias of
signal_generator()
.- Parameters
cmd (Literal['set', 'stop', '?']) –
GHz (Optional[Union[int, float]]) –
dBm (Optional[Union[int, float]]) –
id (Optional[str]) –
- Return type
None
- patt(cmd, /, *, dB=None, id=None)¶
Alias of
attenuator()
.- Parameters
cmd (Literal['set', '?']) –
dB (Optional[Union[int, float]]) –
id (Optional[str]) –
- Return type
None
- qlook(mode, /, *, range, integ=1)¶
Alias of
quick_look()
.- Parameters
mode (Literal['ch', 'rf', 'if', 'vlsr']) –
range (Tuple[Union[int, float], Union[int, float]]) –
integ (Union[float, int]) –
- Return type
None
- pid(cmd, /, *, Kp=None, Ki=None, Kd=None, axis=None)¶
Alias of
pid_parameter()
.- Parameters
cmd (Literal['set', '?']) –
Kp (Optional[Union[int, float]]) –
Ki (Optional[Union[int, float]]) –
Kd (Optional[Union[int, float]]) –
axis (Optional[Literal['az', 'el']]) –
- Return type
None
- sis(cmd, /, *, mV=None, id=None)¶
Alias of
sis_bias()
.- Parameters
cmd (Literal['set', 'finalize', '?']) –
mV (Optional[Union[int, float]]) –
id (Optional[Union[str, list[str]]]) –
- Return type
None
- hemt(cmd, /, *, id=None)¶
Alias of
hemt_bias()
.- Parameters
cmd (Literal['?']) –
id (Optional[str]) –
- Return type
None
- loatt(cmd, /, *, id=None, current=0.0)¶
Alias of
local_attenuator()
.- Parameters
cmd (Literal['pass', 'finalize', '?']) –
id (Optional[Union[str, list[str]]]) –
current (float) –
- Return type
None
- vg(cmd='?', /)¶
Alias of
vacuum_gauge()
.- Parameters
cmd (Literal['?']) –
- Return type
None