API - add route to get access token

This commit is contained in:
Sam 2022-05-27 14:08:07 +02:00
parent 249c975810
commit 64b813a44b
4 changed files with 199 additions and 2 deletions

View File

@ -65,3 +65,8 @@ def authorize(auth_user: User) -> Response:
return authorization_server.create_authorization_response(
grant_user=auth_user
)
@oauth_blueprint.route('/oauth/token', methods=['POST'])
def issue_token() -> Response:
return authorization_server.create_token_response()

View File

@ -22,3 +22,19 @@ def assert_errored_response(
if match is not None:
assert re.match(match, data['message'])
return data
def assert_oauth_errored_response(
response: TestResponse,
status_code: int,
error: str,
error_description: Optional[str] = None,
) -> Dict:
assert response.content_type == 'application/json'
assert response.status_code == status_code
data = json.loads(response.data.decode())
assert error in data['error']
if error_description is not None:
assert error_description in data['error_description']
return data

View File

@ -6,7 +6,10 @@ from flask import Flask
from flask.testing import FlaskClient
from werkzeug.test import TestResponse
from .custom_asserts import assert_errored_response
from .custom_asserts import (
assert_errored_response,
assert_oauth_errored_response,
)
from .utils import random_email, random_string
@ -122,6 +125,28 @@ class ApiTestCaseMixin(RandomMixin):
response, 500, error_message=error_message, status=status
)
@staticmethod
def assert_unsupported_grant_type(response: TestResponse) -> Dict:
return assert_oauth_errored_response(
response, 400, error='unsupported_grant_type'
)
@staticmethod
def assert_invalid_client(response: TestResponse) -> Dict:
return assert_oauth_errored_response(
response,
400,
error='invalid_client',
)
@staticmethod
def assert_invalid_request(response: TestResponse) -> Dict:
return assert_oauth_errored_response(
response,
400,
error='invalid_request',
)
class CallArgsMixin:
@staticmethod

View File

@ -5,7 +5,9 @@ from urllib.parse import parse_qs
import pytest
from flask import Flask
from flask.testing import FlaskClient
from urllib3.util import parse_url
from werkzeug.test import TestResponse
from fittrackee import db
from fittrackee.oauth2.client import create_oauth_client
@ -156,7 +158,7 @@ class TestOAuthClientCreation(OAuth2TestCase):
('token_endpoint_auth_method', 'client_secret_post'),
],
)
def test_it_always_create_oauth_client_with_authorization_grant(
def test_it_always_creates_oauth_client_with_authorization_grant(
self,
app: Flask,
user_1: User,
@ -258,3 +260,152 @@ class TestOAuthClientAuthorization(OAuth2TestCase):
assert response.status_code == 302
parsed_url = parse_url(response.location)
assert parse_qs(parsed_url.query).get('code') is not None
class TestOAuthIssueToken(OAuth2TestCase):
route = '/api/oauth/token'
@staticmethod
def assert_token_is_returned(response: TestResponse) -> None:
assert response.status_code == 200
data = json.loads(response.data.decode())
assert data.get('access_token') is not None
assert data.get('expires_in') is not None
assert data.get('token_type') == 'Bearer'
@staticmethod
def authorize_client(
client: FlaskClient, oauth_client: OAuth2Client, auth_token: str
) -> Union[List[str], str]:
response = client.post(
'/api/oauth/authorize',
data={
'client_id': oauth_client.client_id,
'response_type': 'code',
},
headers=dict(
Authorization=f'Bearer {auth_token}',
content_type='multipart/form-data',
),
)
parsed_url = parse_url(response.location)
code = parse_qs(parsed_url.query).get('code', '')
return code
def test_it_returns_error_when_form_is_empty(self, app: Flask) -> None:
client = app.test_client()
response = client.post(
self.route,
data=dict(data='{}'),
headers=dict(content_type='multipart/form-data'),
)
self.assert_unsupported_grant_type(response)
def test_it_returns_error_when_client_id_is_invalid(
self, app: Flask, user_1: User
) -> None:
client, auth_token = self.get_test_client_and_auth_token(
app, user_1.email
)
oauth_client = self.create_oauth_client(user_1)
code = self.authorize_client(client, oauth_client, auth_token)
response = client.post(
self.route,
data={
'client_id': self.random_string(),
'client_secret': oauth_client.client_secret,
'grant_type': 'authorization_code',
'code': code,
},
headers=dict(content_type='multipart/form-data'),
)
self.assert_invalid_client(response)
def test_it_returns_error_when_client_secret_is_invalid(
self, app: Flask, user_1: User
) -> None:
client, auth_token = self.get_test_client_and_auth_token(
app, user_1.email
)
oauth_client = self.create_oauth_client(user_1)
code = self.authorize_client(client, oauth_client, auth_token)
response = client.post(
self.route,
data={
'client_id': oauth_client.client_id,
'client_secret': self.random_string(),
'grant_type': 'authorization_code',
'code': code,
},
headers=dict(content_type='multipart/form-data'),
)
self.assert_invalid_client(response)
def test_it_returns_error_when_client_not_authorized(
self, app: Flask, user_1: User
) -> None:
client, auth_token = self.get_test_client_and_auth_token(
app, user_1.email
)
oauth_client = self.create_oauth_client(user_1)
response = client.post(
self.route,
data={
'client_id': oauth_client.client_id,
'client_secret': oauth_client.client_secret,
'grant_type': 'authorization_code',
'code': self.random_string(),
},
headers=dict(content_type='multipart/form-data'),
)
self.assert_invalid_request(response)
def test_it_returns_error_when_code_is_invalid(
self, app: Flask, user_1: User
) -> None:
client, auth_token = self.get_test_client_and_auth_token(
app, user_1.email
)
oauth_client = self.create_oauth_client(user_1)
self.authorize_client(client, oauth_client, auth_token)
response = client.post(
self.route,
data={
'client_id': oauth_client.client_id,
'client_secret': oauth_client.client_secret,
'grant_type': 'authorization_code',
'code': self.random_string(),
},
headers=dict(content_type='multipart/form-data'),
)
self.assert_invalid_request(response)
def test_it_returns_access_token(self, app: Flask, user_1: User) -> None:
client, auth_token = self.get_test_client_and_auth_token(
app, user_1.email
)
oauth_client = self.create_oauth_client(user_1)
code = self.authorize_client(client, oauth_client, auth_token)
response = client.post(
self.route,
data={
'client_id': oauth_client.client_id,
'client_secret': oauth_client.client_secret,
'grant_type': 'authorization_code',
'code': code,
},
headers=dict(content_type='multipart/form-data'),
)
self.assert_token_is_returned(response)