"""Opportunity module."""
from gfinder.event import Event
from gfinder.geometryevent import Sequence
from gfinder.geometry import GeometryFactory, Condition, OBSRVR
from gfinder.config import DEFAULT_N_STEPS # TODO: this is temporary workaround
from gfinder.exporter import Exporter
import copy
import json
from geojson import FeatureCollection
import warnings
import numpy as np
import spiceypy as spice
class OpportunityDefinition:
    """Opportunity definition loaded from an ODF (Opportunity Definition File, JSON).

    Attributes:
        odf_file (str): Path of the ODF file, set once the file has been parsed.
        name (str): Value of the mandatory ODF ``name`` key.
        description (str): Value of the optional ODF ``description`` key.
        observation_type (str): Value of the optional ODF ``observation_type`` key.
        ptr_pointing_type (str): Value of the optional ODF ``ptr_pointing_type`` key.
        target (str): ODF ``target`` value, or the ``target`` argument when one is given.
        observer (str): Observer name, initialised from ``OBSRVR``.
        detector (str): ODF ``detector`` value, possibly adjusted for the requested binning.
        geometry_defs_dict (dict): Lists of geometry definitions keyed by GeometryEvent
            class name (``'Sequence'``, ``'Observation'``, ``'Measurement'``).
        searchable_geometry_defs (list): Measurement geometry definitions that hold a
            ``condition`` key.
        extra_attributes (list): Names of the non-standard ODF keys; each of them is also
            set as an attribute of the instance.
        included_odf_list (list): ``include`` references resolved while reading the ODF.
        valid (bool): False as soon as a problem has been detected in the definition.
    """

    def __init__(self, odf_file, target=None, binning=None, majis_scan_angle=None, sc_slew_angles=None,
                 datastore=None, replace_defvar=True, **kwargs):
        """Constructor method.

        Args:
            odf_file: Path of the ODF JSON file to read.
            target: Optional target name overriding the ODF ``target`` value.
            binning: Optional MAJIS binning value used to derive the detector name.
            majis_scan_angle: Optional JSON string holding the ``MAJIS_Scan_Angle`` parameters.
            sc_slew_angles: Optional JSON string holding the ``SC_Slew_Angles`` parameters.
            datastore: Optional object used to resolve ``include`` references
                (its ``get_ODF_filename`` method is called).
            replace_defvar: When True, ``@variable`` values found in the geometry
                definitions are replaced by the matching instance attributes.
            **kwargs: Values overriding the extra (non-standard) ODF attributes.
        """
        self.odf_file = ''
        self.name = ''
        self.description = ''
        self.observer = OBSRVR
        self.target = ''
        self.detector = ''
        self.observation_type = ''
        self.ptr_pointing_type = ''
        self.geometry_defs_dict = {'Sequence': [], 'Observation': [], 'Measurement': []}
        self.searchable_geometry_defs = []
        self.extra_attributes = []
        self.included_odf_list = []
        self.valid = True

        self.set(odf_file, target=target, binning=binning, datastore=datastore, **kwargs)

        # Replace all @variable-type values
        if replace_defvar:
            for geometry_defs in self.geometry_defs_dict.values():
                for i, geometry_def in enumerate(geometry_defs):
                    updated_geometry_def, attr_missing = self.replace_defvar(geometry_def)
                    if attr_missing:
                        self.valid = False
                    else:
                        geometry_defs[i] = updated_geometry_def

        # Retrieve definitions of geometries that are searchable (which have a 'condition' key).
        # Geometries not defined as measurement geometries are discarded with no warning.
        # Geometry class must inherit from the Quantity class and be a scalar (not checked here).
        self.searchable_geometry_defs = [
            measurement_gdef
            for measurement_gdef in self.geometry_defs_dict['Measurement']
            if 'condition' in measurement_gdef
        ]

        # Update opportunity definition from "commanding" inputs
        #
        # Overwrite MAJIS_Scan_Angle geometry parameters values and overwrite Simulated_Scan_Frame
        # geometry parameters accordingly:
        # "scan_rotation_angle": "Motion_Compensation_Angle" -> "MAJIS_Scan_Angle"
        if majis_scan_angle:
            majis_scan_angle_params = json.loads(majis_scan_angle)
            majis_scan_angle_def = dict(name='MAJIS_Scan_Angle', parameters=majis_scan_angle_params)
            self.update_geometry_def(majis_scan_angle_def)
            simulated_scan_frame_def = self.get_geometry_definition('Simulated_Scan_Frame')
            if simulated_scan_frame_def:
                # "scan_zero" remains unchanged
                simulated_scan_frame_def['parameters']['scan_rotation_angle'] = 'MAJIS_Scan_Angle'
                self.update_geometry_def(simulated_scan_frame_def)
            else:
                print(f'{self.odf_file} ODF file does not contain `Simulated_Scan_Frame` geometry definition'
                      f', and therefore `majis_scan_angle` option cannot be used.')
                self.valid = False

        # Overwrite SC_Slew_Angle parameters and force offset rotations to be given by this geometry.
        if sc_slew_angles:
            sc_slew_angles_params = json.loads(sc_slew_angles)
            sc_slew_angles_def = dict(name='SC_Slew_Angles', parameters=sc_slew_angles_params)
            self.update_geometry_def(sc_slew_angles_def)
            simulated_sc_frame_def = self.get_geometry_definition('Simulated_SC_Frame')
            if simulated_sc_frame_def:
                # making sure offset-rotation attribute is not empty "".
                simulated_sc_frame_def['parameters']['offset_rotations'] = 'SC_Slew_Angles'
                self.update_geometry_def(simulated_sc_frame_def)
            else:
                print(f'{self.odf_file} ODF file does not contain `Simulated_SC_Frame` geometry definition'
                      f', and therefore `sc_slew_angles` option cannot be used.')
                self.valid = False

    def __repr__(self):
        return (
            f'<{self.__class__.__name__}> '
            f'ODF file full path: {self.odf_file} | '
            f'Name: {self.name} | '
            f'Observation type: {self.observation_type} | '
            f'PTR pointing type: {self.ptr_pointing_type} | '
            f'Target: {self.target} | '
            f'Observer: {self.observer} | '
            f'Detector: {self.detector} | '
            f'Included ODF files: {self.included_odf_list}\n'
            f'- Nb of geometrical conditions: {len(self.getGeometryDefinitions("Measurement", searchable=True))}\n'
            f'- Nb of measurement geometries: {len(self.getGeometryDefinitions("Measurement"))}\n'
            f'- Nb of observation geometries: {len(self.getGeometryDefinitions("Observation"))}\n'
            f'- Nb of sequence geometries: {len(self.getGeometryDefinitions("Sequence"))}'
        )

    def set(self, odf_file, target=None, binning=None, datastore=None, **kwargs):
        """Read the ODF JSON file and populate the instance attributes.

        Problems are reported on standard output and flagged through ``self.valid``;
        no exception is raised for an unreadable or incomplete file.

        Args:
            odf_file: Path of the ODF JSON file.
            target: Optional target name overriding the ODF ``target`` value.
            binning: Optional MAJIS binning value applied to the ODF detector name.
            datastore: Optional object used to resolve ``include`` references.
            **kwargs: Values overriding the extra (non-standard) ODF attributes.
        """
        # Read input ODF JSON file
        try:
            with open(odf_file, 'r') as fp:
                json_dict = json.load(fp)
        except Exception as e:
            print(f'Input {odf_file} ODF JSON formatting issue.')
            print(e)
            self.valid = False
            return

        # A valid JSON document that is not an object cannot be an ODF.
        if not isinstance(json_dict, dict):
            print(f'Input {odf_file} ODF JSON formatting issue.')
            self.valid = False
            return

        self.odf_file = odf_file

        if 'name' in json_dict:
            self.name = json_dict['name']
        else:
            print(f'Missing "name" attribute in {self.odf_file} ODF file')
            self.valid = False
            return

        if 'description' in json_dict:  # optional
            self.description = json_dict['description']

        if 'target' in json_dict:
            self.target = json_dict['target']
            if target:
                self.target = target
        else:
            print(f'Missing "target" attribute in {self.odf_file} ODF file.')
            self.valid = False
            return

        if 'detector' in json_dict:
            self.detector = json_dict['detector']
            if binning:  # MAJIS-specific "friendly-binning"
                self.detector = self.majis_detector_name(self.detector, binning)
        else:
            print(f'Missing "detector" attribute in {self.odf_file} ODF file.')
            self.valid = False
            return

        if 'observation_type' in json_dict:  # optional
            self.observation_type = json_dict['observation_type']

        if 'ptr_pointing_type' in json_dict:  # optional
            self.ptr_pointing_type = json_dict['ptr_pointing_type']

        # Set extra ODF attributes (keys)
        std_attributes = [
            'name', 'author', 'status', 'description', 'observation_type', 'ptr_pointing_type', 'target',
            'observer', 'detector', 'measurement_geometries', 'observation_geometries', 'sequence_geometries'
        ]
        self.extra_attributes = []
        for key in json_dict:
            if key not in std_attributes:
                self.extra_attributes.append(key)
                if key in kwargs:
                    # ODF attribute matches one of the optional keyword arguments: use the
                    # keyword value (eg: latitude=-40)
                    setattr(self, key, kwargs[key])
                else:
                    # use ODF attribute value (eg: "latitude": 0)
                    setattr(self, key, json_dict[key])

        # 'Sequence', 'Observation', or 'Measurement' (GeometryEvent class)
        for geoevt_class in self.geometry_defs_dict:
            # 'sequence_geometries', 'observation_geometries', or 'measurement_geometries'
            geometries_key = geoevt_class.lower() + '_geometries'
            if geometries_key in json_dict:
                # list of geometry definitions in ODF for a given GeometryEvent class
                odf_geometries_defs = json_dict[geometries_key]
                for odf_geometry_def in odf_geometries_defs:
                    odf_geometry_def, valid = self.check_geometry_def(
                        odf_geometry_def, datastore=datastore, geoevt_class=geoevt_class)
                    if valid:
                        if isinstance(odf_geometry_def, dict):  # only one geometry definition
                            self.geometry_defs_dict[geoevt_class].append(odf_geometry_def)
                        elif isinstance(odf_geometry_def, list):
                            # one or several geometry definitions from "include" ODF
                            self.geometry_defs_dict[geoevt_class].extend(odf_geometry_def)
                    else:
                        print('Invalid geometry definition in {} ODF file: {}'.format(
                            self.odf_file, odf_geometry_def))
            else:
                print('WARNING: Missing {} definition in {}'.format(geometries_key, self.odf_file))

    def replace_defvar(self, geometry_def_dict):
        """Replace ``@variable`` string values by the matching instance attributes.

        String values starting with ``@`` are replaced by the attribute of the same
        name; nested dictionaries are processed recursively. The input dictionary is
        not modified.

        Args:
            geometry_def_dict (dict): Geometry definition, or a nested dictionary of one.

        Returns:
            tuple: ``(updated_dict, attr_missing)`` where ``attr_missing`` is True when at
            least one referenced attribute does not exist, at any nesting level.
        """
        # Shallow copy is enough: nested dictionaries are replaced by their own updated copies.
        update_geometry_def_dict = copy.copy(geometry_def_dict)
        attr_missing = False
        for key, value in geometry_def_dict.items():
            if isinstance(value, str):
                # Empty values (eg: "offset_rotations": "") never start with '@' and are ignored.
                if value.startswith('@'):
                    attr_name = value[1:]
                    if hasattr(self, attr_name):
                        update_geometry_def_dict[key] = getattr(self, attr_name)
                    else:
                        attr_missing = True
                        warnings.warn(f'Missing `{attr_name}` attribute in {self.odf_file} ODF file')
            elif isinstance(value, dict):
                update_geometry_def_dict[key], nested_attr_missing = self.replace_defvar(value)
                # Accumulate: a clean nested dictionary must not hide an earlier missing attribute.
                attr_missing = attr_missing or nested_attr_missing
        return update_geometry_def_dict, attr_missing

    def has_conditions(self):
        """Return True when at least one searchable geometry definition exists."""
        return len(self.searchable_geometry_defs) > 0

    def update_geometry_def(self, geometry_def):
        """Replace every stored geometry definition named like ``geometry_def``.

        Args:
            geometry_def (dict): New definition; it must pass ``check_geometry_def``.
        """
        # 'Sequence', 'Observation', or 'Measurement' (GeometryEvent class)
        for geometry_defs in self.geometry_defs_dict.values():
            for i, odf_geometry_def in enumerate(geometry_defs):
                if odf_geometry_def['name'] == geometry_def['name']:
                    updated_geometry_def, valid = self.check_geometry_def(geometry_def)
                    if valid:
                        geometry_defs[i] = updated_geometry_def
                    else:
                        # Report the rejected input definition, not the one already stored.
                        print('Invalid geometry definition in {} ODF file: {}'.format(
                            self.odf_file, geometry_def))

    def check_geometry_def(self, in_geometry_def, datastore=None, geoevt_class='Measurement'):
        """Validate a geometry definition, resolving ``include`` references.

        Args:
            in_geometry_def: Geometry definition read from the ODF.
            datastore: Optional object whose ``get_ODF_filename`` method resolves the
                ``include`` value into an ODF file path.
            geoevt_class (str): GeometryEvent class name whose definitions are taken
                from an included ODF.

        Returns:
            tuple: ``(definition, valid)``. For a resolved ``include``, ``definition`` is
            the list of geometry definitions of the included ODF; otherwise it is the
            input definition. ``self.valid`` is set to False on failure.
        """
        if not isinstance(in_geometry_def, dict):
            print(f'Missing mandatory `name` and `parameters` keys in {self.odf_file} ODF file.')
            print(in_geometry_def)
            self.valid = False
            return in_geometry_def, False

        # Both `name` and `parameters` keys are mandatory for a regular definition.
        if 'name' in in_geometry_def and 'parameters' in in_geometry_def:
            return in_geometry_def, True

        if 'include' in in_geometry_def:
            if datastore:
                include_odf_file = datastore.get_ODF_filename(in_geometry_def['include'])
                if include_odf_file is None:
                    print(f'{in_geometry_def["include"]} `include` file in {self.odf_file} does not exist.')
                    self.valid = False
                    return in_geometry_def, False
                # @variables are left untouched here; they are replaced by the including definition.
                include_odf_geometries_defs = OpportunityDefinition(
                    include_odf_file, replace_defvar=False).getGeometryDefinitions(geoevt_class)
                if in_geometry_def['include'] not in self.included_odf_list:
                    self.included_odf_list.append(in_geometry_def['include'])
                return include_odf_geometries_defs, True
            warnings.warn(f'No DataStore object to retrieve "include" ODF file path: '
                          f'`{in_geometry_def["include"]}` (use datastore keyword argument)')
            self.valid = False
        else:
            print(f'Missing mandatory `name` and `parameters` keys in {self.odf_file} ODF file.')
            print(in_geometry_def)
            self.valid = False
        return in_geometry_def, False

    def get_pointing_odf(self):
        """Return the last included ODF reference containing ``'pointing/'``, or ``''``."""
        pointing_odf = ''
        for included_odf in self.included_odf_list:
            if 'pointing/' in included_odf:
                pointing_odf = included_odf
        return pointing_odf

    def majis_detector_name(self, detector_name, binning):
        """Returns the valid MAJIS SPICE detector name corresponding to input detector name and binning value.

        The input detector name is returned unchanged (after a printed message) when it
        is not a known MAJIS detector, when it has no channel (``JUICE_MAJIS``), or when
        the binning value is not 1, 2 or 4.
        """
        majis_detector_names = [
            'JUICE_MAJIS',
            'JUICE_MAJIS_VISNIR',
            'JUICE_MAJIS_VISNIR_B2',
            'JUICE_MAJIS_VISNIR_B4',
            'JUICE_MAJIS_IR',
            'JUICE_MAJIS_IR_B2',
            'JUICE_MAJIS_IR_B4',
        ]
        majis_binnings = [1, 2, 4]

        if detector_name not in majis_detector_names:
            print('Invalid input MAJIS detector name: {}.'.format(detector_name))
            return detector_name

        # set channel ID, either VISNIR or IR
        detector_name_tokens = detector_name.split('_')
        if len(detector_name_tokens) == 2:  # JUICE_MAJIS case
            print('Binning not applicable for {} detector.'.format(detector_name))
            return detector_name
        channel_id = detector_name_tokens[2]

        if binning not in majis_binnings:
            print('Non-applicable binning value: {}. Allowed values are {}, {} or {}.'.format(
                binning, *majis_binnings))
            return detector_name

        if binning != 1:
            return 'JUICE_MAJIS_{}_B{}'.format(channel_id, binning)
        return 'JUICE_MAJIS_{}'.format(channel_id)

    def getDict(self):
        """Return the definition as a dictionary using the ODF key names."""
        odf_dict = {}
        attributes = ['name', 'author', 'status', 'description', 'observation_type', 'ptr_pointing_type',
                      'target', 'observer', 'detector']
        attributes = attributes + self.extra_attributes
        for attribute in attributes:
            # 'author' and 'status' are only present when set from outside this class.
            if hasattr(self, attribute):
                odf_dict[attribute] = getattr(self, attribute)
        odf_dict['measurement_geometries'] = self.geometry_defs_dict['Measurement']
        odf_dict['observation_geometries'] = self.geometry_defs_dict['Observation']
        odf_dict['sequence_geometries'] = self.geometry_defs_dict['Sequence']
        return odf_dict

    def to_json(self):
        """Print the definition dictionary as indented JSON (nothing is returned)."""
        print(json.dumps(self.getDict(), sort_keys=False, indent=4))

    def get_name(self):
        """Return the opportunity definition name."""
        return self.name

    def get_target(self):
        """Return the opportunity definition target."""
        return self.target

    def getGeometryDefinitions(self, geoevt_class, searchable=False):
        """Return the geometry definitions of a GeometryEvent class.

        Args:
            geoevt_class (str): ``'Sequence'``, ``'Observation'`` or ``'Measurement'``.
            searchable (bool): When True, ``searchable_geometry_defs`` is returned
                whatever the (valid) requested class is.

        Returns:
            list: Geometry definitions, or an empty list for an unknown class name.
        """
        if geoevt_class in self.geometry_defs_dict:
            if searchable:  # return only geometry definition with a condition (searchable).
                return self.searchable_geometry_defs
            return self.geometry_defs_dict[geoevt_class]
        print('Invalid input GeometryEvent class. Accepted values are {}, {}, or {}.'.format(
            *self.geometry_defs_dict.keys()))
        return []

    def get_geometry_definition(self, geometry_name):
        """Return the first geometry definition named ``geometry_name``, or None."""
        for geoevt_class_gdefs in self.geometry_defs_dict.values():
            for geometry_def in geoevt_class_gdefs:
                if geometry_def['name'] == geometry_name:
                    return geometry_def
        return None
