API - handle confirmation when authorizing oauth client

This commit is contained in:
Sam 2022-06-07 15:40:33 +02:00
parent 78fe703494
commit d8d88cda3a
3 changed files with 97 additions and 13 deletions

View File

@ -1,6 +1,8 @@
from typing import Dict, Optional, Tuple, Union from typing import Dict, Optional, Tuple, Union
from urllib.parse import parse_qsl
from flask import Blueprint, Response, request from flask import Blueprint, Response, request
from urllib3.util import parse_url
from fittrackee import db from fittrackee import db
from fittrackee.oauth2.models import OAuth2Client from fittrackee.oauth2.models import OAuth2Client
@ -26,6 +28,13 @@ EXPECTED_METADATA_KEYS = [
DEFAULT_PER_PAGE = 5 DEFAULT_PER_PAGE = 5
def is_errored(url: str) -> Optional[str]:
query = dict(parse_qsl(parse_url(url).query))
if query.get('error'):
return query.get('error_description', 'invalid payload')
return None
@oauth_blueprint.route('/oauth/apps', methods=['GET']) @oauth_blueprint.route('/oauth/apps', methods=['GET'])
@require_auth() @require_auth()
def get_clients(auth_user: User) -> Dict: def get_clients(auth_user: User) -> Dict:
@ -145,15 +154,20 @@ def delete_client(
@oauth_blueprint.route('/oauth/authorize', methods=['POST']) @oauth_blueprint.route('/oauth/authorize', methods=['POST'])
@require_auth() @require_auth()
def authorize(auth_user: User) -> Response: def authorize(auth_user: User) -> Union[HttpResponse, Dict]:
data = request.form data = request.form
if not data or 'client_id' not in data or 'response_type' not in data: if not data or 'client_id' not in data or 'response_type' not in data:
return InvalidPayloadErrorResponse() return InvalidPayloadErrorResponse()
authorization_server.get_consent_grant(end_user=auth_user) confirm = data.get('confirm', 'false')
return authorization_server.create_authorization_response( grant_user = auth_user if confirm.lower() == 'true' else None
grant_user=auth_user response = authorization_server.create_authorization_response(
grant_user=grant_user
) )
error_message = is_errored(url=response.location)
if error_message:
return InvalidPayloadErrorResponse(error_message)
return {'redirect_url': response.location}
@oauth_blueprint.route('/oauth/token', methods=['POST']) @oauth_blueprint.route('/oauth/token', methods=['POST'])

View File

@ -88,6 +88,7 @@ class ApiTestCaseMixin(RandomMixin):
'/api/oauth/authorize', '/api/oauth/authorize',
data={ data={
'client_id': oauth_client.client_id, 'client_id': oauth_client.client_id,
'confirm': True,
'response_type': 'code', 'response_type': 'code',
'scope': 'read' if not scope else scope, 'scope': 'read' if not scope else scope,
}, },
@ -96,7 +97,8 @@ class ApiTestCaseMixin(RandomMixin):
content_type='multipart/form-data', content_type='multipart/form-data',
), ),
) )
parsed_url = parse_url(response.location) data = json.loads(response.data.decode())
parsed_url = parse_url(data['redirect_url'])
code = parse_qs(parsed_url.query).get('code', '') code = parse_qs(parsed_url.query).get('code', '')
return code return code

View File

