API - blacklist jwt on user logout

This commit is contained in:
Sam 2022-09-14 15:15:03 +02:00
parent 841edc76c6
commit aad02cd93c
6 changed files with 238 additions and 7 deletions

View File

@ -17,4 +17,5 @@ Authentication
auth.request_password_reset,
auth.update_user_account,
auth.update_password,
auth.update_email
auth.update_email,
auth.logout_user

View File

@ -1,4 +1,4 @@
"""add OAuth 2.0
"""add OAuth 2.0 and blacklisted tokens
Revision ID: 84d840ce853b
Revises: 5e3a3a31c432
@ -67,11 +67,20 @@ def upgrade():
)
op.create_index(op.f('ix_oauth2_token_refresh_token'), 'oauth2_token', ['refresh_token'], unique=False)
op.create_index(op.f('ix_oauth2_token_user_id'), 'oauth2_token', ['user_id'], unique=False)
op.create_table('blacklisted_tokens',
sa.Column('id', sa.Integer(), autoincrement=True, nullable=False),
sa.Column('token', sa.String(length=500), nullable=False),
sa.Column('blacklisted_on', sa.DateTime(), nullable=False),
sa.PrimaryKeyConstraint('id'),
sa.UniqueConstraint('token')
)
# ### end Alembic commands ###
def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.drop_table('blacklisted_tokens')
op.drop_index(op.f('ix_oauth2_token_user_id'), table_name='oauth2_token')
op.drop_index(op.f('ix_oauth2_token_refresh_token'), table_name='oauth2_token')
op.drop_table('oauth2_token')

View File

