3126 lines
99 KiB
Python
3126 lines
99 KiB
Python
import json
|
|
from datetime import datetime, timedelta
|
|
from io import BytesIO
|
|
from typing import Optional, Union
|
|
from unittest.mock import MagicMock, Mock, patch
|
|
|
|
import pytest
|
|
from flask import Flask
|
|
from freezegun import freeze_time
|
|
|
|
from fittrackee import db
|
|
from fittrackee.users.models import (
|
|
BlacklistedToken,
|
|
User,
|
|
UserDataExport,
|
|
UserSportPreference,
|
|
)
|
|
from fittrackee.users.utils.token import get_user_token
|
|
from fittrackee.workouts.models import Sport
|
|
|
|
from ..mixins import ApiTestCaseMixin
|
|
from ..utils import OAUTH_SCOPES, jsonify_dict
|
|
|
|
# User-Agent header sent by tests that check the operating system
# ('Linux') and browser ('Firefox') reported in notification emails.
USER_AGENT = (
    'Mozilla/5.0 (X11; Linux x86_64; rv:98.0) '
    'Gecko/20100101 Firefox/98.0'
)
|
|
|
|
|
|
class TestUserRegistration(ApiTestCaseMixin):
    """Tests for the ``POST /api/auth/register`` endpoint."""

    def test_it_returns_error_if_payload_is_empty(self, app: Flask) -> None:
        """An empty JSON object is rejected (checked with ``assert_400``)."""
        client = app.test_client()

        response = client.post(
            '/api/auth/register',
            data=json.dumps({}),
            content_type='application/json',
        )

        self.assert_400(response)

    def test_it_returns_error_if_accepted_policy_is_missing(
        self, app: Flask
    ) -> None:
        """A payload without ``accepted_policy`` is rejected."""
        client = app.test_client()
        payload = {
            'username': self.random_string(),
            'email': self.random_email(),
            'password': self.random_string(),
        }

        response = client.post(
            '/api/auth/register',
            data=json.dumps(payload),
            content_type='application/json',
        )

        self.assert_400(response)

    def test_it_returns_error_if_accepted_policy_is_false(
        self, app: Flask
    ) -> None:
        """``accepted_policy=False`` yields the privacy policy error."""
        client = app.test_client()
        payload = {
            'username': self.random_string(),
            'email': self.random_email(),
            'password': self.random_string(),
            'accepted_policy': False,
        }

        response = client.post(
            '/api/auth/register',
            data=json.dumps(payload),
            content_type='application/json',
        )

        self.assert_400(
            response,
            'sorry, you must agree privacy policy to register',
        )

    def test_it_returns_error_if_username_is_missing(self, app: Flask) -> None:
        """A payload without ``username`` is rejected."""
        client = app.test_client()
        payload = {
            'email': self.random_email(),
            'password': self.random_string(),
            'accepted_policy': True,
        }

        response = client.post(
            '/api/auth/register',
            data=json.dumps(payload),
            content_type='application/json',
        )

        self.assert_400(response)

    @pytest.mark.parametrize(
        'input_username_length',
        # 2 and 31 are the values just outside the "3 to 30" range named
        # in the expected error message; 1 is kept from the original set.
        [1, 2, 31],
    )
    def test_it_returns_error_if_username_length_is_invalid(
        self, app: Flask, input_username_length: int
    ) -> None:
        """Usernames outside the 3-30 character range are rejected."""
        client = app.test_client()
        payload = {
            'username': self.random_string(length=input_username_length),
            'email': self.random_email(),
            'password': self.random_string(),
            'accepted_policy': True,
        }

        response = client.post(
            '/api/auth/register',
            data=json.dumps(payload),
            content_type='application/json',
        )

        self.assert_400(response, 'username: 3 to 30 characters required\n')

    @pytest.mark.parametrize(
        'input_description,input_username',
        [
            ('account_handle', '@sam@example.com'),
            ('with special characters', 'sam*'),
        ],
    )
    def test_it_returns_error_if_username_is_invalid(
        self, app: Flask, input_description: str, input_username: str
    ) -> None:
        """Usernames with characters other than alphanumerics/"_" fail.

        ``input_description`` only labels the parametrized case.
        """
        client = app.test_client()
        payload = {
            'username': input_username,
            'email': self.random_email(),
            # A plain random string, like every other registration test:
            # only the username must be invalid here.
            'password': self.random_string(),
            'accepted_policy': True,
        }

        response = client.post(
            '/api/auth/register',
            data=json.dumps(payload),
            content_type='application/json',
        )

        self.assert_400(
            response,
            'username: only alphanumeric characters and '
            'the underscore character "_" allowed\n',
        )

    @pytest.mark.parametrize(
        'text_transformation',
        ['upper', 'lower'],
    )
    def test_it_returns_error_if_user_already_exists_with_same_username(
        self, app: Flask, user_1: User, text_transformation: str
    ) -> None:
        """An existing username is refused whatever its letter case."""
        client = app.test_client()
        # text_transformation is either 'upper' or 'lower' (str methods).
        username = getattr(user_1.username, text_transformation)()
        payload = {
            'username': username,
            'email': self.random_email(),
            'password': self.random_string(),
            'accepted_policy': True,
        }

        response = client.post(
            '/api/auth/register',
            data=json.dumps(payload),
            content_type='application/json',
        )

        self.assert_400(response, 'sorry, that username is already taken')

    def test_it_returns_error_if_password_is_missing(self, app: Flask) -> None:
        """A payload without ``password`` is rejected."""
        client = app.test_client()
        payload = {
            'username': self.random_string(),
            'email': self.random_email(),
            'accepted_policy': True,
        }

        response = client.post(
            '/api/auth/register',
            data=json.dumps(payload),
            content_type='application/json',
        )

        self.assert_400(response)

    def test_it_returns_error_if_password_is_too_short(
        self, app: Flask
    ) -> None:
        """A 7-character password triggers the password length error."""
        client = app.test_client()
        payload = {
            'username': self.random_string(),
            'email': self.random_email(),
            'password': self.random_string(length=7),
            'accepted_policy': True,
        }

        response = client.post(
            '/api/auth/register',
            data=json.dumps(payload),
            content_type='application/json',
        )

        self.assert_400(response, 'password: 8 characters required\n')

    def test_it_returns_error_if_email_is_missing(self, app: Flask) -> None:
        """A payload without ``email`` is rejected."""
        client = app.test_client()
        payload = {
            'username': self.random_string(),
            'password': self.random_string(),
            'accepted_policy': True,
        }

        response = client.post(
            '/api/auth/register',
            data=json.dumps(payload),
            content_type='application/json',
        )

        self.assert_400(response)

    def test_it_returns_error_if_email_is_invalid(self, app: Flask) -> None:
        """A random string used as email triggers the email error."""
        client = app.test_client()
        payload = {
            'username': self.random_string(),
            'email': self.random_string(),
            'password': self.random_string(),
            'accepted_policy': True,
        }

        response = client.post(
            '/api/auth/register',
            data=json.dumps(payload),
            content_type='application/json',
        )

        self.assert_400(response, 'email: valid email must be provided\n')

    def test_it_does_not_send_email_after_error(
        self, app: Flask, account_confirmation_email_mock: Mock
    ) -> None:
        """No confirmation email is sent for an invalid payload.

        The payload has no password and its email is a random string.
        """
        client = app.test_client()
        payload = {
            'username': self.random_string(),
            'email': self.random_string(),
            'accepted_policy': True,
        }

        client.post(
            '/api/auth/register',
            data=json.dumps(payload),
            content_type='application/json',
        )

        account_confirmation_email_mock.send.assert_not_called()

    def test_it_returns_success_if_payload_is_valid(self, app: Flask) -> None:
        """A valid payload returns 200/success without an auth token."""
        client = app.test_client()
        payload = {
            'username': self.random_string(),
            'email': self.random_email(),
            'password': self.random_string(),
            'accepted_policy': True,
        }

        response = client.post(
            '/api/auth/register',
            data=json.dumps(payload),
            content_type='application/json',
        )

        assert response.status_code == 200
        assert response.content_type == 'application/json'
        data = json.loads(response.data.decode())
        assert data['status'] == 'success'
        assert 'auth_token' not in data

    def test_it_creates_user_with_default_date_format(
        self, app: Flask
    ) -> None:
        """The created user has the ``MM/dd/yyyy`` date format."""
        client = app.test_client()
        username = self.random_string()
        payload = {
            'username': username,
            'email': self.random_email(),
            'password': self.random_string(),
            'accepted_policy': True,
        }

        client.post(
            '/api/auth/register',
            data=json.dumps(payload),
            content_type='application/json',
        )

        new_user = User.query.filter_by(username=username).first()
        assert new_user.date_format == 'MM/dd/yyyy'

    @pytest.mark.parametrize(
        'input_language,expected_language',
        [('en', 'en'), ('fr', 'fr'), ('invalid', 'en'), (None, 'en')],
    )
    def test_it_creates_user_with_inactive_account(
        self,
        app: Flask,
        input_language: Optional[str],
        expected_language: str,
    ) -> None:
        """The created user is inactive and stores the submitted data.

        ``utcnow`` is mocked through ``fittrackee.users.auth`` so the
        stored policy acceptance date can be compared exactly.
        """
        client = app.test_client()
        username = self.random_string()
        email = self.random_email()
        accepted_policy_date = datetime.utcnow()

        with patch('fittrackee.users.auth.datetime.datetime') as datetime_mock:
            datetime_mock.utcnow = Mock(return_value=accepted_policy_date)
            # The payload is built inside the patch context, as the
            # request itself must run while the clock is mocked.
            client.post(
                '/api/auth/register',
                data=json.dumps(
                    {
                        'username': username,
                        'email': email,
                        'password': self.random_string(),
                        'language': input_language,
                        'accepted_policy': True,
                    }
                ),
                content_type='application/json',
            )

        new_user = User.query.filter_by(username=username).first()
        assert new_user.email == email
        assert new_user.password is not None
        assert new_user.is_active is False
        assert new_user.language == expected_language
        assert new_user.accepted_policy_date == accepted_policy_date

    @pytest.mark.parametrize(
        'input_language,expected_language',
        [('en', 'en'), ('fr', 'fr'), ('invalid', 'en'), (None, 'en')],
    )
    def test_it_calls_account_confirmation_email_when_payload_is_valid(
        self,
        app: Flask,
        account_confirmation_email_mock: Mock,
        input_language: Optional[str],
        expected_language: str,
    ) -> None:
        """The confirmation email is sent once with the expected data.

        ``secrets.token_urlsafe`` is patched so the confirmation URL is
        predictable.
        """
        client = app.test_client()
        email = self.random_email()
        username = self.random_string()
        expected_token = self.random_string()

        with patch('secrets.token_urlsafe', return_value=expected_token):
            client.post(
                '/api/auth/register',
                data=json.dumps(
                    {
                        'username': username,
                        'email': email,
                        'password': '12345678',
                        'language': input_language,
                        'accepted_policy': True,
                    }
                ),
                content_type='application/json',
                environ_base={'HTTP_USER_AGENT': USER_AGENT},
            )

        account_confirmation_email_mock.send.assert_called_once_with(
            {
                'language': expected_language,
                'email': email,
            },
            {
                'username': username,
                'fittrackee_url': 'http://0.0.0.0:5000',
                'operating_system': 'Linux',
                'browser_name': 'Firefox',
                'account_confirmation_url': (
                    'http://0.0.0.0:5000/account-confirmation'
                    f'?token={expected_token}'
                ),
            },
        )

    def test_it_does_not_call_account_confirmation_email_when_email_sending_is_disabled(  # noqa
        self,
        app_wo_email_activation: Flask,
        account_confirmation_email_mock: Mock,
    ) -> None:
        """With the ``app_wo_email_activation`` app, no email is sent."""
        client = app_wo_email_activation.test_client()
        email = self.random_email()
        username = self.random_string()
        payload = {
            'username': username,
            'email': email,
            'password': '12345678',
            'accepted_policy': True,
        }

        response = client.post(
            '/api/auth/register',
            data=json.dumps(payload),
            content_type='application/json',
            environ_base={'HTTP_USER_AGENT': USER_AGENT},
        )

        assert response.status_code == 200
        account_confirmation_email_mock.send.assert_not_called()

    @pytest.mark.parametrize(
        'text_transformation',
        ['upper', 'lower'],
    )
    def test_it_does_not_return_error_if_a_user_already_exists_with_same_email(
        self, app: Flask, user_1: User, text_transformation: str
    ) -> None:
        """Registering with an existing email still returns success."""
        client = app.test_client()
        username = self.random_string()
        # text_transformation is either 'upper' or 'lower' (str methods).
        email = getattr(user_1.email, text_transformation)()
        payload = {
            'username': username,
            'email': email,
            'password': self.random_string(),
            'accepted_policy': True,
        }

        response = client.post(
            '/api/auth/register',
            data=json.dumps(payload),
            content_type='application/json',
        )

        assert response.status_code == 200
        assert response.content_type == 'application/json'
        data = json.loads(response.data.decode())
        assert data['status'] == 'success'
        assert 'auth_token' not in data

    def test_it_does_not_call_account_confirmation_email_if_user_already_exists(  # noqa
        self, app: Flask, user_1: User, account_confirmation_email_mock: Mock
    ) -> None:
        """No confirmation email is sent when the email already exists."""
        client = app.test_client()
        payload = {
            'username': self.random_string(),
            'email': user_1.email,
            'password': self.random_string(),
            'accepted_policy': True,
        }

        client.post(
            '/api/auth/register',
            data=json.dumps(payload),
            content_type='application/json',
        )

        account_confirmation_email_mock.send.assert_not_called()
