API - init user account activation

This commit is contained in:
Sam
2022-03-19 22:02:06 +01:00
parent b5b4ac8f92
commit a1f80e9745
26 changed files with 1334 additions and 67 deletions

View File

@ -6,12 +6,13 @@ from typing import Dict, Tuple, Union
import jwt
from flask import Blueprint, current_app, request
from sqlalchemy import exc, func, or_
from sqlalchemy import exc, func
from werkzeug.exceptions import RequestEntityTooLarge
from werkzeug.utils import secure_filename
from fittrackee import appLog, bcrypt, db
from fittrackee.emails.tasks import (
account_confirmation_email,
email_updated_to_current_address,
email_updated_to_new_address,
password_change_email,
@ -46,6 +47,9 @@ def register_user() -> Union[Tuple[Dict, int], HttpResponse]:
"""
register a user
The newly created account is inactive. The user must confirm his email
to activate it.
**Example request**:
.. sourcecode:: http
@ -63,8 +67,6 @@ def register_user() -> Union[Tuple[Dict, int], HttpResponse]:
Content-Type: application/json
{
"auth_token": "JSON Web Token",
"message": "successfully registered",
"status": "success"
}
@ -76,18 +78,18 @@ def register_user() -> Union[Tuple[Dict, int], HttpResponse]:
Content-Type: application/json
{
"message": "Errors: email: valid email must be provided\\n",
"message": "Errors: email: valid email must be provided\n",
"status": "error"
}
:<json string username: user name (3 to 30 characters required)
:<json string username: username (3 to 30 characters required)
:<json string email: user email
:<json string password: password (8 characters required)
:statuscode 201: successfully registered
:statuscode 400:
- invalid payload
- sorry, that user already exists
- sorry, that username is already taken
- Errors:
- username: 3 to 30 characters required
- username: only alphanumeric characters and the underscore
@ -125,30 +127,44 @@ def register_user() -> Union[Tuple[Dict, int], HttpResponse]:
return InvalidPayloadErrorResponse(ret)
try:
# check for existing user
user = User.query.filter(
or_(
func.lower(User.username) == func.lower(username),
func.lower(User.email) == func.lower(email),
)
func.lower(User.username) == func.lower(username)
).first()
if user:
return InvalidPayloadErrorResponse(
'sorry, that user already exists'
'sorry, that username is already taken'
)
# add new user to db
new_user = User(username=username, email=email, password=password)
new_user.timezone = 'Europe/Paris'
db.session.add(new_user)
db.session.commit()
# generate auth token
auth_token = new_user.encode_auth_token(new_user.id)
return {
'status': 'success',
'message': 'successfully registered',
'auth_token': auth_token,
}, 201
# if a user exists with same email address, no error is returned
# since a user has to confirm his email to activate his account
user = User.query.filter(
func.lower(User.email) == func.lower(email)
).first()
if not user:
new_user = User(username=username, email=email, password=password)
new_user.timezone = 'Europe/Paris'
new_user.confirmation_token = secrets.token_urlsafe(16)
db.session.add(new_user)
db.session.commit()
ui_url = current_app.config['UI_URL']
email_data = {
'username': new_user.username,
'fittrackee_url': ui_url,
'operating_system': request.user_agent.platform, # type: ignore # noqa
'browser_name': request.user_agent.browser, # type: ignore
'account_confirmation_url': (
f'{ui_url}/account-confirmation'
f'?token={new_user.confirmation_token}'
),
}
user_data = {
'language': 'en',
'email': new_user.email,
}
account_confirmation_email.send(user_data, email_data)
return {'status': 'success'}, 200
# handler errors
except (exc.IntegrityError, exc.OperationalError, ValueError) as e:
return handle_error_and_return_response(e, db=db)
@ -158,6 +174,7 @@ def register_user() -> Union[Tuple[Dict, int], HttpResponse]:
def login_user() -> Union[Dict, HttpResponse]:
"""
user login
Only user with active account can log in
**Example request**:
@ -209,9 +226,9 @@ def login_user() -> Union[Dict, HttpResponse]:
email = post_data.get('email', '')
password = post_data.get('password')
try:
# check for existing user
user = User.query.filter(
func.lower(User.email) == func.lower(email)
func.lower(User.email) == func.lower(email),
User.is_active == True, # noqa
).first()
if user and bcrypt.check_password_hash(user.password, password):
# generate auth token
@ -258,6 +275,7 @@ def get_authenticated_user_profile(
"email": "sam@example.com",
"first_name": null,
"imperial_units": false,
"is_active": true,
"language": "en",
"last_name": null,
"location": null,
@ -357,6 +375,7 @@ def edit_user(auth_user: User) -> Union[Dict, HttpResponse]:
"email": "sam@example.com",
"first_name": null,
"imperial_units": false,
"is_active": true,
"language": "en",
"last_name": null,
"location": null,
@ -516,6 +535,7 @@ def update_user_account(auth_user: User) -> Union[Dict, HttpResponse]:
"email": "sam@example.com",
"first_name": null,
"imperial_units": false,
"is_active": true,
"language": "en",
"last_name": null,
"location": null,
@ -709,6 +729,7 @@ def edit_user_preferences(auth_user: User) -> Union[Dict, HttpResponse]:
"email": "sam@example.com",
"first_name": null,
"imperial_units": false,
"is_active": true,
"language": "en",
"last_name": null,
"location": null,
@ -1312,3 +1333,63 @@ def update_email() -> Union[Dict, HttpResponse]:
except (exc.OperationalError, ValueError) as e:
return handle_error_and_return_response(e, db=db)
@auth_blueprint.route('/auth/account/confirm', methods=['POST'])
def confirm_account() -> Union[Dict, HttpResponse]:
"""
activate user account after registration
**Example request**:
.. sourcecode:: http
POST /api/auth/account/confirm HTTP/1.1
Content-Type: application/json
**Example response**:
HTTP/1.1 200 OK
Content-Type: application/json
{
"auth_token": "JSON Web Token",
"message": "account confirmation successful",
"status": "success"
}
:<json string token: confirmation token
:statuscode 200: account confirmation successful
:statuscode 400: invalid payload
:statuscode 500: error, please try again or contact the administrator
"""
post_data = request.get_json()
if not post_data or post_data.get('token') is None:
return InvalidPayloadErrorResponse()
token = post_data.get('token')
try:
user = User.query.filter_by(confirmation_token=token).first()
if not user:
return InvalidPayloadErrorResponse()
user.is_active = True
user.confirmation_token = None
db.session.commit()
# generate auth token
auth_token = user.encode_auth_token(user.id)
response = {
'status': 'success',
'message': 'account confirmation successful',
'auth_token': auth_token,
}
return response
except (exc.OperationalError, ValueError) as e:
return handle_error_and_return_response(e, db=db)

View File

@ -47,6 +47,7 @@ class User(BaseModel):
)
language = db.Column(db.String(50), nullable=True)
imperial_units = db.Column(db.Boolean, default=False, nullable=False)
is_active = db.Column(db.Boolean, default=False, nullable=False)
email_to_confirm = db.Column(db.String(255), nullable=True)
confirmation_token = db.Column(db.String(255), nullable=True)
@ -149,6 +150,7 @@ class User(BaseModel):
'email': self.email,
'email_to_confirm': self.email_to_confirm,
'first_name': self.first_name,
'is_active': self.is_active,
'last_name': self.last_name,
'location': self.location,
'nb_sports': len(sports),

View File

@ -52,7 +52,7 @@ def set_admin(username: str) -> None:
@authenticate_as_admin
def get_users(auth_user: User) -> Dict:
"""
Get all users
Get all users (regardless their account status)
**Example request**:
@ -87,6 +87,7 @@ def get_users(auth_user: User) -> Dict:
"created_at": "Sun, 14 Jul 2019 14:09:58 GMT",
"email": "admin@example.com",
"first_name": null,
"is_admin": true,
"imperial_units": false,
"language": "en",
"last_name": null,
@ -149,6 +150,7 @@ def get_users(auth_user: User) -> Dict:
"created_at": "Sat, 20 Jul 2019 11:27:03 GMT",
"email": "sam@example.com",
"first_name": null,
"is_admin": false,
"language": "fr",
"last_name": null,
"location": null,
@ -269,6 +271,7 @@ def get_single_user(
"email": "admin@example.com",
"first_name": null,
"imperial_units": false,
"is_admin": true,
"language": "en",
"last_name": null,
"location": null,
@ -421,6 +424,7 @@ def update_user(auth_user: User, user_name: str) -> Union[Dict, HttpResponse]:
"email": "admin@example.com",
"first_name": null,
"imperial_units": false,
"is_active": true,
"language": "en",
"last_name": null,
"location": null,

View File

@ -62,8 +62,12 @@ def verify_user(
current_request: Request, verify_admin: bool
) -> Tuple[Optional[HttpResponse], Optional[User]]:
"""
Return authenticated user, if the provided token is valid and user has
admin rights if 'verify_admin' is True
Return authenticated user if
- the provided token is valid
- the user account is active
- the user has admin rights if 'verify_admin' is True
If not, it returns Error Response
"""
default_message = 'provide a valid auth token'
auth_header = current_request.headers.get('Authorization')
@ -74,7 +78,7 @@ def verify_user(
if isinstance(resp, str):
return UnauthorizedErrorResponse(resp), None
user = User.query.filter_by(id=resp).first()
if not user:
if not user or not user.is_active:
return UnauthorizedErrorResponse(default_message), None
if verify_admin and not user.admin:
return ForbiddenErrorResponse(), None