FitTrackee/fittrackee/tests/users/test_users_utils.py
2023-08-09 08:29:36 +02:00

727 lines
24 KiB
Python

import time
from calendar import timegm
from datetime import datetime, timedelta
from typing import Dict, Optional
from unittest.mock import Mock, patch
import jwt
import pytest
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from flask import Flask
from fittrackee import bcrypt, db
from fittrackee.users.exceptions import (
InvalidEmailException,
UserCreationException,
UserNotFoundException,
)
from fittrackee.users.models import BlacklistedToken, User
from fittrackee.users.utils.admin import UserManagerService
from fittrackee.users.utils.controls import (
check_password,
check_username,
is_valid_email,
register_controls,
)
from fittrackee.users.utils.token import (
clean_blacklisted_tokens,
decode_user_token,
get_user_token,
)
from ..utils import random_email, random_int, random_string
class TestUserManagerServiceUserUpdate:
    """Tests for ``UserManagerService.update``."""

    def test_it_raises_exception_if_user_does_not_exist(
        self, app: Flask
    ) -> None:
        """Updating a username that matches no user raises."""
        service = UserManagerService(username=random_string())

        with pytest.raises(UserNotFoundException):
            service.update()

    def test_it_does_not_update_user_when_no_args_provided(
        self, app: Flask, user_1: User
    ) -> None:
        """Without arguments, the 'updated' flag is False."""
        service = UserManagerService(username=user_1.username)

        _user, is_updated, _password = service.update()

        assert is_updated is False

    def test_it_returns_user(self, app: Flask, user_1: User) -> None:
        """The first element of the returned tuple is the user."""
        service = UserManagerService(username=user_1.username)

        returned_user, _is_updated, _password = service.update()

        assert returned_user == user_1

    def test_it_sets_admin_right_for_a_given_user(
        self, app: Flask, user_1: User
    ) -> None:
        """``is_admin=True`` grants admin rights."""
        service = UserManagerService(username=user_1.username)

        service.update(is_admin=True)

        assert user_1.admin is True

    def test_it_return_updated_user_flag_to_true(
        self, app: Flask, user_1: User
    ) -> None:
        """The 'updated' flag is True when admin rights are set."""
        service = UserManagerService(username=user_1.username)

        _user, is_updated, _password = service.update(is_admin=True)

        assert is_updated is True

    def test_it_does_not_raise_exception_when_user_has_already_admin_right(
        self, app: Flask, user_1_admin: User
    ) -> None:
        """Setting admin rights on an admin user keeps them admin."""
        service = UserManagerService(username=user_1_admin.username)

        service.update(is_admin=True)

        assert user_1_admin.admin is True

    def test_it_activates_account_if_user_is_inactive(
        self, app: Flask, inactive_user: User
    ) -> None:
        """Granting admin rights to an inactive user also activates it."""
        service = UserManagerService(username=inactive_user.username)

        service.update(is_admin=True)

        assert inactive_user.admin is True
        assert inactive_user.is_active is True
        assert inactive_user.confirmation_token is None

    def test_it_activates_given_user_account(
        self, app: Flask, inactive_user: User
    ) -> None:
        """``activate=True`` makes an inactive user active."""
        service = UserManagerService(username=inactive_user.username)

        service.update(activate=True)

        assert inactive_user.is_active is True

    def test_it_empties_confirmation_token(
        self, app: Flask, inactive_user: User
    ) -> None:
        """Activation clears the confirmation token."""
        service = UserManagerService(username=inactive_user.username)

        service.update(activate=True)

        assert inactive_user.confirmation_token is None

    def test_it_does_not_raise_error_if_user_account_already_activated(
        self, app: Flask, user_1: User
    ) -> None:
        """Activating an already active user leaves it active."""
        service = UserManagerService(username=user_1.username)

        service.update(activate=True)

        assert user_1.is_active is True

    def test_it_resets_user_password(self, app: Flask, user_1: User) -> None:
        """``reset_password=True`` changes the stored password."""
        # Capture the stored value before the update to compare afterwards.
        password_before_reset = user_1.password
        service = UserManagerService(username=user_1.username)

        service.update(reset_password=True)

        assert user_1.password != password_before_reset

    def test_new_password_is_encrypted(self, app: Flask, user_1: User) -> None:
        """The returned password verifies against the stored bcrypt hash."""
        service = UserManagerService(username=user_1.username)

        _user, _is_updated, generated_password = service.update(
            reset_password=True
        )

        assert bcrypt.check_password_hash(user_1.password, generated_password)

    def test_it_raises_exception_if_provided_email_is_invalid(
        self, app: Flask, user_1: User
    ) -> None:
        """A random string given as new email is rejected."""
        service = UserManagerService(username=user_1.username)

        with pytest.raises(
            InvalidEmailException, match='valid email must be provided'
        ):
            service.update(new_email=random_string())

    def test_it_raises_exception_if_provided_email_is_current_user_email(
        self, app: Flask, user_1: User
    ) -> None:
        """The current email given as new email is rejected."""
        service = UserManagerService(username=user_1.username)

        with pytest.raises(
            InvalidEmailException,
            match='new email must be different than current email',
        ):
            service.update(new_email=user_1.email)

    def test_it_updates_user_email_to_confirm(
        self, app: Flask, user_1: User
    ) -> None:
        """By default the new email is stored as 'to confirm'."""
        email_before_update = user_1.email
        requested_email = random_email()
        service = UserManagerService(username=user_1.username)

        service.update(new_email=requested_email)

        assert user_1.email == email_before_update
        assert user_1.email_to_confirm == requested_email
        assert user_1.confirmation_token is not None

    def test_it_updates_user_email(self, app: Flask, user_1: User) -> None:
        """With ``with_confirmation=False`` the email is replaced directly."""
        requested_email = random_email()
        service = UserManagerService(username=user_1.username)

        service.update(new_email=requested_email, with_confirmation=False)

        assert user_1.email == requested_email
        assert user_1.email_to_confirm is None
        assert user_1.confirmation_token is None
