Building Flask User Authentication with JWT
Dhruv Prajapati
Posted on September 15, 2020
- This is the last part of the Flask series.
- In Part 1, I explained what a REST API and HTTPS are, and then implemented a REST API with Flask.
- In Part 2, we implemented a simple login and registration flow with Flask.
Today we will implement token-based authentication using JWT in Flask.
The modules I used to implement JWT-based authentication are listed below:
Flask -> Building the web application
Flask-JWT-Extended -> Working with JWT tokens
Flask-RESTful -> Building REST APIs
Flask-SQLAlchemy -> Adds SQLAlchemy support to the Flask application
passlib -> Password hashing library
PyJWT -> Encoding and decoding JSON Web Tokens
SQLAlchemy -> Provides the ORM for our application
Werkzeug -> WSGI web application library; it also provides helpers for hashing and checking passwords
- To install modules use
pip install <module-name>
What is JWT?
- JWT stands for JSON Web Token.
- It is used to transfer information securely between parties as a JSON object.
- This information can be verified and trusted by the receiving party because it is digitally signed.
- JWTs can be signed using a shared secret (with HMAC) or a public/private key pair (with RSA or ECDSA).
JSON Web Token Structure
- A JWT is made up of three parts:
- Header
- Payload
- Signature
**Header**
* The header consists of two parts: the type of the token (JWT) and the signing algorithm. For example:
```json
{
  "alg": "HS256",
  "typ": "JWT"
}
```
**Payload**
* The payload contains the claims, which are statements about the user such as the username, subject, or birthdate.
For example:
```json
{
  "sub": "123456789",
  "name": "Dhruv Prajapati",
  "admin": true
}
```
**Signature**
* To create the signature, take the encoded header, the encoded payload, and a secret, and sign them with the algorithm specified in the header.
For example, with HMAC SHA-256:
```
HMACSHA256(
  base64UrlEncode(header) + "." +
  base64UrlEncode(payload),
  secret)
```
You can learn more about [JWT from here](https://jwt.io/)
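To make the three-part structure concrete, here is a minimal sketch that builds and checks a token by hand using only the Python standard library. It assumes the `HS256` algorithm (HMAC with SHA-256, as in the signature example); the helper names `b64url`, `sign_hs256`, and `verify_hs256` and the secret `demo-secret` are made up for illustration. In the actual project, Flask-JWT-Extended and PyJWT do this work for us.

```python
import base64
import hashlib
import hmac
import json


def b64url(data: bytes) -> str:
    """Base64url-encode and drop the '=' padding, as JWTs do."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def sign_hs256(header: dict, payload: dict, secret: str) -> str:
    """Return header.payload.signature for an HS256 token."""
    signing_input = ".".join(
        b64url(json.dumps(part, separators=(",", ":")).encode("utf-8"))
        for part in (header, payload)
    )
    signature = hmac.new(
        secret.encode("utf-8"), signing_input.encode("utf-8"), hashlib.sha256
    ).digest()
    return f"{signing_input}.{b64url(signature)}"


def verify_hs256(token: str, secret: str) -> bool:
    """Recompute the signature and compare it in constant time."""
    if token.count(".") != 2:
        return False
    signing_input, signature = token.rsplit(".", 1)
    expected = hmac.new(
        secret.encode("utf-8"), signing_input.encode("utf-8"), hashlib.sha256
    ).digest()
    return hmac.compare_digest(
        b64url(expected).encode("ascii"), signature.encode("utf-8")
    )


header = {"alg": "HS256", "typ": "JWT"}
payload = {"sub": "123456789", "name": "Dhruv Prajapati", "admin": True}
token = sign_hs256(header, payload, "demo-secret")  # hypothetical secret

print(token.count("."))                     # 2 separators, so three parts
print(verify_hs256(token, "demo-secret"))   # True
print(verify_hs256(token, "wrong-secret"))  # False
```

Note that the header and payload are only encoded, not encrypted: anyone holding the token can read them. The signature only guarantees that they were not modified.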
* So let's start building our project.
* Create a directory named `Flask-JWT-Authentication`. Our folder structure is shown below.
![Alt Text](https://dev-to-uploads.s3.amazonaws.com/i/c6lslamk7a1jmr609es0.PNG)
* First we will create the basic structure of our application: the Flask application, the `Api` object, the database object, the JWT manager object, and the rest of the application configuration.
* Let's create `app.py` and put the code below in it.
```python
from flask import Flask
from flask_restful import Api
from flask_sqlalchemy import SQLAlchemy
from flask_jwt_extended import JWTManager

# Making Flask Application
app = Flask(__name__)

# Object of Api class
api = Api(app)

# Application Configuration
# Note: a mysql:// URI needs a MySQL driver (such as mysqlclient) installed
app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql://root:root@localhost/jwt_auth'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
# Demo values only; use long random secrets in a real deployment
app.config['SECRET_KEY'] = 'ThisIsHardestThing'
app.config['JWT_SECRET_KEY'] = 'Dude!WhyShouldYouEncryptIt'
# Check both token types against the blacklist (Flask-JWT-Extended 3.x settings)
app.config['JWT_BLACKLIST_ENABLED'] = True
app.config['JWT_BLACKLIST_TOKEN_CHECKS'] = ['access', 'refresh']

# SqlAlchemy object
db = SQLAlchemy(app)

# JwtManager object
jwt = JWTManager(app)
```
- Next, we will create the models for our web application. Create `models.py` inside the directory we just created. `models.py` contains two models: `UserModel` and `RevokedTokenModel`.
```python
from app import db
from passlib.hash import pbkdf2_sha256 as sha256


class UserModel(db.Model):
    """
    User Model Class
    """
    __tablename__ = 'users'

    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(120), unique=True, nullable=False)
    password = db.Column(db.String(120), nullable=False)

    def save_to_db(self):
        """Save user details in the database."""
        db.session.add(self)
        db.session.commit()

    @classmethod
    def find_by_username(cls, username):
        """Find a user by username."""
        return cls.query.filter_by(username=username).first()

    @classmethod
    def return_all(cls):
        """Return all the user data available in the DB in JSON form."""
        def to_json(x):
            return {
                'username': x.username,
                'password': x.password
            }
        return {'users': [to_json(user) for user in cls.query.all()]}

    @classmethod
    def delete_all(cls):
        """Delete all user data."""
        try:
            num_rows_deleted = db.session.query(cls).delete()
            db.session.commit()
            return {'message': f'{num_rows_deleted} row(s) deleted'}
        except Exception:
            # Undo the failed transaction so the session stays usable
            db.session.rollback()
            return {'message': 'Something went wrong'}

    @staticmethod
    def generate_hash(password):
        """Generate a salted PBKDF2-SHA256 hash of the password."""
        return sha256.hash(password)

    @staticmethod
    def verify_hash(password, hash_):
        """Check a plain-text password against a stored hash."""
        return sha256.verify(password, hash_)
```
- `UserModel` contains two fields: `username` and `password`. To save the user details in the database, the model provides the `save_to_db` method. The `find_by_username` method finds a user by username. The `return_all` and `delete_all` methods return all users and delete all users, respectively. The `generate_hash` method generates a hash from the user's password, and `verify_hash` checks a password against the stored hash at login time.
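To show roughly what happens behind `generate_hash` and `verify_hash`, here is a sketch of salted PBKDF2-SHA256 hashing using only the standard library. It is an illustration of the idea, not passlib's implementation: the `rounds$salt$digest` storage format and the default of 100,000 rounds are assumptions made for this example, and passlib uses its own string format and defaults.

```python
import hashlib
import hmac
import os


def generate_hash(password: str, rounds: int = 100_000) -> str:
    """Derive a salted PBKDF2-SHA256 digest and pack it into one string."""
    salt = os.urandom(16)  # a fresh random salt for every password
    digest = hashlib.pbkdf2_hmac(
        "sha256", password.encode("utf-8"), salt, rounds
    )
    # Illustrative storage format: rounds$salt$digest (hex-encoded)
    return f"{rounds}${salt.hex()}${digest.hex()}"


def verify_hash(password: str, stored: str) -> bool:
    """Re-derive the digest with the stored salt and compare safely."""
    rounds, salt_hex, digest_hex = stored.split("$")
    candidate = hashlib.pbkdf2_hmac(
        "sha256", password.encode("utf-8"), bytes.fromhex(salt_hex), int(rounds)
    )
    return hmac.compare_digest(candidate, bytes.fromhex(digest_hex))


stored = generate_hash("s3cret")
print(verify_hash("s3cret", stored))  # True
print(verify_hash("guess", stored))   # False
```

Because the salt is random, hashing the same password twice gives two different strings, which is why login has to go through `verify_hash` instead of comparing hashes directly.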
```python
# User Model
# ...


class RevokedTokenModel(db.Model):
    """
    Revoked Token Model Class
    """
    __tablename__ = 'revoked_tokens'

    id = db.Column(db.Integer, primary_key=True)
    jti = db.Column(db.String(120))

    def add(self):
        """Save the revoked token's JTI in the DB."""
        db.session.add(self)
        db.session.commit()

    @classmethod
    def is_jti_blacklisted(cls, jti):
        """Check whether the given JTI is blacklisted."""
        query = cls.query.filter_by(jti=jti).first()
        return bool(query)
```
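`RevokedTokenModel` contains a `jti` field, which stores the token's unique identifier (the JWT ID claim). The `add` method saves the token's `jti` in the database, and `is_jti_blacklisted` checks whether a given `jti` is blacklisted.
Now we will create the resources for our APIs. Let's create `resources.py` (`app.py` imports the module under the name `resources`).
First we will create the `UserRegistration` and `UserLogin` resources.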
```python
from flask_restful import Resource, reqparse
from models import UserModel, RevokedTokenModel
# These names come from Flask-JWT-Extended 3.x; version 4 renamed several of them
from flask_jwt_extended import (
    create_access_token,
    create_refresh_token,
    jwt_required,
    jwt_refresh_token_required,
    get_jwt_identity,
    get_raw_jwt
)
import pdb  # debugging helper only; not needed in production

# Request parser: both fields are required wherever it is used
parser = reqparse.RequestParser()
parser.add_argument('username', help='username cannot be blank', required=True)
parser.add_argument('password', help='password cannot be blank', required=True)


class UserRegistration(Resource):
    """
    User Registration Api
    """

    def post(self):
        data = parser.parse_args()
        username = data['username']

        # Checking whether the user already exists
        if UserModel.find_by_username(username):
            return {'message': f'User {username} already exists'}

        # create new user, storing only the password hash
        new_user = UserModel(
            username=username,
            password=UserModel.generate_hash(data['password'])
        )

        try:
            # Saving user in DB and generating access and refresh tokens
            new_user.save_to_db()
            access_token = create_access_token(identity=username)
            refresh_token = create_refresh_token(identity=username)
            return {
                'message': f'User {username} was created',
                'access_token': access_token,
                'refresh_token': refresh_token
            }
        except Exception:
            return {'message': 'Something went wrong'}, 500


class UserLogin(Resource):
    """
    User Login Api
    """

    def post(self):
        data = parser.parse_args()
        username = data['username']

        # Searching user by username
        current_user = UserModel.find_by_username(username)

        # user does not exist
        if not current_user:
            return {'message': f'User {username} doesn\'t exist'}

        # user exists, comparing password and hash
        if UserModel.verify_hash(data['password'], current_user.password):
            # generating access token and refresh token
            access_token = create_access_token(identity=username)
            refresh_token = create_refresh_token(identity=username)
            return {
                'message': f'Logged in as {username}',
                'access_token': access_token,
                'refresh_token': refresh_token
            }
        else:
            return {'message': "Wrong credentials"}
```
- Now let's create `UserLogoutAccess` and `UserLogoutRefresh` inside `resources.py`.
```python
class UserLogoutAccess(Resource):
    """
    User Logout Api
    """

    @jwt_required
    def post(self):
        jti = get_raw_jwt()['jti']
        try:
            # Revoking access token
            revoked_token = RevokedTokenModel(jti=jti)
            revoked_token.add()
            return {'message': 'Access token has been revoked'}
        except Exception:
            return {'message': 'Something went wrong'}, 500


class UserLogoutRefresh(Resource):
    """
    User Logout Refresh Api
    """

    @jwt_refresh_token_required
    def post(self):
        jti = get_raw_jwt()['jti']
        try:
            # Revoking refresh token
            revoked_token = RevokedTokenModel(jti=jti)
            revoked_token.add()
            return {'message': 'Refresh token has been revoked'}
        except Exception:
            return {'message': 'Something went wrong'}, 500
```
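The reason there are two logout endpoints is that the access token and the refresh token each carry their own `jti`, so revoking one does not revoke the other. The sketch below illustrates this with an in-memory stand-in for `RevokedTokenModel`. The class `InMemoryBlocklist`, the helper `new_claims`, and the claim names in the dictionary are hypothetical and only mimic what a freshly issued token might contain.

```python
import uuid


class InMemoryBlocklist:
    """Stand-in for RevokedTokenModel that keeps revoked JTIs in a set."""

    def __init__(self):
        self._revoked = set()

    def add(self, jti: str) -> None:
        """Record a JTI as revoked."""
        self._revoked.add(jti)

    def is_jti_blacklisted(self, jti: str) -> bool:
        """Return True if the JTI was revoked earlier."""
        return jti in self._revoked


def new_claims(identity: str, token_type: str) -> dict:
    """Hypothetical helper: every issued token gets its own random JTI."""
    return {"identity": identity, "type": token_type, "jti": str(uuid.uuid4())}


blocklist = InMemoryBlocklist()
access = new_claims("dhruv", "access")
refresh = new_claims("dhruv", "refresh")

# Logging out through /logout/access revokes only the access token's JTI
blocklist.add(access["jti"])

print(blocklist.is_jti_blacklisted(access["jti"]))   # True
print(blocklist.is_jti_blacklisted(refresh["jti"]))  # False
```

A client that wants a full logout therefore calls both `/logout/access` and `/logout/refresh`.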
- We need a `TokenRefresh` resource so that, when the access token expires, the user can get a new access token by presenting the refresh token. Create the `TokenRefresh` resource in `resources.py`.
```python
class TokenRefresh(Resource):
    """
    Token Refresh Api
    """

    @jwt_refresh_token_required
    def post(self):
        # Generating new access token
        current_user = get_jwt_identity()
        access_token = create_access_token(identity=current_user)
        return {'access_token': access_token}
```
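On the client side, one way to decide when to call `/token/refresh` is to read the `exp` claim (expiry time in seconds since the epoch) from the access token's payload. The sketch below assumes that approach; `decode_payload`, `needs_refresh`, and `fake_token` are hypothetical helpers, and the payload is decoded without verifying the signature, which is acceptable only for this kind of client-side convenience check. The server still verifies every token it receives.

```python
import base64
import json


def decode_payload(token: str) -> dict:
    """Decode the payload segment WITHOUT verifying the signature."""
    payload_segment = token.split(".")[1]
    padding = "=" * (-len(payload_segment) % 4)  # restore stripped padding
    return json.loads(base64.urlsafe_b64decode(payload_segment + padding))


def needs_refresh(token: str, now: float, leeway: int = 30) -> bool:
    """True when the token expires within `leeway` seconds of `now`."""
    return decode_payload(token)["exp"] - now <= leeway


def fake_token(exp: int) -> str:
    """Hypothetical helper: builds an unsigned token just for this demo."""
    def segment(obj: dict) -> str:
        raw = json.dumps(obj).encode("utf-8")
        return base64.urlsafe_b64encode(raw).rstrip(b"=").decode("ascii")
    return f"{segment({'alg': 'none'})}.{segment({'exp': exp})}.signature"


now = 1_600_000_000  # fixed timestamp so the example is reproducible
print(needs_refresh(fake_token(now + 900), now))  # False, 15 minutes left
print(needs_refresh(fake_token(now + 10), now))   # True, about to expire
```

In a real client, `now` would come from `time.time()`, and a `True` result would trigger a `POST` to `/token/refresh` with the refresh token.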
- Finally, we will create the `AllUsers` resource, which returns all users on `GET` and deletes all users on `DELETE`. We will also create `SecretResource` as a starting point for authenticated resources. Following the same pattern, you can create authenticated endpoints that are only accessible when the user is authenticated.
```python
class AllUsers(Resource):
    # Unprotected, and the response includes password hashes: for testing only

    def get(self):
        """
        return all user api
        """
        return UserModel.return_all()

    def delete(self):
        """
        delete all user api
        """
        return UserModel.delete_all()


class SecretResource(Resource):
    """
    Secret Resource Api
    You can create CRUD operations in this way
    """

    @jwt_required
    def get(self):
        return {'answer': 'You are accessing super secret blueprint'}
```
- To finish the project, register the API endpoints and create the database tables in `app.py`.
```python
# above code
# ...

# Generating tables before first request is fetched
# (before_first_request was removed in Flask 2.3; this code targets older versions)
@app.before_first_request
def create_tables():
    db.create_all()


# Checking whether the token is in the blacklist
# (the token is decoded, not decrypted, before it reaches this callback)
@jwt.token_in_blacklist_loader
def check_if_token_in_blacklist(decoded_token):
    jti = decoded_token['jti']
    return models.RevokedTokenModel.is_jti_blacklisted(jti)


# Importing models and resources
# (kept at the bottom because models.py imports `db` from this file)
import models, resources

# Api Endpoints
api.add_resource(resources.UserRegistration, '/registration')
api.add_resource(resources.UserLogin, '/login')
api.add_resource(resources.UserLogoutAccess, '/logout/access')
api.add_resource(resources.UserLogoutRefresh, '/logout/refresh')
api.add_resource(resources.TokenRefresh, '/token/refresh')
api.add_resource(resources.AllUsers, '/users')
api.add_resource(resources.SecretResource, '/secret')
```
Here, our building section is completed. Let's run our project.
- To run the project on a Windows machine, go to the project path in Command Prompt and use the commands below.
```
set FLASK_APP=app.py
set FLASK_DEBUG=1
flask run --port=8080
```
- To run the project on a Unix machine, go to the project path in Terminal and use the commands below.
```
export FLASK_APP=app.py
export FLASK_DEBUG=1
flask run --port=8080
```
Here, `--port` is an optional flag.
Now, consume all the APIs one by one. I have added some of the API calls in Postman.
UserRegistration Api
UserLogin Api
GetAllUsers
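If you prefer a script over Postman, the requests can also be prepared in Python. The sketch below only builds the request objects with the standard library and does not send them. It assumes the server runs on port 8080 as in the commands above, that the credentials are sent as a JSON body, and that the access token is sent as `Authorization: Bearer <token>`, which is the default header Flask-JWT-Extended looks for. The function names are made up for this example.

```python
import json
import urllib.request

BASE_URL = "http://127.0.0.1:8080"  # assumes `flask run --port=8080`


def build_login_request(username: str, password: str) -> urllib.request.Request:
    """Prepare a POST /login request with the credentials as JSON."""
    body = json.dumps({"username": username, "password": password})
    return urllib.request.Request(
        f"{BASE_URL}/login",
        data=body.encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def build_secret_request(access_token: str) -> urllib.request.Request:
    """Prepare a GET /secret request carrying the access token."""
    return urllib.request.Request(
        f"{BASE_URL}/secret",
        headers={"Authorization": f"Bearer {access_token}"},
        method="GET",
    )


login = build_login_request("dhruv", "s3cret")  # hypothetical credentials
secret = build_secret_request("<access_token from the login response>")

print(login.get_method(), login.full_url)
print(secret.get_method(), secret.full_url)
```

With the server running, passing either object to `urllib.request.urlopen()` sends the request; the JSON response of `/login` contains the `access_token` to use for `/secret`.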
- You can get the whole code of this tutorial from my GitHub repository.
Thank you for reading. Give it a like and follow me for more amazing tutorials.