From cea049273c9b858f62bfaf3ca2ab3190eb308a70 Mon Sep 17 00:00:00 2001 From: Sam Date: Sat, 23 Apr 2022 13:48:29 +0200 Subject: [PATCH] API & CLI - refacto + new commands to update users --- Makefile | 2 +- fittrackee/tests/users/test_users_utils.py | 147 ++++++++++++++++++++- fittrackee/users/commands.py | 53 +++++++- fittrackee/users/exceptions.py | 4 + fittrackee/users/users.py | 58 +++----- fittrackee/users/utils/admin.py | 92 +++++++++++-- 6 files changed, 291 insertions(+), 65 deletions(-) diff --git a/Makefile b/Makefile index d3ddc492..3f1374ff 100644 --- a/Makefile +++ b/Makefile @@ -164,7 +164,7 @@ serve-python-dev: $(FLASK) run --with-threads -h $(HOST) -p $(PORT) --cert=adhoc set-admin: - $(FTCLI) users set-admin $(USERNAME) + $(FTCLI) users update $(USERNAME) --set-admin true test-e2e: $(PYTEST) e2e --driver firefox $(PYTEST_ARGS) diff --git a/fittrackee/tests/users/test_users_utils.py b/fittrackee/tests/users/test_users_utils.py index c068ab81..cf6dddba 100644 --- a/fittrackee/tests/users/test_users_utils.py +++ b/fittrackee/tests/users/test_users_utils.py @@ -3,10 +3,14 @@ from unittest.mock import patch import pytest from flask import Flask +from fittrackee import bcrypt from fittrackee.tests.utils import random_string -from fittrackee.users.exceptions import UserNotFoundException +from fittrackee.users.exceptions import ( + InvalidEmailException, + UserNotFoundException, +) from fittrackee.users.models import User -from fittrackee.users.utils.admin import set_admin_rights +from fittrackee.users.utils.admin import UserManagerService from fittrackee.users.utils.controls import ( check_password, check_username, @@ -14,37 +18,166 @@ from fittrackee.users.utils.controls import ( register_controls, ) +from ..utils import random_email -class TestSetAdminRights: + +class TestUserManagerService: def test_it_raises_exception_if_user_does_not_exist( self, app: Flask ) -> None: + user_manager_service = UserManagerService(username=random_string()) + with pytest.raises(UserNotFoundException): - set_admin_rights(random_string()) + user_manager_service.update() + + def test_it_does_not_update_user_when_no_args_provided( + self, app: Flask, user_1: User + ) -> None: + user_manager_service = UserManagerService(username=user_1.username) + + _, user_updated, _ = user_manager_service.update() + + assert user_updated is False + + def test_it_returns_user(self, app: Flask, user_1: User) -> None: + user_manager_service = UserManagerService(username=user_1.username) + + user, _, _ = user_manager_service.update() + + assert user == user_1 def test_it_sets_admin_right_for_a_given_user( self, app: Flask, user_1: User ) -> None: - set_admin_rights(user_1.username) + user_manager_service = UserManagerService(username=user_1.username) + + user_manager_service.update(is_admin=True) assert user_1.admin is True + def test_it_return_updated_user_flag_to_true( + self, app: Flask, user_1: User + ) -> None: + user_manager_service = UserManagerService(username=user_1.username) + + _, user_updated, _ = user_manager_service.update(is_admin=True) + + assert user_updated is True + def test_it_does_not_raise_exception_when_user_has_already_admin_right( self, app: Flask, user_1_admin: User ) -> None: - set_admin_rights(user_1_admin.username) + user_manager_service = UserManagerService( + username=user_1_admin.username + ) + + user_manager_service.update(is_admin=True) assert user_1_admin.admin is True def test_it_activates_account_if_user_is_inactive( self, app: Flask, inactive_user: User ) -> None: - set_admin_rights(inactive_user.username) + user_manager_service = UserManagerService( + username=inactive_user.username + ) + + user_manager_service.update(is_admin=True) assert inactive_user.admin is True assert inactive_user.is_active is True assert inactive_user.confirmation_token is None + def test_it_activates_given_user_account( + self, app: Flask, inactive_user: User + ) -> None: + user_manager_service = UserManagerService( + username=inactive_user.username + ) + + user_manager_service.update(activate=True) + + assert inactive_user.is_active is True + + def test_it_empties_confirmation_token( + self, app: Flask, inactive_user: User + ) -> None: + user_manager_service = UserManagerService( + username=inactive_user.username + ) + + user_manager_service.update(activate=True) + + assert inactive_user.confirmation_token is None + + def test_it_does_not_raise_error_if_user_account_already_activated( + self, app: Flask, user_1: User + ) -> None: + user_manager_service = UserManagerService(username=user_1.username) + + user_manager_service.update(activate=True) + + assert user_1.is_active is True + + def test_it_resets_user_password(self, app: Flask, user_1: User) -> None: + previous_password = user_1.password + user_manager_service = UserManagerService(username=user_1.username) + + user_manager_service.update(reset_password=True) + + assert user_1.password != previous_password + + def test_new_password_is_encrypted(self, app: Flask, user_1: User) -> None: + user_manager_service = UserManagerService(username=user_1.username) + + _, _, new_password = user_manager_service.update(reset_password=True) + + assert bcrypt.check_password_hash(user_1.password, new_password) + + def test_it_raises_exception_if_provided_email_is_invalid( + self, app: Flask, user_1: User + ) -> None: + user_manager_service = UserManagerService(username=user_1.username) + with pytest.raises( + InvalidEmailException, match='valid email must be provided' + ): + user_manager_service.update(new_email=random_string()) + + def test_it_raises_exception_if_provided_email_is_current_user_email( + self, app: Flask, user_1: User + ) -> None: + user_manager_service = UserManagerService(username=user_1.username) + with pytest.raises( + InvalidEmailException, + match='new email must be different than curent email', + ): + user_manager_service.update(new_email=user_1.email) + + def test_it_updates_user_email_to_confirm( + self, app: Flask, user_1: User + ) -> None: + new_email = random_email() + current_email = user_1.email + user_manager_service = UserManagerService(username=user_1.username) + + user_manager_service.update(new_email=new_email) + + assert user_1.email == current_email + assert user_1.email_to_confirm == new_email + assert user_1.confirmation_token is not None + + def test_it_updates_user_email(self, app: Flask, user_1: User) -> None: + new_email = random_email() + user_manager_service = UserManagerService(username=user_1.username) + + user_manager_service.update( + new_email=new_email, with_confirmation=False + ) + + assert user_1.email == new_email + assert user_1.email_to_confirm is None + assert user_1.confirmation_token is None + class TestIsValidEmail: @pytest.mark.parametrize( diff --git a/fittrackee/users/commands.py b/fittrackee/users/commands.py index 97679e68..dd8dc4d3 100644 --- a/fittrackee/users/commands.py +++ b/fittrackee/users/commands.py @@ -1,8 +1,10 @@ +from typing import Optional + import click from fittrackee.cli.app import app from fittrackee.users.exceptions import UserNotFoundException -from fittrackee.users.utils.admin import set_admin_rights +from fittrackee.users.utils.admin import UserManagerService @click.group(name='users') @@ -11,13 +13,50 @@ def users_cli() -> None: pass -@users_cli.command('set-admin') +@users_cli.command('update') @click.argument('username') -def set_admin(username: str) -> None: - """Set admin rights for given user""" +@click.option( + '--set-admin', + type=bool, + help='Add/remove admin rights (when adding admin rights, ' + 'it also activates user account if not active).', +) +@click.option('--activate', is_flag=True, help='Activate user account.') +@click.option( + '--reset-password', + is_flag=True, + help='Reset user password (a new password will be displayed).', +) +@click.option('--update-email', type=str, help='Update user email.') +def manage_user( + username: str, + set_admin: Optional[bool], + activate: bool, + reset_password: bool, + update_email: Optional[str], +) -> None: + """Manage giver user account.""" with app.app_context(): try: - set_admin_rights(username) - click.echo(f"User '{username}' updated.") + user_manager_service = UserManagerService(username) + _, is_user_updated, password = user_manager_service.update( + is_admin=set_admin, + with_confirmation=False, + activate=activate, + reset_password=reset_password, + new_email=update_email, + ) + if is_user_updated: + click.echo(f"User '{username}' updated.") + if password: + click.echo(f"The new password is: {password}") + else: + click.echo("No updates.") except UserNotFoundException: - click.echo(f"User '{username}' not found.", err=True) + click.echo( + f"User '{username}' not found.\n" + "Check the provided user name (case sensitive).", + err=True, + ) + except Exception as e: + click.echo(f'An error occurred: {e}', err=True) diff --git a/fittrackee/users/exceptions.py b/fittrackee/users/exceptions.py index 08d04537..33733758 100644 --- a/fittrackee/users/exceptions.py +++ b/fittrackee/users/exceptions.py @@ -1,2 +1,6 @@ +class InvalidEmailException(Exception): + ... + + class UserNotFoundException(Exception): ... diff --git a/fittrackee/users/users.py b/fittrackee/users/users.py index 5efe3704..ec8b76b7 100644 --- a/fittrackee/users/users.py +++ b/fittrackee/users/users.py @@ -1,12 +1,11 @@ import os -import secrets import shutil from typing import Any, Dict, Tuple, Union from flask import Blueprint, current_app, request, send_file from sqlalchemy import exc -from fittrackee import bcrypt, db +from fittrackee import db from fittrackee.emails.tasks import ( email_updated_to_new_address, password_change_email, @@ -21,12 +20,13 @@ from fittrackee.responses import ( UserNotFoundErrorResponse, handle_error_and_return_response, ) -from fittrackee.users.utils.controls import is_valid_email from fittrackee.utils import get_readable_duration from fittrackee.workouts.models import Record, Workout, WorkoutSegment from .decorators import authenticate, authenticate_as_admin +from .exceptions import InvalidEmailException, UserNotFoundException from .models import User, UserSportPreference +from .utils.admin import UserManagerService users_blueprint = Blueprint('users', __name__) @@ -516,46 +516,20 @@ def update_user(auth_user: User, user_name: str) -> Union[Dict, HttpResponse]: if not user_data: return InvalidPayloadErrorResponse() - send_password_emails = False - send_new_address_email = False try: - user = User.query.filter_by(username=user_name).first() - if not user: - return UserNotFoundErrorResponse() - - if 'admin' in user_data: - user.admin = user_data['admin'] - - if user_data.get('activate', False): - user.is_active = True - user.confirmation_token = None - - if user_data.get('reset_password', False): - new_password = secrets.token_urlsafe(30) - user.password = bcrypt.generate_password_hash( - new_password, current_app.config.get('BCRYPT_LOG_ROUNDS') - ).decode() - send_password_emails = True - - if 'new_email' in user_data: - if is_valid_email(user_data['new_email']): - if user_data['new_email'] == user.email: - return InvalidPayloadErrorResponse( - 'new email must be different than curent email' - ) - user.email_to_confirm = user_data['new_email'] - user.confirmation_token = secrets.token_urlsafe(30) - send_new_address_email = True - else: - return InvalidPayloadErrorResponse( - 'valid email must be provided' - ) - - db.session.commit() + reset_password = user_data.get('reset_password', False) + new_email = user_data.get('new_email') + user_manager_service = UserManagerService(username=user_name) + user, _, _ = user_manager_service.update( + is_admin=user_data.get('admin'), + activate=user_data.get('activate', False), + reset_password=reset_password, + new_email=new_email, + ) user_language = 'en' if user.language is None else user.language ui_url = current_app.config['UI_URL'] - if send_password_emails: + if reset_password: user_data = { 'language': user_language, 'email': user.email, @@ -585,7 +559,7 @@ def update_user(auth_user: User, user_name: str) -> Union[Dict, HttpResponse]: }, ) - if send_new_address_email: + if new_email: user_data = { 'language': user_language, 'email': user.email_to_confirm, @@ -604,6 +578,10 @@ def update_user(auth_user: User, user_name: str) -> Union[Dict, HttpResponse]: 'status': 'success', 'data': {'users': [user.serialize(auth_user)]}, } + except UserNotFoundException: + return UserNotFoundErrorResponse() + except InvalidEmailException as e: + return InvalidPayloadErrorResponse(str(e)) except exc.StatementError as e: return handle_error_and_return_response(e, db=db) diff --git a/fittrackee/users/utils/admin.py b/fittrackee/users/utils/admin.py index 2e0a0cbd..4b666f2c 100644 --- a/fittrackee/users/utils/admin.py +++ b/fittrackee/users/utils/admin.py @@ -1,14 +1,86 @@ -from fittrackee import db +import secrets +from typing import Optional, Tuple -from ..exceptions import UserNotFoundException +from flask import current_app + +from fittrackee import bcrypt, db + +from ..exceptions import InvalidEmailException, UserNotFoundException from ..models import User +from ..utils.controls import is_valid_email -def set_admin_rights(username: str) -> None: - user = User.query.filter_by(username=username).first() - if not user: - raise UserNotFoundException() - user.admin = True - user.is_active = True - user.confirmation_token = None - db.session.commit() +class UserManagerService: + def __init__(self, username: str): + self.username = username + + def _get_user(self) -> User: + user = User.query.filter_by(username=self.username).first() + if not user: + raise UserNotFoundException() + return user + + def _update_admin_rights(self, user: User, is_admin: bool) -> None: + user.admin = is_admin + if is_admin: + self._activate_user(user) + + @staticmethod + def _activate_user(user: User) -> None: + user.is_active = True + user.confirmation_token = None + + @staticmethod + def _reset_user_password(user: User) -> str: + new_password = secrets.token_urlsafe(30) + user.password = bcrypt.generate_password_hash( + new_password, current_app.config.get('BCRYPT_LOG_ROUNDS') + ).decode() + return new_password + + @staticmethod + def _update_user_email( + user: User, new_email: str, with_confirmation: bool + ) -> None: + if not is_valid_email(new_email): + raise InvalidEmailException('valid email must be provided') + if user.email == new_email: + raise InvalidEmailException( + 'new email must be different than curent email' + ) + if with_confirmation: + user.email_to_confirm = new_email + user.confirmation_token = secrets.token_urlsafe(30) + else: + user.email = new_email + + def update( + self, + is_admin: Optional[bool] = None, + activate: bool = False, + reset_password: bool = False, + new_email: Optional[str] = None, + with_confirmation: bool = True, + ) -> Tuple[User, bool, Optional[str]]: + user_updated = False + new_password = None + user = self._get_user() + + if is_admin is not None: + self._update_admin_rights(user, is_admin) + user_updated = True + + if activate: + self._activate_user(user) + user_updated = True + + if reset_password: + new_password = self._reset_user_password(user) + user_updated = True + + if new_email is not None: + self._update_user_email(user, new_email, with_confirmation) + user_updated = True + + db.session.commit() + return user, user_updated, new_password