API - replace 'Activity' with 'Workout' - #58

This commit is contained in:
Sam
2021-01-10 11:16:43 +01:00
parent 24ee5bbcfa
commit 3a80e01cc2
70 changed files with 2746 additions and 2511 deletions

View File

View File

@ -0,0 +1,494 @@
import datetime
import os
from typing import Any, Dict, Optional, Union
from uuid import UUID, uuid4
from fittrackee import db
from sqlalchemy.dialects import postgresql
from sqlalchemy.engine.base import Connection
from sqlalchemy.event import listens_for
from sqlalchemy.ext.declarative import DeclarativeMeta
from sqlalchemy.ext.hybrid import hybrid_property
from sqlalchemy.orm.mapper import Mapper
from sqlalchemy.orm.session import Session, object_session
from sqlalchemy.types import JSON, Enum
from .utils_files import get_absolute_file_path
from .utils_format import convert_in_duration, convert_value_to_integer
from .utils_id import encode_uuid
BaseModel: DeclarativeMeta = db.Model
record_types = [
'AS', # 'Best Average Speed'
'FD', # 'Farthest Distance'
'LD', # 'Longest Duration'
'MS', # 'Max speed'
]
def update_records(
user_id: int, sport_id: int, connection: Connection, session: Session
) -> None:
record_table = Record.__table__
new_records = Workout.get_user_workout_records(user_id, sport_id)
for record_type, record_data in new_records.items():
if record_data['record_value']:
record = Record.query.filter_by(
user_id=user_id, sport_id=sport_id, record_type=record_type
).first()
if record:
value = convert_value_to_integer(
record_type, record_data['record_value']
)
connection.execute(
record_table.update()
.where(record_table.c.id == record.id)
.values(
value=value,
workout_id=record_data['workout'].id,
workout_uuid=record_data['workout'].uuid,
workout_date=record_data['workout'].workout_date,
)
)
else:
new_record = Record(
workout=record_data['workout'], record_type=record_type
)
new_record.value = record_data['record_value'] # type: ignore
session.add(new_record)
else:
connection.execute(
record_table.delete()
.where(record_table.c.user_id == user_id)
.where(record_table.c.sport_id == sport_id)
.where(record_table.c.record_type == record_type)
)
class Sport(BaseModel):
__tablename__ = 'sports'
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
label = db.Column(db.String(50), unique=True, nullable=False)
img = db.Column(db.String(255), unique=True, nullable=True)
is_active = db.Column(db.Boolean, default=True, nullable=False)
workouts = db.relationship(
'Workout', lazy=True, backref=db.backref('sports', lazy='joined')
)
records = db.relationship(
'Record', lazy=True, backref=db.backref('sports', lazy='joined')
)
def __repr__(self) -> str:
return f'<Sport {self.label!r}>'
def __init__(self, label: str) -> None:
self.label = label
def serialize(self, is_admin: Optional[bool] = False) -> Dict:
serialized_sport = {
'id': self.id,
'label': self.label,
'img': self.img,
'is_active': self.is_active,
}
if is_admin:
serialized_sport['has_workouts'] = len(self.workouts) > 0
return serialized_sport
class Workout(BaseModel):
__tablename__ = 'workouts'
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
uuid = db.Column(
postgresql.UUID(as_uuid=True),
default=uuid4,
unique=True,
nullable=False,
)
user_id = db.Column(db.Integer, db.ForeignKey('users.id'), nullable=False)
sport_id = db.Column(
db.Integer, db.ForeignKey('sports.id'), nullable=False
)
title = db.Column(db.String(255), nullable=True)
gpx = db.Column(db.String(255), nullable=True)
creation_date = db.Column(db.DateTime, default=datetime.datetime.utcnow)
modification_date = db.Column(
db.DateTime, onupdate=datetime.datetime.utcnow
)
workout_date = db.Column(db.DateTime, nullable=False)
duration = db.Column(db.Interval, nullable=False)
pauses = db.Column(db.Interval, nullable=True)
moving = db.Column(db.Interval, nullable=True)
distance = db.Column(db.Numeric(6, 3), nullable=True) # kilometers
min_alt = db.Column(db.Numeric(6, 2), nullable=True) # meters
max_alt = db.Column(db.Numeric(6, 2), nullable=True) # meters
descent = db.Column(db.Numeric(7, 2), nullable=True) # meters
ascent = db.Column(db.Numeric(7, 2), nullable=True) # meters
max_speed = db.Column(db.Numeric(6, 2), nullable=True) # km/h
ave_speed = db.Column(db.Numeric(6, 2), nullable=True) # km/h
bounds = db.Column(postgresql.ARRAY(db.Float), nullable=True)
map = db.Column(db.String(255), nullable=True)
map_id = db.Column(db.String(50), nullable=True)
weather_start = db.Column(JSON, nullable=True)
weather_end = db.Column(JSON, nullable=True)
notes = db.Column(db.String(500), nullable=True)
segments = db.relationship(
'WorkoutSegment',
lazy=True,
cascade='all, delete',
backref=db.backref('workouts', lazy='joined', single_parent=True),
)
records = db.relationship(
'Record',
lazy=True,
cascade='all, delete',
backref=db.backref('workouts', lazy='joined', single_parent=True),
)
def __str__(self) -> str:
return f'<Workout \'{self.sports.label}\' - {self.workout_date}>'
def __init__(
self,
user_id: int,
sport_id: int,
workout_date: datetime.datetime,
distance: float,
duration: datetime.timedelta,
) -> None:
self.user_id = user_id
self.sport_id = sport_id
self.workout_date = workout_date
self.distance = distance
self.duration = duration
@property
def short_id(self) -> str:
return encode_uuid(self.uuid)
def serialize(self, params: Optional[Dict] = None) -> Dict:
date_from = params.get('from') if params else None
date_to = params.get('to') if params else None
distance_from = params.get('distance_from') if params else None
distance_to = params.get('distance_to') if params else None
duration_from = params.get('duration_from') if params else None
duration_to = params.get('duration_to') if params else None
ave_speed_from = params.get('ave_speed_from') if params else None
ave_speed_to = params.get('ave_speed_to') if params else None
max_speed_from = params.get('max_speed_from') if params else None
max_speed_to = params.get('max_speed_to') if params else None
sport_id = params.get('sport_id') if params else None
previous_workout = (
Workout.query.filter(
Workout.id != self.id,
Workout.user_id == self.user_id,
Workout.sport_id == sport_id if sport_id else True,
Workout.workout_date <= self.workout_date,
Workout.workout_date
>= datetime.datetime.strptime(date_from, '%Y-%m-%d')
if date_from
else True,
Workout.workout_date
<= datetime.datetime.strptime(date_to, '%Y-%m-%d')
if date_to
else True,
Workout.distance >= int(distance_from)
if distance_from
else True,
Workout.distance <= int(distance_to) if distance_to else True,
Workout.duration >= convert_in_duration(duration_from)
if duration_from
else True,
Workout.duration <= convert_in_duration(duration_to)
if duration_to
else True,
Workout.ave_speed >= float(ave_speed_from)
if ave_speed_from
else True,
Workout.ave_speed <= float(ave_speed_to)
if ave_speed_to
else True,
Workout.max_speed >= float(max_speed_from)
if max_speed_from
else True,
Workout.max_speed <= float(max_speed_to)
if max_speed_to
else True,
)
.order_by(Workout.workout_date.desc())
.first()
)
next_workout = (
Workout.query.filter(
Workout.id != self.id,
Workout.user_id == self.user_id,
Workout.sport_id == sport_id if sport_id else True,
Workout.workout_date >= self.workout_date,
Workout.workout_date
>= datetime.datetime.strptime(date_from, '%Y-%m-%d')
if date_from
else True,
Workout.workout_date
<= datetime.datetime.strptime(date_to, '%Y-%m-%d')
if date_to
else True,
Workout.distance >= int(distance_from)
if distance_from
else True,
Workout.distance <= int(distance_to) if distance_to else True,
Workout.duration >= convert_in_duration(duration_from)
if duration_from
else True,
Workout.duration <= convert_in_duration(duration_to)
if duration_to
else True,
Workout.ave_speed >= float(ave_speed_from)
if ave_speed_from
else True,
Workout.ave_speed <= float(ave_speed_to)
if ave_speed_to
else True,
)
.order_by(Workout.workout_date.asc())
.first()
)
return {
'id': self.short_id, # WARNING: client use uuid as id
'user': self.user.username,
'sport_id': self.sport_id,
'title': self.title,
'creation_date': self.creation_date,
'modification_date': self.modification_date,
'workout_date': self.workout_date,
'duration': str(self.duration) if self.duration else None,
'pauses': str(self.pauses) if self.pauses else None,
'moving': str(self.moving) if self.moving else None,
'distance': float(self.distance) if self.distance else None,
'min_alt': float(self.min_alt) if self.min_alt else None,
'max_alt': float(self.max_alt) if self.max_alt else None,
'descent': float(self.descent) if self.descent else None,
'ascent': float(self.ascent) if self.ascent else None,
'max_speed': float(self.max_speed) if self.max_speed else None,
'ave_speed': float(self.ave_speed) if self.ave_speed else None,
'with_gpx': self.gpx is not None,
'bounds': [float(bound) for bound in self.bounds]
if self.bounds
else [], # noqa
'previous_workout': previous_workout.short_id
if previous_workout
else None, # noqa
'next_workout': next_workout.short_id if next_workout else None,
'segments': [segment.serialize() for segment in self.segments],
'records': [record.serialize() for record in self.records],
'map': self.map_id if self.map else None,
'weather_start': self.weather_start,
'weather_end': self.weather_end,
'notes': self.notes,
}
@classmethod
def get_user_workout_records(
cls, user_id: int, sport_id: int, as_integer: Optional[bool] = False
) -> Dict:
record_types_columns = {
'AS': 'ave_speed', # 'Average speed'
'FD': 'distance', # 'Farthest Distance'
'LD': 'moving', # 'Longest Duration'
'MS': 'max_speed', # 'Max speed'
}
records = {}
for record_type, column in record_types_columns.items():
column_sorted = getattr(getattr(Workout, column), 'desc')()
record_workout = (
Workout.query.filter_by(user_id=user_id, sport_id=sport_id)
.order_by(column_sorted, Workout.workout_date)
.first()
)
records[record_type] = dict(
record_value=(
getattr(record_workout, column) if record_workout else None
),
workout=record_workout,
)
return records
@listens_for(Workout, 'after_insert')
def on_workout_insert(
mapper: Mapper, connection: Connection, workout: Workout
) -> None:
@listens_for(db.Session, 'after_flush', once=True)
def receive_after_flush(session: Session, context: Any) -> None:
update_records(
workout.user_id, workout.sport_id, connection, session
) # noqa
@listens_for(Workout, 'after_update')
def on_workout_update(
mapper: Mapper, connection: Connection, workout: Workout
) -> None:
if object_session(workout).is_modified(
workout, include_collections=True
): # noqa
@listens_for(db.Session, 'after_flush', once=True)
def receive_after_flush(session: Session, context: Any) -> None:
sports_list = [workout.sport_id]
records = Record.query.filter_by(workout_id=workout.id).all()
for rec in records:
if rec.sport_id not in sports_list:
sports_list.append(rec.sport_id)
for sport_id in sports_list:
update_records(workout.user_id, sport_id, connection, session)
@listens_for(Workout, 'after_delete')
def on_workout_delete(
mapper: Mapper, connection: Connection, old_record: 'Record'
) -> None:
@listens_for(db.Session, 'after_flush', once=True)
def receive_after_flush(session: Session, context: Any) -> None:
if old_record.map:
os.remove(get_absolute_file_path(old_record.map))
if old_record.gpx:
os.remove(get_absolute_file_path(old_record.gpx))
class WorkoutSegment(BaseModel):
__tablename__ = 'workout_segments'
workout_id = db.Column(
db.Integer, db.ForeignKey('workouts.id'), primary_key=True
)
workout_uuid = db.Column(postgresql.UUID(as_uuid=True), nullable=False)
segment_id = db.Column(db.Integer, primary_key=True)
duration = db.Column(db.Interval, nullable=False)
pauses = db.Column(db.Interval, nullable=True)
moving = db.Column(db.Interval, nullable=True)
distance = db.Column(db.Numeric(6, 3), nullable=True) # kilometers
min_alt = db.Column(db.Numeric(6, 2), nullable=True) # meters
max_alt = db.Column(db.Numeric(6, 2), nullable=True) # meters
descent = db.Column(db.Numeric(7, 2), nullable=True) # meters
ascent = db.Column(db.Numeric(7, 2), nullable=True) # meters
max_speed = db.Column(db.Numeric(6, 2), nullable=True) # km/h
ave_speed = db.Column(db.Numeric(6, 2), nullable=True) # km/h
def __str__(self) -> str:
return (
f'<Segment \'{self.segment_id}\' '
f'for workout \'{encode_uuid(self.workout_uuid)}\'>'
)
def __init__(
self, segment_id: int, workout_id: int, workout_uuid: UUID
) -> None:
self.segment_id = segment_id
self.workout_id = workout_id
self.workout_uuid = workout_uuid
def serialize(self) -> Dict:
return {
'workout_id': encode_uuid(self.workout_uuid),
'segment_id': self.segment_id,
'duration': str(self.duration) if self.duration else None,
'pauses': str(self.pauses) if self.pauses else None,
'moving': str(self.moving) if self.moving else None,
'distance': float(self.distance) if self.distance else None,
'min_alt': float(self.min_alt) if self.min_alt else None,
'max_alt': float(self.max_alt) if self.max_alt else None,
'descent': float(self.descent) if self.descent else None,
'ascent': float(self.ascent) if self.ascent else None,
'max_speed': float(self.max_speed) if self.max_speed else None,
'ave_speed': float(self.ave_speed) if self.ave_speed else None,
}
class Record(BaseModel):
__tablename__ = "records"
__table_args__ = (
db.UniqueConstraint(
'user_id', 'sport_id', 'record_type', name='user_sports_records'
),
)
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
user_id = db.Column(db.Integer, db.ForeignKey('users.id'), nullable=False)
sport_id = db.Column(
db.Integer, db.ForeignKey('sports.id'), nullable=False
)
workout_id = db.Column(
db.Integer, db.ForeignKey('workouts.id'), nullable=False
)
workout_uuid = db.Column(postgresql.UUID(as_uuid=True), nullable=False)
record_type = db.Column(Enum(*record_types, name="record_types"))
workout_date = db.Column(db.DateTime, nullable=False)
_value = db.Column("value", db.Integer, nullable=True)
def __str__(self) -> str:
return (
f'<Record {self.sports.label} - '
f'{self.record_type} - '
f"{self.workout_date.strftime('%Y-%m-%d')}>"
)
def __init__(self, workout: Workout, record_type: str) -> None:
self.user_id = workout.user_id
self.sport_id = workout.sport_id
self.workout_id = workout.id
self.workout_uuid = workout.uuid
self.record_type = record_type
self.workout_date = workout.workout_date
@hybrid_property
def value(self) -> Optional[Union[datetime.timedelta, float]]:
if self._value is None:
return None
if self.record_type == 'LD':
return datetime.timedelta(seconds=self._value)
elif self.record_type in ['AS', 'MS']:
return float(self._value / 100)
else: # 'FD'
return float(self._value / 1000)
@value.setter # type: ignore
def value(self, val: Union[str, float]) -> None:
self._value = convert_value_to_integer(self.record_type, val)
def serialize(self) -> Dict:
if self.value is None:
value = None
elif self.record_type in ['AS', 'FD', 'MS']:
value = float(self.value) # type: ignore
else: # 'LD'
value = str(self.value) # type: ignore
return {
'id': self.id,
'user': self.user.username,
'sport_id': self.sport_id,
'workout_id': encode_uuid(self.workout_uuid),
'record_type': self.record_type,
'workout_date': self.workout_date,
'value': value,
}
@listens_for(Record, 'after_delete')
def on_record_delete(
mapper: Mapper, connection: Connection, old_record: Record
) -> None:
@listens_for(db.Session, 'after_flush', once=True)
def receive_after_flush(session: Session, context: Any) -> None:
workout = old_record.workouts
new_records = Workout.get_user_workout_records(
workout.user_id, workout.sport_id
)
for record_type, record_data in new_records.items():
if (
record_data['record_value']
and record_type == old_record.record_type
):
new_record = Record(
workout=record_data['workout'], record_type=record_type
)
new_record.value = record_data['record_value'] # type: ignore
session.add(new_record)

