Merged dev branch

This commit is contained in:
Fmstrat
2022-07-18 14:34:58 -04:00
392 changed files with 20712 additions and 12757 deletions

View File

@ -1,15 +1,24 @@
import datetime
import os
import re
from typing import Dict, Tuple, Union
import secrets
from typing import Dict, Optional, Tuple, Union
import jwt
from flask import Blueprint, current_app, request
from sqlalchemy import exc, func, or_
from sqlalchemy import exc, func
from werkzeug.exceptions import RequestEntityTooLarge
from werkzeug.utils import secure_filename
from fittrackee import appLog, bcrypt, db
from fittrackee.emails.tasks import (
account_confirmation_email,
email_updated_to_current_address,
email_updated_to_new_address,
password_change_email,
reset_password_email,
)
from fittrackee.files import get_absolute_file_path
from fittrackee.responses import (
ForbiddenErrorResponse,
HttpResponse,
@ -17,27 +26,57 @@ from fittrackee.responses import (
NotFoundErrorResponse,
PayloadTooLargeErrorResponse,
UnauthorizedErrorResponse,
get_error_response_if_file_is_invalid,
handle_error_and_return_response,
)
from fittrackee.tasks import reset_password_email
from fittrackee.utils import get_readable_duration, verify_extension_and_size
from fittrackee.utils import get_readable_duration
from fittrackee.workouts.models import Sport
from fittrackee.workouts.utils_files import get_absolute_file_path
from .decorators import authenticate
from .models import User, UserSportPreference
from .utils import check_passwords, register_controls
from .utils_token import decode_user_token
from .utils.controls import check_password, is_valid_email, register_controls
from .utils.token import decode_user_token
auth_blueprint = Blueprint('auth', __name__)
HEX_COLOR_REGEX = regex = "^#([A-Fa-f0-9]{6}|[A-Fa-f0-9]{3})$"
NOT_FOUND_MESSAGE = 'the requested URL was not found on the server'
def get_language(language: Optional[str]) -> str:
# Note: some users may not have language preferences set
if not language or language not in current_app.config['LANGUAGES']:
language = 'en'
return language
def send_account_confirmation_email(user: User) -> None:
if current_app.config['CAN_SEND_EMAILS']:
ui_url = current_app.config['UI_URL']
email_data = {
'username': user.username,
'fittrackee_url': ui_url,
'operating_system': request.user_agent.platform, # type: ignore # noqa
'browser_name': request.user_agent.browser, # type: ignore
'account_confirmation_url': (
f'{ui_url}/account-confirmation'
f'?token={user.confirmation_token}'
),
}
user_data = {
'language': get_language(user.language),
'email': user.email,
}
account_confirmation_email.send(user_data, email_data)
@auth_blueprint.route('/auth/register', methods=['POST'])
def register_user() -> Union[Tuple[Dict, int], HttpResponse]:
"""
register a user
register a user and send confirmation email.
The newly created account is inactive. The user must confirm his email
to activate it.
**Example request**:
@ -48,16 +87,14 @@ def register_user() -> Union[Tuple[Dict, int], HttpResponse]:
**Example responses**:
- successful registration
- success
.. sourcecode:: http
HTTP/1.1 201 CREATED
HTTP/1.1 200 SUCCESS
Content-Type: application/json
{
"auth_token": "JSON Web Token",
"message": "successfully registered",
"status": "success"
}
@ -73,19 +110,22 @@ def register_user() -> Union[Tuple[Dict, int], HttpResponse]:
"status": "error"
}
:<json string username: user name (3 to 12 characters required)
:<json string username: username (3 to 30 characters required)
:<json string email: user email
:<json string password: password (8 characters required)
:<json string password_conf: password confirmation
:<json string lang: user language preferences (if not provided or invalid,
fallback to 'en' (english))
:statuscode 201: successfully registered
:statuscode 200: success
:statuscode 400:
- invalid payload
- sorry, that user already exists
- sorry, that username is already taken
- Errors:
- username: 3 to 12 characters required
- username: 3 to 30 characters required
- username:
only alphanumeric characters and the underscore
character "_" allowed
- email: valid email must be provided
- password: password and password confirmation don't match
- password: 8 characters required
:statuscode 403:
error, registration is disabled
@ -96,23 +136,21 @@ def register_user() -> Union[Tuple[Dict, int], HttpResponse]:
if not current_app.config.get('is_registration_enabled'):
return ForbiddenErrorResponse('error, registration is disabled')
# get post data
post_data = request.get_json()
if (
not post_data
or post_data.get('username') is None
or post_data.get('email') is None
or post_data.get('password') is None
or post_data.get('password_conf') is None
):
return InvalidPayloadErrorResponse()
username = post_data.get('username')
email = post_data.get('email')
password = post_data.get('password')
password_conf = post_data.get('password_conf')
language = get_language(post_data.get('language'))
try:
ret = register_controls(username, email, password, password_conf)
ret = register_controls(username, email, password)
except TypeError as e:
return handle_error_and_return_response(e, db=db)
@ -120,30 +158,30 @@ def register_user() -> Union[Tuple[Dict, int], HttpResponse]:
return InvalidPayloadErrorResponse(ret)
try:
# check for existing user
user = User.query.filter(
or_(
func.lower(User.username) == func.lower(username),
func.lower(User.email) == func.lower(email),
)
func.lower(User.username) == func.lower(username)
).first()
if user:
return InvalidPayloadErrorResponse(
'sorry, that user already exists'
'sorry, that username is already taken'
)
# add new user to db
new_user = User(username=username, email=email, password=password)
new_user.timezone = 'Europe/Paris'
db.session.add(new_user)
db.session.commit()
# generate auth token
auth_token = new_user.encode_auth_token(new_user.id)
return {
'status': 'success',
'message': 'successfully registered',
'auth_token': auth_token,
}, 201
# if a user exists with same email address, no error is returned
# since a user has to confirm his email to activate his account
user = User.query.filter(
func.lower(User.email) == func.lower(email)
).first()
if not user:
new_user = User(username=username, email=email, password=password)
new_user.timezone = 'Europe/Paris'
new_user.confirmation_token = secrets.token_urlsafe(30)
new_user.language = language
db.session.add(new_user)
db.session.commit()
send_account_confirmation_email(new_user)
return {'status': 'success'}, 200
# handler errors
except (exc.IntegrityError, exc.OperationalError, ValueError) as e:
return handle_error_and_return_response(e, db=db)
@ -154,6 +192,8 @@ def login_user() -> Union[Dict, HttpResponse]:
"""
user login
Only user with an active account can log in.
**Example request**:
.. sourcecode:: http
@ -180,7 +220,7 @@ def login_user() -> Union[Dict, HttpResponse]:
.. sourcecode:: http
HTTP/1.1 404 NOT FOUND
HTTP/1.1 401 UNAUTHORIZED
Content-Type: application/json
{
@ -189,7 +229,7 @@ def login_user() -> Union[Dict, HttpResponse]:
}
:<json string email: user email
:<json string password_conf: password confirmation
:<json string password: password
:statuscode 200: successfully logged in
:statuscode 400: invalid payload
@ -204,9 +244,9 @@ def login_user() -> Union[Dict, HttpResponse]:
email = post_data.get('email', '')
password = post_data.get('password')
try:
# check for existing user
user = User.query.filter(
func.lower(User.email) == func.lower(email)
func.lower(User.email) == func.lower(email),
User.is_active == True, # noqa
).first()
if user and bcrypt.check_password_hash(user.password, password):
# generate auth token
@ -222,74 +262,13 @@ def login_user() -> Union[Dict, HttpResponse]:
return handle_error_and_return_response(e, db=db)
@auth_blueprint.route('/auth/logout', methods=['GET'])
@authenticate
def logout_user(auth_user: User) -> Union[Dict, HttpResponse]:
"""
user logout
**Example request**:
.. sourcecode:: http
GET /api/auth/logout HTTP/1.1
Content-Type: application/json
**Example responses**:
- successful logout
.. sourcecode:: http
HTTP/1.1 200 OK
Content-Type: application/json
{
"message": "successfully logged out",
"status": "success"
}
- error on login
.. sourcecode:: http
HTTP/1.1 401 UNAUTHORIZED
Content-Type: application/json
{
"message": "provide a valid auth token",
"status": "error"
}
:reqheader Authorization: OAuth 2.0 Bearer Token
:statuscode 200: successfully logged out
:statuscode 401: provide a valid auth token
"""
# get auth token
auth_header = request.headers.get('Authorization')
if not auth_header:
return UnauthorizedErrorResponse('provide a valid auth token')
auth_token = auth_header.split(' ')[1]
resp = User.decode_auth_token(auth_token)
if isinstance(resp, str):
return UnauthorizedErrorResponse(resp)
return {
'status': 'success',
'message': 'successfully logged out',
}
@auth_blueprint.route('/auth/profile', methods=['GET'])
@authenticate
def get_authenticated_user_profile(
auth_user: User,
) -> Union[Dict, HttpResponse]:
"""
get authenticated user info
get authenticated user info (profile, account, preferences)
**Example request**:
@ -314,6 +293,7 @@ def get_authenticated_user_profile(
"email": "sam@example.com",
"first_name": null,
"imperial_units": false,
"is_active": true,
"language": "en",
"last_name": null,
"location": null,
@ -381,14 +361,14 @@ def get_authenticated_user_profile(
- invalid token, please log in again
"""
return {'status': 'success', 'data': auth_user.serialize()}
return {'status': 'success', 'data': auth_user.serialize(auth_user)}
@auth_blueprint.route('/auth/profile/edit', methods=['POST'])
@authenticate
def edit_user(auth_user: User) -> Union[Dict, HttpResponse]:
"""
edit authenticated user
edit authenticated user profile
**Example request**:
@ -413,6 +393,7 @@ def edit_user(auth_user: User) -> Union[Dict, HttpResponse]:
"email": "sam@example.com",
"first_name": null,
"imperial_units": false,
"is_active": true,
"language": "en",
"last_name": null,
"location": null,
@ -477,15 +458,12 @@ def edit_user(auth_user: User) -> Union[Dict, HttpResponse]:
:<json string location: user location
:<json string bio: user biography
:<json string birth_date: user birth date (format: ``%Y-%m-%d``)
:<json string password: user password
:<json string password_conf: user password confirmation
:reqheader Authorization: OAuth 2.0 Bearer Token
:statuscode 200: user profile updated
:statuscode 400:
- invalid payload
- password: password and password confirmation don't match
:statuscode 401:
- provide a valid auth token
- signature expired, please log in again
@ -510,16 +488,6 @@ def edit_user(auth_user: User) -> Union[Dict, HttpResponse]:
bio = post_data.get('bio')
birth_date = post_data.get('birth_date')
location = post_data.get('location')
password = post_data.get('password')
password_conf = post_data.get('password_conf')
if password is not None and password != '':
message = check_passwords(password, password_conf)
if message != '':
return InvalidPayloadErrorResponse(message)
password = bcrypt.generate_password_hash(
password, current_app.config.get('BCRYPT_LOG_ROUNDS')
).decode()
try:
auth_user.first_name = first_name
@ -531,14 +499,12 @@ def edit_user(auth_user: User) -> Union[Dict, HttpResponse]:
if birth_date
else None
)
if password is not None and password != '':
auth_user.password = password
db.session.commit()
return {
'status': 'success',
'message': 'user profile updated',
'data': auth_user.serialize(),
'data': auth_user.serialize(auth_user),
}
# handler errors
@ -546,6 +512,214 @@ def edit_user(auth_user: User) -> Union[Dict, HttpResponse]:
return handle_error_and_return_response(e, db=db)
@auth_blueprint.route('/auth/profile/edit/account', methods=['PATCH'])
@authenticate
def update_user_account(auth_user: User) -> Union[Dict, HttpResponse]:
"""
update authenticated user email and password
It sends emails if sending is enabled:
- Password change
- Email change:
- one to the current address to inform user
- another one to the new address to confirm it.
**Example request**:
.. sourcecode:: http
PATCH /api/auth/profile/edit/account HTTP/1.1
Content-Type: application/json
**Example response**:
.. sourcecode:: http
HTTP/1.1 200 OK
Content-Type: application/json
{
"data": {
"admin": false,
"bio": null,
"birth_date": null,
"created_at": "Sun, 14 Jul 2019 14:09:58 GMT",
"email": "sam@example.com",
"first_name": null,
"imperial_units": false,
"is_active": true,
"language": "en",
"last_name": null,
"location": null,
"nb_sports": 3,
"nb_workouts": 6,
"picture": false,
"records": [
{
"id": 9,
"record_type": "AS",
"sport_id": 1,
"user": "sam",
"value": 18,
"workout_date": "Sun, 07 Jul 2019 08:00:00 GMT",
"workout_id": "hvYBqYBRa7wwXpaStWR4V2"
},
{
"id": 10,
"record_type": "FD",
"sport_id": 1,
"user": "sam",
"value": 18,
"workout_date": "Sun, 07 Jul 2019 08:00:00 GMT",
"workout_id": "hvYBqYBRa7wwXpaStWR4V2"
},
{
"id": 11,
"record_type": "LD",
"sport_id": 1,
"user": "sam",
"value": "1:01:00",
"workout_date": "Sun, 07 Jul 2019 08:00:00 GMT",
"workout_id": "hvYBqYBRa7wwXpaStWR4V2"
},
{
"id": 12,
"record_type": "MS",
"sport_id": 1,
"user": "sam",
"value": 18,
"workout_date": "Sun, 07 Jul 2019 08:00:00 GMT",
"workout_id": "hvYBqYBRa7wwXpaStWR4V2"
}
],
"sports_list": [
1,
4,
6
],
"timezone": "Europe/Paris",
"total_distance": 67.895,
"total_duration": "6:50:27",
"username": "sam"
"weekm": true,
},
"message": "user account updated",
"status": "success"
}
:<json string email: user email
:<json string password: user current password
:<json string new_password: user new password
:reqheader Authorization: OAuth 2.0 Bearer Token
:statuscode 200: user account updated
:statuscode 400:
- invalid payload
- email is missing
- current password is missing
- email: valid email must be provided
- password: 8 characters required
:statuscode 401:
- provide a valid auth token
- signature expired, please log in again
- invalid token, please log in again
- invalid credentials
:statuscode 500: error, please try again or contact the administrator
"""
data = request.get_json()
if not data:
return InvalidPayloadErrorResponse()
email_to_confirm = data.get('email')
if not email_to_confirm:
return InvalidPayloadErrorResponse('email is missing')
current_password = data.get('password')
if not current_password:
return InvalidPayloadErrorResponse('current password is missing')
if not bcrypt.check_password_hash(auth_user.password, current_password):
return UnauthorizedErrorResponse('invalid credentials')
new_password = data.get('new_password')
error_messages = ''
try:
if email_to_confirm != auth_user.email:
if is_valid_email(email_to_confirm):
if current_app.config['CAN_SEND_EMAILS']:
auth_user.email_to_confirm = email_to_confirm
auth_user.confirmation_token = secrets.token_urlsafe(30)
else:
auth_user.email = email_to_confirm
auth_user.confirmation_token = None
else:
error_messages = 'email: valid email must be provided\n'
if new_password is not None:
error_messages += check_password(new_password)
if error_messages == '':
hashed_password = bcrypt.generate_password_hash(
new_password, current_app.config.get('BCRYPT_LOG_ROUNDS')
).decode()
auth_user.password = hashed_password
if error_messages != '':
return InvalidPayloadErrorResponse(error_messages)
db.session.commit()
if current_app.config['CAN_SEND_EMAILS']:
ui_url = current_app.config['UI_URL']
user_data = {
'language': get_language(auth_user.language),
'email': auth_user.email,
}
data = {
'username': auth_user.username,
'fittrackee_url': ui_url,
'operating_system': request.user_agent.platform,
'browser_name': request.user_agent.browser,
}
if new_password is not None:
password_change_email.send(user_data, data)
if (
auth_user.email_to_confirm is not None
and auth_user.email_to_confirm != auth_user.email
):
email_data = {
**data,
**{'new_email_address': email_to_confirm},
}
email_updated_to_current_address.send(user_data, email_data)
email_data = {
**data,
**{
'email_confirmation_url': (
f'{ui_url}/email-update'
f'?token={auth_user.confirmation_token}'
)
},
}
user_data = {
**user_data,
**{'email': auth_user.email_to_confirm},
}
email_updated_to_new_address.send(user_data, email_data)
return {
'status': 'success',
'message': 'user account updated',
'data': auth_user.serialize(auth_user),
}
except (exc.IntegrityError, exc.OperationalError, ValueError) as e:
return handle_error_and_return_response(e, db=db)
@auth_blueprint.route('/auth/profile/edit/preferences', methods=['POST'])
@authenticate
def edit_user_preferences(auth_user: User) -> Union[Dict, HttpResponse]:
@ -575,6 +749,7 @@ def edit_user_preferences(auth_user: User) -> Union[Dict, HttpResponse]:
"email": "sam@example.com",
"first_name": null,
"imperial_units": false,
"is_active": true,
"language": "en",
"last_name": null,
"location": null,
@ -635,8 +810,9 @@ def edit_user_preferences(auth_user: User) -> Union[Dict, HttpResponse]:
}
:<json string timezone: user time zone
:<json string weekm: does week start on Monday?
:<json boolean weekm: does week start on Monday?
:<json string language: language preferences
:<json boolean imperial_units: display distance in imperial units
:reqheader Authorization: OAuth 2.0 Bearer Token
@ -663,7 +839,7 @@ def edit_user_preferences(auth_user: User) -> Union[Dict, HttpResponse]:
return InvalidPayloadErrorResponse()
imperial_units = post_data.get('imperial_units')
language = post_data.get('language')
language = get_language(post_data.get('language'))
timezone = post_data.get('timezone')
weekm = post_data.get('weekm')
@ -677,7 +853,7 @@ def edit_user_preferences(auth_user: User) -> Union[Dict, HttpResponse]:
return {
'status': 'success',
'message': 'user preferences updated',
'data': auth_user.serialize(),
'data': auth_user.serialize(auth_user),
}
# handler errors
@ -890,7 +1066,9 @@ def edit_picture(auth_user: User) -> Union[Dict, HttpResponse]:
"""
try:
response_object = verify_extension_and_size('picture', request)
response_object = get_error_response_if_file_is_invalid(
'picture', request
)
except RequestEntityTooLarge as e:
appLog.error(e)
return PayloadTooLargeErrorResponse(
@ -980,6 +1158,8 @@ def request_password_reset() -> Union[Dict, HttpResponse]:
"""
handle password reset request
If email sending is disabled, this endpoint is not available
**Example request**:
.. sourcecode:: http
@ -1003,8 +1183,12 @@ def request_password_reset() -> Union[Dict, HttpResponse]:
:statuscode 200: password reset request processed
:statuscode 400: invalid payload
:statuscode 404: the requested URL was not found on the server
"""
if not current_app.config['CAN_SEND_EMAILS']:
return NotFoundErrorResponse(NOT_FOUND_MESSAGE)
post_data = request.get_json()
if not post_data or post_data.get('email') is None:
return InvalidPayloadErrorResponse()
@ -1014,7 +1198,7 @@ def request_password_reset() -> Union[Dict, HttpResponse]:
if user:
password_reset_token = user.encode_password_reset_token(user.id)
ui_url = current_app.config['UI_URL']
user_language = 'en' if user.language is None else user.language
user_language = get_language(user.language)
email_data = {
'expiration_delay': get_readable_duration(
current_app.config['PASSWORD_TOKEN_EXPIRATION_SECONDS'],
@ -1024,6 +1208,7 @@ def request_password_reset() -> Union[Dict, HttpResponse]:
'password_reset_url': (
f'{ui_url}/password-reset?token={password_reset_token}' # noqa
),
'fittrackee_url': ui_url,
'operating_system': request.user_agent.platform, # type: ignore
'browser_name': request.user_agent.browser, # type: ignore
}
@ -1041,7 +1226,9 @@ def request_password_reset() -> Union[Dict, HttpResponse]:
@auth_blueprint.route('/auth/password/update', methods=['POST'])
def update_password() -> Union[Dict, HttpResponse]:
"""
update user password
update user password after password reset request
It sends emails if sending is enabled
**Example request**:
@ -1063,7 +1250,6 @@ def update_password() -> Union[Dict, HttpResponse]:
}
:<json string password: password (8 characters required)
:<json string password_conf: password confirmation
:<json string token: password reset token
:statuscode 200: password updated
@ -1076,12 +1262,10 @@ def update_password() -> Union[Dict, HttpResponse]:
if (
not post_data
or post_data.get('password') is None
or post_data.get('password_conf') is None
or post_data.get('token') is None
):
return InvalidPayloadErrorResponse()
password = post_data.get('password')
password_conf = post_data.get('password_conf')
token = post_data.get('token')
try:
@ -1089,7 +1273,7 @@ def update_password() -> Union[Dict, HttpResponse]:
except (jwt.ExpiredSignatureError, jwt.InvalidTokenError):
return UnauthorizedErrorResponse()
message = check_passwords(password, password_conf)
message = check_password(password)
if message != '':
return InvalidPayloadErrorResponse(message)
@ -1101,9 +1285,204 @@ def update_password() -> Union[Dict, HttpResponse]:
password, current_app.config.get('BCRYPT_LOG_ROUNDS')
).decode()
db.session.commit()
if current_app.config['CAN_SEND_EMAILS']:
password_change_email.send(
{
'language': get_language(user.language),
'email': user.email,
},
{
'username': user.username,
'fittrackee_url': current_app.config['UI_URL'],
'operating_system': request.user_agent.platform,
'browser_name': request.user_agent.browser,
},
)
return {
'status': 'success',
'message': 'password updated',
}
except (exc.OperationalError, ValueError) as e:
return handle_error_and_return_response(e, db=db)
@auth_blueprint.route('/auth/email/update', methods=['POST'])
def update_email() -> Union[Dict, HttpResponse]:
"""
update user email after confirmation
**Example request**:
.. sourcecode:: http
POST /api/auth/email/update HTTP/1.1
Content-Type: application/json
**Example response**:
.. sourcecode:: http
HTTP/1.1 200 OK
Content-Type: application/json
{
"message": "email updated",
"status": "success"
}
:<json string token: password reset token
:statuscode 200: email updated
:statuscode 400: invalid payload
:statuscode 500: error, please try again or contact the administrator
"""
post_data = request.get_json()
if not post_data or post_data.get('token') is None:
return InvalidPayloadErrorResponse()
token = post_data.get('token')
try:
user = User.query.filter_by(confirmation_token=token).first()
if not user:
return InvalidPayloadErrorResponse()
user.email = user.email_to_confirm
user.email_to_confirm = None
user.confirmation_token = None
db.session.commit()
response = {
'status': 'success',
'message': 'email updated',
}
return response
except (exc.OperationalError, ValueError) as e:
return handle_error_and_return_response(e, db=db)
@auth_blueprint.route('/auth/account/confirm', methods=['POST'])
def confirm_account() -> Union[Dict, HttpResponse]:
"""
activate user account after registration
**Example request**:
.. sourcecode:: http
POST /api/auth/account/confirm HTTP/1.1
Content-Type: application/json
**Example response**:
.. sourcecode:: http
HTTP/1.1 200 OK
Content-Type: application/json
{
"auth_token": "JSON Web Token",
"message": "account confirmation successful",
"status": "success"
}
:<json string token: confirmation token
:statuscode 200: account confirmation successful
:statuscode 400: invalid payload
:statuscode 500: error, please try again or contact the administrator
"""
post_data = request.get_json()
if not post_data or post_data.get('token') is None:
return InvalidPayloadErrorResponse()
token = post_data.get('token')
try:
user = User.query.filter_by(confirmation_token=token).first()
if not user:
return InvalidPayloadErrorResponse()
user.is_active = True
user.confirmation_token = None
db.session.commit()
# generate auth token
auth_token = user.encode_auth_token(user.id)
response = {
'status': 'success',
'message': 'account confirmation successful',
'auth_token': auth_token,
}
return response
except (exc.OperationalError, ValueError) as e:
return handle_error_and_return_response(e, db=db)
@auth_blueprint.route('/auth/account/resend-confirmation', methods=['POST'])
def resend_account_confirmation_email() -> Union[Dict, HttpResponse]:
"""
resend email with instructions to confirm account
If email sending is disabled, this endpoint is not available
**Example request**:
.. sourcecode:: http
POST /api/auth/account/resend-confirmation HTTP/1.1
Content-Type: application/json
**Example response**:
.. sourcecode:: http
HTTP/1.1 200 OK
Content-Type: application/json
{
"message": "confirmation email resent",
"status": "success"
}
:<json string email: user email
:statuscode 200: confirmation email resent
:statuscode 400: invalid payload
:statuscode 404: the requested URL was not found on the server
:statuscode 500: error, please try again or contact the administrator
"""
if not current_app.config['CAN_SEND_EMAILS']:
return NotFoundErrorResponse(NOT_FOUND_MESSAGE)
post_data = request.get_json()
if not post_data or post_data.get('email') is None:
return InvalidPayloadErrorResponse()
email = post_data.get('email')
try:
user = User.query.filter_by(email=email, is_active=False).first()
if user:
user.confirmation_token = secrets.token_urlsafe(30)
db.session.commit()
send_account_confirmation_email(user)
response = {
'status': 'success',
'message': 'confirmation email resent',
}
return response
except (exc.OperationalError, ValueError) as e:
return handle_error_and_return_response(e, db=db)

View File

@ -0,0 +1,62 @@
from typing import Optional
import click
from fittrackee.cli.app import app
from fittrackee.users.exceptions import UserNotFoundException
from fittrackee.users.utils.admin import UserManagerService
@click.group(name='users')
def users_cli() -> None:
"""Manage users."""
pass
@users_cli.command('update')
@click.argument('username')
@click.option(
'--set-admin',
type=bool,
help='Add/remove admin rights (when adding admin rights, '
'it also activates user account if not active).',
)
@click.option('--activate', is_flag=True, help='Activate user account.')
@click.option(
'--reset-password',
is_flag=True,
help='Reset user password (a new password will be displayed).',
)
@click.option('--update-email', type=str, help='Update user email.')
def manage_user(
username: str,
set_admin: Optional[bool],
activate: bool,
reset_password: bool,
update_email: Optional[str],
) -> None:
"""Manage given user account."""
with app.app_context():
try:
user_manager_service = UserManagerService(username)
_, is_user_updated, password = user_manager_service.update(
is_admin=set_admin,
with_confirmation=False,
activate=activate,
reset_password=reset_password,
new_email=update_email,
)
if is_user_updated:
click.echo(f"User '{username}' updated.")
if password:
click.echo(f"The new password is: {password}")
else:
click.echo("No updates.")
except UserNotFoundException:
click.echo(
f"User '{username}' not found.\n"
"Check the provided user name (case sensitive).",
err=True,
)
except Exception as e:
click.echo(f'An error occurred: {e}', err=True)

View File

@ -5,7 +5,7 @@ from flask import request
from fittrackee.responses import HttpResponse
from .utils import verify_user
from .utils.controls import verify_user
def verify_auth_user(

View File

@ -1,2 +1,6 @@
class InvalidEmailException(Exception):
...
class UserNotFoundException(Exception):
...

View File

@ -11,7 +11,9 @@ from sqlalchemy.sql.expression import select
from fittrackee import bcrypt, db
from fittrackee.workouts.models import Workout
from .utils_token import decode_user_token, get_user_token
from .exceptions import UserNotFoundException
from .roles import UserRole
from .utils.token import decode_user_token, get_user_token
BaseModel: DeclarativeMeta = db.Model
@ -19,8 +21,8 @@ BaseModel: DeclarativeMeta = db.Model
class User(BaseModel):
__tablename__ = 'users'
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
username = db.Column(db.String(20), unique=True, nullable=False)
email = db.Column(db.String(120), unique=True, nullable=False)
username = db.Column(db.String(255), unique=True, nullable=False)
email = db.Column(db.String(255), unique=True, nullable=False)
password = db.Column(db.String(255), nullable=False)
created_at = db.Column(db.DateTime, nullable=False)
admin = db.Column(db.Boolean, default=False, nullable=False)
@ -32,7 +34,7 @@ class User(BaseModel):
picture = db.Column(db.String(255), nullable=True)
timezone = db.Column(db.String(50), nullable=True)
# does the week start Monday?
weekm = db.Column(db.Boolean(50), default=False, nullable=False)
weekm = db.Column(db.Boolean, default=False, nullable=False)
workouts = db.relationship(
'Workout',
lazy=True,
@ -45,6 +47,9 @@ class User(BaseModel):
)
language = db.Column(db.String(50), nullable=True)
imperial_units = db.Column(db.Boolean, default=False, nullable=False)
is_active = db.Column(db.Boolean, default=False, nullable=False)
email_to_confirm = db.Column(db.String(255), nullable=True)
confirmation_token = db.Column(db.String(255), nullable=True)
def __repr__(self) -> str:
return f'<User {self.username!r}>'
@ -54,14 +59,16 @@ class User(BaseModel):
username: str,
email: str,
password: str,
created_at: Optional[datetime] = datetime.utcnow(),
created_at: Optional[datetime] = None,
) -> None:
self.username = username
self.email = email
self.password = bcrypt.generate_password_hash(
password, current_app.config.get('BCRYPT_LOG_ROUNDS')
).decode()
self.created_at = created_at
self.created_at = (
datetime.utcnow() if created_at is None else created_at
)
@staticmethod
def encode_auth_token(user_id: int) -> str:
@ -107,7 +114,18 @@ class User(BaseModel):
.label('workouts_count')
)
def serialize(self) -> Dict:
def serialize(self, current_user: 'User') -> Dict:
role = (
UserRole.AUTH_USER
if current_user.id == self.id
else UserRole.ADMIN
if current_user.admin
else UserRole.USER
)
if role == UserRole.USER:
raise UserNotFoundException()
sports = []
total = (0, '0:00:00')
if self.workouts_count > 0: # type: ignore
@ -125,22 +143,21 @@ class User(BaseModel):
.filter(Workout.user_id == self.id)
.first()
)
return {
'username': self.username,
'email': self.email,
'created_at': self.created_at,
serialized_user = {
'admin': self.admin,
'first_name': self.first_name,
'last_name': self.last_name,
'bio': self.bio,
'location': self.location,
'birth_date': self.birth_date,
'picture': self.picture is not None,
'timezone': self.timezone,
'weekm': self.weekm,
'language': self.language,
'created_at': self.created_at,
'email': self.email,
'email_to_confirm': self.email_to_confirm,
'first_name': self.first_name,
'is_active': self.is_active,
'last_name': self.last_name,
'location': self.location,
'nb_sports': len(sports),
'nb_workouts': self.workouts_count,
'picture': self.picture is not None,
'records': [record.serialize() for record in self.records],
'sports_list': [
sport for sportslist in sports for sport in sportslist
@ -148,8 +165,20 @@ class User(BaseModel):
'total_distance': float(total[0]),
'total_duration': str(total[1]),
'total_ascent': str(total[2]),
'imperial_units': self.imperial_units,
'username': self.username,
}
if role == UserRole.AUTH_USER:
serialized_user = {
**serialized_user,
**{
'imperial_units': self.imperial_units,
'language': self.language,
'timezone': self.timezone,
'weekm': self.weekm,
},
}
return serialized_user
class UserSportPreference(BaseModel):

View File

@ -0,0 +1,7 @@
from enum import Enum
class UserRole(Enum):
ADMIN = 'admin'
AUTH_USER = 'auth_user'
USER = 'user'

View File

@ -2,11 +2,16 @@ import os
import shutil
from typing import Any, Dict, Tuple, Union
import click
from flask import Blueprint, request, send_file
from flask import Blueprint, current_app, request, send_file
from sqlalchemy import exc
from fittrackee import db
from fittrackee.emails.tasks import (
email_updated_to_new_address,
password_change_email,
reset_password_email,
)
from fittrackee.files import get_absolute_file_path
from fittrackee.responses import (
ForbiddenErrorResponse,
HttpResponse,
@ -15,35 +20,28 @@ from fittrackee.responses import (
UserNotFoundErrorResponse,
handle_error_and_return_response,
)
from fittrackee.utils import get_readable_duration
from fittrackee.workouts.models import Record, Workout, WorkoutSegment
from fittrackee.workouts.utils_files import get_absolute_file_path
from .auth import get_language
from .decorators import authenticate, authenticate_as_admin
from .exceptions import UserNotFoundException
from .exceptions import InvalidEmailException, UserNotFoundException
from .models import User, UserSportPreference
from .utils import set_admin_rights
from .utils.admin import UserManagerService
users_blueprint = Blueprint('users', __name__)
USER_PER_PAGE = 10
@users_blueprint.cli.command('set-admin')
@click.argument('username')
def set_admin(username: str) -> None:
"""Set admin rights for given user"""
try:
set_admin_rights(username)
print(f"User '{username}' updated.")
except UserNotFoundException:
print(f"User '{username}' not found.")
@users_blueprint.route('/users', methods=['GET'])
@authenticate
@authenticate_as_admin
def get_users(auth_user: User) -> Dict:
"""
Get all users
Get all users (regardless their account status), if authenticated user
has admin rights
It returns user preferences only for authenticated user.
**Example request**:
@ -78,6 +76,7 @@ def get_users(auth_user: User) -> Dict:
"created_at": "Sun, 14 Jul 2019 14:09:58 GMT",
"email": "admin@example.com",
"first_name": null,
"is_admin": true,
"imperial_units": false,
"language": "en",
"last_name": null,
@ -131,7 +130,8 @@ def get_users(auth_user: User) -> Dict:
"timezone": "Europe/Paris",
"total_distance": 67.895,
"total_duration": "6:50:27",
"username": "admin"
"username": "admin",
"weekm": false
},
{
"admin": false,
@ -140,6 +140,7 @@ def get_users(auth_user: User) -> Dict:
"created_at": "Sat, 20 Jul 2019 11:27:03 GMT",
"email": "sam@example.com",
"first_name": null,
"is_admin": false,
"language": "fr",
"last_name": null,
"location": null,
@ -162,7 +163,7 @@ def get_users(auth_user: User) -> Dict:
:query integer per_page: number of users per page (default: 10, max: 50)
:query string q: query on user name
:query string order_by: sorting criteria (``username``, ``created_at``,
``workouts_count``, ``admin``)
``workouts_count``, ``admin``, ``is_active``)
:query string order: sorting order (default: ``asc``)
:reqheader Authorization: OAuth 2.0 Bearer Token
@ -184,7 +185,7 @@ def get_users(auth_user: User) -> Dict:
query = params.get('q')
users_pagination = (
User.query.filter(
User.username.like('%' + query + '%') if query else True,
User.username.ilike('%' + query + '%') if query else True,
)
.order_by(
User.workouts_count.asc() # type: ignore
@ -211,13 +212,19 @@ def get_users(auth_user: User) -> Dict:
User.admin.desc()
if order_by == 'admin' and order == 'desc'
else True,
User.is_active.asc()
if order_by == 'is_active' and order == 'asc'
else True,
User.is_active.desc()
if order_by == 'is_active' and order == 'desc'
else True,
)
.paginate(page, per_page, False)
)
users = users_pagination.items
return {
'status': 'success',
'data': {'users': [user.serialize() for user in users]},
'data': {'users': [user.serialize(auth_user) for user in users]},
'pagination': {
'has_next': users_pagination.has_next,
'has_prev': users_pagination.has_prev,
@ -234,7 +241,10 @@ def get_single_user(
auth_user: User, user_name: str
) -> Union[Dict, HttpResponse]:
"""
Get single user details
Get single user details. Only user with admin rights can get other users
details.
It returns user preferences only for authenticated user.
**Example request**:
@ -260,6 +270,7 @@ def get_single_user(
"email": "admin@example.com",
"first_name": null,
"imperial_units": false,
"is_admin": true,
"language": "en",
"last_name": null,
"location": null,
@ -330,12 +341,15 @@ def get_single_user(
:statuscode 404:
- user does not exist
"""
if user_name != auth_user.username and not auth_user.admin:
return ForbiddenErrorResponse()
try:
user = User.query.filter_by(username=user_name).first()
if user:
return {
'status': 'success',
'data': {'users': [user.serialize()]},
'data': {'users': [user.serialize(auth_user)]},
}
except ValueError:
pass
@ -375,7 +389,7 @@ def get_picture(user_name: str) -> Any:
if user.picture is not None:
picture_path = get_absolute_file_path(user.picture)
return send_file(picture_path)
except Exception:
except Exception: # nosec
pass
return NotFoundErrorResponse('No picture.')
@ -384,7 +398,13 @@ def get_picture(user_name: str) -> Any:
@authenticate_as_admin
def update_user(auth_user: User, user_name: str) -> Union[Dict, HttpResponse]:
"""
Update user to add admin rights
Update user account
- add/remove admin rights (regardless user account status)
- reset password (and send email to update user password,
if sending enabled)
- update user email (and send email to new user email, if sending enabled)
- activate account for an inactive user
Only user with admin rights can modify another user
@ -392,7 +412,7 @@ def update_user(auth_user: User, user_name: str) -> Union[Dict, HttpResponse]:
.. sourcecode:: http
PATCH api/users/<user_name> HTTP/1.1
PATCH /api/users/<user_name> HTTP/1.1
Content-Type: application/json
**Example response**:
@ -412,6 +432,7 @@ def update_user(auth_user: User, user_name: str) -> Union[Dict, HttpResponse]:
"email": "admin@example.com",
"first_name": null,
"imperial_units": false,
"is_active": true,
"language": "en",
"last_name": null,
"location": null,
@ -472,11 +493,18 @@ def update_user(auth_user: User, user_name: str) -> Union[Dict, HttpResponse]:
:param string user_name: user name
:<json boolean activate: activate user account
:<json boolean admin: does the user have administrator rights
:<json boolean new_email: new user email
:<json boolean reset_password: reset user password
:reqheader Authorization: OAuth 2.0 Bearer Token
:statuscode 200: success
:statuscode 400:
- invalid payload
- valid email must be provided
- new email must be different than curent email
:statuscode 401:
- provide a valid auth token
- signature expired, please log in again
@ -487,20 +515,80 @@ def update_user(auth_user: User, user_name: str) -> Union[Dict, HttpResponse]:
:statuscode 500:
"""
user_data = request.get_json()
if not user_data or user_data.get('admin') is None:
if not user_data:
return InvalidPayloadErrorResponse()
try:
user = User.query.filter_by(username=user_name).first()
if not user:
return UserNotFoundErrorResponse()
reset_password = user_data.get('reset_password', False)
new_email = user_data.get('new_email')
user_manager_service = UserManagerService(username=user_name)
user, _, _ = user_manager_service.update(
is_admin=user_data.get('admin'),
activate=user_data.get('activate', False),
reset_password=reset_password,
new_email=new_email,
with_confirmation=current_app.config['CAN_SEND_EMAILS'],
)
if current_app.config['CAN_SEND_EMAILS']:
user_language = get_language(user.language)
ui_url = current_app.config['UI_URL']
if reset_password:
user_data = {
'language': user_language,
'email': user.email,
}
password_change_email.send(
user_data,
{
'username': user.username,
'fittrackee_url': ui_url,
},
)
password_reset_token = user.encode_password_reset_token(
user.id
)
reset_password_email.send(
user_data,
{
'expiration_delay': get_readable_duration(
current_app.config[
'PASSWORD_TOKEN_EXPIRATION_SECONDS'
],
user_language,
),
'username': user.username,
'password_reset_url': (
f'{ui_url}/password-reset?'
f'token={password_reset_token}'
),
'fittrackee_url': ui_url,
},
)
if new_email:
user_data = {
'language': user_language,
'email': user.email_to_confirm,
}
email_data = {
'username': user.username,
'fittrackee_url': ui_url,
'email_confirmation_url': (
f'{ui_url}/email-update'
f'?token={user.confirmation_token}'
),
}
email_updated_to_new_address.send(user_data, email_data)
user.admin = user_data['admin']
db.session.commit()
return {
'status': 'success',
'data': {'users': [user.serialize()]},
'data': {'users': [user.serialize(auth_user)]},
}
except UserNotFoundException:
return UserNotFoundErrorResponse()
except InvalidEmailException as e:
return InvalidPayloadErrorResponse(str(e))
except exc.StatementError as e:
return handle_error_and_return_response(e, db=db)

View File

View File

@ -0,0 +1,86 @@
import secrets
from typing import Optional, Tuple
from flask import current_app
from fittrackee import bcrypt, db
from ..exceptions import InvalidEmailException, UserNotFoundException
from ..models import User
from ..utils.controls import is_valid_email
class UserManagerService:
def __init__(self, username: str):
self.username = username
def _get_user(self) -> User:
user = User.query.filter_by(username=self.username).first()
if not user:
raise UserNotFoundException()
return user
def _update_admin_rights(self, user: User, is_admin: bool) -> None:
user.admin = is_admin
if is_admin:
self._activate_user(user)
@staticmethod
def _activate_user(user: User) -> None:
user.is_active = True
user.confirmation_token = None
@staticmethod
def _reset_user_password(user: User) -> str:
new_password = secrets.token_urlsafe(30)
user.password = bcrypt.generate_password_hash(
new_password, current_app.config.get('BCRYPT_LOG_ROUNDS')
).decode()
return new_password
@staticmethod
def _update_user_email(
user: User, new_email: str, with_confirmation: bool
) -> None:
if not is_valid_email(new_email):
raise InvalidEmailException('valid email must be provided')
if user.email == new_email:
raise InvalidEmailException(
'new email must be different than curent email'
)
if with_confirmation:
user.email_to_confirm = new_email
user.confirmation_token = secrets.token_urlsafe(30)
else:
user.email = new_email
def update(
self,
is_admin: Optional[bool] = None,
activate: bool = False,
reset_password: bool = False,
new_email: Optional[str] = None,
with_confirmation: bool = True,
) -> Tuple[User, bool, Optional[str]]:
user_updated = False
new_password = None
user = self._get_user()
if is_admin is not None:
self._update_admin_rights(user, is_admin)
user_updated = True
if activate:
self._activate_user(user)
user_updated = True
if reset_password:
new_password = self._reset_user_password(user)
user_updated = True
if new_email is not None:
self._update_user_email(user, new_email, with_confirmation)
user_updated = True
db.session.commit()
return user, user_updated, new_password

View File

@ -3,47 +3,43 @@ from typing import Optional, Tuple
from flask import Request
from fittrackee import db
from fittrackee.responses import (
ForbiddenErrorResponse,
HttpResponse,
UnauthorizedErrorResponse,
)
from .exceptions import UserNotFoundException
from .models import User
from ..models import User
def is_valid_email(email: str) -> bool:
"""
Return if email format is valid
"""
if not email:
return False
mail_pattern = r"(^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$)"
return re.match(mail_pattern, email) is not None
def check_passwords(password: str, password_conf: str) -> str:
def check_password(password: str) -> str:
"""
Verify if password and password confirmation are the same and have
more than 8 characters
If not, it returns not empty string
Verify if password have more than 8 characters
If not, it returns error message
"""
ret = ''
if password_conf != password:
ret = 'password: password and password confirmation do not match\n'
if len(password) < 8:
ret += 'password: 8 characters required\n'
return ret
return 'password: 8 characters required\n'
return ''
def check_username(username: str) -> str:
"""
Return if username is valid
If not, it returns error messages
"""
ret = ''
if not 2 < len(username) < 13:
ret += 'username: 3 to 12 characters required\n'
if not (2 < len(username) < 31):
ret += 'username: 3 to 30 characters required\n'
if not re.match(r'^[a-zA-Z0-9_]+$', username):
ret += (
'username: only alphanumeric characters and the '
@ -52,18 +48,15 @@ def check_username(username: str) -> str:
return ret
def register_controls(
username: str, email: str, password: str, password_conf: str
) -> str:
def register_controls(username: str, email: str, password: str) -> str:
"""
Verify if username, email and passwords are valid
If not, it returns not empty string
If not, it returns error messages
"""
ret = check_username(username)
if not is_valid_email(email):
ret += 'email: valid email must be provided\n'
ret += check_passwords(password, password_conf)
ret += check_password(password)
return ret
@ -71,8 +64,12 @@ def verify_user(
current_request: Request, verify_admin: bool
) -> Tuple[Optional[HttpResponse], Optional[User]]:
"""
Return authenticated user, if the provided token is valid and user has
admin rights if 'verify_admin' is True
Return authenticated user if
- the provided token is valid
- the user account is active
- the user has admin rights if 'verify_admin' is True
If not, it returns Error Response
"""
default_message = 'provide a valid auth token'
auth_header = current_request.headers.get('Authorization')
@ -83,27 +80,8 @@ def verify_user(
if isinstance(resp, str):
return UnauthorizedErrorResponse(resp), None
user = User.query.filter_by(id=resp).first()
if not user:
if not user or not user.is_active:
return UnauthorizedErrorResponse(default_message), None
if verify_admin and not user.admin:
return ForbiddenErrorResponse(), None
return None, user
def can_view_workout(
auth_user_id: int, workout_user_id: int
) -> Optional[HttpResponse]:
"""
Return error response if user has no right to view workout
"""
if auth_user_id != workout_user_id:
return ForbiddenErrorResponse()
return None
def set_admin_rights(username: str) -> None:
user = User.query.filter_by(username=username).first()
if not user:
raise UserNotFoundException()
user.admin = True
db.session.commit()