Source code for gfinder.scenario

"""Scenario module."""

from gfinder.config import Config
from gfinder.event import Event

import spiceypy as spice

from planetary_coverage import read_events
from planetary_coverage.events.event import AbstractEvent, EventsList, EventsDict

from os import path

KERNELS_PATH_SYMBOL = '$GFINDER_KERNELS_DIR'
"""Symbol used in the config file to specify the output_dir of a mission metakernel
file relative to the JUICE SPICE Kernels directory, which is specified by the GFINDER_KERNELS_DIR environment variable.
"""

# Maps a short synonym (as accepted by MissionScenario.synonym_event_id) to the
# full event ID it stands for. Values follow the '<event_file_key>:<key1>' form
# described in MissionScenario.get_event (here the 'custom' and 'phase' event files).
EVENT_ID_SYNONYMS = {
    'TOUR': 'custom:TOUR',
    'Jupiter_Phase_1': 'phase:Approach and first ellipse',
    'Jupiter_Phase_2': 'phase:Energy reduction',
    'Jupiter_Phase_3': 'phase:Europa flybys',
    'Jupiter_Phase_4': 'phase:High-latitude',
    'Jupiter_Phase_5': 'phase:Low energy',
    'Jupiter_Phase_all': 'phase:All Jupiter phases',
    'GEOa': 'phase:GEOa',
    'GCO5000': 'phase:GCO5000',
    'GEOb': 'phase:GEOb',
    'TGCO': 'phase:TGCO',
    'GCO500': 'phase:GCO500',
    'Ganymede_Phase_all': 'phase:All Ganymede phases',
    'Mission_phase_all': 'phase:All mission phases'
}
"""List of synonyms defined for specific mission event IDs."""

import re
# re.search(expr, key, flags=re.IGNORECASE)

def crema_flyby_target(flyby_crema_name):
    """Return the target name encoded in a flyby Crema name.

    A valid flyby Crema name is ``<number><letter><number>`` where the letter
    is one of 'E' (EUROPA), 'C' (CALLISTO) or 'G' (GANYMEDE) and both numbers
    are strictly positive, for example ``'7E1' -> 'EUROPA'``.
    (Presumably the first number is the overall flyby count and the second the
    per-target flyby count -- not verifiable from this module.)

    Args:
        flyby_crema_name: candidate flyby Crema name.

    Returns:
        The upper-case target name, or an empty string when the input is not a
        valid flyby Crema name (including non-string input).
    """
    targets = {
        'E': 'EUROPA',
        'C': 'CALLISTO',
        'G': 'GANYMEDE',
    }

    if not isinstance(flyby_crema_name, str):
        return ''

    # Strict ASCII-digit match: int() alone would also accept signs, padding
    # whitespace and underscores (e.g. '-7E1', ' 7E1', '7_0E1'), none of which
    # is a real Crema name.
    match = re.fullmatch(r'([0-9]+)([ECG])([0-9]+)', flyby_crema_name)
    if match is None:
        return ''

    total_occ, target_key, target_occ = match.groups()
    if int(total_occ) == 0 or int(target_occ) == 0:
        # Occurrence numbers start at 1; zero is not a valid flyby.
        return ''

    return targets[target_key]
