API - init OAuth server and oauth clients creation

This commit is contained in:
Sam
2022-05-27 13:28:26 +02:00
parent c13e9e0286
commit c6cd7ff67c
19 changed files with 700 additions and 1 deletions

View File

View File

@ -0,0 +1,35 @@
from time import time
from typing import Dict
from werkzeug.security import gen_salt
from fittrackee.oauth2.models import OAuth2Client
from fittrackee.users.models import User
def create_oauth_client(metadata: Dict, user: User) -> OAuth2Client:
"""
Create oauth client for 3rd-party applications.
Only Authorization Code Grant with 'client_secret_post' as method
is supported.
"""
client_metadata = {
'client_name': metadata['client_name'],
'client_uri': metadata['client_uri'],
'redirect_uris': metadata['redirect_uris'],
'scope': metadata['scope'],
'grant_types': ['authorization_code'],
'response_types': ['code'],
'token_endpoint_auth_method': 'client_secret_post',
}
client_id = gen_salt(24)
client_id_issued_at = int(time())
client = OAuth2Client(
client_id=client_id,
client_id_issued_at=client_id_issued_at,
user_id=user.id,
)
client.set_client_metadata(client_metadata)
client.client_secret = gen_salt(48)
return client

View File

@ -0,0 +1,14 @@
from authlib.oauth2.rfc7636 import CodeChallenge
from flask import Flask
from .grants import AuthorizationCodeGrant
from .server import authorization_server
def config_oauth(app: Flask) -> None:
authorization_server.init_app(app)
# supported grants
authorization_server.register_grant(
AuthorizationCodeGrant, [CodeChallenge(required=True)]
)

View File

@ -0,0 +1,65 @@
from typing import Optional
from authlib.oauth2 import OAuth2Request
from authlib.oauth2.rfc6749 import grants
from fittrackee import db
from fittrackee.users.models import User
from .models import OAuth2AuthorizationCode, OAuth2Client, OAuth2Token
class AuthorizationCodeGrant(grants.AuthorizationCodeGrant):
TOKEN_ENDPOINT_AUTH_METHODS = ['client_secret_post']
def save_authorization_code(
self, code: str, request: OAuth2Request
) -> OAuth2AuthorizationCode:
code_challenge = request.data.get('code_challenge')
code_challenge_method = request.data.get('code_challenge_method')
auth_code = OAuth2AuthorizationCode(
code=code,
client_id=request.client.client_id,
redirect_uri=request.redirect_uri,
scope=request.scope,
user_id=request.user.id,
code_challenge=code_challenge,
code_challenge_method=code_challenge_method,
)
db.session.add(auth_code)
db.session.commit()
return auth_code
def query_authorization_code(
self, code: str, client: OAuth2Client
) -> Optional[OAuth2AuthorizationCode]:
auth_code = OAuth2AuthorizationCode.query.filter_by(
code=code, client_id=client.client_id
).first()
if auth_code and not auth_code.is_expired():
return auth_code
return None
def delete_authorization_code(
self, authorization_code: OAuth2AuthorizationCode
) -> None:
db.session.delete(authorization_code)
db.session.commit()
def authenticate_user(
self, authorization_code: OAuth2AuthorizationCode
) -> User:
return User.query.get(authorization_code.user_id)
class RefreshTokenGrant(grants.RefreshTokenGrant):
def authenticate_refresh_token(self, refresh_token: str) -> Optional[str]:
token = OAuth2Token.query.filter_by(
refresh_token=refresh_token
).first()
if token and token.is_refresh_token_active():
return token
return None
def authenticate_user(self, credential: OAuth2Token) -> User:
return User.query.get(credential.user_id)

View File

@ -0,0 +1,59 @@
import time
from typing import Dict
from authlib.integrations.sqla_oauth2 import (
OAuth2AuthorizationCodeMixin,
OAuth2ClientMixin,
OAuth2TokenMixin,
)
from sqlalchemy.ext.declarative import DeclarativeMeta
from fittrackee import db
BaseModel: DeclarativeMeta = db.Model
class OAuth2Client(BaseModel, OAuth2ClientMixin):
__tablename__ = 'oauth2_client'
id = db.Column(db.Integer, primary_key=True)
user_id = db.Column(
db.Integer, db.ForeignKey('users.id', ondelete='CASCADE')
)
user = db.relationship('User')
def serialize(self) -> Dict:
return {
'client_id': self.client_id,
'client_secret': self.client_secret,
'id': self.id,
'name': self.client_name,
'redirect_uris': self.redirect_uris,
'website': self.client_uri,
}
class OAuth2AuthorizationCode(BaseModel, OAuth2AuthorizationCodeMixin):
__tablename__ = 'oauth2_code'
id = db.Column(db.Integer, primary_key=True)
user_id = db.Column(
db.Integer, db.ForeignKey('users.id', ondelete='CASCADE')
)
user = db.relationship('User')
class OAuth2Token(BaseModel, OAuth2TokenMixin):
__tablename__ = 'oauth2_token'
id = db.Column(db.Integer, primary_key=True)
user_id = db.Column(
db.Integer, db.ForeignKey('users.id', ondelete='CASCADE')
)
user = db.relationship('User')
def is_refresh_token_active(self) -> bool:
if self.revoked:
return False
expires_at = self.issued_at + self.expires_in * 2
return expires_at >= time.time()

View File

@ -0,0 +1,53 @@
from typing import Dict, Tuple, Union
from flask import Blueprint, request
from fittrackee import db
from fittrackee.responses import HttpResponse, InvalidPayloadErrorResponse
from fittrackee.users.decorators import authenticate
from fittrackee.users.models import User
from .client import create_oauth_client
oauth_blueprint = Blueprint('oauth', __name__)
EXPECTED_METADATA_KEYS = [
'client_name',
'client_uri',
'redirect_uris',
'scope',
]
@oauth_blueprint.route('/oauth/apps', methods=['POST'])
@authenticate
def create_client(auth_user: User) -> Union[HttpResponse, Tuple[Dict, int]]:
client_metadata = request.get_json()
if not client_metadata:
return InvalidPayloadErrorResponse(
message='OAuth client metadata missing'
)
missing_keys = [
key
for key in EXPECTED_METADATA_KEYS
if key not in client_metadata.keys()
]
if missing_keys:
return InvalidPayloadErrorResponse(
message=(
'OAuth client metadata missing keys: '
f'{", ".join(missing_keys)}'
)
)
new_client = create_oauth_client(client_metadata, auth_user)
db.session.add(new_client)
db.session.commit()
return (
{
'status': 'created',
'data': {'client': new_client.serialize()},
},
201,
)

View File

@ -0,0 +1,16 @@
from authlib.integrations.flask_oauth2 import AuthorizationServer
from authlib.integrations.sqla_oauth2 import (
create_query_client_func,
create_save_token_func,
)
from fittrackee import db
from .models import OAuth2Client, OAuth2Token
query_client = create_query_client_func(db.session, OAuth2Client)
save_token = create_save_token_func(db.session, OAuth2Token)
authorization_server = AuthorizationServer(
query_client=query_client,
save_token=save_token,
)