Source code for majis.itl.timeline

"""MAJIS ITL timeline sub-module."""

import warnings
from collections import defaultdict, namedtuple
from pathlib import Path

from numpy import datetime64
from openpyxl import Workbook, load_workbook
from packaging.version import Version

from planetary_coverage import datetime
from planetary_coverage.events import EventsDict, EventsList
from planetary_coverage.html import table

from ..misc import fmt_datetime
from ..misc.events import flatten, flatten_events
from ..misc.evf import ref_key
from ..misc.time import fmt_timedelta
from .reader import read_itl

DEFAULT_TEMPLATE = Path(__file__).parent / 'timeline.xlsm'

BINNING = {
    '1': 'No Binning',
    '2': 'Binning x2',
    '4': 'Binning x4',
}

ITL_MAPPING = {
    'OBS_NAME': ('OBS_NAME', str),
    'start_angle': ('START_ANGLE', float),  # °
    'start_scan_speed': ('START_SCAN_SPEED', float),  # °/s
    'stop_scan_speed': ('STOP_SCAN_SPEED', float),  # °/s
    'Scanner step per frame': ('SYNCHRONOUS', float),  # (-3, 0, 3)
    'stop_angle': ('STOP_ANGLE', float),  # °
    'First CU_frame start (UTC)': ('t_start', fmt_datetime),  # YYYY-MM-DDThh:mm:ss.msZ
    'Last CU_frame stop (UTC)': ('t_end', fmt_datetime),  # YYYY-MM-DDThh:mm:ss.msZ
    'cu_trep_ms': ('CU_TREP', lambda s: int(str(s).replace('ms', ''))),  # ms
    'spatial_binning': ('BINNING', lambda i: BINNING[str(i)]),
    'nb_cu_frames_tot': ('CU_FRAME', int),
    'ppe': ('PPE', int),
    'Start Row VI': ('START_ROW_VIS', int),
    'prime': ('PRIME', lambda b: 'MAJIS' if b else 'other'),
    'Comments': ('COMMENTS', lambda s: s if s else None),
    'ITL name': ('ITL', lambda p: Path(p).name if p else None),
}

ChangeLog = namedtuple('Log', 'version,date,author,change')


