# Source code for gfinder.exporter

# from gfinder.opportunity import Opportunity
# from gfinder.timeline import Timeline

from gfinder.ptr_pointing import PTRPointing

import spiceypy as spice
from pathlib import Path
import shutil
import csv
import geojson
from datetime import datetime

# Class names (compared against ``obj.__class__.__name__``) that exporters accept.
EXPORTABLE_CLASS_NAMES = [
    'Opportunity',
    'Timeline',
]
"""List of exportable class names."""

def Exporter(format, path='', overwrite=False):
    """Factory returning the exporter object registered for ``format``.

    The exporter class is looked up in the module-level ``EXPORTERS`` mapping
    and instantiated with the given ``path`` and ``overwrite`` arguments.

    Args:
        format: Key of the ``EXPORTERS`` mapping selecting the exporter class.
        path: Forwarded unchanged to the exporter constructor.
        overwrite: Forwarded unchanged to the exporter constructor.

    Returns:
        A new instance of the exporter class registered under ``format``.

    Raises:
        ValueError: If ``format`` is not a key of ``EXPORTERS``.
    """
    # Guard clause: reject unknown formats before instantiating anything.
    if format not in EXPORTERS:
        raise ValueError(f'Invalid exporter format: {format}. Allowed format are: {list(EXPORTERS.keys())}')

    return EXPORTERS[format](path=path, overwrite=overwrite)
class AbstractExporter():
    """Base class holding the behaviour shared by all exporters.

    The exporter format is derived from the class name (the part before the
    first underscore) and the output directory is ``<path>/<format in lower
    case>``. Subclasses override :meth:`export` and call the base
    implementation first to validate the input object and resolve the output
    directory.
    """

    def __init__(self, path='', overwrite=False):
        """Derive the exporter format and create the output directory.

        Args:
            path: Base directory under which the format-named output directory
                is created. A falsy value means the current directory.
            overwrite: Whether an already existing output directory may be
                deleted and re-created.

        Raises:
            Exception: If no format can be derived from the class name.
            FileExistsError: If the output directory exists and ``overwrite``
                is false (raised by :meth:`set_output_dir`).
        """
        # set exporter format, e.g. `EventCSV_Exporter` -> `EventCSV`
        self.format = self.__class__.__name__.split('_')[0]
        if self.format == '':
            raise Exception('Unable to derive exporter format.')

        # set output directory (any falsy path maps to the current directory)
        self.output_dir = None
        self.set_output_dir(path or '', overwrite=overwrite)

    def set_output_dir(self, path, overwrite=False):
        """Create ``<path>/<format.lower()>`` and store it in ``self.output_dir``.

        Args:
            path: Base directory of the output directory.
            overwrite: If true, an existing output directory is removed with
                all its content before being re-created.

        Raises:
            FileExistsError: If the output directory already exists and
                ``overwrite`` is false.
        """
        output_dir = Path(path) / self.format.lower()

        if output_dir.exists():
            if not overwrite:
                raise FileExistsError(
                    f'Output directory already exists: {output_dir.absolute()}. '
                    'Use overwrite=True to replace directory content.'
                )
            # delete existing output directory and everything below it
            shutil.rmtree(output_dir)

        output_dir.mkdir(parents=True, exist_ok=True)
        self.output_dir = output_dir

    def is_exportable(self, object):
        """Returns whether or not input object is exportable.

        Exportable objects must be instances of a class whose name is listed
        in ``EXPORTABLE_CLASS_NAMES`` and must have a callable
        `get_observations` attribute.

        Args:
            object: Candidate object to be exported.

        Returns:
            A ``(exportable, message)`` tuple; ``message`` is empty when the
            object is exportable and explains the rejection otherwise.
        """
        object_class_name = object.__class__.__name__

        if object_class_name not in EXPORTABLE_CLASS_NAMES:
            allowed = ", ".join([f"<{value}>" for value in EXPORTABLE_CLASS_NAMES])
            return False, f'Input object class not exportable: <{object_class_name}>. Exportable object classes are: {allowed}.'

        # Only look up the one attribute that matters, instead of calling
        # getattr() on every name in dir(object), which would evaluate all
        # properties of the object.
        if not callable(getattr(object, 'get_observations', None)):
            return False, f'Input object class not exportable: <{object_class_name}>. Missing `get_observations` method.'

        return True, ''

    def list_exported(self):
        """Print the names of all files found below the output directory."""
        # rglob() is already recursive, so '*' is enough to reach every entry.
        files = [entry for entry in self.output_dir.rglob('*') if entry.is_file()]

        if not files:
            print(f'No exported files found in {self.output_dir} directory.')
            return

        s = 's' if len(files) > 1 else ''
        print(f'Exported file{s} in {self.output_dir} directory:')
        for file in files:
            print(f'- {file.name}')

    def export(self, object, path='', overwrite=False, measurements=False, agm_validation=False):
        """Validate the input object and make sure an output directory is set.

        This base implementation writes no file; subclasses call it before
        doing their own export.

        Args:
            object: Object to export; must pass :meth:`is_exportable`.
            path: If truthy, the output directory is reset below this path.
            overwrite: Passed to :meth:`set_output_dir` when it is called.
            measurements: Not used by the base implementation.
            agm_validation: Not used by the base implementation.

        Raises:
            ValueError: If ``object`` is not exportable.
            FileExistsError: If the output directory has to be (re)created,
                already exists and ``overwrite`` is false.
        """
        exportable, message = self.is_exportable(object)
        if not exportable:
            raise ValueError(message)

        if path:
            self.set_output_dir(path, overwrite=overwrite)
        elif self.output_dir is None:
            # Output directory not already set (from Exporter object creation):
            # use the current path as base directory. The attribute is tested
            # here, not the bound `set_output_dir` method, which is never None.
            self.set_output_dir('', overwrite=overwrite)
