class DataStore:
""" DataStore.
Attributes:
app_data_dir (str): Application data directory output_dir.
user_data_dir (str): User data directory output_dir.
odf_basedir (str):
opportunities_basedir (str):
roi_basedir (str):
mission_scenarios (list): List of mission scenarios.
opportunities (list): List of opportunities.
opportunity_definitions (list): List of opportunity definitions (ODF).
observation_types (list): List of observation types.
valid (bool): Validity flag.
"""
    def __init__(self, app_data_dir=None, user_data_dir=None):
        """Constructor method.

        Args:
            app_data_dir (str, optional): Application data directory path (defaults to Config().app_data_dir).
            user_data_dir (str, optional): User data directory path (defaults to Config().user_data_dir).
        """
        # Set app and user data directory paths. Defaults are resolved here rather
        # than in the signature, so that Config() is not evaluated at import time.
        self.app_data_dir = app_data_dir if app_data_dir else Config().app_data_dir
        self.user_data_dir = user_data_dir if user_data_dir else Config().user_data_dir
        # Check that both app and user data directories exist.
        # TODO: throw exceptions instead of the "self.valid = False; return" approach
        if not path.exists(self.app_data_dir):
            print(f'Input application data directory does not exist: {path.abspath(self.app_data_dir)}')
            self.valid = False
            return
        if not path.exists(self.user_data_dir):
            print(f'Input user data directory does not exist: {path.abspath(self.user_data_dir)}')
            # TODO: create user data directory?
            self.valid = False
            return
        # Set scenarios base directory path; the user data directory has precedence.
        if path.exists(path.join(self.user_data_dir, 'scenarios')):
            self.scenarios_basedir = path.join(self.user_data_dir, 'scenarios')
        elif path.exists(path.join(self.app_data_dir, 'scenarios')):
            self.scenarios_basedir = path.join(self.app_data_dir, 'scenarios')
        else:
            print('Missing `scenarios` directory in both user and application data directories.')
            self.valid = False
            return
        # Set output opportunities base directory path (opportunities are always
        # written to the user data directory).
        self.opportunities_basedir = path.join(self.user_data_dir, 'opportunities')
        # Set ROI base directory path; the user data directory has precedence.
        if path.exists(path.join(self.user_data_dir, 'rois')):
            self.roi_basedir = path.join(self.user_data_dir, 'rois')
        elif path.exists(path.join(self.app_data_dir, 'rois')):
            self.roi_basedir = path.join(self.app_data_dir, 'rois')
self.mission_scenarios = [] # dicts
self.opportunities = [] # dicts
self.opportunity_definitions = [] # dicts
self.observation_types = [] # dicts
self.valid = True
        # Read mission scenario definitions (dicts) from the mission scenarios index file.
mission_scenarios_index_file_path = path.join(self.scenarios_basedir, MISSION_SCENARIOS_INDEX_FILE)
with open(mission_scenarios_index_file_path) as f:
json_dict = json.load(f)
if 'mission_scenarios' in json_dict:
self.mission_scenarios = json_dict['mission_scenarios']
# TODO: check that each scenario definition has "id" key
else:
raise Exception(f'Invalid JSON Mission Scenarios Index file: {mission_scenarios_index_file_path}')
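        # Illustrative shape of the mission scenarios index file (only the
        # 'mission_scenarios' key and the per-scenario 'id' key are relied upon
        # here; any other fields are scenario-specific):
        #
        #     {"mission_scenarios": [{"id": "crema_5_0", ...}, ...]}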
        # Check and index observation types; the user data directory has precedence.
        obstypes_index_path = ''
        user_obstypes_index_path = path.join(self.user_data_dir, 'odf', OBSERVATION_TYPES_INDEX_FILE)
        app_obstypes_index_path = path.join(self.app_data_dir, 'odf', OBSERVATION_TYPES_INDEX_FILE)
        if path.exists(user_obstypes_index_path):
            obstypes_index_path = user_obstypes_index_path
        elif path.exists(app_obstypes_index_path):
            obstypes_index_path = app_obstypes_index_path
        else:
            print(f'[WARNING] Missing {OBSERVATION_TYPES_INDEX_FILE} file in data store.')
        if obstypes_index_path:
            with open(obstypes_index_path) as f:
                json_dict = json.load(f)
                self.observation_types = json_dict['observation_types']
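        # Illustrative shape of the observation types index file (only the
        # 'observation_types' key is relied upon here; the entry fields shown
        # are assumptions):
        #
        #     {"observation_types": [{"id": "disk_scan", ...}, ...]}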
        # Index valid ODF files available in both the user and app data directories,
        # excluding deprecated files and the observation types index file.
        files = glob.glob(path.join(self.user_data_dir, 'odf', '**', '*.json'), recursive=True)
        user_odf_files = []
        user_odf_relpaths = []
        for file in files:
            if 'deprecated' not in file and path.basename(file) != OBSERVATION_TYPES_INDEX_FILE:
                user_odf_files.append(file)
                user_odf_relpaths.append(path.relpath(file, path.join(self.user_data_dir, 'odf')))
        files = glob.glob(path.join(self.app_data_dir, 'odf', '**', '*.json'), recursive=True)
        app_odf_files = []
        app_odf_relpaths = []
        for file in files:
            if 'deprecated' not in file and path.basename(file) != OBSERVATION_TYPES_INDEX_FILE:
                app_odf_files.append(file)
                app_odf_relpaths.append(path.relpath(file, path.join(self.app_data_dir, 'odf')))
        # User ODF files have precedence over app ODF files (the observation types
        # index file was already excluded while scanning).
        odf_files = list(user_odf_files)
        odf_relpaths = list(user_odf_relpaths)
        for app_odf_relpath, app_odf_file in zip(app_odf_relpaths, app_odf_files):
            if app_odf_relpath not in user_odf_relpaths:
                odf_relpaths.append(app_odf_relpath)
                odf_files.append(app_odf_file)
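        # For example (hypothetical file names): if both directories contain
        # 'ganymede/jir_spec.json', the user copy is kept; an app-only
        # 'callisto/majis_scan.json' is appended to the merged index.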
        # First pass to index ODF file paths, without checking their validity. This is
        # required in order for "included" ODF files to be indexed prior to
        # instantiating OpportunityDefinition objects.
opportunity_definitions_dicts = []
for odf_file, odf_relpath in zip(odf_files, odf_relpaths):
opportunity_definition_dict = {
'odf_path': odf_relpath,
'odf_file': odf_file
}
opportunity_definitions_dicts.append(opportunity_definition_dict)
self.opportunity_definitions = opportunity_definitions_dicts
# Second pass, checking the validity of ODF files.
opportunity_definitions_dicts = []
for odf_file, odf_relpath in zip(odf_files, odf_relpaths):
# print(f'> {odf_file}')
opportunity_definition = OpportunityDefinition(odf_file, datastore=self)
if opportunity_definition.valid:
opportunity_definition_dict = {
'odf_path': odf_relpath,
'odf_file': odf_file,
'observation_type': opportunity_definition.observation_type,
'target': opportunity_definition.target,
'observer': opportunity_definition.observer,
'detector': opportunity_definition.detector,
'pointing_odf': opportunity_definition.get_pointing_odf(),
'ptr_pointing_type': opportunity_definition.ptr_pointing_type
}
opportunity_definitions_dicts.append(opportunity_definition_dict)
self.opportunity_definitions = opportunity_definitions_dicts
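        # Each indexed opportunity definition dict then has the following shape
        # (values illustrative; keys taken from the loop above):
        #
        #     {'odf_path': 'ganymede/jir_spec.json', 'odf_file': '/abs/path/to/jir_spec.json',
        #      'observation_type': ..., 'target': ..., 'observer': ...,
        #      'detector': ..., 'pointing_odf': ..., 'ptr_pointing_type': ...}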
        # Check and load opportunities (dicts).
        # Opportunities are indexed in memory when a DataStore object is instantiated.
        # Only the <mission_scenario_id> directories that are defined in the mission
        # scenarios index file are scanned.
        # TODO: for performance reasons, make loadable only once, when needed for the
        # first time (e.g. on the first call to *Opportunity* methods).
        for mission_scenario in self.mission_scenarios:
            # Look for opportunity.json files.
            pattern = path.join(self.opportunities_basedir, mission_scenario['id'], '*', 'opportunity.json')
opportunity_json_files = glob.glob(pattern)
for opportunity_json_file in opportunity_json_files:
opportunity_dict = OpportunityFile(filepath=opportunity_json_file).read()
if opportunity_dict['id']:
self.opportunities.append(opportunity_dict)
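        # Resulting on-disk layout of indexed opportunities (names illustrative):
        #
        #     <user_data_dir>/opportunities/<mission_scenario_id>/<opportunity_id>/opportunity.json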
def __repr__(self):
return (
f'<{self.__class__.__name__}> '
f'Application data directory: {self.app_data_dir} | '
f'User data directory: {self.user_data_dir}\n'
            f'- Number of mission scenarios: {len(self.mission_scenarios)}\n'
            f'- Number of observation types: {len(self.observation_types)}\n'
            f'- Number of opportunity definitions (ODF): {len(self.opportunity_definitions)}\n'
            f'- Number of opportunities: {len(self.opportunities)}'
)
    def get_observation_types(self):
        """Returns the list of observation type dicts."""
        return self.observation_types
    def get_definitions(self, obs_type=None, target=None):
        """Returns opportunity definition dicts, optionally filtered by observation type and/or target.

        Args:
            obs_type (str, optional): Observation type filter (substring match).
            target (str, optional): Target filter (case-insensitive match).
        """
        opportunity_definition_dicts = []
        for opportunity_definition_dict in self.opportunity_definitions:
            if obs_type and obs_type not in opportunity_definition_dict['observation_type']:
                continue
            # The `target` argument was previously accepted but ignored; a
            # case-insensitive exact match is assumed here.
            if target and target.lower() != opportunity_definition_dict['target'].lower():
                continue
            opportunity_definition_dicts.append(opportunity_definition_dict)
        return opportunity_definition_dicts
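    # Filtering sketch (argument values are illustrative):
    #
    #     jupiter_defs = ds.get_definitions(target='Jupiter')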
    def get_ODF_filename(self, odf_path):
        """Returns the full ODF file path corresponding to a relative path.

        Args:
            odf_path: Input ODF file path, relative to the ODF base directory.

        Returns:
            str: Full ODF file path, or None if the relative path is not indexed.
        """
        for opportunity_definition_dict in self.opportunity_definitions:
            if odf_path == opportunity_definition_dict['odf_path']:
                return opportunity_definition_dict['odf_file']
# MISSION SCENARIO METHODS
    def getMissionScenarioIDs(self):
        """Returns the list of mission scenario IDs."""
        mission_scenario_ids = []
        for mission_scenario in self.mission_scenarios:
            mission_scenario_ids.append(mission_scenario['id'])
        return mission_scenario_ids
    def getMissionScenario(self, id):
        """Returns the MissionScenario object of the input mission scenario ID, or None if not found."""
        for mission_scenario_dict in self.mission_scenarios:
            if mission_scenario_dict['id'] == id:
                return MissionScenario(mission_scenario_dict, self.scenarios_basedir)
        return None
# OPPORTUNITY METHODS
    def opportunityExists(self, opportunity, suffix=None):
        """Returns whether the input opportunity already exists in the data store, along with its assigned ID."""
        opportunity_id = self.assignOpportunityID(opportunity, suffix=suffix)
        opportunity_dict = self.getOpportunityDict(opportunity_id)
        exist = opportunity_dict is not None
        return exist, opportunity_id
    def getOpportunityIDs(self):
        """Returns the list of opportunity IDs indexed in the data store."""
        # TODO: optional filters, e.g. mission_scenario_id, event_id, target.
        opportunity_ids = []
        for opportunity_dict in self.opportunities:
            opportunity_ids.append(opportunity_dict['id'])
        return opportunity_ids
    def getOpportunityDict(self, opportunity_id):
        """Returns the opportunity dict of the input opportunity ID, or None if not found."""
        for opportunity_dict in self.opportunities:
            if opportunity_dict['id'] == opportunity_id:
                return opportunity_dict
        return None
    def assignOpportunityID(self, opportunity, suffix=None):
        """Builds and returns the opportunity ID, derived from the mission scenario ID,
        target, ODF file name and event start/stop times."""
        mission_scenario = opportunity.mission_scenario
        opportunity_definition = opportunity.opportunity_definition
        event = opportunity.time_inputs.get_event()
        mission_scenario_id = mission_scenario.id
        target_ids = {
            'jupiter' : 'JUP',
            'ganymede': 'GAN',
            'callisto': 'CAL',
            'europa'  : 'EUR',
            'io'      : 'IO'}
        target_name = opportunity_definition.get_target().lower()
        if target_name in target_ids:
            target_id = target_ids[target_name]
        else:
            # Default to the first three letters of the target name.
            target_id = target_name[0:3].upper()
        odf_id = path.splitext(path.basename(opportunity_definition.odf_file))[0].upper()
        start_time = event.get_start_time(format='YYYYMMDD')
        stop_time = event.get_stop_time(format='YYYYMMDD')
        opportunity_id = '{}_{}_{}_{}_{}'.format(mission_scenario_id, target_id, odf_id, start_time, stop_time)
        if suffix:
            opportunity_id += '_' + suffix
        return opportunity_id.lower()
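    # For example (hypothetical values), a Ganymede opportunity built from
    # 'jir_spec.json' in mission scenario 'crema_5_0' over 2035-06-10/11 would
    # get the ID 'crema_5_0_gan_jir_spec_20350610_20350611'.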
    def assignOpportunityPath(self, opportunity, suffix=None):
        """Builds and returns the data store path assigned to the input opportunity."""
        mission_scenario_id = opportunity.mission_scenario.id
        opportunity_id = self.assignOpportunityID(opportunity, suffix=suffix)
        opportunity_path = path.join(self.opportunities_basedir, mission_scenario_id, opportunity_id)
        return opportunity_path
    def getOpportunityPath(self, opportunity_id):
        """Returns the data store path of the input opportunity ID."""
        opportunity_dict = self.getOpportunityDict(opportunity_id)
        mission_scenario_id = opportunity_dict['mission_scenario']['id']
        opportunity_path = path.join(self.opportunities_basedir, mission_scenario_id, opportunity_id)
        return opportunity_path
    def getAddendumKernels(self, opportunity_id):
        """Returns the list of addendum CK kernel files (*.bc) stored with the input opportunity."""
        ck_kernels = glob.glob(path.join(self.getOpportunityPath(opportunity_id), 'kernels', '*.bc'))
        return ck_kernels
    def writeOpportunity(self, opportunity, overwrite=False, suffix=None):
        """Writes the input opportunity into the data store.

        Returns:
            tuple: (opportunity_id, opportunity_path), or None if the opportunity
            already exists and `overwrite` is False.
        """
        # Exit if the opportunity already exists and overwriting is not allowed.
        exist, opportunity_id = self.opportunityExists(opportunity, suffix=suffix)
        if exist and not overwrite:
            print('WARNING: Cannot write into data store.')
            print('Opportunity already exists: ' + opportunity_id)
            return
        if not opportunity.id:
            opportunity.setID(self.assignOpportunityID(opportunity, suffix=suffix))
        opportunity_path = self.assignOpportunityPath(opportunity, suffix=suffix)
        sequence_file_path = path.join(opportunity_path, 'sequence.json')
        makedirs(path.dirname(sequence_file_path), exist_ok=True)
        # Remove existing opportunity data files.
        for f in glob.glob(path.join(opportunity_path, '*.*')):
            remove(f)
        # Write opportunity file.
        opportunity_file_path = path.join(opportunity_path, 'opportunity.json')
        OpportunityFile(filepath=opportunity_file_path).write(opportunity)
        # Write the ODF file used to generate the opportunity.
        used_odf_path = path.join(opportunity_path, path.basename(opportunity.opportunity_definition.odf_file))
        with open(used_odf_path, 'w') as f:
            json.dump(opportunity.opportunity_definition.getDict(), f, indent=2)
        # Write opportunity Sequence GeoEvt file.
        GeoEvtFile(geoevt_file=sequence_file_path).write(opportunity.sequence)
        # Write one Observation GeoEvt file per observation of the sequence.
        for i, observation in enumerate(opportunity.sequence.sub_events):
            observation_file_path = path.join(opportunity_path, 'observation_{:04}.json'.format(i + 1))
            GeoEvtFile(geoevt_file=observation_file_path).write(observation)
        # TODO: optionally write observation GeoJSON files and convert them to KML
        # for visualisation in Google Earth (ogr2ogr -f KML ...).
        return opportunity_id, opportunity_path
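    # Files written for each opportunity (layout sketch; the ODF file name
    # depends on the opportunity definition):
    #
    #     <opportunities_basedir>/<mission_scenario_id>/<opportunity_id>/
    #         opportunity.json
    #         <odf_name>.json
    #         sequence.json
    #         observation_0001.json, observation_0002.json, ...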
    def load_opportunity(self, opportunity_id):
        """Loads and returns the Opportunity object of the input opportunity ID,
        or None if the stored ODF file is invalid."""
        # Get opportunity path and dict for the input ID.
        opportunity_path = self.getOpportunityPath(opportunity_id)
        opportunity_dict = self.getOpportunityDict(opportunity_id)
        # Load mission scenario.
        mission_scenario_dict = opportunity_dict['mission_scenario']
        mission_scenario = MissionScenario(mission_scenario_dict, self.scenarios_basedir)
        if not mission_scenario.kernelsLoaded():
            mission_scenario.loadKernels()
        # Load the opportunity definition from the ODF file written alongside the
        # opportunity, rather than from the source ODF file.
        opportunity_definition_dict = opportunity_dict['opportunity_definition']
        odf_file = path.join(opportunity_path, path.basename(opportunity_definition_dict['odf_file']))
        opportunity_definition = OpportunityDefinition(odf_file, datastore=self)
        if not opportunity_definition.valid:
            print(f'Invalid input <{odf_file}> ODF file.')
            return None
        # Load time inputs.
        time_inputs_dict = opportunity_dict['time_inputs']
        time_inputs = TimeInputs(time_inputs_dict, mission_scenario=mission_scenario)
        # Initiate opportunity.
        opportunity = Opportunity(mission_scenario, opportunity_definition, time_inputs)
        # Load geoevents dicts (sequence + observations).
        sequence_file_path = path.join(opportunity_path, 'sequence.json')
        sequence_dict = GeoEvtFile(geoevt_file=sequence_file_path).read()
        geoevents_dict = {'sequence': sequence_dict, 'observations': []}
        for i in range(sequence_dict['n_intervals']):
            observation_file_path = path.join(opportunity_path, 'observation_{:04}.json'.format(i + 1))
            observation_dict = GeoEvtFile(geoevt_file=observation_file_path).read()
            geoevents_dict['observations'].append(observation_dict)
        # Load sequence.
        opportunity.loadSequence(geoevents_dict)
        # Set other opportunity attributes.
        opportunity.searched = opportunity_dict['searched']
        opportunity.computed = opportunity_dict['computed']
        opportunity.search_step = opportunity_dict['search_step']
        opportunity.binning = opportunity_dict['binning']
        opportunity.sim_sc_att = opportunity_dict['sim_sc_att']
        opportunity.sim_scanner = opportunity_dict['sim_scanner']
        if not opportunity.id:
            opportunity.setID(opportunity_id)
        return opportunity
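    # Round-trip sketch (assumes `opportunity` is a fully computed Opportunity):
    #
    #     opp_id, opp_path = ds.writeOpportunity(opportunity, overwrite=True)
    #     reloaded = ds.load_opportunity(opp_id)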