[docs] class TimelineChangeLog: """MAJIS timeline change log.""" def __init__(self, template: Workbook, sheet_name='change log'): self.log = self.read_changelog(list(template[sheet_name])[1:]) def __str__(self): return '\n\n'.join( f'>>> {str(log.version):>6} | {log.date} | {log.author}\n{log.change}' for log in self.log ) def __repr__(self): return str(self) def __getitem__(self, i): return self.log[i] def __iter__(self): return iter(self.log) def _repr_html_(self): return table(self.log, header=['Version', 'Date', 'Author', 'Changes'])
[docs] def read_changelog(self, rows: list) -> (list, list): """Read changelog changes grouped by version, date and author.""" log = defaultdict(list) # Group release changes key = None, None, None for row in rows: version, date, change, author = (cell.value for cell in row[:4]) if change and change != '…': key = ( Version(str(version)) if version else key[0], str(date.date()) if date else key[1], str(author) if author else key[2], ) log[key].extend(change.splitlines()) # Reverse changes order (latest first) and add line returns between changes return [ChangeLog(*key, '\n'.join(log[key])) for key in reversed(log)]
[docs] class Timeline: """MAJIS timeline from template. You can either append an existing timeline template or use the default one. Warning ------- Some extensions (`Data Validation`) are not included into the exported file. This is a openpyxl limitation. """ sheet_name = 'Timeline' def __init__( self, observations: str | Path | EventsList | EventsDict | None = None, timeline: str | Path | None = None, ca_ref: str | dict | None = None, refs: dict | str | list | None = None, ): self.fname = Path(timeline) if timeline else DEFAULT_TEMPLATE self._load_data() # Set global reference time w.r.t. C/A self.ca_ref = ca_ref if observations: self.append(observations, refs=refs) def __repr__(self): return f"{self.__class__.__name__}('{self.fname}')" def __len__(self) -> int: return len( [ obs for obs in self._timeline[self.fields['OBS_NAME']][self.header :] if obs.value ] ) def __getitem__(self, item: str | int | tuple) -> list | dict | str | int | float: if isinstance(item, str): if item in self.fields: return [ col.value for col in self._timeline[self.fields[item]][ self.header : self.header + len(self) ] ] raise KeyError(f'Unknown `{item}` key') if isinstance(item, int): if 1 <= item <= len(self): return { key: cell.value for key, cell in zip( self.fields, self._timeline[self.header + item], strict=True ) if cell.value and not str(cell.value).startswith('=') } raise IndexError(f'Invalid index: {item} not between 1 and {len(self)}') if isinstance(item, tuple): name, i = item return self[name][i - 1] raise TypeError( f'Only `str`, `int` or `str, int` are accepted (`{type(item).__name__}` provided)' ) def __setitem__(self, item: (str, int), value: str | int | float) -> None: name, i = item self._timeline[self.fields[name] + str(i + self.header)] = value def _repr_html_(self): return table( [ [cell.value for cell in row] for row in self._timeline[f'{self.header + 1}:{self.header + len(self)}'] ], header=list(self.fields), ) def _load_data(self) -> None: """Load timeline data.""" # Disable openpyxl warning about unsupported extension (`Data validation`) with warnings.catch_warnings(): warnings.simplefilter('ignore') self._template = load_workbook(self.fname, keep_vba=True) # Load Timeline spreadsheet self._timeline = self._template[self.sheet_name] # Load Timeline header fields mapping self.fields = {cell.value: cell.column_letter for cell in self._timeline[1]} # Load template changelog if 'template change log' in self._template.sheetnames: sheet_name = 'template change log' else: sheet_name = 'change log' self.log = TimelineChangeLog(self._template, sheet_name=sheet_name) @property def version(self): """Template version from changelog.""" return self.log[0].version @property def header(self): """Timeline header size.""" return 2 if self.version < Version('2.0') else 3
[docs] def append( self, observations: str | Path | EventsList | EventsDict, refs: dict | str | list | None = None, ) -> None: """Append ITL observations blocks.""" if isinstance(observations, str | Path): observations = read_itl(observations, refs=refs) for i, obs in enumerate(flatten(observations), start=len(self) + 1): # Required fields for field, (key, fmt) in ITL_MAPPING.items(): self[field, i] = fmt(obs[key]) # Additional fields self['Mirror Flag', i] = ( 'ENABLE' if float(obs['START_SCAN_SPEED']) or float(obs['STOP_SCAN_SPEED']) else 'DISABLE' ) self['First CU_frame start wrt C/A', i] = self._fmt_timedelta(obs.start) self['Last CU_frame stop wrt C/A', i] = self._fmt_timedelta(obs.stop)
@property def ca_ref(self) -> datetime: """C/A reference for relative time.""" return self._ca_ref @ca_ref.setter def ca_ref(self, ca_ref): """Set relative time w.r.t. C/A reference. Warning ------- All the previous relative values will be recomputed if this value is changed. """ if ca_ref: _, self._ca_ref = ref_key(ca_ref) # Recompute relative values for all observation w.r.t. C/A reference for i in range(1, len(self) + 1): self['First CU_frame start wrt C/A', i] = self._fmt_timedelta( self['First CU_frame start (UTC)', i] ) self['Last CU_frame stop wrt C/A', i] = self._fmt_timedelta( self['Last CU_frame stop (UTC)', i] ) else: self._ca_ref = None def _fmt_timedelta(self, t: str | datetime64) -> str | None: """Compute relative time w.r.t. C/A reference.""" return fmt_timedelta(datetime(t) - self.ca_ref) if self.ca_ref else None
[docs] def save(self, fout: str | Path | None = None) -> Path: """Save MAJIS timeline.""" if fout: fout = Path(fout) elif self.fname != DEFAULT_TEMPLATE: fout = self.fname else: raise FileExistsError( 'Can not overwrite default template. Please provide and export filename.' ) self._template.save(fout) return fout
@property def science(self) -> TimelineChangeLog: """Science change log. Raises ------ ValueError: Only available for template ≥ 2.0 """ if self.version < Version('2.0'): raise ValueError( f'Science changelog is only available in template ≥ 2.0 (current: `{self.version}`)' ) return TimelineChangeLog(self._template, sheet_name='science change log')
@flatten_events def save_itl_xlsm( fout: str | Path | None, *events: EventsList | EventsDict, ca_ref: str | dict | None = None, overlap: bool = False, timeline: str | Path | None = None, ) -> Path: """Save ITL events to XLSM timeline. If a timeline is provided but no explicit output file the output file will be same in the original timeline. """ return Timeline(events, timeline=timeline, ca_ref=ca_ref).save(fout)