class TestUserManagerServiceUserCreation:
    """Tests for ``UserManagerService.create``."""

    def test_it_raises_exception_if_provided_username_is_invalid(
        self, app: Flask
    ) -> None:
        """A username containing a forbidden character is rejected."""
        service = UserManagerService(username='.admin')
        expected_message = (
            'username: only alphanumeric characters and '
            'the underscore character "_" allowed\n'
        )

        with pytest.raises(UserCreationException, match=expected_message):
            service.create(email=random_email())

    def test_it_raises_exception_if_a_user_exists_with_same_username(
        self, app: Flask, user_1: User
    ) -> None:
        """An existing username cannot be reused."""
        service = UserManagerService(username=user_1.username)

        with pytest.raises(
            UserCreationException,
            match='sorry, that username is already taken',
        ):
            service.create(email=random_email())

    def test_it_raises_exception_if_provided_email_is_invalid(
        self, app: Flask
    ) -> None:
        """A random string given as email is rejected."""
        service = UserManagerService(username=random_string())

        with pytest.raises(
            UserCreationException, match='valid email must be provided'
        ):
            service.create(email=random_string())

    def test_it_raises_exception_if_a_user_exists_with_same_email(
        self, app: Flask, user_1: User
    ) -> None:
        """An email already used by another user is rejected."""
        service = UserManagerService(username=random_string())

        with pytest.raises(
            UserCreationException,
            match='This user already exists. No action done.',
        ):
            service.create(email=user_1.email)

    def test_it_creates_user_with_provided_password(self, app: Flask) -> None:
        """The provided password is hashed on the user and returned as-is."""
        expected_username = random_string()
        expected_email = random_email()
        expected_password = random_string()
        service = UserManagerService(username=expected_username)

        created_user, returned_password = service.create(
            expected_email, expected_password
        )

        assert created_user
        assert created_user.username == expected_username
        assert created_user.email == expected_email
        assert bcrypt.check_password_hash(
            created_user.password, expected_password
        )
        assert returned_password == expected_password

    def test_it_creates_user_when_password_is_not_provided(
        self, app: Flask
    ) -> None:
        """Without a password, the returned one matches the stored hash."""
        expected_username = random_string()
        expected_email = random_email()
        service = UserManagerService(username=expected_username)

        created_user, returned_password = service.create(expected_email)

        assert created_user
        assert created_user.username == expected_username
        assert created_user.email == expected_email
        assert bcrypt.check_password_hash(
            created_user.password, returned_password
        )

    def test_it_creates_when_registration_is_not_enabled(
        self,
        app_with_3_users_max: Flask,
        user_1: User,
        user_2: User,
        user_3: User,
    ) -> None:
        """Creation succeeds with the 3-users-max app and 3 users present."""
        expected_username = random_string()
        expected_email = random_email()
        service = UserManagerService(username=expected_username)

        created_user, returned_password = service.create(expected_email)

        assert created_user
        assert created_user.username == expected_username
        assert created_user.email == expected_email
        assert bcrypt.check_password_hash(
            created_user.password, returned_password
        )

    def test_created_user_is_inactive(self, app: Flask) -> None:
        """A new user is inactive and has a confirmation token."""
        service = UserManagerService(username=random_string())

        created_user, _password = service.create(email=random_email())

        assert created_user
        assert created_user.is_active is False
        assert created_user.confirmation_token is not None

    def test_created_user_has_no_admin_rights(self, app: Flask) -> None:
        """A new user is not an admin."""
        service = UserManagerService(username=random_string())

        created_user, _password = service.create(email=random_email())

        assert created_user
        assert created_user.admin is False

    def test_created_user_does_not_accept_privacy_policy(
        self, app: Flask
    ) -> None:
        """A new user has no privacy policy acceptance date."""
        service = UserManagerService(username=random_string())

        created_user, _password = service.create(email=random_email())

        assert created_user
        assert created_user.accepted_policy_date is None

    def test_created_user_timezone_is_europe_paris(self, app: Flask) -> None:
        """A new user has the 'Europe/Paris' timezone."""
        service = UserManagerService(username=random_string())

        created_user, _password = service.create(email=random_email())

        assert created_user
        assert created_user.timezone == 'Europe/Paris'

    def test_created_user_date_format_is_MM_dd_yyyy(  # noqa
        self, app: Flask
    ) -> None:
        """A new user has the 'MM/dd/yyyy' date format."""
        service = UserManagerService(username=random_string())

        created_user, _password = service.create(email=random_email())

        assert created_user
        assert created_user.date_format == 'MM/dd/yyyy'

    def test_created_user_language_is_en(self, app: Flask) -> None:
        """A new user has the 'en' language."""
        service = UserManagerService(username=random_string())

        created_user, _password = service.create(email=random_email())

        assert created_user
        assert created_user.language == 'en'
