CLI - add command to clean blacklisted tokens
This commit is contained in:
@ -1,6 +1,7 @@
|
||||
import time
|
||||
from calendar import timegm
|
||||
from datetime import datetime, timedelta
|
||||
from typing import Dict
|
||||
from typing import Dict, Optional
|
||||
from unittest.mock import Mock, patch
|
||||
|
||||
import jwt
|
||||
@ -9,13 +10,12 @@ from cryptography.hazmat.primitives import serialization
|
||||
from cryptography.hazmat.primitives.asymmetric import rsa
|
||||
from flask import Flask
|
||||
|
||||
from fittrackee import bcrypt
|
||||
from fittrackee.tests.utils import random_string
|
||||
from fittrackee import bcrypt, db
|
||||
from fittrackee.users.exceptions import (
|
||||
InvalidEmailException,
|
||||
UserNotFoundException,
|
||||
)
|
||||
from fittrackee.users.models import User
|
||||
from fittrackee.users.models import BlacklistedToken, User
|
||||
from fittrackee.users.utils.admin import UserManagerService
|
||||
from fittrackee.users.utils.controls import (
|
||||
check_password,
|
||||
@ -23,9 +23,13 @@ from fittrackee.users.utils.controls import (
|
||||
is_valid_email,
|
||||
register_controls,
|
||||
)
|
||||
from fittrackee.users.utils.token import decode_user_token, get_user_token
|
||||
from fittrackee.users.utils.token import (
|
||||
clean_blacklisted_tokens,
|
||||
decode_user_token,
|
||||
get_user_token,
|
||||
)
|
||||
|
||||
from ..utils import random_email
|
||||
from ..utils import random_email, random_int, random_string
|
||||
|
||||
|
||||
class TestUserManagerService:
|
||||
@ -511,3 +515,67 @@ class TestDecodeUserToken:
|
||||
user_id = decode_user_token(token)
|
||||
|
||||
assert user_id == expected_user_id
|
||||
|
||||
|
||||
class TestBlacklistedTokensCleanup:
|
||||
@staticmethod
|
||||
def blacklisted_token(expiration_days: Optional[int] = None) -> str:
|
||||
token = get_user_token(user_id=random_int())
|
||||
blacklisted_token = BlacklistedToken(token=token)
|
||||
if expiration_days is not None:
|
||||
blacklisted_token.expired_at = int(time.time()) - (
|
||||
expiration_days * 86400
|
||||
)
|
||||
db.session.add(blacklisted_token)
|
||||
db.session.commit()
|
||||
return token
|
||||
|
||||
def test_it_returns_0_as_count_when_no_blacklisted_token_deleted(
|
||||
self, app: Flask, user_1: User
|
||||
) -> None:
|
||||
count = clean_blacklisted_tokens(days=30)
|
||||
|
||||
assert count == 0
|
||||
|
||||
def test_it_does_not_delete_blacklisted_token_when_not_expired(
|
||||
self, app: Flask, user_1: User
|
||||
) -> None:
|
||||
token = self.blacklisted_token()
|
||||
|
||||
clean_blacklisted_tokens(days=10)
|
||||
|
||||
existing_token = BlacklistedToken.query.filter_by(token=token).first()
|
||||
assert existing_token is not None
|
||||
|
||||
def test_it_deletes_blacklisted_token_when_expired_more_then_provided_days(
|
||||
self, app: Flask, user_1: User
|
||||
) -> None:
|
||||
token = self.blacklisted_token(expiration_days=40)
|
||||
|
||||
clean_blacklisted_tokens(days=30)
|
||||
|
||||
existing_token = BlacklistedToken.query.filter_by(token=token).first()
|
||||
assert existing_token is None
|
||||
|
||||
def test_it_does_not_delete_blacklisted_token_when_expired_below_provided_days( # noqa
|
||||
self, app: Flask, user_1: User
|
||||
) -> None:
|
||||
token = self.blacklisted_token(expiration_days=30)
|
||||
|
||||
clean_blacklisted_tokens(days=40)
|
||||
|
||||
existing_token = BlacklistedToken.query.filter_by(token=token).first()
|
||||
assert existing_token is not None
|
||||
|
||||
def test_it_returns_deleted_rows_count(
|
||||
self, app: Flask, user_1: User
|
||||
) -> None:
|
||||
self.blacklisted_token()
|
||||
for _ in range(3):
|
||||
self.blacklisted_token(expiration_days=30)
|
||||
|
||||
count = clean_blacklisted_tokens(
|
||||
days=app.config['TOKEN_EXPIRATION_DAYS']
|
||||
)
|
||||
|
||||
assert count == 3
|
||||
|
Reference in New Issue
Block a user