FitTrackee/fittrackee/users/export_data.py

import json
import os
import secrets
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple, Union
from zipfile import ZipFile

from flask import current_app

from fittrackee import appLog, db
from fittrackee.emails.tasks import data_export_email
from fittrackee.files import get_absolute_file_path

from .models import User, UserDataExport
from .utils.language import get_language
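
# Module overview:
# - `UserDataExporter` builds a zip archive with a user's data (see the class
#   docstring below).
# - `export_user_data` processes a single `UserDataExport` request.
# - `clean_user_data_export` and `generate_user_data_archives` are intended to
#   run periodically (presumably from a scheduled task or CLI command, not
#   shown in this file) to purge old exports and process pending requests.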


class UserDataExporter:
    """
    generates a zip archive with:
    - user info from the database (json file)
    - workouts data from the database, if the user has workouts (json file)
    - the profile picture file, if it exists
    - gpx files, if any exist
    """

    def __init__(self, user: User) -> None:
        self.user = user
        self.export_directory = get_absolute_file_path(
            os.path.join('exports', str(self.user.id))
        )
        os.makedirs(self.export_directory, exist_ok=True)
        self.workouts_directory = get_absolute_file_path(
            os.path.join('workouts', str(self.user.id))
        )

    def get_user_info(self) -> Dict:
        return self.user.serialize(self.user)

    def get_user_workouts_data(self) -> List[Dict]:
        workouts_data = []
        for workout in self.user.workouts:
            workout_data = workout.get_workout_data()
            workout_data["sport_label"] = workout.sport.label
            workout_data["gpx"] = (
                workout.gpx.split('/')[-1] if workout.gpx else None
            )
            workouts_data.append(workout_data)
        return workouts_data

    def export_data(self, data: Union[Dict, List], name: str) -> str:
        """export data in existing user upload directory"""
        json_object = json.dumps(data, indent=4, default=str)
        file_path = os.path.join(self.export_directory, f"{name}.json")
        with open(file_path, "w") as export_file:
            export_file.write(json_object)
        return file_path

    def generate_archive(self) -> Tuple[Optional[str], Optional[str]]:
        try:
            user_data_file_name = self.export_data(
                self.get_user_info(), "user_data"
            )
            workout_data_file_name = self.export_data(
                self.get_user_workouts_data(), "workouts_data"
            )
            zip_file = f"archive_{secrets.token_urlsafe(15)}.zip"
            zip_path = os.path.join(self.export_directory, zip_file)
            with ZipFile(zip_path, 'w') as zip_object:
                zip_object.write(user_data_file_name, "user_data.json")
                zip_object.write(
                    workout_data_file_name, "user_workouts_data.json"
                )
                if self.user.picture:
                    picture_path = get_absolute_file_path(self.user.picture)
                    if os.path.isfile(picture_path):
                        zip_object.write(
                            picture_path, self.user.picture.split('/')[-1]
                        )
                if os.path.exists(self.workouts_directory):
                    for file in os.listdir(self.workouts_directory):
                        if os.path.isfile(
                            os.path.join(self.workouts_directory, file)
                        ) and file.endswith('.gpx'):
                            zip_object.write(
                                os.path.join(self.workouts_directory, file),
                                f"gpx/{file}",
                            )

            file_exists = os.path.exists(zip_path)
            os.remove(user_data_file_name)
            os.remove(workout_data_file_name)
            return (zip_path, zip_file) if file_exists else (None, None)
        except Exception as e:
            appLog.error(f'Error when generating user data archive: {str(e)}')
            return None, None
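

# Processes a single `UserDataExport` request: generates the archive, records
# its file name and size on the request, and, when email sending is enabled,
# sends the data export email to the user.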
def export_user_data(export_request_id: int) -> None:
    export_request = UserDataExport.query.filter_by(
        id=export_request_id
    ).first()
    if not export_request:
        appLog.error(f"No export to process for id '{export_request_id}'")
        return
    if export_request.completed:
        appLog.info(f"Export id '{export_request_id}' already processed")
        return
    user = User.query.filter_by(id=export_request.user_id).first()
    exporter = UserDataExporter(user)
    archive_file_path, archive_file_name = exporter.generate_archive()
    try:
        export_request.completed = True
        if archive_file_name and archive_file_path:
            export_request.file_name = archive_file_name
            export_request.file_size = os.path.getsize(archive_file_path)
            db.session.commit()
            if current_app.config['CAN_SEND_EMAILS']:
                ui_url = current_app.config['UI_URL']
                email_data = {
                    'username': user.username,
                    'fittrackee_url': ui_url,
                    'account_url': f'{ui_url}/profile/edit/account',
                }
                user_data = {
                    'language': get_language(user.language),
                    'email': user.email,
                }
                data_export_email.send(user_data, email_data)
        else:
            db.session.commit()
    except Exception as e:
        appLog.error(f'Error when exporting user data: {str(e)}')
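

# Deletes completed export requests older than `days` days and counts the
# archives removed along with them (the archive file itself is deleted when
# the `UserDataExport` row is deleted), returning the deletion counts and the
# freed disk space.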
def clean_user_data_export(days: int) -> Dict:
    counts = {"deleted_requests": 0, "deleted_archives": 0, "freed_space": 0}
    limit = datetime.now() - timedelta(days=days)
    export_requests = UserDataExport.query.filter(
        UserDataExport.created_at < limit,
        UserDataExport.completed == True,  # noqa
    ).all()
    if not export_requests:
        return counts
    archive_directory = get_absolute_file_path("exports")
    for request in export_requests:
        if request.file_name:
            archive_path = os.path.join(
                archive_directory, f"{request.user_id}", request.file_name
            )
            if os.path.exists(archive_path):
                counts["deleted_archives"] += 1
                counts["freed_space"] += request.file_size
        # Archive is deleted when row is deleted
        db.session.delete(request)
        counts["deleted_requests"] += 1
    db.session.commit()
    return counts
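

# Processes up to `max_count` pending (not completed) export requests, oldest
# first, and returns the number of requests processed.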
def generate_user_data_archives(max_count: int) -> int:
    count = 0
    export_requests = (
        db.session.query(UserDataExport)
        .filter(UserDataExport.completed == False)  # noqa
        .order_by(UserDataExport.created_at)
        .limit(max_count)
        .all()
    )
    for export_request in export_requests:
        export_user_data(export_request.id)
        count += 1
    return count
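

# Example usage (illustrative sketch only; `app` is assumed to be the Flask
# application instance, and in the actual application these helpers are
# presumably invoked from scheduled tasks or CLI commands not shown here):
#
#     with app.app_context():
#         processed = generate_user_data_archives(max_count=5)
#         cleanup = clean_user_data_export(days=30)
#         appLog.info(
#             f"{processed} request(s) processed, "
#             f"{cleanup['deleted_requests']} old request(s) deleted"
#         )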