Skip to content

Commit

Permalink
Add second pass for main loop
Browse files (browse the repository at this point in the history)
  • Loading branch information
dfsnow committed Dec 1, 2024
1 parent 745e3cc commit 68ac35d
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 41 deletions.
2 changes: 1 addition & 1 deletion data/src/calculate_times.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -37,7 +37,7 @@ def main() -> None:
script_start_time = time.time()

# Create a travel times configuration and set of origin/destination inputs
config = TravelTimeConfig(args, params=params, logger=logger)
config = TravelTimeConfig(args, params=params, logger=logger, verbose=True)
inputs = config.load_default_inputs()

chunk_msg = f", chunk: {config.args.chunk}" if config.args.chunk else ""
Expand Down
80 changes: 40 additions & 40 deletions data/src/utils/times.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -366,20 +366,12 @@ def __init__(
def _calculate_times(
self,
actor: valhalla.Actor,
o_start_idx: int,
d_start_idx: int,
o_end_idx: int,
d_end_idx: int,
origins: pd.DataFrame,
destinations: pd.DataFrame,
) -> pd.DataFrame:
"""
Calculates travel times and distances between origins and destinations.
Args:
o_start_idx: Starting index for the origins DataFrame.
d_start_idx: Starting index for the destinations DataFrame.
o_end_idx: Ending index for the origins DataFrame.
d_end_idx: Ending index for the destinations DataFrame.
Returns:
DataFrame containing origin IDs, destination IDs, travel durations,
and distances.
Expand All @@ -393,16 +385,8 @@ def _col_dict(x, snapped=self.config.params["times"]["use_snapped"]):

# Get the subset of origin and destination points and convert them to
# lists, then squash them into the request body
origins_list = (
self.inputs.origins.iloc[o_start_idx:o_end_idx]
.apply(_col_dict, axis=1)
.tolist()
)
destinations_list = (
self.inputs.destinations.iloc[d_start_idx:d_end_idx]
.apply(_col_dict, axis=1)
.tolist()
)
origins_list = origins.apply(_col_dict, axis=1).tolist()
destinations_list = destinations.apply(_col_dict, axis=1).tolist()
request_json = json.dumps(
{
"sources": origins_list,
Expand All @@ -421,15 +405,8 @@ def _col_dict(x, snapped=self.config.params["times"]["use_snapped"]):
# origin and destination indices and append them to the DataFrame
durations = response_data["sources_to_targets"]["durations"]
distances = response_data["sources_to_targets"]["distances"]
origin_ids = (
self.inputs.origins["id"]
.iloc[o_start_idx:o_end_idx]
.repeat(d_end_idx - d_start_idx)
.tolist()
)
destination_ids = self.inputs.destinations["id"].iloc[
d_start_idx:d_end_idx
].tolist() * (o_end_idx - o_start_idx)
origin_ids = origins["id"].repeat(len(destinations)).tolist()
destination_ids = destinations["id"].tolist() * (len(origins))

df = pd.DataFrame(
{
Expand Down Expand Up @@ -504,10 +481,10 @@ def _binary_search(
try:
df = self._calculate_times(
actor=self.actor_fallback,
o_start_idx=o_start_idx,
d_start_idx=d_start_idx,
o_end_idx=o_end_idx,
d_end_idx=d_end_idx,
origins=self.inputs.origins.iloc[o_start_idx:o_end_idx],
destinations=self.inputs.destinations.iloc[
d_start_idx:d_end_idx
],
)
except Exception as e:
if print_log or self.config.verbose:
Expand All @@ -533,22 +510,45 @@ def _binary_search(
actor = self.actor_fallback if cur_depth > 0 else self.actor
times = self._calculate_times(
actor=actor,
o_start_idx=o_start_idx,
d_start_idx=d_start_idx,
o_end_idx=o_end_idx,
d_end_idx=d_end_idx,
origins=self.inputs.origins.iloc[o_start_idx:o_end_idx],
destinations=self.inputs.destinations.iloc[
d_start_idx:d_end_idx
],
)

# Check for completeness in the output. If any times are missing
# after the first pass, use the fallback router
# after the first pass, run a second pass with the fallback router
if times.isnull().values.any() and cur_depth == 0:
raise ValueError("First pass values contain missing times")
missing = times[times["duration_sec"].isnull()]
if print_log or self.config.verbose:
self.config.logger.warning(
"Found %s pairs with missing times. "
"Running a second pass with the fallback router",
len(missing),
)
times_sp = self._calculate_times(
actor=self.actor_fallback,
origins=self.inputs.origins[
self.inputs.origins["id"].isin(
missing["origin_id"].unique()
)
],
destinations=self.inputs.destinations[
self.inputs.destinations["id"].isin(
missing["destination_id"].unique()
)
],
).set_index(["origin_id", "destination_id"])
times = times.set_index(["origin_id", "destination_id"])
times.update(times_sp)
times = times.reset_index(drop=False)

if print_log or self.config.verbose:
elapsed_time = time.time() - start_time
self.config.logger.info(
"Routed %s pairs in %s",
"Routed %s pairs (%s missing) in %s",
(o_end_idx - o_start_idx) * (d_end_idx - d_start_idx),
len(times[times["duration_sec"].isnull()]),
format_time(elapsed_time),
)

Expand Down

0 comments on commit 68ac35d

Please sign in to comment.