class Opportunity:
    """Class that represents a calculation of opportunity times and/or geometry.

    An opportunity ties together a mission scenario, an opportunity definition and
    time inputs; ``search()`` and ``compute()`` create the ``sequence`` attribute.
    """

    def __init__(self, mission_scenario, opportunity_definition, time_inputs, binning=1, sim_sc_att=False,
                 sim_scanner=False, dict=None):
        """Constructor method.

        Args:
            mission_scenario: Mission scenario object (its ``get_dict``, ``loadKernels``
                and ``unloadKernels`` methods are used by this class).
            opportunity_definition: Opportunity definition; its ``valid`` flag is checked.
            time_inputs: Time inputs; its ``valid`` flag is checked.
            binning: Binning value associated with the opportunity.
            sim_sc_att (bool): Whether the SC pointing simulator is enabled.
            sim_scanner (bool): Whether the MAJIS scanner pointing simulator is enabled.
            dict: Unused; kept for backward compatibility.
        """
        self.id = ''
        self.mission_scenario = mission_scenario
        self.opportunity_definition = opportunity_definition
        self.time_inputs = time_inputs
        self.sequence = None
        self.searched = False
        self.computed = False
        self.search_step = None
        self.sim_sc_att = sim_sc_att
        self.sim_scanner = sim_scanner
        self.binning = binning

        # Always defined, so that an invalid opportunity can still be inspected/serialised.
        self.input_window = None
        self.n_steps = None
        self.time_step = None

        if time_inputs.valid:  # set input timing information used by search() and compute()
            self.input_window = time_inputs.get_time_window()
            self.n_steps = time_inputs.n_steps
            self.time_step = time_inputs.time_step
            self.valid = True
        else:
            print('[WARNING] Invalid time inputs.')
            self.valid = False

        if not self.opportunity_definition.valid:
            self.valid = False

    def __repr__(self):
        return '<%s %r>' % (self.__class__.__name__, self.__dict__)

    def isValid(self):
        """Return the validity flag of the opportunity."""
        return self.valid

    def setID(self, opportunity_id):
        """Set the opportunity identifier."""
        self.id = opportunity_id

    def get_dict(self):
        """Return what goes into an Opportunity JSON object or file.

        When no sequence exists yet (neither ``search()`` nor ``compute()`` has been
        run), ``n_intervals`` is 0 and ``interval_times`` is an empty list.
        """
        if self.sequence:
            n_intervals = self.sequence.n_intervals
            interval_times = self.sequence.get_interval_times(format='utc')
        else:
            n_intervals = 0
            interval_times = []

        opportunity_dict = {
            'id': self.id,
            'mission_scenario': self.mission_scenario.get_dict(),
            'opportunity_definition': {
                'odf_file': self.opportunity_definition.odf_file,
                'target': self.opportunity_definition.target,
                'observer': self.opportunity_definition.observer
            },
            'time_inputs': self.time_inputs.get_dict(),
            'input_time_intervals': self.time_inputs.get_event().get_interval_times(format='utc'),
            'searched': self.searched,
            'computed': self.computed,
            'n_steps': self.n_steps,
            'time_step': self.time_step,
            'search_step': self.search_step,
            'binning': self.binning,
            'sim_sc_att': self.sim_sc_att,
            'sim_scanner': self.sim_scanner,
            'sequence_file': 'sequence.json',
            'n_intervals': n_intervals,
            'interval_times': interval_times
        }
        return opportunity_dict

    def loadSequence(self, geoevents_dict):
        """Load the sequence object from ``geoevents_dict``.

        Args:
            geoevents_dict (dict): Dictionary with a ``'sequence'`` entry and an
                ``'observations'`` list.
        """
        sequence = Sequence(geoevt_dict=geoevents_dict['sequence'])
        for obs_geoevt_dict in geoevents_dict['observations']:
            sequence.add_observation(obs_geoevt_dict)
        self.sequence = sequence

    def search(self):
        """Search the input time window for times meeting all geometrical conditions.

        The searchable measurement geometries are applied one after the other, each
        search being restricted to the window found by the previous one. The result is
        stored in ``self.sequence`` and ``self.searched`` is set to True.

        Raises:
            ValueError: If fewer than 2 steps are defined, or the input window is empty.
        """
        print('Searching for ' + self.opportunity_definition.get_name() + ' opportunities...')

        # Get the searchable geometry objects.
        searchable_geometry_defs = self.opportunity_definition.getGeometryDefinitions(
            'Measurement', searchable=True)

        # Set time step. TODO: should be an optional argument (see #92)
        t_steps = self.n_steps
        if t_steps is None or t_steps < 2:
            # The step is the window span divided by (n_steps - 1).
            raise ValueError(f'At least 2 steps are required to search, got n_steps={t_steps!r}.')

        times = []
        for i in range(spice.wncard(self.input_window)):
            times.extend(spice.wnfetd(self.input_window, i))
        if not times:
            raise ValueError('Input time window is empty.')
        et1 = min(times)
        et2 = max(times)
        step = (et2 - et1) / (t_steps - 1)
        self.search_step = step
        print(f'Number of steps = {self.n_steps}')
        print(f'Search time step = {step:.3f} sec.')

        # Search engine is here!
        search_window = self.input_window
        opportunity_window = self.input_window  # init opportunity_window
        for geometry_def in searchable_geometry_defs:
            geometry = GeometryFactory().create(geometry_def, geoevt=self.sequence)
            # Once the window is empty, the remaining conditions cannot add anything.
            if spice.wncard(opportunity_window) > 0:
                opportunity_window = geometry.search(Condition(geometry_def['condition']), search_window, step)
            search_window = opportunity_window

        # Create sequence and sub-events
        # - opportunity_window is used to create Observation object.
        # - n_steps and time_step are used to create Measurement objects.
        self.sequence = Sequence(opportunity_definition=self.opportunity_definition,
                                 time_window=opportunity_window, n_steps=DEFAULT_N_STEPS)

        # Set searched status to true so as to distinguish from simple computation
        self.searched = True

    def compute(self, searchable_only=False):
        """Compute the geometries of the sequence.

        The existing sequence is used when there is one; otherwise a sequence is
        created from the input time window. Nothing is computed (and ``computed`` is
        left unchanged) when an enabled simulator has no matching geometry definition.

        Args:
            searchable_only (bool): Forwarded to ``Sequence.compute``.
        """
        if self.sequence:
            self.sequence.compute(searchable_only=searchable_only)
        else:
            if self.sim_sc_att:
                if not self.opportunity_definition.get_geometry_definition('Simulated_SC_Frame'):
                    print(f'Simulated_SC_Frame cannot be simulated because it is not defined in {self.opportunity_definition.odf_file} ODF file.')
                    return
            if self.sim_scanner:
                if not self.opportunity_definition.get_geometry_definition('Simulated_Scan_Frame'):
                    print(f'Simulated_Scan_Frame cannot be simulated because it is not defined in {self.opportunity_definition.odf_file} ODF file.')
                    return
            self.sequence = Sequence(opportunity_definition=self.opportunity_definition,
                                     time_window=self.input_window, n_steps=self.n_steps,
                                     time_step=self.time_step)
            self.sequence.compute(searchable_only=searchable_only)
        self.computed = True

    def get_opportunities(self):
        """Return a one-element list holding this opportunity."""
        return [self]

    def get_observation(self):
        """Return the first observation of the sequence."""
        return self.sequence.sub_events[0]

    def get_observations(self):
        """Return all the observations of the sequence."""
        return self.sequence.sub_events

    def get_observation_geometry(self, name, obs_idx=0):
        """Return the ``name`` geometry of the observation at index ``obs_idx``."""
        return self.sequence.sub_events[obs_idx].get_geometry(name)

    def getMeasurementsGeoJSON(self, geometry_name, observation_id=1, surface_only=False, split=False):
        """Return a GeoJSON FeatureCollection of a geometry for each measurement.

        Args:
            geometry_name (str): Name of the measurement geometry to export.
            observation_id (int): 1-based observation number.
            surface_only: Forwarded to the geometry ``getGeoJSON`` method.
            split: Forwarded to the geometry ``getGeoJSON`` method.

        Returns:
            FeatureCollection: One feature per measurement holding the geometry.
        """
        observation = self.sequence.sub_events[observation_id - 1]
        # Start time is the same for every measurement: fetch it once.
        et_start_time = observation.get_start_time(format='et')
        features = []
        for i, measurement in enumerate(observation.sub_events):
            if not measurement.has_geometry(geometry_name):
                continue
            feature = measurement.get_geometry(geometry_name).getGeoJSON(surface_only=surface_only, split=split)
            feature.properties = {
                'id': str(i + 1),
                'utc_time': measurement.get_reference_time(format='utc'),
                'reference_time': measurement.get_reference_time(format='rel',
                                                                 mission_event=self.getMissionEvent()),
                'time': measurement.get_reference_time(format='et') - et_start_time
            }
            features.append(feature)
        return FeatureCollection(features)

    def getGeometryNames(self, geoevt_class_name):
        """Return the geometry names of a GeometryEvent class.

        Args:
            geoevt_class_name (str): ``'sequence'``, ``'observation'`` or ``'measurement'``.
                Names are taken from the first observation / first measurement.

        Returns:
            list: Geometry names, or an empty list for an invalid class name.
        """
        geometry_names = []
        if geoevt_class_name == 'sequence':
            geometry_names = self.sequence.get_geometry_names()
        elif geoevt_class_name == 'observation':
            geometry_names = self.sequence.sub_events[0].get_geometry_names()
        elif geoevt_class_name == 'measurement':
            geometry_names = self.sequence.sub_events[0].sub_events[0].get_geometry_names()
        else:
            print('Invalid <{}> geometry class name. Allowed values are: sequence, observation, or measurement.'.format(geoevt_class_name))
        return geometry_names

    def getMissionEvent(self):
        """Return the mission event of the time inputs."""
        return self.time_inputs.mission_event

    def get_no_comp_times(self, t_int, binning=None, margin=None, observation_id=1, format='rel'):
        """Returns times at which no scanner motion compensation is required.

        Measurements are selected when their dwell time lies strictly within
        ``t_int * (1 - margin)`` and ``t_int * (1 + margin)``; every run of consecutive
        selected measurements gives one time window.

        Args:
            t_int: Reference time compared with the dwell times.
            binning: Optional binning; dwell times are scaled by ``binning / self.binning``.
            margin: Relative tolerance. When not given, 0.1 is used, or the ``margin``
                parameter of the ``No_Compensation_Required`` definition if it exists.
            observation_id (int): 1-based observation number.
            format: Time format forwarded to ``get_measurements_times``.

        Returns:
            list: ``[start, stop]`` time windows (possibly empty), or None when no
            ``Dwell_Time`` data is available.
        """
        # Get Observation object
        observation = self.get_observations()[observation_id - 1]

        if not margin:
            margin = 0.1
            # Attempt to retrieve margin parameters defined in No_Compensation_Required
            # NOTE(review): assumed to apply only when no margin argument is given - confirm.
            geometry_def = self.opportunity_definition.get_geometry_definition('No_Compensation_Required')
            if geometry_def:
                margin = geometry_def['parameters']['margin']

        # Retrieve measurements times
        times = observation.get_measurements_times(format=format, mission_event=self.getMissionEvent())

        # Retrieve dwell times if available, and scale up/down if input binning different than
        # opportunity binning.
        dwell_times = observation.get_measurements_geometry_data('Dwell_Time')
        if not dwell_times or not dwell_times[0]:
            print('No Dwell_Time geometry data available to compute "no-compensation" times.')
            return None
        dwell_times = np.array(dwell_times)
        if binning:
            dwell_times = dwell_times * (binning / self.binning)

        indexes = np.where((dwell_times > t_int * (1 - margin)) & (dwell_times < t_int * (1 + margin)))[0]
        if indexes.size == 0:
            return []

        # Cut the selected indexes wherever they stop being consecutive: one window per run.
        i_cuts = np.where((indexes[1:] - indexes[0:-1]) > 1)[0]
        index_runs = np.split(indexes, i_cuts + 1)
        return [[times[run[0]], times[run[-1]]] for run in index_runs]

    def _print_simulated_frame(self, frame_name, linked_param):
        """Print the parameters of a simulated frame, expanding its linked geometry.

        The ``linked_param`` parameter holds a geometry name; it is replaced by that
        geometry definition in a copy of the parameters, so that the opportunity
        definition itself is left untouched.
        """
        odf = self.opportunity_definition
        frame_def = odf.get_geometry_definition(frame_name)
        if not frame_def:
            print('[WARNING] Undefined {} geometry in ODF file.'.format(frame_name))
            return
        parameters = dict(frame_def['parameters'])
        parameters[linked_param] = odf.get_geometry_definition(parameters[linked_param])
        print(json.dumps(parameters, indent=2))

    def summary(self, depth=2):
        """Print a report on the opportunity: simulators, definition, times and results.

        Args:
            depth (int): When equal to 3, measurements data are also displayed for each
                computed observation.
        """
        print('POINTING SIMULATOR')
        print()
        print('SC pointing simulator:', 'ON' if self.sim_sc_att else 'OFF')
        if self.sim_sc_att:
            self._print_simulated_frame('Simulated_SC_Frame', 'offset_rotations')
        print()
        print('MAJIS scanner pointing simulator:', 'ON' if self.sim_scanner else 'OFF')
        if self.sim_scanner:
            self._print_simulated_frame('Simulated_Scan_Frame', 'scan_rotation_angle')
        print()

        print('OPPORTUNITY DEFINITION')
        print()
        print(' {}'.format(self.opportunity_definition.name))
        geometry_definitions = self.opportunity_definition.getGeometryDefinitions('Measurement', searchable=True)
        for geometry_definition in geometry_definitions:
            name = geometry_definition['name']
            rel_condition = geometry_definition['condition']['relationalCondition']
            ref_value = geometry_definition['condition']['referenceValue']
            print(' {} {} {}'.format(name, rel_condition, ref_value))
        print()

        print('INPUT TIMES')
        print()
        mission_event = self.getMissionEvent()
        if mission_event:
            mission_event_ref_time = mission_event.get_reference_time(format='utc')
            event_id = self.time_inputs.event_id
        else:
            mission_event_ref_time = self.time_inputs.get_event().get_start_time(format='utc')
            event_id = 'NONE'
        utc_input_time_intervals = self.time_inputs.get_event().get_interval_times(
            format='utc', mission_event=mission_event)
        rel_input_time_intervals = self.time_inputs.get_event().get_interval_times(
            format='rel', mission_event=mission_event)
        n_input_time_intervals = len(utc_input_time_intervals)
        if n_input_time_intervals > 1:
            print(' Number of Intervals : {}'.format(n_input_time_intervals))
            print()
        for utc_input_time_interval, rel_input_time_interval in zip(utc_input_time_intervals,
                                                                    rel_input_time_intervals):
            print(' Start Time (UTC) : {} ({})'.format(utc_input_time_interval[0], rel_input_time_interval[0]))
            print(' Stop Time (UTC) : {} ({})'.format(utc_input_time_interval[1], rel_input_time_interval[1]))
            print()
        if self.searched:
            print(' Search Time Step : {:.3f}'.format(self.search_step))
        if self.computed:
            print(' Number of Steps : {}'.format(self.n_steps))
            if self.time_step:
                print(' Time Step : {:.3f}'.format(self.time_step))
        print(' Reference Time (UTC) : {} ({})'.format(mission_event_ref_time, event_id))
        print()

        print('SEARCH RESULT')
        print()
        if self.searched:
            print(' Number of opportunity windows : {}'.format(self.sequence.n_intervals))
            print()
            observations = self.sequence.get_observations()
            for observation in observations:
                utc_start_time = observation.get_start_time(format='utc')
                utc_stop_time = observation.get_stop_time(format='utc')
                rel_start_time = observation.get_start_time(format='rel', mission_event=mission_event)
                rel_stop_time = observation.get_stop_time(format='rel', mission_event=mission_event)
                duration_str = observation.get_duration(format='str')
                print(' Start Time (UTC) : {} ({})'.format(utc_start_time, rel_start_time))
                print(' Stop Time (UTC) : {} ({})'.format(utc_stop_time, rel_stop_time))
                print(' Duration : {}'.format(duration_str))
                print()
        else:
            print(' No search.')
            print()

        if self.computed:
            print('COMPUTATION RESULT')
            print()
            if n_input_time_intervals > 1:
                print(' Number of opportunity windows : {}'.format(self.sequence.n_intervals))
                print()
            observations = self.sequence.get_observations()
            for i, observation in enumerate(observations):
                utc_start_time = observation.get_start_time(format='utc')
                rel_start_time = observation.get_start_time(format='rel', mission_event=mission_event)
                duration_str = observation.get_duration(format='str')
                duration = observation.get_duration()
                print(' #{:04} {} ({}) {} ({:.3f} s)'.format(
                    observation.index + 1, utc_start_time, rel_start_time, duration_str, duration))
                for geometry_name in observation.get_geometry_names():
                    geometry = observation.get_geometry(geometry_name)
                    if isinstance(geometry.data, list):
                        print()
                        print(' {}:'.format(geometry_name))
                        for value, prefix, unit in zip(geometry.data, geometry.prefix, geometry.units):
                            print(' {} = {} {}'.format(prefix, value, unit))
                    else:
                        print(' {} = {} {}'.format(geometry_name, geometry.data, geometry.units))
                print()
                if depth == 3:  # display measurements data
                    measurements = observation.get_measurements()
                    measurements_header = measurements[0].get_csv_header()
                    print(f'# {measurements_header}\n')
                    event_basename = self.opportunity_definition.observation_type
                    for j, measurement in enumerate(measurements):
                        event_name = f'{event_basename}_{i + 1:03}_{j:04}'
                        line = measurement.get_csv_line(event_name=event_name, subgroup='', group='WG2')
                        print(line)
                    print()

    def cross_validate(self, ck_file):
        """Cross validate MOS and AGM simulated pointings.

        Used by PTR Exporter class. The ``Simulated_SC_Frame`` rotation matrices of the
        first observation are compared with the ``JUICE_SPACECRAFT`` to ``J2000``
        rotations obtained from SPICE once ``ck_file`` is loaded. Statistics are
        printed; kernels are unloaded even if an error occurs.

        Args:
            ck_file: Path of the CK kernel file to load.

        Returns:
            tuple: ``(rot_deltas, x_deltas, y_deltas, z_deltas)`` arrays of angles, in
            degrees, for the frames and for each of their axes.
        """
        x = [1., 0., 0.]
        y = [0., 1., 0.]
        z = [0., 0., 1.]
        rot_deltas, x_deltas, y_deltas, z_deltas = [], [], [], []

        self.mission_scenario.loadKernels()
        try:
            spice.furnsh(str(ck_file))
            try:
                observation = self.sequence.sub_events[0]
                sim_sc_rotmats = observation.get_sub_events_geometry_data('Simulated_SC_Frame')
                times = observation.get_sub_events_reference_times(format='et')
                for et, sim_sc_rotmat in zip(times, sim_sc_rotmats):
                    sc_rotmat = spice.pxform('JUICE_SPACECRAFT', 'J2000', et)
                    sim_sc_rotmat = np.array(sim_sc_rotmat)
                    x_deltas.append(spice.vsep(spice.mxv(sc_rotmat, x), spice.mxv(sim_sc_rotmat, x)) * spice.dpr())
                    y_deltas.append(spice.vsep(spice.mxv(sc_rotmat, y), spice.mxv(sim_sc_rotmat, y)) * spice.dpr())
                    z_deltas.append(spice.vsep(spice.mxv(sc_rotmat, z), spice.mxv(sim_sc_rotmat, z)) * spice.dpr())

                    # compute rotation angle between the two frames
                    # ref: http://www.boris-belousov.net/2016/12/01/quat-dist/
                    r = np.dot(sc_rotmat, sim_sc_rotmat.T)
                    # Rounding errors may push the cosine slightly outside [-1, 1], where
                    # arccos returns NaN.
                    cos_theta = np.clip((np.trace(r) - 1) / 2, -1.0, 1.0)
                    rot_deltas.append(np.arccos(cos_theta) * spice.dpr())
            finally:
                spice.unload(str(ck_file))
        finally:
            self.mission_scenario.unloadKernels()

        x_deltas = np.array(x_deltas)
        y_deltas = np.array(y_deltas)
        z_deltas = np.array(z_deltas)
        rot_deltas = np.array(rot_deltas)

        # report on discrepancies
        print(f'{"":<10} {"Frames delta angle (deg)":<25} {"X-axis delta angle (deg)":<25} {"Y-axis delta angle (deg)":<25} {"Z-axis delta angle (deg)":<30}')
        print(f'{"":<10} {"-" * 25} {"-" * 25} {"-" * 25} {"-" * 25}')
        print(f'{"average:":>10} {np.average(rot_deltas):<25} {np.average(x_deltas):<25} {np.average(y_deltas):<25} {np.average(z_deltas):<25}')
        print(f'{"median:":>10} {np.median(rot_deltas):<25} {np.median(x_deltas):<25} {np.median(y_deltas):<25} {np.median(z_deltas):<25}')
        print(f'{"std:":>10} {np.std(rot_deltas):<25} {np.std(x_deltas):<25} {np.std(y_deltas):<25} {np.std(z_deltas):<25}')

        discrepancy = (rot_deltas, x_deltas, y_deltas, z_deltas)
        return discrepancy

    def export(self, format, obs_idx=None, path='', overwrite=False, agm_validation=None):
        """Export the opportunity through an ``Exporter`` object.

        Args:
            format: Export format given to the ``Exporter`` constructor.
            obs_idx: Forwarded to ``Exporter.export`` as its ``measurements`` argument.
            path: Output path given to the ``Exporter`` constructor.
            overwrite (bool): Overwrite flag given to the ``Exporter`` constructor.
            agm_validation: Forwarded to ``Exporter.export``.
        """
        Exporter(format, path=path, overwrite=overwrite).export(
            self, measurements=obs_idx, agm_validation=agm_validation)
