API - get stopped speed threshold depending on sport
This commit is contained in:
parent
7682ccc6aa
commit
b6f89d7182
@ -0,0 +1,48 @@
|
||||
"""add stopped speed threshold to sports
|
||||
|
||||
Revision ID: 9842464bb885
|
||||
Revises: cee0830497f8
|
||||
Create Date: 2021-11-03 21:39:27.310371
|
||||
|
||||
"""
|
||||
from alembic import op
|
||||
import sqlalchemy as sa
|
||||
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision = '9842464bb885'
|
||||
down_revision = 'cee0830497f8'
|
||||
branch_labels = None
|
||||
depends_on = None
|
||||
|
||||
|
||||
def upgrade():
|
||||
|
||||
op.add_column(
|
||||
'sports',
|
||||
sa.Column('stopped_speed_threshold', sa.Float(), nullable=True),
|
||||
)
|
||||
|
||||
op.execute(
|
||||
"""
|
||||
UPDATE sports
|
||||
SET stopped_speed_threshold = 1
|
||||
WHERE label in (
|
||||
'Cycling (Sport)', 'Cycling (Transport)', 'Mountain Biking',
|
||||
'Mountain Biking (Electric)', 'Rowing', 'Running',
|
||||
'Skiing (Alpine)'
|
||||
);
|
||||
UPDATE sports
|
||||
SET stopped_speed_threshold = 0.1
|
||||
WHERE label in (
|
||||
'Hiking', 'Skiing (Cross Country)', 'Trail', 'Walking'
|
||||
);
|
||||
"""
|
||||
)
|
||||
op.alter_column('sports', 'stopped_speed_threshold', nullable=False)
|
||||
|
||||
|
||||
def downgrade():
|
||||
# ### commands auto generated by Alembic - please adjust! ###
|
||||
op.drop_column('sports', 'stopped_speed_threshold')
|
||||
# ### end Alembic commands ###
|
10
fittrackee/tests/fixtures/fixtures_workouts.py
vendored
10
fittrackee/tests/fixtures/fixtures_workouts.py
vendored
@ -2,9 +2,11 @@ import datetime
|
||||
from io import BytesIO
|
||||
from typing import Generator
|
||||
from unittest.mock import Mock, patch
|
||||
from uuid import uuid4
|
||||
|
||||
import pytest
|
||||
from PIL import Image
|
||||
from werkzeug.datastructures import FileStorage
|
||||
|
||||
from fittrackee import db
|
||||
from fittrackee.workouts.models import Sport, Workout, WorkoutSegment
|
||||
@ -43,6 +45,7 @@ def sport_1_cycling_inactive() -> Sport:
|
||||
@pytest.fixture()
|
||||
def sport_2_running() -> Sport:
|
||||
sport = Sport(label='Running')
|
||||
sport.stopped_speed_threshold = 0.1
|
||||
db.session.add(sport)
|
||||
db.session.commit()
|
||||
return sport
|
||||
@ -575,3 +578,10 @@ def gpx_file_with_segments() -> str:
|
||||
' </trk>'
|
||||
'</gpx>'
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def gpx_file_storage(gpx_file: str) -> FileStorage:
|
||||
return FileStorage(
|
||||
filename=f'{uuid4().hex}.gpx', stream=BytesIO(str.encode(gpx_file))
|
||||
)
|
||||
|
58
fittrackee/tests/workouts/test_gpx_utils.py
Normal file
58
fittrackee/tests/workouts/test_gpx_utils.py
Normal file
@ -0,0 +1,58 @@
|
||||
from unittest.mock import call, patch
|
||||
|
||||
import pytest
|
||||
from flask import Flask
|
||||
from gpxpy.gpx import MovingData
|
||||
from werkzeug.datastructures import FileStorage
|
||||
|
||||
from fittrackee.users.models import User
|
||||
from fittrackee.workouts.models import Sport
|
||||
from fittrackee.workouts.utils import process_files
|
||||
|
||||
folders = {
|
||||
'extract_dir': '/tmp/fitTrackee/uploads',
|
||||
'tmp_dir': '/tmp/fitTrackee/uploads/tmp',
|
||||
}
|
||||
moving_data = MovingData(
|
||||
moving_time=1,
|
||||
stopped_time=1,
|
||||
moving_distance=1,
|
||||
stopped_distance=1,
|
||||
max_speed=1,
|
||||
)
|
||||
|
||||
|
||||
class TestStoppedSpeedThreshold:
|
||||
@pytest.mark.parametrize(
|
||||
'sport_id, expected_threshold',
|
||||
[(1, 1.0), (2, 0.1)],
|
||||
)
|
||||
def test_it_calls_get_moving_data_with_threshold_depending_on_sport(
|
||||
self,
|
||||
app: Flask,
|
||||
user_1: User,
|
||||
gpx_file_storage: FileStorage,
|
||||
sport_1_cycling: Sport,
|
||||
sport_2_running: Sport,
|
||||
sport_id: int,
|
||||
expected_threshold: float,
|
||||
) -> None:
|
||||
with patch(
|
||||
'fittrackee.workouts.utils.get_new_file_path',
|
||||
return_value='/tmp/fitTrackee/uploads/test.png',
|
||||
), patch(
|
||||
'gpxpy.gpx.GPXTrackSegment.get_moving_data',
|
||||
return_value=moving_data,
|
||||
) as gpx_track_segment_mock:
|
||||
|
||||
process_files(
|
||||
auth_user_id=user_1.id,
|
||||
folders=folders,
|
||||
workout_data={'sport_id': sport_id},
|
||||
workout_file=gpx_file_storage,
|
||||
)
|
||||
|
||||
assert gpx_track_segment_mock.call_args_list[0] == call(
|
||||
stopped_speed_threshold=expected_threshold
|
||||
)
|
||||
gpx_track_segment_mock.assert_called_with(expected_threshold)
|
@ -72,6 +72,7 @@ class Sport(BaseModel):
|
||||
label = db.Column(db.String(50), unique=True, nullable=False)
|
||||
img = db.Column(db.String(255), unique=True, nullable=True)
|
||||
is_active = db.Column(db.Boolean, default=True, nullable=False)
|
||||
stopped_speed_threshold = db.Column(db.Float, default=1.0, nullable=False)
|
||||
workouts = db.relationship(
|
||||
'Workout', lazy=True, backref=db.backref('sports', lazy='joined')
|
||||
)
|
||||
|
@ -289,12 +289,16 @@ def get_map_hash(map_filepath: str) -> str:
|
||||
return md5.hexdigest()
|
||||
|
||||
|
||||
def process_one_gpx_file(params: Dict, filename: str) -> Workout:
|
||||
def process_one_gpx_file(
|
||||
params: Dict, filename: str, stopped_speed_threshold: float
|
||||
) -> Workout:
|
||||
"""
|
||||
Get all data from a gpx file to create an workout with map image
|
||||
"""
|
||||
try:
|
||||
gpx_data, map_data, weather_data = get_gpx_info(params['file_path'])
|
||||
gpx_data, map_data, weather_data = get_gpx_info(
|
||||
params['file_path'], stopped_speed_threshold
|
||||
)
|
||||
auth_user_id = params['user'].id
|
||||
new_filepath = get_new_file_path(
|
||||
auth_user_id=auth_user_id,
|
||||
@ -341,7 +345,9 @@ def process_one_gpx_file(params: Dict, filename: str) -> Workout:
|
||||
raise WorkoutException('fail', 'Error during workout save.', e)
|
||||
|
||||
|
||||
def process_zip_archive(common_params: Dict, extract_dir: str) -> List:
|
||||
def process_zip_archive(
|
||||
common_params: Dict, extract_dir: str, stopped_speed_threshold: float
|
||||
) -> List:
|
||||
"""
|
||||
Get files from a zip archive and create workouts, if number of files
|
||||
does not exceed defined limit.
|
||||
@ -365,7 +371,9 @@ def process_zip_archive(common_params: Dict, extract_dir: str) -> List:
|
||||
file_path = os.path.join(extract_dir, gpx_file)
|
||||
params = common_params
|
||||
params['file_path'] = file_path
|
||||
new_workout = process_one_gpx_file(params, gpx_file)
|
||||
new_workout = process_one_gpx_file(
|
||||
params, gpx_file, stopped_speed_threshold
|
||||
)
|
||||
new_workouts.append(new_workout)
|
||||
|
||||
return new_workouts
|
||||
@ -406,9 +414,19 @@ def process_files(
|
||||
raise WorkoutException('error', 'Error during workout file save.', e)
|
||||
|
||||
if extension == ".gpx":
|
||||
return [process_one_gpx_file(common_params, filename)]
|
||||
return [
|
||||
process_one_gpx_file(
|
||||
common_params,
|
||||
filename,
|
||||
sport.stopped_speed_threshold,
|
||||
)
|
||||
]
|
||||
else:
|
||||
return process_zip_archive(common_params, folders['extract_dir'])
|
||||
return process_zip_archive(
|
||||
common_params,
|
||||
folders['extract_dir'],
|
||||
sport.stopped_speed_threshold,
|
||||
)
|
||||
|
||||
|
||||
def get_upload_dir_size() -> int:
|
||||
|
@ -20,6 +20,7 @@ def get_gpx_data(
|
||||
max_speed: float,
|
||||
start: int,
|
||||
stopped_time_between_seg: timedelta,
|
||||
stopped_speed_threshold: float,
|
||||
) -> Dict:
|
||||
"""
|
||||
Returns data from parsed gpx file
|
||||
@ -42,7 +43,9 @@ def get_gpx_data(
|
||||
gpx_data['uphill'] = hill.uphill
|
||||
gpx_data['downhill'] = hill.downhill
|
||||
|
||||
mv = parsed_gpx.get_moving_data(stopped_speed_threshold=0.1)
|
||||
mv = parsed_gpx.get_moving_data(
|
||||
stopped_speed_threshold=stopped_speed_threshold
|
||||
)
|
||||
gpx_data['moving_time'] = timedelta(seconds=mv.moving_time)
|
||||
gpx_data['stop_time'] = (
|
||||
timedelta(seconds=mv.stopped_time) + stopped_time_between_seg
|
||||
@ -58,6 +61,7 @@ def get_gpx_data(
|
||||
|
||||
def get_gpx_info(
|
||||
gpx_file: str,
|
||||
stopped_speed_threshold: float,
|
||||
update_map_data: Optional[bool] = True,
|
||||
update_weather_data: Optional[bool] = True,
|
||||
) -> Tuple:
|
||||
@ -104,23 +108,30 @@ def get_gpx_info(
|
||||
|
||||
if update_map_data:
|
||||
map_data.append([point.longitude, point.latitude])
|
||||
segment_max_speed = (
|
||||
segment.get_moving_data(stopped_speed_threshold=0.1).max_speed
|
||||
if segment.get_moving_data(stopped_speed_threshold=0.1).max_speed
|
||||
else 0
|
||||
)
|
||||
calculated_max_speed = segment.get_moving_data(
|
||||
stopped_speed_threshold=stopped_speed_threshold
|
||||
).max_speed
|
||||
segment_max_speed = calculated_max_speed if calculated_max_speed else 0
|
||||
|
||||
if segment_max_speed > max_speed:
|
||||
max_speed = segment_max_speed
|
||||
|
||||
segment_data = get_gpx_data(
|
||||
segment, segment_max_speed, segment_start, no_stopped_time
|
||||
segment,
|
||||
segment_max_speed,
|
||||
segment_start,
|
||||
no_stopped_time,
|
||||
stopped_speed_threshold,
|
||||
)
|
||||
segment_data['idx'] = segment_idx
|
||||
gpx_data['segments'].append(segment_data)
|
||||
|
||||
full_gpx_data = get_gpx_data(
|
||||
gpx, max_speed, start, stopped_time_between_seg
|
||||
gpx,
|
||||
max_speed,
|
||||
start,
|
||||
stopped_time_between_seg,
|
||||
stopped_speed_threshold,
|
||||
)
|
||||
gpx_data = {**gpx_data, **full_gpx_data}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user