296 lines
9.1 KiB
Python

from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Optional, Tuple, Union
import gpxpy.gpx
from ..exceptions import InvalidGPXException, WorkoutGPXException
from .weather import WeatherService
weather_service = WeatherService()
def open_gpx_file(gpx_file: str) -> Optional[gpxpy.gpx.GPX]:
gpx_file = open(gpx_file, 'r') # type: ignore
gpx = gpxpy.parse(gpx_file)
if len(gpx.tracks) == 0:
return None
return gpx
def get_gpx_data(
parsed_gpx: Union[gpxpy.gpx.GPX, gpxpy.gpx.GPXTrackSegment],
max_speed: float,
start: Union[datetime, None],
stopped_time_between_seg: timedelta,
stopped_speed_threshold: float,
) -> Dict:
"""
Returns data from parsed gpx file
"""
gpx_data: Dict[str, Any] = {
'max_speed': (max_speed / 1000) * 3600,
'start': start,
}
duration = parsed_gpx.get_duration()
gpx_data['duration'] = (
timedelta(seconds=duration if duration else 0)
+ stopped_time_between_seg
)
ele = parsed_gpx.get_elevation_extremes()
gpx_data['elevation_max'] = ele.maximum
gpx_data['elevation_min'] = ele.minimum
# gpx file contains elevation data (<ele> element)
if ele.maximum is not None:
hill = parsed_gpx.get_uphill_downhill()
gpx_data['uphill'] = hill.uphill
gpx_data['downhill'] = hill.downhill
else:
gpx_data['uphill'] = None
gpx_data['downhill'] = None
moving_data = parsed_gpx.get_moving_data(
stopped_speed_threshold=stopped_speed_threshold
)
if moving_data:
gpx_data['moving_time'] = timedelta(seconds=moving_data.moving_time)
gpx_data['stop_time'] = (
timedelta(seconds=moving_data.stopped_time)
+ stopped_time_between_seg
)
distance = moving_data.moving_distance + moving_data.stopped_distance
gpx_data['distance'] = distance / 1000
average_speed = (
distance / moving_data.moving_time
if moving_data.moving_time > 0
else 0
)
gpx_data['average_speed'] = (average_speed / 1000) * 3600
return gpx_data
def get_gpx_info(
gpx_file: str,
stopped_speed_threshold: float,
update_map_data: Optional[bool] = True,
update_weather_data: Optional[bool] = True,
use_raw_gpx_speed: bool = False,
) -> Tuple:
"""
Parse and return gpx, map and weather data from gpx file
"""
try:
gpx = open_gpx_file(gpx_file)
except Exception:
raise InvalidGPXException('error', 'gpx file is invalid')
if gpx is None:
raise InvalidGPXException('error', 'no tracks in gpx file')
gpx_data: Dict = {'name': gpx.tracks[0].name, 'segments': []}
max_speed = 0.0
start: Optional[datetime] = None
map_data = []
weather_data = []
segments_nb = len(gpx.tracks[0].segments)
prev_seg_last_point = None
no_stopped_time = timedelta(seconds=0)
stopped_time_between_seg = no_stopped_time
for segment_idx, segment in enumerate(gpx.tracks[0].segments):
segment_start: Optional[datetime] = None
segment_points_nb = len(segment.points)
for point_idx, point in enumerate(segment.points):
if point.time is None:
raise InvalidGPXException(
'error', '<time> is missing in gpx file'
)
if point_idx == 0:
segment_start = point.time
# first gpx point => get weather
if start is None:
start = point.time
if point.time and update_weather_data:
weather_data.append(weather_service.get_weather(point))
# if a previous segment exists, calculate stopped time between
# the two segments
if prev_seg_last_point:
stopped_time_between_seg += (
point.time - prev_seg_last_point
)
# last segment point
if point_idx == (segment_points_nb - 1):
prev_seg_last_point = point.time
# last gpx point => get weather
if segment_idx == (segments_nb - 1) and update_weather_data:
weather_data.append(weather_service.get_weather(point))
if update_map_data:
map_data.append([point.longitude, point.latitude])
moving_data = segment.get_moving_data(
stopped_speed_threshold=stopped_speed_threshold,
raw=use_raw_gpx_speed,
)
if moving_data:
calculated_max_speed = moving_data.max_speed
segment_max_speed = (
calculated_max_speed if calculated_max_speed else 0
)
if segment_max_speed > max_speed:
max_speed = segment_max_speed
else:
segment_max_speed = 0.0
segment_data = get_gpx_data(
segment,
segment_max_speed,
segment_start,
no_stopped_time,
stopped_speed_threshold,
)
segment_data['idx'] = segment_idx
gpx_data['segments'].append(segment_data)
full_gpx_data = get_gpx_data(
gpx,
max_speed,
start,
stopped_time_between_seg,
stopped_speed_threshold,
)
gpx_data = {**gpx_data, **full_gpx_data}
if update_map_data:
bounds = gpx.get_bounds()
gpx_data['bounds'] = (
[
bounds.min_latitude,
bounds.min_longitude,
bounds.max_latitude,
bounds.max_longitude,
]
if bounds
else []
)
return gpx_data, map_data, weather_data
def get_gpx_segments(
track_segments: List, segment_id: Optional[int] = None
) -> List:
"""
Return list of segments, filtered on segment id if provided
"""
if segment_id is not None:
segment_index = segment_id - 1
if segment_index > (len(track_segments) - 1):
raise WorkoutGPXException(
'not found', f'No segment with id \'{segment_id}\'', None
)
if segment_index < 0:
raise WorkoutGPXException('error', 'Incorrect segment id', None)
segments = [track_segments[segment_index]]
else:
segments = track_segments
return segments
def get_chart_data(
gpx_file: str, segment_id: Optional[int] = None
) -> Optional[List]:
"""
Return data needed to generate chart with speed and elevation
"""
gpx = open_gpx_file(gpx_file)
if gpx is None:
return None
chart_data = []
first_point = None
previous_point = None
previous_distance = 0
track_segments = gpx.tracks[0].segments
segments = get_gpx_segments(track_segments, segment_id)
for segment_idx, segment in enumerate(segments):
for point_idx, point in enumerate(segment.points):
if segment_idx == 0 and point_idx == 0:
first_point = point
distance = (
point.distance_3d(previous_point)
if (
point.elevation
and previous_point
and previous_point.elevation
)
else point.distance_2d(previous_point)
)
distance = 0 if distance is None else distance
distance += previous_distance
speed = (
round((segment.get_speed(point_idx) / 1000) * 3600, 2)
if segment.get_speed(point_idx) is not None
else 0
)
data = {
'distance': (
round(distance / 1000, 2) if distance is not None else 0
),
'duration': point.time_difference(first_point),
'latitude': point.latitude,
'longitude': point.longitude,
'speed': speed,
# workaround
# https://github.com/tkrajina/gpxpy/issues/209
'time': point.time.replace(
tzinfo=timezone(point.time.utcoffset())
),
}
if point.elevation:
data['elevation'] = round(point.elevation, 1)
chart_data.append(data)
previous_point = point
previous_distance = distance
return chart_data
def extract_segment_from_gpx_file(
content: str, segment_id: int
) -> Optional[str]:
"""
Returns segments in xml format from a gpx file content
"""
gpx_content = gpxpy.parse(content)
if len(gpx_content.tracks) == 0:
return None
track_segment = get_gpx_segments(
gpx_content.tracks[0].segments, segment_id
)
gpx = gpxpy.gpx.GPX()
gpx_track = gpxpy.gpx.GPXTrack()
gpx.tracks.append(gpx_track)
gpx_segment = gpxpy.gpx.GPXTrackSegment()
gpx_track.segments.append(gpx_segment)
for point_idx, point in enumerate(track_segment[0].points):
gpx_segment.points.append(
gpxpy.gpx.GPXTrackPoint(
point.latitude, point.longitude, elevation=point.elevation
)
)
return gpx.to_xml()