class TestIsValidEmail:
    """Tests for ``is_valid_email``."""

    @pytest.mark.parametrize(
        'input_email',
        [None, '', 'foo', 'foo@', '@foo.fr', 'foo@foo', '.', './'],
    )
    def test_it_returns_false_if_email_is_invalid(
        self, input_email: str
    ) -> None:
        """Missing, empty or malformed addresses are rejected."""
        is_valid = is_valid_email(input_email)

        assert is_valid is False

    @pytest.mark.parametrize(
        'input_email',
        [
            'admin@example.com',
            'admin@test.example.com',
            'admin.site@test.example.com',
            'admin-site@test-example.com',
        ],
    )
    def test_it_returns_true_if_email_is_valid(self, input_email: str) -> None:
        """Well-formed addresses are accepted."""
        is_valid = is_valid_email(input_email)

        assert is_valid is True
class TestCheckPasswords:
    """Tests for ``check_password``."""

    @pytest.mark.parametrize('input_password_length', [0, 3, 7])
    def test_it_returns_error_message_string_if_password_length_is_below_8_characters(  # noqa
        self, input_password_length: int
    ) -> None:
        """Passwords shorter than 8 characters return the length error."""
        too_short_password = random_string(input_password_length)

        error_message = check_password(too_short_password)

        assert error_message == 'password: 8 characters required\n'

    @pytest.mark.parametrize('input_password_length', [8, 10])
    def test_it_returns_empty_string_when_password_length_exceeds_7_characters(
        self, input_password_length: int
    ) -> None:
        """Passwords of 8 or 10 characters return no error."""
        long_enough_password = random_string(input_password_length)

        error_message = check_password(long_enough_password)

        assert error_message == ''