class EventCSV_Exporter(AbstractExporter):
    """Exporter writing Opportunity or Timeline data as ESA Event CSV files.

    Such a file should contain a tabular list of observational opportunities,
    which can be time segments or observations, with the following fields:

    - event_name
    - start_time
    - stop_time
    - subgroup
    - source group
    - duration
    - n_lines
    - spatial binning
    - distance_to_jupiter
    - First CU_frame start (UT and wrt CA if CA if any)
    - Last CU_frame stop (UT and wrt CA if CA if any)
    - if mirror used: Mirror Start Pos (°), Mirror Start Speed (°/s),
      Mirror End Speed (°/s)
    """

    def __init__(self, path='', overwrite=False):
        """Delegate the whole set-up to :class:`AbstractExporter`."""
        super().__init__(path=path, overwrite=overwrite)

    def export(self, object, path='', overwrite=False, measurements=False, agm_validation=False):
        """Write one observations CSV file plus per-observation CSV files.

        ``observations.csv`` gets a commented header line followed by one line
        per observation. ``observation_NNN.csv`` files get one line per
        sub-event; they are written for every observation when
        ``measurements`` is true, and for the first observation only otherwise.

        Args:
            object: Exportable object providing ``get_observations()`` and
                ``opportunity_definition.observation_type``.
            path: Forwarded to the base ``export``.
            overwrite: Forwarded to the base ``export``.
            measurements: Whether sub-event files are written for all
                observations rather than the first one only.
            agm_validation: Forwarded to the base ``export``; not used here.
        """
        super().export(object, path=path, overwrite=overwrite, measurements=measurements, agm_validation=agm_validation)

        observations = object.get_observations()

        # TODO: !! only works for Opportunity object
        event_basename = object.opportunity_definition.observation_type

        # --- observations CSV file -----------------------------------------
        with open(self.output_dir / 'observations.csv', 'w') as obs_file:
            obs_file.write(f'# {observations[0].get_csv_header()}\n')
            for obs_number, observation in enumerate(observations, start=1):
                # WARNING! SHT: "only item names already included in the JUICE
                # SOC Core System Segment definitions database can be imported
                # in csv format.
                obs_line = observation.get_csv_line(
                    event_name=f'{event_basename}_{obs_number:03}',
                    subgroup='',
                    group='WG2',
                )
                obs_file.write(obs_line + '\n')

        # --- measurements CSV files ----------------------------------------
        # Without `measurements`, only the first observation is detailed.
        selected = observations if measurements else [observations[0]]
        for obs_number, observation in enumerate(selected, start=1):
            with open(self.output_dir / f'observation_{obs_number:03}.csv', 'w') as meas_file:
                meas_file.write(f'# {observation.sub_events[0].get_csv_header()}\n')
                # Sub-event numbering is zero-based, observation numbering one-based.
                for meas_index, measurement in enumerate(observation.sub_events):
                    meas_line = measurement.get_csv_line(
                        event_name=f'{event_basename}_{obs_number:03}_{meas_index:04}',
                        subgroup='',
                        group='WG2',
                    )
                    meas_file.write(meas_line + '\n')

        # report on exported files
        self.list_exported()
