API: init api w/ user management

This commit is contained in:
SamR1 2017-12-16 21:00:46 +01:00
parent 40e7d35cb9
commit 844457b33c
15 changed files with 692 additions and 0 deletions

2
.gitignore vendored
View File

@ -1,3 +1,5 @@
# Created by .ignore support plugin (hsz.mobi) # Created by .ignore support plugin (hsz.mobi)
.idea .idea
.idea/vcs.xml .idea/vcs.xml
__pycache__/
/mpwo_api/venv/

6
docs/db/mpwo.sql Normal file
View File

@ -0,0 +1,6 @@
CREATE DATABASE IF NOT EXISTS mpwo CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
CREATE DATABASE IF NOT EXISTS mpwo_test CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
CREATE USER 'mpwo'@'localhost' IDENTIFIED BY 'mpwo';
GRANT ALL PRIVILEGES ON `mpwo` . * TO 'mpwo'@'localhost';
GRANT ALL PRIVILEGES ON `mpwo_dev` . * TO 'mpwo'@'localhost';
FLUSH PRIVILEGES;

View File

@ -0,0 +1,34 @@
import logging
from flask import Flask
from flask_bcrypt import Bcrypt
from flask_sqlalchemy import SQLAlchemy
db = SQLAlchemy()
bcrypt = Bcrypt()
appLog = logging.getLogger('mpwo_api')
# instantiate the app
app = Flask(__name__)
# set config
with app.app_context():
app.config.from_object('mpwo_api.config.DevelopmentConfig')
# set up extensions
db.init_app(app)
bcrypt.init_app(app)
from .users.auth import auth_blueprint # noqa
from .users.users import users_blueprint # noqa
app.register_blueprint(users_blueprint, url_prefix='/api')
app.register_blueprint(auth_blueprint, url_prefix='/api')
if app.debug:
logging.getLogger('sqlalchemy').setLevel(logging.INFO)
logging.getLogger('sqlalchemy'
).handlers = logging.getLogger('werkzeug').handlers
logging.getLogger('sqlalchemy.orm').setLevel(logging.WARNING)
appLog.setLevel(logging.DEBUG)

View File

@ -0,0 +1,32 @@
class BaseConfig:
"""Base configuration"""
DEBUG = False
TESTING = False
SQLALCHEMY_TRACK_MODIFICATIONS = False
BCRYPT_LOG_ROUNDS = 13
TOKEN_EXPIRATION_DAYS = 30
TOKEN_EXPIRATION_SECONDS = 0
class DevelopmentConfig(BaseConfig):
"""Development configuration"""
DEBUG = True
SQLALCHEMY_DATABASE_URI = \
'mysql+mysqldb://mpwo:mpwo@127.0.0.1:3306/mpwo'
SECRET_KEY = 'development key'
USERNAME = 'admin'
PASSWORD = 'default'
BCRYPT_LOG_ROUNDS = 4
class TestingConfig(BaseConfig):
"""Development configuration"""
DEBUG = True
SQLALCHEMY_DATABASE_URI = \
'mysql+mysqldb://mpwo:mpwo@127.0.0.1:3306/mpwo_test'
SECRET_KEY = 'test key'
USERNAME = 'admin'
PASSWORD = 'default'
BCRYPT_LOG_ROUNDS = 4
TOKEN_EXPIRATION_DAYS = 0
TOKEN_EXPIRATION_SECONDS = 3

View File

View File

@ -0,0 +1,17 @@
from flask_testing import TestCase
from mpwo_api import app, db
class BaseTestCase(TestCase):
def create_app(self):
app.config.from_object('mpwo_api.config.TestingConfig')
return app
def setUp(self):
db.create_all()
db.session.commit()
def tearDown(self):
db.session.remove()
db.drop_all()

View File