class TestIsUsernameValid:
    """Tests for ``check_username``."""

    @pytest.mark.parametrize(
        ('input_username_length',),
        [
            (2,),
            (31,),
        ],
    )
    def test_it_returns_error_message_when_username_length_is_invalid(
        self, input_username_length: int
    ) -> None:
        """Usernames of 2 or 31 characters return the length error."""
        # Use the parametrized length: a hard-coded 31 here would leave the
        # too-short case (2 characters) untested.
        username = random_string(input_username_length)

        assert (
            check_username(username=username)
            == 'username: 3 to 30 characters required\n'
        )

    @pytest.mark.parametrize(
        ('input_invalid_character',),
        [
            ('.',),
            ('/',),
            ('$',),
        ],
    )
    def test_it_returns_error_message_when_username_has_invalid_character(
        self, input_invalid_character: str
    ) -> None:
        """A username ending with '.', '/' or '$' returns the charset error."""
        username = random_string() + input_invalid_character

        assert check_username(username=username) == (
            'username: only alphanumeric characters and the '
            'underscore character "_" allowed\n'
        )

    def test_it_returns_empty_string_when_username_is_valid(self) -> None:
        """A default ``random_string()`` username returns no error."""
        assert check_username(username=random_string()) == ''

    def test_it_returns_multiple_errors(self) -> None:
        """Length and charset errors are both returned, length first."""
        username = random_string(31) + '.'

        assert check_username(username=username) == (
            'username: 3 to 30 characters required\n'
            'username: only alphanumeric characters and the underscore '
            'character "_" allowed\n'
        )
class TestRegisterControls:
    """Tests for ``register_controls``."""

    # Dotted prefix of the module whose validators are patched below.
    module_path = 'fittrackee.users.utils.controls.'
    # Inputs shared by the tests; generated once, at class definition time.
    valid_username = random_string()
    valid_email = f'{random_string()}@example.com'
    valid_password = random_string()

    def test_it_calls_all_validators(self) -> None:
        """Each validator is called exactly once with its own input."""
        with patch(f'{self.module_path}check_password') as password_mock:
            with patch(f'{self.module_path}check_username') as username_mock:
                with patch(f'{self.module_path}is_valid_email') as email_mock:
                    register_controls(
                        self.valid_username,
                        self.valid_email,
                        self.valid_password,
                    )

        password_mock.assert_called_once_with(self.valid_password)
        username_mock.assert_called_once_with(self.valid_username)
        email_mock.assert_called_once_with(self.valid_email)

    def test_it_returns_empty_string_when_inputs_are_valid(self) -> None:
        """Valid username, email and password return no error."""
        error_message = register_controls(
            self.valid_username,
            self.valid_email,
            self.valid_password,
        )

        assert error_message == ''

    def test_it_returns_multiple_errors_when_inputs_are_invalid(self) -> None:
        """Username and email errors are both returned, username first."""
        # The same 31-character string is used as username and as email.
        invalid_value = random_string(31)

        error_message = register_controls(
            username=invalid_value,
            email=invalid_value,
            password=random_string(8),
        )

        assert error_message == (
            'username: 3 to 30 characters required\n'
            'email: valid email must be provided\n'
        )
