"""DataViewer module."""
# TODO: Rename PointingViewer to something less confusing, eg: DiskViewer.
from gfinder.datastore import DataStore
import plotly.graph_objects as go
from ipyleaflet import Map, WMSLayer, GeoJSON, Popup, FullScreenControl, LayersControl, WidgetControl
from ipywidgets import Text, HTML, Label, Layout
import json
import spiceypy as spice
import numpy as np
import random
EPSG4326 = dict(name='EPSG4326',custom=False)
DEFAULT_PLOTLY_TEMPLATE = 'plotly_white'
[docs]class DataViewer:
def __init__(self):
pass
[docs]class TimelineViewer(DataViewer):
def __init__(self):
pass
[docs]class TimeSeriesViewer(DataViewer):
def __init__(self, opportunity):
self.opportunity = opportunity
self.figure = go.Figure()
[docs] def show(self, observation_id=1, ref=None, time_format='rel', log_y=False, template=DEFAULT_PLOTLY_TEMPLATE):
n_observations = self.opportunity.sequence.n_sub_events
if (observation_id < 1) or (observation_id > n_observations):
raise AttributeError('`observation_id` must range between 1 and the number of observations for this opportunity: {n_observations}')
observation = self.opportunity.sequence.get_observations()[observation_id - 1]
geometry_names = observation.get_measurement_geometry_names(parent_class='Scalar')
mission_event = self.opportunity.getMissionEvent()
#mission_event.get_reference_time(format='utc')
utc_times = observation.get_measurements_times(format='utc')
rel_times = observation.get_measurements_times(format='rel', mission_event=mission_event)
etimes = observation.get_measurements_times(format='et')
times = rel_times
if time_format == 'utc' or not mission_event:
times = utc_times
trep = etimes[1]-etimes[0]
n_steps = len(times)
# print('>> {} - observation {}/{} (duration={}, time_step={:.2f} s, n_step={})'.format(self.opportunity.id,
# observation_id, n_observations, self.opportunity.sequence.get_duration(format='str'), trep, n_steps))
self.figure.update_layout(
title='{} - observation {}/{} (duration={}, time_step={:.2f} s, n_step={})'.format(self.opportunity.id,
observation_id, n_observations, self.opportunity.sequence.get_duration(format='str'), trep, n_steps),
height=512
)
for geometry_name in geometry_names:
# times = observation.get_measurements_times(format='utc')
data = observation.get_measurements_geometry_data(geometry_name)
if isinstance(data[0], bool): # data type is boolean
data_int = []
for d in data:
data_int.append(int(d))
data = data_int
self.figure.add_trace(go.Scatter(x=times, y=data, mode='lines', name=geometry_name))
# TEMPORARY: Add first element (x_rot) of SC_Slew_Angles if available.
# data = observation.get_measurements_geometry_data('SC_Slew_Angles')
#
# Idea: observation_get_measurements_geometry_data_element('SC_Slew_Angles:x_rot')
#
# if data[0][0]:
# x_rot_values = []
# for values in data:
# x_rot_values.append(values[0])
# x_rot_values = np.array(x_rot_values)*spice.dpr()
# self.figure.add_trace(go.Scatter(x=times, y=x_rot_values, mode='lines', name='X_SC_Slew_Angle (deg.)'))
#
# y_rot_values = []
# for values in data:
# y_rot_values.append(values[1])
# y_rot_values = np.array(y_rot_values)*spice.dpr()
# self.figure.add_trace(go.Scatter(x=times, y=y_rot_values, mode='lines', name='Y_SC_Slew_Angle (deg.)'))
if ref:
ref_data = np.zeros(n_steps)+ref
self.figure.add_trace(go.Scatter(x=times, y=ref_data, mode='lines', name='Reference Value'))
if log_y:
self.figure.update_yaxes(type='log')
# self.figure.update_layout(legend=time_inputs_dict(
# orientation="h",
# yanchor="bottom",
# y=1.02,
# xanchor="left",
# x=1
# ))
#fig.update_traces(hovertemplate=None)
#fig.update_layout(hovermode="x unified")
# datas = observation.get_measurements_geometry_data('Motion_Compensation')
# scanner_angles = []
# for data in datas:
# scanner_angles.append(data[0]*spice.dpr())
# self.figure.add_trace(go.Scatter(x=rel_times,y=scanner_angles, mode='lines', name='Scanner_Angle'))
self.figure.update_layout(hovermode='x unified', template=template)
self.figure.show()
[docs]class XY_Viewer(DataViewer):
def __init__(self, opportunity):
self.opportunity = opportunity
self.figure = go.Figure()
[docs] def show(self,observation_id=1, x_geometry_name='', y_geometry_name='', geometry_name='', x_index=0, y_index=1,
xaxis_range=[], yaxis_range=[], exclude_zeros=True, height=500, template=DEFAULT_PLOTLY_TEMPLATE):
# get observation object
observation = self.opportunity.sequence.get_observations()[observation_id - 1]
if x_geometry_name and y_geometry_name:
scalar_geometry_names = observation.sub_events[0].get_geometry_names(parent_class='Scalar')
if scalar_geometry_names:
# check that input geometry names are valid
if x_geometry_name not in scalar_geometry_names:
print(f'Invalid input `{x_geometry_name}` Geometry class name. Available geometry: {scalar_geometry_names}')
return
if y_geometry_name not in scalar_geometry_names:
print(f'Invalid input `{y_geometry_name}` Geometry class name. Available geometry: {scalar_geometry_names}')
return
else:
print('No Scalar-type Geometry class data to visualise with XY_Viewer.')
return
# retrieve geometry objects
x_geometries = observation.get_measurements_geometries(x_geometry_name)
y_geometries = observation.get_measurements_geometries(y_geometry_name)
# print(f'Visualising `{x_geometry_name}`vs `{y_geometry_name} Geometry class.')
x_title = f'{x_geometry_name} ({x_geometries[0].units})'
y_title = f'{y_geometry_name} ({y_geometries[0].units})'
title = f'{x_title} / {y_title}'
x_vec = []
y_vec = []
for x_geometry, y_geometry in zip(x_geometries, y_geometries):
x_data_value = x_geometry.getData()
y_data_value = y_geometry.getData()
if (x_data_value is None) or (y_data_value is None):
if not exclude_zeros:
x_vec.append(0.0)
y_vec.append(0.0)
else:
x_value = x_data_value
y_value = y_data_value
if exclude_zeros:
if (x_value != 0.0) and (y_value != 0.0):
x_vec.append(x_value)
y_vec.append(y_value)
else:
x_vec.append(x_value)
y_vec.append(y_value)
else:
vec_geometry_names = observation.sub_events[0].get_geometry_names(parent_class='Vector')
if vec_geometry_names:
if geometry_name:
# check that input geometry name is valid
if geometry_name not in vec_geometry_names:
print(f'Invalid input `{geometry_name}` Geometry class name. Available geometry: {vec_geometry_names}')
return
else:
# set default Geometry class
geometry_name = vec_geometry_names[0]
else:
print('No Vector-type Geometry class data to visualise with XY_Viewer.')
return
# retrieve geometry objects
geometries = observation.get_measurements_geometries(geometry_name)
# print(f'Visualising `{geometry_name}` Geometry class.')
x_title = f'{geometries[0].prefix[x_index]} ({geometries[0].units[x_index]})'
y_title = f'{geometries[0].prefix[y_index]} ({geometries[0].units[y_index]})'
title = f'{geometry_name}:{x_title} / {geometry_name}:{y_title}'
s = ''
for count, (prefix, unit) in enumerate(zip(geometries[0].prefix, geometries[0].units)):
s += f'{count}: {prefix} ({unit}) '
print(s)
x_vec = []
y_vec = []
for geometry in geometries:
data_value = geometry.getData()
if data_value is None:
if not exclude_zeros:
x_vec.append(0.0)
y_vec.append(0.0)
else:
x_value = data_value[x_index]
y_value = data_value[y_index]
if exclude_zeros:
if (x_value != 0.0) and (y_value != 0.0):
x_vec.append(data_value[x_index])
y_vec.append(data_value[y_index])
else:
x_vec.append(data_value[x_index])
y_vec.append(data_value[y_index])
# set data point annotation text
text = []
utc_times = observation.get_measurements_times(format='utc')
for i, utc_time in enumerate(utc_times):
text.append('['+str(i)+'] '+utc_time)
self.figure.add_trace(go.Scatter(x=x_vec, y=y_vec, mode='markers', name=title, text=text, hovertemplate="%{text}<br>" +"(%{x}, %{y})" ))
if xaxis_range:
self.figure.update_layout(xaxis_range=xaxis_range)
if yaxis_range:
self.figure.update_layout(yaxis_range=yaxis_range)
self.figure.update_layout(title=title, height=height, template=template)
self.figure.update_xaxes(title_text=x_title)
self.figure.update_yaxes(title_text=y_title)
self.figure.show()
[docs]class MapViewer(DataViewer):
def __init__(self, opportunities):
self.opportunities = opportunities
self.target = opportunities[0].opportunity_definition.target
self.roi_geojson_file = ''
self.sc_tranquilisation_time = 0.0
if self.target == 'CALLISTO':
self.roi_geojson_file = DataStore().roi_basedir+'/callisto/CALLISTO_ROIS.geojson'
elif self.target == 'GANYMEDE':
self.roi_geojson_file = DataStore().roi_basedir+'/ganymede/GANYMEDE_ROIS.geojson'
[docs] def show(self, observation_id=1, geometries=None, colors=None, opacity=0.4, split=True, surface_only=False, sc_tranq_time=None):
planetary_maps = {
'JUPITER': {
'url':'https://planetarymaps.usgs.gov/cgi-bin/mapserv?map=/maps/jupiter/jupiter_simp_cyl.map',
'layer': 'CASSINI'
},
'GANYMEDE': {
'url': 'https://planetarymaps.usgs.gov/cgi-bin/mapserv?map=/maps/jupiter/ganymede_simp_cyl.map',
'layer': 'GALILEO_VOYAGER'
},
'CALLISTO': {
'url': 'https://planetarymaps.usgs.gov/cgi-bin/mapserv?map=/maps/jupiter/callisto_simp_cyl.map',
'layer': 'GALILEO_VOYAGER'
},
'EUROPA': {
'url': 'https://planetarymaps.usgs.gov/cgi-bin/mapserv?map=/maps/jupiter/europa_simp_cyl.map',
'layer': 'GALILEO_VOYAGER'
},
'IO': {
'url': 'https://planetarymaps.usgs.gov/cgi-bin/mapserv?map=/maps/jupiter/io_simp_cyl.map',
'layer': 'SSI_color'
},
'EARTH': {
'url': 'https://planetarymaps.usgs.gov/cgi-bin/mapserv?map=/maps/earth/earth_simp_cyl.map',
'layer': 'MODIS'
},
'MARS': {
'url': 'https://planetarymaps.usgs.gov/cgi-bin/mapserv?map=/maps/mars/mars_simp_cyl.map',
'layer': 'MDIM21_color'
},
'VENUS': {
'url': 'https://planetarymaps.usgs.gov/cgi-bin/mapserv?map=/maps/venus/venus_simp_cyl.map',
'layer': 'MAGELLAN_color'
}
}
if self.target in planetary_maps:
basemap_layer = WMSLayer(
url=planetary_maps[self.target]['url'],
layers=planetary_maps[self.target]['layer'], name='{} basemap'.format(self.target.lower().capitalize()), crs=EPSG4326, base=True, show_loading=False,
attribution='USGS/NASA')
map = Map(layers=(basemap_layer,), center=(0,0), zoom=1, crs=EPSG4326, layout=Layout(height='512px'))
else:
target = 'JUPITER' # default basemap
basemap_layer = WMSLayer(
url=planetary_maps[target]['url'],
layers=planetary_maps[target]['layer'], name='{} basemap'.format(target.lower().capitalize()), crs=EPSG4326, base=True, show_loading=False,
attribution='USGS/NASA')
map = Map(layers=(basemap_layer,), center=(0,0), zoom=1, crs=EPSG4326, layout=Layout(height='512px'))
print('WARNING: Unknown input <{}> target.'.format(self.target))
map.add_control(FullScreenControl())
map.fit_bounds([[-90.0, -180.0], [90.0, 180.0]]) # [[south, west], [north, east]].
def update_html(feature, **kwargs):
html.value = '''
<p><b>#{}</b> {}</p>
'''.format(feature['properties']['id'],
feature['properties']['utc_time']) # previously reference_time (see DataStore.writeOpportunity() and Opportunity.getMeasurementsGeoJSON())
html = HTML('''Hover over a geometry''')
html.layout.margin = '0px 10px 0px 10px'
control = WidgetControl(widget=html, position='bottomleft')
map.add_control(control)
label = Label()
display(label)
def handle_interaction(**kwargs):
if kwargs.get('type') == 'mousemove':
label.value = str(kwargs.get('coordinates'))
map.on_interaction(handle_interaction)
if geometries:
geometry_names = geometries
else:
observation = self.opportunities[0].sequence.get_observations()[0] # first observation of the first opportunity
geometry_names = observation.sub_events[0].get_geometry_names(parent_class='Geolocation')
# generate colors
# default_colors = ['#2E91E5', '#E15F99', '#1CA71C', '#FB0D0D', '#DA16FF', '#222A2A', '#B68100', '#750D86',
# '#EB663B', '#511CFB', '#00A08B', '#FB00D1', '#FC0080', '#B2828D', '#6C7C32', '#778AAE',
# '#862A16', '#A777F1', '#620042', '#1616A7', '#DA60CA', '#6C4516', '#0D2A63', '#AF0038']
default_colors = [ 'Crimson', 'Tomato', 'Gold', 'Yellow', 'GreenYellow',
'SpringGreen', 'Aqua', 'DeepSkyBlue', 'RoyalBlue', 'Violet', 'HotPink']
if colors:
default_colors = colors
opportunity_colors = []
for i in range(len(self.opportunities)):
opportunity_colors.append(default_colors[i % len(default_colors)])
# Add graticule layer
graticule_file = DataStore().roi_basedir+'/ne_50m_graticules_10.geojson'
with open(graticule_file, 'r') as f:
graticule = json.load(f)
map.add_layer(GeoJSON(
data=graticule,
name='Graticule 10x10 deg',
style={'opacity': 0.5, 'weight': 1, 'color': 'white', 'dashArray': '1,1'}
))
# add ROIs
if self.roi_geojson_file:
with open(self.roi_geojson_file, 'r') as f:
data = json.load(f)
geo_json = GeoJSON(
data = data,
name = 'ROIs',
style = {
'color': 'red', 'opacity': 1, 'fillOpacity': 0.1, 'weight': 1
}
)
def click_handler(**kwargs):
if kwargs.get('type') == 'click':
label.value = feature.properties['id']
geo_json.on_click(click_handler)
map.add_layer(geo_json)
if sc_tranq_time:
self.sc_tranquilisation_time = sc_tranq_time
def footprint_style(feature):
style = {
'color': opportunity_color,
'fillColor': opportunity_color,
'weight': 1,
'opacity': opacity,
'fillOpacity': opacity
}
if feature.properties['time'] < self.sc_tranquilisation_time:
style = {
'color': 'black',
'fillColor': 'black',
'weight': 1,
'opacity': opacity,
'fillOpacity': opacity
}
return style
# Add measurements GeoJSON layer for each opportunity
for opportunity, opportunity_color in zip(self.opportunities, opportunity_colors):
opportunity_id = opportunity.id
for geometry_name in geometry_names:
for observation_id in range(opportunity.sequence.n_sub_events):
data = opportunity.getMeasurementsGeoJSON(
geometry_name, observation_id=observation_id, split=split, surface_only=surface_only)
geo_json = GeoJSON(
data = data,
name = opportunity.id + ' ' + geometry_name + ' obs{:03}'.format(observation_id),
# style = {
# 'color': opportunity_color,
# 'fillColor': opportunity_color,
# 'weight': 1,
# 'fillOpacity': opacity
# },
style_callback=footprint_style,
point_style = {
'color': 'white',
'fill_color': 'white',
'fillOpacity': 1,
'radius': 2,
'stroke': False
},
hover_style = {
'color': 'blue'
}
)
map.add_layer(geo_json)
geo_json.on_hover(update_html)
control = LayersControl(position='topright')
map.add_control(control)
display(map)
[docs]class PointingViewer(DataViewer):
def __init__(self, opportunity):
self.opportunity = opportunity
self.figure = go.Figure()
[docs] def show(self, frame='NOA', observation_id=1, geometries=None, all_detector_fov=False, plot_all=None, height=500, template=DEFAULT_PLOTLY_TEMPLATE):
observation = self.opportunity.sequence.get_observations()[observation_id - 1]
if geometries:
geometry_names = geometries
else:
geometry_names = observation.sub_events[0].get_geometry_names(parent_class='Direction')
#print(geometry_names)
utc_times = observation.get_measurements_times(format='utc')
times = observation.get_measurements_times(format='et')
imid = int(len(utc_times) / 2)
def get_style(geometry_name, property):
styles = {
'Limb_Directions': {'mode':'lines', 'line_dash':'solid', 'marker_symbol':0},
'Detector_FOV': {'mode':'lines', 'line_dash':'solid', 'marker_symbol':0},
'North_Pole_Direction':{'mode':'markers', 'line_dash':'solid', 'marker_symbol':0},
'Terminator_Directions': {'mode':'lines', 'line_dash':'dot', 'marker_symbol':0},
'Limb_Point_Direction':{'mode': 'markers', 'line_dash': 'solid', 'marker_symbol':1},
'Detector_Boresight': {'mode': 'markers', 'line_dash': 'solid', 'marker_symbol': 2},
'Ring_Ansae_Directions': {'mode': 'lines', 'line_dash': 'dot', 'marker_symbol': 0},
}
style = {'mode':'markers', 'line_dash':'solid', 'marker_symbol':3} # default style
value = ''
if geometry_name in styles.keys():
style = styles[geometry_name]
if property in style.keys():
value = style[property]
return value
target = self.opportunity.opportunity_definition.target
observer = self.opportunity.opportunity_definition.observer
for geometry_name in geometry_names:
#data_values = observation.get_measurements_geometry_data(geometry_name)
geometries = observation.get_measurements_geometries(geometry_name)
data_values = []
for et, geometry in zip(times, geometries):
data_values.append(geometry.get_pointing_angles(et, observer, target, frame=frame))
if all_detector_fov:
if geometry_name == 'Detector_FOV': # plot all FOVs
for idx, fov_dir_angles in enumerate(data_values):
x = np.transpose(fov_dir_angles)[0]
x = np.append(x, x[0])
y = np.transpose(fov_dir_angles)[1]
y = np.append(y, y[0])
self.figure.add_trace(
go.Scatter(x=x, y=y, mode=get_style(geometry_name, 'mode'), marker_color='silver',
line_dash=get_style(geometry_name, 'line_dash'), fill="toself",
name=geometry_name + ' ' + utc_times[idx]))
if plot_all:
if geometry_name in plot_all:
for idx, dir_angles in enumerate(data_values):
x = np.transpose(dir_angles)[0]
# x = np.append(x, x[0])
y = np.transpose(dir_angles)[1]
# y = np.append(y, y[0])
self.figure.add_trace(go.Scatter(x=x, y=y,
mode=get_style(geometry_name, 'mode'),
marker_symbol=get_style(geometry_name, 'marker_symbol'),
marker_color='silver',
line_dash=get_style(geometry_name, 'line_dash'),
name=geometry_name + ' ' + utc_times[idx]
))
# Set and plot parameters
x_start = np.transpose(data_values[0])[0]
y_start = np.transpose(data_values[0])[1]
x_middle = np.transpose(data_values[imid])[0]
y_middle = np.transpose(data_values[imid])[1]
x_stop = np.transpose(data_values[-1])[0]
y_stop = np.transpose(data_values[-1])[1]
if x_start.size > 1: # treat as a polygon
x_start = np.append(x_start, x_start[0])
y_start = np.append(y_start, y_start[0])
x_middle = np.append(x_middle, x_middle[0])
y_middle = np.append(y_middle, y_middle[0])
x_stop = np.append(x_stop, x_stop[0])
y_stop = np.append(y_stop, y_stop[0])
# else:
# x_start = [x_start]
# y_start = [y_start]
# x_middle = [x_middle]
# y_middle = [y_middle]
# x_stop = [x_stop]
# y_stop = [y_stop]
self.figure.add_trace(go.Scatter(
x=x_start,
y=y_start,
mode=get_style(geometry_name, 'mode'),
marker_symbol=get_style(geometry_name, 'marker_symbol'),
marker_color='Red',
line_dash=get_style(geometry_name, 'line_dash'),
name=geometry_name + ' ' + utc_times[0]
))
self.figure.add_trace(go.Scatter(
x=x_middle,
y=y_middle,
mode=get_style(geometry_name, 'mode'),
marker_symbol=get_style(geometry_name, 'marker_symbol'),
marker_color='Green',
line_dash=get_style(geometry_name, 'line_dash'),
name=geometry_name + ' ' + utc_times[imid]
))
self.figure.add_trace(go.Scatter(
x=x_stop,
y=y_stop,
mode=get_style(geometry_name, 'mode'),
marker_symbol=get_style(geometry_name, 'marker_symbol'),
marker_color='Blue',
line_dash=get_style(geometry_name, 'line_dash'),
name=geometry_name + ' ' + utc_times[-1]
))
self.figure.update_layout(height=height, template=template)
self.figure.update_yaxes(scaleanchor="x", scaleratio=1)
self.figure.update_xaxes(title_text = r'$\theta_{x} \text{(deg.)}$')
self.figure.update_yaxes(title_text = r'$\theta_{y} \text{(deg.)}$')
self.figure.show()