|
|
|
|
|
|
class TestUserLogin(ApiTestCaseMixin):
    """Tests for the ``POST /api/auth/login`` endpoint."""

    def test_it_returns_error_if_payload_is_empty(self, app: Flask) -> None:
        """An empty JSON object is rejected (checked with ``assert_400``)."""
        client = app.test_client()

        response = client.post(
            '/api/auth/login',
            data=json.dumps({}),
            content_type='application/json',
        )

        self.assert_400(response)

    def test_it_returns_error_if_user_does_not_exists(
        self, app: Flask
    ) -> None:
        """Random credentials are refused with 'invalid credentials'."""
        client = app.test_client()
        credentials = {
            'email': self.random_email(),
            'password': self.random_string(),
        }

        response = client.post(
            '/api/auth/login',
            data=json.dumps(credentials),
            content_type='application/json',
        )

        self.assert_401(response, 'invalid credentials')

    def test_it_returns_error_if_user_account_is_inactive(
        self, app: Flask, inactive_user: User
    ) -> None:
        """The ``inactive_user`` fixture cannot log in."""
        client = app.test_client()
        credentials = {'email': inactive_user.email, 'password': '12345678'}

        response = client.post(
            '/api/auth/login',
            data=json.dumps(credentials),
            content_type='application/json',
        )

        self.assert_401(response, 'invalid credentials')

    def test_it_returns_error_if_password_is_invalid(
        self, app: Flask, user_1: User
    ) -> None:
        """A wrong password for an existing user is refused."""
        client = app.test_client()
        credentials = {
            'email': user_1.email,
            # A random string stands in for the wrong password, as in the
            # other "invalid password" tests of this module.
            'password': self.random_string(),
        }

        response = client.post(
            '/api/auth/login',
            data=json.dumps(credentials),
            content_type='application/json',
        )

        self.assert_401(response, 'invalid credentials')

    @pytest.mark.parametrize(
        'text_transformation',
        ['upper', 'lower'],
    )
    def test_user_can_login_regardless_username_case(
        self, app: Flask, user_1: User, text_transformation: str
    ) -> None:
        """Login succeeds whatever the letter case of the email.

        NOTE: despite the test name, the value whose case is changed is
        the email address sent in the login payload, not the username.
        """
        client = app.test_client()
        credentials = {
            # text_transformation is either 'upper' or 'lower'.
            'email': getattr(user_1.email, text_transformation)(),
            'password': '12345678',
        }

        response = client.post(
            '/api/auth/login',
            data=json.dumps(credentials),
            content_type='application/json',
        )

        assert response.status_code == 200
        assert response.content_type == 'application/json'
        data = json.loads(response.data.decode())
        assert data['status'] == 'success'
        assert data['message'] == 'successfully logged in'
        assert data['auth_token']
|
|
|
|
|
|
class TestUserProfile(ApiTestCaseMixin):
    """Tests for the ``GET /api/auth/profile`` endpoint."""

    def test_it_returns_error_if_auth_token_is_missing(
        self, app: Flask
    ) -> None:
        """A request without Authorization header gets a 401 error."""
        resp = app.test_client().get('/api/auth/profile')

        self.assert_401(resp, 'provide a valid auth token')

    def test_it_returns_error_if_auth_token_is_invalid(
        self, app: Flask
    ) -> None:
        """A bogus bearer token is reported as an invalid token."""
        test_client = app.test_client()
        auth_headers = {'Authorization': 'Bearer invalid'}

        resp = test_client.get('/api/auth/profile', headers=auth_headers)

        self.assert_invalid_token(resp)

    def test_it_returns_error_if_token_is_blacklisted(
        self, app: Flask, user_1: User
    ) -> None:
        """A token stored as ``BlacklistedToken`` is no longer accepted."""
        test_client, token = self.get_test_client_and_auth_token(
            app, user_1.email
        )
        blacklisted = BlacklistedToken(token=token)
        db.session.add(blacklisted)
        db.session.commit()

        resp = test_client.get(
            '/api/auth/profile',
            headers={'Authorization': f'Bearer {token}'},
        )

        self.assert_invalid_token(resp)

    def test_it_returns_user(self, app: Flask, user_1: User) -> None:
        """The response data equals the user's own serialization."""
        test_client, token = self.get_test_client_and_auth_token(
            app, user_1.email
        )

        resp = test_client.get(
            '/api/auth/profile',
            headers={'Authorization': f'Bearer {token}'},
        )

        assert resp.status_code == 200
        body = json.loads(resp.data.decode())
        assert body['status'] == 'success'
        assert body['data'] == jsonify_dict(user_1.serialize(user_1))

    @pytest.mark.parametrize(
        'client_scope, can_access',
        {**OAUTH_SCOPES, 'profile:read': True}.items(),
    )
    def test_expected_scopes_are_defined(
        self, app: Flask, user_1: User, client_scope: str, can_access: bool
    ) -> None:
        """Check endpoint access for a token issued with each scope.

        The parametrization overrides ``OAUTH_SCOPES`` so that only
        ``profile:read`` is set to ``True``.
        """
        issued = self.create_oauth2_client_and_issue_token(
            app, user_1, scope=client_scope
        )
        test_client, _oauth_client, token, _ = issued

        resp = test_client.get(
            '/api/auth/profile',
            content_type='application/json',
            headers={'Authorization': f'Bearer {token}'},
        )

        self.assert_response_scope(resp, can_access)
|
|
|
|
|
|
class TestUserProfileUpdate(ApiTestCaseMixin):
    """Tests for the ``POST /api/auth/profile/edit`` endpoint."""

    def test_it_returns_error_if_payload_is_empty(
        self, app: Flask, user_1: User
    ) -> None:
        """An empty JSON object is rejected (checked with ``assert_400``)."""
        test_client, token = self.get_test_client_and_auth_token(
            app, user_1.email
        )

        resp = test_client.post(
            '/api/auth/profile/edit',
            content_type='application/json',
            data=json.dumps({}),
            headers={'Authorization': f'Bearer {token}'},
        )

        self.assert_400(resp)

    def test_it_returns_error_if_fields_are_missing(
        self, app: Flask, user_1: User
    ) -> None:
        """A payload holding only ``first_name`` is rejected."""
        test_client, token = self.get_test_client_and_auth_token(
            app, user_1.email
        )
        partial_payload = {'first_name': self.random_string()}

        resp = test_client.post(
            '/api/auth/profile/edit',
            content_type='application/json',
            data=json.dumps(partial_payload),
            headers={'Authorization': f'Bearer {token}'},
        )

        self.assert_400(resp)

    def test_it_updates_user_profile(self, app: Flask, user_1: User) -> None:
        """A complete payload returns the serialized, updated user."""
        test_client, token = self.get_test_client_and_auth_token(
            app, user_1.email
        )
        profile = {
            'first_name': self.random_string(),
            'last_name': self.random_string(),
            'location': self.random_string(),
            'bio': self.random_string(),
            'birth_date': '1980-01-01',
        }

        resp = test_client.post(
            '/api/auth/profile/edit',
            content_type='application/json',
            data=json.dumps(profile),
            headers={'Authorization': f'Bearer {token}'},
        )

        assert resp.status_code == 200
        body = json.loads(resp.data.decode())
        assert body['status'] == 'success'
        assert body['message'] == 'user profile updated'
        assert body['data'] == jsonify_dict(user_1.serialize(user_1))

    @pytest.mark.parametrize(
        'client_scope, can_access',
        {**OAUTH_SCOPES, 'profile:write': True}.items(),
    )
    def test_expected_scopes_are_defined(
        self,
        app: Flask,
        user_1: User,
        client_scope: str,
        can_access: bool,
    ) -> None:
        """Check endpoint access for a token issued with each scope.

        The parametrization overrides ``OAUTH_SCOPES`` so that only
        ``profile:write`` is set to ``True``.
        """
        issued = self.create_oauth2_client_and_issue_token(
            app, user_1, scope=client_scope
        )
        test_client, _oauth_client, token, _ = issued

        resp = test_client.post(
            '/api/auth/profile/edit',
            content_type='application/json',
            headers={'Authorization': f'Bearer {token}'},
        )

        self.assert_response_scope(resp, can_access)
|
|
|
|
|
|
class TestUserAccountUpdate(ApiTestCaseMixin):
|
|
@staticmethod
|
|
def assert_no_emails_sent(
|
|
email_updated_to_current_address_mock: MagicMock,
|
|
email_updated_to_new_address_mock: MagicMock,
|
|
password_change_email_mock: MagicMock,
|
|
) -> None:
|
|
email_updated_to_current_address_mock.send.assert_not_called()
|
|
email_updated_to_new_address_mock.send.assert_not_called()
|
|
password_change_email_mock.send.assert_not_called()
|
|
|
|
def test_it_returns_error_if_payload_is_empty(
    self, app: Flask, user_1: User
) -> None:
    """An empty JSON object is rejected (checked with ``assert_400``)."""
    test_client, token = self.get_test_client_and_auth_token(
        app, user_1.email
    )

    resp = test_client.patch(
        '/api/auth/profile/edit/account',
        content_type='application/json',
        data=json.dumps({}),
        headers={'Authorization': f'Bearer {token}'},
    )

    self.assert_400(resp)
|
|
|
|
def test_it_returns_error_if_current_password_is_missing(
    self, app: Flask, user_1: User
) -> None:
    """A payload without ``password`` yields the missing-password error."""
    test_client, token = self.get_test_client_and_auth_token(
        app, user_1.email
    )
    payload = {
        'email': user_1.email,
        'new_password': self.random_string(),
    }

    resp = test_client.patch(
        '/api/auth/profile/edit/account',
        content_type='application/json',
        data=json.dumps(payload),
        headers={'Authorization': f'Bearer {token}'},
    )

    self.assert_400(resp, error_message='current password is missing')
|
|
|
|
def test_it_returns_error_if_email_is_missing(
    self, app: Flask, user_1: User
) -> None:
    """A payload without ``email`` yields the 'email is missing' error."""
    test_client, token = self.get_test_client_and_auth_token(
        app, user_1.email
    )
    payload = {
        'password': '12345678',
        'new_password': self.random_string(),
    }

    resp = test_client.patch(
        '/api/auth/profile/edit/account',
        content_type='application/json',
        data=json.dumps(payload),
        headers={'Authorization': f'Bearer {token}'},
    )

    self.assert_400(resp, 'email is missing')
|
|
|
|
def test_it_returns_error_if_current_password_is_invalid(
    self, app: Flask, user_1: User
) -> None:
    """A random current password is refused with 'invalid credentials'."""
    test_client, token = self.get_test_client_and_auth_token(
        app, user_1.email
    )
    payload = {
        'email': user_1.email,
        'password': self.random_string(),
        'new_password': self.random_string(),
    }

    resp = test_client.patch(
        '/api/auth/profile/edit/account',
        content_type='application/json',
        data=json.dumps(payload),
        headers={'Authorization': f'Bearer {token}'},
    )

    self.assert_401(resp, error_message='invalid credentials')
