Source code for daxa.process.xmm.generate

#  This code is a part of the Democratising Archival X-ray Astronomy (DAXA) module.
#  Last modified by David J Turner (turne540@msu.edu) 02/01/2025, 15:57. Copyright (c) The Contributors
import os
import shutil
from typing import Tuple
from warnings import warn

import numpy as np
import pandas as pd
import xga
from astropy.units import Quantity, UnitConversionError
from xga.sas import eexpmap
from xga.sources import NullSource

from daxa import NUM_CORES
from daxa.archive.base import Archive
from daxa.process.xmm._common import ALLOWED_XMM_MISSIONS


def _en_checks(lo_en: Quantity, hi_en: Quantity) -> Tuple[Quantity, Quantity]:
    """
    This internal functions performs the checks on low and high energy bounds that are common to multiple generation
    functions such as those that generate imaged and those that generate exposure maps.

    :param Quantity lo_en: The lower energy bound(s) for the product being generated. This can either be passed as a
        scalar Astropy Quantity or, if sets of the same product in different energy bands are to be generated, as a
        non-scalar Astropy Quantity. If multiple lower bounds are passed, they must each have an entry in the
        hi_en argument.
    :param Quantity hi_en: The upper energy bound(s) for the product being generated. This can either be passed as a
        scalar Astropy Quantity or, if sets of the same product in different energy bands are to be generated, as a
        non-scalar Astropy Quantity. If multiple upper bounds are passed, they must each have an entry in the
        lo_en argument.
    :return: The lower and upper energy bound arguments, converted to keV.
    :rtype: Tuple[Quantity, Quantity]
    """

    if not lo_en.unit.is_equivalent('keV') or not hi_en.unit.is_equivalent('keV'):
        raise UnitConversionError("Both the lo_en and hi_en arguments must be in units that can be converted to keV.")
    # Multiple lower energy and upper energy bounds can be passed, if multiple sets of images are to be generated. Here
    #  we check that if multiple lower energy bounds have been passed, multiple upper bounds have been
    #  too (and vica versa)
    elif lo_en.isscalar != hi_en.isscalar:
        raise ValueError("If either lo_en or hi_en is scalar (one value), the other energy bound argument must be too.")
    # At this point we're sure that both energy bounds are either scalar or not, so just checking for non-scalarness
    #  with one of the bounds is fine - we now need to make sure that the same number of bounds are in each parameter
    elif not lo_en.isscalar and len(lo_en) != len(hi_en):
        raise ValueError("The lo_en argument has {le} entries, and the hi_en argument has {he} entries - the number "
                         "of entries must be the same.".format(le=len(lo_en), he=len(hi_en)))
    else:
        # Make sure that the energies are in keV
        lo_en = lo_en.to('keV')
        hi_en = hi_en.to('keV')

    # Next we make sure that if only one pair of energy bounds have been passed, we can still iterate through them
    if lo_en.isscalar:
        lo_en = np.array([lo_en])
        hi_en = np.array([hi_en])

    # Finally, need to check that the lower energy bounds are all smaller than the upper, anything else
    #  would be unphysical.
    if not (lo_en < hi_en).all():
        raise ValueError("All lower energy bounds (lo_en) must be smaller than their upper bound (hi_en) equivalent.")

    return lo_en, hi_en


