import json from collections.abc import Callable import folium import numpy as np import plotly.graph_objects as go import scipy.spatial from folium import plugins from . import common, types # ==================== This file contains cluster plotting code (mode: 'cluster') # ==================== Cluster mode argument definition def arguments(parser): """ Defines arguments specific to cluster plotting. """ parser.add_argument( "--input_cluster", type=str, nargs="?", default="", help="path to the cluster file to plot", ) parser.add_argument( "--jpath_cluster", type=str, nargs="?", default="state.clusters[*].points", help="JSON path to the cluster elements (XPATH like," + " see https://goessner.net/articles/JsonPath/," + ' example: "state.clusters[*].points")', ) parser.add_argument( "--input_pos", type=str, nargs="?", default="", help="path to file containing the positions, if not supplied by cluster file", ) parser.add_argument( "--jpath_pos", type=str, nargs="?", default="", help="JSON path to the positions, if cluster elements are stored as indices", ) parser.add_argument( "--no_points", action="store_true", help="indicates whether to omit plotting the actual points in addition to the convex hull", ) parser.add_argument( "--weight_points", type=float, nargs="?", default=1, help="point size (<1 decreases, >1 increases)", ) # ==================== Cluster plotting specific functionality def convex_hull(points): """ Calculates the convex hull for the given points and returns it as a sorted list of points. """ if len(points) <= 2: return [(p.lon, p.lat) for p in points] np_points = np.array([(p.lon, p.lat) for p in points]) hull = scipy.spatial.ConvexHull(np_points) simplices = hull.vertices.tolist() return [(points[s].lon, points[s].lat) for s in simplices] def parse( input_cluster: str, jpath_cluster: str, input_pos: str, jpath_pos: str, jpath_x: str, jpath_y: str, ) -> tuple[list[list[types.Position]], list[list[types.Position]]]: """ Parses the cluster data from the file(s). """ # Load json data content_cluster, content_points = common.load_data(input_cluster, input_pos) # Extract clusters points = common.extract_position_groups( content_cluster, jpath_cluster, content_points, jpath_pos, jpath_x, jpath_y, ) return points def plot( input_cluster: str, jpath_cluster: str, input_pos: str, jpath_pos: str, jpath_x: str, jpath_y: str, swap: bool, coords: str, output_image: str, output_plot: str, output_map: str, stats_file: str, colors: str, sort_color: bool, no_points: bool, weight_points: float, custom_map_tile: list[str], plotly_theme: str, ): """ Plots clusters based on the given arguments. Interprets args, reads .json, collects some stats, plots a .png and plots an interactive .html map. """ # Determine base filename base_name = "plot" # Default for STDIN if input_cluster: base_name = input_cluster # Parse data points = parse( input_cluster, jpath_cluster, input_pos, jpath_pos, jpath_x, jpath_y, ) # Quit on no points if len(points) <= 0: print("no points found in given file(s) using given filter(s)") return # Conduct some checks points, world_coords, dataerror = common.preprocess_coordinates(points, swap, coords) if dataerror: print(dataerror) return # Determine bbox bbox = common.bounding_box(points) # Wrap in clusters clusters = [types.Cluster(p) for p in points] # Wrap it if len(clusters) <= 0: print(f"no clusters could be extracted at the given path: {jpath_cluster}") return measure = common.haversine if world_coords else common.euclidean # Process clusters for cluster in clusters: # Collect some statistics of the cluster cluster.size = len(cluster.points) cluster.diameter = 0 if len(cluster.points) > 0: centroid_x = sum([p.lon for p in cluster.points]) / len(cluster.points) centroid_y = sum([p.lat for p in cluster.points]) / len(cluster.points) cluster.centroid = (centroid_x, centroid_y) distances_from_centroid = [measure(p, cluster.centroid) for p in cluster.points] cluster.sum_of_distances_from_centroid = sum(distances_from_centroid) cluster.max_distance_from_centroid = max(distances_from_centroid, default=0) cluster.wcss = sum([measure(p, cluster.centroid) ** 2 for p in cluster.points]) for i in range(len(cluster.points)): for j in range(len(cluster.points)): if i == j: continue distance = measure( cluster.points[i], cluster.points[j], ) if distance > cluster.diameter: cluster.diameter = distance # Determine convex hulls for cluster in clusters: cluster.hull = convex_hull(cluster.points) # Dump some stats statistics(clusters, measure, stats_file) # Prepares colors for the groups common.prepare_colors(clusters, colors, sort_color) # Make simple plot of clusters aspect_ratio = (bbox.height) / (bbox.width) if bbox.width > 0 else 1 # Init plot fig = go.Figure( layout=go.Layout( xaxis_title="lon" if world_coords else "x", yaxis_title="lat" if world_coords else "y", template=plotly_theme, margin={"l": 20, "r": 20, "b": 20, "t": 20, "pad": 4}, font={"size": 18}, showlegend=False, ) ) # Plot clusters for i, cluster in enumerate(clusters): if len(cluster.points) <= 0: continue # Calculate hull of cluster hull_points = np.array(cluster.hull) # Repeat the first point at the end to close the polygon hull_points = np.append(hull_points, hull_points[0, :].reshape(1, 2), axis=0) # Plot hull fig.add_trace( go.Scatter( x=hull_points[:, 0], y=hull_points[:, 1], mode="lines", line={"color": cluster.color.hex, "width": 2}, name=f"Cluster {i+1}", fill="toself", ) ) # Plot points if not no_points: for i, cluster in enumerate(clusters): if len(cluster.points) <= 0: continue # Plot points fig.add_trace( go.Scatter( x=[p.lon for p in cluster.points], y=[p.lat for p in cluster.points], mode="markers", marker={ "size": weight_points * 5, "color": cluster.color.hex, }, name=f"Cluster {i+1}", ) ) # Save interactive plot plot_file = output_plot if not plot_file: plot_file = base_name + ".plot.html" print(f"Plotting interactive plot to {plot_file}") fig.write_html(plot_file) # Save plot image image_file = output_image if not image_file: image_file = base_name + ".plot.png" print(f"Plotting image to {image_file}") fig.write_image( image_file, width=min(common.IMAGE_SIZE, common.IMAGE_SIZE / aspect_ratio), height=min(common.IMAGE_SIZE, common.IMAGE_SIZE * aspect_ratio), ) # Skip plotting on map, if no geo-coordinates if not world_coords: print("No world coordinates, skipping map plotting") quit() # Make map plot of routes map_file = output_map if not map_file: map_file = base_name + ".map.html" print(f"Plotting map to {map_file}") m, base_tree = common.create_map( (bbox.max_x + bbox.min_x) / 2.0, (bbox.max_y + bbox.min_y) / 2.0, custom_map_tile, ) plot_groups = {} group_names = {} # Plot the clusters themselves for i, cluster in enumerate(clusters): if len(cluster.points) <= 0: continue layer_name = f"Cluster {i+1}" plot_groups[i] = folium.FeatureGroup(name=layer_name) group_names[plot_groups[i]] = layer_name text = ( "
" + f"Cluster: {i} / {len(clusters)}" + f"Cluster points: {cluster.size}" + f"Cluster diameter: {cluster.diameter:.2f} km " + f"({common.km_to_miles(cluster.diameter):.2f} miles)" + "
" ) plot_map_cluster(plot_groups[i], cluster, text) # Plot the individual points if not no_points: for i, cluster in enumerate(clusters): for point in cluster.points: d = point.desc.replace("\n", "Location (lon/lat): {point[0]}, {point[1]}
{d}
"
)
plot_map_point(
plot_groups[i],
point,
text,
weight_points,
cluster.color.hex,
)
# Add all grouped parts to the map
for k in plot_groups:
plot_groups[k].add_to(m)
# Add button to expand the map to fullscreen
plugins.Fullscreen(
position="topright",
title="Expand me",
title_cancel="Exit me",
).add_to(m)
# Create overlay tree for advanced control of route/unassigned layers
overlay_tree = {
"label": "Overlays",
"select_all_checkbox": "Un/select all",
"children": [
{
"label": "Clusters",
"select_all_checkbox": True,
"collapsed": True,
"children": [{"label": group_names[v], "layer": v} for v in plot_groups.values()],
}
],
}
# Add control for all layers and write file
plugins.TreeLayerControl(base_tree=base_tree, overlay_tree=overlay_tree).add_to(m)
# Fit map to bounds
m.fit_bounds([[bbox.min_y, bbox.min_x], [bbox.max_y, bbox.max_x]])
# Save map
m.save(map_file)
def plot_map_point(map, point, text, weight, color):
"""
Plots a point on the given map.
"""
popup_text = folium.Html(text, script=True)
popup = folium.Popup(popup_text, max_width=450, sticky=True)
marker = folium.Circle(
(point[1], point[0]), # folium operates on lat/lon
color=color,
popup=popup,
radius=15 * weight,
fill=True,
fillOpacity=1.0,
)
marker.options["fillOpacity"] = 1.0
marker.add_to(map)
def plot_map_cluster(
map: object,
cluster: object,
text: str,
):
"""
Plots a cluster on the given map.
"""
popup_text = folium.Html(text, script=True)
popup = folium.Popup(popup_text, max_width=450, sticky=True)
mod_hull = [(y, x) for (x, y) in cluster.hull] # folium operates on lat/lon
polygon = folium.Polygon(
mod_hull,
color=cluster.color.hex,
fill=True,
popup=popup,
)
polygon.add_to(map)
def statistics(
clusters: list[types.Cluster],
measure: Callable[[types.Position, types.Position], float],
stats_file: str,
):
"""
Outlines some route statistics. Statistics are written to file, if provided.
"""
# Collect statistics
sizes, diameters = [r.size for r in clusters], [r.diameter for r in clusters]
sum_of_max_distances = sum([c.max_distance_from_centroid for c in clusters])
max_distance = max([c.max_distance_from_centroid for c in clusters])
sum_of_distances = sum([c.sum_of_distances_from_centroid for c in clusters])
wcss = sum([c.wcss for c in clusters])
bad_assignments = 0
for c in clusters:
for p in c.points:
distance_to_centroid = measure(c.centroid, p)
distances_to_other_centroids = np.array(
[[measure(c2.centroid, p) for c2 in clusters if hasattr(c2, "centroid")]]
)
if len(distances_to_other_centroids[distance_to_centroid > distances_to_other_centroids]):
bad_assignments += 1
stats = [
types.Stat("npoints", "Total points", sum([len(c.points) for c in clusters])),
types.Stat("nclusters", "Cluster count", len(clusters)),
types.Stat("clust_size_max", "Cluster size (max)", max(sizes)),
types.Stat("clust_size_min", "Cluster size (min)", min(sizes)),
types.Stat("clust_size_avg", "Cluster size (avg)", sum(sizes) / float(len(clusters))),
types.Stat(
"cluster_size_var",
"Cluster size (variance)",
np.var(np.array([len(c.points) for c in clusters])),
),
types.Stat("clust_diam_max", "Cluster diameter (max)", max(diameters)),
types.Stat("clust_diam_min", "Cluster diameter (min)", min(diameters)),
types.Stat(
"clust_diam_avg",
"Cluster diameter (avg)",
sum(diameters) / float(len(clusters)),
),
types.Stat(
"sum_max_distances",
"Sum of max distances from centroid",
sum_of_max_distances,
),
types.Stat("distance_from_centroid_max", "Max distance from centroid", max_distance),
types.Stat(
"distance_from_centroid_sum",
"Sum of distances from centroid",
sum_of_distances,
),
types.Stat("wcss", "Sum of squares from centroid", wcss),
types.Stat("bad_assignments", "Bad assignments", bad_assignments),
]
# Log statistics
print("Cluster stats")
for stat in stats:
print(f"{stat.desc}: {stat.val:.2f}")
# Write statistics to file
if stats_file:
stats_table = {}
for stat in stats:
stats_table[stat.name] = stat.val
with open(stats_file, "w+") as f:
json.dump(stats_table, f)