API - workouts refactoring

This commit is contained in:
Sam
2022-02-16 17:46:22 +01:00
parent 0b2e2ed5dd
commit 1b4a477544
20 changed files with 81 additions and 184 deletions

View File

View File

@ -0,0 +1,27 @@
from datetime import timedelta
from typing import Optional, Union
def convert_in_duration(value: str) -> timedelta:
hours = int(value.split(':')[0])
minutes = int(value.split(':')[1])
return timedelta(seconds=(hours * 3600 + minutes * 60))
def convert_timedelta_to_integer(value: str) -> int:
hours, minutes, seconds = str(value).split(':')
return int(hours) * 3600 + int(minutes) * 60 + int(seconds)
def convert_value_to_integer(
record_type: str, val: Union[str, float]
) -> Optional[int]:
if val is None:
return None
if record_type == 'LD':
return convert_timedelta_to_integer(str(val))
elif record_type in ['AS', 'MS']:
return int(val * 100)
else: # 'FD'
return int(val * 1000)

View File

@ -0,0 +1,261 @@
from datetime import timedelta
from typing import Any, Dict, List, Optional, Tuple
import gpxpy.gpx
from ..exceptions import WorkoutGPXException
from .weather import get_weather
def open_gpx_file(gpx_file: str) -> Optional[gpxpy.gpx.GPX]:
gpx_file = open(gpx_file, 'r') # type: ignore
gpx = gpxpy.parse(gpx_file)
if len(gpx.tracks) == 0:
return None
return gpx
def get_gpx_data(
parsed_gpx: gpxpy.gpx,
max_speed: float,
start: int,
stopped_time_between_seg: timedelta,
stopped_speed_threshold: float,
) -> Dict:
"""
Returns data from parsed gpx file
"""
gpx_data: Dict[str, Any] = {
'max_speed': (max_speed / 1000) * 3600,
'start': start,
}
duration = parsed_gpx.get_duration()
gpx_data['duration'] = (
timedelta(seconds=duration) + stopped_time_between_seg
)
ele = parsed_gpx.get_elevation_extremes()
gpx_data['elevation_max'] = ele.maximum
gpx_data['elevation_min'] = ele.minimum
hill = parsed_gpx.get_uphill_downhill()
gpx_data['uphill'] = hill.uphill
gpx_data['downhill'] = hill.downhill
mv = parsed_gpx.get_moving_data(
stopped_speed_threshold=stopped_speed_threshold
)
gpx_data['moving_time'] = timedelta(seconds=mv.moving_time)
gpx_data['stop_time'] = (
timedelta(seconds=mv.stopped_time) + stopped_time_between_seg
)
distance = mv.moving_distance + mv.stopped_distance
gpx_data['distance'] = distance / 1000
average_speed = distance / mv.moving_time if mv.moving_time > 0 else 0
gpx_data['average_speed'] = (average_speed / 1000) * 3600
return gpx_data
def get_gpx_info(
gpx_file: str,
stopped_speed_threshold: float,
update_map_data: Optional[bool] = True,
update_weather_data: Optional[bool] = True,
) -> Tuple:
"""
Parse and return gpx, map and weather data from gpx file
"""
gpx = open_gpx_file(gpx_file)
if gpx is None:
raise WorkoutGPXException('not found', 'No gpx file')
gpx_data = {'name': gpx.tracks[0].name, 'segments': []}
max_speed = 0
start = 0
map_data = []
weather_data = []
segments_nb = len(gpx.tracks[0].segments)
prev_seg_last_point = None
no_stopped_time = timedelta(seconds=0)
stopped_time_between_seg = no_stopped_time
for segment_idx, segment in enumerate(gpx.tracks[0].segments):
segment_start = 0
segment_points_nb = len(segment.points)
for point_idx, point in enumerate(segment.points):
if point_idx == 0:
# first gpx point => get weather
if start == 0:
start = point.time
if update_weather_data:
weather_data.append(get_weather(point))
# if a previous segment exists, calculate stopped time between
# the two segments
if prev_seg_last_point:
stopped_time_between_seg = point.time - prev_seg_last_point
# last segment point
if point_idx == (segment_points_nb - 1):
prev_seg_last_point = point.time
# last gpx point => get weather
if segment_idx == (segments_nb - 1) and update_weather_data:
weather_data.append(get_weather(point))
if update_map_data:
map_data.append([point.longitude, point.latitude])
calculated_max_speed = segment.get_moving_data(
stopped_speed_threshold=stopped_speed_threshold
).max_speed
segment_max_speed = calculated_max_speed if calculated_max_speed else 0
if segment_max_speed > max_speed:
max_speed = segment_max_speed
segment_data = get_gpx_data(
segment,
segment_max_speed,
segment_start,
no_stopped_time,
stopped_speed_threshold,
)
segment_data['idx'] = segment_idx
gpx_data['segments'].append(segment_data)
full_gpx_data = get_gpx_data(
gpx,
max_speed,
start,
stopped_time_between_seg,
stopped_speed_threshold,
)
gpx_data = {**gpx_data, **full_gpx_data}
if update_map_data:
bounds = gpx.get_bounds()
gpx_data['bounds'] = [
bounds.min_latitude,
bounds.min_longitude,
bounds.max_latitude,
bounds.max_longitude,
]
return gpx_data, map_data, weather_data
def get_gpx_segments(
track_segments: List, segment_id: Optional[int] = None
) -> List:
"""
Return list of segments, filtered on segment id if provided
"""
if segment_id is not None:
segment_index = segment_id - 1
if segment_index > (len(track_segments) - 1):
raise WorkoutGPXException(
'not found', f'No segment with id \'{segment_id}\'', None
)
if segment_index < 0:
raise WorkoutGPXException('error', 'Incorrect segment id', None)
segments = [track_segments[segment_index]]
else:
segments = track_segments
return segments
def get_chart_data(
gpx_file: str, segment_id: Optional[int] = None
) -> Optional[List]:
"""
Return data needed to generate chart with speed and elevation
"""
gpx = open_gpx_file(gpx_file)
if gpx is None:
return None
chart_data = []
first_point = None
previous_point = None
previous_distance = 0
track_segments = gpx.tracks[0].segments
segments = get_gpx_segments(track_segments, segment_id)
for segment_idx, segment in enumerate(segments):
for point_idx, point in enumerate(segment.points):
if segment_idx == 0 and point_idx == 0:
first_point = point
distance = (
point.distance_3d(previous_point)
if (
point.elevation
and previous_point
and previous_point.elevation
)
else point.distance_2d(previous_point)
)
distance = 0 if distance is None else distance
distance += previous_distance
speed = (
round((segment.get_speed(point_idx) / 1000) * 3600, 2)
if segment.get_speed(point_idx) is not None
else 0
)
chart_data.append(
{
'distance': (
round(distance / 1000, 2)
if distance is not None
else 0
),
'duration': point.time_difference(first_point),
'elevation': (
round(point.elevation, 1)
if point.elevation is not None
else 0
),
'latitude': point.latitude,
'longitude': point.longitude,
'speed': speed,
'time': point.time,
}
)
previous_point = point
previous_distance = distance
return chart_data
def extract_segment_from_gpx_file(
content: str, segment_id: int
) -> Optional[str]:
"""
Returns segments in xml format from a gpx file content
"""
gpx_content = gpxpy.parse(content)
if len(gpx_content.tracks) == 0:
return None
track_segment = get_gpx_segments(
gpx_content.tracks[0].segments, segment_id
)
gpx = gpxpy.gpx.GPX()
gpx_track = gpxpy.gpx.GPXTrack()
gpx.tracks.append(gpx_track)
gpx_segment = gpxpy.gpx.GPXTrackSegment()
gpx_track.segments.append(gpx_segment)
for point_idx, point in enumerate(track_segment[0].points):
gpx_segment.points.append(
gpxpy.gpx.GPXTrackPoint(
point.latitude, point.longitude, elevation=point.elevation
)
)
return gpx.to_xml()

