Source code for neclib.recorders.recorder
import os
import time
from datetime import datetime
from pathlib import Path
from threading import Event, Thread
from typing import List, Optional, Union
from ..core import get_logger
from .writer_base import Writer
[docs]class Recorder:
"""Data recorder, to be used with arbitrary writers.
Parameters
----------
record_root
Root directory of data storage. All the data will be stored inside this
directory. Subdirectories will be structured by attached writers.
Examples
--------
>>> recorder = neclib.recorders.Recorder("/home/user/data")
>>> recorder.add_writer(neclib.recorders.DBWriter(), neclib.recorders.FileWriter())
>>> recorder.start_recording()
>>> recorder.append("test", 1) # Arbitrary number and type of data can be passed
>>> recorder.stop_recording()
"""
_instance = None
def __new__(cls, *args, **kwargs):
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance
def __init__(self, record_root: Path) -> None:
self.__writers: List[Writer] = []
self.record_root = Path(record_root)
self.recording_path = None
self.logger = get_logger(self.__class__.__name__)
self._thread: Optional[Thread] = None
self._event: Optional[Event] = None
[docs] def add_writer(self, *writers: Writer) -> None:
"""Attach writer(s) to this recorder."""
if any(type(w) is type for w in writers):
raise TypeError("Writer should be instantiated.")
self.__writers.extend(writers)
@property
def writers(self):
"""List of attached writers."""
return self.__writers
[docs] def drop_writer(self, *writers: Union[int, Writer]) -> None:
"""Drop writer(s) from this recorder."""
for writer in writers:
if isinstance(writer, int):
to_remove = self.__writers.pop(writer)
to_remove.stop_recording()
else:
writer.stop_recording()
self.__writers.remove(writer)
[docs] def start_recording(
self, record_dir: Optional[os.PathLike] = None, *, noreset: bool = False
) -> None:
"""Activate all attached writers."""
if self.is_recording:
return
if record_dir is not None:
if (self._thread is not None) or (self._event is not None):
raise RuntimeError(
"Cannot start named recording with background check running. "
"Please stop the recorder first."
)
self.recording_path = self.record_root / Path(record_dir)
else:
self.recording_path = self._auto_generate_record_dir()
if not noreset:
self._thread = Thread(target=self._check_db_date, daemon=True)
self._event = Event()
self._thread.start()
for writer in self.__writers:
writer.start_recording(self.recording_path)
[docs] def append(self, *args, **kwargs) -> None:
"""Pass data to all attached writers."""
if self.recording_path is None:
raise RuntimeError("Recorder not started. Incoming data won't be kept.")
handled = [writer.append(*args, **kwargs) for writer in self.__writers]
if not any(handled):
err_msg = f"No writer handled the data: {args, kwargs}"
self.logger.warning(err_msg[slice(0, min(100, len(err_msg)))])
[docs] def stop_recording(self, *, noreset: bool = False) -> None:
"""Deactivate all attached writers."""
if not self.is_recording:
return
if not noreset:
if self._event is not None:
self._event.set()
if self._thread is not None:
self._thread.join()
self._thread = self._event = None
for writer in self.__writers:
writer.stop_recording()
self.recording_path = None
def _auto_generate_record_dir(self) -> Path:
now = datetime.utcnow()
record_dir = self.record_root / now.strftime("%Y%m") / now.strftime("%Y%m%d")
record_dir.mkdir(parents=True, exist_ok=True)
return record_dir
def _check_db_date(self) -> None:
if self._event is None:
return
while not self._event.is_set():
if self.recording_path != self._auto_generate_record_dir():
self.stop_recording(noreset=True)
self.start_recording(noreset=True)
time.sleep(1)
@property
def is_recording(self) -> bool:
"""Whether this recorder is accepting data or not."""
return self.recording_path is not None