View File

@ -0,0 +1,116 @@
from typing import Dict
from flask import Blueprint
from ..users.utils import authenticate
from .models import Record
records_blueprint = Blueprint('records', __name__)
@records_blueprint.route('/records', methods=['GET'])
@authenticate
def get_records(auth_user_id: int) -> Dict:
"""
Get all records for authenticated user.
Following types of records are available:
- average speed (record_type: 'AS')
- farest distance (record_type: 'FD')
- longest duration (record_type: 'LD')
- maximum speed (record_type: 'MS')
**Example request**:
.. sourcecode:: http
GET /api/records HTTP/1.1
Content-Type: application/json
**Example responses**:
- returning records
.. sourcecode:: http
HTTP/1.1 200 OK
Content-Type: application/json
{
"data": {
"records": [
{
"id": 9,
"record_type": "AS",
"sport_id": 1,
"user": "admin",
"value": 18,
"workout_date": "Sun, 07 Jul 2019 08:00:00 GMT",
"workout_id": "hvYBqYBRa7wwXpaStWR4V2"
},
{
"id": 10,
"record_type": "FD",
"sport_id": 1,
"user": "admin",
"value": 18,
"workout_date": "Sun, 07 Jul 2019 08:00:00 GMT",
"workout_id": "hvYBqYBRa7wwXpaStWR4V2"
},
{
"id": 11,
"record_type": "LD",
"sport_id": 1,
"user": "admin",
"value": "1:01:00",
"workout_date": "Sun, 07 Jul 2019 08:00:00 GMT",
"workout_id": "hvYBqYBRa7wwXpaStWR4V2"
},
{
"id": 12,
"record_type": "MS",
"sport_id": 1,
"user": "admin",
"value": 18,
"workout_date": "Sun, 07 Jul 2019 08:00:00 GMT",
"workout_id": "hvYBqYBRa7wwXpaStWR4V2"
}
]
},
"status": "success"
}
- no records
.. sourcecode:: http
HTTP/1.1 200 OK
Content-Type: application/json
{
"data": {
"records": []
},
"status": "success"
}
:param integer auth_user_id: authenticate user id (from JSON Web Token)
:reqheader Authorization: OAuth 2.0 Bearer Token
:statuscode 200: success
:statuscode 401:
- Provide a valid auth token.
- Signature expired. Please log in again.
- Invalid token. Please log in again.
"""
records = (
Record.query.filter_by(user_id=auth_user_id)
.order_by(Record.sport_id.asc(), Record.record_type.asc())
.all()
)
return {
'status': 'success',
'data': {'records': [record.serialize() for record in records]},
}

