"""Scenario module."""
from gfinder.config import Config
from gfinder.event import Event
import spiceypy as spice
from planetary_coverage import read_events
from planetary_coverage.events.event import AbstractEvent, EventsList, EventsDict
from os import path
KERNELS_PATH_SYMBOL = '$GFINDER_KERNELS_DIR'
"""Symbol used in the config file to specify the output_dir of a mission metakernel
file relative to output_dir of the JUICE SPICE Kernels directory, which specified by the GFINDER_KERNELS_DIR environment variable.
"""
# Maps a short, user-facing synonym to its full event ID, written as
# '<event_file_key>:<key>' (e.g. 'TOUR' -> 'custom:TOUR').
# MissionScenario.synonym_event_id() looks a synonym up here first, before
# trying any pattern-based interpretation (flyby Crema name, or the
# 'PJ' / 'AP' / 'ORB' / 'DL' prefixes).
EVENT_ID_SYNONYMS = {
'TOUR': 'custom:TOUR',
'Jupiter_Phase_1': 'phase:Approach and first ellipse',
'Jupiter_Phase_2': 'phase:Energy reduction',
'Jupiter_Phase_3': 'phase:Europa flybys',
'Jupiter_Phase_4': 'phase:High-latitude',
'Jupiter_Phase_5': 'phase:Low energy',
'Jupiter_Phase_all': 'phase:All Jupiter phases',
'GEOa': 'phase:GEOa',
'GCO5000': 'phase:GCO5000',
'GEOb': 'phase:GEOb',
'TGCO': 'phase:TGCO',
'GCO500': 'phase:GCO500',
'Ganymede_Phase_all': 'phase:All Ganymede phases',
'Mission_phase_all': 'phase:All mission phases'
}
"""List of synonyms defined for specific mission event IDs."""
import re
# re.search(expr, key, flags=re.IGNORECASE)
def crema_flyby_target(flyby_crema_name):
    """Return the target name corresponding to the input flyby Crema name.

    A valid flyby Crema name is made of a non-zero decimal number, one target
    letter ('E', 'C' or 'G') and a second non-zero decimal number, with nothing
    else around them. For example: '7E1' -> 'EUROPA'.

    Args:
        flyby_crema_name: candidate flyby Crema name (eg: '7E1', '12C3').

    Returns:
        'EUROPA', 'CALLISTO' or 'GANYMEDE' when the name is a valid flyby Crema
        name, or an empty string otherwise (including non-string input).
    """
    targets = {
        'E': 'EUROPA',
        'C': 'CALLISTO',
        'G': 'GANYMEDE',
    }

    def is_occurrence(text):
        # Plain ASCII digits only: int() alone would also accept signs,
        # surrounding whitespace and underscores ('-7', ' 7', '7_0'), none of
        # which can be part of a Crema name. Zero is not a valid occurrence.
        return text.isascii() and text.isdigit() and int(text) > 0

    if not isinstance(flyby_crema_name, str):
        return ''

    for letter, target in targets.items():
        # Split around the first occurrence of the target letter.
        total_occ_str, found, target_occ_str = flyby_crema_name.partition(letter)
        if not found:
            continue
        if is_occurrence(total_occ_str) and is_occurrence(target_occ_str):
            return target
    return ''
