Finalize recursive binary search
dfsnow committed Dec 1, 2024
1 parent aeea0a2 commit 745e3cc
Showing 3 changed files with 82 additions and 37 deletions.
6 changes: 6 additions & 0 deletions data/params.yaml
@@ -43,6 +43,12 @@ times:
# matrix API calls
use_snapped: true

# If Valhalla fails on the first pass, the time calculator begins a recursive
# binary search to try to "go around" any origin-destination pairs causing
# the failure. A full-depth search can take a long time; this parameter
# trades off search time against completeness
max_recursion_depth: 5

input:
# Distance in meters to buffer each state boundary by when clipping the
# national road network. Should be slightly higher than `destination_buffer_m`
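
For intuition about the trade-off the max_recursion_depth parameter above controls, here is a rough sketch (not taken from the repository): each block that fails is split into four origin-by-destination quadrants, so the depth cap bounds how far a single failing chunk can fan out.

# Worst case, with every block failing at every level, the number of leaf
# blocks a single failing chunk fans out into is 4 ** max_recursion_depth
max_recursion_depth = 5
print(4 ** max_recursion_depth)  # 1024
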
18 changes: 8 additions & 10 deletions data/src/calculate_times.py
@@ -37,7 +37,7 @@ def main() -> None:
script_start_time = time.time()

# Create a travel times configuration and set of origin/destination inputs
config = TravelTimeConfig(args, params=params, logger=logger, verbose=True)
config = TravelTimeConfig(args, params=params, logger=logger)
inputs = config.load_default_inputs()

chunk_msg = f", chunk: {config.args.chunk}" if config.args.chunk else ""
@@ -59,7 +59,8 @@ def main() -> None:
len(inputs.origins) * inputs.n_destinations,
)

# Initialize Valhalla, where _sp is a second-pass (more expensive) fallback
# Initialize the Valhalla router Python bindings. The _sp version is a
# more expensive fallback router used as a second pass
actor = valhalla.Actor((Path.cwd() / "valhalla.json").as_posix())
actor_sp = valhalla.Actor((Path.cwd() / "valhalla_sp.json").as_posix())

@@ -73,10 +74,9 @@ def main() -> None:
inputs.destinations, config.args.mode, actor
)

# Calculate times for each chunk and append to a list
# Calculate times for each chunk and return a single DataFrame
tt_calc = TravelTimeCalculator(actor, actor_sp, config, inputs)
results_df = tt_calc.many_to_many()

logger.info(
"Finished calculating times for %s pairs in %s",
len(results_df),
@@ -91,16 +91,14 @@ def main() -> None:
.sort_index()
.reset_index()
)

logger.info("Found %s missing pairs", len(missing_pairs_df))
results_df = (
results_df.dropna(subset=["duration_sec"]).sort_index().reset_index()
)

# Loop through files and write to both local and remote paths
out_locations = ["local", "s3"] if args.write_to_s3 else ["local"]
logger.info(
"Calculated times between %s pairs. Times missing between %s pairs. "
"Calculated times between %s pairs (%s missing). "
"Saving outputs to: %s",
len(results_df),
len(missing_pairs_df),
@@ -112,7 +110,7 @@ def main() -> None:
config.paths.write_to_parquet(inputs.destinations, "destinations", loc)
config.paths.write_to_parquet(missing_pairs_df, "missing_pairs", loc)

# Construct and save a metadata DataFrame
# Collect metadata and git information for the metadata table
run_id = str(uuid.uuid4().hex[:8])
git_commit_sha = str(os.getenv("GITHUB_SHA"))
git_commit_sha_short = str(git_commit_sha[:8] if git_commit_sha else None)
@@ -126,8 +124,8 @@ def main() -> None:
if f != "metadata_file"
}

# Create a metadata dataframe of all settings and data used for creating inputs
# and generating times
# Create a metadata DataFrame of all settings and data used for creating
# inputs and generating times
with open(DOCKER_INTERNAL_PATH / "valhalla.json", "r") as f:
valhalla_data = json.load(f)
with open(DOCKER_INTERNAL_PATH / "valhalla_sp.json", "r") as f:
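
As an aside, the missing-pairs bookkeeping in this file boils down to splitting the results frame on null durations. A simplified sketch, assuming results_df is indexed by origin and destination IDs (column names follow the diff; the exact selection logic in the repository is not fully shown in these hunks):

import pandas as pd

results_df = pd.DataFrame(
    {
        "origin_id": ["o1", "o1", "o2"],
        "destination_id": ["d1", "d2", "d1"],
        "duration_sec": [120.0, float("nan"), 95.0],
    }
).set_index(["origin_id", "destination_id"])

# Pairs the router could not reach are kept in a separate frame...
missing_pairs_df = (
    results_df[results_df["duration_sec"].isnull()].sort_index().reset_index()
)

# ...and dropped from the main results before the outputs are written
results_df = (
    results_df.dropna(subset=["duration_sec"]).sort_index().reset_index()
)
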
95 changes: 68 additions & 27 deletions data/src/utils/times.py
@@ -254,6 +254,8 @@ def __init__(
) -> None:
self.origins = origins
self.destinations = destinations

# "full" is the original (before chunk subsetting) number of origins
self.n_origins_full: int = len(self.origins)

self.chunk = chunk
@@ -383,21 +385,22 @@ def _calculate_times(
and distances.
"""

def col_dict(x, snapped=self.config.params["times"]["use_snapped"]):
"""Use the snapped lat/lon if set."""
        # Helper to use the snapped lat/lon columns if available and
        # enabled via a parameter
def _col_dict(x, snapped=self.config.params["times"]["use_snapped"]):
col_suffix = "_snapped" if snapped else ""
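            # e.g. returns {"lat": x["lat_snapped"], "lon": x["lon_snapped"]}
            # when snapping is enabled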
return {"lat": x[f"lat{col_suffix}"], "lon": x[f"lon{col_suffix}"]}

# Get the subset of origin and destination points and convert them to
# lists, then squash them into the request body
origins_list = (
self.inputs.origins.iloc[o_start_idx:o_end_idx]
.apply(col_dict, axis=1)
.apply(_col_dict, axis=1)
.tolist()
)
destinations_list = (
self.inputs.destinations.iloc[d_start_idx:d_end_idx]
.apply(col_dict, axis=1)
.apply(_col_dict, axis=1)
.tolist()
)
request_json = json.dumps(
@@ -439,13 +442,39 @@ def col_dict(x, snapped=self.config.params["times"]["use_snapped"]):

return df

def _empty_df(
self,
o_start_idx: int,
d_start_idx: int,
o_end_idx: int,
d_end_idx: int,
) -> pd.DataFrame:
"""
        Get an empty DataFrame containing the Cartesian product of the
        origins and destinations specified by the indices. Used as the
        return value at max recursion depth or when a block is unroutable.
"""
df = pd.merge(
self.inputs.origins["id"]
.iloc[o_start_idx:o_end_idx]
.rename("origin_id"),
self.inputs.destinations["id"]
.iloc[d_start_idx:d_end_idx]
.rename("destination_id"),
how="cross",
)
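        # Assigning an empty float Series below yields all-NaN columns aligned
        # to the cross-product index, marking every pair in this block as
        # missing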
df["distance_km"] = pd.Series([], dtype=float)
df["duration_sec"] = pd.Series([], dtype=float)
return df

def _binary_search(
self,
o_start_idx: int,
d_start_idx: int,
o_end_idx: int,
d_end_idx: int,
print_log: bool = True,
cur_depth: int = 0,
) -> list[pd.DataFrame]:
"""
Recursively split the origins and destinations into smaller chunks.
@@ -454,6 +483,7 @@ def _binary_search(
Binary searching all origins and destinations will return all routable
values AROUND the unroutable ones.
"""

start_time = time.time()
if print_log or self.config.verbose:
self.config.logger.info(
@@ -481,30 +511,39 @@ def _binary_search(
)
except Exception as e:
if print_log or self.config.verbose:
self.config.logger.error(f"{e}. Returning empty DataFrame")
df = pd.merge(
self.inputs.origins["id"]
.iloc[o_start_idx:o_end_idx]
.rename("origin_id"),
self.inputs.destinations["id"]
.iloc[d_start_idx:d_end_idx]
.rename("destination_id"),
how="cross",
)
df["distance_km"] = pd.Series([], dtype=float)
df["duration_sec"] = pd.Series([], dtype=float)
self.config.logger.warning(
f"{e}. Returning empty DataFrame"
)

df = self._empty_df(o_start_idx, d_start_idx, o_end_idx, d_end_idx)
return [df]

max_depth = self.config.params["times"]["max_recursion_depth"]
if cur_depth >= max_depth:
if print_log or self.config.verbose:
self.config.logger.warning(
f"Max recursion depth {max_depth} reached. "
"Returning empty DataFrame"
)
df = self._empty_df(o_start_idx, d_start_idx, o_end_idx, d_end_idx)
return [df]

try:
# Use the fallback actor after the first pass
actor = self.actor_fallback if cur_depth > 0 else self.actor
times = self._calculate_times(
actor=self.actor,
actor=actor,
o_start_idx=o_start_idx,
d_start_idx=d_start_idx,
o_end_idx=o_end_idx,
d_end_idx=d_end_idx,
)

# Check for completeness in the output. If any times are missing
# after the first pass, use the fallback router
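            # (raising here sends this block into the except branch below,
            # which splits it into quadrants and retries them with the
            # fallback router at cur_depth + 1)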
if times.isnull().values.any() and cur_depth == 0:
raise ValueError("First pass values contain missing times")

if print_log or self.config.verbose:
elapsed_time = time.time() - start_time
self.config.logger.info(
@@ -515,16 +554,18 @@ def col_dict(x, snapped=self.config.params["times"]["use_snapped"]):

return [times]

# If the request fails, split the origins and destinations into
# quadrants and start a binary search
except Exception as e:
if print_log or self.config.verbose:
self.config.logger.error(f"{e}. Starting binary search...")
mo = (o_start_idx + o_end_idx) // 2
md = (d_start_idx + d_end_idx) // 2
self.config.logger.warning(f"{e}. Starting binary search...")
osi, oei, dsi, dei = o_start_idx, o_end_idx, d_start_idx, d_end_idx
mo, md = (osi + oei) // 2, (dsi + dei) // 2
return (
self._binary_search(o_start_idx, d_start_idx, mo, md, False)
+ self._binary_search(mo, d_start_idx, o_end_idx, md, False)
+ self._binary_search(o_start_idx, md, mo, d_end_idx, False)
+ self._binary_search(mo, md, o_end_idx, d_end_idx, False)
self._binary_search(osi, dsi, mo, md, False, cur_depth + 1)
+ self._binary_search(mo, dsi, oei, md, False, cur_depth + 1)
+ self._binary_search(osi, md, mo, dei, False, cur_depth + 1)
+ self._binary_search(mo, md, oei, dei, False, cur_depth + 1)
)

def many_to_many(self) -> pd.DataFrame:
@@ -602,7 +643,7 @@ def snap_df_to_osm(

    # Use the first element of nodes to populate the snapped lat/lon, otherwise
    # fall back to the correlated lat/lon from edges
def get_col(x: dict, col: str):
def _get_col(x: dict, col: str):
return (
x["nodes"][0][col]
if x["nodes"]
@@ -612,8 +653,8 @@ def get_col(x: dict, col: str):
response_df = pd.DataFrame(
[
{
"lon_snapped": get_col(item, "lon"),
"lat_snapped": get_col(item, "lat"),
"lon_snapped": _get_col(item, "lon"),
"lat_snapped": _get_col(item, "lat"),
}
for item in response_data
]
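
The core of this commit is the recursive quadrant split in _binary_search. Below is a condensed, self-contained sketch of the pattern, simplified relative to the repository code: plain functions instead of the TravelTimeCalculator class, a stubbed router in place of the Valhalla matrix call, and list slicing instead of start/end index arithmetic.

import pandas as pd

MAX_RECURSION_DEPTH = 5  # mirrors `times.max_recursion_depth` in params.yaml


def empty_block(origins: list[str], destinations: list[str]) -> pd.DataFrame:
    """Every origin-destination pair in the block, with missing durations."""
    df = pd.merge(
        pd.Series(origins, name="origin_id"),
        pd.Series(destinations, name="destination_id"),
        how="cross",
    )
    df["duration_sec"] = float("nan")
    return df


def binary_search(route_block, origins, destinations, cur_depth=0):
    """Return a list of DataFrames covering all pairs in the block."""
    if not origins or not destinations:
        return []
    if cur_depth >= MAX_RECURSION_DEPTH:
        # Give up on this block and report its pairs as missing
        return [empty_block(origins, destinations)]
    try:
        return [route_block(origins, destinations)]
    except Exception:
        mo, md = len(origins) // 2, len(destinations) // 2
        if mo == 0 and md == 0:
            # A single unroutable pair: record it as missing and move on
            return [empty_block(origins, destinations)]
        return (
            binary_search(route_block, origins[:mo], destinations[:md], cur_depth + 1)
            + binary_search(route_block, origins[mo:], destinations[:md], cur_depth + 1)
            + binary_search(route_block, origins[:mo], destinations[md:], cur_depth + 1)
            + binary_search(route_block, origins[mo:], destinations[md:], cur_depth + 1)
        )


# Example: a router stub that cannot handle any block containing origin "o3"
def fake_router(origins, destinations):
    if "o3" in origins:
        raise RuntimeError("unroutable pair in block")
    df = empty_block(origins, destinations)
    df["duration_sec"] = 60.0
    return df


results = pd.concat(
    binary_search(fake_router, ["o1", "o2", "o3", "o4"], ["d1", "d2"])
)

Every pair not involving o3 comes back with a duration, while o3's pairs surface as NaN rows, which is what feeds the missing_pairs output written by calculate_times.py.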