|
|
|
|
def test_it_does_not_send_emails_when_error_occurs(
    self,
    app: Flask,
    user_1: User,
    email_updated_to_current_address_mock: MagicMock,
    email_updated_to_new_address_mock: MagicMock,
    password_change_email_mock: MagicMock,
) -> None:
    """No email is sent when the request uses a random current password."""
    test_client, token = self.get_test_client_and_auth_token(
        app, user_1.email
    )
    payload = {
        'email': user_1.email,
        'password': self.random_string(),
        'new_password': self.random_string(),
    }

    test_client.patch(
        '/api/auth/profile/edit/account',
        content_type='application/json',
        data=json.dumps(payload),
        headers={'Authorization': f'Bearer {token}'},
    )

    self.assert_no_emails_sent(
        email_updated_to_current_address_mock,
        email_updated_to_new_address_mock,
        password_change_email_mock,
    )
|
|
|
|
def test_it_does_not_returns_error_if_no_new_password_provided(
    self,
    app: Flask,
    user_1: User,
    email_updated_to_current_address_mock: MagicMock,
    email_updated_to_new_address_mock: MagicMock,
    password_change_email_mock: MagicMock,
) -> None:
    """Current email and password alone are accepted (200, success)."""
    test_client, token = self.get_test_client_and_auth_token(
        app, user_1.email
    )
    payload = {'email': user_1.email, 'password': '12345678'}

    resp = test_client.patch(
        '/api/auth/profile/edit/account',
        content_type='application/json',
        data=json.dumps(payload),
        headers={'Authorization': f'Bearer {token}'},
    )

    assert resp.status_code == 200
    body = json.loads(resp.data.decode())
    assert body['status'] == 'success'
    assert body['message'] == 'user account updated'
|
|
|
|
def test_it_does_not_send_emails_if_no_change(
    self,
    app: Flask,
    user_1: User,
    email_updated_to_current_address_mock: MagicMock,
    email_updated_to_new_address_mock: MagicMock,
    password_change_email_mock: MagicMock,
) -> None:
    """No email is sent when the payload repeats the current email."""
    test_client, token = self.get_test_client_and_auth_token(
        app, user_1.email
    )
    payload = {'email': user_1.email, 'password': '12345678'}

    test_client.patch(
        '/api/auth/profile/edit/account',
        content_type='application/json',
        data=json.dumps(payload),
        headers={'Authorization': f'Bearer {token}'},
    )

    self.assert_no_emails_sent(
        email_updated_to_current_address_mock,
        email_updated_to_new_address_mock,
        password_change_email_mock,
    )
|
|
|
|
def test_it_returns_error_if_new_email_is_invalid(
    self,
    app: Flask,
    user_1: User,
    email_updated_to_current_address_mock: MagicMock,
    email_updated_to_new_address_mock: MagicMock,
    password_change_email_mock: MagicMock,
) -> None:
    """A random string used as new email triggers the email error."""
    test_client, token = self.get_test_client_and_auth_token(
        app, user_1.email
    )
    payload = {'email': self.random_string(), 'password': '12345678'}

    resp = test_client.patch(
        '/api/auth/profile/edit/account',
        content_type='application/json',
        data=json.dumps(payload),
        headers={'Authorization': f'Bearer {token}'},
    )

    self.assert_400(resp, 'email: valid email must be provided\n')
|
|
|
|
def test_it_only_updates_email_to_confirm_if_new_email_provided(
    self,
    app: Flask,
    user_1: User,
    email_updated_to_current_address_mock: MagicMock,
    email_updated_to_new_address_mock: MagicMock,
    password_change_email_mock: MagicMock,
) -> None:
    """A new email is stored as ``email_to_confirm``; ``email`` is kept."""
    test_client, token = self.get_test_client_and_auth_token(
        app, user_1.email
    )
    # Captured before the request to detect any change afterwards.
    email_before_request = user_1.email
    new_email = 'new.email@example.com'
    payload = {'email': new_email, 'password': '12345678'}

    resp = test_client.patch(
        '/api/auth/profile/edit/account',
        content_type='application/json',
        data=json.dumps(payload),
        headers={'Authorization': f'Bearer {token}'},
    )

    assert resp.status_code == 200
    assert email_before_request == user_1.email
    assert new_email == user_1.email_to_confirm
    assert user_1.confirmation_token is not None
|
|
|
|
def test_it_updates_email_when_email_sending_is_disabled(
    self,
    app_wo_email_activation: Flask,
    user_1: User,
    email_updated_to_current_address_mock: MagicMock,
    email_updated_to_new_address_mock: MagicMock,
    password_change_email_mock: MagicMock,
) -> None:
    """With ``app_wo_email_activation`` the email is replaced directly."""
    test_client, token = self.get_test_client_and_auth_token(
        app_wo_email_activation, user_1.email
    )
    new_email = 'new.email@example.com'
    payload = {'email': new_email, 'password': '12345678'}

    resp = test_client.patch(
        '/api/auth/profile/edit/account',
        content_type='application/json',
        data=json.dumps(payload),
        headers={'Authorization': f'Bearer {token}'},
    )

    assert resp.status_code == 200
    assert user_1.email == new_email
    assert user_1.email_to_confirm is None
    assert user_1.confirmation_token is None
|
|
|
|
def test_it_calls_email_updated_to_current_email_send_when_new_email_provided(  # noqa
    self,
    app: Flask,
    user_1: User,
    email_updated_to_current_address_mock: MagicMock,
    email_updated_to_new_address_mock: MagicMock,
    password_change_email_mock: MagicMock,
) -> None:
    """The current address is notified once about the new email."""
    test_client, token = self.get_test_client_and_auth_token(
        app, user_1.email
    )
    new_email = 'new.email@example.com'
    payload = {'email': new_email, 'password': '12345678'}

    test_client.patch(
        '/api/auth/profile/edit/account',
        content_type='application/json',
        data=json.dumps(payload),
        headers={'Authorization': f'Bearer {token}'},
        environ_base={'HTTP_USER_AGENT': USER_AGENT},
    )

    # user_1 attributes are read after the request, on purpose.
    expected_recipient = {
        'language': 'en',
        'email': user_1.email,
    }
    expected_email_data = {
        'username': user_1.username,
        'fittrackee_url': 'http://0.0.0.0:5000',
        'operating_system': 'Linux',
        'browser_name': 'Firefox',
        'new_email_address': new_email,
    }
    email_updated_to_current_address_mock.send.assert_called_once_with(
        expected_recipient, expected_email_data
    )
|
|
|
|
def test_it_calls_email_updated_to_new_email_send_when_new_email_provided(
    self,
    app: Flask,
    user_1: User,
    email_updated_to_current_address_mock: MagicMock,
    email_updated_to_new_address_mock: MagicMock,
    password_change_email_mock: MagicMock,
) -> None:
    """The new address receives one email with the confirmation URL.

    ``secrets.token_urlsafe`` is patched so the URL is predictable.
    """
    test_client, token = self.get_test_client_and_auth_token(
        app, user_1.email
    )
    new_email = 'new.email@example.com'
    expected_token = self.random_string()
    payload = {'email': new_email, 'password': '12345678'}

    with patch('secrets.token_urlsafe', return_value=expected_token):
        test_client.patch(
            '/api/auth/profile/edit/account',
            content_type='application/json',
            data=json.dumps(payload),
            headers={'Authorization': f'Bearer {token}'},
            environ_base={'HTTP_USER_AGENT': USER_AGENT},
        )

    # user_1 attributes are read after the request, on purpose.
    expected_recipient = {
        'language': 'en',
        'email': user_1.email_to_confirm,
    }
    expected_email_data = {
        'username': user_1.username,
        'fittrackee_url': 'http://0.0.0.0:5000',
        'operating_system': 'Linux',
        'browser_name': 'Firefox',
        'email_confirmation_url': (
            f'http://0.0.0.0:5000/email-update?token={expected_token}'
        ),
    }
    email_updated_to_new_address_mock.send.assert_called_once_with(
        expected_recipient, expected_email_data
    )
|
|
|
|
def test_it_does_not_calls_password_change_email_send_when_new_email_provided(  # noqa
    self,
    app: Flask,
    user_1: User,
    email_updated_to_current_address_mock: MagicMock,
    email_updated_to_new_address_mock: MagicMock,
    password_change_email_mock: MagicMock,
) -> None:
    """Changing only the email sends no password change email."""
    test_client, token = self.get_test_client_and_auth_token(
        app, user_1.email
    )
    new_email = 'new.email@example.com'
    expected_token = self.random_string()
    payload = {'email': new_email, 'password': '12345678'}

    with patch('secrets.token_urlsafe', return_value=expected_token):
        test_client.patch(
            '/api/auth/profile/edit/account',
            content_type='application/json',
            data=json.dumps(payload),
            headers={'Authorization': f'Bearer {token}'},
            environ_base={'HTTP_USER_AGENT': USER_AGENT},
        )

    password_change_email_mock.send.assert_not_called()
|
|
|
|
def test_it_returns_error_if_controls_fail_on_new_password(
    self,
    app: Flask,
    user_1: User,
    email_updated_to_current_address_mock: MagicMock,
    email_updated_to_new_address_mock: MagicMock,
    password_change_email_mock: MagicMock,
) -> None:
    """A 3-character new password triggers the password length error."""
    test_client, token = self.get_test_client_and_auth_token(
        app, user_1.email
    )
    payload = {
        'email': user_1.email,
        'password': '12345678',
        'new_password': self.random_string(length=3),
    }

    resp = test_client.patch(
        '/api/auth/profile/edit/account',
        content_type='application/json',
        data=json.dumps(payload),
        headers={'Authorization': f'Bearer {token}'},
    )

    self.assert_400(resp, 'password: 8 characters required')
|
|
|
|
def test_it_updates_auth_user_password_when_new_password_provided(
    self,
    app: Flask,
    user_1: User,
    email_updated_to_current_address_mock: MagicMock,
    email_updated_to_new_address_mock: MagicMock,
    password_change_email_mock: MagicMock,
) -> None:
    """Sending ``new_password`` changes the user's stored password."""
    test_client, token = self.get_test_client_and_auth_token(
        app, user_1.email
    )
    # Captured before the request to compare with the value afterwards.
    password_before_request = user_1.password
    payload = {
        'email': user_1.email,
        'password': '12345678',
        'new_password': self.random_string(),
    }

    resp = test_client.patch(
        '/api/auth/profile/edit/account',
        content_type='application/json',
        data=json.dumps(payload),
        headers={'Authorization': f'Bearer {token}'},
    )

    assert resp.status_code == 200
    body = json.loads(resp.data.decode())
    assert body['status'] == 'success'
    assert body['message'] == 'user account updated'
    assert password_before_request != user_1.password
|
|
|
|
def test_new_password_is_hashed(
    self,
    app: Flask,
    user_1: User,
    email_updated_to_current_address_mock: MagicMock,
    email_updated_to_new_address_mock: MagicMock,
    password_change_email_mock: MagicMock,
) -> None:
    """The stored password differs from the plain-text new password."""
    test_client, token = self.get_test_client_and_auth_token(
        app, user_1.email
    )
    new_password = self.random_string()
    payload = {
        'email': user_1.email,
        'password': '12345678',
        'new_password': new_password,
    }

    resp = test_client.patch(
        '/api/auth/profile/edit/account',
        content_type='application/json',
        data=json.dumps(payload),
        headers={'Authorization': f'Bearer {token}'},
    )

    assert resp.status_code == 200
    assert new_password != user_1.password
