necst.core.commander

Interface to send command to any remotely controlled part of telescope.

class Commander[source]

Bases: PrivilegedNode

Send command to drive any remotely controlled devices.

The execution of some commands require privilege, to reject conflicting commands from different users.

Examples

You can stop the antenna without privilege, for safety reason >>> from necst.core import Commander >>> com = Commander() >>> com.antenna(“stop”)

Driving the antenna to certain position requires privilege >>> from necst.core import Commander >>> com = Commander() >>> com.get_privilege() >>> com.antenna(“point”, target=(30, 45, “altaz”))

NodeName: str = 'commander'
Namespace: str = '/necst/OMU1P85M/core'
get_message(key, *, time=None, timeout_sec=None)[source]

Get the latest message on the topic.

Parameters
  • key (str) –

  • time (Optional[Union[int, float]]) –

  • timeout_sec (Optional[Union[int, float]]) –

Return type

Dict[str, Any]

antenna(cmd, /, *, target=None, start=None, stop=None, reference=None, offset=None, scan_frame=None, unit=None, name=None, wait=True, speed=None, margin=None, direct_mode=False)[source]

Control antenna direction and motion.

Parameters
  • cmd (Literal['stop', 'point', 'scan', 'error', '?']) – Command to be sent to the antenna.

  • target (Optional[Tuple[Union[int, float], Union[int, float], str]]) – Position (longitude, latitude, coordinate frame) to point the antenna to.

  • start (Optional[Tuple[Union[int, float], Union[int, float]]]) – Start position (longitude, latitude) of the scan in the scan_frame.

  • stop (Optional[Tuple[Union[int, float], Union[int, float]]]) – Stop position (longitude, latitude) of the scan in the scan_frame.

  • reference (Optional[Tuple[Union[int, float], Union[int, float], str]]) – Reference position (longitude, latitude, coordinate frame) of scan drive.

  • offset (Optional[Tuple[Union[int, float], Union[int, float], str]]) – Offset amount (longitude, latitude, coordinate frame) to add to command coordinate.

  • scan_frame (str) – Coordinate frame the scan will be performed in.

  • unit (Optional[str]) – Unit of the all the angular value supplied as arguments.

  • name (Optional[str]) – Name of the target to point the antenna to.

  • wait (bool) – Whether to wait for the command to be completed.

  • speed (Optional[Union[int, float]]) – Speed of the scan in unit/s.

  • margin (Optional[float]) –

  • direct_mode (bool) –

Return type

None

Notes

Allowed argument combinations

cmd

arguments

stop

None (all arguments except cmd are ignored)

point

target and unit

target, offset and unit

name

name, offset and unit

scan

start, stop, reference, scan_frame, unit and speed

start, stop, reference, offset, scan_frame, unit and speed

start, stop, name, scan_frame, unit and speed

start, stop, name, offset, scan_frame, unit and speed

error

None (all arguments except cmd are ignored)

Examples

Stop the antenna immediately

>>> com.antenna("stop")

The following also stops the antenna immediately. Arguments are ignored.

>>> com.antenna("stop", wait=False)

Drive to certain position (Az., El.) = (30, 45)deg

>>> com.antenna("point", target=(30, 45, "altaz"), unit="deg")

Drive to certain position and track the target (R.A., Dec.) = (0, 45)deg

>>> com.antenna("point", target=(0, 45, "fk5"), unit="deg")

Send command to drive to (Az., El.) = (30, 45)deg but don’t wait for the command to complete

>>> com.antenna("point", target=(30, 45, "azel"), unit="deg", wait=False)

Point to a position defined as “0.25deg east (negative azimuth) of center of the moon”

>>> com.antenna("point", offset=(-0.25, 0, "altaz"), unit="deg", name="moon")

Scan the moon in the azimuth direction, starting from 0.5deg to the east from the target, at speed of 600 arcsec/s

>>> com.antenna(
...     "scan", start=(-0.5, 0), stop=(0.5, 0), name="moon",
...     scan_frame="altaz", speed=1/6, unit="deg"
... )

