API - add access token revocation

This commit is contained in:
Sam 2022-05-27 14:46:03 +02:00
parent 887553dd5d
commit eeae632b01
3 changed files with 81 additions and 22 deletions

View File

@ -1,7 +1,10 @@
from authlib.integrations.sqla_oauth2 import create_revocation_endpoint
from authlib.oauth2.rfc7636 import CodeChallenge from authlib.oauth2.rfc7636 import CodeChallenge
from flask import Flask from flask import Flask
from .grants import AuthorizationCodeGrant, RefreshTokenGrant from fittrackee import db
from .grants import AuthorizationCodeGrant, OAuth2Token, RefreshTokenGrant
from .server import authorization_server from .server import authorization_server
@ -13,3 +16,8 @@ def config_oauth(app: Flask) -> None:
AuthorizationCodeGrant, [CodeChallenge(required=True)] AuthorizationCodeGrant, [CodeChallenge(required=True)]
) )
authorization_server.register_grant(RefreshTokenGrant) authorization_server.register_grant(RefreshTokenGrant)
# support revocation
revocation_cls = create_revocation_endpoint(db.session, OAuth2Token)
revocation_cls.CLIENT_AUTH_METHODS = ['client_secret_post']
authorization_server.register_endpoint(revocation_cls)

View File

@ -70,3 +70,8 @@ def authorize(auth_user: User) -> Response:
@oauth_blueprint.route('/oauth/token', methods=['POST']) @oauth_blueprint.route('/oauth/token', methods=['POST'])
def issue_token() -> Response: def issue_token() -> Response:
return authorization_server.create_token_response() return authorization_server.create_token_response()
@oauth_blueprint.route('/oauth/revoke', methods=['POST'])
def revoke_token() -> Response:
return authorization_server.create_endpoint_response('revocation')

View File

@ -1,5 +1,5 @@
import json import json
from typing import Dict, List, Optional, Union from typing import Dict, List, Optional, Tuple, Union
from unittest.mock import patch from unittest.mock import patch
from urllib.parse import parse_qs from urllib.parse import parse_qs
@ -11,7 +11,7 @@ from werkzeug.test import TestResponse
from fittrackee import db from fittrackee import db
from fittrackee.oauth2.client import create_oauth_client from fittrackee.oauth2.client import create_oauth_client
from fittrackee.oauth2.models import OAuth2Client from fittrackee.oauth2.models import OAuth2Client, OAuth2Token
from fittrackee.users.models import User from fittrackee.users.models import User
from ..mixins import ApiTestCaseMixin from ..mixins import ApiTestCaseMixin
@ -37,6 +37,25 @@ class OAuth2TestCase(ApiTestCaseMixin):
db.session.commit() db.session.commit()
return oauth_client return oauth_client
@staticmethod
def authorize_client(
client: FlaskClient, oauth_client: OAuth2Client, auth_token: str
) -> Union[List[str], str]:
response = client.post(
'/api/oauth/authorize',
data={
'client_id': oauth_client.client_id,
'response_type': 'code',
},
headers=dict(
Authorization=f'Bearer {auth_token}',
content_type='multipart/form-data',
),
)
parsed_url = parse_url(response.location)
code = parse_qs(parsed_url.query).get('code', '')
return code
class TestOAuthClientCreation(OAuth2TestCase): class TestOAuthClientCreation(OAuth2TestCase):
route = '/api/oauth/apps' route = '/api/oauth/apps'
@ -274,25 +293,6 @@ class TestOAuthIssueToken(OAuth2TestCase):
assert data.get('refresh_token') is not None assert data.get('refresh_token') is not None
assert data.get('token_type') == 'Bearer' assert data.get('token_type') == 'Bearer'
@staticmethod
def authorize_client(
client: FlaskClient, oauth_client: OAuth2Client, auth_token: str
) -> Union[List[str], str]:
response = client.post(
'/api/oauth/authorize',
data={
'client_id': oauth_client.client_id,
'response_type': 'code',
},
headers=dict(
Authorization=f'Bearer {auth_token}',
content_type='multipart/form-data',
),
)
parsed_url = parse_url(response.location)
code = parse_qs(parsed_url.query).get('code', '')
return code
def test_it_returns_error_when_form_is_empty(self, app: Flask) -> None: def test_it_returns_error_when_form_is_empty(self, app: Flask) -> None:
client = app.test_client() client = app.test_client()
@ -410,3 +410,49 @@ class TestOAuthIssueToken(OAuth2TestCase):
) )
self.assert_token_is_returned(response) self.assert_token_is_returned(response)
class TestOAuthTokenRevocation(OAuth2TestCase):
route = '/api/oauth/revoke'
def create_oauth_client_and_issue_token(
self, client: FlaskClient, user: User, auth_token: str
) -> Tuple[OAuth2Client, Dict]:
oauth_client = self.create_oauth_client(user)
code = self.authorize_client(client, oauth_client, auth_token)
response = client.post(
'/api/oauth/token',
data={
'client_id': oauth_client.client_id,
'client_secret': oauth_client.client_secret,
'grant_type': 'authorization_code',
'code': code,
},
headers=dict(content_type='multipart/form-data'),
)
data = json.loads(response.data.decode())
return oauth_client, data.get('access_token')
def test_it_revokes_user_token(self, app: Flask, user_1: User) -> None:
client, auth_token = self.get_test_client_and_auth_token(
app, user_1.email
)
oauth_client, access_token = self.create_oauth_client_and_issue_token(
client, user_1, auth_token
)
response = client.post(
self.route,
data={
'client_id': oauth_client.client_id,
'client_secret': oauth_client.client_secret,
'token': access_token,
},
headers=dict(content_type='multipart/form-data'),
)
assert response.status_code == 200
token = OAuth2Token.query.filter_by(
client_id=oauth_client.client_id
).first()
assert token.access_token_revoked_at is not None