Source code for gfinder.datastore

"""DataStore module."""

from gfinder.config import Config
from gfinder.scenario import MissionScenario
from gfinder.geometryevent import GeoEvtFile
from gfinder.opportunity import OpportunityDefinition, Opportunity, OpportunityFile, TimeInputs

from os import path, makedirs, remove
import json
import geojson
import glob


MISSION_SCENARIOS_INDEX_FILE = 'mission_scenarios.json'
"""Mission Scenarios index file.
"""

OBSERVATION_TYPES_INDEX_FILE = 'observation_types.json'
"""Observation types index file.
"""

class DataStore:
    """DataStore.

    Attributes:
        app_data_dir (str): Application data directory path.
        user_data_dir (str): User data directory path.
        scenarios_basedir (str): Scenarios base directory path.
        odf_basedir (str): ODF base directory path (currently unset).
        opportunities_basedir (str): Output opportunities base directory path.
        roi_basedir (str): ROI base directory path.
        mission_scenarios (list): List of mission scenarios.
        opportunities (list): List of opportunities.
        opportunity_definitions (list): List of opportunity definitions (ODF).
        observation_types (list): List of observation types.
        valid (bool): Validity flag.
    """

    def __init__(self, app_data_dir=Config().app_data_dir, user_data_dir=Config().user_data_dir):
        """Constructor method.

        Args:
            app_data_dir (str, optional): Set application data directory path.
            user_data_dir (str, optional): Set user data directory path.
        """
        # Set app and user data directory paths.
        # self.basedir -> self.app_data_dir
        if app_data_dir:
            self.app_data_dir = app_data_dir
        if user_data_dir:
            self.user_data_dir = user_data_dir

        # Check that both app and user data directories exist.
        # TODO: throw exceptions instead of the "self.valid = False; return" approach
        if not path.exists(self.app_data_dir):
            print('Input application data directory does not exist: {}'.format(path.abspath(self.app_data_dir)))
            self.valid = False
            return
        if not path.exists(self.user_data_dir):
            print('Input user data directory does not exist: {}'.format(path.abspath(self.user_data_dir)))
            # TODO: create user data directory?
            self.valid = False
            return

        # Set scenarios base directory path (user data directory takes precedence).
        if path.exists(path.join(self.user_data_dir, 'scenarios')):
            self.scenarios_basedir = path.join(self.user_data_dir, 'scenarios')
        elif path.exists(path.join(self.app_data_dir, 'scenarios')):
            self.scenarios_basedir = path.join(self.app_data_dir, 'scenarios')

        # self.odf_basedir = ''  # self.basedir + '/odf'

        # Set output opportunity data base directory.
        self.opportunities_basedir = path.join(self.user_data_dir, 'opportunities')

        # Set ROI base directory path (user data directory takes precedence).
        if path.exists(self.user_data_dir + '/rois'):
            self.roi_basedir = self.user_data_dir + '/rois'
        elif path.exists(self.app_data_dir + '/rois'):
            self.roi_basedir = self.app_data_dir + '/rois'

        self.mission_scenarios = []        # dicts
        self.opportunities = []            # dicts
        self.opportunity_definitions = []  # dicts
        self.observation_types = []        # dicts
        self.valid = True

        # Read mission scenario definitions (dicts).
        #
        # for mission_scenario_dict in json_dict['mission_scenarios']:
        #     mission_scenario = MissionScenario(mission_scenario_dict, self.scenarios_basedir)
        #     if mission_scenario.isValid():
        #         self.mission_scenarios.append(mission_scenario)
        mission_scenarios_index_file_path = path.join(self.scenarios_basedir, MISSION_SCENARIOS_INDEX_FILE)
        with open(mission_scenarios_index_file_path) as f:
            json_dict = json.load(f)
            if 'mission_scenarios' in json_dict:
                self.mission_scenarios = json_dict['mission_scenarios']
                # TODO: check that each scenario definition has an "id" key
            else:
                raise Exception(f'Invalid JSON Mission Scenarios Index file: {mission_scenarios_index_file_path}')

        # Check and index observation types (user index file takes precedence).
        obstypes_index_path = ''
        user_obstypes_index_path = path.join(self.user_data_dir, 'odf', OBSERVATION_TYPES_INDEX_FILE)
        app_obstypes_index_path = path.join(self.app_data_dir, 'odf', OBSERVATION_TYPES_INDEX_FILE)
        if path.exists(user_obstypes_index_path):
            obstypes_index_path = user_obstypes_index_path
            # print(f'[WARNING] Using `user` ODF data store observation types index file: {obstypes_index_path}.')
        elif path.exists(app_obstypes_index_path):
            obstypes_index_path = app_obstypes_index_path
            # print('Using `application` ODF data store observation types index file.')
        else:
            print(f'[WARNING] Missing {OBSERVATION_TYPES_INDEX_FILE} file in data store.')
        if obstypes_index_path:
            with open(obstypes_index_path) as f:
                obstypes_dict = json.load(f)  # renamed from `dict` to avoid shadowing the builtin
                self.observation_types = obstypes_dict['observation_types']

        # Index valid ODF files available in both the user and app data directories.

        # Scan user ODF directory.
        files = glob.glob(self.user_data_dir + '/odf/**/*.json', recursive=True)
        user_odf_files = []
        user_odf_relpaths = []
        for file in files:
            if 'deprecated' not in file:
                if path.basename(file) != OBSERVATION_TYPES_INDEX_FILE:  # exclude observation types index file
                    user_odf_files.append(file)
                    user_odf_relpaths.append(path.relpath(file, self.user_data_dir + '/odf'))

        # Scan app ODF directory.
        files = glob.glob(self.app_data_dir + '/odf/**/*.json', recursive=True)
        app_odf_files = []
        app_odf_relpaths = []
        for file in files:
            if 'deprecated' not in file:
                if path.basename(file) != OBSERVATION_TYPES_INDEX_FILE:  # exclude observation types index file
                    app_odf_files.append(file)
                    app_odf_relpaths.append(path.relpath(file, self.app_data_dir + '/odf'))

        # User ODF files take precedence over app ODF files.
        odf_files = user_odf_files
        odf_relpaths = user_odf_relpaths
        for app_odf_relpath, app_odf_file in zip(app_odf_relpaths, app_odf_files):
            if app_odf_relpath != OBSERVATION_TYPES_INDEX_FILE:  # exclude observation types index file
                if app_odf_relpath not in user_odf_relpaths:
                    odf_relpaths.append(app_odf_relpath)
                    odf_files.append(app_odf_file)

        # First pass: index ODF file paths without checking their validity. This is required
        # so that "included" ODF files are indexed before OpportunityDefinition objects are
        # instantiated.
        opportunity_definitions_dicts = []
        for odf_file, odf_relpath in zip(odf_files, odf_relpaths):
            opportunity_definition_dict = {
                'odf_path': odf_relpath,
                'odf_file': odf_file
            }
            opportunity_definitions_dicts.append(opportunity_definition_dict)
        self.opportunity_definitions = opportunity_definitions_dicts

        # Second pass: check the validity of ODF files.
        opportunity_definitions_dicts = []
        for odf_file, odf_relpath in zip(odf_files, odf_relpaths):
            # print(f'> {odf_file}')
            opportunity_definition = OpportunityDefinition(odf_file, datastore=self)
            if opportunity_definition.valid:
                opportunity_definition_dict = {
                    'odf_path': odf_relpath,
                    'odf_file': odf_file,
                    'observation_type': opportunity_definition.observation_type,
                    'target': opportunity_definition.target,
                    'observer': opportunity_definition.observer,
                    'detector': opportunity_definition.detector,
                    'pointing_odf': opportunity_definition.get_pointing_odf(),
                    'ptr_pointing_type': opportunity_definition.ptr_pointing_type
                }
                opportunity_definitions_dicts.append(opportunity_definition_dict)
        self.opportunity_definitions = opportunity_definitions_dicts

        # Check and load opportunities (dicts). Opportunities are indexed in memory when a
        # DataStore object is initiated. Only scan <mission_scenario_id> directories that
        # are defined in the mission scenario index file.
        # TODO: for performance reasons, make loadable only once, when needed for the first
        #       time (eg: call to *Opportunity* methods)
        for mission_scenario in self.mission_scenarios:
            # Look for opportunity.json files.
            pattern = self.opportunities_basedir + '/' + mission_scenario['id'] + '/*/opportunity.json'
            opportunity_json_files = glob.glob(pattern)
            for opportunity_json_file in opportunity_json_files:
                opportunity_dict = OpportunityFile(filepath=opportunity_json_file).read()
                if opportunity_dict['id']:
                    self.opportunities.append(opportunity_dict)

    def __repr__(self):
        return (
            f'<{self.__class__.__name__}> '
            f'Application data directory: {self.app_data_dir} | '
            f'User data directory: {self.user_data_dir}\n'
            f'- Nb of mission scenarios: {len(self.mission_scenarios)}\n'
            f'- Nb of observation types: {len(self.observation_types)}\n'
            f'- Nb of opportunity definitions (ODF): {len(self.opportunity_definitions)}\n'
            f'- Nb of opportunities: {len(self.opportunities)}'
        )
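    # A minimal usage sketch, assuming the data directories exist (the paths below are
    # hypothetical; by default both come from Config()):
    #
    #   ds = DataStore(app_data_dir='/opt/gfinder/data', user_data_dir='/home/user/gfinder')
    #   if ds.valid:
    #       print(ds)  # summarises indexed scenarios, observation types, ODFs and opportunities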
    def get_observation_types(self):
        """Return the list of observation types."""
        return self.observation_types
    def get_definitions(self, obs_type=None, target=None):
        """Return opportunity definition dicts, optionally filtered by observation type.

        Note: the ``target`` argument is currently unused.
        """
        opportunity_definition_dicts = []
        for opportunity_definition_dict in self.opportunity_definitions:
            if obs_type:
                if obs_type in opportunity_definition_dict['observation_type']:
                    opportunity_definition_dicts.append(opportunity_definition_dict)
            else:
                opportunity_definition_dicts.append(opportunity_definition_dict)
        return opportunity_definition_dicts
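    # For example, to list indexed definitions matching an observation type (the
    # 'FLYBY' name is a hypothetical placeholder; matching is by substring):
    #
    #   for d in ds.get_definitions(obs_type='FLYBY'):
    #       print(d['odf_path'], d['target'], d['detector'])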
    # def getODFs(self):
    #     odf_files = []
    #     for opportunity_definition_dict in self.opportunity_definitions:
    #         odf_files.append(opportunity_definition_dict['odf_file'])
    #     return odf_files
    def get_ODF_filename(self, odf_path):
        """Return the full ODF file path matching a relative path.

        Args:
            odf_path: Input ODF file path, relative to the ODF base directory.

        Returns:
            str: Full ODF file path, or None if no indexed definition matches.
        """
        for opportunity_definition_dict in self.opportunity_definitions:
            if odf_path == opportunity_definition_dict['odf_path']:
                return opportunity_definition_dict['odf_file']
    # odf_filename = ''
    # odf_filenames = glob.glob(self.user_data_dir + '/odf/*' + odf_rel_filename)
    # if not odf_filenames:
    #     odf_filenames = glob.glob(self.app_data_dir + '/odf/*' + odf_rel_filename)
    #
    # if len(odf_filenames) > 0:
    #     odf_filename = odf_filenames[0]
    # return odf_filename

    # MISSION SCENARIO METHODS
    def getMissionScenarioIDs(self):
        """Return the list of mission scenario IDs."""
        mission_scenario_ids = []
        for mission_scenario in self.mission_scenarios:
            mission_scenario_ids.append(mission_scenario['id'])
        return mission_scenario_ids
    def getMissionScenario(self, id):
        """Return the MissionScenario object matching the input ID, or None if not found."""
        for mission_scenario_dict in self.mission_scenarios:
            if mission_scenario_dict['id'] == id:
                mission_scenario = MissionScenario(mission_scenario_dict, self.scenarios_basedir)
                return mission_scenario
        return None
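    # For example (the scenario ID is a hypothetical placeholder; kernel loading
    # follows the same pattern as load_opportunity() below):
    #
    #   ms = ds.getMissionScenario('crema_5_0')
    #   if ms and not ms.kernelsLoaded():
    #       ms.loadKernels()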
    # OPPORTUNITY METHODS
    def opportunityExists(self, opportunity, suffix=None):
        """Return whether the opportunity already exists in the data store, along with its assigned ID."""
        opportunity_id = self.assignOpportunityID(opportunity, suffix=suffix)
        opportunity_dict = self.getOpportunityDict(opportunity_id)
        exist = opportunity_dict is not None
        return exist, opportunity_id
    def getOpportunityIDs(self):
        """Return the list of indexed opportunity IDs."""
        # Possible optional filters: mission_scenario_id=None, event_id=None, target=None
        opportunity_ids = []
        for opportunity_dict in self.opportunities:
            opportunity_ids.append(opportunity_dict['id'])
        return opportunity_ids
    def getOpportunityDict(self, opportunity_id):
        """Return the opportunity dict matching the input ID, or None if not found."""
        for opportunity_dict in self.opportunities:
            if opportunity_dict['id'] == opportunity_id:
                return opportunity_dict
        return None
    def assignOpportunityID(self, opportunity, suffix=None):
        """Build and return the ID string for the input opportunity.

        The ID is formed as <scenario>_<target>_<odf>_<start>_<stop>, lower-cased,
        with an optional suffix.
        """
        mission_scenario = opportunity.mission_scenario
        opportunity_definition = opportunity.opportunity_definition
        event = opportunity.time_inputs.get_event()
        mission_scenario_id = mission_scenario.id
        target_ids = {
            'jupiter': 'JUP',
            'ganymede': 'GAN',
            'callisto': 'CAL',
            'europa': 'EUR',
            'io': 'IO'
        }
        target_name = opportunity_definition.get_target().lower()
        if target_name in target_ids.keys():
            target_id = target_ids[target_name]
        else:
            target_id = target_name[0:3].upper()
        odf_id = path.splitext(path.basename(opportunity_definition.odf_file))[0].upper()
        start_time = event.get_start_time(format='YYYYMMDD')
        stop_time = event.get_stop_time(format='YYYYMMDD')
        opportunity_id = '{}_{}_{}_{}_{}'.format(mission_scenario_id, target_id, odf_id, start_time, stop_time)
        if suffix:
            opportunity_id += '_' + suffix
        return opportunity_id.lower()
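    # For example, a hypothetical "crema_5_0" scenario, a Ganymede ODF file named
    # "GAN_SCAN.json" and an event spanning 2032-05-01 to 2032-05-03 would yield:
    #
    #   crema_5_0_gan_gan_scan_20320501_20320503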
    def assignOpportunityPath(self, opportunity, suffix=None):
        """Build and return the output directory path for the input opportunity."""
        mission_scenario_id = opportunity.mission_scenario.id
        opportunity_id = self.assignOpportunityID(opportunity, suffix=suffix)
        opportunity_path = '{}/{}/{}'.format(self.opportunities_basedir, mission_scenario_id, opportunity_id)
        return opportunity_path
    def getOpportunityPath(self, opportunity_id):
        """Return the directory path of an indexed opportunity."""
        opportunity_dict = self.getOpportunityDict(opportunity_id)
        mission_scenario_id = opportunity_dict['mission_scenario']['id']
        opportunity_path = '{}/{}/{}'.format(self.opportunities_basedir, mission_scenario_id, opportunity_id)
        return opportunity_path
    def getAddendumKernels(self, opportunity_id):
        """Return the list of addendum CK kernel files (*.bc) stored with the opportunity."""
        ck_kernels = glob.glob(self.getOpportunityPath(opportunity_id) + '/kernels/*.bc')
        return ck_kernels
    def writeOpportunity(self, opportunity, overwrite=False, suffix=None):
        """Write the input opportunity into the data store.

        Returns the assigned opportunity ID and path, or returns early if the
        opportunity already exists and ``overwrite`` is False.
        """
        # Exit if the opportunity already exists and overwriting is not allowed.
        exist, opportunity_id = self.opportunityExists(opportunity, suffix=suffix)
        if exist and not overwrite:
            print('WARNING: Cannot write into data store.')
            print('Opportunity already exists: ' + opportunity_id)
            return

        if not opportunity.id:
            opportunity.setID(self.assignOpportunityID(opportunity, suffix=suffix))

        opportunity_path = self.assignOpportunityPath(opportunity, suffix=suffix)
        sequence_file_path = opportunity_path + '/sequence.json'
        makedirs(path.dirname(sequence_file_path), exist_ok=True)

        # Remove existing opportunity data files.
        for f in glob.glob(opportunity_path + '/*.*'):
            remove(f)

        # Write opportunity file.
        opportunity_file_path = opportunity_path + '/opportunity.json'
        OpportunityFile(filepath=opportunity_file_path).write(opportunity)

        # Write used ODF file.
        used_odf_path = path.join(opportunity_path, path.basename(opportunity.opportunity_definition.odf_file))
        with open(used_odf_path, 'w') as f:
            json.dump(opportunity.opportunity_definition.getDict(), f, indent=2)

        # Write opportunity Sequence GeoEvt file.
        GeoEvtFile(geoevt_file=sequence_file_path).write(opportunity.sequence)
        for i, observation in enumerate(opportunity.sequence.sub_events):
            observation_file_path = '{}/observation_{:04}.json'.format(opportunity_path, i + 1)

            # Write opportunity Observation GeoEvt file.
            GeoEvtFile(geoevt_file=observation_file_path).write(observation)

            # # Write observation GeoJSON file.
            # geojson_path = '{}/observation_{:04}.geojson'.format(opportunity_path, i + 1)
            # with open(geojson_path, 'w') as f:
            #     feature_collection = opportunity.getMeasurementsGeoJSON(
            #         'Detector_FOV_Footprint', observation_id=i, split=True, surface_only=True)
            #     geojson.dump(feature_collection, f)

            # Convert GeoJSON to KML for visualisation in Google Earth:
            # kml_path = '{}/observation_{:04}.kml'.format(opportunity_path, i + 1)
            # ogr2ogr -f KML observation_0001.kml observation_0001.geojson

        return opportunity_id, opportunity_path
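    # writeOpportunity() lays out one directory per opportunity under
    # <opportunities_basedir>/<mission_scenario_id>/<opportunity_id>/, containing
    # (file names as written by the code above):
    #
    #   opportunity.json         # opportunity metadata (OpportunityFile)
    #   <odf_basename>.json      # copy of the ODF used
    #   sequence.json            # sequence GeoEvt file
    #   observation_NNNN.json    # one GeoEvt file per observation in the sequence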
    def load_opportunity(self, opportunity_id):
        """Load and return the Opportunity object matching the input ID, or None on failure."""
        # Get opportunity path.
        opportunity_path = self.getOpportunityPath(opportunity_id)

        # Get the opportunity dict for the input ID.
        opportunity_dict = self.getOpportunityDict(opportunity_id)

        # Load mission scenario.
        mission_scenario_dict = opportunity_dict['mission_scenario']
        mission_scenario = MissionScenario(mission_scenario_dict, self.scenarios_basedir)
        if not mission_scenario.kernelsLoaded():
            mission_scenario.loadKernels()

        # Load opportunity definition: load the generated ODF file copied into the
        # opportunity directory, instead of the source ODF file.
        opportunity_definition_dict = opportunity_dict['opportunity_definition']
        odf_file = path.join(opportunity_path, path.basename(opportunity_definition_dict['odf_file']))
        # odf_file = opportunity_definition_dict['odf_file']  # source ODF file
        opportunity_definition = OpportunityDefinition(odf_file, datastore=self)
        if not opportunity_definition.valid:
            print(f'Invalid input <{odf_file}> ODF file.')
            return None

        # Load time inputs.
        time_inputs_dict = opportunity_dict['time_inputs']
        time_inputs = TimeInputs(time_inputs_dict, mission_scenario=mission_scenario)

        # Initiate opportunity.
        opportunity = Opportunity(mission_scenario, opportunity_definition, time_inputs)

        # Load geoevents dicts (sequence + observations).
        sequence_file_path = opportunity_path + '/sequence.json'
        sequence_dict = GeoEvtFile(geoevt_file=sequence_file_path).read()
        geoevents_dict = {'sequence': sequence_dict, 'observations': []}
        for i in range(sequence_dict['n_intervals']):
            observation_file_path = '{}/observation_{:04}.json'.format(opportunity_path, i + 1)
            observation_dict = GeoEvtFile(geoevt_file=observation_file_path).read()
            geoevents_dict['observations'].append(observation_dict)

        # Load sequence.
        opportunity.loadSequence(geoevents_dict)

        # Set other opportunity attributes.
        opportunity.searched = opportunity_dict['searched']
        opportunity.computed = opportunity_dict['computed']
        opportunity.search_step = opportunity_dict['search_step']
        opportunity.binning = opportunity_dict['binning']
        opportunity.sim_sc_att = opportunity_dict['sim_sc_att']
        opportunity.sim_scanner = opportunity_dict['sim_scanner']

        if not opportunity.id:
            opportunity.setID(opportunity_id)

        return opportunity
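    # Round-trip sketch: because opportunities are indexed at construction time, an
    # opportunity written with writeOpportunity() becomes loadable from a newly
    # created DataStore (`opportunity` is a hypothetical Opportunity object):
    #
    #   opp_id, opp_path = ds.writeOpportunity(opportunity, overwrite=True)
    #   opp = DataStore().load_opportunity(opp_id)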