Source code for oda_api.data_products

from __future__ import absolute_import, division, print_function

from builtins import (bytes, str, open, super, range,
                      zip, round, input, int, pow, object, map, zip)

__author__ = "Andrea Tramacere"

# Standard library
# eg copy
# absolute import rg:from copy import deepcopy

# Dependencies
# eg numpy
# absolute import eg: import numpy as np

# Project
# relative import eg: from .mod import f

import typing

from json_tricks import numpy_encode,dumps,loads,numeric_types_hook,hashodict,json_numpy_obj_hook
from astropy.io import fits as pf
from astropy.io import ascii as astropy_io_ascii
import json
from astropy.utils.misc import JsonCustomEncoder

from astropy.table import Table
from astropy.coordinates import Angle
from astropy.wcs import WCS

import  numpy
import numpy as np
import  base64
import  pickle
import gzip
import  hashlib
from numpy import nan,inf
from sys import path_importer_cache, version_info

from io import StringIO, BytesIO
import imghdr
import os
import logging
from matplotlib import image as mpimg
from matplotlib import pyplot as plt

logger = logging.getLogger('oda_api.data_products')

__all__=['sanitize_encoded','_chekc_enc_data','BinaryData','NumpyDataUnit','NumpyDataProduct','ApiCatalog','ODAAstropyTable']

import astropy.io.fits.fitsrec


