API - add email to contact admin in app config
This commit is contained in:
parent
f650a70ace
commit
49100c27e7
@ -1,5 +1,6 @@
|
||||
import logging
|
||||
import os
|
||||
import re
|
||||
import shutil
|
||||
from importlib import import_module, reload
|
||||
from typing import Any
|
||||
@ -15,6 +16,7 @@ from flask_bcrypt import Bcrypt
|
||||
from flask_dramatiq import Dramatiq
|
||||
from flask_migrate import Migrate
|
||||
from flask_sqlalchemy import SQLAlchemy
|
||||
from sqlalchemy.exc import ProgrammingError
|
||||
|
||||
from fittrackee.emails.email import EmailService
|
||||
|
||||
@ -66,9 +68,16 @@ def create_app() -> Flask:
|
||||
with app.app_context():
|
||||
# Note: check if "app_config" table exist to avoid errors when
|
||||
# dropping tables on dev environments
|
||||
if db.engine.dialect.has_table(db.engine.connect(), 'app_config'):
|
||||
db_app_config = get_or_init_config()
|
||||
update_app_config_from_database(app, db_app_config)
|
||||
try:
|
||||
if db.engine.dialect.has_table(db.engine.connect(), 'app_config'):
|
||||
db_app_config = get_or_init_config()
|
||||
update_app_config_from_database(app, db_app_config)
|
||||
except ProgrammingError as e:
|
||||
# avoid error on AppConfig migration
|
||||
if re.match(
|
||||
r'psycopg2.errors.UndefinedColumn(.*)app_config.', str(e)
|
||||
):
|
||||
pass
|
||||
|
||||
from .application.app_config import config_blueprint # noqa
|
||||
from .users.auth import auth_blueprint # noqa
|
||||
|
@ -11,6 +11,7 @@ from fittrackee.responses import (
|
||||
)
|
||||
from fittrackee.users.decorators import authenticate_as_admin
|
||||
from fittrackee.users.models import User
|
||||
from fittrackee.users.utils.controls import is_valid_email
|
||||
|
||||
from .models import AppConfig
|
||||
from .utils import update_app_config_from_database, verify_app_config
|
||||
@ -87,6 +88,7 @@ def update_application_config(auth_user: User) -> Union[Dict, HttpResponse]:
|
||||
|
||||
{
|
||||
"data": {
|
||||
"admin_contact": "admin@example.com",
|
||||
"gpx_limit_import": 10,
|
||||
"is_registration_enabled": true,
|
||||
"max_single_file_size": 1048576,
|
||||
@ -96,6 +98,7 @@ def update_application_config(auth_user: User) -> Union[Dict, HttpResponse]:
|
||||
"status": "success"
|
||||
}
|
||||
|
||||
:<json string admin_contact: email to contact the administrator
|
||||
:<json integer gpx_limit_import: max number of files in zip archive
|
||||
:<json boolean is_registration_enabled: is registration enabled ?
|
||||
:<json integer max_single_file_size: max size of a single file
|
||||
@ -110,6 +113,7 @@ def update_application_config(auth_user: User) -> Union[Dict, HttpResponse]:
|
||||
- provide a valid auth token
|
||||
- signature expired, please log in again
|
||||
- invalid token, please log in again
|
||||
- valid email must be provided for admin contact
|
||||
:statuscode 403: you do not have permissions
|
||||
:statuscode 500: error when updating configuration
|
||||
"""
|
||||
@ -118,6 +122,9 @@ def update_application_config(auth_user: User) -> Union[Dict, HttpResponse]:
|
||||
return InvalidPayloadErrorResponse()
|
||||
|
||||
ret = verify_app_config(config_data)
|
||||
admin_contact = config_data.get('admin_contact')
|
||||
if admin_contact and not is_valid_email(admin_contact):
|
||||
ret.append('valid email must be provided for admin contact')
|
||||
if ret:
|
||||
return InvalidPayloadErrorResponse(message=ret)
|
||||
|
||||
@ -133,6 +140,8 @@ def update_application_config(auth_user: User) -> Union[Dict, HttpResponse]:
|
||||
config.max_zip_file_size = config_data.get('max_zip_file_size')
|
||||
if 'max_users' in config_data:
|
||||
config.max_users = config_data.get('max_users')
|
||||
if 'admin_contact' in config_data:
|
||||
config.admin_contact = admin_contact if admin_contact else None
|
||||
|
||||
if config.max_zip_file_size < config.max_single_file_size:
|
||||
return InvalidPayloadErrorResponse(
|
||||
|
@ -23,6 +23,7 @@ class AppConfig(BaseModel):
|
||||
db.Integer, default=1048576, nullable=False
|
||||
)
|
||||
max_zip_file_size = db.Column(db.Integer, default=10485760, nullable=False)
|
||||
admin_contact = db.Column(db.String(255), nullable=True)
|
||||
|
||||
@property
|
||||
def is_registration_enabled(self) -> bool:
|
||||
@ -43,6 +44,7 @@ class AppConfig(BaseModel):
|
||||
|
||||
def serialize(self) -> Dict:
|
||||
return {
|
||||
'admin_contact': self.admin_contact,
|
||||
'gpx_limit_import': self.gpx_limit_import,
|
||||
'is_registration_enabled': self.is_registration_enabled,
|
||||
'max_single_file_size': self.max_single_file_size,
|
||||
|
@ -1,4 +1,4 @@
|
||||
"""update User table
|
||||
"""update User and AppConfig tables
|
||||
|
||||
Revision ID: 5e3a3a31c432
|
||||
Revises: e30007d681cb
|
||||
@ -37,8 +37,15 @@ def upgrade():
|
||||
'users',
|
||||
sa.Column('confirmation_token', sa.String(length=255), nullable=True))
|
||||
|
||||
op.add_column(
|
||||
'app_config',
|
||||
sa.Column('admin_contact', sa.String(length=255), nullable=True)
|
||||
)
|
||||
|
||||
|
||||
def downgrade():
|
||||
op.drop_column('app_config', 'admin_contact')
|
||||
|
||||
op.drop_column('users', 'confirmation_token')
|
||||
op.drop_column('users', 'email_to_confirm')
|
||||
op.drop_column('users', 'is_active')
|
@ -1,8 +1,11 @@
|
||||
import json
|
||||
from typing import Optional
|
||||
|
||||
import pytest
|
||||
from flask import Flask
|
||||
|
||||
import fittrackee
|
||||
from fittrackee.application.models import AppConfig
|
||||
from fittrackee.users.models import User
|
||||
|
||||
from ..mixins import ApiTestCaseMixin
|
||||
@ -19,6 +22,7 @@ class TestGetConfig(ApiTestCaseMixin):
|
||||
data = json.loads(response.data.decode())
|
||||
assert response.status_code == 200
|
||||
assert 'success' in data['status']
|
||||
assert data['data']['admin_contact'] is None
|
||||
assert data['data']['gpx_limit_import'] == 10
|
||||
assert data['data']['is_registration_enabled'] is True
|
||||
assert data['data']['max_single_file_size'] == 1048576
|
||||
@ -107,12 +111,14 @@ class TestUpdateConfig(ApiTestCaseMixin):
|
||||
client, auth_token = self.get_test_client_and_auth_token(
|
||||
app, user_1_admin.email
|
||||
)
|
||||
admin_email = self.random_email()
|
||||
|
||||
response = client.patch(
|
||||
'/api/config',
|
||||
content_type='application/json',
|
||||
data=json.dumps(
|
||||
dict(
|
||||
admin_contact=admin_email,
|
||||
gpx_limit_import=20,
|
||||
max_single_file_size=10000,
|
||||
max_zip_file_size=25000,
|
||||
@ -122,9 +128,10 @@ class TestUpdateConfig(ApiTestCaseMixin):
|
||||
headers=dict(Authorization=f'Bearer {auth_token}'),
|
||||
)
|
||||
|
||||
data = json.loads(response.data.decode())
|
||||
assert response.status_code == 200
|
||||
data = json.loads(response.data.decode())
|
||||
assert 'success' in data['status']
|
||||
assert data['data']['admin_contact'] == admin_email
|
||||
assert data['data']['gpx_limit_import'] == 20
|
||||
assert data['data']['is_registration_enabled'] is True
|
||||
assert data['data']['max_single_file_size'] == 10000
|
||||
@ -273,3 +280,57 @@ class TestUpdateConfig(ApiTestCaseMixin):
|
||||
self.assert_400(
|
||||
response, 'Max. files in a zip archive must be greater than 0'
|
||||
)
|
||||
|
||||
def test_it_raises_error_if_admin_contact_is_invalid(
|
||||
self, app: Flask, user_1_admin: User
|
||||
) -> None:
|
||||
client, auth_token = self.get_test_client_and_auth_token(
|
||||
app, user_1_admin.email
|
||||
)
|
||||
|
||||
response = client.patch(
|
||||
'/api/config',
|
||||
content_type='application/json',
|
||||
data=json.dumps(
|
||||
dict(
|
||||
admin_contact=self.random_string(),
|
||||
)
|
||||
),
|
||||
headers=dict(Authorization=f'Bearer {auth_token}'),
|
||||
)
|
||||
|
||||
self.assert_400(
|
||||
response, 'valid email must be provided for admin contact'
|
||||
)
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
'input_description,input_email', [('input string', ''), ('None', None)]
|
||||
)
|
||||
def test_it_empties_error_if_admin_contact_is_an_empty(
|
||||
self,
|
||||
app: Flask,
|
||||
user_1_admin: User,
|
||||
input_description: str,
|
||||
input_email: Optional[str],
|
||||
) -> None:
|
||||
client, auth_token = self.get_test_client_and_auth_token(
|
||||
app, user_1_admin.email
|
||||
)
|
||||
app_config = AppConfig.query.first()
|
||||
app_config.admin_contact = self.random_email()
|
||||
|
||||
response = client.patch(
|
||||
'/api/config',
|
||||
content_type='application/json',
|
||||
data=json.dumps(
|
||||
dict(
|
||||
admin_contact=input_email,
|
||||
)
|
||||
),
|
||||
headers=dict(Authorization=f'Bearer {auth_token}'),
|
||||
)
|
||||
|
||||
assert response.status_code == 200
|
||||
data = json.loads(response.data.decode())
|
||||
assert 'success' in data['status']
|
||||
assert data['data']['admin_contact'] is None
|
||||
|
@ -19,3 +19,4 @@ class TestConfigModel:
|
||||
'target="_blank" rel="noopener noreferrer">OpenStreetMap</a> '
|
||||
'contributors'
|
||||
)
|
||||
assert 'admin_contact' in serialized_app_config
|
||||
|
@ -50,6 +50,7 @@ class TestIsValidEmail:
|
||||
@pytest.mark.parametrize(
|
||||
('input_email',),
|
||||
[
|
||||
(None,),
|
||||
('',),
|
||||
('foo',),
|
||||
('foo@',),
|
||||
|
@ -16,6 +16,8 @@ def is_valid_email(email: str) -> bool:
|
||||
"""
|
||||
Return if email format is valid
|
||||
"""
|
||||
if not email:
|
||||
return False
|
||||
mail_pattern = r"(^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$)"
|
||||
return re.match(mail_pattern, email) is not None
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user