View File

@ -0,0 +1,346 @@
from typing import Dict, Union
from fittrackee import db
from fittrackee.responses import (
DataNotFoundErrorResponse,
HttpResponse,
InvalidPayloadErrorResponse,
handle_error_and_return_response,
)
from flask import Blueprint, request
from sqlalchemy import exc
from ..users.models import User
from ..users.utils import authenticate, authenticate_as_admin
from .models import Sport
sports_blueprint = Blueprint('sports', __name__)
@sports_blueprint.route('/sports', methods=['GET'])
@authenticate
def get_sports(auth_user_id: int) -> Dict:
"""
Get all sports
**Example request**:
.. sourcecode:: http
GET /api/sports HTTP/1.1
Content-Type: application/json
**Example response**:
- for non admin user :
.. sourcecode:: http
HTTP/1.1 200 OK
Content-Type: application/json
{
"data": {
"sports": [
{
"id": 1,
"img": "/img/sports/cycling-sport.png",
"is_active": true,
"label": "Cycling (Sport)"
},
{
"id": 2,
"img": "/img/sports/cycling-transport.png",
"is_active": true,
"label": "Cycling (Transport)"
},
{
"id": 3,
"img": "/img/sports/hiking.png",
"is_active": true,
"label": "Hiking"
},
{
"id": 4,
"img": "/img/sports/mountain-biking.png",
"is_active": true,
"label": "Mountain Biking"
},
{
"id": 5,
"img": "/img/sports/running.png",
"is_active": true,
"label": "Running"
},
{
"id": 6,
"img": "/img/sports/walking.png",
"is_active": true,
"label": "Walking"
}
]
},
"status": "success"
}
- for admin user :
.. sourcecode:: http
HTTP/1.1 200 OK
Content-Type: application/json
{
"data": {
"sports": [
{
"has_workouts": true,
"id": 1,
"img": "/img/sports/cycling-sport.png",
"is_active": true,
"label": "Cycling (Sport)"
},
{
"has_workouts": false,
"id": 2,
"img": "/img/sports/cycling-transport.png",
"is_active": true,
"label": "Cycling (Transport)"
},
{
"has_workouts": false,
"id": 3,
"img": "/img/sports/hiking.png",
"is_active": true,
"label": "Hiking"
},
{
"has_workouts": false,
"id": 4,
"img": "/img/sports/mountain-biking.png",
"is_active": true,
"label": "Mountain Biking"
},
{
"has_workouts": false,
"id": 5,
"img": "/img/sports/running.png",
"is_active": true,
"label": "Running"
},
{
"has_workouts": false,
"id": 6,
"img": "/img/sports/walking.png",
"is_active": true,
"label": "Walking"
}
]
},
"status": "success"
}
:param integer auth_user_id: authenticate user id (from JSON Web Token)
:reqheader Authorization: OAuth 2.0 Bearer Token
:statuscode 200: success
:statuscode 401:
- Provide a valid auth token.
- Signature expired. Please log in again.
- Invalid token. Please log in again.
"""
user = User.query.filter_by(id=int(auth_user_id)).first()
sports = Sport.query.order_by(Sport.id).all()
return {
'status': 'success',
'data': {'sports': [sport.serialize(user.admin) for sport in sports]},
}
@sports_blueprint.route('/sports/<int:sport_id>', methods=['GET'])
@authenticate
def get_sport(auth_user_id: int, sport_id: int) -> Union[Dict, HttpResponse]:
"""
Get a sport
**Example request**:
.. sourcecode:: http
GET /api/sports/1 HTTP/1.1
Content-Type: application/json
**Example response**:
- success for non admin user :
.. sourcecode:: http
HTTP/1.1 200 OK
Content-Type: application/json
{
"data": {
"sports": [
{
"id": 1,
"img": "/img/sports/cycling-sport.png",
"is_active": true,
"label": "Cycling (Sport)"
}
]
},
"status": "success"
}
- success for admin user :
.. sourcecode:: http
HTTP/1.1 200 OK
Content-Type: application/json
{
"data": {
"sports": [
{
"has_workouts": false,
"id": 1,
"img": "/img/sports/cycling-sport.png",
"is_active": true,
"label": "Cycling (Sport)"
}
]
},
"status": "success"
}
- sport not found
.. sourcecode:: http
HTTP/1.1 404 NOT FOUND
Content-Type: application/json
{
"data": {
"sports": []
},
"status": "not found"
}
:param integer auth_user_id: authenticate user id (from JSON Web Token)
:param integer sport_id: sport id
:reqheader Authorization: OAuth 2.0 Bearer Token
:statuscode 200: success
:statuscode 401:
- Provide a valid auth token.
- Signature expired. Please log in again.
- Invalid token. Please log in again.
:statuscode 404: sport not found
"""
user = User.query.filter_by(id=int(auth_user_id)).first()
sport = Sport.query.filter_by(id=sport_id).first()
if sport:
return {
'status': 'success',
'data': {'sports': [sport.serialize(user.admin)]},
}
return DataNotFoundErrorResponse('sports')
@sports_blueprint.route('/sports/<int:sport_id>', methods=['PATCH'])
@authenticate_as_admin
def update_sport(
auth_user_id: int, sport_id: int
) -> Union[Dict, HttpResponse]:
"""
Update a sport
Authenticated user must be an admin
**Example request**:
.. sourcecode:: http
PATCH /api/sports/1 HTTP/1.1
Content-Type: application/json
**Example response**:
- success
.. sourcecode:: http
HTTP/1.1 200 OK
Content-Type: application/json
{
"data": {
"sports": [
{
"has_workouts": false,
"id": 1,
"img": "/img/sports/cycling-sport.png",
"is_active": false,
"label": "Cycling (Sport)"
}
]
},
"status": "success"
}
- sport not found
.. sourcecode:: http
HTTP/1.1 404 NOT FOUND
Content-Type: application/json
{
"data": {
"sports": []
},
"status": "not found"
}
:param integer auth_user_id: authenticate user id (from JSON Web Token)
:param integer sport_id: sport id
:<json string is_active: sport active status
:reqheader Authorization: OAuth 2.0 Bearer Token
:statuscode 200: sport updated
:statuscode 400: invalid payload
:statuscode 401:
- Provide a valid auth token.
- Signature expired. Please log in again.
- Invalid token. Please log in again.
:statuscode 403: You do not have permissions.
:statuscode 404: sport not found
:statuscode 500:
"""
sport_data = request.get_json()
if not sport_data or sport_data.get('is_active') is None:
return InvalidPayloadErrorResponse()
try:
sport = Sport.query.filter_by(id=sport_id).first()
if not sport:
return DataNotFoundErrorResponse('sports')
sport.is_active = sport_data.get('is_active')
db.session.commit()
return {
'status': 'success',
'data': {'sports': [sport.serialize(True)]},
}
except (exc.IntegrityError, exc.OperationalError, ValueError) as e:
return handle_error_and_return_response(e, db=db)