class GeoJSON_Exporter(AbstractExporter):
    """Exporter writing measurement footprints as GeoJSON files."""

    def __init__(self, path='', overwrite=False):
        """Delegate the whole set-up to :class:`AbstractExporter`."""
        # raise NotImplementedError(f'{self.__class__.__name__} not yet implemented.')
        super().__init__(path=path, overwrite=overwrite)

    def export(self, object, path='', overwrite=False, measurements=False, agm_validation=False):
        """Write one ``observation_NNN.geojson`` file per exported observation.

        Each file holds the feature collection returned by
        ``object.getMeasurementsGeoJSON('Detector_FOV_Footprint', ...)`` for the
        observation index. All observations are exported when ``measurements``
        is true, only the first one otherwise.

        Args:
            object: Exportable object providing ``get_observations()`` and
                ``getMeasurementsGeoJSON()``.
            path: Forwarded to the base ``export``.
            overwrite: Forwarded to the base ``export``.
            measurements: Whether all observations are exported rather than
                the first one only.
            agm_validation: Forwarded to the base ``export``; not used here.
        """
        super().export(object, path=path, overwrite=overwrite, measurements=measurements, agm_validation=agm_validation)

        all_observations = object.get_observations()

        # No observation-level GeoJSON file is written:
        # geojson_path = self.output_dir / 'observations.geojson'
        # TODO: FOV footprint does not exist yet at observation-level, see
        # [https://git.ias.u-psud.fr/majis_sgs_mos/requirements/-/issues/67]

        # Without `measurements`, only the first observation is exported.
        selected = all_observations if measurements else [all_observations[0]]

        for obs_index, _ in enumerate(selected):
            target = self.output_dir / f'observation_{obs_index + 1:03}.geojson'
            # write observation (measurements) GeoJSON file
            with open(target, 'w') as geojson_file:
                # TODO: currently only working with Opportunity objects
                footprints = object.getMeasurementsGeoJSON(
                    'Detector_FOV_Footprint',
                    observation_id=obs_index,
                    split=True,
                    surface_only=True,
                )
                geojson.dump(footprints, geojson_file)

        # report on exported files
        self.list_exported()
# Convert GeoJSON to KML for visualisation in Google Earth:
#   kml_path = '{}/observation_{:04}.kml'.format(opportunity_path,i+1)
#   ogr2ogr -f KML observation_0001.kml observation_0001.geojson
class QuaternionCSV_Exporter(AbstractExporter):
    """Exporter writing the simulated spacecraft attitude as quaternion CSV files."""

    def __init__(self, path='', overwrite=False):
        """Delegate the whole set-up to :class:`AbstractExporter`."""
        super().__init__(path=path, overwrite=overwrite)

    def export(self, object, path='', overwrite=False, measurements=False, agm_validation=False):
        """Write one ``quaternion_obs_NNNN.csv`` file per observation.

        Each row holds a UTC time string followed by four quaternion
        components computed from the 'Simulated_SC_Frame' geometry data of the
        observation measurements.

        Args:
            object: Exportable object providing ``get_observations()``.
            path: Forwarded to the base ``export``.
            overwrite: Forwarded to the base ``export``.
            measurements: Forwarded to the base ``export``; not used here.
            agm_validation: Accepted for signature compatibility with the base
                class and the other exporters; forwarded to the base
                ``export`` and not used here.
        """
        super().export(object, path=path, overwrite=overwrite, measurements=measurements,
                       agm_validation=agm_validation)

        observations = object.get_observations()
        for i_obs, observation in enumerate(observations):
            qcsv_path = self.output_dir / f'quaternion_obs_{i_obs + 1:04}.csv'
            # newline='' lets the csv module control line endings itself, as
            # required by the csv documentation (avoids blank rows on Windows).
            with open(qcsv_path, 'w', newline='') as f:
                qcsv_writer = csv.writer(f)
                simsc_j2000_rotmats = observation.get_measurements_geometry_data('Simulated_SC_Frame')
                times = observation.get_measurements_times(format='et')
                for idx, simsc_j2000_rotmat in enumerate(simsc_j2000_rotmats):
                    quaternions = spice.m2q(simsc_j2000_rotmat)
                    utc_time = spice.timout(times[idx], "YYYY-MM-DDTHR:MN:SC.##")
                    # Components 1..3 of the m2q() output are negated and
                    # written first; component 0 is written last.
                    # Qalt = (QV1, QV2, QV3, QS)
                    # Qspice = (QS, -QV1, -QV2, -QV3)
                    row = [utc_time, -quaternions[1], -quaternions[2], -quaternions[3], quaternions[0]]
                    qcsv_writer.writerow(row)

        # report on exported files
        self.list_exported()