class TestGetUserToken:
    """Tests for ``get_user_token``."""

    @staticmethod
    def decode_token(app: Flask, token: str) -> Dict:
        """Decode ``token`` with the app's SECRET_KEY, allowing only HS256."""
        secret_key = app.config['SECRET_KEY']
        return jwt.decode(token, secret_key, algorithms=['HS256'])

    def test_token_is_encoded_with_hs256(self, app: Flask) -> None:
        """The token decodes with HS256 and holds exp, iat and sub claims."""
        payload = self.decode_token(app, get_user_token(user_id=1))

        assert list(payload) == ['exp', 'iat', 'sub']

    @pytest.mark.parametrize('input_password_reset', [True, False])
    def test_token_contains_user_id(
        self, app: Flask, input_password_reset: bool
    ) -> None:
        """The 'sub' claim is the user id."""
        expected_user_id = 1

        payload = self.decode_token(
            app,
            get_user_token(
                user_id=expected_user_id, password_reset=input_password_reset
            ),
        )

        assert payload['sub'] == expected_user_id

    @pytest.mark.parametrize('input_password_reset', [True, False])
    def test_token_contains_timestamp_of_when_it_is_issued(
        self, app: Flask, input_password_reset: bool
    ) -> None:
        """The 'iat' claim is the mocked current UTC time."""
        issued_at = datetime.utcnow()

        # Freeze the clock seen by the token module while generating.
        with patch('fittrackee.users.utils.token.datetime') as datetime_mock:
            datetime_mock.utcnow = Mock(return_value=issued_at)
            token = get_user_token(
                user_id=1, password_reset=input_password_reset
            )
        payload = self.decode_token(app, token)

        assert payload['iat'] == timegm(issued_at.utctimetuple())

    def test_token_contains_timestamp_of_when_it_expired(
        self, app: Flask
    ) -> None:
        """The 'exp' claim is issue time plus the configured token lifetime."""
        issued_at = datetime.utcnow()
        lifetime = timedelta(
            days=app.config['TOKEN_EXPIRATION_DAYS'],
            seconds=app.config['TOKEN_EXPIRATION_SECONDS'],
        )
        expected_exp = timegm((issued_at + lifetime).utctimetuple())

        with patch('fittrackee.users.utils.token.datetime') as datetime_mock:
            datetime_mock.utcnow = Mock(return_value=issued_at)
            token = get_user_token(user_id=1)
        payload = self.decode_token(app, token)

        assert payload['exp'] == expected_exp

    def test_password_token_contains_timestamp_of_when_it_expired(
        self, app: Flask
    ) -> None:
        """A password-reset token expires after the password token delay."""
        issued_at = datetime.utcnow()
        lifetime = timedelta(
            days=0.0,
            seconds=app.config['PASSWORD_TOKEN_EXPIRATION_SECONDS'],
        )
        expected_exp = timegm((issued_at + lifetime).utctimetuple())

        with patch('fittrackee.users.utils.token.datetime') as datetime_mock:
            datetime_mock.utcnow = Mock(return_value=issued_at)
            token = get_user_token(user_id=1, password_reset=True)
        payload = self.decode_token(app, token)

        assert payload['exp'] == expected_exp
class TestDecodeUserToken:
    """Tests for ``decode_user_token``."""

    @staticmethod
    def generate_token(user_id: int, now: datetime) -> str:
        """Return a user token generated with ``utcnow`` mocked to ``now``."""
        with patch('fittrackee.users.utils.token.datetime') as datetime_mock:
            datetime_mock.utcnow = Mock(return_value=now)
            generated_token = get_user_token(user_id)
        return generated_token

    @staticmethod
    def _one_minute_payload() -> Dict:
        """Return a payload for user 1, issued now, expiring in one minute."""
        issued_at = datetime.utcnow()
        return {
            'exp': issued_at + timedelta(minutes=1),
            'iat': issued_at,
            'sub': 1,
        }

    def test_it_raises_error_when_token_is_invalid(self, app: Flask) -> None:
        """A random string is not decodable as a token."""
        with pytest.raises(jwt.exceptions.DecodeError):
            decode_user_token(random_string())

    def test_it_raises_error_when_token_body_is_invalid(
        self, app: Flask
    ) -> None:
        """Replacing the body segment invalidates the signature."""
        valid_token = self.generate_token(user_id=1, now=datetime.utcnow())
        header, _body, signature = valid_token.split('.')
        tampered_token = '.'.join([header, random_string(), signature])

        with pytest.raises(
            jwt.exceptions.InvalidSignatureError,
            match='Signature verification failed',
        ):
            decode_user_token(tampered_token)

    def test_it_raises_error_when_secret_key_is_invalid(
        self, app: Flask
    ) -> None:
        """A token signed with a random key fails signature verification."""
        foreign_token = jwt.encode(
            self._one_minute_payload(),
            random_string(),
            algorithm='HS256',
        )

        with pytest.raises(
            jwt.exceptions.InvalidSignatureError,
            match='Signature verification failed',
        ):
            decode_user_token(foreign_token)

    def test_it_raises_error_when_algorithm_is_not_hs256(
        self, app: Flask
    ) -> None:
        """A token signed with RS256 is rejected."""
        rsa_key = rsa.generate_private_key(
            public_exponent=65537, key_size=2048
        )
        pem_private_key = rsa_key.private_bytes(
            serialization.Encoding.PEM,
            serialization.PrivateFormat.PKCS8,
            serialization.NoEncryption(),
        )
        rs256_token = jwt.encode(
            self._one_minute_payload(),
            pem_private_key.decode(),
            algorithm="RS256",
        )

        with pytest.raises(jwt.exceptions.InvalidAlgorithmError):
            decode_user_token(rs256_token)

    def test_it_raises_error_when_token_is_expired(self, app: Flask) -> None:
        """A token generated as if 10 minutes ago is reported as expired."""
        ten_minutes_ago = datetime.utcnow() - timedelta(minutes=10)
        expired_token = self.generate_token(user_id=1, now=ten_minutes_ago)

        with pytest.raises(
            jwt.exceptions.ExpiredSignatureError, match='Signature has expired'
        ):
            decode_user_token(expired_token)

    def test_it_returns_user_id(self, app: Flask) -> None:
        """Decoding a valid token returns the user id it was created for."""
        expected_user_id = 1
        valid_token = self.generate_token(
            user_id=expected_user_id, now=datetime.utcnow()
        )

        decoded_user_id = decode_user_token(valid_token)

        assert decoded_user_id == expected_user_id