View File

@ -0,0 +1,388 @@
from datetime import datetime, timedelta
from typing import Dict, Union
from fittrackee import db
from fittrackee.responses import (
HttpResponse,
InvalidPayloadErrorResponse,
NotFoundErrorResponse,
UserNotFoundErrorResponse,
handle_error_and_return_response,
)
from flask import Blueprint, request
from sqlalchemy import func
from ..users.models import User
from ..users.utils import authenticate, authenticate_as_admin
from .models import Sport, Workout
from .utils import get_datetime_with_tz, get_upload_dir_size
from .utils_format import convert_timedelta_to_integer
stats_blueprint = Blueprint('stats', __name__)
def get_workouts(
user_name: str, filter_type: str
) -> Union[Dict, HttpResponse]:
"""
Return user workouts by sport or by time
"""
try:
user = User.query.filter_by(username=user_name).first()
if not user:
return UserNotFoundErrorResponse()
params = request.args.copy()
date_from = params.get('from')
if date_from:
date_from = datetime.strptime(date_from, '%Y-%m-%d')
_, date_from = get_datetime_with_tz(user.timezone, date_from)
date_to = params.get('to')
if date_to:
date_to = datetime.strptime(
f'{date_to} 23:59:59', '%Y-%m-%d %H:%M:%S'
)
_, date_to = get_datetime_with_tz(user.timezone, date_to)
sport_id = params.get('sport_id')
time = params.get('time')
if filter_type == 'by_sport':
if sport_id:
sport = Sport.query.filter_by(id=sport_id).first()
if not sport:
return NotFoundErrorResponse('Sport does not exist.')
workouts = (
Workout.query.filter(
Workout.user_id == user.id,
Workout.workout_date >= date_from if date_from else True,
Workout.workout_date < date_to + timedelta(seconds=1)
if date_to
else True,
Workout.sport_id == sport_id if sport_id else True,
)
.order_by(Workout.workout_date.asc())
.all()
)
workouts_list_by_sport = {}
workouts_list_by_time = {} # type: ignore
for workout in workouts:
if filter_type == 'by_sport':
sport_id = workout.sport_id
if sport_id not in workouts_list_by_sport:
workouts_list_by_sport[sport_id] = {
'nb_workouts': 0,
'total_distance': 0.0,
'total_duration': 0,
}
workouts_list_by_sport[sport_id]['nb_workouts'] += 1
workouts_list_by_sport[sport_id]['total_distance'] += float(
workout.distance
)
workouts_list_by_sport[sport_id][
'total_duration'
] += convert_timedelta_to_integer(workout.moving)
# filter_type == 'by_time'
else:
if time == 'week':
workout_date = workout.workout_date - timedelta(
days=(
workout.workout_date.isoweekday()
if workout.workout_date.isoweekday() < 7
else 0
)
)
time_period = datetime.strftime(workout_date, "%Y-%m-%d")
elif time == 'weekm': # week start Monday
workout_date = workout.workout_date - timedelta(
days=workout.workout_date.weekday()
)
time_period = datetime.strftime(workout_date, "%Y-%m-%d")
elif time == 'month':
time_period = datetime.strftime(
workout.workout_date, "%Y-%m"
)
elif time == 'year' or not time:
time_period = datetime.strftime(workout.workout_date, "%Y")
else:
return InvalidPayloadErrorResponse(
'Invalid time period.', 'fail'
)
sport_id = workout.sport_id
if time_period not in workouts_list_by_time:
workouts_list_by_time[time_period] = {}
if sport_id not in workouts_list_by_time[time_period]:
workouts_list_by_time[time_period][sport_id] = {
'nb_workouts': 0,
'total_distance': 0.0,
'total_duration': 0,
}
workouts_list_by_time[time_period][sport_id][
'nb_workouts'
] += 1
workouts_list_by_time[time_period][sport_id][
'total_distance'
] += float(workout.distance)
workouts_list_by_time[time_period][sport_id][
'total_duration'
] += convert_timedelta_to_integer(workout.moving)
return {
'status': 'success',
'data': {
'statistics': workouts_list_by_sport
if filter_type == 'by_sport'
else workouts_list_by_time
},
}
except Exception as e:
return handle_error_and_return_response(e)
@stats_blueprint.route('/stats/<user_name>/by_time', methods=['GET'])
@authenticate
def get_workouts_by_time(
auth_user_id: int, user_name: str
) -> Union[Dict, HttpResponse]:
"""
Get workouts statistics for a user by time
**Example requests**:
- without parameters
.. sourcecode:: http
GET /api/stats/admin/by_time HTTP/1.1
- with parameters
.. sourcecode:: http
GET /api/stats/admin/by_time?from=2018-01-01&to=2018-06-30&time=week
HTTP/1.1
**Example responses**:
- success
.. sourcecode:: http
HTTP/1.1 200 OK
Content-Type: application/json
{
"data": {
"statistics": {
"2017": {
"3": {
"nb_workouts": 2,
"total_distance": 15.282,
"total_duration": 12341
}
},
"2019": {
"1": {
"nb_workouts": 3,
"total_distance": 47,
"total_duration": 9960
},
"2": {
"nb_workouts": 1,
"total_distance": 5.613,
"total_duration": 1267
}
}
}
},
"status": "success"
}
- no workouts
.. sourcecode:: http
HTTP/1.1 200 OK
Content-Type: application/json
{
"data": {
"statistics": {}
},
"status": "success"
}
:param integer auth_user_id: authenticate user id (from JSON Web Token)
:param integer user_name: user name
:query string from: start date (format: ``%Y-%m-%d``)
:query string to: end date (format: ``%Y-%m-%d``)
:query string time: time frame:
- ``week``: week starting Sunday
- ``weekm``: week starting Monday
- ``month``: month
- ``year``: year (default)
:reqheader Authorization: OAuth 2.0 Bearer Token
:statuscode 200: success
:statuscode 401:
- Provide a valid auth token.
- Signature expired. Please log in again.
- Invalid token. Please log in again.
:statuscode 404:
- User does not exist.
"""
return get_workouts(user_name, 'by_time')
@stats_blueprint.route('/stats/<user_name>/by_sport', methods=['GET'])
@authenticate
def get_workouts_by_sport(
auth_user_id: int, user_name: str
) -> Union[Dict, HttpResponse]:
"""
Get workouts statistics for a user by sport
**Example requests**:
- without parameters (get stats for all sports with workouts)
.. sourcecode:: http
GET /api/stats/admin/by_sport HTTP/1.1
- with sport id
.. sourcecode:: http
GET /api/stats/admin/by_sport?sport_id=1 HTTP/1.1
**Example responses**:
- success
.. sourcecode:: http
HTTP/1.1 200 OK
Content-Type: application/json
{
"data": {
"statistics": {
"1": {
"nb_workouts": 3,
"total_distance": 47,
"total_duration": 9960
},
"2": {
"nb_workouts": 1,
"total_distance": 5.613,
"total_duration": 1267
},
"3": {
"nb_workouts": 2,
"total_distance": 15.282,
"total_duration": 12341
}
}
},
"status": "success"
}
- no workouts
.. sourcecode:: http
HTTP/1.1 200 OK
Content-Type: application/json
{
"data": {
"statistics": {}
},
"status": "success"
}
:param integer auth_user_id: authenticate user id (from JSON Web Token)
:param integer user_name: user name
:query integer sport_id: sport id
:reqheader Authorization: OAuth 2.0 Bearer Token
:statuscode 200: success
:statuscode 401:
- Provide a valid auth token.
- Signature expired. Please log in again.
- Invalid token. Please log in again.
:statuscode 404:
- User does not exist.
- Sport does not exist.
"""
return get_workouts(user_name, 'by_sport')
@stats_blueprint.route('/stats/all', methods=['GET'])
@authenticate_as_admin
def get_application_stats(auth_user_id: int) -> Dict:
"""
Get all application statistics
**Example requests**:
.. sourcecode:: http
GET /api/stats/all HTTP/1.1
**Example responses**:
.. sourcecode:: http
HTTP/1.1 200 OK
Content-Type: application/json
{
"data": {
"sports": 3,
"uploads_dir_size": 1000,
"users": 2,
"workouts": 3,
},
"status": "success"
}
:param integer auth_user_id: authenticate user id (from JSON Web Token)
:reqheader Authorization: OAuth 2.0 Bearer Token
:statuscode 200: success
:statuscode 401:
- Provide a valid auth token.
- Signature expired. Please log in again.
- Invalid token. Please log in again.
:statuscode 403: You do not have permissions.
"""
nb_workouts = Workout.query.filter().count()
nb_users = User.query.filter().count()
nb_sports = (
db.session.query(func.count(Workout.sport_id))
.group_by(Workout.sport_id)
.count()
)
return {
'status': 'success',
'data': {
'workouts': nb_workouts,
'sports': nb_sports,
'users': nb_users,
'uploads_dir_size': get_upload_dir_size(),
},
}