|
|
|
|
def test_it_calls_password_change_email_when_new_password_provided(
    self,
    app: Flask,
    user_1: User,
    email_updated_to_current_address_mock: MagicMock,
    email_updated_to_new_address_mock: MagicMock,
    password_change_email_mock: MagicMock,
) -> None:
    """Check the arguments given to the password change email 'send'."""
    client, auth_token = self.get_test_client_and_auth_token(
        app, user_1.email
    )
    payload = {
        'email': user_1.email,
        'password': '12345678',
        'new_password': self.random_string(),
    }

    client.patch(
        '/api/auth/profile/edit/account',
        content_type='application/json',
        data=json.dumps(payload),
        headers={'Authorization': f'Bearer {auth_token}'},
        # operating system and browser below are expected from this value
        environ_base={'HTTP_USER_AGENT': USER_AGENT},
    )

    expected_user_data = {
        'language': 'en',
        'email': user_1.email,
    }
    expected_email_data = {
        'username': user_1.username,
        'fittrackee_url': 'http://0.0.0.0:5000',
        'operating_system': 'Linux',
        'browser_name': 'Firefox',
    }
    password_change_email_mock.send.assert_called_once_with(
        expected_user_data,
        expected_email_data,
    )
|
|
|
|
def test_it_does_not_call_email_updated_emails_send_when_new_password_provided(  # noqa
    self,
    app: Flask,
    user_1: User,
    email_updated_to_current_address_mock: MagicMock,
    email_updated_to_new_address_mock: MagicMock,
    password_change_email_mock: MagicMock,
) -> None:
    """Check no 'email updated' email is sent when the email is unchanged."""
    client, auth_token = self.get_test_client_and_auth_token(
        app, user_1.email
    )
    payload = {
        'email': user_1.email,
        'password': '12345678',
        'new_password': self.random_string(),
    }

    client.patch(
        '/api/auth/profile/edit/account',
        content_type='application/json',
        data=json.dumps(payload),
        headers={'Authorization': f'Bearer {auth_token}'},
    )

    email_updated_to_current_address_mock.send.assert_not_called()
    email_updated_to_new_address_mock.send.assert_not_called()
|
|
|
|
def test_it_updates_email_to_confirm_and_password_when_new_email_and_password_provided(  # noqa
    self,
    app: Flask,
    user_1: User,
    email_updated_to_current_address_mock: MagicMock,
    email_updated_to_new_address_mock: MagicMock,
    password_change_email_mock: MagicMock,
) -> None:
    """Check 'email_to_confirm' and the password are both updated."""
    client, auth_token = self.get_test_client_and_auth_token(
        app, user_1.email
    )
    initial_email = user_1.email
    initial_password = user_1.password
    new_email = 'new.email@example.com'
    payload = {
        'email': new_email,
        'password': '12345678',
        'new_password': self.random_string(),
    }

    response = client.patch(
        '/api/auth/profile/edit/account',
        content_type='application/json',
        data=json.dumps(payload),
        headers={'Authorization': f'Bearer {auth_token}'},
    )

    assert response.status_code == 200
    body = json.loads(response.data.decode())
    assert body['status'] == 'success'
    assert body['message'] == 'user account updated'
    # the current email is kept, the new one is only stored for confirmation
    assert user_1.email == initial_email
    assert user_1.email_to_confirm == new_email
    assert user_1.password != initial_password
|
|
|
|
def test_it_calls_all_email_send_when_new_email_and_password_provided(
    self,
    app: Flask,
    user_1: User,
    email_updated_to_current_address_mock: MagicMock,
    email_updated_to_new_address_mock: MagicMock,
    password_change_email_mock: MagicMock,
) -> None:
    """Check each of the three email mocks has 'send' called once."""
    client, auth_token = self.get_test_client_and_auth_token(
        app, user_1.email
    )
    payload = {
        'email': 'new.email@example.com',
        'password': '12345678',
        'new_password': self.random_string(),
    }

    client.patch(
        '/api/auth/profile/edit/account',
        content_type='application/json',
        data=json.dumps(payload),
        headers={'Authorization': f'Bearer {auth_token}'},
    )

    email_updated_to_current_address_mock.send.assert_called_once()
    email_updated_to_new_address_mock.send.assert_called_once()
    password_change_email_mock.send.assert_called_once()
|
|
|
|
def test_it_does_not_calls_all_email_send_when_email_sending_is_disabled(
    self,
    app_wo_email_activation: Flask,
    user_1: User,
    email_updated_to_current_address_mock: MagicMock,
    email_updated_to_new_address_mock: MagicMock,
    password_change_email_mock: MagicMock,
) -> None:
    """Check no email is sent with the 'app_wo_email_activation' app."""
    client, auth_token = self.get_test_client_and_auth_token(
        app_wo_email_activation, user_1.email
    )
    payload = {
        'email': 'new.email@example.com',
        'password': '12345678',
        'new_password': self.random_string(),
    }

    client.patch(
        '/api/auth/profile/edit/account',
        content_type='application/json',
        data=json.dumps(payload),
        headers={'Authorization': f'Bearer {auth_token}'},
    )

    self.assert_no_emails_sent(
        email_updated_to_current_address_mock,
        email_updated_to_new_address_mock,
        password_change_email_mock,
    )
|
|
|
|
@pytest.mark.parametrize(
    'client_scope, can_access',
    {**OAUTH_SCOPES, 'profile:write': True}.items(),
)
def test_expected_scopes_are_defined(
    self,
    app: Flask,
    user_1: User,
    client_scope: str,
    can_access: bool,
) -> None:
    """Check access to the account edition endpoint for each scope."""
    # only the test client and the access token are needed here
    client, _, access_token, _ = self.create_oauth2_client_and_issue_token(
        app, user_1, scope=client_scope
    )

    response = client.patch(
        '/api/auth/profile/edit/account',
        content_type='application/json',
        headers={'Authorization': f'Bearer {access_token}'},
    )

    self.assert_response_scope(response, can_access)
|
|
|
|
|
|
class TestUserPreferencesUpdate(ApiTestCaseMixin):
    """Tests for POST requests on '/api/auth/profile/edit/preferences'."""

    def test_it_returns_error_if_payload_is_empty(
        self, app: Flask, user_1: User
    ) -> None:
        """An empty JSON object is answered with a 400 error."""
        client, auth_token = self.get_test_client_and_auth_token(
            app, user_1.email
        )

        response = client.post(
            '/api/auth/profile/edit/preferences',
            content_type='application/json',
            data=json.dumps({}),
            headers={'Authorization': f'Bearer {auth_token}'},
        )

        self.assert_400(response)

    def test_it_returns_error_if_fields_are_missing(
        self, app: Flask, user_1: User
    ) -> None:
        """A payload containing only 'weekm' is answered with a 400 error."""
        client, auth_token = self.get_test_client_and_auth_token(
            app, user_1.email
        )

        response = client.post(
            '/api/auth/profile/edit/preferences',
            content_type='application/json',
            data=json.dumps({'weekm': True}),
            headers={'Authorization': f'Bearer {auth_token}'},
        )

        self.assert_400(response)

    @pytest.mark.parametrize(
        'input_language,expected_language',
        [('en', 'en'), ('fr', 'fr'), ('invalid', 'en'), (None, 'en')],
    )
    def test_it_updates_user_preferences(
        self,
        app: Flask,
        user_1: User,
        input_language: Optional[str],
        expected_language: str,
    ) -> None:
        """Check the preferences returned after a complete update."""
        client, auth_token = self.get_test_client_and_auth_token(
            app, user_1.email
        )
        preferences = {
            'timezone': 'America/New_York',
            'weekm': True,
            'language': input_language,
            'imperial_units': True,
            'display_ascent': False,
            'start_elevation_at_zero': False,
            'use_dark_mode': True,
            'use_raw_gpx_speed': True,
            'date_format': 'yyyy-MM-dd',
        }

        response = client.post(
            '/api/auth/profile/edit/preferences',
            content_type='application/json',
            data=json.dumps(preferences),
            headers={'Authorization': f'Bearer {auth_token}'},
        )

        assert response.status_code == 200
        body = json.loads(response.data.decode())
        assert body['status'] == 'success'
        assert body['message'] == 'user preferences updated'
        returned = body['data']
        assert returned['display_ascent'] is False
        assert returned['start_elevation_at_zero'] is False
        assert returned['use_raw_gpx_speed'] is True
        assert returned['imperial_units'] is True
        assert returned['language'] == expected_language
        assert returned['timezone'] == 'America/New_York'
        assert returned['date_format'] == 'yyyy-MM-dd'
        assert returned['weekm'] is True
        assert returned['use_dark_mode'] is True

    @pytest.mark.parametrize(
        'client_scope, can_access',
        {**OAUTH_SCOPES, 'profile:write': True}.items(),
    )
    def test_expected_scopes_are_defined(
        self,
        app: Flask,
        user_1: User,
        client_scope: str,
        can_access: bool,
    ) -> None:
        """Check access to the endpoint for each OAuth scope."""
        # only the test client and the access token are needed here
        client, _, access_token, _ = (
            self.create_oauth2_client_and_issue_token(
                app, user_1, scope=client_scope
            )
        )

        response = client.post(
            '/api/auth/profile/edit/preferences',
            content_type='application/json',
            headers={'Authorization': f'Bearer {access_token}'},
        )

        self.assert_response_scope(response, can_access)
|
|
|
|
|
|
class TestUserSportPreferencesUpdate(ApiTestCaseMixin):
    """Tests for POST requests on '/api/auth/profile/edit/sports'."""

    def test_it_returns_error_if_payload_is_empty(
        self, app: Flask, user_1: User
    ) -> None:
        """An empty JSON object is answered with a 400 error."""
        client, auth_token = self.get_test_client_and_auth_token(
            app, user_1.email
        )

        response = client.post(
            '/api/auth/profile/edit/sports',
            content_type='application/json',
            data=json.dumps({}),
            headers={'Authorization': f'Bearer {auth_token}'},
        )

        self.assert_400(response)

    def test_it_returns_error_if_sport_id_is_missing(
        self, app: Flask, user_1: User
    ) -> None:
        """A payload without 'sport_id' is answered with a 400 error."""
        client, auth_token = self.get_test_client_and_auth_token(
            app, user_1.email
        )

        response = client.post(
            '/api/auth/profile/edit/sports',
            content_type='application/json',
            data=json.dumps({'is_active': True}),
            headers={'Authorization': f'Bearer {auth_token}'},
        )

        self.assert_400(response)

    def test_it_returns_error_if_sport_not_found(
        self, app: Flask, user_1: User
    ) -> None:
        """A 'sport_id' sent without any sport fixture returns a 404."""
        client, auth_token = self.get_test_client_and_auth_token(
            app, user_1.email
        )

        response = client.post(
            '/api/auth/profile/edit/sports',
            content_type='application/json',
            data=json.dumps({'sport_id': 1, 'is_active': True}),
            headers={'Authorization': f'Bearer {auth_token}'},
        )

        self.assert_404_with_entity(response, 'sport')

    def test_it_returns_error_if_payload_contains_only_sport_id(
        self, app: Flask, user_1: User, sport_1_cycling: Sport
    ) -> None:
        """A payload with only 'sport_id' is answered with a 400 error."""
        client, auth_token = self.get_test_client_and_auth_token(
            app, user_1.email
        )

        response = client.post(
            '/api/auth/profile/edit/sports',
            content_type='application/json',
            data=json.dumps({'sport_id': 1}),
            headers={'Authorization': f'Bearer {auth_token}'},
        )

        self.assert_400(response)

    def test_it_returns_error_if_color_is_invalid(
        self, app: Flask, user_1: User, sport_1_cycling: Sport
    ) -> None:
        """A random string sent as color is rejected with a 400 error."""
        client, auth_token = self.get_test_client_and_auth_token(
            app, user_1.email
        )
        payload = {
            'sport_id': sport_1_cycling.id,
            'color': self.random_string(),
        }

        response = client.post(
            '/api/auth/profile/edit/sports',
            content_type='application/json',
            data=json.dumps(payload),
            headers={'Authorization': f'Bearer {auth_token}'},
        )

        self.assert_400(response, 'invalid hexadecimal color')

    @pytest.mark.parametrize(
        'input_color',
        ['#000000', '#FFF'],
    )
    def test_it_updates_sport_color_for_auth_user(
        self,
        app: Flask,
        user_1: User,
        sport_2_running: Sport,
        input_color: str,
    ) -> None:
        """Check the preferences returned after a color update."""
        client, auth_token = self.get_test_client_and_auth_token(
            app, user_1.email
        )
        payload = {
            'sport_id': sport_2_running.id,
            'color': input_color,
        }

        response = client.post(
            '/api/auth/profile/edit/sports',
            content_type='application/json',
            data=json.dumps(payload),
            headers={'Authorization': f'Bearer {auth_token}'},
        )

        # status code is checked first, before the body is inspected
        assert response.status_code == 200
        body = json.loads(response.data.decode())
        assert body['status'] == 'success'
        assert body['message'] == 'user sport preferences updated'
        preferences = body['data']
        assert preferences['user_id'] == user_1.id
        assert preferences['sport_id'] == sport_2_running.id
        assert preferences['color'] == input_color
        assert preferences['is_active'] is True
        assert preferences['stopped_speed_threshold'] == 0.1

    def test_it_disables_sport_for_auth_user(
        self, app: Flask, user_1: User, sport_1_cycling: Sport
    ) -> None:
        """Check the preferences returned after disabling a sport."""
        client, auth_token = self.get_test_client_and_auth_token(
            app, user_1.email
        )
        payload = {
            'sport_id': sport_1_cycling.id,
            'is_active': False,
        }

        response = client.post(
            '/api/auth/profile/edit/sports',
            content_type='application/json',
            data=json.dumps(payload),
            headers={'Authorization': f'Bearer {auth_token}'},
        )

        assert response.status_code == 200
        body = json.loads(response.data.decode())
        assert body['status'] == 'success'
        assert body['message'] == 'user sport preferences updated'
        preferences = body['data']
        assert preferences['user_id'] == user_1.id
        assert preferences['sport_id'] == sport_1_cycling.id
        assert preferences['color'] is None
        assert preferences['is_active'] is False
        assert preferences['stopped_speed_threshold'] == 1

    def test_it_updates_stopped_speed_threshold_for_auth_user(
        self, app: Flask, user_1: User, sport_1_cycling: Sport
    ) -> None:
        """Check the preferences returned after a threshold update."""
        client, auth_token = self.get_test_client_and_auth_token(
            app, user_1.email
        )
        payload = {
            'sport_id': sport_1_cycling.id,
            'stopped_speed_threshold': 0.5,
        }

        response = client.post(
            '/api/auth/profile/edit/sports',
            content_type='application/json',
            data=json.dumps(payload),
            headers={'Authorization': f'Bearer {auth_token}'},
        )

        assert response.status_code == 200
        body = json.loads(response.data.decode())
        assert body['status'] == 'success'
        assert body['message'] == 'user sport preferences updated'
        preferences = body['data']
        assert preferences['user_id'] == user_1.id
        assert preferences['sport_id'] == sport_1_cycling.id
        assert preferences['color'] is None
        # strict check, like the other tests of this class: a truthy
        # non-boolean value must not be accepted
        assert preferences['is_active'] is True
        assert preferences['stopped_speed_threshold'] == 0.5

    @pytest.mark.parametrize(
        'client_scope, can_access',
        {**OAUTH_SCOPES, 'profile:write': True}.items(),
    )
    def test_expected_scopes_are_defined(
        self,
        app: Flask,
        user_1: User,
        client_scope: str,
        can_access: bool,
    ) -> None:
        """Check access to the endpoint for each OAuth scope."""
        # only the test client and the access token are needed here
        client, _, access_token, _ = (
            self.create_oauth2_client_and_issue_token(
                app, user_1, scope=client_scope
            )
        )

        response = client.post(
            '/api/auth/profile/edit/sports',
            content_type='application/json',
            headers={'Authorization': f'Bearer {access_token}'},
        )

        self.assert_response_scope(response, can_access)
