API & CLI - refacto + new commands to update users

This commit is contained in:
Sam
2022-04-23 13:48:29 +02:00
parent a43b538aae
commit cea049273c
6 changed files with 291 additions and 65 deletions

View File

@ -1,8 +1,10 @@
from typing import Optional
import click
from fittrackee.cli.app import app
from fittrackee.users.exceptions import UserNotFoundException
from fittrackee.users.utils.admin import set_admin_rights
from fittrackee.users.utils.admin import UserManagerService
@click.group(name='users')
@ -11,13 +13,50 @@ def users_cli() -> None:
pass
@users_cli.command('set-admin')
@users_cli.command('update')
@click.argument('username')
def set_admin(username: str) -> None:
"""Set admin rights for given user"""
@click.option(
'--set-admin',
type=bool,
help='Add/remove admin rights (when adding admin rights, '
'it also activates user account if not active).',
)
@click.option('--activate', is_flag=True, help='Activate user account.')
@click.option(
'--reset-password',
is_flag=True,
help='Reset user password (a new password will be displayed).',
)
@click.option('--update-email', type=str, help='Update user email.')
def manage_user(
username: str,
set_admin: Optional[bool],
activate: bool,
reset_password: bool,
update_email: Optional[str],
) -> None:
"""Manage giver user account."""
with app.app_context():
try:
set_admin_rights(username)
click.echo(f"User '{username}' updated.")
user_manager_service = UserManagerService(username)
_, is_user_updated, password = user_manager_service.update(
is_admin=set_admin,
with_confirmation=False,
activate=activate,
reset_password=reset_password,
new_email=update_email,
)
if is_user_updated:
click.echo(f"User '{username}' updated.")
if password:
click.echo(f"The new password is: {password}")
else:
click.echo("No updates.")
except UserNotFoundException:
click.echo(f"User '{username}' not found.", err=True)
click.echo(
f"User '{username}' not found.\n"
"Check the provided user name (case sensitive).",
err=True,
)
except Exception as e:
click.echo(f'An error occurred: {e}', err=True)

View File

@ -1,2 +1,6 @@
class InvalidEmailException(Exception):
...
class UserNotFoundException(Exception):
...

View File

@ -1,12 +1,11 @@
import os
import secrets
import shutil
from typing import Any, Dict, Tuple, Union
from flask import Blueprint, current_app, request, send_file
from sqlalchemy import exc
from fittrackee import bcrypt, db
from fittrackee import db
from fittrackee.emails.tasks import (
email_updated_to_new_address,
password_change_email,
@ -21,12 +20,13 @@ from fittrackee.responses import (
UserNotFoundErrorResponse,
handle_error_and_return_response,
)
from fittrackee.users.utils.controls import is_valid_email
from fittrackee.utils import get_readable_duration
from fittrackee.workouts.models import Record, Workout, WorkoutSegment
from .decorators import authenticate, authenticate_as_admin
from .exceptions import InvalidEmailException, UserNotFoundException
from .models import User, UserSportPreference
from .utils.admin import UserManagerService
users_blueprint = Blueprint('users', __name__)
@ -516,46 +516,20 @@ def update_user(auth_user: User, user_name: str) -> Union[Dict, HttpResponse]:
if not user_data:
return InvalidPayloadErrorResponse()
send_password_emails = False
send_new_address_email = False
try:
user = User.query.filter_by(username=user_name).first()
if not user:
return UserNotFoundErrorResponse()
if 'admin' in user_data:
user.admin = user_data['admin']
if user_data.get('activate', False):
user.is_active = True
user.confirmation_token = None
if user_data.get('reset_password', False):
new_password = secrets.token_urlsafe(30)
user.password = bcrypt.generate_password_hash(
new_password, current_app.config.get('BCRYPT_LOG_ROUNDS')
).decode()
send_password_emails = True
if 'new_email' in user_data:
if is_valid_email(user_data['new_email']):
if user_data['new_email'] == user.email:
return InvalidPayloadErrorResponse(
'new email must be different than curent email'
)
user.email_to_confirm = user_data['new_email']
user.confirmation_token = secrets.token_urlsafe(30)
send_new_address_email = True
else:
return InvalidPayloadErrorResponse(
'valid email must be provided'
)
db.session.commit()
reset_password = user_data.get('reset_password', False)
new_email = user_data.get('new_email')
user_manager_service = UserManagerService(username=user_name)
user, _, _ = user_manager_service.update(
is_admin=user_data.get('admin'),
activate=user_data.get('activate', False),
reset_password=reset_password,
new_email=new_email,
)
user_language = 'en' if user.language is None else user.language
ui_url = current_app.config['UI_URL']
if send_password_emails:
if reset_password:
user_data = {
'language': user_language,
'email': user.email,
@ -585,7 +559,7 @@ def update_user(auth_user: User, user_name: str) -> Union[Dict, HttpResponse]:
},
)
if send_new_address_email:
if new_email:
user_data = {
'language': user_language,
'email': user.email_to_confirm,
@ -604,6 +578,10 @@ def update_user(auth_user: User, user_name: str) -> Union[Dict, HttpResponse]:
'status': 'success',
'data': {'users': [user.serialize(auth_user)]},
}
except UserNotFoundException:
return UserNotFoundErrorResponse()
except InvalidEmailException as e:
return InvalidPayloadErrorResponse(str(e))
except exc.StatementError as e:
return handle_error_and_return_response(e, db=db)