View File

@ -0,0 +1,417 @@
import hashlib
import os
import tempfile
import zipfile
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple, Union
from uuid import UUID
import gpxpy.gpx
import pytz
from fittrackee import appLog, db
from flask import current_app
from sqlalchemy import exc
from staticmap import Line, StaticMap
from werkzeug.datastructures import FileStorage
from werkzeug.utils import secure_filename
from ..users.models import User
from .models import Sport, Workout, WorkoutSegment
from .utils_files import get_absolute_file_path
from .utils_gpx import get_gpx_info
class WorkoutException(Exception):
def __init__(
self, status: str, message: str, e: Optional[Exception] = None
) -> None:
self.status = status
self.message = message
self.e = e
def get_datetime_with_tz(
timezone: str, workout_date: datetime, gpx_data: Optional[Dict] = None
) -> Tuple[Optional[datetime], datetime]:
"""
Return naive datetime and datetime with user timezone
"""
workout_date_tz = None
if timezone:
user_tz = pytz.timezone(timezone)
utc_tz = pytz.utc
if gpx_data:
# workout date in gpx is in UTC, but in naive datetime
fmt = '%Y-%m-%d %H:%M:%S'
workout_date_string = workout_date.strftime(fmt)
workout_date_tmp = utc_tz.localize(
datetime.strptime(workout_date_string, fmt)
)
workout_date_tz = workout_date_tmp.astimezone(user_tz)
else:
workout_date_tz = user_tz.localize(workout_date)
workout_date = workout_date_tz.astimezone(utc_tz)
# make datetime 'naive' like in gpx file
workout_date = workout_date.replace(tzinfo=None)
return workout_date_tz, workout_date
def update_workout_data(
workout: Union[Workout, WorkoutSegment], gpx_data: Dict
) -> Union[Workout, WorkoutSegment]:
"""
Update workout or workout segment with data from gpx file
"""
workout.pauses = gpx_data['stop_time']
workout.moving = gpx_data['moving_time']
workout.min_alt = gpx_data['elevation_min']
workout.max_alt = gpx_data['elevation_max']
workout.descent = gpx_data['downhill']
workout.ascent = gpx_data['uphill']
workout.max_speed = gpx_data['max_speed']
workout.ave_speed = gpx_data['average_speed']
return workout
def create_workout(
user: User, workout_data: Dict, gpx_data: Optional[Dict] = None
) -> Workout:
"""
Create Workout from data entered by user and from gpx if a gpx file is
provided
"""
workout_date = (
gpx_data['start']
if gpx_data
else datetime.strptime(workout_data['workout_date'], '%Y-%m-%d %H:%M')
)
workout_date_tz, workout_date = get_datetime_with_tz(
user.timezone, workout_date, gpx_data
)
duration = (
gpx_data['duration']
if gpx_data
else timedelta(seconds=workout_data['duration'])
)
distance = gpx_data['distance'] if gpx_data else workout_data['distance']
title = gpx_data['name'] if gpx_data else workout_data.get('title', '')
new_workout = Workout(
user_id=user.id,
sport_id=workout_data['sport_id'],
workout_date=workout_date,
distance=distance,
duration=duration,
)
new_workout.notes = workout_data.get('notes')
if title is not None and title != '':
new_workout.title = title
else:
sport = Sport.query.filter_by(id=new_workout.sport_id).first()
fmt = "%Y-%m-%d %H:%M:%S"
workout_datetime = (
workout_date_tz.strftime(fmt)
if workout_date_tz
else new_workout.workout_date.strftime(fmt)
)
new_workout.title = f'{sport.label} - {workout_datetime}'
if gpx_data:
new_workout.gpx = gpx_data['filename']
new_workout.bounds = gpx_data['bounds']
update_workout_data(new_workout, gpx_data)
else:
new_workout.moving = duration
new_workout.ave_speed = (
None
if duration.seconds == 0
else float(new_workout.distance) / (duration.seconds / 3600)
)
new_workout.max_speed = new_workout.ave_speed
return new_workout
def create_segment(
workout_id: int, workout_uuid: UUID, segment_data: Dict
) -> WorkoutSegment:
"""
Create Workout Segment from gpx data
"""
new_segment = WorkoutSegment(
workout_id=workout_id,
workout_uuid=workout_uuid,
segment_id=segment_data['idx'],
)
new_segment.duration = segment_data['duration']
new_segment.distance = segment_data['distance']
update_workout_data(new_segment, segment_data)
return new_segment
def update_workout(workout: Workout) -> Workout:
"""
Update workout data from gpx file
"""
gpx_data, _, _ = get_gpx_info(
get_absolute_file_path(workout.gpx), False, False
)
updated_workout = update_workout_data(workout, gpx_data)
updated_workout.duration = gpx_data['duration']
updated_workout.distance = gpx_data['distance']
db.session.flush()
for segment_idx, segment in enumerate(updated_workout.segments):
segment_data = gpx_data['segments'][segment_idx]
updated_segment = update_workout_data(segment, segment_data)
updated_segment.duration = segment_data['duration']
updated_segment.distance = segment_data['distance']
db.session.flush()
return updated_workout
def edit_workout(
workout: Workout, workout_data: Dict, auth_user_id: int
) -> Workout:
"""
Edit an workout
Note: the gpx file is NOT modified
In a next version, map_data and weather_data will be updated
(case of a modified gpx file, see issue #7)
"""
user = User.query.filter_by(id=auth_user_id).first()
if workout_data.get('refresh'):
workout = update_workout(workout)
if workout_data.get('sport_id'):
workout.sport_id = workout_data.get('sport_id')
if workout_data.get('title'):
workout.title = workout_data.get('title')
if workout_data.get('notes'):
workout.notes = workout_data.get('notes')
if not workout.gpx:
if workout_data.get('workout_date'):
workout_date = datetime.strptime(
workout_data['workout_date'], '%Y-%m-%d %H:%M'
)
_, workout.workout_date = get_datetime_with_tz(
user.timezone, workout_date
)
if workout_data.get('duration'):
workout.duration = timedelta(seconds=workout_data['duration'])
workout.moving = workout.duration
if workout_data.get('distance'):
workout.distance = workout_data['distance']
workout.ave_speed = (
None
if workout.duration.seconds == 0
else float(workout.distance) / (workout.duration.seconds / 3600)
)
workout.max_speed = workout.ave_speed
return workout
def get_file_path(dir_path: str, filename: str) -> str:
"""
Get full path for a file
"""
if not os.path.exists(dir_path):
os.makedirs(dir_path)
file_path = os.path.join(dir_path, filename)
return file_path
def get_new_file_path(
auth_user_id: int,
workout_date: str,
sport: str,
old_filename: Optional[str] = None,
extension: Optional[str] = None,
) -> str:
"""
Generate a file path from user and workout data
"""
if not extension and old_filename:
extension = f".{old_filename.rsplit('.', 1)[1].lower()}"
_, new_filename = tempfile.mkstemp(
prefix=f'{workout_date}_{sport}_', suffix=extension
)
dir_path = os.path.join('workouts', str(auth_user_id))
if not os.path.exists(dir_path):
os.makedirs(dir_path)
file_path = os.path.join(dir_path, new_filename.split('/')[-1])
return file_path
def generate_map(map_filepath: str, map_data: List) -> None:
"""
Generate and save map image from map data
"""
m = StaticMap(400, 225, 10)
line = Line(map_data, '#3388FF', 4)
m.add_line(line)
image = m.render()
image.save(map_filepath)
def get_map_hash(map_filepath: str) -> str:
"""
Generate a md5 hash used as id instead of workout id, to retrieve map
image (maps are sensitive data)
"""
md5 = hashlib.md5()
absolute_map_filepath = get_absolute_file_path(map_filepath)
with open(absolute_map_filepath, 'rb') as f:
for chunk in iter(lambda: f.read(128 * md5.block_size), b''):
md5.update(chunk)
return md5.hexdigest()
def process_one_gpx_file(params: Dict, filename: str) -> Workout:
"""
Get all data from a gpx file to create an workout with map image
"""
try:
gpx_data, map_data, weather_data = get_gpx_info(params['file_path'])
auth_user_id = params['user'].id
new_filepath = get_new_file_path(
auth_user_id=auth_user_id,
workout_date=gpx_data['start'],
old_filename=filename,
sport=params['sport_label'],
)
absolute_gpx_filepath = get_absolute_file_path(new_filepath)
os.rename(params['file_path'], absolute_gpx_filepath)
gpx_data['filename'] = new_filepath
map_filepath = get_new_file_path(
auth_user_id=auth_user_id,
workout_date=gpx_data['start'],
extension='.png',
sport=params['sport_label'],
)
absolute_map_filepath = get_absolute_file_path(map_filepath)
generate_map(absolute_map_filepath, map_data)
except (gpxpy.gpx.GPXXMLSyntaxException, TypeError) as e:
raise WorkoutException('error', 'Error during gpx file parsing.', e)
except Exception as e:
raise WorkoutException('error', 'Error during gpx processing.', e)
try:
new_workout = create_workout(
params['user'], params['workout_data'], gpx_data
)
new_workout.map = map_filepath
new_workout.map_id = get_map_hash(map_filepath)
new_workout.weather_start = weather_data[0]
new_workout.weather_end = weather_data[1]
db.session.add(new_workout)
db.session.flush()
for segment_data in gpx_data['segments']:
new_segment = create_segment(
new_workout.id, new_workout.uuid, segment_data
)
db.session.add(new_segment)
db.session.commit()
return new_workout
except (exc.IntegrityError, ValueError) as e:
raise WorkoutException('fail', 'Error during workout save.', e)
def process_zip_archive(common_params: Dict, extract_dir: str) -> List:
"""
Get files from a zip archive and create workouts, if number of files
does not exceed defined limit.
"""
with zipfile.ZipFile(common_params['file_path'], "r") as zip_ref:
zip_ref.extractall(extract_dir)
new_workouts = []
gpx_files_limit = os.getenv('REACT_APP_GPX_LIMIT_IMPORT', 10)
if (
gpx_files_limit
and isinstance(gpx_files_limit, str)
and gpx_files_limit.isdigit()
):
gpx_files_limit = int(gpx_files_limit)
else:
gpx_files_limit = 10
appLog.warning('GPX limit not configured, set to 10.')
gpx_files_ok = 0
for gpx_file in os.listdir(extract_dir):
if (
'.' in gpx_file
and gpx_file.rsplit('.', 1)[1].lower()
in current_app.config['WORKOUT_ALLOWED_EXTENSIONS']
):
gpx_files_ok += 1
if gpx_files_ok > gpx_files_limit:
break
file_path = os.path.join(extract_dir, gpx_file)
params = common_params
params['file_path'] = file_path
new_workout = process_one_gpx_file(params, gpx_file)
new_workouts.append(new_workout)
return new_workouts
def process_files(
auth_user_id: int,
workout_data: Dict,
workout_file: FileStorage,
folders: Dict,
) -> List:
"""
Store gpx file or zip archive and create workouts
"""
if workout_file.filename is None:
raise WorkoutException('error', 'File has no filename.')
filename = secure_filename(workout_file.filename)
extension = f".{filename.rsplit('.', 1)[1].lower()}"
file_path = get_file_path(folders['tmp_dir'], filename)
sport = Sport.query.filter_by(id=workout_data.get('sport_id')).first()
if not sport:
raise WorkoutException(
'error',
f"Sport id: {workout_data.get('sport_id')} does not exist",
)
user = User.query.filter_by(id=auth_user_id).first()
common_params = {
'user': user,
'workout_data': workout_data,
'file_path': file_path,
'sport_label': sport.label,
}
try:
workout_file.save(file_path)
except Exception as e:
raise WorkoutException('error', 'Error during workout file save.', e)
if extension == ".gpx":
return [process_one_gpx_file(common_params, filename)]
else:
return process_zip_archive(common_params, folders['extract_dir'])
def get_upload_dir_size() -> int:
"""
Return upload directory size
"""
upload_path = get_absolute_file_path('')
total_size = 0
for dir_path, _, filenames in os.walk(upload_path):
for f in filenames:
fp = os.path.join(dir_path, f)
total_size += os.path.getsize(fp)
return total_size