class CK_Exporter(AbstractExporter):
    """Exporter writing quaternion tables intended as input for CK generation."""

    def __init__(self, path='', overwrite=False):
        """Delegate the whole set-up to :class:`AbstractExporter`."""
        # raise NotImplementedError(f'{self.__class__.__name__} not yet implemented.')
        super().__init__(path=path, overwrite=overwrite)

    def export(self, object, path='', overwrite=False, measurements=False, agm_validation=False):
        """Write one space-delimited ``quaternion_obs_NNNN.tab`` file per observation.

        Each row holds the measurement time (requested with ``format='et'``)
        followed by the four components returned by ``spice.m2q`` for the
        transposed 'Simulated_SC_Frame' rotation matrix. No CK file is written
        by this method itself.

        Args:
            object: Exportable object providing ``get_observations()``.
            path: Forwarded to the base ``export``.
            overwrite: Forwarded to the base ``export``.
            measurements: Forwarded to the base ``export``; not used here.
            agm_validation: Accepted for signature compatibility with the base
                class and the other exporters; forwarded to the base
                ``export`` and not used here.
        """
        super().export(object, path=path, overwrite=overwrite, measurements=measurements,
                       agm_validation=agm_validation)

        # TODO: to be implemented using https://spiceypy.readthedocs.io/en/main/documentation.html#spiceypy.spiceypy.ckw03
        # see [https://git.ias.u-psud.fr/majis_sgs_mos/requirements/-/issues/109]
        #
        # $ msopck <setup_file> <input_file> <output_ck_file>
        #
        # \begindata
        #
        #     LSK_FILE_NAME           = 'kernels/juice/lsk/naif0011.tls'
        #     SCLK_FILE_NAME          = 'kernels/juice/sclk/juice_fict_20160326.tsc'
        #
        #     CK_TYPE                 = 3
        #     CK_SEGMENT_ID           = 'MAJIS-SIMULATED SC SLEW'
        #     INSTRUMENT_ID           = -28000
        #     REFERENCE_FRAME_NAME    = 'J2000'
        #     ANGULAR_RATE_PRESENT    = 'NO'
        #
        #     INPUT_TIME_TYPE         = 'ET'
        #     INPUT_DATA_TYPE         = 'SPICE QUATERNIONS'
        #
        #     PRODUCER_ID             = 'MAJIS Science Operations Team'
        #
        # \begintext

        observations = object.get_observations()
        for i_obs, observation in enumerate(observations):
            qtab_path = self.output_dir / f'quaternion_obs_{i_obs + 1:04}.tab'
            # newline='' disables newline translation so that the explicit
            # lineterminator='\n' is written as-is on every platform.
            with open(qtab_path, 'w', newline='') as f:
                qtab_writer = csv.writer(f, delimiter=' ', lineterminator='\n')
                simsc_j2000_rotmats = observation.get_measurements_geometry_data('Simulated_SC_Frame')
                times = observation.get_measurements_times(format='et')
                for idx, simsc_j2000_rotmat in enumerate(simsc_j2000_rotmats):
                    # The rotation matrix is transposed before conversion.
                    quaternions = spice.m2q(spice.xpose(simsc_j2000_rotmat))
                    row = [times[idx], quaternions[0], quaternions[1], quaternions[2], quaternions[3]]
                    qtab_writer.writerow(row)

        # report on exported files
        self.list_exported()
