Source code for oda_api.gallery_api

import hashlib

from .api import DispatcherAPI, UserError
from . import plot_tools

from datetime import datetime
from astropy import units as u
from astropy.coordinates import Angle, SkyCoord
import numpy as np

import logging
import typing
import requests
import os
import tempfile
import shutil
import json
import inspect

logger = logging.getLogger("oda_api.gallery_api")


__all__ = ['GalleryDispatcherAPI']

[docs] class GalleryDispatcherAPI(DispatcherAPI): def get_list_terms_gallery(self, group: typing.Optional[str] = None, parent: typing.Optional[str] = None, parent_id: typing.Optional[str] = None, token: typing.Optional[str] = None ): logger.debug("Getting the list of available instruments on the gallery") params = { 'group': group, 'parent': parent, 'parent_id': parent_id, 'token': token } res = requests.get("%s/get_list_terms" % self.url, params={**params} ) response_json = self._decode_res_json(res) if res.status_code != 200: logger.warning(f"An issue occurred while getting the list of terms from the group {group}, " f"from the product gallery : {res.text}") else: logger.info(f"List of terms from the group {group} successfully returned") return response_json def parse_observation_time_arg_product_gallery(self, t1=None, t2=None, observation_time_format: str = 'ISOT' ): if t1 is not None: if observation_time_format is not None and observation_time_format.upper() == 'MJD': try: logger.info("The value of T1 has been provided in a difference format from UTC, " "this will be attempted to be converted to UTC before being uploaded over the gallery") t1 = self.convert_mjd_to_utc(float(t1)) except Exception as e: logger.warning(f'Error during the time conversion: {e}\n' 'please check the time arguments you provided and the relative format') raise UserError('Error during the time conversion, ' 'please check the time arguments you provided and the relative format') elif observation_time_format is None or observation_time_format.upper() == 'ISOT': try: datetime.strptime(t1, '%Y-%m-%dT%H:%M:%S') except Exception as e: logger.warning(f'Error during the time conversion: {e}\n' 'please check the time arguments you provided and the relative format') raise UserError(f'Error during the time conversion: {e}\n' 'please check the time arguments you provided and the relative format') if t2 is not None: if observation_time_format is not None and observation_time_format.upper() == 'MJD': try: logger.info("The value of T2 has been provided in a difference format from UTC, " "this will be attempted to be converted to UTC before being uploaded over the gallery") t2 = self.convert_mjd_to_utc(float(t2)) except Exception as e: logger.warning(f'Error during the time conversion: {e}\n' 'please check the time arguments you provided and the relative format') raise UserError('Error during the time conversion, ' 'please check the time arguments you provided and the relative format') elif observation_time_format is None or observation_time_format.upper() == 'ISOT': try: datetime.strptime(t2, '%Y-%m-%dT%H:%M:%S') except Exception as e: logger.warning(f'Error during the time conversion: {e}\n' 'please check the time arguments you provided and the relative format') raise UserError(f'Error during the time conversion: {e}\n' 'please check the time arguments you provided and the relative format') return t1, t2 def get_yaml_files_observation_with_title(self, observation_title: str = None, token: str = None): params = { 'title': observation_title, 'token': token } res = requests.get(os.path.join(self.url, "get_observation_attachments"), params={**params}, ) if res.status_code != 200: response_json = res.json() error_message = (f"An issue occurred while performing a request on the product gallery, " f"the following error was returned:\n") if 'error_message' in response_json: error_message += '\n' + response_json['error_message'] if 'drupal_helper_error_message' in response_json: error_message += '-' + response_json['drupal_helper_error_message'] else: error_message += res.text logger.warning(error_message) else: response_json = res.json() msg = f"Observation with title {observation_title}" if 'file_content' in response_json: msg += " contains a yaml file\n" logger.info(msg) return response_json def get_list_images_angular_distance(self, token: str = None, instrument=None, e1_kev=None, e2_kev=None, t1=None, t2=None, ra_ref=None, dec_ref=None, r=None ): product_list = self.get_list_images_with_conditions(token=token, instrument=instrument, e1_kev=e1_kev, e2_kev=e2_kev, t1=t1, t2=t2) if isinstance(product_list, list) and ra_ref is not None and dec_ref is not None and r is not None: source_coord_ref = SkyCoord(ra_ref, dec_ref, unit=(u.deg, u.deg)) ra_values_prod_list = list(map(lambda x: x['ra'], filter(lambda x: 'ra' in x, product_list))) dec_values_prod_list = list(map(lambda x: x['dec'], filter(lambda x: 'dec' in x, product_list))) coords_prod_list = SkyCoord(ra_values_prod_list, dec_values_prod_list, unit=(u.deg, u.deg)) separation = source_coord_ref.separation(coords_prod_list).deg ind = (separation <= r) product_list = list(np.array(product_list)[ind]) return product_list def get_list_images_with_conditions(self, token: str = None, instrument=None, e1_kev=None, e2_kev=None, t1=None, t2=None, min_span_rev=None, max_span_rev=None): return self.get_list_products_with_conditions_generic(token=token, instrument=instrument, e1_kev=e1_kev, e2_kev=e2_kev, t1=t1, t2=t2, min_span_rev=min_span_rev, max_span_rev=max_span_rev, product_type='image') def get_list_lightcurve_with_conditions(self, token: str = None, instrument=None, source_name=None, e1_kev=None, e2_kev=None, t1=None, t2=None, min_span_rev=None, max_span_rev=None): return self.get_list_products_with_conditions_generic(token=token, instrument=instrument, source_name=source_name, e1_kev=e1_kev, e2_kev=e2_kev, t1=t1, t2=t2, min_span_rev=min_span_rev, max_span_rev=max_span_rev, product_type='lightcurve') def get_list_spectra_with_conditions(self, token: str = None, instrument=None, source_name=None, t1=None, t2=None, min_span_rev=None, max_span_rev=None): return self.get_list_products_with_conditions_generic(token=token, instrument=instrument, source_name=source_name, t1=t1, t2=t2, min_span_rev=min_span_rev, max_span_rev=max_span_rev, product_type='spectrum') def get_revnum(self, time_to_convert, token): rev1_value = None params = {'time_to_convert': time_to_convert, 'token': token} res = requests.get(os.path.join(self.url, "get_revnum"), params={**params} ) if res.status_code != 200: response_json = res.json() error_message = (f"An issue occurred while performing a request on the product gallery, " f"the following error was returned:\n") if 'error_message' in response_json: error_message += '\n' + response_json['error_message'] if 'drupal_helper_error_message' in response_json: error_message += '-' + response_json['drupal_helper_error_message'] else: error_message += res.text logger.warning(error_message) else: response_json = res.json() rev1_value = response_json['revnum'] return rev1_value def get_list_products_with_conditions_generic(self, token: str = None, instrument=None, source_name=None, e1_kev=None, e2_kev=None, t1=None, t2=None, min_span_rev=None, max_span_rev=None, product_type=None): if min_span_rev is not None and min_span_rev < 0: raise UserError("min_span_rev must be non-negative") if max_span_rev is not None and max_span_rev < 0: raise UserError("max_span_rev must be non-negative") if min_span_rev is not None and max_span_rev is not None and min_span_rev > max_span_rev: raise UserError("min_span_rev must be less than or equal to max_span_rev") rev1_value = None if t1 is not None: rev1_value = self.get_revnum(t1, token) rev2_value = None if t2 is not None: rev2_value = self.get_revnum(t2, token) product_list = [] # query call optimization: if min_span_rev is None or 0, or if max_span_rev is 0, we split the query in smaller, # more manageable queries (that should not fail) because we want to include those products that have a span_rev # of 0, and it's risky to do that with a single query if min_span_rev is None or min_span_rev == 0 or (max_span_rev is not None and max_span_rev == 0): list_rev_range = list(np.arange(rev1_value, rev2_value, 250)) # type: ignore for rev in list_rev_range: rev1 = rev if rev == list_rev_range[-1]: rev2 = rev2_value else: rev2 = rev + 249 product_list += self.get_list_products_with_conditions(token=token, instrument_name=instrument, product_type=product_type, src_name=source_name, e1_kev_value=e1_kev, e2_kev_value=e2_kev, rev1_value=rev1, rev2_value=rev2, max_span_rev_value=0) # if we have to include also the products with span_rev > 0, we do it with a single, not-split, query if max_span_rev is None or max_span_rev > 0: product_list += self.get_list_products_with_conditions(token=token, instrument_name=instrument, product_type=product_type, src_name=source_name, e1_kev_value=e1_kev, e2_kev_value=e2_kev, rev1_value=rev1_value, rev2_value=rev2_value, min_span_rev_value=1, max_span_rev_value=max_span_rev) else: product_list = self.get_list_products_with_conditions(token=token, instrument_name=instrument, product_type=product_type, src_name=source_name, e1_kev_value=e1_kev, e2_kev_value=e2_kev, rev1_value=rev1_value, rev2_value=rev2_value, min_span_rev_value=min_span_rev, max_span_rev_value=max_span_rev) return product_list def get_list_products_with_conditions(self, token: str = None, **kwargs): """ :param token: user token :param kwargs: keyword arguments representing the main parameters values used to generate the product. Amongst them, it is important to mention the following ones: * instrument_name: name of the instrument used for the generated product (e.g. isgri, jemx1) * product_type: type of product generated (e.g. lightcurve, image) * src_name: name of a single, or a list of, known sources (eg Crab, Cyg X-1) * others: other parameters used for the product. Not all the parameters are currently supported, but the list of the supported ones will be extended. E1_kev=25 """ params = { 'token': token, **kwargs } res = requests.get(os.path.join(self.url, "get_data_product_list_with_conditions"), params=params ) if res.status_code != 200: response_json = res.json() error_message = (f"An issue occurred while performing a request on the product gallery, " f"the following error was returned:\n") if 'error_message' in response_json: error_message += '\n' + response_json['error_message'] if 'drupal_helper_error_message' in response_json: error_message += '-' + response_json['drupal_helper_error_message'] else: error_message += res.text logger.warning(error_message) else: response_json = self._decode_res_json(res) return response_json def get_list_products_by_source_name(self, source_name: str = None, token: str = None): params = { 'token': token, 'src_name': source_name } res = requests.get(os.path.join(self.url, "get_data_product_list_by_source_name"), params=params ) if res.status_code != 200: response_json = res.json() error_message = (f"An issue occurred while performing a request on the product gallery, " f"the following error was returned:\n") if 'error_message' in response_json: error_message += '\n' + response_json['error_message'] if 'drupal_helper_error_message' in response_json: error_message += '-' + response_json['drupal_helper_error_message'] else: error_message += res.text logger.warning(error_message) else: response_json = self._decode_res_json(res) return response_json def update_source_with_name(self, source_name: str = None, auto_update: bool = False, token: str = None, **kwargs): copied_kwargs = kwargs.copy() copied_kwargs['src_name'] = source_name params = { 'token': token, 'update_astro_entity': True, 'auto_update': auto_update, **copied_kwargs } posting_msg = f'Updating an astro entity with title {source_name} on the gallery' logger.info(posting_msg) res = requests.post(os.path.join(self.url, "post_astro_entity_to_gallery"), data=params, ) response_json = self._decode_res_json(res) if res.status_code != 200: res_obj = res.json() error_message = (f"An issue occurred while performing a request on the product gallery, " f"the following error was returned:\n") if 'error_message' in res_obj: error_message += '\n' + res_obj['error_message'] if 'drupal_helper_error_message' in res_obj: error_message += ' - ' + res_obj['drupal_helper_error_message'] else: error_message += res.text logger.warning(error_message) else: source_link = response_json['_links']['self']['href'].split("?")[0] returned_source_name = response_json['title'][0]['value'] logger.info( f"Source with title {returned_source_name} successfully posted on the gallery, at the link {source_link}\n" f"Using the above link you can modify the newly created source in the future.\n") return response_json def update_observation_with_title(self, observation_title: str = None, yaml_file_path=None, token: str = None, observation_time_format: str = 'ISOT', create_new=False, **kwargs): copied_kwargs = kwargs.copy() # generate file obj files_obj = {} if yaml_file_path is not None: if isinstance(yaml_file_path, list): for yaml_path in yaml_file_path: files_obj['yaml_file_' + str(yaml_file_path.index(yaml_path))] = open(yaml_path, 'rb') elif isinstance(yaml_file_path, str): files_obj['yaml_file'] = open(yaml_file_path, 'rb') copied_kwargs['T1'], copied_kwargs['T2'] = self.parse_observation_time_arg_product_gallery( t1=kwargs.get('T1', None), t2=kwargs.get('T2', None), observation_time_format=observation_time_format ) obsid_arg = kwargs.get('obsid', None) if obsid_arg is not None: if isinstance(obsid_arg, list): obsid_list = ','.join(map(str, obsid_arg)) else: obsid_list = obsid_arg copied_kwargs['obsid'] = obsid_list params = { 'title': observation_title, 'token': token, 'update_observation': True, 'create_new': create_new, **copied_kwargs } posting_msg = f'Posting an observation with title {observation_title} on the gallery' logger.info(posting_msg) res = requests.post(os.path.join(self.url, "post_observation_to_gallery"), data=params, files=files_obj ) response_json = self._decode_res_json(res) if res.status_code != 200: res_obj = res.json() error_message = (f"An issue occurred while performing a request on the product gallery, " f"the following error was returned:\n") if 'error_message' in res_obj: error_message += '\n' + res_obj['error_message'] if 'drupal_helper_error_message' in res_obj: error_message += '-' + res_obj['drupal_helper_error_message'] else: error_message += res.text logger.warning(error_message) else: observation_link = response_json['_links']['self']['href'].split("?")[0] observation_title = response_json['title'][0]['value'] logger.info( f"Observation with title {observation_title} successfully posted on the gallery, at the link {observation_link}\n" f"Using the above link you can modify the newly created observation in the future.\n") return response_json def post_observation_to_gallery(self, observation_title: str = None, yaml_file_path=None, token: str = None, observation_time_format: str = 'ISOT', **kwargs): copied_kwargs = kwargs.copy() # generate file obj files_obj = {} if yaml_file_path is not None: if isinstance(yaml_file_path, list): for yaml_path in yaml_file_path: files_obj['yaml_file_' + str(yaml_file_path.index(yaml_path))] = open(yaml_path, 'rb') elif isinstance(yaml_file_path, str): files_obj['yaml_file'] = open(yaml_file_path, 'rb') copied_kwargs['T1'], copied_kwargs['T2'] = self.parse_observation_time_arg_product_gallery( t1=kwargs.get('T1', None), t2=kwargs.get('T2', None), observation_time_format=observation_time_format ) obsid_arg = kwargs.get('obsid', None) if obsid_arg is not None: if isinstance(obsid_arg, list): obsid_list = ','.join(map(str, obsid_arg)) else: obsid_list = obsid_arg copied_kwargs['obsid'] = obsid_list params = { 'title': observation_title, 'token': token, **copied_kwargs } posting_msg = f'Posting an observation with title {observation_title} on the gallery' logger.info(posting_msg) res = requests.post(os.path.join(self.url, "post_observation_to_gallery"), data=params, files=files_obj ) if res.status_code != 200: error_message = (f"An issue occurred while performing a request on the product gallery, " f"the following error was returned:\n") try: response_json = res.json() except json.decoder.JSONDecodeError: error_msg = res.text response_json = {'error_message': error_msg} logger.debug(response_json) if 'error_message' in response_json: error_message += '\n' + response_json['error_message'] if 'drupal_helper_error_message' in response_json: error_message += '-' + response_json['drupal_helper_error_message'] else: error_message += res.text logger.warning(error_message) else: response_json = self._decode_res_json(res) observation_link = response_json['_links']['self']['href'].split("?")[0] observation_title = response_json['title'][0]['value'] logger.info(f"Observation with title {observation_title} successfully posted on the gallery, at the link {observation_link}\n" f"Using the above link you can modify the newly created observation in the future.\n") return response_json def delete_data_product_from_gallery_via_product_id(self, product_id: str, token: typing.Optional[str] = None): """ :param product_id: identifier of a data product assigned by the user, this can be used during the creation of a new data-product, as well as to identify an already existing one and update it with the arguments provided by the user :param token: user token """ if token is None: token = self.token params = { 'content_type': 'data_product', 'token': token, 'product_id': product_id, } posting_msg = f'Deleting from the gallery a product with product_id {product_id}' logger.info(posting_msg) res = requests.post("%s/delete_product_to_gallery" % self.url, data=params ) if res.status_code != 200: error_message = (f"An issue occurred while performing a request on the product gallery, " f"the following error was returned:\n") try: response_json = res.json() except json.decoder.JSONDecodeError: error_msg = res.text response_json = {'error_message': error_msg} logger.debug(response_json) if 'error_message' in response_json: error_message += '\n' + response_json['error_message'] if 'drupal_helper_error_message' in response_json: error_message += '-' + response_json['drupal_helper_error_message'] logger.warning(error_message) else: response_json = self._decode_res_json(res) logger.info(f"Product successfully deleted from the gallery") return response_json def resolve_source(self, src_name: typing.Optional[str] = None, token: typing.Optional[str] = None): resolved_obj = None if src_name is not None and src_name != '': params = { 'name': src_name, 'token': token } logger.info(f"Searching the object {src_name}\n") res = requests.get("%s/resolve_name" % self.url, params={**params} ) resolved_obj = self._decode_res_json(res) if resolved_obj is not None and 'message' in resolved_obj: logger.info(f'{resolved_obj["message"]}') else: logger.info("Please provide the name of the source\n") return resolved_obj def convert_ijd_to_utc(self, t_ijd): # TODO to reply on a dedicated service in the dispatcher res = requests.get(f"https://www.astro.unige.ch/mmoda/dispatch-data/gw/timesystem/api/v1.0/converttime/IJD/{t_ijd}/UTC") if res.status_code == 200: t_utc = res.text return t_utc else: return None def convert_mjd_to_utc(self, t_mjd): # TODO to reply on a dedicated service in the dispatcher t_utc = self.convert_ijd_to_utc(t_mjd - 51544) return t_utc def check_gallery_data_product_policy(self, token: typing.Optional[str] = None, **kwargs): product_type = kwargs.get('product_type', None) if product_type is not None and product_type != '': params = { 'term': product_type, 'group': 'products', 'token': token } logger.info(f"Applying the policy for the product {product_type}\n") res = requests.get("%s/get_parents_term" % self.url, params={**params} ) parents_term_list = self._decode_res_json(res) if parents_term_list is not None and isinstance(parents_term_list, list): # loop over the available ODAProduct from the plot_tools and find the correspondent one for name, c in inspect.getmembers(plot_tools, inspect.isclass): if issubclass(c, plot_tools.OdaProduct) \ and hasattr(c, 'name') and c.name is not None and c.name in parents_term_list \ and hasattr(c, 'check_product_for_gallery'): return c.check_product_for_gallery(**kwargs) logger.info(f"A policy for the product_type {product_type} could not be applied\n") else: logger.info("A product_type has not been provided for the given data product, " "therefore no policy will be verified\n") return True def check_missing_parameters_data_product(self, response, token: typing.Optional[str] = None, **kwargs): missing_instrument = True instrument_used = None missing_product_type = True if '_links' in response: for field_link in response['_links']: field = field_link.split('/')[-1] if field == 'field_instrumentused': missing_instrument = False instrument_used = kwargs.get('instrument', None) elif field == 'field_data_product_type': missing_product_type = False if missing_instrument: list_instruments = self.get_list_terms_gallery(group='instruments', token=token) logger.info(f'\nWe noticed no instrument has been specified, the following are available:\n' f'{list_instruments}\n' 'Please remember that this can be set at a later stage by editing the newly created data product.\n') if missing_product_type: if not missing_instrument and instrument_used is not None: list_instrument_data_products = self.get_list_terms_gallery(group='products', parent=instrument_used, token=token) if list_instrument_data_products is not None: logger.info(f'\nWe noticed no product type has been specified,\n' f'for the instrument {instrument_used}, the following products are available:\n' f'{list_instrument_data_products}\n' 'Please remember that this can be set at a later stage by editing the newly created data product.\n')