class MissionScenario:
    """Mission scenario: a SPICE meta-kernel plus its associated ESA events files.

    Attributes set by the constructor:
        id, name, description: copied from the input dictionary.
        spice_metakernel: resolved path of the mission meta-kernel file.
        agm_metakernel_id: meta-kernel ID used when exporting PTR files.
        event_files_dict: event file key -> event file path.
        event_files: event file key -> loaded events file object.
        event_file_keys: keys of the events files that were loaded.
        valid: False when the ID, the meta-kernel or an event file is missing.
    """

    # (prefix, event ID template) pairs tried, in order, by synonym_event_id().
    _PREFIX_SYNONYMS = (
        ('PJ', 'timeline:PERIJOVE_{occ}PJ'),   # 'PJ12'  -> 'timeline:PERIJOVE_12PJ'
        ('AP', 'timeline:APOJOVE_{occ}AP'),    # 'AP58'  -> 'timeline:APOJOVE_58AP'
        ('ORB', 'orbit:{occ}'),                # 'ORB54' -> 'orbit:54'
        ('DL', 'dl:DL_:{occ}'),                # 'DL666' -> 'dl:DL_:666'
    )

    def __init__(self, mission_scenario_dict, basedir):
        """Init MissionScenario object.

        Args:
            mission_scenario_dict: mapping holding the keys 'id', 'name',
                'description', 'spice_metakernel', 'agm_metakernel_id' and
                'event_files' (event file key -> path relative to `basedir`).
            basedir: base directory of event files; also where a meta-kernel
                given with a relative path is looked up.

        Raises:
            KeyError: if an expected key is missing from `mission_scenario_dict`.
        """
        self.id = mission_scenario_dict['id']
        self.name = mission_scenario_dict['name']
        self.description = mission_scenario_dict['description']

        spice_metakernel = mission_scenario_dict['spice_metakernel']
        if KERNELS_PATH_SYMBOL in spice_metakernel:
            # Replace KERNELS_PATH_SYMBOL by the kernels directory path, then
            # collapse the first doubled slash the substitution may have left.
            spice_metakernel = spice_metakernel.replace(KERNELS_PATH_SYMBOL, Config().kernels_dir, 1)
            spice_metakernel = spice_metakernel.replace('//', '/', 1)
        elif not path.isabs(spice_metakernel):
            # Not an absolute path: only the file name is kept and it is looked
            # up in the data store mission scenario base directory.
            spice_metakernel = basedir + '/' + path.basename(spice_metakernel)
        self.spice_metakernel = spice_metakernel

        # PATCH handling metakernel ID so that exported PTR files can be validated by AGM,
        # see: https://juigitlab.esac.esa.int/python/ptr/-/issues/9
        self.agm_metakernel_id = mission_scenario_dict['agm_metakernel_id']

        # Work on a copy so that making the paths absolute below does not
        # modify the caller's dictionary.
        event_files = mission_scenario_dict['event_files']
        self.event_files_dict = dict(event_files) if event_files else event_files
        self.event_file_keys = []
        self.event_files = {}

        self.valid = True
        if not self.id:
            print('ERROR: Missing Mission Scenario ID in ' + basedir)
            self.valid = False
        if not path.exists(self.spice_metakernel):
            print('WARNING: Missing Mission Meta-Kernel file ' + self.spice_metakernel)
            self.valid = False

        # Check event files exist, and set absolute paths.
        if self.event_files_dict:
            for key, event_file in event_files.items():
                event_file_abspath = path.join(basedir, event_file)
                if path.exists(event_file_abspath):
                    self.event_files_dict[key] = event_file_abspath
                else:
                    print('WARNING: Missing Mission Event file ' + event_file_abspath)
                    self.valid = False
            self.load_events_files()
        else:
            print(f'WARNING: No Event files found for {self.id}')
            self.valid = False

    def __repr__(self):
        return f'<{self.__class__.__name__} {self.__dict__!r}>'

    @staticmethod
    def _cast_key(key):
        """Return `key` as an integer when it is castable ('17' -> 17), else unchanged."""
        try:
            return int(key)
        except (TypeError, ValueError, OverflowError):
            return key

    def isValid(self):
        """Returns validity of a MissionScenario object.

        Valid means mission scenario has an identifier and that paths to
        meta-kernel and event files exist.
        """
        return self.valid

    def get_dict(self):
        """Returns the mission scenario as a dictionary.

        The keys are the ones read by the constructor; 'spice_metakernel' and
        'event_files' hold the paths as resolved by the constructor.
        """
        return {
            'id': self.id,
            'name': self.name,
            'description': self.description,
            'spice_metakernel': self.spice_metakernel,
            'agm_metakernel_id': self.agm_metakernel_id,
            'event_files': self.event_files_dict,
        }

    def getKernels(self):
        """Returns list of SPICE kernel files associated to a mission scenario.

        The meta-kernel is loaded into the kernel pool as a text kernel, and the
        first PATH_SYMBOLS entry found in each KERNELS_TO_LOAD value is replaced
        by the configured kernels directory.

        Used by (un)loadKernels methods.
        """
        # NOTE(review): the two-value unpacking below assumes spiceypy returns
        # the `found` flag together with the values - confirm this matches the
        # found-check setting used by the project.
        spice.ldpool(self.spice_metakernel)
        path_symb, found = spice.gcpool('PATH_SYMBOLS', 0, 1, 20)
        kernels, found = spice.gcpool('KERNELS_TO_LOAD', 0, 5000, 100)
        kernels_dir = Config().kernels_dir
        return [kernel.replace('$' + path_symb[0], kernels_dir) for kernel in kernels]

    def loadKernels(self):
        """Load SPICE kernels associated to a mission scenario.

        The SPICE kernel pool is cleared first.
        """
        spice.kclear()
        if not self.kernelsLoaded():
            for kernel in self.getKernels():
                spice.furnsh(kernel)

    def kernelsLoaded(self):
        """Return True if all mission kernels are loaded in the SPICE kernel pool."""
        n_loaded_kernels = spice.ktotal('ALL')
        if n_loaded_kernels == 0:
            return False
        loaded_kernels = set()
        for i in range(n_loaded_kernels):
            kernel_file, filtyp, source, handle, found = spice.kdata(i, 'ALL')
            loaded_kernels.add(kernel_file)
        return all(kernel in loaded_kernels for kernel in self.getKernels())

    def unloadKernels(self):
        """Unload SPICE kernels associated to a mission scenario, except the LSK and JUICE OPS FK kernels."""
        for kernel in self.getKernels():
            # LSK ('.tls') and JUICE OPS FK kernels are kept in the pool
            # (patch for PointingViewer to work).
            if '.tls' in kernel or 'juice_ops_v' in kernel:
                continue
            spice.unload(kernel)

    def getKernelsCoverage(self):
        """Returns the JUICE coverage of the first loaded SPK kernel having one.

        Returns:
            An Event spanning that coverage, shortened by one day at each end,
            or None when no loaded SPK kernel has coverage for JUICE.
        """
        idcode, exists = spice.bodn2c('JUICE')
        for i in range(spice.ktotal('SPK')):
            kernel_file, filtyp, source, handle, found = spice.kdata(i, 'SPK', 256, 33, 256)
            if not found:
                continue
            cov_event = Event(time_window=spice.spkcov(path.abspath(kernel_file), idcode))
            if cov_event.n_intervals > 0:
                # Reduce time window by one day at each end to avoid
                # "Insufficient ephemeris data" SPICE error (for unknown reasons).
                return Event(
                    start_time=cov_event.get_start_time() + spice.spd(),
                    stop_time=cov_event.get_stop_time() - spice.spd()
                )  # TODO: handling multiple spk files coverage
        return None

    def synonym_event_id(self, synonym):
        """Returns MOS event ID from synonym.

        Attempts to:
        - use synonyms from within the EVENT_ID_SYNONYMS dictionary (eg: 'TOUR' -> 'custom:TOUR'),
        - identify flyby Crema name as synonym (eg: '7E1' -> 'timeline:FLYBY_EUROPA:7E1'),
        - identify perijove and apojove as synonym (eg: 'PJ12' -> 'timeline:PERIJOVE_12PJ',
          'AP58' -> 'timeline:APOJOVE_58AP'),
        - identify orbit number as synonym (eg: 'ORB54' -> 'orbit:54'),
        - identify 'DL' number as synonym (eg: 'DL666' -> 'dl:DL_:666').

        Prefixes ('PJ', 'AP', 'ORB', 'DL') are matched case-insensitively.

        Returns:
            The event ID, or an empty string when the synonym is not recognised.
        """
        if synonym in EVENT_ID_SYNONYMS:
            return EVENT_ID_SYNONYMS[synonym]

        target = crema_flyby_target(synonym)
        if target:  # meaning input crema name corresponds to a flyby, and is valid
            return f'timeline:FLYBY_{target}:{synonym}'

        for prefix, template in self._PREFIX_SYNONYMS:
            if synonym[:len(prefix)].upper() != prefix:
                continue
            try:
                occ = int(synonym[len(prefix):])
            except ValueError:
                return ''
            return template.format(occ=occ)
        return ''

    def load_events_files(self):
        """Loads defined ESA events files.

        Resets `event_files` and `event_file_keys`, then reads every file listed
        in `event_files_dict`. Files that do not exist are skipped with a warning.
        """
        self.event_files = {}
        # Reset together with `event_files`, so calling this method again does
        # not accumulate duplicated keys.
        self.event_file_keys = []
        for event_file_key, event_file_path in (self.event_files_dict or {}).items():
            if not path.exists(event_file_path):
                print('WARNING: Skipping missing Mission Event file ' + event_file_path)
                continue
            # planetary-coverage "ESA" Events; hypothesis: only one event file per key
            self.event_files[event_file_key] = read_events(event_file_path)
            self.event_file_keys.append(event_file_key)

    def get_events_ids(self, event_file_key='', filter=None, before=None, after=None) -> list[str]:
        """Returns a list of event ids matching given filters.

        Args:
            event_file_key: restrict the search to this events file; all loaded
                events files are searched when empty.
            filter: keep only the event ids containing this text
                (case-insensitive substring match, not a regular expression).
            before: not implemented.
            after: not implemented.

        Raises:
            NotImplementedError: if `before` or `after` is given.
            KeyError: if `event_file_key` is not a loaded events file.

        Examples:
            filter='FLYBY_EUROPA'  # ids containing 'flyby_europa', whatever the case
        """
        if before:
            raise NotImplementedError
        if after:
            raise NotImplementedError

        event_file_keys = [event_file_key] if event_file_key else self.event_file_keys
        # NOTE: temporary `filter` implementation. EventsDict.find() was not
        # working as expected (question raised Jan 20, 2023), so the generated
        # ids are filtered here instead.
        needle = filter.lower() if filter else None

        event_ids = []
        for file_key in event_file_keys:
            eventsdict = self.event_files[file_key]
            if not eventsdict:
                continue
            for key in eventsdict.keys():
                key = self._cast_key(key)
                entry = eventsdict[key]
                if isinstance(entry, AbstractEvent):
                    candidates = [f'{file_key}:{key}']
                elif isinstance(entry, EventsList):
                    if entry.crema_names:
                        candidates = [
                            f'{file_key}:{key}:{crema_name}'
                            for crema_name, _ in zip(entry.crema_names, entry)
                        ]
                    else:
                        # No names available: events are numbered from 1.
                        candidates = [
                            f'{file_key}:{key}:{index}'
                            for index, _ in enumerate(entry, start=1)
                        ]
                else:
                    continue
                event_ids.extend(
                    event_id for event_id in candidates
                    if needle is None or needle in event_id.lower()
                )
        return event_ids

    def get_event(self, event_id) -> 'Event | None':
        """Returns an (MOS) Event object from the DataStore for given event ID.

        An Event ID within MOS is defined as a unique string identifier allowing to retrieve an event defined in the
        data store events files. Value is formed as follows:

            event_id: '<event_file_key>:<key1>[:<key2>]' or '<synonym>'

        where:
        - <event_file_key> is the key of event file as defined in the Mission Scenario Index file, for example:
          'phase', 'timeline', 'orbit', 'dl' and 'custom'.
        - <key1> is the name of the event (eg: 'GCO500' or 'PERIJOVE_12PJ'), or the name of the events list (eg:
          'FLYBY_EUROPA' or 'DL_').
        - <key2> is the optional name of the event, when exists (eg: '7E1' for 'FLYBY_EUROPA' events list), or the
          occurrence number of the event in the events list (eg: '66' for 'IO_TRANSIT' events list).

        When only <event_file_key> is provided (no <key1> and <key2>), it is assumed to be a synonym and its corresponding
        event ID is derived; see self.synonym_event_id() method.

        Returns:
            The Event, or None when the synonym is unknown or no event matches.
        """
        full_event_id = event_id
        parts = event_id.split(':')
        if len(parts) == 1:
            # handle synonyms (note: all 'custom' event names must be prefixed with 'custom:')
            full_event_id = self.synonym_event_id(parts[0])
            if full_event_id == '':
                return None  # invalid input event ID synonym
            parts = full_event_id.split(':')

        # derive event keys from event id
        event_file_key, key1, key2 = None, None, None
        if len(parts) == 2:
            event_file_key, key1 = parts
        elif len(parts) == 3:
            event_file_key, key1, key2 = parts

        # cast key1 and key2 into integers if castable ('17' -> 17)
        key1 = self._cast_key(key1)
        key2 = self._cast_key(key2)

        # get ESA Event object from loaded events files
        try:
            esa_event = self.event_files[event_file_key][key1]
            if key2:
                esa_event = esa_event[key2]
        except Exception:
            # Any failed lookup means "no such event"; the exception types raised
            # by the events containers are not known from here, hence the broad
            # (but no longer bare) clause.
            esa_event = None

        if not esa_event:
            return None

        # create MOS Event object from ESA Event object
        return Event(
            id=full_event_id,
            start_time=str(esa_event.start),
            stop_time=str(esa_event.stop),
            format='utc',
            data=esa_event.data
        )