Scan the target (RA, Dec.) = (0, 45)deg in constant galactic longitude, at speed 150 arcsec/s

>>> com.antenna(
...     "scan", start=(0, 1), stop=(0, -1), target=(0, 45, "radec"),
...     scan_frame="galactic", speed=1/150, unit="deg"
... )

Get current error between the commanded and actual antenna position

>>> com.antenna("error")
dome(cmd, /, *, target=None, unit=None, dome_sync=False, direct_mode=True, wait=True)[source]
Parameters
  • cmd (Literal['point', 'sync', 'stop', 'open', 'close', '?']) –

  • target (float) –

  • unit (Optional[str]) –

  • dome_sync (bool) –

  • direct_mode (bool) –

  • wait (bool) –

Return type

None

memb(cmd, /, *, wait=True)[source]
Parameters
  • cmd (Literal['open', 'close', '?']) –

  • wait (bool) –

drive(cmd, on)[source]
Parameters
  • cmd (Literal['drive', 'contactor', '?']) –

  • on (Literal['on', 'off']) –

ccd(cmd, /, *, name='')[source]

Control the ccd.

Parameters
  • cmd ({"capture", "?"}) – Command to be sent to the ccd.

  • name (str) –

Return type

None

Examples

Capture the photo

>>> com.ccd("capture", name="/home/pi/data/photo.JPG")
chopper(cmd, /, *, wait=True)[source]

Control the position of ambient temperature radio absorber.

Parameters
  • cmd ({"insert", "remove", "?"}) – Command to be sent to the chopper.

  • wait (bool, optional) – If True, wait until the chopper has been moved to the target position. The default is True.

Examples

Insert the absorber

>>> com.chopper("insert")

Remove the absorber but don’t wait until it has been removed

>>> com.chopper("remove", wait=False)
mirror(cmd, /, *, position=None, distance=None, wait=True)[source]

Control the position of mirror.

Parameters
  • cmd ({"m2", "m4", "?"}) – Command to select mirror.

  • position ({"IN", "OUT"}) – Position of M4.

  • distance (float) – Move distance (mm) of M2.

  • wait (bool, optional) – If True, wait until the mirror has been moved to the target position. The default is True.

Examples

Move M4 “in”

>>> com.mirror("m4", position = "in")

Move M2 2.0 um >>> com.mirror(“m2”, distance = 2.0)

wait(target, /, *, mode='error', id=None, timeout_sec=None)[source]

Wait until the motion has been converged.

Parameters
  • target (Literal['antenna', 'dome']) –

  • mode (Literal['control', 'error']) –

  • id (Optional[str]) –

  • timeout_sec (Optional[Union[int, float]]) –

Return type

None

wait_oc(target, position=None)[source]
Parameters
  • target (Literal['dome', 'membrane', 'chopper']) –

  • position (Optional[Literal['insert', 'remove']]) –

pid_parameter(cmd, /, *, Kp=None, Ki=None, Kd=None, axis=None)[source]

Change PID parameters.

Parameters
  • cmd (Literal['set', '?']) – Command to execute.

  • Kp (Optional[Union[int, float]]) – Proportional gain.

  • Ki (Optional[Union[int, float]]) – Integral gain.

  • Kd (Optional[Union[int, float]]) – Derivative gain.

  • axis (Optional[Literal['az', 'el']]) – Axis to change the parameter.

Return type

None

Examples

Change the PID parameters of the azimuth axis to (0.1, 0.1, 0.1)

>>> com.pid_parameter("set", Kp=0.1, Ki=0.1, Kd=0.1, axis="az")
metadata(cmd, /, *, position=None, id=None, time=None, intercept=True, optical_data=[])[source]

Set metadata of the spectral data.

Parameters
  • cmd (Literal['set', '?']) – Command to execute.

  • position (Optional[str]) – Type of the current observation, may be “SKY”, “ON”, “OFF”, etc. Should be less than or equal to 8 characters.

  • id (Optional[str]) – ID of the current observation, for sequential control. Should be less than or equal to 16 characters.

  • time (Optional[float]) – Time to change the metadata, in UNIX time.

  • intercept (bool) – If True, the change will be done immediately. Otherwise, the change will be done after the current control section is finished.

  • optical_data (Optional[list]) –

