94 lines
2.5 KiB
Python
94 lines
2.5 KiB
Python
"""Module containing user related views."""
|
|
|
|
# vim: set tw=80 ts=4 sw=4 sts=4:
|
|
|
|
from flask import request
|
|
from flask_jwt_extended import (jwt_required, get_jwt_identity,
|
|
create_access_token, create_refresh_token)
|
|
from flask_restplus import Namespace, Resource, fields
|
|
|
|
from ..models.users import User
|
|
|
|
|
|
# pylint: disable=invalid-name
|
|
# API namespace for the user endpoints; the models and the route defined
# further down in this module are registered on it.
ns = Namespace('user', description='User management')
|
|
|
|
# Serialization model for issued tokens: the access token itself, plus an
# optional refresh token and an optional expiration timestamp.
token_model = ns.model('Token', {
    # Always present in a token response.
    'access_token': fields.String(
        description='Access token value',
        required=True,
        readonly=True,
    ),
    # Optional companion token.
    'refresh_token': fields.String(
        description='Refresh token value',
        required=False,
        readonly=True,
    ),
    # Optional; rendered as an ISO 8601 timestamp.
    'expiration': fields.DateTime(
        description='Expiration time of the token',
        dt_format='iso8601',
        required=False,
        readonly=True,
    ),
})
|
|
|
|
# Serialization model for a user. Every field is read-only and required
# in the output.
user_model = ns.model('User', {
    'id': fields.Integer(
        default=None,
        required=True,
        readonly=True,
        description='Id of the user'),
    'email': fields.String(
        required=True,
        readonly=True,
        # Keyword was misspelled 'decription', so the text was never
        # picked up as the field description.
        description='Email address of the user'),
    'active': fields.Boolean(
        required=True,
        readonly=True,
        description='Active state of the user')
})
|
|
# pylint: enable=invalid-name
|
|
|
|
|
|
# pylint: disable=no-self-use
|
|
@ns.route('/login')
class LoginResource(Resource):
    """Resource to handle login operations."""

    @ns.marshal_with(token_model)
    @ns.doc(
        security='basic',
        responses={
            200: ('OK', token_model),
            401: 'Unauthorized'
        })
    def post(self):
        """Login to retrieve authentication token."""
        # request.authorization is None when the Authorization header is
        # missing or cannot be parsed. Reject such requests with 401 rather
        # than letting the credential lookup fail with a TypeError (HTTP 500).
        auth = request.authorization
        if auth is None or auth.username is None or auth.password is None:
            ns.abort(401, "Bad user or password.")

        email = auth.username
        password = auth.password

        user = User.query().filter(
            User.email == email
        ).one_or_none()

        # Same message for "unknown user" and "wrong password" so the
        # response does not reveal which of the two was wrong.
        if not user or not user.verify_password(password):
            ns.abort(401, "Bad user or password.")

        return {
            'access_token': create_access_token(identity=user),
            'refresh_token': create_refresh_token(identity=user)
        }, 200

    @ns.doc(
        security='apikey',
        responses={
            200: ('OK', user_model),
            401: 'Unauthorized'
        })
    @ns.marshal_with(user_model)
    @jwt_required
    def get(self):
        """Get authenticated user information."""
        user = User.query().get(get_jwt_identity())

        # The token identity may no longer match a stored user; do not
        # marshal None into an all-null user with a 200 status.
        if user is None:
            ns.abort(401, "Unknown user.")

        return user, 200
|