@ -1,11 +1,9 @@
import json import json
from typing import List, Union from typing import Dict, List, Union
from unittest.mock import patch from unittest.mock import patch
from urllib.parse import parse_qs
import pytest import pytest
from flask import Flask from flask import Flask
from urllib3.util import parse_url
from werkzeug.test import TestResponse from werkzeug.test import TestResponse
from fittrackee.oauth2.models import ( from fittrackee.oauth2.models import (
@ -184,6 +182,7 @@ class TestOAuthClientAuthorization(ApiTestCaseMixin):
data={ data={
'client_id': oauth_client.client_id, 'client_id': oauth_client.client_id,
'response_type': 'code', 'response_type': 'code',
'confirm': True,
}, },
headers=dict(content_type='multipart/form-data'), headers=dict(content_type='multipart/form-data'),
) )
@ -227,8 +226,11 @@ class TestOAuthClientAuthorization(ApiTestCaseMixin):
self.assert_400(response, error_message='invalid payload') self.assert_400(response, error_message='invalid payload')
@pytest.mark.parametrize(
'input_confirmation', [{'confirm': True}, {'confirm': 'true'}]
)
def test_it_creates_authorization_code( def test_it_creates_authorization_code(
self, app: Flask, user_1: User self, app: Flask, user_1: User, input_confirmation: Dict
) -> None: ) -> None:
client, auth_token = self.get_test_client_and_auth_token( client, auth_token = self.get_test_client_and_auth_token(
app, user_1.email app, user_1.email
@ -240,6 +242,7 @@ class TestOAuthClientAuthorization(ApiTestCaseMixin):
data={ data={
'client_id': oauth_client.client_id, 'client_id': oauth_client.client_id,
'response_type': 'code', 'response_type': 'code',
**input_confirmation,
}, },
headers=dict( headers=dict(
Authorization=f'Bearer {auth_token}', Authorization=f'Bearer {auth_token}',
@ -252,7 +255,36 @@ class TestOAuthClientAuthorization(ApiTestCaseMixin):
).first() ).first()
assert code is not None assert code is not None
def test_it_returns_code_in_url(self, app: Flask, user_1: User) -> None: @pytest.mark.parametrize('input_confirmation', [{}, {'confirm': False}])
def test_it_does_not_create_authorization_code_when_no_confirmation(
self, app: Flask, user_1: User, input_confirmation: Dict
) -> None:
client, auth_token = self.get_test_client_and_auth_token(
app, user_1.email
)
oauth_client = self.create_oauth_client(user_1)
client.post(
self.route,
data={
'client_id': oauth_client.client_id,
'response_type': 'code',
**input_confirmation,
},
headers=dict(
Authorization=f'Bearer {auth_token}',
content_type='multipart/form-data',
),
)
code = OAuth2AuthorizationCode.query.filter_by(
client_id=oauth_client.client_id
).first()
assert code is None
def test_it_returns_redirect_url_with_code(
self, app: Flask, user_1: User
) -> None:
client, auth_token = self.get_test_client_and_auth_token( client, auth_token = self.get_test_client_and_auth_token(
app, user_1.email app, user_1.email
) )
@ -263,6 +295,7 @@ class TestOAuthClientAuthorization(ApiTestCaseMixin):
data={ data={
'client_id': oauth_client.client_id, 'client_id': oauth_client.client_id,
'response_type': 'code', 'response_type': 'code',
'confirm': True,
}, },
headers=dict( headers=dict(
Authorization=f'Bearer {auth_token}', Authorization=f'Bearer {auth_token}',
@ -270,9 +303,44 @@ class TestOAuthClientAuthorization(ApiTestCaseMixin):
), ),
) )
assert response.status_code == 302 code = OAuth2AuthorizationCode.query.filter_by(
parsed_url = parse_url(response.location) client_id=oauth_client.client_id
assert parse_qs(parsed_url.query).get('code') is not None ).first()
assert response.status_code == 200
data = json.loads(response.data.decode())
assert data['redirect_url'] == (
f'{oauth_client.get_default_redirect_uri()}?code={code.code}'
)
@pytest.mark.parametrize('input_confirmation', [{}, {'confirm': False}])
def test_it_returns_error_when_no_confirmation(
self, app: Flask, user_1: User, input_confirmation: Dict
) -> None:
client, auth_token = self.get_test_client_and_auth_token(
app, user_1.email
)
oauth_client = self.create_oauth_client(user_1)
response = client.post(
self.route,
data={
'client_id': oauth_client.client_id,
'response_type': 'code',
**input_confirmation,
},
headers=dict(
Authorization=f'Bearer {auth_token}',
content_type='multipart/form-data',
),
)
self.assert_400(
response,
error_message=(
'The resource owner or authorization server denied the request'
),
)
class TestOAuthIssueToken(ApiTestCaseMixin): class TestOAuthIssueToken(ApiTestCaseMixin):