Return type

None

Examples

Set the metadata to (position=”ON”, id=”scan01”) immediately after current control section is finished

>>> com.metadata("set", position="ON", id="scan01")

Set the metadata to (position=”SKY”, id=”calib01”), without waiting for the current control section to finish

>>> com.metadata("set", position="SKY", id="calib01", intercept=False)

Set the metadata to (position=”OFF”, id=”scan02”) at 2021-01-01 00:00:00

>>> com.metadata(
...     "set", position="OFF", id="scan02", time=1609459200, intercept=False)
quick_look(mode, /, *, range, integ=1)[source]

Configure the quick look mode.

Parameters
  • mode (Literal['ch', 'rf', 'if', 'vlsr']) – Mode to configure.

  • range (Tuple[Union[int, float], Union[int, float]]) – Upper and lower limit of mode to show.

  • integ (Union[float, int]) – Integration time in second.

Return type

None

Examples

Configure the quick look to show the spectrometer channel of (24000, 25000) with 1 second integration time

>>> com.quick_look("ch", (24000, 25000), integ=1)
record(cmd, /, *, name='', content=None, nth=None, ch=None)[source]

Control the recording.

Parameters
  • cmd (Literal['start', 'stop', 'file', 'reduce', 'binning', '?']) – Command to execute.

  • name (str) – Name of the file to record when cmd is file. Otherwise, name of the directory to save the record.

  • content (Optional[str]) – Content of the file to record.

  • nth (Optional[int]) – Every $n$th spectral data will be recorded.

  • ch (Optional[int]) – Number of spectral channels

Return type

None

Examples

Start recording at otf_2021-01-01 (the directory will be created under the path specified by environment variable NECST_RECORD_ROOT)

>>> com.record("start", name="otf_2021-01-01")

Stop recording

>>> com.record("stop")

Record the file test.txt with the content Hello, world!

>>> com.record("file", name="test.txt", content="Hello, world!")

Record the local file /home/user/test.txt

>>> com.record("file", name="/home/user/test.txt")

Record the remote file http://192.168.xx.xx:/files/test.txt

>>> com.record("file", name="http://192.168.xx.xx:/files/test.txt")

Reduce the sampling rate to 1/10

>>> com.record("reduce", nth=10)

Change the number of spectral channels

>>> com.record("binning", ch=8192)
com_delay_test()[source]
com_delay_test_topic()[source]
sis_bias(cmd, /, *, mV=None, id=None)[source]

Control the SIS bias voltage.

Parameters
  • cmd (Literal['set', 'finalize', '?']) – Command to execute.

  • mV (Optional[Union[int, float]]) – Bias voltage in mV.

  • id (Optional[Union[str, list[str]]]) – Device identifier, may be defined in the configuration file.

Return type

None

Examples

Set the SIS bias voltage to 100 mV on the device LSB

>>> com.sis_bias("set", mV=100, id="LSB")

Read the SIS bias voltage on the device USB

>>> com.sis_bias("?", id="USB")

Set the SIS bias voltage to 0 mV as Finalize.

>>> com.sis_bias("finalize")
hemt_bias(cmd, /, *, id=None)[source]

Read the HEMT bias voltage.

Parameters
  • cmd (Literal['?']) – Command to execute.

  • id (Optional[str]) – Device identifier, may be defined in the configuration file.

Return type

None

Examples

Read the HEMT bias voltage on the device USB >>> com.hemt_bias(“?”, id=”USB”)

attenuator(cmd, /, *, dB=None, id=None)[source]

Control the attenuator.

Parameters
  • cmd (Literal['set', '?']) – Command to execute.

  • dB (Optional[Union[int, float]]) – Loss in dB.

  • id (Optional[str]) – Device identifier, may be defined in the configuration file.

Return type

None

Examples

Set the loss to 10 dB on the device LSB

