API - allow admin to reset password for a given user

This commit is contained in:
Sam 2022-03-13 09:04:46 +01:00
parent d13a3704c5
commit e8ca600e4a
6 changed files with 238 additions and 58 deletions

View File

@ -22,3 +22,15 @@ def email_updated_to_new_address_mock() -> Iterator[MagicMock]:
def password_change_email_mock() -> Iterator[MagicMock]:
with patch('fittrackee.users.auth.password_change_email') as mock:
yield mock
@pytest.fixture()
def user_password_change_email_mock() -> Iterator[MagicMock]:
with patch('fittrackee.users.users.password_change_email') as mock:
yield mock
@pytest.fixture()
def user_reset_password_email() -> Iterator[MagicMock]:
with patch('fittrackee.users.users.reset_password_email') as mock:
yield mock

View File

@ -8,11 +8,11 @@ from flask import Flask
from freezegun import freeze_time
from fittrackee.users.models import User, UserSportPreference
from fittrackee.users.utils.random import random_string
from fittrackee.users.utils.token import get_user_token
from fittrackee.workouts.models import Sport, Workout
from ..api_test_case import ApiTestCaseMixin
from ..utils import random_string
USER_AGENT = (
'Mozilla/5.0 (X11; Linux x86_64; rv:98.0) Gecko/20100101 Firefox/98.0'

View File

@ -1,11 +1,12 @@
import json
from datetime import datetime, timedelta
from io import BytesIO
from unittest.mock import patch
from unittest.mock import MagicMock, patch
from flask import Flask
from fittrackee.users.models import User, UserSportPreference
from fittrackee.utils import get_readable_duration
from fittrackee.workouts.models import Sport, Workout
from ..api_test_case import ApiTestCaseMixin
@ -841,7 +842,7 @@ class TestGetUserPicture(ApiTestCaseMixin):
class TestUpdateUser(ApiTestCaseMixin):
def test_it_adds_admin_rights_to_a_user(
def test_it_returns_error_if_payload_is_empty(
self, app: Flask, user_1_admin: User, user_2: User
) -> None:
client, auth_token = self.get_test_client_and_auth_token(
@ -849,52 +850,7 @@ class TestUpdateUser(ApiTestCaseMixin):
)
response = client.patch(
'/api/users/toto',
content_type='application/json',
data=json.dumps(dict(admin=True)),
headers=dict(Authorization=f'Bearer {auth_token}'),
)
data = json.loads(response.data.decode())
assert response.status_code == 200
assert 'success' in data['status']
assert len(data['data']['users']) == 1
user = data['data']['users'][0]
assert user['email'] == 'toto@toto.com'
assert user['admin'] is True
def test_it_removes_admin_rights_to_a_user(
self, app: Flask, user_1_admin: User, user_2: User
) -> None:
client, auth_token = self.get_test_client_and_auth_token(
app, user_1_admin.email
)
response = client.patch(
'/api/users/toto',
content_type='application/json',
data=json.dumps(dict(admin=False)),
headers=dict(Authorization=f'Bearer {auth_token}'),
)
data = json.loads(response.data.decode())
assert response.status_code == 200
assert 'success' in data['status']
assert len(data['data']['users']) == 1
user = data['data']['users'][0]
assert user['email'] == 'toto@toto.com'
assert user['admin'] is False
def test_it_returns_error_if_payload_for_admin_rights_is_empty(
self, app: Flask, user_1_admin: User, user_2: User
) -> None:
client, auth_token = self.get_test_client_and_auth_token(
app, user_1_admin.email
)
response = client.patch(
'/api/users/toto',
f'/api/users/{user_2.username}',
content_type='application/json',
data=json.dumps(dict()),
headers=dict(Authorization=f'Bearer {auth_token}'),
@ -910,13 +866,19 @@ class TestUpdateUser(ApiTestCaseMixin):
)
response = client.patch(
'/api/users/toto',
f'/api/users/{user_2.username}',
content_type='application/json',
data=json.dumps(dict(admin="")),
headers=dict(Authorization=f'Bearer {auth_token}'),
)
self.assert_500(response)
assert response.status_code == 500
data = json.loads(response.data.decode())
assert 'error' in data['status']
assert (
'error, please try again or contact the administrator'
in data['message']
)
def test_it_returns_error_if_user_can_not_change_admin_rights(
self, app: Flask, user_1: User, user_2: User
@ -926,7 +888,7 @@ class TestUpdateUser(ApiTestCaseMixin):
)
response = client.patch(
'/api/users/toto',
f'/api/users/{user_2.username}',
content_type='application/json',
data=json.dumps(dict(admin=True)),
headers=dict(Authorization=f'Bearer {auth_token}'),
@ -934,6 +896,163 @@ class TestUpdateUser(ApiTestCaseMixin):
self.assert_403(response)
def test_it_adds_admin_rights_to_a_user(
self, app: Flask, user_1_admin: User, user_2: User
) -> None:
client, auth_token = self.get_test_client_and_auth_token(
app, user_1_admin.email
)
response = client.patch(
f'/api/users/{user_2.username}',
content_type='application/json',
data=json.dumps(dict(admin=True)),
headers=dict(Authorization=f'Bearer {auth_token}'),
)
assert response.status_code == 200
data = json.loads(response.data.decode())
assert 'success' in data['status']
assert len(data['data']['users']) == 1
user = data['data']['users'][0]
assert user['email'] == 'toto@toto.com'
assert user['admin'] is True
def test_it_removes_admin_rights_to_a_user(
self, app: Flask, user_1_admin: User, user_2: User
) -> None:
client, auth_token = self.get_test_client_and_auth_token(
app, user_1_admin.email
)
response = client.patch(
f'/api/users/{user_2.username}',
content_type='application/json',
data=json.dumps(dict(admin=False)),
headers=dict(Authorization=f'Bearer {auth_token}'),
)
assert response.status_code == 200
data = json.loads(response.data.decode())
assert 'success' in data['status']
assert len(data['data']['users']) == 1
user = data['data']['users'][0]
assert user['email'] == 'toto@toto.com'
assert user['admin'] is False
def test_it_does_not_send_email_when_only_admin_rights_update(
self,
app: Flask,
user_1_admin: User,
user_2: User,
user_password_change_email_mock: MagicMock,
user_reset_password_email: MagicMock,
) -> None:
client, auth_token = self.get_test_client_and_auth_token(
app, user_1_admin.email
)
response = client.patch(
f'/api/users/{user_2.username}',
content_type='application/json',
data=json.dumps(dict(admin=True)),
headers=dict(Authorization=f'Bearer {auth_token}'),
)
assert response.status_code == 200
user_password_change_email_mock.send.assert_not_called()
user_reset_password_email.send.assert_not_called()
def test_it_resets_user_password(
self, app: Flask, user_1_admin: User, user_2: User
) -> None:
client, auth_token = self.get_test_client_and_auth_token(
app, user_1_admin.email
)
user_2_password = user_2.password
response = client.patch(
f'/api/users/{user_2.username}',
content_type='application/json',
data=json.dumps(dict(reset_password=True)),
headers=dict(Authorization=f'Bearer {auth_token}'),
)
assert response.status_code == 200
assert user_2.password != user_2_password
def test_it_calls_password_change_email_when_password_reset_is_successful(
self,
app: Flask,
user_1_admin: User,
user_2: User,
user_password_change_email_mock: MagicMock,
) -> None:
client, auth_token = self.get_test_client_and_auth_token(
app, user_1_admin.email
)
response = client.patch(
f'/api/users/{user_2.username}',
content_type='application/json',
data=json.dumps(dict(reset_password=True)),
headers=dict(Authorization=f'Bearer {auth_token}'),
)
assert response.status_code == 200
user_password_change_email_mock.send.assert_called_once_with(
{
'language': 'en',
'email': user_2.email,
},
{
'username': user_2.username,
'fittrackee_url': 'http://0.0.0.0:5000',
},
)
def test_it_calls_reset_password_email_when_password_reset_is_successful(
self,
app: Flask,
user_1_admin: User,
user_2: User,
user_reset_password_email: MagicMock,
) -> None:
client, auth_token = self.get_test_client_and_auth_token(
app, user_1_admin.email
)
with patch(
'fittrackee.users.users.User.encode_password_reset_token',
return_value='xxx',
):
response = client.patch(
f'/api/users/{user_2.username}',
content_type='application/json',
data=json.dumps(dict(reset_password=True)),
headers=dict(Authorization=f'Bearer {auth_token}'),
)
assert response.status_code == 200
user_reset_password_email.send.assert_called_once_with(
{
'language': 'en',
'email': user_2.email,
},
{
'expiration_delay': get_readable_duration(
app.config['PASSWORD_TOKEN_EXPIRATION_SECONDS'],
'en',
),
'username': user_2.username,
'password_reset_url': (
'http://0.0.0.0:5000/password-reset?token=xxx'
),
'fittrackee_url': 'http://0.0.0.0:5000',
},
)
class TestDeleteUser(ApiTestCaseMixin):
def test_user_can_delete_its_own_account(

View File

@ -12,8 +12,7 @@ from fittrackee.users.utils.controls import (
is_valid_email,
register_controls,
)
from ..utils import random_string
from fittrackee.users.utils.random import random_string
class TestSetAdminRights:

View File

@ -1,12 +1,14 @@
import os
import random
import shutil
from typing import Any, Dict, Tuple, Union
import click
from flask import Blueprint, request, send_file
from flask import Blueprint, current_app, request, send_file
from sqlalchemy import exc
from fittrackee import db
from fittrackee import bcrypt, db
from fittrackee.emails.tasks import password_change_email, reset_password_email
from fittrackee.files import get_absolute_file_path
from fittrackee.responses import (
ForbiddenErrorResponse,
@ -16,12 +18,14 @@ from fittrackee.responses import (
UserNotFoundErrorResponse,
handle_error_and_return_response,
)
from fittrackee.utils import get_readable_duration
from fittrackee.workouts.models import Record, Workout, WorkoutSegment
from .decorators import authenticate, authenticate_as_admin
from .exceptions import UserNotFoundException
from .models import User, UserSportPreference
from .utils.admin import set_admin_rights
from .utils.random import random_string
users_blueprint = Blueprint('users', __name__)
@ -487,16 +491,62 @@ def update_user(auth_user: User, user_name: str) -> Union[Dict, HttpResponse]:
:statuscode 500:
"""
user_data = request.get_json()
if not user_data or user_data.get('admin') is None:
if not user_data:
return InvalidPayloadErrorResponse()
send_emails = False
try:
user = User.query.filter_by(username=user_name).first()
if not user:
return UserNotFoundErrorResponse()
if 'admin' in user_data:
user.admin = user_data['admin']
if (
'reset_password' in user_data
and user_data['reset_password'] is True
):
new_password = random_string(length=random.randint(10, 20))
user.password = bcrypt.generate_password_hash(
new_password, current_app.config.get('BCRYPT_LOG_ROUNDS')
).decode()
send_emails = True
db.session.commit()
if send_emails:
user_language = 'en' if user.language is None else user.language
ui_url = current_app.config['UI_URL']
user_data = {
'language': user_language,
'email': user.email,
}
password_change_email.send(
user_data,
{
'username': user.username,
'fittrackee_url': ui_url,
},
)
password_reset_token = user.encode_password_reset_token(user.id)
reset_password_email.send(
user_data,
{
'expiration_delay': get_readable_duration(
current_app.config[
'PASSWORD_TOKEN_EXPIRATION_SECONDS'
],
user_language,
),
'username': user.username,
'password_reset_url': (
f'{ui_url}/password-reset?token={password_reset_token}'
),
'fittrackee_url': ui_url,
},
)
return {
'status': 'success',
'data': {'users': [user.serialize()]},