@ -0,0 +1,192 @@
import json
import time
from mpwo_api.tests.base import BaseTestCase
from mpwo_api.tests.utils import add_user
class TestAuthBlueprint(BaseTestCase):
def test_user_registration(self):
with self.client:
response = self.client.post(
'/api/auth/register',
data=json.dumps(dict(
username='justatest',
email='test@test.com',
password='123456'
)),
content_type='application/json'
)
data = json.loads(response.data.decode())
self.assertTrue(data['status'] == 'success')
self.assertTrue(data['message'] == 'Successfully registered.')
self.assertTrue(data['auth_token'])
self.assertTrue(response.content_type == 'application/json')
self.assertEqual(response.status_code, 201)
def test_user_registration_user_already_exists(self):
add_user('test', 'test@test.com', 'test')
with self.client:
response = self.client.post(
'/api/auth/register',
data=json.dumps(dict(
username='test',
email='test@test.com',
password='test'
)),
content_type='application/json'
)
data = json.loads(response.data.decode())
self.assertTrue(data['status'] == 'error')
self.assertTrue(data['message'] == 'Sorry. That user already exists.')
self.assertTrue(response.content_type == 'application/json')
self.assertEqual(response.status_code, 400)
def test_user_registration_invalid_json(self):
with self.client:
response = self.client.post(
'/api/auth/register',
data=json.dumps(dict()),
content_type='application/json'
)
data = json.loads(response.data.decode())
self.assertEqual(response.status_code, 400)
self.assertIn('Invalid payload.', data['message'])
self.assertIn('error', data['status'])
def test_user_registration_invalid_json_keys_no_username(self):
with self.client:
response = self.client.post(
'/api/auth/register',
data=json.dumps(dict(email='test@test.com', password='test')),
content_type='application/json',
)
data = json.loads(response.data.decode())
self.assertEqual(response.status_code, 400)
self.assertIn('Invalid payload.', data['message'])
self.assertIn('error', data['status'])
def test_user_registration_invalid_json_keys_no_email(self):
with self.client:
response = self.client.post(
'/api/auth/register',
data=json.dumps(dict(
username='test', password='test')),
content_type='application/json',
)
data = json.loads(response.data.decode())
self.assertEqual(response.status_code, 400)
self.assertIn('Invalid payload.', data['message'])
self.assertIn('error', data['status'])
def test_user_registration_invalid_json_keys_no_password(self):
with self.client:
response = self.client.post(
'/api/auth/register',
data=json.dumps(dict(
username='test', email='test@test.com')),
content_type='application/json',
)
data = json.loads(response.data.decode())
self.assertEqual(response.status_code, 400)
self.assertIn('Invalid payload.', data['message'])
self.assertIn('error', data['status'])
def test_registered_user_login(self):
with self.client:
add_user('test', 'test@test.com', 'test')
response = self.client.post(
'/api/auth/login',
data=json.dumps(dict(
email='test@test.com',
password='test'
)),
content_type='application/json'
)
data = json.loads(response.data.decode())
self.assertTrue(data['status'] == 'success')
self.assertTrue(data['message'] == 'Successfully logged in.')
self.assertTrue(data['auth_token'])
self.assertTrue(response.content_type == 'application/json')
self.assertEqual(response.status_code, 200)
def test_no_registered_user_login(self):
with self.client:
response = self.client.post(
'/api/auth/login',
data=json.dumps(dict(
email='test@test.com',
password='test'
)),
content_type='application/json'
)
data = json.loads(response.data.decode())
self.assertTrue(data['status'] == 'error')
self.assertTrue(data['message'] == 'User does not exist.')
self.assertTrue(response.content_type == 'application/json')
self.assertEqual(response.status_code, 404)
def test_valid_logout(self):
add_user('test', 'test@test.com', 'test')
with self.client:
# user login
resp_login = self.client.post(
'/api/auth/login',
data=json.dumps(dict(
email='test@test.com',
password='test'
)),
content_type='application/json'
)
# valid token logout
response = self.client.get(
'/api/auth/logout',
headers=dict(
Authorization='Bearer ' + json.loads(
resp_login.data.decode()
)['auth_token']
)
)
data = json.loads(response.data.decode())
self.assertTrue(data['status'] == 'success')
self.assertTrue(data['message'] == 'Successfully logged out.')
self.assertEqual(response.status_code, 200)
def test_invalid_logout_expired_token(self):
add_user('test', 'test@test.com', 'test')
with self.client:
resp_login = self.client.post(
'/api/auth/login',
data=json.dumps(dict(
email='test@test.com',
password='test'
)),
content_type='application/json'
)
# invalid token logout
time.sleep(4)
response = self.client.get(
'/api/auth/logout',
headers=dict(
Authorization='Bearer ' + json.loads(
resp_login.data.decode()
)['auth_token']
)
)
data = json.loads(response.data.decode())
self.assertTrue(data['status'] == 'error')
self.assertTrue(
data['message'] == 'Signature expired. Please log in again.')
self.assertEqual(response.status_code, 401)
def test_invalid_logout(self):
with self.client:
response = self.client.get(
'/api/auth/logout',
headers=dict(Authorization='Bearer invalid'))
data = json.loads(response.data.decode())
self.assertTrue(data['status'] == 'error')
self.assertTrue(
data['message'] == 'Invalid token. Please log in again.')
self.assertEqual(response.status_code, 401)

