2022-05-27 13:28:26 +02:00
|
|
|
import json
|
2022-05-27 16:02:05 +02:00
|
|
|
from typing import List, Union
|
2022-05-27 13:28:26 +02:00
|
|
|
from unittest.mock import patch
|
2022-05-27 13:34:32 +02:00
|
|
|
from urllib.parse import parse_qs
|
2022-05-27 13:28:26 +02:00
|
|
|
|
|
|
|
import pytest
|
|
|
|
from flask import Flask
|
2022-05-27 13:34:32 +02:00
|
|
|
from urllib3.util import parse_url
|
2022-05-27 14:08:07 +02:00
|
|
|
from werkzeug.test import TestResponse
|
2022-05-27 13:28:26 +02:00
|
|
|
|
2022-05-27 14:46:03 +02:00
|
|
|
from fittrackee.oauth2.models import OAuth2Client, OAuth2Token
|
2022-05-27 13:28:26 +02:00
|
|
|
from fittrackee.users.models import User
|
|
|
|
|
|
|
|
from ..mixins import ApiTestCaseMixin
|
2022-05-27 16:02:05 +02:00
|
|
|
from ..utils import TEST_OAUTH_CLIENT_METADATA
|
2022-05-27 13:28:26 +02:00
|
|
|
|
|
|
|
|
2022-05-27 16:02:05 +02:00
|
|
|
class TestOAuthClientCreation(ApiTestCaseMixin):
|
2022-05-27 13:28:26 +02:00
|
|
|
route = '/api/oauth/apps'
|
|
|
|
|
|
|
|
def test_it_returns_error_when_no_user_authenticated(
|
|
|
|
self, app: Flask, user_1: User
|
|
|
|
) -> None:
|
|
|
|
client, auth_token = self.get_test_client_and_auth_token(
|
|
|
|
app, user_1.email
|
|
|
|
)
|
|
|
|
|
|
|
|
response = client.post(
|
|
|
|
self.route,
|
2022-05-27 16:02:05 +02:00
|
|
|
data=json.dumps(TEST_OAUTH_CLIENT_METADATA),
|
2022-05-27 13:28:26 +02:00
|
|
|
content_type='application/json',
|
|
|
|
)
|
|
|
|
|
|
|
|
self.assert_401(response)
|
|
|
|
|
|
|
|
def test_it_returns_error_when_no_metadata_provided(
|
|
|
|
self, app: Flask, user_1: User
|
|
|
|
) -> None:
|
|
|
|
client, auth_token = self.get_test_client_and_auth_token(
|
|
|
|
app, user_1.email
|
|
|
|
)
|
|
|
|
|
|
|
|
response = client.post(
|
|
|
|
self.route,
|
|
|
|
data=json.dumps(dict()),
|
|
|
|
content_type='application/json',
|
|
|
|
headers=dict(Authorization=f'Bearer {auth_token}'),
|
|
|
|
)
|
|
|
|
|
|
|
|
self.assert_400(
|
|
|
|
response, error_message='OAuth client metadata missing'
|
|
|
|
)
|
|
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
|
|
'missing_key',
|
|
|
|
[
|
|
|
|
'client_name',
|
|
|
|
'client_uri',
|
|
|
|
'redirect_uris',
|
|
|
|
'scope',
|
|
|
|
],
|
|
|
|
)
|
|
|
|
def test_it_returns_error_when_metadata_key_is_missing(
|
|
|
|
self, app: Flask, user_1: User, missing_key: str
|
|
|
|
) -> None:
|
2022-05-27 16:02:05 +02:00
|
|
|
metadata = TEST_OAUTH_CLIENT_METADATA.copy()
|
2022-05-27 13:28:26 +02:00
|
|
|
del metadata[missing_key]
|
|
|
|
client, auth_token = self.get_test_client_and_auth_token(
|
|
|
|
app, user_1.email
|
|
|
|
)
|
|
|
|
|
|
|
|
response = client.post(
|
|
|
|
self.route,
|
|
|
|
data=json.dumps(metadata),
|
|
|
|
content_type='application/json',
|
|
|
|
headers=dict(Authorization=f'Bearer {auth_token}'),
|
|
|
|
)
|
|
|
|
|
|
|
|
self.assert_400(
|
|
|
|
response,
|
|
|
|
error_message=f'OAuth client metadata missing keys: {missing_key}',
|
|
|
|
)
|
|
|
|
|
|
|
|
def test_it_creates_oauth_client(self, app: Flask, user_1: User) -> None:
|
|
|
|
client, auth_token = self.get_test_client_and_auth_token(
|
|
|
|
app, user_1.email
|
|
|
|
)
|
|
|
|
|
|
|
|
response = client.post(
|
|
|
|
self.route,
|
2022-05-27 16:02:05 +02:00
|
|
|
data=json.dumps(TEST_OAUTH_CLIENT_METADATA),
|
2022-05-27 13:28:26 +02:00
|
|
|
content_type='application/json',
|
|
|
|
headers=dict(Authorization=f'Bearer {auth_token}'),
|
|
|
|
)
|
|
|
|
|
|
|
|
assert response.status_code == 201
|
|
|
|
oauth_client = OAuth2Client.query.first()
|
|
|
|
assert oauth_client is not None
|
|
|
|
|
|
|
|
def test_it_returns_serialized_oauth_client(
|
|
|
|
self, app: Flask, user_1: User
|
|
|
|
) -> None:
|
|
|
|
client, auth_token = self.get_test_client_and_auth_token(
|
|
|
|
app, user_1.email
|
|
|
|
)
|
|
|
|
client_id = self.random_string()
|
|
|
|
client_secret = self.random_string()
|
|
|
|
with patch(
|
|
|
|
'fittrackee.oauth2.client.gen_salt',
|
|
|
|
side_effect=[client_id, client_secret],
|
|
|
|
):
|
|
|
|
|
|
|
|
response = client.post(
|
|
|
|
self.route,
|
2022-05-27 16:02:05 +02:00
|
|
|
data=json.dumps(TEST_OAUTH_CLIENT_METADATA),
|
2022-05-27 13:28:26 +02:00
|
|
|
content_type='application/json',
|
|
|
|
headers=dict(Authorization=f'Bearer {auth_token}'),
|
|
|
|
)
|
|
|
|
|
|
|
|
data = json.loads(response.data.decode())
|
|
|
|
assert data['data']['client']['client_id'] == client_id
|
|
|
|
assert data['data']['client']['client_secret'] == client_secret
|
2022-05-27 16:02:05 +02:00
|
|
|
assert (
|
|
|
|
data['data']['client']['name']
|
|
|
|
== TEST_OAUTH_CLIENT_METADATA['client_name']
|
|
|
|
)
|
2022-05-27 13:28:26 +02:00
|
|
|
assert (
|
|
|
|
data['data']['client']['redirect_uris']
|
2022-05-27 16:02:05 +02:00
|
|
|
== TEST_OAUTH_CLIENT_METADATA['redirect_uris']
|
|
|
|
)
|
|
|
|
assert (
|
|
|
|
data['data']['client']['website']
|
|
|
|
== TEST_OAUTH_CLIENT_METADATA['client_uri']
|
2022-05-27 13:28:26 +02:00
|
|
|
)
|
|
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
|
|
'input_key,expected_value',
|
|
|
|
[
|
2022-05-27 14:18:50 +02:00
|
|
|
('grant_types', ['authorization_code', 'refresh_token']),
|
2022-05-27 13:28:26 +02:00
|
|
|
('response_types', ['code']),
|
|
|
|
('token_endpoint_auth_method', 'client_secret_post'),
|
|
|
|
],
|
|
|
|
)
|
2022-05-27 14:08:07 +02:00
|
|
|
def test_it_always_creates_oauth_client_with_authorization_grant(
|
2022-05-27 13:28:26 +02:00
|
|
|
self,
|
|
|
|
app: Flask,
|
|
|
|
user_1: User,
|
|
|
|
input_key: str,
|
|
|
|
expected_value: Union[List, str],
|
|
|
|
) -> None:
|
|
|
|
client, auth_token = self.get_test_client_and_auth_token(
|
|
|
|
app, user_1.email
|
|
|
|
)
|
|
|
|
|
|
|
|
client.post(
|
|
|
|
self.route,
|
|
|
|
data=json.dumps(
|
2022-05-27 16:02:05 +02:00
|
|
|
{**TEST_OAUTH_CLIENT_METADATA, input_key: self.random_string()}
|
2022-05-27 13:28:26 +02:00
|
|
|
),
|
|
|
|
content_type='application/json',
|
|
|
|
headers=dict(Authorization=f'Bearer {auth_token}'),
|
|
|
|
)
|
|
|
|
|
|
|
|
oauth_client = OAuth2Client.query.first()
|
|
|
|
assert getattr(oauth_client, input_key) == expected_value
|
2022-05-27 13:34:32 +02:00
|
|
|
|
|
|
|
|
2022-05-27 16:02:05 +02:00
|
|
|
class TestOAuthClientAuthorization(ApiTestCaseMixin):
|
2022-05-27 13:34:32 +02:00
|
|
|
route = '/api/oauth/authorize'
|
|
|
|
|
|
|
|
def test_it_returns_error_not_authenticated(
|
|
|
|
self, app: Flask, user_1: User
|
|
|
|
) -> None:
|
|
|
|
oauth_client = self.create_oauth_client(user_1)
|
|
|
|
client = app.test_client()
|
|
|
|
|
|
|
|
response = client.post(
|
|
|
|
self.route,
|
|
|
|
data={
|
|
|
|
'client_id': oauth_client.client_id,
|
|
|
|
'response_type': 'code',
|
|
|
|
},
|
|
|
|
headers=dict(content_type='multipart/form-data'),
|
|
|
|
)
|
|
|
|
|
|
|
|
self.assert_401(response)
|
|
|
|
|
|
|
|
def test_it_returns_error_when_client_id_is_missing(
|
|
|
|
self, app: Flask, user_1: User
|
|
|
|
) -> None:
|
|
|
|
client, auth_token = self.get_test_client_and_auth_token(
|
|
|
|
app, user_1.email
|
|
|
|
)
|
|
|
|
|
|
|
|
response = client.post(
|
|
|
|
self.route,
|
|
|
|
data={'response_type': 'code'},
|
|
|
|
headers=dict(
|
|
|
|
Authorization=f'Bearer {auth_token}',
|
|
|
|
content_type='multipart/form-data',
|
|
|
|
),
|
|
|
|
)
|
|
|
|
|
|
|
|
self.assert_400(response, error_message='invalid payload')
|
|
|
|
|
|
|
|
def test_it_returns_error_when_response_type_is_missing(
|
|
|
|
self, app: Flask, user_1: User
|
|
|
|
) -> None:
|
|
|
|
client, auth_token = self.get_test_client_and_auth_token(
|
|
|
|
app, user_1.email
|
|
|
|
)
|
|
|
|
oauth_client = self.create_oauth_client(user_1)
|
|
|
|
|
|
|
|
response = client.post(
|
|
|
|
self.route,
|
|
|
|
data={'client_id': oauth_client.client_id},
|
|
|
|
headers=dict(
|
|
|
|
Authorization=f'Bearer {auth_token}',
|
|
|
|
content_type='multipart/form-data',
|
|
|
|
),
|
|
|
|
)
|
|
|
|
|
|
|
|
self.assert_400(response, error_message='invalid payload')
|
|
|
|
|
|
|
|
def test_it_returns_code_in_url(self, app: Flask, user_1: User) -> None:
|
|
|
|
client, auth_token = self.get_test_client_and_auth_token(
|
|
|
|
app, user_1.email
|
|
|
|
)
|
|
|
|
oauth_client = self.create_oauth_client(user_1)
|
|
|
|
|
|
|
|
response = client.post(
|
|
|
|
self.route,
|
|
|
|
data={
|
|
|
|
'client_id': oauth_client.client_id,
|
|
|
|
'response_type': 'code',
|
|
|
|
},
|
|
|
|
headers=dict(
|
|
|
|
Authorization=f'Bearer {auth_token}',
|
|
|
|
content_type='multipart/form-data',
|
|
|
|
),
|
|
|
|
)
|
|
|
|
|
|
|
|
assert response.status_code == 302
|
|
|
|
parsed_url = parse_url(response.location)
|
|
|
|
assert parse_qs(parsed_url.query).get('code') is not None
|
2022-05-27 14:08:07 +02:00
|
|
|
|
|
|
|
|
2022-05-27 16:02:05 +02:00
|
|
|
class TestOAuthIssueToken(ApiTestCaseMixin):
|
2022-05-27 14:08:07 +02:00
|
|
|
route = '/api/oauth/token'
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
def assert_token_is_returned(response: TestResponse) -> None:
|
|
|
|
assert response.status_code == 200
|
|
|
|
data = json.loads(response.data.decode())
|
|
|
|
assert data.get('access_token') is not None
|
|
|
|
assert data.get('expires_in') is not None
|
2022-05-27 14:18:50 +02:00
|
|
|
assert data.get('refresh_token') is not None
|
2022-05-27 14:08:07 +02:00
|
|
|
assert data.get('token_type') == 'Bearer'
|
|
|
|
|
|
|
|
def test_it_returns_error_when_form_is_empty(self, app: Flask) -> None:
|
|
|
|
client = app.test_client()
|
|
|
|
|
|
|
|
response = client.post(
|
|
|
|
self.route,
|
|
|
|
data=dict(data='{}'),
|
|
|
|
headers=dict(content_type='multipart/form-data'),
|
|
|
|
)
|
|
|
|
|
|
|
|
self.assert_unsupported_grant_type(response)
|
|
|
|
|
|
|
|
def test_it_returns_error_when_client_id_is_invalid(
|
|
|
|
self, app: Flask, user_1: User
|
|
|
|
) -> None:
|
|
|
|
client, auth_token = self.get_test_client_and_auth_token(
|
|
|
|
app, user_1.email
|
|
|
|
)
|
|
|
|
oauth_client = self.create_oauth_client(user_1)
|
|
|
|
code = self.authorize_client(client, oauth_client, auth_token)
|
|
|
|
|
|
|
|
response = client.post(
|
|
|
|
self.route,
|
|
|
|
data={
|
|
|
|
'client_id': self.random_string(),
|
|
|
|
'client_secret': oauth_client.client_secret,
|
|
|
|
'grant_type': 'authorization_code',
|
|
|
|
'code': code,
|
|
|
|
},
|
|
|
|
headers=dict(content_type='multipart/form-data'),
|
|
|
|
)
|
|
|
|
|
|
|
|
self.assert_invalid_client(response)
|
|
|
|
|
|
|
|
def test_it_returns_error_when_client_secret_is_invalid(
|
|
|
|
self, app: Flask, user_1: User
|
|
|
|
) -> None:
|
|
|
|
client, auth_token = self.get_test_client_and_auth_token(
|
|
|
|
app, user_1.email
|
|
|
|
)
|
|
|
|
oauth_client = self.create_oauth_client(user_1)
|
|
|
|
code = self.authorize_client(client, oauth_client, auth_token)
|
|
|
|
|
|
|
|
response = client.post(
|
|
|
|
self.route,
|
|
|
|
data={
|
|
|
|
'client_id': oauth_client.client_id,
|
|
|
|
'client_secret': self.random_string(),
|
|
|
|
'grant_type': 'authorization_code',
|
|
|
|
'code': code,
|
|
|
|
},
|
|
|
|
headers=dict(content_type='multipart/form-data'),
|
|
|
|
)
|
|
|
|
|
|
|
|
self.assert_invalid_client(response)
|
|
|
|
|
|
|
|
def test_it_returns_error_when_client_not_authorized(
|
|
|
|
self, app: Flask, user_1: User
|
|
|
|
) -> None:
|
|
|
|
client, auth_token = self.get_test_client_and_auth_token(
|
|
|
|
app, user_1.email
|
|
|
|
)
|
|
|
|
oauth_client = self.create_oauth_client(user_1)
|
|
|
|
|
|
|
|
response = client.post(
|
|
|
|
self.route,
|
|
|
|
data={
|
|
|
|
'client_id': oauth_client.client_id,
|
|
|
|
'client_secret': oauth_client.client_secret,
|
|
|
|
'grant_type': 'authorization_code',
|
|
|
|
'code': self.random_string(),
|
|
|
|
},
|
|
|
|
headers=dict(content_type='multipart/form-data'),
|
|
|
|
)
|
|
|
|
|
|
|
|
self.assert_invalid_request(response)
|
|
|
|
|
|
|
|
def test_it_returns_error_when_code_is_invalid(
|
|
|
|
self, app: Flask, user_1: User
|
|
|
|
) -> None:
|
|
|
|
client, auth_token = self.get_test_client_and_auth_token(
|
|
|
|
app, user_1.email
|
|
|
|
)
|
|
|
|
oauth_client = self.create_oauth_client(user_1)
|
|
|
|
self.authorize_client(client, oauth_client, auth_token)
|
|
|
|
|
|
|
|
response = client.post(
|
|
|
|
self.route,
|
|
|
|
data={
|
|
|
|
'client_id': oauth_client.client_id,
|
|
|
|
'client_secret': oauth_client.client_secret,
|
|
|
|
'grant_type': 'authorization_code',
|
|
|
|
'code': self.random_string(),
|
|
|
|
},
|
|
|
|
headers=dict(content_type='multipart/form-data'),
|
|
|
|
)
|
|
|
|
|
|
|
|
self.assert_invalid_request(response)
|
|
|
|
|
|
|
|
def test_it_returns_access_token(self, app: Flask, user_1: User) -> None:
|
|
|
|
client, auth_token = self.get_test_client_and_auth_token(
|
|
|
|
app, user_1.email
|
|
|
|
)
|
|
|
|
oauth_client = self.create_oauth_client(user_1)
|
|
|
|
code = self.authorize_client(client, oauth_client, auth_token)
|
|
|
|
|
|
|
|
response = client.post(
|
|
|
|
self.route,
|
|
|
|
data={
|
|
|
|
'client_id': oauth_client.client_id,
|
|
|
|
'client_secret': oauth_client.client_secret,
|
|
|
|
'grant_type': 'authorization_code',
|
|
|
|
'code': code,
|
|
|
|
},
|
|
|
|
headers=dict(content_type='multipart/form-data'),
|
|
|
|
)
|
|
|
|
|
|
|
|
self.assert_token_is_returned(response)
|
2022-05-27 14:46:03 +02:00
|
|
|
|
|
|
|
|
2022-05-27 16:02:05 +02:00
|
|
|
class TestOAuthTokenRevocation(ApiTestCaseMixin):
|
2022-05-27 14:46:03 +02:00
|
|
|
route = '/api/oauth/revoke'
|
|
|
|
|
|
|
|
def test_it_revokes_user_token(self, app: Flask, user_1: User) -> None:
|
2022-05-27 16:02:05 +02:00
|
|
|
(
|
|
|
|
client,
|
|
|
|
oauth_client,
|
|
|
|
access_token,
|
|
|
|
) = self.create_oauth_client_and_issue_token(app, user_1)
|
2022-05-27 14:46:03 +02:00
|
|
|
|
|
|
|
response = client.post(
|
|
|
|
self.route,
|
|
|
|
data={
|
|
|
|
'client_id': oauth_client.client_id,
|
|
|
|
'client_secret': oauth_client.client_secret,
|
|
|
|
'token': access_token,
|
|
|
|
},
|
|
|
|
headers=dict(content_type='multipart/form-data'),
|
|
|
|
)
|
|
|
|
|
|
|
|
assert response.status_code == 200
|
|
|
|
token = OAuth2Token.query.filter_by(
|
|
|
|
client_id=oauth_client.client_id
|
|
|
|
).first()
|
|
|
|
assert token.access_token_revoked_at is not None
|