diff --git a/fittrackee/oauth2/routes.py b/fittrackee/oauth2/routes.py index 121b6801..a889c9f1 100644 --- a/fittrackee/oauth2/routes.py +++ b/fittrackee/oauth2/routes.py @@ -65,3 +65,8 @@ def authorize(auth_user: User) -> Response: return authorization_server.create_authorization_response( grant_user=auth_user ) + + +@oauth_blueprint.route('/oauth/token', methods=['POST']) +def issue_token() -> Response: + return authorization_server.create_token_response() diff --git a/fittrackee/tests/custom_asserts.py b/fittrackee/tests/custom_asserts.py index 168dd073..78465a6b 100644 --- a/fittrackee/tests/custom_asserts.py +++ b/fittrackee/tests/custom_asserts.py @@ -22,3 +22,19 @@ def assert_errored_response( if match is not None: assert re.match(match, data['message']) return data + + +def assert_oauth_errored_response( + response: TestResponse, + status_code: int, + error: str, + error_description: Optional[str] = None, +) -> Dict: + assert response.content_type == 'application/json' + assert response.status_code == status_code + + data = json.loads(response.data.decode()) + assert error in data['error'] + if error_description is not None: + assert error_description in data['error_description'] + return data diff --git a/fittrackee/tests/mixins.py b/fittrackee/tests/mixins.py index 63c7c8a1..d46a49fd 100644 --- a/fittrackee/tests/mixins.py +++ b/fittrackee/tests/mixins.py @@ -6,7 +6,10 @@ from flask import Flask from flask.testing import FlaskClient from werkzeug.test import TestResponse -from .custom_asserts import assert_errored_response +from .custom_asserts import ( + assert_errored_response, + assert_oauth_errored_response, +) from .utils import random_email, random_string @@ -122,6 +125,28 @@ class ApiTestCaseMixin(RandomMixin): response, 500, error_message=error_message, status=status ) + @staticmethod + def assert_unsupported_grant_type(response: TestResponse) -> Dict: + return assert_oauth_errored_response( + response, 400, error='unsupported_grant_type' + ) + + @staticmethod + def assert_invalid_client(response: TestResponse) -> Dict: + return assert_oauth_errored_response( + response, + 400, + error='invalid_client', + ) + + @staticmethod + def assert_invalid_request(response: TestResponse) -> Dict: + return assert_oauth_errored_response( + response, + 400, + error='invalid_request', + ) + class CallArgsMixin: @staticmethod diff --git a/fittrackee/tests/oauth2/test_oauth2_routes.py b/fittrackee/tests/oauth2/test_oauth2_routes.py index a2388423..5fdabf8a 100644 --- a/fittrackee/tests/oauth2/test_oauth2_routes.py +++ b/fittrackee/tests/oauth2/test_oauth2_routes.py @@ -5,7 +5,9 @@ from urllib.parse import parse_qs import pytest from flask import Flask +from flask.testing import FlaskClient from urllib3.util import parse_url +from werkzeug.test import TestResponse from fittrackee import db from fittrackee.oauth2.client import create_oauth_client @@ -156,7 +158,7 @@ class TestOAuthClientCreation(OAuth2TestCase): ('token_endpoint_auth_method', 'client_secret_post'), ], ) - def test_it_always_create_oauth_client_with_authorization_grant( + def test_it_always_creates_oauth_client_with_authorization_grant( self, app: Flask, user_1: User, @@ -258,3 +260,152 @@ class TestOAuthClientAuthorization(OAuth2TestCase): assert response.status_code == 302 parsed_url = parse_url(response.location) assert parse_qs(parsed_url.query).get('code') is not None + + +class TestOAuthIssueToken(OAuth2TestCase): + route = '/api/oauth/token' + + @staticmethod + def assert_token_is_returned(response: TestResponse) -> None: + assert response.status_code == 200 + data = json.loads(response.data.decode()) + assert data.get('access_token') is not None + assert data.get('expires_in') is not None + assert data.get('token_type') == 'Bearer' + + @staticmethod + def authorize_client( + client: FlaskClient, oauth_client: OAuth2Client, auth_token: str + ) -> Union[List[str], str]: + response = client.post( + '/api/oauth/authorize', + data={ + 'client_id': oauth_client.client_id, + 'response_type': 'code', + }, + headers=dict( + Authorization=f'Bearer {auth_token}', + content_type='multipart/form-data', + ), + ) + parsed_url = parse_url(response.location) + code = parse_qs(parsed_url.query).get('code', '') + return code + + def test_it_returns_error_when_form_is_empty(self, app: Flask) -> None: + client = app.test_client() + + response = client.post( + self.route, + data=dict(data='{}'), + headers=dict(content_type='multipart/form-data'), + ) + + self.assert_unsupported_grant_type(response) + + def test_it_returns_error_when_client_id_is_invalid( + self, app: Flask, user_1: User + ) -> None: + client, auth_token = self.get_test_client_and_auth_token( + app, user_1.email + ) + oauth_client = self.create_oauth_client(user_1) + code = self.authorize_client(client, oauth_client, auth_token) + + response = client.post( + self.route, + data={ + 'client_id': self.random_string(), + 'client_secret': oauth_client.client_secret, + 'grant_type': 'authorization_code', + 'code': code, + }, + headers=dict(content_type='multipart/form-data'), + ) + + self.assert_invalid_client(response) + + def test_it_returns_error_when_client_secret_is_invalid( + self, app: Flask, user_1: User + ) -> None: + client, auth_token = self.get_test_client_and_auth_token( + app, user_1.email + ) + oauth_client = self.create_oauth_client(user_1) + code = self.authorize_client(client, oauth_client, auth_token) + + response = client.post( + self.route, + data={ + 'client_id': oauth_client.client_id, + 'client_secret': self.random_string(), + 'grant_type': 'authorization_code', + 'code': code, + }, + headers=dict(content_type='multipart/form-data'), + ) + + self.assert_invalid_client(response) + + def test_it_returns_error_when_client_not_authorized( + self, app: Flask, user_1: User + ) -> None: + client, auth_token = self.get_test_client_and_auth_token( + app, user_1.email + ) + oauth_client = self.create_oauth_client(user_1) + + response = client.post( + self.route, + data={ + 'client_id': oauth_client.client_id, + 'client_secret': oauth_client.client_secret, + 'grant_type': 'authorization_code', + 'code': self.random_string(), + }, + headers=dict(content_type='multipart/form-data'), + ) + + self.assert_invalid_request(response) + + def test_it_returns_error_when_code_is_invalid( + self, app: Flask, user_1: User + ) -> None: + client, auth_token = self.get_test_client_and_auth_token( + app, user_1.email + ) + oauth_client = self.create_oauth_client(user_1) + self.authorize_client(client, oauth_client, auth_token) + + response = client.post( + self.route, + data={ + 'client_id': oauth_client.client_id, + 'client_secret': oauth_client.client_secret, + 'grant_type': 'authorization_code', + 'code': self.random_string(), + }, + headers=dict(content_type='multipart/form-data'), + ) + + self.assert_invalid_request(response) + + def test_it_returns_access_token(self, app: Flask, user_1: User) -> None: + client, auth_token = self.get_test_client_and_auth_token( + app, user_1.email + ) + oauth_client = self.create_oauth_client(user_1) + code = self.authorize_client(client, oauth_client, auth_token) + + response = client.post( + self.route, + data={ + 'client_id': oauth_client.client_id, + 'client_secret': oauth_client.client_secret, + 'grant_type': 'authorization_code', + 'code': code, + }, + headers=dict(content_type='multipart/form-data'), + ) + + self.assert_token_is_returned(response)