# from gfinder.opportunity import Opportunity
# from gfinder.timeline import Timeline
from gfinder.ptr_pointing import PTRPointing
import spiceypy as spice
from pathlib import Path
import shutil
import csv
import geojson
from datetime import datetime
EXPORTABLE_CLASS_NAMES = ['Opportunity', 'Timeline']
"""List of exportable class names."""
def Exporter(format, path='', overwrite=False):
    """Factory function returning an Exporter instance for the requested format.
    """
    # check that format is valid and return the corresponding Exporter object
    if format in EXPORTERS:
        ExporterClass = EXPORTERS[format]
        exporter = ExporterClass(path=path, overwrite=overwrite)
        return exporter
    else:
        raise ValueError(f'Invalid exporter format: {format}. Allowed formats are: {list(EXPORTERS.keys())}')
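# Illustrative usage of the factory (a sketch, not part of the module API; it assumes an
# `opportunity` object produced elsewhere by gfinder, e.g. an Opportunity instance):
#
#     exporter = Exporter('GeoJSON', path='output', overwrite=True)
#     exporter.export(opportunity)
#     exporter.list_exported()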
class AbstractExporter:
    """Base class for exporters. The exporter format is derived from the class name
    prefix before '_' (e.g. 'GeoJSON' for GeoJSON_Exporter)."""
def __init__(self, path='', overwrite=False):
# set exporter format
self.format = self.__class__.__name__.split('_')[0]
if self.format == '':
raise Exception('Unable to derive exporter format.')
# set output directory
self.output_dir = None
if path:
self.set_output_dir(path, overwrite=overwrite)
else:
self.set_output_dir('', overwrite=overwrite)
    def set_output_dir(self, path, overwrite=False):
        """Create the '<format>/' output directory under `path`, replacing it if overwrite=True."""
# set output directory path
output_dir = str(Path(path) / self.format.lower())
# check existence of output directory
if Path(output_dir).exists():
if overwrite:
# delete existing output directory
shutil.rmtree(output_dir)
else:
raise FileExistsError(f'Output directory already exists: {Path(output_dir).absolute()}. Use overwrite=True to replace directory content.')
# create output directory
Path(output_dir).mkdir(parents=True, exist_ok=True)
self.output_dir = Path(output_dir)
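    # Note: the output directory is the format name (lower-cased) under the given base path,
    # e.g. Exporter('GeoJSON', path='output') writes into 'output/geojson/'.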
    def is_exportable(self, object):
        """Return whether the input object is exportable, together with an error message if it is not.
        Exportable objects must belong to one of EXPORTABLE_CLASS_NAMES and provide a `get_observations` method.
        """
message = ''
object_class_name = object.__class__.__name__
        if object_class_name in EXPORTABLE_CLASS_NAMES:
object_methods = [method_name for method_name in dir(object)
if callable(getattr(object, method_name))]
if 'get_observations' in object_methods:
return True, message
else:
message = f'Input object class not exportable: <{object_class_name}>. Missing `get_observations` method.'
return False, message
else:
message = f'Input object class not exportable: <{object_class_name}>. Exportable object classes are: {", ".join([f"<{value}>" for value in EXPORTABLE_CLASS_NAMES])}.'
return False, message
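    # Illustrative check (sketch; assumes `timeline` is a gfinder Timeline instance):
    #
    #     ok, message = exporter.is_exportable(timeline)
    #     if not ok:
    #         raise ValueError(message)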
    def list_exported(self):
        """Print the list of files present in the output directory."""
        files = [x for x in self.output_dir.rglob('*') if x.is_file()]
        if not files:
            print(f'No exported files found in {self.output_dir} directory.')
        else:
            s = 's' if len(files) > 1 else ''
            print(f'Exported file{s} in {self.output_dir} directory:')
            for file in files:
                print(f'- {file.name}')
    def export(self, object, path='', overwrite=False, measurements=False, agm_validation=False):
        """Check that the input object is exportable and prepare the output directory.
        Concrete exporters extend this method with format-specific file writing."""
        exportable, message = self.is_exportable(object)
        if not exportable:
            raise ValueError(message)
        if path:
            self.set_output_dir(path, overwrite=overwrite)
        elif self.output_dir is None:  # output directory not already set (from Exporter object creation)
            self.set_output_dir('', overwrite=overwrite)  # current path as base directory
class EventCSV_Exporter(AbstractExporter):
    """EventCSV_Exporter exports Opportunity or Timeline object data to the ESA Event CSV file format.
    Such a file contains a tabular list of observational opportunities, which can be time segments or
    observations, with the following fields:
    - event_name
    - start_time
    - stop_time
    - subgroup
    - source group
    - duration
    - n_lines
    - spatial binning
    - distance_to_jupiter
    - First CU_frame start (UT, and wrt CA if any)
    - Last CU_frame stop (UT, and wrt CA if any)
    - if the mirror is used: Mirror Start Pos (°), Mirror Start Speed (°/s), Mirror End Speed (°/s)
    """
def __init__(self, path='', overwrite=False):
super().__init__(path=path, overwrite=overwrite)
    def export(self, object, path='', overwrite=False, measurements=False, agm_validation=False):
super().export(object, path=path, overwrite=overwrite, measurements=measurements, agm_validation=agm_validation)
observations = object.get_observations()
# TODO: !! only works for Opportunity object
event_basename = object.opportunity_definition.observation_type
# write observations CSV file
csv_path = self.output_dir / 'observations.csv'
with open(csv_path, 'w') as f:
observations_header = observations[0].get_csv_header()
f.write(f'# {observations_header}\n')
for i, observation in enumerate(observations):
event_name = f'{event_basename}_{i+1:03}'
                # WARNING! SHT: "only item names already included in the JUICE SOC Core System Segment definitions database can be imported in csv format."
line = observation.get_csv_line(event_name=event_name, subgroup='', group='WG2')
f.write(line)
f.write('\n')
# write measurements CSV files
        if not measurements:  # export measurement data only for the first observation
observations = [observations[0]]
for i, observation in enumerate(observations):
csv_path = self.output_dir / f'observation_{i + 1:03}.csv'
with open(csv_path, 'w') as f:
measurements_header = observation.sub_events[0].get_csv_header()
f.write(f'# {measurements_header}\n')
for j, measurement in enumerate(observation.sub_events):
event_name = f'{event_basename}_{i+1:03}_{j:04}'
line = measurement.get_csv_line(event_name=event_name, subgroup='', group='WG2')
f.write(line)
f.write('\n')
# report on exported files
self.list_exported()
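# Illustrative usage (sketch; assumes `opportunity` is an Opportunity instance, see the
# TODO above about Opportunity-only support):
#
#     exporter = Exporter('EventCSV', path='output', overwrite=True)
#     exporter.export(opportunity, measurements=True)
#     exporter.list_exported()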
class GeoJSON_Exporter(AbstractExporter):
    """GeoJSON_Exporter exports the measurement FOV footprints of Opportunity or Timeline objects
    to GeoJSON files (one file per observation)."""
    def __init__(self, path='', overwrite=False):
        super().__init__(path=path, overwrite=overwrite)
    def export(self, object, path='', overwrite=False, measurements=False, agm_validation=False):
super().export(object, path=path, overwrite=overwrite, measurements=measurements, agm_validation=agm_validation)
observations = object.get_observations()
        # write observations GeoJSON file
#geojson_path = self.output_dir / 'observations.geojson'
# TODO: FOV footprint does not exist yet at observation-level, see [https://git.ias.u-psud.fr/majis_sgs_mos/requirements/-/issues/67]
        if not measurements:  # export measurement data only for the first observation
observations = [observations[0]]
for i, observation in enumerate(observations):
geojson_path = self.output_dir / f'observation_{i+1:03}.geojson'
# write observation(measurements) GeoJSON files
with open(geojson_path, 'w') as f:
# TODO: currently only working with Opportunity objects
feature_collection = object.getMeasurementsGeoJSON(
'Detector_FOV_Footprint', observation_id=i, split=True, surface_only=True)
geojson.dump(feature_collection, f)
# report on exported files
self.list_exported()
# Convert geojson to KML for visualisation in GoogleEarth
# kml_path = '{}/observation_{:04}.kml'.format(opportunity_path,i+1)
# ogr2ogr -f KML observation_0001.kml observation_0001.geojson
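        # A minimal sketch of that conversion, assuming the GDAL `ogr2ogr` command-line tool
        # is installed and on the PATH (file names are hypothetical):
        #
        #     import subprocess
        #     subprocess.run(['ogr2ogr', '-f', 'KML', 'observation_0001.kml', 'observation_0001.geojson'], check=True)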
class QuaternionCSV_Exporter(AbstractExporter):
    """QuaternionCSV_Exporter exports the simulated spacecraft attitude of each observation as
    time-tagged quaternions in CSV files (one file per observation)."""
    def __init__(self, path='', overwrite=False):
        super().__init__(path=path, overwrite=overwrite)
    def export(self, object, path='', overwrite=False, measurements=False):
super().export(object, path=path, overwrite=overwrite, measurements=measurements)
observations = object.get_observations()
for i_obs, observation in enumerate(observations):
qcsv_path = self.output_dir / f'quaternion_obs_{i_obs + 1:04}.csv'
with open(qcsv_path, 'w') as f:
qcsv_writer = csv.writer(f)
simsc_j2000_rotmats = observation.get_measurements_geometry_data('Simulated_SC_Frame')
times = observation.get_measurements_times(format='et')
for idx, simsc_j2000_rotmat in enumerate(simsc_j2000_rotmats):
quaternions = spice.m2q(simsc_j2000_rotmat)
utc_time = spice.timout(times[idx], "YYYY-MM-DDTHR:MN:SC.##")
#row = [ utc_time, quaternions[0], quaternions[1], quaternions[2], quaternions[3] ]
# Qalt = (QV1, QV2, QV3, QS)
# Qspice = (QS, -QV1, -QV2, -QV3)
row = [utc_time, -quaternions[1], -quaternions[2], -quaternions[3], quaternions[0]]
#print(row)
qcsv_writer.writerow(row)
# report on exported files
self.list_exported()
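# Note on the reordering above: spice.m2q returns a scalar-first SPICE quaternion
# (QS, QV1, QV2, QV3); the CSV rows store the vector-first form documented by the
# Qalt/Qspice comments. Minimal illustration (a sketch; no kernels are required):
#
#     q = spice.m2q(spice.rotate(0.5, 3))      # SPICE quaternion for a rotation about axis 3
#     q_alt = [-q[1], -q[2], -q[3], q[0]]      # vector-first form, as written to the CSV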
class CK_Exporter(AbstractExporter):
    """CK_Exporter exports the simulated spacecraft attitude of each observation as time-tagged
    SPICE quaternion tables (.tab files), intended as input for CK generation (see the msopck
    setup example in `export`; direct CK writing is not yet implemented)."""
    def __init__(self, path='', overwrite=False):
        super().__init__(path=path, overwrite=overwrite)
    def export(self, object, path='', overwrite=False, measurements=False):
super().export(object, path=path, overwrite=overwrite, measurements=measurements)
# TODO: to be implemented using https://spiceypy.readthedocs.io/en/main/documentation.html#spiceypy.spiceypy.ckw03
# see [https://git.ias.u-psud.fr/majis_sgs_mos/requirements/-/issues/109]
#
# $ msopck <setup_file> <input_file> <output_ck_file>
#
# \begindata
#
# LSK_FILE_NAME = 'kernels/juice/lsk/naif0011.tls'
# SCLK_FILE_NAME = 'kernels/juice/sclk/juice_fict_20160326.tsc'
#
# CK_TYPE = 3
# CK_SEGMENT_ID = 'MAJIS-SIMULATED SC SLEW'
# INSTRUMENT_ID = -28000
# REFERENCE_FRAME_NAME = 'J2000'
# ANGULAR_RATE_PRESENT = 'NO'
#
# INPUT_TIME_TYPE = 'ET'
# INPUT_DATA_TYPE = 'SPICE QUATERNIONS'
#
# PRODUCER_ID = 'MAJIS Science Operations Team'
#
# \begintext
observations = object.get_observations()
for i_obs, observation in enumerate(observations):
qtab_path = self.output_dir / f'quaternion_obs_{i_obs + 1:04}.tab'
with open(qtab_path, 'w') as f:
qtab_writer = csv.writer(f, delimiter=' ', lineterminator='\n')
simsc_j2000_rotmats = observation.get_measurements_geometry_data('Simulated_SC_Frame')
times = observation.get_measurements_times(format='et')
for idx, simsc_j2000_rotmat in enumerate(simsc_j2000_rotmats):
quaternions = spice.m2q(spice.xpose(simsc_j2000_rotmat))
row = [times[idx], quaternions[0], quaternions[1], quaternions[2], quaternions[3]]
qtab_writer.writerow(row)
# report on exported files
self.list_exported()
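# A minimal sketch of the msopck route described in the setup-file comments above, assuming
# the NAIF `msopck` utility is installed and the setup file has been written to
# 'msopck_setup.txt' (file names are hypothetical):
#
#     import subprocess
#     subprocess.run(['msopck', 'msopck_setup.txt', 'quaternion_obs_0001.tab', 'quaternion_obs_0001.bc'],
#                    check=True)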
class PTR_Exporter(AbstractExporter):
    """PTR_Exporter exports opportunity pointing definitions to PTR (.ptx) files, optionally
    validating the exported pointing with AGM."""
    def __init__(self, path='', overwrite=False):
        super().__init__(path=path, overwrite=overwrite)
    def export(self, object, measurements=None, path='', overwrite=False, agm_validation=False):
super().export(object, measurements=measurements, path=path, overwrite=overwrite)
# observations = object.get_observations()
opportunities = object.get_opportunities()
for i, opportunity in enumerate(opportunities):
ptr_path = self.output_dir / f'observation_{i+1:03}.ptx'
ptr_pointing = PTRPointing(opportunity_definition=opportunity.opportunity_definition)
start_time = opportunity.get_observation().get_start_time(format='utc')
stop_time = opportunity.get_observation().get_stop_time(format='utc')
# PATCH removing milliseconds so that it passes AGM validation
# see: https://juigitlab.esac.esa.int/python/ptr/-/issues/8
splits = start_time.split('.')
if len(splits) > 1:
start_time = splits[0]
splits = stop_time.split('.')
if len(splits) > 1:
stop_time = splits[0]
# set exported PTR metadata/comment
metadata = [
f'MAJIS MOS opportunity pointing definition export to PTR',
f'execution time: {datetime.now().strftime("%d-%m-%Y %H:%M:%S")}',
f'opportunity_id: {opportunity.id}',
f'{ptr_pointing}'
]
prm = ptr_pointing.get_prm(start_time, stop_time, metadata=metadata)
# display PRM XML
print(prm)
print()
            # validate PTR using AGM if requested
if agm_validation:
metakernel_id = opportunity.mission_scenario.agm_metakernel_id
endpoint = 'JUICE_API'
print(f'AGM simulation: metakernel={metakernel_id}, endpoint={endpoint}')
results = prm.simulate(metakernel_id, endpoint)
# report on results
if results.success:
print('AGM simulation: VALID')
                    # TODO: implement cross-validation of MOS and AGM simulated pointings
cross_validation = True
if cross_validation:
# save CK file
ck_file = Path(ptr_path.absolute().parent, ptr_path.stem + '.bc')
results.ck.save(ck_file, overwrite=True)
print(f'AGM simulation: CK saved.')
print()
print('Cross validation of MOS and AGM pointing simulations:')
print()
discrepancy = opportunity.cross_validate(ck_file)
print()
else:
                    print('AGM simulation: INVALID')
print('\n' + repr(results.log) + '\n')
else:
                print('PTR file not validated (AGM validation not requested).')
# save PRM XML
prm.save(ptr_path)
# results.ck.save('example.ck')
# report on exported files
self.list_exported()
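# Illustrative usage (sketch; assumes the input object provides get_opportunities(), as called
# above, and that a mission scenario with an AGM metakernel is configured when
# agm_validation=True):
#
#     exporter = Exporter('PTR', path='output', overwrite=True)
#     exporter.export(timeline, agm_validation=True)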
class ITL_Exporter(AbstractExporter):
    """Placeholder for ITL export (not yet implemented)."""
    def __init__(self, path='', overwrite=False):
        raise NotImplementedError(f'{self.__class__.__name__} not yet implemented.')
        super().__init__(path=path, overwrite=overwrite)
    def export(self, object, measurements=None, path='', overwrite=False):
super().export(object, measurements=measurements, path=path, overwrite=overwrite)
EXPORTERS = {
'EventCSV': EventCSV_Exporter,
'GeoJSON': GeoJSON_Exporter,
'QuaternionCSV': QuaternionCSV_Exporter,
'CK': CK_Exporter,
'PTR': PTR_Exporter,
'ITL': ITL_Exporter
}
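"""Registry mapping format names accepted by the Exporter() factory to exporter classes."""

# Sketch of how a new format could be registered (hypothetical 'Foo' format): the exporter
# format is derived from the class-name prefix before '_', so the class must be named accordingly.
#
#     class Foo_Exporter(AbstractExporter):
#         def export(self, object, path='', overwrite=False, measurements=False):
#             super().export(object, path=path, overwrite=overwrite, measurements=measurements)
#             ...  # write files into self.output_dir
#
#     EXPORTERS['Foo'] = Foo_Exporter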