|
|
|
|
|
|
class TestUserSportPreferencesReset(ApiTestCaseMixin):
    """Tests for DELETE requests on '/api/auth/profile/reset/sports/<id>'."""

    def test_it_returns_error_if_sport_does_not_exist(
        self, app: Flask, user_1: User
    ) -> None:
        """A sport id sent without any sport fixture returns a 404."""
        client, auth_token = self.get_test_client_and_auth_token(
            app, user_1.email
        )

        response = client.delete(
            '/api/auth/profile/reset/sports/1',
            headers={'Authorization': f'Bearer {auth_token}'},
        )

        self.assert_404_with_entity(response, 'sport')

    def test_it_resets_sport_preferences(
        self,
        app: Flask,
        user_1: User,
        sport_1_cycling: Sport,
        user_sport_1_preference: UserSportPreference,
    ) -> None:
        """Check no preference is left for the user and sport after reset."""
        client, auth_token = self.get_test_client_and_auth_token(
            app, user_1.email
        )

        response = client.delete(
            f'/api/auth/profile/reset/sports/{sport_1_cycling.id}',
            headers={'Authorization': f'Bearer {auth_token}'},
        )

        assert response.status_code == 204
        remaining_preference = UserSportPreference.query.filter_by(
            user_id=user_1.id,
            sport_id=sport_1_cycling.id,
        ).first()
        assert remaining_preference is None

    def test_it_does_not_raise_error_if_sport_preferences_do_not_exist(
        self, app: Flask, user_1: User, sport_1_cycling: Sport
    ) -> None:
        """Resetting without the preference fixture still returns a 204."""
        client, auth_token = self.get_test_client_and_auth_token(
            app, user_1.email
        )

        response = client.delete(
            f'/api/auth/profile/reset/sports/{sport_1_cycling.id}',
            headers={'Authorization': f'Bearer {auth_token}'},
        )

        assert response.status_code == 204

    @pytest.mark.parametrize(
        'client_scope, can_access',
        {**OAUTH_SCOPES, 'profile:write': True}.items(),
    )
    def test_expected_scopes_are_defined(
        self,
        app: Flask,
        user_1: User,
        client_scope: str,
        can_access: bool,
        sport_1_cycling: Sport,
        user_sport_1_preference: UserSportPreference,
    ) -> None:
        """Check access to the endpoint for each OAuth scope."""
        # only the test client and the access token are needed here
        client, _, access_token, _ = (
            self.create_oauth2_client_and_issue_token(
                app, user_1, scope=client_scope
            )
        )

        response = client.delete(
            f'/api/auth/profile/reset/sports/{sport_1_cycling.id}',
            content_type='application/json',
            headers={'Authorization': f'Bearer {access_token}'},
        )

        self.assert_response_scope(response, can_access)
|
|
|
|
|
|
class TestUserPicture(ApiTestCaseMixin):
    """Tests for POST requests on '/api/auth/picture'."""

    def test_it_returns_error_if_file_is_missing(
        self, app: Flask, user_1: User
    ) -> None:
        """A request sent without any data is answered with a 400 error."""
        client, auth_token = self.get_test_client_and_auth_token(
            app, user_1.email
        )

        response = client.post(
            '/api/auth/picture',
            headers={
                'content_type': 'multipart/form-data',
                'Authorization': f'Bearer {auth_token}',
            },
        )

        self.assert_400(response, 'no file part', 'fail')

    def test_it_returns_error_if_file_is_invalid(
        self, app: Flask, user_1: User
    ) -> None:
        """A file named 'avatar.bmp' is rejected because of its extension."""
        client, auth_token = self.get_test_client_and_auth_token(
            app, user_1.email
        )
        upload = {'file': (BytesIO(b'avatar'), 'avatar.bmp')}

        response = client.post(
            '/api/auth/picture',
            data=upload,
            headers={
                'content_type': 'multipart/form-data',
                'Authorization': f'Bearer {auth_token}',
            },
        )

        self.assert_400(response, 'file extension not allowed', 'fail')

    def test_it_returns_error_if_image_size_exceeds_file_limit(
        self,
        app_with_max_file_size: Flask,
        user_1: User,
        sport_1_cycling: Sport,
        gpx_file: str,
    ) -> None:
        """Check the 413 error with the 'app_with_max_file_size' app."""
        client, auth_token = self.get_test_client_and_auth_token(
            app_with_max_file_size, user_1.email
        )
        upload = {
            'file': (BytesIO(b'test_file_for_avatar' * 50), 'avatar.jpg')
        }

        response = client.post(
            '/api/auth/picture',
            data=upload,
            headers={
                'content_type': 'multipart/form-data',
                'Authorization': f'Bearer {auth_token}',
            },
        )

        body = self.assert_413(
            response,
            'Error during picture upload, file size (1.2KB) exceeds 1.0KB.',
        )
        assert 'data' not in body

    def test_it_returns_error_if_image_size_exceeds_archive_limit(
        self,
        app_with_max_zip_file_size: Flask,
        user_1: User,
        sport_1_cycling: Sport,
        gpx_file: str,
    ) -> None:
        """Check the 413 error with the 'app_with_max_zip_file_size' app."""
        client, auth_token = self.get_test_client_and_auth_token(
            app_with_max_zip_file_size, user_1.email
        )
        upload = {
            'file': (BytesIO(b'test_file_for_avatar' * 50), 'avatar.jpg')
        }

        response = client.post(
            '/api/auth/picture',
            data=upload,
            headers={
                'content_type': 'multipart/form-data',
                'Authorization': f'Bearer {auth_token}',
            },
        )

        body = self.assert_413(
            response,
            'Error during picture upload, file size (1.2KB) exceeds 1.0KB.',
        )
        assert 'data' not in body

    def test_it_updates_user_picture(self, app: Flask, user_1: User) -> None:
        """Upload two pictures in a row and check the stored picture."""
        client, auth_token = self.get_test_client_and_auth_token(
            app, user_1.email
        )

        first_response = client.post(
            '/api/auth/picture',
            data={'file': (BytesIO(b'avatar'), 'avatar.png')},
            headers={
                'content_type': 'multipart/form-data',
                'Authorization': f'Bearer {auth_token}',
            },
        )

        first_body = json.loads(first_response.data.decode())
        assert first_body['status'] == 'success'
        assert first_body['message'] == 'user picture updated'
        assert first_response.status_code == 200
        assert 'avatar.png' in user_1.picture

        second_response = client.post(
            '/api/auth/picture',
            data={'file': (BytesIO(b'avatar2'), 'avatar2.png')},
            headers={
                'content_type': 'multipart/form-data',
                'Authorization': f'Bearer {auth_token}',
            },
        )

        second_body = json.loads(second_response.data.decode())
        assert second_body['status'] == 'success'
        assert second_body['message'] == 'user picture updated'
        assert second_response.status_code == 200
        # the second upload replaces the first picture
        assert 'avatar.png' not in user_1.picture
        assert 'avatar2.png' in user_1.picture

    @pytest.mark.parametrize(
        'client_scope, can_access',
        {**OAUTH_SCOPES, 'profile:write': True}.items(),
    )
    def test_expected_scopes_are_defined(
        self,
        app: Flask,
        user_1: User,
        client_scope: str,
        can_access: bool,
    ) -> None:
        """Check access to the endpoint for each OAuth scope."""
        # only the test client and the access token are needed here
        client, _, access_token, _ = (
            self.create_oauth2_client_and_issue_token(
                app, user_1, scope=client_scope
            )
        )

        response = client.post(
            '/api/auth/picture',
            content_type='application/json',
            headers={'Authorization': f'Bearer {access_token}'},
        )

        self.assert_response_scope(response, can_access)
|
|
|
|
|
|
class TestRegistrationConfiguration(ApiTestCaseMixin):
    """Registration tests run with the 'app_with_3_users_max' app."""

    def test_it_returns_error_if_it_exceeds_max_users(
        self,
        app_with_3_users_max: Flask,
        user_1_admin: User,
        user_2: User,
        user_3: User,
    ) -> None:
        """With three user fixtures, a registration returns a 403 error."""
        client = app_with_3_users_max.test_client()
        payload = {
            'username': self.random_string(),
            'email': self.random_email(),
            'password': self.random_string(),
        }

        response = client.post(
            '/api/auth/register',
            data=json.dumps(payload),
            content_type='application/json',
        )

        self.assert_403(response, 'error, registration is disabled')

    def test_it_disables_registration_on_user_registration(
        self,
        app_with_3_users_max: Flask,
        user_1_admin: User,
        user_2: User,
    ) -> None:
        """With two user fixtures, the second registration returns a 403."""
        client = app_with_3_users_max.test_client()
        first_payload = {
            'username': self.random_string(),
            'email': self.random_email(),
            'password': self.random_string(),
            'accepted_policy': True,
        }
        client.post(
            '/api/auth/register',
            data=json.dumps(first_payload),
            content_type='application/json',
        )
        second_payload = {
            'username': self.random_string(),
            'email': self.random_email(),
            'password': self.random_string(),
            'accepted_policy': True,
        }

        response = client.post(
            '/api/auth/register',
            data=json.dumps(second_payload),
            content_type='application/json',
        )

        self.assert_403(response, 'error, registration is disabled')

    def test_it_does_not_disable_registration_if_users_count_below_limit(
        self,
        app_with_3_users_max: Flask,
        user_1: User,
    ) -> None:
        """With one user fixture, the second registration returns a 200."""
        client = app_with_3_users_max.test_client()
        first_payload = {
            'username': self.random_string(),
            'email': self.random_email(),
            'password': self.random_string(),
            'accepted_policy': True,
        }
        client.post(
            '/api/auth/register',
            data=json.dumps(first_payload),
            content_type='application/json',
        )
        second_payload = {
            'username': self.random_string(),
            'email': self.random_email(),
            'password': self.random_string(),
            'accepted_policy': True,
        }

        response = client.post(
            '/api/auth/register',
            data=json.dumps(second_payload),
            content_type='application/json',
        )

        assert response.status_code == 200
