From aeea0a26c99160ef9625c1741fe67ffec6954438 Mon Sep 17 00:00:00 2001
From: Dan Snow
Date: Sat, 30 Nov 2024 20:44:14 -0600
Subject: [PATCH] Move second pass into calc times

---
 data/src/calculate_times.py | 55 ++++++-------------------------------
 data/src/utils/times.py     | 15 ++++++++--
 2 files changed, 21 insertions(+), 49 deletions(-)

diff --git a/data/src/calculate_times.py b/data/src/calculate_times.py
index 83a136d..1965a2d 100644
--- a/data/src/calculate_times.py
+++ b/data/src/calculate_times.py
@@ -13,7 +13,6 @@
 from utils.times import (
     TravelTimeCalculator,
     TravelTimeConfig,
-    TravelTimeInputs,
     snap_df_to_osm,
 )
 from utils.utils import format_time, get_md5_hash
@@ -38,7 +37,7 @@ def main() -> None:
     script_start_time = time.time()
 
     # Create a travel times configuration and set of origin/destination inputs
-    config = TravelTimeConfig(args, params=params, logger=logger)
+    config = TravelTimeConfig(args, params=params, logger=logger, verbose=True)
     inputs = config.load_default_inputs()
 
     chunk_msg = f", chunk: {config.args.chunk}" if config.args.chunk else ""
@@ -60,8 +59,9 @@ def main() -> None:
         len(inputs.origins) * inputs.n_destinations,
     )
 
-    # Initialize the default Valhalla actor bindings
+    # Initialize Valhalla, where _sp is a second-pass (more expensive) fallback
     actor = valhalla.Actor((Path.cwd() / "valhalla.json").as_posix())
+    actor_sp = valhalla.Actor((Path.cwd() / "valhalla_sp.json").as_posix())
 
     # Use the Vahalla Locate API to append coordinates that are snapped to OSM
     if config.params["times"]["use_snapped"]:
@@ -74,62 +74,25 @@ def main() -> None:
     )
 
     # Calculate times for each chunk and append to a list
-    tt_calc = TravelTimeCalculator(actor, config, inputs)
+    tt_calc = TravelTimeCalculator(actor, actor_sp, config, inputs)
     results_df = tt_calc.many_to_many()
     logger.info(
-        "Finished calculating first pass times for %s pairs in %s",
+        "Finished calculating times for %s pairs in %s",
         len(results_df),
         format_time(time.time() - script_start_time),
     )
 
-    # Extract missing pairs to a separate DataFrame
+    # Extract missing pairs to a separate DataFrame and sort all outputs
+    # for efficient compression
     missing_pairs_df = results_df[results_df["duration_sec"].isnull()]
-    n_missing_pairs = len(missing_pairs_df)
-
-    # If there are missing pairs, rerun the routing for only those pairs
-    # using a more aggressive (but time consuming) second pass approach
-    if n_missing_pairs > 0:
-        logger.info(
-            "Found %s missing pairs, rerouting with a more aggressive method",
-            n_missing_pairs,
-        )
-        actor_sp = valhalla.Actor((Path.cwd() / "valhalla_sp.json").as_posix())
-
-        # Create a new input class, keeping only pairs that were unroutable
-        inputs_sp = TravelTimeInputs(
-            origins=inputs.origins[
-                inputs.origins["id"].isin(
-                    missing_pairs_df.index.get_level_values("origin_id")
-                )
-            ].reset_index(drop=True),
-            destinations=inputs.destinations[
-                inputs.destinations["id"].isin(
-                    missing_pairs_df.index.get_level_values("destination_id")
-                )
-            ].reset_index(drop=True),
-            chunk=None,
-            max_split_size_origins=inputs.max_split_size_origins,
-            max_split_size_destinations=inputs.max_split_size_destinations,
-        )
-
-        # Route using the more aggressive settings and update the results
-        tt_calc_sp = TravelTimeCalculator(actor_sp, config, inputs_sp)
-        results_df.update(tt_calc_sp.many_to_many())
-
-        # Extract the missing pairs again since they may have changed
-        missing_pairs_df = results_df[results_df["duration_sec"].isnull()]
-        logger.info(
-            "Found %s additional pairs on second pass",
-            n_missing_pairs - len(missing_pairs_df),
-        )
-
-    # Drop missing pairs and sort for more efficient compression
     missing_pairs_df = (
         missing_pairs_df.drop(columns=["duration_sec", "distance_km"])
        .sort_index()
        .reset_index()
     )
+
+    logger.info("Found %s missing pairs", len(missing_pairs_df))
 
     results_df = (
         results_df.dropna(subset=["duration_sec"]).sort_index().reset_index()
     )
diff --git a/data/src/utils/times.py b/data/src/utils/times.py
index a2d5dbd..cbe0470 100644
--- a/data/src/utils/times.py
+++ b/data/src/utils/times.py
@@ -352,15 +352,18 @@ class TravelTimeCalculator:
     def __init__(
         self,
         actor: valhalla.Actor,
+        actor_fallback: valhalla.Actor,
         config: TravelTimeConfig,
         inputs: TravelTimeInputs,
     ) -> None:
         self.actor = actor
+        self.actor_fallback = actor_fallback
         self.config = config
         self.inputs = inputs
 
     def _calculate_times(
         self,
+        actor: valhalla.Actor,
         o_start_idx: int,
         d_start_idx: int,
         o_end_idx: int,
@@ -408,7 +411,7 @@ def col_dict(x, snapped=self.config.params["times"]["use_snapped"]):
 
         # Make the actual JSON request to the matrix API
         with suppress_stdout():
-            response = self.actor.matrix(request_json)
+            response = actor.matrix(request_json)
             response_data = json.loads(response)
 
         # Parse the response data and convert it to a DataFrame. Recover the
@@ -461,19 +464,24 @@ def _binary_search(
             d_end_idx,
         )
 
+        # If indices are out-of-bounds return an empty list
         if o_start_idx >= o_end_idx or d_start_idx >= d_end_idx:
             return []
-        # Stop recursion if the chunks are too small
+        # Stop recursion if the chunks are too small and use the fallback
+        # (more expensive) Valhalla configuration
         if (o_end_idx - o_start_idx <= 1) and (d_end_idx - d_start_idx <= 1):
             try:
                 df = self._calculate_times(
+                    actor=self.actor_fallback,
                     o_start_idx=o_start_idx,
                     d_start_idx=d_start_idx,
                     o_end_idx=o_end_idx,
                     d_end_idx=d_end_idx,
                 )
-            except Exception:
+            except Exception as e:
+                if print_log or self.config.verbose:
+                    self.config.logger.error(f"{e}. Returning empty DataFrame")
                 df = pd.merge(
                     self.inputs.origins["id"]
                     .iloc[o_start_idx:o_end_idx]
                     .to_frame(),
@@ -490,6 +498,7 @@
 
         try:
             times = self._calculate_times(
+                actor=self.actor,
                 o_start_idx=o_start_idx,
                 d_start_idx=d_start_idx,
                 o_end_idx=o_end_idx,
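
Reviewer's note (not part of the patch): the sketch below illustrates the control flow this change consolidates into TravelTimeCalculator._binary_search. Large origin-destination blocks are requested with the cheap primary actor; only when recursion bottoms out at a single pair does the calculator retry with the more expensive fallback actor, and a pair that still fails is recorded as unroutable. The names below (split_and_route, primary, fallback) are hypothetical stand-ins, and the halving logic is an assumption about the splitting code not shown in this diff; the real code issues Valhalla matrix requests and returns pandas DataFrames.

# Hypothetical sketch of the split-and-fallback flow, not the project's API.
from typing import Callable, Dict, List, Optional, Tuple

Pair = Tuple[int, int]
Matrix = Callable[[List[int], List[int]], Dict[Pair, float]]


def split_and_route(
    primary: Matrix,
    fallback: Matrix,
    origins: List[int],
    destinations: List[int],
) -> Dict[Pair, Optional[float]]:
    """Route a block with the cheap actor, split on failure, and retry
    single pairs once with the expensive fallback before giving up."""
    if not origins or not destinations:
        return {}

    # Base case: a single pair left, so use the expensive fallback actor
    if len(origins) == 1 and len(destinations) == 1:
        try:
            return dict(fallback(origins, destinations))
        except Exception:
            # Both passes failed: mark the pair unroutable (None duration)
            return {(origins[0], destinations[0]): None}

    # General case: try the whole block with the cheap primary actor
    try:
        return dict(primary(origins, destinations))
    except Exception:
        # Split each axis in half and recurse on the resulting sub-blocks
        o_mid = max(len(origins) // 2, 1)
        d_mid = max(len(destinations) // 2, 1)
        results: Dict[Pair, Optional[float]] = {}
        for o_part in (origins[:o_mid], origins[o_mid:]):
            for d_part in (destinations[:d_mid], destinations[d_mid:]):
                results.update(split_and_route(primary, fallback, o_part, d_part))
        return results

Keeping the fallback at the single-pair base case bounds the expensive second-pass requests to exactly the pairs that failed, rather than rebuilding a TravelTimeInputs object and re-running whole chunks as the old second pass in calculate_times.py did.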