Example of asynchronous requests (v > 1.1)¶
The scope of this example is to show how to request several products together so that internal resource usage is maximized
We extract the spectrum of the Crab in groups of ‘nscw’ science windows for each year from ‘start_year’ to ‘stop_year’ included
We use a token provided by the web interface to receive dedicated emails
We optionally show how to fit the spectra with a broken power law using xspec
[ ]:
#A few input parameters
osa_version="OSA11.2"
source_name="Crab"
nscw=10
start_year=2017
end_year=2018
systematic_fraction = 0.01
token=''
integral_data_rights = "public"
Token authentication¶
You can provide a valid token as explained in the ‘Authentication’ example or skip the following cell and continue anonymously
[ ]:
# import getpass
# token = getpass.getpass('Insert the token')
[ ]:
# To know details of the token
# import oda_api.token
# oda_api.token.decode_oda_token(token)
[ ]:
#We hardcode a catalog for the Crab
api_cat={
"cat_frame": "fk5",
"cat_coord_units": "deg",
"cat_column_list": [
[0, 7],
["1A 0535+262", "Crab"],
[125.4826889038086, 1358.7255859375],
[84.72280883789062, 83.63166809082031],
[26.312734603881836, 22.016284942626953],
[-32768, -32768],
[2, 2],
[0, 0],
[0.0002800000074785203, 0.0002800000074785203]],
"cat_column_names": [
"meta_ID",
"src_names",
"significance",
"ra",
"dec",
"NEW_SOURCE",
"ISGRI_FLAG",
"FLAG",
"ERR_RAD"
],
"cat_column_descr":
[
["meta_ID", "<i8"],
["src_names", "<U11"],
["significance", "<f8"],
["ra", "<f8"],
["dec", "<f8"],
["NEW_SOURCE", "<i8"],
["ISGRI_FLAG", "<i8"],
["FLAG", "<i8"],
["ERR_RAD", "<f8"]
],
"cat_lat_name": "dec",
"cat_lon_name": "ra"
}
Let’s get some logging¶
This is to help visualizing the progress.
WARNING is the default level
INFO writes some more information
DEBUG is maily for developers and issue tracking
[ ]:
import logging
#default
#logging.getLogger().setLevel(logging.WARNING)
#slightly more verbose
logging.getLogger().setLevel(logging.INFO)
#all messages
#logging.getLogger().setLevel(logging.DEBUG)
logging.getLogger('oda_api').addHandler(logging.StreamHandler())
Connection to the dispatcher¶
We build the dispatcher object, which we will use for issuing our requests to the platform. The token will be automatically discovered, if available.
[ ]:
import numpy as np
import json
import oda_api.api
import oda_api
disp = oda_api.api.DispatcherAPI()
disp.get_instrument_description("isgri")
Here, we collect and spectra for each year in a random sample of nscw=10 science windows
We use the hard-coded catalog.
note that we make a loop and submit the jobs without waiting for their completion
at each loop, we test if they completed and we count how many have finished
we continue to poll the dispatcher for unfinished jobs and we terminate the loop when all are done
In this way, we let the platform optimize our requests
There will be a convenience function in future versions of oda_api for this purpose
[ ]:
spectrum_results=[]
disp_by_ys = {}
data_by_ys = {}
par_dict = {'RA': 83.5,
'DEC': 22.0,
'radius': 10,
'instrument':'isgri',
'product': 'isgri_spectrum',
'osa_version' : osa_version,
'product_type': 'Real',
'max_pointings': nscw,
'selected_catalog' : json.dumps(api_cat),
'integral_data_rights' : integral_data_rights}
# Should you need to access private data, just add this option
#,"integral_data_rights": "all-private"}
if token != '':
par_dict.update({'token': token})
elif disp.token is not None:
par_dict.update({'token': disp.token})
while True:
spectrum_results=[]
for year in range(start_year, end_year+1):
T1_utc='%4d-01-01T00:00:00.0'%year
T2_utc='%4d-12-31T23:59:59.0'%year
print(T1_utc,'-',T2_utc)
par_dict.update({'T1': T1_utc,
'T2': T2_utc})
if year >= 2016:
osa_version='OSA11.2'
else:
osa_version='OSA10.2'
#Just renaiming for a general dictionary key
ys = year
# We start one dipatcher for each job,
# they will run in parallel until products are ready
if ys not in disp_by_ys:
disp_by_ys[ys] = oda_api.api.DispatcherAPI(url=disp.url, wait=False) #Note the flag wait=False
_disp = disp_by_ys[ys]
data = data_by_ys.get(ys, None)
if data is None and not _disp.is_failed:
#We submit or we poll
if not _disp.is_submitted:
data = _disp.get_product(**par_dict)
else:
_disp.poll()
print("Is complete ", _disp.is_complete)
# We retrieve data
if not _disp.is_complete:
continue
else:
data = _disp.get_product(**par_dict)
data_by_ys[ys] = data
spectrum_results.append(data)
n_complete = len([ year for year, _disp in disp_by_ys.items() if _disp.is_complete ])
print(f"complete {n_complete} / {len(disp_by_ys)}")
if n_complete == len(disp_by_ys):
print("done!")
break
print("not done")
Elaboration example¶
This part saves the spectra in fits files and updates some keywords
[ ]:
from astropy.io import fits
# This part saves the spectra in fits files and updates some keywords
for year, data in data_by_ys.items():
print(year)
for ID,s in enumerate(data._p_list):
if (s.meta_data['src_name']==source_name):
if(s.meta_data['product']=='isgri_spectrum'):
ID_spec=ID
if(s.meta_data['product']=='isgri_arf'):
ID_arf=ID
if(s.meta_data['product']=='isgri_rmf'):
ID_rmf=ID
print(ID_spec, ID_arf, ID_rmf)
spec=data._p_list[ID_spec].data_unit[1].data
arf=data._p_list[ID_arf].data_unit[1].data
rmf=data._p_list[ID_rmf].data_unit[2].data
expos=data._p_list[0].data_unit[1].header['EXPOSURE']
name=source_name+'_'+str(year)
specname=name+'_spectrum.fits'
arfname=name+'_arf.fits.gz'
rmfname=name+'_rmf.fits.gz'
data._p_list[ID_spec].write_fits_file(specname)
data._p_list[ID_arf].write_fits_file(arfname)
data._p_list[ID_rmf].write_fits_file(rmfname)
hdul = fits.open(specname, mode='update')
hdul[1].header.set('EXPOSURE', expos)
hdul[1].header['RESPFILE']=rmfname
hdul[1].header['ANCRFILE']=arfname
hdul[1].data['SYS_ERR']=systematic_fraction
hdul.close()
Elaboration 2¶
If xspec is available, we make a fit of each spectrum
[ ]:
try:
import xspec
import shutil
from IPython.display import Image
from IPython.display import display
xspec.Fit.statMethod = "chi"
#init dictionaries
fit_by_lt={}
model='cflux*bknpow'
xspec.AllModels.systematic=0.0
low_energies=[20]
freeze_pow_ebreak=1
for year in range(start_year,end_year+1):
for c_emin in low_energies: #np.linspace(17,40,5):
xspec.AllData.clear()
m1=xspec.Model(model)
specname=source_name+'_'+str(year)+'_spectrum.fits'
xspec.AllData(specname)
s = xspec.AllData(1)
isgri = xspec.AllModels(1)
print(m1.nParameters)
xspec.AllData.ignore('bad')
xspec.AllData.ignore('500.0-**')
ig="**-%.2f,500.-**"%c_emin
print("ISGRI ignore: "+ ig)
s.ignore(ig)
#Key for output
lt_key='%d_%.10lg'%(year, c_emin)
isgri.cflux.lg10Flux=-8
isgri.cflux.Emin=20.
isgri.cflux.Emax=80.
isgri.bknpower.norm = "1,-1"
isgri.bknpower.PhoIndx1 = "2.0,.01,1.,1.,3.,3."
isgri.bknpower.PhoIndx2 = "2.2,.01,1.,1.,3.,3."
isgri.bknpower.BreakE = "100,-1,20,20,300,300"
xspec.Fit.perform()
isgri.bknpower.BreakE.frozen = freeze_pow_ebreak > 0
xspec.Fit.perform()
max_chi=np.ceil(xspec.Fit.statistic / xspec.Fit.dof)
xspec.Fit.error("1.0 max %.1f 1-%d"%(max_chi,m1.nParameters))
fit_by_lt[lt_key]=dict(
emin=c_emin,
year=year,
chi2_red=xspec.Fit.statistic/xspec.Fit.dof,
chi2=xspec.Fit.statistic,
ndof=xspec.Fit.dof,
)
for i in range(1,m1.nParameters+1):
if (not isgri(i).frozen) and (not bool(isgri(i).link)):
#use the name plus position because there could be parameters with same name from multiple
#model components (e.g., several gaussians)
print(isgri(i).name, "%.2f"%(isgri(i).values[0]), isgri(i).frozen,bool(isgri(i).link) )
fit_by_lt[lt_key][isgri(i).name+"_%02d"%(i)]=[ isgri(i).values[0], isgri(i).error[0], isgri(i).error[1] ]
xspec.Plot.device="/png"
#xspec.Plot.addCommand("setplot en")
xspec.Plot.xAxis="keV"
xspec.Plot("ldata del")
xspec.Plot.device="/png"
fn="fit_%s.png"%lt_key
fit_by_lt[lt_key]['plot_fname'] = fn
shutil.move("pgplot.png_2", fn)
_=display(Image(filename=fn,format="png"))
except ImportError:
print("no problem!")