>>> com.attenuator("set", dB=10, id="LSB")

Read the loss on the device USB

>>> com.attenuator("?", id="USB")
local_attenuator(cmd, /, *, id=None, current=0.0)[source]

Control the local_attenuator.

Parameters
  • cmd (Literal['pass', 'finalize', '?']) – Command to execute.

  • id (Optional[Union[str, list[str]]]) – Channel id.

  • current (float) – Current to output in mA.

Return type

None

Examples

output the current to 10mA on 100GHz

>>> com.local_attenuator("pass", id="100GHz", current=10.0)

If you want to finalize

>>> com,local_attenuator("finalize")

Read the LOattenuator reading

>>> com.attenuator("?")
thermometer(cmd='?', /)[source]

Get the thermometer reading.

Parameters

cmd (Literal['?']) – Command to execute.

Return type

None

Examples

Get the thermometer reading

>>> com.thermometer("?")
signal_generator(cmd, /, *, GHz=None, dBm=None, id=None)[source]

Control the signal generator.

Parameters
  • cmd (Literal['set', 'stop', '?']) – Command to execute.

  • GHz (Optional[Union[int, float]]) – Frequency in GHz.

  • dBm (Optional[Union[int, float]]) – Power in dBm.

  • id (Optional[str]) – Device identifier, may be defined in the configuration file.

Return type

None

Examples

Set the frequency to 100 GHz and the power to 10 dBm on the device LSB2nd

>>> com.signal_generator("set", GHz=100, dBm=10, id="LSB2nd")

Stop the signal output on the device LSB2nd

>>> com.signal_generator("stop", id = "LSB2nd")

Read the frequency and the power on the device 1st

>>> com.signal_generator("?", id="1st")
vacuum_gauge(cmd='?', /)[source]

Get the vacuum_gauge reading.

Parameters

cmd (Literal['?']) – Command to execute.

Return type

None

Examples

Get the vacuum gauge reading

>>> com.vacuum_gauge("?")
sg(cmd, /, *, GHz=None, dBm=None, id=None)

Alias of signal_generator().

Parameters
  • cmd (Literal['set', 'stop', '?']) –

  • GHz (Optional[Union[int, float]]) –

  • dBm (Optional[Union[int, float]]) –

  • id (Optional[str]) –

Return type

None

patt(cmd, /, *, dB=None, id=None)

Alias of attenuator().

Parameters
  • cmd (Literal['set', '?']) –

  • dB (Optional[Union[int, float]]) –

  • id (Optional[str]) –

Return type

None

qlook(mode, /, *, range, integ=1)

Alias of quick_look().

Parameters
  • mode (Literal['ch', 'rf', 'if', 'vlsr']) –

  • range (Tuple[Union[int, float], Union[int, float]]) –

  • integ (Union[float, int]) –

Return type

None

pid(cmd, /, *, Kp=None, Ki=None, Kd=None, axis=None)

Alias of pid_parameter().

Parameters
  • cmd (Literal['set', '?']) –

  • Kp (Optional[Union[int, float]]) –

  • Ki (Optional[Union[int, float]]) –

  • Kd (Optional[Union[int, float]]) –

  • axis (Optional[Literal['az', 'el']]) –

Return type

None

sis(cmd, /, *, mV=None, id=None)

Alias of sis_bias().

Parameters
  • cmd (Literal['set', 'finalize', '?']) –

  • mV (Optional[Union[int, float]]) –

  • id (Optional[Union[str, list[str]]]) –

Return type

None

hemt(cmd, /, *, id=None)

Alias of hemt_bias().

Parameters
  • cmd (Literal['?']) –

  • id (Optional[str]) –

Return type

None

loatt(cmd, /, *, id=None, current=0.0)

Alias of local_attenuator().

Parameters
  • cmd (Literal['pass', 'finalize', '?']) –

  • id (Optional[Union[str, list[str]]]) –

  • current (float) –

Return type

None

vg(cmd='?', /)

Alias of vacuum_gauge().

Parameters

cmd (Literal['?']) –

Return type

None