# OpportunityFile
#
class OpportunityFile:
    """JSON file holding the dictionary description of an opportunity.

    Attributes:
        opportunity_file_path (str): Path of the last file given to the constructor,
            ``write()`` or ``read()``; empty string when none was given.
        valid (bool): Validity flag, set to True at construction.
    """

    def __init__(self, filepath=None):
        self.opportunity_file_path = filepath or ''
        self.valid = True

    def __repr__(self):
        return '<{} {!r}>'.format(type(self).__name__, self.__dict__)

    def write(self, opportunity, file=None):
        """Write the dictionary returned by ``opportunity.get_dict()`` as indented JSON.

        Args:
            opportunity: Object providing a ``get_dict()`` method.
            file: Optional output path; when given it replaces the stored path.
        """
        if file is not None:
            self.opportunity_file_path = file
        # Serialise before opening, so that a failure does not leave an empty file behind.
        json_text = json.dumps(opportunity.get_dict(), indent=2)
        with open(self.opportunity_file_path, 'w') as stream:
            stream.write(json_text)

    def read(self, filepath=None):
        """Read an opportunity JSON file and return its content.

        Args:
            filepath: Optional input path; when given it replaces the stored path.

        Returns:
            The decoded JSON content of the file.
        """
        if filepath is not None:
            self.opportunity_file_path = filepath
        with open(self.opportunity_file_path) as stream:
            return json.loads(stream.read())