Source code for gfinder.commands.export

"""GFINDER CLI `export` command."""

import click
from .docstring import set_docstring

from gfinder.datastore import DataStore
from gfinder.exporter import Exporter, EXPORTERS

from pathlib import Path

@click.command(name='export')
@click.argument('opportunity-id')
@click.option('--format',  type=click.STRING, help=f'Output format: {list(EXPORTERS.keys())}.', default='EventCSV')
@click.option('--overwrite/--no-overwrite', help='Overwrite existing exported files.', default='False')
@click.option('--measurements/--no-measurements', help='[EventCSV] Export measurements data for all observations. By default, only the first observation measurements are exported.', default='False')
@click.option('--agm-validation/--no-agm-validation', help='[PTR] Validate exported PTR file(s) using AGM.', default='False')
def export_cmd(opportunity_id, format, overwrite, measurements, agm_validation):
    """Export opportunity data to external data format (EventCSV, QuaternionCSV, GeoJSON, CK, PTR, ITL).

    Exported files are located in subdirectories of the opportunity directory. Each subdirectory corresponds to the
    exported format (eg: EventCSV files on eventcsv/ subdirectory).

    Export to Event CSV:

    Jupiter ring low-phase observations opportunity windows, and a specific simulated observation geometry data in CSV format.

    \b
        $ gfinder export crema50_jup_majis_mainring_lowphase_20310119_20341219 --format=EventCSV
        $ gfinder export crema50_jup_majis_mainring_lowphase_20331127_20331127 --format=EventCSV

    Export to PTR:

    \b
        $ gfinder export crema50_jup_majis_mainring_lowphase_20331127_20331127 --format=PTR
        $ gfinder export crema50_jup_majis_mainring_lowphase_20331127_20331127 --format=PTR --agm-validation
    """
    export(opportunity_id, format=format, overwrite=overwrite, measurements=measurements, agm_validation=agm_validation)
    # export(opportunity_id, format=format, overwrite=overwrite, **kargs)

[docs]@set_docstring(export_cmd, {'opportunity_id': 'Opportunity or Timeline ID'}) def export(opportunity_id, format=None, overwrite=False, measurements=False, agm_validation=False): # Init data store datastore = DataStore() # Retrieve opportunity_dict from data store if exists opportunity_dict = datastore.getOpportunityDict(opportunity_id) if not opportunity_dict: print() print(f'Unknown Opportunity ID: {opportunity_id}') return # Initiate exporter using opportunity data directory path from data store, allowing to check existing of # exporter output directory before loading opportunity data. basedir = datastore.getOpportunityPath(opportunity_id) try: exporter = Exporter(format, path=basedir, overwrite=overwrite) except Exception as e: print(e) return # Load opportunity data print(f'Loading opportunity data...') opportunity = datastore.load_opportunity(opportunity_id) # Export opportunity data print(f'Exporting opportunity data to {format} file(s)...') try: exporter.export(opportunity, measurements=measurements, agm_validation=agm_validation) print(f'Successfully exported.') except Exception as e: print(e) print('Failed to export.')