Source code for gfinder.geometryevent

"""GeometryEvent module."""

from gfinder.event import Event
from gfinder.geometry import GeometryFactory
from gfinder.config import DEFAULT_N_STEPS, DEFAULT_TIME_STEP, MAX_N_STEPS

import spiceypy as spice

import json

[docs]class GeometryEvent(Event): """Class representing a geometry event. A GeometryEvent is an Event that can contains and handle Geometry objects. Objects inheriting the GeometryEvent class knows how to construct their associated geometries. Depending of its type, a GeometryEvent object can hold children GeometryEvent objects. A GeometryEvent object can be of 3 types, which have a hierarchical relationship: - sequence: contains Observation(s) - observation: contain Measurements - measurement Attributes: geoevt_class (str): name of the GeometryEvent class. n_geometries (int): number of Geometry objects associated to the geometry event. geometries (list): list of Geometry objects associated to the geometry event. n_sub_events (int): number of sub-GeometryEvent objects. sub_events (list): list of GeometryEvent objects. parent_event (str): parent GeometryEvent class name (`Sequence` or `Measurement`). index (int): index of this GeometryEvent object wrt its parent object. valid (bool): validity flag. """ def __init__(self, opportunity_definition=None, geoevt_dict=None, time_window=None, start_time=None, stop_time=None, format=''): """Constructs GeometryEvent object. Args: opportunity_definition (OpportunityDefintion): geoevt_dict (dict): time_window (spiceypy.utils.support_types.SpiceCell): start_time: stop_time: format: """ super().__init__(time_window=time_window, start_time=start_time, stop_time=stop_time, format=format) self.geoevt_class = self.__class__.__name__ # some "magic" happens here. self.n_geometries = 0 self.geometries = [] self.n_sub_events = 0 # equal to self.n_intervals when all subevents are loaded self.sub_events = [] # list of Observation or Measurement objects self.parent_event = None self.index = 0 self.valid = True # Self create and add Geometry objects from opportunity definition or geoevent_dict (GeoEvt file) if opportunity_definition: geometry_defs = opportunity_definition.getGeometryDefinitions(self.geoevt_class) for geometry_def in geometry_defs: geometry = GeometryFactory().create(geometry_def, self) if geometry.is_valid(): self.geometries.append(geometry) self.n_geometries += 1 elif geoevt_dict: for geometry_dict in geoevt_dict['geometries']: geometry = GeometryFactory().create(geometry_dict, self) if geometry.is_valid(): # set geometry data and add geometry geometry.data = geometry_dict['value'] self.geometries.append(geometry) self.n_geometries += 1 else: print('[WARNING] Invalid Geometry:') print(geometry_dict) # else: # print('[WARNING] Missing opportunity_definition or geoevt_dict to instantiate GeometryEvent object.') def __repr__(self): s = f"<{self.__class__.__name__}>\n" s += json.dumps(self.get_dict(), indent=2) return s #return '<%s %r>' % (self.__class__.__name__, self.get_dict())
[docs] def get_dict(self): """Returns this GeometryEvent dictionary. """ # Header geoevt_dict = { 'geometry_event_class': self.geoevt_class, } # Event geoevt_dict.update(super().get_dict()) # Geometries and sub-events geoevt_dict.update({ 'geometries': self.get_geometry_dicts(), 'sub_events': self.get_sub_event_dicts() }) return geoevt_dict
[docs] def has_geometry(self, name): """Returns whether or not this GeometryEvent contains a Geometry object given by its class name. Args: name (str): input Geometry class name. """ if name in self.get_geometry_names(): return True else: return False
[docs] def get_geometry_names(self, parent_class=None): """Returns the names of the Geometry objects associated to this GeometryEvent, optionally filtered by parent class name. Args: parent_class (str): name of the parent class. """ names = [] for geometry in self.geometries: if parent_class: this_parent_class = geometry.__class__.__bases__[0].__name__ if this_parent_class == parent_class: names.append(geometry.getName()) else: names.append(geometry.getName()) return names
[docs] def get_searchable_geometries(self): """Returns associated Geometry objects that are searchable. """ searchable_geometries = [] for geometry in self.geometries: if geometry.isSearchable(): searchable_geometries.append(geometry) return searchable_geometries
[docs] def compute_geometries(self, searchable_only=False): """Compute geometry data associated to each Geometry object. """ geometries = self.get_geometries(searchable_only=searchable_only) # if self.geoevt_class == 'Measurement': # print(f'>> compute_geometries [{self.geoevt_class}]: n = {len(geometries)}/{len(self.geometries)}') for geometry in geometries: geometry.compute(self.reference_time)
# if self.geoevt_class == 'Measurement':g # print(f'>> {geometry.name}({self.reference_time}) = {geometry.data}') # print()
[docs] def get_geometries(self, scalar_only=False, searchable_only=False): if scalar_only: geometries = [] for geometry in self.geometries: if geometry.get_parent_class() == 'Scalar': geometries.append(geometry) elif searchable_only: geometries = [] for geometry in self.geometries: # print('>',geometry.condition) if geometry.get_parent_class() == 'Scalar' and geometry.condition and self.geoevt_class == 'Measurement': geometries.append(geometry) else: geometries = self.geometries return geometries
[docs] def get_geometry(self, name): """Returns the Geometry object for a given input class name. Args: name (str): name of input Geometry class. """ for geometry in self.geometries: if geometry.getName() == name: return geometry print('<{}> Geometry class name not found.'.format(name)) return None
[docs] def get_geometry_data(self, name): """Returns the Geometry object data for a given input class name. Args: name: name of input Geometry class. """ data = None for geometry in self.geometries: if geometry.getName() == name: data = geometry.getData() return data
[docs] def get_geometry_dicts(self): """Returns the list of Geometry dictionary. """ geometry_dicts = [] for geometry in self.geometries: geometry_dicts.append(geometry.getDict()) return geometry_dicts
[docs] def get_geometry_parameters(self, name): """Returns parameters for a given Geometry object class name. Args: name: name of input Geometry class. Returns: dict """ parameters = {} for geometry in self.geometries: if geometry.getName() == name: parameters = geometry.parameters return parameters
[docs] def add_sub_event(self, sub_geoevt): """Add a sub-GeometryEvent object to this GeometryEvent object. For example: - A Sequence object is made of Observation objects. - An Observation object is made of Measurement objects. Args: sub_geoevt (GeometryEvent): sub-GeometryEvent object to be added. """ if sub_geoevt.is_valid(): self.sub_events.append(sub_geoevt) self.n_sub_events += 1 # increment number of sub-events sub_geoevt.index = self.n_sub_events - 1 # set index of added sub-event sub_geoevt.parent_event = self # set GeometryEvent object parent this sub-event
[docs] def get_sub_events_geometries(self, name): """Returns the list of Geometry objects held by this GeometryEvent object sub-events. For example, the following call shall return the Dwell_Time(Geometry) objects: Observation.get_sub_events_geometries('Dwell_Time') Args: name (str): name of the input Geometry class to return Geometry objects for. """ geometries = [] for sub_event in self.sub_events: geometries.append(sub_event.get_geometry(name)) return geometries
[docs] def get_sub_events_geometry_data(self, name): """Returns the list of Geometry objects data held by this GeometryEvent object sub-events. For example, the following call shall return the Dwell_Time(Geometry) objects data: Observation.get_sub_events_geometry_data('Dwell_Time') Args: name (str): name of the input Geometry class to return Geometry objects data for. """ data = [] for sub_event in self.sub_events: data.append(sub_event.get_geometry_data(name)) return data
[docs] def get_sub_events_reference_times(self, format='et', mission_event=None): """Returns the list of reference times associated to this GeometryEvent sub-events. For example, the following call returns the reference ephemeris times corresponding to Measurement objects. Observation.get_sub_events_reference_times() Args: format (str): time format, `et` (default), `utc` or `rel` (requires mission Event object). mission_event (Event): mission Event object (required to derive relative time). """ reference_times = [] for sub_event in self.sub_events: reference_times.append(sub_event.get_reference_time(format=format, mission_event=mission_event)) return reference_times
[docs] def get_sub_event_dicts(self): """Returns the list of sub-GeometryEvent objects dictionary. """ sub_event_dicts = [] for sub_event in self.sub_events: sub_event_dict = { 'geometries': sub_event.get_geometry_dicts() } sub_event_dicts.append(sub_event_dict) return sub_event_dicts
[docs] def get_csv_header(self): # set base header header = f'event_name,start_time (utc),stop_time (utc),subgroup,group' # add geometries headers for geometry in self.get_geometries(): geometry_csv_header = geometry.get_csv_header() if geometry_csv_header is not None: header += f',{geometry.get_csv_header()}' return header
[docs] def get_csv_line(self, event_name='EVENT', subgroup='', group='', geometries=None): start_time = self.get_start_time(format='utc') start_time = 'T'.join(start_time.split())+'Z' stop_time = self.get_stop_time(format='utc') stop_time = 'T'.join(stop_time.split())+'Z' line = f'{event_name},{start_time},{stop_time}' if group: line += f',{subgroup},{group}' else: line += f',{subgroup}' # add geometry data for geometry in self.get_geometries(): geometry_csv_line = geometry.get_csv_line() if geometry_csv_line is not None: line += f',{geometry_csv_line}' return line
[docs] def is_valid(self): return self.valid
[docs]class Sequence(GeometryEvent): """ Class that represents a sequence of observations. """ def __init__(self, opportunity_definition=None, geoevt_dict=None, time_window=None, n_steps=None, time_step=None): """Constructs Sequence object. Args: opportunity_definition (OpportunityDefinition): geoevt_dict (dict): time_window (spiceypy.utils.support_types.SpiceCell): n_steps (int): time_step (float): """ super().__init__(opportunity_definition=opportunity_definition, geoevt_dict=geoevt_dict, time_window=time_window) # Create Observation objets from input GeometryEvent "data" dictionary, if present, otherwise use input time # window `time_window` and observation acquisitions timing parameters: number of steps `n_steps` (a step # represents an detector "instant-acquisition", also called repetition), and time step `time_step` (also called # repetition time, CU_Trep). if geoevt_dict: # input GeometryEvent "data" dictionary present # set interval times interval_times = geoevt_dict['interval_times'] self.set_interval_times(interval_times, format='utc') # TODO: when are Observation objects created ?! -> DataStore calls opportunity.loadSequence(geoevents_dict), # which calls sequence.add_observation(obs_geoevt_dict) else: if self.n_intervals > 0: # Create and add valid Observation object for each interval. for interval_time in self.interval_times: observation = Observation( opportunity_definition=opportunity_definition, start_time=interval_time[0], stop_time=interval_time[1], n_steps=n_steps, time_step=time_step ) self.add_sub_event(observation) # only if valid # else: # # print(f'[WARNING] Cannot create sequence observation objects because no input time window intervals.') # raise Exception('Cannot create sequence observation objects because no input time window intervals.') def __repr__(self): return ( f"<{self.__class__.__name__}> " f"UTC start time: {self.get_start_time(format='utc')} | " f"UTC stop time: {self.get_stop_time(format='utc')} | " f"Duration (sec): {self.duration:.3f} | " f"Nb of observations: {self.n_intervals} | " # should be equal to n_sub_events f"Nb of geometries: {self.n_geometries}" ) # TODO: is it used? -> opportunity.loadSequence(geoevents_dict)
[docs] def add_observation(self, geoevt_dict): """Add Observation object to this Sequence object. Args: geoevt_dict (dict): observation dictionary, passed from Opportunity.loadSequence(). """ observation = Observation(geoevt_dict=geoevt_dict, start_time=geoevt_dict['start_time'], stop_time=geoevt_dict['stop_time'], format='utc') self.add_sub_event(observation)
[docs] def compute(self, searchable_only=False): """Compute Geometry objects data associated to each Observation object, and to this Sequence object. """ observations = self.get_observations() for observation in observations: observation.compute(searchable_only=searchable_only) self.compute_geometries(searchable_only=searchable_only)
[docs] def get_observations(self, searchable_only=False): """Returns the list of Observation objects associated to this Sequence object. """ if searchable_only: observations = [] for observation in self.sub_events: if observation.geoevt_class == 'Scalar': observations.append(observation) else: observations = self.sub_events return observations
[docs] def get_observations_times(self, format=None, mission_event=None): """Returns the list of reference times associated to this Sequence's Observation objects. Args: format (str): time format, `et` (default), `utc` or `rel` (requires mission Event object). mission_event (Event): mission Event object (required to derive relative time). """ return self.get_sub_events_reference_times(format=format, mission_event=mission_event)
[docs] def get_observations_geometry_data(self, name): """Returns the list of Geometry objects data associated to this Sequence's Observation objects. Args: name (str): name of the input Geometry class to return Geometry objects data for. """ return self.get_sub_events_geometry_data(name)
[docs]class Observation(GeometryEvent): """ Class that represents an observation. """ def __init__(self, opportunity_definition=None, geoevt_dict=None, start_time=None, stop_time=None, n_steps=None, time_step=None, format=''): """Constructs Observation object. Args: opportunity_definition: geoevt_dict: start_time: stop_time: n_steps: time_step: format: """ super().__init__(opportunity_definition=opportunity_definition, geoevt_dict=geoevt_dict, start_time=start_time, stop_time=stop_time, format=format) self.time_step = None self.n_steps = None # Create Measurement objets from input GeometryEvent "data" dictionary, if present, otherwise use observation # start and stop times and acquisitions timing parameters: number of steps `n_steps` (a step # represents an detector "instant-acquisition", also called repetition), and time step `time_step` (also called # repetition time, CU_Trep). if geoevt_dict: # input GeometryEvent "data" dictionary present # Set measurement/interval times from geoevt_dict and create measurement objects self.set_interval_times(geoevt_dict['interval_times'], format='utc') for interval_time, measurement_geoevt_dict in zip(self.interval_times, geoevt_dict['sub_events']): measurement = Measurement(geoevt_dict=measurement_geoevt_dict, start_time=interval_time[0]) self.add_sub_event(measurement) # set measurement/repetition time step measurement_times = self.get_measurements_times(format='et') self.time_step = measurement_times[1] - measurement_times[0] else: # Set measurement times if not time_step: if not n_steps: n_steps = DEFAULT_N_STEPS time_step = self.duration / (n_steps - 1) # TODO: (n_steps - 1) or n_steps ? else: if not n_steps: n_steps = int(self.duration / time_step) # ! changes stop_time ! if n_steps > MAX_N_STEPS: print(f'[WARNING] High number of computation steps? {n_steps}') measurement_times = [x * (self.duration-time_step) / (n_steps - 1) + self.start_time for x in range(n_steps)] self.time_step = measurement_times[1] - measurement_times[0] interval_times = [] for measurement_time in measurement_times: interval_times.append([measurement_time, measurement_time]) self.set_interval_times(interval_times) # create measurement objects for interval_time in self.interval_times: # number of measurements measurement = Measurement(opportunity_definition=opportunity_definition, start_time=interval_time[0]) self.add_sub_event(measurement) self.valid = True def __repr__(self): return ( f"<{self.__class__.__name__}> " f"UTC start time: {self.get_start_time(format='utc')} | " f"UTC stop time: {self.get_stop_time(format='utc')} | " f"Duration (sec): {self.duration:.3f} | " f"Nb of measurements: {self.n_intervals} | " # should be equal to n_sub_events and n_steps f"Time step (sec): {self.time_step:.3f} | " f"Nb of geometries: {self.n_geometries}" )
[docs] def compute(self, searchable_only=False): """Compute Geometry objects data associated to each Measurement object, and to this Observation object. """ measurements = self.get_measurements() for measurement in measurements: measurement.compute(searchable_only=searchable_only) self.compute_geometries(searchable_only=searchable_only)
# for measurement in self.sub_events: # measurement.compute() # self.compute_geometries()
[docs] def get_measurements(self): """Returns the list of Measurement objects associated to this Observation object. """ return self.sub_events
# if scalar_only: # measurements = [] # for measurement in self.sub_events: # if measurement.geoevt_class == 'Scalar': # measurements.append(measurement) # else: # measurements = self.sub_events
[docs] def get_measurements_times(self, format='et', mission_event=None): """Returns the list of reference times associated to this Observation's Measurement objects. Args: format (str): time format, `et` (default), `utc` or `rel` (requires mission Event object). mission_event (Event): mission Event object (required to derive relative time). """ return self.get_sub_events_reference_times(format=format, mission_event=mission_event)
[docs] def get_measurements_geometries(self, name): """Returns the list of Geometry objects associated to this Observation's Measurement objects. Args: name (str): name of the input Geometry class to return Geometry objects for. """ return self.get_sub_events_geometries(name)
[docs] def get_measurements_geometry_data(self, name): """Returns the list of Geometry objects data associated to this Observation's Measurement objects. Args: name (str): name of the input Geometry class to return Geometry objects data for. """ return self.get_sub_events_geometry_data(name)
[docs] def get_measurement_geometry_names(self, parent_class=None): """Returns the list of Geometry class names associated to this Observation's Measurement objects. Args: parent_class (str): name of the parent class name. """ return self.sub_events[0].get_geometry_names(parent_class=parent_class)
[docs]class Measurement(GeometryEvent): """Class that represents an observation measurement. """ def __init__(self, opportunity_definition=None, geoevt_dict=None, start_time=None): """Constructs Measurement object. Args: opportunity_definition (OpportunityDefinition): geoevt_dict (dict): start_time: """ super().__init__(opportunity_definition=opportunity_definition, geoevt_dict=geoevt_dict, start_time=start_time) # self.parent_event = None # self.index = 0 # self.valid = True def __repr__(self): return ( f"<{self.__class__.__name__}> " f"Index: {self.index:>2} | " f"UTC acquisition time: {self.get_reference_time(format='utc')} | " f"Nb of geometries: {self.n_geometries}" )
[docs] def compute(self, searchable_only=False): self.compute_geometries(searchable_only=searchable_only)
[docs]class GeoEvtFile: def __init__(self, geoevt_file=None): self.geoevt_file = '' if geoevt_file is not None: self.geoevt_file = geoevt_file def __repr__(self): return '<%s %r>' % (self.__class__.__name__, self.__dict__)
[docs] def write(self, geoevt, geoevt_file=None): # coverage (opportunity) or observation types if geoevt_file is not None: self.geoevt_file = geoevt_file geoevt_dict = geoevt.get_dict() with open(self.geoevt_file, 'w') as f: json.dump(geoevt_dict, f, indent=2)
[docs] def read(self, geoevt_file=None): if geoevt_file is not None: self.geoevt_file = geoevt_file with open(self.geoevt_file) as f: geoevt_dict = json.load(f) return geoevt_dict