View File

@ -0,0 +1,35 @@
import hashlib
from typing import List
from flask import current_app
from staticmap import Line, StaticMap
from fittrackee.files import get_absolute_file_path
def generate_map(map_filepath: str, map_data: List) -> None:
"""
Generate and save map image from map data
"""
m = StaticMap(400, 225, 10)
if not current_app.config['TILE_SERVER']['DEFAULT_STATICMAP']:
m.url_template = current_app.config['TILE_SERVER']['URL'].replace(
'{s}.', ''
)
line = Line(map_data, '#3388FF', 4)
m.add_line(line)
image = m.render()
image.save(map_filepath)
def get_map_hash(map_filepath: str) -> str:
"""
Generate a md5 hash used as id instead of workout id, to retrieve map
image (maps are sensitive data)
"""
md5 = hashlib.md5()
absolute_map_filepath = get_absolute_file_path(map_filepath)
with open(absolute_map_filepath, 'rb') as f:
for chunk in iter(lambda: f.read(128 * md5.block_size), b''):
md5.update(chunk)
return md5.hexdigest()

View File

@ -0,0 +1,17 @@
from uuid import UUID
import shortuuid
def encode_uuid(uuid_value: UUID) -> str:
"""
Return short id string from a UUID
"""
return shortuuid.encode(uuid_value)
def decode_short_id(short_id: str) -> UUID:
"""
Return UUID from a short id string
"""
return shortuuid.decode(short_id)