View File

@ -0,0 +1,83 @@
import json
from mpwo_api.tests.base import BaseTestCase
from mpwo_api.users.models import User
from mpwo_api.tests.utils import add_user
class TestUserService(BaseTestCase):
"""Tests for the Users Service."""
def test_users(self):
""" => Ensure the /ping route behaves correctly."""
response = self.client.get('/api/ping')
data = json.loads(response.data.decode())
self.assertEqual(response.status_code, 200)
self.assertIn('pong!', data['message'])
self.assertIn('success', data['status'])
def test_single_user(self):
"""=> Get single user details"""
user = add_user('test', 'test@test.com', 'test')
with self.client:
response = self.client.get(f'/api/users/{user.id}')
data = json.loads(response.data.decode())
self.assertEqual(response.status_code, 200)
self.assertIn('success', data['status'])
self.assertTrue('created_at' in data['data'])
self.assertIn('test', data['data']['username'])
self.assertIn('test@test.com', data['data']['email'])
def test_single_user_no_id(self):
"""=> Ensure error is thrown if an id is not provided."""
with self.client:
response = self.client.get(f'/api/users/blah')
data = json.loads(response.data.decode())
self.assertEqual(response.status_code, 404)
self.assertIn('fail', data['status'])
self.assertIn('User does not exist', data['message'])
def test_single_user_wrong_id(self):
"""=> Ensure error is thrown if the id does not exist."""
with self.client:
response = self.client.get(f'/api/users/99999999999')
data = json.loads(response.data.decode())
self.assertEqual(response.status_code, 404)
self.assertIn('fail', data['status'])
self.assertIn('User does not exist', data['message'])
def test_users_list(self):
"""=> Ensure get single user behaves correctly."""
add_user('test', 'test@test.com', 'test')
add_user('toto', 'toto@toto.com', 'toto')
with self.client:
response = self.client.get('/api/users')
data = json.loads(response.data.decode())
self.assertEqual(response.status_code, 200)
self.assertIn('success', data['status'])
self.assertEqual(len(data['data']['users']), 2)
self.assertTrue('created_at' in data['data']['users'][0])
self.assertTrue('created_at' in data['data']['users'][1])
self.assertIn('test', data['data']['users'][0]['username'])
self.assertIn('toto', data['data']['users'][1]['username'])
self.assertIn('test@test.com', data['data']['users'][0]['email'])
self.assertIn('toto@toto.com', data['data']['users'][1]['email'])
def test_encode_auth_token(self):
"""=> Ensure correct auth token generation"""
user = add_user('test', 'test@test.com', 'test')
auth_token = user.encode_auth_token(user.id)
self.assertTrue(isinstance(auth_token, bytes))
def test_decode_auth_token(self):
user = add_user('test', 'test@test.com', 'test')
auth_token = user.encode_auth_token(user.id)
self.assertTrue(isinstance(auth_token, bytes))
self.assertTrue(User.decode_auth_token(auth_token), user.id)