@ -8,7 +8,8 @@ import pytest
from flask import Flask
from freezegun import freeze_time
from fittrackee.users.models import User, UserSportPreference
from fittrackee import db
from fittrackee.users.models import BlacklistedToken, User, UserSportPreference
from fittrackee.users.utils.token import get_user_token
from fittrackee.workouts.models import Sport
@ -500,6 +501,22 @@ class TestUserProfile(ApiTestCaseMixin):
self.assert_invalid_token(response)
def test_it_returns_error_if_token_is_blacklisted(
self, app: Flask, user_1: User
) -> None:
client, auth_token = self.get_test_client_and_auth_token(
app, user_1.email
)
db.session.add(BlacklistedToken(token=auth_token))
db.session.commit()
response = client.get(
'/api/auth/profile',
headers=dict(Authorization=f'Bearer {auth_token}'),
)
self.assert_invalid_token(response)
def test_it_returns_user(self, app: Flask, user_1: User) -> None:
client, auth_token = self.get_test_client_and_auth_token(
app, user_1.email
@ -2562,3 +2579,85 @@ class TestResendAccountConfirmationEmail(ApiTestCaseMixin):
self.assert_404_with_message(
response, 'the requested URL was not found on the server'
)
class TestUserLogout(ApiTestCaseMixin):
def test_it_returns_error_when_headers_are_missing(
self, app: Flask
) -> None:
client = app.test_client()
response = client.post('/api/auth/logout', headers=dict())
self.assert_401(response, 'provide a valid auth token')
def test_it_returns_error_when_token_is_invalid(self, app: Flask) -> None:
client = app.test_client()
response = client.post(
'/api/auth/logout', headers=dict(Authorization='Bearer invalid')
)
self.assert_invalid_token(response)
def test_it_returns_error_when_token_is_expired(
self, app: Flask, user_1: User
) -> None:
now = datetime.utcnow()
client, auth_token = self.get_test_client_and_auth_token(
app, user_1.email
)
with freeze_time(now + timedelta(seconds=4)):
response = client.post(
'/api/auth/logout',
headers=dict(Authorization=f'Bearer {auth_token}'),
)
self.assert_invalid_token(response)
def test_user_can_logout(self, app: Flask, user_1: User) -> None:
client, auth_token = self.get_test_client_and_auth_token(
app, user_1.email
)
response = client.post(
'/api/auth/logout',
headers=dict(Authorization=f'Bearer {auth_token}'),
)
data = json.loads(response.data.decode())
assert data['status'] == 'success'
assert data['message'] == 'successfully logged out'
assert response.status_code == 200
def test_token_is_blacklisted_on_logout(
self, app: Flask, user_1: User
) -> None:
client, auth_token = self.get_test_client_and_auth_token(
app, user_1.email
)
client.post(
'/api/auth/logout',
headers=dict(Authorization=f'Bearer {auth_token}'),
)
token = BlacklistedToken.query.filter_by(token=auth_token).first()
assert token.blacklisted_on is not None
def test_it_returns_error_if_token_is_already_blacklisted(
self, app: Flask, user_1: User
) -> None:
client, auth_token = self.get_test_client_and_auth_token(
app, user_1.email
)
db.session.add(BlacklistedToken(token=auth_token))
db.session.commit()
response = client.post(
'/api/auth/logout',
headers=dict(Authorization=f'Bearer {auth_token}'),
)
self.assert_invalid_token(response)

View File

@ -1,10 +1,14 @@
from datetime import datetime, timedelta
from typing import Dict
import pytest
from flask import Flask
from freezegun import freeze_time
from fittrackee import db
from fittrackee.tests.utils import random_string
from fittrackee.users.exceptions import UserNotFoundException
from fittrackee.users.models import User, UserSportPreference
from fittrackee.users.models import BlacklistedToken, User, UserSportPreference
from fittrackee.workouts.models import Sport, Workout
@ -195,7 +199,7 @@ class TestUserRecords(UserModelAssertMixin):
assert serialized_user['total_distance'] == 10
assert serialized_user['total_duration'] == '1:00:00'
def test_it_returns_totals_when_user_has_mutiple_workouts(
def test_it_returns_totals_when_user_has_multiple_workouts(
self,
app: Flask,
user_1: User,
@ -284,6 +288,37 @@ class TestUserModelToken:
assert isinstance(auth_token, str)
assert User.decode_auth_token(auth_token) == user_1.id
def test_it_returns_error_when_token_is_invalid(
self, app: Flask, user_1: User
) -> None:
assert (
User.decode_auth_token(random_string())
== 'invalid token, please log in again'
)
def test_it_returns_error_when_token_is_expired(
self, app: Flask, user_1: User
) -> None:
auth_token = user_1.encode_auth_token(user_1.id)
now = datetime.utcnow()
with freeze_time(now + timedelta(seconds=4)):
assert (
User.decode_auth_token(auth_token)
== 'signature expired, please log in again'
)
def test_it_returns_error_when_token_is_blacklisted(
self, app: Flask, user_1: User
) -> None:
auth_token = user_1.encode_auth_token(user_1.id)
db.session.add(BlacklistedToken(token=auth_token))
db.session.commit()
assert (
User.decode_auth_token(auth_token)
== 'blacklisted token, please log in again'
)
class TestUserSportModel:
def test_user_model(

View File

@ -33,7 +33,7 @@ from fittrackee.responses import (
from fittrackee.utils import get_readable_duration
from fittrackee.workouts.models import Sport
from .models import User, UserSportPreference
from .models import BlacklistedToken, User, UserSportPreference
from .utils.controls import check_password, is_valid_email, register_controls
from .utils.token import decode_user_token
@ -1536,3 +1536,70 @@ def resend_account_confirmation_email() -> Union[Dict, HttpResponse]:
return response
except (exc.OperationalError, ValueError) as e:
return handle_error_and_return_response(e, db=db)
@auth_blueprint.route('/auth/logout', methods=['POST'])
@require_auth()
def logout_user(auth_user: User) -> Union[Tuple[Dict, int], HttpResponse]:
"""
User logout.
If a valid token is provided, it will be blacklisted.
**Example request**:
.. sourcecode:: http
POST /api/auth/logout HTTP/1.1
Content-Type: application/json
**Example responses**:
- successful logout
.. sourcecode:: http
HTTP/1.1 200 OK
Content-Type: application/json
{
"message": "successfully logged out",
"status": "success"
}
- error on logout
.. sourcecode:: http
HTTP/1.1 401 UNAUTHORIZED
Content-Type: application/json
{
"message": "provide a valid auth token",
"status": "error"
}
:reqheader Authorization: OAuth 2.0 Bearer Token
:statuscode 200: successfully logged out
:statuscode 401:
- provide a valid auth token
- The access token provided is expired, revoked, malformed, or invalid
for other reasons.
:statuscode 500:
- error on token blacklist
"""
auth_token = request.headers.get('Authorization', '').split(' ')[1]
try:
db.session.add(BlacklistedToken(token=auth_token))
db.session.commit()
except Exception:
return {
'status': 'error',
'message': 'error on token blacklist',
}, 500
return {
'status': 'success',
'message': 'successfully logged out',
}, 200

View File

@ -97,7 +97,11 @@ class User(BaseModel):
:return: integer|string
"""
try:
return decode_user_token(auth_token)
resp = decode_user_token(auth_token)
is_blacklisted = BlacklistedToken.check(auth_token)
if is_blacklisted:
return 'blacklisted token, please log in again'
return resp
except jwt.ExpiredSignatureError:
return 'signature expired, please log in again'
except jwt.InvalidTokenError:
@ -233,3 +237,19 @@ class UserSportPreference(BaseModel):
'is_active': self.is_active,
'stopped_speed_threshold': self.stopped_speed_threshold,
}
class BlacklistedToken(BaseModel):
__tablename__ = 'blacklisted_tokens'
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
token = db.Column(db.String(500), unique=True, nullable=False)
blacklisted_on = db.Column(db.DateTime, nullable=False)
def __init__(self, token: str) -> None:
self.token = token
self.blacklisted_on = datetime.utcnow()
@classmethod
def check(cls, auth_token: str) -> bool:
return cls.query.filter_by(token=str(auth_token)).first() is not None