from __future__ import absolute_import, division, print_function
from builtins import (bytes, str, open, super, range,
zip, round, input, int, pow, object, map, zip)
__author__ = "Andrea Tramacere"
# Standard library
# eg copy
# absolute import rg:from copy import deepcopy
# Dependencies
# eg numpy
# absolute import eg: import numpy as np
# Project
# relative import eg: from .mod import f
import typing
from json_tricks import numpy_encode,dumps,loads,numeric_types_hook,hashodict,json_numpy_obj_hook
from astropy.io import fits as pf
from astropy.io import ascii as astropy_io_ascii
import json
from astropy.utils.misc import JsonCustomEncoder
from astropy.table import Table
from astropy.coordinates import Angle
from astropy.wcs import WCS
from astropy import units as u
import numpy
import numpy as np
import base64
import pickle
import gzip
import hashlib
from numpy import nan,inf
from sys import path_importer_cache, version_info
from io import StringIO, BytesIO
import imghdr
import os
import logging
from matplotlib import image as mpimg
from matplotlib import pyplot as plt
logger = logging.getLogger('oda_api.data_products')
__all__=['sanitize_encoded','_chekc_enc_data','BinaryData','NumpyDataUnit','NumpyDataProduct','ApiCatalog','ODAAstropyTable']
import astropy.io.fits.fitsrec
# these 3 functions are remnants of misusing repr() to serialize data instead of json
[docs]
def sanitize_encoded(d):
d = d.replace('null', 'None')
d = d.replace('true', 'True')
d = d.replace('false', 'False')
d = d.replace('NaN', 'nan')
d = d.replace('Infinity', 'inf')
return d
def json_to_literal(d):
d = d.replace('null', 'None')
d = d.replace('true', 'True')
d = d.replace('false', 'False')
d = d.replace('NaN', 'nan')
d = d.replace('Infinity', 'inf')
return d
def literal_to_json(d):
d = d.replace('None', 'null')
d = d.replace('True', 'true')
d = d.replace('False', 'false')
d = d.replace('nan', 'NaN')
d = d.replace('inf', 'Infinity')
return d
[docs]
def _chekc_enc_data(data):
if type(data)==list:
_l=data
else:
_l=[data]
return _l
[docs]
class ODAAstropyTable(object):
def __init__(self,table_object,name=None, meta_data={}):
self.name=name
self.meta_data=meta_data
self._table=table_object
@property
def table(self):
return self._table
[docs]
def write(self,file_name,format='fits',overwrite=True):
self._table.write(file_name,format=format,overwrite=overwrite)
[docs]
def write_fits_file(self,file_name,overwrite=True):
self.write(file_name,overwrite=overwrite,format='fits')
[docs]
@classmethod
def from_file(cls, file_path, name=None, delimiter=None, format=None):
_allowed_formats_=['ascii','ascii.ecsv','fits']
if format == 'fits':
# print('==>',file_name)
table = Table.read(file_path, format=format)
elif format == 'ascii.ecsv' or format=='ascii':
table = Table.read(file_path, format=format, delimiter=delimiter)
else:
raise RuntimeError('table format not understood, allowed',_allowed_formats_)
meta = None
if hasattr(table, 'meta'):
meta = table.meta
return cls(table, meta_data=meta, name=name)
[docs]
def encode(self,use_binary=False,to_json = False):
_o_dict = {}
_o_dict['binary']=None
_o_dict['ascii']=None
if use_binary is True:
_binarys = base64.b64encode(pickle.dumps(self.table, protocol=2)).decode('utf-8')
_o_dict['binary'] = _binarys
else:
#with StringIO() as fh:
fh=StringIO()
self.table.write(fh, format='ascii.ecsv')
_text = fh.getvalue()
fh.close()
_o_dict['ascii'] = _text
_o_dict['name']=self.name
_o_dict['meta_data']=dumps(self.meta_data)
if to_json == True:
_o_dict=json.dumps(_o_dict)
return _o_dict
[docs]
@classmethod
def decode(cls,o_dict,use_binary=False):
if isinstance(o_dict, dict):
_o_dict = o_dict
elif isinstance(o_dict, str):
_o_dict = json.loads(literal_to_json(o_dict))
else:
raise RuntimeError('Wrong table structure')
encoded_name = _o_dict['name']
encoded_meta_data = _o_dict['meta_data']
if use_binary is True:
t_rec = base64.b64decode(_o_dict['binary'])
try:
t_rec = pickle.loads(t_rec)
except:
t_rec= pickle.loads(t_rec,encoding='latin')
else:
t_rec = astropy_io_ascii.read(_o_dict['ascii'])
return cls(t_rec,name=encoded_name,meta_data=encoded_meta_data)
[docs]
class BinaryData(object):
def __init__(self,file_path=None):
self.file_path=file_path
[docs]
def encode(self,file_path=None):
if file_path==None:
file_path=self.file_path
_file_binary = open(file_path, 'rb').read()
_file_b64 = base64.urlsafe_b64encode(_file_binary)
_file_b64_md5 = hashlib.md5(_file_binary).hexdigest()
return _file_b64,_file_b64_md5
[docs]
def decode(self,encoded_obj):
return base64.urlsafe_b64decode(encoded_obj.encode('ascii', 'ignore'))
class BinaryProduct:
# New implementation of binary data product.
# The meaning of the methods is more in-line with the rest of the products
def __init__(self, bin_data, name=None):
self.bin_data = bin_data
if name == 'None': name = None
self.name = name
def encode(self):
return {
'name': self.name,
'data': base64.urlsafe_b64encode(self.bin_data).decode(),
'md5': hashlib.md5(self.bin_data).hexdigest()
}
@classmethod
def decode(cls, encoded_obj):
if not isinstance(encoded_obj, dict):
encoded_obj = json.loads(encoded_obj)
name = encoded_obj['name']
bin_data = base64.urlsafe_b64decode(encoded_obj['data'].encode('ascii', 'ignore'))
decoded_md5 = hashlib.md5(bin_data).hexdigest()
assert decoded_md5 == encoded_obj['md5']
return cls(bin_data, name)
def write_file(self, file_path):
with open(file_path, 'wb') as fd:
fd.write(self.bin_data)
@classmethod
def from_file(cls, file_path, name=None):
with open(file_path, 'rb') as fd:
bin_data = fd.read()
return cls(bin_data, name)
[docs]
class NumpyDataUnit(object):
def __init__(self, data, data_header={}, meta_data={}, hdu_type=None, name=None, units_dict=None):
self._hdu_type_list_ = ['primary', 'image', 'table', 'bintable']
self.name=name
self._check_data(data)
self._check_hdu_type(hdu_type)
self._check_dict(data_header)
self._check_dict(meta_data)
self.data=data
self.header=data_header
self.meta_data=meta_data
self.hdu_type=hdu_type
self.units_dict=units_dict
@property
def name(self):
return self._name
@name.setter
def name(self, name_value):
if name_value is None:
self._name = 'table'
else:
self._name = name_value
# interface with a typo, preserving with a warning
def _warn_chekc_typo(self):
logger.debug('please _check_* instead of _chekc_* functions, they will be removed')
def _chekc_data(self, data):
self._warn_chekc_typo()
return self._check_data(data)
def _chekc_hdu_type(self,hdu_type):
return self._check_hdu_type(hdu_type)
def _chekc_dict(self, _kw):
self._warn_chekc_typo()
return self._check_dict(_kw)
def _check_data(self,data):
if isinstance(data, numpy.ndarray) or data is None:
pass
else:
raise RuntimeError('data is not numpy ndarray object')
def _check_hdu_type(self,hdu_type):
if hdu_type is None:
pass
elif hdu_type in self._hdu_type_list_:
pass
else:
raise RuntimeError('hdu type ', hdu_type, 'not in allowed', self._hdu_type_list_)
def _check_dict(self,_kw):
if isinstance(_kw, dict):
pass
else:
raise RuntimeError('object is not not dict')
[docs]
@classmethod
def from_fits_hdu(cls,hdu,name=None):
if name is None or name == '':
name=hdu.name
return cls(data=hdu.data,
data_header={k:v for k, v in hdu.header.items()},
hdu_type=cls._map_hdu_type(hdu),name=name)
[docs]
def to_fits_hdu(self):
try:
logger.debug('------------------------------')
logger.debug('inside to_fits_hdu methods')
logger.debug(f'name: {self.name}')
logger.debug(f'header: {self.header}')
logger.debug(f'data: {self.data}')
logger.debug(f'units_dict: {self.units_dict}')
logger.debug(f'hdu_type: {self.hdu_type}')
logger.debug('------------------------------')
for k,v in self.header.items():
if isinstance(v, list):
s=''
for l in v:
s+='%s,'%str(l)
self.header[k] = s
#unicode(",".join(map(str,v)))
return self.new_hdu_from_data(self.data,
header=pf.header.Header(self.header),
hdu_type=self.hdu_type,units_dict=self.units_dict)
except Exception as e:
raise Exception("an exception occurred in oda_api when binary products are formatted to fits header: " + repr(e))
@staticmethod
def _map_hdu_type(hdu):
_t=''
if isinstance(hdu,pf.PrimaryHDU):
_t= 'primary'
elif isinstance(hdu,pf.ImageHDU):
_t = 'image'
elif isinstance(hdu, pf.BinTableHDU):
_t = 'bintable'
elif isinstance(hdu, pf.TableHDU):
_t = 'table'
else:
raise RuntimeError('hdu type not understood')
#print('_t',_t)
return _t
[docs]
def new_hdu_from_data(self,data,hdu_type, header=None,units_dict=None):
self._chekc_hdu_type(hdu_type)
if hdu_type=='primary':
h = pf.PrimaryHDU
elif hdu_type=='image':
h = pf.ImageHDU
elif hdu_type == 'bintable':
h = pf.BinTableHDU
elif hdu_type == 'table':
h = pf.TableHDU
else:
raise RuntimeError('hdu type ', hdu_type, 'not in allowed', self._hdu_type_list_)
_h=h(data=data, header=header)
_h.name=self.name
if units_dict is not None:
for k in units_dict.keys():
_h.columns.change_unit(k,units_dict[k])
self.header=dict(_h.header)
return _h
@staticmethod
def _eval_dt(dt):
#print('dt', type(dt), dt)
try:
dt = numpy.dtype(dt)
except:
dt = eval(dt)
#print('dt', type(dt), dt)
return dt
[docs]
def encode(self,use_pickle=False,use_gzip=False,to_json=False):
_data = []
_meata_d=[]
_kw_d = []
_d = None
_dt = None
_binarys = None
if self.data is not None:
_dt= numpy_encode(numpy.array(self.data))['dtype']
if use_pickle is True:
if isinstance(self.data, astropy.io.fits.fitsrec.FITS_rec):
pickled_data = pickle.dumps(self.data)
else:
pickled_data = pickle.dumps(numpy.array(self.data))
if use_gzip==True:
out_file = StringIO(pickled_data)
gzip_file = gzip.GzipFile(fileobj=out_file, mode='wb')
gzip_file.write(pickled_data)
_binarys = base64.b64encode(out_file.getvalue())
gzip_file.close()
else:
_binarys= base64.b64encode(
pickled_data
)
_binarys = _binarys.decode()
else:
_d= json.dumps(self.data, cls=JsonCustomEncoder)
_o_dict = {'data': _d,
'dt': _dt,
'name': self.name,
'header': self.header,
'binarys': _binarys,
'meta_data': self.meta_data,
'hdu_type': self.hdu_type,
'units_dict': self.units_dict}
if to_json:
_o_dict_json = json.dumps(_o_dict)
return _o_dict_json
return _o_dict
[docs]
@classmethod
def decode(cls,encoded_obj,use_gzip=False,from_json=False):
#encoded_obj=eval(encoded_obj)
#encoded_obj=json.loads(encoded_obj)
#print('-->encoded_obj',type(encoded_obj))
if from_json == False:
try:
encoded_obj = json.loads(encoded_obj)
except:
pass
encoded_data = encoded_obj['data']
encoded_dt=encoded_obj['dt']
encoded_header = encoded_obj['header']
encoded_meta_data=encoded_obj['meta_data']
_name=encoded_obj['name']
_hdu_type=encoded_obj['hdu_type']
_binarys=encoded_obj['binarys']
_units_dict = encoded_obj.get('units_dict')
if _binarys is not None:
#print('dec ->', type(_binarys))
in_file = StringIO()
_binarys = base64.b64decode(_binarys)
if use_gzip ==True:
in_file.write(_binarys)
in_file.seek(0)
gzip_file = gzip.GzipFile(fileobj=in_file, mode='rb')
_data = gzip_file.read()
_data = _data.decode('utf-8')
_data = pickle.loads(_data)
gzip_file.close()
else:
if version_info[0] > 2:
_data = pickle.loads(_binarys,encoding='bytes')
else:
_data = pickle.loads(_binarys)
elif encoded_data is not None:
encoded_data=eval(encoded_data) # !!
for ID,c in enumerate(encoded_data):
encoded_data[ID]=tuple(c)
_data=numpy.asanyarray(encoded_data,dtype=cls._eval_dt(encoded_dt))
else:
_data=None
return cls(data=_data, data_header=encoded_header, meta_data=encoded_meta_data,name=_name,hdu_type=_hdu_type, units_dict=_units_dict)
@classmethod
def from_pandas(cls,
pandas_dataframe,
name = None,
column_names=[],
units_dict={},
meta_data = {},
data_header = {}):
if column_names and type(column_names) == list:
pandas_dataframe = pandas_dataframe.loc[:, column_names]
elif column_names and type(column_names) == dict:
pandas_dataframe = pandas_dataframe.loc[:, column_names.keys()]
pandas_dataframe.rename(columns=column_names, inplace=True)
rec_array = pandas_dataframe.to_records(index=False)
return cls(data = rec_array,
name=name,
units_dict = units_dict,
meta_data = meta_data,
data_header = data_header,
hdu_type = 'bintable')
[docs]
class NumpyDataProduct(object):
def __init__(self, data_unit, name=None, meta_data={}):
self.name=name
self.data_unit=self._seta_data(data_unit)
self._chekc_dict(meta_data)
self.meta_data=meta_data
[docs]
def show(self):
print('------------------------------')
print('name:',self.name)
print('meta_data',self.meta_data.keys())
print ('number of data units',len(self.data_unit))
print ('------------------------------')
for ID,du in enumerate(self.data_unit):
print('data uniti',ID,',name:',du.name)
[docs]
def get_data_unit(self, ID: int) -> NumpyDataUnit:
try:
return self.data_unit[ID]
except IndexError as e:
raise RuntimeError(f"problem get_data_unit ID:{ID} in self.data_unit:{self.data_unit}")
[docs]
def get_data_unit_by_name(self, name: str) -> typing.Union[NumpyDataUnit, None]:
_du = None
for du in self.data_unit:
if du.name == name:
if _du is not None:
print(f"\033[31mWARNING: get_data_unit_by_name found multiple du for name {name}\033[0m")
_du = du
print('--> NAME',du.name)
if _du is None:
found_names = '; '.join([ str(_du.name) + ":" + repr(du) for _du in self.data_unit ])
print(f"\033[31mWARNING: get_data_unit_by_name found no du for name {name}, have {found_names}\033[0m")
return _du
def _seta_data(self,data):
if type(data) == list:
_dl = data
else:
_dl = [data]
for ID, _d in enumerate(_dl):
if isinstance(_d, NumpyDataUnit):
pass
else:
raise RuntimeError ('DataUnit not valid')
return _dl
def _chekc_enc_data(self,data):
return _chekc_enc_data(data)
def _chekc_dict(self,_kw):
if isinstance(_kw, dict):
pass
else:
raise RuntimeError('object is not not dict')
[docs]
def encode(self, use_pickle=True, use_gzip=False, to_json=False):
_enc = []
for ID, data_unit in enumerate(self.data_unit):
_enc.append(
data_unit.encode(
use_pickle=use_pickle,
use_gzip=use_gzip
)
)
_o_dict={'data_unit_list':_enc,'name':self.name,'meta_data':dumps(self.meta_data)}
if to_json==True:
return json.dumps(_o_dict)
return _o_dict
[docs]
def to_fits_hdu_list(self):
_hdul=pf.HDUList()
for ID,_d in enumerate(self.data_unit):
_hdul.append(_d.to_fits_hdu())
return _hdul
[docs]
def write_fits_file(self,filename,overwrite=True):
self.to_fits_hdu_list().writeto(filename,overwrite=overwrite)
[docs]
@classmethod
def from_fits_file(cls,filename,ext=None,hdu_name=None,meta_data={},name=''):
hdul=pf.open(filename)
if ext is not None:
hdul=[hdul[ext]]
if hdu_name is not None:
_hdul=[]
for hdu in hdul:
if hdu.name == hdu_name:
_hdul.append(hdu)
hdul=_hdul
return cls(data_unit=[NumpyDataUnit.from_fits_hdu(h) for h in hdul],meta_data=meta_data,name=name)
[docs]
@classmethod
def decode(cls, encoded_obj: typing.Union[str, dict], from_json=False):
if encoded_obj is not None:
# from_json has the opposite meaning of what the name implies
obj_dict: dict
if from_json:
if isinstance(encoded_obj, dict):
obj_dict = encoded_obj
else:
logger.warning('decoding from unexpected object')
obj_dict = encoded_obj # type: ignore
else:
if isinstance(encoded_obj, dict):
obj_dict = encoded_obj
else:
logger.warning('decoding from unexpected object')
try:
obj_dict = json.loads(literal_to_json(encoded_obj))
except Exception as e:
logger.debug('unable to decode json object: %s', e)
# why not raise here?
encoded_data_unit_list = obj_dict['data_unit_list']
encoded_name = obj_dict['name']
encoded_meta_data = obj_dict['meta_data']
_data_unit_list=[]
#print('encoded_data_unit_list',encoded_data_unit_list)
for enc_data_unit in encoded_data_unit_list:
_data_unit_list.append(NumpyDataUnit.decode(enc_data_unit,from_json=False))
else:
_data_unit_list=[]
encoded_name=None
encoded_meta_data={}
return cls(data_unit=_data_unit_list,name=encoded_name,meta_data=eval(encoded_meta_data))
[docs]
class ApiCatalog(object):
def __init__(self,cat_dict,name=None):
self.name=name
_skip_list=['meta_ID']
meta = {}
lon_name = None
if 'cat_lon_name' in cat_dict.keys():
lon_name = cat_dict['cat_lon_name']
lat_name = None
if 'cat_lat_name' in cat_dict.keys():
lat_name = cat_dict['cat_lat_name']
frame = None
if 'cat_frame' in cat_dict.keys():
frame = cat_dict['cat_frame']
coord_units = None
if 'cat_coord_units' in cat_dict.keys():
coord_units = cat_dict['cat_coord_units']
if 'cat_meta' in cat_dict.keys():
cat_meta_entry = cat_dict['cat_meta']
meta.update(cat_meta_entry)
meta['FRAME'] = frame
meta['COORD_UNIT'] = coord_units
meta['LON_NAME'] = lon_name
meta['LAT_NAME'] = lat_name
self.table =Table(cat_dict['cat_column_list'], names=cat_dict['cat_column_names'],meta=meta)
if coord_units is not None:
self.table[lon_name]=Angle(self.table[lon_name],unit=coord_units)
self.table[lat_name]=Angle(self.table[lat_name],unit=coord_units)
self.lat_name=lat_name
self.lon_name=lon_name
[docs]
def get_api_dictionary(self ):
column_lists=[self.table[name].tolist() for name in self.table.colnames]
for ID,_col in enumerate(column_lists):
column_lists[ID] = [x if str(x)!='nan' else None for x in _col]
return json.dumps(dict(cat_frame=self.table.meta['FRAME'],
cat_coord_units=self.table.meta['COORD_UNIT'],
cat_column_list=column_lists,
cat_column_names=self.table.colnames,
cat_column_descr=self.table.dtype.descr,
cat_lat_name=self.lat_name,
cat_lon_name=self.lon_name))
class GWEventContours:
def __init__(self, event_contour_dict, name='') -> None:
self._contour_dict = event_contour_dict
self.name = name
self.levels = event_contour_dict['levels']
self.contours = event_contour_dict['contours']
class GWContoursDataProduct:
def __init__(self, contours_dict: dict) -> None:
self._cont_data = contours_dict
self.event_list = list(self._cont_data.keys())
self.contours = {}
for key, val in self._cont_data.items():
self.contours[key] = GWEventContours(val, name = key)
setattr(self, key, self.contours[key])
class LightCurveDataProduct(NumpyDataProduct):
@classmethod
def from_arrays(cls,
times,
fluxes = None,
magnitudes = None,
rates = None,
counts = None,
errors = None,
units_spec = {}, # TODO: not used yet
time_format = None,
name = None):
data_header = {}
meta_data = {} # meta data could be attached to both NumpyDataUnit and NumpyDataProduct. Decide on this
if (fluxes is not None) + (magnitudes is not None) + (rates is not None) + (counts is not None) != 1:
raise ValueError('Only one type of values should be set')
elif fluxes is not None:
col_name = 'FLUX'
values = fluxes
elif magnitudes is not None:
col_name = 'MAG'
values = magnitudes
elif rates is not None:
col_name = 'RATE'
values = rates
elif counts is not None:
col_name = 'COUNTS'
values = counts
else:
raise ValueError('One of fluxes/magnitudes/rates/counts arguments is required')
if len(values) != len(times):
raise ValueError(f'Value column length {len(values)} do not coincide with time {len(times)} column length')
if errors is not None and len(errors) != len(times):
raise ValueError('Error column length do not coincide with time column length')
# TODO: possibility for other time units
# TODO: add time-related keywords to header (OGIP)
units_dict = {'TIME': 'd'}
if any(isinstance(x, astropy.time.Time) for x in times):
times = astropy.time.Time(times)
if isinstance(times, astropy.time.Time):
mjd = times.mjd
elif time_format is not None:
atimes = astropy.time.Time(times, format=time_format) # NOTE: do we assume paticular scale?
mjd = atimes.mjd
else:
atimes = astropy.time.Time(times)
mjd = atimes.mjd
if any(isinstance(x, u.Quantity) for x in values):
values = u.Quantity(values)
if isinstance(values, u.Quantity):
units_dict[col_name] = values.unit.to_string(format='OGIP')
values = values.value
if errors is not None and any(isinstance(x, u.Quantity) for x in errors):
errors = u.Quantity(errors)
if errors is not None and isinstance(errors, u.Quantity):
units_dict['ERROR'] = errors.unit.to_string(format='OGIP')
errors = errors.value
rec_array = np.core.records.fromarrays([mjd, values, errors], names=('TIME', col_name, 'ERROR'))
return cls([NumpyDataUnit(data=np.array([]), name = 'PRIMARY', hdu_type = 'primary'),
NumpyDataUnit(data = rec_array,
units_dict = units_dict,
meta_data = meta_data,
data_header = data_header,
hdu_type = 'bintable',
name = 'LC')],
name = name)
class PictureProduct:
def __init__(self, binary_data, name=None, metadata={}, file_path=None, write_on_creation = False):
self.binary_data = binary_data
self.metadata = metadata
self.name = name
if file_path is not None and os.path.isfile(file_path):
self.file_path = file_path
logger.info(f'Image file {file_path} already exist. No automatical rewriting.')
elif write_on_creation:
self.write_file(file_path)
else:
self.file_path = None
byte_stream = BytesIO(binary_data)
tp = imghdr.what(byte_stream)
if tp is None:
raise ValueError('Provided data is not an image')
self.img_type = tp
@classmethod
def from_file(cls, file_path, name=None):
with open(file_path, 'rb') as fd:
binary_data = fd.read()
return cls(binary_data, name=name, file_path=file_path)
def write_file(self, file_path):
logger.info(f'Creating image file {file_path}.')
with open(file_path, 'wb') as fd:
fd.write(self.binary_data)
self.file_path = file_path
def encode(self):
b64data = base64.urlsafe_b64encode(self.binary_data)
output_dict = {}
output_dict['img_type'] = self.img_type
output_dict['b64data'] = b64data.decode()
output_dict['metadata'] = self.metadata
output_dict['name'] = self.name
if self.file_path:
output_dict['filename'] = os.path.basename(self.file_path)
return output_dict
@classmethod
def decode(cls, encoded_data, write_on_creation = False):
if isinstance(encoded_data, dict):
_encoded_data = encoded_data
else:
_encoded_data = json.loads(encoded_data)
binary_data = base64.urlsafe_b64decode(_encoded_data['b64data'].encode('ascii', 'ignore'))
return cls(binary_data,
metadata = _encoded_data['metadata'],
file_path = _encoded_data.get('filename'),
name = _encoded_data.get('name'),
write_on_creation = write_on_creation)
def show(self):
byte_stream = BytesIO(self.binary_data)
image = mpimg.imread(byte_stream)
plt.imshow(image)
plt.axis('off')
plt.show()
class ImageDataProduct(NumpyDataProduct):
@classmethod
def from_fits_file(cls,filename,ext=None,hdu_name=None,meta_data={},name=None):
npdp = super().from_fits_file(filename,ext=ext,hdu_name=hdu_name,meta_data=meta_data,name=name)
contains_image = cls.check_contains_image(npdp)
if contains_image:
return npdp
else:
raise ValueError(f'FITS file {filename} doesn\'t contain image data.')
@staticmethod
def check_contains_image(numpy_data_prod):
for hdu in numpy_data_prod.data_unit:
if hdu.hdu_type in ['primary', 'image']:
try:
wcs = WCS(hdu.header)
return True
except:
pass
return False
class TextLikeProduct:
def __init__(self, value, name=None, meta_data={}):
self.value = value
self.name = name
self.meta_data = meta_data
def encode(self):
return {'name': self.name,
'value': self.value,
'meta_data': self.meta_data}
@classmethod
def decode(cls, encoded):
if not isinstance(encoded, dict):
encoded = json.loads(encoded)
return cls(name=encoded.get('name'),
value=encoded['value'],
meta_data=encoded.get('meta_data', {}))
def __repr__(self):
return self.encode().__repr__()