FitTrackee/fittrackee/workouts/utils/gpx.py

296 lines
9.1 KiB
Python
Raw Normal View History

2022-06-11 13:10:02 +02:00
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Optional, Tuple, Union
2019-08-25 18:06:39 +02:00
import gpxpy.gpx
from ..exceptions import InvalidGPXException, WorkoutGPXException
2022-11-17 00:13:43 +01:00
from .weather import WeatherService
# Module-level WeatherService instance, used by get_gpx_info to get weather
# data for the first and the last point of a gpx file.
weather_service = WeatherService()
2019-08-25 18:06:39 +02:00
2021-01-02 19:28:03 +01:00
def open_gpx_file(gpx_file: str) -> Optional[gpxpy.gpx.GPX]:
    """
    Parse a gpx file and return its parsed content

    :param gpx_file: path of the gpx file to open
    :return: parsed gpx, or None when the file does not contain any track

    Errors raised while opening or parsing the file are not caught here.
    """
    # the context manager closes the file handle once parsing is done,
    # even when the parser raises on invalid content
    with open(gpx_file, 'r') as gpx_content:
        gpx = gpxpy.parse(gpx_content)
    if not gpx.tracks:
        return None
    return gpx
2021-01-02 19:28:03 +01:00
def get_gpx_data(
    parsed_gpx: Union[gpxpy.gpx.GPX, gpxpy.gpx.GPXTrackSegment],
    max_speed: float,
    start: Union[datetime, None],
    stopped_time_between_seg: timedelta,
    stopped_speed_threshold: float,
) -> Dict:
    """
    Build the summary of a parsed gpx object (whole file or one segment)

    The returned dict always contains 'max_speed', 'start', 'duration',
    'elevation_max', 'elevation_min', 'uphill' and 'downhill'.
    'moving_time', 'stop_time', 'distance' and 'average_speed' are only
    added when moving data are available.

    Speeds are multiplied by 3600 / 1000 and distance is divided by 1000
    (presumably m/s to km/h and meters to km, to confirm with gpxpy units).
    'stopped_time_between_seg' is added to 'duration' and 'stop_time'.
    """
    gpx_data: Dict[str, Any] = {
        'max_speed': (max_speed / 1000) * 3600,
        'start': start,
    }

    # a missing duration is handled as a duration of 0 second
    elapsed_seconds = parsed_gpx.get_duration() or 0
    gpx_data['duration'] = (
        timedelta(seconds=elapsed_seconds) + stopped_time_between_seg
    )

    extremes = parsed_gpx.get_elevation_extremes()
    gpx_data['elevation_max'] = extremes.maximum
    gpx_data['elevation_min'] = extremes.minimum

    # ascent and descent are only calculated when the gpx file contains
    # elevation data (<ele> element)
    if extremes.maximum is None:
        gpx_data['uphill'] = None
        gpx_data['downhill'] = None
    else:
        climb = parsed_gpx.get_uphill_downhill()
        gpx_data['uphill'] = climb.uphill
        gpx_data['downhill'] = climb.downhill

    moving = parsed_gpx.get_moving_data(
        stopped_speed_threshold=stopped_speed_threshold
    )
    if not moving:
        return gpx_data

    total_distance = moving.moving_distance + moving.stopped_distance
    # no moving time => average speed can not be calculated
    if moving.moving_time > 0:
        speed = total_distance / moving.moving_time
    else:
        speed = 0

    gpx_data.update(
        {
            'moving_time': timedelta(seconds=moving.moving_time),
            'stop_time': (
                timedelta(seconds=moving.stopped_time)
                + stopped_time_between_seg
            ),
            'distance': total_distance / 1000,
            'average_speed': (speed / 1000) * 3600,
        }
    )
    return gpx_data
