accountant/accountant/views/users.py

99 lines
2.6 KiB
Python

"""Module containing user related views."""
# vim: set tw=80 ts=4 sw=4 sts=4:
from flask import request
from flask_jwt_extended import (jwt_required, get_jwt_identity,
create_access_token, create_refresh_token)
from flask_restplus import Namespace, Resource, fields
from ..models.users import User
# pylint: disable=invalid-name
ns = Namespace('user', description='User management')
# Token with expiration time and type.
token_model = ns.model('Token', {
'access_token': fields.String(
required=True,
readonly=True,
description='Access token value'),
'refresh_token': fields.String(
required=False,
readonly=True,
description='Refresh token value'),
'expiration': fields.DateTime(
dt_format='iso8601',
required=False,
readonly=True,
description='Expiration time of the token'),
})
# User model.
user_model = ns.model('User', {
'id': fields.Integer(
default=None,
required=True,
readonly=True,
description='Id of the user'),
'email': fields.String(
required=True,
readonly=True,
decription='Email address of the user'),
'active': fields.Boolean(
required=True,
readonly=True,
description='Active state of the user')
})
# pylint: enable=invalid-name
# pylint: disable=no-self-use
@ns.route('/login')
class LoginResource(Resource):
"""Resource to handle login operations."""
@ns.marshal_with(token_model)
@ns.doc(
security='basic',
responses={
200: ('OK', token_model),
401: 'Unauthorized'
})
def post(self):
"""Login to retrieve authentication token."""
if not request.authorization:
ns.abort(401, "Missing authorization.")
email = request.authorization['username']
password = request.authorization['password']
user = User.query().filter(
User.email == email
).one_or_none()
if not user or not user.verify_password(password):
ns.abort(401, "Bad user or password.")
return {
'access_token': create_access_token(identity=user),
'refresh_token': create_refresh_token(identity=user)
}, 200
@ns.doc(
security='apikey',
responses={
200: ('OK', user_model)
})
@ns.marshal_with(user_model)
@jwt_required
def get(self):
"""Get authenticated user information."""
user = User.query().get(get_jwt_identity())
# FIXME Alexis Lahouze 2017-05-19 Check user presence
return user, 200