diff --git a/fittrackee/oauth2/config.py b/fittrackee/oauth2/config.py index 0eb17145..f47d0b5a 100644 --- a/fittrackee/oauth2/config.py +++ b/fittrackee/oauth2/config.py @@ -1,7 +1,10 @@ +from authlib.integrations.sqla_oauth2 import create_revocation_endpoint from authlib.oauth2.rfc7636 import CodeChallenge from flask import Flask -from .grants import AuthorizationCodeGrant, RefreshTokenGrant +from fittrackee import db + +from .grants import AuthorizationCodeGrant, OAuth2Token, RefreshTokenGrant from .server import authorization_server @@ -13,3 +16,8 @@ def config_oauth(app: Flask) -> None: AuthorizationCodeGrant, [CodeChallenge(required=True)] ) authorization_server.register_grant(RefreshTokenGrant) + + # support revocation + revocation_cls = create_revocation_endpoint(db.session, OAuth2Token) + revocation_cls.CLIENT_AUTH_METHODS = ['client_secret_post'] + authorization_server.register_endpoint(revocation_cls) diff --git a/fittrackee/oauth2/routes.py b/fittrackee/oauth2/routes.py index a889c9f1..a492ed28 100644 --- a/fittrackee/oauth2/routes.py +++ b/fittrackee/oauth2/routes.py @@ -70,3 +70,8 @@ def authorize(auth_user: User) -> Response: @oauth_blueprint.route('/oauth/token', methods=['POST']) def issue_token() -> Response: return authorization_server.create_token_response() + + +@oauth_blueprint.route('/oauth/revoke', methods=['POST']) +def revoke_token() -> Response: + return authorization_server.create_endpoint_response('revocation') diff --git a/fittrackee/tests/oauth2/test_oauth2_routes.py b/fittrackee/tests/oauth2/test_oauth2_routes.py index 05f36a7e..ba9dbb68 100644 --- a/fittrackee/tests/oauth2/test_oauth2_routes.py +++ b/fittrackee/tests/oauth2/test_oauth2_routes.py @@ -1,5 +1,5 @@ import json -from typing import Dict, List, Optional, Union +from typing import Dict, List, Optional, Tuple, Union from unittest.mock import patch from urllib.parse import parse_qs @@ -11,7 +11,7 @@ from werkzeug.test import TestResponse from fittrackee import db from fittrackee.oauth2.client import create_oauth_client -from fittrackee.oauth2.models import OAuth2Client +from fittrackee.oauth2.models import OAuth2Client, OAuth2Token from fittrackee.users.models import User from ..mixins import ApiTestCaseMixin @@ -37,6 +37,25 @@ class OAuth2TestCase(ApiTestCaseMixin): db.session.commit() return oauth_client + @staticmethod + def authorize_client( + client: FlaskClient, oauth_client: OAuth2Client, auth_token: str + ) -> Union[List[str], str]: + response = client.post( + '/api/oauth/authorize', + data={ + 'client_id': oauth_client.client_id, + 'response_type': 'code', + }, + headers=dict( + Authorization=f'Bearer {auth_token}', + content_type='multipart/form-data', + ), + ) + parsed_url = parse_url(response.location) + code = parse_qs(parsed_url.query).get('code', '') + return code + class TestOAuthClientCreation(OAuth2TestCase): route = '/api/oauth/apps' @@ -274,25 +293,6 @@ class TestOAuthIssueToken(OAuth2TestCase): assert data.get('refresh_token') is not None assert data.get('token_type') == 'Bearer' - @staticmethod - def authorize_client( - client: FlaskClient, oauth_client: OAuth2Client, auth_token: str - ) -> Union[List[str], str]: - response = client.post( - '/api/oauth/authorize', - data={ - 'client_id': oauth_client.client_id, - 'response_type': 'code', - }, - headers=dict( - Authorization=f'Bearer {auth_token}', - content_type='multipart/form-data', - ), - ) - parsed_url = parse_url(response.location) - code = parse_qs(parsed_url.query).get('code', '') - return code - def test_it_returns_error_when_form_is_empty(self, app: Flask) -> None: client = app.test_client() @@ -410,3 +410,49 @@ class TestOAuthIssueToken(OAuth2TestCase): ) self.assert_token_is_returned(response) + + +class TestOAuthTokenRevocation(OAuth2TestCase): + route = '/api/oauth/revoke' + + def create_oauth_client_and_issue_token( + self, client: FlaskClient, user: User, auth_token: str + ) -> Tuple[OAuth2Client, Dict]: + oauth_client = self.create_oauth_client(user) + code = self.authorize_client(client, oauth_client, auth_token) + response = client.post( + '/api/oauth/token', + data={ + 'client_id': oauth_client.client_id, + 'client_secret': oauth_client.client_secret, + 'grant_type': 'authorization_code', + 'code': code, + }, + headers=dict(content_type='multipart/form-data'), + ) + data = json.loads(response.data.decode()) + return oauth_client, data.get('access_token') + + def test_it_revokes_user_token(self, app: Flask, user_1: User) -> None: + client, auth_token = self.get_test_client_and_auth_token( + app, user_1.email + ) + oauth_client, access_token = self.create_oauth_client_and_issue_token( + client, user_1, auth_token + ) + + response = client.post( + self.route, + data={ + 'client_id': oauth_client.client_id, + 'client_secret': oauth_client.client_secret, + 'token': access_token, + }, + headers=dict(content_type='multipart/form-data'), + ) + + assert response.status_code == 200 + token = OAuth2Token.query.filter_by( + client_id=oauth_client.client_id + ).first() + assert token.access_token_revoked_at is not None