Source code for gfinder.dataviewer

"""DataViewer module."""

# TODO: Rename PointingViewer to something less confusing, eg: DiskViewer.

from gfinder.datastore import DataStore

import plotly.graph_objects as go
from ipyleaflet import Map, WMSLayer, GeoJSON, Popup, FullScreenControl, LayersControl, WidgetControl
from ipywidgets import Text, HTML, Label, Layout

import json

import spiceypy as spice
import numpy as np
import random

EPSG4326 = dict(name='EPSG4326',custom=False)

DEFAULT_PLOTLY_TEMPLATE = 'plotly_white'

[docs]class DataViewer: def __init__(self): pass
[docs] def show(self): pass
[docs]class TimelineViewer(DataViewer): def __init__(self): pass
[docs] def show(self): pass
[docs]class TimeSeriesViewer(DataViewer): def __init__(self, opportunity): self.opportunity = opportunity self.figure = go.Figure()
[docs] def show(self, observation_id=1, ref=None, time_format='rel', log_y=False, template=DEFAULT_PLOTLY_TEMPLATE): n_observations = self.opportunity.sequence.n_sub_events if (observation_id < 1) or (observation_id > n_observations): raise AttributeError('`observation_id` must range between 1 and the number of observations for this opportunity: {n_observations}') observation = self.opportunity.sequence.get_observations()[observation_id - 1] geometry_names = observation.get_measurement_geometry_names(parent_class='Scalar') mission_event = self.opportunity.getMissionEvent() #mission_event.get_reference_time(format='utc') utc_times = observation.get_measurements_times(format='utc') rel_times = observation.get_measurements_times(format='rel', mission_event=mission_event) etimes = observation.get_measurements_times(format='et') times = rel_times if time_format == 'utc' or not mission_event: times = utc_times trep = etimes[1]-etimes[0] n_steps = len(times) # print('>> {} - observation {}/{} (duration={}, time_step={:.2f} s, n_step={})'.format(self.opportunity.id, # observation_id, n_observations, self.opportunity.sequence.get_duration(format='str'), trep, n_steps)) self.figure.update_layout( title='{} - observation {}/{} (duration={}, time_step={:.2f} s, n_step={})'.format(self.opportunity.id, observation_id, n_observations, self.opportunity.sequence.get_duration(format='str'), trep, n_steps), height=512 ) for geometry_name in geometry_names: # times = observation.get_measurements_times(format='utc') data = observation.get_measurements_geometry_data(geometry_name) if isinstance(data[0], bool): # data type is boolean data_int = [] for d in data: data_int.append(int(d)) data = data_int self.figure.add_trace(go.Scatter(x=times, y=data, mode='lines', name=geometry_name)) # TEMPORARY: Add first element (x_rot) of SC_Slew_Angles if available. # data = observation.get_measurements_geometry_data('SC_Slew_Angles') # # Idea: observation_get_measurements_geometry_data_element('SC_Slew_Angles:x_rot') # # if data[0][0]: # x_rot_values = [] # for values in data: # x_rot_values.append(values[0]) # x_rot_values = np.array(x_rot_values)*spice.dpr() # self.figure.add_trace(go.Scatter(x=times, y=x_rot_values, mode='lines', name='X_SC_Slew_Angle (deg.)')) # # y_rot_values = [] # for values in data: # y_rot_values.append(values[1]) # y_rot_values = np.array(y_rot_values)*spice.dpr() # self.figure.add_trace(go.Scatter(x=times, y=y_rot_values, mode='lines', name='Y_SC_Slew_Angle (deg.)')) if ref: ref_data = np.zeros(n_steps)+ref self.figure.add_trace(go.Scatter(x=times, y=ref_data, mode='lines', name='Reference Value')) if log_y: self.figure.update_yaxes(type='log') # self.figure.update_layout(legend=time_inputs_dict( # orientation="h", # yanchor="bottom", # y=1.02, # xanchor="left", # x=1 # )) #fig.update_traces(hovertemplate=None) #fig.update_layout(hovermode="x unified") # datas = observation.get_measurements_geometry_data('Motion_Compensation') # scanner_angles = [] # for data in datas: # scanner_angles.append(data[0]*spice.dpr()) # self.figure.add_trace(go.Scatter(x=rel_times,y=scanner_angles, mode='lines', name='Scanner_Angle')) self.figure.update_layout(hovermode='x unified', template=template) self.figure.show()
[docs]class XY_Viewer(DataViewer): def __init__(self, opportunity): self.opportunity = opportunity self.figure = go.Figure()
[docs] def show(self,observation_id=1, x_geometry_name='', y_geometry_name='', geometry_name='', x_index=0, y_index=1, xaxis_range=[], yaxis_range=[], exclude_zeros=True, height=500, template=DEFAULT_PLOTLY_TEMPLATE): # get observation object observation = self.opportunity.sequence.get_observations()[observation_id - 1] if x_geometry_name and y_geometry_name: scalar_geometry_names = observation.sub_events[0].get_geometry_names(parent_class='Scalar') if scalar_geometry_names: # check that input geometry names are valid if x_geometry_name not in scalar_geometry_names: print(f'Invalid input `{x_geometry_name}` Geometry class name. Available geometry: {scalar_geometry_names}') return if y_geometry_name not in scalar_geometry_names: print(f'Invalid input `{y_geometry_name}` Geometry class name. Available geometry: {scalar_geometry_names}') return else: print('No Scalar-type Geometry class data to visualise with XY_Viewer.') return # retrieve geometry objects x_geometries = observation.get_measurements_geometries(x_geometry_name) y_geometries = observation.get_measurements_geometries(y_geometry_name) # print(f'Visualising `{x_geometry_name}`vs `{y_geometry_name} Geometry class.') x_title = f'{x_geometry_name} ({x_geometries[0].units})' y_title = f'{y_geometry_name} ({y_geometries[0].units})' title = f'{x_title} / {y_title}' x_vec = [] y_vec = [] for x_geometry, y_geometry in zip(x_geometries, y_geometries): x_data_value = x_geometry.getData() y_data_value = y_geometry.getData() if (x_data_value is None) or (y_data_value is None): if not exclude_zeros: x_vec.append(0.0) y_vec.append(0.0) else: x_value = x_data_value y_value = y_data_value if exclude_zeros: if (x_value != 0.0) and (y_value != 0.0): x_vec.append(x_value) y_vec.append(y_value) else: x_vec.append(x_value) y_vec.append(y_value) else: vec_geometry_names = observation.sub_events[0].get_geometry_names(parent_class='Vector') if vec_geometry_names: if geometry_name: # check that input geometry name is valid if geometry_name not in vec_geometry_names: print(f'Invalid input `{geometry_name}` Geometry class name. Available geometry: {vec_geometry_names}') return else: # set default Geometry class geometry_name = vec_geometry_names[0] else: print('No Vector-type Geometry class data to visualise with XY_Viewer.') return # retrieve geometry objects geometries = observation.get_measurements_geometries(geometry_name) # print(f'Visualising `{geometry_name}` Geometry class.') x_title = f'{geometries[0].prefix[x_index]} ({geometries[0].units[x_index]})' y_title = f'{geometries[0].prefix[y_index]} ({geometries[0].units[y_index]})' title = f'{geometry_name}:{x_title} / {geometry_name}:{y_title}' s = '' for count, (prefix, unit) in enumerate(zip(geometries[0].prefix, geometries[0].units)): s += f'{count}: {prefix} ({unit}) ' print(s) x_vec = [] y_vec = [] for geometry in geometries: data_value = geometry.getData() if data_value is None: if not exclude_zeros: x_vec.append(0.0) y_vec.append(0.0) else: x_value = data_value[x_index] y_value = data_value[y_index] if exclude_zeros: if (x_value != 0.0) and (y_value != 0.0): x_vec.append(data_value[x_index]) y_vec.append(data_value[y_index]) else: x_vec.append(data_value[x_index]) y_vec.append(data_value[y_index]) # set data point annotation text text = [] utc_times = observation.get_measurements_times(format='utc') for i, utc_time in enumerate(utc_times): text.append('['+str(i)+'] '+utc_time) self.figure.add_trace(go.Scatter(x=x_vec, y=y_vec, mode='markers', name=title, text=text, hovertemplate="%{text}<br>" +"(%{x}, %{y})" )) if xaxis_range: self.figure.update_layout(xaxis_range=xaxis_range) if yaxis_range: self.figure.update_layout(yaxis_range=yaxis_range) self.figure.update_layout(title=title, height=height, template=template) self.figure.update_xaxes(title_text=x_title) self.figure.update_yaxes(title_text=y_title) self.figure.show()
[docs]class MapViewer(DataViewer): def __init__(self, opportunities): self.opportunities = opportunities self.target = opportunities[0].opportunity_definition.target self.roi_geojson_file = '' self.sc_tranquilisation_time = 0.0 if self.target == 'CALLISTO': self.roi_geojson_file = DataStore().roi_basedir+'/callisto/CALLISTO_ROIS.geojson' elif self.target == 'GANYMEDE': self.roi_geojson_file = DataStore().roi_basedir+'/ganymede/GANYMEDE_ROIS.geojson'
[docs] def show(self, observation_id=1, geometries=None, colors=None, opacity=0.4, split=True, surface_only=False, sc_tranq_time=None): planetary_maps = { 'JUPITER': { 'url':'https://planetarymaps.usgs.gov/cgi-bin/mapserv?map=/maps/jupiter/jupiter_simp_cyl.map', 'layer': 'CASSINI' }, 'GANYMEDE': { 'url': 'https://planetarymaps.usgs.gov/cgi-bin/mapserv?map=/maps/jupiter/ganymede_simp_cyl.map', 'layer': 'GALILEO_VOYAGER' }, 'CALLISTO': { 'url': 'https://planetarymaps.usgs.gov/cgi-bin/mapserv?map=/maps/jupiter/callisto_simp_cyl.map', 'layer': 'GALILEO_VOYAGER' }, 'EUROPA': { 'url': 'https://planetarymaps.usgs.gov/cgi-bin/mapserv?map=/maps/jupiter/europa_simp_cyl.map', 'layer': 'GALILEO_VOYAGER' }, 'IO': { 'url': 'https://planetarymaps.usgs.gov/cgi-bin/mapserv?map=/maps/jupiter/io_simp_cyl.map', 'layer': 'SSI_color' }, 'EARTH': { 'url': 'https://planetarymaps.usgs.gov/cgi-bin/mapserv?map=/maps/earth/earth_simp_cyl.map', 'layer': 'MODIS' }, 'MARS': { 'url': 'https://planetarymaps.usgs.gov/cgi-bin/mapserv?map=/maps/mars/mars_simp_cyl.map', 'layer': 'MDIM21_color' }, 'VENUS': { 'url': 'https://planetarymaps.usgs.gov/cgi-bin/mapserv?map=/maps/venus/venus_simp_cyl.map', 'layer': 'MAGELLAN_color' } } if self.target in planetary_maps: basemap_layer = WMSLayer( url=planetary_maps[self.target]['url'], layers=planetary_maps[self.target]['layer'], name='{} basemap'.format(self.target.lower().capitalize()), crs=EPSG4326, base=True, show_loading=False, attribution='USGS/NASA') map = Map(layers=(basemap_layer,), center=(0,0), zoom=1, crs=EPSG4326, layout=Layout(height='512px')) else: target = 'JUPITER' # default basemap basemap_layer = WMSLayer( url=planetary_maps[target]['url'], layers=planetary_maps[target]['layer'], name='{} basemap'.format(target.lower().capitalize()), crs=EPSG4326, base=True, show_loading=False, attribution='USGS/NASA') map = Map(layers=(basemap_layer,), center=(0,0), zoom=1, crs=EPSG4326, layout=Layout(height='512px')) print('WARNING: Unknown input <{}> target.'.format(self.target)) map.add_control(FullScreenControl()) map.fit_bounds([[-90.0, -180.0], [90.0, 180.0]]) # [[south, west], [north, east]]. def update_html(feature, **kwargs): html.value = ''' <p><b>#{}</b> {}</p> '''.format(feature['properties']['id'], feature['properties']['utc_time']) # previously reference_time (see DataStore.writeOpportunity() and Opportunity.getMeasurementsGeoJSON()) html = HTML('''Hover over a geometry''') html.layout.margin = '0px 10px 0px 10px' control = WidgetControl(widget=html, position='bottomleft') map.add_control(control) label = Label() display(label) def handle_interaction(**kwargs): if kwargs.get('type') == 'mousemove': label.value = str(kwargs.get('coordinates')) map.on_interaction(handle_interaction) if geometries: geometry_names = geometries else: observation = self.opportunities[0].sequence.get_observations()[0] # first observation of the first opportunity geometry_names = observation.sub_events[0].get_geometry_names(parent_class='Geolocation') # generate colors # default_colors = ['#2E91E5', '#E15F99', '#1CA71C', '#FB0D0D', '#DA16FF', '#222A2A', '#B68100', '#750D86', # '#EB663B', '#511CFB', '#00A08B', '#FB00D1', '#FC0080', '#B2828D', '#6C7C32', '#778AAE', # '#862A16', '#A777F1', '#620042', '#1616A7', '#DA60CA', '#6C4516', '#0D2A63', '#AF0038'] default_colors = [ 'Crimson', 'Tomato', 'Gold', 'Yellow', 'GreenYellow', 'SpringGreen', 'Aqua', 'DeepSkyBlue', 'RoyalBlue', 'Violet', 'HotPink'] if colors: default_colors = colors opportunity_colors = [] for i in range(len(self.opportunities)): opportunity_colors.append(default_colors[i % len(default_colors)]) # Add graticule layer graticule_file = DataStore().roi_basedir+'/ne_50m_graticules_10.geojson' with open(graticule_file, 'r') as f: graticule = json.load(f) map.add_layer(GeoJSON( data=graticule, name='Graticule 10x10 deg', style={'opacity': 0.5, 'weight': 1, 'color': 'white', 'dashArray': '1,1'} )) # add ROIs if self.roi_geojson_file: with open(self.roi_geojson_file, 'r') as f: data = json.load(f) geo_json = GeoJSON( data = data, name = 'ROIs', style = { 'color': 'red', 'opacity': 1, 'fillOpacity': 0.1, 'weight': 1 } ) def click_handler(**kwargs): if kwargs.get('type') == 'click': label.value = feature.properties['id'] geo_json.on_click(click_handler) map.add_layer(geo_json) if sc_tranq_time: self.sc_tranquilisation_time = sc_tranq_time def footprint_style(feature): style = { 'color': opportunity_color, 'fillColor': opportunity_color, 'weight': 1, 'opacity': opacity, 'fillOpacity': opacity } if feature.properties['time'] < self.sc_tranquilisation_time: style = { 'color': 'black', 'fillColor': 'black', 'weight': 1, 'opacity': opacity, 'fillOpacity': opacity } return style # Add measurements GeoJSON layer for each opportunity for opportunity, opportunity_color in zip(self.opportunities, opportunity_colors): opportunity_id = opportunity.id for geometry_name in geometry_names: for observation_id in range(opportunity.sequence.n_sub_events): data = opportunity.getMeasurementsGeoJSON( geometry_name, observation_id=observation_id, split=split, surface_only=surface_only) geo_json = GeoJSON( data = data, name = opportunity.id + ' ' + geometry_name + ' obs{:03}'.format(observation_id), # style = { # 'color': opportunity_color, # 'fillColor': opportunity_color, # 'weight': 1, # 'fillOpacity': opacity # }, style_callback=footprint_style, point_style = { 'color': 'white', 'fill_color': 'white', 'fillOpacity': 1, 'radius': 2, 'stroke': False }, hover_style = { 'color': 'blue' } ) map.add_layer(geo_json) geo_json.on_hover(update_html) control = LayersControl(position='topright') map.add_control(control) display(map)
[docs]class PointingViewer(DataViewer): def __init__(self, opportunity): self.opportunity = opportunity self.figure = go.Figure()
[docs] def show(self, frame='NOA', observation_id=1, geometries=None, all_detector_fov=False, plot_all=None, height=500, template=DEFAULT_PLOTLY_TEMPLATE): observation = self.opportunity.sequence.get_observations()[observation_id - 1] if geometries: geometry_names = geometries else: geometry_names = observation.sub_events[0].get_geometry_names(parent_class='Direction') #print(geometry_names) utc_times = observation.get_measurements_times(format='utc') times = observation.get_measurements_times(format='et') imid = int(len(utc_times) / 2) def get_style(geometry_name, property): styles = { 'Limb_Directions': {'mode':'lines', 'line_dash':'solid', 'marker_symbol':0}, 'Detector_FOV': {'mode':'lines', 'line_dash':'solid', 'marker_symbol':0}, 'North_Pole_Direction':{'mode':'markers', 'line_dash':'solid', 'marker_symbol':0}, 'Terminator_Directions': {'mode':'lines', 'line_dash':'dot', 'marker_symbol':0}, 'Limb_Point_Direction':{'mode': 'markers', 'line_dash': 'solid', 'marker_symbol':1}, 'Detector_Boresight': {'mode': 'markers', 'line_dash': 'solid', 'marker_symbol': 2}, 'Ring_Ansae_Directions': {'mode': 'lines', 'line_dash': 'dot', 'marker_symbol': 0}, } style = {'mode':'markers', 'line_dash':'solid', 'marker_symbol':3} # default style value = '' if geometry_name in styles.keys(): style = styles[geometry_name] if property in style.keys(): value = style[property] return value target = self.opportunity.opportunity_definition.target observer = self.opportunity.opportunity_definition.observer for geometry_name in geometry_names: #data_values = observation.get_measurements_geometry_data(geometry_name) geometries = observation.get_measurements_geometries(geometry_name) data_values = [] for et, geometry in zip(times, geometries): data_values.append(geometry.get_pointing_angles(et, observer, target, frame=frame)) if all_detector_fov: if geometry_name == 'Detector_FOV': # plot all FOVs for idx, fov_dir_angles in enumerate(data_values): x = np.transpose(fov_dir_angles)[0] x = np.append(x, x[0]) y = np.transpose(fov_dir_angles)[1] y = np.append(y, y[0]) self.figure.add_trace( go.Scatter(x=x, y=y, mode=get_style(geometry_name, 'mode'), marker_color='silver', line_dash=get_style(geometry_name, 'line_dash'), fill="toself", name=geometry_name + ' ' + utc_times[idx])) if plot_all: if geometry_name in plot_all: for idx, dir_angles in enumerate(data_values): x = np.transpose(dir_angles)[0] # x = np.append(x, x[0]) y = np.transpose(dir_angles)[1] # y = np.append(y, y[0]) self.figure.add_trace(go.Scatter(x=x, y=y, mode=get_style(geometry_name, 'mode'), marker_symbol=get_style(geometry_name, 'marker_symbol'), marker_color='silver', line_dash=get_style(geometry_name, 'line_dash'), name=geometry_name + ' ' + utc_times[idx] )) # Set and plot parameters x_start = np.transpose(data_values[0])[0] y_start = np.transpose(data_values[0])[1] x_middle = np.transpose(data_values[imid])[0] y_middle = np.transpose(data_values[imid])[1] x_stop = np.transpose(data_values[-1])[0] y_stop = np.transpose(data_values[-1])[1] if x_start.size > 1: # treat as a polygon x_start = np.append(x_start, x_start[0]) y_start = np.append(y_start, y_start[0]) x_middle = np.append(x_middle, x_middle[0]) y_middle = np.append(y_middle, y_middle[0]) x_stop = np.append(x_stop, x_stop[0]) y_stop = np.append(y_stop, y_stop[0]) # else: # x_start = [x_start] # y_start = [y_start] # x_middle = [x_middle] # y_middle = [y_middle] # x_stop = [x_stop] # y_stop = [y_stop] self.figure.add_trace(go.Scatter( x=x_start, y=y_start, mode=get_style(geometry_name, 'mode'), marker_symbol=get_style(geometry_name, 'marker_symbol'), marker_color='Red', line_dash=get_style(geometry_name, 'line_dash'), name=geometry_name + ' ' + utc_times[0] )) self.figure.add_trace(go.Scatter( x=x_middle, y=y_middle, mode=get_style(geometry_name, 'mode'), marker_symbol=get_style(geometry_name, 'marker_symbol'), marker_color='Green', line_dash=get_style(geometry_name, 'line_dash'), name=geometry_name + ' ' + utc_times[imid] )) self.figure.add_trace(go.Scatter( x=x_stop, y=y_stop, mode=get_style(geometry_name, 'mode'), marker_symbol=get_style(geometry_name, 'marker_symbol'), marker_color='Blue', line_dash=get_style(geometry_name, 'line_dash'), name=geometry_name + ' ' + utc_times[-1] )) self.figure.update_layout(height=height, template=template) self.figure.update_yaxes(scaleanchor="x", scaleratio=1) self.figure.update_xaxes(title_text = r'$\theta_{x} \text{(deg.)}$') self.figure.update_yaxes(title_text = r'$\theta_{y} \text{(deg.)}$') self.figure.show()