Implement OTP verification
This commit is contained in:
parent
43bcc703d0
commit
d87aade803
|
@ -1,7 +1,7 @@
|
|||
from twilio.rest import Client
|
||||
|
||||
from constants import ACCOUNT_ID, SMS_SENDER, TOKEN
|
||||
from database.crud import fetch_otp
|
||||
from database.crud import fetch_user_by_key
|
||||
|
||||
|
||||
def create_twilio_client(account_sid, auth_token):
|
||||
|
@ -11,6 +11,6 @@ def create_twilio_client(account_sid, auth_token):
|
|||
|
||||
def send_otp(data, db):
|
||||
client = create_twilio_client(account_sid=ACCOUNT_ID, auth_token=TOKEN)
|
||||
code = fetch_otp(access_key=data.access_key, db=db)
|
||||
message = "Your OTP code is {0}".format(code)
|
||||
user = fetch_user_by_key(data=data, db=db)
|
||||
message = "Your OTP code is {0}".format(user.otp)
|
||||
client.messages.create(to=data.mobile, from_=SMS_SENDER, body=message)
|
||||
|
|
|
@ -1,4 +1,5 @@
|
|||
from fastapi import APIRouter, Depends, HTTPException
|
||||
from fastapi import APIRouter, Depends
|
||||
from fastapi.security import OAuth2PasswordBearer
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from app.external_services import send_otp
|
||||
|
@ -7,7 +8,10 @@ from database.crud import get_db, insert_data, verify_otp
|
|||
|
||||
router = APIRouter()
|
||||
|
||||
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
|
||||
|
||||
|
||||
# FIXME Password hash
|
||||
@router.post("/register", response_model=UserCreateResponse)
|
||||
def create_user(data: UserCreate, db: Session = Depends(get_db)):
|
||||
user = insert_data(model="Users", data=data, db=db)
|
||||
|
@ -15,14 +19,15 @@ def create_user(data: UserCreate, db: Session = Depends(get_db)):
|
|||
return user
|
||||
|
||||
|
||||
# FIXME Use OAuth2 for verification
|
||||
# TODO Use OAuth2 for verification
|
||||
@router.post("/login", response_model=UserLoginResponse)
|
||||
def log_in(request: UserLogin, db: Session = Depends(get_db)):
|
||||
def log_in(
|
||||
data: UserLogin, db: Session = Depends(get_db), token: str = Depends(oauth2_scheme),
|
||||
):
|
||||
pass
|
||||
|
||||
|
||||
@router.post("/otpVerification", response_model=OTPVerifyResponse)
|
||||
def validate_otp(request: OTPVerify, db: Session = Depends(get_db)):
|
||||
if verify_otp(data=request, db=db):
|
||||
return {"message": "The OTP has been verified successfully"}
|
||||
raise HTTPException(status_code=400, detail="The OTP is not correct")
|
||||
def validate_otp(data: OTPVerify, db: Session = Depends(get_db)):
|
||||
response = verify_otp(data=data, db=db)
|
||||
return response
|
||||
|
|
|
@ -22,7 +22,7 @@ class UserCreate(UserBase):
|
|||
device_type: int = Query(None, ge=1, le=2)
|
||||
city_id: int
|
||||
access_key: str = token_hex()
|
||||
otp: int = randbits(16)
|
||||
otp: int = randbits(20)
|
||||
otp_valid_time: datetime = datetime.now() + timedelta(minutes=10)
|
||||
|
||||
class Config:
|
||||
|
@ -74,7 +74,7 @@ class SocialLogin(UserBase):
|
|||
orm_mode = True
|
||||
|
||||
|
||||
class OTPVerify(BaseModel):
|
||||
class OTPBase(BaseModel):
|
||||
access_key: str
|
||||
otp: int
|
||||
|
||||
|
@ -82,13 +82,30 @@ class OTPVerify(BaseModel):
|
|||
orm_mode = True
|
||||
|
||||
|
||||
class OTPVerifyResponse(BaseModel):
|
||||
class OTPVerify(OTPBase):
|
||||
pass
|
||||
|
||||
class Config:
|
||||
orm_mode = True
|
||||
|
||||
|
||||
class OTPVerifyResponse(OTPBase):
|
||||
id: int
|
||||
full_name: str
|
||||
email: EmailStr
|
||||
gender: int = Query(None, ge=1, le=3)
|
||||
mobile: str = Query(None, min_length=8, max_length=13)
|
||||
otp_valid_time: datetime
|
||||
lang_type: int = Query(None, ge=1, le=2)
|
||||
status: int = Query(None, ge=0, le=1)
|
||||
device_type: int = Query(None, ge=1, le=2)
|
||||
created: datetime
|
||||
updated: datetime = Query(None)
|
||||
|
||||
class Config:
|
||||
orm_mode = True
|
||||
|
||||
|
||||
access_key: str
|
||||
otp: int = Query(None, ge=6, le=6)
|
||||
|
||||
|
|
|
@ -1,4 +1,5 @@
|
|||
from datetime import datetime, timedelta
|
||||
from datetime import datetime
|
||||
from fastapi import HTTPException
|
||||
|
||||
from app.schemas import *
|
||||
from database import SessionLocal
|
||||
|
@ -34,29 +35,29 @@ def delete_data(model, data, db):
|
|||
return result
|
||||
|
||||
|
||||
def fetch_user(data, db):
|
||||
result = db.query(Users).filter(Users.email == data.email).first()
|
||||
return result
|
||||
def fetch_user_by_key(data, db):
|
||||
return db.query(Users).filter(Users.access_key == data.access_key).first()
|
||||
|
||||
|
||||
def fetch_otp(access_key, db):
|
||||
result = db.query(Users).filter(Users.access_key == access_key).first()
|
||||
return result.otp
|
||||
def fetch_user_by_email(data, db):
|
||||
return db.query(Users).filter(Users.email == data.email).first()
|
||||
|
||||
|
||||
def activate_account(data: OTPVerify, db):
|
||||
db.query(Users).filter(Users.access_key == data.access_key).update(
|
||||
{Users.status: 1}
|
||||
)
|
||||
db.commit()
|
||||
user = fetch_user_by_key(data=data, db=db)
|
||||
return user
|
||||
|
||||
|
||||
def verify_otp(data: OTPVerify, db):
|
||||
user = fetch_otp(data=data, db=db)
|
||||
same_otp = user.otp == data.otp
|
||||
valid_time = datetime.now() <= user.otp_valid_time
|
||||
valid_otp = same_otp and valid_time
|
||||
user = fetch_user_by_key(data=data, db=db)
|
||||
matching_otp = user.otp == data.otp
|
||||
valid_time = datetime.now() < user.otp_valid_time
|
||||
valid_otp = matching_otp and valid_time
|
||||
if valid_otp:
|
||||
activate_account(data=data, db=db)
|
||||
return True
|
||||
return False
|
||||
result = activate_account(data=data, db=db)
|
||||
return result
|
||||
else:
|
||||
raise HTTPException(status_code=400, detail="The OTP is not correct")
|
||||
|
|
|
@ -1,8 +1,10 @@
|
|||
from pytest import mark
|
||||
from pytest import mark, fixture
|
||||
from secrets import token_hex
|
||||
|
||||
from app.schemas import *
|
||||
from database.models import *
|
||||
from tests import client
|
||||
from tests.queries_test import get_test_db
|
||||
|
||||
|
||||
def test_registration():
|
||||
|
@ -33,3 +35,13 @@ def test_login():
|
|||
}
|
||||
response = client.post("/login", json=user)
|
||||
assert response.status_code == 200
|
||||
|
||||
|
||||
def test_otp_verification(get_test_db):
|
||||
user = get_test_db.query(Users).filter(Users.email == "oyvey@hotmail.com").first()
|
||||
data = {
|
||||
"access_key": user.access_key,
|
||||
"otp": user.otp,
|
||||
}
|
||||
response = client.post("/otpVerification", json=data)
|
||||
assert response.status_code == 200
|
||||
|
|
Loading…
Reference in New Issue