
Commit

Move second pass into calc times
dfsnow committed Dec 1, 2024
1 parent 4d59040 commit aeea0a2
Showing 2 changed files with 21 additions and 49 deletions.
55 changes: 9 additions & 46 deletions data/src/calculate_times.py
@@ -13,7 +13,6 @@
from utils.times import (
TravelTimeCalculator,
TravelTimeConfig,
TravelTimeInputs,
snap_df_to_osm,
)
from utils.utils import format_time, get_md5_hash
@@ -38,7 +37,7 @@ def main() -> None:
script_start_time = time.time()

# Create a travel times configuration and set of origin/destination inputs
config = TravelTimeConfig(args, params=params, logger=logger)
config = TravelTimeConfig(args, params=params, logger=logger, verbose=True)
inputs = config.load_default_inputs()

chunk_msg = f", chunk: {config.args.chunk}" if config.args.chunk else ""
@@ -60,8 +59,9 @@ def main() -> None:
len(inputs.origins) * inputs.n_destinations,
)

# Initialize the default Valhalla actor bindings
# Initialize Valhalla, where _sp is a second-pass (more expensive) fallback
actor = valhalla.Actor((Path.cwd() / "valhalla.json").as_posix())
actor_sp = valhalla.Actor((Path.cwd() / "valhalla_sp.json").as_posix())

# Use the Valhalla Locate API to append coordinates that are snapped to OSM
if config.params["times"]["use_snapped"]:
@@ -74,62 +74,25 @@
)

# Calculate times for each chunk and append to a list
tt_calc = TravelTimeCalculator(actor, config, inputs)
tt_calc = TravelTimeCalculator(actor, actor_sp, config, inputs)
results_df = tt_calc.many_to_many()

logger.info(
"Finished calculating first pass times for %s pairs in %s",
"Finished calculating times for %s pairs in %s",
len(results_df),
format_time(time.time() - script_start_time),
)

# Extract missing pairs to a separate DataFrame
# Extract missing pairs to a separate DataFrame and sort all outputs
# for efficient compression
missing_pairs_df = results_df[results_df["duration_sec"].isnull()]
n_missing_pairs = len(missing_pairs_df)

# If there are missing pairs, rerun the routing for only those pairs
# using a more aggressive (but time consuming) second pass approach
if n_missing_pairs > 0:
logger.info(
"Found %s missing pairs, rerouting with a more aggressive method",
n_missing_pairs,
)
actor_sp = valhalla.Actor((Path.cwd() / "valhalla_sp.json").as_posix())

# Create a new input class, keeping only pairs that were unroutable
inputs_sp = TravelTimeInputs(
origins=inputs.origins[
inputs.origins["id"].isin(
missing_pairs_df.index.get_level_values("origin_id")
)
].reset_index(drop=True),
destinations=inputs.destinations[
inputs.destinations["id"].isin(
missing_pairs_df.index.get_level_values("destination_id")
)
].reset_index(drop=True),
chunk=None,
max_split_size_origins=inputs.max_split_size_origins,
max_split_size_destinations=inputs.max_split_size_destinations,
)

# Route using the more aggressive settings and update the results
tt_calc_sp = TravelTimeCalculator(actor_sp, config, inputs_sp)
results_df.update(tt_calc_sp.many_to_many())

# Extract the missing pairs again since they may have changed
missing_pairs_df = results_df[results_df["duration_sec"].isnull()]
logger.info(
"Found %s additional pairs on second pass",
n_missing_pairs - len(missing_pairs_df),
)

# Drop missing pairs and sort for more efficient compression
missing_pairs_df = (
missing_pairs_df.drop(columns=["duration_sec", "distance_km"])
.sort_index()
.reset_index()
)

logger.info("Found %s missing pairs", len(missing_pairs_df))
results_df = (
results_df.dropna(subset=["duration_sec"]).sort_index().reset_index()
)
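
With the second pass folded into the calculator, the caller in calculate_times.py no longer rebuilds a TravelTimeInputs object for unroutable pairs. A minimal sketch of the resulting call pattern, using only names that appear in this diff and assuming config, inputs, and logger are constructed earlier in the script as before (this fragment is not standalone):

# Primary actor plus the more expensive second-pass fallback configuration
actor = valhalla.Actor((Path.cwd() / "valhalla.json").as_posix())
actor_sp = valhalla.Actor((Path.cwd() / "valhalla_sp.json").as_posix())

# The calculator now owns the retry logic, so a single call covers both passes
tt_calc = TravelTimeCalculator(actor, actor_sp, config, inputs)
results_df = tt_calc.many_to_many()

# Pairs still unroutable after the internal second pass surface as null durations
missing_pairs_df = results_df[results_df["duration_sec"].isnull()]
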
15 changes: 12 additions & 3 deletions data/src/utils/times.py
@@ -352,15 +352,18 @@ class TravelTimeCalculator:
def __init__(
self,
actor: valhalla.Actor,
actor_fallback: valhalla.Actor,
config: TravelTimeConfig,
inputs: TravelTimeInputs,
) -> None:
self.actor = actor
self.actor_fallback = actor_fallback
self.config = config
self.inputs = inputs

def _calculate_times(
self,
actor: valhalla.Actor,
o_start_idx: int,
d_start_idx: int,
o_end_idx: int,
@@ -408,7 +411,7 @@ def col_dict(x, snapped=self.config.params["times"]["use_snapped"]):

# Make the actual JSON request to the matrix API
with suppress_stdout():
response = self.actor.matrix(request_json)
response = actor.matrix(request_json)
response_data = json.loads(response)

# Parse the response data and convert it to a DataFrame. Recover the
@@ -461,19 +464,24 @@ def _binary_search(
d_end_idx,
)

# If indices are out-of-bounds return an empty list
if o_start_idx >= o_end_idx or d_start_idx >= d_end_idx:
return []

# Stop recursion if the chunks are too small
# Stop recursion if the chunks are too small and use the fallback
# (more expensive) Valhalla configuration
if (o_end_idx - o_start_idx <= 1) and (d_end_idx - d_start_idx <= 1):
try:
df = self._calculate_times(
actor=self.actor_fallback,
o_start_idx=o_start_idx,
d_start_idx=d_start_idx,
o_end_idx=o_end_idx,
d_end_idx=d_end_idx,
)
except Exception:
except Exception as e:
if print_log or self.config.verbose:
self.config.logger.error(f"{e}. Returning empty DataFrame")
df = pd.merge(
self.inputs.origins["id"]
.iloc[o_start_idx:o_end_idx]
@@ -490,6 +498,7 @@ def _binary_search(

try:
times = self._calculate_times(
actor=self.actor,
o_start_idx=o_start_idx,
d_start_idx=d_start_idx,
o_end_idx=o_end_idx,
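
For reference, the control flow that _binary_search now implements can be illustrated with a self-contained sketch: a block of origin-destination pairs is first attempted with the cheap backend, a failed block is split in half and retried, and only 1x1 blocks go to the more expensive second-pass backend. Every name below (matrix_with_fallback, cheap, expensive) is an illustrative stand-in; the real code operates on DataFrames, index ranges, and Valhalla actors rather than lists and callables.

# Illustrative stand-in for the split-and-fallback pattern; not the repo's API.
from typing import Callable, List, Optional

Matrix = List[List[Optional[float]]]


def matrix_with_fallback(
    origins: List[str],
    destinations: List[str],
    cheap: Callable[[List[str], List[str]], Matrix],
    expensive: Callable[[List[str], List[str]], Matrix],
) -> Matrix:
    # Out-of-bounds / empty block: nothing to route
    if not origins or not destinations:
        return []
    # Base case mirrors the diff: a single pair goes straight to the
    # second-pass (expensive) backend, and stays null if that also fails
    if len(origins) == 1 and len(destinations) == 1:
        try:
            return expensive(origins, destinations)
        except RuntimeError:
            return [[None]]
    # Larger blocks try the cheap backend first and split in half on failure
    try:
        return cheap(origins, destinations)
    except RuntimeError:
        if len(origins) >= len(destinations):
            mid = len(origins) // 2
            return matrix_with_fallback(
                origins[:mid], destinations, cheap, expensive
            ) + matrix_with_fallback(origins[mid:], destinations, cheap, expensive)
        mid = len(destinations) // 2
        left = matrix_with_fallback(origins, destinations[:mid], cheap, expensive)
        right = matrix_with_fallback(origins, destinations[mid:], cheap, expensive)
        return [l_row + r_row for l_row, r_row in zip(left, right)]


# Tiny demo: the cheap backend refuses any block containing the pair (a, x),
# so that block keeps splitting until (a, x) reaches the expensive backend alone
def cheap(o: List[str], d: List[str]) -> Matrix:
    if "a" in o and "x" in d:
        raise RuntimeError("unroutable pair in block")
    return [[30.0 for _ in d] for _ in o]


def expensive(o: List[str], d: List[str]) -> Matrix:
    return [[90.0 for _ in d] for _ in o]


print(matrix_with_fallback(["a", "b"], ["x", "y"], cheap, expensive))
# [[90.0, 90.0], [30.0, 30.0]]
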
