API - add tests on JWT generation
This commit is contained in:
parent
8dc7761c48
commit
98a3e3fec1
@ -1,6 +1,12 @@
|
|||||||
from unittest.mock import patch
|
from calendar import timegm
|
||||||
|
from datetime import datetime, timedelta
|
||||||
|
from typing import Dict
|
||||||
|
from unittest.mock import Mock, patch
|
||||||
|
|
||||||
|
import jwt
|
||||||
import pytest
|
import pytest
|
||||||
|
from cryptography.hazmat.primitives import serialization
|
||||||
|
from cryptography.hazmat.primitives.asymmetric import rsa
|
||||||
from flask import Flask
|
from flask import Flask
|
||||||
|
|
||||||
from fittrackee import bcrypt
|
from fittrackee import bcrypt
|
||||||
@ -17,6 +23,7 @@ from fittrackee.users.utils.controls import (
|
|||||||
is_valid_email,
|
is_valid_email,
|
||||||
register_controls,
|
register_controls,
|
||||||
)
|
)
|
||||||
|
from fittrackee.users.utils.token import decode_user_token, get_user_token
|
||||||
|
|
||||||
from ..utils import random_email
|
from ..utils import random_email
|
||||||
|
|
||||||
@ -333,3 +340,174 @@ class TestRegisterControls:
|
|||||||
'username: 3 to 30 characters required\n'
|
'username: 3 to 30 characters required\n'
|
||||||
'email: valid email must be provided\n'
|
'email: valid email must be provided\n'
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class TestGetUserToken:
|
||||||
|
@staticmethod
|
||||||
|
def decode_token(app: Flask, token: str) -> Dict:
|
||||||
|
return jwt.decode(
|
||||||
|
token,
|
||||||
|
app.config['SECRET_KEY'],
|
||||||
|
algorithms=['HS256'],
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_token_is_encoded_with_hs256(self, app: Flask) -> None:
|
||||||
|
token = get_user_token(user_id=1)
|
||||||
|
|
||||||
|
decoded_token = self.decode_token(app, token)
|
||||||
|
assert list(decoded_token.keys()) == ['exp', 'iat', 'sub']
|
||||||
|
|
||||||
|
@pytest.mark.parametrize('input_password_reset', [True, False])
|
||||||
|
def test_token_contains_user_id(
|
||||||
|
self, app: Flask, input_password_reset: bool
|
||||||
|
) -> None:
|
||||||
|
user_id = 1
|
||||||
|
token = get_user_token(
|
||||||
|
user_id=user_id, password_reset=input_password_reset
|
||||||
|
)
|
||||||
|
|
||||||
|
decoded_token = self.decode_token(app, token)
|
||||||
|
assert decoded_token['sub'] == user_id
|
||||||
|
|
||||||
|
@pytest.mark.parametrize('input_password_reset', [True, False])
|
||||||
|
def test_token_contains_timestamp_of_when_it_is_issued(
|
||||||
|
self, app: Flask, input_password_reset: bool
|
||||||
|
) -> None:
|
||||||
|
user_id = 1
|
||||||
|
iat = datetime.utcnow()
|
||||||
|
with patch('fittrackee.users.utils.token.datetime') as datetime_mock:
|
||||||
|
datetime_mock.utcnow = Mock(return_value=iat)
|
||||||
|
|
||||||
|
token = get_user_token(
|
||||||
|
user_id=user_id, password_reset=input_password_reset
|
||||||
|
)
|
||||||
|
|
||||||
|
decoded_token = self.decode_token(app, token)
|
||||||
|
assert decoded_token['iat'] == timegm(iat.utctimetuple())
|
||||||
|
|
||||||
|
def test_token_contains_timestamp_of_when_it_expired(
|
||||||
|
self, app: Flask
|
||||||
|
) -> None:
|
||||||
|
user_id = 1
|
||||||
|
iat = datetime.utcnow()
|
||||||
|
expiration = timedelta(
|
||||||
|
days=app.config['TOKEN_EXPIRATION_DAYS'],
|
||||||
|
seconds=app.config['TOKEN_EXPIRATION_SECONDS'],
|
||||||
|
)
|
||||||
|
with patch('fittrackee.users.utils.token.datetime') as datetime_mock:
|
||||||
|
datetime_mock.utcnow = Mock(return_value=iat)
|
||||||
|
|
||||||
|
token = get_user_token(user_id=user_id)
|
||||||
|
|
||||||
|
decoded_token = self.decode_token(app, token)
|
||||||
|
assert decoded_token['exp'] == timegm(
|
||||||
|
(iat + expiration).utctimetuple()
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_password_token_contains_timestamp_of_when_it_expired(
|
||||||
|
self, app: Flask
|
||||||
|
) -> None:
|
||||||
|
user_id = 1
|
||||||
|
iat = datetime.utcnow()
|
||||||
|
expiration = timedelta(
|
||||||
|
days=0.0,
|
||||||
|
seconds=app.config['PASSWORD_TOKEN_EXPIRATION_SECONDS'],
|
||||||
|
)
|
||||||
|
with patch('fittrackee.users.utils.token.datetime') as datetime_mock:
|
||||||
|
datetime_mock.utcnow = Mock(return_value=iat)
|
||||||
|
|
||||||
|
token = get_user_token(user_id=user_id, password_reset=True)
|
||||||
|
|
||||||
|
decoded_token = self.decode_token(app, token)
|
||||||
|
assert decoded_token['exp'] == timegm(
|
||||||
|
(iat + expiration).utctimetuple()
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class TestDecodeUserToken:
|
||||||
|
@staticmethod
|
||||||
|
def generate_token(user_id: int, now: datetime) -> str:
|
||||||
|
with patch('fittrackee.users.utils.token.datetime') as datetime_mock:
|
||||||
|
datetime_mock.utcnow = Mock(return_value=now)
|
||||||
|
token = get_user_token(user_id)
|
||||||
|
return token
|
||||||
|
|
||||||
|
def test_it_raises_error_when_token_is_invalid(self, app: Flask) -> None:
|
||||||
|
with pytest.raises(jwt.exceptions.DecodeError):
|
||||||
|
|
||||||
|
decode_user_token(random_string())
|
||||||
|
|
||||||
|
def test_it_raises_error_when_token_body_is_invalid(
|
||||||
|
self, app: Flask
|
||||||
|
) -> None:
|
||||||
|
token = self.generate_token(user_id=1, now=datetime.utcnow())
|
||||||
|
header, body, signature = token.split('.')
|
||||||
|
modified_token = f'{header}.{random_string()}.{signature}'
|
||||||
|
with pytest.raises(
|
||||||
|
jwt.exceptions.InvalidSignatureError,
|
||||||
|
match='Signature verification failed',
|
||||||
|
):
|
||||||
|
|
||||||
|
decode_user_token(modified_token)
|
||||||
|
|
||||||
|
def test_it_raises_error_when_secret_key_is_invalid(
|
||||||
|
self, app: Flask
|
||||||
|
) -> None:
|
||||||
|
now = datetime.utcnow()
|
||||||
|
token = jwt.encode(
|
||||||
|
{
|
||||||
|
'exp': now + timedelta(minutes=1),
|
||||||
|
'iat': now,
|
||||||
|
'sub': 1,
|
||||||
|
},
|
||||||
|
random_string(),
|
||||||
|
algorithm='HS256',
|
||||||
|
)
|
||||||
|
with pytest.raises(
|
||||||
|
jwt.exceptions.InvalidSignatureError,
|
||||||
|
match='Signature verification failed',
|
||||||
|
):
|
||||||
|
|
||||||
|
decode_user_token(token)
|
||||||
|
|
||||||
|
def test_it_raises_error_when_algorithm_is_not_hs256(
|
||||||
|
self, app: Flask
|
||||||
|
) -> None:
|
||||||
|
key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
|
||||||
|
private_key = key.private_bytes(
|
||||||
|
serialization.Encoding.PEM,
|
||||||
|
serialization.PrivateFormat.PKCS8,
|
||||||
|
serialization.NoEncryption(),
|
||||||
|
)
|
||||||
|
now = datetime.utcnow()
|
||||||
|
token = jwt.encode(
|
||||||
|
{
|
||||||
|
'exp': now + timedelta(minutes=1),
|
||||||
|
'iat': now,
|
||||||
|
'sub': 1,
|
||||||
|
},
|
||||||
|
private_key.decode(),
|
||||||
|
algorithm="RS256",
|
||||||
|
)
|
||||||
|
with pytest.raises(jwt.exceptions.InvalidAlgorithmError):
|
||||||
|
|
||||||
|
decode_user_token(token)
|
||||||
|
|
||||||
|
def test_it_raises_error_when_token_is_expired(self, app: Flask) -> None:
|
||||||
|
now = datetime.utcnow() - timedelta(minutes=10)
|
||||||
|
token = self.generate_token(user_id=1, now=now)
|
||||||
|
with pytest.raises(
|
||||||
|
jwt.exceptions.ExpiredSignatureError, match='Signature has expired'
|
||||||
|
):
|
||||||
|
|
||||||
|
decode_user_token(token)
|
||||||
|
|
||||||
|
def test_it_returns_user_id(self, app: Flask) -> None:
|
||||||
|
expected_user_id = 1
|
||||||
|
token = self.generate_token(
|
||||||
|
user_id=expected_user_id, now=datetime.utcnow()
|
||||||
|
)
|
||||||
|
|
||||||
|
user_id = decode_user_token(token)
|
||||||
|
|
||||||
|
assert user_id == expected_user_id
|
||||||
|
@ -21,10 +21,11 @@ def get_user_token(
|
|||||||
if password_reset
|
if password_reset
|
||||||
else current_app.config['TOKEN_EXPIRATION_SECONDS']
|
else current_app.config['TOKEN_EXPIRATION_SECONDS']
|
||||||
)
|
)
|
||||||
|
now = datetime.utcnow()
|
||||||
payload = {
|
payload = {
|
||||||
'exp': datetime.utcnow()
|
'exp': now
|
||||||
+ timedelta(days=expiration_days, seconds=expiration_seconds),
|
+ timedelta(days=expiration_days, seconds=expiration_seconds),
|
||||||
'iat': datetime.utcnow(),
|
'iat': now,
|
||||||
'sub': user_id,
|
'sub': user_id,
|
||||||
}
|
}
|
||||||
return jwt.encode(
|
return jwt.encode(
|
||||||
|
Loading…
x
Reference in New Issue
Block a user