API - handle gpx files with offset

This commit is contained in:
Sam
2022-06-11 13:10:02 +02:00
parent 5874933643
commit 4288c3c387
10 changed files with 392 additions and 102 deletions

View File

@ -1,5 +1,5 @@
from datetime import timedelta
from typing import Any, Dict, List, Optional, Tuple
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Optional, Tuple, Union
import gpxpy.gpx
@ -16,9 +16,9 @@ def open_gpx_file(gpx_file: str) -> Optional[gpxpy.gpx.GPX]:
def get_gpx_data(
parsed_gpx: gpxpy.gpx,
parsed_gpx: Union[gpxpy.gpx.GPX, gpxpy.gpx.GPXTrackSegment],
max_speed: float,
start: int,
start: Union[datetime, None],
stopped_time_between_seg: timedelta,
stopped_speed_threshold: float,
) -> Dict:
@ -32,7 +32,8 @@ def get_gpx_data(
duration = parsed_gpx.get_duration()
gpx_data['duration'] = (
timedelta(seconds=duration) + stopped_time_between_seg
timedelta(seconds=duration if duration else 0)
+ stopped_time_between_seg
)
ele = parsed_gpx.get_elevation_extremes()
@ -43,18 +44,24 @@ def get_gpx_data(
gpx_data['uphill'] = hill.uphill
gpx_data['downhill'] = hill.downhill
mv = parsed_gpx.get_moving_data(
moving_data = parsed_gpx.get_moving_data(
stopped_speed_threshold=stopped_speed_threshold
)
gpx_data['moving_time'] = timedelta(seconds=mv.moving_time)
gpx_data['stop_time'] = (
timedelta(seconds=mv.stopped_time) + stopped_time_between_seg
)
distance = mv.moving_distance + mv.stopped_distance
gpx_data['distance'] = distance / 1000
if moving_data:
gpx_data['moving_time'] = timedelta(seconds=moving_data.moving_time)
gpx_data['stop_time'] = (
timedelta(seconds=moving_data.stopped_time)
+ stopped_time_between_seg
)
distance = moving_data.moving_distance + moving_data.stopped_distance
gpx_data['distance'] = distance / 1000
average_speed = distance / mv.moving_time if mv.moving_time > 0 else 0
gpx_data['average_speed'] = (average_speed / 1000) * 3600
average_speed = (
distance / moving_data.moving_time
if moving_data.moving_time > 0
else 0
)
gpx_data['average_speed'] = (average_speed / 1000) * 3600
return gpx_data
@ -72,9 +79,9 @@ def get_gpx_info(
if gpx is None:
raise WorkoutGPXException('not found', 'No gpx file')
gpx_data = {'name': gpx.tracks[0].name, 'segments': []}
max_speed = 0
start = 0
gpx_data: Dict = {'name': gpx.tracks[0].name, 'segments': []}
max_speed = 0.0
start: Optional[datetime] = None
map_data = []
weather_data = []
segments_nb = len(gpx.tracks[0].segments)
@ -83,14 +90,15 @@ def get_gpx_info(
stopped_time_between_seg = no_stopped_time
for segment_idx, segment in enumerate(gpx.tracks[0].segments):
segment_start = 0
segment_start: Optional[datetime] = None
segment_points_nb = len(segment.points)
for point_idx, point in enumerate(segment.points):
if point_idx == 0:
segment_start = point.time
# first gpx point => get weather
if start == 0:
if start is None:
start = point.time
if update_weather_data:
if point.time and update_weather_data:
weather_data.append(get_weather(point))
# if a previous segment exists, calculate stopped time between
@ -108,13 +116,19 @@ def get_gpx_info(
if update_map_data:
map_data.append([point.longitude, point.latitude])
calculated_max_speed = segment.get_moving_data(
moving_data = segment.get_moving_data(
stopped_speed_threshold=stopped_speed_threshold
).max_speed
segment_max_speed = calculated_max_speed if calculated_max_speed else 0
)
if moving_data:
calculated_max_speed = moving_data.max_speed
segment_max_speed = (
calculated_max_speed if calculated_max_speed else 0
)
if segment_max_speed > max_speed:
max_speed = segment_max_speed
if segment_max_speed > max_speed:
max_speed = segment_max_speed
else:
segment_max_speed = 0.0
segment_data = get_gpx_data(
segment,
@ -137,12 +151,16 @@ def get_gpx_info(
if update_map_data:
bounds = gpx.get_bounds()
gpx_data['bounds'] = [
bounds.min_latitude,
bounds.min_longitude,
bounds.max_latitude,
bounds.max_longitude,
]
gpx_data['bounds'] = (
[
bounds.min_latitude,
bounds.min_longitude,
bounds.max_latitude,
bounds.max_longitude,
]
if bounds
else []
)
return gpx_data, map_data, weather_data
@ -222,7 +240,11 @@ def get_chart_data(
'latitude': point.latitude,
'longitude': point.longitude,
'speed': speed,
'time': point.time,
# workaround
# https://github.com/tkrajina/gpxpy/issues/209
'time': point.time.replace(
tzinfo=timezone(point.time.utcoffset())
),
}
)
previous_point = point

View File

@ -3,18 +3,22 @@ from typing import Dict, Optional
import forecastio
import pytz
from gpxpy.gpx import GPXRoutePoint
from gpxpy.gpx import GPXTrackPoint
from fittrackee import appLog
API_KEY = os.getenv('WEATHER_API_KEY')
def get_weather(point: GPXRoutePoint) -> Optional[Dict]:
if not API_KEY or API_KEY == '':
def get_weather(point: GPXTrackPoint) -> Optional[Dict]:
if not API_KEY or not point.time:
return None
try:
point_time = pytz.utc.localize(point.time)
point_time = (
pytz.utc.localize(point.time)
if point.time.tzinfo is None
else point.time.astimezone(pytz.utc)
)
forecast = forecastio.load_forecast(
API_KEY,
point.latitude,

View File

@ -22,31 +22,42 @@ from .gpx import get_gpx_info
from .maps import generate_map, get_map_hash
def get_datetime_with_tz(
timezone: str, workout_date: datetime, gpx_data: Optional[Dict] = None
) -> Tuple[Optional[datetime], datetime]:
def get_workout_datetime(
workout_date: Union[datetime, str],
user_timezone: Optional[str],
date_str_format: Optional[str] = None,
with_timezone: bool = False,
) -> Tuple[datetime, Optional[datetime]]:
"""
Return naive datetime and datetime with user timezone
Return naive datetime and datetime with user timezone if with_timezone
"""
workout_date_tz = None
if timezone:
user_tz = pytz.timezone(timezone)
utc_tz = pytz.utc
if gpx_data:
# workout date in gpx is in UTC, but in naive datetime
fmt = '%Y-%m-%d %H:%M:%S'
workout_date_string = workout_date.strftime(fmt)
workout_date_tmp = utc_tz.localize(
datetime.strptime(workout_date_string, fmt)
)
workout_date_tz = workout_date_tmp.astimezone(user_tz)
else:
workout_date_tz = user_tz.localize(workout_date)
workout_date = workout_date_tz.astimezone(utc_tz)
# make datetime 'naive' like in gpx file
workout_date = workout_date.replace(tzinfo=None)
workout_date_with_user_tz = None
return workout_date_tz, workout_date
# workout w/o gpx
if isinstance(workout_date, str):
if not date_str_format:
date_str_format = '%Y-%m-%d %H:%M:%S'
workout_date = datetime.strptime(workout_date, date_str_format)
if user_timezone:
workout_date = pytz.timezone(user_timezone).localize(workout_date)
if workout_date.tzinfo is None:
naive_workout_date = workout_date
if user_timezone and with_timezone:
pytz.utc.localize(naive_workout_date)
workout_date_with_user_tz = pytz.utc.localize(
naive_workout_date
).astimezone(pytz.timezone(user_timezone))
else:
naive_workout_date = workout_date.astimezone(pytz.utc).replace(
tzinfo=None
)
if user_timezone and with_timezone:
workout_date_with_user_tz = workout_date.astimezone(
pytz.timezone(user_timezone)
)
return naive_workout_date, workout_date_with_user_tz
def get_datetime_from_request_args(
@ -57,25 +68,32 @@ def get_datetime_from_request_args(
date_from_str = params.get('from')
if date_from_str:
date_from = datetime.strptime(date_from_str, '%Y-%m-%d')
_, date_from = get_datetime_with_tz(user.timezone, date_from)
date_from, _ = get_workout_datetime(
workout_date=date_from_str,
user_timezone=user.timezone,
date_str_format='%Y-%m-%d',
)
date_to_str = params.get('to')
if date_to_str:
date_to = datetime.strptime(
f'{date_to_str} 23:59:59', '%Y-%m-%d %H:%M:%S'
date_to, _ = get_workout_datetime(
workout_date=f'{date_to_str} 23:59:59',
user_timezone=user.timezone,
)
_, date_to = get_datetime_with_tz(user.timezone, date_to)
return date_from, date_to
def _remove_microseconds(delta: timedelta) -> timedelta:
return delta - timedelta(microseconds=delta.microseconds)
def update_workout_data(
workout: Union[Workout, WorkoutSegment], gpx_data: Dict
) -> Union[Workout, WorkoutSegment]:
"""
Update workout or workout segment with data from gpx file
"""
workout.pauses = gpx_data['stop_time']
workout.moving = gpx_data['moving_time']
workout.pauses = _remove_microseconds(gpx_data['stop_time'])
workout.moving = _remove_microseconds(gpx_data['moving_time'])
workout.min_alt = gpx_data['elevation_min']
workout.max_alt = gpx_data['elevation_max']
workout.descent = gpx_data['downhill']
@ -92,17 +110,17 @@ def create_workout(
Create Workout from data entered by user and from gpx if a gpx file is
provided
"""
workout_date = (
gpx_data['start']
workout_date, workout_date_tz = get_workout_datetime(
workout_date=gpx_data['start']
if gpx_data
else datetime.strptime(workout_data['workout_date'], '%Y-%m-%d %H:%M')
)
workout_date_tz, workout_date = get_datetime_with_tz(
user.timezone, workout_date, gpx_data
else workout_data['workout_date'],
date_str_format=None if gpx_data else '%Y-%m-%d %H:%M',
user_timezone=user.timezone,
with_timezone=True,
)
duration = (
gpx_data['duration']
_remove_microseconds(gpx_data['duration'])
if gpx_data
else timedelta(seconds=workout_data['duration'])
)
@ -202,11 +220,10 @@ def edit_workout(
workout.notes = workout_data.get('notes')
if not workout.gpx:
if workout_data.get('workout_date'):
workout_date = datetime.strptime(
workout_data['workout_date'], '%Y-%m-%d %H:%M'
)
_, workout.workout_date = get_datetime_with_tz(
auth_user.timezone, workout_date
workout.workout_date, _ = get_workout_datetime(
workout_date=workout_data.get('workout_date', ''),
date_str_format='%Y-%m-%d %H:%M',
user_timezone=auth_user.timezone,
)
if workout_data.get('duration'):