class TestBlacklistedTokensCleanup:
    """Tests for ``clean_blacklisted_tokens``."""

    @staticmethod
    def blacklisted_token(expiration_days: Optional[int] = None) -> str:
        """Store a blacklisted token and return the raw token string.

        When ``expiration_days`` is given, ``expired_at`` is overwritten
        with a timestamp that many days before the current time.
        """
        raw_token = get_user_token(user_id=random_int())
        entry = BlacklistedToken(token=raw_token)
        if expiration_days is not None:
            seconds_since_expiration = expiration_days * 86400
            entry.expired_at = int(time.time()) - seconds_since_expiration
        db.session.add(entry)
        db.session.commit()
        return raw_token

    def test_it_returns_0_as_count_when_no_blacklisted_token_deleted(
        self, app: Flask, user_1: User
    ) -> None:
        """With no blacklisted token stored, the returned count is 0."""
        deleted_count = clean_blacklisted_tokens(days=30)

        assert deleted_count == 0

    def test_it_does_not_delete_blacklisted_token_when_not_expired(
        self, app: Flask, user_1: User
    ) -> None:
        """A token with an untouched ``expired_at`` is kept."""
        raw_token = self.blacklisted_token()

        clean_blacklisted_tokens(days=10)

        remaining = BlacklistedToken.query.filter_by(token=raw_token).first()
        assert remaining is not None

    def test_it_deletes_blacklisted_token_when_expired_more_then_provided_days(
        self, app: Flask, user_1: User
    ) -> None:
        """A token expired 40 days ago is removed by a 30-day cleanup."""
        raw_token = self.blacklisted_token(expiration_days=40)

        clean_blacklisted_tokens(days=30)

        remaining = BlacklistedToken.query.filter_by(token=raw_token).first()
        assert remaining is None

    def test_it_does_not_delete_blacklisted_token_when_expired_below_provided_days(  # noqa
        self, app: Flask, user_1: User
    ) -> None:
        """A token expired 30 days ago is kept by a 40-day cleanup."""
        raw_token = self.blacklisted_token(expiration_days=30)

        clean_blacklisted_tokens(days=40)

        remaining = BlacklistedToken.query.filter_by(token=raw_token).first()
        assert remaining is not None

    def test_it_returns_deleted_rows_count(
        self, app: Flask, user_1: User
    ) -> None:
        """Only the three tokens expired 30 days ago are counted."""
        # One token with an untouched expiry, then three back-dated ones.
        self.blacklisted_token()
        for _ in range(3):
            self.blacklisted_token(expiration_days=30)

        deleted_count = clean_blacklisted_tokens(
            days=app.config['TOKEN_EXPIRATION_DAYS']
        )

        assert deleted_count == 3