|
|
|
|
|
|
class TestPasswordResetRequest(ApiTestCaseMixin):
    """Tests for POST requests on '/api/auth/password/reset-request'."""

    def test_it_returns_error_on_empty_payload(self, app: Flask) -> None:
        """An empty JSON object is answered with a 400 error."""
        client = app.test_client()

        response = client.post(
            '/api/auth/password/reset-request',
            data=json.dumps({}),
            content_type='application/json',
        )

        self.assert_400(response)

    def test_it_returns_error_on_invalid_payload(self, app: Flask) -> None:
        """A payload containing only 'username' returns a 400 error."""
        client = app.test_client()

        response = client.post(
            '/api/auth/password/reset-request',
            data=json.dumps({'username': self.random_string()}),
            content_type='application/json',
        )

        self.assert_400(response)

    def test_it_returns_error_when_email_sending_is_disabled(
        self, app_wo_email_activation: Flask
    ) -> None:
        """Check the 404 error with the 'app_wo_email_activation' app."""
        client = app_wo_email_activation.test_client()

        response = client.post(
            '/api/auth/password/reset-request',
            data=json.dumps({'email': 'test@test.com'}),
            content_type='application/json',
        )

        self.assert_404_with_message(
            response, 'the requested URL was not found on the server'
        )

    def test_it_requests_password_reset_when_user_exists(
        self, app: Flask, user_1: User, user_reset_password_email: Mock
    ) -> None:
        """Check the success response when the 'user_1' fixture is present."""
        client = app.test_client()

        response = client.post(
            '/api/auth/password/reset-request',
            data=json.dumps({'email': 'test@test.com'}),
            content_type='application/json',
        )

        assert response.status_code == 200
        body = json.loads(response.data.decode())
        assert body['status'] == 'success'
        assert body['message'] == 'password reset request processed'

    def test_it_calls_reset_password_email_when_user_exists(
        self, app: Flask, user_1: User, reset_password_email: Mock
    ) -> None:
        """Check the arguments given to the reset password email 'send'."""
        client = app.test_client()
        token = self.random_string()

        # 'jwt.encode' is patched so that the token in the URL is known
        with patch('jwt.encode', return_value=token):
            client.post(
                '/api/auth/password/reset-request',
                data=json.dumps({'email': 'test@test.com'}),
                content_type='application/json',
                environ_base={'HTTP_USER_AGENT': USER_AGENT},
            )

        expected_user_data = {
            'language': 'en',
            'email': user_1.email,
        }
        expected_email_data = {
            'expiration_delay': 'a minute',
            'username': user_1.username,
            'password_reset_url': (
                f'http://0.0.0.0:5000/password-reset?token={token}'
            ),
            'fittrackee_url': 'http://0.0.0.0:5000',
            'operating_system': 'Linux',
            'browser_name': 'Firefox',
        }
        reset_password_email.send.assert_called_once_with(
            expected_user_data,
            expected_email_data,
        )

    def test_it_does_not_return_error_when_user_does_not_exist(
        self, app: Flask
    ) -> None:
        """Check the success response when no user fixture is present."""
        client = app.test_client()

        response = client.post(
            '/api/auth/password/reset-request',
            data=json.dumps({'email': 'test@test.com'}),
            content_type='application/json',
        )

        assert response.status_code == 200
        body = json.loads(response.data.decode())
        assert body['status'] == 'success'
        assert body['message'] == 'password reset request processed'

    def test_it_does_not_call_reset_password_email_when_user_does_not_exist(
        self, app: Flask, reset_password_email: Mock
    ) -> None:
        """Check 'send' is not called when no user fixture is present."""
        client = app.test_client()

        client.post(
            '/api/auth/password/reset-request',
            data=json.dumps({'email': 'test@test.com'}),
            content_type='application/json',
        )

        # the assertion targets 'send', the attribute checked when an email
        # is expected: asserting on the mock object itself would pass even
        # if 'send' had been called
        reset_password_email.send.assert_not_called()
|
|
|
|
|
|
class TestPasswordUpdate(ApiTestCaseMixin):
    """Tests for POST requests on '/api/auth/password/update'."""

    def test_it_returns_error_if_payload_is_empty(self, app: Flask) -> None:
        """An empty JSON object is answered with a 400 error."""
        client = app.test_client()

        response = client.post(
            '/api/auth/password/update',
            data=json.dumps({}),
            content_type='application/json',
        )

        self.assert_400(response)

    def test_it_returns_error_if_token_is_missing(self, app: Flask) -> None:
        """A payload containing only 'password' returns a 400 error."""
        client = app.test_client()
        payload = {'password': self.random_string()}

        response = client.post(
            '/api/auth/password/update',
            data=json.dumps(payload),
            content_type='application/json',
        )

        self.assert_400(response)

    def test_it_returns_error_if_password_is_missing(self, app: Flask) -> None:
        """A payload containing only 'token' returns a 400 error."""
        client = app.test_client()
        payload = {'token': self.random_string()}

        response = client.post(
            '/api/auth/password/update',
            data=json.dumps(payload),
            content_type='application/json',
        )

        self.assert_400(response)

    def test_it_returns_error_if_token_is_invalid(self, app: Flask) -> None:
        """A token generated without 'password_reset' returns a 401 error."""
        token = get_user_token(1)
        client = app.test_client()
        payload = {
            'token': token,
            'password': self.random_string(),
        }

        response = client.post(
            '/api/auth/password/update',
            data=json.dumps(payload),
            content_type='application/json',
        )

        self.assert_401(response, 'invalid token, please request a new token')

    def test_it_returns_error_if_token_is_expired(
        self, app: Flask, user_1: User
    ) -> None:
        """A token used 61 seconds after its creation returns a 401 error."""
        now = datetime.utcnow()
        token = get_user_token(user_1.id, password_reset=True)
        client = app.test_client()

        # the request is sent with the clock moved 61 seconds forward
        with freeze_time(now + timedelta(seconds=61)):
            payload = {
                'token': token,
                'password': self.random_string(),
            }
            response = client.post(
                '/api/auth/password/update',
                data=json.dumps(payload),
                content_type='application/json',
            )

            self.assert_401(
                response, 'invalid token, please request a new token'
            )

    def test_it_returns_error_if_password_is_invalid(
        self, app: Flask, user_1: User
    ) -> None:
        """A 7-character password is rejected with a 400 error."""
        token = get_user_token(user_1.id, password_reset=True)
        client = app.test_client()
        payload = {
            'token': token,
            'password': self.random_string(length=7),
        }

        response = client.post(
            '/api/auth/password/update',
            data=json.dumps(payload),
            content_type='application/json',
        )

        self.assert_400(response, 'password: 8 characters required\n')

    def test_it_does_not_send_email_after_error(
        self,
        app: Flask,
        user_1: User,
        password_change_email_mock: MagicMock,
    ) -> None:
        """Check 'send' is not called when a 7-character password is sent."""
        token = get_user_token(user_1.id, password_reset=True)
        client = app.test_client()
        payload = {
            'token': token,
            'password': self.random_string(length=7),
        }

        client.post(
            '/api/auth/password/update',
            data=json.dumps(payload),
            content_type='application/json',
        )

        # the assertion targets 'send', like the other tests of this class:
        # asserting on the mock object itself would pass even if 'send'
        # had been called
        password_change_email_mock.send.assert_not_called()

    def test_it_updates_password(
        self,
        app: Flask,
        user_1: User,
        password_change_email_mock: MagicMock,
    ) -> None:
        """Check the success response with a password reset token."""
        token = get_user_token(user_1.id, password_reset=True)
        client = app.test_client()
        payload = {
            'token': token,
            'password': self.random_string(),
        }

        response = client.post(
            '/api/auth/password/update',
            data=json.dumps(payload),
            content_type='application/json',
        )

        assert response.status_code == 200
        body = json.loads(response.data.decode())
        assert body['status'] == 'success'
        assert body['message'] == 'password updated'

    def test_it_sends_email_after_successful_update(
        self,
        app: Flask,
        user_1: User,
        password_change_email_mock: MagicMock,
    ) -> None:
        """Check the arguments given to the password change email 'send'."""
        token = get_user_token(user_1.id, password_reset=True)
        client = app.test_client()
        payload = {
            'token': token,
            'password': self.random_string(),
        }

        response = client.post(
            '/api/auth/password/update',
            data=json.dumps(payload),
            content_type='application/json',
            environ_base={'HTTP_USER_AGENT': USER_AGENT},
        )

        assert response.status_code == 200
        expected_user_data = {
            'language': 'en',
            'email': user_1.email,
        }
        expected_email_data = {
            'username': user_1.username,
            'fittrackee_url': 'http://0.0.0.0:5000',
            'operating_system': 'Linux',
            'browser_name': 'Firefox',
        }
        password_change_email_mock.send.assert_called_once_with(
            expected_user_data,
            expected_email_data,
        )

    def test_it_does_not_send_email_when_email_sending_is_disabled(
        self,
        app_wo_email_activation: Flask,
        user_1: User,
        password_change_email_mock: MagicMock,
    ) -> None:
        """Check 'send' is not called with 'app_wo_email_activation'."""
        token = get_user_token(user_1.id, password_reset=True)
        client = app_wo_email_activation.test_client()
        payload = {
            'token': token,
            'password': self.random_string(),
        }

        client.post(
            '/api/auth/password/update',
            data=json.dumps(payload),
            content_type='application/json',
            environ_base={'HTTP_USER_AGENT': USER_AGENT},
        )

        password_change_email_mock.send.assert_not_called()
|
|
|
|
|
|
class TestEmailUpdateWitUnauthenticatedUser(ApiTestCaseMixin):
    """Tests for POST requests on '/api/auth/email/update'."""

    def test_it_returns_error_if_token_is_missing(self, app: Flask) -> None:
        """An empty JSON object is answered with a 400 error."""
        client = app.test_client()

        response = client.post(
            '/api/auth/email/update',
            data=json.dumps({}),
            content_type='application/json',
        )

        self.assert_400(response)

    def test_it_returns_error_if_token_is_invalid(self, app: Flask) -> None:
        """A random token sent without user fixture returns a 400 error."""
        client = app.test_client()
        payload = {'token': self.random_string()}

        response = client.post(
            '/api/auth/email/update',
            data=json.dumps(payload),
            content_type='application/json',
        )

        self.assert_400(response)

    def test_it_does_not_update_email_if_token_mismatches(
        self, app: Flask, user_1: User
    ) -> None:
        """A token other than the user's confirmation token returns a 400."""
        user_1.confirmation_token = self.random_string()
        new_email = 'new.email@example.com'
        user_1.email_to_confirm = new_email
        client = app.test_client()
        # another random string, not the one stored on the user
        payload = {'token': self.random_string()}

        response = client.post(
            '/api/auth/email/update',
            data=json.dumps(payload),
            content_type='application/json',
        )

        self.assert_400(response)

    def test_it_updates_email(self, app: Flask, user_1: User) -> None:
        """Check the user's email fields after sending the stored token."""
        token = self.random_string()
        user_1.confirmation_token = token
        new_email = 'new.email@example.com'
        user_1.email_to_confirm = new_email
        client = app.test_client()

        response = client.post(
            '/api/auth/email/update',
            data=json.dumps({'token': token}),
            content_type='application/json',
        )

        assert response.status_code == 200
        body = json.loads(response.data.decode())
        assert body['status'] == 'success'
        assert body['message'] == 'email updated'
        assert user_1.email == new_email
        # the pending email and the token are cleared after the update
        assert user_1.email_to_confirm is None
        assert user_1.confirmation_token is None
