# Configure login functionality with JWT
As we have implemented the user registration with password hash and salt, now it is time to login our user.
We are going to use JWT based auth. This section is a updated and combined version of: Auth users in FastAPI with JWT Tokens (opens new window) and FastAPI OAuth2 with Password (and hashing), Bearer with JWT tokens (opens new window).
First, let's add password verification function to verify sent password. Add following method in the Authenticate class in users/authentication.py
file:
@staticmethod
def verify_password(*, password: str, salt: str, hashed_pw: str) -> bool:
return pwd_context.verify(password + salt, hashed_pw)
We need to send raw password, salt and check if it is equal to hashed password in the database.
The next is to install our dependency package to manage JWT, this is suggested in FastAPI doc as well:
poetry add python-jose[cryptography]
Okay, great let's add needed JWT settings to .env file and also to our config manager.
Please keep in mind that you can generate this SECRET_KEY
using:
openssl rand -hex 32
Add followings to .env
file:
# JWT related variables
SECRET_KEY=09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7
JWT_ALGORITHM="HS256"
ACCESS_TOKEN_EXPIRE_MINUTES=30
JWT_TOKEN_PREFIX="Bearer"
JWT_AUDIENCE="backend:auth"
Then add following into the Settings
class in app/core/config.py
:
JWT_SETTINGS: Optional[Dict[str, Any]] = None
SECRET_KEY: str
JWT_ALGORITHM: str
ACCESS_TOKEN_EXPIRE_MINUTES: int
JWT_TOKEN_PREFIX: str
JWT_AUDIENCE: str
@validator('JWT_SETTINGS', pre=True)
def assemble_jwt_settings(cls, v: Optional[str], values: Dict[str, Any]) -> Dict[str, Any]:
if isinstance(v, str):
return v
return {
"SECRET_KEY": values.get("SECRET_KEY"),
"JWT_ALGORITHM": values.get("JWT_ALGORITHM"),
"ACCESS_TOKEN_EXPIRE_MINUTES": values.get("ACCESS_TOKEN_EXPIRE_MINUTES"),
"JWT_TOKEN_PREFIX": values.get("JWT_TOKEN_PREFIX"),
"JWT_AUDIENCE": values.get("JWT_AUDIENCE"),
}
Then we need to define Pydantic schemas for our token management in users/schemas.py
:
# Add JWT schemas
class JWTMeta(CoreModel):
iss: str = "azepug.az"
aud: str = settings.JWT_AUDIENCE
iat: float = datetime.timestamp(datetime.now())
exp: float = datetime.timestamp(datetime.now() + timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES))
class JWTCreds(CoreModel):
"""How we'll identify users"""
sub: EmailStr
username: str
class JWTPayload(JWTMeta, JWTCreds):
"""
JWT Payload right before it's encoded - combine meta and username
"""
pass
class AccessToken(CoreModel):
access_token: str
token_type: str
Let'now try to create our Access Token in users/authentication.py
file:
@staticmethod
def create_access_token_for_user(
*,
user: UserInDB,
secret_key: str = str(settings.SECRET_KEY),
audience: str = settings.JWT_AUDIENCE,
expires_in: int = settings.ACCESS_TOKEN_EXPIRE_MINUTES,
) -> Optional[str]:
if not user or not isinstance(user, UserInDB):
return None
jwt_meta = JWTMeta(
aud=audience,
iat=datetime.timestamp(datetime.now()),
exp=datetime.timestamp(datetime.now() + timedelta(minutes=expires_in)),
)
jwt_creds = JWTCreds(sub=user.email, username=user.username)
token_payload = JWTPayload(
**jwt_meta.dict(),
**jwt_creds.dict(),
)
return jwt.encode(
token_payload.dict(), secret_key, algorithm=settings.JWT_ALGORITHM
)
The code itself is quite standart and nothing fancy here. Basically we are returning back the Access Token.
But if you have a question where we will use this generated token - basically we need to return back to the frontend this access token, in the frontend side we will get it and store in the localStorage
and at each request we will send in the header as Beared
token.
Do you remember our UserPublic schema? We need to update it to add the access_token
:
class UserPublic(DateTimeModelMixin, UserBase):
access_token: Optional[AccessToken]
class Config:
orm_mode = True
That means we are going to return back to the frontend User public data + access_token.
Nice, then let me quickly explain our login flow. Basically:
- We are going to send the
POST
request to thelogin
endpoint with username and password. - We are going to get the user with this provided username from the database.
- Then we need to get the stored salt from this found user and verify the password if it is the one stored in the database or not.
- If the user exists and the provided password is valid then we are going to create the access token and send the UserPublic data to the frontend.
- In the frontend we need to get this access token and store it in the
state
and in thelocalStorage
.
Does it make sense? If yes then let's continue for getting our user with username from the database.
I am going to add this function in the users/crud.py
:
async def get_user_by_username(user_name: str) -> UserInDB:
async with db.with_bind(settings.DATABASE_URI) as engine:
found_user = await User.query.where(User.username == user_name).gino.first()
return UserInDB.from_orm(found_user)
Basically we are getting back the user from the database if the provided username exists in users table.
Then let's define our /login
endpoint. Before that we need another Pydantic schema exactly for the login action:
class UserLogin(CoreModel):
"""
username and password are required for logging in the user
"""
username: str
password: constr(min_length=7, max_length=100)
@validator("username", pre=True)
def username_is_valid(cls, username: str) -> str:
return validate_username(username)
So basically we are expecting username and password as part of UserLogin schema.
Finally our endpoint:
@router.post(
'/login',
tags=["user login"],
description="Log in the User",
response_model=UserPublic
)
async def user_login(user: UserLogin) -> UserPublic:
from ..crud import get_user_by_username
found_user = await get_user_by_username(user_name=user.username)
if auth_service.verify_password(password=user.password, salt=found_user.salt, hashed_pw=found_user.password):
# If the provided password is valid one then we are going to create an access token
token = auth_service.create_access_token_for_user(user=found_user)
access_token = AccessToken(access_token=token, token_type='bearer')
return UserPublic(**found_user.dict(), access_token=access_token)
Sending the login request:
curl -X POST http://127.0.0.1:8000/users/login -d '{"username": "example", "password": "12345789"}' | jq
{
"email": "example@gmail.com",
"username": "example",
"email_verified": false,
"is_active": true,
"is_superuser": false,
"created_at": "2021-05-16T15:19:30.408545",
"updated_at": "2021-05-16T15:19:30.408578",
"access_token": {
"access_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJleGFtcGxlQGdtYWlsLmNvbSIsInVzZXJuYW1lIjoiZXhhbXBsZSIsImlzcyI6ImF6ZXB1Zy5heiIsImF1ZCI6ImJhY2tlbmQ6YXV0aCIsImlhdCI6MTYyMTIzNDY1OS41NjQ4NDIsImV4cCI6MTYyMTIzNjQ1OS41NjQ4NDh9.CQzIWY_TcDn51WooCahyb5S4oCZXOdCeXkr3BmZ7UQM",
"token_type": "bearer"
}
}
As you see we have successfully logged in our user as we get back the access token 😃
We are still struggling with issue related to Gino as the global db seems to be uninitialized as we send second request:
gino.exceptions.UninitializedError: Gino engine is not initialized.
In the next chapters we will try to fix this issue as well. Also we need to add some extra functionality for our token management.
The code changes for this episode -> episode-6 (opens new window)