API - add typing

This commit is contained in:
Sam
2021-01-02 19:28:03 +01:00
parent 4705393a08
commit 634d06b05a
53 changed files with 1884 additions and 1075 deletions

View File

@ -1,10 +1,12 @@
import datetime
import os
from typing import Dict, Tuple, Union
import jwt
from fittrackee import appLog, bcrypt, db
from fittrackee.responses import (
ForbiddenErrorResponse,
HttpResponse,
InvalidPayloadErrorResponse,
PayloadTooLargeErrorResponse,
UnauthorizedErrorResponse,
@ -32,7 +34,7 @@ auth_blueprint = Blueprint('auth', __name__)
@auth_blueprint.route('/auth/register', methods=['POST'])
def register_user():
def register_user() -> Union[Tuple[Dict, int], HttpResponse]:
"""
register a user
@ -144,7 +146,7 @@ def register_user():
@auth_blueprint.route('/auth/login', methods=['POST'])
def login_user():
def login_user() -> Union[Dict, HttpResponse]:
"""
user login
@ -216,7 +218,7 @@ def login_user():
@auth_blueprint.route('/auth/logout', methods=['GET'])
@authenticate
def logout_user(auth_user_id):
def logout_user(auth_user_id: int) -> Union[Dict, HttpResponse]:
"""
user logout
@ -277,7 +279,9 @@ def logout_user(auth_user_id):
@auth_blueprint.route('/auth/profile', methods=['GET'])
@authenticate
def get_authenticated_user_profile(auth_user_id):
def get_authenticated_user_profile(
auth_user_id: int,
) -> Union[Dict, HttpResponse]:
"""
get authenticated user info
@ -338,7 +342,7 @@ def get_authenticated_user_profile(auth_user_id):
@auth_blueprint.route('/auth/profile/edit', methods=['POST'])
@authenticate
def edit_user(auth_user_id):
def edit_user(auth_user_id: int) -> Union[Dict, HttpResponse]:
"""
edit authenticated user
@ -474,7 +478,7 @@ def edit_user(auth_user_id):
@auth_blueprint.route('/auth/picture', methods=['POST'])
@authenticate
def edit_picture(auth_user_id):
def edit_picture(auth_user_id: int) -> Union[Dict, HttpResponse]:
"""
update authenticated user picture
@ -561,7 +565,7 @@ def edit_picture(auth_user_id):
@auth_blueprint.route('/auth/picture', methods=['DELETE'])
@authenticate
def del_picture(auth_user_id):
def del_picture(auth_user_id: int) -> Union[Tuple[Dict, int], HttpResponse]:
"""
delete authenticated user picture
@ -604,7 +608,7 @@ def del_picture(auth_user_id):
@auth_blueprint.route('/auth/password/reset-request', methods=['POST'])
def request_password_reset():
def request_password_reset() -> Union[Dict, HttpResponse]:
"""
handle password reset request
@ -644,15 +648,15 @@ def request_password_reset():
ui_url = current_app.config['UI_URL']
email_data = {
'expiration_delay': get_readable_duration(
current_app.config.get('PASSWORD_TOKEN_EXPIRATION_SECONDS'),
current_app.config['PASSWORD_TOKEN_EXPIRATION_SECONDS'],
'en' if user.language is None else user.language,
),
'username': user.username,
'password_reset_url': (
f'{ui_url}/password-reset?token={password_reset_token}' # noqa
),
'operating_system': request.user_agent.platform,
'browser_name': request.user_agent.browser,
'operating_system': request.user_agent.platform, # type: ignore
'browser_name': request.user_agent.browser, # type: ignore
}
user_data = {
'language': user.language if user.language else 'en',
@ -666,7 +670,7 @@ def request_password_reset():
@auth_blueprint.route('/auth/password/update', methods=['POST'])
def update_password():
def update_password() -> Union[Dict, HttpResponse]:
"""
update user password

View File

@ -1,18 +1,22 @@
from datetime import datetime
from typing import Dict, Optional, Union
import jwt
from fittrackee import bcrypt, db
from flask import current_app
from sqlalchemy import func
from sqlalchemy.ext.declarative import DeclarativeMeta
from sqlalchemy.ext.hybrid import hybrid_property
from sqlalchemy.sql.expression import select
from ..activities.models import Activity
from .utils_token import decode_user_token, get_user_token
BaseModel: DeclarativeMeta = db.Model
class User(db.Model):
__tablename__ = "users"
class User(BaseModel):
__tablename__ = 'users'
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
username = db.Column(db.String(20), unique=True, nullable=False)
email = db.Column(db.String(120), unique=True, nullable=False)
@ -36,12 +40,16 @@ class User(db.Model):
)
language = db.Column(db.String(50), nullable=True)
def __repr__(self):
def __repr__(self) -> str:
return f'<User {self.username!r}>'
def __init__(
self, username, email, password, created_at=datetime.utcnow()
):
self,
username: str,
email: str,
password: str,
created_at: Optional[datetime] = datetime.utcnow(),
) -> None:
self.username = username
self.email = email
self.password = bcrypt.generate_password_hash(
@ -50,31 +58,25 @@ class User(db.Model):
self.created_at = created_at
@staticmethod
def encode_auth_token(user_id):
def encode_auth_token(user_id: int) -> str:
"""
Generates the auth token
:param user_id: -
:return: JWToken
"""
try:
return get_user_token(user_id)
except Exception as e:
return e
return get_user_token(user_id)
@staticmethod
def encode_password_reset_token(user_id):
def encode_password_reset_token(user_id: int) -> str:
"""
Generates the auth token
:param user_id: -
:return: JWToken
"""
try:
return get_user_token(user_id, password_reset=True)
except Exception as e:
return e
return get_user_token(user_id, password_reset=True)
@staticmethod
def decode_auth_token(auth_token):
def decode_auth_token(auth_token: str) -> Union[int, str]:
"""
Decodes the auth token
:param auth_token: -
@ -88,21 +90,21 @@ class User(db.Model):
return 'Invalid token. Please log in again.'
@hybrid_property
def activities_count(self):
def activities_count(self) -> int:
return Activity.query.filter(Activity.user_id == self.id).count()
@activities_count.expression
def activities_count(self):
@activities_count.expression # type: ignore
def activities_count(self) -> int:
return (
select([func.count(Activity.id)])
.where(Activity.user_id == self.id)
.label("activities_count")
.label('activities_count')
)
def serialize(self):
def serialize(self) -> Dict:
sports = []
total = (None, None)
if self.activities_count > 0:
total = (0, '0:00:00')
if self.activities_count > 0: # type: ignore
sports = (
db.session.query(Activity.sport_id)
.filter(Activity.user_id == self.id)
@ -136,6 +138,6 @@ class User(db.Model):
'sports_list': [
sport for sportslist in sports for sport in sportslist
],
'total_distance': float(total[0]) if total[0] else 0,
'total_duration': str(total[1]) if total[1] else "0:00:00",
'total_distance': float(total[0]),
'total_duration': str(total[1]),
}

View File

@ -1,9 +1,11 @@
import os
import shutil
from typing import Any, Dict, Tuple, Union
from fittrackee import db
from fittrackee.responses import (
ForbiddenErrorResponse,
HttpResponse,
InvalidPayloadErrorResponse,
NotFoundErrorResponse,
UserNotFoundErrorResponse,
@ -23,7 +25,7 @@ USER_PER_PAGE = 10
@users_blueprint.route('/users', methods=['GET'])
@authenticate
def get_users(auth_user_id):
def get_users(auth_user_id: int) -> Dict:
"""
Get all users
@ -135,10 +137,10 @@ def get_users(auth_user_id):
User.username.like('%' + query + '%') if query else True,
)
.order_by(
User.activities_count.asc()
User.activities_count.asc() # type: ignore
if order_by == 'activities_count' and order == 'asc'
else True,
User.activities_count.desc()
User.activities_count.desc() # type: ignore
if order_by == 'activities_count' and order == 'desc'
else True,
User.username.asc()
@ -178,7 +180,9 @@ def get_users(auth_user_id):
@users_blueprint.route('/users/<user_name>', methods=['GET'])
@authenticate
def get_single_user(auth_user_id, user_name):
def get_single_user(
auth_user_id: int, user_name: str
) -> Union[Dict, HttpResponse]:
"""
Get single user details
@ -251,7 +255,7 @@ def get_single_user(auth_user_id, user_name):
@users_blueprint.route('/users/<user_name>/picture', methods=['GET'])
def get_picture(user_name):
def get_picture(user_name: str) -> Any:
"""get user picture
**Example request**:
@ -290,7 +294,9 @@ def get_picture(user_name):
@users_blueprint.route('/users/<user_name>', methods=['PATCH'])
@authenticate_as_admin
def update_user(auth_user_id, user_name):
def update_user(
auth_user_id: int, user_name: str
) -> Union[Dict, HttpResponse]:
"""
Update user to add admin rights
@ -377,7 +383,9 @@ def update_user(auth_user_id, user_name):
@users_blueprint.route('/users/<user_name>', methods=['DELETE'])
@authenticate
def delete_user(auth_user_id, user_name):
def delete_user(
auth_user_id: int, user_name: str
) -> Union[Tuple[Dict, int], HttpResponse]:
"""
Delete a user account

View File

@ -1,30 +1,44 @@
import re
from datetime import timedelta
from functools import wraps
from typing import Any, Callable, Optional, Tuple, Union
import humanize
from fittrackee.responses import (
ForbiddenErrorResponse,
HttpResponse,
InvalidPayloadErrorResponse,
PayloadTooLargeErrorResponse,
UnauthorizedErrorResponse,
)
from flask import current_app, request
from flask import Request, current_app, request
from .models import User
def is_admin(user_id):
def is_admin(user_id: int) -> bool:
"""
Return if user has admin rights
"""
user = User.query.filter_by(id=user_id).first()
return user.admin
def is_valid_email(email):
def is_valid_email(email: str) -> bool:
"""
Return if email format is valid
"""
mail_pattern = r"(^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$)"
return re.match(mail_pattern, email) is not None
def check_passwords(password, password_conf):
def check_passwords(password: str, password_conf: str) -> str:
"""
Verify if password and password confirmation are the same and have
more than 8 characters
If not, it returns not empty string
"""
ret = ''
if password_conf != password:
ret = 'Password and password confirmation don\'t match.\n'
@ -33,7 +47,14 @@ def check_passwords(password, password_conf):
return ret
def register_controls(username, email, password, password_conf):
def register_controls(
username: str, email: str, password: str, password_conf: str
) -> str:
"""
Verify if user name, email and passwords are valid
If not, it returns not empty string
"""
ret = ''
if not 2 < len(username) < 13:
ret += 'Username: 3 to 12 characters required.\n'
@ -43,7 +64,12 @@ def register_controls(username, email, password, password_conf):
return ret
def verify_extension_and_size(file_type, req):
def verify_extension_and_size(
file_type: str, req: Request
) -> Optional[HttpResponse]:
"""
Return error Response if file is invalid
"""
if 'file' not in req.files:
return InvalidPayloadErrorResponse('No file part.', 'fail')
@ -66,7 +92,7 @@ def verify_extension_and_size(file_type, req):
if not (
file_extension
and file_extension in current_app.config.get(allowed_extensions)
and file_extension in current_app.config[allowed_extensions]
):
return InvalidPayloadErrorResponse(
'File extension not allowed.', 'fail'
@ -81,7 +107,13 @@ def verify_extension_and_size(file_type, req):
return None
def verify_user(current_request, verify_admin):
def verify_user(
current_request: Request, verify_admin: bool
) -> Tuple[Optional[HttpResponse], Optional[int]]:
"""
Return user id, if the provided token is valid and if user has admin
rights if 'verify_admin' is True
"""
default_message = 'Provide a valid auth token.'
auth_header = current_request.headers.get('Authorization')
if not auth_header:
@ -98,9 +130,11 @@ def verify_user(current_request, verify_admin):
return None, resp
def authenticate(f):
def authenticate(f: Callable) -> Callable:
@wraps(f)
def decorated_function(*args, **kwargs):
def decorated_function(
*args: Any, **kwargs: Any
) -> Union[Callable, HttpResponse]:
verify_admin = False
response_object, resp = verify_user(request, verify_admin)
if response_object:
@ -110,9 +144,11 @@ def authenticate(f):
return decorated_function
def authenticate_as_admin(f):
def authenticate_as_admin(f: Callable) -> Callable:
@wraps(f)
def decorated_function(*args, **kwargs):
def decorated_function(
*args: Any, **kwargs: Any
) -> Union[Callable, HttpResponse]:
verify_admin = True
response_object, resp = verify_user(request, verify_admin)
if response_object:
@ -122,25 +158,36 @@ def authenticate_as_admin(f):
return decorated_function
def can_view_activity(auth_user_id, activity_user_id):
def can_view_activity(
auth_user_id: int, activity_user_id: int
) -> Optional[HttpResponse]:
"""
Return error response if user has no right to view activity
"""
if auth_user_id != activity_user_id:
return ForbiddenErrorResponse()
return None
def display_readable_file_size(size_in_bytes):
def display_readable_file_size(size_in_bytes: Union[float, int]) -> str:
"""
Return readable file size from size in bytes
"""
if size_in_bytes == 0:
return '0 bytes'
if size_in_bytes == 1:
return '1 byte'
for unit in [' bytes', 'KB', 'MB', 'GB', 'TB']:
if abs(size_in_bytes) < 1024.0:
return f"{size_in_bytes:3.1f}{unit}"
return f'{size_in_bytes:3.1f}{unit}'
size_in_bytes /= 1024.0
return f"{size_in_bytes} bytes"
return f'{size_in_bytes} bytes'
def get_readable_duration(duration, locale='en'):
def get_readable_duration(duration: int, locale: Optional[str] = 'en') -> str:
"""
Return readable and localized duration from duration in seconds
"""
if locale is not None and locale != 'en':
_t = humanize.i18n.activate(locale) # noqa
readable_duration = humanize.naturaldelta(timedelta(seconds=duration))

View File

@ -1,19 +1,25 @@
from datetime import datetime, timedelta
from typing import Optional
import jwt
from flask import current_app
def get_user_token(user_id, password_reset=False):
expiration_days = (
0
if password_reset
else current_app.config.get('TOKEN_EXPIRATION_DAYS')
def get_user_token(
user_id: int, password_reset: Optional[bool] = False
) -> str:
"""
Return authentication token for a given user.
Token expiration time depends on token type (authentication or password
reset)
"""
expiration_days: float = (
0.0 if password_reset else current_app.config['TOKEN_EXPIRATION_DAYS']
)
expiration_seconds = (
current_app.config.get('PASSWORD_TOKEN_EXPIRATION_SECONDS')
expiration_seconds: float = (
current_app.config['PASSWORD_TOKEN_EXPIRATION_SECONDS']
if password_reset
else current_app.config.get('TOKEN_EXPIRATION_SECONDS')
else current_app.config['TOKEN_EXPIRATION_SECONDS']
)
payload = {
'exp': datetime.utcnow()
@ -23,15 +29,18 @@ def get_user_token(user_id, password_reset=False):
}
return jwt.encode(
payload,
current_app.config.get('SECRET_KEY'),
current_app.config['SECRET_KEY'],
algorithm='HS256',
)
def decode_user_token(auth_token):
def decode_user_token(auth_token: str) -> int:
"""
Return user id from token
"""
payload = jwt.decode(
auth_token,
current_app.config.get('SECRET_KEY'),
current_app.config['SECRET_KEY'],
algorithms=['HS256'],
)
return payload['sub']