API - return error when number of files in zip exceeds limit
(instead of importing the configured number of files without raising errors)
This commit is contained in:
parent
da5f7d12e7
commit
4b1c51880f
@ -1112,7 +1112,7 @@ class TestPostWorkoutWithZipArchive(ApiTestCaseMixin):
|
|||||||
data = self.assert_500(response, 'error during gpx processing')
|
data = self.assert_500(response, 'error during gpx processing')
|
||||||
assert 'data' not in data
|
assert 'data' not in data
|
||||||
|
|
||||||
def test_it_imports_only_max_number_of_files(
|
def test_it_returns_400_when_files_in_archive_exceed_limit(
|
||||||
self,
|
self,
|
||||||
app_with_max_workouts: Flask,
|
app_with_max_workouts: Flask,
|
||||||
user_1: User,
|
user_1: User,
|
||||||
@ -1127,7 +1127,7 @@ class TestPostWorkoutWithZipArchive(ApiTestCaseMixin):
|
|||||||
app_with_max_workouts, user_1.email
|
app_with_max_workouts, user_1.email
|
||||||
)
|
)
|
||||||
|
|
||||||
client.post(
|
response = client.post(
|
||||||
'/api/workouts',
|
'/api/workouts',
|
||||||
data=dict(
|
data=dict(
|
||||||
file=(zip_file, 'gpx_test.zip'), data='{"sport_id": 1}'
|
file=(zip_file, 'gpx_test.zip'), data='{"sport_id": 1}'
|
||||||
@ -1138,12 +1138,11 @@ class TestPostWorkoutWithZipArchive(ApiTestCaseMixin):
|
|||||||
),
|
),
|
||||||
)
|
)
|
||||||
|
|
||||||
response = client.get(
|
self.assert_400(
|
||||||
'/api/workouts',
|
response,
|
||||||
headers=dict(Authorization=f'Bearer {auth_token}'),
|
'the number of files in the archive exceeds the limit',
|
||||||
|
'fail',
|
||||||
)
|
)
|
||||||
data = json.loads(response.data.decode())
|
|
||||||
assert len(data['data']['workouts']) == 2
|
|
||||||
|
|
||||||
def test_it_returns_error_if_archive_size_exceeds_limit(
|
def test_it_returns_error_if_archive_size_exceeds_limit(
|
||||||
self,
|
self,
|
||||||
|
@ -333,6 +333,14 @@ def process_one_gpx_file(
|
|||||||
raise WorkoutException('fail', 'Error during workout save.', e)
|
raise WorkoutException('fail', 'Error during workout save.', e)
|
||||||
|
|
||||||
|
|
||||||
|
def is_gpx_file(filename: str) -> bool:
|
||||||
|
return (
|
||||||
|
'.' in filename
|
||||||
|
and filename.rsplit('.', 1)[1].lower()
|
||||||
|
in current_app.config['WORKOUT_ALLOWED_EXTENSIONS']
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
def process_zip_archive(
|
def process_zip_archive(
|
||||||
common_params: Dict, extract_dir: str, stopped_speed_threshold: float
|
common_params: Dict, extract_dir: str, stopped_speed_threshold: float
|
||||||
) -> List:
|
) -> List:
|
||||||
@ -341,21 +349,21 @@ def process_zip_archive(
|
|||||||
does not exceed defined limit.
|
does not exceed defined limit.
|
||||||
"""
|
"""
|
||||||
with zipfile.ZipFile(common_params['file_path'], "r") as zip_ref:
|
with zipfile.ZipFile(common_params['file_path'], "r") as zip_ref:
|
||||||
|
info_list = [
|
||||||
|
zip_info
|
||||||
|
for zip_info in zip_ref.infolist()
|
||||||
|
if is_gpx_file(zip_info.filename)
|
||||||
|
]
|
||||||
|
if len(info_list) > current_app.config['gpx_limit_import']:
|
||||||
|
raise WorkoutException(
|
||||||
|
'fail', 'the number of files in the archive exceeds the limit'
|
||||||
|
)
|
||||||
zip_ref.extractall(extract_dir)
|
zip_ref.extractall(extract_dir)
|
||||||
|
|
||||||
new_workouts = []
|
new_workouts = []
|
||||||
gpx_files_limit = current_app.config['gpx_limit_import']
|
|
||||||
gpx_files_ok = 0
|
|
||||||
|
|
||||||
for gpx_file in os.listdir(extract_dir):
|
for gpx_file in os.listdir(extract_dir):
|
||||||
if (
|
if is_gpx_file(gpx_file):
|
||||||
'.' in gpx_file
|
|
||||||
and gpx_file.rsplit('.', 1)[1].lower()
|
|
||||||
in current_app.config['WORKOUT_ALLOWED_EXTENSIONS']
|
|
||||||
):
|
|
||||||
gpx_files_ok += 1
|
|
||||||
if gpx_files_ok > gpx_files_limit:
|
|
||||||
break
|
|
||||||
file_path = os.path.join(extract_dir, gpx_file)
|
file_path = os.path.join(extract_dir, gpx_file)
|
||||||
params = common_params
|
params = common_params
|
||||||
params['file_path'] = file_path
|
params['file_path'] = file_path
|
||||||
|
@ -1022,7 +1022,7 @@ def post_workout(auth_user: User) -> Union[Tuple[Dict, int], HttpResponse]:
|
|||||||
appLog.error(e.e)
|
appLog.error(e.e)
|
||||||
if e.status == 'error':
|
if e.status == 'error':
|
||||||
return InternalServerErrorResponse(e.message)
|
return InternalServerErrorResponse(e.message)
|
||||||
return InvalidPayloadErrorResponse(e.message)
|
return InvalidPayloadErrorResponse(e.message, e.status)
|
||||||
|
|
||||||
shutil.rmtree(folders['extract_dir'], ignore_errors=True)
|
shutil.rmtree(folders['extract_dir'], ignore_errors=True)
|
||||||
shutil.rmtree(folders['tmp_dir'], ignore_errors=True)
|
shutil.rmtree(folders['tmp_dir'], ignore_errors=True)
|
||||||
|
Loading…
Reference in New Issue
Block a user