View File

@ -0,0 +1,9 @@
from mpwo_api.users.models import User
from mpwo_api import db
def add_user(username, email, password):
user = User(username=username, email=email, password=password)
db.session.add(user)
db.session.commit()
return user

View File

View File

@ -0,0 +1,125 @@
from flask import Blueprint, jsonify, request
from sqlalchemy import exc, or_
from mpwo_api import bcrypt, db
from .models import User
auth_blueprint = Blueprint('auth', __name__)
@auth_blueprint.route('/auth/register', methods=['POST'])
def register_user():
# get post data
post_data = request.get_json()
if not post_data:
response_object = {
'status': 'error',
'message': 'Invalid payload.'
}
return jsonify(response_object), 400
username = post_data.get('username')
email = post_data.get('email')
password = post_data.get('password')
try:
# check for existing user
user = User.query.filter(
or_(User.username == username, User.email == email)).first()
if not user:
# add new user to db
new_user = User(
username=username,
email=email,
password=password
)
db.session.add(new_user)
db.session.commit()
# generate auth token
auth_token = new_user.encode_auth_token(new_user.id)
response_object = {
'status': 'success',
'message': 'Successfully registered.',
'auth_token': auth_token.decode()
}
return jsonify(response_object), 201
else:
response_object = {
'status': 'error',
'message': 'Sorry. That user already exists.'
}
return jsonify(response_object), 400
# handler errors
except (exc.IntegrityError, exc.OperationalError, ValueError) as e:
db.session.rollback()
response_object = {
'status': 'error',
'message': 'Invalid payload.'
}
return jsonify(response_object), 400
@auth_blueprint.route('/auth/login', methods=['POST'])
def login_user():
# get post data
post_data = request.get_json()
if not post_data:
response_object = {
'status': 'error',
'message': 'Invalid payload.'
}
return jsonify(response_object), 400
email = post_data.get('email')
password = post_data.get('password')
try:
# check for existing user
user = User.query.filter(User.email == email).first()
if user and bcrypt.check_password_hash(user.password, password):
# generate auth token
auth_token = user.encode_auth_token(user.id)
response_object = {
'status': 'success',
'message': 'Successfully logged in.',
'auth_token': auth_token.decode()
}
return jsonify(response_object), 200
else:
response_object = {
'status': 'error',
'message': 'User does not exist.'
}
return jsonify(response_object), 404
# handler errors
except (exc.IntegrityError, exc.OperationalError, ValueError) as e:
db.session.rollback()
response_object = {
'status': 'error',
'message': 'Try again'
}
return jsonify(response_object), 500
@auth_blueprint.route('/auth/logout', methods=['GET'])
def logout_user():
# get auth token
auth_header = request.headers.get('Authorization')
if auth_header:
auth_token = auth_header.split(" ")[1]
resp = User.decode_auth_token(auth_token)
if not isinstance(resp, str):
response_object = {
'status': 'success',
'message': 'Successfully logged out.'
}
return jsonify(response_object), 200
else:
response_object = {
'status': 'error',
'message': resp
}
return jsonify(response_object), 401
else:
response_object = {
'status': 'error',
'message': 'Provide a valid auth token.'
}
return jsonify(response_object), 403

View File