|
|
|
|
|
|
class TestConfirmationAccount(ApiTestCaseMixin):
    """Tests for ``POST /api/auth/account/confirm``."""

    def test_it_returns_error_if_token_is_missing(self, app: Flask) -> None:
        """An empty JSON body is checked with ``assert_400``."""
        client = app.test_client()
        empty_payload = json.dumps({})

        response = client.post(
            '/api/auth/account/confirm',
            data=empty_payload,
            content_type='application/json',
        )

        self.assert_400(response)

    def test_it_returns_error_if_token_is_invalid(self, app: Flask) -> None:
        """A random token is checked with ``assert_400``."""
        client = app.test_client()
        payload = {'token': self.random_string()}

        response = client.post(
            '/api/auth/account/confirm',
            data=json.dumps(payload),
            content_type='application/json',
        )

        self.assert_400(response)

    def test_it_activates_user_account(
        self, app: Flask, inactive_user: User
    ) -> None:
        """The stored token activates the user and clears the token."""
        confirmation_token = self.random_string()
        inactive_user.confirmation_token = confirmation_token
        client = app.test_client()

        response = client.post(
            '/api/auth/account/confirm',
            data=json.dumps({'token': confirmation_token}),
            content_type='application/json',
        )

        assert response.status_code == 200
        body = json.loads(response.data.decode())
        assert body['status'] == 'success'
        assert body['message'] == 'account confirmation successful'
        assert inactive_user.is_active is True
        assert inactive_user.confirmation_token is None
|
|
|
|
|
|
class TestResendAccountConfirmationEmail(ApiTestCaseMixin):
    """Tests for ``POST /api/auth/account/resend-confirmation``."""

    def test_it_returns_error_if_email_is_missing(self, app: Flask) -> None:
        """An empty JSON body is checked with ``assert_400``."""
        client = app.test_client()
        empty_payload = json.dumps({})

        response = client.post(
            '/api/auth/account/resend-confirmation',
            data=empty_payload,
            content_type='application/json',
        )

        self.assert_400(response)

    def test_it_does_not_return_error_if_account_does_not_exist(
        self, app: Flask
    ) -> None:
        """A random email still receives the success response."""
        client = app.test_client()
        payload = {'email': self.random_email()}

        response = client.post(
            '/api/auth/account/resend-confirmation',
            data=json.dumps(payload),
            content_type='application/json',
        )

        assert response.status_code == 200
        body = json.loads(response.data.decode())
        assert body['status'] == 'success'
        assert body['message'] == 'confirmation email resent'

    def test_it_does_not_return_error_if_account_already_active(
        self, app: Flask, user_1: User
    ) -> None:
        """The ``user_1`` email also receives the success response."""
        client = app.test_client()
        payload = {'email': user_1.email}

        response = client.post(
            '/api/auth/account/resend-confirmation',
            data=json.dumps(payload),
            content_type='application/json',
        )

        assert response.status_code == 200
        body = json.loads(response.data.decode())
        assert body['status'] == 'success'
        assert body['message'] == 'confirmation email resent'

    def test_it_does_not_call_account_confirmation_email_if_user_is_active(
        self,
        app: Flask,
        user_1: User,
        account_confirmation_email_mock: Mock,
    ) -> None:
        """No confirmation email is sent for the ``user_1`` email."""
        client = app.test_client()
        payload = {'email': user_1.email}

        client.post(
            '/api/auth/account/resend-confirmation',
            data=json.dumps(payload),
            content_type='application/json',
            environ_base={'HTTP_USER_AGENT': USER_AGENT},
        )

        account_confirmation_email_mock.send.assert_not_called()

    def test_it_returns_success_if_user_is_inactive(
        self, app: Flask, inactive_user: User
    ) -> None:
        """The inactive user's email receives the success response."""
        client = app.test_client()
        payload = {'email': inactive_user.email}

        response = client.post(
            '/api/auth/account/resend-confirmation',
            data=json.dumps(payload),
            content_type='application/json',
        )

        assert response.status_code == 200
        body = json.loads(response.data.decode())
        assert body['status'] == 'success'
        assert body['message'] == 'confirmation email resent'

    def test_it_updates_token_if_user_is_inactive(
        self, app: Flask, inactive_user: User
    ) -> None:
        """The inactive user's confirmation token differs after the call."""
        client = app.test_client()
        token_before_request = inactive_user.confirmation_token
        payload = {'email': inactive_user.email}

        client.post(
            '/api/auth/account/resend-confirmation',
            data=json.dumps(payload),
            content_type='application/json',
        )

        assert inactive_user.confirmation_token != token_before_request

    def test_it_calls_account_confirmation_email_if_user_is_inactive(
        self,
        app: Flask,
        inactive_user: User,
        account_confirmation_email_mock: Mock,
    ) -> None:
        """The confirmation email is sent once with the expected context.

        ``secrets.token_urlsafe`` is patched so the token embedded in the
        confirmation URL is known; the request carries ``USER_AGENT``
        (Firefox on Linux).
        """
        client = app.test_client()
        expected_token = self.random_string()
        inactive_user.language = 'fr'

        with patch('secrets.token_urlsafe', return_value=expected_token):
            client.post(
                '/api/auth/account/resend-confirmation',
                data=json.dumps({'email': inactive_user.email}),
                content_type='application/json',
                environ_base={'HTTP_USER_AGENT': USER_AGENT},
            )

        expected_recipient = {
            'language': inactive_user.language,
            'email': inactive_user.email,
        }
        expected_context = {
            'username': inactive_user.username,
            'fittrackee_url': 'http://0.0.0.0:5000',
            'operating_system': 'Linux',
            'browser_name': 'Firefox',
            'account_confirmation_url': (
                'http://0.0.0.0:5000/account-confirmation'
                f'?token={expected_token}'
            ),
        }
        account_confirmation_email_mock.send.assert_called_once_with(
            expected_recipient, expected_context
        )

    def test_it_returns_error_if_email_sending_is_disabled(
        self, app_wo_email_activation: Flask, inactive_user: User
    ) -> None:
        """With the no-email app the response is checked with
        ``assert_404_with_message``.
        """
        client = app_wo_email_activation.test_client()
        payload = {'email': inactive_user.email}

        response = client.post(
            '/api/auth/account/resend-confirmation',
            data=json.dumps(payload),
            content_type='application/json',
        )

        self.assert_404_with_message(
            response, 'the requested URL was not found on the server'
        )
|
|
|
|
|
|
class TestUserLogout(ApiTestCaseMixin):
    """Tests for ``POST /api/auth/logout``."""

    def test_it_returns_error_when_headers_are_missing(
        self, app: Flask
    ) -> None:
        """Empty headers are checked with ``assert_401`` and its message."""
        client = app.test_client()

        response = client.post('/api/auth/logout', headers={})

        self.assert_401(response, 'provide a valid auth token')

    def test_it_returns_error_when_token_is_invalid(self, app: Flask) -> None:
        """A malformed bearer token is checked with
        ``assert_invalid_token``.
        """
        client = app.test_client()
        bad_headers = {'Authorization': 'Bearer invalid'}

        response = client.post('/api/auth/logout', headers=bad_headers)

        self.assert_invalid_token(response)

    def test_it_returns_error_when_token_is_expired(
        self, app: Flask, user_1: User
    ) -> None:
        """A token used 61 seconds after the reference time is checked with
        ``assert_invalid_token``.
        """
        # reference time is taken before the token is issued
        reference_time = datetime.utcnow()
        client, auth_token = self.get_test_client_and_auth_token(
            app, user_1.email
        )
        auth_headers = {'Authorization': f'Bearer {auth_token}'}

        with freeze_time(reference_time + timedelta(seconds=61)):
            response = client.post('/api/auth/logout', headers=auth_headers)

            self.assert_invalid_token(response)

    def test_user_can_logout(self, app: Flask, user_1: User) -> None:
        """A valid token gets the logout success payload and a 200."""
        client, auth_token = self.get_test_client_and_auth_token(
            app, user_1.email
        )
        auth_headers = {'Authorization': f'Bearer {auth_token}'}

        response = client.post('/api/auth/logout', headers=auth_headers)

        body = json.loads(response.data.decode())
        assert body['status'] == 'success'
        assert body['message'] == 'successfully logged out'
        assert response.status_code == 200

    def test_token_is_blacklisted_on_logout(
        self, app: Flask, user_1: User
    ) -> None:
        """After logout a ``BlacklistedToken`` row exists for the token."""
        client, auth_token = self.get_test_client_and_auth_token(
            app, user_1.email
        )
        auth_headers = {'Authorization': f'Bearer {auth_token}'}

        client.post('/api/auth/logout', headers=auth_headers)

        blacklisted = BlacklistedToken.query.filter_by(
            token=auth_token
        ).first()
        assert blacklisted.blacklisted_on is not None

    def test_it_returns_error_if_token_is_already_blacklisted(
        self, app: Flask, user_1: User
    ) -> None:
        """A token blacklisted beforehand is checked with
        ``assert_invalid_token``.
        """
        client, auth_token = self.get_test_client_and_auth_token(
            app, user_1.email
        )
        db.session.add(BlacklistedToken(token=auth_token))
        db.session.commit()
        auth_headers = {'Authorization': f'Bearer {auth_token}'}

        response = client.post('/api/auth/logout', headers=auth_headers)

        self.assert_invalid_token(response)
|
|
|
|
|
|
class TestUserPrivacyPolicyUpdate(ApiTestCaseMixin):
    """Tests for ``POST /api/auth/account/privacy-policy``."""

    def test_it_returns_error_if_user_is_not_authenticated(
        self, app: Flask, user_1: User
    ) -> None:
        """A request without auth headers is checked with ``assert_401``."""
        client = app.test_client()

        # Target the privacy-policy endpoint like the other tests of this
        # class; the previous URL ('/api/auth/profile/edit/preferences')
        # exercised another route, leaving this endpoint's unauthenticated
        # case untested.
        response = client.post(
            '/api/auth/account/privacy-policy',
            content_type='application/json',
            data=json.dumps(dict(accepted_policy=True)),
        )

        self.assert_401(response)

    def test_it_returns_error_if_accepted_policy_is_missing(
        self, app: Flask, user_1: User
    ) -> None:
        """An empty JSON body is checked with ``assert_400``."""
        client, auth_token = self.get_test_client_and_auth_token(
            app, user_1.email
        )

        response = client.post(
            '/api/auth/account/privacy-policy',
            content_type='application/json',
            data=json.dumps(dict()),
            headers=dict(Authorization=f'Bearer {auth_token}'),
        )

        self.assert_400(response)

    def test_it_updates_accepted_policy(
        self,
        app: Flask,
        user_1: User,
    ) -> None:
        """Accepting the policy stores the acceptance date on the user.

        ``datetime.utcnow`` is replaced in ``fittrackee.users.auth`` so the
        stored date can be compared for equality.
        """
        client, auth_token = self.get_test_client_and_auth_token(
            app, user_1.email
        )
        accepted_policy_date = datetime.utcnow()

        with patch('fittrackee.users.auth.datetime.datetime') as datetime_mock:
            datetime_mock.utcnow = Mock(return_value=accepted_policy_date)
            response = client.post(
                '/api/auth/account/privacy-policy',
                content_type='application/json',
                data=json.dumps(dict(accepted_policy=True)),
                headers=dict(Authorization=f'Bearer {auth_token}'),
            )

        assert response.status_code == 200
        assert user_1.accepted_policy_date == accepted_policy_date

    @pytest.mark.parametrize('input_accepted_policy', [False, '', None, 'foo'])
    def test_it_return_error_if_user_has_not_accepted_policy(
        self,
        app: Flask,
        user_1: User,
        input_accepted_policy: Union[str, bool, None],
    ) -> None:
        """Each parametrized ``accepted_policy`` value yields a 400."""
        client, auth_token = self.get_test_client_and_auth_token(
            app, user_1.email
        )

        response = client.post(
            '/api/auth/account/privacy-policy',
            content_type='application/json',
            data=json.dumps(dict(accepted_policy=input_accepted_policy)),
            headers=dict(Authorization=f'Bearer {auth_token}'),
        )

        assert response.status_code == 400