View File

@ -0,0 +1,7 @@
import os
from flask import current_app
def get_absolute_file_path(relative_path: str) -> str:
return os.path.join(current_app.config['UPLOAD_FOLDER'], relative_path)

View File

@ -0,0 +1,27 @@
from datetime import timedelta
from typing import Optional, Union
def convert_in_duration(value: str) -> timedelta:
hours = int(value.split(':')[0])
minutes = int(value.split(':')[1])
return timedelta(seconds=(hours * 3600 + minutes * 60))
def convert_timedelta_to_integer(value: str) -> int:
hours, minutes, seconds = str(value).split(':')
return int(hours) * 3600 + int(minutes) * 60 + int(seconds)
def convert_value_to_integer(
record_type: str, val: Union[str, float]
) -> Optional[int]:
if val is None:
return None
if record_type == 'LD':
return convert_timedelta_to_integer(str(val))
elif record_type in ['AS', 'MS']:
return int(val * 100)
else: # 'FD'
return int(val * 1000)

View File

@ -0,0 +1,258 @@
from datetime import timedelta
from typing import Any, Dict, List, Optional, Tuple
import gpxpy.gpx
from .utils_weather import get_weather
class WorkoutGPXException(Exception):
def __init__(
self, status: str, message: str, e: Optional[Exception] = None
) -> None:
self.status = status
self.message = message
self.e = e
def open_gpx_file(gpx_file: str) -> Optional[gpxpy.gpx.GPX]:
gpx_file = open(gpx_file, 'r') # type: ignore
gpx = gpxpy.parse(gpx_file)
if len(gpx.tracks) == 0:
return None
return gpx
def get_gpx_data(
parsed_gpx: gpxpy.gpx,
max_speed: float,
start: int,
stopped_time_between_seg: timedelta,
) -> Dict:
"""
Returns data from parsed gpx file
"""
gpx_data: Dict[str, Any] = {
'max_speed': (max_speed / 1000) * 3600,
'start': start,
}
duration = parsed_gpx.get_duration()
gpx_data['duration'] = (
timedelta(seconds=duration) + stopped_time_between_seg
)
ele = parsed_gpx.get_elevation_extremes()
gpx_data['elevation_max'] = ele.maximum
gpx_data['elevation_min'] = ele.minimum
hill = parsed_gpx.get_uphill_downhill()
gpx_data['uphill'] = hill.uphill
gpx_data['downhill'] = hill.downhill
mv = parsed_gpx.get_moving_data()
gpx_data['moving_time'] = timedelta(seconds=mv.moving_time)
gpx_data['stop_time'] = (
timedelta(seconds=mv.stopped_time) + stopped_time_between_seg
)
distance = mv.moving_distance + mv.stopped_distance
gpx_data['distance'] = distance / 1000
average_speed = distance / mv.moving_time if mv.moving_time > 0 else 0
gpx_data['average_speed'] = (average_speed / 1000) * 3600
return gpx_data
def get_gpx_info(
gpx_file: str,
update_map_data: Optional[bool] = True,
update_weather_data: Optional[bool] = True,
) -> Tuple:
"""
Parse and return gpx, map and weather data from gpx file
"""
gpx = open_gpx_file(gpx_file)
if gpx is None:
raise WorkoutGPXException('not found', 'No gpx file')
gpx_data = {'name': gpx.tracks[0].name, 'segments': []}
max_speed = 0
start = 0
map_data = []
weather_data = []
segments_nb = len(gpx.tracks[0].segments)
prev_seg_last_point = None
no_stopped_time = timedelta(seconds=0)
stopped_time_between_seg = no_stopped_time
for segment_idx, segment in enumerate(gpx.tracks[0].segments):
segment_start = 0
segment_points_nb = len(segment.points)
for point_idx, point in enumerate(segment.points):
if point_idx == 0:
# first gpx point => get weather
if start == 0:
start = point.time
if update_weather_data:
weather_data.append(get_weather(point))
# if a previous segment exists, calculate stopped time between
# the two segments
if prev_seg_last_point:
stopped_time_between_seg = point.time - prev_seg_last_point
# last segment point
if point_idx == (segment_points_nb - 1):
prev_seg_last_point = point.time
# last gpx point => get weather
if segment_idx == (segments_nb - 1) and update_weather_data:
weather_data.append(get_weather(point))
if update_map_data:
map_data.append([point.longitude, point.latitude])
segment_max_speed = (
segment.get_moving_data().max_speed
if segment.get_moving_data().max_speed
else 0
)
if segment_max_speed > max_speed:
max_speed = segment_max_speed
segment_data = get_gpx_data(
segment, segment_max_speed, segment_start, no_stopped_time
)
segment_data['idx'] = segment_idx
gpx_data['segments'].append(segment_data)
full_gpx_data = get_gpx_data(
gpx, max_speed, start, stopped_time_between_seg
)
gpx_data = {**gpx_data, **full_gpx_data}
if update_map_data:
bounds = gpx.get_bounds()
gpx_data['bounds'] = [
bounds.min_latitude,
bounds.min_longitude,
bounds.max_latitude,
bounds.max_longitude,
]
return gpx_data, map_data, weather_data
def get_gpx_segments(
track_segments: List, segment_id: Optional[int] = None
) -> List:
"""
Return list of segments, filtered on segment id if provided
"""
if segment_id is not None:
segment_index = segment_id - 1
if segment_index > (len(track_segments) - 1):
raise WorkoutGPXException(
'not found', f'No segment with id \'{segment_id}\'', None
)
if segment_index < 0:
raise WorkoutGPXException('error', 'Incorrect segment id', None)
segments = [track_segments[segment_index]]
else:
segments = track_segments
return segments
def get_chart_data(
gpx_file: str, segment_id: Optional[int] = None
) -> Optional[List]:
"""
Return data needed to generate chart with speed and elevation
"""
gpx = open_gpx_file(gpx_file)
if gpx is None:
return None
chart_data = []
first_point = None
previous_point = None
previous_distance = 0
track_segments = gpx.tracks[0].segments
segments = get_gpx_segments(track_segments, segment_id)
for segment_idx, segment in enumerate(segments):
for point_idx, point in enumerate(segment.points):
if segment_idx == 0 and point_idx == 0:
first_point = point
distance = (
point.distance_3d(previous_point)
if (
point.elevation
and previous_point
and previous_point.elevation
)
else point.distance_2d(previous_point)
)
distance = 0 if distance is None else distance
distance += previous_distance
speed = (
round((segment.get_speed(point_idx) / 1000) * 3600, 2)
if segment.get_speed(point_idx) is not None
else 0
)
chart_data.append(
{
'distance': (
round(distance / 1000, 2)
if distance is not None
else 0
),
'duration': point.time_difference(first_point),
'elevation': (
round(point.elevation, 1)
if point.elevation is not None
else 0
),
'latitude': point.latitude,
'longitude': point.longitude,
'speed': speed,
'time': point.time,
}
)
previous_point = point
previous_distance = distance
return chart_data
def extract_segment_from_gpx_file(
content: str, segment_id: int
) -> Optional[str]:
"""
Returns segments in xml format from a gpx file content
"""
gpx_content = gpxpy.parse(content)
if len(gpx_content.tracks) == 0:
return None
track_segment = get_gpx_segments(
gpx_content.tracks[0].segments, segment_id
)
gpx = gpxpy.gpx.GPX()
gpx_track = gpxpy.gpx.GPXTrack()
gpx.tracks.append(gpx_track)
gpx_segment = gpxpy.gpx.GPXTrackSegment()
gpx_track.segments.append(gpx_segment)
for point_idx, point in enumerate(track_segment[0].points):
gpx_segment.points.append(
gpxpy.gpx.GPXTrackPoint(
point.latitude, point.longitude, elevation=point.elevation
)
)
return gpx.to_xml()

View File

@ -0,0 +1,17 @@
from uuid import UUID
import shortuuid
def encode_uuid(uuid_value: UUID) -> str:
"""
Return short id string from an UUID
"""
return shortuuid.encode(uuid_value)
def decode_short_id(short_id: str) -> UUID:
"""
Return UUID from a short id string
"""
return shortuuid.decode(short_id)

View File

@ -0,0 +1,34 @@
import os
from typing import Dict, Optional
import forecastio
import pytz
from fittrackee import appLog
from gpxpy.gpx import GPXRoutePoint
API_KEY = os.getenv('WEATHER_API_KEY')
def get_weather(point: GPXRoutePoint) -> Optional[Dict]:
if not API_KEY or API_KEY == '':
return None
try:
point_time = pytz.utc.localize(point.time)
forecast = forecastio.load_forecast(
API_KEY,
point.latitude,
point.longitude,
time=point_time,
units='si',
)
weather = forecast.currently()
return {
'summary': weather.summary,
'icon': weather.icon,
'temperature': weather.temperature,
'humidity': weather.humidity,
'wind': weather.windSpeed,
}
except Exception as e:
appLog.error(e)
return None

File diff suppressed because it is too large Load Diff