[docs] def generate_images_expmaps(obs_archive: Archive, lo_en: Quantity = Quantity([0.5, 2.0], 'keV'), hi_en: Quantity = Quantity([2.0, 10.0], 'keV'), num_cores: int = NUM_CORES): """ A function to generate images and exposure maps for a processed XMM mission dataset contained within an archive. Users can select the energy band(s) that they wish to generate images and exposure maps within. :param Archive obs_archive: :param lo_en: The lower energy bound(s) for the product being generated. This can either be passed as a scalar Astropy Quantity or, if sets of the same product in different energy bands are to be generated, as a non-scalar Astropy Quantity. If multiple lower bounds are passed, they must each have an entry in the hi_en argument. The default is 'Quantity([0.5, 2.0], 'keV')', which will generate two sets of products, one with lower bound 0.5 keV, the other with lower bound 2 keV. :param hi_en: The upper energy bound(s) for the product being generated. This can either be passed as a scalar Astropy Quantity or, if sets of the same product in different energy bands are to be generated, as a non-scalar Astropy Quantity. If multiple upper bounds are passed, they must each have an entry in the lo_en argument. The default is 'Quantity([2.0, 10.0], 'keV')', which will generate two sets of products, one with upper bound 2.0 keV, the other with upper bound 10 keV. :param int num_cores: The number of cores to use, default is set to 90% of available. """ # Run the energy bounds checks _en_checks(lo_en, hi_en) # Just grabs the XMM missions, we already know there will be at least one because otherwise _sas_process_setup # would have thrown an error xmm_miss = [mission for mission in obs_archive if mission.name in ALLOWED_XMM_MISSIONS] # We are iterating through XMM missions (options could include xmm_pointed and xmm_slew for instance). for miss in xmm_miss: # This will trigger and exception if the mission is for slew data, which XGA doesn't work with at # the time of writing. if miss == 'xmm_slew': raise NotImplementedError("XGA cannot currently be used to generate XMM slew observation images.") # Identifiers for all the valid data are fetched, and will be narrowed down later so that only those which # had cleaned event lists generated successfully are selected. For this check I know I only want the # ObsIDs and instruments - and also that there shouldn't be duplicates rel_obs_info = np.array(obs_archive.get_obs_to_process(miss.name))[:, :2] # This ensures the rows of rel_obs_info are unique rel_obs_info = np.unique(rel_obs_info, axis=0) # If the merging function hasn't been run, I won't allow this function to run - I was thinking I would let # the user generate sub-exposure images, but actually XGA doesn't support that, and though it would be # possible by iterating and bodging, I'm not going to do that now good_ce = obs_archive.check_dependence_success(miss.name, rel_obs_info, 'merge_subexposures') val_obs_info = np.array(rel_obs_info)[good_ce] # If there are no entries in rel_ids then there are no event lists to work on, so we raise a warning if len(val_obs_info) == 0: warn("Every merge_subexposures run for the {m} mission in the {a} archive is reporting as a " "failure, skipping process.".format(m=miss.name, a=obs_archive.archive_name), stacklevel=2) continue # The dictionary that stores which ObsIDs have which instruments which_obs = {} # A dictionary that stores example file names for event lists - these will be overwritten every iteration # but that's okay. evt_names = {} # A bit ugly and inelegant but oh well - this just iterates through all the relevant IDs, separating them # into a dictionary that has ObsIDs as top level keys, and a list of available instruments as the values for obs_info in val_obs_info: # Unpacking the obs_info obs_id, inst = obs_info # Combining the info for a current ID to access merged event lists with cur_id = ''.join(obs_info) if obs_id not in which_obs: which_obs[obs_id] = [inst] else: which_obs[obs_id].append(inst) # Grabs the output path for the final event list, then splits it to remove the absolute bit of the # absolute path, leaving just the filename. evt_path = obs_archive.process_extra_info[miss.name]['merge_subexposures'][cur_id]['final_evt'] evt_names[inst] = evt_path.split('/')[-1].replace(obs_id, '{obs_id}') # It is conceivable that there are no observations with a particular instrument, so we check for that and # put a dummy path in the dictionary because whilst it may need to actually point at a file, XGA does need # an entry in the configuration file for all instruments if 'PN' not in evt_names: evt_names['PN'] = 'PN_clean_evts.fits' if 'M1' not in evt_names: evt_names['M1'] = 'M1_clean_evts.fits' if 'M2' not in evt_names: evt_names['M2'] = 'M2_clean_evts.fits' # Now we can do some post-processing, and turn the information in 'which_obs' into the various # dataframes required to trick XGA into making our images for us. Normally you have to setup a configuration # file to use XGA, that points it to where the event lists etc. live, but here we're going to alter the # XGA variables that tell the module where to look for data AFTER it's been imported # These are the column names for an XGA census - will be used in the creation of a pandas dataframe census_cols = ['ObsID', 'RA_PNT', 'DEC_PNT', 'USE_PN', 'USE_MOS1', 'USE_MOS2'] # This list will get filled in with the observation data census_data = [] for obs_id in which_obs: # Grabs the information about the current ObsID, which we will use to supply the pointing coordinates obs_info = obs_archive[miss.name].all_obs_info[obs_archive[miss.name].all_obs_info['ObsID'] == obs_id].iloc[0] census_data.append([obs_id, obs_info['ra'], obs_info['dec'], 'PN' in which_obs[obs_id], 'M1' in which_obs[obs_id], 'M2' in which_obs[obs_id]]) # Constructs the census dataframe from the data we assembled, and the columns we defined earlier census = pd.DataFrame(census_data, columns=census_cols) # Makes an empty blacklist dataframe - we don't want to blacklist any observation, so we have to replace # whatever XGA already had loaded blacklist_cols = ["ObsID", "EXCLUDE_PN", "EXCLUDE_MOS1", "EXCLUDE_MOS2"] blacklist = pd.DataFrame(None, columns=blacklist_cols) # Setting up the configuration file - all that really matters in this is the paths to the cleaned event # lists, and the path to the attitude file. # TODO set the attitude file programmatically xmm_files = {"root_xmm_dir": obs_archive.archive_path+'processed_data/' + miss.name + '/', "clean_pn_evts": obs_archive.archive_path+'processed_data/' + miss.name + '/{obs_id}/events/' + evt_names['PN'], "clean_mos1_evts": obs_archive.archive_path+'processed_data/' + miss.name + '/{obs_id}/events/' + evt_names['M1'], "clean_mos2_evts": obs_archive.archive_path+'processed_data/' + miss.name + '/{obs_id}/events/' + evt_names['M2'], "attitude_file": obs_archive.archive_path+'processed_data/' + miss.name + '/{obs_id}/P{obs_id}OBX000ATTTSR0000.FIT', "lo_en": ['0.50', '2.00'], "hi_en": ['2.00', '10.00'], "pn_image": "/this/is/optional/{obs_id}/{obs_id}-{lo_en}-{hi_en}keV-pn_merged_img.fits", "mos1_image": "/this/is/optional/{obs_id}/{obs_id}-{lo_en}-{hi_en}keV-mos1_merged_img.fits", "mos2_image": "/this/is/optional/{obs_id}/{obs_id}-{lo_en}-{hi_en}keV-mos2_merged_img.fits", "pn_expmap": "/this/is/optional/{obs_id}/{obs_id}-{lo_en}-{hi_en}keV-pn_merged_img.fits", "mos1_expmap": "/this/is/optional/{obs_id}/{obs_id}-{lo_en}-{hi_en}keV-mos1_merged_expmap.fits", "mos2_expmap": "/this/is/optional/{obs_id}/{obs_id}-{lo_en}-{hi_en}keV-mos2_merged_expmap.fits", "region_file": "/this/is/optional/xmm_obs/regions/{obs_id}/regions.reg"} # The actual config file has another subdictionary, but that doesn't matter for this bodge xmm_config = {'XMM_FILES': xmm_files} # This is where the outputs from XGA will be stored new_out = obs_archive.archive_path+'processed_data/' + miss.name + '/xga_output/' # This makes sure that the directory exists, if it doesn't already if not os.path.exists(new_out): os.makedirs(new_out) # Here we manually replace the global variables in XGA - this is viable because the NullSource class is very # limited and we're using it to do a very limited number of things. Otherwise replacing variables like this # in a module is a very bad idea - you'll see that some variables have to be overwritten multiple times in # different places, and that's because the sub-modules load in the CENSUS, OUTPUT etc. global variables from # utils when they're imported, and if this function is called multiple times for multiple archives in one # session then we have to overwrite the OUTPUT variable in those individual sub-modules. xga.sources.base.CENSUS = census xga.sources.base.xga_conf = xmm_config xga.sources.base.BLACKLIST = blacklist xga.sources.base.OUTPUT = new_out xga.sas.phot.OUTPUT = new_out xga.sas.misc.OUTPUT = new_out # Setting up a NullSource, which will contain every ObsID in this archive null_src = NullSource() # Iterating through the energy band pairs (if there are multiple), to generate images and exposure maps. for en_ind, lo in enumerate(lo_en): hi = hi_en[en_ind] # This function will also generate images, before it makes exposure maps eexpmap(null_src, lo, hi, num_cores=num_cores) # This goes through and for obs_id in which_obs: # This is the directory where XGA stored the files generated for the current value of ObsID cur_path = new_out + obs_id + '/' # We set up the path we're going to move things too, in the existing DAXA directory structure - file # names will be added onto the end dest_dir = obs_archive.construct_processed_data_path(miss.name, obs_id) + 'images/' # We make sure that directory exists (can't think why it wouldn't, but better to be safe). if os.path.exists(cur_path): # Make a list of the files at the current path, but making sure to exclude the calibration index # file that we know will be there as well - everything else is something that we want to rename # and move to the final images directory rel_files = [f for f in os.listdir(cur_path) if 'ccf' not in f] # Now we iterate through the identified files, setup their final names/full paths, and move them for file_name in rel_files: # We convert them to the new DAXA naming convention for files if 'img' in file_name: inst = file_name.split('_')[1] inst = miss.check_inst_names(inst, error_on_bad_inst=False, show_warn=False)[0] cur_lo, cur_hi = file_name.split("_")[-1].split('keV')[0].split("-") new_name = "obsid{oi}-inst{i}-subexpALL-en{l}_{h}keV-image.fits".format(oi=obs_id, i=inst, l=cur_lo, h=cur_hi) elif 'expmap' in file_name: inst = file_name.split('_')[1] inst = miss.check_inst_names(inst, error_on_bad_inst=False, show_warn=False)[0] cur_lo, cur_hi = file_name.split("_")[-1].split('keV')[0].split("-") new_name = "obsid{oi}-inst{i}-subexpALL-en{l}_{h}keV-expmap.fits".format(oi=obs_id, i=inst, l=cur_lo, h=cur_hi) # Move the file to its new home, with its new name dest_file_path = dest_dir + new_name # Doing the actual moving of the directory shutil.move(cur_path + file_name, dest_file_path) # Finally we remove the XGA output directory. shutil.rmtree(new_out)