2021-01-02 19:28:03 +01:00
def get_gpx_info(
    gpx_file: str,
    stopped_speed_threshold: float,
    update_map_data: Optional[bool] = True,
    update_weather_data: Optional[bool] = True,
    use_raw_gpx_speed: bool = False,
) -> Tuple:
    """
    Parse and return gpx, map and weather data from gpx file

    Only the first track of the gpx file is processed.

    :param gpx_file: path of the gpx file
    :param stopped_speed_threshold: threshold passed to gpxpy to
        calculate moving data
    :param update_map_data: when true, points coordinates and bounds are
        returned
    :param update_weather_data: when true, weather is fetched for the first
        and the last point of the file
    :param use_raw_gpx_speed: passed as 'raw' to gpxpy when calculating
        segments moving data
    :return: tuple containing:
        - gpx data: track name, data for each segment ('segments') and data
          for the whole file (with 'bounds' if map data are requested)
        - map data: list of [longitude, latitude]
        - weather data: list of values returned by the weather service
    :raises InvalidGPXException: when the file can not be parsed, contains
        no tracks or when a point has no time
    """
    try:
        gpx = open_gpx_file(gpx_file)
    except Exception as error:
        # chain the original error to keep the root cause in the traceback
        raise InvalidGPXException('error', 'gpx file is invalid') from error

    if gpx is None:
        raise InvalidGPXException('error', 'no tracks in gpx file')

    gpx_data: Dict = {'name': gpx.tracks[0].name, 'segments': []}
    max_speed = 0.0
    start: Optional[datetime] = None
    map_data = []
    weather_data = []
    segments_nb = len(gpx.tracks[0].segments)
    prev_seg_last_point = None
    no_stopped_time = timedelta(seconds=0)
    stopped_time_between_seg = no_stopped_time

    for segment_idx, segment in enumerate(gpx.tracks[0].segments):
        segment_start: Optional[datetime] = None
        segment_points_nb = len(segment.points)
        for point_idx, point in enumerate(segment.points):
            if point.time is None:
                raise InvalidGPXException(
                    'error', '<time> is missing in gpx file'
                )

            if point_idx == 0:
                segment_start = point.time
                # first gpx point => get weather
                if start is None:
                    start = point.time
                    if update_weather_data:
                        weather_data.append(
                            weather_service.get_weather(point)
                        )

                # if a previous segment exists, calculate stopped time
                # between the two segments
                if prev_seg_last_point is not None:
                    stopped_time_between_seg += (
                        point.time - prev_seg_last_point
                    )

            # last segment point
            if point_idx == (segment_points_nb - 1):
                prev_seg_last_point = point.time

                # last gpx point => get weather
                if segment_idx == (segments_nb - 1) and update_weather_data:
                    weather_data.append(weather_service.get_weather(point))

            if update_map_data:
                map_data.append([point.longitude, point.latitude])

        moving_data = segment.get_moving_data(
            stopped_speed_threshold=stopped_speed_threshold,
            raw=use_raw_gpx_speed,
        )
        if moving_data:
            # max speed may be missing in moving data
            segment_max_speed = moving_data.max_speed or 0
            if segment_max_speed > max_speed:
                max_speed = segment_max_speed
        else:
            segment_max_speed = 0.0

        # stopped time between segments only applies to the whole file
        segment_data = get_gpx_data(
            segment,
            segment_max_speed,
            segment_start,
            no_stopped_time,
            stopped_speed_threshold,
        )
        segment_data['idx'] = segment_idx
        gpx_data['segments'].append(segment_data)

    full_gpx_data = get_gpx_data(
        gpx,
        max_speed,
        start,
        stopped_time_between_seg,
        stopped_speed_threshold,
    )
    gpx_data = {**gpx_data, **full_gpx_data}

    if update_map_data:
        bounds = gpx.get_bounds()
        gpx_data['bounds'] = (
            [
                bounds.min_latitude,
                bounds.min_longitude,
                bounds.max_latitude,
                bounds.max_longitude,
            ]
            if bounds
            else []
        )

    return gpx_data, map_data, weather_data
