Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New method to detect TOFF runway #463

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions src/traffic/core/distance.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,3 +84,18 @@ def guess_airport(
f" (distance={airport.distance})"
)
return airport


def minimal_angular_difference(angle1: float, angle2: float) -> float:
"""
Calculate the min diff between two angles, considering circularity.

Parameters:
angle1 (float): First angle in degrees.
angle2 (float): Second angle in degrees.

Returns:
float: Minimal angular difference in degrees.
"""
diff = abs(angle1 - angle2) % 360
return min(diff, 360 - diff)
118 changes: 117 additions & 1 deletion src/traffic/core/flight.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,12 @@

from ..algorithms import filters
from ..algorithms.douglas_peucker import douglas_peucker
from ..algorithms.navigation import NavigationFeatures
from ..algorithms.navigation import (
figuetbe marked this conversation as resolved.
Show resolved Hide resolved
NavigationFeatures,
)
from ..algorithms.openap import OpenAP
from ..core import types as tt
from ..core.distance import minimal_angular_difference
from ..core.structure import Airport
from .intervals import Interval, IntervalCollection
from .iterator import FlightIterator, flight_iterator
Expand Down Expand Up @@ -2589,6 +2592,119 @@ def intersects( # type: ignore
# given here for consistency in types
...

def get_toff_runway(
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we reuse takeoff_from_runway but pass an argument (string) with the method which can come to this one? would be more helpful to understand why we need two methods

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I find it a bit confusing because it does not return an iterable while takeoff_from_runway does

self,
airport: Union[str, Airport],
threshold_alt: float = 1500,
min_groundspeed_kts: float = 30,
min_vert_rate_ftmin: float = 257,
max_dist_nm: float = 5,
) -> Optional[str]:
"""
Determines the taking-off runway of a flight based on its trajectory.
Parameters:
flight (Flight): The flight data object containing flight information.
airport: The IATA code of the airport. Defaults to "LSZH".
figuetbe marked this conversation as resolved.
Show resolved Hide resolved
threshold_alt (float): Altitude threshold above airport altitude.
min_groundspeed_kts (float): Minimum groundspeed to consider.
min_vert_rate_ftmin (float): Minimum vertical rate to consider.
max_dist_nm (float): Maximum distance from the airport to consider.

Returns:
Optional[str]: The name of the closest runway, or None if not found.

Raises:
KeyError: If the airport code is not found in the airports data.
ValueError: If no rway available or data is empty after filtering.
"""
from traffic.data import airports

airport = airport if isinstance(airport, Airport) else airports[airport]
if not self.takeoff_from(airport):
return None
alt_max = airport.altitude + threshold_alt
if airport.runways is None:
return None
runways = airport.runways.data
runways_names = runways.name

filtered_flight = self.distance(airport).query(
"distance < @max_dist_nm"
)
if filtered_flight is None or filtered_flight.data.empty:
return None

query_str = (
f"geoaltitude < {alt_max} and "
f"vertical_rate > {min_vert_rate_ftmin} and "
f"groundspeed > {min_groundspeed_kts}"
)
filtered_flight = filtered_flight.query(query_str)
if (
filtered_flight is None
or filtered_flight.data.empty
or len(filtered_flight.data) < 4
):
return None

# Check for parallel runways with suffixes L, R, or C
has_parallel_runway = runways_names.str.contains(r"[LRC]").any()
runway_bearings = runways.bearing

median_track = filtered_flight.data["track"].median()
closest_runway: Optional[str] = None

if not has_parallel_runway:
# Find the runway with the bearing closest to the median track
bearing_diff = runway_bearings.apply(
lambda x: minimal_angular_difference(x, median_track)
)
closest_index = bearing_diff.idxmin()
closest_runway = runways.name.iloc[closest_index]
else:
# Round runway bearings to the nearest 5 degrees
rounded_bearings = (runway_bearings / 5).round() * 5
rounded_bearings = (
rounded_bearings % 360
) # Ensure bearings stay within 0-359

# Find the bearing closest to the median track using minimal angular difference
bearing_diff = rounded_bearings.apply(
lambda x: minimal_angular_difference(x, median_track)
)

# Identify all runways where bearing diff is less than 10 deg
figuetbe marked this conversation as resolved.
Show resolved Hide resolved

candidate_runways = runways.loc[
bearing_diff[bearing_diff < 10].index
]
if candidate_runways.empty:
return None

# Calculate distance from flight trajectory to each candidate runway
try:
flight_ls = filtered_flight.linestring
rways_ls = airport.runways.shape

closest = None
for rway in rways_ls.geoms:
if closest is None:
closest = rway
continue
if rway.distance(flight_ls) < closest.distance(flight_ls):
closest = rway
if closest is None:
return None
lon_1, _, lon_2, _ = closest.bounds
# find the candidate_runways latitude and longitude that is closest to the closest bounds
eps = 1 / 1000
closest_runway = candidate_runways.query(
f"(abs(longitude-{lon_1})<{eps}) or (abs(longitude-{lon_2})<{eps})"
)["name"].iloc[0]
except AttributeError as e:
return None
return closest_runway

@flight_iterator
def clip_iterate(
self, shape: Union[ShapelyMixin, base.BaseGeometry], strict: bool = True
Expand Down