View File

@ -0,0 +1,16 @@
import os
from fittrackee.files import get_absolute_file_path
def get_upload_dir_size() -> int:
"""
Return upload directory size
"""
upload_path = get_absolute_file_path('')
total_size = 0
for dir_path, _, filenames in os.walk(upload_path):
for f in filenames:
fp = os.path.join(dir_path, f)
total_size += os.path.getsize(fp)
return total_size

View File

@ -0,0 +1,36 @@
import os
from typing import Dict, Optional
import forecastio
import pytz
from gpxpy.gpx import GPXRoutePoint
from fittrackee import appLog
API_KEY = os.getenv('WEATHER_API_KEY')
def get_weather(point: GPXRoutePoint) -> Optional[Dict]:
if not API_KEY or API_KEY == '':
return None
try:
point_time = pytz.utc.localize(point.time)
forecast = forecastio.load_forecast(
API_KEY,
point.latitude,
point.longitude,
time=point_time,
units='si',
)
weather = forecast.currently()
return {
'summary': weather.summary,
'icon': weather.icon,
'temperature': weather.temperature,
'humidity': weather.humidity,
'wind': weather.windSpeed,
'windBearing': weather.windBearing,
}
except Exception as e:
appLog.error(e)
return None

View File

@ -0,0 +1,417 @@
import os
import tempfile
import zipfile
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple, Union
from uuid import UUID
import gpxpy.gpx
import pytz
from flask import current_app
from sqlalchemy import exc
from werkzeug.datastructures import FileStorage
from werkzeug.utils import secure_filename
from fittrackee import db
from fittrackee.files import get_absolute_file_path
from fittrackee.users.models import User, UserSportPreference
from ..exceptions import WorkoutException
from ..models import Sport, Workout, WorkoutSegment
from .gpx import get_gpx_info
from .maps import generate_map, get_map_hash
def get_datetime_with_tz(
timezone: str, workout_date: datetime, gpx_data: Optional[Dict] = None
) -> Tuple[Optional[datetime], datetime]:
"""
Return naive datetime and datetime with user timezone
"""
workout_date_tz = None
if timezone:
user_tz = pytz.timezone(timezone)
utc_tz = pytz.utc
if gpx_data:
# workout date in gpx is in UTC, but in naive datetime
fmt = '%Y-%m-%d %H:%M:%S'
workout_date_string = workout_date.strftime(fmt)
workout_date_tmp = utc_tz.localize(
datetime.strptime(workout_date_string, fmt)
)
workout_date_tz = workout_date_tmp.astimezone(user_tz)
else:
workout_date_tz = user_tz.localize(workout_date)
workout_date = workout_date_tz.astimezone(utc_tz)
# make datetime 'naive' like in gpx file
workout_date = workout_date.replace(tzinfo=None)
return workout_date_tz, workout_date
def get_datetime_from_request_args(
params: Dict, user: User
) -> Tuple[Optional[datetime], Optional[datetime]]:
date_from = None
date_to = None
date_from_str = params.get('from')
if date_from_str:
date_from = datetime.strptime(date_from_str, '%Y-%m-%d')
_, date_from = get_datetime_with_tz(user.timezone, date_from)
date_to_str = params.get('to')
if date_to_str:
date_to = datetime.strptime(
f'{date_to_str} 23:59:59', '%Y-%m-%d %H:%M:%S'
)
_, date_to = get_datetime_with_tz(user.timezone, date_to)
return date_from, date_to
def update_workout_data(
workout: Union[Workout, WorkoutSegment], gpx_data: Dict
) -> Union[Workout, WorkoutSegment]:
"""
Update workout or workout segment with data from gpx file
"""
workout.pauses = gpx_data['stop_time']
workout.moving = gpx_data['moving_time']
workout.min_alt = gpx_data['elevation_min']
workout.max_alt = gpx_data['elevation_max']
workout.descent = gpx_data['downhill']
workout.ascent = gpx_data['uphill']
workout.max_speed = gpx_data['max_speed']
workout.ave_speed = gpx_data['average_speed']
return workout
def create_workout(
user: User, workout_data: Dict, gpx_data: Optional[Dict] = None
) -> Workout:
"""
Create Workout from data entered by user and from gpx if a gpx file is
provided
"""
workout_date = (
gpx_data['start']
if gpx_data
else datetime.strptime(workout_data['workout_date'], '%Y-%m-%d %H:%M')
)
workout_date_tz, workout_date = get_datetime_with_tz(
user.timezone, workout_date, gpx_data
)
duration = (
gpx_data['duration']
if gpx_data
else timedelta(seconds=workout_data['duration'])
)
distance = gpx_data['distance'] if gpx_data else workout_data['distance']
title = gpx_data['name'] if gpx_data else workout_data.get('title', '')
new_workout = Workout(
user_id=user.id,
sport_id=workout_data['sport_id'],
workout_date=workout_date,
distance=distance,
duration=duration,
)
new_workout.notes = workout_data.get('notes')
if title is not None and title != '':
new_workout.title = title
else:
sport = Sport.query.filter_by(id=new_workout.sport_id).first()
fmt = "%Y-%m-%d %H:%M:%S"
workout_datetime = (
workout_date_tz.strftime(fmt)
if workout_date_tz
else new_workout.workout_date.strftime(fmt)
)
new_workout.title = f'{sport.label} - {workout_datetime}'
if gpx_data:
new_workout.gpx = gpx_data['filename']
new_workout.bounds = gpx_data['bounds']
update_workout_data(new_workout, gpx_data)
else:
new_workout.moving = duration
new_workout.ave_speed = (
None
if duration.seconds == 0
else float(new_workout.distance) / (duration.seconds / 3600)
)
new_workout.max_speed = new_workout.ave_speed
return new_workout
def create_segment(
workout_id: int, workout_uuid: UUID, segment_data: Dict
) -> WorkoutSegment:
"""
Create Workout Segment from gpx data
"""
new_segment = WorkoutSegment(
workout_id=workout_id,
workout_uuid=workout_uuid,
segment_id=segment_data['idx'],
)
new_segment.duration = segment_data['duration']
new_segment.distance = segment_data['distance']
update_workout_data(new_segment, segment_data)
return new_segment
def update_workout(workout: Workout) -> Workout:
"""
Update workout data from gpx file
"""
gpx_data, _, _ = get_gpx_info(
get_absolute_file_path(workout.gpx), False, False
)
updated_workout = update_workout_data(workout, gpx_data)
updated_workout.duration = gpx_data['duration']
updated_workout.distance = gpx_data['distance']
db.session.flush()
for segment_idx, segment in enumerate(updated_workout.segments):
segment_data = gpx_data['segments'][segment_idx]
updated_segment = update_workout_data(segment, segment_data)
updated_segment.duration = segment_data['duration']
updated_segment.distance = segment_data['distance']
db.session.flush()
return updated_workout
def edit_workout(
workout: Workout, workout_data: Dict, auth_user: User
) -> Workout:
"""
Edit a workout
Note: the gpx file is NOT modified
In a next version, map_data and weather_data will be updated
(case of a modified gpx file, see issue #7)
"""
if workout_data.get('sport_id'):
workout.sport_id = workout_data.get('sport_id')
if workout_data.get('title'):
workout.title = workout_data.get('title')
if workout_data.get('notes') is not None:
workout.notes = workout_data.get('notes')
if not workout.gpx:
if workout_data.get('workout_date'):
workout_date = datetime.strptime(
workout_data['workout_date'], '%Y-%m-%d %H:%M'
)
_, workout.workout_date = get_datetime_with_tz(
auth_user.timezone, workout_date
)
if workout_data.get('duration'):
workout.duration = timedelta(seconds=workout_data['duration'])
workout.moving = workout.duration
if workout_data.get('distance'):
workout.distance = workout_data['distance']
workout.ave_speed = (
None
if workout.duration.seconds == 0
else float(workout.distance) / (workout.duration.seconds / 3600)
)
workout.max_speed = workout.ave_speed
return workout
def get_file_path(dir_path: str, filename: str) -> str:
"""
Get full path for a file
"""
if not os.path.exists(dir_path):
os.makedirs(dir_path)
file_path = os.path.join(dir_path, filename)
return file_path
def get_new_file_path(
auth_user_id: int,
workout_date: str,
sport: str,
old_filename: Optional[str] = None,
extension: Optional[str] = None,
) -> str:
"""
Generate a file path from user and workout data
"""
if not extension and old_filename:
extension = f".{old_filename.rsplit('.', 1)[1].lower()}"
_, new_filename = tempfile.mkstemp(
prefix=f'{workout_date}_{sport}_', suffix=extension
)
dir_path = os.path.join('workouts', str(auth_user_id))
if not os.path.exists(dir_path):
os.makedirs(dir_path)
file_path = os.path.join(dir_path, new_filename.split('/')[-1])
return file_path
def process_one_gpx_file(
params: Dict, filename: str, stopped_speed_threshold: float
) -> Workout:
"""
Get all data from a gpx file to create a workout with map image
"""
try:
gpx_data, map_data, weather_data = get_gpx_info(
params['file_path'], stopped_speed_threshold
)
auth_user = params['auth_user']
new_filepath = get_new_file_path(
auth_user_id=auth_user.id,
workout_date=gpx_data['start'],
old_filename=filename,
sport=params['sport_label'],
)
absolute_gpx_filepath = get_absolute_file_path(new_filepath)
os.rename(params['file_path'], absolute_gpx_filepath)
gpx_data['filename'] = new_filepath
map_filepath = get_new_file_path(
auth_user_id=auth_user.id,
workout_date=gpx_data['start'],
extension='.png',
sport=params['sport_label'],
)
absolute_map_filepath = get_absolute_file_path(map_filepath)
generate_map(absolute_map_filepath, map_data)
except (gpxpy.gpx.GPXXMLSyntaxException, TypeError) as e:
raise WorkoutException('error', 'Error during gpx file parsing.', e)
except Exception as e:
raise WorkoutException('error', 'Error during gpx processing.', e)
try:
new_workout = create_workout(
auth_user, params['workout_data'], gpx_data
)
new_workout.map = map_filepath
new_workout.map_id = get_map_hash(map_filepath)
new_workout.weather_start = weather_data[0]
new_workout.weather_end = weather_data[1]
db.session.add(new_workout)
db.session.flush()
for segment_data in gpx_data['segments']:
new_segment = create_segment(
new_workout.id, new_workout.uuid, segment_data
)
db.session.add(new_segment)
db.session.commit()
return new_workout
except (exc.IntegrityError, ValueError) as e:
raise WorkoutException('fail', 'Error during workout save.', e)
def process_zip_archive(
common_params: Dict, extract_dir: str, stopped_speed_threshold: float
) -> List:
"""
Get files from a zip archive and create workouts, if number of files
does not exceed defined limit.
"""
with zipfile.ZipFile(common_params['file_path'], "r") as zip_ref:
zip_ref.extractall(extract_dir)
new_workouts = []
gpx_files_limit = current_app.config['gpx_limit_import']
gpx_files_ok = 0
for gpx_file in os.listdir(extract_dir):
if (
'.' in gpx_file
and gpx_file.rsplit('.', 1)[1].lower()
in current_app.config['WORKOUT_ALLOWED_EXTENSIONS']
):
gpx_files_ok += 1
if gpx_files_ok > gpx_files_limit:
break
file_path = os.path.join(extract_dir, gpx_file)
params = common_params
params['file_path'] = file_path
new_workout = process_one_gpx_file(
params, gpx_file, stopped_speed_threshold
)
new_workouts.append(new_workout)
return new_workouts
def process_files(
auth_user: User,
workout_data: Dict,
workout_file: FileStorage,
folders: Dict,
) -> List:
"""
Store gpx file or zip archive and create workouts
"""
if workout_file.filename is None:
raise WorkoutException('error', 'File has no filename.')
filename = secure_filename(workout_file.filename)
extension = f".{filename.rsplit('.', 1)[1].lower()}"
file_path = get_file_path(folders['tmp_dir'], filename)
sport = Sport.query.filter_by(id=workout_data.get('sport_id')).first()
if not sport:
raise WorkoutException(
'error',
f"Sport id: {workout_data.get('sport_id')} does not exist",
)
sport_preferences = UserSportPreference.query.filter_by(
user_id=auth_user.id, sport_id=sport.id
).first()
stopped_speed_threshold = (
sport.stopped_speed_threshold
if sport_preferences is None
else sport_preferences.stopped_speed_threshold
)
common_params = {
'auth_user': auth_user,
'workout_data': workout_data,
'file_path': file_path,
'sport_label': sport.label,
}
try:
workout_file.save(file_path)
except Exception as e:
raise WorkoutException('error', 'Error during workout file save.', e)
if extension == ".gpx":
return [
process_one_gpx_file(
common_params,
filename,
stopped_speed_threshold,
)
]
else:
return process_zip_archive(
common_params,
folders['extract_dir'],
stopped_speed_threshold,
)
def get_average_speed(
nb_workouts: int, total_average_speed: float, workout_average_speed: float
) -> float:
return round(
(
(total_average_speed * (nb_workouts - 1))
+ float(workout_average_speed)
)
/ nb_workouts,
2,
)