Source code for gfinder.opportunity

"""Opportunity module."""

from gfinder.event import Event
from gfinder.geometryevent import Sequence
from gfinder.geometry import GeometryFactory, Condition, OBSRVR
from gfinder.config import DEFAULT_N_STEPS  # TODO: this is temporary workaround
from gfinder.exporter import Exporter

import copy
import json
from geojson import FeatureCollection
import warnings

import numpy as np
import spiceypy as spice


[docs]class OpportunityDefinition: """Class representing an opportunity definition. Attributes: odf_file (str): name (str): description (str): observation_type (str): target (str): observer (str): detector (str): geometry_defs_dict (dict): searchable_geometry_defs (list): valid (bool): """ def __init__(self, odf_file, target=None, binning=None, majis_scan_angle=None, sc_slew_angles=None, datastore=None, replace_defvar=True, **kwargs): """Constructor method. Args: odf_file: target: binning: majis_scan_angle: sc_slew_angles: datastore: """ self.odf_file = '' self.name = '' self.description = '' self.observer = OBSRVR self.target = '' self.detector = '' self.observation_type = '' self.ptr_pointing_type = '' self.geometry_defs_dict = {'Sequence': [], 'Observation': [], 'Measurement': []} self.searchable_geometry_defs = [] self.extra_attributes = [] self.included_odf_list = [] self.valid = True self.set(odf_file, target=target, binning=binning, datastore=datastore, **kwargs) # Replace all @variable-type values if replace_defvar: for geoevent_class in self.geometry_defs_dict.keys(): # 'Sequence', 'Observation', or 'Measurement' (GeometryEvent class) geometry_defs = self.geometry_defs_dict[geoevent_class] for i, geometry_def in enumerate(geometry_defs): # print(geometry_def) updated_geometry_def, attr_missing = self.replace_defvar(geometry_def) if attr_missing: self.valid = False else: self.geometry_defs_dict[geoevent_class][i] = updated_geometry_def # print(updated_geometry_def) # print() # Retrieve definitions of geometries that are searchable (which have a 'condition' key). Geometries not defined # as measurement geometries are discard with no warning. Geometry class must inherit from the Quantity class, # be a scalar (not check here). measurement_gdefs = self.geometry_defs_dict['Measurement'] searchable_gdefs = [] for measurement_gdef in measurement_gdefs: if 'condition' in measurement_gdef.keys(): searchable_gdefs.append(measurement_gdef) # return searchable_gdefs self.searchable_geometry_defs = searchable_gdefs # Update opportunity definition from "commanding" inputs # # Overwrite MAJIS_Scan_Angle geometry parameters values and overwrite Simulated_Scan_Frame geometry parameters # accordingly: "scan_rotation_angle": "Motion_Compensation_Angle" -> "MAJIS_Scan_Angle" if majis_scan_angle: majis_scan_angle_params = json.loads(majis_scan_angle) majis_scan_angle_def = dict(name='MAJIS_Scan_Angle', parameters=majis_scan_angle_params) self.update_geometry_def(majis_scan_angle_def) simulated_scan_frame_def = self.get_geometry_definition('Simulated_Scan_Frame') if simulated_scan_frame_def: simulated_scan_frame_def['parameters']['scan_rotation_angle'] = 'MAJIS_Scan_Angle' # "scan_zero" remains unchanged self.update_geometry_def(simulated_scan_frame_def) else: print(f'{self.odf_file} ODF file does not contain `Simulated_Scan_Frame` geometry definition' f', and therefore `majis_scan_angle` option cannot be used.') self.valid = False # Overwrite SC_Slew_Angle parameters and force offset rotations to be given by this geometry. if sc_slew_angles: sc_slew_angles_params = json.loads(sc_slew_angles) sc_slew_angles_def = dict(name='SC_Slew_Angles', parameters=sc_slew_angles_params) self.update_geometry_def(sc_slew_angles_def) simulated_sc_frame_def = self.get_geometry_definition('Simulated_SC_Frame') if simulated_sc_frame_def: simulated_sc_frame_def['parameters']['offset_rotations'] = 'SC_Slew_Angles' # making sure offset-rotation attribute is not empty "". self.update_geometry_def(simulated_sc_frame_def) else: print(f'{self.odf_file} ODF file does not contain `Simulated_SC_Frame` geometry definition' f', and therefore `sc_slew_angles` option cannot be used.') self.valid = False def __repr__(self): return ( f'<{self.__class__.__name__}> ' f'ODF file full path: {self.odf_file} | ' f'Name: {self.name} | ' f'Observation type: {self.observation_type} | ' f'PTR pointing type: {self.ptr_pointing_type} | ' f'Target: {self.target} | ' f'Observer: {self.observer} | ' f'Detector: {self.detector} | ' f'Included ODF files: {self.included_odf_list}\n' f'- Nb of geometrical conditions: {len(self.getGeometryDefinitions("Measurement", searchable=True))}\n' f'- Nb of measurement geometries: {len(self.getGeometryDefinitions("Measurement"))}\n' f'- Nb of observation geometries: {len(self.getGeometryDefinitions("Observation"))}\n' f'- Nb of sequence geometries: {len(self.getGeometryDefinitions("Sequence"))}' )
[docs] def set(self, odf_file, target=None, binning=None, datastore=None, **kwargs): # Read input ODF JSON file # print('>>', odf_file) try: with open(odf_file, 'r') as fp: json_dict = json.load(fp) except Exception as e: print(f'Input {odf_file} ODF JSON formatting issue.') print(e) self.valid = False return self.odf_file = odf_file if 'name' in json_dict.keys(): self.name = json_dict['name'] else: print(f'Missing "name" attribute in {self.odf_file} ODF file') self.valid = False return if 'description' in json_dict.keys(): # optional self.description = json_dict['description'] if 'target' in json_dict.keys(): self.target = json_dict['target'] if target: self.target = target else: print(f'Missing "target" attribute in {self.odf_file} ODF file.') self.valid = False return if 'detector' in json_dict.keys(): self.detector = json_dict['detector'] if binning: # MAJIS-specific "friendly-binning" self.detector = self.majis_detector_name(self.detector, binning) else: print(f'Missing "detector" attribute in {self.odf_file} ODF file.') self.valid = False return if 'observation_type' in json_dict.keys(): # optional self.observation_type = json_dict['observation_type'] if 'ptr_pointing_type' in json_dict.keys(): # optional self.ptr_pointing_type = json_dict['ptr_pointing_type'] # Set extra ODF attributes (keys) std_attributes = [ 'name', 'author', 'status', 'description', 'observation_type', 'ptr_pointing_type', 'target', 'observer', 'detector', 'measurement_geometries', 'observation_geometries', 'sequence_geometries' ] self.extra_attributes = [] for key in json_dict.keys(): if key not in std_attributes: self.extra_attributes.append(key) if key in kwargs: # if ODF attribute matches one of the provided optional keyword arguments setattr(self, key, kwargs[key]) # use optional keyword value (eg: latitude=-40) # print(f'{key}={kwargs[key]}') else: setattr(self, key, json_dict[key]) # use ODF attribute value (eg: "latitude": 0) # print(f'{key}={json_dict[key]}') # Validate that this ODF conforms to observation type master ODF # if not self.validate(self.observation_type): # print(f'Input ODF file does not conform to <{self.observation_type}> observation type.') # self.observation_type = '' # self.valid = False # might not be needed if .validate() function sets self.valid to False. # return for geoevt_class in self.geometry_defs_dict.keys(): # 'Sequence', 'Observation', or 'Measurement' (GeometryEvent class) geometries_key = geoevt_class.lower() + '_geometries' # 'sequence_geometries', 'observation_geometries', or 'measurement_geometries' if geometries_key in json_dict.keys(): odf_geometries_defs = json_dict[geometries_key] # list of geometry definitions in ODF for a given GeometryEvent class for odf_geometry_def in odf_geometries_defs: odf_geometry_def, valid = self.check_geometry_def(odf_geometry_def, datastore=datastore, geoevt_class=geoevt_class) if valid: if isinstance(odf_geometry_def, dict): # only one geometry definition self.geometry_defs_dict[geoevt_class].append(odf_geometry_def) elif isinstance(odf_geometry_def, list): # one or several geometry definitions from "include" ODF for this_odf_geometry_def in odf_geometry_def: self.geometry_defs_dict[geoevt_class].append(this_odf_geometry_def) else: print('Invalid geometry definition in {} ODF file: {}'.format(self.odf_file, odf_geometry_def)) else: print('WARNING: Missing {} definition in {}'.format(geometries_key, self.odf_file))
[docs] def replace_defvar(self, geometry_def_dict): # Create a copy that will hold updated @-type values in input dictionary. update_geometry_def_dict = copy.copy(geometry_def_dict) attr_missing = False # Every attribute of String-type is checked for '@' character; if found, variable token is replaced by default # ODF attribute. If no associated default ODF attribute, then warning is reported and OpportuniryDefinition # object is not valid. for key in geometry_def_dict.keys(): if isinstance(geometry_def_dict[key], str): if geometry_def_dict[key] != '': # ignore empty values (eg: "offset_rotations": "") if geometry_def_dict[key][0] == '@': # check that ODF attribute exists attr_name = geometry_def_dict[key][1:] if hasattr(self, attr_name): update_geometry_def_dict[key] = getattr(self, attr_name) else: attr_missing = True warnings.warn(f'Missing `{attr_name}` attribute in {self.odf_file} ODF file') elif isinstance(geometry_def_dict[key], dict): update_geometry_def_dict[key], attr_missing = self.replace_defvar(geometry_def_dict[key]) return update_geometry_def_dict, attr_missing
[docs] def has_conditions(self): if len(self.searchable_geometry_defs) > 0: return True else: return False
[docs] def update_geometry_def(self, geometry_def): for key in self.geometry_defs_dict.keys(): # 'Sequence', 'Observation', or 'Measurement' (GeometryEvent class) for i, odf_geometry_def in enumerate(self.geometry_defs_dict[key]): if odf_geometry_def['name'] == geometry_def['name']: updated_geometry_def, valid = self.check_geometry_def(geometry_def) if valid: self.geometry_defs_dict[key][i] = updated_geometry_def else: print('Invalid geometry definition in {} ODF file: {}'.format(self.odf_file, odf_geometry_def))
[docs] def check_geometry_def(self, in_geometry_def, datastore=None, geoevt_class='Measurement'): # check that input geometry definition contains at least `name` and `parameters` keys if 'name' and 'parameters' not in in_geometry_def.keys(): if 'include' in in_geometry_def.keys(): if datastore: include_odf_file = datastore.get_ODF_filename(in_geometry_def['include']) if include_odf_file is None: print(f'{in_geometry_def["include"]} `include` file in {self.odf_file} does not exist.') self.valid = False return in_geometry_def, False else: include_odf_geometries_defs = OpportunityDefinition(include_odf_file, replace_defvar=False).getGeometryDefinitions(geoevt_class) if in_geometry_def['include'] not in self.included_odf_list: self.included_odf_list.append(in_geometry_def['include']) return include_odf_geometries_defs, True else: # print(f'WARNING: No DataStore object to retrieve "include" ODF file output_dir: `{in_geometry_def["include"]}` (use datastore keyword argument)') warnings.warn(f'No DataStore object to retrieve "include" ODF file path: `{in_geometry_def["include"]}` (use datastore keyword argument)') self.valid = False else: print(f'Missing mandatory `name` and `parameters` keys in {self.odf_file} ODF file.') print(in_geometry_def) self.valid = False return in_geometry_def, False return in_geometry_def, True
[docs] def get_pointing_odf(self): pointing_odf = '' for included_odf in self.included_odf_list: if 'pointing/' in included_odf: pointing_odf = included_odf return pointing_odf
[docs] def majis_detector_name(self, detector_name, binning): """Returns the valid MAJIS SPICE detector name corresponding to input detector name and binning value. """ majis_detector_names = [ 'JUICE_MAJIS', 'JUICE_MAJIS_VISNIR', 'JUICE_MAJIS_VISNIR_B2', 'JUICE_MAJIS_VISNIR_B4', 'JUICE_MAJIS_IR', 'JUICE_MAJIS_IR_B2', 'JUICE_MAJIS_IR_B4', ] majis_binnings = [1, 2, 4] if detector_name not in majis_detector_names: print('Invalid input MAJIS detector name: {}.'.format(detector_name)) return detector_name # set channel ID, either VISNIR or IR detector_name_tokens = detector_name.split('_') if len(detector_name_tokens) == 2: # JUICE_MAJIS case print('Binning not applicable for {} detector.'.format(detector_name)) return detector_name channel_id = detector_name_tokens[2] if binning not in majis_binnings: print('Non-applicable binning value: {}. Allowed values are {}, {} or {}.'.format(binning, *majis_binnings)) return detector_name if binning != 1: return 'JUICE_MAJIS_{}_B{}'.format(channel_id, binning) else: return 'JUICE_MAJIS_{}'.format(channel_id)
[docs] def getDict(self): odf_dict = {} attributes = ['name','author', 'status', 'description', 'observation_type', 'ptr_pointing_type', 'target', 'observer', 'detector'] #, 'latitude', 'max_incidence_angle'] attributes = attributes + self.extra_attributes for attribute in attributes: if hasattr(self,attribute): odf_dict[attribute] = getattr(self,attribute) odf_dict['measurement_geometries'] = self.geometry_defs_dict['Measurement'] odf_dict['observation_geometries'] = self.geometry_defs_dict['Observation'] odf_dict['sequence_geometries'] = self.geometry_defs_dict['Sequence'] return odf_dict
[docs] def to_json(self): print(json.dumps(self.getDict(), sort_keys=False, indent=4))
[docs] def get_name(self): return self.name
[docs] def get_target(self): return self.target
[docs] def getGeometryDefinitions(self, geoevt_class, searchable=False): if geoevt_class in self.geometry_defs_dict.keys(): geoevt_class_gdefs = self.geometry_defs_dict[geoevt_class] if searchable: # return only geometry definition with a condition (searchable). return self.searchable_geometry_defs else: return geoevt_class_gdefs else: print('Invalid input GeometryEvent class. Accepted values are {}, {}, or {}.'.format( *self.geometry_defs_dict.keys())) return []
[docs] def get_geometry_definition(self, geometry_name): for geoevt_class in self.geometry_defs_dict.keys(): geoevt_class_gdefs = self.geometry_defs_dict[geoevt_class] for geometry_def in geoevt_class_gdefs: if geometry_def['name'] == geometry_name: return geometry_def return None
[docs]class TimeInputs: """Class that represents time inputs required for computation (search or simulation). A TimeInputs object is used to initiate an Opportunity object. Attributes: opportunity_id: opportunity identifier event_id: mission event identifier start_time: start time stop_time: stop time time_step: computation time step in seconds n_steps (int): number of computation steps input_event (Event): Event object corresponding to time inputs mission_event (Event): Mission Event object valid (bool): """ def __init__(self, time_inputs_dict, opportunity=None, mission_scenario=None): """Inits TimeInputs object. Args: time_inputs_dict: opportunity: mission_scenario: """ self.opportunity_id = time_inputs_dict['opportunity_id'] self.event_id = time_inputs_dict['event_id'] self.start_time = time_inputs_dict['start_time'] self.stop_time = time_inputs_dict['stop_time'] self.time_step = time_inputs_dict['time_step'] self.n_steps = time_inputs_dict['n_steps'] self.input_event = None self.mission_event = None self.valid = False self.derive_event(opportunity=opportunity, mission_scenario=mission_scenario) def __repr__(self): return ( f'<{self.__class__.__name__}> ' f'Opportunity ID: {self.opportunity_id} | ' f'Event ID: {self.event_id} | ' f'Start time: {self.start_time} | ' f'Stop time: {self.stop_time} | ' f'Nb of steps: {self.n_steps} | ' f'Time step (sec): {self.time_step:.3f}' )
[docs] def get_dict(self): time_inputs_dict = { 'opportunity_id': self.opportunity_id, 'event_id': self.event_id, 'start_time': self.start_time, 'stop_time': self.stop_time, 'n_steps': self.n_steps, 'time_step': self.time_step } return time_inputs_dict
[docs] def derive_event(self, opportunity=None, mission_scenario=None): """Derive Event object associated with time inputs. if opportunity_id is provided, search result time window is used. It has precedence over all other following inputs. if no event_id is provided: - if start_time and stop_time (in UTC) -> used to create event object. - if only start_time -> time_step and n_steps are used to derived stop_time. - if no start_time and stop_time -> kernels coverage start_time and stop_time are used. if event_id is provided: - if no start_time and stop_time -> event_id, start_time and stop_time are used (duration must be different than 0). - if start_time and stop_time -> start_time and stop_time (in relative time) are used with event_id reference time. - if only start_time -> time_step and n_steps are used to derived stop_time. """ if self.opportunity_id: # opportunity should not be None self.input_event = Event(time_window=opportunity.sequence.get_time_window()) if self.input_event.duration != 0.0: self.valid = True if mission_scenario: mission_event, exist = mission_scenario.getMissionEventFile().get_event(self.event_id) if exist: self.mission_event = mission_event return if not self.event_id: if not self.start_time and not self.stop_time: if mission_scenario: cov_event = mission_scenario.getKernelsCoverage() if cov_event: self.input_event = cov_event else: print('Could not retrieve kernels coverage to derive start and stop times.') return else: print('Missing mission_scenario object to retrieve kernels coverage.') return elif self.start_time: # expected in UTC et_start_time = spice.str2et(self.start_time) if not self.stop_time: # time_step and n_steps are required if self.time_step: if self.n_steps: et_stop_time = et_start_time + self.n_steps * self.time_step # print(self.n_steps * self.time_step) # print(et_stop_time-et_start_time) else: print('Missing n_steps to derive start and stop times.') return else: print('Missing time_step to derive start and stop times.') return else: et_stop_time = spice.str2et(self.stop_time) self.input_event = Event(start_time=et_start_time, stop_time=et_stop_time, format='et') self.valid = True else: print('Missing start_time.') return else: # self.event_id if mission_scenario: # mission_event, exist = mission_scenario.getMissionEventFile().get_event(self.event_id) mission_event = mission_scenario.get_event(self.event_id) if mission_event: self.input_event = mission_event self.mission_event = mission_event else: print('Unknown input mission event name: ' + self.event_id) return else: print('Missing mission_scenario object to retrieve mission event.') return if not self.start_time and not self.stop_time: self.input_event = self.mission_event elif self.start_time: # expected to be in time relative to event reference time (eg: '-11:23:00'). et_start_time = mission_event.get_reference_time(format='et') + mission_event.to_seconds(self.start_time) if not self.stop_time: # time_step and n_steps are required if self.time_step: if self.n_steps: et_stop_time = et_start_time + self.n_steps*self.time_step else: print('Missing n_steps to derive start and stop times.') return else: print('Missing time_step to derive start and stop times.') return else: et_stop_time = mission_event.get_reference_time(format='et') + mission_event.to_seconds(self.stop_time) self.input_event = Event(start_time=et_start_time, stop_time=et_stop_time, format='et') else: print('Missing start_time.') return if self.input_event.duration != 0.0: self.valid = True
[docs] def get_time_window(self): """Returns SPICE Time Window corresponding to time inputs. """ return self.input_event.get_time_window()
[docs] def get_event(self): """Returns Event object corresponding to time inputs. """ return self.input_event
[docs]class Opportunity: """Class that represent a calculation of opportunity times and/or geometry. """ def __init__(self, mission_scenario, opportunity_definition, time_inputs, binning=1, sim_sc_att=False, sim_scanner=False, dict=None): self.id = '' self.mission_scenario = mission_scenario self.opportunity_definition = opportunity_definition self.time_inputs = time_inputs self.sequence = None self.searched = False self.computed = False self.search_step = None self.sim_sc_att = sim_sc_att self.sim_scanner = sim_scanner self.binning = binning if time_inputs.valid: # set input timing information used by search() and compute() self.input_window = time_inputs.get_time_window() self.n_steps = time_inputs.n_steps self.time_step = time_inputs.time_step self.valid = True else: print('[WARNING] Invalid time inputs.') self.valid = False if not self.opportunity_definition.valid: self.valid = False # if dict: # print('Loading-opportunity being implemented...') # -> see DataStore.load_opportunity() # return None def __repr__(self): return '<%s %r>' % (self.__class__.__name__, self.__dict__)
[docs] def isValid(self): return self.valid
[docs] def setID(self, opportunity_id): self.id = opportunity_id
# Define what goes into an Opportunity JSON object or file
[docs] def get_dict(self): opportunity_dict = { 'id': self.id, 'mission_scenario': self.mission_scenario.get_dict(), 'opportunity_definition': { 'odf_file': self.opportunity_definition.odf_file, 'target': self.opportunity_definition.target, 'observer': self.opportunity_definition.observer }, 'time_inputs': self.time_inputs.get_dict(), 'input_time_intervals': self.time_inputs.get_event().get_interval_times(format='utc'), 'searched': self.searched, 'computed': self.computed, 'n_steps': self.n_steps, 'time_step': self.time_step, 'search_step': self.search_step, 'binning': self.binning, 'sim_sc_att': self.sim_sc_att, 'sim_scanner': self.sim_scanner, 'sequence_file': 'sequence.json', 'n_intervals': self.sequence.n_intervals, 'interval_times': self.sequence.get_interval_times(format='utc') } return opportunity_dict
# Load sequence object from geoevents_dict
[docs] def loadSequence(self, geoevents_dict): seq_geoevt_dict = geoevents_dict['sequence'] sequence = Sequence(geoevt_dict=seq_geoevt_dict) for obs_geoevt_dict in geoevents_dict['observations']: sequence.add_observation(obs_geoevt_dict) self.sequence = sequence
[docs] def search(self): print('Searching for ' + self.opportunity_definition.get_name() + ' opportunities...') # Get the searchable geometry objects. searchable_geometry_defs = self.opportunity_definition.getGeometryDefinitions('Measurement', searchable=True) # Set time step. TODO: should be an optional argument (see #92) t_steps = self.n_steps times = [] for i in range(spice.wncard(self.input_window)): t1, t2 = spice.wnfetd(self.input_window, i) times.append(t1) times.append(t2) et1 = min(times) et2 = max(times) step = (et2-et1)/(t_steps-1) self.search_step = step print(f'Number of steps = {self.n_steps}') print(f'Search time step = {step:.3f} sec.') # Search engine is here! search_window = self.input_window opportunity_window = self.input_window # init opportunity_window for geometry_def in searchable_geometry_defs: # print(geometry_def) geometry = GeometryFactory().create(geometry_def, geoevt=self.sequence) if spice.wncard(opportunity_window) > 0: opportunity_window = geometry.search(Condition(geometry_def['condition']), search_window, step) search_window = opportunity_window # print(spice.wncard(opportunity_window)) # Create sequence and sub-events # - opportunity_window is used to create Observation object. # - n_steps and time_step are used to create Measurement objects. self.sequence = Sequence(opportunity_definition=self.opportunity_definition, time_window=opportunity_window, n_steps=DEFAULT_N_STEPS) # Set searched status to true so as to distinguish from simple computation self.searched = True
[docs] def compute(self, searchable_only=False): if self.sequence: self.sequence.compute(searchable_only=searchable_only) else: # print('> Init Sequence object.') # print('>', self.n_steps, self.time_step) if self.sim_sc_att: if not self.opportunity_definition.get_geometry_definition('Simulated_SC_Frame'): print(f'Simulated_SC_Frame cannot be simulated because it is not defined in {self.opportunity_definition.odf_file} ODF file.') return if self.sim_scanner: if not self.opportunity_definition.get_geometry_definition('Simulated_Scan_Frame'): print(f'Simulated_Scan_Frame cannot be simulated because it is not defined in {self.opportunity_definition.odf_file} ODF file.') return self.sequence = Sequence(opportunity_definition=self.opportunity_definition, time_window=self.input_window, n_steps=self.n_steps, time_step=self.time_step) self.sequence.compute(searchable_only=searchable_only) self.computed = True
[docs] def get_opportunities(self): return [self]
[docs] def get_observation(self): return self.sequence.sub_events[0]
[docs] def get_observations(self): return self.sequence.sub_events
[docs] def get_observation_geometry(self, name, obs_idx=0): return self.sequence.sub_events[obs_idx].get_geometry(name)
[docs] def getMeasurementsGeoJSON(self, geometry_name, observation_id=1, surface_only=False, split=False): observation = self.sequence.sub_events[observation_id-1] feature_collection = [] for i, measurement in enumerate(observation.sub_events): et_start_time = observation.get_start_time(format='et') if measurement.has_geometry(geometry_name): feature = measurement.get_geometry(geometry_name).getGeoJSON(surface_only=surface_only, split=split) feature.properties = { 'id': str(i+1), 'utc_time': measurement.get_reference_time(format='utc'), 'reference_time': measurement.get_reference_time(format='rel', mission_event=self.getMissionEvent()), 'time': measurement.get_reference_time(format='et') - et_start_time } feature_collection.append(feature) feature_collection = FeatureCollection(feature_collection) return feature_collection
[docs] def getGeometryNames(self, geoevt_class_name): geometry_names = [] if geoevt_class_name == 'sequence': geometry_names = self.sequence.get_geometry_names() elif geoevt_class_name == 'observation': geometry_names = self.sequence.sub_events[0].get_geometry_names() elif geoevt_class_name == 'measurement': geometry_names = self.sequence.sub_events[0].sub_events[0].get_geometry_names() else: print('Invalid <{}> geometry class name. Allowed values are: sequence, observation, or measurement.'.format(geoevt_class_name)) return geometry_names
[docs] def getMissionEvent(self): return self.time_inputs.mission_event
[docs] def get_no_comp_times(self, t_int, binning=None, margin=None, observation_id=1, format='rel'): """ Returns times at which no scanner motion compensation is required. Args: t_int: binning: margin: observation_id: format: Returns: """ # Get Observation object observation = self.get_observations()[observation_id - 1] if not margin: margin = 0.1 # Attempt to retrieve margin parameters defined in No_Compensation_Required geometry_def = self.opportunity_definition.get_geometry_definition('No_Compensation_Required') if geometry_def: margin = geometry_def['parameters']['margin'] # Retrieve measurements times times = observation.get_measurements_times(format=format, mission_event=self.getMissionEvent()) # Retrieve dwell times if available, and scale up/down if input binning different than opportunity binning. dwell_times = observation.get_measurements_geometry_data('Dwell_Time') if not dwell_times[0]: print('No Dwell_Time geometry data available to compute "no-compensation" times.') return None dwell_times = np.array(dwell_times) if binning: dwell_times = dwell_times * (binning/self.binning) indexes = np.where((dwell_times > t_int * (1 - margin)) & (dwell_times < t_int * (1 + margin)))[0] time_windows = [] if indexes.size > 0: # handle cases where there can be up to two time windows i_cuts = np.where((indexes[1:] - indexes[0:-1]) > 1)[0] n_windows = i_cuts.size + 1 if n_windows == 1: time_windows = [[times[indexes[0]], times[indexes[-1]]]] elif n_windows == 2: i_cut = i_cuts[0] time_windows = [[times[indexes[0]], times[indexes[i_cut]]], [times[indexes[i_cut + 1]], times[indexes[-1]]]] return time_windows
[docs] def summary(self, depth=2): print('POINTING SIMULATOR') print() print('SC pointing simulator:', 'ON' if self.sim_sc_att else 'OFF') if self.sim_sc_att: simulated_sc_frame_def = self.opportunity_definition.get_geometry_definition('Simulated_SC_Frame') if simulated_sc_frame_def: offset_rotations_geometry_name = simulated_sc_frame_def['parameters']['offset_rotations'] offset_rotations_def = self.opportunity_definition.get_geometry_definition(offset_rotations_geometry_name) simulated_sc_frame_def['parameters']['offset_rotations'] = offset_rotations_def print(json.dumps(simulated_sc_frame_def['parameters'], indent=2)) else: print('[WARNING] Undefined Simulated_Scan_Frame geometry in ODF file.') print() print('MAJIS scanner pointing simulator:', 'ON' if self.sim_scanner else 'OFF') if self.sim_scanner: simulated_scan_frame_def = self.opportunity_definition.get_geometry_definition('Simulated_Scan_Frame') if simulated_scan_frame_def: scan_rotation_angle_geometry_name = simulated_scan_frame_def['parameters']['scan_rotation_angle'] scan_rotation_angle_def = self.opportunity_definition.get_geometry_definition(scan_rotation_angle_geometry_name) simulated_scan_frame_def['parameters']['scan_rotation_angle'] = scan_rotation_angle_def print(json.dumps(simulated_scan_frame_def['parameters'], indent=2)) else: print('[WARNING] Undefined Simulated_Scan_Frame geometry in ODF file.') print() print('OPPORTUNITY DEFINITION') print() print(' {}'.format(self.opportunity_definition.name)) geometry_definitions = self.opportunity_definition.getGeometryDefinitions('Measurement', searchable=True) for geometry_definition in geometry_definitions: name = geometry_definition['name'] rel_condition = geometry_definition['condition']['relationalCondition'] ref_value = geometry_definition['condition']['referenceValue'] print(' {} {} {}'.format(name, rel_condition, ref_value)) print() print('INPUT TIMES') print() mission_event = self.getMissionEvent() if mission_event: mission_event_ref_time = mission_event.get_reference_time(format='utc') event_id = self.time_inputs.event_id else: mission_event_ref_time = self.time_inputs.get_event().get_start_time(format='utc') event_id = 'NONE' utc_input_time_intervals = self.time_inputs.get_event().get_interval_times(format='utc', mission_event=mission_event) rel_input_time_intervals = self.time_inputs.get_event().get_interval_times(format='rel', mission_event=mission_event) n_input_time_intervals = len(utc_input_time_intervals) if n_input_time_intervals > 1: print(' Number of Intervals : {}'.format(n_input_time_intervals)) print() for utc_input_time_interval, rel_input_time_interval in zip(utc_input_time_intervals, rel_input_time_intervals): print(' Start Time (UTC) : {} ({})'.format(utc_input_time_interval[0], rel_input_time_interval[0])) print(' Stop Time (UTC) : {} ({})'.format(utc_input_time_interval[1], rel_input_time_interval[1])) print() if self.searched: print(' Search Time Step : {:.3f}'.format(self.search_step)) if self.computed: print(' Number of Steps : {}'.format(self.n_steps)) if self.time_step: print(' Time Step : {:.3f}'.format(self.time_step)) print(' Reference Time (UTC) : {} ({})'.format(mission_event_ref_time, event_id)) print() print('SEARCH RESULT') print() if self.searched: print(' Number of opportunity windows : {}'.format(self.sequence.n_intervals)) print() observations = self.sequence.get_observations() for observation in observations: utc_start_time = observation.get_start_time(format='utc') utc_stop_time = observation.get_stop_time(format='utc') rel_start_time = observation.get_start_time(format='rel', mission_event=mission_event) rel_stop_time = observation.get_stop_time(format='rel', mission_event=mission_event) duration_str = observation.get_duration(format='str') print(' Start Time (UTC) : {} ({})'.format(utc_start_time, rel_start_time)) print(' Stop Time (UTC) : {} ({})'.format(utc_stop_time, rel_stop_time)) print(' Duration : {}'.format(duration_str)) print() else: print(' No search.') print() if self.computed: print('COMPUTATION RESULT') print() n_input_time_intervals = len(utc_input_time_intervals) if n_input_time_intervals > 1: print(' Number of opportunity windows : {}'.format(self.sequence.n_intervals)) print() observations = self.sequence.get_observations() for i, observation in enumerate(observations): utc_start_time = observation.get_start_time(format='utc') rel_start_time = observation.get_start_time(format='rel', mission_event=mission_event) duration_str = observation.get_duration(format='str') duration = observation.get_duration() print(' #{:04} {} ({}) {} ({:.3f} s)'.format(observation.index+1, utc_start_time, rel_start_time, duration_str, duration)) for geometry_name in observation.get_geometry_names(): geometry = observation.get_geometry(geometry_name) if isinstance(geometry.data, list): print() print(' {}:'.format(geometry_name)) for value, prefix, unit in zip(geometry.data, geometry.prefix, geometry.units): print(' {} = {} {}'.format(prefix, value, unit)) else: print(' {} = {} {}'.format(geometry_name, geometry.data, geometry.units)) print() if depth == 3: # display measurements data measurements = observation.get_measurements() measurements_header = measurements[0].get_csv_header() print(f'# {measurements_header}\n') event_basename = self.opportunity_definition.observation_type for j, measurement in enumerate(measurements): event_name = f'{event_basename}_{i + 1:03}_{j:04}' line = measurement.get_csv_line(event_name=event_name, subgroup='', group='WG2') print(line) # print('\n') print()
[docs] def cross_validate(self, ck_file): """Cross validate MOS and AGM simulated pointings. Used by PTR Exporter class. """ self.mission_scenario.loadKernels() spice.furnsh(str(ck_file)) sim_sc_rotmats = self.sequence.sub_events[0].get_sub_events_geometry_data('Simulated_SC_Frame') times = self.sequence.sub_events[0].get_sub_events_reference_times(format='et') rot_deltas, x_deltas, y_deltas, z_deltas = [], [], [], [] x = [1., 0., 0.] y = [0., 1., 0.] z = [0., 0., 1.] for et, sim_sc_rotmat in zip(times, sim_sc_rotmats): sc_rotmat = spice.pxform('JUICE_SPACECRAFT', 'J2000', et) sim_sc_rotmat = np.array(sim_sc_rotmat) x_deltas.append(spice.vsep(spice.mxv(sc_rotmat, x), spice.mxv(sim_sc_rotmat, x)) * spice.dpr()) y_deltas.append(spice.vsep(spice.mxv(sc_rotmat, y), spice.mxv(sim_sc_rotmat, y)) * spice.dpr()) z_deltas.append(spice.vsep(spice.mxv(sc_rotmat, z), spice.mxv(sim_sc_rotmat, z)) * spice.dpr()) # compute rotation angle between the two frames # ref: http://www.boris-belousov.net/2016/12/01/quat-dist/ r = np.dot(sc_rotmat, sim_sc_rotmat.T) theta = (np.trace(r) - 1) / 2 rot_delta = np.arccos(theta) * spice.dpr() rot_deltas.append(rot_delta) x_deltas = np.array(x_deltas) y_deltas = np.array(y_deltas) z_deltas = np.array(z_deltas) rot_deltas = np.array(rot_deltas) spice.unload(str(ck_file)) self.mission_scenario.unloadKernels() # report on discrepancies print(f'{"":<10} {"Frames delta angle (deg)":<25} {"X-axis delta angle (deg)":<25} {"Y-axis delta angle (deg)":<25} {"Z-axis delta angle (deg)":<30}') print(f'{"":<10} {"-" * 25} {"-" * 25} {"-" * 25} {"-" * 25}') print(f'{"average:":>10} {np.average(rot_deltas):<25} {np.average(x_deltas):<25} {np.average(y_deltas):<25} {np.average(z_deltas):<25}') print(f'{"median:":>10} {np.median(rot_deltas):<25} {np.median(x_deltas):<25} {np.median(y_deltas):<25} {np.mean(z_deltas):<25}') print(f'{"std:":>10} {np.std(rot_deltas):<25} {np.std(x_deltas):<25} {np.std(y_deltas):<25} {np.std(z_deltas):<25}') discrepancy = (rot_deltas, x_deltas, y_deltas, z_deltas) return discrepancy
[docs] def export(self, format, obs_idx=None, path='', overwrite=False, agm_validation=None): Exporter(format, path=path, overwrite=overwrite).export(self, measurements=obs_idx, agm_validation=agm_validation)
# OpportunityFile #
[docs]class OpportunityFile: def __init__(self, filepath=None): self.opportunity_file_path = filepath if filepath else '' self.valid = True def __repr__(self): return '<%s %r>' % (self.__class__.__name__, self.__dict__)
[docs] def write(self, opportunity, file=None): if file is not None: self.opportunity_file_path = file # filepath ?? opportunity_dict = opportunity.get_dict() #print(opportunity_dict) with open(self.opportunity_file_path, 'w') as f: json.dump(opportunity_dict, f, indent=2)
[docs] def read(self, filepath=None): if filepath is not None: self.opportunity_file_path = filepath with open(self.opportunity_file_path) as f: opportunity_dict = json.load(f) # print(opportunity_dict) # print() return opportunity_dict