# these 3 functions are remnants of misusing repr() to serialize data instead of json
[docs] def sanitize_encoded(d): d = d.replace('null', 'None') d = d.replace('true', 'True') d = d.replace('false', 'False') d = d.replace('NaN', 'nan') d = d.replace('Infinity', 'inf') return d
def json_to_literal(d): d = d.replace('null', 'None') d = d.replace('true', 'True') d = d.replace('false', 'False') d = d.replace('NaN', 'nan') d = d.replace('Infinity', 'inf') return d def literal_to_json(d): d = d.replace('None', 'null') d = d.replace('True', 'true') d = d.replace('False', 'false') d = d.replace('nan', 'NaN') d = d.replace('inf', 'Infinity') return d
[docs] def _chekc_enc_data(data): if type(data)==list: _l=data else: _l=[data] return _l
[docs] class ODAAstropyTable(object): def __init__(self,table_object,name=None, meta_data={}): self.name=name self.meta_data=meta_data self._table=table_object @property def table(self): return self._table
[docs] def write(self,file_name,format='fits',overwrite=True): self._table.write(file_name,format=format,overwrite=overwrite)
[docs] def write_fits_file(self,file_name,overwrite=True): self.write(file_name,overwrite=overwrite,format='fits')
[docs] @classmethod def from_file(cls, file_path, name=None, delimiter=None, format=None): _allowed_formats_=['ascii','ascii.ecsv','fits'] if format == 'fits': # print('==>',file_name) table = Table.read(file_path, format=format) elif format == 'ascii.ecsv' or format=='ascii': table = Table.read(file_path, format=format, delimiter=delimiter) else: raise RuntimeError('table format not understood, allowed',_allowed_formats_) meta = None if hasattr(table, 'meta'): meta = table.meta return cls(table, meta_data=meta, name=name)
[docs] def encode(self,use_binary=False,to_json = False): _o_dict = {} _o_dict['binary']=None _o_dict['ascii']=None if use_binary is True: _binarys = base64.b64encode(pickle.dumps(self.table, protocol=2)).decode('utf-8') _o_dict['binary'] = _binarys else: #with StringIO() as fh: fh=StringIO() self.table.write(fh, format='ascii.ecsv') _text = fh.getvalue() fh.close() _o_dict['ascii'] = _text _o_dict['name']=self.name _o_dict['meta_data']=dumps(self.meta_data) if to_json == True: _o_dict=json.dumps(_o_dict) return _o_dict
[docs] @classmethod def decode(cls,o_dict,use_binary=False): if isinstance(o_dict, dict): _o_dict = o_dict elif isinstance(o_dict, str): _o_dict = json.loads(literal_to_json(o_dict)) encoded_name = _o_dict['name'] encoded_meta_data = _o_dict['meta_data'] if use_binary is True: t_rec = base64.b64decode(_o_dict['binary']) try: t_rec = pickle.loads(t_rec) except: t_rec= pickle.loads(t_rec,encoding='latin') else: t_rec = astropy_io_ascii.read(_o_dict['ascii']) return cls(t_rec,name=encoded_name,meta_data=encoded_meta_data)
[docs] class BinaryData(object): def __init__(self,file_path=None): self.file_path=file_path
[docs] def encode(self,file_path=None): if file_path==None: file_path=self.file_path _file_binary = open(file_path, 'rb').read() _file_b64 = base64.urlsafe_b64encode(_file_binary) _file_b64_md5 = hashlib.md5(_file_binary).hexdigest() return _file_b64,_file_b64_md5
[docs] def decode(self,encoded_obj): return base64.urlsafe_b64decode(encoded_obj.encode('ascii', 'ignore'))
class BinaryProduct: # New implementation of binary data product. # The meaning of the methods is more in-line with the rest of the products def __init__(self, bin_data, name=None): self.bin_data = bin_data if name == 'None': name = None self.name = name def encode(self): return { 'name': self.name, 'data': base64.urlsafe_b64encode(self.bin_data).decode(), 'md5': hashlib.md5(self.bin_data).hexdigest() } @classmethod def decode(cls, encoded_obj): if not isinstance(encoded_obj, dict): encoded_obj = json.loads(encoded_obj) name = encoded_obj['name'] bin_data = base64.urlsafe_b64decode(encoded_obj['data'].encode('ascii', 'ignore')) decoded_md5 = hashlib.md5(bin_data).hexdigest() assert decoded_md5 == encoded_obj['md5'] return cls(bin_data, name) def write_file(self, file_path): with open(file_path, 'wb') as fd: fd.write(self.bin_data) @classmethod def from_file(cls, file_path, name=None): with open(file_path, 'rb') as fd: bin_data = fd.read() return cls(bin_data, name)
[docs] class NumpyDataUnit(object): def __init__(self, data, data_header={}, meta_data={}, hdu_type=None, name=None, units_dict=None): self._hdu_type_list_ = ['primary', 'image', 'table', 'bintable'] self.name=name self._check_data(data) self._check_hdu_type(hdu_type) self._check_dict(data_header) self._check_dict(meta_data) self.data=data self.header=data_header self.meta_data=meta_data self.hdu_type=hdu_type self.units_dict=units_dict @property def name(self): return self._name @name.setter def name(self, name_value): if name_value is None: self._name = 'table' else: self._name = name_value # interface with a typo, preserving with a warning def _warn_chekc_typo(self): logger.debug('please _check_* instead of _chekc_* functions, they will be removed') def _chekc_data(self, data): self._warn_chekc_typo() return self._check_data(data) def _chekc_hdu_type(self,hdu_type): return self._check_hdu_type(hdu_type) def _chekc_dict(self, _kw): self._warn_chekc_typo() return self._check_dict(_kw) def _check_data(self,data): if isinstance(data, numpy.ndarray) or data is None: pass else: raise RuntimeError('data is not numpy ndarray object') def _check_hdu_type(self,hdu_type): if hdu_type is None: pass elif hdu_type in self._hdu_type_list_: pass else: raise RuntimeError('hdu type ', hdu_type, 'not in allowed', self._hdu_type_list_) def _check_dict(self,_kw): if isinstance(_kw, dict): pass else: raise RuntimeError('object is not not dict')
[docs] @classmethod def from_fits_hdu(cls,hdu,name=None): if name is None or name == '': name=hdu.name return cls(data=hdu.data, data_header={k:v for k, v in hdu.header.items()}, hdu_type=cls._map_hdu_type(hdu),name=name)
[docs] def to_fits_hdu(self): try: logger.debug('------------------------------') logger.debug('inside to_fits_hdu methods') logger.debug(f'name: {self.name}') logger.debug(f'header: {self.header}') logger.debug(f'data: {self.data}') logger.debug(f'units_dict: {self.units_dict}') logger.debug(f'hdu_type: {self.hdu_type}') logger.debug('------------------------------') for k,v in self.header.items(): if isinstance(v, list): s='' for l in v: s+='%s,'%str(l) self.header[k] = s #unicode(",".join(map(str,v))) return self.new_hdu_from_data(self.data, header=pf.header.Header(self.header), hdu_type=self.hdu_type,units_dict=self.units_dict) except Exception as e: raise Exception("an exception occurred in oda_api when binary products are formatted to fits header: " + repr(e))
@staticmethod def _map_hdu_type(hdu): _t='' if isinstance(hdu,pf.PrimaryHDU): _t= 'primary' elif isinstance(hdu,pf.ImageHDU): _t = 'image' elif isinstance(hdu, pf.BinTableHDU): _t = 'bintable' elif isinstance(hdu, pf.TableHDU): _t = 'table' else: raise RuntimeError('hdu type not understood') #print('_t',_t) return _t
[docs] def new_hdu_from_data(self,data,hdu_type, header=None,units_dict=None): self._chekc_hdu_type(hdu_type) if hdu_type=='primary': h = pf.PrimaryHDU elif hdu_type=='image': h = pf.ImageHDU elif hdu_type == 'bintable': h = pf.BinTableHDU elif hdu_type == 'table': h = pf.TableHDU else: raise RuntimeError('hdu type ', hdu_type, 'not in allowed', self._hdu_type_list_) _h=h(data=data, header=header) _h.name=self.name if units_dict is not None: for k in units_dict.keys(): _h.columns.change_unit(k,units_dict[k]) self.header=dict(_h.header) return _h
@staticmethod def _eval_dt(dt): #print('dt', type(dt), dt) try: dt = numpy.dtype(dt) except: dt = eval(dt) #print('dt', type(dt), dt) return dt
[docs] def encode(self,use_pickle=False,use_gzip=False,to_json=False): _data = [] _meata_d=[] _kw_d = [] _d = None _dt = None _binarys = None if self.data is not None: _dt= numpy_encode(numpy.array(self.data))['dtype'] if use_pickle is True: if isinstance(self.data, astropy.io.fits.fitsrec.FITS_rec): pickled_data = pickle.dumps(self.data) else: pickled_data = pickle.dumps(numpy.array(self.data)) if use_gzip==True: out_file = StringIO(pickled_data) gzip_file = gzip.GzipFile(fileobj=out_file, mode='wb') gzip_file.write(pickled_data) _binarys = base64.b64encode(out_file.getvalue()) gzip_file.close() else: _binarys= base64.b64encode( pickled_data ) _binarys = _binarys.decode() else: _d= json.dumps(self.data, cls=JsonCustomEncoder) _o_dict = {'data': _d, 'dt': _dt, 'name': self.name, 'header': self.header, 'binarys': _binarys, 'meta_data': self.meta_data, 'hdu_type': self.hdu_type, 'units_dict': self.units_dict} if to_json: _o_dict_json = json.dumps(_o_dict) return _o_dict_json return _o_dict
[docs] @classmethod def decode(cls,encoded_obj,use_gzip=False,from_json=False): #encoded_obj=eval(encoded_obj) #encoded_obj=json.loads(encoded_obj) #print('-->encoded_obj',type(encoded_obj)) if from_json == False: try: encoded_obj = json.loads(encoded_obj) except: pass encoded_data = encoded_obj['data'] encoded_dt=encoded_obj['dt'] encoded_header = encoded_obj['header'] encoded_meta_data=encoded_obj['meta_data'] _name=encoded_obj['name'] _hdu_type=encoded_obj['hdu_type'] _binarys=encoded_obj['binarys'] _units_dict = encoded_obj.get('units_dict') if _binarys is not None: #print('dec ->', type(_binarys)) in_file = StringIO() _binarys = base64.b64decode(_binarys) if use_gzip ==True: in_file.write(_binarys) in_file.seek(0) gzip_file = gzip.GzipFile(fileobj=in_file, mode='rb') _data = gzip_file.read() _data = _data.decode('utf-8') _data = pickle.loads(_data) gzip_file.close() else: if version_info[0] > 2: _data = pickle.loads(_binarys,encoding='bytes') else: _data = pickle.loads(_binarys) elif encoded_data is not None: encoded_data=eval(encoded_data) # !! for ID,c in enumerate(encoded_data): encoded_data[ID]=tuple(c) _data=numpy.asanyarray(encoded_data,dtype=cls._eval_dt(encoded_dt)) else: _data=None return cls(data=_data, data_header=encoded_header, meta_data=encoded_meta_data,name=_name,hdu_type=_hdu_type, units_dict=_units_dict)
@classmethod def from_pandas(cls, pandas_dataframe, name = None, column_names=[], units_dict={}, meta_data = {}, data_header = {}): if column_names and type(column_names) == list: pandas_dataframe = pandas_dataframe.loc[:, column_names] elif column_names and type(column_names) == dict: pandas_dataframe = pandas_dataframe.loc[:, column_names.keys()] pandas_dataframe.rename(columns=column_names, inplace=True) rec_array = pandas_dataframe.to_records(index=False) return cls(data = rec_array, name=name, units_dict = units_dict, meta_data = meta_data, data_header = data_header, hdu_type = 'bintable')
[docs] class NumpyDataProduct(object): def __init__(self, data_unit, name=None, meta_data={}): self.name=name self.data_unit=self._seta_data(data_unit) self._chekc_dict(meta_data) self.meta_data=meta_data
[docs] def show(self): print('------------------------------') print('name:',self.name) print('meta_data',self.meta_data.keys()) print ('number of data units',len(self.data_unit)) print ('------------------------------') for ID,du in enumerate(self.data_unit): print('data uniti',ID,',name:',du.name)
[docs] def show_meta(self): print('------------------------------') for k,v in self.meta_data.items(): print(k,':',v) print ('------------------------------')
[docs] def get_data_unit(self, ID: int) -> NumpyDataUnit: try: return self.data_unit[ID] except IndexError as e: raise RuntimeError(f"problem get_data_unit ID:{ID} in self.data_unit:{self.data_unit}")
[docs] def get_data_unit_by_name(self, name: str) -> typing.Union[NumpyDataUnit, None]: _du = None for du in self.data_unit: if du.name == name: if _du is not None: print(f"\033[31mWARNING: get_data_unit_by_name found multiple du for name {name}\033[0m") _du = du print('--> NAME',du.name) if _du is None: found_names = '; '.join([ str(_du.name) + ":" + repr(du) for _du in self.data_unit ]) print(f"\033[31mWARNING: get_data_unit_by_name found no du for name {name}, have {found_names}\033[0m") return _du
def _seta_data(self,data): if type(data) == list: _dl = data else: _dl = [data] for ID, _d in enumerate(_dl): if isinstance(_d, NumpyDataUnit): pass else: raise RuntimeError ('DataUnit not valid') return _dl def _chekc_enc_data(self,data): return _chekc_enc_data(data) def _chekc_dict(self,_kw): if isinstance(_kw, dict): pass else: raise RuntimeError('object is not not dict')
[docs] def encode(self, use_pickle=True, use_gzip=False, to_json=False): _enc = [] for ID, data_unit in enumerate(self.data_unit): _enc.append( data_unit.encode( use_pickle=use_pickle, use_gzip=use_gzip ) ) _o_dict={'data_unit_list':_enc,'name':self.name,'meta_data':dumps(self.meta_data)} if to_json==True: return json.dumps(_o_dict) return _o_dict
[docs] def to_fits_hdu_list(self): _hdul=pf.HDUList() for ID,_d in enumerate(self.data_unit): _hdul.append(_d.to_fits_hdu()) return _hdul
[docs] def write_fits_file(self,filename,overwrite=True): self.to_fits_hdu_list().writeto(filename,overwrite=overwrite)
[docs] @classmethod def from_fits_file(cls,filename,ext=None,hdu_name=None,meta_data={},name=''): hdul=pf.open(filename) if ext is not None: hdul=[hdul[ext]] if hdu_name is not None: _hdul=[] for hdu in hdul: if hdu.name == hdu_name: _hdul.append(hdu) hdul=_hdul return cls(data_unit=[NumpyDataUnit.from_fits_hdu(h) for h in hdul],meta_data=meta_data,name=name)
[docs] @classmethod def decode(cls, encoded_obj: typing.Union[str, dict], from_json=False): if encoded_obj is not None: # from_json has the opposite meaning of what the name implies obj_dict: dict if from_json: if isinstance(encoded_obj, dict): obj_dict = encoded_obj else: logger.warning('decoding from unexpected object') obj_dict = encoded_obj # type: ignore else: if isinstance(encoded_obj, dict): obj_dict = encoded_obj else: logger.warning('decoding from unexpected object') try: obj_dict = json.loads(literal_to_json(encoded_obj)) except Exception as e: logger.debug('unable to decode json object: %s', e) # why not raise here? encoded_data_unit_list = obj_dict['data_unit_list'] encoded_name = obj_dict['name'] encoded_meta_data = obj_dict['meta_data'] _data_unit_list=[] #print('encoded_data_unit_list',encoded_data_unit_list) for enc_data_unit in encoded_data_unit_list: _data_unit_list.append(NumpyDataUnit.decode(enc_data_unit,from_json=False)) else: _data_unit_list=[] encoded_name=None encoded_meta_data={} return cls(data_unit=_data_unit_list,name=encoded_name,meta_data=eval(encoded_meta_data))
[docs] class ApiCatalog(object): def __init__(self,cat_dict,name=None): self.name=name _skip_list=['meta_ID'] meta = {} lon_name = None if 'cat_lon_name' in cat_dict.keys(): lon_name = cat_dict['cat_lon_name'] lat_name = None if 'cat_lat_name' in cat_dict.keys(): lat_name = cat_dict['cat_lat_name'] frame = None if 'cat_frame' in cat_dict.keys(): frame = cat_dict['cat_frame'] coord_units = None if 'cat_coord_units' in cat_dict.keys(): coord_units = cat_dict['cat_coord_units'] if 'cat_meta' in cat_dict.keys(): cat_meta_entry = cat_dict['cat_meta'] meta.update(cat_meta_entry) meta['FRAME'] = frame meta['COORD_UNIT'] = coord_units meta['LON_NAME'] = lon_name meta['LAT_NAME'] = lat_name self.table =Table(cat_dict['cat_column_list'], names=cat_dict['cat_column_names'],meta=meta) if coord_units is not None: self.table[lon_name]=Angle(self.table[lon_name],unit=coord_units) self.table[lat_name]=Angle(self.table[lat_name],unit=coord_units) self.lat_name=lat_name self.lon_name=lon_name
[docs] def get_api_dictionary(self ): column_lists=[self.table[name].tolist() for name in self.table.colnames] for ID,_col in enumerate(column_lists): column_lists[ID] = [x if str(x)!='nan' else None for x in _col] return json.dumps(dict(cat_frame=self.table.meta['FRAME'], cat_coord_units=self.table.meta['COORD_UNIT'], cat_column_list=column_lists, cat_column_names=self.table.colnames, cat_column_descr=self.table.dtype.descr, cat_lat_name=self.lat_name, cat_lon_name=self.lon_name))
class GWEventContours: def __init__(self, event_contour_dict, name='') -> None: self._contour_dict = event_contour_dict self.name = name self.levels = event_contour_dict['levels'] self.contours = event_contour_dict['contours'] class GWContoursDataProduct: def __init__(self, contours_dict: dict) -> None: self._cont_data = contours_dict self.event_list = list(self._cont_data.keys()) self.contours = {} for key, val in self._cont_data.items(): self.contours[key] = GWEventContours(val, name = key) setattr(self, key, self.contours[key]) class LightCurveDataProduct(NumpyDataProduct): @classmethod def from_arrays(cls, times, fluxes = None, magnitudes = None, rates = None, counts = None, errors = None, units_spec = {}, # TODO: not used yet time_format = None, name = None): data_header = {} meta_data = {} # meta data could be attached to both NumpyDataUnit and NumpyDataProduct. Decide on this if (fluxes is not None) + (magnitudes is not None) + (rates is not None) + (counts is not None) != 1: raise ValueError('Only one type of values should be set') elif fluxes is not None: col_name = 'FLUX' values = fluxes elif magnitudes is not None: col_name = 'MAG' values = magnitudes elif rates is not None: col_name = 'RATE' values = rates elif counts is not None: col_name = 'COUNTS' values = counts if len(values) != len(times): raise ValueError(f'Value column length {len(values)} do not coincide with time {len(times)} column length') if errors is not None and len(errors) != len(times): raise ValueError('Error column length do not coincide with time column length') # TODO: possibility for other time units # TODO: add time-related keywords to header (OGIP) units_dict = {'TIME': 'd'} if any(isinstance(x, astropy.time.Time) for x in times): times = astropy.time.Time(times) if isinstance(times, astropy.time.Time): mjd = times.mjd elif time_format is not None: atimes = astropy.time.Time(times, format=time_format) # NOTE: do we assume paticular scale? mjd = atimes.mjd else: atimes = astropy.time.Time(times) mjd = atimes.mjd if any(isinstance(x, astropy.units.Quantity) for x in values): values = astropy.units.Quantity(values) if isinstance(values, astropy.units.Quantity): units_dict[col_name] = values.unit.to_string(format='OGIP') values = values.value if errors is not None and any(isinstance(x, astropy.units.Quantity) for x in errors): errors = astropy.units.Quantity(errors) if errors is not None and isinstance(errors, astropy.units.Quantity): units_dict['ERROR'] = errors.unit.to_string(format='OGIP') errors = errors.value rec_array = np.core.records.fromarrays([mjd, values, errors], names=('TIME', col_name, 'ERROR')) return cls([NumpyDataUnit(data=np.array([]), name = 'PRIMARY', hdu_type = 'primary'), NumpyDataUnit(data = rec_array, units_dict = units_dict, meta_data = meta_data, data_header = data_header, hdu_type = 'bintable', name = 'LC')], name = name) class PictureProduct: def __init__(self, binary_data, name=None, metadata={}, file_path=None, write_on_creation = False): self.binary_data = binary_data self.metadata = metadata self.name = name if file_path is not None and os.path.isfile(file_path): self.file_path = file_path logger.info(f'Image file {file_path} already exist. No automatical rewriting.') elif write_on_creation: self.write_file(file_path) else: self.file_path = None byte_stream = BytesIO(binary_data) tp = imghdr.what(byte_stream) if tp is None: raise ValueError('Provided data is not an image') self.img_type = tp @classmethod def from_file(cls, file_path, name=None): with open(file_path, 'rb') as fd: binary_data = fd.read() return cls(binary_data, name=name, file_path=file_path) def write_file(self, file_path): logger.info(f'Creating image file {file_path}.') with open(file_path, 'wb') as fd: fd.write(self.binary_data) self.file_path = file_path def encode(self): b64data = base64.urlsafe_b64encode(self.binary_data) output_dict = {} output_dict['img_type'] = self.img_type output_dict['b64data'] = b64data.decode() output_dict['metadata'] = self.metadata output_dict['name'] = self.name if self.file_path: output_dict['filename'] = os.path.basename(self.file_path) return output_dict @classmethod def decode(cls, encoded_data, write_on_creation = False): if isinstance(encoded_data, dict): _encoded_data = encoded_data else: _encoded_data = json.loads(encoded_data) binary_data = base64.urlsafe_b64decode(_encoded_data['b64data'].encode('ascii', 'ignore')) return cls(binary_data, metadata = _encoded_data['metadata'], file_path = _encoded_data.get('filename'), name = _encoded_data.get('name'), write_on_creation = write_on_creation) def show(self): byte_stream = BytesIO(self.binary_data) image = mpimg.imread(byte_stream) plt.imshow(image) plt.axis('off') plt.show() class ImageDataProduct(NumpyDataProduct): @classmethod def from_fits_file(cls,filename,ext=None,hdu_name=None,meta_data={},name=None): npdp = super().from_fits_file(filename,ext=ext,hdu_name=hdu_name,meta_data=meta_data,name=name) contains_image = cls.check_contains_image(npdp) if contains_image: return npdp else: raise ValueError(f'FITS file {filename} doesn\'t contain image data.') @staticmethod def check_contains_image(numpy_data_prod): for hdu in numpy_data_prod.data_unit: if hdu.hdu_type in ['primary', 'image']: try: wcs = WCS(hdu.header) return True except: pass return False class TextLikeProduct: def __init__(self, value, name=None, meta_data={}): self.value = value self.name = name self.meta_data = meta_data def encode(self): return {'name': self.name, 'value': self.value, 'meta_data': self.meta_data} @classmethod def decode(cls, encoded): if not isinstance(encoded, dict): encoded = json.loads(encoded) return cls(name=encoded.get('name'), value=encoded['value'], meta_data=encoded.get('meta_data', {})) def __repr__(self): return self.encode().__repr__()