@ -0,0 +1,64 @@
import datetime
import jwt
from flask import current_app
from mpwo_api import bcrypt, db
class User(db.Model):
__tablename__ = "users"
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
username = db.Column(db.String(80), unique=True, nullable=False)
email = db.Column(db.String(120), unique=True, nullable=False)
password = db.Column(db.String(255), nullable=False)
created_at = db.Column(db.DateTime, nullable=False)
def __repr__(self):
return '<User %r>' % self.username
def __init__(
self, username, email, password,
created_at=datetime.datetime.utcnow()):
self.username = username
self.email = email
self.password = bcrypt.generate_password_hash(
password, current_app.config.get('BCRYPT_LOG_ROUNDS')
).decode()
self.created_at = created_at
def encode_auth_token(self, user_id):
"""Generates the auth token"""
try:
payload = {
'exp': datetime.datetime.utcnow() + datetime.timedelta(
days=current_app.config.get('TOKEN_EXPIRATION_DAYS'),
seconds=current_app.config.get('TOKEN_EXPIRATION_SECONDS')
),
'iat': datetime.datetime.utcnow(),
'sub': user_id
}
return jwt.encode(
payload,
current_app.config.get('SECRET_KEY'),
algorithm='HS256'
)
except Exception as e:
return e
@staticmethod
def decode_auth_token(auth_token):
"""
Decodes the auth token
:param auth_token: -
:return: integer|string
"""
try:
payload = jwt.decode(
auth_token,
current_app.config.get('SECRET_KEY'))
return payload['sub']
except jwt.ExpiredSignatureError:
return 'Signature expired. Please log in again.'
except jwt.InvalidTokenError:
return 'Invalid token. Please log in again.'

View File

@ -0,0 +1,60 @@
from flask import Blueprint, jsonify
from .models import User
users_blueprint = Blueprint('users', __name__)
@users_blueprint.route('/users', methods=['GET'])
def get_users():
"""Get all users"""
users = User.query.all()
users_list = []
for user in users:
user_object = {
'id': user.id,
'username': user.username,
'email': user.email,
'created_at': user.created_at
}
users_list.append(user_object)
response_object = {
'status': 'success',
'data': {
'users': users_list
}
}
return jsonify(response_object), 200
@users_blueprint.route('/users/<user_id>', methods=['GET'])
def get_single_user(user_id):
"""Get single user details"""
response_object = {
'status': 'fail',
'message': 'User does not exist'
}
try:
user = User.query.filter_by(id=int(user_id)).first()
if not user:
return jsonify(response_object), 404
else:
response_object = {
'status': 'success',
'data': {
'username': user.username,
'email': user.email,
'created_at': user.created_at
}
}
return jsonify(response_object), 200
except ValueError:
return jsonify(response_object), 404
@users_blueprint.route('/ping', methods=['GET'])
def ping_pong():
return jsonify({
'status': 'success',
'message': 'pong!'
})

24
mpwo_api/requirements.txt Normal file
View File

@ -0,0 +1,24 @@
bcrypt==3.1.4
cffi==1.11.2
click==6.7
flake8==3.5.0
flake8-isort==2.2.2
flake8-polyfill==1.0.1
Flask==0.12.2
Flask-Bcrypt==0.7.1
Flask-SQLAlchemy==2.3.2
Flask-Testing==0.6.2
isort==4.2.15
itsdangerous==0.24
Jinja2==2.10
MarkupSafe==1.0
mccabe==0.6.1
mysqlclient==1.3.12
pycodestyle==2.3.1
pycparser==2.18
pyflakes==1.6.0
PyJWT==1.5.3
six==1.11.0
SQLAlchemy==1.1.15
testfixtures==5.3.1
Werkzeug==0.13

44
mpwo_api/server.py Normal file
View File

@ -0,0 +1,44 @@
import unittest
from mpwo_api import app, db
from mpwo_api.users.models import User
@app.cli.command()
def recreate_db():
"""Recreates a database."""
db.drop_all()
db.create_all()
db.session.commit()
print('Database (re)creation done.')
@app.cli.command()
def seed_db():
"""Seeds the database."""
db.session.add(User(
username='test',
email='test@test.com',
password='test'
))
db.session.add(User(
username='test2',
email='test2@test.com',
password='test2'
))
db.session.commit()
@app.cli.command()
def test():
"""Runs the tests without code coverage."""
tests = unittest.TestLoader().discover(
'mpwo_api/tests', pattern='test*.py')
result = unittest.TextTestRunner(verbosity=2).run(tests)
if result.wasSuccessful():
return 0
return 1
if __name__ == '__main__':
app.run()