import datetime import os from typing import Any, Dict, Optional, Union from uuid import UUID, uuid4 from sqlalchemy.dialects import postgresql from sqlalchemy.engine.base import Connection from sqlalchemy.event import listens_for from sqlalchemy.ext.declarative import DeclarativeMeta from sqlalchemy.ext.hybrid import hybrid_property from sqlalchemy.orm.mapper import Mapper from sqlalchemy.orm.session import Session, object_session from sqlalchemy.types import JSON, Enum from fittrackee import appLog, db from fittrackee.files import get_absolute_file_path from .utils.convert import convert_in_duration, convert_value_to_integer from .utils.short_id import encode_uuid BaseModel: DeclarativeMeta = db.Model record_types = [ 'AS', # 'Best Average Speed' 'FD', # 'Farthest Distance' 'HA', # 'Highest Ascent' 'LD', # 'Longest Duration' 'MS', # 'Max speed' ] def update_records( user_id: int, sport_id: int, connection: Connection, session: Session ) -> None: record_table = Record.__table__ new_records = Workout.get_user_workout_records(user_id, sport_id) for record_type, record_data in new_records.items(): if record_data['record_value']: record = Record.query.filter_by( user_id=user_id, sport_id=sport_id, record_type=record_type ).first() if record: value = convert_value_to_integer( record_type, record_data['record_value'] ) connection.execute( record_table.update() .where(record_table.c.id == record.id) .values( value=value, workout_id=record_data['workout'].id, workout_uuid=record_data['workout'].uuid, workout_date=record_data['workout'].workout_date, ) ) else: new_record = Record( workout=record_data['workout'], record_type=record_type ) new_record.value = record_data['record_value'] # type: ignore session.add(new_record) else: connection.execute( record_table.delete() .where(record_table.c.user_id == user_id) .where(record_table.c.sport_id == sport_id) .where(record_table.c.record_type == record_type) ) class Sport(BaseModel): __tablename__ = 'sports' id = db.Column(db.Integer, primary_key=True, autoincrement=True) label = db.Column(db.String(50), unique=True, nullable=False) is_active = db.Column(db.Boolean, default=True, nullable=False) stopped_speed_threshold = db.Column(db.Float, default=1.0, nullable=False) workouts = db.relationship( 'Workout', lazy=True, backref=db.backref('sport', lazy='joined', single_parent=True), ) records = db.relationship( 'Record', lazy=True, backref=db.backref('sport', lazy='joined', single_parent=True), ) def __repr__(self) -> str: return f'' def __init__(self, label: str) -> None: self.label = label def serialize( self, is_admin: Optional[bool] = False, sport_preferences: Optional[Dict] = None, ) -> Dict: serialized_sport = { 'id': self.id, 'label': self.label, 'is_active': self.is_active, 'is_active_for_user': ( self.is_active if sport_preferences is None else (sport_preferences['is_active'] and self.is_active) ), 'color': ( None if sport_preferences is None else sport_preferences['color'] ), 'stopped_speed_threshold': ( self.stopped_speed_threshold if sport_preferences is None else sport_preferences['stopped_speed_threshold'] ), } if is_admin: serialized_sport['has_workouts'] = len(self.workouts) > 0 return serialized_sport class Workout(BaseModel): __tablename__ = 'workouts' id = db.Column(db.Integer, primary_key=True, autoincrement=True) uuid = db.Column( postgresql.UUID(as_uuid=True), default=uuid4, unique=True, nullable=False, ) user_id = db.Column( db.Integer, db.ForeignKey('users.id'), index=True, nullable=False ) sport_id = db.Column( db.Integer, db.ForeignKey('sports.id'), index=True, nullable=False ) title = db.Column(db.String(255), nullable=True) gpx = db.Column(db.String(255), nullable=True) creation_date = db.Column(db.DateTime, default=datetime.datetime.utcnow) modification_date = db.Column( db.DateTime, onupdate=datetime.datetime.utcnow ) workout_date = db.Column(db.DateTime, index=True, nullable=False) duration = db.Column(db.Interval, nullable=False) pauses = db.Column(db.Interval, nullable=True) moving = db.Column(db.Interval, nullable=True) distance = db.Column(db.Numeric(6, 3), nullable=True) # kilometers min_alt = db.Column(db.Numeric(6, 2), nullable=True) # meters max_alt = db.Column(db.Numeric(6, 2), nullable=True) # meters descent = db.Column(db.Numeric(8, 3), nullable=True) # meters ascent = db.Column(db.Numeric(8, 3), nullable=True) # meters max_speed = db.Column(db.Numeric(6, 2), nullable=True) # km/h ave_speed = db.Column(db.Numeric(6, 2), nullable=True) # km/h bounds = db.Column(postgresql.ARRAY(db.Float), nullable=True) map = db.Column(db.String(255), nullable=True) map_id = db.Column(db.String(50), index=True, nullable=True) weather_start = db.Column(JSON, nullable=True) weather_end = db.Column(JSON, nullable=True) notes = db.Column(db.String(500), nullable=True) segments = db.relationship( 'WorkoutSegment', lazy=True, cascade='all, delete', backref=db.backref('workout', lazy='joined', single_parent=True), ) records = db.relationship( 'Record', lazy=True, cascade='all, delete', backref=db.backref('workout', lazy='joined', single_parent=True), ) def __str__(self) -> str: return f'' def __init__( self, user_id: int, sport_id: int, workout_date: datetime.datetime, distance: float, duration: datetime.timedelta, ) -> None: self.user_id = user_id self.sport_id = sport_id self.workout_date = workout_date self.distance = distance self.duration = duration @property def short_id(self) -> str: return encode_uuid(self.uuid) def get_workout_data(self) -> Dict: return { 'id': self.short_id, # WARNING: client use uuid as id 'sport_id': self.sport_id, 'title': self.title, 'creation_date': self.creation_date, 'modification_date': self.modification_date, 'workout_date': self.workout_date, 'duration': None if self.duration is None else str(self.duration), 'pauses': str(self.pauses) if self.pauses else None, 'moving': None if self.moving is None else str(self.moving), 'distance': ( None if self.distance is None else float(self.distance) ), 'min_alt': None if self.min_alt is None else float(self.min_alt), 'max_alt': None if self.max_alt is None else float(self.max_alt), 'descent': None if self.descent is None else float(self.descent), 'ascent': None if self.ascent is None else float(self.ascent), 'max_speed': ( None if self.max_speed is None else float(self.max_speed) ), 'ave_speed': ( None if self.ave_speed is None else float(self.ave_speed) ), 'records': [record.serialize() for record in self.records], 'segments': [segment.serialize() for segment in self.segments], 'weather_start': self.weather_start, 'weather_end': self.weather_end, 'notes': self.notes, } def serialize(self, params: Optional[Dict] = None) -> Dict: date_from = params.get('from') if params else None date_to = params.get('to') if params else None distance_from = params.get('distance_from') if params else None distance_to = params.get('distance_to') if params else None duration_from = params.get('duration_from') if params else None duration_to = params.get('duration_to') if params else None ave_speed_from = params.get('ave_speed_from') if params else None ave_speed_to = params.get('ave_speed_to') if params else None max_speed_from = params.get('max_speed_from') if params else None max_speed_to = params.get('max_speed_to') if params else None sport_id = params.get('sport_id') if params else None previous_workout = ( Workout.query.filter( Workout.id != self.id, Workout.user_id == self.user_id, Workout.sport_id == sport_id if sport_id else True, Workout.workout_date <= self.workout_date, Workout.workout_date >= datetime.datetime.strptime(date_from, '%Y-%m-%d') if date_from else True, Workout.workout_date <= datetime.datetime.strptime(date_to, '%Y-%m-%d') if date_to else True, Workout.distance >= float(distance_from) if distance_from else True, Workout.distance <= float(distance_to) if distance_to else True, Workout.duration >= convert_in_duration(duration_from) if duration_from else True, Workout.duration <= convert_in_duration(duration_to) if duration_to else True, Workout.ave_speed >= float(ave_speed_from) if ave_speed_from else True, Workout.ave_speed <= float(ave_speed_to) if ave_speed_to else True, Workout.max_speed >= float(max_speed_from) if max_speed_from else True, Workout.max_speed <= float(max_speed_to) if max_speed_to else True, ) .order_by(Workout.workout_date.desc()) .first() ) next_workout = ( Workout.query.filter( Workout.id != self.id, Workout.user_id == self.user_id, Workout.sport_id == sport_id if sport_id else True, Workout.workout_date >= self.workout_date, Workout.workout_date >= datetime.datetime.strptime(date_from, '%Y-%m-%d') if date_from else True, Workout.workout_date <= datetime.datetime.strptime(date_to, '%Y-%m-%d') if date_to else True, Workout.distance >= float(distance_from) if distance_from else True, Workout.distance <= float(distance_to) if distance_to else True, Workout.duration >= convert_in_duration(duration_from) if duration_from else True, Workout.duration <= convert_in_duration(duration_to) if duration_to else True, Workout.ave_speed >= float(ave_speed_from) if ave_speed_from else True, Workout.ave_speed <= float(ave_speed_to) if ave_speed_to else True, ) .order_by(Workout.workout_date.asc()) .first() ) workout = self.get_workout_data() workout["next_workout"] = ( next_workout.short_id if next_workout else None ) workout["previous_workout"] = ( previous_workout.short_id if previous_workout else None ) workout["bounds"] = ( [float(bound) for bound in self.bounds] if self.bounds else [] ) workout["user"] = self.user.username workout["map"] = self.map_id if self.map else None workout["with_gpx"] = self.gpx is not None return workout @classmethod def get_user_workout_records( cls, user_id: int, sport_id: int, as_integer: Optional[bool] = False ) -> Dict: """ Note: Values for ascent are null for workouts without gpx """ record_types_columns = { 'AS': 'ave_speed', # 'Average speed' 'FD': 'distance', # 'Farthest Distance' 'HA': 'ascent', # 'Highest Ascent' 'LD': 'moving', # 'Longest Duration' 'MS': 'max_speed', # 'Max speed' } records = {} for record_type, column in record_types_columns.items(): column_sorted = getattr(getattr(Workout, column), 'desc')() record_workout = ( Workout.query.filter( Workout.user_id == user_id, Workout.sport_id == sport_id, getattr(Workout, column) != None, # noqa ) .order_by(column_sorted, Workout.workout_date) .first() ) records[record_type] = dict( record_value=( getattr(record_workout, column) if record_workout else None ), workout=record_workout, ) return records @listens_for(Workout, 'after_insert') def on_workout_insert( mapper: Mapper, connection: Connection, workout: Workout ) -> None: @listens_for(db.Session, 'after_flush', once=True) def receive_after_flush(session: Session, context: Any) -> None: update_records( workout.user_id, workout.sport_id, connection, session ) # noqa @listens_for(Workout, 'after_update') def on_workout_update( mapper: Mapper, connection: Connection, workout: Workout ) -> None: if object_session(workout).is_modified( workout, include_collections=True ): # noqa @listens_for(db.Session, 'after_flush', once=True) def receive_after_flush(session: Session, context: Any) -> None: sports_list = [workout.sport_id] records = Record.query.filter_by(workout_id=workout.id).all() for rec in records: if rec.sport_id not in sports_list: sports_list.append(rec.sport_id) for sport_id in sports_list: update_records(workout.user_id, sport_id, connection, session) @listens_for(Workout, 'after_delete') def on_workout_delete( mapper: Mapper, connection: Connection, old_record: 'Record' ) -> None: @listens_for(db.Session, 'after_flush', once=True) def receive_after_flush(session: Session, context: Any) -> None: if old_record.map: try: os.remove(get_absolute_file_path(old_record.map)) except OSError: appLog.error('map file not found when deleting workout') if old_record.gpx: try: os.remove(get_absolute_file_path(old_record.gpx)) except OSError: appLog.error('gpx file not found when deleting workout') class WorkoutSegment(BaseModel): __tablename__ = 'workout_segments' workout_id = db.Column( db.Integer, db.ForeignKey('workouts.id'), primary_key=True ) workout_uuid = db.Column(postgresql.UUID(as_uuid=True), nullable=False) segment_id = db.Column(db.Integer, primary_key=True) duration = db.Column(db.Interval, nullable=False) pauses = db.Column(db.Interval, nullable=True) moving = db.Column(db.Interval, nullable=True) distance = db.Column(db.Numeric(6, 3), nullable=True) # kilometers min_alt = db.Column(db.Numeric(6, 2), nullable=True) # meters max_alt = db.Column(db.Numeric(6, 2), nullable=True) # meters descent = db.Column(db.Numeric(8, 3), nullable=True) # meters ascent = db.Column(db.Numeric(8, 3), nullable=True) # meters max_speed = db.Column(db.Numeric(6, 2), nullable=True) # km/h ave_speed = db.Column(db.Numeric(6, 2), nullable=True) # km/h def __str__(self) -> str: return ( f'' ) def __init__( self, segment_id: int, workout_id: int, workout_uuid: UUID ) -> None: self.segment_id = segment_id self.workout_id = workout_id self.workout_uuid = workout_uuid def serialize(self) -> Dict: return { 'workout_id': encode_uuid(self.workout_uuid), 'segment_id': self.segment_id, 'duration': str(self.duration) if self.duration else None, 'pauses': str(self.pauses) if self.pauses else None, 'moving': str(self.moving) if self.moving else None, 'distance': float(self.distance) if self.distance else None, 'min_alt': float(self.min_alt) if self.min_alt else None, 'max_alt': float(self.max_alt) if self.max_alt else None, 'descent': float(self.descent) if self.descent else None, 'ascent': float(self.ascent) if self.ascent else None, 'max_speed': float(self.max_speed) if self.max_speed else None, 'ave_speed': float(self.ave_speed) if self.ave_speed else None, } class Record(BaseModel): __tablename__ = "records" __table_args__ = ( db.UniqueConstraint( 'user_id', 'sport_id', 'record_type', name='user_sports_records' ), ) id = db.Column(db.Integer, primary_key=True, autoincrement=True) user_id = db.Column(db.Integer, db.ForeignKey('users.id'), nullable=False) sport_id = db.Column( db.Integer, db.ForeignKey('sports.id'), nullable=False ) workout_id = db.Column( db.Integer, db.ForeignKey('workouts.id'), nullable=False ) workout_uuid = db.Column(postgresql.UUID(as_uuid=True), nullable=False) record_type = db.Column(Enum(*record_types, name="record_types")) workout_date = db.Column(db.DateTime, nullable=False) _value = db.Column("value", db.Integer, nullable=True) def __str__(self) -> str: return ( f'" ) def __init__(self, workout: Workout, record_type: str) -> None: self.user_id = workout.user_id self.sport_id = workout.sport_id self.workout_id = workout.id self.workout_uuid = workout.uuid self.record_type = record_type self.workout_date = workout.workout_date @hybrid_property def value(self) -> Optional[Union[datetime.timedelta, float]]: if self._value is None: return None if self.record_type == 'LD': return datetime.timedelta(seconds=self._value) elif self.record_type in ['AS', 'MS']: return float(self._value / 100) else: # 'FD' or 'HA' return float(self._value / 1000) @value.setter # type: ignore def value(self, val: Union[str, float]) -> None: self._value = convert_value_to_integer(self.record_type, val) def serialize(self) -> Dict: if self.value is None: value = None elif self.record_type in ['AS', 'FD', 'HA', 'MS']: value = float(self.value) # type: ignore else: # 'LD' value = str(self.value) # type: ignore return { 'id': self.id, 'user': self.user.username, 'sport_id': self.sport_id, 'workout_id': encode_uuid(self.workout_uuid), 'record_type': self.record_type, 'workout_date': self.workout_date, 'value': value, } @listens_for(Record, 'after_delete') def on_record_delete( mapper: Mapper, connection: Connection, old_record: Record ) -> None: @listens_for(db.Session, 'after_flush', once=True) def receive_after_flush(session: Session, context: Any) -> None: workout = old_record.workout new_records = Workout.get_user_workout_records( workout.user_id, workout.sport_id ) for record_type, record_data in new_records.items(): if ( record_data['record_value'] and record_type == old_record.record_type ): new_record = Record( workout=record_data['workout'], record_type=record_type ) new_record.value = record_data['record_value'] # type: ignore session.add(new_record)