class MissionScenario:
    """Mission scenario: a SPICE meta-kernel plus its associated ESA events files.

    Attributes set by ``__init__``:
        id, name, description: copied from the mission scenario dictionary.
        spice_metakernel: resolved path of the mission meta-kernel file.
        agm_metakernel_id: meta-kernel ID used when exporting PTR files for AGM.
        event_files_dict: ``{event_file_key: path}`` as given in the dictionary,
            with paths of existing files replaced by their resolved path.
        event_files: ``{event_file_key: loaded events file}``.
        event_file_keys: keys of the events files that were loaded.
        valid: see ``isValid()``.

    NOTE(review): several methods unpack a trailing ``found`` flag from
    spiceypy calls (``gcpool``, ``kdata``, ``bodn2c``); this presumably relies
    on spiceypy's found-flag checking being disabled elsewhere in the
    application -- confirm.
    """

    # (prefix, event ID template) pairs for numbered synonyms, e.g. 'PJ12'.
    _NUMBERED_SYNONYMS = (
        ('PJ', 'timeline:PERIJOVE_{}PJ'),
        ('AP', 'timeline:APOJOVE_{}AP'),
        ('ORB', 'orbit:{}'),
        ('DL', 'dl:DL_:{}'),
    )

    def __init__(self, mission_scenario_dict, basedir):
        """Inits MissionScenario object.

        Args:
            mission_scenario_dict: dictionary with the keys 'id', 'name',
                'description', 'spice_metakernel', 'agm_metakernel_id' and
                'event_files' (a ``{event_file_key: file path}`` mapping).
                The 'event_files' mapping is updated in place with resolved paths.
            basedir: base directory of event files
        """
        self.id = mission_scenario_dict['id']
        self.name = mission_scenario_dict['name']
        self.description = mission_scenario_dict['description']

        spice_metakernel = mission_scenario_dict['spice_metakernel']
        if KERNELS_PATH_SYMBOL in spice_metakernel:
            # Replace KERNELS_PATH_SYMBOL by the kernels directory path.
            spice_metakernel = spice_metakernel.replace(KERNELS_PATH_SYMBOL, Config().kernels_dir, 1)
            spice_metakernel = spice_metakernel.replace('//', '/', 1)
        elif spice_metakernel[0] != '/':
            # Not an absolute path: look for the file (by base name) in the
            # data store mission scenario base directory.
            spice_metakernel = basedir + '/' + path.basename(spice_metakernel)
        self.spice_metakernel = spice_metakernel

        # PATCH handling metakernel ID so that exported PTR files can be validated by AGM,
        # see: https://juigitlab.esac.esa.int/python/ptr/-/issues/9
        self.agm_metakernel_id = mission_scenario_dict['agm_metakernel_id']

        # Set event files.
        self.event_files_dict = mission_scenario_dict['event_files']
        self.event_file_keys = []
        self.event_files = {}

        self.valid = True
        if not self.id:
            print('ERROR: Missing Mission Scenario ID in ' + basedir)
            self.valid = False
        if not path.exists(self.spice_metakernel):
            print('WARNING: Missing Mission Meta-Kernel file ' + self.spice_metakernel)
            self.valid = False

        # Check that event files exist, and store their resolved paths.
        if self.event_files_dict:
            for key in self.event_files_dict:
                event_file_abspath = path.join(basedir, self.event_files_dict[key])
                if not path.exists(event_file_abspath):
                    print('WARNING: Missing Mission Event file ' + event_file_abspath)
                    self.valid = False
                else:
                    self.event_files_dict[key] = event_file_abspath  # update path
            self.load_events_files()
        else:
            print(f'WARNING: No Event files found for {self.id}')
            self.valid = False

    def __repr__(self):
        return f'<{self.__class__.__name__} {self.__dict__!r}>'

    @staticmethod
    def _int_if_castable(value):
        """Return ``int(value)`` when castable ('17' -> 17), else ``value`` unchanged."""
        try:
            return int(value)
        except (TypeError, ValueError, OverflowError):
            return value

    def isValid(self):
        """Returns validity of a MissionScenario object.

        Valid means mission scenario has an identifier and that paths to
        meta-kernel and event files exist.
        """
        return self.valid

    def get_dict(self):
        """Returns the mission scenario as a dictionary (same keys as the constructor input)."""
        return {
            'id': self.id,
            'name': self.name,
            'description': self.description,
            'spice_metakernel': self.spice_metakernel,
            'agm_metakernel_id': self.agm_metakernel_id,
            'event_files': self.event_files_dict
        }

    def getKernels(self):
        """Returns list of SPICE kernel files associated to a mission scenario.

        Loads the meta-kernel into the kernel pool, reads its PATH_SYMBOLS and
        KERNELS_TO_LOAD variables and substitutes the first path symbol by the
        configured kernels directory. Used by (un)loadKernels methods.
        """
        spice.ldpool(self.spice_metakernel)
        path_symb, found = spice.gcpool('PATH_SYMBOLS', 0, 1, 20)
        kernels, found = spice.gcpool('KERNELS_TO_LOAD', 0, 5000, 100)
        return [
            kernel.replace('$' + path_symb[0], Config().kernels_dir)
            for kernel in kernels
        ]

    def loadKernels(self):
        """Load SPICE kernels associated to a mission scenario.

        All previously loaded kernels are cleared first.
        """
        spice.kclear()
        # NOTE(review): right after kclear() no kernel should be loaded, so
        # kernelsLoaded() is expected to be False here; check kept as-is.
        if not self.kernelsLoaded():
            for kernel in self.getKernels():
                spice.furnsh(kernel)

    def kernelsLoaded(self):
        """Return True if all mission kernels are loaded in the SPICE kernel pool.

        Returns False when no kernel at all is loaded.
        """
        n_loaded_kernels = spice.ktotal('ALL')
        if n_loaded_kernels == 0:
            return False

        loaded_kernels = set()
        for i in range(n_loaded_kernels):
            file, _filtyp, _source, _handle, _found = spice.kdata(i, 'ALL')
            loaded_kernels.add(file)

        return all(kernel in loaded_kernels for kernel in self.getKernels())

    def unloadKernels(self):
        """Unload SPICE kernels associated to a mission scenario, except the
        LSK and JUICE OPS FK kernels.

        The first occurrence of each LSK ('.tls') or JUICE OPS FK
        ('juice_ops_v') kernel is kept loaded (patch for PointingViewer to
        work); any further occurrence of the same kernel is unloaded.
        """
        # Tracked across the whole loop so that repeated entries are detected.
        kept_kernels = set()
        for kernel in self.getKernels():
            is_protected = ('.tls' in kernel) or ('juice_ops_v' in kernel)
            if is_protected and kernel not in kept_kernels:
                kept_kernels.add(kernel)
                continue
            spice.unload(kernel)

    def getKernelsCoverage(self):
        """Returns the JUICE coverage of the first loaded SPK kernel that has any.

        Returns:
            An Event whose window is the SPK coverage shrunk by ``spice.spd()``
            (seconds per day) at each end, or None when no loaded SPK kernel
            covers JUICE.
        """
        idcode, _exists = spice.bodn2c('JUICE')

        for i in range(spice.ktotal('SPK')):
            file, _filtyp, _source, _handle, found = spice.kdata(i, 'SPK', 256, 33, 256)
            if not found:
                continue

            spk_file = path.abspath(file)
            cov_event = Event(time_window=spice.spkcov(spk_file, idcode))
            if cov_event.n_intervals > 0:
                # Reduce time window to avoid "Insufficient ephemeris data"
                # SPICE error (for unknown reasons).
                return Event(
                    start_time=cov_event.get_start_time() + spice.spd(),
                    stop_time=cov_event.get_stop_time() - spice.spd()
                )

        # TODO: handling multiple spk files coverage
        return None

    def synonym_event_id(self, synonym):
        """Returns MOS event ID from synonym.

        Attempts to:
            - use synonyms from within the EVENT_ID_SYNONYMS dictionary (eg: 'TOUR' -> 'custom:TOUR'),
            - identify flyby Crema name as synonym (eg: '7E1' -> 'timeline:FLYBY_EUROPA:7E1'),
            - identify perijove and apojove as synonym (eg: 'PJ12' -> 'timeline:PERIJOVE_12PJ',
              'AP58' -> 'timeline:APOJOVE_58AP'),
            - identify orbit number as synonym (eg: 'ORB54' -> 'orbit:54'),
            - identify downlink number as synonym (eg: 'DL666' -> 'dl:DL_:666').

        Prefixes are matched case-insensitively. Returns an empty string when
        the synonym is not recognised.
        """
        if synonym in EVENT_ID_SYNONYMS:
            return EVENT_ID_SYNONYMS[synonym]

        target = crema_flyby_target(synonym)
        if target:
            # Input Crema name corresponds to a flyby, and is valid.
            return f'timeline:FLYBY_{target}:{synonym}'

        for prefix, template in self._NUMBERED_SYNONYMS:
            if synonym[:len(prefix)].upper() == prefix:
                try:
                    return template.format(int(synonym[len(prefix):]))
                except ValueError:
                    return ''

        return ''

    def load_events_files(self):
        """Loads defined ESA events files.

        Rebuilds ``event_files`` and ``event_file_keys`` from
        ``event_files_dict``. Files that do not exist on disk are skipped.
        """
        self.event_files = {}
        self.event_file_keys = []  # reset too, so repeated calls do not duplicate keys
        for event_file_key, event_file_path in self.event_files_dict.items():
            if not path.exists(event_file_path):
                continue
            # planetary-coverage "ESA" Events; hypothesis: only one event file per key
            self.event_files[event_file_key] = read_events(event_file_path)
            self.event_file_keys.append(event_file_key)

    def get_events_ids(self, event_file_key='', filter=None, before=None, after=None) -> list[str]:
        """Returns a list of event ids matching given filters.

        Args:
            event_file_key: restrict the search to this events file; all loaded
                events files are searched when empty.
            filter: temporary implementation -- keeps only the event ids that
                contain this text (case-insensitive substring match; regular
                expressions are NOT supported yet).
            before: not implemented.
            after: not implemented.

        Raises:
            NotImplementedError: when ``before`` or ``after`` is given.

        Examples:
            filter='VIS'    # keeps event ids containing 'vis', 'VIS', ...
        """
        if before:
            raise NotImplementedError
        if after:
            raise NotImplementedError

        event_file_keys = [event_file_key] if event_file_key else self.event_file_keys
        needle = filter.lower() if filter else None

        event_ids = []
        for file_key in event_file_keys:
            events_file = self.event_files[file_key]
            if not events_file:
                continue

            for key in events_file.keys():
                key = self._int_if_castable(key)
                entry = events_file[key]

                if isinstance(entry, AbstractEvent):
                    candidates = [f'{file_key}:{key}']
                elif isinstance(entry, EventsList):
                    if entry.crema_names:
                        candidates = [
                            f'{file_key}:{key}:{crema_name}'
                            for crema_name, _event in zip(entry.crema_names, entry)
                        ]
                    else:
                        # Occurrence numbers are 1-based.
                        candidates = [
                            f'{file_key}:{key}:{index}'
                            for index, _event in enumerate(entry, start=1)
                        ]
                else:
                    continue

                event_ids.extend(
                    candidate for candidate in candidates
                    if needle is None or needle in candidate.lower()
                )

        return event_ids

    def get_event(self, event_id) -> 'Event | None':
        """Returns an (MOS) Event object from the DataStore for given event ID.

        An Event ID within MOS is defined as a unique string identifier allowing
        to retrieve an event defined in the data store events files. Value is
        formed as follows:

            event_id: '<event_file_key>:<key1>[:<key2>]' or '<synonym>'

        where:
            - <event_file_key> is the key of event file as defined in the Mission
              Scenario Index file, for example: 'phase', 'timeline', 'orbit',
              'dl' and 'custom'.
            - <key1> is the name of the event (eg: 'GCO500' or 'PERIJOVE_12PJ'),
              or the name of the events list (eg: 'FLYBY_EUROPA' or 'DL_').
            - <key2> is the optional name of the event, when exists (eg: '7E1'
              for 'FLYBY_EUROPA' events list), or the occurrence number of the
              event in the events list (eg: '66' for 'IO_TRANSIT' events list).

        When only <event_file_key> is provided (no <key1> and <key2>), it is
        assumed to be a synonym and its corresponding event ID is derived; see
        self.synonym_event_id() method.

        Returns:
            The Event, or None when the ID is invalid or no such event is loaded.
        """
        full_event_id = event_id
        if ':' not in event_id:
            # Handle synonyms (note: all 'custom' event names must be prefixed with 'custom:').
            full_event_id = self.synonym_event_id(event_id)
            if full_event_id == '':
                return None  # invalid input event ID synonym

        # Derive event keys from event id; any other number of parts leaves
        # them as None, which makes the lookup below fail and return None.
        event_file_key = key1 = key2 = None
        parts = full_event_id.split(':')
        if len(parts) == 2:
            event_file_key, key1 = parts
        elif len(parts) == 3:
            event_file_key, key1, key2 = parts

        # Cast key1 and key2 into integers if castable ('17' -> 17).
        key1 = self._int_if_castable(key1)
        key2 = self._int_if_castable(key2)

        # Get ESA Event object from loaded events files. The lookup is
        # best-effort: any failure means "no such event".
        try:
            esa_event = self.event_files[event_file_key][key1]
            if key2:
                esa_event = esa_event[key2]
        except Exception:
            esa_event = None

        if not esa_event:
            return None

        # Create MOS Event object from ESA Event object.
        return Event(
            id=full_event_id,
            start_time=str(esa_event.start),
            stop_time=str(esa_event.stop),
            format='utc',
            data=esa_event.data
        )