112 lines
4.1 KiB
Python
112 lines
4.1 KiB
Python
import json
|
|
import os
|
|
import secrets
|
|
from typing import Dict, List, Optional, Tuple, Union
|
|
from zipfile import ZipFile
|
|
|
|
from fittrackee import appLog, db
|
|
from fittrackee.files import get_absolute_file_path
|
|
|
|
from .models import User, UserDataExport
|
|
|
|
|
|
class UserDataExporter:
|
|
"""
|
|
generates a zip archive with:
|
|
- user info from database (json file)
|
|
- data from database for all workouts if exist (json file)
|
|
- profile picture file if exists
|
|
- gpx files if exist
|
|
"""
|
|
|
|
def __init__(self, user: User) -> None:
|
|
self.user = user
|
|
self.export_directory = get_absolute_file_path(
|
|
os.path.join('exports', str(self.user.id))
|
|
)
|
|
os.makedirs(self.export_directory, exist_ok=True)
|
|
self.workouts_directory = get_absolute_file_path(
|
|
os.path.join('workouts', str(self.user.id))
|
|
)
|
|
|
|
def get_user_info(self) -> Dict:
|
|
return self.user.serialize(self.user)
|
|
|
|
def get_user_workouts_data(self) -> List[Dict]:
|
|
workouts_data = []
|
|
for workout in self.user.workouts:
|
|
workout_data = workout.get_workout_data()
|
|
workout_data["sport_label"] = workout.sport.label
|
|
workout_data["gpx"] = (
|
|
workout.gpx.split('/')[-1] if workout.gpx else None
|
|
)
|
|
workouts_data.append(workout_data)
|
|
return workouts_data
|
|
|
|
def export_data(self, data: Union[Dict, List], name: str) -> str:
|
|
"""export data in existing user upload directory"""
|
|
json_object = json.dumps(data, indent=4, default=str)
|
|
file_path = os.path.join(self.export_directory, f"{name}.json")
|
|
with open(file_path, "w") as export_file:
|
|
export_file.write(json_object)
|
|
return file_path
|
|
|
|
def generate_archive(self) -> Tuple[Optional[str], Optional[str]]:
|
|
try:
|
|
user_data_file_name = self.export_data(
|
|
self.get_user_info(), "user_data"
|
|
)
|
|
workout_data_file_name = self.export_data(
|
|
self.get_user_workouts_data(), "workouts_data"
|
|
)
|
|
zip_file = f"archive_{secrets.token_urlsafe(15)}.zip"
|
|
zip_path = os.path.join(self.export_directory, zip_file)
|
|
with ZipFile(zip_path, 'w') as zip_object:
|
|
zip_object.write(user_data_file_name, "user_data.json")
|
|
zip_object.write(
|
|
workout_data_file_name, "user_workouts_data.json"
|
|
)
|
|
if self.user.picture:
|
|
picture_path = get_absolute_file_path(self.user.picture)
|
|
if os.path.isfile(picture_path):
|
|
zip_object.write(
|
|
picture_path, self.user.picture.split('/')[-1]
|
|
)
|
|
for file in os.listdir(self.workouts_directory):
|
|
if os.path.isfile(
|
|
os.path.join(self.workouts_directory, file)
|
|
) and file.endswith('.gpx'):
|
|
zip_object.write(
|
|
os.path.join(self.workouts_directory, file),
|
|
f"gpx/{file}",
|
|
)
|
|
|
|
file_exists = os.path.exists(zip_path)
|
|
return (zip_path, zip_file) if file_exists else (None, None)
|
|
except Exception:
|
|
return None, None
|
|
|
|
|
|
def export_user_data(export_request_id: int) -> None:
|
|
export_request = UserDataExport.query.filter_by(
|
|
id=export_request_id
|
|
).first()
|
|
|
|
if not export_request:
|
|
appLog.error(f"No export to process for id '{export_request_id}'")
|
|
return
|
|
|
|
if export_request.completed:
|
|
appLog.info(f"Export id '{export_request_id}' already processed")
|
|
return
|
|
|
|
user = User.query.filter_by(id=export_request.user_id).first()
|
|
exporter = UserDataExporter(user)
|
|
archive_file_path, archive_file_name = exporter.generate_archive()
|
|
|
|
export_request.completed = True
|
|
if archive_file_name and archive_file_path:
|
|
export_request.file_name = archive_file_name
|
|
export_request.file_size = os.path.getsize(archive_file_path)
|
|
db.session.commit()
|