2021-01-02 19:28:03 +01:00
def get_gpx_segments(
    track_segments: List, segment_id: Optional[int] = None
) -> List:
    """
    Select the track segments to process

    Without segment id, the provided list is returned as is. Otherwise,
    a list containing only the matching segment is returned (segment ids
    start at 1).

    A WorkoutGPXException is raised when the id exceeds the number of
    segments ('not found') or is lower than 1 ('error').
    """
    if segment_id is None:
        return track_segments

    # segment ids are 1-based, list positions are 0-based
    position = segment_id - 1
    last_position = len(track_segments) - 1
    if position > last_position:
        raise WorkoutGPXException(
            'not found', f'No segment with id \'{segment_id}\'', None
        )
    if position < 0:
        raise WorkoutGPXException('error', 'Incorrect segment id', None)
    return [track_segments[position]]
2021-01-02 19:28:03 +01:00
def get_chart_data(
    gpx_file: str, segment_id: Optional[int] = None
) -> Optional[List]:
    """
    Return data needed to generate chart with speed and elevation

    :param gpx_file: path of the gpx file
    :param segment_id: id of the segment to process (starting at 1), all
        segments of the first track are processed when not provided
    :return: None if the file has no tracks, otherwise a list containing
        one dict per point with the following keys: 'distance' (cumulated),
        'duration' (from the first point), 'latitude', 'longitude', 'speed',
        'time' and, when the point has an elevation, 'elevation'
    """
    gpx = open_gpx_file(gpx_file)
    if gpx is None:
        return None

    chart_data = []
    first_point = None
    previous_point = None
    previous_distance = 0

    track_segments = gpx.tracks[0].segments
    segments = get_gpx_segments(track_segments, segment_id)

    for segment in segments:
        for point_idx, point in enumerate(segment.points):
            # the first point found is the reference for durations, even
            # when the first segment does not contain any points
            if first_point is None:
                first_point = point

            # an elevation of 0 is a valid value, only a missing elevation
            # must be excluded
            has_elevation = point.elevation is not None
            distance = (
                point.distance_3d(previous_point)
                if (
                    has_elevation
                    and previous_point is not None
                    and previous_point.elevation is not None
                )
                else point.distance_2d(previous_point)
            )
            # no distance can be calculated (e.g. no previous point)
            distance = 0 if distance is None else distance
            distance += previous_distance

            # speed is calculated only once per point
            point_speed = segment.get_speed(point_idx)
            speed = (
                round((point_speed / 1000) * 3600, 2)
                if point_speed is not None
                else 0
            )
            data = {
                'distance': round(distance / 1000, 2),
                'duration': point.time_difference(first_point),
                'latitude': point.latitude,
                'longitude': point.longitude,
                'speed': speed,
                # workaround
                # https://github.com/tkrajina/gpxpy/issues/209
                'time': point.time.replace(
                    tzinfo=timezone(point.time.utcoffset())
                ),
            }
            if has_elevation:
                data['elevation'] = round(point.elevation, 1)
            chart_data.append(data)
            previous_point = point
            previous_distance = distance

    return chart_data
2021-01-02 19:28:03 +01:00
def extract_segment_from_gpx_file(
    content: str, segment_id: int
) -> Optional[str]:
    """
    Return a gpx document (xml format) containing only one segment of the
    first track of a gpx file content

    None is returned when the gpx content has no tracks.
    Only latitude, longitude and elevation of each point are copied to the
    generated document.
    """
    source_gpx = gpxpy.parse(content)
    if not source_gpx.tracks:
        return None

    # the segment id is validated by get_gpx_segments, which returns a list
    # containing only the requested segment
    selected_segment = get_gpx_segments(
        source_gpx.tracks[0].segments, segment_id
    )[0]

    new_segment = gpxpy.gpx.GPXTrackSegment()
    new_segment.points.extend(
        gpxpy.gpx.GPXTrackPoint(
            source_point.latitude,
            source_point.longitude,
            elevation=source_point.elevation,
        )
        for source_point in selected_segment.points
    )

    new_track = gpxpy.gpx.GPXTrack()
    new_track.segments.append(new_segment)
    new_gpx = gpxpy.gpx.GPX()
    new_gpx.tracks.append(new_track)
    return new_gpx.to_xml()