"""GeometryEvent module."""
from gfinder.event import Event
from gfinder.geometry import GeometryFactory
from gfinder.config import DEFAULT_N_STEPS, DEFAULT_TIME_STEP, MAX_N_STEPS
import spiceypy as spice
import json
[docs]class GeometryEvent(Event):
"""Class representing a geometry event.
A GeometryEvent is an Event that can contains and handle Geometry objects.
Objects inheriting the GeometryEvent class knows how to construct their associated geometries.
Depending of its type, a GeometryEvent object can hold children GeometryEvent objects.
A GeometryEvent object can be of 3 types, which have a hierarchical relationship:
- sequence: contains Observation(s)
- observation: contain Measurements
- measurement
Attributes:
geoevt_class (str): name of the GeometryEvent class.
n_geometries (int): number of Geometry objects associated to the geometry event.
geometries (list): list of Geometry objects associated to the geometry event.
n_sub_events (int): number of sub-GeometryEvent objects.
sub_events (list): list of GeometryEvent objects.
parent_event (str): parent GeometryEvent class name (`Sequence` or `Measurement`).
index (int): index of this GeometryEvent object wrt its parent object.
valid (bool): validity flag.
"""
def __init__(self, opportunity_definition=None, geoevt_dict=None, time_window=None, start_time=None, stop_time=None,
format=''):
"""Constructs GeometryEvent object.
Args:
opportunity_definition (OpportunityDefintion):
geoevt_dict (dict):
time_window (spiceypy.utils.support_types.SpiceCell):
start_time:
stop_time:
format:
"""
super().__init__(time_window=time_window, start_time=start_time, stop_time=stop_time, format=format)
self.geoevt_class = self.__class__.__name__ # some "magic" happens here.
self.n_geometries = 0
self.geometries = []
self.n_sub_events = 0 # equal to self.n_intervals when all subevents are loaded
self.sub_events = [] # list of Observation or Measurement objects
self.parent_event = None
self.index = 0
self.valid = True
# Self create and add Geometry objects from opportunity definition or geoevent_dict (GeoEvt file)
if opportunity_definition:
geometry_defs = opportunity_definition.getGeometryDefinitions(self.geoevt_class)
for geometry_def in geometry_defs:
geometry = GeometryFactory().create(geometry_def, self)
if geometry.is_valid():
self.geometries.append(geometry)
self.n_geometries += 1
elif geoevt_dict:
for geometry_dict in geoevt_dict['geometries']:
geometry = GeometryFactory().create(geometry_dict, self)
if geometry.is_valid(): # set geometry data and add geometry
geometry.data = geometry_dict['value']
self.geometries.append(geometry)
self.n_geometries += 1
else:
print('[WARNING] Invalid Geometry:')
print(geometry_dict)
# else:
# print('[WARNING] Missing opportunity_definition or geoevt_dict to instantiate GeometryEvent object.')
def __repr__(self):
s = f"<{self.__class__.__name__}>\n"
s += json.dumps(self.get_dict(), indent=2)
return s
#return '<%s %r>' % (self.__class__.__name__, self.get_dict())
[docs] def get_dict(self):
"""Returns this GeometryEvent dictionary.
"""
# Header
geoevt_dict = {
'geometry_event_class': self.geoevt_class,
}
# Event
geoevt_dict.update(super().get_dict())
# Geometries and sub-events
geoevt_dict.update({
'geometries': self.get_geometry_dicts(),
'sub_events': self.get_sub_event_dicts()
})
return geoevt_dict
[docs] def has_geometry(self, name):
"""Returns whether or not this GeometryEvent contains a Geometry object given by its class name.
Args:
name (str): input Geometry class name.
"""
if name in self.get_geometry_names():
return True
else:
return False
[docs] def get_geometry_names(self, parent_class=None):
"""Returns the names of the Geometry objects associated to this GeometryEvent, optionally filtered by parent
class name.
Args:
parent_class (str): name of the parent class.
"""
names = []
for geometry in self.geometries:
if parent_class:
this_parent_class = geometry.__class__.__bases__[0].__name__
if this_parent_class == parent_class:
names.append(geometry.getName())
else:
names.append(geometry.getName())
return names
[docs] def get_searchable_geometries(self):
"""Returns associated Geometry objects that are searchable.
"""
searchable_geometries = []
for geometry in self.geometries:
if geometry.isSearchable():
searchable_geometries.append(geometry)
return searchable_geometries
[docs] def compute_geometries(self, searchable_only=False):
"""Compute geometry data associated to each Geometry object.
"""
geometries = self.get_geometries(searchable_only=searchable_only)
# if self.geoevt_class == 'Measurement':
# print(f'>> compute_geometries [{self.geoevt_class}]: n = {len(geometries)}/{len(self.geometries)}')
for geometry in geometries:
geometry.compute(self.reference_time)
# if self.geoevt_class == 'Measurement':g
# print(f'>> {geometry.name}({self.reference_time}) = {geometry.data}')
# print()
[docs] def get_geometries(self, scalar_only=False, searchable_only=False):
if scalar_only:
geometries = []
for geometry in self.geometries:
if geometry.get_parent_class() == 'Scalar':
geometries.append(geometry)
elif searchable_only:
geometries = []
for geometry in self.geometries:
# print('>',geometry.condition)
if geometry.get_parent_class() == 'Scalar' and geometry.condition and self.geoevt_class == 'Measurement':
geometries.append(geometry)
else:
geometries = self.geometries
return geometries
[docs] def get_geometry(self, name):
"""Returns the Geometry object for a given input class name.
Args:
name (str): name of input Geometry class.
"""
for geometry in self.geometries:
if geometry.getName() == name:
return geometry
print('<{}> Geometry class name not found.'.format(name))
return None
[docs] def get_geometry_data(self, name):
"""Returns the Geometry object data for a given input class name.
Args:
name: name of input Geometry class.
"""
data = None
for geometry in self.geometries:
if geometry.getName() == name:
data = geometry.getData()
return data
[docs] def get_geometry_dicts(self):
"""Returns the list of Geometry dictionary.
"""
geometry_dicts = []
for geometry in self.geometries:
geometry_dicts.append(geometry.getDict())
return geometry_dicts
[docs] def get_geometry_parameters(self, name):
"""Returns parameters for a given Geometry object class name.
Args:
name: name of input Geometry class.
Returns:
dict
"""
parameters = {}
for geometry in self.geometries:
if geometry.getName() == name:
parameters = geometry.parameters
return parameters
[docs] def add_sub_event(self, sub_geoevt):
"""Add a sub-GeometryEvent object to this GeometryEvent object.
For example:
- A Sequence object is made of Observation objects.
- An Observation object is made of Measurement objects.
Args:
sub_geoevt (GeometryEvent): sub-GeometryEvent object to be added.
"""
if sub_geoevt.is_valid():
self.sub_events.append(sub_geoevt)
self.n_sub_events += 1 # increment number of sub-events
sub_geoevt.index = self.n_sub_events - 1 # set index of added sub-event
sub_geoevt.parent_event = self # set GeometryEvent object parent this sub-event
[docs] def get_sub_events_geometries(self, name):
"""Returns the list of Geometry objects held by this GeometryEvent object sub-events.
For example, the following call shall return the Dwell_Time(Geometry) objects:
Observation.get_sub_events_geometries('Dwell_Time')
Args:
name (str): name of the input Geometry class to return Geometry objects for.
"""
geometries = []
for sub_event in self.sub_events:
geometries.append(sub_event.get_geometry(name))
return geometries
[docs] def get_sub_events_geometry_data(self, name):
"""Returns the list of Geometry objects data held by this GeometryEvent object sub-events.
For example, the following call shall return the Dwell_Time(Geometry) objects data:
Observation.get_sub_events_geometry_data('Dwell_Time')
Args:
name (str): name of the input Geometry class to return Geometry objects data for.
"""
data = []
for sub_event in self.sub_events:
data.append(sub_event.get_geometry_data(name))
return data
[docs] def get_sub_events_reference_times(self, format='et', mission_event=None):
"""Returns the list of reference times associated to this GeometryEvent sub-events.
For example, the following call returns the reference ephemeris times corresponding to Measurement objects.
Observation.get_sub_events_reference_times()
Args:
format (str): time format, `et` (default), `utc` or `rel` (requires mission Event object).
mission_event (Event): mission Event object (required to derive relative time).
"""
reference_times = []
for sub_event in self.sub_events:
reference_times.append(sub_event.get_reference_time(format=format, mission_event=mission_event))
return reference_times
[docs] def get_sub_event_dicts(self):
"""Returns the list of sub-GeometryEvent objects dictionary.
"""
sub_event_dicts = []
for sub_event in self.sub_events:
sub_event_dict = {
'geometries': sub_event.get_geometry_dicts()
}
sub_event_dicts.append(sub_event_dict)
return sub_event_dicts
[docs] def get_csv_line(self, event_name='EVENT', subgroup='', group='', geometries=None):
start_time = self.get_start_time(format='utc')
start_time = 'T'.join(start_time.split())+'Z'
stop_time = self.get_stop_time(format='utc')
stop_time = 'T'.join(stop_time.split())+'Z'
line = f'{event_name},{start_time},{stop_time}'
if group:
line += f',{subgroup},{group}'
else:
line += f',{subgroup}'
# add geometry data
for geometry in self.get_geometries():
geometry_csv_line = geometry.get_csv_line()
if geometry_csv_line is not None:
line += f',{geometry_csv_line}'
return line
[docs] def is_valid(self):
return self.valid
[docs]class Sequence(GeometryEvent):
""" Class that represents a sequence of observations.
"""
def __init__(self, opportunity_definition=None, geoevt_dict=None, time_window=None, n_steps=None, time_step=None):
"""Constructs Sequence object.
Args:
opportunity_definition (OpportunityDefinition):
geoevt_dict (dict):
time_window (spiceypy.utils.support_types.SpiceCell):
n_steps (int):
time_step (float):
"""
super().__init__(opportunity_definition=opportunity_definition, geoevt_dict=geoevt_dict, time_window=time_window)
# Create Observation objets from input GeometryEvent "data" dictionary, if present, otherwise use input time
# window `time_window` and observation acquisitions timing parameters: number of steps `n_steps` (a step
# represents an detector "instant-acquisition", also called repetition), and time step `time_step` (also called
# repetition time, CU_Trep).
if geoevt_dict: # input GeometryEvent "data" dictionary present
# set interval times
interval_times = geoevt_dict['interval_times']
self.set_interval_times(interval_times, format='utc')
# TODO: when are Observation objects created ?! -> DataStore calls opportunity.loadSequence(geoevents_dict),
# which calls sequence.add_observation(obs_geoevt_dict)
else:
if self.n_intervals > 0:
# Create and add valid Observation object for each interval.
for interval_time in self.interval_times:
observation = Observation(
opportunity_definition=opportunity_definition,
start_time=interval_time[0],
stop_time=interval_time[1],
n_steps=n_steps,
time_step=time_step
)
self.add_sub_event(observation) # only if valid
# else:
# # print(f'[WARNING] Cannot create sequence observation objects because no input time window intervals.')
# raise Exception('Cannot create sequence observation objects because no input time window intervals.')
def __repr__(self):
return (
f"<{self.__class__.__name__}> "
f"UTC start time: {self.get_start_time(format='utc')} | "
f"UTC stop time: {self.get_stop_time(format='utc')} | "
f"Duration (sec): {self.duration:.3f} | "
f"Nb of observations: {self.n_intervals} | " # should be equal to n_sub_events
f"Nb of geometries: {self.n_geometries}"
)
# TODO: is it used? -> opportunity.loadSequence(geoevents_dict)
[docs] def add_observation(self, geoevt_dict):
"""Add Observation object to this Sequence object.
Args:
geoevt_dict (dict): observation dictionary, passed from Opportunity.loadSequence().
"""
observation = Observation(geoevt_dict=geoevt_dict, start_time=geoevt_dict['start_time'],
stop_time=geoevt_dict['stop_time'], format='utc')
self.add_sub_event(observation)
[docs] def compute(self, searchable_only=False):
"""Compute Geometry objects data associated to each Observation object, and to this Sequence object.
"""
observations = self.get_observations()
for observation in observations:
observation.compute(searchable_only=searchable_only)
self.compute_geometries(searchable_only=searchable_only)
[docs] def get_observations(self, searchable_only=False):
"""Returns the list of Observation objects associated to this Sequence object.
"""
if searchable_only:
observations = []
for observation in self.sub_events:
if observation.geoevt_class == 'Scalar':
observations.append(observation)
else:
observations = self.sub_events
return observations
[docs] def get_observations_times(self, format=None, mission_event=None):
"""Returns the list of reference times associated to this Sequence's Observation objects.
Args:
format (str): time format, `et` (default), `utc` or `rel` (requires mission Event object).
mission_event (Event): mission Event object (required to derive relative time).
"""
return self.get_sub_events_reference_times(format=format, mission_event=mission_event)
[docs] def get_observations_geometry_data(self, name):
"""Returns the list of Geometry objects data associated to this Sequence's Observation objects.
Args:
name (str): name of the input Geometry class to return Geometry objects data for.
"""
return self.get_sub_events_geometry_data(name)
[docs]class Observation(GeometryEvent):
""" Class that represents an observation.
"""
def __init__(self, opportunity_definition=None, geoevt_dict=None, start_time=None, stop_time=None, n_steps=None,
time_step=None, format=''):
"""Constructs Observation object.
Args:
opportunity_definition:
geoevt_dict:
start_time:
stop_time:
n_steps:
time_step:
format:
"""
super().__init__(opportunity_definition=opportunity_definition, geoevt_dict=geoevt_dict, start_time=start_time,
stop_time=stop_time, format=format)
self.time_step = None
self.n_steps = None
# Create Measurement objets from input GeometryEvent "data" dictionary, if present, otherwise use observation
# start and stop times and acquisitions timing parameters: number of steps `n_steps` (a step
# represents an detector "instant-acquisition", also called repetition), and time step `time_step` (also called
# repetition time, CU_Trep).
if geoevt_dict: # input GeometryEvent "data" dictionary present
# Set measurement/interval times from geoevt_dict and create measurement objects
self.set_interval_times(geoevt_dict['interval_times'], format='utc')
for interval_time, measurement_geoevt_dict in zip(self.interval_times, geoevt_dict['sub_events']):
measurement = Measurement(geoevt_dict=measurement_geoevt_dict, start_time=interval_time[0])
self.add_sub_event(measurement)
# set measurement/repetition time step
measurement_times = self.get_measurements_times(format='et')
self.time_step = measurement_times[1] - measurement_times[0]
else:
# Set measurement times
if not time_step:
if not n_steps:
n_steps = DEFAULT_N_STEPS
time_step = self.duration / (n_steps - 1) # TODO: (n_steps - 1) or n_steps ?
else:
if not n_steps:
n_steps = int(self.duration / time_step) # ! changes stop_time !
if n_steps > MAX_N_STEPS:
print(f'[WARNING] High number of computation steps? {n_steps}')
measurement_times = [x * (self.duration-time_step) / (n_steps - 1) + self.start_time for x in range(n_steps)]
self.time_step = measurement_times[1] - measurement_times[0]
interval_times = []
for measurement_time in measurement_times:
interval_times.append([measurement_time, measurement_time])
self.set_interval_times(interval_times)
# create measurement objects
for interval_time in self.interval_times: # number of measurements
measurement = Measurement(opportunity_definition=opportunity_definition, start_time=interval_time[0])
self.add_sub_event(measurement)
self.valid = True
def __repr__(self):
return (
f"<{self.__class__.__name__}> "
f"UTC start time: {self.get_start_time(format='utc')} | "
f"UTC stop time: {self.get_stop_time(format='utc')} | "
f"Duration (sec): {self.duration:.3f} | "
f"Nb of measurements: {self.n_intervals} | " # should be equal to n_sub_events and n_steps
f"Time step (sec): {self.time_step:.3f} | "
f"Nb of geometries: {self.n_geometries}"
)
[docs] def compute(self, searchable_only=False):
"""Compute Geometry objects data associated to each Measurement object, and to this Observation object.
"""
measurements = self.get_measurements()
for measurement in measurements:
measurement.compute(searchable_only=searchable_only)
self.compute_geometries(searchable_only=searchable_only)
# for measurement in self.sub_events:
# measurement.compute()
# self.compute_geometries()
[docs] def get_measurements(self):
"""Returns the list of Measurement objects associated to this Observation object.
"""
return self.sub_events
# if scalar_only:
# measurements = []
# for measurement in self.sub_events:
# if measurement.geoevt_class == 'Scalar':
# measurements.append(measurement)
# else:
# measurements = self.sub_events
[docs] def get_measurements_times(self, format='et', mission_event=None):
"""Returns the list of reference times associated to this Observation's Measurement objects.
Args:
format (str): time format, `et` (default), `utc` or `rel` (requires mission Event object).
mission_event (Event): mission Event object (required to derive relative time).
"""
return self.get_sub_events_reference_times(format=format, mission_event=mission_event)
[docs] def get_measurements_geometries(self, name):
"""Returns the list of Geometry objects associated to this Observation's Measurement objects.
Args:
name (str): name of the input Geometry class to return Geometry objects for.
"""
return self.get_sub_events_geometries(name)
[docs] def get_measurements_geometry_data(self, name):
"""Returns the list of Geometry objects data associated to this Observation's Measurement objects.
Args:
name (str): name of the input Geometry class to return Geometry objects data for.
"""
return self.get_sub_events_geometry_data(name)
[docs] def get_measurement_geometry_names(self, parent_class=None):
"""Returns the list of Geometry class names associated to this Observation's Measurement objects.
Args:
parent_class (str): name of the parent class name.
"""
return self.sub_events[0].get_geometry_names(parent_class=parent_class)
[docs]class Measurement(GeometryEvent):
"""Class that represents an observation measurement.
"""
def __init__(self, opportunity_definition=None, geoevt_dict=None, start_time=None):
"""Constructs Measurement object.
Args:
opportunity_definition (OpportunityDefinition):
geoevt_dict (dict):
start_time:
"""
super().__init__(opportunity_definition=opportunity_definition, geoevt_dict=geoevt_dict, start_time=start_time)
# self.parent_event = None
# self.index = 0
# self.valid = True
def __repr__(self):
return (
f"<{self.__class__.__name__}> "
f"Index: {self.index:>2} | "
f"UTC acquisition time: {self.get_reference_time(format='utc')} | "
f"Nb of geometries: {self.n_geometries}"
)
[docs] def compute(self, searchable_only=False):
self.compute_geometries(searchable_only=searchable_only)
[docs]class GeoEvtFile:
def __init__(self, geoevt_file=None):
self.geoevt_file = ''
if geoevt_file is not None:
self.geoevt_file = geoevt_file
def __repr__(self):
return '<%s %r>' % (self.__class__.__name__, self.__dict__)
[docs] def write(self, geoevt, geoevt_file=None): # coverage (opportunity) or observation types
if geoevt_file is not None:
self.geoevt_file = geoevt_file
geoevt_dict = geoevt.get_dict()
with open(self.geoevt_file, 'w') as f:
json.dump(geoevt_dict, f, indent=2)
[docs] def read(self, geoevt_file=None):
if geoevt_file is not None:
self.geoevt_file = geoevt_file
with open(self.geoevt_file) as f:
geoevt_dict = json.load(f)
return geoevt_dict