class PTR_Exporter(AbstractExporter):
    """Exporter writing opportunity pointing definitions as PTR (``.ptx``) files."""

    def __init__(self, path='', overwrite=False):
        """Delegate the whole set-up to :class:`AbstractExporter`."""
        # raise NotImplementedError(f'{self.__class__.__name__} not yet implemented.')
        super().__init__(path=path, overwrite=overwrite)

    def export(self, object, measurements=None, path='', overwrite=False, agm_validation=False):
        """Write one ``observation_NNN.ptx`` file per opportunity.

        For each opportunity a ``PTRPointing`` is built from its opportunity
        definition, a PRM is requested for the observation start/stop times,
        printed, optionally simulated with AGM, and finally saved.

        Args:
            object: Exportable object providing ``get_opportunities()``.
            measurements: Forwarded to the base ``export``; not used here.
            path: Forwarded to the base ``export``.
            overwrite: Forwarded to the base ``export``.
            agm_validation: Whether each PRM is simulated through AGM before
                being saved; on success the resulting CK is saved next to the
                PTR file and ``opportunity.cross_validate`` is called with it.
        """
        super().export(object, measurements=measurements, path=path, overwrite=overwrite)

        # observations = object.get_observations()
        for number, opportunity in enumerate(object.get_opportunities(), start=1):
            ptr_path = self.output_dir / f'observation_{number:03}.ptx'
            ptr_pointing = PTRPointing(opportunity_definition=opportunity.opportunity_definition)

            start_time = opportunity.get_observation().get_start_time(format='utc')
            stop_time = opportunity.get_observation().get_stop_time(format='utc')

            # PATCH removing milliseconds so that it passes AGM validation
            # see: https://juigitlab.esac.esa.int/python/ptr/-/issues/8
            # Keeping the text before the first '.' leaves strings without
            # a '.' unchanged.
            start_time = start_time.split('.')[0]
            stop_time = stop_time.split('.')[0]

            # set exported PTR metadata/comment
            execution_time = datetime.now().strftime("%d-%m-%Y %H:%M:%S")
            metadata = [
                'MAJIS MOS opportunity pointing definition export to PTR',
                f'execution time: {execution_time}',
                f'opportunity_id: {opportunity.id}',
                f'{ptr_pointing}',
            ]
            prm = ptr_pointing.get_prm(start_time, stop_time, metadata=metadata)

            # display PRM XML
            print(prm)
            print()

            # validate PTR using AGM if requested
            if not agm_validation:
                print('PTR file not validated.')
            else:
                metakernel_id = opportunity.mission_scenario.agm_metakernel_id
                endpoint = 'JUICE_API'
                print(f'AGM simulation: metakernel={metakernel_id}, endpoint={endpoint}')
                results = prm.simulate(metakernel_id, endpoint)

                # report on results
                if not results.success:
                    print('-> INVALID')
                    print('\n' + repr(results.log) + '\n')
                else:
                    print('AGM simulation: VALID')

                    # TODO: implementation cross-validation of MOS and AGM simulated pointings
                    cross_validation = True
                    if cross_validation:
                        # save CK file next to the PTR file, with a '.bc' suffix
                        ck_file = Path(ptr_path.absolute().parent, ptr_path.stem + '.bc')
                        results.ck.save(ck_file, overwrite=True)
                        print('AGM simulation: CK saved.')
                        print()
                        print('Cross validation of MOS and AGM pointing simulations:')
                        print()
                        # The returned value is not used by this method.
                        opportunity.cross_validate(ck_file)
                        print()

            # save PRM XML
            prm.save(ptr_path)
            # results.ck.save('example.ck')

        # report on exported files
        self.list_exported()
class ITL_Exporter(AbstractExporter):
    """Placeholder for the ITL exporter, which is not yet implemented.

    Instantiating this class always raises ``NotImplementedError``.
    """

    def __init__(self, path='', overwrite=False):
        """Always raise, since this exporter is not yet implemented.

        Args:
            path: Unused until the exporter is implemented.
            overwrite: Unused until the exporter is implemented.

        Raises:
            NotImplementedError: Always.
        """
        # The base-class initialisation is deliberately not called here: it
        # could never be reached after the unconditional raise. Call
        # `super().__init__(path=path, overwrite=overwrite)` instead of
        # raising once the exporter is implemented.
        raise NotImplementedError(f'{self.__class__.__name__} not yet implemented.')

    def export(self, object, measurements=None, path='', overwrite=False, agm_validation=False):
        """Run the base-class checks only; no ITL file is written yet.

        Args:
            object: Object to export; must pass ``is_exportable``.
            measurements: Forwarded to the base ``export``.
            path: Forwarded to the base ``export``.
            overwrite: Forwarded to the base ``export``.
            agm_validation: Accepted for signature compatibility with the base
                class and the other exporters; forwarded to the base ``export``.
        """
        super().export(object, measurements=measurements, path=path, overwrite=overwrite,
                       agm_validation=agm_validation)
# Registry mapping each exporter format name to its exporter class; the
# `Exporter` factory looks formats up here.
EXPORTERS = {
    'EventCSV': EventCSV_Exporter,
    'GeoJSON': GeoJSON_Exporter,
    'QuaternionCSV': QuaternionCSV_Exporter,
    'CK': CK_Exporter,
    'PTR': PTR_Exporter,
    'ITL': ITL_Exporter,
}