API - add scope on endpoints
This commit is contained in:
@ -63,24 +63,33 @@ class ApiTestCaseMixin(RandomMixin):
|
||||
|
||||
@staticmethod
|
||||
def create_oauth_client(
|
||||
user: User, metadata: Optional[Dict] = None
|
||||
user: User,
|
||||
metadata: Optional[Dict] = None,
|
||||
scope: Optional[str] = None,
|
||||
) -> OAuth2Client:
|
||||
oauth_client = create_oauth_client(
|
||||
TEST_OAUTH_CLIENT_METADATA if metadata is None else metadata, user
|
||||
client_metadata = (
|
||||
TEST_OAUTH_CLIENT_METADATA if metadata is None else metadata
|
||||
)
|
||||
if scope is not None:
|
||||
client_metadata['scope'] = scope
|
||||
oauth_client = create_oauth_client(client_metadata, user)
|
||||
db.session.add(oauth_client)
|
||||
db.session.commit()
|
||||
return oauth_client
|
||||
|
||||
@staticmethod
|
||||
def authorize_client(
|
||||
client: FlaskClient, oauth_client: OAuth2Client, auth_token: str
|
||||
client: FlaskClient,
|
||||
oauth_client: OAuth2Client,
|
||||
auth_token: str,
|
||||
scope: Optional[str] = None,
|
||||
) -> Union[List[str], str]:
|
||||
response = client.post(
|
||||
'/api/oauth/authorize',
|
||||
data={
|
||||
'client_id': oauth_client.client_id,
|
||||
'response_type': 'code',
|
||||
'scope': 'read' if not scope else scope,
|
||||
},
|
||||
headers=dict(
|
||||
Authorization=f'Bearer {auth_token}',
|
||||
@ -92,13 +101,15 @@ class ApiTestCaseMixin(RandomMixin):
|
||||
return code
|
||||
|
||||
def create_oauth_client_and_issue_token(
|
||||
self, app: Flask, user: User
|
||||
self, app: Flask, user: User, scope: Optional[str] = None
|
||||
) -> Tuple[FlaskClient, OAuth2Client, str]:
|
||||
client, auth_token = self.get_test_client_and_auth_token(
|
||||
app, user.email
|
||||
)
|
||||
oauth_client = self.create_oauth_client(user)
|
||||
code = self.authorize_client(client, oauth_client, auth_token)
|
||||
oauth_client = self.create_oauth_client(user, scope=scope)
|
||||
code = self.authorize_client(
|
||||
client, oauth_client, auth_token, scope=scope
|
||||
)
|
||||
response = client.post(
|
||||
'/api/oauth/token',
|
||||
data={
|
||||
@ -217,6 +228,31 @@ class ApiTestCaseMixin(RandomMixin):
|
||||
),
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def assert_insufficient_scope(response: TestResponse) -> Dict:
|
||||
return assert_oauth_errored_response(
|
||||
response,
|
||||
403,
|
||||
error='insufficient_scope',
|
||||
error_description=(
|
||||
'The request requires higher privileges than provided by '
|
||||
'the access token.'
|
||||
),
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def assert_not_insufficient_scope_error(response: TestResponse) -> None:
|
||||
assert response.status_code != 403
|
||||
if response.status_code != 204:
|
||||
data = json.loads(response.data.decode())
|
||||
if 'error' in data:
|
||||
assert 'insufficient_scope' not in data['error']
|
||||
if 'error_description' in data:
|
||||
assert (
|
||||
'The request requires higher privileges than provided by '
|
||||
'the access token.'
|
||||
) != data['error_description']
|
||||
|
||||
|
||||
class CallArgsMixin:
|
||||
@staticmethod
|
||||
|
@ -1,10 +1,11 @@
|
||||
from time import time
|
||||
from typing import Dict
|
||||
from typing import Any, Dict
|
||||
from unittest.mock import patch
|
||||
|
||||
import pytest
|
||||
from flask import Flask
|
||||
|
||||
from fittrackee.oauth2.client import create_oauth_client
|
||||
from fittrackee.oauth2.client import check_scope, create_oauth_client
|
||||
from fittrackee.oauth2.models import OAuth2Client
|
||||
from fittrackee.users.models import User
|
||||
|
||||
@ -106,7 +107,7 @@ class TestCreateOAuth2Client:
|
||||
def test_oauth_client_has_expected_scope(
|
||||
self, app: Flask, user_1: User
|
||||
) -> None:
|
||||
scope = 'profile'
|
||||
scope = 'write'
|
||||
client_metadata: Dict = {**TEST_METADATA, 'scope': scope}
|
||||
|
||||
oauth_client = create_oauth_client(client_metadata, user_1)
|
||||
@ -138,3 +139,28 @@ class TestCreateOAuth2Client:
|
||||
oauth_client = create_oauth_client(TEST_METADATA, user_1)
|
||||
|
||||
assert oauth_client.user_id == user_1.id
|
||||
|
||||
|
||||
class TestOAuthCheckScopes:
|
||||
@pytest.mark.parametrize(
|
||||
'input_scope', ['', 1, random_string(), [random_string(), 'readwrite']]
|
||||
)
|
||||
def test_it_returns_read_if_scope_is_invalid(
|
||||
self, input_scope: Any
|
||||
) -> None:
|
||||
assert check_scope(input_scope) == 'read'
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
'input_scope,expected_scope',
|
||||
[
|
||||
('read', 'read'),
|
||||
('read ' + random_string(), 'read'),
|
||||
('write', 'write'),
|
||||
('write read', 'write read'),
|
||||
('write read ' + random_string(), 'write read'),
|
||||
],
|
||||
)
|
||||
def test_it_return_only_valid_scopes(
|
||||
self, input_scope: str, expected_scope: str
|
||||
) -> None:
|
||||
assert check_scope(input_scope) == expected_scope
|
||||
|
259
fittrackee/tests/oauth2/test_oauth2_scopes.py
Normal file
259
fittrackee/tests/oauth2/test_oauth2_scopes.py
Normal file
@ -0,0 +1,259 @@
|
||||
from json import dumps
|
||||
|
||||
import pytest
|
||||
from flask import Flask
|
||||
from werkzeug.test import TestResponse
|
||||
|
||||
from fittrackee.users.models import User
|
||||
|
||||
from ..mixins import ApiTestCaseMixin
|
||||
from ..utils import random_short_id, random_string
|
||||
|
||||
|
||||
class OAuth2ScopesTestCase(ApiTestCaseMixin):
|
||||
def assert_expected_response(
|
||||
self, response: TestResponse, client_scope: str, endpoint_scope: str
|
||||
) -> None:
|
||||
if client_scope == endpoint_scope:
|
||||
self.assert_not_insufficient_scope_error(response)
|
||||
else:
|
||||
self.assert_insufficient_scope(response)
|
||||
|
||||
|
||||
class TestOAuth2ScopesWithReadAccess(OAuth2ScopesTestCase):
|
||||
scope = 'read'
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
'endpoint_url',
|
||||
[
|
||||
'/api/auth/profile',
|
||||
'/api/records',
|
||||
'/api/sports',
|
||||
'/api/sports/1',
|
||||
f'/api/stats/{random_string()}/by_sport',
|
||||
f'/api/stats/{random_string()}/by_time',
|
||||
'/api/users/test',
|
||||
'/api/workouts',
|
||||
f'/api/workouts/{random_short_id()}',
|
||||
f'/api/workouts/{random_short_id()}/chart_data',
|
||||
f'/api/workouts/{random_short_id()}/chart_data/segment/1',
|
||||
f'/api/workouts/{random_short_id()}/gpx',
|
||||
f'/api/workouts/{random_short_id()}/gpx/download',
|
||||
f'/api/workouts/{random_short_id()}/gpx/segment/1',
|
||||
],
|
||||
)
|
||||
def test_access_to_get_endpoints(
|
||||
self, app: Flask, user_1: User, endpoint_url: str
|
||||
) -> None:
|
||||
(
|
||||
client,
|
||||
oauth_client,
|
||||
access_token,
|
||||
) = self.create_oauth_client_and_issue_token(
|
||||
app, user_1, scope=self.scope
|
||||
)
|
||||
|
||||
response = client.get(
|
||||
endpoint_url,
|
||||
content_type='application/json',
|
||||
headers=dict(Authorization=f'Bearer {access_token}'),
|
||||
)
|
||||
|
||||
self.assert_expected_response(
|
||||
response, client_scope=self.scope, endpoint_scope='read'
|
||||
)
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
'endpoint_url',
|
||||
['/api/users'],
|
||||
)
|
||||
def test_access_to_endpoints_as_admin(
|
||||
self, app: Flask, user_1_admin: User, endpoint_url: str
|
||||
) -> None:
|
||||
(
|
||||
client,
|
||||
oauth_client,
|
||||
access_token,
|
||||
) = self.create_oauth_client_and_issue_token(
|
||||
app, user_1_admin, scope=self.scope
|
||||
)
|
||||
|
||||
response = client.get(
|
||||
endpoint_url,
|
||||
content_type='application/json',
|
||||
headers=dict(Authorization=f'Bearer {access_token}'),
|
||||
)
|
||||
|
||||
self.assert_expected_response(
|
||||
response, client_scope=self.scope, endpoint_scope='read'
|
||||
)
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
'endpoint_url',
|
||||
[
|
||||
'/api/auth/picture',
|
||||
'/api/auth/profile/edit',
|
||||
'/api/auth/profile/edit/preferences',
|
||||
'/api/auth/profile/edit/sports',
|
||||
'/api/workouts',
|
||||
'/api/workouts/no_gpx',
|
||||
],
|
||||
)
|
||||
def test_access_post_endpoints(
|
||||
self, app: Flask, user_1: User, endpoint_url: str
|
||||
) -> None:
|
||||
(
|
||||
client,
|
||||
oauth_client,
|
||||
access_token,
|
||||
) = self.create_oauth_client_and_issue_token(
|
||||
app, user_1, scope=self.scope
|
||||
)
|
||||
|
||||
response = client.post(
|
||||
endpoint_url,
|
||||
data=dumps(dict()),
|
||||
content_type='application/json',
|
||||
headers=dict(Authorization=f'Bearer {access_token}'),
|
||||
)
|
||||
|
||||
self.assert_expected_response(
|
||||
response, client_scope=self.scope, endpoint_scope='write'
|
||||
)
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
'endpoint_url',
|
||||
[
|
||||
'/api/auth/profile/edit/account',
|
||||
'/api/workouts/0',
|
||||
],
|
||||
)
|
||||
def test_access_to_patch_endpoints(
|
||||
self, app: Flask, user_1: User, endpoint_url: str
|
||||
) -> None:
|
||||
(
|
||||
client,
|
||||
oauth_client,
|
||||
access_token,
|
||||
) = self.create_oauth_client_and_issue_token(
|
||||
app, user_1, scope=self.scope
|
||||
)
|
||||
|
||||
response = client.patch(
|
||||
endpoint_url,
|
||||
data=dumps(dict()),
|
||||
content_type='application/json',
|
||||
headers=dict(Authorization=f'Bearer {access_token}'),
|
||||
)
|
||||
|
||||
self.assert_expected_response(
|
||||
response, client_scope=self.scope, endpoint_scope='write'
|
||||
)
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
'endpoint_url',
|
||||
[
|
||||
'/api/config',
|
||||
'/api/sports/1',
|
||||
f'/api/users/{random_string()}',
|
||||
],
|
||||
)
|
||||
def test_access_to_patch_endpoints_as_admin(
|
||||
self, app: Flask, user_1_admin: User, endpoint_url: str
|
||||
) -> None:
|
||||
(
|
||||
client,
|
||||
oauth_client,
|
||||
access_token,
|
||||
) = self.create_oauth_client_and_issue_token(
|
||||
app, user_1_admin, scope=self.scope
|
||||
)
|
||||
|
||||
response = client.patch(
|
||||
endpoint_url,
|
||||
data=dumps(dict()),
|
||||
content_type='application/json',
|
||||
headers=dict(Authorization=f'Bearer {access_token}'),
|
||||
)
|
||||
|
||||
self.assert_expected_response(
|
||||
response, client_scope=self.scope, endpoint_scope='write'
|
||||
)
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
'endpoint_url',
|
||||
[
|
||||
'/api/auth/picture',
|
||||
'/api/auth/profile/reset/sports/1',
|
||||
f'/api/users/{random_string()}',
|
||||
'/api/workouts/0',
|
||||
],
|
||||
)
|
||||
def test_access_to_delete_endpoints(
|
||||
self, app: Flask, user_1: User, endpoint_url: str
|
||||
) -> None:
|
||||
(
|
||||
client,
|
||||
oauth_client,
|
||||
access_token,
|
||||
) = self.create_oauth_client_and_issue_token(
|
||||
app, user_1, scope=self.scope
|
||||
)
|
||||
user_1.picture = random_string()
|
||||
|
||||
response = client.delete(
|
||||
endpoint_url,
|
||||
content_type='application/json',
|
||||
headers=dict(Authorization=f'Bearer {access_token}'),
|
||||
)
|
||||
|
||||
self.assert_expected_response(
|
||||
response, client_scope=self.scope, endpoint_scope='write'
|
||||
)
|
||||
|
||||
|
||||
class TestOAuth2ScopesWithWriteAccess(TestOAuth2ScopesWithReadAccess):
|
||||
scope = 'write'
|
||||
|
||||
|
||||
class TestOAuth2ScopesWithReadAndWriteAccess(ApiTestCaseMixin):
|
||||
scope = 'read write'
|
||||
|
||||
def test_client_can_access_endpoint_with_read_scope(
|
||||
self, app: Flask, user_1: User
|
||||
) -> None:
|
||||
(
|
||||
client,
|
||||
oauth_client,
|
||||
access_token,
|
||||
) = self.create_oauth_client_and_issue_token(
|
||||
app, user_1, scope=self.scope
|
||||
)
|
||||
|
||||
response = client.get(
|
||||
'/api/auth/profile',
|
||||
content_type='application/json',
|
||||
headers=dict(Authorization=f'Bearer {access_token}'),
|
||||
)
|
||||
|
||||
self.assert_not_insufficient_scope_error(response)
|
||||
|
||||
def test_client_with_read_can_access_endpoints_with_write_scope(
|
||||
self, app: Flask, user_1: User
|
||||
) -> None:
|
||||
(
|
||||
client,
|
||||
oauth_client,
|
||||
access_token,
|
||||
) = self.create_oauth_client_and_issue_token(
|
||||
app, user_1, scope=self.scope
|
||||
)
|
||||
|
||||
response = client.post(
|
||||
'/api/auth/picture',
|
||||
data=dumps(dict()),
|
||||
content_type='application/json',
|
||||
headers=dict(Authorization=f'Bearer {access_token}'),
|
||||
)
|
||||
|
||||
self.assert_not_insufficient_scope_error(response)
|
@ -2,9 +2,12 @@ import random
|
||||
import string
|
||||
from json import loads
|
||||
from typing import Dict, Optional
|
||||
from uuid import uuid4
|
||||
|
||||
from flask import json as flask_json
|
||||
|
||||
from fittrackee.workouts.utils.short_id import encode_uuid
|
||||
|
||||
|
||||
def random_string(
|
||||
length: Optional[int] = None,
|
||||
@ -32,6 +35,10 @@ def random_email() -> str:
|
||||
return random_string(suffix='@example.com')
|
||||
|
||||
|
||||
def random_short_id() -> str:
|
||||
return encode_uuid(uuid4())
|
||||
|
||||
|
||||
def jsonify_dict(data: Dict) -> Dict:
|
||||
return loads(flask_json.dumps(data))
|
||||
|
||||
|
Reference in New Issue
Block a user