|
|
|
|
|
|
@patch('fittrackee.users.auth.export_data')
class TestPostUserDataExportRequest(ApiTestCaseMixin):
    """Tests for ``POST /api/auth/account/export/request``.

    ``fittrackee.users.auth.export_data`` is patched for every test and
    injected as the ``export_data_mock`` argument.
    """

    def test_it_returns_data_export_info_when_no_ongoing_request_exists_for_user(  # noqa
        self,
        export_data_mock: Mock,
        app: Flask,
        user_1: User,
        user_2: User,
    ) -> None:
        """A request owned by ``user_2`` does not prevent ``user_1`` from
        getting a new request back.
        """
        other_user_request = UserDataExport(user_id=user_2.id)
        db.session.add(other_user_request)
        db.session.commit()
        client, auth_token = self.get_test_client_and_auth_token(
            app, user_1.email
        )

        response = client.post(
            '/api/auth/account/export/request',
            content_type='application/json',
            headers={'Authorization': f'Bearer {auth_token}'},
        )

        assert response.status_code == 200
        body = json.loads(response.data.decode())
        created_request = UserDataExport.query.filter_by(
            user_id=user_1.id
        ).first()
        assert body["status"] == "success"
        assert body["request"] == jsonify_dict(created_request.serialize())

    def test_it_returns_error_if_ongoing_request_exist(
        self,
        export_data_mock: Mock,
        app: Flask,
        user_1: User,
    ) -> None:
        """An existing, not completed request for the user is checked with
        ``assert_400`` and its message.
        """
        ongoing_request = UserDataExport(user_id=user_1.id)
        db.session.add(ongoing_request)
        db.session.commit()
        client, auth_token = self.get_test_client_and_auth_token(
            app, user_1.email
        )

        response = client.post(
            '/api/auth/account/export/request',
            content_type='application/json',
            headers={'Authorization': f'Bearer {auth_token}'},
        )

        self.assert_400(response, "ongoing request exists")

    def test_it_returns_error_if_existing_request_has_not_expired(
        self,
        export_data_mock: Mock,
        app: Flask,
        user_1: User,
    ) -> None:
        """A freshly created, completed request is checked with
        ``assert_400`` and its message.
        """
        completed_export_request = UserDataExport(user_id=user_1.id)
        db.session.add(completed_export_request)
        completed_export_request.completed = True
        db.session.commit()
        client, auth_token = self.get_test_client_and_auth_token(
            app, user_1.email
        )

        response = client.post(
            '/api/auth/account/export/request',
            content_type='application/json',
            headers={'Authorization': f'Bearer {auth_token}'},
        )

        self.assert_400(response, "completed request already exists")

    def test_it_returns_new_request_if_existing_request_has_expired(  # noqa
        self,
        export_data_mock: Mock,
        app: Flask,
        user_1: User,
    ) -> None:
        """A completed request created ``DATA_EXPORT_EXPIRATION`` hours ago
        is replaced by a request with a different id.
        """
        expiration_hours = app.config["DATA_EXPORT_EXPIRATION"]
        completed_export_request = UserDataExport(
            user_id=user_1.id,
            created_at=datetime.utcnow() - timedelta(hours=expiration_hours),
        )
        db.session.add(completed_export_request)
        completed_export_request.completed = True
        db.session.commit()
        client, auth_token = self.get_test_client_and_auth_token(
            app, user_1.email
        )

        response = client.post(
            '/api/auth/account/export/request',
            content_type='application/json',
            headers={'Authorization': f'Bearer {auth_token}'},
        )

        assert response.status_code == 200
        body = json.loads(response.data.decode())
        current_request = UserDataExport.query.filter_by(
            user_id=user_1.id
        ).first()
        assert current_request.id != completed_export_request.id
        assert body["status"] == "success"
        assert body["request"] == jsonify_dict(current_request.serialize())

    def test_it_calls_export_data_tasks_when_request_is_created(
        self,
        export_data_mock: Mock,
        app: Flask,
        user_1: User,
    ) -> None:
        """``export_data.send`` is called once with the new request id."""
        client, auth_token = self.get_test_client_and_auth_token(
            app, user_1.email
        )

        client.post(
            '/api/auth/account/export/request',
            content_type='application/json',
            headers={'Authorization': f'Bearer {auth_token}'},
        )

        created_request = UserDataExport.query.filter_by(
            user_id=user_1.id
        ).first()
        export_data_mock.send.assert_called_once_with(
            export_request_id=created_request.id
        )

    def test_it_does_not_calls_export_data_tasks_when_request_already_exists(
        self,
        export_data_mock: Mock,
        app: Flask,
        user_1: User,
    ) -> None:
        """``export_data.send`` is not called when the user already has a
        request that is not marked as completed.
        """
        expiration_hours = app.config["DATA_EXPORT_EXPIRATION"]
        existing_request = UserDataExport(
            user_id=user_1.id,
            created_at=datetime.utcnow() - timedelta(hours=expiration_hours),
        )
        db.session.add(existing_request)
        db.session.commit()
        client, auth_token = self.get_test_client_and_auth_token(
            app, user_1.email
        )

        client.post(
            '/api/auth/account/export/request',
            content_type='application/json',
            headers={'Authorization': f'Bearer {auth_token}'},
        )

        export_data_mock.send.assert_not_called()

    def test_it_returns_new_request_if_previous_request_has_expired(
        self,
        export_data_mock: Mock,
        app: Flask,
        user_1: User,
    ) -> None:
        """``export_data.send`` is called once with the current request id
        when the previous completed request is ``DATA_EXPORT_EXPIRATION``
        hours old.
        """
        expiration_hours = app.config["DATA_EXPORT_EXPIRATION"]
        completed_export_request = UserDataExport(
            user_id=user_1.id,
            created_at=datetime.utcnow() - timedelta(hours=expiration_hours),
        )
        db.session.add(completed_export_request)
        completed_export_request.completed = True
        db.session.commit()
        client, auth_token = self.get_test_client_and_auth_token(
            app, user_1.email
        )

        client.post(
            '/api/auth/account/export/request',
            content_type='application/json',
            headers={'Authorization': f'Bearer {auth_token}'},
        )

        current_request = UserDataExport.query.filter_by(
            user_id=user_1.id
        ).first()
        export_data_mock.send.assert_called_once_with(
            export_request_id=current_request.id
        )
|
|
|
|
|
|
class TestGetUserDataExportRequest(ApiTestCaseMixin):
    """Tests for ``GET /api/auth/account/export``."""

    def test_it_returns_none_if_no_request(
        self,
        app: Flask,
        user_1: User,
        user_2: User,
    ) -> None:
        """Without any stored request the ``request`` field is ``None``."""
        client, auth_token = self.get_test_client_and_auth_token(
            app, user_1.email
        )

        response = client.get(
            '/api/auth/account/export',
            content_type='application/json',
            headers={'Authorization': f'Bearer {auth_token}'},
        )

        assert response.status_code == 200
        body = json.loads(response.data.decode())
        assert body["status"] == "success"
        assert body["request"] is None

    def test_it_does_not_return_another_user_existing_request(
        self,
        app: Flask,
        user_1: User,
        user_2: User,
    ) -> None:
        """A request owned by ``user_2`` is not returned to ``user_1``."""
        expiration_hours = app.config["DATA_EXPORT_EXPIRATION"]
        other_user_request = UserDataExport(
            user_id=user_2.id,
            created_at=datetime.utcnow() - timedelta(hours=expiration_hours),
        )
        db.session.add(other_user_request)
        db.session.commit()
        client, auth_token = self.get_test_client_and_auth_token(
            app, user_1.email
        )

        response = client.get(
            '/api/auth/account/export',
            content_type='application/json',
            headers={'Authorization': f'Bearer {auth_token}'},
        )

        assert response.status_code == 200
        body = json.loads(response.data.decode())
        assert body["status"] == "success"
        assert body["request"] is None

    def test_it_returns_existing_request_for_authenticated_user(
        self,
        app: Flask,
        user_1: User,
    ) -> None:
        """The user's own stored request is returned serialized."""
        expiration_hours = app.config["DATA_EXPORT_EXPIRATION"]
        completed_export_request = UserDataExport(
            user_id=user_1.id,
            created_at=datetime.utcnow() - timedelta(hours=expiration_hours),
        )
        db.session.add(completed_export_request)
        db.session.commit()
        client, auth_token = self.get_test_client_and_auth_token(
            app, user_1.email
        )

        response = client.get(
            '/api/auth/account/export',
            content_type='application/json',
            headers={'Authorization': f'Bearer {auth_token}'},
        )

        assert response.status_code == 200
        body = json.loads(response.data.decode())
        assert body["status"] == "success"
        expected_request = jsonify_dict(completed_export_request.serialize())
        assert body["request"] == expected_request
|
|
|
|
|
|
class TestDownloadExportDataArchive(ApiTestCaseMixin):
|
|
def test_it_returns_404_when_request_export_does_not_exist(
    self, app: Flask, user_1: User
) -> None:
    """A random archive name, with no export request stored, is checked
    with ``assert_404_with_message``.
    """
    client, auth_token = self.get_test_client_and_auth_token(
        app, user_1.email
    )
    archive_url = f'/api/auth/account/export/{self.random_string()}'

    response = client.get(
        archive_url,
        headers={'Authorization': f'Bearer {auth_token}'},
    )

    self.assert_404_with_message(response, 'file not found')
|
|
|
|
def test_it_returns_404_when_request_export_from_another_user(
    self, app: Flask, user_1: User, user_2: User
) -> None:
    """An archive name belonging to a ``user_2`` export, requested by
    ``user_1``, is checked with ``assert_404_with_message``.
    """
    archive_file_name = self.random_string()
    other_user_export = UserDataExport(user_id=user_2.id)
    db.session.add(other_user_export)
    other_user_export.completed = True
    other_user_export.file_name = archive_file_name
    db.session.commit()
    client, auth_token = self.get_test_client_and_auth_token(
        app, user_1.email
    )
    archive_url = f'/api/auth/account/export/{archive_file_name}'

    response = client.get(
        archive_url,
        headers={'Authorization': f'Bearer {auth_token}'},
    )

    self.assert_404_with_message(response, 'file not found')
|
|
|
|
def test_it_returns_404_when_file_name_does_not_match(
    self, app: Flask, user_1: User
) -> None:
    """A random archive name that is not the one stored on the user's
    completed export is checked with ``assert_404_with_message``.
    """
    own_export = UserDataExport(user_id=user_1.id)
    db.session.add(own_export)
    own_export.completed = True
    own_export.file_name = self.random_string()
    db.session.commit()
    client, auth_token = self.get_test_client_and_auth_token(
        app, user_1.email
    )
    # a second, independent random name is requested
    archive_url = f'/api/auth/account/export/{self.random_string()}'

    response = client.get(
        archive_url,
        headers={'Authorization': f'Bearer {auth_token}'},
    )

    self.assert_404_with_message(response, 'file not found')
|
|
|
|
def test_it_calls_send_from_directory_if_request_exist(
    self, app: Flask, user_1: User
) -> None:
    """``send_from_directory`` is called once with the user's export
    directory and the stored archive name.

    ``fittrackee.users.auth.send_from_directory`` is patched, so no file
    is read.
    """
    archive_file_name = self.random_string()
    own_export = UserDataExport(user_id=user_1.id)
    db.session.add(own_export)
    own_export.completed = True
    own_export.file_name = archive_file_name
    db.session.commit()
    client, auth_token = self.get_test_client_and_auth_token(
        app, user_1.email
    )
    with patch(
        'fittrackee.users.auth.send_from_directory', return_value='file'
    ) as send_from_directory_mock:
        client.get(
            f'/api/auth/account/export/{archive_file_name}',
            headers={'Authorization': f'Bearer {auth_token}'},
        )

    expected_directory = f"{app.config['UPLOAD_FOLDER']}/exports/{user_1.id}"
    send_from_directory_mock.assert_called_once_with(
        expected_directory,
        archive_file_name,
        mimetype='application/zip',
        as_attachment=True,
    )
|