Source code for oda_api.data_products

__author__ = "Andrea Tramacere"

import mimetypes
import typing
import traceback
import warnings

from json_tricks import numpy_encode,dumps
from astropy.io import fits as pf
from astropy.io import ascii as astropy_io_ascii
import json
from astropy.utils.misc import JsonCustomEncoder

from astropy.table import Table
from astropy.coordinates import Angle
from astropy.time import Time as aTime
from astropy.wcs import WCS
from astropy import units as u
from astropy.io.fits import FITS_rec, FitsHDU, Header

import  numpy
import numpy as np
import  base64
import  pickle
import gzip
import  hashlib

from io import StringIO, BytesIO
import pandas as pd
import puremagic
import os
import logging
from matplotlib import image as mpimg
from matplotlib import pyplot as plt

from abc import ABC, abstractmethod

logger = logging.getLogger('oda_api.data_products')



[docs] def _chekc_enc_data(data): if isinstance(data, list): _l=data else: _l=[data] return _l
[docs] class DataProduct(ABC): name: str | None meta_data: dict
[docs] @abstractmethod def encode(self, *args, **kwargs) -> str | dict[str, typing.Any]: ...
[docs] @classmethod @abstractmethod def decode(cls, *args, **kwargs) -> "DataProduct": ...
[docs] @abstractmethod def write_file(self, file_path, overwrite=True): ...
[docs] @classmethod @abstractmethod def from_file(cls, *args, **kwargs) -> "DataProduct": ...
[docs] @abstractmethod def suggest_fn_extension(self) -> str: ...
[docs] class ODAAstropyTable(DataProduct): def __init__(self,table_object: Table, name: str | None = None, meta_data: dict | None = None): if meta_data is None: meta_data = {} self.name=name self.meta_data=meta_data self._table=table_object def suggest_fn_extension(self) -> str: return 'fits' @property def table(self): return self._table
[docs] def write(self,file_name,format='fits',overwrite=True): self._table.write(file_name,format=format,overwrite=overwrite)
[docs] def write_fits_file(self,file_name,overwrite=True): self.write(file_name,overwrite=overwrite,format='fits')
def write_file(self, file_path, overwrite=True): if file_path.endswith('.fits'): self.write_fits_file(file_path, overwrite=overwrite) else: # determine the format from the file extension self._table.write(file_path, overwrite=overwrite)
[docs] @classmethod def from_file(cls, file_path, name=None, delimiter=None, format=None): _allowed_formats_=['ascii','ascii.ecsv','fits'] if format == 'fits': # print('==>',file_name) table = Table.read(file_path, format=format) elif format == 'ascii.ecsv' or format=='ascii': table = Table.read(file_path, format=format, delimiter=delimiter) else: raise RuntimeError('table format not understood, allowed',_allowed_formats_) meta = None if hasattr(table, 'meta'): meta = table.meta return cls(table, meta_data=meta, name=name)
[docs] def encode(self,use_binary=False,to_json = False): _o_dict = {} _o_dict['binary']=None _o_dict['ascii']=None if use_binary is True: _binarys = base64.b64encode(pickle.dumps(self.table, protocol=2)).decode('utf-8') _o_dict['binary'] = _binarys else: #with StringIO() as fh: fh=StringIO() self.table.write(fh, format='ascii.ecsv') _text = fh.getvalue() fh.close() _o_dict['ascii'] = _text _o_dict['name']=self.name _o_dict['meta_data']=dumps(self.meta_data) if to_json: _o_dict=json.dumps(_o_dict) return _o_dict
[docs] @classmethod def decode(cls, o_dict: dict[str, typing.Any], use_binary=False): if isinstance(o_dict, dict): _o_dict = o_dict elif isinstance(o_dict, str): _o_dict = json.loads(o_dict) else: raise RuntimeError('Wrong table structure') encoded_name = _o_dict['name'] encoded_meta_data = _o_dict['meta_data'] if use_binary is True: t_rec = base64.b64decode(_o_dict['binary']) try: t_rec = pickle.loads(t_rec) except Exception: t_rec = pickle.loads(t_rec, encoding='latin') else: t_rec = astropy_io_ascii.read(_o_dict['ascii']) return cls(typing.cast(Table, t_rec),name=encoded_name,meta_data=encoded_meta_data)
[docs] class BinaryData(object): def __init__(self,file_path=None): warnings.warn('BinaryData class is deprecated, please use BinaryProduct instead', DeprecationWarning, stacklevel=2) self.file_path=file_path
[docs] def encode(self,file_path=None): if file_path is None: file_path=self.file_path _file_binary = open(file_path, 'rb').read() # type: ignore _file_b64 = base64.urlsafe_b64encode(_file_binary) _file_b64_md5 = hashlib.md5(_file_binary).hexdigest() return _file_b64,_file_b64_md5
[docs] def decode(self,encoded_obj): return base64.urlsafe_b64decode(encoded_obj.encode('ascii', 'ignore'))
[docs] class BinaryProduct(DataProduct): # New implementation of binary data product. # The meaning of the methods is more in-line with the rest of the products def __init__(self, bin_data: bytes, name: str | None = None): self.bin_data = bin_data if name == 'None': name = None self.name = name
[docs] def suggest_fn_extension(self) -> str: suggested_extension = puremagic.from_string(self.bin_data) if suggested_extension: return suggested_extension.strip('.') return 'bin'
[docs] def encode(self): return { 'name': self.name, 'data': base64.urlsafe_b64encode(self.bin_data).decode(), 'md5': hashlib.md5(self.bin_data).hexdigest() }
[docs] @classmethod def decode(cls, encoded_obj: dict | str): if isinstance(encoded_obj, str): _encoded_obj: dict = json.loads(encoded_obj) else: _encoded_obj = encoded_obj name = _encoded_obj['name'] bin_data = base64.urlsafe_b64decode(_encoded_obj['data'].encode('ascii', 'ignore')) decoded_md5 = hashlib.md5(bin_data).hexdigest() assert decoded_md5 == _encoded_obj['md5'] return cls(bin_data, name)
[docs] def write_file(self, file_path, overwrite = True): if os.path.exists(file_path) and not overwrite: raise FileExistsError(f'File {file_path} already exists') with open(file_path, 'wb') as fd: fd.write(self.bin_data)
[docs] @classmethod def from_file(cls, file_path, name=None): with open(file_path, 'rb') as fd: bin_data = fd.read() return cls(bin_data, name)
[docs] class NumpyDataUnit: def __init__(self, data: numpy.ndarray | None, data_header: dict | None =None, meta_data: dict | None = None, hdu_type: str | None = None, name: str | None = None, units_dict: dict | None = None): if meta_data is None: meta_data = {} if data_header is None: data_header = {} self._hdu_type_list_ = ['primary', 'image', 'table', 'bintable'] self.name=name self._check_data(data) self._check_hdu_type(hdu_type) self._check_dict(data_header) self._check_dict(meta_data) self.data=data self.header=data_header self.meta_data=meta_data self.hdu_type=hdu_type self.units_dict=units_dict @property def name(self): return self._name @name.setter def name(self, name_value): if name_value is None: self._name = 'table' else: self._name = name_value # interface with a typo, preserving with a warning def _warn_chekc_typo(self): logger.debug('please _check_* instead of _chekc_* functions, they will be removed') def _chekc_data(self, data): self._warn_chekc_typo() return self._check_data(data) def _chekc_hdu_type(self,hdu_type): return self._check_hdu_type(hdu_type) def _chekc_dict(self, _kw): self._warn_chekc_typo() return self._check_dict(_kw) def _check_data(self, data: numpy.ndarray | None): if not (isinstance(data, numpy.ndarray) or data is None): raise RuntimeError('data is not numpy ndarray object') def _check_hdu_type(self, hdu_type: str | None): if not (hdu_type is None or hdu_type in self._hdu_type_list_): raise RuntimeError(f"hdu type {hdu_type} not in allowed {self._hdu_type_list_}") def _check_dict(self, d: dict): if not isinstance(d, dict): raise RuntimeError('object is not dict')
[docs] @classmethod def from_fits_hdu(cls, hdu: FitsHDU, name=None): if name is None or name == '': name = hdu.name r = cls(data=hdu.data, data_header={k:v for k, v in hdu.header.items()}, hdu_type=cls._map_hdu_type(hdu),name=name) # this is needed to re-read the file due to variable length file r.to_fits_hdu() return r
[docs] def to_fits_hdu(self): try: logger.debug('------------------------------') logger.debug('inside to_fits_hdu methods') logger.debug(f'name: {self.name}') logger.debug(f'header: {self.header}') logger.debug(f'data: {repr(self.data)}') logger.debug(f'units_dict: {self.units_dict}') logger.debug(f'hdu_type: {self.hdu_type}') logger.debug('------------------------------') for k,v in self.header.items(): if isinstance(v, list): s='' for el in v: s+='%s,'%str(el) self.header[k] = s #unicode(",".join(map(str,v))) return self.new_hdu_from_data( self.data, header=pf.header.Header(self.header), hdu_type=self.hdu_type, units_dict=self.units_dict) except Exception as e: error_message = 'an exception occurred in oda_api when binary products are formatted to fits header: ' + repr(e) logger.error(error_message) logger.error('traceback: %s', traceback.format_exc()) raise Exception("an exception occurred in oda_api when binary products are formatted to fits header") from e
@staticmethod def _map_hdu_type(hdu: FitsHDU): _t='' if isinstance(hdu, pf.PrimaryHDU): _t= 'primary' elif isinstance(hdu, pf.ImageHDU): _t = 'image' elif isinstance(hdu, pf.BinTableHDU): _t = 'bintable' elif isinstance(hdu, pf.TableHDU): _t = 'table' else: raise RuntimeError('hdu type not understood') return _t
[docs] def new_hdu_from_data(self, data: numpy.ndarray | None, hdu_type: str | None, header: Header | None = None, units_dict: dict | None = None): self._check_hdu_type(hdu_type) if hdu_type=='primary': h = pf.PrimaryHDU elif hdu_type=='image': h = pf.ImageHDU elif hdu_type == 'bintable': h = pf.BinTableHDU elif hdu_type == 'table': h = pf.TableHDU else: raise RuntimeError('hdu type ', hdu_type, 'not in allowed', self._hdu_type_list_) _h = h(data=data, header=header) _h.name = self.name if units_dict is not None: if isinstance(_h, pf.ImageHDU) or isinstance(_h, pf.PrimaryHDU): raise RuntimeError('units_dict is not None but hdu is not a table') for k in units_dict.keys(): _h.columns.change_unit(k, units_dict[k]) # pyright: ignore[reportOptionalMemberAccess] self.header=dict(_h.header) return _h
@staticmethod def _eval_dt(dt): try: dt = numpy.dtype(dt) except Exception: dt = eval(dt) return dt
[docs] def encode(self,use_pickle=False,use_gzip=False,to_json=False): _data = [] _meata_d=[] _kw_d = [] _d = None _dt = None _binarys = None if self.data is not None: _dt= numpy_encode(numpy.array(self.data))['dtype'] if use_pickle: if isinstance(self.data, FITS_rec): pickled_data = pickle.dumps(self.data) else: pickled_data = pickle.dumps(numpy.array(self.data)) if use_gzip: with BytesIO(pickled_data) as out_file: with gzip.GzipFile(fileobj=out_file, mode='wb') as gzip_file: gzip_file.write(pickled_data) _binarys = base64.b64encode(out_file.getvalue()) else: _binarys= base64.b64encode( pickled_data ) _binarys = _binarys.decode() else: _d= json.dumps(self.data, cls=JsonCustomEncoder) _o_dict = {'data': _d, 'dt': _dt, 'name': self.name, 'header': self.header, 'binarys': _binarys, 'meta_data': self.meta_data, 'hdu_type': self.hdu_type, 'units_dict': self.units_dict} if to_json: _o_dict_json = json.dumps(_o_dict) return _o_dict_json return _o_dict
[docs] @classmethod def decode(cls, encoded_obj: str | dict, use_gzip=False, from_json=False): if not from_json: try: encoded_obj = json.loads(encoded_obj) # type: ignore except (json.JSONDecodeError, TypeError): pass encoded_obj = typing.cast(dict, encoded_obj) encoded_data = encoded_obj['data'] encoded_dt=encoded_obj['dt'] encoded_header = encoded_obj['header'] encoded_meta_data=encoded_obj['meta_data'] _name=encoded_obj['name'] _hdu_type=encoded_obj['hdu_type'] _binarys=encoded_obj['binarys'] _units_dict = encoded_obj.get('units_dict') if _binarys is not None: with BytesIO() as in_file: _binarys = base64.b64decode(_binarys) if use_gzip: in_file.write(_binarys) in_file.seek(0) with gzip.GzipFile(fileobj=in_file, mode='rb') as gzip_file: _data = gzip_file.read().decode('utf-8') _data = pickle.loads(_data) # type: ignore else: _data = pickle.loads(_binarys, encoding='bytes') elif encoded_data is not None: encoded_data=eval(encoded_data) # !! for ID,c in enumerate(encoded_data): encoded_data[ID]=tuple(c) _data=numpy.asanyarray(encoded_data,dtype=cls._eval_dt(encoded_dt)) else: _data=None if (isinstance(_data, FITS_rec) and _hdu_type in ["table", "bintable"] and getattr(_data, "_tbsize", None) is None): _data._tbsize = int(np.prod([encoded_header[f"NAXIS{i}"] for i in range(1, encoded_header['NAXIS']+1)])) return cls(data=_data, data_header=encoded_header, meta_data=encoded_meta_data,name=_name,hdu_type=_hdu_type, units_dict=_units_dict)
@classmethod def from_pandas(cls, pandas_dataframe: pd.DataFrame, name: str | None = None, column_names: list | None = None, units_dict: dict | None = None, meta_data: dict | None = None, data_header: dict | None = None): if data_header is None: data_header = {} if meta_data is None: meta_data = {} if units_dict is None: units_dict = {} if column_names is None: column_names = [] if column_names and isinstance(column_names, list): pandas_dataframe = pandas_dataframe.loc[:, column_names] elif column_names and isinstance(column_names, dict): pandas_dataframe = pandas_dataframe.loc[:, column_names.keys()] pandas_dataframe.rename(columns=column_names, inplace=True) rec_array = pandas_dataframe.to_records(index=False) return cls(data = rec_array, name=name, units_dict = units_dict, meta_data = meta_data, data_header = data_header, hdu_type = 'bintable')
[docs] class NumpyDataProduct(DataProduct): def __init__( self, data_unit: NumpyDataUnit | list[NumpyDataUnit], name: str | None = None, meta_data: dict | None = None, ): if meta_data is None: meta_data = {} self.name=name self.data_unit=self._set_data(data_unit) self._check_dict(meta_data) self.meta_data=meta_data def suggest_fn_extension(self) -> str: return 'fits'
[docs] def show(self): print('------------------------------') print('name:',self.name) print('meta_data',self.meta_data.keys()) print ('number of data units',len(self.data_unit)) print ('------------------------------') for ID,du in enumerate(self.data_unit): print('data uniti',ID,',name:',du.name)
[docs] def show_meta(self): print('------------------------------') for k,v in self.meta_data.items(): print(k,':',v) print ('------------------------------')
[docs] def get_data_unit(self, ID: int) -> NumpyDataUnit: try: return self.data_unit[ID] except IndexError as e: raise RuntimeError(f"problem get_data_unit ID:{ID} in self.data_unit:{self.data_unit}") from e
[docs] def get_data_unit_by_name(self, name: str) -> typing.Union[NumpyDataUnit, None]: _du = None du = None for du in self.data_unit: if du.name == name: if _du is not None: logger.warning(f"get_data_unit_by_name found multiple du for name {name}") _du = du logger.info(f'--> NAME {du.name}') if _du is None: found_names = '; '.join([ str(_du.name) + ":" + repr(du) for _du in self.data_unit ]) logger.warning(f"get_data_unit_by_name found no du for name {name}, have {found_names}") return _du
def _set_data(self, data: NumpyDataUnit | list[NumpyDataUnit]): if isinstance(data, list): _dl = data else: _dl = [data] for _ID, _d in enumerate(_dl): if not isinstance(_d, NumpyDataUnit): raise RuntimeError ('DataUnit not valid') return _dl def _chekc_enc_data(self, data): return _chekc_enc_data(data) def _check_dict(self, _kw: dict): if not isinstance(_kw, dict): raise RuntimeError('object is not not dict')
[docs] def encode(self, use_pickle=True, use_gzip=False, to_json=False): _enc = [] for _ID, data_unit in enumerate(self.data_unit): _enc.append( data_unit.encode( use_pickle=use_pickle, use_gzip=use_gzip ) ) _o_dict={'data_unit_list':_enc,'name':self.name,'meta_data':dumps(self.meta_data)} if to_json: return json.dumps(_o_dict) return _o_dict
[docs] def to_fits_hdu_list(self): _hdul=pf.HDUList() for _ID,_d in enumerate(self.data_unit): _hdul.append(_d.to_fits_hdu()) return _hdul
[docs] def write_fits_file(self, filename: str, overwrite=True): self.to_fits_hdu_list().writeto(filename, overwrite=overwrite)
[docs] @classmethod def from_fits_file( cls, filename: str, ext: int | str | None = None, hdu_name: str | None = None, meta_data: dict | None = None, name: str = "", ): if meta_data is None: meta_data = {} hdul=pf.open(filename) if ext is not None: hdul = [hdul[ext]] if hdu_name is not None: _hdul = [] for hdu in hdul: if hdu.name == hdu_name: # type: ignore _hdul.append(hdu) hdul=_hdul return cls(data_unit=[NumpyDataUnit.from_fits_hdu(h) for h in hdul],meta_data=meta_data,name=name) # pyright: ignore[reportArgumentType]
[docs] @classmethod def decode(cls, encoded_obj: typing.Union[str, dict], from_json=False): if encoded_obj is not None: # from_json has the opposite meaning of what the name implies obj_dict: dict if from_json: if isinstance(encoded_obj, dict): obj_dict = encoded_obj else: logger.warning('decoding from unexpected object') obj_dict = encoded_obj # type: ignore else: if isinstance(encoded_obj, dict): obj_dict = encoded_obj else: logger.warning('decoding from unexpected object') try: obj_dict = json.loads(encoded_obj) except Exception as e: raise RuntimeError('unable to decode json object') from e encoded_data_unit_list = obj_dict['data_unit_list'] encoded_name = obj_dict['name'] encoded_meta_data = obj_dict['meta_data'] _data_unit_list=[] #print('encoded_data_unit_list',encoded_data_unit_list) for enc_data_unit in encoded_data_unit_list: _data_unit_list.append(NumpyDataUnit.decode(enc_data_unit,from_json=False)) else: _data_unit_list=[] encoded_name=None encoded_meta_data={} if isinstance(encoded_meta_data, str): try: encoded_meta_data = json.loads(encoded_meta_data) except Exception: # fallback if was not properly encoded, backward compatibility encoded_meta_data = eval(encoded_meta_data) return cls(data_unit=_data_unit_list,name=encoded_name,meta_data=encoded_meta_data)
@classmethod def from_file( cls, filename: str, ext: int | str | None = None, hdu_name: str | None = None, meta_data: dict | None = None, name: str = "", ): return cls.from_fits_file( filename=filename, ext=ext, hdu_name=hdu_name, meta_data=meta_data, name=name) def write_file(self, file_path, overwrite=True): self.write_fits_file(file_path, overwrite=overwrite)
[docs] class ApiCatalog(DataProduct): def __init__(self, catalog_data: dict | Table, name: str | None = None): self.name=name _skip_list=['meta_ID'] if isinstance(catalog_data, dict): meta = {} lon_name = None if 'cat_lon_name' in catalog_data.keys(): lon_name = catalog_data['cat_lon_name'] lat_name = None if 'cat_lat_name' in catalog_data.keys(): lat_name = catalog_data['cat_lat_name'] frame = None if 'cat_frame' in catalog_data.keys(): frame = catalog_data['cat_frame'] coord_units = None if 'cat_coord_units' in catalog_data.keys(): coord_units = catalog_data['cat_coord_units'] if 'cat_meta' in catalog_data.keys(): cat_meta_entry = catalog_data['cat_meta'] meta.update(cat_meta_entry) meta['FRAME'] = frame meta['COORD_UNIT'] = coord_units meta['LON_NAME'] = lon_name meta['LAT_NAME'] = lat_name self.table =Table(catalog_data['cat_column_list'], names=catalog_data['cat_column_names'],meta=meta) if coord_units is not None: self.table[lon_name]=Angle(self.table[lon_name],unit=coord_units) self.table[lat_name]=Angle(self.table[lat_name],unit=coord_units) self.lat_name=lat_name self.lon_name=lon_name else: self.table = catalog_data meta = getattr(self.table, 'meta', {}) self.lat_name = meta.get('LAT_NAME') self.lon_name = meta.get('LON_NAME')
[docs] def get_api_dictionary(self, dump_string=True): column_lists = [] for colname in self.table.colnames: column_lists.append([x if str(x) != 'nan' else None for x in self.table[colname].tolist()]) # type: ignore cat_dict = dict(cat_frame=self.table.meta['FRAME'], # pyright: ignore[reportOptionalSubscript] cat_coord_units=self.table.meta['COORD_UNIT'], # pyright: ignore[reportOptionalSubscript] cat_column_list=column_lists, cat_column_names=self.table.colnames, cat_column_descr=self.table.dtype.descr, cat_lat_name=self.lat_name, cat_lon_name=self.lon_name) if dump_string: return json.dumps(cat_dict) else: return cat_dict
def encode(self): return self.get_api_dictionary(dump_string=False) @classmethod def decode(cls, encoded_obj : str | dict[str, typing.Any], name: str | None = None) -> "ApiCatalog": if isinstance(encoded_obj, str): obj = json.loads(encoded_obj) else: obj = encoded_obj return cls(obj, name = name) def suggest_fn_extension(self) -> str: return 'ecsv' def write_file(self, file_path, overwrite=True): # determine format from the file extension self.table.write(file_path, overwrite=overwrite) @classmethod def from_file(cls, file_path: str, name: str | None = None, delimiter: str | None = None, format: str | None = None): allowed_formats=['ascii','ascii.ecsv','fits'] if format in allowed_formats: kw = {'delimiter': delimiter} if format != 'fits' and delimiter is not None else {} table = Table.read(file_path, format=format, **kw) else: raise RuntimeError(f'Catalog file format not understood, allowed: {allowed_formats}') return cls(table, name=name)
[docs] class GWEventContours: def __init__(self, event_contour_dict, name='') -> None: self._contour_dict = event_contour_dict self.name = name self.levels = event_contour_dict['levels'] self.contours = event_contour_dict['contours']
[docs] class GWContoursDataProduct: def __init__(self, contours_dict: dict) -> None: self._cont_data = contours_dict self.event_list = list(self._cont_data.keys()) self.contours = {} for key, val in self._cont_data.items(): self.contours[key] = GWEventContours(val, name = key) setattr(self, key, self.contours[key])
[docs] class LightCurveDataProduct(NumpyDataProduct):
[docs] @classmethod def from_arrays(cls, times, timedel = None, fluxes = None, magnitudes = None, rates = None, counts = None, errors = None, units_spec = None, # TODO: not used yet time_format = None, name = None): if units_spec is None: units_spec = {} data_header = {} meta_data = {} # meta data could be attached to both NumpyDataUnit and NumpyDataProduct. Decide on this if (fluxes is not None) + (magnitudes is not None) + (rates is not None) + (counts is not None) != 1: raise ValueError('Only one type of values should be set') elif fluxes is not None: col_name = 'FLUX' values = fluxes elif magnitudes is not None: col_name = 'MAG' values = magnitudes elif rates is not None: col_name = 'RATE' values = rates elif counts is not None: col_name = 'COUNTS' values = counts else: raise ValueError('One of fluxes/magnitudes/rates/counts arguments is required') if len(values) != len(times): raise ValueError(f'Value column length {len(values)} do not coincide with time {len(times)} column length') if errors is not None and len(errors) != len(times): raise ValueError('Error column length do not coincide with time column length') # TODO: possibility for other time units # TODO: add time-related keywords to header (OGIP) units_dict = {'TIME': 'd'} if any(isinstance(x, aTime) for x in times): times = aTime(times) if isinstance(times, aTime): mjd = times.mjd elif time_format is not None: atimes = aTime(times, format=time_format) # NOTE: do we assume paticular scale? mjd = atimes.mjd else: atimes = aTime(times) mjd = atimes.mjd if any(isinstance(x, u.Quantity) for x in values): values = u.Quantity(values) if isinstance(values, u.Quantity): units_dict[col_name] = values.unit.to_string(format='OGIP') # pyright: ignore[reportOptionalMemberAccess] values = values.value if errors is not None and any(isinstance(x, u.Quantity) for x in errors): errors = u.Quantity(errors) if errors is not None and isinstance(errors, u.Quantity): units_dict['ERROR'] = errors.unit.to_string(format='OGIP') # pyright: ignore[reportOptionalMemberAccess] errors = errors.value timedel_is_array = False if timedel is not None: # Accept scalar numeric (int/float), numpy scalar, or astropy Quantity if isinstance(timedel, u.Quantity): # convert timedel to days (internal TIME unit is 'd') td_days = timedel.to(u.day).value else: td_days = timedel if np.isscalar(td_days): # treat numeric scalar as days by default td_days = float(td_days) # pyright: ignore[reportArgumentType] data_header['TIMEDEL'] = td_days else: # If it's array-like, ensure it matches times length try: if len(td_days) != len(times): raise ValueError('timedel length must match times length when providing an array') units_dict['TIMEDEL'] = 'd' timedel = np.asarray(timedel) except TypeError as e: raise ValueError('timedel type not understood; expected scalar, Quantity, or array-like') from e if timedel is None or timedel_is_array is False: rec_array = np.rec.fromarrays([mjd, values, errors], names=("TIME", col_name, "ERROR")) # type:ignore else: rec_array = np.rec.fromarrays([mjd, values, errors, timedel], names=("TIME", col_name, "ERROR", "TIMEDEL")) # type:ignore return cls([NumpyDataUnit(data=np.array([]), name = 'PRIMARY', hdu_type = 'primary'), NumpyDataUnit(data = rec_array, units_dict = units_dict, meta_data = meta_data, data_header = data_header, hdu_type = 'bintable', name = 'LC')], name = name)
[docs] class PictureProduct(DataProduct): def __init__(self, binary_data, name=None, metadata=None, file_path=None, write_on_creation=False): if metadata is None: metadata = {} self.binary_data = binary_data self.metadata = metadata self.name = name if file_path is not None and os.path.isfile(file_path): self.file_path = file_path logger.info(f'Image file {file_path} already exist. No automatical rewriting.') elif write_on_creation: self.write_file(file_path) else: self.file_path = None tp = puremagic.from_string(binary_data).lstrip('.') known_img = [t.lstrip('.') for t,v in mimetypes.types_map.items() if v.startswith('image')] if tp not in known_img: raise ValueError('Provided data is not an image') self.img_type = tp
[docs] def suggest_fn_extension(self) -> str: return f'{self.img_type}'
[docs] @classmethod def from_file(cls, file_path, name=None): with open(file_path, 'rb') as fd: binary_data = fd.read() return cls(binary_data, name=name, file_path=file_path)
[docs] def write_file(self, file_path, overwrite=False): if os.path.isfile(file_path) and not overwrite: raise FileExistsError(f'File {file_path} already exists. Use overwrite=True') logger.info(f'Creating image file {file_path}.') with open(file_path, 'wb') as fd: fd.write(self.binary_data) self.file_path = file_path
[docs] def encode(self): b64data = base64.urlsafe_b64encode(self.binary_data) output_dict = {} output_dict['img_type'] = self.img_type output_dict['b64data'] = b64data.decode() output_dict['metadata'] = self.metadata output_dict['name'] = self.name if self.file_path: output_dict['filename'] = os.path.basename(self.file_path) return output_dict
[docs] @classmethod def decode(cls, encoded_data, write_on_creation = False): if isinstance(encoded_data, dict): _encoded_data = encoded_data else: _encoded_data = json.loads(encoded_data) binary_data = base64.urlsafe_b64decode(_encoded_data['b64data'].encode('ascii', 'ignore')) return cls(binary_data, metadata = _encoded_data['metadata'], file_path = _encoded_data.get('filename'), name = _encoded_data.get('name'), write_on_creation = write_on_creation)
[docs] def show(self): byte_stream = BytesIO(self.binary_data) image = mpimg.imread(byte_stream) plt.imshow(image) plt.axis('off') plt.show()
[docs] class ImageDataProduct(NumpyDataProduct):
[docs] @classmethod def from_fits_file(cls,filename,ext=None,hdu_name=None,meta_data=None,name=None): if meta_data is None: meta_data = {} npdp = super().from_fits_file(filename,ext=ext,hdu_name=hdu_name,meta_data=meta_data,name=name) contains_image = cls.check_contains_image(npdp) if contains_image: return npdp else: raise ValueError(f'FITS file {filename} doesn\'t contain image data.')
[docs] @staticmethod def check_contains_image(numpy_data_prod): for hdu in numpy_data_prod.data_unit: if hdu.hdu_type in ['primary', 'image']: try: WCS(hdu.header) return True except Exception: pass return False
[docs] class TextLikeProduct(DataProduct): def __init__(self, value, name=None, meta_data=None): if meta_data is None: meta_data = {} self.value = value self.name = name self.meta_data = meta_data
[docs] def encode(self): return {'name': self.name, 'value': self.value, 'meta_data': self.meta_data}
[docs] @classmethod def decode(cls, encoded): if not isinstance(encoded, dict): encoded = json.loads(encoded) return cls(name=encoded.get('name'), value=encoded['value'], meta_data=encoded.get('meta_data', {}))
def __repr__(self): return self.encode().__repr__()
[docs] def write_file(self, file_path, overwrite=True): if os.path.isfile(file_path) and not overwrite: raise FileExistsError(f"File {file_path} already exists") with open(file_path, 'w') as fd: fd.write(self.value)
[docs] @classmethod def from_file(cls, file_path: str, name: str | None = None, meta_data: dict | None = None) -> DataProduct: with open(file_path, 'r') as fd: value = fd.read() return cls(name=name, value=value, meta_data=meta_data)
[docs] def suggest_fn_extension(self) -> str: return 'txt'