View File

@ -1,14 +1,86 @@
from fittrackee import db
import secrets
from typing import Optional, Tuple
from ..exceptions import UserNotFoundException
from flask import current_app
from fittrackee import bcrypt, db
from ..exceptions import InvalidEmailException, UserNotFoundException
from ..models import User
from ..utils.controls import is_valid_email
def set_admin_rights(username: str) -> None:
user = User.query.filter_by(username=username).first()
if not user:
raise UserNotFoundException()
user.admin = True
user.is_active = True
user.confirmation_token = None
db.session.commit()
class UserManagerService:
def __init__(self, username: str):
self.username = username
def _get_user(self) -> User:
user = User.query.filter_by(username=self.username).first()
if not user:
raise UserNotFoundException()
return user
def _update_admin_rights(self, user: User, is_admin: bool) -> None:
user.admin = is_admin
if is_admin:
self._activate_user(user)
@staticmethod
def _activate_user(user: User) -> None:
user.is_active = True
user.confirmation_token = None
@staticmethod
def _reset_user_password(user: User) -> str:
new_password = secrets.token_urlsafe(30)
user.password = bcrypt.generate_password_hash(
new_password, current_app.config.get('BCRYPT_LOG_ROUNDS')
).decode()
return new_password
@staticmethod
def _update_user_email(
user: User, new_email: str, with_confirmation: bool
) -> None:
if not is_valid_email(new_email):
raise InvalidEmailException('valid email must be provided')
if user.email == new_email:
raise InvalidEmailException(
'new email must be different than curent email'
)
if with_confirmation:
user.email_to_confirm = new_email
user.confirmation_token = secrets.token_urlsafe(30)
else:
user.email = new_email
def update(
self,
is_admin: Optional[bool] = None,
activate: bool = False,
reset_password: bool = False,
new_email: Optional[str] = None,
with_confirmation: bool = True,
) -> Tuple[User, bool, Optional[str]]:
user_updated = False
new_password = None
user = self._get_user()
if is_admin is not None:
self._update_admin_rights(user, is_admin)
user_updated = True
if activate:
self._activate_user(user)
user_updated = True
if reset_password:
new_password = self._reset_user_password(user)
user_updated = True
if new_email is not None:
self._update_user_email(user, new_email, with_confirmation)
user_updated = True
db.session.commit()
return user, user_updated, new_password