Source code for neclib.devices.spectrometer.xffts
import queue
import struct
import time
import traceback
from threading import Event, Thread
from typing import Dict, List, Tuple
import xfftspy
from ... import get_logger
from .spectrometer_base import Spectrometer
[docs]class XFFTS(Spectrometer):
"""Spectrometer, which can do FFT in 8 IF.
Notes
-----
Configuration items for this device:
host : str
IP address for ethernet communicator.
If you operate this device in local network, you should be set
this parameter to “localhost”.
data_port : int
Ethernet port of using devices. This port is used for data
transmmition. The default value of this device is 25144.
cmd_port : str
Ethernet port of using devices. This port is used for command
operation. The default value of this device is 16210.
synctime_us : int
Sync time of data transmmition in unit us. The minimum value of
this device is 100000.
bw_MHz : Dict[int]
Band width of each XFFTS boads in MHz unit.
You must define this parameter to all boad which you use.
For Example: You use 4 boads and set band width to 2000 MHz,
``{ 1 = 2000, 2 = 2000, 3 = 2000, 4 = 2000 }``
The maximum value of band width is 2500 MHz.
max_ch : int
Max spectral channel of spectrometer. This number should be
power of 2.
The maximum number of this device is 32768.
See defaults setting file in ``neclib/defaults/config.toml``.
"""
Manufacturer: str = "Radiometer Physics GmbH"
Model: str = "XFFTS"
Identifier = "host"
def __init__(self) -> None:
self.logger = get_logger(self.__class__.__name__)
self.host = self.Config.host
self.cmd_port = self.Config.cmd_port
self.data_port = self.Config.data_port
self.synctime_us = self.Config.synctime_us
self.bw_mhz = {int(k): v for k, v in self.Config.bw_MHz.items()}
self.data_input, self.setting_output = self.initialize()
self.data_queue = queue.Queue(maxsize=self.Config.record_quesize)
self.thread = None
self.event = None
self.start()
self.warn = False
[docs] def start(self) -> None:
if (self.thread is not None) or (self.event is not None):
self.stop()
self.thread = Thread(target=self._read_data, daemon=True)
self.event = Event()
self.thread.start()
def _read_data(self) -> None:
while (self.event is not None) and (not self.event.is_set()):
if self.data_queue.full():
if self.warn:
self.logger.warning(
"Dropping the data due to low readout frequency."
)
self.warn = False
self.data_queue.get()
try:
data = self.data_input.receive_once()
self.data_queue.put((time.time(), data["data"]))
except struct.error:
exc = traceback.format_exc()
self.logger.warning(exc[slice(0, min(len(exc), 100))])
[docs] def stop(self) -> None:
if self.event is not None:
self.event.set()
if self.thread is not None:
self.thread.join()
self.event = self.thread = None
self.warn = False
[docs] def initialize(self) -> Tuple[xfftspy.data_consumer, xfftspy.udp_client]:
"""Get configured data input and setting output."""
setting_output = xfftspy.udp_client(self.host, self.cmd_port, print=False)
setting_output.stop()
setting_output.set_synctime(self.synctime_us) # synctime in us
_sections = [int(i in self.bw_mhz) for i in range(1, max(self.bw_mhz) + 1)]
setting_output.set_usedsections(_sections)
for board_id, bw_mhz in self.bw_mhz.items():
setting_output.set_board_bandwidth(board_id, bw_mhz)
setting_output.configure() # Apply settings
setting_output.caladc() # Calibrate ADCs
setting_output.start()
data_input = xfftspy.data_consumer(self.host, self.data_port)
data_input.clear_buffer()
return data_input, setting_output
[docs] def get_spectra(self) -> Tuple[float, Dict[int, List[float]]]:
self.warn = True
return self.data_queue.get()
[docs] def change_spec_ch(self, chan):
self.setting_output.stop()
for board in self.bw_mhz.keys():
self.logger.info(
f"Record channel number changed; {chan} ch data will be saved"
)
self.setting_output.set_board_numspecchan(board, chan)
self.setting_output.configure()
self.setting_output.start()
[docs] def finalize(self) -> None:
self.setting_output.stop()
self.stop()